Implementing a Kafka consumer in Java
Introduction
In this post we will take a look at different ways in which messages can be read from Kafka. Our goal will be to find the simplest way to implement a Kafka consumer in Java, exposing potential traps and showing interesting intricacies.
Naive implementation
Getting started with Kafka is incredibly straightforward. You just need to add the official Kafka client dependency to your preferred dependency management tool, e.g. add this to your pom.xml:
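(The version below is only an example; use whichever recent client release matches your cluster.)

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- example version; pick the latest client compatible with your brokers -->
    <version>3.6.1</version>
</dependency>
```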
Then you just need to prepare configuration for your cluster and topic:
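A minimal setup could look like this (the broker address and topic name are placeholders for your own values):

```java
// ConsumerConfig and StringDeserializer come from the kafka-clients library
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

String topic = "my-topic";
```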
Finally, you can create a consumer using this config:
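Using the configuration prepared above:

```java
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
```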
KafkaConsumer, like any other resource, needs to be released, so you need to ensure it is closed in a finally block:
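For example:

```java
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
try {
    // ... poll and process messages here ...
} finally {
    consumer.close();
}
```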
Or even better, wrap it in try-with-resources:
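Like this:

```java
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    // ... poll and process messages here ...
}
```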
Manual or automatic consumer-partition assignment
Now we need to tell the consumer to read messages from a particular set of topics.
One way to do this is to manually assign your consumer to a fixed list of topic-partition pairs:
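For example, to read from the first two partitions of my-topic (the topic name and partition numbers are just for illustration):

```java
consumer.assign(Arrays.asList(
        new TopicPartition("my-topic", 0),
        new TopicPartition("my-topic", 1)));
```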
Alternatively, you can leave it to Kafka by just providing a name of the consumer group the consumer should join:
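For example (the group name is arbitrary):

```java
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
```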
…and then subscribing to a set of topics:
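For example:

```java
consumer.subscribe(Arrays.asList("my-topic"));
```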
The group.id parameter is also required if you wish to store offsets of consumed messages in Kafka (more on that later, so stay tuned).
Although assigning partitions manually makes the code easier to reason about, because you can later easily pinpoint the process and the machine where the consumer ran and what it tried to do, unfortunately it is not very reliable. If this consumer dies, message processing stops. You could programmatically implement some form of fail-over, but it could be quite tricky (as with any distributed system, things tend to be more complicated than we initially anticipated).
On the other hand, the subscription model assumes you are running multiple consumers that will evenly split topic-partitions between each other. Consumers are grouped together by the group.id parameter, and it is important that all consumers within the same group (my-consumer-group in this case) subscribe to the same list of topics. Otherwise the final list of topics from which your consumer group will receive messages will be hard to predict (you can expect a race condition, making debugging particularly miserable). This means that the same consumer group cannot be re-used to listen to a different subset of topics.
Then, when one consumer crashes, other consumers take over the partitions previously processed by the dead consumer.
Not only do you get automatic fail-over, but it also makes scaling out easier when you add more partitions to your topics. In that case the existing consumers divide the new partitions among themselves.
Consumer rebalancing
The way it works is through consumer rebalancing which gets triggered whenever a consumer joins or leaves a consumer group.
Consumer rebalancing is a huge topic, deserving a separate discussion, but it is definitely worthwhile to explore because it can later reward you with better understanding of what your consumers are doing when you need to investigate a problem.
There are some amazing talks about rebalancing available on the Internet, like for example these two:
- The Magical Rebalance Protocol of Apache Kafka by Gwen Shapira
- Everything You Always Wanted to Know About Kafka’s Rebalance Protocol but Were Afraid to Ask by Matthias J. Sax
And actually, rebalancing is an area of Kafka gaining quite a lot of traction lately. Here are two recent improvements that address some of the most pressing issues related to consumer rebalancing:
- static membership - introduces the group.instance.id parameter, used for preserving partition assignment between application restarts
- incremental cooperative rebalancing - a new rebalancing algorithm that avoids stop-the-world pauses (instead it allows more than one rebalancing round to obtain the final assignment and reduces the number of partitions that change hands during rebalancing)
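If you want to try static membership, it boils down to giving every consumer instance a stable, unique id, roughly like this (the id value itself is up to you):

```java
// must be unique per consumer instance, but stable across restarts of that instance
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-1");
```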
Poll for messages in a loop
Now that our consumer is ready to accept new messages we can start polling:
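A single poll can be as simple as:

```java
// blocks for up to 1 second if no records are currently available
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
```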
This is normally done in a long-running loop:
- poll for new messages: consumer.poll(...)
- process the messages: records.forEach(...)
- repeat (go back to step 1)
The complete code of a naive implementation
Putting it all together, this is how the consumer's code in the subscribing variant can look:
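Here is a sketch of what that could look like end to end (broker address, topic and group names are placeholders):

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NaiveConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(record ->
                        System.out.printf("partition=%d, offset=%d, value=%s%n",
                                record.partition(), record.offset(), record.value()));
            }
        }
    }
}
```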
Saving offsets
This code works fine; however, if you are new to Kafka you may be wondering how the consumer knows where it finished reading, meaning how and where it stores offsets.
It might be confusing at first, but offsets of read messages are saved in a special topic called __consumer_offsets.
Auto-commit
Alright, but the above code does not contain any logic for saving offsets. And that is fine, because by default the consumer will commit offsets automatically at the interval configured in auto.commit.interval.ms (5 seconds by default). What is easy to miss is that the commit is actually performed during the call to consumer.poll(...), once the interval has elapsed. So basically, if the consumer crashes, then poll is not called again and the records from the previous poll do not get marked as consumed.
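In other words, the defaults behave as if you had configured the consumer like this:

```java
// these are the defaults, spelled out only to make them explicit
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
```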
Where to start reading?
Another surprise that you can encounter is when you publish a message before your consumer polls for new records:
If there is no offset stored for a consumer group (it is the first poll, or the previously committed offset has expired), then your consumer will look at the auto.offset.reset parameter and either:
- start from the first record available if auto.offset.reset=earliest
- start from the end (awaiting new messages) if auto.offset.reset=latest
And as you may suspect, it is set by default to latest, which means your consumer will not even try to read messages produced before it first joined.
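So if you want the consumer to start from the beginning of the topic instead, override the default:

```java
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
```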
Committing manually
Although committing less often is the preferred option from the performance standpoint, it can result in more messages being re-processed when a consumer crashes. Another consumer has to read the same uncommitted messages and do the same work once again.
Problems could also arise if you would like to batch messages in memory so that you can later process them in bulk. Remember that whenever you call poll, the records from the previous poll are committed. It means that all the messages batched so far will be considered consumed, even though they were not yet processed. Now when a consumer crashes, these messages will not be processed by another live consumer, because they were already marked as consumed. In other words, this will get you into trouble:
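A sketch of the problematic pattern could look like this (the batch size and the processInBulk(...) method are made up for illustration):

```java
// DON'T do this with auto-commit enabled
List<ConsumerRecord<String, String>> batch = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    // the next call to poll(...) may auto-commit these records,
    // even though they are still only sitting in the in-memory batch
    records.forEach(batch::add);
    if (batch.size() >= 100) {
        processInBulk(batch); // hypothetical bulk-processing method
        batch.clear();
    }
}
```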
For cases like that you may decide to take fine-grained control over how and when messages are committed. You can disable auto-commit by setting enable.auto.commit=false and then commit manually by calling either commitSync() or commitAsync(), depending on your use-case.
If you just call the parameterless variant of the method then it will commit all the records returned by this poll:
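With auto-commit disabled, the loop could then look roughly like this (process(...) is a placeholder for your own logic):

```java
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// ...
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    records.forEach(record -> process(record)); // placeholder for your own logic
    consumer.commitSync(); // commits offsets of all records returned by this poll
}
```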
But if you wish to be more specific then you can also provide exactly which offsets should be committed for which partitions. For instance like this:
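A sketch of committing explicit per-partition offsets (again, process(...) is a placeholder):

```java
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
    process(record); // placeholder for your own logic
    // the committed offset should point to the next message to be read, hence the +1
    offsets.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1));
}
consumer.commitSync(offsets);
```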
Single- vs multi-threaded
It is important to keep in mind that KafkaConsumer is not thread-safe. Therefore it is easier to correctly implement a consumer with the "one consumer per thread" pattern. In this pattern scaling boils down to just adding more partitions and more consumers.
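A rough sketch of the pattern, assuming the props prepared earlier (the thread count and processing logic are just for illustration):

```java
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
    executor.submit(() -> {
        // each thread owns its own KafkaConsumer instance and never shares it
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(record -> process(record)); // placeholder for your own logic
            }
        }
    });
}
```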
As an alternative, you can decouple consumption and processing, which should result in a reduced number of TCP connections to the cluster and increased throughput. On the downside, you will end up with much more complicated code, because not only will you need to coordinate threads, but also commit only specific offsets while preserving order between processed messages, all while sharing a single KafkaConsumer.
If you are not discouraged by potential code complexity then Igor Buzatovic recently wrote an article explaining how to write a multi-threaded consumer. Igor also provided a ready-to-use implementation if you would like to jump straight to the code.
Spring Kafka
So far we have mostly complicated the code and discussed what can go wrong. And it is good to be aware of the various gotchas you can run into.
But we have strayed from our original goal which was to come up with the simplest Kafka consumer possible.
When thinking about code simplification in Java then usually it is a good idea to check if Spring Framework offers an abstraction that can make our code more compact and less error-prone.
Actually, there is more than one way in which Spring can simplify our integration with Kafka. For example, you can use Spring Boot together with Spring Kafka:
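For example, with Maven (the versions are typically managed by the Spring Boot parent/BOM):

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```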
Then we only need to configure connection details in application.yml (or application.properties):
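For example (the broker address and group name are placeholders):

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
```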
…and add the @KafkaListener annotation on a method that will process messages:
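For example, a listener receiving the whole ConsumerRecord (the class and method names are arbitrary):

```java
@Component
public class MyKafkaListener {

    @KafkaListener(topics = "my-topic")
    public void listen(ConsumerRecord<String, String> record) {
        // partition, offset and timestamp are all available on the record
        System.out.printf("partition=%d, offset=%d, value=%s%n",
                record.partition(), record.offset(), record.value());
    }
}
```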
If you do not need to access messages’ metadata (like partition, offset or timestamp) then you can make the code even shorter:
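For example:

```java
@KafkaListener(topics = "my-topic")
public void listen(String message) {
    System.out.println("Received: " + message);
}
```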
There is also an alternative way to access this kind of information using annotations:
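A sketch using header annotations (the header constants live in KafkaHeaders; the exact set available may vary slightly between Spring Kafka versions):

```java
@KafkaListener(topics = "my-topic")
public void listen(@Payload String message,
                   @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                   @Header(KafkaHeaders.OFFSET) long offset) {
    System.out.printf("topic=%s, offset=%d, value=%s%n", topic, offset, message);
}
```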
Commit modes in Spring Kafka
We have already discussed that you can safely rely on automatic committing for a wide range of use-cases. Surprisingly, Spring Kafka by default sets enable.auto.commit=false but actually makes it work in a very similar way. What Spring does instead is emulate auto-commit by explicitly committing after all the records from the poll are finally processed. This acknowledgment mode is called BATCH, because Spring commits messages from the previous batch of records returned by poll(...).
However, if you wish to manually commit offsets, you can switch to the MANUAL or MANUAL_IMMEDIATE ACK mode. This can be accomplished by changing the Kafka listener ACK mode in your Spring Boot config:
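For example, in application.yml:

```yaml
spring:
  kafka:
    listener:
      ack-mode: MANUAL_IMMEDIATE
```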
Then you are expected to use the Acknowledgment object to mark messages as consumed:
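For example (process(...) is a placeholder for your own logic):

```java
@KafkaListener(topics = "my-topic")
public void listen(String message, Acknowledgment ack) {
    process(message); // placeholder for your own logic
    ack.acknowledge(); // marks the message as consumed
}
```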
Spring Cloud Stream
Another option is to use Spring Cloud Stream with Kafka Binder:
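For example, the Maven dependency could look like this (the version is usually managed by the Spring Cloud BOM):

```xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
```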
Spring Cloud Stream can help you write even more generic code that you can quickly integrate with a variety of modern messaging systems (e.g. RabbitMQ, Apache Kafka, Amazon Kinesis, Google PubSub and more).
The basic concept is that you just provide implementations of Java functional interfaces:
- java.util.function.Supplier for sources
- java.util.function.Consumer for sinks
- java.util.function.Function for processors
…and then in your configuration you bind them to specific queues or topics in the messaging system(s) of your choice.
For example, you can quickly put together a Spring Boot application like this:
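A minimal sketch could look like this (the class name is arbitrary; the myMessageConsumer bean is referenced in the binding configuration below):

```java
import java.util.function.Consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }

    @Bean
    public Consumer<String> myMessageConsumer() {
        return message -> System.out.println("Received: " + message);
    }
}
```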
Please note the java.util.function.Consumer implementation (returned from the myMessageConsumer() method) that you can replace with your own logic for processing messages. After you finish the implementation of this interface you just need to properly bind it. As an example, you can configure the consumer to read from my-topic using my-group as the consumer group:
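For example, in application.yml (assuming the myMessageConsumer bean from the example above):

```yaml
spring:
  cloud:
    function:
      definition: myMessageConsumer
    stream:
      bindings:
        myMessageConsumer-in-0:
          destination: my-topic
          group: my-group
```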
The name of the binding looks peculiar, but in most cases this will just be <function name>-in-0 for inbound and <function name>-out-0 for outbound topics (with the -0 suffix representing the index of the binding).
You might also notice that we have not specified any bootstrap.servers. By default it connects to a Kafka cluster running on localhost:9092. It can be easily changed to a different list of brokers:
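For example (the broker addresses are placeholders):

```yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: kafka-1:9092,kafka-2:9092,kafka-3:9092
```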
Leveraging Spring Cloud Stream totally decoupled our code from Kafka. Now it is possible to switch to an entirely different message broker without having to do a single code change. All this can be done through configuration changes.
Parting thoughts
To sum up, Spring Kafka and Spring Cloud Stream with Kafka Binder offer quick ways to build incredibly succinct code for reading messages from Kafka.
That being said, even though the abstractions provided by Spring framework are usually very good, it can be beneficial to know what happens under the hood.
Here are some topics worth exploring: