Jun 11, 2020

Concurrently Process a Single Kafka Partition

Concurrency in Kafka is defined by how many partitions make up a topic. For a consumer group, there can be as many consumers as there are partitions, with each consumer being assigned one or more partitions. If there are more partitions than consumers, some or all of the consumers will be assigned multiple partitions. If there are more consumers than partitions, the extra consumers will sit idle, potentially waiting for a rebalance when one of the other consumers goes down.

But what if there is a need to process a topic with a single partition faster than one consumer can process by itself?

Recently at a client we were debugging a data load, trying to find some missing data. The data load is controlled by another team two hops away from where our process receives the data. We were missing some records on our end, but the source team was convinced they had loaded all the data without dropping any records. To verify that, I started up kafka-avro-console-consumer and pointed it at the topic to see what was there. The data on that topic is serialized with Avro, and the field we needed to match on is defined as a logical type, so it shows up as gibberish in the output, meaning grepping won't work very well.

The next step was to write a simple consumer program that would deserialize the Avro and convert the field we needed to match on. Simple enough. So I fired it up and let it run. Four hours and over 300 million records later, I found the data we were looking for (it had been dropped by an intermediate process in the next step of the pipeline). 300 million records is a lot for one partition, and four hours is a long time to wait. How can this be sped up if we need to look for more data?

Why not split the partition into segments and let multiple consumers scan through their own offset range? Each consumer would need its own group and a start and end offset to scan through. So I gave it a shot with 10 threads and it reduced the processing time to somewhere in the neighborhood of 30 minutes.

I’ve written a sample project to illustrate how this works and you can find it on GitHub. Here are some of the key parts.

Find the first and last offset to calculate the number of messages.

kafkaConsumer.seekToBeginning(List.of(topicPartition));
long startOffset = kafkaConsumer.position(topicPartition);
kafkaConsumer.seekToEnd(List.of(topicPartition));
long endOffset = kafkaConsumer.position(topicPartition);
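Each thread builds its own KafkaConsumer with a unique group.id so the slices track offsets independently. Here's a minimal sketch of the per-thread configuration; the property names are the standard Kafka consumer keys, but the group-naming scheme and helper class are made up for illustration:

```java
import java.util.Properties;

public class SliceConsumerConfig {

    // Build the config for one slice's consumer. Each thread gets its own
    // group.id so the consumers don't share offsets or interfere with
    // each other's position tracking.
    public static Properties forSlice(String bootstrapServers, int sliceIndex) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // one group per slice; the naming scheme is illustrative
        props.put("group.id", "partition-split-slice-" + sliceIndex);
        // offsets are managed manually via seek(), so don't auto-commit
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        return props;
    }
}
```

Each thread then constructs `new KafkaConsumer<>(SliceConsumerConfig.forSlice(servers, i))` before assigning the partition and seeking.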

Split up the partition into a set of ranges for each thread to process.

long sliceSize = (endOffset - startOffset) / numberOfSlices;
List<Range> ranges = IntStream.range(0, numberOfSlices)
     .mapToObj(i -> new Range(startOffset + (i * sliceSize), startOffset + (i * sliceSize) + sliceSize - 1))
     .collect(Collectors.toList());
 
//make sure last range includes the endOffset
ranges.set(numberOfSlices - 1, new Range(ranges.get(numberOfSlices - 1).getStart(), endOffset));
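The Range type above comes from the sample project; as a self-contained sketch (with a hypothetical Range record standing in for it), the slicing logic looks like this. Note that integer division can leave slices slightly uneven, which is why the last range is stretched to cover endOffset:

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetSlicer {

    // Hypothetical stand-in for the sample project's Range class.
    public record Range(long start, long end) {}

    // Split [startOffset, endOffset) into contiguous ranges, one per slice.
    public static List<Range> slice(long startOffset, long endOffset, int numberOfSlices) {
        long sliceSize = (endOffset - startOffset) / numberOfSlices;
        List<Range> ranges = new ArrayList<>();
        for (int i = 0; i < numberOfSlices; i++) {
            long start = startOffset + (i * sliceSize);
            ranges.add(new Range(start, start + sliceSize - 1));
        }
        // make sure the last range includes the endOffset, since integer
        // division may have dropped a remainder
        ranges.set(numberOfSlices - 1,
                new Range(ranges.get(numberOfSlices - 1).start(), endOffset));
        return ranges;
    }

    public static void main(String[] args) {
        // e.g. 10 slices over a 300-million-record partition
        List<Range> ranges = slice(0, 300_000_000, 10);
        System.out.println(ranges.get(0));   // Range[start=0, end=29999999]
        System.out.println(ranges.get(9));   // Range[start=270000000, end=300000000]
    }
}
```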

In each thread, set the offset to the beginning of the range.

TopicPartition topicPartition = new TopicPartition(topicName, partition);
kafkaConsumer.assign(List.of(topicPartition));
// move this consumer's offset to the beginning of its range
kafkaConsumer.seek(topicPartition, startOffset);

Process over this thread’s range and do something when a particular record is found.

// loop over the records until this consumer reaches its ending offset.  It may go beyond
// the ending offset due to the batch size.
while (currentOffset < endOffset) {
    ConsumerRecords<String, Product> records = kafkaConsumer.poll(Duration.of(1000, ChronoUnit.MILLIS));
    for (ConsumerRecord<String, Product> record : records) {
        if (record.value().getModel() != null && record.value().getModel().equals("T65B")) {
            // we found the record we were looking for!
            LOG.info("Found record: {} - offset {}", record.value().getModel(), record.offset());
        }
    }
    currentOffset = kafkaConsumer.position(topicPartition);
}
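Tying the pieces together, the slice consumers can be coordinated with an ExecutorService. Here is a minimal sketch of that structure, with the Kafka polling replaced by a stub (processSlice is a hypothetical placeholder for the consume loop above) so the skeleton stands on its own:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PartitionSplitRunner {

    // Placeholder for the real consume loop: seek to the range start, poll
    // until the position passes the range end, and report how many offsets
    // were scanned. Here it just pretends every offset held a record.
    static long processSlice(long start, long end) {
        return end - start + 1;
    }

    // Fan the offset ranges out to a fixed pool, one task per slice,
    // and wait for every slice to finish scanning.
    public static long run(long startOffset, long endOffset, int numberOfSlices)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(numberOfSlices);
        try {
            long sliceSize = (endOffset - startOffset) / numberOfSlices;
            List<Future<Long>> futures = new ArrayList<>();
            for (int i = 0; i < numberOfSlices; i++) {
                long start = startOffset + (i * sliceSize);
                // last slice stretches to endOffset, mirroring the range logic above
                long end = (i == numberOfSlices - 1) ? endOffset : start + sliceSize - 1;
                futures.add(pool.submit(() -> processSlice(start, end)));
            }
            long total = 0;
            for (Future<Long> f : futures) {
                total += f.get();  // block until this slice completes
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }
}
```

In the real version, each task would build its own consumer (consumers aren't thread-safe, so they must not be shared across threads) and run the poll loop shown earlier over its range.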

I wouldn’t get too carried away with this and create 1,000 groups to split up a partition, as that might overwhelm Kafka, though I don’t have any concrete facts to back that up. The KafkaConsumer documentation does state that any number of groups can subscribe to a topic, so maybe it’s not that big of a concern.

Processing data this way, however, breaks Kafka’s ordering guarantee, so keep that in mind if using this method to quickly catch up on processing a singly-partitioned topic.

Link to sample project: https://github.com/brendonanderson/partitionsplit

About the Author


Brendon Anderson

Sr. Consultant

Brendon has over 15 years of software development experience at organizations large and small.  He craves learning new technologies and techniques and lives in and understands large enterprise application environments with complex software and hardware architectures.

