At Datadog, we operate 40+ Kafka and ZooKeeper clusters that process trillions of datapoints across multiple infrastructure platforms, data centers, and regions every day. Over the course of operating and scaling these clusters to support increasingly diverse and demanding workloads, we’ve learned a lot about Kafka—and what happens when its default behavior doesn’t align with expectations. In this post, we’d like to share some of these lessons learned and highlight the metrics and logs that can help you keep tabs on the issues we encountered. The topics we’ll cover in this post include:
- The importance of coordinating changes to the maximum message size
- Unclean leader elections: To enable or not to enable?
- Investigating data reprocessing issues on our low-throughput topics
- Why low-traffic topics can retain data longer than expected
Kafka provides users with a ton of flexibility when it comes to its configuration and architecture. The default settings are designed to accommodate most types of workloads, so it usually doesn’t make sense to change them unless you have a good reason. Keep in mind that whatever worked (or didn’t work) for one organization might not apply to your setup—and vice versa. As such, we highly recommend testing configuration changes (e.g., by running blue/green deployments on replicated/mirrored clusters) before deploying them to production.
If you plan to increase the maximum message size in your cluster, you may run into errors if you’re not careful about gradually rolling out changes across consumers, brokers, and producers (in that order).
Kafka sends messages in batches as of version 0.10.0. The maximum message size is effectively limited by the maximum message batch size, since a batch can contain one or more messages. Note that the maximum message batch size is a pre-compression limit on the producer, and a post-compression limit on the broker and consumer. In general, if you set the same maximum message batch size across producers, brokers, and consumers, everything should run smoothly.
But if these settings aren’t correctly aligned across the producer, broker, and consumer, your messaging pipeline may get stalled. For example, your Kafka client library could generate an error like this if a producer application tries to write a message that exceeds the library’s configured produce request size limit:
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is XXXXXXX bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration
To minimize the likelihood of running into errors, make sure that you coordinate changes to maximum message batch size across the relevant producer, broker, and consumer settings. If you need to increase the maximum message batch size, we recommend changing it first on the consumer, then the broker, and finally the producer, to reduce the risk of stalling the message processing pipeline.
If you decide to increase the maximum message batch size on the broker, read your client library’s documentation to make sure that you understand the potential implications. We use librdkafka in some of our services; in this particular library,
message.max.bytes determines the maximum size of each fetch queue, and each partition has its own queue. This means that if one of your consumers is reading messages from 100 partitions, and you set
message.max.bytes to 100 MB, that consumer’s memory usage will potentially require as much as
100 * 100 MB—just to accommodate fetch requests. If you plan to increase the maximum message size, you may need to scale out so that fewer partitions are being consumed per consumer—otherwise, your consumers may run into out-of-memory errors.
|Metric name||MBean name||Description|
|MessagesInPerSec||kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec||Messages received by the broker|
|Bytes sent/received by the broker|
|System load, CPU, memory, network||N/A (host-level metrics)||System load, CPU, memory, and network utilization on the broker|
Monitoring a few message-related metrics can help ensure that messages are flowing across your Kafka pipeline. Kafka exposes metrics via MBeans to help you track message throughput (
MessagesInPerSec) and network traffic sent and received from each broker. You can also monitor the approximate size of messages processed on each broker by dividing the bytes received (
BytesInPerSec) by the number of messages received (
Tracking system load and other host-level resource metrics can also help you detect if certain brokers are having trouble processing messages. If you’ve upgraded any of your brokers to use the new message format introduced in 0.10.0, note that they may incur additional CPU and memory overhead if they interact with older consumers that haven’t yet upgraded, since they need to convert messages to the old format. The new message format also adds 8 bytes to the size of each message, which could place additional overhead on your brokers, particularly in high-throughput environments; make sure to monitor network throughput on upgraded brokers to ensure that they don’t hit their maximum bandwidths.
Unclean leader elections are enabled by default on older versions of Kafka, but they can lead to data loss. If you enable unclean leader elections (or leave them enabled), perform duplicate writes to a secondary cluster if possible.
In Kafka, an unclean leader election occurs when an unclean broker (“unclean” because it has not finished replicating the latest data updates from the previous leader) becomes the new leader. Although this feature prioritizes availability over durability, it can lead to data loss if you’re not careful, which we’ll explain in further detail below. Kafka allows you to enable or disable unclean leader elections, depending on your use case. In versions prior to 0.11.0, the default setting was
On one of our (pre-0.11.0) clusters, we were unaware of this default behavior until an unclean election occurred, which led to temporary data loss. Thankfully, we recovered the data because we were performing duplicate writes to primary and secondary Kafka clusters. This incident reinforced two ideas for us: It’s important to confirm that the default settings in your version of Kafka actually align with your expectations—and, if you choose to enable unclean leader elections, safeguard your data against potential data loss by performing duplicate writes.
Below, we’ll take a closer look at how unclean leader elections can lead to data loss, and explore when it makes sense to enable or disable this setting in your own environment. Then we’ll take a look at some of the metrics and logs that can help you monitor the availability of your clusters, regardless of whether you’ve chosen to enable or disable unclean leader elections.
Kafka stores data across partitions in each topic, and each partition has a leader and zero or more followers that fetch and replicate new data from the leader. Each broker serves as the leader for certain partitions and the follower for others, which reduces the likelihood of a single point of failure for any given partition. For any partitions where the broker has been designated the leader, it processes incoming message batches and assigns each message an identifying integer known as an offset.
As long as followers stay up to date with the leader, and remain available/online (by regularly polling for new data), they will be considered part of the group of in-sync replicas (ISR). If any followers fall behind the leader, they will be removed from the ISR until they catch up with the partition leader.
If unclean leader elections are disabled (the default setting as of version 0.11.0.0), Kafka won’t be able to elect a new leader at certain times (i.e., if the leader becomes unavailable and none of the replicas have caught up with the former leader). If it cannot elect a new leader, Kafka will halt all reads and writes to that partition (because followers do not serve read/write requests—they exist only to replicate data from the leader).
On the other hand, if unclean leader elections are enabled, the aforementioned scenario will play out differently. If the leader becomes unavailable, Kafka will still be able to keep the partition online by electing a new leader, even in the absence of in-sync replicas (ISR=0). These unclean leader elections can help improve the availability of your cluster, but they could result in data loss if you do not duplicate the data to another cluster or other storage destination. Here’s how that might play out:
- The broker that is the leader for Partition A goes offline. None of the followers of Partition A are caught up to the leader (ISR=0).
- If unclean leader elections are enabled, one of the follower brokers will get elected as the new leader for the partition, even though it is “unclean.” This allows consumers and producers to continue sending requests to Partition A.
- If the previous leader comes back online, it will reset its offset to match the new leader’s offset, resulting in data loss. For example, if the previous leader processed messages up to offset=7, and the new, unclean leader was slightly behind (offset=4) at the time it was elected leader, some messages (offsets 5–7) will get deleted after the first leader comes back online and rejoins the cluster as a replica/follower.
Kafka updated this setting in version (0.11.0) so that unclean leader elections would no longer be enabled by default (read more about the reasoning behind the change here). You can prioritize durability over availability by disabling unclean leader elections (if you’re using an older version of Kafka) or leaving them disabled (in newer version of Kafka). However, you may want to enable unclean leader elections so that you can keep your clusters highly available. In those cases, replicating data to a secondary cluster will allow you to recover from potential data loss.
Whether you’ve enabled or disabled unclean leader elections, keeping an eye on certain metrics can help ensure the durability of your data and the availability of your clusters.
|Metric name||MBean name||Description|
|UncleanLeaderElectionsPerSec||kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec||Number of “unclean” elections per second|
|IsrShrinksPerSec||kafka.server:type=ReplicaManager,name=IsrShrinksPerSec||Rate at which the group of in-sync replicas (ISR) shrinks per second|
|IsrExpandsPerSec||kafka.server:type=ReplicaManager,name=IsrExpandsPerSec||Rate at which the group of in-sync replicas (ISR) expands per second|
|UnderReplicatedPartitions||kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions||Number of partitions that are under-replicated (not yet replicated across all followers)|
If you see a spike in
IsrShrinksPerSec followed by a corresponding spike in
IsrExpandsPerSec, this indicates that a node may have briefly fallen behind, and then recovered and caught up.
When a follower falls out of the ISR, you’ll see a log similar to the following:
If an unclean leader election takes place, Kafka will log the following warning message, stating that data loss may occur.
[TIMESTAMP] WARN [OfflinePartitionLeaderSelector]: No broker in ISR is alive for [<PARTITION_NAME>]. Elect leader <BROKER_NUMBER> from live brokers <BROKER_NUMBER>. There's potential data loss. (kafka.controller.OfflinePartitionLeaderSelector)
If, at this point, the old leader comes back online, it will reset its offset to the new (unclean) leader’s offset (
<BROKER_B> in the example below), even though it actually has more up-to-date data than the current leader:
[TIMESTAMP] WARN [ReplicaFetcherThread-6-<BROKER_B>], Replica <BROKER_A> for partition <PARTITION_NAME> reset its fetch offset from <NEW_LEADER'S_OFFSET> to current leader <BROKER_B>'s latest offset <NEW_LEADER'S_OFFSET> (kafka.server.ReplicaFetcherThread) [TIMESTAMP] INFO Truncating log <PARTITION_NAME> to offset <NEW_LEADER'S_OFFSET>. (kafka.log.Log) [TIMESTAMP] INFO Scheduling log segment <OLD_OFFSET> for <PARTITION_NAME> for deletion. (kafka.log.Log) [...]
The default offset retention period works well for high-traffic topics, but it was causing data reprocessing issues in some of our lower-throughput topics.
Kafka’s default configuration settings are generally designed with high-traffic topics in mind—think somewhere on the scale of millions of writes per second. However, if your cluster includes low-throughput topics or topics that don’t produce any messages for long periods of time (such as ones used solely for debugging or testing purposes), you may need to revise some default retention settings to decrease the likelihood of either reprocessing or skipping data.
After encountering data reprocessing issues on our low-throughput topics, we decided to increase the consumer offset retention period on these topics to match newer versions of Kafka (seven days as opposed to just one day).
Kafka organizes data into partitions to support parallel processing and to distribute workload across multiple brokers (i.e., you can store each partition on a separate broker). As mentioned earlier, each message gets assigned an identifying integer known as an offset. Starting in version 0.9.0, each consumer commits the offset of its most recently consumed message to a
__consumer_offsets topic every five seconds (by default). Prior to this version, consumers stored this information in ZooKeeper instead.
Retaining consumer offsets for a certain period of time enables consumers to know where to resume reading (or “consuming”) from the partition log if partitions rebalance across brokers, or when a consumer or broker temporarily becomes unavailable. If a consumer does not have a committed offset (e.g., because it just started) or if it requests an expired offset (which can happen when a low-throughput topic has not processed any messages over the length of the offset retention period), it will either reset to the
earliest (oldest) or
latest (most recent) offset stored in the
Kafka determines when and how to reset its offsets based on a few settings:
|Kafka setting||Description||Scope||Default setting|
|Offset retention period for the ||Broker-level||Pre-2.0.0: 1440 (1 day)|
2.0.0+: 10,080 (7 days)
|Determines if the consumer should reset to the ||Consumer-level||latest|
|Determines if the consumer should automatically commit its offsets to the ||Consumer-level|
|If ||Consumer-level||5,000 ms (5 seconds)|
If a topic goes through the offset retention period without processing any new commits, its consumers will reset their offsets based on the
auto.offset.reset setting. The offset retention period is much lower in older (pre-2.0) versions of Kafka—just one day, as opposed to one week. This created issues for so many users that the default offset retention period was extended to seven days in version 2.0.0.
We were running a pre-2.0 version of Kafka, so whenever one of our low-throughput topics passed the offset retention period (one day) without processing any new messages, the
__consumer_offsets topic deleted the consumer offsets for that topic. Our Kafka consumers were then unable to retrieve their offsets, so they reset to the
earliest offset (based on our
auto.offset.reset configuration) and started processing the earliest message still available on the broker. This meant that they reprocessed some data they had already consumed in the past. Depending on your use case, this might be exactly what you don’t want (e.g., you only intend to send an email notification to a newly registered user once).
For us, it made sense to increase the offset retention period to match newer versions of Kafka (seven days as opposed to just one day), to decrease the likelihood of this happening again. Note, however, that there are certain side effects of increasing the offset retention period. For example, it can increase memory usage on the broker, since it must retain those offsets for a longer period of time in memory. See the full Kafka Improvement Proposal for more details.
Two other useful bits of information we’ve picked up after running into various offset-related hiccups:
- Regardless of the
auto.offset.resetoption you choose, make sure to specify something. If you don’t, your consumer application will encounter an error. The default setting configures consumers to read from the
latestoffset stored on the broker, so when in doubt, leave it unchanged. But note that
latestmay result in skipping data in some cases (e.g., if the consumer hadn’t finished consuming the most recent/latest message on the broker before it got disconnected).
- You can manually reset a consumer’s offset using Kafka’s built-in command with the
--to-datetimeoption. Note that it assumes that you’ll provide a UTC timestamp.
Visualize, analyze, and autoscale your Kafka clusters with Datadog.
Low-throughput topics could retain messages longer than expected if you don’t account for the segment-level retention period.
Kafka stores data for each partition on a log, and logs are further divided into log segments. Kafka determines how long to store data based on topic-level and segment-level log retention periods. This makes it tricky to track, on a granular level, how long messages are actually stored on the broker.
On one of our low-throughput topics, a broker was storing messages well beyond our configured topic-level retention period (36 hours). This is not necessarily a problem, since the maximum amount of storage used by each log is still limited by the maximum log size setting. However, we still wanted to understand why this was happening, so we took a closer look at Kafka’s approach to data storage, and how that behavior has evolved across versions.
Before going any further, let’s take a closer look at the configuration settings that determine how long Kafka will retain data, on a segment-level basis. Note that
log.retention.ms (primary priority) or
log.retention.minutes (secondary priority) will override
log.retention.hours if you’ve configured a value for either setting.
|Kafka setting||Description||Scope||Default setting|
|Segment size limit: The maximum size a log segment can reach before a new one is created.||Topic-level||1,073,741,824 bytes (~1 GB)|
|Segment-level retention period: After this period of time, a new segment will get created even if it hasn’t yet reached the segment size limit.||Topic-level||604,800,000 ms (7 days)|
|Log retention period: Period of time the broker will retain a partition log before deleting it.||Broker-level||168 hours (7 days)|
|Topic-level retention period: The period of time the topic will retain old log segments before deleting or compacting them (depending on the ||Topic-level||604,800,000 ms (7 days)|
|How the topic should discard segments once they reach their retention period or size limit. If set to ||Topic-level|
Kafka closes a segment and opens a new one when one of two things happens (whichever comes first):
- the segment reaches a maximum size (as determined by
- the segment-level retention period has passed (based on
Furthermore, Kafka cannot close a segment while the segment is still “active,” or currently accepting updates. Even after Kafka closes a segment, it may not expire/delete the segment’s messages right away. Depending on the version you’re running, Kafka determines when it can start expiring messages by adding the segment-level retention period to either:
- the last time the segment was modified (prior to v. 0.10.1.0)
- the timestamp of the last (most recent) message on that segment (starting in version 0.10.1.0)
To see how this works, let’s take an example of a segment for a partition that doesn’t see much traffic. If the last/most recent message on the segment is three days old when the segment is closed, the data will not get deleted for another four days, because that is when the closed segment’s most recent message will reach the seven-day retention period. Accordingly, messages can be retained beyond the configured retention window—and the actual retention period depends on how often a segment gets updated.
On a high-volume topic, segments tend to get rolled out more frequently, since they receive enough traffic to reach the default segment size limit (1 GB) before the end of the segment-level retention period. On a low-throughput topic, this is usually not the case.
We had already lowered the segment size limit to 512 MB on the topic in question, but it was still not enough to make a difference—segments were not reaching the size limit before the end of the segment-level retention period. The partition logs for this low-throughput topic generally followed a seven-day cycle, because the segment-level retention period (which was still set to the default value of seven days) was taking precedence over the topic-level retention setting.
We revised some settings on our low-throughput topics to ensure that Kafka could roll out new segments more frequently, and to maintain more control over segment-level data retention. Here are our recommendations for low-throughput topics, based on what we learned from this experience:
segment.msto a value that’s less than the topic-level retention period (
retention.ms). We set this to 12 hours (43,200,000 ms) on one of our topics, to ensure that we could roll out segments more frequently.
- You may also want to reduce the segment size limit (
segment.bytes) to a value that’s lower than the current average size of your segments. This forces Kafka to roll out segments more frequently, and enforces a stricter time limit on the oldest data that should be stored on a segment. In our case, we changed this setting to ~100 MB, from our previously decreased value of 512 MB. However, the right value will depend on the average size of the segments stored for your topic.
In our case, this issue wasn’t affecting a large amount of data, since it was on a low-throughput topic—but we still decided to make these two changes to ensure that we were no longer retaining data longer than expected. However, keep in mind that setting the segment retention period or segment size limit too low is also not a good idea. Decreasing both of these settings encourages Kafka to roll out new segments more frequently, which may slow down operations and increases the number of open files needed (since Kafka needs to use an open file handle for every segment in every partition, not just active segments). As such, make sure to monitor the number of open files in the appropriate topic(s) if you’re going to decrease either or both of these settings.
In this post, we’ve covered some of the valuable lessons we’ve learned from running Apache Kafka in production—but we couldn’t possibly mention everything in a single blog post. To help you manage your own Kafka infrastructure, we have open sourced Kafka-Kit, a set of utilities that Datadog’s site reliability engineering team developed for reducing the amount of manual labor involved in Kafka ops procedures (recovery, capacity planning, rebalancing, etc.).
Kafka-Kit includes an
autothrottle tool (which helps reduce the likelihood that your brokers will exceed their network bandwidth limits when replicating or rebalancing data) as well as a
topicmappr utility (which is designed to intelligently map partitions across brokers, so that data is more evenly distributed). You can learn more about Kafka-Kit by reading the full post on our engineering blog.
We’d like to thank Jamie Alquiza and Balthazar Rouberol of the Datadog DRE (data reliability engineering) team for reviewing this article prior to publication and suggesting improvements. We also wish to thank Alexandre Fonseca, Kevin Le Brun, Isaac Sadaqah, and other members of the engineering team who drafted internal documentation and answered questions related to this article.