Aug 7, 2018

Apache Kafka and Reactive Spring Boot

This post is the continuation of a two-part series exploring Apache Ignite, Apache Kafka, and Reactive Spring Boot concepts.

This part covers the use of Reactive Kafka consumers to return live database events to a listening client via a Spring Boot Server Sent Event REST endpoint.

Part 2 – Apache Kafka and Reactive Spring Boot

With Spring Boot 2, controllers can return reactive types directly. This support means we can use WebFlux to expose long-running WebSocket or Server Sent Event endpoints that leverage the benefits of reactive code. Used in conjunction with the reactor-kafka library, an application can deliver Kafka topic messages asynchronously to a Server Sent Event endpoint in a Spring Boot REST controller. This combination delivers live updates from the database to the client with no direct connection to the underlying database.

Setting up the Spring Boot application

With the Apache Ignite and Kafka services in place from part 1 of this series, we can now implement the consumer side of the Kafka topic.

Dependencies

These dependencies allow use of the Reactive classes in Spring Boot and Kafka. The reactor-kafka dependency allows the creation of Kafka consumers that return Flux based objects.

compile 'org.springframework.boot:spring-boot-starter-webflux'
compile 'org.springframework.kafka:spring-kafka:2.1.7.RELEASE'
compile 'io.projectreactor.kafka:reactor-kafka:1.0.0.RELEASE'

Configuring a Spring Kafka Consumer

As with the Kafka producer, a consumer must be wired up and available for use in the Spring context. These configurations assume the defaults were used when creating the Kafka cluster and topic. The key and value of the Kafka message were specified in the producer as Strings, so the KafkaReceiver must be given StringDeserializers for both. The Kafka topic, test-topic, was also created in the previous post.

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
import reactor.kafka.receiver.internals.ConsumerFactory
import reactor.kafka.receiver.internals.DefaultKafkaReceiver
 
@Configuration
class SpringSSEConfiguration {
	@Bean
	KafkaReceiver<String, String> kafkaReceiver() {
		def bootstrapAddress = '0.0.0.0:9092'
 
		// Match the producer from part 1: String keys and String values
		def configProps = [:]
		configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress)
		configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer)
		configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer)
		configProps.put(ConsumerConfig.CLIENT_ID_CONFIG, 'sample-client')
		configProps.put(ConsumerConfig.GROUP_ID_CONFIG, 'sample-group')
		configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
 
		// Subscribe to the topic created in part 1 of this series
		new DefaultKafkaReceiver(
			ConsumerFactory.INSTANCE,
			ReceiverOptions.create(configProps).subscription(['test-topic'])
		)
	}
}

Configuring a Spring Server Sent Event controller

The kafkaReceiver.receive() method returns a Flux reactive object which can then be acted upon. The defined Flux behavior acknowledges each new message on the Kafka topic, extracts the value from the Kafka ReceiverRecord, and returns it to the caller. The media type specification TEXT_EVENT_STREAM_VALUE adds the appropriate content header so that downstream clients are aware this is a Server Sent Event topic.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;
 
@RequestMapping(path = "/sse")
@RestController
public class SSEController {
 
	@Autowired
	KafkaReceiver<String, String> kafkaReceiver;
 
	@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
	Flux<String> getSimpleFlux() {
		Flux<ReceiverRecord<String, String>> kafkaFlux = kafkaReceiver.receive();
 
		// Acknowledge each record, then emit only its String value to the SSE stream
		return kafkaFlux.log()
			.doOnNext(r -> r.receiverOffset().acknowledge())
			.map(ReceiverRecord::value);
	}
}
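One caveat, raised in the comments below: KafkaReceiver.receive() supports only a single subscription, so the controller above works for the first SSE client but breaks when a second one connects. A common Reactor pattern for this is to publish the receiver Flux once and let every HTTP client attach to the shared hot stream. The sketch below illustrates that idea; the class name and constructor-injection style are illustrative, not from the original post.

```java
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;

@RequestMapping(path = "/sse")
@RestController
public class SharedSSEController {

	private final Flux<String> sharedKafkaFlux;

	public SharedSSEController(KafkaReceiver<String, String> kafkaReceiver) {
		// share() turns the cold receiver Flux into a hot, multicast one,
		// so the single underlying Kafka consumer is subscribed only once
		// and every connected SSE client sees the same record values.
		this.sharedKafkaFlux = kafkaReceiver.receive()
			.doOnNext(r -> r.receiverOffset().acknowledge())
			.map(ReceiverRecord::value)
			.share();
	}

	@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
	public Flux<String> getSharedFlux() {
		return sharedKafkaFlux;
	}
}
```

Note that all clients still share one consumer group member, so this fans records out to every client rather than partitioning work between them; if each client needs an independent view of the topic, a per-request receiver with a unique group id is the alternative, at the cost of one consumer per connection.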

Conclusion

That’s it! Issuing a GET request to the “/sse” endpoint will stream any changes the database makes to the ExampleModel table/cache. After requesting the resource, try adding, updating, and deleting ExampleModel records to see the changes flow over the Kafka topic to your end client.
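To watch the stream from the command line, you can hold an SSE connection open with curl and push test messages onto the topic directly. This assumes the Spring Boot app runs on localhost:8080 and the Kafka broker on localhost:9092, matching the configuration above.

```shell
# Terminal 1: hold the SSE connection open (-N disables curl's buffering)
curl -N -H "Accept: text/event-stream" http://localhost:8080/sse

# Terminal 2: publish a test message straight to the topic;
# it should appear in terminal 1 as a "data:" event
echo 'hello from kafka' | kafka-console-producer.sh \
  --broker-list localhost:9092 --topic test-topic
```

Messages produced by the Ignite cache events from part 1 arrive over the same stream.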


4 thoughts on “Apache Kafka and Reactive Spring Boot”

  1. rafeeq says:

    Please provide a link to any github project containing the entire code

  2. Mateusz says:

    Above example described here is incorrect.

    1.
    This would not work in case when N concurrent users would try to consume messages at the same time. The “fastest” one will acknowledge the record, and other listeners might not receive it. The reason is how Kafka works in general.

    2.
    The KafkaReceiver does not allow more than one subscriber -> DefaultKafkaReceiver lines 246-247. In the above example, a new subscription is created each time someone starts listening, so the “second time” will break.

  3. Tuna says:

    I agree with what Mateusz says. This example is NOT correct.
    You can of course create a kafka consumer for each unique request with a unique group-id. But then, there can be many consumer threads running on your machine.

  4. Neha says:

    Hi,

    Can you please create the same tutorial using Spring Boot and Reactor Kafka?
