
go处理kafka
go
package main
import (
"context"
"fmt"
"log"
"net"
"strconv"
"time"
"github.com/segmentio/kafka-go"
)
const (
brokerAddress = "localhost:9092" // 您的Kafka Broker地址
topic = "test-topic" // 主题名称
partition = 0 // 分区号
)
func createTopic() {
// 创建一个连接到 Kafka 的管理客户端
conn, err := kafka.Dial("tcp", brokerAddress)
if err != nil {
log.Fatal("无法连接到kafka:", err)
}
defer conn.Close()
// 获取当前控制器信息
controller, err := conn.Controller()
if err != nil {
log.Fatal(err)
}
// 连接到控制器
controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
log.Fatal(err)
}
defer controllerConn.Close()
// 创建主题
topicConfigs := []kafka.TopicConfig{
{
Topic: topic,
NumPartitions: 1,
ReplicationFactor: 1,
},
}
err = controllerConn.CreateTopics(topicConfigs...)
if err != nil {
// 如果主题已存在,CreateTopics 会返回错误,我们可以忽略它
fmt.Printf("创建主题时可能已存在: %v\n", err)
} else {
fmt.Println("成功创建主题:", topic)
}
}
// 生产者函数
func produceMessages() {
// 创建一个写入器 (writer)
w := &kafka.Writer{
Addr: kafka.TCP(brokerAddress),
Topic: topic,
Balancer: &kafka.LeastBytes{},
}
// 确保在函数退出时关闭写入器
defer w.Close()
for i := 0; i < 5; i++ {
message := fmt.Sprintf("这是第 %d 条消息", i+1)
err := w.WriteMessages(context.Background(), kafka.Message{
Key: []byte(fmt.Sprintf("key-%d", i)),
Value: []byte(message),
Time: time.Now(),
})
if err != nil {
log.Fatal("发送消息失败:", err)
}
fmt.Printf("生产者: 发送消息 - %s\n", message)
time.Sleep(1 * time.Second) // 模拟间隔
}
}
// 消费者函数
func consumeMessages() {
// 创建一个读取器 (reader)
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{brokerAddress},
Topic: topic,
Partition: partition,
// GroupID: "consumer-group-1", // 使用消费者组时取消注释此行
MinBytes: 10, // 最小字节数
MaxBytes: 10e6, // 最大字节数
})
// 确保在函数退出时关闭读取器
defer r.Close()
// 设置上下文用于控制超时或取消
ctx := context.Background()
fmt.Println("消费者: 开始监听消息...")
for {
// 从 Kafka 读取消息
msg, err := r.ReadMessage(ctx)
if err != nil {
// 处理常见的结束或中断情况
if err == context.Canceled || err == context.DeadlineExceeded {
fmt.Println("消费者: 接收到取消信号,停止消费。")
break
}
log.Printf("消费者: 读取消息时出错: %v\n", err)
continue
}
// 成功读取到消息
fmt.Printf("消费者: 接收消息 - 时间: %s, 键: %s, 值: %s, 分区: %d, 偏移量: %d\n",
msg.Time.Format("2006-01-02 15:04:05"),
string(msg.Key),
string(msg.Value),
msg.Partition,
msg.Offset)
}
}
func main() {
fmt.Println("Kafka Go 示例程序开始运行...")
// 可选:自动创建主题
createTopic()
// 在 goroutine 中启动消费者
go func() {
consumeMessages()
}()
// 给消费者一点时间启动
time.Sleep(2 * time.Second)
// 启动生产者发送消息
produceMessages()
// 让消费者有足够的时间接收所有消息
// 实际应用中,消费者通常会持续运行
time.Sleep(10 * time.Second)
fmt.Println("程序结束。")
}结果:
text
Kafka Go 示例程序开始运行...
成功创建主题: test-topic
消费者: 开始监听消息...
生产者: 发送消息 - 这是第 1 条消息
消费者: 接收消息 - 时间: 2025-11-01 05:51:04, 键: key-0, 值: 这是第 1 条消息, 分区: 0, 偏移量: 0
生产者: 发送消息 - 这是第 2 条消息
消费者: 接收消息 - 时间: 2025-11-01 05:51:06, 键: key-1, 值: 这是第 2 条消息, 分区: 0, 偏移量: 1
生产者: 发送消息 - 这是第 3 条消息
消费者: 接收消息 - 时间: 2025-11-01 05:51:08, 键: key-2, 值: 这是第 3 条消息, 分区: 0, 偏移量: 2
生产者: 发送消息 - 这是第 4 条消息
消费者: 接收消息 - 时间: 2025-11-01 05:51:10, 键: key-3, 值: 这是第 4 条消息, 分区: 0, 偏移量: 3
生产者: 发送消息 - 这是第 5 条消息
消费者: 接收消息 - 时间: 2025-11-01 05:51:12, 键: key-4, 值: 这是第 5 条消息, 分区: 0, 偏移量: 4
程序结束。
