
并发控制
🌰 示例:限制最多 3 个 goroutine 同时工作
在 Go 中,limiter chan struct{} 这样的定义是一种常见的并发控制模式,用于限制同时运行的 goroutine 数量(即“并发限流”或“信号量”机制)。下面我们详细解释它的含义和用途。 ✅ 输出特点:
- 任意时刻最多只有 3 个 worker 在运行。
- 第 4 个 goroutine 会阻塞在
limiter <- struct{}{},直到前面某个 worker 执行<-limiter释放位置。
🔍 1. chan struct{} 是什么?
struct{}是 Go 中的 空结构体,占用 0 字节内存。chan struct{}是一个 只用于传递信号、不传递数据 的通道。- 向
chan struct{}发送或接收一个值(如struct{}{})只是表示“事件发生”,不携带实际数据。
✅ 优点:轻量、高效,适合做“信号通知”或“占位符”。
limiter chan struct{} 的典型用途:控制最大并发数
代码示例
go
package main
import (
"fmt"
"sync"
"time"
)
func main() {
const maxWorkers = 3
limiter := make(chan struct{}, maxWorkers) // 缓冲通道,容量为3
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
// 获取“许可”:向 limiter 发送一个空结构体
limiter <- struct{}{}
fmt.Printf("Worker %d started\n", id)
// 模拟工作
time.Sleep(2 * time.Second)
fmt.Printf("Worker %d done\n", id)
// 释放“许可”:从 limiter 接收(或者直接 <-limiter)
<-limiter
}(i)
}
wg.Wait()
fmt.Println("All done!")
}结果
text
Worker 9 started
Worker 0 started
Worker 4 started
Worker 0 done
Worker 5 started
Worker 4 done
Worker 6 started
Worker 9 done
Worker 7 started
Worker 5 done
Worker 8 started
Worker 7 done
Worker 2 started
Worker 6 done
Worker 1 started
Worker 8 done
Worker 3 started
Worker 1 done
Worker 2 done
Worker 3 done
All done!📌 关键原理
limiter是一个带缓冲的通道,缓冲区大小 = 最大并发数。- 发送操作
limiter <- struct{}{}:相当于“申请一个资源槽位”,如果缓冲区满(已达上限),则阻塞等待。 - 接收操作
<-limiter:相当于“释放一个槽位”,让其他 goroutine 可以进入。
这本质上是一个 计数信号量(Counting Semaphore) 的实现。
✅ 为什么用 struct{} 而不用 bool 或 int?
| 类型 | 内存占用 | 说明 |
|---|---|---|
chan bool | 每个元素 1 字节 | 可用,但浪费 |
chan int | 每个元素 8 字节(64位) | 更浪费 |
chan struct{} | 0 字节 | ✅ 最轻量,语义清晰(仅用于同步) |
Go 社区约定俗成用 struct{} 表示“无数据信号”。
✅ 总结
| 代码 | 含义 |
|---|---|
limiter chan struct{} | 一个用于并发控制的信号通道 |
make(chan struct{}, N) | 创建容量为 N 的缓冲通道,代表最多 N 个并发 |
limiter <- struct{}{} | 申请一个并发槽位(若满则阻塞) |
<-limiter | 释放一个槽位 |
这是一种 简洁、高效、地道的 Go 并发控制模式,广泛应用于生产代码中。
如果你需要更高级的限流(如速率限制),可以结合 time.Ticker 或使用 golang.org/x/time/rate 包。
工具类
工具类
go
package concurrency_util
import (
"context"
"sync"
)
// TaskFunc 定义通用的任务函数签名
// T: 输入参数的类型
// R: 返回结果的类型
// 如果不需要输入,T 传 struct{}
// 如果不需要返回,R 传 struct{}
type TaskFunc[T any, R any] func(input T) (R, error)
// Execute 并发执行器
// 核心逻辑:
// 1. 使用带缓冲的 channel 作为信号量控制并发数。
// 2. 保证结果切片的顺序与输入切片一致(通过索引写入)。
// 3. 遇到第一个错误时记录,但会等待所有任务执行完毕(或 Context 取消)。
func Execute[T any, R any](ctx context.Context, maxWorkers int, inputs []T, fn TaskFunc[T, R]) ([]R, error) {
if len(inputs) == 0 {
return []R{}, nil
}
if maxWorkers <= 0 {
maxWorkers = 1
}
// 信号量,控制最大并发数
semaphore := make(chan struct{}, maxWorkers)
var wg sync.WaitGroup
// 预分配结果切片,保证顺序
results := make([]R, len(inputs))
var firstErr error
var mu sync.Mutex // 用于保护 firstErr 的并发写
// 1. 启动协程
for i, input := range inputs {
wg.Add(1)
go func(idx int, in T) {
defer wg.Done()
// --- 获取许可 ---
select {
case semaphore <- struct{}{}:
// 获取成功,继续
case <-ctx.Done():
// 上下文已取消,直接返回
mu.Lock()
if firstErr == nil {
firstErr = ctx.Err()
}
mu.Unlock()
return
}
// --- 释放许可 (重要:defer) ---
defer func() { <-semaphore }()
// --- 执行业务逻辑 ---
res, err := fn(in)
if err != nil {
mu.Lock()
if firstErr == nil {
firstErr = err // 只记录第一个错误
}
mu.Unlock()
return
}
// --- 保存结果 ---
// 利用闭包中的 idx,直接写入对应位置,线程安全且保序
results[idx] = res
}(i, input)
}
// 2. 等待所有任务完成
// 使用一个单独的 goroutine 来关闭 done channel,避免死锁
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
// 3. 监听完成信号或上下文取消
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-done:
// 所有任务跑完了
if firstErr != nil {
return nil, firstErr
}
return results, nil
}
}无参无返
go
package main
import (
"context"
"demo/utils/concurrency_util"
"fmt"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 1. 准备数据
// 构造 3 个占位任务
tasks := make([]struct{}, 3)
fmt.Println("🚀 开始场景四:无参无返 (心跳/等待)")
// 2. 执行并发
// 输入输出都是 struct{}
_, err := concurrency_util.Execute(ctx, 3, tasks, func(_ struct{}) (struct{}, error) {
fmt.Println(" 💓 发送心跳...")
time.Sleep(1 * time.Second)
return struct{}{}, nil
})
// 3. 处理结果
if err != nil {
fmt.Printf("❌ 心跳异常: %v\n", err)
} else {
fmt.Println("✅ 心跳检测完成")
}
}结果示例
text
🚀 开始场景四:无参无返 (心跳/等待)
💓 发送心跳...
💓 发送心跳...
💓 发送心跳...
✅ 心跳检测完成有参无返
go
package main
import (
"context"
"demo/utils/concurrency_util"
"fmt"
"time"
)
// 模拟业务数据结构
type User struct {
ID int
Name string
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// 1. 准备数据 (有参数)
users := []User{
{ID: 1, Name: "Alice"},
{ID: 2, Name: "Bob"},
{ID: 3, Name: "Charlie"},
{ID: 4, Name: "David"},
{ID: 5, Name: "Eve"},
}
fmt.Println("🚀 开始场景一:有参无返 (批量保存用户)")
// 2. 执行并发
// 注意:返回类型我们传 struct{},表示不关心返回值
_, err := concurrency_util.Execute(ctx, 2, users, func(u User) (struct{}, error) {
// --- 模拟耗时操作 ---
fmt.Printf(" 📝 正在保存用户: %s (ID: %d)\n", u.Name, u.ID)
time.Sleep(1 * time.Second)
// 模拟偶尔的错误 (例如 ID=3 失败)
if u.ID == 3 {
return struct{}{}, fmt.Errorf("保存失败: 用户 %s 已存在", u.Name)
}
fmt.Printf(" 📝 用户: %s (ID: %d) 保存success \n", u.Name, u.ID)
return struct{}{}, nil
})
// 3. 处理结果
if err != nil {
fmt.Printf("❌ 任务执行遇到错误: %v\n", err)
} else {
fmt.Println("✅ 所有用户保存成功")
}
}结果:
text
🚀 开始场景一:有参无返 (批量保存用户)
📝 正在保存用户: Alice (ID: 1)
📝 正在保存用户: David (ID: 4)
📝 用户: David (ID: 4) 保存success
📝 正在保存用户: Charlie (ID: 3)
📝 用户: Alice (ID: 1) 保存success
📝 正在保存用户: Bob (ID: 2)
📝 正在保存用户: Eve (ID: 5)
📝 用户: Bob (ID: 2) 保存success
📝 用户: Eve (ID: 5) 保存success
❌ 任务执行遇到错误: 保存失败: 用户 Charlie 已存在无参有返
go
package main
import (
"context"
"demo/utils/concurrency_util"
"fmt"
"math/rand"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 1. 准备数据
// 无参数场景:我们需要构造一个“占位”切片。
// 这里我们要生成 5 个随机数,所以构造长度为 5 的空结构体切片。
tasks := make([]struct{}, 5)
fmt.Println("🚀 开始场景三:无参有返 (生成随机数)")
// 2. 执行并发
// 输入是 struct{} (空),输出是 int
results, err := concurrency_util.Execute(ctx, 2, tasks, func(_ struct{}) (int, error) {
time.Sleep(500 * time.Millisecond)
num := rand.Intn(100)
return num, nil
})
// 3. 处理结果
if err != nil {
fmt.Printf("❌ 生成出错: %v\n", err)
} else {
fmt.Println("✅ 生成的随机数:", results)
}
}结果示例
text
🚀 开始场景三:无参有返 (生成随机数)
✅ 生成的随机数: [42 15 88 3 99]有参有返
go
package main
import (
"context"
"demo/utils/concurrency_util"
"fmt"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// 1. 准备数据:5 个数字
nums := []int{1, 2, 3, 4, 5}
fmt.Println("🚀 开始测试:有参有返")
fmt.Println(" 规则:计算平方,但如果数字是 3 则报错")
// 2. 执行并发
results, err := concurrency_util.Execute(ctx, 5, nums, func(n int) (int, error) {
time.Sleep(500 * time.Millisecond) // 模拟耗时
// 模拟 ID=3 时报错
if n == 3 {
return 0, fmt.Errorf("数字 3 禁止计算")
}
return n * n, nil
})
// 3. 结果分析
fmt.Println("\n--- 执行结果 ---")
if err != nil {
fmt.Printf("⚠️ 任务组遇到错误: %v\n", err)
}
fmt.Println("📊 结果切片详情:")
for i, res := range results {
inputVal := nums[i]
if res == 0 {
// 结果为 0,说明要么计算结果是 0,要么出错了
// 在这个例子里,0 代表出错(因为 0 的平方也是 0,实际业务可以用 -1 或指针区分)
fmt.Printf(" [%d] 输入: %d -> 结果: %d (发生错误/零值)\n", i, inputVal, res)
} else {
fmt.Printf(" [%d] 输入: %d -> 结果: %d (成功)\n", i, inputVal, res)
}
}
}结果
text
🚀 开始测试:有参有返
规则:计算平方,但如果数字是 3 则报错
--- 执行结果 ---
⚠️ 任务组遇到错误: 数字 3 禁止计算
📊 结果切片详情:
[0] 输入: 1 -> 结果: 1 (成功)
[1] 输入: 2 -> 结果: 4 (成功)
[2] 输入: 3 -> 结果: 0 (发生错误/零值)
[3] 输入: 4 -> 结果: 16 (成功)
[4] 输入: 5 -> 结果: 25 (成功)
