How Kafka stores messages

Intro
Apache Kafka, like any other messaging or database system, is a complicated beast. But when divided into manageable chunks it can be much easier to understand how it all works. In this post let’s focus on one area of Kafka’s inner workings and try to figure out how Kafka writes messages to disk.
Create a topic
First we will create a test topic using a utility script provided with Kafka:
> $KAFKA_HOME/bin/kafka-topics.sh \
--create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic my-topic
Created topic "my-topic".
> $KAFKA_HOME/bin/kafka-topics.sh \
--describe \
--bootstrap-server localhost:9092 \
--topic my-topic
Topic: my-topic PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: my-topic Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
my-topic is the simplest topic possible. There is only a single copy of each message (replication-factor 1), so everything is stored on one node, which means data could be lost if just that one node fails. On top of that, it has only one partition (partitions 1), which greatly limits parallelism but on the other hand makes things much easier to understand.
Having a single partition sounds perfectly suited to our case since we are just trying to get our heads around how things work. More partitions would only complicate things.
Send a test message
To get started we will send just one message. Again Kafka has an out-of-the-box tool we can use:
> $KAFKA_HOME/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic my-topic
my test message<enter>
<ctrl+c>
Where is the message?
The message should be stored in the folder pointed to by the log.dirs parameter in your Kafka broker’s configuration, e.g.:
# server.properties
log.dirs=/var/lib/kafka
If you listed files in this folder you would see something like this:
> ls /var/lib/kafka
cleaner-offset-checkpoint
log-start-offset-checkpoint
meta.properties
my-topic-0
recovery-point-offset-checkpoint
replication-offset-checkpoint
There is a lot of interesting stuff here, but from our point of view my-topic-0 is the folder we should focus on. This folder contains messages (and their metadata) sent to the newly created topic.
Why was 0 appended to the folder name? The suffix is the partition number, counted from zero. We asked for a topic with just one partition, so we only have my-topic-0. If our topic had multiple partitions and these partitions were assigned to this node, we would see my-topic-1, my-topic-2 and so on (one folder per partition).
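The folder naming convention can be sketched in a few lines of Python. This is only an illustration: `parse_partition_dir` and `TopicPartition` are hypothetical names made up for this post, not part of Kafka. The one detail worth noticing is that topic names may themselves contain hyphens, so the partition number has to be taken from the last hyphen.

```python
from typing import NamedTuple


class TopicPartition(NamedTuple):
    topic: str
    partition: int


def parse_partition_dir(name: str) -> TopicPartition:
    """Split a partition folder name such as 'my-topic-0' into its parts."""
    # Topic names may contain hyphens, so split on the LAST hyphen only.
    topic, sep, partition = name.rpartition("-")
    if not sep or not topic or not partition.isdigit():
        raise ValueError(f"not a partition folder: {name!r}")
    return TopicPartition(topic, int(partition))


# Folder names taken from the listing above.
for folder in ["my-topic-0", "my-topic-1", "my-topic-2"]:
    print(parse_partition_dir(folder))
```

Files such as meta.properties or cleaner-offset-checkpoint do not follow the pattern, so the sketch rejects them with a ValueError.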
Now let’s dig deeper and reveal the contents of my-topic-0.
> ls /var/lib/kafka/my-topic-0
00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex
leader-epoch-checkpoint
Once again, quite interesting things are going on here, but we will limit our experiment to the file which contains our message, which is the .log file:
> cat /var/lib/kafka/my-topic-0/00000000000000000000.log
GM??rat??rat????????????????*my test message
We can clearly see the exact same text that was sent (my test message), but what is this gibberish in front of it?
Message format
It probably does not come as a surprise that the message was saved in a binary format and each message has some metadata together with the payload:
> hexdump -C /var/lib/kafka/my-topic-0/00000000000000000000.log
00000000 00 00 00 00 00 00 00 00 00 00 00 47 00 00 00 00 |...........G....|
00000010 02 4d 1a e9 ea 00 00 00 00 00 00 00 00 01 72 61 |.M............ra|
00000020 74 a0 a7 00 00 01 72 61 74 a0 a7 ff ff ff ff ff |t.....rat.......|
00000030 ff ff ff ff ff ff ff ff ff 00 00 00 01 2a 00 00 |.............*..|
00000040 00 01 1e 6d 79 20 74 65 73 74 20 6d 65 73 73 61 |...my test messa|
00000050 67 65 00 |ge.|
00000053
A message stored in this binary format can be (fairly) easily decoded by referring to Kafka’s official documentation.
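You should be aware that Kafka, for performance’s sake, stores incoming messages in batches instead of writing one message at a time.
So in the output of the .log file above we can distinguish the batch’s header, which takes up the first 61 bytes: base offset, batch length, partition leader epoch, magic byte, CRC, attributes, last offset delta, base and max timestamps, producer ID, producer epoch, base sequence and record count.
The batch’s header is followed by the records.
Our batch is quite special because it actually contains just one record, which occupies the remaining 22 bytes: the record length, a few small metadata fields, the payload (my test message) and an empty list of headers.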
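To make the layout less abstract, here is a minimal Python sketch that decodes the 83 bytes from the hexdump above. It assumes the version 2 record batch layout described in Kafka’s documentation (big-endian fixed-width header fields, zigzag varints inside records). The helper names (`decode_header`, `decode_record`, `read_varint`, `read_bytes`) are made up for this post, and the bytes were copied by hand from the dump, so treat it as an illustration rather than a reference decoder.

```python
import struct

# The 83 bytes of the .log file, copied by hand from the hexdump above.
SEGMENT = bytes.fromhex(
    "00 00 00 00 00 00 00 00 00 00 00 47 00 00 00 00"
    "02 4d 1a e9 ea 00 00 00 00 00 00 00 00 01 72 61"
    "74 a0 a7 00 00 01 72 61 74 a0 a7 ff ff ff ff ff"
    "ff ff ff ff ff ff ff ff ff 00 00 00 01 2a 00 00"
    "00 01 1e 6d 79 20 74 65 73 74 20 6d 65 73 73 61"
    "67 65 00"
)

# Big-endian layout of the batch header (61 bytes in total).
HEADER = struct.Struct(">qiibIhiqqqhii")
FIELDS = (
    "baseOffset", "batchLength", "partitionLeaderEpoch", "magic", "crc",
    "attributes", "lastOffsetDelta", "baseTimestamp", "maxTimestamp",
    "producerId", "producerEpoch", "baseSequence", "recordCount",
)


def decode_header(buf, pos=0):
    """Unpack the fixed-width batch header starting at `pos`."""
    return dict(zip(FIELDS, HEADER.unpack_from(buf, pos)))


def read_varint(buf, pos):
    """Decode one zigzag-encoded varint; return (value, next position)."""
    raw = shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos


def read_bytes(buf, pos):
    """Read a varint length followed by that many bytes (-1 means null)."""
    length, pos = read_varint(buf, pos)
    if length < 0:
        return None, pos
    return buf[pos:pos + length], pos + length


def decode_record(buf, pos):
    """Decode a single record that starts at `pos`."""
    length, pos = read_varint(buf, pos)
    attributes = buf[pos]
    timestamp_delta, pos = read_varint(buf, pos + 1)
    offset_delta, pos = read_varint(buf, pos)
    key, pos = read_bytes(buf, pos)
    value, pos = read_bytes(buf, pos)
    header_count, pos = read_varint(buf, pos)
    return {
        "length": length, "attributes": attributes,
        "timestampDelta": timestamp_delta, "offsetDelta": offset_delta,
        "key": key, "value": value, "headerCount": header_count,
    }


print(decode_header(SEGMENT))
print(decode_record(SEGMENT, HEADER.size))
```

The decoded header reports a batch length of 71, which together with the 12 bytes of base offset and batch length fields adds up to the 83 bytes of the file.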
Better way to decode messages
Dealing with bits and bytes is fun and makes us, programmers, look like those hackers from NCIS.
But a more efficient way would be to write a tool that automatically transforms the binary representation into something easily interpretable by human beings.
Fortunately, such a utility is already bundled with Kafka, and here is how you can decode all messages from a .log file:
> $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
--print-data-log \
--files /var/lib/kafka/my-topic-0/00000000000000000000.log
Dumping /var/lib/kafka/my-topic-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0
lastOffset: 0
count: 1
baseSequence: -1
lastSequence: -1
producerId: -1
producerEpoch: -1
partitionLeaderEpoch: 0
isTransactional: false
isControl: false
position: 0
CreateTime: 1590772932775
size: 83
magic: 2
compresscodec: NONE
crc: 1293609450
isvalid: true
| offset: 0
CreateTime: 1590772932775
keysize: -1
valuesize: 15
sequence: -1
headerKeys: []
payload: my test message
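The crc and isvalid fields in this output can be reproduced by hand. As far as I understand the version 2 batch format, the checksum is a CRC-32C (Castagnoli) computed over everything from the attributes field to the end of the batch, that is from byte 21 onwards. Python’s standard library has no CRC-32C, so the sketch below carries its own slow, bit-by-bit implementation. `crc32c` is a helper written for this post and the bytes are again copied by hand from the hexdump.

```python
# The 83 bytes of the .log file, copied by hand from the first hexdump.
SEGMENT = bytes.fromhex(
    "00 00 00 00 00 00 00 00 00 00 00 47 00 00 00 00"
    "02 4d 1a e9 ea 00 00 00 00 00 00 00 00 01 72 61"
    "74 a0 a7 00 00 01 72 61 74 a0 a7 ff ff ff ff ff"
    "ff ff ff ff ff ff ff ff ff 00 00 00 01 2a 00 00"
    "00 01 1e 6d 79 20 74 65 73 74 20 6d 65 73 73 61"
    "67 65 00"
)

CRC_OFFSET = 17         # baseOffset (8) + batchLength (4) + leaderEpoch (4) + magic (1)
ATTRIBUTES_OFFSET = 21  # first byte covered by the checksum


def crc32c(data: bytes) -> int:
    """Bitwise CRC-32C (Castagnoli polynomial, reflected). Slow but dependency-free."""
    crc = 0xFFFFFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ (0x82F63B78 if crc & 1 else 0)
    return crc ^ 0xFFFFFFFF


stored = int.from_bytes(SEGMENT[CRC_OFFSET:ATTRIBUTES_OFFSET], "big")
computed = crc32c(SEGMENT[ATTRIBUTES_OFFSET:])
print("stored crc:  ", stored)
print("computed crc:", computed)
print("match:", stored == computed)
```

If the bytes were transcribed accurately, the two numbers should agree, which is what isvalid: true reports.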
Add more messages
Before we continue our little experiment, to spice things up, let’s add another 2 test messages. To send them in the same batch we will add them at once (note that messages are separated with \n):
> printf "next test message\nyet another message\n" | \
$KAFKA_HOME/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic my-topic
If we now explore the contents of the .log file we will see something like this:
> hexdump -C /var/lib/kafka/my-topic-0/00000000000000000000.log
00000000 00 00 00 00 00 00 00 00 00 00 00 47 00 00 00 00 |...........G....|
00000010 02 4d 1a e9 ea 00 00 00 00 00 00 00 00 01 72 61 |.M............ra|
00000020 74 a0 a7 00 00 01 72 61 74 a0 a7 ff ff ff ff ff |t.....rat.......|
00000030 ff ff ff ff ff ff ff ff ff 00 00 00 01 2a 00 00 |.............*..|
00000040 00 01 1e 6d 79 20 74 65 73 74 20 6d 65 73 73 61 |...my test messa|
00000050 67 65 00 00 00 00 00 00 00 00 01 00 00 00 63 00 |ge............c.|
00000060 00 00 00 02 cd 0b 06 ba 00 00 00 00 00 01 00 00 |................|
00000070 01 72 61 d2 37 37 00 00 01 72 61 d2 37 63 ff ff |.ra.77...ra.7c..|
00000080 ff ff ff ff ff ff ff ff ff ff ff ff 00 00 00 02 |................|
00000090 2e 00 00 00 01 22 6e 65 78 74 20 74 65 73 74 20 |....."next test |
000000a0 6d 65 73 73 61 67 65 00 32 00 58 02 01 26 79 65 |message.2.X..&ye|
000000b0 74 20 61 6e 6f 74 68 65 72 20 6d 65 73 73 61 67 |t another messag|
000000c0 65 00 |e.|
After a closer look you may notice that the two new messages were written together as one new batch, appended after the original one:
> $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
--print-data-log \
--files /var/lib/kafka/my-topic-0/00000000000000000000.log
Dumping /var/lib/kafka/my-topic-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 ...
| offset: 0 CreateTime: 1590772932775 keysize: -1
valuesize: 15 sequence: -1 headerKeys: []
payload: my test message
baseOffset: 1 lastOffset: 2 count: 2 ...
| offset: 1 CreateTime: 1590779066167 keysize: -1
valuesize: 17 sequence: -1 headerKeys: []
payload: next test message
| offset: 2 CreateTime: 1590779066211 keysize: -1
valuesize: 19 sequence: -1 headerKeys: []
payload: yet another message
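The same batch boundaries can be found without Kafka’s tooling, because every batch starts with its base offset and its length. The sketch below walks the 194 bytes from the hexdump above, hopping from one batch header to the next. It assumes the version 2 batch layout, `iter_batches` is a name invented for this post, and the bytes were copied by hand.

```python
import struct

# The 194 bytes of the .log file, copied by hand from the second hexdump.
SEGMENT = bytes.fromhex(
    "00 00 00 00 00 00 00 00 00 00 00 47 00 00 00 00"
    "02 4d 1a e9 ea 00 00 00 00 00 00 00 00 01 72 61"
    "74 a0 a7 00 00 01 72 61 74 a0 a7 ff ff ff ff ff"
    "ff ff ff ff ff ff ff ff ff 00 00 00 01 2a 00 00"
    "00 01 1e 6d 79 20 74 65 73 74 20 6d 65 73 73 61"
    "67 65 00 00 00 00 00 00 00 00 01 00 00 00 63 00"
    "00 00 00 02 cd 0b 06 ba 00 00 00 00 00 01 00 00"
    "01 72 61 d2 37 37 00 00 01 72 61 d2 37 63 ff ff"
    "ff ff ff ff ff ff ff ff ff ff ff ff 00 00 00 02"
    "2e 00 00 00 01 22 6e 65 78 74 20 74 65 73 74 20"
    "6d 65 73 73 61 67 65 00 32 00 58 02 01 26 79 65"
    "74 20 61 6e 6f 74 68 65 72 20 6d 65 73 73 61 67"
    "65 00"
)

HEADER_SIZE = 61          # full batch header
LENGTH_PREFIX = 12        # baseOffset (8) + batchLength (4)
LAST_OFFSET_DELTA_AT = 23
RECORD_COUNT_AT = 57


def iter_batches(segment: bytes):
    """Yield (file position, base offset, last offset, record count) per batch."""
    pos = 0
    while pos + HEADER_SIZE <= len(segment):
        base_offset, batch_length = struct.unpack_from(">qi", segment, pos)
        (last_delta,) = struct.unpack_from(">i", segment, pos + LAST_OFFSET_DELTA_AT)
        (count,) = struct.unpack_from(">i", segment, pos + RECORD_COUNT_AT)
        yield pos, base_offset, base_offset + last_delta, count
        # batchLength counts the bytes that follow the length field itself.
        pos += LENGTH_PREFIX + batch_length


for position, base, last, count in iter_batches(SEGMENT):
    print(f"position: {position} baseOffset: {base} lastOffset: {last} count: {count}")
```

The first batch starts at position 0 and holds offset 0; the second starts at position 83 and holds offsets 1 and 2, which lines up with the DumpLogSegments output.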
Offsets
You may have also noticed that the concept of offsets has popped up several times already.
An offset is a unique record identifier within a partition, which makes it possible to find a message quickly (using binary search). Offsets only ever increase: the next offset is calculated by adding 1 to the last offset, so in our example the three records got offsets 0, 1 and 2.
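The binary search part can be sketched as follows. My understanding is that the .index file next to each .log file holds a sparse list of (offset, file position) pairs, so a lookup finds the closest entry at or below the wanted offset and scans the log forward from there. The index entries below are invented for illustration and `lookup_position` is a hypothetical helper, not Kafka’s implementation.

```python
from bisect import bisect_right

# Hypothetical sparse index: (offset, byte position in the .log file).
# Only some offsets are indexed, which keeps the index small.
INDEX = [(0, 0), (35, 4096), (71, 8210), (104, 12300)]


def lookup_position(index, target_offset):
    """Return the file position from which to start scanning for target_offset."""
    offsets = [offset for offset, _ in index]
    # Binary search for the last indexed offset that is <= target_offset.
    i = bisect_right(offsets, target_offset) - 1
    if i < 0:
        raise KeyError(f"offset {target_offset} is before the first index entry")
    return index[i][1]


print(lookup_position(INDEX, 80))   # start scanning at the entry for offset 71
print(lookup_position(INDEX, 35))   # exact hit on an indexed offset
```

Because offsets grow by exactly 1 and never repeat, the list is always sorted, which is what makes the binary search valid.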
Log segments
What about the filename of the log file (000...000.log)? The name of the file is the first offset that it contains, padded with zeros to 20 digits. You can expect the next file to be something like 00000000000000671025.log.
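The naming scheme is easy to reproduce. The short sketch below formats a base offset into a segment filename and, going the other way, picks the segment that should contain a given offset. Both helpers (`segment_file_name`, `segment_for`) are hypothetical names used only in this post.

```python
from bisect import bisect_right


def segment_file_name(base_offset: int) -> str:
    """Zero-pad the segment's first offset to 20 digits, as seen in the listing."""
    return f"{base_offset:020d}.log"


def segment_for(file_names, offset):
    """Return the name of the segment file that should contain `offset`."""
    # Zero-padded names sort in the same order as the numbers they encode.
    base_offsets = sorted(int(name.split(".")[0]) for name in file_names)
    i = bisect_right(base_offsets, offset) - 1
    if i < 0:
        raise ValueError(f"offset {offset} precedes the first segment")
    return segment_file_name(base_offsets[i])


names = [segment_file_name(0), segment_file_name(671025)]
print(names)
print(segment_for(names, 671024))  # still in the first segment
print(segment_for(names, 700000))  # falls into the second segment
```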
The .log files within partition folders (e.g. my-topic-0) are the main part of so-called log segments (together with .index and .timeindex).
An important characteristic of log segments is that messages are only ever appended at the end of the .log file in the currently active segment. Another is that already stored records cannot be modified (although they can be deleted, but more on that later).
I guess it all boils down to architectural decisions that Kafka’s creators had to make. Setting these limitations should have simplified some of the performance and replication considerations.
Rolling segments
You can control when a new log segment gets created by changing Kafka broker config parameters:
- log.roll.ms or log.roll.hours to create a new segment log file after some time has elapsed
- log.segment.bytes to limit segment log file size in bytes
For example, if you set log.roll.hours = 3 then every 3 hours a new log segment file will be created (unless of course log.segment.bytes gets exceeded sooner).
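The interplay of the two limits can be written down as a small decision function. This is a deliberately simplified sketch of the idea, not Kafka’s actual rolling logic (which, as far as I know, also considers things like jitter and index size). `RollPolicy` and `should_roll` are names invented here, and the defaults are the documented ones as I remember them: 1 GiB for log.segment.bytes and 168 hours for log.roll.hours.

```python
from dataclasses import dataclass

HOUR_MS = 60 * 60 * 1000


@dataclass
class RollPolicy:
    segment_bytes: int = 1_073_741_824  # log.segment.bytes
    roll_ms: int = 168 * HOUR_MS        # log.roll.hours expressed in milliseconds


def should_roll(policy, segment_size, segment_age_ms, incoming_batch_size):
    """Decide whether the active segment should be closed before appending."""
    # Size limit: the incoming batch would push the file past the limit.
    too_big = segment_size + incoming_batch_size > policy.segment_bytes
    # Time limit: only applies to segments that already contain data.
    too_old = segment_size > 0 and segment_age_ms >= policy.roll_ms
    return too_big or too_old


three_hours = RollPolicy(roll_ms=3 * HOUR_MS)
print(should_roll(three_hours, 5_000, 1 * HOUR_MS, 100))  # young and small
print(should_roll(three_hours, 5_000, 3 * HOUR_MS, 100))  # time limit reached
print(should_roll(RollPolicy(segment_bytes=1_000), 950, 0, 100))  # size limit reached
```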
Alright, but you may be asking yourself: why are log segments needed at all?
Removing old messages
One of the main aspects of log segments is related to cleaning up messages that are no longer needed.
Kafka, unlike other “message brokers”, does not remove a message after a consumer reads it. Instead it waits a certain amount of time before a message is eligible for removal. The fun part is that, because messages are kept for some time, you can replay the same message. This can be handy after you fix a bug that earlier crashed message processing, because you can later reprocess the problematic message.
So for example, if you want to only keep messages sent within the last 30 days you can set retention.ms = 2592000000. Then Kafka will get rid of segments older than 30 days. At first, all the filenames related to that segment will get the .deleted suffix:
> ls /var/lib/kafka/my-topic-0
00000000000000000000.log.deleted
00000000000000000000.index.deleted
00000000000000000000.timeindex.deleted
...
Then after log.segment.delete.delay.ms elapses these files will be gone from your storage.
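Both the magic number and the selection of segments can be sketched in Python. The function below is a simplification under stated assumptions: a segment counts as expired when its newest record is older than the retention period, segments are checked from the oldest one and the check stops at the first one that is still fresh, and the newest (active) segment is always skipped. `expired_segments`, the segment list and the clock value are all made up for illustration.

```python
from datetime import timedelta


def to_ms(delta: timedelta) -> int:
    """Convert a timedelta to whole milliseconds."""
    return delta // timedelta(milliseconds=1)


RETENTION_MS = to_ms(timedelta(days=30))
DAY_MS = to_ms(timedelta(days=1))


def expired_segments(segments, now_ms, retention_ms):
    """Return base offsets of segments whose newest record is past retention.

    `segments` is a list of (base_offset, largest_timestamp_ms), oldest first.
    The last entry is treated as the active segment and never returned.
    """
    expired = []
    for base_offset, largest_timestamp_ms in segments[:-1]:
        if now_ms - largest_timestamp_ms <= retention_ms:
            break  # this segment and everything after it is still fresh
        expired.append(base_offset)
    return expired


now = 1_600_000_000_000  # hypothetical "current time" in epoch milliseconds
segments = [
    (0, now - 45 * DAY_MS),
    (671025, now - 31 * DAY_MS),
    (1200000, now - 10 * DAY_MS),
    (1800000, now - 1 * DAY_MS),  # active segment
]
print(RETENTION_MS)
print(expired_segments(segments, now, RETENTION_MS))
```

Note that whole segments are removed, never individual messages, which is one more reason why a partition is split into segments in the first place.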
Flushing to disk
There seem to be many similarities between Kafka log files and write-ahead logs (WAL) commonly used in databases for transaction processing.
Usually, such databases have some clever strategy for deciding when data is flushed to permanent storage, to ensure integrity without hindering performance with excessive flushing (e.g. multiple transactions can be flushed together in one go). Kafka, however, takes a slightly different approach: it is suggested to leave flushing to the operating system.
The recommended approach to guaranteeing durability is to instead rely on replication, meaning spreading data across many machines/disks at the application level.
But if durability is of the highest importance to you then it is still possible to force fsync with the following properties (keep in mind performance will likely degrade):
- flush.messages - data will be flushed to disk every N-th message
- flush.ms - data will be explicitly flushed after some timeout
Importantly, a hybrid approach is also possible. As an example, you could rely on the OS to do the flushing for most of the topics but set up explicit fsync on topics where you refuse to compromise on durability.
So if you are a little bit paranoid about a certain topic you may force fsync after every single message:
> $KAFKA_HOME/bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name my-topic \
--alter \
--add-config flush.messages=1
Completed updating config for topic my-topic.
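To get a feeling for what flush.messages means at the file level, here is a toy append-only log in Python that forces an fsync after every N appends. It only mimics the idea; `FlushingLog` is a class invented for this post and has nothing to do with Kafka’s own code. The distinction it shows is real, though: a plain write only reaches the operating system’s page cache, while os.fsync asks the OS to push the data to the storage device.

```python
import os
import tempfile


class FlushingLog:
    """Append-only file that calls fsync after every `flush_messages` appends."""

    def __init__(self, path, flush_messages):
        self._file = open(path, "ab")
        self._flush_messages = flush_messages
        self._unflushed = 0
        self.fsync_calls = 0

    def append(self, message: bytes):
        self._file.write(message)
        self._unflushed += 1
        if self._unflushed >= self._flush_messages:
            self.flush()

    def flush(self):
        self._file.flush()             # Python's buffer -> OS page cache
        os.fsync(self._file.fileno())  # OS page cache -> storage device
        self._unflushed = 0
        self.fsync_calls += 1

    def close(self):
        self._file.close()


with tempfile.TemporaryDirectory() as folder:
    for every_n in (1, 2):
        log = FlushingLog(os.path.join(folder, f"every-{every_n}.log"), every_n)
        for i in range(5):
            log.append(f"message {i}\n".encode())
        log.close()
        print(f"flush_messages={every_n}: {log.fsync_calls} fsync calls for 5 messages")
```

With a setting of 1 every single append pays for a trip to the disk, which is why the performance warning above should be taken seriously.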
Summary
As you can see, the way Kafka stores messages on disk is not that scary. It is not based on a complicated data structure; rather, it is just a log file where each message is simply appended at the end.
We have skipped a lot of details, but at least now, if you ever need to investigate a low-level data-related Kafka issue you know what tools and config parameters you can use.
For example, you can utilise kafka.tools.DumpLogSegments to confirm that a particular message was stored on one of the Kafka nodes (if a need for such precise confirmation ever arises).