Streaming Apache Ignite changes in Apache Kafka Topics

This is a two part series exploring Apache Ignite, Apache Kafka, and Reactive Spring Boot concepts.

This part covers the setting up of Apache Ignite as a datasource and using Apache Kafka as a streaming engine for database changes and events.

Part 1 – Streaming Apache Ignite changes in Apache Kafka Topics

Recently, I was exploring building an application and tech stack that could present real-time, or close to it, changes to a client as data changed on a database without re-querying.

My goal was to support the following use cases:

  1. To automatically notify any clients consuming from the controller when a database event occurred,
  2. To create a read-only view into the datasource, and
  3. To use a streaming service to separate the datasource from the REST application.

I settled on an Apache Ignite database, Apache Kafka streams, Spring Boot REST application and Server Sent Events as the driving tech stack.

Apache Ignite can function as an in-memory database and allows registering continuous queries over datasets using simple Java libraries. Apache Kafka streams seemed a natural fit for streaming those datasets from a database to 1 to n destinations. And, given the read-only requirement of the communication stream, a server sent event socket can be used like a topic to broadcast changes to listening clients.

Setting up the database

There are a few options when setting up Apache Ignite to run in a local environment:

  1. stand alone Ignite package configured via xml, or
  2. embedded Ignite within an application.

The steps below outline the necessary dependencies and configurations for managing the embedded Ignite node within a Spring application.

Dependencies

The ignite-core and ignite-spring dependencies are required for launching an Ignite node embedded within a Spring application. The ignite-spring-data dependency exposes JPA-like repository functionality to access the tables and caches within the Ignite node.

compile "org.apache.ignite:ignite-core:2.5.0"
compile "org.apache.ignite:ignite-spring:2.5.0"
compile "org.apache.ignite:ignite-spring-data:2.5.0"

Model

For this example, we define a trivial model consisting of a single class, ExampleModel. The QuerySqlField annotation allows the application to reference those fields in SQL queries against the Ignite database.

import org.apache.ignite.cache.query.annotations.QuerySqlField
 
class ExampleModel implements Serializable {
 
	@QuerySqlField(index = true)
	String name
}

With Ignite Spring Data, I needed to define a JPA-like repository so that the Spring application could store new ExampleModel objects into the Ignite database. This interface defines that ExampleModels are to be stored in and accessed from the ‘exampleModelCache’ and are identified by a Long key.

import org.apache.ignite.springdata.repository.IgniteRepository
import org.apache.ignite.springdata.repository.config.RepositoryConfig
 
@RepositoryConfig(cacheName = 'exampleModelCache')
interface ExampleModelRepository extends IgniteRepository<ExampleModel, Long> {
}

Cache Configuration

Since the Ignite database instance runs inside of the Spring application, it must be configured within the Spring configuration classes. By registering the Ignite instance as a Spring bean, Ignite can then be wired into other classes and services that may need to operate on the embedded Ignite node.

This configuration creates a single cache called ‘exampleModelCache’ that can store ExampleModel objects identified by Long keys. The EnableIgniteRepositories annotation is required to register any IgniteRepositories in the application.

import org.apache.ignite.Ignite
import org.apache.ignite.Ignition
import org.apache.ignite.configuration.CacheConfiguration
import org.apache.ignite.configuration.IgniteConfiguration
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
 
@Configuration
@EnableIgniteRepositories
class IgniteExampleConfiguration {
	@Bean
	Ignite igniteInstance(){
		IgniteConfiguration configuration = new IgniteConfiguration()
 
		configuration.igniteInstanceName = 'example-data-node'
 
		CacheConfiguration cacheConfiguration = new CacheConfiguration('exampleModelCache')
		cacheConfiguration.setIndexedTypes(Long, ExampleModel)
 
		configuration.cacheConfiguration = [cacheConfiguration]
 
		Ignition.start(configuration)
	}
}

Register Continuous Queries

Apache Ignite can support Continuous Queries. This example registers a listener on the ExampleModel cache and logs anytime a change event occurs within that cache.

Since the Ignite instance is running within a Spring context, the continuous query is created and attached to the Ignite cache in the PostConstruct lifecycle of the ExampleCacheWatcher Spring bean. Although there is no explicit need to create the query in this part of the bean lifecycle, specifying the PostConstruct phase forces the query to be registered at least once and removes the need to call the exampleContinuousQuery method once the application is running.

