目录

kafka Broker服务器配置

../../images/weixin_public.png

基本服务器配置

一个基本的代理配置可能看起来像这样。您将看到主题、话题和日志的设置。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
num.partitions=1
default.replication.factor=3
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
num.network.threads=3
num.io.threads=8
num.recovery.threads.per.data.dir=1
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
group.initial.rebalance.delay.ms=0
zookeeper.connection.timeout.ms=6000

您可以通过自定义资源的config属性配置这些设置。Kafka

在这篇文章中,我们建议您还可以添加什么来优化您的 Kafka 代理。在显示属性示例值的地方,这通常是默认值——相应地进行调整。

复制主题以实现高可用性

配置主题时,分区数量、同步副本的最小数量和分区复制因子通常在主题级别设置。您可以使用StrimziKafkaTopic来执行此操作。您也可以在代理级别设置这些属性。在这种情况下,默认值将应用于未明确设置这些属性的主题,包括自动创建的主题。

1
2
3
num.partitions=1
default.replication.factor=3
min.insync.replicas=2

高可用性环境要求主题的复制因子至少为 3,并且同步副本的最小数量比复制因子小 1。为了提高数据的持久性,min.insync.replicas请在您的主题acks=all配置中设置,并在您的生产者配置中使用消息传递确认。

使用该属性设置复制领导分区数据的每个跟随者replica.fetch.max.bytes获取的消息的最大大小。该值基于平均消息大小和吞吐量。在考虑读/写缓冲所需的总内存分配时,可用内存还必须能够容纳乘以所有跟随者时的最大复制消息大小。

1
replica.fetch.max.bytes=1048576

Kafka 的主题复制机制的重要性怎么强调都不为过。主题复制是 Kafka 可靠性和数据持久性的核心。使用复制,失败的代理可以从其他代理上的同步副本中恢复。在讨论分区重新平衡以实现可用性时,我们将详细介绍领导者、追随者和同步副本。

创建和删除主题

默认情况下启用该auto.create.topics.enable属性,以便在生产者或消费者需要时创建主题。它通常在生产中被禁用,因为 Kafka 用户倾向于对主题创建应用更多控制。如果您使用自动创建主题,您可以使用num.partitions. 将其与default.replication.factor属性一起使用。在这种情况下,您可能希望将复制因子设置为至少三个副本,以便默认情况下数据更持久。

默认情况下启用该delete.topic.enable属性以允许删除主题。Kafka 用户通常也会在生产环境中禁用此属性。这一次您要确保数据不会意外丢失,但如果情况需要,您可以临时启用该属性以删除主题。

1
2
auto.create.topics.enable=false
delete.topic.enable=true

如果该属性设置为 false ,则无法删除包含KafkaTopic资源的主题。delete.topic.enable

事务和提交的内部主题设置

注意内部Kafka主题的配置要求。

如果您使用事务来启用从生产者到分区的原子写入,__transaction_state则流程中使用的内部主题至少需要三个具有默认设置的代理。

1
2
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

存储分区偏移提交的__consumer_offsets主题具有分区数和复制因子的默认设置。

1
2
3
offsets.topic.num.partitions=50
offsets.topic.replication.factor=3
offsets.commit.required.acks=-1

警告:不要在生产中减少这些设置。

通过增加 I/O 线程来提高请求处理吞吐量

网络线程 ( num.network.threads) 处理对 Kafka 集群的请求,例如从客户端应用程序生成和获取请求。调整网络线程的数量以反映复制因子以及与 Kafka 集群交互的客户端生产者和消费者的活动水平。为了减少拥塞和调节请求流量,您可以queued.max.requests在网络线程被阻塞之前限制请求队列中允许的请求数。

Kafka 代理指标可以帮助计算所需的线程数。例如,网络线程空闲的平均时间指标 ( kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent) 表示使用的资源百分比。如果空闲时间为 0%,则所有资源都在使用中,这意味着更多线程可能是有益的。

I/O 线程 ( num.io.threads) 从请求队列中提取请求来处理它们。添加更多线程可以提高吞吐量,但 CPU 内核数和磁盘带宽会施加实际上限。一个好的起点可能是从默认值 8 乘以磁盘数开始。

用于num.recovery.threads.per.data.dir指定用于在启动时加载日志和在关闭时刷新的线程数。

1
2
3
4
num.io.threads=8
queued.max.requests=500
num.network.threads=3
num.recovery.threads.per.data.dir=1

对所有代理的线程池的配置更新可能会在集群级别动态发生。这些更新限制在当前大小的一半和当前大小的两倍之间。 如果线程由于磁盘数量而变慢或受限,请尝试增加网络请求的缓冲区大小以提高吞吐量:

1
replica.socket.receive.buffer.bytes=65536

并且还增加了 Kafka 可以接收的最大字节数:

1
socket.request.max.bytes=104857600

增加高延迟连接的吞吐量

如果您已微调消息批次的大小,则用于发送和接收消息的缓冲区的默认值可能对于所需的吞吐量来说太小了。

1
2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576

如果您想更改这些值,您可以使用带宽延迟乘积计算来计算缓冲区的最佳大小。你需要一些数字来处理。本质上,您希望将带宽容量与往返延迟相乘以提供数据量。去看看这个wiki 定义,看看计算的例子。

使用数据保留策略管理日志

Kafka 使用日志来存储消息数据。日志是一系列具有各种相关索引的段。新消息被写入单个活动段。消息永远不会被修改。

使用获取请求,消费者从段中读取。一段时间后,活动段滚动为只读,因为新的活动段将替换它。较旧的段将被保留,直到它们符合删除条件。

您可以配置日志段的最大大小(以字节为单位)以及滚动活动段之前的时间量(以毫秒为单位)。segment.bytes使用和将它们设置在主题级别segment.ms。或者在代理级别为没有这些设置的任何主题设置默认值:

1
2
log.segment.bytes=1073741824
log.roll.ms=604800000

更大的尺寸意味着活动段包含更多的消息并且滚动的频率更低。非活动段也不太常符合删除条件。

您可以结合我们接下来介绍的清理策略设置基于时间或基于大小的日志保留策略,以便保持日志易于管理。达到保留限制时,将删除非活动日志段。通过控制您保留的日志的大小,您可以确保您不太可能超出磁盘容量。

对于基于时间的日志保留,请使用毫秒配置,该配置优先于相关的分钟和小时配置。毫秒配置也会动态更新,而其他配置则不会。

1
log.retention.ms=1680000

保留期基于消息附加到分段的时间。如果 log.retention.ms设置为 -1,则不会对日志保留应用时间限制,因此会保留所有日志。应始终监视磁盘使用情况,但通常不建议使用 -1 设置,因为它特别有可能导致磁盘已满的问题,这很难纠正。

对于基于大小的日志保留,设置最大日志大小(以字节为单位):

1
log.retention.bytes=1073741824

最大日志大小适用于日志中的所有段。换句话说,一旦达到稳定状态,日志通常会以大约log.retention.bytes/log.segment.bytes段结束。当达到最大日志大小时,将删除较旧的段。

设置最大日志大小不考虑将消息附加到段的时间。如果这是一个潜在问题,您可以使用基于时间和基于规模的保留。当您同时使用两者时,达到的第一个阈值会触发清理。

如果您希望在从系统中删除段文件之前添加时间延迟,您可以log.segment.delete.delay.ms为代理级别的所有主题或file.delete.delay.ms主题配置中的特定主题添加延迟。

1
log.segment.delete.delay.ms=60000

使用清理策略删除日志数据

Kafka 的日志清理器可以满足您的期望。它从日志中删除旧数据。默认启用 ( log.cleaner.enable=true),您可以通过定义清理策略来控制清理器的操作方式。

1
log.cleanup.policy=compact,delete
  • 删除策略 delete策略对应于使用数据保留策略管理日志。适用于不需要永久保留数据的情况。根据日志保留限制删除较旧的段。否则,如果未启用日志清理器,并且没有日志保留限制,则日志将继续增长。

  • 紧凑策略 compact策略保证为每个消息密钥保留最新消息。日志压缩适用于消息值可变且您希望保留最新更新的情况。新消息附加到日志的 头部,其作用与按顺序附加写入的非压缩日志相同。在压缩日志的尾部,即日志清理器操作的地方,如果稍后在日志中出现另一条具有相同键的记录,则记录将被删除。因此,虽然 Kafka 保证将保留每个键的最新消息,但它不能保证整个压缩日志不会包含重复项。

显示压缩前具有偏移位置的键值写入的日志

../broker-tuning-compaction-1.png
kafka

压缩后的日志

如果不同键的数量没有上限,则仅使用压缩仍然可以使日志变得任意大。您可以通过将策略设置为压缩和删除日志来控制这一点。首先压缩日志数据,删除日志头部有键的旧记录。然后删除日志保留阈值之前的数据。

../broker-tuning-compaction-2.png
kafka

日志保留点和压缩点

../broker-tuning-compaction-3.png
kafka

使用 调整检查日志以进行清理的频率(以毫秒为单位)log.retention.check.interval.ms。基于日志保留设置的频率。清理应该经常足以管理磁盘空间,但不会经常影响主题的性能。较小的保留大小可能需要更频繁的检查。如果在设定的时间段内没有要清理的日志,您可以使用log.cleaner.backoff.ms.

