Go Kafka Consumer Group Usage
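This article is based on Go 1.18 and Kafka 2.4.1. It shows how to consume messages with a Kafka Consumer Group, giving a best-practice example and some configuration recommendations that reduce the timeout problems caused by Rebalance.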
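Copyright notice: this is an original article by the blogger xwi88, released under the CC BY-NC 4.0 license. Commercial use is prohibited; please credit the source when reposting. Follow the author at https://github.com/xwi88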
Consumer Group
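A consumer group is the scalable, fault-tolerant consumer mechanism provided by Kafka. Its main properties:
- A consumer group consists of one or more consumers, which together consume messages from one or more topics
- A consumer group is uniquely identified by its group ID, which all consumers in the group share
- Each partition of a topic can be consumed by only one consumer within a given consumer group, but it can be consumed by consumers in other groups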
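Tips
- The number of partitions should be no smaller than the number of consumers, since consumers beyond the partition count receive no partitions; when consumption capacity is insufficient, consider adding consumers
- When adding consumers alone no longer noticeably increases consumption capacity, consider optimizing your message-processing logic
- The number of consumers must be capped so that a rebalance does not take too long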
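The first tip can be checked with simple arithmetic. The sketch below is illustrative only: `activeAndIdle` is a hypothetical helper, not part of any Kafka client, and it assumes a single topic consumed by a single group.

```go
package main

import "fmt"

// activeAndIdle reports how many consumers of one group can own at least one
// partition of a single topic, and how many are left without any partition.
// Hypothetical helper for illustration only.
func activeAndIdle(partitions, consumers int) (active, idle int) {
	active = consumers
	if partitions < consumers {
		// A partition has exactly one owner per group, so at most
		// `partitions` consumers can be doing work.
		active = partitions
	}
	return active, consumers - active
}

func main() {
	for _, consumers := range []int{2, 6, 8} {
		active, idle := activeAndIdle(6, consumers)
		fmt.Printf("partitions=6 consumers=%d active=%d idle=%d\n", consumers, active, idle)
	}
}
```

With 6 partitions, going from 6 to 8 consumers adds two idle members and no extra throughput, while still making each rebalance involve more members.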
Rebalance
Partition rebalancing (Rebalance) is an important Kafka feature that supports both high availability and horizontal scaling. The following situations trigger a Kafka rebalance.
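- Consumer group membership changes:
  - A new consumer joins
  - An existing consumer leaves voluntarily: active rebalance
  - An existing consumer fails or crashes: if it sends no heartbeat for a certain time it is treated as down, and detection takes up to one session.timeout period: passive rebalance
- The number of subscribed topics changes: for example, with a regex subscription, the number of matching topics changes
- The partition count of a subscribed topic changes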
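Note
- Pros: ensures high availability and scalability
- Cons: during a rebalance the whole consumer group is unavailable; a rebalance also forces consumers to refresh their state, so the previous consumer state becomes stale and consumption capacity drops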
Go Consumer Group
ConsumerGroup Interface and Notes
ConsumerGroup Demo
Selected Configuration Parameters
- Net.ReadTimeout: How long to wait for a response.
- Group.Rebalance.Timeout: The maximum allowed time for each worker to join the group once a rebalance has begun. This is basically a limit on the amount of time needed for all tasks to flush any pending data and commit offsets. If the timeout is exceeded, then the worker will be removed from the group, which will cause offset commit failures (default 60s).
- Group.Session.Timeout: The timeout used to detect consumer failures when using Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms (default 10s).
- Group.Heartbeat.Interval: The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than Consumer.Group.Session.Timeout, but typically should be set no higher than 1/3 of that value.
- Group.Rebalance.Strategy: Strategy for allocating topic partitions to members (default BalanceStrategyRange); one of "range", "roundrobin", "sticky".
- MaxWaitTime: The maximum amount of time the broker will wait for Consumer.Fetch.Min bytes to become available before it returns fewer than that anyway.
- Retry.Backoff: How long to wait after failing to read from a partition before trying again.
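Configuration Recommendations
- Net.ReadTimeout: default 30s. If consumers frequently hit read tcp xxx i/o timeout, consider increasing this value so that it exceeds both session.timeout and rebalance.timeout
- Group.Rebalance.Timeout: default 60s. Consider increasing it when there are many consumers; for low-TPS topics it can be lowered appropriately
- Group.Session.Timeout: default 10s. A consumer that reports no heartbeat for longer than this is considered down, which triggers a rebalance. It should be >= 3 * Group.Heartbeat.Interval
- Group.Heartbeat.Interval: default 3s. The interval at which consumers report heartbeats
- Group.Rebalance.Strategy: the rebalance strategy
  - range: the default. For each topic, let n = partitions / consumers (integer division) and m = partitions % consumers; the first m consumers are each assigned n+1 partitions and the remaining (consumers - m) consumers are each assigned n partitions
  - roundrobin: sorts all consumers in the group and all partitions of all subscribed topics lexicographically, then assigns the partitions to the consumers one at a time in round-robin order
  - sticky: a sticky strategy that keeps the load across consumers as even as possible
    - The assignment should be as even as possible: the number of topic partitions assigned to any two consumers differs by at most one
    - The assignment should stay as close as possible to the previous assignment
- MaxWaitTime: default 250ms. The broker waits at most this long, or until fetch.min.bytes of data is available, before sending data to the consumer
- Retry.Backoff: default 2s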
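The difference between the range and roundrobin strategies is easiest to see on a small example. The sketch below reimplements the two rules as described in this article for a single topic whose partitions are numbered from 0; it is a simplification for illustration, not the client library's code, and `rangeAssign` and `roundRobinAssign` are hypothetical names. The sticky strategy is omitted because it depends on the previous assignment.

```go
package main

import (
	"fmt"
	"sort"
)

// sortedCopy returns the member IDs in lexicographic order without
// modifying the caller's slice.
func sortedCopy(members []string) []string {
	out := append([]string(nil), members...)
	sort.Strings(out)
	return out
}

// rangeAssign gives contiguous blocks of partitions to the sorted members:
// the first m members get n+1 partitions, the remaining ones get n.
func rangeAssign(members []string, partitions int) map[string][]int {
	sorted := sortedCopy(members)
	out := make(map[string][]int, len(sorted))
	if len(sorted) == 0 {
		return out
	}
	n, m := partitions/len(sorted), partitions%len(sorted)
	next := 0
	for i, member := range sorted {
		count := n
		if i < m {
			count++
		}
		for j := 0; j < count; j++ {
			out[member] = append(out[member], next)
			next++
		}
	}
	return out
}

// roundRobinAssign deals partitions out one at a time to the sorted members.
func roundRobinAssign(members []string, partitions int) map[string][]int {
	sorted := sortedCopy(members)
	out := make(map[string][]int, len(sorted))
	if len(sorted) == 0 {
		return out
	}
	for p := 0; p < partitions; p++ {
		member := sorted[p%len(sorted)]
		out[member] = append(out[member], p)
	}
	return out
}

func main() {
	members := []string{"c1", "c2", "c3"}
	fmt.Println("range:     ", rangeAssign(members, 7))
	fmt.Println("roundrobin:", roundRobinAssign(members, 7))
}
```

With 7 partitions and 3 consumers, both strategies give the first consumer three partitions and the others two each; range hands out contiguous blocks while roundrobin interleaves them. Because range is applied per topic, the same first consumers receive the extra partition for every topic, which is where the two strategies start to diverge in balance when a group subscribes to many topics.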