4 months before the incident – cluster away

So, one day you decide to switch from Kafka standalone to cluster mode in order to achieve high availability.

You are proud of yourself, but you don’t want anyone to know just how proud, so you silently pat yourself on the back. Your Kafka cluster will soon consist of two brokers: broker1 and broker2.

As you are adding and configuring broker2 and notifying consumers and producers to add another broker to their configuration, you can vividly imagine a situation in which broker1 goes down. Everyone around you starts worrying about all the messages that will be delayed or lost, but you have a big grin on your face, because you know something they surely don’t: the newly added broker2 will be so kind as to pick up exactly where the old and tired broker1 bravely went down. Birds chirping, clouds clearing, permafrost stops melting because you are that cool, all is well on planet Earth.

You just can’t wait for the imagined situation to really happen; everyone will be proud of you. But it does not happen any time soon. Kafka is so stable that it works well by default, even without your intervention.

The incident – bad cluster

A couple of months go by. You come to work one day, having almost forgotten about your secret(ish) heroic act of switching Kafka to cluster mode. As you pass through the hallways, you can hear the whispers:

“Message sent, but not consumed”, “No errors logged, everything works on the producer side”, “Well, the consumer is OK as well” (slightly irritated), “The code has not changed since last year”, and of course “It works locally”. The whispers stop there, because, where can you go from the “works locally” argument, really.

Thoughts start racing through your head; you are not sure why the nice new broker2 did not do its part. You just can’t trust anyone nowadays.

The troubleshooting starts:

  1. You check broker1 status – it was down for maintenance, but is up now.
  2. You check whether consumers are configured to connect to both brokers; they are.
  3. You test by sending a new message; you check the consumer and see that it consumed the message.
  4. You restart broker1, the one that went down.
  5. You check the consumer offsets on broker1 (the consumer offset is how a consumer tells the Kafka broker which message it consumed last, so that if either the broker or the consumer goes down, both know where to pick up once everything is back up, without re-consuming already consumed messages). You see that the last committed consumer offset is 10 and the log-end-offset is at 21, so they are not even close. This looks like something that could be the cause. Except that it is not: there are 11 messages that were not consumed and are still not being consumed, even though both brokers are up. You check with the “producer people” and the “consumer people”, and 11 is exactly the number of messages that were sent without error but never consumed. So offset committing works fine; 11 messages are waiting to be consumed, but are not being consumed. (A sketch of this offset check follows the list.)
  6. You restart the brokers and Zookeeper as well.
  7. All 11 messages are properly consumed. Interesting. 
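
The offset check from step 5 can also be done programmatically. Here is a minimal sketch using Kafka’s Java AdminClient; it reports the same numbers that the kafka-consumer-groups.sh tool shows. The bootstrap servers, the group name my-consumer-group and Kafka 2.5+ are assumptions for illustration, not details from the actual incident:

    import java.util.Map;
    import java.util.Properties;
    import java.util.stream.Collectors;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
    import org.apache.kafka.clients.admin.OffsetSpec;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class ConsumerLagCheck {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // Offsets the consumer group has committed ("my-consumer-group" is hypothetical).
                Map<TopicPartition, OffsetAndMetadata> committed =
                        admin.listConsumerGroupOffsets("my-consumer-group")
                             .partitionsToOffsetAndMetadata()
                             .get();

                // Log-end offsets for the same partitions.
                Map<TopicPartition, OffsetSpec> latest = committed.keySet().stream()
                        .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
                Map<TopicPartition, ListOffsetsResultInfo> logEnd =
                        admin.listOffsets(latest).all().get();

                // In this incident the output would read: committed=10 log-end=21 lag=11.
                committed.forEach((tp, meta) -> {
                    long end = logEnd.get(tp).offset();
                    System.out.printf("%s committed=%d log-end=%d lag=%d%n",
                            tp, meta.offset(), end, end - meta.offset());
                });
            }
        }
    }

A lag that stays above zero while producers and consumers report no errors is exactly the symptom described above.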

The real cause – nice cluster 

After setting everything up locally, simulating broker1 and broker2 going down at various times, sending and consuming messages at various times, and reading about Kafka’s default configuration values, you conclude that the topic replication factor is the cause of the problem.

The default replication factor for all new Kafka topics is 1. This means that a topic’s messages are saved on the leader broker only. When running Kafka in standalone mode, this does not matter, as there is only one available broker anyhow. The topic replication factor becomes important when running Kafka in cluster mode with multiple brokers.

How Kafka determines which broker is the topic leader at any given time is a subject of its own, but for the sake of this argument the leader election algorithm does not interest us. All we need to know is that, at any point in time, one of the brokers in the Kafka cluster is elected as the topic leader.
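
To make this a bit more concrete, here is a minimal sketch of how you might inspect a topic’s leader and replicas with the Java AdminClient. The topic name my-topic and the bootstrap servers are assumptions for illustration; strictly speaking Kafka elects a leader per partition, so for a single-partition topic this is the topic leader discussed here:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.TopicDescription;

    public class TopicLeaderCheck {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                TopicDescription topic = admin.describeTopics(Collections.singletonList("my-topic"))
                                              .all()
                                              .get()
                                              .get("my-topic");

                // With the default replication factor of 1, each partition lists a single replica,
                // and that replica is also the leader, which is exactly the setup behind the incident.
                topic.partitions().forEach(p ->
                        System.out.printf("partition=%d leader=%s replicas=%s isr=%s%n",
                                p.partition(), p.leader(), p.replicas(), p.isr()));
            }
        }
    }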

This is the explanation of what really happened:

  1. While Kafka was running in standalone mode, broker1 was the topic leader and all topics received messages on broker1.
  2. Kafka was switched to cluster mode and broker2 was added. All the old topics still had broker1 as the topic leader, and received messages were not replicated to broker2.
  3. The last consumed message was the one at position 9, the consumer offset was committed as 10, and 11 more messages were written to the topic before broker1 went down.
  4. broker1 went down, and broker2 was elected as the new topic leader.
  5. All new messages that arrived were written to broker2 and properly consumed. But because, by default, topics are not replicated to the other brokers in the cluster, the messages already written to broker1 were not available anywhere but on broker1; broker2 was not aware of them, so they were left unconsumed.
  6. Restarting the brokers and Zookeeper led to broker1 being elected as the topic leader again; the old messages became available and were consumed.

Clean up

OK, so you now know that you are not as cool as you thought you were, and you need to clean up your mess. This is the way to do it:

  1. When adding new brokers to the Kafka cluster, make sure to change the offsets.topic.replication.factor and default.replication.factor configuration properties from 1 to a number that fits your needs, in our case 2 (see the configuration sketch after this list). This will cause all the messages that the leader broker receives to be replicated to another broker as well.
  2. For already existing topics, change the replication factor. This is not as easy as changing a single property, but there are various ways to do it, depending on the Kafka version; one option is sketched after this list.
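
To illustrate step 1, the broker-side change could look like this in each broker’s server.properties (the value 2 matches the two-broker cluster from this story):

    # server.properties on broker1 and broker2
    # replication factor for the internal consumer-offsets topic and for newly created topics
    offsets.topic.replication.factor=2
    default.replication.factor=2

For step 2, one option on Kafka 2.4 and newer is the Admin API’s partition reassignment call; older versions can achieve the same with the kafka-reassign-partitions tool and a JSON reassignment plan. Here is a minimal sketch, assuming a topic named my-topic with a single partition 0 and brokers with the IDs 1 and 2 (all names and IDs here are assumptions for illustration):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Map;
    import java.util.Optional;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewPartitionReassignment;
    import org.apache.kafka.common.TopicPartition;

    public class IncreaseReplicationFactor {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // Assign partition 0 of "my-topic" to both brokers; the first id in the list
                // is the preferred leader. Broker ids 1 and 2 are assumptions.
                Map<TopicPartition, Optional<NewPartitionReassignment>> reassignment =
                        Collections.singletonMap(
                                new TopicPartition("my-topic", 0),
                                Optional.of(new NewPartitionReassignment(Arrays.asList(1, 2))));

                admin.alterPartitionReassignments(reassignment).all().get();
                System.out.println("Reassignment submitted; Kafka copies the existing data in the background.");
            }
        }
    }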
