kafka消费组及重平衡的影响

消费组应该算是kafka中一个比较有特色的设计模式了,而他的重平衡机制也是我们在实际生产使用中,无法避免的一个问题。

消费组

Consumer Groupkafka提供了可扩展、高容错特性的消费者机制。简单介绍下,大致有以下特点:

一个Consumer Group内可以有多个Consumer实例,该实例可以是一个进程,也可以是进程下的多线程每个Consumer Group有一个唯一标识的Group ID不同Consumer Group之间相互独立,互不影响Consumer Group内实例,与订阅的topic分区关系是一对一,或一对多的关系,Consumer Group会通过Coordinator尽量保持公平分配

理想情况下,我们应该设置Consumer实例的数量等于该Group订阅topic的分区总数,可以最大发挥消费性能。若设置的Consumer实例数少于订阅的分区数,则会为每个Consumer实例分配多个分区,消费性能会有所下降。若设置的Consumer实例数大于订阅的分区数,则会为每个Consumer实例分配 1 个分区进行消费,多余的Consumer实例则会闲置,只会浪费资源。

重平衡

重平衡(Rebalance)就是让一个Consumer Group下所有的Consumer实例,合理分配消费订阅topic的所有分区的过程。有 3 种情况会触发Consumer GroupRebalance

Group下实例数发生变化。有新的Consumer实例加入或者离开组。订阅的topic数发生变化。Consumer Group可以使用正则的方式订阅topic,比如 consumer.subscribe(Pattern.compile(“public.*log”)),该Group订阅所有以 public 开头,log 结尾的topic。这期间,新建了一个满足这样条件的topic,那么该Group也会发生Rebalancetopic分区数发生变化。比如topic扩分区的时候,也会触发Rebalance

单看上面任一触发条件,都没啥毛病。问题在于Rebalance过程中会出现以下问题:

Rebalance过程的表现有些类似JVM FGC的情况,期间整个应用都会夯住,所有Consumer实例都会停止消费,等待Rebalance完成。Rebalance过程中,所有Consumer实例都会参与重新分配。即便Consumer Group中部分Consumer实例分配合理,也需要打散重新分配,会导致TCP重新建立连接,是一个比较重的操作,较为浪费资源。Rebalance的耗时取决于Consumer Group下的实例数量,一旦实例数过多,耗时极长,会造成大量消费延迟。

避免重平衡

对于上述Rebalance带来的一些弊端,从目前的社区版来看,暂时还没有很好的解决办法,我们只能尽量避免Rebalance的发生。
在生产业务场景中,很多Rebalance都是预期外或者不必要的。我们应用的TPS大多是被这类Rebalance拖慢的。

从上述的 3 个Rebalance触发条件抓手,后两条topic数量及分区数变化,一般都是主动运维的相关操作,这种操作带来的Rebalance一般是必然发生,难以避免的,我们组要来讨论下Consumer Group组成员变化引发的Rebalance

Consumer Group实例增加的情况比较单一,当新启动一个Consumergroup.id已经存在,Coordinator会接管这个新实例,将其加入group.id相同的组,并重分配分区。这种操作场景,一般都还是预期内的,可能是通过扩容来提高TPS的操作。
Consumer Group实例数减少的情况就比较复杂了。除了正常停止下线某些Consumer实例,还会出现Coordinator误判实例为已停止状态,从而主动踢出Group。导致Rebalance发生。每个Consumer会定期向Coordinator发心跳包,保持keepalive。如果因为某些特殊原因,如网络抖动时,某个Consumer实例没有及时发送心跳请求,Coordinator会将其判定为离线,并从Group中移除,并开启新一轮Rebalance。针对这个问题,可以通过设置Consumer端一下几个参数来进行优化调整:

session.timeout.ms
Consumer Group内实例的心跳超时时间,默认值是 10sheartbeat.interval.ms
即心跳请求频率,频繁发送心跳请求会额外消耗带宽资源,但是能够更及时的触发Rebalance,默认值为 3smax.poll.interval.ms
调用poll方法的时间间隔,默认值为 5min。期间没消费完poll回的消息,Coordinator会开启新一轮Rebalance

根据平时的实践经验,建议:
session.timeout.ms=6s
heartbeat.interval.ms=2s
原则上最好是满足session.timeout.ms >= 3 * heartbeat.interval.ms公式。

max.poll.interval.ms则需要根据下游实际消费能力进行调整,尽量设置的大一点,需要大于下游的最大消息处理时间。

如果进行完上述的各种调整后,还是频发触发Rebalance,最好再去排查下Consumer端的 GC 情况,实际生产环境中我经常碰到因为 GC 设置问题导致的Consumer程序频发 FGC 的问题,从而导致非预期内的Rebalance

文章来源:

Author:hyperxu
link:http://www.hyperxu.com/2020/06/14/kafka-8/