Introduction

In this post we will take a look at different ways to read messages from Kafka. Our goal is to find the simplest way to implement a Kafka consumer in Java, exposing potential traps and showing interesting intricacies along the way.

The code samples are provided in Java 11, but they can easily be translated to other versions of Java (or even to other JVM languages, like Kotlin or Scala).

Naive implementation

Getting started with Kafka is incredibly straightforward. You just need to add the official Kafka client dependency to your preferred dependency management tool, e.g. add this to your pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version> <!-- e.g. 2.6.0 -->
</dependency>

Then you just need to prepare configuration for your cluster and topic:

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

// ...

var props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());

Finally, you can create a consumer using this config:

var consumer = new KafkaConsumer<String, String>(props);

A KafkaConsumer, like any other resource, needs to be released, so you should make sure it gets closed in a finally block:

var consumer = new KafkaConsumer<String, String>(props);
try {
  // ...
} finally {
  consumer.close();
}

Or even better, wrap it in try-with-resources:

try (var consumer = new KafkaConsumer<String, String>(props)) {
  // ...
}

Manual or automatic consumer-partition assignment

Now we need to tell the consumer to read messages from a particular set of topics.

One way to do this is to manually assign your consumer to a fixed list of topic-partition pairs:

var topicPartitionPairs = List.of(
    new TopicPartition("my-topic", 0),
    new TopicPartition("my-topic", 1)
);
consumer.assign(topicPartitionPairs);

Alternatively, you can leave the assignment to Kafka by providing the name of the consumer group the consumer should join:

props.put("group.id", "my-consumer-group");

…and then subscribing to a set of topics:

consumer.subscribe(List.of("my-topic"));

The group.id parameter is also required if you wish to store offsets of consumed messages in Kafka (more on that later, so stay tuned).

Although assigning partitions manually makes the code easier to reason about (you can always pinpoint the process and the machine where a given consumer ran and what it tried to do), it is unfortunately not very reliable. If that consumer dies, message processing stops. You could programmatically implement some form of fail-over, but it can be quite tricky (as with any distributed system, things tend to be more complicated than initially anticipated).

On the other hand, the subscription model assumes you are running multiple consumers that evenly split topic-partitions between each other.

Consumer groups are uniquely identified by the group.id parameter.
It is important that all consumers within one consumer group (my-consumer-group in this case) subscribe to the same list of topics. Otherwise the final list of topics from which your consumer group receives messages becomes hard to predict (you can expect a race condition that makes debugging particularly miserable). This also means that the same consumer group cannot be re-used to listen to a different subset of topics.

Then, when one consumer crashes, the other consumers take over the partitions previously processed by the dead consumer.

Not only do you get automatic fail-over, it also makes scaling out easier when you add more partitions to your topics. In that case the existing consumers proactively divide the new partitions among themselves.

Consumer rebalancing

This works through consumer rebalancing, which is triggered whenever a consumer joins or leaves a consumer group.

Consumer rebalancing is a huge topic deserving a separate discussion, but it is definitely worth exploring because it will later reward you with a better understanding of what your consumers are doing when you need to investigate a problem.

There are some amazing talks about rebalancing available on the Internet that are well worth watching.

And rebalancing is actually an area of Kafka that has been getting a lot of attention lately. Here are two recent improvements that address some of the most burning issues related to consumer rebalancing (both can be enabled through plain consumer configuration, as sketched after the list):

  • static membership - introduces the group.instance.id parameter, used to preserve partition assignments across application restarts
  • incremental cooperative rebalancing - a new rebalancing algorithm that avoids stop-the-world pauses (instead it allows more than one rebalancing round to reach the final assignment and reduces the number of partitions that change hands during rebalancing)
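
A quick sketch (not from the original post) of how both features can be switched on, purely through consumer configuration; the property names below are the standard Kafka consumer configs:

// Assumes new enough clients/brokers: static membership needs Kafka 2.3+,
// the cooperative sticky assignor ships with clients 2.4+
props.put("group.id", "my-consumer-group");
// Static membership: a stable, unique id per consumer instance, preserved across restarts
props.put("group.instance.id", "my-consumer-instance-1");
// Incremental cooperative rebalancing: switch to the cooperative sticky assignor
props.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");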

Poll for messages in a loop

Now that our consumer is ready to accept new messages we can start polling:

var records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record ->
    System.out.println(
        record.offset() + " -> " + record.value()));

This is normally done in a long-running loop:

  1. poll for new messages: consumer.poll(...)
  2. process messages: records.forEach(...)
  3. repeat (go back to step 1.)

The complete code of a naive implementation

Putting it all together, this is what the consumer's code in the subscribing variant can look like:

var props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("group.id", "my-consumer-group");

try (var consumer = new KafkaConsumer<String, String>(props)) {
  consumer.subscribe(List.of("my-topic"));
  while (true) {
    var records = consumer.poll(Duration.ofSeconds(1));
    records.forEach(record ->
        System.out.println(
            record.offset() + " -> " + record.value()));
  }
}

Saving offsets

This code works fine. However, if you are new to Kafka, you may be wondering how the consumer knows where it finished reading, that is, how and where it stores its offsets.

It might be confusing at first, but offsets of read messages are saved in a special topic called __consumer_offsets.

In the past, offsets used to be stored in ZooKeeper, which is probably even more puzzling.
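
If you are curious, you can also ask the consumer for its group's committed offsets programmatically. Here is a minimal sketch, assuming partition 0 of my-topic and a consumer that already has group.id configured:

var partition = new TopicPartition("my-topic", 0);
// Returns a Map<TopicPartition, OffsetAndMetadata>; the value is null if nothing has been committed yet
var committed = consumer.committed(Set.of(partition));
System.out.println("committed: " + committed.get(partition));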

Auto-commit

Alright, but the above code does not contain any logic for saving offsets.

And that is fine, because by default the consumer commits offsets automatically, at the interval configured in auto.commit.interval.ms (5 seconds by default).
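
For reference, these are the two consumer properties involved (the values below are the defaults, spelled out only for illustration):

props.put("enable.auto.commit", "true");       // auto-commit is enabled by default
props.put("auto.commit.interval.ms", "5000");  // commit at most every 5 seconds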

You might be worried that this auto-commit functionality runs in the background and could therefore incorrectly mark a message as consumed when the consumer crashes or when an exception is thrown. In fact, offsets are not committed by some background thread; they are saved when you call consumer.poll(...). So if the consumer crashes, poll is not called again and the records from the previous poll never get marked as consumed.

Where to start reading?

Another surprise that you can encounter is when you publish a message before your consumer polls for new records:

//  1. send
try (var producer = new KafkaProducer<String, String>(props)) {
  // note: the producer's props need key.serializer and value.serializer configured
  producer.send(new ProducerRecord<>("my-topic", value));
}

...

//  2. poll
var records = consumer.poll(Duration.ofSeconds(1));

If there is no offset stored for the consumer group (it is the first poll, or the previously committed offset has expired), then your consumer will look at the auto.offset.reset parameter and either:

  • start from the first record available if auto.offset.reset=earliest
  • start from the end (awaiting new messages) if auto.offset.reset=latest

Please note that the first available offset can be greater than 0 (old messages may have already been deleted according to your retention policy).

And as you may suspect, auto.offset.reset is set to latest by default, which means your consumer will not even try to read messages produced before it first joined.
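
So if you want your consumer to also pick up messages produced before the group committed any offsets, you can override this default with a one-line configuration change, for example:

// Start from the earliest available offset when no committed offset exists for the group
props.put("auto.offset.reset", "earliest");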

Committing manually

Although committing less often is the preferred option from the performance standpoint, it can result in more messages being re-processed when a consumer crashes. Another consumer has to read the same uncommitted messages and do the same work once again.

Problems can also arise if you would like to batch messages in memory so that you can later process them in bulk. Remember that whenever you call poll, the records from the previous poll are committed. This means that all the messages batched so far are considered consumed, even though they have not been processed yet. If the consumer now crashes, these messages will not be processed by another live consumer because they were already marked as consumed. In other words, this will get you into trouble:

var buffer = new ArrayList<ConsumerRecord<String, String>>();
while (true) {
  // DO NOT DO THIS WHEN enable.auto.commit=true
  var records = consumer.poll(Duration.ofSeconds(1));
  records.forEach(buffer::add);
  if (buffer.size() > THRESHOLD) {
    buffer.forEach(record -> ...);
    buffer.clear();
  }
}

For cases like that you may decide to take fine-grained control over how and when messages are committed. You can disable auto-commit by setting enable.auto.commit=false and then commit manually by calling either commitSync() or commitAsync(), depending on your use-case.

If you just call the parameterless variant of the method then it will commit all the records returned by this poll:

var records = consumer.poll(Duration.ofSeconds(1));
...
consumer.commitSync(); // commits all the records from the current poll

But if you wish to be more precise, you can also specify exactly which offsets should be committed for which partitions. For instance:

consumer.commitSync(Collections.singletonMap(
  partition, new OffsetAndMetadata(lastOffset + 1) // +1: commit the offset of the next message to read
));
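
For completeness, here is a sketch of the asynchronous variant. commitAsync() does not block the polling thread, and the optional callback at least lets you log commits that failed:

consumer.commitAsync((offsets, exception) -> {
  if (exception != null) {
    System.err.println("Commit failed for " + offsets + ": " + exception);
  }
});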

Single- vs multi-threaded

It is important to keep in mind that KafkaConsumer is not thread-safe. Therefore it is easier to correctly implement a consumer using the "one consumer per thread" pattern, where scaling boils down to just adding more partitions and more consumers.
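
A minimal sketch of this pattern (reusing the props from the earlier examples) could look like the snippet below; note that every thread creates and uses its own KafkaConsumer instance:

// Each thread owns a private KafkaConsumer - instances are never shared between threads
var executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 3; i++) {
  executor.execute(() -> {
    try (var consumer = new KafkaConsumer<String, String>(props)) {
      consumer.subscribe(List.of("my-topic"));
      while (true) {
        consumer.poll(Duration.ofSeconds(1)).forEach(record ->
            System.out.println(record.offset() + " -> " + record.value()));
      }
    }
  });
}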

As an alternative, you can decouple consumption from processing, which should result in a reduced number of TCP connections to the cluster and increased throughput. On the downside, you will end up with much more complicated code, because you will not only need to coordinate threads but also commit only specific offsets while preserving the order of processed messages.

If you are not discouraged by potential code complexity then Igor Buzatovic recently wrote an article explaining how to write a multi-threaded consumer. Igor also provided a ready-to-use implementation if you would like to jump straight to the code.

Spring Kafka

So far we have mostly complicated the code and discussed what can go wrong. And it is good to be aware of the various gotchas you can run into.

But we have strayed from our original goal which was to come up with the simplest Kafka consumer possible.

When thinking about simplifying Java code, it is usually a good idea to check whether the Spring Framework offers an abstraction that can make our code more compact and less error-prone.

Actually, there is more than one way Spring can simplify our integration with Kafka. For example, you can use Spring Boot together with Spring Kafka:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Then we only need to configure connection details in application.yml (or application.properties):

spring.kafka:
  bootstrap-servers: localhost:9092
  consumer:
    group-id: my-group
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

…and add the @KafkaListener annotation to a method that will process messages:

@Component
public class MyKafkaListener {

  @KafkaListener(topics = "my-topic")
  public void processMessage(ConsumerRecord<String, String> record) {
    System.out.println(record.offset() + " -> " + record.value());
  }
}

If you do not need to access message metadata (like the partition, offset or timestamp), then you can make the code even shorter:

@KafkaListener(topics = "my-topic")
public void processMessage(String content) {
  System.out.println(content);
}

There is also an alternative way to access this kind of information using annotations:

@KafkaListener(topics = "my-topic")
public void processMessage(
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
    @Header(KafkaHeaders.OFFSET) long offset,
    @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
    @Payload String content) {
  // ...
}

Commit modes in Spring Kafka

We have already discussed that you can safely rely on automatic committing for a wide range of use-cases. Surprisingly, Spring Kafka sets enable.auto.commit=false by default but still makes committing behave in a very similar way. What Spring does instead is emulate auto-commit by explicitly committing after all the records from a poll have been processed.

This acknowledgment mode is called BATCH, because Spring commits the messages from the previous batch of records returned by poll(...).

However, if you wish to commit offsets manually, you can switch to the MANUAL or MANUAL_IMMEDIATE ack mode. This can be accomplished by changing the listener ack-mode in your Spring Boot config:

spring.kafka:
  # ...
  listener:
    ack-mode: MANUAL_IMMEDIATE

Then you are expected to use the Acknowledgment object to mark messages as consumed:

@KafkaListener(topics = "my-topic")
public void processMessage(String content, Acknowledgment ack) {
  // ...
  ack.acknowledge();
}

There is a dedicated section about different acknowledgment modes in the official Spring Kafka documentation if you would like to read more about it.

Spring Cloud Stream

Another option is to use Spring Cloud Stream with Kafka Binder:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

Spring Cloud Stream can help you write even more generic code that you can quickly integrate with a variety of modern messaging systems (e.g. RabbitMQ, Apache Kafka, Amazon Kinesis, Google PubSub and more).

The basic concept is that you just provide implementations of Java functional interfaces:

  • java.util.function.Supplier for sources
  • java.util.function.Consumer for sinks
  • java.util.function.Function for processors

…and then in your configuration you bind them to specific queues or topics in the messaging system(s) of your choice.

There is a very good talk by Soby Chacko and Oleg Zhurakousky from Pivotal about integrating Spring Cloud Stream with Kafka, explaining the approach with Java functional interfaces. Oleg Zhurakousky also wrote an interesting article explaining the motives behind the move to functional programming model in Spring Cloud Stream.

For example, you can quickly put together a Spring Boot application like this:

// ...
import java.util.function.Consumer;

@SpringBootApplication
public class KafkaConsumerApplication {

  public static void main(String[] args) {
    SpringApplication.run(KafkaConsumerApplication.class, args);
  }

  @Bean
  public Consumer<String> myMessageConsumer() {
    return content -> System.out.println(content);
  }
}

Please note the java.util.function.Consumer implementation (returned from the myMessageConsumer() method), which you can replace with your own logic for processing messages. After you finish implementing this interface, you just need to bind it properly. As an example, you can configure the consumer to read from my-topic using my-group as the consumer group:

spring.cloud.stream:
  bindings:
    myMessageConsumer-in-0:
      destination: my-topic
      group: my-group

The name of the binding looks peculiar, but in most cases this will just be <function name>-in-0 for inbound and <function name>-out-0 for outbound topics.

Please refer to the documentation on binding names for more details (especially if you are curious about the -0 suffix, which represents the index of the binding).
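
As a hypothetical illustration (the upperCase bean below is not part of the original application), a java.util.function.Function gets both an inbound and an outbound binding:

@Bean
public Function<String, String> upperCase() {
  return String::toUpperCase;
}

Assuming it is the only functional bean in the application (otherwise you would also need to set spring.cloud.function.definition), it could be bound like this:

spring.cloud.stream:
  bindings:
    upperCase-in-0:
      destination: my-topic
    upperCase-out-0:
      destination: my-topic-uppercased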

You might also notice that we have not specified any bootstrap.servers. By default it connects to a Kafka cluster running on localhost:9092.

It can be easily changed to a different list of brokers:

spring.cloud.stream:
  kafka.binder:
    brokers: my-node1:9090,my-node2:9090,my-node3:9090

Leveraging Spring Cloud Stream completely decoupled our code from Kafka. It is now possible to switch to an entirely different message broker without a single code change, purely through configuration.

Parting thoughts

To sum up, Spring Kafka and Spring Cloud Stream with Kafka Binder offer quick ways to build incredibly succinct code for reading messages from Kafka.

That being said, even though the abstractions provided by Spring framework are usually very good, it can be beneficial to know what happens under the hood.

Here are some topics worth exploring: