Intro

It is hard to deal with the thought of losing data, even the non-critical kind, like application logs. But if we start considering what could possibly happen, from short power outages through disk failures to losing a whole datacenter to a natural disaster, we realise that it is all just a matter of probabilities. And we, as engineers, should try to reduce the probability of losing our precious data.

In this post let’s take a look at the approach Kafka takes on replication in order to prevent data loss.

Why replicate?

The motivation to replicate is rather simple: we would like to save our future selves from having to explain to a business person how and why our system lost their data.

Tuning knobs

Losing data is bad, no doubt about that. But one type of data may be more important than another. We may get away with losing metrics related to customers’ shopping habits during last year’s Christmas season, but having to figure out why €10 mln disappeared from a system can significantly lower one’s life expectancy.

Another aspect of replication is that it has its costs: you simply need more storage space, and you can also expect increased latency, because it takes more time to distribute copies of the same data and then collect confirmations that they were correctly saved.

The great thing about Kafka is that it provides different ways to control durability, meaning you have various knobs and levers at your disposal with which you can choose different trade-offs and control the risk exposure.

The most common durability-related parameters are listed below; a short sketch of where each one is set follows the list:

  • replication.factor - the number of Kafka brokers where your message will be stored (basically, the number of copies of your data)
  • min.insync.replicas - at least this number of caught-up replicas needs to confirm a write (when the producer uses acks=all) before it is considered successful
  • acks - how many confirmations the producer needs to receive before a write is considered complete (available options are 0, 1 or all)
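
To make that concrete, here is a minimal sketch of where each knob lives when using the Java client; the broker address, topic settings and printed summary are illustrative placeholders, not from any particular setup. acks is a producer property, replication.factor is chosen when a topic is created, and min.insync.replicas is a topic-level config with a broker-wide default.

```java
import java.util.Map;
import java.util.Properties;

public class DurabilityKnobs {
    public static void main(String[] args) {
        // acks lives on the *producer* side.
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092"); // placeholder address
        producerProps.put("acks", "all");                         // "0", "1" or "all" (alias "-1")

        // replication.factor is chosen when a topic is created...
        short replicationFactor = 3;
        // ...while min.insync.replicas is a topic-level config (with a broker-wide default).
        Map<String, String> topicConfigs = Map.of("min.insync.replicas", "2");

        System.out.printf("acks=%s, replication.factor=%d, topic configs=%s%n",
                producerProps.get("acks"), replicationFactor, topicConfigs);
    }
}
```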

Replicas

Let’s visualise what each of those params actually does, starting with replication.factor. Here you can see 3 topics with replication.factor=2:

For simplicity's sake, each topic is assumed to have just one partition.

Setting replication.factor to 2 means that each message sent to one of these 3 topics will be saved to 2 brokers (e.g. messages in topic 1 are stored at broker 1 and broker 2).
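
As an illustration, here is a sketch of how such topics could be created with the Java AdminClient; the broker address and topic names are placeholders, and it assumes a cluster with at least 2 brokers is already running:

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateReplicatedTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 topics, each with a single partition and 2 replicas, mirroring the figure above.
            List<NewTopic> topics = List.of(
                    new NewTopic("topic-1", 1, (short) 2),
                    new NewTopic("topic-2", 1, (short) 2),
                    new NewTopic("topic-3", 1, (short) 2));
            admin.createTopics(topics).all().get(); // block until the cluster confirms
        }
    }
}
```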

Then, if one of the brokers dies, the other one can take over and still provide access to these messages. In other words, our cluster can survive a single-broker failure without any data loss (we still have at least one copy of our data):

If we increase replication.factor to 3, then we get an additional copy of each message:

…which in turn means we can still access data in our topics even if 2 brokers are down (in this case we end up with exactly one copy of each message available only from the 3rd broker):

Leaders and followers

One of the replicas is elected to be the leader, while the rest become followers:

Again, each topic is assumed to have just a single partition to make things easier to follow. For topics containing multiple partitions the process is similar, the only difference being that each partition of a topic can have a different leader.

Producers write messages to leaders. The leader then propagates each message to its followers:

The interesting thing about leaders is that by default consumers, similarly to producers, read only from the leaders:

And in the face of a network partition this can make problem resolution a bit easier as it reduces the number of moving parts (write and read operations are performed on the same broker).

On the other hand, it could make sense to fetch messages from a follower:

So if a consumer and a follower reside in the same datacenter then this could potentially help us reduce expensive cross-datacenter communication.

Fortunately, it has recently become possible to read messages from a follower as well (KIP-392, introduced in Kafka 2.4). David Arthur from Confluent wrote an in-depth blog post about it.
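
Reading from followers is opt-in on both sides: the brokers need a rack-aware replica.selector.class and a broker.rack, and the consumer advertises its own location via client.rack. Below is a minimal consumer-side sketch; the broker address, group id, topic and rack names are placeholder values:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FollowerFetchingConsumer {
    public static void main(String[] args) {
        // Broker side (server.properties), not shown here:
        //   broker.rack=dc-1
        //   replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "local-readers");           // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Tell the cluster where this consumer runs, so it can be served by a replica
        // in the same "rack" (here: the same datacenter).
        props.put("client.rack", "dc-1");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("topic-1"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(r -> System.out.printf("%s -> %s%n", r.key(), r.value()));
        }
    }
}
```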

In-sync replicas

Another important concept in Kafka is the ISR (in-sync replica). An in-sync replica is a copy of a partition that is up to date (i.e. it is alive and caught up to the leader). To prevent data loss, only in-sync replicas can be selected as leaders when leader election takes place.

If availability is more important than durability, then you may decide to accept the risk and set unclean.leader.election.enable=true, which allows an out-of-sync replica to become the leader at the cost of potential data loss.

So, for example, if broker 3 becomes slow (e.g. due to infrastructure-related problems or something as trivial as a GC pause), then it falls out of the in-sync replica set:

And this means it is not eligible to become a leader until it catches up.

It also means that if min.insync.replicas were set to a value higher than 2, then all subsequent writes from producers using acks=all would fail with an error. This way we can prevent our Kafka cluster from accepting messages that could be lost because there are not enough back-up copies.
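
Since min.insync.replicas is a topic-level config, it can also be changed on an existing topic. Here is a sketch using the Java AdminClient, assuming a hypothetical topic called topic-1 and a placeholder broker address; once the value is 2, producers using acks=all get a NotEnoughReplicasException whenever fewer than 2 replicas are in sync:

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RequireTwoInSyncReplicas {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "topic-1");
            AlterConfigOp setMinIsr = new AlterConfigOp(
                    new ConfigEntry("min.insync.replicas", "2"), AlterConfigOp.OpType.SET);
            // From now on, writes with acks=all fail with NotEnoughReplicasException
            // whenever fewer than 2 replicas of a partition are in sync.
            admin.incrementalAlterConfigs(Map.of(topic, List.of(setMinIsr))).all().get();
        }
    }
}
```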

Okay, but what exactly does it mean for a replica to be “in-sync”? This can be controlled using the replica.lag.time.max.ms parameter, which sets a time limit on how “slow” a replica can become before it is dropped from the ISR set.

There used to be a replica.lag.max.messages parameter, which controlled when a replica was considered out-of-sync based on the number of messages it was lagging behind, but it was removed in Kafka 0.9.0.0. Still, a good rule of thumb is to monitor how much each replica is falling behind and set up proper alerting in case things go south.

Acks

The last piece of the puzzle is the acks parameter, which lets us control how certain we can be that a message was successfully written.

acks=0

If we set acks=0, we get no guarantees and the producer assumes its work is finished as soon as the write request is pushed out the door:

This helps achieve the highest throughput possible, because the producer does not need to wait, but at the same time it gives absolutely no confidence that our write succeeded.

Although it does not seem very useful, acks=0 can actually be a viable option in certain cases. For example, if you continuously send measurements (e.g. temperature, current, voltage, etc.) where the latest reading supersedes the older ones, then losing a single message may be acceptable.
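
Here is a fire-and-forget sketch for that kind of measurement stream; the broker address, topic, key and value are made-up placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class FireAndForgetProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "0"); // fire and forget

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Losing a single reading is acceptable here: the next one supersedes it.
            producer.send(new ProducerRecord<>("temperature-readings", "sensor-42", "21.5"));
        }
    }
}
```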

acks=1

Alternatively, you can wait for a confirmation from the leader by setting acks=1:

It gives some level of confidence that a write was successful, but if the leader fails before the message gets replicated to the followers, the message is lost.

acks=all

If durability is your top priority, then you should set acks=all. It makes the producer wait for confirmation from all in-sync replicas:
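
Below is a minimal sketch of such a producer; the broker address, topic, key and value are made-up placeholders. Blocking on send().get() is the simplest way to find out whether the write was acknowledged by all in-sync replicas; in real code you would more likely use the callback variant of send().

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class DurableProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("payments", "order-1", "10000000 EUR");
            // get() blocks until the write is acknowledged; a failure such as
            // NotEnoughReplicasException surfaces here, wrapped in an ExecutionException.
            RecordMetadata metadata = producer.send(record).get();
            System.out.printf("written to partition %d at offset %d%n",
                    metadata.partition(), metadata.offset());
        }
    }
}
```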

Typical durability setup

As stated in the docs, it is quite common to set replication.factor=3, min.insync.replicas=2 and acks=all. This offers good-enough durability for most use cases and is considered the sweet spot between durability and performance.

Learn more

We have just touched upon the topic of replication in Kafka, but if you have around 37 minutes to spare then I recommend watching this talk by Jun Rao from Confluent. Jun explores replication and leader election in great detail.