当删除与特定键相关的所有消息时,生产者会发送一条墓碑消息,该消息具有空值并充当标记告诉消费者该值已被删除。压缩后,只保留墓碑,必须有足够长的时间让消费者知道消息被删除。用于log.cleaner.delete.retention.ms确保消费者有机会在永久删除之前读取已删除记录的最终状态。当旧消息被删除时,没有价值,墓碑键也会从分区中删除。

1
2
3
4
log.retention.check.interval.ms=300000
log.cleaner.backoff.ms=15000
log.cleaner.delete.retention.ms=86400000
log.segment.delete.delay.ms=60000

管理磁盘利用率

还有许多其他与日志清理相关的配置设置,但特别重要的是内存分配。

重复数据删除属性指定用于清理所有日志清理线程的总内存。您可以通过缓冲区加载因子设置使用的内存百分比上限。

1
2
log.cleaner.dedupe.buffer.size=134217728
log.cleaner.io.buffer.load.factor=0.9

每个日志条目正好使用 24 个字节,因此您可以计算出缓冲区在一次运行中可以处理多少个日志条目并相应地调整设置。

如果可能,如果您希望减少日志清理时间,请考虑增加日志清理线程的数量:

1
log.cleaner.threads=8

如果您遇到 100% 磁盘带宽使用的问题,您可以限制日志清理器 I/O,以使读/写操作的总和小于基于执行操作的磁盘的能力指定的双精度值:

1
log.cleaner.io.max.bytes.per.second=1.7976931348623157E308

处理大消息大小

理想情况下,单个消息不超过 100KB,并由生产者批量处理,以获得更好的吞吐量,直至达到message.max.bytes(broker config) 或max.message.bytes(topic config) 配置的限制。这默认为 1MB。

您有四种方法来处理大消息大小:

  • 生产者端消息压缩
  • 基于参考的消息传递
  • 内联消息
  • 代理和生产者/消费者配置

我们建议尝试生产者端消息压缩或基于引用的消息传递,这涵盖了大多数情况。

生产者端压缩

对于生产者端压缩,您指定compression.type要应用于批次的 .,例如 Gzip。在代理配置中,您使用compression.type=producer以便代理保留生产者使用的任何压缩。一般来说,生产者和主题压缩类型最好匹配。否则,代理必须在将批次附加到日志之前解压缩并可能重新压缩它们,这可能是 CPU 密集型的。

压缩批次将增加生产者的额外处理开销和消费者的解压缩开销。但是您将批量获得更多数据,这有助于提高吞吐量。您还可以更好地利用您的存储空间。检查您的指标并将生产者端压缩与批量大小的微调相结合,以获得最佳结果。

基于参考的消息传递

基于引用的消息传递仅发送对存储在消息值中的某个其他系统中的数据的引用。当您不知道消息有多大时,这对于数据复制很有用。偶尔有大消息的时候也很好用,因为小消息可以直接通过Kafka发送,大消息可以通过引用发送。将数据写入数据存储区,该存储区必须快速、持久且高度可用,并返回对数据的引用。生产者将引用发送给 Kafka。消费者使用引用从数据存储中获取数据。

基于引用的消息流

../broker-tuning-reference-messaging.png
kafka

基于引用的消息传递需要更多的行程,因此端到端延迟会增加。这种方法的一个主要缺点是在清理 Kafka 消息时没有自动清理外部系统中的数据。还有一种风险是,在 Kafka 中的消息被删除之前,数据可能会从外部系统中删除。

减轻基于引用的消息传递限制的一种方法是采用简要提到的混合方法。将大型消息发送到数据存储并直接处理标准大小的消息。

内联消息 内联消息传递是一个复杂的过程,它将消息拆分为使用相同密钥的块,然后使用 Kafka Streams 等流处理器在输出上组合这些块。

基本步骤是:

  • 如果消息太大,生产客户端应用程序序列化然后分块数据。
  • 生产者使用 KafkaByteArraySerializer或类似工具在发送之前再次序列化每个块。
  • 消费者跟踪消息并缓冲块,直到它有完整的消息。
  • 消费客户端应用程序接收在反序列化之前组装的块。
  • 根据每组分块消息的第一个或最后一个块的偏移量,完整的消息将按顺序传递给消费应用程序的其余部分。
  • 根据偏移元数据检查完整消息的成功传递,以避免在重新平衡期间重复。

内联消息流

../broker-tuning-inline-messaging.png
kafka

内联消息不依赖于外部系统,例如基于引用的消息。但由于需要缓冲,它确实在消费者端产生了性能开销,尤其是在并行处理一系列大消息时。大消息块可以交错,因此如果缓冲区中另一条大消息的块不完整,则当一条消息的所有块都被消耗完时,并不总是可以提交。出于这个原因,通常通过持久化消息块或实现提交逻辑来支持缓冲。

处理较大消息的配置

