Read data till Kafka Consumer Committed Position

We had a Kafka consumer that printed data from a Kafka topic, starting from the first offset, for analytics purposes. Originally it read up to the last message of the topic, and that worked well. We did this by creating a new consumer whose consumer group id was UUID.randomUUID().toString().
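
A throwaway consumer like this could be configured roughly as follows. This is a minimal sketch under stated assumptions: the class name AnalyticsConsumerConfig, the broker address and the String deserializers are placeholders for illustration. The setting that matters is auto.offset.reset: a brand-new group id has no committed offsets, and Kafka's default reset policy is latest, so earliest is needed to start from the beginning of the topic.

```java
import java.util.Properties;
import java.util.UUID;

class AnalyticsConsumerConfig {

    // Builds properties for a one-off consumer that reads a topic from the beginning.
    // bootstrapServers is a placeholder; the String deserializers are an assumption.
    static Properties analyticsProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // A fresh group id has no committed offsets, so the reset policy below applies.
        props.put("group.id", UUID.randomUUID().toString());
        // Start from the earliest available offset instead of the default "latest".
        props.put("auto.offset.reset", "earliest");
        // Do not record progress for a group id that is never reused.
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = analyticsProperties("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting Properties object can be passed straight to the KafkaConsumer constructor.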

But as with any system in the world, our requirements got more complicated: we now had to read messages only up to the point where another consumer of that topic had left off.

For example, if the topic had 20 messages and Consumer 1 had read up to message 10 before it died, we wanted our analytics consumer to read only up to message 10, not up to message 20 as it had been doing.

We considered several mechanisms to achieve this:

  • Store the index in a database or a separate file
  • Make a copy of the read messages in another topic
  • Run the analytics thread in parallel with our original consumer thread and connect the two through a mailbox mechanism

All of the above approaches are valid, but each had its own issues. So we found a solution using the existing Kafka API, as follows:

Step 1: Create a consumer object that uses the original consumer group id. Just create the object; don’t start polling with it. Once you have the object, you can use it to get the last committed position of that group. In our case there was only one consumer with the given group id, and the topic had a single partition. The code below shows how to get the OffsetAndMetadata for a given TopicPartition, from which you can read the offset.

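 TopicPartition partition = new TopicPartition(topicName, 0);

 // realTimeConsumeProp carries the group.id of the original consumer
 KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(realTimeConsumeProp);

 // committed() returns null if the group never committed an offset for this partition
 OffsetAndMetadata metadata = consumer.committed(partition);

 long offset = -1L;

 if (metadata != null) {
      offset = metadata.offset();
 }

 consumer.close();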

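Step 2: Once you have the offset for the given topic and partition, create a new consumer with a different consumer group id and read up to that offset - 1. The committed offset is the position of the next record the original group would read, so offset - 1 is the last record it actually consumed.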

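 private void retrieveMessage(Properties consumerProperties, long topicPartitionOffset) {
        KafkaConsumer<?, ?> kafkaConsumer = new KafkaConsumer<>(consumerProperties);
        // topicName is the same topic as in Step 1
        kafkaConsumer.subscribe(Collections.singletonList(topicName));
        while (true) {
            // Poll timeout of 100, assumed to be milliseconds
            ConsumerRecords<?, ?> records = kafkaConsumer.poll(Duration.of(100, ChronoUnit.MILLIS));
            if (!records.isEmpty() && !readData(records, topicPartitionOffset)) {
                break; // readData reported that the committed position was reached
            }
        }
        kafkaConsumer.close();
 }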

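 private boolean readData(ConsumerRecords<?, ?> records, long offset) {
        for (ConsumerRecord<?, ?> record : records) {
            try {
                if (offset != -1 && record.offset() >= offset) {
                    return false; // at or past the committed position
                }
                System.out.println(record.value());
                if (record.offset() == offset - 1) {
                    return false; // last record the original group consumed
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return true;
 }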
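
The stopping rule can be tried out without a broker. The sketch below is a plain-Java simulation, not Kafka code: the class CommittedOffsetCutoff and the method offsetsToProcess are hypothetical names, and it assumes the usual Kafka convention that a committed offset points at the next record the group would read. With a 20-message topic (offsets 0 to 19) and a committed offset of 10, it processes offsets 0 to 9 and stops.

```java
import java.util.ArrayList;
import java.util.List;

class CommittedOffsetCutoff {

    // Returns the offsets an analytics reader would process from one batch.
    // committedOffset is the next offset the original group would read,
    // or -1 if that group never committed anything.
    static List<Long> offsetsToProcess(List<Long> batch, long committedOffset) {
        List<Long> processed = new ArrayList<>();
        for (long offset : batch) {
            if (committedOffset != -1 && offset >= committedOffset) {
                break; // the original group has not read this record
            }
            processed.add(offset);
            if (offset == committedOffset - 1) {
                break; // last record the original group consumed
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<Long> topic = new ArrayList<>();
        for (long i = 0; i < 20; i++) {
            topic.add(i); // a 20-message topic with offsets 0..19
        }
        System.out.println(offsetsToProcess(topic, 10)); // offsets 0 through 9
        System.out.println(offsetsToProcess(topic, 0));  // nothing was consumed yet
    }
}
```

A committed offset of -1 stands for a group that never committed, in which case nothing is cut off and the whole batch is processed.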

And the job is done, without any external help!
