Streaming Apache Ignite changes into Apache Kafka Topics
This is a two-part series exploring Apache Ignite, Apache Kafka, and reactive Spring Boot concepts.
This part covers setting up Apache Ignite as a datasource and using Apache Kafka as a streaming engine for database changes and events.
Part 1 – Streaming Apache Ignite changes into Apache Kafka Topics
Recently, I was exploring building an application and tech stack that could present real-time, or close to it, changes to a client as data changed in a database, without re-querying.
My goal was to support the following use cases:
- To automatically notify any clients consuming from the controller when a database event occurred,
- To create a read-only view into the datasource, and
- To use a streaming service to separate the datasource from the REST application.
I settled on an Apache Ignite database, Apache Kafka streams, a Spring Boot REST application, and Server-Sent Events as the driving tech stack.
Apache Ignite can function as an in-memory database and allows registering continuous queries over datasets using simple Java libraries. Apache Kafka streams seemed a natural fit for streaming those datasets from a database to one or more destinations. And, given the read-only requirement of the communication stream, a Server-Sent Events connection can be used like a topic to broadcast changes to listening clients.
Setting up the database
There are a few options when setting up Apache Ignite to run in a local environment:
- a standalone Ignite package configured via XML, or
- Ignite embedded within an application.
The steps below outline the necessary dependencies and configurations for managing the embedded Ignite node within a Spring application.
Dependencies
The ignite-core and ignite-spring dependencies are required for launching an Ignite node embedded within a Spring application. The ignite-spring-data dependency exposes JPA-like repository functionality to access the tables and caches within the Ignite node.
compile "org.apache.ignite:ignite-core:2.5.0" compile "org.apache.ignite:ignite-spring:2.5.0" compile "org.apache.ignite:ignite-spring-data:2.5.0"
Model
For this example, we define a trivial model consisting of a single class, ExampleModel. The QuerySqlField annotation allows the application to reference those fields in SQL queries against the Ignite database.
import org.apache.ignite.cache.query.annotations.QuerySqlField

class ExampleModel implements Serializable {

    @QuerySqlField(index = true)
    String name
}
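To see the annotation in action, the indexed name field can be referenced from plain SQL run against the cache. The snippet below is a minimal sketch, assuming an Ignite instance and the exampleModelCache configured later in this post:

import org.apache.ignite.cache.query.SqlFieldsQuery

// Query the indexed 'name' field; QuerySqlField is what exposes this column to SQL
def cursor = ignite.cache('exampleModelCache')
        .query(new SqlFieldsQuery("select name from ExampleModel where name like 'ex%'"))
cursor.each { row -> println row[0] }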
With Ignite Spring Data, I needed to define a JPA-like repository so that the Spring application could store new ExampleModel objects into the Ignite database. This interface defines that ExampleModels are to be stored in and accessed from the ‘exampleModelCache’ and are identified by a Long key.
import org.apache.ignite.springdata.repository.IgniteRepository
import org.apache.ignite.springdata.repository.config.RepositoryConfig

@RepositoryConfig(cacheName = 'exampleModelCache')
interface ExampleModelRepository extends IgniteRepository<ExampleModel, Long> {
}
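As a quick usage sketch (the service name and key handling here are hypothetical, not part of the example project), the repository can be injected like any Spring Data repository. Note that IgniteRepository exposes a save variant that takes an explicit key, since Ignite caches are key-value stores:

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service

@Service
class ExampleModelService {

    @Autowired
    ExampleModelRepository repository

    // IgniteRepository#save(key, entity) stores the entity under the given cache key
    ExampleModel create(Long id, String name) {
        repository.save(id, new ExampleModel(name: name))
    }
}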
Cache Configuration
Since the Ignite database instance runs inside of the Spring application, it must be configured within the Spring configuration classes. By registering the Ignite instance as a Spring bean, Ignite can then be wired into other classes and services that may need to operate on the embedded Ignite node.
This configuration creates a single cache called ‘exampleModelCache’ that can store ExampleModel objects identified by Long keys. The EnableIgniteRepositories annotation is required to register any IgniteRepositories in the application.
import org.apache.ignite.Ignite
import org.apache.ignite.Ignition
import org.apache.ignite.configuration.CacheConfiguration
import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.springdata.repository.config.EnableIgniteRepositories
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
@EnableIgniteRepositories
class IgniteExampleConfiguration {

    @Bean
    Ignite igniteInstance() {
        IgniteConfiguration configuration = new IgniteConfiguration()
        configuration.igniteInstanceName = 'example-data-node'

        // Store ExampleModel objects keyed by Long ids and index them for SQL queries
        CacheConfiguration cacheConfiguration = new CacheConfiguration('exampleModelCache')
        cacheConfiguration.setIndexedTypes(Long, ExampleModel)
        configuration.cacheConfiguration = [cacheConfiguration]

        // Ignition.start returns the running Ignite instance, which becomes the bean
        Ignition.start(configuration)
    }
}
Register Continuous Queries
Apache Ignite supports continuous queries. This example registers a listener on the ExampleModel cache and logs a message any time a change event occurs within that cache.
Since the Ignite instance is running within a Spring context, the continuous query is created and attached to the Ignite cache in the PostConstruct lifecycle phase of the ExampleCacheWatcher Spring bean. There is no strict need to create the query in this phase, but doing so guarantees the query is registered at least once and removes the need to call the exampleContinuousQuery method manually once the application is running.
Registering a continuous query in Ignite requires building a ContinuousQuery object and attaching it to the target Ignite cache. By setting the local listener of that query to a javax CacheEntryUpdatedListener, the query gets notified of any actions that change the state of the Ignite cache to which the query is attached. For this example, the continuous query is attached to the exampleModelCache created in previous steps.
import groovy.util.logging.Slf4j
import org.apache.ignite.Ignite
import org.apache.ignite.cache.query.ContinuousQuery
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component

import javax.annotation.PostConstruct
import javax.cache.event.CacheEntryEvent
import javax.cache.event.CacheEntryListenerException
import javax.cache.event.CacheEntryUpdatedListener

@Slf4j
@Component
class ExampleCacheWatcher {

    @Autowired
    Ignite ignite

    @PostConstruct
    def exampleContinuousQuery() {
        ContinuousQuery query = new ContinuousQuery()
        // The cache stores Long keys, so the listener is typed accordingly
        query.setLocalListener(new CacheEntryUpdatedListener<Long, ExampleModel>() {
            @Override
            void onUpdated(Iterable<CacheEntryEvent<? extends Long, ? extends ExampleModel>> cacheEntryEvents)
                    throws CacheEntryListenerException {
                cacheEntryEvents.each {
                    log.debug "key: $it.key \t value: $it.value"
                }
            }
        })

        def igniteCache = ignite.getOrCreateCache('exampleModelCache')
        // The returned cursor keeps the continuous query active; closing it would stop the query
        igniteCache.query(query)
    }
}
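As a quick check (a hypothetical snippet, not part of the example project), putting an entry into the cache directly from any component with the Ignite bean injected should fire the listener and produce the debug log line:

// Any cache write fires the registered continuous query listener
ignite.cache('exampleModelCache').put(1L, new ExampleModel(name: 'first'))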
Setting up the Kafka stream
Configuring Apache Kafka
Follow the steps available at Apache Kafka Quickstart
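For a local setup with the default configuration files (assuming the extracted Kafka distribution from that quickstart, which still uses ZooKeeper), starting the servers looks like this:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties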
Start a Kafka Topic
Create the Kafka topic ‘test-topic’
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic
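To verify messages arriving on the topic later (optional, assuming the broker runs on localhost:9092 with defaults), a console consumer can tail it:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning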
Dependencies
The Spring Kafka dependency allows wiring up a KafkaProducer and KafkaTemplate to communicate with the Apache Kafka cluster.
compile "org.springframework.kafka:spring-kafka:2.1.7.RELEASE"
Integrating Apache Ignite and Apache Kafka
Register a Kafka Producer in the Apache Ignite application
With the Apache Kafka topic started, we need to wire together the Apache Ignite database and the Kafka stream to begin streaming any updates out to possible consumers. The following configuration registers a Kafka producer factory for use within the Spring application and is based on the default settings of the Kafka instance configured in the previous steps. For simplicity, the Kafka producer serializes the keys and values of any objects sent to the topic as simple Strings.
The KafkaTemplate object allows easy wiring and interfacing with the Kafka stream within Spring beans.
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory

@Configuration
class KafkaConfiguration {

    @Bean
    ProducerFactory producerFactory() {
        def bootstrapAddress = '0.0.0.0:9092'
        def configProps = [:]
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress)
        // Keys and values are written to the topic as plain Strings
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer)
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer)
        new DefaultKafkaProducerFactory(configProps)
    }

    @Bean
    KafkaTemplate kafkaTemplate() {
        new KafkaTemplate(producerFactory())
    }
}
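With these beans in place, any Spring component can autowire the KafkaTemplate and publish with a single call; a minimal sketch (the payload text here is arbitrary):

// Sends a String payload to 'test-topic' using the serializers configured above
kafkaTemplate.send('test-topic', 'hello from ignite')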
Using a Kafka Template in the Ignite Continuous Query
With the producer factory and template available, wiring up the Ignite continuous query to send any updates to the Kafka stream is simple. Autowire the template into the continuous query Spring bean and invoke the template any time there is an update. Before sending the update to the Kafka topic, the Jackson ObjectMapper is called to serialize the ExampleModel to JSON so that the String serializer registered on the Kafka ProducerFactory can appropriately serialize the representation and put it on the Kafka topic.
@Slf4j
@Component
class ExampleCacheWatcher {

    @Autowired
    Ignite ignite

    /* Added kafkaTemplate and objectMapper beans */
    @Autowired
    KafkaTemplate kafkaTemplate

    @Autowired
    ObjectMapper objectMapper

    @PostConstruct
    def exampleContinuousQuery() {
        ContinuousQuery query = new ContinuousQuery()
        query.setLocalListener(new CacheEntryUpdatedListener<Long, ExampleModel>() {
            @Override
            void onUpdated(Iterable<CacheEntryEvent<? extends Long, ? extends ExampleModel>> cacheEntryEvents)
                    throws CacheEntryListenerException {
                cacheEntryEvents.each {
                    // Instead of logging output, serialize the value to JSON and put it on the Kafka topic
                    kafkaTemplate.send('test-topic', objectMapper.writeValueAsString(it.value))
                }
            }
        })

        def igniteCache = ignite.getOrCreateCache('exampleModelCache')
        igniteCache.query(query)
    }
}
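If the application is not relying on Spring Boot's auto-configured ObjectMapper, one can be registered explicitly; a minimal sketch, assuming Jackson is on the classpath (the configuration class name is hypothetical):

import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class JacksonConfiguration {

    // Spring Boot normally provides this bean; register it manually otherwise
    @Bean
    ObjectMapper objectMapper() {
        new ObjectMapper()
    }
}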
Conclusion
With the Apache Ignite and Apache Kafka components in place, inserting, deleting, or updating ExampleModel objects in the Ignite cache will cause those events to be placed on the Kafka topic. These messages can be consumed in any number of ways; however, the next post in this series will outline how to retrieve the Ignite cache change events via a Spring Boot REST controller using reactive objects and Server Sent Events.
My advice, however, is not to go down this path in the long run: use computation collocated with the data, Ignite's Kafka streamer, and Ignite connectors instead, because of compatibility problems between Spring library versions across the different products. You can find yourself locked into a single version of the Spring technology stack very quickly.