Skip to content
鼓励作者:欢迎打赏犒劳

go处理kafka

go
package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"strconv"
	"time"

	"github.com/segmentio/kafka-go"
)

const (
	brokerAddress = "localhost:9092" // 您的Kafka Broker地址
	topic         = "test-topic"     // 主题名称
	partition     = 0                // 分区号
)

func createTopic() {
	// 创建一个连接到 Kafka 的管理客户端
	conn, err := kafka.Dial("tcp", brokerAddress)
	if err != nil {
		log.Fatal("无法连接到kafka:", err)
	}
	defer conn.Close()

	// 获取当前控制器信息
	controller, err := conn.Controller()
	if err != nil {
		log.Fatal(err)
	}
	// 连接到控制器
	controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		log.Fatal(err)
	}
	defer controllerConn.Close()

	// 创建主题
	topicConfigs := []kafka.TopicConfig{
		{
			Topic:             topic,
			NumPartitions:     1,
			ReplicationFactor: 1,
		},
	}

	err = controllerConn.CreateTopics(topicConfigs...)
	if err != nil {
		// 如果主题已存在,CreateTopics 会返回错误,我们可以忽略它
		fmt.Printf("创建主题时可能已存在: %v\n", err)
	} else {
		fmt.Println("成功创建主题:", topic)
	}
}

// 生产者函数
func produceMessages() {
	// 创建一个写入器 (writer)
	w := &kafka.Writer{
		Addr:     kafka.TCP(brokerAddress),
		Topic:    topic,
		Balancer: &kafka.LeastBytes{},
	}

	// 确保在函数退出时关闭写入器
	defer w.Close()

	for i := 0; i < 5; i++ {
		message := fmt.Sprintf("这是第 %d 条消息", i+1)
		err := w.WriteMessages(context.Background(), kafka.Message{
			Key:   []byte(fmt.Sprintf("key-%d", i)),
			Value: []byte(message),
			Time:  time.Now(),
		})
		if err != nil {
			log.Fatal("发送消息失败:", err)
		}
		fmt.Printf("生产者: 发送消息 - %s\n", message)
		time.Sleep(1 * time.Second) // 模拟间隔
	}
}

// 消费者函数
func consumeMessages() {
	// 创建一个读取器 (reader)
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{brokerAddress},
		Topic:     topic,
		Partition: partition,
		// GroupID:   "consumer-group-1", // 使用消费者组时取消注释此行
		MinBytes: 10,   // 最小字节数
		MaxBytes: 10e6, // 最大字节数
	})

	// 确保在函数退出时关闭读取器
	defer r.Close()

	// 设置上下文用于控制超时或取消
	ctx := context.Background()

	fmt.Println("消费者: 开始监听消息...")

	for {
		// 从 Kafka 读取消息
		msg, err := r.ReadMessage(ctx)
		if err != nil {
			// 处理常见的结束或中断情况
			if err == context.Canceled || err == context.DeadlineExceeded {
				fmt.Println("消费者: 接收到取消信号,停止消费。")
				break
			}
			log.Printf("消费者: 读取消息时出错: %v\n", err)
			continue
		}

		// 成功读取到消息
		fmt.Printf("消费者: 接收消息 - 时间: %s, 键: %s, 值: %s, 分区: %d, 偏移量: %d\n",
			msg.Time.Format("2006-01-02 15:04:05"),
			string(msg.Key),
			string(msg.Value),
			msg.Partition,
			msg.Offset)
	}
}

func main() {
	fmt.Println("Kafka Go 示例程序开始运行...")

	// 可选:自动创建主题
	createTopic()

	// 在 goroutine 中启动消费者
	go func() {
		consumeMessages()
	}()

	// 给消费者一点时间启动
	time.Sleep(2 * time.Second)

	// 启动生产者发送消息
	produceMessages()

	// 让消费者有足够的时间接收所有消息
	// 实际应用中,消费者通常会持续运行
	time.Sleep(10 * time.Second)

	fmt.Println("程序结束。")
}

结果:

text
Kafka Go 示例程序开始运行...
成功创建主题: test-topic
消费者: 开始监听消息...
生产者: 发送消息 - 这是第 1 条消息
消费者: 接收消息 - 时间: 2025-11-01 05:51:04, 键: key-0, 值: 这是第 1 条消息, 分区: 0, 偏移量: 0
生产者: 发送消息 - 这是第 2 条消息
消费者: 接收消息 - 时间: 2025-11-01 05:51:06, 键: key-1, 值: 这是第 2 条消息, 分区: 0, 偏移量: 1
生产者: 发送消息 - 这是第 3 条消息
消费者: 接收消息 - 时间: 2025-11-01 05:51:08, 键: key-2, 值: 这是第 3 条消息, 分区: 0, 偏移量: 2
生产者: 发送消息 - 这是第 4 条消息
消费者: 接收消息 - 时间: 2025-11-01 05:51:10, 键: key-3, 值: 这是第 4 条消息, 分区: 0, 偏移量: 3
生产者: 发送消息 - 这是第 5 条消息
消费者: 接收消息 - 时间: 2025-11-01 05:51:12, 键: key-4, 值: 这是第 5 条消息, 分区: 0, 偏移量: 4
程序结束。

如有转载或 CV 的请标注本站原文地址