message.max.bytes您可以通过配置设置最大记录批处理大小来增加消息限制。您可以在主题级别或代理级别设置限制。如果您在代理级别设置限制,则所有主题都允许使用更大的消息,超过最大限制的消息将被拒绝。max.request.size生产者 ( ) 和消费者 ( )的缓冲区大小message.max.bytes必须能够容纳较大的消息。

控制消息数据的日志刷新

通常,通常不会设置日志刷新阈值,并且操作系统会使用其默认设置执行后台刷新。但是,如果您使用的是应用程序刷新管理,则在使用更快的磁盘时设置较低的刷新阈值可能是合适的。

使用日志刷新属性来控制将缓存消息数据定期写入磁盘。您可以指定检查日志缓存的频率(以毫秒为单位)。根据消息在内存中保留的最长时间以及写入磁盘之前日志中的最大消息数来控制刷新频率。

1
2
3
log.flush.scheduler.interval.ms=2000
log.flush.interval.ms=50000
log.flush.interval.messages=100000

刷新之间的等待包括进行检查的时间和执行刷新之前的指定间隔。增加刷新频率会影响吞吐量。

分区重新平衡以提高可用性

当跨代理复制数据时,一个代理上的分区负责人处理所有生产请求(写入日志)。其他 broker 上的分区追随者复制领导者的分区数据。因此,在领导者失败的情况下,您可以获得数据可靠性。追随者需要同步才能恢复,这意味着追随者已经赶上了领导者最近提交的消息。如果领导者不再可用,则选择其中一个同步副本作为新领导者。领导者通过查看请求的最后一个偏移量来检查追随者。

如果当前的领导者失败,一个不同步的跟随者通常没有资格作为领导者,除非允许不干净的领导者选举。

您可以在关注者被认为不同步之前调整延迟时间:

1
replica.lag.time.max.ms

延迟时间限制了将消息复制到所有同步副本的时间以及生产者必须等待确认的时间。如果追随者未能在指定的延迟时间内发出获取请求并赶上最新消息,则将其从同步副本中删除。您使用的时间取决于网络延迟和代理磁盘带宽。您可以减少延迟时间以更快地检测到失败的副本,但这样做会增加追随者不必要地失去同步的机会

追随者仅用于复制来自分区领导者的消息,并在领导者失败时允许恢复。追随者通常不为客户端提供服务,尽管当 Kafka 集群跨越多个数据中心时,机架配置允许消费者使用来自最近副本的消息。

您可以使用 Cruise Control for Strimzi来确定分配给代理的副本分配,从而在集群中均匀地平衡负载。它的计算考虑了领导者和追随者所经历的不同负荷。失败的领导者会影响 Kafka 集群的平衡,因为剩余的代理会获得领导额外分区的额外工作。

为了使 Cruise Control 发现的分配实际平衡,分区必须由首选领导者领导。Kafka 可以自动确保使用首选领导者(在可能的情况下),并在必要时更改当前领导者。这可确保集群保持在 Cruise Control 发现的平衡状态。

您可以控制自动首选领导者检查的频率(以秒为单位)以及在触发重新平衡之前允许代理的最大不平衡百分比。

1
2
3
auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=10

broker 的领导者不平衡百分比是 broker 是当前领导者的当前分区数与它是首选领导者的分区数之间的比率。您可以将百分比设置为零以确保始终选出首选领导者,假设它们是同步的。

如果重新平衡检查需要更多控制,您可以禁用自动重新平衡。kafka-leader-election.sh然后,您可以使用命令行工具选择何时触发重新平衡。

Unclean leader election

同步副本的领导者选举被认为是干净的,因为它保证不会丢失数据。这就是默认情况下发生的情况。如果在旧领导者丢失时 ISR(同步副本)中没有其他代理,则 Kafka 将等待该领导者重新联机,然后才能写入或读取消息。

但是如果没有同步副本来担任领导呢?也许 ISR 只有在领导者的磁盘死机时才包含领导者。如果没有设置最小同步副本数,并且当分区领导者的硬盘发生不可撤销的故障时,没有与分区领导者同步的跟随者,则数据已经丢失。不仅如此,由于没有同步的追随者,因此无法选出新的领导者。

如果您的情况倾向于可用性而不是持久性,您可能希望启用不干净的领导者选举。

1
unclean.leader.election.enable=false

不干净的领导者选举意味着不同步的副本可以成为领导者,但您可能会丢失消息。

避免不必要的消费者组重新平衡

我们将通过建议一个非常有用的配置来结束对代理调优技巧的探索,以避免不必要的消费者组重新平衡。您可以添加延迟,以便组协调员在初始重新平衡之前等待成员加入:

1
group.initial.rebalance.delay.ms=3000

不可避免地,有一个权衡,因为延迟会阻止组消费,直到周期结束。

原文