Reading Data up to a Kafka Consumer's Committed Position

We had a Kafka consumer that printed data from a Kafka topic, starting from the beginning, for analytics purposes. As originally built, it read up to the last message in the topic, and that worked well. We did this by creating a new consumer whose group id was UUID.randomUUID().toString(), so the group had no committed offsets of its own.
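The post does not show the properties it used for this throwaway consumer, so the following is a minimal sketch under stated assumptions. The helper name freshGroupProps is hypothetical, String keys and values are assumed, and auto.offset.reset=earliest is our assumption about how a brand-new group was made to start at the first message: a group with no committed offset falls back to that setting, whose default in the Java client is "latest". Disabling auto-commit is an optional choice so the one-off group leaves nothing behind.

```java
import java.util.Properties;
import java.util.UUID;

class FreshGroupProps {

    // Hypothetical helper: consumer properties for a one-off group that has
    // never committed anything, so its start position comes from auto.offset.reset.
    static Properties freshGroupProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // A random group id guarantees the group has no committed offset yet.
        props.put("group.id", UUID.randomUUID().toString());
        // With no committed offset, "earliest" starts the consumer at the first record.
        props.put("auto.offset.reset", "earliest");
        // Assumption: keys and values are plain strings.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Optional: do not leave committed offsets behind for a throwaway group.
        props.put("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        Properties p = freshGroupProps("localhost:9092");
        System.out.println(p.getProperty("group.id"));
        System.out.println(p.getProperty("auto.offset.reset"));
    }
}
```

Each call produces a new group id, so every analytics run reads the topic from the start independently of any other group.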

But, as with any system in the world, our requirements grew more complicated: we had to read messages only up to the point where another consumer of that topic had stopped, that is, up to that consumer group's last committed offset.

For example, if the topic had 20 messages and Consumer 1 had read the first 10 of them (so its committed offset was 10) before it died, we wanted our analytics consumer to read only those first 10 messages, not all 20 as it had been doing initially.

We considered several mechanisms to work this out:

  • Store the offset in a database or a separate file
  • Make a copy of the read messages in another topic
  • Run the analytics thread in parallel with our original consumer thread and connect them through a mailbox mechanism

All of these approaches are valid, but each needs something outside the consumer itself (a store, a second topic, or a second thread). Instead, we found a solution in the existing Kafka consumer API:

Step 1: Create a KafkaConsumer configured with the original consumer's group id. Only create the object; never subscribe or poll with it, so it does not join the group (which would trigger a rebalance). With that object you can ask for the group's last committed offset. In our case the group had a single consumer and the topic a single partition, so the code below fetches the OffsetAndMetadata for partition 0 of the topic and reads the offset from it.

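 import java.util.Collections;
 import java.util.Map;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;

 TopicPartition partition = new TopicPartition(topicName, 0);

 // realTimeConsumeProp carries the ORIGINAL consumer's group.id (String deserializers
 // assumed). We never subscribe or poll, so this consumer does not join the group.
 long offset = -1L; // -1 = the group has no committed offset for this partition
 try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(realTimeConsumeProp)) {
     // committed(Set) replaces the single-partition overload deprecated in Kafka 2.4;
     // a partition with no committed offset maps to null.
     Map<TopicPartition, OffsetAndMetadata> committed =
             consumer.committed(Collections.singleton(partition));
     OffsetAndMetadata metadata = committed.get(partition);
     if (metadata != null) {
         offset = metadata.offset(); // offset of the NEXT record the group would read
     }
 }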
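The single-partition lookup above could be extended to a topic with several partitions: fetch the committed offset of every partition (for example via consumer.partitionsFor(topic) and consumer.committed(Set<TopicPartition>)) and stop only once every partition has reached its own limit. The sketch below shows just that bookkeeping, without the Kafka client so it runs standalone. StopTracker and its methods are hypothetical names, partitions are identified by their number, and partitions the original group never committed are assumed to be left out of the map, so their records are skipped.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class StopTracker {
    // partition -> committed offset of the original group (next offset it would read)
    private final Map<Integer, Long> limits;
    // partitions that have reached their committed position
    private final Set<Integer> finished = new HashSet<>();

    StopTracker(Map<Integer, Long> committedOffsets) {
        limits = new HashMap<>(committedOffsets);
        // A committed offset of 0 means the original group processed nothing there.
        for (Map.Entry<Integer, Long> e : limits.entrySet()) {
            if (e.getValue() <= 0) {
                finished.add(e.getKey());
            }
        }
    }

    // Returns true if the record should be printed, and records per-partition progress.
    boolean accept(int partition, long offset) {
        Long limit = limits.get(partition);
        if (limit == null) {
            return false; // partition not tracked: skip its records
        }
        if (offset >= limit) {
            finished.add(partition); // already past the committed position
            return false;
        }
        if (offset == limit - 1) {
            finished.add(partition); // last record the original group had processed
        }
        return true;
    }

    // True once every tracked partition has reached its committed position.
    boolean allDone() {
        return finished.size() == limits.size();
    }

    public static void main(String[] args) {
        Map<Integer, Long> limits = new HashMap<>();
        limits.put(0, 3L); // partition 0: offsets 0..2 were processed
        limits.put(1, 2L); // partition 1: offsets 0..1 were processed
        StopTracker tracker = new StopTracker(limits);
        long[][] arrivals = {{0, 0}, {1, 0}, {0, 1}, {1, 1}, {0, 2}};
        for (long[] r : arrivals) {
            if (tracker.accept((int) r[0], r[1])) {
                System.out.println("partition " + r[0] + " offset " + r[1]);
            }
        }
        System.out.println("all done: " + tracker.allDone());
    }
}
```

In a real consumer loop, accept(record.partition(), record.offset()) would decide whether to print each record, and allDone() would replace the single-partition break condition.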

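Step 2: Once you have the committed offset for the topic partition, create a new consumer with a different group id and read up to offset - 1. The committed offset is the offset of the next record the group would consume, so offset - 1 is the last record the original consumer had processed.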

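 import java.time.Duration;
 import java.util.List;
 import java.util.Properties;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;

 // consumerProperties must use a DIFFERENT group.id from the original consumer
 // (e.g. a random UUID) and auto.offset.reset=earliest, so the new group starts
 // at the first record. Assumes a single-partition topic and String deserializers.
 private void retrieveMessage(Properties consumerProperties, List<String> topicNameList,
                              long topicPartitionOffset) {
     try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProperties)) {
         kafkaConsumer.subscribe(topicNameList);
         while (true) {
             ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
             if (!records.isEmpty() && !readData(records, topicPartitionOffset)) {
                 break; // reached the original group's committed position
             }
         }
     }
 }

 // Prints records up to and including offset - 1, the last record the original
 // group had processed. Returns false once reading should stop.
 private boolean readData(ConsumerRecords<String, String> records, long offset) {
     for (ConsumerRecord<String, String> record : records) {
         if (offset != -1 && record.offset() >= offset) {
             return false; // already past the committed position
         }
         System.out.println(record);
         if (offset != -1 && record.offset() == offset - 1) {
             return false; // just printed the last processed record
         }
     }
     return true;
 }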
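Because this stop condition is easy to get off by one, here is the same loop replayed against in-memory batches instead of a real broker. It is a minimal simulation, not the post's code: ReadUntilCommitted and readUntilCommitted are hypothetical names, each inner list stands in for the record offsets returned by one poll(), and a committed offset of 10 mirrors the 20-message example earlier in the post.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class ReadUntilCommitted {

    // Replays the Step 2 loop over pre-built "poll" batches of record offsets.
    // committed is the original group's committed offset (-1 = none), i.e. the
    // offset of the next record it would have read. Returns the printed offsets.
    static List<Long> readUntilCommitted(Iterator<List<Long>> polls, long committed) {
        List<Long> printed = new ArrayList<>();
        while (polls.hasNext()) { // a real consumer would keep polling instead
            for (long offset : polls.next()) {
                if (committed != -1 && offset >= committed) {
                    return printed; // already past the committed position
                }
                printed.add(offset); // stands in for System.out.println(record)
                if (committed != -1 && offset == committed - 1) {
                    return printed; // last record the original group processed
                }
            }
        }
        return printed;
    }

    public static void main(String[] args) {
        // 20 records (offsets 0..19) arriving in four polls of five records each.
        List<List<Long>> polls = new ArrayList<>();
        for (long start = 0; start < 20; start += 5) {
            List<Long> batch = new ArrayList<>();
            for (long o = start; o < start + 5; o++) {
                batch.add(o);
            }
            polls.add(batch);
        }
        // Committed offset 10: offsets 0..9 are printed, then the loop stops.
        System.out.println(readUntilCommitted(polls.iterator(), 10));
    }
}
```

Checking offset == committed - 1 after printing means the loop ends as soon as the last processed record is seen, rather than waiting for a record at the committed offset that may not exist yet.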

And the job is done, with no external store, extra topic, or extra thread!

Blog at WordPress.com.