目录

kafka 消费者配置优化

../../images/weixin_public.png

基本消费者配置

基本的消费者配置必须有一个host:port引导服务器地址,用于连接到 Kafka 代理。它还需要反序列化器来转换消息键和值。建议使用客户端 ID,因为它可用于将客户端标识为日志和指标中请求的来源。

1
2
3
4
bootstrap.servers=localhost:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
client.id=my-client

关键属性

除了我们的最低配置之外,您还可以使用许多属性来微调您的使用者配置。我们不会在这里涵盖所有可能的消费者配置选项,但会检查一组精选的属性,这些属性为经常需要解决的需求提供特定的解决方案:

  • group.id
  • fetch.max.wait.ms
  • fetch.min.bytes
  • fetch.max.bytes
  • max.partition.fetch.bytes
  • max.message.bytes (topic or broker)
  • enable.auto.commit
  • auto.commit.interval.ms
  • isolation.level
  • session.timeout.ms
  • heartbeat.interval.ms
  • auto.offset.reset
  • group.instance.id
  • max.poll.interval.ms
  • max.poll.records

我们将看看如何使用这些属性的组合来调节:

  • 消费者可扩展性以适应增加的吞吐量
  • 吞吐量以特定时间段内处理的消息数量衡量
  • 延迟以从代理获取消息所需的时间来衡量
  • 提交偏移或从故障中恢复时数据丢失或重复
  • 处理来自生产者和消费者端的事务消息
  • 最小化重新平衡的影响以减少停机时间

与生产者一样,您将希望在满足您的需求的吞吐量和延迟之间取得平衡。您还需要确保您的配置有助于减少因消费者组的不必要重新平衡而导致的延迟。但是请注意,您应该避免使用与应用程序提供的属性或保证发生冲突的任何属性。

如果您想详细了解每个属性的作用,请参阅 Kafka 的消费者配置

消息排序保证

Kafka只为单个分区中的消息提供排序保证。如果您希望对来自一个主题的消息进行严格排序,唯一的选择是每个主题使用一个分区。然后,消费者可以按照提交给代理的相同顺序观察消息。

当消费者使用来自多个分区的消息时,事情可能会变得不那么精确。尽管保留了每个分区的顺序,但不能保证从所有分区获取消息的顺序,因为它不一定反映它们发送的顺序。然而,在实践中,当从多个分区消费时,保留每个分区中消息的顺序通常就足够了,因为可以将顺序重要的消息发送到同一个分区(通过具有相同的键,或者可能通过使用自定义分区器)。

与消费者群体一起扩展

消费者组的使用非常普遍,以至于它们可能被视为基本消费者配置的一部分。消费者组是一种通过在多个消费者之间划分分区来共享从一组分区中消费消息的工作的方式。消费者使用 a 进行分组group.id,允许消息在共享相同 id 的成员之间传播。

1
group.id=my-group-id

消费者组对于根据需求扩展消费者非常有用。组内的消费者不会从同一个分区读取数据,但可以独占从零个或多个分区接收数据。每个分区都被分配给消费者组的一个成员。

在考虑在一个组中包含多少消费者时,有几件事值得一提。添加比分区更多的消费者不会增加吞吐量。多余的消费者将是无分区且空闲的。然而,这可能不是完全没有意义的,因为如果确实分配了分区的消费者之一发生故障,空闲消费者实际上处于待命状态。

消费滞后和消费群体

对于依赖处理(接近)实时数据的应用程序,消费者延迟是一个非常重要的指标。消费者滞后表示消息的生产和消费率的差异。具体来说,给定消费者组的消费者延迟表示添加到主题分区的最后一条消息与该分区的消费者最后接收的消息之间的延迟。如果消费者滞后是一个问题,您可以采取明确的措施来减少它。

consumer-lag.png
kafka

如果瓶颈在消费者进程中并且消费者比分区少,那么向订阅主题的消费者组添加更多消费者应该会有所帮助。不过,这里要说明的重要一点是,单独的消费者配置调整可能不足以满足您的优化目标。

