kafka 生产者配置优化
基本生产者配置
|
|
此配置指定连接到 Kafka 集群的引导地址,以及将消息的键和值从字符串转换为其相应的原始字节数据表示的序列化程序。
(可选)添加唯一的客户端 ID 是一种很好的做法,用于在日志和指标中识别请求的来源。
压缩对于提高吞吐量和减少存储负载很有用,但可能不适用于压缩或解压缩成本可能过高的低延迟应用程序。稍后将详细介绍压缩。
在我们的基本配置中,确认仅确认消息已到达代理,并不能保证消息的顺序。
我们可以改变…
关键属性
让我们看看如何添加或更改生产者配置属性以进行微调。
以下是我们将考虑的属性:
- acks
- min.insync.replicas(主题)
- enable.idempotence
- max.in.flight.requests.per.connection
- retries
- transactional.id
- transaction.timeout.ms 我们不会在这里孤立地讨论这些属性,因为我们将专注于如何使用它们来获得特定的结果。
我们将看看如何使用这些属性的组合来调节:
- 数据持久性,以最大限度地减少传递消息时的数据丢失
- 消息的有序传递
- 涉及批量相关消息的消息事务的可靠性保证
- 吞吐量以特定时间段内处理的消息数量衡量
- 延迟以消息到达代理所需的时间来衡量 您很可能希望平衡吞吐量和延迟目标,同时尽量减少数据丢失并保证排序。
如果您想详细了解每个属性的作用,请参阅 Kafka 的Producer configs。
请记住,您可以使用的生产者配置属性也将由您的应用程序的需求驱动。避免任何破坏您的应用程序提供的属性或保证的更改。
数据持久性
如果您想减少消息丢失的可能性,请使用消息传递确认。
在您的生产者配置中指定acks=all强制分区领导者在确认消息请求已成功接收之前将消息复制到一定数量的追随者。
|
|
将acks=all生产者配置与min.insync.replicas主题属性结合使用。您min.insync.replicas在资源中设置属性KafkaTopic。
该min.insync.replicas配置设置在向生产者发送确认之前需要记录消息的代理数量。
|
|
使用 3 的主题复制因子和其他代理上的 2 个同步副本,如果单个代理不可用并且至少一个其他代理处于同步状态,则生产者可以继续不受影响。
如果第二个代理不可用,acks=all则使用生产者将不会收到确认,也无法生成更多消息。
由于额外的检查,acks=all增加了生产者发送消息和接收确认之间的延迟。因此,在调查这是否适合您时,您必须考虑权衡取舍。
订购交货
您有两种方法可以保证从生产者传递消息的顺序。它们在很大程度上取决于您是否使用acks=all数据持久性。
如果您使用acks=all,您可以(并且应该)为生产者启用幂等性,以确保消息仅传递一次。使用幂等性,将 ID 和序列号分配给消息,以便即使在传递失败后仍保留顺序。
由于幂等性保留了消息顺序,因此您可以通过使用max.in.flight.requests.per.connection.
在这里,我们看到一个示例配置,显示启用了幂等性,并与max.in.flight.requests.per.connection和一起使用acks=all。
|
|
该retries属性设置重新发送失败消息请求时的重试次数。虽然这个数字可能看起来令人印象深刻,但我们所说的实际上是“永远重试”。 交货超时
如果您delivery.timeout.ms在生产者配置中使用,如果在成功确认之前超时到期,则生产者请求将在重试次数用完之前失败。delivery.timeout.ms设置了等待确认发送消息成功或失败的时间限制。您可以选择不retries设置并使用delivery.timeout.ms来执行类似的功能。
对交付顺序引入额外检查会产生性能成本。
如果您不喜欢使用acks=all幂等性,另一种选择是将进行中的请求数设置为 1(默认为 5)以保留顺序。
|
|
通过这种方式,您可以避免Message-A仅在Message-B已写入代理后才成功的情况。
可靠性保证
幂等性本身对于仅一次写入单个分区很有用。
但是,对于跨多个分区的一组消息,我们如何保证消息传递的可靠性呢?我们再次使用幂等性,但将它与为生产者定义的唯一事务 ID 结合起来。事务保证使用相同事务 ID 的消息只生成一次,要么全部成功写入各自的日志,要么都不成功。
您在生产者配置中指定唯一的事务 ID,并在返回超时错误之前设置事务的最大允许时间(以毫秒为单位)。默认值为900000或 15 分钟。
|
|
事务 ID 在第一次操作时注册到 Kafka 集群,以及epoch用于标识活动生产者实例的生产者检查点编号。
为什么我们要识别活跃的生产者?假设应用程序确定生产者失败并创建新的生产者实例以重新启动事务。如果两个生产者现在都在发送消息,则会创建重复的记录,我们就失去了曾经的完整性。
通过指定事务 ID,如果新的生产者实例启动,则生产者的较旧实例由其较旧的epoch编号标识,并由 Kafka隔离,以便不包含它们的消息。这通过确保只有一个具有事务 ID 的有效生产者来维护消息传递的完整性。每个都transactional.id应该用于一组唯一的主题分区。
您可以将主题分区名称映射到事务 ID,或使用避免冲突的函数从主题分区名称计算事务 ID。
优化吞吐量和延迟
根据您的目标,Kafka 提供了许多配置参数和技术来调整生产者的吞吐量和延迟性能。
通常,系统的要求是在给定延迟内满足一定比例的消息的特定吞吐量目标。例如,以每秒 50,000 条消息为目标,95% 的消息在 2 秒内得到确认。
批处理和缓冲消息
消息批处理延迟发送消息,以便将发往同一代理的更多消息批处理到单个请求中。如果没有消息批处理,则消息会立即发送。
您可以使用两个属性来设置批处理阈值:
- batch.size以字节为单位指定最大批量大小(默认为 16384)
- linger.ms以毫秒为单位指定填充批次的最大持续时间(默认为 0 或无延迟) 消息被延迟,直到达到这些阈值中的任何一个。
例如,如果我们使用linger.ms添加一个 500ms 的延迟,则在该时间累积的所有消息都在单个请求中发送。
|
|
如果batch.size还使用了最大值,则在消息累积到最大批量大小时发送请求,或者消息排队的时间超过linger.ms- 以较早者为准。
使用buffer.memory配置缓冲内存大小,该大小必须至少与批处理大小一样大,并且还能够容纳缓冲、压缩和进行中的请求。
大小很重要。如果批处理阈值对于生成消息的频率来说太大了,那么您正在为发送缓冲区中等待的消息添加不必要的延迟。您还分配了比您需要的更多的缓冲内存。如果批处理阈值太小,则可能会延迟较大的消息。
你在更高的吞吐量中获得了什么,你承认缓冲增加了消息传递的更高延迟。这是一种妥协,因此您需要考虑如何取得适当的平衡。
send() 阻塞和延迟
批处理和缓冲还减轻了send()阻塞对延迟的影响。
当您的应用程序调用KafkaProducer.send()时,产生的消息是:
- 由任何拦截器处理
- 序列化
- 分配给一个分区
- 压缩
- 添加到每个分区队列中的一批消息
此时send()方法返回。所以send()阻塞的时间由以下决定:
- 在拦截器、序列化器和分区器上花费的时间
- 使用的压缩算法
- 等待缓冲区用于压缩所花费的时间
批次将保留在队列中,直到发生以下情况之一:
- 批次已满(根据batch.size)
- 引入的延迟linger.ms已经过去
- 发送方即将发送其他分区的消息批次到同一个broker,也可以添加这个批次
- 生产者正在被刷新或关闭
压缩消息批次
使用该属性压缩数据批次compression.type可提高吞吐量并减少存储负载,但可能不适用于压缩或解压缩成本过高的低延迟应用程序。
消息压缩增加了生产者的延迟(压缩消息所花费的 CPU 时间),但使请求(以及可能的磁盘写入)更小,这可以提高吞吐量。
使用该compression.type属性指定有效的压缩编解码器。您可以选择gzip、snappy、lz4或zstd,每个都有不同的压缩速度。
|
|
如果您认为压缩是值得的,那么使用的最佳压缩类型将取决于发送的消息。
添加线程
压缩是在线程调用上处理的KafkaProducer.send(),因此如果此方法的延迟对您的应用程序很重要,您可以添加更多线程。
流水线消息
Pipelining 听起来可能与在夏威夷著名的Pipeline礁上冲浪有关,但实际上它是在收到对先前请求的响应之前从生产者发送更多请求。
对于流水线,我们使用我们的老朋友max.in.flight.requests.per.connection属性。您可能还记得本文前面提到的它对订购交付的贡献。
更频繁地移动消息请求显然会提高吞吐量。但是在某些情况下,您可能会看到不太有益的效果,例如批处理效率较低。
调整消息传递等待时间
通过调整在传递消息和完成发送请求之前等待的最长时间来提高消息请求的吞吐量。
使用该delivery.timeout.ms属性指定等待完成发送请求的最长时间(以毫秒为单位)。您可以将该值设置为MAX_LONG以将无限次重试委托给 Kafka。
您还可以通过编写自定义分区器来替换 Kafka 的默认值,将消息定向到指定的分区,并使用partitioner.class属性指定类名。自定义分区器允许您根据消息中的数据选择如何将消息映射到分区。
|
|