Capturing missing events with Apache Kafka Streams

Introduction

There is a lot going on with Kafka Streams. Understanding it takes time, and it always seems there is more you could learn. In this article, I use Kafka Core and Kafka Streams to write a replay commit log for RESTful endpoints.

An example application showcasing this replay commit log is provided. The application has many components; the technology stack includes Kafka, Kafka Streams, Spring Boot, Spring Kafka, Avro, Java 8, Lombok, and Jackson. The specific areas of Kafka Streams covered are kTables, kStreams, windowing, aggregates, joins, and serialization.

Background

Why a replay of a commit log? I took a concept that interests me and then went ahead to determine how I could achieve it with Kafka Streams.

Over the years, I have been tasked with solving problems without adding to the technology stack. This led me to write a commit log and a sequence number generator (on separate occasions) in Cassandra; things I would normally not use Cassandra to implement. Applying Kafka Streams to a known problem is an excellent opportunity to learn about streams.

Development Notes

The use of peek() with logging was extremely helpful for visualizing Kafka Streams processing behavior. Also, through() is very useful for creating temporary topics to capture what is going on beyond logging with peek().
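As a rough sketch of that kind of debugging wiring (the kStreamDebugTap bean and the commit.log.debug topic are hypothetical, not part of the demo application):

    /**
     * Hypothetical debugging tap: peek() logs records as they pass through, and through() copies them
     * to an intermediate topic (commit.log.debug is a made-up name) that can be inspected with a
     * console consumer.
     */
    @Bean
    public KStream kStreamDebugTap(StreamsBuilder streamBuilder) {
        return kStream(streamBuilder)
                .peek((key, value) -> log.debug("debug tap : key={}, value={}", key, value))
                .through("commit.log.debug");
    }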

When it comes to Avro, choosing between the specific serializers and the generic serializer is a matter of preference. The generic serializer works well if the streams do not need to know much (or anything) about the content of the Avro message. My next steps with Avro will be understanding the different uses of Avro by Kafka Core, Streams, and the Connector API.
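As a rough illustration of the difference (CommitLog here stands in for a hypothetical Avro-generated specific class, not one from the demo application):

    // generic access: fields are looked up by name on a GenericRecord and returned as Object
    String actionFromGeneric(GenericRecord record) {
        return record.get("action").toString();
    }

    // specific access: the generated class (hypothetical CommitLog) exposes typed getters
    String actionFromSpecific(CommitLog record) {
        return record.getAction().toString();
    }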

Challenges

Joining: While I have utilized SQL for over 20 years, I am surprised by how many times I incorrectly established the joining structure for my Kafka streams. Always double-check your joining strategy, as it is quite easy to forget something or select the wrong component.

Stream Delay: Streams can respond with a delay; just because you do not see output immediately does not mean you won't; be patient.

Topic Generation: For development, it is super easy to let the broker create topics on demand. However, streams require their topics to exist before the stream is created. As I recreated containers, losing topics led to stream processing not starting; a simple restart of my demo application solved the problem. Watch the application startup and ensure the streams are properly consuming the underlying topics.

Topic Repartitioning: Repartitioning a topic after a stream has connected to it is challenging. It is best to create your topics with the desired replication and partitioning before starting your clients and streams (see the example command at the end of this section).

Stream Topics: kafka-topics --zookeeper zookeeper:2181 --list will show the application topics as well as the internal topics Kafka Streams creates for failure and recovery. These internal changelog topics need to have the same partitioning as the application topics.
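For the repartitioning point above, a hedged example of pre-creating a topic with explicit partitioning and replication (using the same ZooKeeper-based tooling; adjust the counts to your environment):

kafka-topics --zookeeper zookeeper:2181 --create --topic commit.log --partitions 3 --replication-factor 1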

Application

The demo application provides a RESTful POST endpoint that publishes a request record on start and a response record on completion. For demo purposes, it provides query parameters for forcing delays and failures. Set forceFailure to true to have the endpoint fail hard. Set delay to any positive value to pause the endpoint for the specified number of milliseconds; ideally set it longer than the endpoint's 250ms SLA to see the effect. The example executions and application logs, shown later, demonstrate this behavior.

/api/v1/commit?forceFailure=false&delay=1
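The controller itself is not the focus of this article; the snippet below is only a hedged sketch of the behavior just described. CommitLog, keyGenerator, and kafkaTemplate are hypothetical names standing in for the demo application's actual Avro class, key generator, and injected KafkaTemplate.

    /**
     * Hypothetical sketch only; CommitLog, keyGenerator, and kafkaTemplate are placeholders,
     * not the demo application's actual names.
     */
    @PostMapping("/api/v1/commit")
    public ResponseEntity<String> commit(@RequestParam boolean forceFailure,
                                         @RequestParam long delay,
                                         @RequestBody String body) throws InterruptedException {
        String key = keyGenerator.next();
        // publish the REQUEST record as the call starts
        kafkaTemplate.send("commit.log", key,
                CommitLog.newBuilder().setUrl("/api/v1/commit").setAction("REQUEST").setBody(body).build());

        Thread.sleep(delay); // simulate work; exceeding the 250ms SLA leaves the request alone in its session window

        if (forceFailure) {
            // a hard failure means the RESPONSE record below is never published
            throw new IllegalStateException("forced failure");
        }

        // publish the RESPONSE record on successful completion
        kafkaTemplate.send("commit.log", key,
                CommitLog.newBuilder().setUrl("/api/v1/commit").setAction("RESPONSE").setStatus(200)
                        .setBody("{\"key\":\"" + key + "\"}").build());
        return ResponseEntity.ok("{\"key\":\"" + key + "\"}");
    }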

 
Kafka Topics

There are only 3 public topics in this demo application.

  • commit.log : the log that the application writes at start and end of a RESTful call.
  • commit.log.replay : results of the stream processing (events needing replay).
  • commit.log.replay2 : alternate stream approach (joining in opposite order).

 
The Kafka kStreams and kTables

With Spring, the kTables and kStreams are defined as Spring beans. Spring Kafka provides a StreamsBuilder bean, making it easy to construct the streams themselves as beans through Spring configuration. See the KafkaConfig class for details.
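The KafkaConfig class is not reproduced in full here; a minimal sketch of how such a configuration might be enabled with Spring Kafka follows (imports omitted as in the other listings; the application id and bootstrap servers are placeholder values).

@Configuration
@EnableKafkaStreams
public class KafkaConfig {

    /**
     * Sketch only: @EnableKafkaStreams looks for a KafkaStreamsConfiguration bean with this name and
     * uses it to create the StreamsBuilder bean that the kStream/kTable beans below consume.
     */
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "commit-log-replay");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }

    // ... the kStream, kTable, and replay beans described below ...
}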

kStream

The kStream bean is straightforward: consume a topic and make it a stream. The key is knowing how to serialize the key and the value. In my example, I used a string for the key and a custom Avro object for the value. The stream uses the GenericAvroSerde, which reads the Avro schema from the schema registry and provides proper deserialization and access through a GenericRecord. If you are unfamiliar with Avro, read the getting started documentation, Avro Getting Started In Java.

    /**
     * Create a KStream for the given topic.
     *
     * You must know how the data is serialized on the topic. When using Avro you have various options for the domain
     * object used to access the data. Various Serdes know how to read the binary Avro data into a domain structure
     * that can access the data. For streams that need little or no inspection of the message, utilizing
     * the GenericAvroSerde makes sense.
     *
     * Kafka Streams provides GenericAvroSerde and SpecificAvroSerde.
     */
    @Bean
    public KStream kStream(StreamsBuilder streamBuilder) {
        return streamBuilder
                // stream a topic, deserializing the key as a string and deserializing the value as Avro into a generic record object.
                .stream("commit.log", Consumed.with(Serdes.String(), valueSerde()))
                .peek((key, value) -> log.debug("kStream : key={}, value={}", key, value));
    }
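The valueSerde() helper referenced above is not shown in the excerpt; a minimal sketch, assuming Confluent's GenericAvroSerde and a placeholder schema registry URL, might look like:

    /**
     * Sketch of the valueSerde() helper used by the streams in this article; the schema registry URL
     * is a placeholder for whatever the application is actually configured with.
     */
    private GenericAvroSerde valueSerde() {
        GenericAvroSerde serde = new GenericAvroSerde();
        // false = this serde is used for values, not keys
        serde.configure(Collections.singletonMap("schema.registry.url", "http://localhost:8081"), false);
        return serde;
    }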

kTable

The groupByKey() is used to group records on the given stream. Because the request and response messages go to the same topic with the same key, this grouping ensures those messages are handled by the same stream processor even if multiple instances of the stream application are deployed. The windowedBy() call determines the timeframe for grouping messages. A grouped stream can be windowed by either a time window or a session window. A time window has the possibility of separating the two messages, so a session window is the logical choice; it is based on inactivity for a given session (key). The session window should be equal to the acceptable response time (SLA) for the RESTful call.

With windowing established, aggregation (combining) of the data comes next. A session window also requires a merger; for a time window, no merging is necessary. Merging occurs when sessions are combined; I'm trusting the inner workings of Kafka Streams on how this piece works.

Once the sessions are combined, the result needs to be materialized into a kTable. The aggregate() takes a Materialized.with() where the serdes for both the key and the value are supplied. The Materialized argument is optional, as a default can be inferred; however, explicitly providing it is helpful for understanding and documenting the kTable.

    /**
     * Create a kTable
     *
     * kTables should be considered a permanent data store.  When it comes to windowing, there are opportunities for
     * purging tables.  Windowed kTables have a retention period, but the timeframe of a window can still
     * be very long; see UnlimitedWindows.of() for the extent of windowing support.
     *
     * However, when it comes to the commit log, the whole point is to capture windows of time with no response, so a
     * session window with a period equal to (or slightly greater than) the RESTful SLA seems to make the most
     * sense.
     */
    @Bean
    public KTable<Windowed, Integer> kTable(StreamsBuilder streamsBuilder) {
        return kStream(streamsBuilder)
                .peek((key, value) -> log.debug("kTable : key={}, value={}", key, value))
                // group records on the stream by the key; since the request/response utilize the same key,
                // this is the logical choice for grouping.
                .groupByKey()
                // window by a period of time equivalent to the RESTful SLA, 250ms.  For understanding how windowing
                // works, be sure to check out UnlimitedWindows.of() to see that Kafka Streams handles large
                // windows; windowing should be based on solving a problem, not worrying about resources.
                .windowedBy(SessionWindows.with(250L))
                // aggregate is used to combine the stream records that are being grouped.
                // the aggregator is rather simple: count the messages.
                .aggregate(
                        // initialize the value of the window to 0
                        () -> 0,
                        // for a given message, since we do not actually care what is in the message (value),
                        // we just increment the counter of the aggregate.
                        (key, value, aggregate) -> {
                            log.debug("kTable - aggregator : key={}, aggregate={}", key, aggregate + 1);
                            return aggregate + 1;
                        },
                        // the merger combines two session aggregates together.
                        (aggKey, aggOne, aggTwo) -> {
                            log.debug("kTable - merger : key={}, merge={}", aggKey, aggOne + aggTwo);
                            return aggOne + aggTwo;
                        },
                        // how the kTable is materialized.
                        Materialized.with(Serdes.String(), Serdes.Integer()));
    }

kStreamResponseMissing

The kStreamResponseMissing bean takes the kTable and creates a stream containing only entries with a value of 1. Since the kTable is a windowed aggregate counting messages for each key, a count of 1 implies there was no response for the given key. If a response was delayed, there is also the case where two sessions are created for the same key and the response lands in its own session. This is why the definitions of the windows impact the behavior of the system.

    @Bean
    public KStream kStreamResponseMissing(StreamsBuilder streamBuilder) {
        return kTable(streamBuilder)
                // turn the kTable into a kStream where the windowing key is the key; since the windowing key is the
                // same session key we used for the commit log, it is a natural choice.
                .toStream((wk, v) -> wk.key())
                .peek((key, value) -> log.debug("kStreamResponseMissing (pre filter) : key={}, value={}", key, value))
                // only take those entries from the table with a value of 1, which means that for the given session-window only
                // one record exists.  If the kTable has a value of 2, that means it has both a request and response
                // in the given session window, so it finished successfully; the response is not missing.
                .filter((key, value) -> value == 1)
                .peek((key, value) -> log.debug("kStreamResponseMissing (post filter) : key={}, value={}", key, value));
    }

kStreamReplay

This bean joins the commit log stream with the stream of counts; if the count is 1, the original message is published to a new topic. Using through() allows publishing to a topic while still returning the kStream (allowing it to be managed by Spring); the to() operation does not return a kStream.

    /**
     * Combine the streams and write out the response to a topic.
     *
     * Spring makes it easy to reference beans without recreating them.  Calling kStream(streamBuilder) returns the
     * same Spring bean no matter how many times you call it (provided you are using @Configuration).
     */
    @Bean
    public KStream kStreamReplay(StreamsBuilder streamBuilder) {
        return kStream(streamBuilder)
                .peek((key, value) -> log.debug("kStreamReplay (pre filter) : key={}, value={}", key, value))
                // we only want the requests, not the responses, so we must filter based on the content of the message (value).
                // we are utilizing the Avro GenericRecord to obtain the action value.  a SpecificRecord Avro class could
                // have been utilized if so desired.
                .filter((key, value) -> "REQUEST".equals(value.get("action").toString())) // could use specific avro...
                .peek((key, value) -> log.debug("kStreamReplay (post filter) : key={}, value={}", key, value))
                .join(kStreamResponseMissing(streamBuilder),
                        (value1, value2) -> {
                            log.debug("kStreamReplay (join) value1={}, value2={}", value1, value2);
                            return value1;
                        },
                        JoinWindows.of(1000L), // picking a window larger than the session window of the kTable
                        Joined.with(Serdes.String(), valueSerde(), Serdes.Integer()))
                .peek((key, value) -> log.debug("kStreamReplay (post join) : key={}, value={}", key, value))
                // write the value out to another topic.  Since we are writing out the same key/value as the original
                // stream, no Produced.with() serdes are needed.
                .through("commit.log.replay");
    }

kStreamReplay_v2

An alternate implementation of kStreamReplay; reversing the join (for this example) does not change the behavior. Because the first stream is the one derived from the kTable, the serdes it carries are not those of the commit.log value being published, so a Produced.with() is required on the through(). When the application executes, the peek() methods will fire at different times because the streams are joined in the reverse order.

    /**
     * Same result as kStreamReplay, but joining the streams in the opposite order.
     */
    @Bean
    public KStream kStreamReplay_v2(StreamsBuilder streamBuilder) {
        return kStreamResponseMissing(streamBuilder)
                .peek((key, value) -> log.debug("kStreamReplay_v2 (pre filter) : key={}, value={}", key, value))
                // the upstream kStreamResponseMissing already filters to a count of 1; filtering again here is
                // redundant but documents the expectation of this stream.
                .filter((key, value) -> value == 1)
                .peek((key, value) -> log.debug("kStreamReplay_v2 (post filter) : key={}, value={}", key, value))
                // we only want the requests, not the responses, so the commit log stream is filtered on the content
                // of the message (value) using the Avro GenericRecord to obtain the action value.
                .join(kStream(streamBuilder).filter((key, value) -> "REQUEST".equals(value.get("action").toString())),
                        (value1, value2) -> {
                            log.debug("kStreamReplay_v2 (join) value1={}, value2={}", value1, value2);
                            return value2;
                        },
                        JoinWindows.of(1000L),
                        Joined.with(Serdes.String(), Serdes.Integer(), valueSerde()))
                .peek((key, value) -> log.debug("kStreamReplay_v2 (post join) : key={}, value={}", key, value))
                // write the value out to another topic.  Because this stream's default value serde is the count's,
                // a Produced.with() supplying the serdes is required here.
                .through("commit.log.replay2", Produced.with(Serdes.String(), valueSerde()));
    }

Scenarios

The demo has two options on the RESTful endpoint. Setting forceFailure to true causes the controller to end early. The delay option causes the endpoint to take longer; setting the value greater than 250ms (longer than the session window) results in the call being reported as a failed event and the retry message being published.

Happy Path, no exception, no delay

No publishing to commit.log.replay, as the kTable will have a count of 2.

Request:

POST {{host}}/api/v1/commit?forceFailure=false&delay=1
Content-Type: application/json
 
{"example":"happy_path"}

Application Log

20:31:55.128 DEBUG kStream : key=ID_Hm_1, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"happy_path\"}", "status": null}
20:31:55.129 DEBUG kTable : key=ID_Hm_1, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"happy_path\"}", "status": null}
20:31:55.137 DEBUG topic=commit.log, offset=15, key=ID_Hm_1, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"happy_path\"}", "status": null}
20:31:55.138 DEBUG topic=commit.log, offset=16, key=ID_Hm_1, value={"url": "/api/v1/commit", "action": "RESPONSE", "headers": {}, "body": "{\"key\":\"ID_Hm_1\"}", "status": 200}
20:31:55.143 DEBUG kTable - aggregator : key=ID_Hm_1, aggregate=1
20:31:55.144 DEBUG kStreamReplay (pre filter) : key=ID_Hm_1, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"happy_path\"}", "status": null}
20:31:55.144 DEBUG kStreamReplay (post filter) : key=ID_Hm_1, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"happy_path\"}", "status": null}
20:31:55.172  INFO Cluster ID: eV6BYFQoQq-swDNXyj58eQ
20:31:55.220 DEBUG kStream : key=ID_Hm_1, value={"url": "/api/v1/commit", "action": "RESPONSE", "headers": {}, "body": "{\"key\":\"ID_Hm_1\"}", "status": 200}
20:31:55.220 DEBUG kTable : key=ID_Hm_1, value={"url": "/api/v1/commit", "action": "RESPONSE", "headers": {}, "body": "{\"key\":\"ID_Hm_1\"}", "status": 200}
20:31:55.221 DEBUG kTable - merger : key=ID_Hm_1, merge=1
20:31:55.221 DEBUG kTable - aggregator : key=ID_Hm_1, aggregate=2
20:31:55.221 DEBUG kStreamReplay (pre filter) : key=ID_Hm_1, value={"url": "/api/v1/commit", "action": "RESPONSE", "headers": {}, "body": "{\"key\":\"ID_Hm_1\"}", "status": 200}

The logs show that no replay messages are written to the replay topic.

Exception, controller doesn’t run to completion

Publishing to commit.log.replay, as the kTable has a count of 1.

POST {{host}}/api/v1/commit?forceFailure=true&delay=1
Content-Type: application/json
 
{"example":"exception_thrown"}
20:33:16.729 DEBUG topic=commit.log, offset=17, key=ID_Hm_2, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"exception_thrown\"}", "status": null}
20:33:16.729 DEBUG kStream : key=ID_Hm_2, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"exception_thrown\"}", "status": null}
20:33:16.729 DEBUG kTable : key=ID_Hm_2, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"exception_thrown\"}", "status": null}
20:33:16.730 DEBUG kTable - aggregator : key=ID_Hm_2, aggregate=1
20:33:16.730 DEBUG kStreamReplay (pre filter) : key=ID_Hm_2, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"exception_thrown\"}", "status": null}
20:33:16.730 DEBUG kStreamReplay (post filter) : key=ID_Hm_2, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"exception_thrown\"}", "status": null}
20:33:38.501 DEBUG kStreamResponseMissing (pre filter) : key=ID_Hm_2, value=1
20:33:38.501 DEBUG kStreamResponseMissing (post filter) : key=ID_Hm_2, value=1
20:33:38.505 DEBUG kStreamReplay_v2 (pre filter) : key=ID_Hm_2, value=1
20:33:38.506 DEBUG kStreamReplay_v2 (post filter) : key=ID_Hm_2, value=1
20:33:38.540 DEBUG kStreamReplay (join) value1={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"exception_thrown\"}", "status": null}, value2=1
20:33:38.540 DEBUG kStreamReplay (post join) : key=ID_Hm_2, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"exception_thrown\"}", "status": null}
20:33:38.558 DEBUG kStreamReplay_v2 (join) value1=1, value2={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"exception_thrown\"}", "status": null}
20:33:38.558 DEBUG kStreamReplay_v2 (post join) : key=ID_Hm_2, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"exception_thrown\"}", "status": null}
20:33:38.672 DEBUG REPLAY NEEDED : topic=commit.log.replay, offset=10, key=ID_Hm_2, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"exception_thrown\"}", "status": null}
20:33:38.675 DEBUG (v2) REPLAY NEEDED : topic=commit.log.replay2, offset=13, key=ID_Hm_2, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"exception_thrown\"}", "status": null}

The logs show replay messages published and consumed.

Delay, controller takes longer than expected

Notice how commit.log.replay has 2 records. This is because the kTable has two different session windows for the same session key (due to the delay between publishing the request and the response). The join window determines whether a duplicate retry record is published.

POST {{host}}/api/v1/commit?forceFailure=false&delay=260
Content-Type: application/json
 
{"example":"too_long"}
20:35:17.716 DEBUG topic=commit.log, offset=18, key=ID_Hm_3, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"too_long\"}", "status": null}
20:35:17.717 DEBUG kStream : key=ID_Hm_3, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"too_long\"}", "status": null}
20:35:17.717 DEBUG kTable : key=ID_Hm_3, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"too_long\"}", "status": null}
20:35:17.718 DEBUG kTable - aggregator : key=ID_Hm_3, aggregate=1
20:35:17.718 DEBUG kStreamReplay (pre filter) : key=ID_Hm_3, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"too_long\"}", "status": null}
20:35:17.718 DEBUG kStreamReplay (post filter) : key=ID_Hm_3, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"too_long\"}", "status": null}
20:35:17.982 DEBUG topic=commit.log, offset=19, key=ID_Hm_3, value={"url": "/api/v1/commit", "action": "RESPONSE", "headers": {}, "body": "{\"key\":\"ID_Hm_3\"}", "status": 200}
20:35:17.982 DEBUG kStream : key=ID_Hm_3, value={"url": "/api/v1/commit", "action": "RESPONSE", "headers": {}, "body": "{\"key\":\"ID_Hm_3\"}", "status": 200}
20:35:17.982 DEBUG kTable : key=ID_Hm_3, value={"url": "/api/v1/commit", "action": "RESPONSE", "headers": {}, "body": "{\"key\":\"ID_Hm_3\"}", "status": 200}
20:35:17.983 DEBUG kTable - aggregator : key=ID_Hm_3, aggregate=1
20:35:17.983 DEBUG kStreamReplay (pre filter) : key=ID_Hm_3, value={"url": "/api/v1/commit", "action": "RESPONSE", "headers": {}, "body": "{\"key\":\"ID_Hm_3\"}", "status": 200}
20:35:38.733 DEBUG kStreamResponseMissing (pre filter) : key=ID_Hm_3, value=1
20:35:38.733 DEBUG kStreamResponseMissing (post filter) : key=ID_Hm_3, value=1
20:35:38.734 DEBUG kStreamReplay_v2 (pre filter) : key=ID_Hm_3, value=1
20:35:38.734 DEBUG kStreamReplay_v2 (post filter) : key=ID_Hm_3, value=1
20:35:38.734 DEBUG kStreamResponseMissing (pre filter) : key=ID_Hm_3, value=1
20:35:38.734 DEBUG kStreamResponseMissing (post filter) : key=ID_Hm_3, value=1
20:35:38.734 DEBUG kStreamReplay_v2 (pre filter) : key=ID_Hm_3, value=1
20:35:38.734 DEBUG kStreamReplay_v2 (post filter) : key=ID_Hm_3, value=1
20:35:38.751 DEBUG kStreamReplay (join) value1={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"too_long\"}", "status": null}, value2=1
20:35:38.753 DEBUG kStreamReplay (post join) : key=ID_Hm_3, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"too_long\"}", "status": null}
20:35:38.754 DEBUG kStreamReplay (join) value1={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"too_long\"}", "status": null}, value2=1
20:35:38.754 DEBUG kStreamReplay (post join) : key=ID_Hm_3, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"too_long\"}", "status": null}
20:35:38.754 DEBUG kStreamReplay_v2 (join) value1=1, value2={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"too_long\"}", "status": null}
20:35:38.754 DEBUG kStreamReplay_v2 (post join) : key=ID_Hm_3, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"too_long\"}", "status": null}
20:35:38.755 DEBUG kStreamReplay_v2 (join) value1=1, value2={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"too_long\"}", "status": null}
20:35:38.755 DEBUG kStreamReplay_v2 (post join) : key=ID_Hm_3, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"too_long\"}", "status": null}
20:35:38.863 DEBUG REPLAY NEEDED : topic=commit.log.replay, offset=11, key=ID_Hm_3, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"too_long\"}", "status": null}
20:35:38.863 DEBUG REPLAY NEEDED : topic=commit.log.replay, offset=12, key=ID_Hm_3, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"too_long\"}", "status": null}
20:35:38.864 DEBUG (v2) REPLAY NEEDED : topic=commit.log.replay2, offset=14, key=ID_Hm_3, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"too_long\"}", "status": null}
20:35:38.864 DEBUG (v2) REPLAY NEEDED : topic=commit.log.replay2, offset=15, key=ID_Hm_3, value={"url": "/api/v1/commit", "action": "REQUEST", "headers": {}, "body": "{\"example\":\"too_long\"}", "status": null}

Logs show duplicate messages published and consumed.

 
Application

The application can be found on GitHub at: opiblog-kafka-streams-rlog

To run the application, start the Docker images for ZooKeeper, the Kafka broker, and the Confluent Schema Registry.

docker-compose --file docker/docker-compose.yml up -d

And then start the application

./gradlew bootRun
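
To watch the replay topic directly, something along these lines should work (kafka-avro-console-consumer ships with the Confluent Platform; flags may vary slightly by version):

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic commit.log.replay --from-beginning --property schema.registry.url=http://localhost:8081 --property print.key=true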

 
Conclusion

Would I write a replay (commit) log in Kafka with Kafka Streams? If I needed one, yes; however, I would structure it differently. I would use separate topics for the request and the response and leverage that to avoid duplicates on delayed responses.

What I want to learn next:

  • Avro – learn the differences in how Kafka Core, Kafka Streams, and the Connector API use Avro.
  • Kafka Streams Topology – learn how the Streams API works.
  • Stream Delay – learn more about how this works; the more I know, the fewer mistakes.
  • Group Window and Join Window – determine more precisely how these windows impact each other.
  • RocksDB – understand how Kafka Streams utilizes RocksDB.