消费者滞后持续增长表明消费者群体无法跟上消息产生的速度。如果这种情况发生的时间足够长,则主题保留配置可能意味着消息在被消费者读取之前被代理删除。在这种情况下,一个短期的解决方案是增加主题的retention.bytes或retention.ms,但这只会推迟不可避免的事情。长期的解决方案是增加消费者的吞吐量(或减慢消息生产)。一种更常见的情况是工作量激增,这意味着消费者延迟会增长和缩小。这只是消费者需要(接近)实时操作的问题。

通过增加请求中获取的最小数据量来提高吞吐量

使用fetch.max.wait.ms和fetch.min.bytes配置属性来设置控制来自消费者的请求数量的阈值。

  • fetch.max.wait.ms 设置基于时间的批处理的最大阈值。
  • fetch.min.bytes 为基于大小的批处理设置最小阈值。 当客户端应用程序轮询数据时,这两个属性都控制消费者从代理获取的数据量。您可以将属性调整得更高,以减少请求,并以更大的批次传递消息。如果生成的数据量很低,您可能希望这样做。减少来自消费者的请求数量也降低了消费者和代理的 CPU 利用率开销。

通过增加批量获取的数据量来提高吞吐量,但会增加一些延迟成本。因此,将这些属性调整得更低会降低端到端延迟。在下一节中,我们将通过增加批量大小来更多地了解目标延迟。

您可以使用这些属性中的一个或两个。如果两者都使用,当达到任一阈值中的第一个时,Kafka 将响应获取请求。

1
2
fetch.max.wait.ms=500
fetch.min.bytes=16384

通过增加最大批量大小来降低延迟

增加请求中获取的最小数据量有助于提高吞吐量。但是,如果您想做一些事情来改善延迟,您可以通过增加消费者可以从代理获取的最大数据量来扩展您的阈值。

  • fetch.max.bytes 设置一次从代理获取的数据量的最大字节数限制
  • max.partition.fetch.bytes 设置为每个分区返回多少数据的最大字节数限制,该限制必须始终大于在代理或主题配置中设置的字节数max.message.bytes。

通过增加这两个属性的值,并在每个请求中允许更多数据,延迟可能会随着获取请求的减少而得到改善。

1
2
fetch.max.bytes=52428800
max.partition.fetch.bytes=1048576

小心使用内存。客户端可以消耗的最大内存量大约计算为:

NUMBER-OF-BROKERS * fetch.max.bytes 和NUMBER-OF-PARTITIONS * max.partition.fetch.bytes

提交偏移量时避免数据丢失或重复

对于需要持久消息传递的应用程序,您可以在提交偏移量时提高对消费者的控制级别,以最大程度地降低数据丢失或重复的风险。

Kafka 的自动提交机制非常方便(有时也很合适,具体取决于用例)。auto.commit.interval.ms启用后,消费者每毫秒自动提交消息的偏移量。但一如既往,便利是有代价的。通过允许您的消费者提交抵消,您将引入数据丢失和重复的风险。

  • 数据丢失 如果您的应用程序提交了一个偏移量,然后在实际处理该偏移量之前的所有消息之前崩溃,那么当应用程序重新启动时,这些消息将不会得到处理。
  • 数据重复 如果您的应用程序已经处理了所有消息,然后在偏移量自动提交之前崩溃,则在应用程序重新启动时使用最后一个偏移量,您将再次处理这些消息。

如果这种潜在的情况让您有点担心,您能做些什么呢?首先,您可以使用该auto.commit.interval.ms属性来减少提交之间那些令人担忧的间隔。

1
auto.commit.interval.ms=1000

但这并不能完全消除消息丢失或重复的可能性。enable.auto.commit或者,您可以通过设置来关闭自动提交false。然后,您负责您的消费者应用程序如何正确处理提交。这是一个重要的决定。定期自动偏移提交确实意味着您无需担心。并且只要在下一次轮询之前完成所有消息处理,就会提交所有处理的偏移量。但是,如果要避免数据丢失或数据重复,则可能需要更高级别的控制。

1
enable.auto.commit=false

关闭自动提交后,一个更强大的操作过程是将您的消费者客户端应用程序设置为仅在执行了所有处理并使用消息之后才提交偏移量。您确定何时使用消息。为此,您可以引入对 KafkacommitSync和commitAsyncAPI 的调用,它们将主题和分区的指定偏移量提交给 Kafka。

  • commitSynccommitSyncAPI 提交从轮询返回的所有消息的偏移量 。通常,当您完成批处理中的所有消息时调用 API,并且在提交批处理中的最后一个偏移量之前不要轮询新消息。这种方法会影响吞吐量和延迟,轮询时返回的消息数量也会影响,因此您可以将应用程序设置为较少提交。
  • commitAsynccommitAsyncAPI 不会等待代理响应提交请求 。API的commitAsync延迟比commitSyncAPI 低,但在重新平衡时有创建重复的风险。 一种常见的方法是利用使用这两种 API 的好处,因此commitAsync默认使用较低延迟的 API,但commitSyncAPI 在关闭消费者或重新平衡之前接管以保护最终提交。

关闭自动提交功能有助于数据丢失,因为您可以编写代码以仅在实际处理消息时提交偏移量。但是手动提交并不能完全消除数据重复,因为您不能保证偏移提交消息将始终由代理处理。即使您在处理每条消息后执行同步偏移提交也是如此。

您可以将 Kafka 解决方案设置为与提供 ACID(原子性、一致性、隔离性和持久性)可靠性保证的数据库交互,以存储消费者偏移量。

配置可靠的数据管道

为了保证生产者端消息传递的可靠性,您可以将生产者配置为使用幂等性和事务性 ID。

幂等性

1
2
3
4
enable.idempotence=true
max.in.flight.requests.per.connection=5
acks=all
retries=2147483647

幂等性属性用于保证来自生产者的消息传递的不重复性和顺序性,恰好一次写入单个分区。

交易编号

事务属性保证使用相同事务的消息只生成一次,要么全部成功写入各自的日志,要么都不成功。超时设置了实现这一目标的时间限制。

1
2
transactional.id=唯一ID
transaction.timeout.ms=900000

通过以这种方式设置生产者,您可以通过引入isolation.level属性使管道从消费者端更加安全。该isolation.level属性控制消费者如何读取事务性消息,并具有两个有效值:

  • read_committed
  • read_uncommitted(默认) 如果您从默认切换到read_committed,则消费者只会读取已提交的事务性消息。与往常一样,需要权衡取舍。使用此模式将导致端到端延迟增加,因为消费者只会在代理写入记录事务结果(已提交或已中止)的事务标记时返回消息。
1
2
enable.auto.commit=false
isolation.level=read_committed

从消费者组内的故障中恢复

您可以定义检查消费者组内消费者健康状况的频率。如果消费者在消费者组中失败,则会触发重新平衡并将分区所有权重新分配给该组的成员。您希望检查的时间安排恰到好处,以便消费者组可以快速恢复,但不会触发不必要的重新平衡。你使用两个属性来做到这一点:session.timeout.ms和heartbeat.interval.ms.

  • session.timeout.ms 指定消费者组中的消费者在被视为不活动之前可以与代理断开联系的最长时间(以毫秒为单位),并且在组中的活动消费者之间触发重新平衡。将session.timeout.ms属性设置得较低意味着更早检测到失败的消费者,并且可以更快地进行重新平衡。但是,您不希望将超时设置得太低,以至于代理无法及时接收到心跳并触发不必要的重新平衡。
  • heartbeat.interval.ms指定对消费者组协调器的心跳检查 之间的间隔(以毫秒为单位),以指示消费者处于活动状态并已连接。心跳间隔必须低于会话超时间隔,通常是三分之一。根据预期的重新平衡减少心跳间隔可以减少意外重新平衡的机会,但请记住,频繁的心跳检查会增加代理资源的开销。
