Skip to content
鼓励作者:欢迎打赏犒劳

并发控制

🌰 示例:限制最多 3 个 goroutine 同时工作

在 Go 中,limiter chan struct{} 这样的定义是一种常见的并发控制模式,用于限制同时运行的 goroutine 数量(即“并发限流”或“信号量”机制)。下面我们详细解释它的含义和用途。 ✅ 输出特点:

  • 任意时刻最多只有 3 个 worker 在运行。
  • 第 4 个 goroutine 会阻塞在 limiter <- struct{}{},直到前面某个 worker 执行 <-limiter 释放位置。

🔍 1. chan struct{} 是什么?

  • struct{} 是 Go 中的 空结构体,占用 0 字节内存
  • chan struct{} 是一个 只用于传递信号、不传递数据 的通道。
  • chan struct{} 发送或接收一个值(如 struct{}{})只是表示“事件发生”,不携带实际数据。

✅ 优点:轻量、高效,适合做“信号通知”或“占位符”。


limiter chan struct{} 的典型用途:控制最大并发数

代码示例

go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const maxWorkers = 3
	limiter := make(chan struct{}, maxWorkers) // 缓冲通道,容量为3

	var wg sync.WaitGroup

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()

			// 获取“许可”:向 limiter 发送一个空结构体
			limiter <- struct{}{}
			fmt.Printf("Worker %d started\n", id)

			// 模拟工作
			time.Sleep(2 * time.Second)

			fmt.Printf("Worker %d done\n", id)
			// 释放“许可”:从 limiter 接收(或者直接 <-limiter)
			<-limiter
		}(i)
	}

	wg.Wait()
	fmt.Println("All done!")
}

结果

text
Worker 9 started
Worker 0 started
Worker 4 started
Worker 0 done
Worker 5 started
Worker 4 done
Worker 6 started
Worker 9 done
Worker 7 started
Worker 5 done
Worker 8 started
Worker 7 done
Worker 2 started
Worker 6 done
Worker 1 started
Worker 8 done
Worker 3 started
Worker 1 done
Worker 2 done
Worker 3 done
All done!

📌 关键原理

  • limiter 是一个带缓冲的通道,缓冲区大小 = 最大并发数。
  • 发送操作 limiter <- struct{}{}:相当于“申请一个资源槽位”,如果缓冲区满(已达上限),则阻塞等待。
  • 接收操作 <-limiter:相当于“释放一个槽位”,让其他 goroutine 可以进入。

这本质上是一个 计数信号量(Counting Semaphore) 的实现。


✅ 为什么用 struct{} 而不用 boolint

类型内存占用说明
chan bool每个元素 1 字节可用,但浪费
chan int每个元素 8 字节(64位)更浪费
chan struct{}0 字节✅ 最轻量,语义清晰(仅用于同步)

Go 社区约定俗成用 struct{} 表示“无数据信号”。


✅ 总结

代码含义
limiter chan struct{}一个用于并发控制的信号通道
make(chan struct{}, N)创建容量为 N 的缓冲通道,代表最多 N 个并发
limiter <- struct{}{}申请一个并发槽位(若满则阻塞)
<-limiter释放一个槽位

这是一种 简洁、高效、地道的 Go 并发控制模式,广泛应用于生产代码中。

如果你需要更高级的限流(如速率限制),可以结合 time.Ticker 或使用 golang.org/x/time/rate 包。

工具类

工具类

go
package concurrency_util

import (
	"context"
	"sync"
)

// TaskFunc 定义通用的任务函数签名
// T: 输入参数的类型
// R: 返回结果的类型
// 如果不需要输入,T 传 struct{}
// 如果不需要返回,R 传 struct{}
type TaskFunc[T any, R any] func(input T) (R, error)

