We had a Kafka consumer responsible for printing data from a Kafka topic, from the start index, for analytics purposes. As per the original requirement it would read till the last message of the topic, and it all worked well. We did this by creating a new consumer with a consumer group id of UUID.randomUUID().toString().
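For context, here is a minimal sketch of how such a from-the-start consumer could be configured (the bootstrap address and the property object name are illustrative, not from our actual code):

import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties realTimeConsumeProp = new Properties();
// Illustrative broker address
realTimeConsumeProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// A fresh UUID group id has no committed offset, so with
// auto.offset.reset=earliest the consumer starts from the first message
realTimeConsumeProp.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
realTimeConsumeProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
realTimeConsumeProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
realTimeConsumeProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());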
But, as with any system in the world, our functionality got more complicated: we now had to read messages only till the point where another consumer of that topic had left off.
For example, if the topic had 20 messages and Consumer 1 had read it till message 10 before it died, we wanted our analytics consumer to read only till message 10, not till 20 as was happening initially.
We contemplated different mechanisms to work this out:
- Store the offset in a database or a separate file
- Make a copy of the read messages in another topic
- Run the analytics thread in parallel with our original consumer thread and connect them through a mailbox mechanism
All of the above ways are valid but have their own issues. So we found the solution through the existing Kafka API, as follows:
Step 1: Create a consumer object with the original consumer group id. Just create the object, don't start polling it. Once you have the object you can use it to get the last committed position of that group. In our case we had only one consumer with the given group id and one partition on the topic; the code below shows how to get the OffsetAndMetadata for a consumer for a given TopicPartition, which can be used to get the offset.
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// The original consumer read partition 0 of the topic
TopicPartition partition = new TopicPartition(topicName, 0);
// realTimeConsumeProp must carry the ORIGINAL consumer group id
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(realTimeConsumeProp);
// Last committed offset for that group; null if nothing was ever committed
OffsetAndMetadata metadata = consumer.committed(partition);
long offset = -1L;
if (metadata != null) {
    offset = metadata.offset();
}
consumer.close();
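Note: in newer Kafka clients (2.4 and above) the single-partition committed(TopicPartition) overload is deprecated in favour of a Set-based variant; the equivalent lookup, as a sketch, would be:

import java.util.Collections;

OffsetAndMetadata metadata =
        consumer.committed(Collections.singleton(partition)).get(partition);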
Step 2: Once you have the offset for the given topic and partition, create a new consumer with a different consumer group id and read till that offset - 1. (The committed offset points at the next message the original consumer would have read, so the last message it actually processed sits at offset - 1.)
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

private void retrieveMessage(Properties consumerProperties, long topicPartitionOffset) {
    // consumerProperties must use a NEW consumer group id (and
    // auto.offset.reset=earliest) so we re-read the topic from the start
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProperties);
    // topicNameList is the field holding the topic(s) to subscribe to
    kafkaConsumer.subscribe(topicNameList);
    while (true) {
        ConsumerRecords<String, String> records =
                kafkaConsumer.poll(Duration.of(100, ChronoUnit.MILLIS));
        if (!records.isEmpty()) {
            // Stop once we reach the original consumer's committed offset
            if (!readData(records, topicPartitionOffset)) {
                break;
            }
        }
    }
    kafkaConsumer.close();
}
private boolean readData(ConsumerRecords<String, String> records, long offset) {
    for (ConsumerRecord<String, String> record : records) {
        try {
            // The committed offset is the NEXT message the original consumer
            // would have read, so stop before printing that record; the last
            // record printed therefore has offset - 1
            if (offset != -1 && record.offset() >= offset) {
                return false;
            }
            System.out.println(record);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    return true;
}
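Putting the two steps together, a hypothetical driver could look like the sketch below (fetchCommittedOffset is an assumed helper wrapping the Step 1 snippet, and analyticsConsumerProp is a new Properties object with a fresh group id as described above; both names are illustrative):

// Hypothetical wiring of Step 1 and Step 2
long stopOffset = fetchCommittedOffset(realTimeConsumeProp, new TopicPartition(topicName, 0));
retrieveMessage(analyticsConsumerProp, stopOffset);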
And the job is done, without any external help!