Intro

Apache Kafka, like any other messaging or database system, is a complicated beast. But when divided into manageable chunks it can be much easier to understand how it all works. In this post let’s focus on one area of Kafka’s inner workings and try to figure out how Kafka writes messages to disk.

Create a topic

First we will create a test topic using a utility script provided with Kafka:

> $KAFKA_HOME/bin/kafka-topics.sh \
  --create \
  --bootstrap-server localhost:9092 \
  --replication-factor 1 \
  --partitions 1 \
  --topic my-topic

Created topic "my-topic".


> $KAFKA_HOME/bin/kafka-topics.sh \
  --describe \
  --bootstrap-server localhost:9092 \
  --topic my-topic

Topic: my-topic	PartitionCount: 1	ReplicationFactor: 1	Configs:
	Topic: my-topic	Partition: 0	Leader: 1001	Replicas: 1001	Isr: 1001

my-topic is the simplest topic possible. There is only a single copy of each message (replication-factor 1), so everything will be stored on a single node, which means data could be lost if just that one node fails. On top of that, it has only one partition (partitions 1), which greatly limits parallelism but, on the other hand, makes things much easier to understand.

Partitions play an important role in increasing throughput: basically, with more partitions you can write and read more data at the same time. The downside of multiple partitions is that you lose overall message ordering, which is only guaranteed within a single partition. In other words, a message written later to one partition may be read sooner than a message written earlier to another.

Having a single partition sounds perfectly suited to our case since we are just trying to get our heads around how things work. More partitions would only complicate things.

Send a test message

To get started we will send just one message. Again Kafka has an out-of-the-box tool we can use:

> $KAFKA_HOME/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-topic

my test message<enter>
<ctrl+c>

Where is the message?

The message should be stored in the folder pointed to by the log.dirs parameter in your Kafka broker’s configuration, e.g.:

# server.properties
log.dirs=/var/lib/kafka

If you listed files in this folder you would see something like this:

> ls /var/lib/kafka

cleaner-offset-checkpoint
log-start-offset-checkpoint
meta.properties
my-topic-0
recovery-point-offset-checkpoint
replication-offset-checkpoint

There is a lot of interesting stuff here, but from our point of view my-topic-0 is the folder we should focus on. This folder contains messages (and their metadata) sent to the newly created topic.

Why was 0 appended to the folder name? Well, we asked for a topic with just one partition, so we only have my-topic-0. If our topic had multiple partitions and those partitions were assigned to this node, we would see my-topic-1, my-topic-2 and so on (one folder per partition).

Now let’s dig deeper and reveal the contents of my-topic-0.

> ls /var/lib/kafka/my-topic-0

00000000000000000000.index      
00000000000000000000.log        
00000000000000000000.timeindex  
leader-epoch-checkpoint

Once again, quite interesting things are going on here, but we will limit our experiment to the file that actually contains our message: the .log file:

> cat /var/lib/kafka/my-topic-0/00000000000000000000.log

GM??rat??rat????????????????*my test message

We can obviously see the exact same text that was sent (my test message), but what is this gibberish in front of it?

Message format

It probably does not come as a surprise that the message was saved in a binary format and each message has some metadata together with the payload:

> hexdump -C /var/lib/kafka/my-topic-0/00000000000000000000.log

00000000  00 00 00 00 00 00 00 00  00 00 00 47 00 00 00 00  |...........G....|
00000010  02 4d 1a e9 ea 00 00 00  00 00 00 00 00 01 72 61  |.M............ra|
00000020  74 a0 a7 00 00 01 72 61  74 a0 a7 ff ff ff ff ff  |t.....rat.......|
00000030  ff ff ff ff ff ff ff ff  ff 00 00 00 01 2a 00 00  |.............*..|
00000040  00 01 1e 6d 79 20 74 65  73 74 20 6d 65 73 73 61  |...my test messa|
00000050  67 65 00                                          |ge.|
00000053

A message stored in this binary format can be (fairly) easily decoded by referring to Kafka’s official documentation.

You should be aware that, for performance’s sake, Kafka does not write one message at a time; instead it stores incoming messages in batches.

So from the output of the .log file above we can distinguish the batch’s header:

base offset:             00 00 00 00 00 00 00 00
batch length:            00 00 00 47
partition leader epoch:  00 00 00 00
magic:                   02
crc:                     4d 1a e9 ea
attributes:              00 00
last offset delta:       00 00 00 00
first timestamp:         00 00 01 72 61 74 a0 a7
max timestamp:           00 00 01 72 61 74 a0 a7
producer id:             ff ff ff ff ff ff ff ff
producer epoch:          ff ff
base sequence:           ff ff ff ff
records count:           00 00 00 01

And the batch’s header is followed by the record itself:

length:           2a  (varint for 21, the number of bytes that follow)
attributes:       00
timestamp delta:  00
offset delta:     00
key length:       01  (varint for -1, i.e. no key)
value length:     1e  (varint for 15)
value:            6d 79 20 74 65 73 74 20 6d 65 73 73 61 67 65  ("my test message" in ASCII)
headers count:    00

Our batch is quite special because it actually contains just one record.

Please take note that the message format may change as Kafka continuously evolves and new KIPs are implemented. The example above shows how Kafka v2.5.0 serializes messages to bytes.
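If you ever want to double-check a single field by hand, you can carve it out of the segment file directly. Just a sketch: the 4-byte batch length sits at bytes 8-11, so dd plus xxd (or od) will show it:

> dd if=/var/lib/kafka/my-topic-0/00000000000000000000.log \
  bs=1 skip=8 count=4 2>/dev/null | xxd -p

00000047

0x47 is 71 which, together with the 12 bytes occupied by the base offset and batch length fields, gives the 83 bytes we saw at the end of the hexdump above.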

Better way to decode messages

Dealing with bits and bytes is fun and makes us, programmers, look like those hackers from NCIS.

But a more efficient way would be to write a tool that automatically transforms binary representation to something easily interpretable by human beings.

Fortunately, such a utility is already bundled with Kafka and here is how you can decode all messages from a .log file:

> $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
  --print-data-log \
  --files /var/lib/kafka/my-topic-0/00000000000000000000.log

Dumping /var/lib/kafka/my-topic-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1
  producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0
  isTransactional: false isControl: false position: 0
  CreateTime: 1590772932775 size: 83 magic: 2
  compresscodec: NONE crc: 1293609450 isvalid: true
| offset: 0 CreateTime: 1590772932775 keysize: -1
  valuesize: 15 sequence: -1 headerKeys: []
  payload: my test message

Add more messages

Before we continue our little experiment, let’s spice things up and add another two test messages. To get them into the same batch we will send them at once (note that the messages are separated with \n):

> printf "next test message\nyet another message\n" | \
  $KAFKA_HOME/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-topic

If we now explore the contents of the .log file we could see something like this:

> hexdump -C /var/lib/kafka/my-topic-0/00000000000000000000.log

00000000  00 00 00 00 00 00 00 00  00 00 00 47 00 00 00 00  |...........G....|
00000010  02 4d 1a e9 ea 00 00 00  00 00 00 00 00 01 72 61  |.M............ra|
00000020  74 a0 a7 00 00 01 72 61  74 a0 a7 ff ff ff ff ff  |t.....rat.......|
00000030  ff ff ff ff ff ff ff ff  ff 00 00 00 01 2a 00 00  |.............*..|
00000040  00 01 1e 6d 79 20 74 65  73 74 20 6d 65 73 73 61  |...my test messa|
00000050  67 65 00 00 00 00 00 00  00 00 01 00 00 00 63 00  |ge............c.|
00000060  00 00 00 02 cd 0b 06 ba  00 00 00 00 00 01 00 00  |................|
00000070  01 72 61 d2 37 37 00 00  01 72 61 d2 37 63 ff ff  |.ra.77...ra.7c..|
00000080  ff ff ff ff ff ff ff ff  ff ff ff ff 00 00 00 02  |................|
00000090  2e 00 00 00 01 22 6e 65  78 74 20 74 65 73 74 20  |....."next test |
000000a0  6d 65 73 73 61 67 65 00  32 00 58 02 01 26 79 65  |message.2.X..&ye|
000000b0  74 20 61 6e 6f 74 68 65  72 20 6d 65 73 73 61 67  |t another messag|
000000c0  65 00                                             |e.|

After a closer look you may notice that the two new messages ended up together in one batch, separate from the batch holding our first message:

> $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
  --print-data-log \
  --files /var/lib/kafka/my-topic-0/00000000000000000000.log

Dumping /var/lib/kafka/my-topic-0/00000000000000000000.log

Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 ...
| offset: 0 CreateTime: 1590779007825 keysize: -1
  valuesize: 15 sequence: -1 headerKeys: []
  payload: my test message
baseOffset: 1 lastOffset: 2 count: 2 ...
| offset: 1 CreateTime: 1590779066167 keysize: -1
  valuesize: 17 sequence: -1 headerKeys: []
  payload: next test message
| offset: 2 CreateTime: 1590779066211 keysize: -1
  valuesize: 19 sequence: -1 headerKeys: []
  payload: yet another message

Offsets

You may have also noticed that this concept of offsets popped up several times already.

An offset is a unique record identifier which makes it possible to quickly find a message within a partition (using binary search). Offsets always increase; the next offset is simply the last offset plus 1.
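This also means you can jump straight to a given position when reading. A quick sketch with the console consumer, fetching only the record at offset 1 of partition 0 (same broker and topic as before):

> $KAFKA_HOME/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --partition 0 \
  --offset 1 \
  --max-messages 1

next test message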

Log segments

What about the filename of the log file (000...000.log)? The name of the file is the offset of the first message it contains. You can expect the next file to be named something like 00000000000000671025.log.

The .log files within partition folders (e.g. my-topic-0) are the main part of so-called log segments (together with the .index and .timeindex files).

An important characteristic of log segments is that messages are only appended at the end of the .log file of the currently active segment. Another is that records, once stored, cannot be modified (although they can be deleted, but more on that later).

I guess it all boils down to architectural decisions that Kafka’s creators had to make, and these limitations simplify some of the performance and replication considerations.

Rolling segments

You can control when a new log segment gets created by changing Kafka broker config parameters:

  • log.roll.ms or log.roll.hours to create a new segment log file after some time has elapsed

  • log.segment.bytes to limit segment log file size in bytes

For example, if you set log.roll.hours = 3 then a new log segment file will be created every 3 hours (unless, of course, log.segment.bytes gets exceeded sooner).
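In the broker configuration that could look like the following sketch (the size limit shown is simply the default of 1 GiB; pick whatever suits your workload):

# server.properties
log.roll.hours=3
log.segment.bytes=1073741824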

Alright, but you may be asking yourself: why are those log segments needed at all?

Removing old messages

One of the main purposes of log segments is cleaning up messages that are no longer needed.

Kafka, unlike other “message brokers”, does not remove a message after a consumer reads it. Instead it waits a certain amount of time before the message becomes eligible for removal. The fun part is that, because messages are kept around for a while, you can replay them. This can be handy after you fix a bug that previously crashed message processing, because you can then reprocess the problematic messages.
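Replaying is as simple as reading the topic from the beginning again; here is a quick sketch with the console consumer (same broker and topic as before):

> $KAFKA_HOME/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --from-beginning

my test message
next test message
yet another message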

For example, if you want to keep only messages sent within the last 30 days, you can set retention.ms = 2592000000. Kafka will then get rid of segments older than 30 days. At first, all the files belonging to such a segment get the .deleted suffix:

> ls /var/lib/kafka/my-topic-0

00000000000000000000.log.deleted
00000000000000000000.index.deleted
00000000000000000000.timeindex.deleted
...

Then, after log.segment.delete.delay.ms elapses, these files are gone from your storage for good.
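By the way, retention can also be configured per topic rather than broker-wide; here is a sketch of setting the same 30-day limit only for our test topic:

> $KAFKA_HOME/bin/kafka-configs.sh \
  --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name my-topic \
  --alter \
  --add-config retention.ms=2592000000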

Flushing to disk

There seem to be many similarities between Kafka log files and the write-ahead logs (WAL) commonly used in databases for transaction processing.

Usually, such databases have some clever strategy for when data is flushed to permanent storage, so that integrity is ensured without hindering performance with excessive flushing (e.g. multiple transactions can be flushed together in one go). Kafka, however, takes a slightly different approach: the suggestion is to leave flushing to the operating system.

The recommended approach to guaranteeing durability is to rely on replication instead, i.e. spreading data across many machines/disks at the application level.

But if durability is of the highest importance to you, then it is still possible to force fsync with the following properties (keep in mind that performance will likely degrade):

  • flush.messages - data will be flushed to disk every N-th message

  • flush.ms - data will be explicitly flushed after some timeout

What is important is that a hybrid approach is achievable. For example, you could rely on the OS to do the flushing for most topics but set up an explicit fsync on the topics where you refuse to compromise on durability.

So if you are a little bit paranoid about a certain topic you may force fsync after every single message:

> $KAFKA_HOME/bin/kafka-configs.sh \
  --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name my-topic \
  --alter \
  --add-config flush.messages=1

Completed updating config for topic my-topic.

Summary

As you can see, the way Kafka stores messages on disk is not that scary. It is not based on a complicated data structure; it is just a log file to which each message is simply appended at the end.

We have skipped a lot of details, but at least now, if you ever need to investigate a low-level, data-related Kafka issue, you know which tools and config parameters you can use.

For example, you can use kafka.tools.DumpLogSegments to confirm that a particular message was stored on one of the Kafka nodes (if a need for such precise confirmation ever arises).