// Execute 并发执行器
// 核心逻辑:
// 1. 使用带缓冲的 channel 作为信号量控制并发数。
// 2. 保证结果切片的顺序与输入切片一致(通过索引写入)。
// 3. 遇到第一个错误时记录,但会等待所有任务执行完毕(或 Context 取消)。
func Execute[T any, R any](ctx context.Context, maxWorkers int, inputs []T, fn TaskFunc[T, R]) ([]R, error) {
	if len(inputs) == 0 {
		return []R{}, nil
	}
	if maxWorkers <= 0 {
		maxWorkers = 1
	}

	// 信号量,控制最大并发数
	semaphore := make(chan struct{}, maxWorkers)
	var wg sync.WaitGroup

	// 预分配结果切片,保证顺序
	results := make([]R, len(inputs))

	var firstErr error
	var mu sync.Mutex // 用于保护 firstErr 的并发写

	// 1. 启动协程
	for i, input := range inputs {
		wg.Add(1)
		go func(idx int, in T) {
			defer wg.Done()

			// --- 获取许可 ---
			select {
			case semaphore <- struct{}{}:
			// 获取成功,继续
			case <-ctx.Done():
				// 上下文已取消,直接返回
				mu.Lock()
				if firstErr == nil {
					firstErr = ctx.Err()
				}
				mu.Unlock()
				return
			}

			// --- 释放许可 (重要:defer) ---
			defer func() { <-semaphore }()

			// --- 执行业务逻辑 ---
			res, err := fn(in)
			if err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = err // 只记录第一个错误
				}
				mu.Unlock()
				return
			}

			// --- 保存结果 ---
			// 利用闭包中的 idx,直接写入对应位置,线程安全且保序
			results[idx] = res

		}(i, input)
	}

	// 2. 等待所有任务完成
	// 使用一个单独的 goroutine 来关闭 done channel,避免死锁
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	// 3. 监听完成信号或上下文取消
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-done:
		// 所有任务跑完了
		if firstErr != nil {
			return nil, firstErr
		}
		return results, nil
	}
}

无参无返

go
package main

import (
	"context"
	"demo/utils/concurrency_util"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// 1. 准备数据
	// 构造 3 个占位任务
	tasks := make([]struct{}, 3)

	fmt.Println("🚀 开始场景四:无参无返 (心跳/等待)")

	// 2. 执行并发
	// 输入输出都是 struct{}
	_, err := concurrency_util.Execute(ctx, 3, tasks, func(_ struct{}) (struct{}, error) {
		fmt.Println("   💓 发送心跳...")
		time.Sleep(1 * time.Second)
		return struct{}{}, nil
	})

	// 3. 处理结果
	if err != nil {
		fmt.Printf("❌ 心跳异常: %v\n", err)
	} else {
		fmt.Println("✅ 心跳检测完成")
	}
}

结果示例

text
🚀 开始场景四:无参无返 (心跳/等待)
   💓 发送心跳...
   💓 发送心跳...
   💓 发送心跳...
✅ 心跳检测完成

有参无返

go
package main

import (
	"context"
	"demo/utils/concurrency_util"
	"fmt"
	"time"
)

// 模拟业务数据结构
type User struct {
	ID   int
	Name string
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// 1. 准备数据 (有参数)
	users := []User{
		{ID: 1, Name: "Alice"},
		{ID: 2, Name: "Bob"},
		{ID: 3, Name: "Charlie"},
		{ID: 4, Name: "David"},
		{ID: 5, Name: "Eve"},
	}

	fmt.Println("🚀 开始场景一:有参无返 (批量保存用户)")

	// 2. 执行并发
	// 注意:返回类型我们传 struct{},表示不关心返回值
	_, err := concurrency_util.Execute(ctx, 2, users, func(u User) (struct{}, error) {
		// --- 模拟耗时操作 ---
		fmt.Printf("   📝 正在保存用户: %s (ID: %d)\n", u.Name, u.ID)
		time.Sleep(1 * time.Second)

		// 模拟偶尔的错误 (例如 ID=3 失败)
		if u.ID == 3 {
			return struct{}{}, fmt.Errorf("保存失败: 用户 %s 已存在", u.Name)
		}
		fmt.Printf("   📝 用户: %s (ID: %d)  保存success \n", u.Name, u.ID)
		return struct{}{}, nil
	})

	// 3. 处理结果
	if err != nil {
		fmt.Printf("❌ 任务执行遇到错误: %v\n", err)
	} else {
		fmt.Println("✅ 所有用户保存成功")
	}
}