Registering a continuous query in Ignite requires building a ContinuousQuery object and attaching it to the target Ignite cache. By setting the local listener of that query to a javax CacheEntryUpdatedListener, the query gets notified of any actions that change the state of the Ignite cache to which the query is attached. For this example, the continuous query is attached to the exampleModelCache created in previous steps.

import groovy.util.logging.Slf4j
import org.apache.ignite.Ignite
import org.apache.ignite.cache.query.ContinuousQuery
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component
 
import javax.annotation.PostConstruct
import javax.cache.event.CacheEntryEvent
import javax.cache.event.CacheEntryListenerException
import javax.cache.event.CacheEntryUpdatedListener
 
@Slf4j
@Component
class ExampleCacheWatcher {
 
	@Autowired
	Ignite ignite
 
	@PostConstruct
	def exampleContinuousQuery(){
		ContinuousQuery query = new ContinuousQuery();
 
		query.setLocalListener(
			new CacheEntryUpdatedListener<Integer, ExampleModel>(){
 
			@Override
			void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends ExampleModel>> cacheEntryEvents) throws CacheEntryListenerException {
				cacheEntryEvents.each {
					log.debug "key: $it.key \t value: $it.value"
					}
				}
			}
		)
 
		def igniteCache = ignite.getOrCreateCache('exampleModelCache')
		igniteCache.query(query)
	}
}

Setting up the Kafka stream

Configuring Apache Kafka

Follow the steps available at Apache Kafka Quickstart

Start a Kafka Topic

Create the Kafka topic ‘test-topic’

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic

Dependencies

The Spring Kafka dependency allows the wiring up of a KafkaProducer and KafkaTemplate to communicate with the Apache Kafka cluster.

compile "org.springframework.kafka:spring-kafka:2.1.7.RELEASE"

Integrating Apache Ignite and Apache Kafka

Register a Kafka Producer in the Apache Ignite application

With the Apache Kafka topic started, we need to wire together the Apache Ignite database and the Kafka stream to begin streaming any updates out to possible consumers. The following configuration registers a Kafka producer factory for use within the Spring application and are based on the default settings of the Kafka instance configured in previous steps. For ease of this example, the kafka producer serializes the key and value of any objects sent to the topic as simple Strings.

The KafkaTemplate object allows easy wiring and interfacing with the Kafka stream within Spring beans.

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
 
@Configuration
class KafkaConfiguration {
	@Bean
	ProducerFactory producerFactory(){
		def bootstrapAddress = '0.0.0.0:9092'
 
		def configProps = [:]
		configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
		configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer);
		configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer);
		return new DefaultKafkaProducerFactory(configProps)
	}
 
	@Bean
	public KafkaTemplate kafkaTemplate() {
		return new KafkaTemplate(producerFactory())
	}
}

Using a Kafka Template in the Ignite Continuous Query

With the producer factory and template available, wiring up the Ignite Continuous Query to send any updates to the Kafka stream is simple. Autowire the template into the continuous query Spring bean and invoke the template any time there is an update. Before sending the update to the Kafka topic, the Spring ObjectMapper is called to serialize the ExampleModel to JSON so that the registered serializer for the Kafka ProducerFactory can appropriately serialize the ExampleModel representation and put it on the Kafka topic.

@Slf4j
@Component
class ExampleCacheWatcher {
 
	@Autowired
	Ignite ignite
 
    /* Added kafkaTemplate and objectMapper beans */
	@Autowired
	KafkaTemplate kafkaTemplate
 
	@Autowired
	ObjectMapper objectMapper
 
	@PostConstruct
	def exampleContinuousQuery(){
		ContinuousQuery query = new ContinuousQuery();
 
		query.setLocalListener(
			new CacheEntryUpdatedListener<Integer, ExampleModel>(){
				@Override
				void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends ExampleModel>> cacheEntryEvents) throws CacheEntryListenerException {
					cacheEntryEvents.each {
						kafkaTemplate.send('test-topic', objectMapper.writeValueAsString(it.value)) // instead of logging output, stick it on the Kafka topic
					}
				}	
			}
		)
 
		def igniteCache = ignite.getOrCreateCache('exampleModelCache')
		igniteCache.query(query)
	}
}

Conclusion

With the Apache Ignite and Apache Kafka components in place, inserting, deleting, or updating ExampleModel objects in the Ignite cache will cause those events to be placed on the Kafka topic. These messages can be consumed in any number of ways; however, the next post in this series will outline how to retrieve the Ignite cache change events via a Spring Boot REST controller using reactive objects and Server Sent Events.

Leave a Reply

Your email address will not be published. Required fields are marked *

*

*