One of the use cases for Apache Kafka is to use it as a storage layer for an application. You can rewind and fast-forward through messages, which makes it behave like a data cache. If the retention for a given topic is set to forever, then it behaves like a persistent data store.

However, when you’re using Spring Kafka, many of the internals of the Kafka system are hidden from you, and some of the more complex situations can be harder to achieve.

A Basic Kafka Listener

A basic @KafkaListener in Spring Boot looks something like this:

@KafkaListener(topics = "${kafka.topic}")
public void receiveMessage(final String message) {
    LOGGER.info("{}", message);
}

This simply prints out a string message received on the topic.
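
The ${kafka.topic} placeholder is resolved from the application properties. For reference, a minimal application.properties might look like the following (the values are illustrative; the spring.kafka.* keys are the standard Spring Boot ones used later in this post):

kafka.topic=config-updates
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-service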

When using Protocol Buffers as the message serialization format, you need to create your own consumer configuration, which you provide through a @Configuration class:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import com.google.protobuf.Message;

@Configuration
@EnableKafka
public class KafkaProtobufListenerConfiguration {
    private static final String BOOTSTRAP_SERVERS_CONFIG_PROPERTY = "spring.kafka.bootstrap-servers";
    private static final String GROUP_ID_CONFIG_PROPERTY = "spring.kafka.consumer.group-id";
    private static final String KEY_DESERIALIZER_CLASS_CONFIG = "org.apache.kafka.common.serialization.StringDeserializer";
    private static final String VALUE_DESERIALIZER_CLASS_CONFIG = "com.example.KafkaProtobufMessageDeserializer";

    private final Environment environment;

    @Autowired
    public KafkaProtobufListenerConfiguration(final Environment environment) {
        this.environment = environment;
    }

    // Builds the listener container factory that the @KafkaListener refers to by bean name.
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Message>> kafkaListenerContainerFactoryListener() {
        ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigsListener()));
        factory.setBatchListener(false);
        return factory;
    }

    @Bean
    public Map<String, Object> consumerConfigsListener() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty(BOOTSTRAP_SERVERS_CONFIG_PROPERTY));
        props.put(ConsumerConfig.GROUP_ID_CONFIG, environment.getProperty(GROUP_ID_CONFIG_PROPERTY));
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER_CLASS_CONFIG);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER_CLASS_CONFIG);
        return props;
    }
}

This class is the configuration class for the Spring Kafka consumer factory (note the @Configuration and @EnableKafka annotations). The important part is the VALUE_DESERIALIZER_CLASS_CONFIG property, which sets the message body deserializer to our own Protobuf deserializer; this uses the Google Protobuf library and our own message definitions. The rest is essentially proxying the properties from the properties file.
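
The deserializer itself is not shown in this post. As a rough sketch, assuming a Protobuf-generated class (here the hypothetical MyMessage), it might look something like this:

import java.util.Map;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;

public class KafkaProtobufMessageDeserializer implements Deserializer<Message> {
    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) {}

    @Override
    public Message deserialize(final String topic, final byte[] data) {
        // Null payloads (tombstones) have no body to parse.
        if (data == null) {
            return null;
        }
        try {
            // MyMessage is a hypothetical Protobuf-generated class; substitute your own.
            return MyMessage.parseFrom(data);
        } catch (final InvalidProtocolBufferException e) {
            throw new SerializationException("Unable to parse Protobuf message", e);
        }
    }

    @Override
    public void close() {}
}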

We use this factory by specifying it as part of the @KafkaListener annotation:

@KafkaListener(topics = "${kafka.topic}",
               containerFactory = "kafkaListenerContainerFactoryListener"
)
public void receiveMessage(final Message message) {
    LOGGER.info("{}", message);
}

The containerFactory annotation property is the name of the @Bean method in the @Configuration class that provides the factory instance. Our listener then receives Message objects instead of Strings, because that is what our deserializer produces.

Seeking On Partition Assignment

So our system starts and can process Protobuf messages from our topic. These messages might be used to update our service configuration when something else in the system changes - a sort of notification-of-configuration-change message.

If this topic contains configuration changes, then when the service restarts (say through a redeployment or a restart), it would make sense to always read the last message on the topic to set our configuration to the last update we received when the service was running.

If we set the spring.kafka.consumer.auto-offset-reset property to latest, the consumer will resume from the last committed offset for the consumer group it is in (the reset policy only applies when no committed offset exists). That means it may end up reading lots of messages (if many came in while it was down), or it may read none (if it had already read the last message). If we set it to earliest, we will get all the messages from the start. If our processing of a message is somewhat complex (say, downloading configuration from another service), then we want to avoid processing multiple messages, so neither of these options is any good for us.
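
For reference, this behaviour is controlled by a single property in application.properties (shown here with the value under discussion):

spring.kafka.consumer.auto-offset-reset=latest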

One way I found around this is to hook into the partition assignment lifecycle for the Kafka consumer and, during partition reassignment, reset the consumer offset to the latest offset minus one. This will make the listener re-read the last message. It is important that any services using this topic be in different consumer groups so that the seeking does not affect other consumers. If this is some sort of notification topic, then this is the most likely configuration anyway.

So, to do this in Spring Boot, you can implement the ConsumerSeekAware interface, which allows you to add some code that is executed when partitions are assigned during a rebalance:

import static org.slf4j.LoggerFactory.getLogger;

import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.stereotype.Component;

import com.google.protobuf.Message;

// Registered as a bean so Spring detects the @KafkaListener method.
@Component
public class MyListener implements ConsumerSeekAware {
    private static final Logger LOGGER = getLogger(MyListener.class);

    private final KafkaProtobufListenerConfiguration configs;

    public MyListener(final KafkaProtobufListenerConfiguration configs) {
        this.configs = configs;
    }

    @KafkaListener(topics = "${kafka.topic}",
                   containerFactory = "kafkaListenerContainerFactoryListener")
    public void receiveMessage(final Message message) {
        LOGGER.info("{}", message);
    }

    @Override
    public void registerSeekCallback(final ConsumerSeekCallback consumerSeekCallback) {}

    @Override
    public void onPartitionsAssigned(final Map<TopicPartition, Long> assignments, final ConsumerSeekCallback consumerSeekCallback) {
        // Use a temporary consumer to look up the end offset of each assigned partition.
        try (KafkaConsumer<String, Message> consumer = new KafkaConsumer<>(this.configs.consumerConfigsListener())) {
            final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignments.keySet());
            // Seek each assigned partition to the last-but-one offset so the last
            // message is re-read, guarding against empty partitions (where the end
            // offset is 0 and there is nothing to seek back to).
            assignments.forEach((partition, offset) -> consumerSeekCallback.seek(
                    partition.topic(),
                    partition.partition(),
                    Math.max(0, endOffsets.get(partition) - 1)));
        }
    }

    @Override
    public void onIdleContainer(final Map<TopicPartition, Long> map, final ConsumerSeekCallback consumerSeekCallback) {}
}

The ConsumerSeekAware interface requires us to implement three methods, but we are really only interested in onPartitionsAssigned. This method is called when Kafka assigns partitions from a topic to the consumer.

We temporarily instantiate a new KafkaConsumer from which we can get the end offsets of the topic's partitions and seek to the last-but-one message.

Inside onPartitionsAssigned, we instantiate a temporary consumer using the configuration from our configuration class (which we inject through the constructor to allow reuse). With this consumer, we retrieve the end offset of each assigned partition using the consumer.endOffsets() method. We then use the map returned from this method to seek each of our topic assignments to the last-but-one message.

We can test this out by sending some messages and starting the service: it receives only the last message. If we restart without sending new messages, it re-reads that same message.
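
If you do not already have a producer to hand, a rough sketch for sending test messages might look like the following, assuming a KafkaTemplate bean configured with a Protobuf-compatible value serializer (neither is shown in this post):

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import com.google.protobuf.Message;

// A minimal sketch for sending test messages; assumes a KafkaTemplate bean
// with a Protobuf-compatible value serializer has been configured elsewhere.
@Component
public class TestMessageSender {
    private final KafkaTemplate<String, Message> kafkaTemplate;

    public TestMessageSender(final KafkaTemplate<String, Message> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(final String topic, final Message message) {
        kafkaTemplate.send(topic, message);
    }
}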

It took me a while to find out how to do this, as the Spring documentation is spread across the internet and various Stack Overflow posts. I hope this post helps someone else work it out rather faster than I did.