1
2
heartbeat.interval.ms=3000
session.timeout.ms=10000

如果代理配置指定 agroup.min.session.timeout.ms和group.max.session.timeout.ms,则该session.timeout.ms值必须在该范围内。

管理抵消政策

当没有提交偏移量时,消费者应该如何表现?或者当提交的偏移量不再有效或被删除时?如果您auto.offset.reset正确设置该属性,它应该在这两个事件中都表现得无可挑剔。

让我们首先考虑当没有提交偏移量时会发生什么。假设一个新的消费者应用程序连接到一个代理并首次呈现一个新的消费者组 ID。偏移量决定了消费者已经从分区中读取了哪条消息。消费者偏移信息存在于一个名为 的内部 Kafka 主题中__ consumer_offsets。该__ consumer_offsets主题尚不包含此新应用程序的任何偏移信息。那么消费者从哪里开始呢?将auto.offset.reset属性设置为latest默认值,消费者将开始仅处理新消息。通过仅处理新消息,将丢失任何现有消息。或者,您可以将auto.offset.reset属性设置为earliest并处理从日志开头的现有消息。

当偏移不再有效时会发生什么?如果消费者组或独立消费者处于非活动状态,并且在为代理配置的偏移保留期( )期间未提交任何偏移offsets.retention.minutes,则先前提交的偏移将从 中删除__ consumer_offsets。同样,您可以earliest在这种情况下使用该选项,以便消费者返回到分区的开头,以避免在未提交偏移时数据丢失。

如果单个 fetch 请求中返回的数据量很大,根据消费者客户端应用程序轮询新消息的频率,可能会在消费者处理它之前发生超时。在这种情况下,您可以降低max.partition.fetch.bytes或增加session.timeout.ms抵消政策的一部分。

1
2
3
auto.offset.reset=earliest
session.timeout.ms=10000
max.partition.fetch.bytes=1048576

将重新平衡消费者群体的影响降至最低

重新平衡是将分区分配给组中的活动消费者所花费的时间。在重新平衡期间:

  • 消费者提交他们的补偿
  • 新的消费群体形成
  • 指定的组长将分区平均分配给组成员
  • 组中的消费者收到他们的分配并开始获取数据

显然,再平衡过程需要时间。并且在消费者组集群的滚动重启期间,它会反复发生。因此,重新平衡会对集群组的性能产生明显影响。如前所述,增加心跳检查的数量可以减少不必要的重新平衡的可能性。但是你可以采取更多的方法。一是设置静态成员以减少重新平衡的总数。静态成员使用持久性,以便在会话超时后重新启动期间识别消费者实例。

您可以使用该group.instance.id属性为消费者指定唯一的组实例 ID。然后,消费者组协调器可以在重新启动后识别新的消费者实例时使用该 ID。

1
group.instance.id=UNIQUE-ID

消费者组协调器为消费者实例分配一个新的成员 ID,但作为静态成员,它继续使用相同的实例 ID,并接收相同的主题分区分配。

重新平衡的另一个原因实际上可能是由于轮询间隔配置不足,然后将其解释为消费者失败。如果您在检查使用者日志时发现这种情况,您可以校准max.poll.interval.ms和max.poll.interval.ms轮询配置属性以减少重新平衡的次数。

  • max.poll.interval.ms 设置检查消费者是否继续处理消息的时间间隔。如果消费者应用程序没有至少每max.poll.interval.ms毫秒调用一次轮询,则认为消费者失败,导致重新平衡。如果应用程序无法及时处理从 poll 返回的所有记录,您可以通过使用此属性来增加对来自消费者的新消息的轮询之间的间隔(以毫秒为单位)来避免重新平衡。

  • max.poll.records 设置从消费者返回的已处理记录数。您可以使用该max.poll.records属性设置从使用者缓冲区返回的记录数的最大限制,从而允许您的应用程序在max.poll.interval.ms限制范围内处理更少的记录。

1
2
3
group.instance.id=UNIQUE-ID
max.poll.interval.ms=300000
max.poll.records=500

原文