结果:

text
🚀 开始场景一:有参无返 (批量保存用户)
   📝 正在保存用户: Alice (ID: 1)
   📝 正在保存用户: David (ID: 4)
   📝 用户: David (ID: 4)  保存success 
   📝 正在保存用户: Charlie (ID: 3)
   📝 用户: Alice (ID: 1)  保存success 
   📝 正在保存用户: Bob (ID: 2)
   📝 正在保存用户: Eve (ID: 5)
   📝 用户: Bob (ID: 2)  保存success 
   📝 用户: Eve (ID: 5)  保存success 
❌ 任务执行遇到错误: 保存失败: 用户 Charlie 已存在

无参有返

go
package main

import (
	"context"
	"demo/utils/concurrency_util"
	"fmt"
	"math/rand"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// 1. 准备数据
	// 无参数场景:我们需要构造一个“占位”切片。
	// 这里我们要生成 5 个随机数,所以构造长度为 5 的空结构体切片。
	tasks := make([]struct{}, 5)

	fmt.Println("🚀 开始场景三:无参有返 (生成随机数)")

	// 2. 执行并发
	// 输入是 struct{} (空),输出是 int
	results, err := concurrency_util.Execute(ctx, 2, tasks, func(_ struct{}) (int, error) {
		time.Sleep(500 * time.Millisecond)
		num := rand.Intn(100)
		return num, nil
	})

	// 3. 处理结果
	if err != nil {
		fmt.Printf("❌ 生成出错: %v\n", err)
	} else {
		fmt.Println("✅ 生成的随机数:", results)
	}
}

结果示例

text
🚀 开始场景三:无参有返 (生成随机数)
✅ 生成的随机数: [42 15 88 3 99]

有参有返

go
package main

import (
	"context"
	"demo/utils/concurrency_util"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// 1. 准备数据:5 个数字
	nums := []int{1, 2, 3, 4, 5}

	fmt.Println("🚀 开始测试:有参有返")
	fmt.Println("   规则:计算平方,但如果数字是 3 则报错")

	// 2. 执行并发
	results, err := concurrency_util.Execute(ctx, 5, nums, func(n int) (int, error) {
		time.Sleep(500 * time.Millisecond) // 模拟耗时

		// 模拟 ID=3 时报错
		if n == 3 {
			return 0, fmt.Errorf("数字 3 禁止计算")
		}

		return n * n, nil
	})

	// 3. 结果分析
	fmt.Println("\n--- 执行结果 ---")
	if err != nil {
		fmt.Printf("⚠️  任务组遇到错误: %v\n", err)
	}

	fmt.Println("📊 结果切片详情:")
	for i, res := range results {
		inputVal := nums[i]
		if res == 0 {
			// 结果为 0,说明要么计算结果是 0,要么出错了
			// 在这个例子里,0 代表出错(因为 0 的平方也是 0,实际业务可以用 -1 或指针区分)
			fmt.Printf("   [%d] 输入: %d -> 结果: %d (发生错误/零值)\n", i, inputVal, res)
		} else {
			fmt.Printf("   [%d] 输入: %d -> 结果: %d (成功)\n", i, inputVal, res)
		}
	}
}

结果

text
🚀 开始测试:有参有返
   规则:计算平方,但如果数字是 3 则报错

--- 执行结果 ---
⚠️  任务组遇到错误: 数字 3 禁止计算
📊 结果切片详情:
   [0] 输入: 1 -> 结果: 1 (成功)
   [1] 输入: 2 -> 结果: 4 (成功)
   [2] 输入: 3 -> 结果: 0 (发生错误/零值)
   [3] 输入: 4 -> 结果: 16 (成功)
   [4] 输入: 5 -> 结果: 25 (成功)

如有转载或 CV 的请标注本站原文地址