Producer → Topic (docs/partition 0, 1, 2...) → Consumer Group
↓
Broker Cluster
↓
ZooKeeper / KRaft
核心组件:
- Topic:消息分类
- Partition:分区,提供并行处理能力
- Producer:生产者
- Consumer:消费者
- Consumer Group:消费者组
- Broker:服务器节点
- Offset:消息在分区中的位置
import "github.com/IBM/sarama"
// 1. 同步发送
partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
Topic: "my-topic",
Value: sarama.StringEncoder("hello"),
})
// 2. 异步发送
producer.Input() <- &sarama.ProducerMessage{
Topic: "my-topic",
Value: sarama.StringEncoder("hello"),
}
// 监听响应
select {
case success := <-producer.Successes():
fmt.Printf("Offset: %d\n", success.Offset)
case err := <-producer.Errors():
fmt.Printf("Error: %v\n", err)
}// 1. 轮询(默认)
// partition = hash(key) % partition_count
// 2. 指定分区
msg := &sarama.ProducerMessage{
Topic: "my-topic",
Partition: 2,
Value: sarama.StringEncoder("hello"),
}
// 3. 自定义分区器
type CustomPartitioner struct{}
func (p *CustomPartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error) {
// 自定义逻辑
return 0, nil
}config := sarama.NewConfig()
// 批量发送
config.Producer.Flush.Messages = 100 // 100条消息批量发送
config.Producer.Flush.Frequency = time.Millisecond * 100 // 或100ms触发
// 压缩
config.Producer.Compression = sarama.CompressionSnappy // snappy, gzip, lz4
// ACK 确认
config.Producer.RequiredAcks = sarama.WaitForAll // 0, 1, -1(all)// 创建消费者
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
defer consumer.Close()
// 消费指定分区
partitionConsumer, err := consumer.ConsumePartition("my-topic", 0, sarama.OffsetNewest)
defer partitionConsumer.Close()
// 读取消息
for message := range partitionConsumer.Messages() {
fmt.Printf("Offset: %d, Key: %s, Value: %s\n",
message.Offset, message.Key, message.Value)
}import "github.com/IBM/sarama"
// 实现 ConsumerGroupHandler 接口
type Consumer struct{}
func (Consumer) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (Consumer) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
fmt.Printf("Message: %s\n", msg.Value)
sess.MarkMessage(msg, "") // 标记消息已消费
}
return nil
}
// 使用消费者组
config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
defer group.Close()
ctx := context.Background()
err = group.Consume(ctx, []string{"my-topic"}, &Consumer{})// 1. 自动提交(默认)
config.Consumer.Offsets.AutoCommit.Enable = true
config.Consumer.Offsets.AutoCommit.Interval = time.Second
// 2. 手动提交
config.Consumer.Offsets.AutoCommit.Enable = false
// 同步提交
sess.Commit()
// 异步提交
sess.MarkMessage(msg, "")配置:
# server.properties
# 副本因子(建议3)
default.replication.factor=3
# 最小同步副本数
min.insync.replicas=2角色:
- Leader:处理读写请求
- Follower:同步 Leader 数据
- ISR(In-Sync Replicas):同步副本集合
Producer 端:
config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本确认
config.Producer.Retry.Max = 3 // 重试次数
config.Producer.Return.Successes = true // 返回成功确认Broker 端:
# 禁用自动创建Topic
auto.create.topics.enable=false
# 日志保留
log.retention.hours=168 # 7天
log.segment.bytes=1073741824 # 1GBConsumer 端:
// 手动提交 offset
config.Consumer.Offsets.AutoCommit.Enable = false原因:
- Producer 未等待 ACK
- Broker 副本未同步完成就宕机
- Consumer 自动提交 offset 但处理失败
解决方案:
// Producer
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 3
// Broker
min.insync.replicas=2
// Consumer
config.Consumer.Offsets.AutoCommit.Enable = false
// 处理成功后再手动提交
sess.MarkMessage(msg, "")原因:
- 网络抖动导致重试
- Consumer 重启后重复消费
解决方案:
- 幂等性设计(业务层保证)
- 消息去重(Redis 记录已处理 offset)
单分区内有序:
// 相同 key 的消息发送到同一分区
msg := &sarama.ProducerMessage{
Topic: "my-topic",
Key: sarama.StringEncoder(orderID), // 使用订单ID作为key
Value: sarama.StringEncoder(data),
}全局有序:
- Topic 只设置 1 个分区(牺牲性能)
排查:
# 查看消费者 lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-group --describe优化:
- 增加分区数
- 增加消费者实例
- 优化消费逻辑(异步处理)
# 1. Topic 详情
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic
# 2. 消费者组状态
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --describe
# 3. Broker 性能
# 通过 JMX 监控:
# - MessagesInPerSec: 消息生产速率
# - BytesInPerSec: 字节生产速率
# - UnderReplicatedPartitions: 未充分复制的分区数# prometheus.yml
scrape_configs:
- job_name: 'kafka'
static_configs:
- targets: ['localhost:9308'] # kafka_exporterKafka 是高吞吐、高可用的分布式消息队列,广泛应用于日志收集、流处理、事件驱动架构。
关键要点:
- ✅ Partition 提供并行处理能力
- ✅ Consumer Group 实现负载均衡和故障转移
- ✅ ISR + ACK机制保证消息可靠性
- ✅ 业务层需保证幂等性处理重复消息
- 《Kafka 权威指南》
- 《深入理解 Kafka:核心设计与实践原理》
- Kafka 官方文档
💡 思考题:
- Kafka 如何保证消息不丢失?
- 为什么单分区内消息有序,多分区无序?
- 如何处理消息重复消费问题?