kafka客户端自动识别新增分区
# 一、背景
广告曝光日志会通过 filebeat 写入 Kafka,下游服务消费 Kafka 获取日志信息并进行后续处理。由于最近巴黎奥运会热点事件频发,经常短时间内流量急剧暴涨,Kafka集群中心部分机器达到性能瓶颈,导致这些机器上的分区出现了消费堆积(写入也有问题),影响下游服务,Kafka 同学通过扩展对应 topic 的分区数量解决该问题。
Kafka 客户端消费服务是基于 go 语言编写的,使用的是 sarama-cluster 库,当分区数量增加时,没有自动识别出来,此时如果没有手动重启客户端触发 rebalance,则无法消费到新的分区导致消息丢失,影响后续业务。
# 二、go 常用的 Kafka 库
- go 语言常用的 Kafka 第三方库有 sarama、sarama-cluster、Kafka-go、confluent-kafka-go 四种库;
- Sarama 是一个纯粹的 Kafka 客户端库,它提供了与 Kafka 生产者和消费者进行交互的基本功能,支持多版本 Kafka、灵活的配置选项、异步处理等功能;
- Sarama-cluster 是基于低版本的 Sarama 构建的扩展库,专门用于简化基于 Kafka 集群的消费者组管理,2020 年后就已经停止更新了,其所支持的功能在 sarama 中已经支持了,详见issueIBM/sarama#1099 (opens new window);
- kafka-go 是一个用于与 Kafka 进行交互的 Go 语言客户端库,支持 Kafka 的一些低级和高级功能,主要包括:自动管理分区和消费位移、消息压缩、分区副本管理、消息的序列化和反序列化、错误处理和重试机制等;
- confluent-kafka-go 是 Confluent 提供的一个 Go 语言客户端库,用于与 Apache Kafka 进行交互。这个库是基于 librdkafka,librdkafka 是一个流行的 C 语言库,用于 Kafka 的客户端实现。confluent-kafka-go 提供了高效且功能全面的 Kafka 客户端接口,适用于 Go 应用程序;
# 三、sarama
# 1,官方地址
「sarama官方地址」 (opens new window),「kafka协议地址」 (opens new window)
# 2,简单介绍
Package sarama 是一个纯 Go 客户端库,用于处理 0.8 及更高版本 Apache Kafka。它包括一个高级API,用于轻松地生产和消费消息,以及一个低级API,用于在高级API不足时控制线路上的字节。
若要生成消息,可以使用 AsyncProducer
或 SyncProducer
。AsyncProducer
接受管道上的消息 并尽可能高效地在后台异步生成它们,在大多数情况下,它是首选。SyncProducer
提供了一种方法,该方法将阻塞,直到 Kafka 确认生成的消息。但它通常效率较低,并且实际的可靠性依赖于“Producer.RequiredAcks”的配置值,在某些配置中,消息由 SyncProducer 有时仍可能丢失。
要消费消息,可以使用 Consumer 或 Consumer-Group API。对于较低级别的需求,Broker 和 Request/Response 对象允许对每个连接进行精确控制,并通过线路发送的消息,客户端提供更高级别的元数据管理,该管理在生产者和消费者之间共享。
# 3,客户端示例代码
- 此处选用的 sarama 包版本是 v1.38.1,更高版本与我们的服务端版本不兼容,无法正常消费消息;
client.Consume(ctx, topics, &consumer)
函数是一个阻塞调用,当(rebalance)分区数量变化或是消费者数量变化时结束阻塞,所以该函数调用一定要在 for 循环中,保证可以再次开始消费数据;
package main
// SIGUSR1 toggle the pause/resume consumption
import (
"context"
"errors"
"log"
"os"
"os/signal"
"sync"
"syscall"
"github.com/Shopify/sarama"
)
// Sarama configuration options
var (
brokers = []string{"yz-cen048.kafka.data.cn:9120", "bx-mq002.kafka.data.cn:9120"}
group = "go_part_auto_discover_test1_sarama"
topics = []string{"go_part_auto_discover_test1"}
SASLUser = "adEngine"
SASLPassword = "bc5007d7935"
)
func main() {
// v1.38.1 包版本,更高版本与服务端版本不兼容
log.Println("Starting a new Sarama consumer")
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
config := sarama.NewConfig()
config.Version = sarama.V0_10_2_1 // 配置版本
config.Consumer.Offsets.Initial = sarama.OffsetNewest // 配置消费起始位移
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
config.Net.SASL.User = SASLUser
config.Net.SASL.Password = SASLPassword
// config.Consumer.Return.Errors = true
consumer := Consumer{
ready: make(chan bool),
}
ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(brokers, group, config)
if err != nil {
log.Printf("Error creating consumer group client: %v", err)
return
}
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
if err := client.Consume(ctx, topics, &consumer); err != nil {
if errors.Is(err, sarama.ErrClosedConsumerGroup) {
return
}
log.Panicf("Error from consumer: %v", err)
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
return
}
consumer.ready = make(chan bool)
}
}()
<-consumer.ready // Await till the consumer has been set up
log.Println("Sarama consumer up and running!...")
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
keepRunning := true
for keepRunning {
select {
case <-ctx.Done():
log.Println("terminating: context cancelled")
keepRunning = false
case <-sigterm:
log.Println("terminating: via signal")
keepRunning = false
}
}
cancel()
wg.Wait()
if err = client.Close(); err != nil {
log.Panicf("Error closing client: %v", err)
}
}
// Consumer represents a Sarama consumer group consumer
type Consumer struct {
ready chan bool
}
// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(consumer.ready)
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for {
select {
case message, ok := <-claim.Messages():
if !ok {
log.Printf("message channel was closed")
return nil
}
log.Printf("topic: %s, group: %s, partition: %d, msg: %s", message.Topic, group, message.Partition, string(message.Value))
session.MarkMessage(message, "")
case <-session.Context().Done():
return nil
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# 四、sarama-cluster
# 1,官方地址
「sarama-cluster官方地址」 (opens new window)
# 2,简单介绍
- Sarama-cluster 是基于低版本的 Sarama 构建的扩展库,专门用于简化基于 Kafka 集群的消费者组管理,2020 年后就已经停止更新了,其所支持的功能在 sarama 中已经支持了,详见issueIBM/sarama#1099 (opens new window);
# 3,客户端示例代码
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
"go.uber.org/zap"
)
var (
brokers = []string{"yz-cen048.kafka.data.sina.com.cn:9120", "bx-mq002.kafka.data.sina.com.cn:9120", "yz-cen063.kafka.data.sina.com.cn:9120", "bx-mq001.kafka.data.sina.com.cn:9120", "yz-cen049.kafka.data.sina.com.cn:9120", "yz-cen070.kafka.data.sina.com.cn:9120"}
group = "go_part_auto_discover_test1_sarama"
topics = []string{"go_part_auto_discover_test1"}
SASLUser = "adEngine"
SASLPassword = "bc5007d7935305b583fab9c93949dc82"
)
func main() {
sarama.Logger = log.New(os.Stdout, "[sarama-cluster] ", log.LstdFlags)
InitConsumer()
// 响应kill -15和ctrl-c信号
q := make(chan os.Signal, 1)
signal.Notify(q, syscall.SIGINT, syscall.SIGTERM)
<-q
}
func newConsumer() (*cluster.Consumer, error) {
config := cluster.NewConfig()
config.Net.SASL.Enable = true
config.Net.SASL.User = SASLUser
config.Net.SASL.Password = SASLPassword
config.Consumer.Return.Errors = true
config.Group.Return.Notifications = true
config.Consumer.MaxWaitTime = 800 * time.Millisecond
config.Consumer.Offsets.CommitInterval = time.Second
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Metadata.RefreshFrequency = time.Minute
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Version = sarama.V0_10_2_1
return cluster.NewConsumer(brokers, group, topics, config)
}
// InitConsumer kafka消费者, 不断地从kafka中消费数据, 写入管道中供工作者消费
func InitConsumer() {
consumer, err := newConsumer()
if err != nil {
panic(err)
}
sarama.Logger.Printf("⇨ create kafka consumer finish and start consume, topic : %s, group : %s\n", topics[0], group)
go consume(topics[0], group, consumer)
}
func consume(topic, group string, consumer *cluster.Consumer) {
go func(topic string, consumer *cluster.Consumer) {
for {
msg, ok := <-consumer.Messages()
if ok {
consumer.MarkOffset(msg, "")
sarama.Logger.Printf("input message, topic: %s, group: %s, partition: %d, offset: %d, msg: %s\n", msg.Topic, group, msg.Partition, msg.Offset, string(msg.Value))
}
}
}(topic, consumer)
// 和上面分成2个协程是有好处的:如果不分开,MessageChan写的时候堵塞住了,下面所有信息都打印不出来
go func(topic string, consumer *cluster.Consumer) {
// 协程的心跳
heartbeat := time.Tick(5 * time.Second)
for {
select {
case <-heartbeat:
// 如果读取kafka堵塞了, 则info日志里全是心跳日志
sarama.Logger.Println("consumer alive", zap.String("topic", topic))
case err, ok := <-consumer.Errors():
if ok {
sarama.Logger.Println(err.Error(), zap.String("topic", topic))
}
case ntf, ok := <-consumer.Notifications():
if ok {
sarama.Logger.Println(fmt.Sprintf("%+v", ntf), zap.String("topic", topic))
}
}
}
}(topic, consumer)
// 响应kill -15和ctrl-c信号
q := make(chan os.Signal, 1)
signal.Notify(q, syscall.SIGINT, syscall.SIGTERM)
<-q
consumer.Close()
sarama.Logger.Println("consumer quit finish", zap.String("topic", topic))
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# 五、kafka-go
# 1,官方地址
「kafka-go官方地址」 (opens new window)
# 2,简单介绍
- kafka-go 是一个用于与 Kafka 进行交互的 Go 语言客户端库,支持 Kafka 的一些低级和高级功能,主要包括:自动管理分区和消费位移、消息压缩、分区副本管理、消息的序列化和反序列化、错误处理和重试机制等;
# 3,客户端示例代码
package main
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
)
var (
brokers = []string{"yz-cen048.kafka.data.sina.com.cn:9120", "bx-mq002.kafka.data.sina.com.cn:9120", "yz-cen063.kafka.data.sina.com.cn:9120", "bx-mq001.kafka.data.sina.com.cn:9120", "yz-cen049.kafka.data.sina.com.cn:9120", "yz-cen070.kafka.data.sina.com.cn:9120"}
group = "go_part_auto_discover_test1_group"
topic = "go_part_auto_discover_test1"
SASLUser = "adEngine"
SASLPassword = "bc5007d7935305b583fab9c93949dc82"
)
func main() {
InitConsumer()
q := make(chan os.Signal, 1)
signal.Notify(q, syscall.SIGINT, syscall.SIGTERM)
<-q
}
func newConsumer() *kafka.Reader {
s := plain.Mechanism{
Username: SASLUser,
Password: SASLPassword,
}
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Topic: topic,
Dialer: &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
SASLMechanism: s,
},
QueueCapacity: 2000,
CommitInterval: time.Second,
StartOffset: kafka.LastOffset,
MaxBytes: 10e6, // 10MB
GroupID: group,
})
return r
}
func InitConsumer() {
consumer := newConsumer()
fmt.Printf("⇨ create kafka consumer finish and start consume, topic : %s, group : %s\n", topic, group)
go consume(group, consumer)
}
func consume(group string, r *kafka.Reader) {
defer r.Close()
for {
msg, err := r.ReadMessage(context.Background())
if err != nil {
fmt.Printf("consume error: %v\v", err)
continue
}
fmt.Printf("input message, topic: %s, group: %s, partition: %d, offset: %d, msg: %s\n", msg.Topic, group, msg.Partition, msg.Offset, string(msg.Value))
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# 六、confluent-kafka-go
# 1,官方地址
# 2,简单介绍
- confluent-kafka-go 是 Confluent 提供的一个 Go 语言客户端库,用于与 Apache Kafka 进行交互。这个库是基于 librdkafka,librdkafka 是一个流行的 C 语言库,用于 Kafka 的客户端实现。confluent-kafka-go 提供了高效且功能全面的 Kafka 客户端接口,适用于 Go 应用程序;
- confluent-kafka-go 库依赖于 C 语言的 librdkafka 库,这使得它能够利用 librdkafka 提供的高性能和可靠性。librdkafka 是 Kafka 的一个成熟和高效的客户端库,支持 Kafka 的所有核心功能:包括生产和消费消息、事务支持、分区管理、消费位移管理等,可以满足大多数生产环境中对 Kafka 的需求;
# 3,客户端示例代码
package main
import (
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
var (
brokers = []string{"yz-cen048.kafka.data.sina.com.cn:9120", "bx-mq002.kafka.data.sina.com.cn:9120", "yz-cen063.kafka.data.sina.com.cn:9120", "bx-mq001.kafka.data.sina.com.cn:9120", "yz-cen049.kafka.data.sina.com.cn:9120", "yz-cen070.kafka.data.sina.com.cn:9120"}
group = "go_part_auto_discover_test1_confluent"
topic = "go_part_auto_discover_test1"
SASLUser = "adEngine"
SASLPassword = "bc5007d7935305b583fab9c93949dc82"
)
func main() {
// v2.2.0 包版本,高版本会要求 go 1.21 以上
InitConsumer()
// 响应kill -15和ctrl-c信号
q := make(chan os.Signal, 1)
signal.Notify(q, syscall.SIGINT, syscall.SIGTERM)
<-q
}
func newConsumer() *kafka.Consumer {
broker := strings.Join(brokers, ",")
cli, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": broker,
"group.id": group,
"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanisms": "PLAIN",
"sasl.username": SASLUser,
"sasl.password": SASLPassword,
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
if err = cli.Subscribe(topic, nil); err != nil {
panic(err)
}
return cli
}
func InitConsumer() {
consumer := newConsumer()
fmt.Printf("⇨ create kafka consumer finish and start consume, topic : %s, group : %s\n", topic, group)
go consume(group, consumer)
}
func consume(group string, c *kafka.Consumer) {
defer c.Close()
for {
msg, err := c.ReadMessage(time.Second)
if err == nil {
fmt.Printf("input message: topic: %s, group: %s, partition: %d, offset: %d, msg: %s\n", *msg.TopicPartition.Topic, group, msg.TopicPartition.Partition, int64(msg.TopicPartition.Offset), string(msg.Value))
continue
}
if !err.(kafka.Error).IsTimeout() {
// The client will automatically try to recover from all errors.
// Timeout is not considered an error because it is raised by
// ReadMessage in absence of messages.
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# 七、总结
- kafka 的 go 客户端常用的几个库分别是 sarama、sarama-cluster、kafka-go、confluent-kafka-go。
- 通过对这几个库的测试发现,只有 sarama 和 confluent-kafka-go 两个库可以自动识别新分区,但一定需要注意sarama消费函数的写法,需要放在 for 循环中;
- 另外有一个问题是,即使能够自动识别新分区触发 rebalance,如果客户端的消费起始位移配置的是从最新处开始消费,当客户端 rebalance 完毕开始从新分区消费消息时,是从分区的最新处开始消费,客户端无法消费在新分区开始接收消息到客户端开始从新分区消费消息这段时间内的消息,导致消息丢失。