Skip to content

Latest commit

 

History

History
336 lines (247 loc) · 7.16 KB

File metadata and controls

336 lines (247 loc) · 7.16 KB

3.3 Kafka 消息队列

📍 导航返回目录 | 上一节:MySQL | 下一节:Flink


核心概念

基本架构

Producer → Topic (docs/partition 0, 1, 2...) → Consumer Group
                ↓
            Broker Cluster
                ↓
            ZooKeeper / KRaft

核心组件

  • Topic:消息分类
  • Partition:分区,提供并行处理能力
  • Producer:生产者
  • Consumer:消费者
  • Consumer Group:消费者组
  • Broker:服务器节点
  • Offset:消息在分区中的位置

生产者

发送模式

import "github.com/IBM/sarama"

// 1. 同步发送
partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
    Topic: "my-topic",
    Value: sarama.StringEncoder("hello"),
})

// 2. 异步发送
producer.Input() <- &sarama.ProducerMessage{
    Topic: "my-topic",
    Value: sarama.StringEncoder("hello"),
}

// 监听响应
select {
case success := <-producer.Successes():
    fmt.Printf("Offset: %d\n", success.Offset)
case err := <-producer.Errors():
    fmt.Printf("Error: %v\n", err)
}

分区策略

// 1. 轮询(默认)
// partition = hash(key) % partition_count

// 2. 指定分区
msg := &sarama.ProducerMessage{
    Topic:     "my-topic",
    Partition: 2,
    Value:     sarama.StringEncoder("hello"),
}

// 3. 自定义分区器
type CustomPartitioner struct{}

func (p *CustomPartitioner) Partition(message *sarama.ProducerMessage, numPartitions int32) (int32, error) {
    // 自定义逻辑
    return 0, nil
}

性能优化

config := sarama.NewConfig()

// 批量发送
config.Producer.Flush.Messages = 100          // 100条消息批量发送
config.Producer.Flush.Frequency = time.Millisecond * 100  // 或100ms触发

// 压缩
config.Producer.Compression = sarama.CompressionSnappy  // snappy, gzip, lz4

// ACK 确认
config.Producer.RequiredAcks = sarama.WaitForAll  // 0, 1, -1(all)

消费者

消费模式

// 创建消费者
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
defer consumer.Close()

// 消费指定分区
partitionConsumer, err := consumer.ConsumePartition("my-topic", 0, sarama.OffsetNewest)
defer partitionConsumer.Close()

// 读取消息
for message := range partitionConsumer.Messages() {
    fmt.Printf("Offset: %d, Key: %s, Value: %s\n", 
        message.Offset, message.Key, message.Value)
}

消费者组

import "github.com/IBM/sarama"

// 实现 ConsumerGroupHandler 接口
type Consumer struct{}

func (Consumer) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (Consumer) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

func (h Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        fmt.Printf("Message: %s\n", msg.Value)
        sess.MarkMessage(msg, "") // 标记消息已消费
    }
    return nil
}

// 使用消费者组
config := sarama.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin

group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
defer group.Close()

ctx := context.Background()
err = group.Consume(ctx, []string{"my-topic"}, &Consumer{})

Offset 管理

// 1. 自动提交(默认)
config.Consumer.Offsets.AutoCommit.Enable = true
config.Consumer.Offsets.AutoCommit.Interval = time.Second

// 2. 手动提交
config.Consumer.Offsets.AutoCommit.Enable = false

// 同步提交
sess.Commit()

// 异步提交
sess.MarkMessage(msg, "")

高可用与可靠性

副本机制

配置

# server.properties
# 副本因子(建议3)
default.replication.factor=3

# 最小同步副本数
min.insync.replicas=2

角色

  • Leader:处理读写请求
  • Follower:同步 Leader 数据
  • ISR(In-Sync Replicas):同步副本集合

消息可靠性

Producer 端

config.Producer.RequiredAcks = sarama.WaitForAll  // 等待所有副本确认
config.Producer.Retry.Max = 3                     // 重试次数
config.Producer.Return.Successes = true           // 返回成功确认

Broker 端

# 禁用自动创建Topic
auto.create.topics.enable=false

# 日志保留
log.retention.hours=168  # 7天
log.segment.bytes=1073741824  # 1GB

Consumer 端

// 手动提交 offset
config.Consumer.Offsets.AutoCommit.Enable = false

常见问题

消息丢失

原因

  • Producer 未等待 ACK
  • Broker 副本未同步完成就宕机
  • Consumer 自动提交 offset 但处理失败

解决方案

// Producer
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 3

// Broker
min.insync.replicas=2

// Consumer
config.Consumer.Offsets.AutoCommit.Enable = false
// 处理成功后再手动提交
sess.MarkMessage(msg, "")

消息重复

原因

  • 网络抖动导致重试
  • Consumer 重启后重复消费

解决方案

  • 幂等性设计(业务层保证)
  • 消息去重(Redis 记录已处理 offset)

消息顺序

单分区内有序

// 相同 key 的消息发送到同一分区
msg := &sarama.ProducerMessage{
    Topic: "my-topic",
    Key:   sarama.StringEncoder(orderID),  // 使用订单ID作为key
    Value: sarama.StringEncoder(data),
}

全局有序

  • Topic 只设置 1 个分区(牺牲性能)

消息延迟

排查

# 查看消费者 lag
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --group my-group --describe

优化

  • 增加分区数
  • 增加消费者实例
  • 优化消费逻辑(异步处理)

性能监控

关键指标

# 1. Topic 详情
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic

# 2. 消费者组状态
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --describe

# 3. Broker 性能
# 通过 JMX 监控:
# - MessagesInPerSec: 消息生产速率
# - BytesInPerSec: 字节生产速率
# - UnderReplicatedPartitions: 未充分复制的分区数

Prometheus 监控

# prometheus.yml
scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets: ['localhost:9308']  # kafka_exporter

本章小结

Kafka 是高吞吐、高可用的分布式消息队列,广泛应用于日志收集、流处理、事件驱动架构。

关键要点

  • ✅ Partition 提供并行处理能力
  • ✅ Consumer Group 实现负载均衡和故障转移
  • ✅ ISR + ACK机制保证消息可靠性
  • ✅ 业务层需保证幂等性处理重复消息

扩展阅读

  • 《Kafka 权威指南》
  • 《深入理解 Kafka:核心设计与实践原理》
  • Kafka 官方文档

💡 思考题

  1. Kafka 如何保证消息不丢失?
  2. 为什么单分区内消息有序,多分区无序?
  3. 如何处理消息重复消费问题?

⏮️ 上一节:MySQL | ⏭️ 下一节:Flink