Oct 13, 2016

Analyzing Kafka data streams with Spark

This blog describes a Spark Streaming application which consumes event data from a Kafka topic to provide continuous, near real-time processing and analysis of the event data stream. The demonstration application, written in Java 8 and runnable on a local host, uses a Spark direct connection to Kafka and consumes 911 call records as they are published to a topic. This example can be adapted to create applications capable of providing fast analysis and alerting of conditions of interest contained within a data stream.

Application Description

The general use case for this application is the monitoring and alerting of a data stream of events. Specifically, the example data is a set of 911 calls in the Seattle area, occurring over a number of days (the data is from Data.gov). These calls are provided on a Kafka topic as csv delimited records. Each record includes a call type (a simple categorization, such as ‘Fire’ or ‘Aid’), a timestamp, and a geospatial location. The application performs both per-batch and stateful analysis, and outputs summaries of the current micro batch and of the accumulated state to a logger.

Spark Technology

The Apache Spark project is an open source technology providing low-latency, memory-based, highly available, large-scale analytical data processing. Spark’s performance and its growing ecosystem of libraries are driving its adoption into a variety of application areas. The Spark documentation provides examples in Scala (the language Spark is written in), Java, and Python. Spark also provides an API for the R language. This blog is written based on the Java API of Spark 2.0.0.

Apache Kafka is a widely adopted, scalable, durable, high performance distributed streaming platform. This example uses Kafka version 0.10.0.1.

Building on top of the Spark core functionality, specialized libraries provide for processing in different modes: Spark SQL for structured data, Spark Streaming for stream processing, MLlib for machine learning, and GraphX for graph processing.


DStream / RDD

Spark Streaming uses an abstraction of streaming data known as a DStream (for discretized stream), which is based on the concept of a resilient distributed dataset (RDD): a fault-tolerant collection of elements which can be operated on in parallel. A DStream represents a sequence of RDDs organized on a periodic time series. Each RDD in the sequence can be considered a “micro batch” of input data, so Spark Streaming effectively performs batch processing on a continuous basis.

DStreams can provide an abstraction over many actual data streams, among them Kafka topics, Apache Flume, Twitter feeds, and socket connections. This blog describes Spark consuming a Kafka topic. Being based on RDDs, DStreams offer much of the same processing as RDDs, but also provide time-based processing, such as analytics that utilize sliding time windows.
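
For example, a hedged sketch of a sliding-window count, assuming lines is a JavaDStream of Strings such as the one built from the Kafka topic later in this post (the variable name and durations are illustrative assumptions):

        // count the records seen in the last 60 seconds, recomputed every 10 seconds
        JavaDStream<Long> recentCallCount =
                lines.window(Durations.seconds(60), Durations.seconds(10)).count();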

Spark Direct Streaming

Spark can consume Kafka using Receivers, but Spark also includes a Kafka Direct API (available for the Spark Java and Scala APIs since Spark 1.3, and for the Python API since Spark 1.4). The direct API does not use Receivers; instead Spark acts as a direct consumer client of Kafka. The direct approach ensures exactly-once processing of the Kafka data stream messages. Full end-to-end exactly-once processing can be achieved provided that Spark’s output processing is also implemented as exactly-once. For an output destination, this requires either an idempotent way to store the data, or storing the data using atomic transactions. The example application described below uses the Direct API.

Application Implementation

The application code described here is the code found in the com.objectpartners.spark.rt911.streaming package, and supporting packages in com.objectpartners.spark.rt911.common.

The demonstration requires Zookeeper, Kafka, and a Kafka producer client to be running. Spark processing is launched by the Main Application class, which starts Spark via a SparkKafkaRunner class. Spark then runs continuously, consuming and processing a Kafka topic stream and waiting for termination.

The processStream method in this class does the actual Spark processing work, starting with creation of the Spark streaming context:

        // create a local Spark Context with two working threads
        SparkConf streamingConf = new SparkConf().setMaster("local[2]").setAppName("911Calls");
        streamingConf.set("spark.streaming.stopGracefullyOnShutdown", "true");
 
        // create a Spark Java streaming context, with stream batch interval
        JavaStreamingContext jssc = new JavaStreamingContext(streamingConf, Durations.milliseconds(streamBatchInterval));

The second argument to the JavaStreamingContext controls the micro batch time interval, e.g. if streamBatchInterval has a value of 1000L, then the data is presented as a series of RDDs that arrive every second.

To perform computation on state, checkpointing must be enabled in the application, because the computation uses the previous state values. For demo purposes, a local temp directory works.

// set checkpoint for demo
jssc.checkpoint(System.getProperty("java.io.tmpdir"));

The direct connection to the Kafka topic is established with the following code:

        // create a Discretized Stream of Java String objects
        // as a direct stream from kafka (zookeeper is not an intermediate)
        JavaPairInputDStream<String, String> rawDataLines =
                KafkaUtils.createDirectStream(
                        jssc,
                        String.class,
                        String.class,
                        StringDecoder.class,
                        StringDecoder.class,
                        kafkaParams,
                        topicsSet
                );

The String.class and StringDecoder.class arguments are specified for both the key and value of the input, telling Spark that the Kafka message is of type <String, String> and that the key and value can be decoded with the StringDecoder.

The kafkaParams map and the topicsSet specify the Kafka brokers and the topics to subscribe to:

        // set up receive data stream from kafka
        final Set<String> topicsSet = new HashSet<>(Arrays.asList(topic));
        final Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", broker);

For the demo, broker and topic are defined in the Spring configuration class.

private String topic = "demo-topic";
private String broker = "localhost:9092";

These could be read from a properties file, or some other external source in a production version.
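
As a rough sketch of that externalization (the file name and property keys below are assumptions, not part of the project):

        // hedged sketch: read the broker and topic from an external properties file
        // (requires java.util.Properties and java.nio.file imports; IOException handling omitted)
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(Paths.get("kafka.properties"))) {
            props.load(in);
        }
        String broker = props.getProperty("kafka.broker", "localhost:9092");
        String topic = props.getProperty("kafka.topic", "demo-topic");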

With the direct Kafka connection, the Spark application takes in data from the Kafka topic in the form of a JavaDStream parameterized as String. This represents a DStream of Strings, each of which is a line of csv data taken from the value (Tuple2::_2) of the Kafka message <K, V> pair.
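
A minimal sketch of that extraction step (the variable name lines is chosen to match the snippets further below):

        // keep only the Kafka message value; each value is one csv line
        JavaDStream<String> lines = rawDataLines.map(Tuple2::_2);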

An example csv line:


415 Cedar St,Aid Response,09/06/2013 05:44:00 AM +0000,47.618123,-122.347615,"(47.618123, -122.347615)",F130090107

 

These csv lines are then converted to Java domain objects representing 911 calls by a csv-to-object mapping function.
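
A minimal sketch of what such a mapping function might look like (the RealTime911 setter names and the csv parsing here are assumptions, not the project’s actual code):

        // hedged sketch: parse one csv line into a RealTime911 domain object
        // (uses org.apache.spark.api.java.function.Function so the lambda can be passed to map)
        Function<String, RealTime911> map911Call = line -> {
            // split on commas that are not inside double quotes
            String[] fields = line.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)");
            RealTime911 call = new RealTime911();
            call.setAddress(fields[0].trim());
            call.setCallType(fields[1].trim());
            call.setDateTime(fields[2].trim());
            call.setLatitude(fields[3].trim());
            call.setLongitude(fields[4].trim());
            call.setIncidentId(fields[6].trim());
            return call;
        };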

This mapping function is applied to the stream as follows:

        // transform to RealTime911 objects
        JavaDStream<RealTime911> callData = lines.map(map911Call);

Now we have a DStream of Java domain objects to analyze with Spark. First some bad-data filtering is performed; then, if ‘filterFire’ is set to true, the input is further filtered to those calls whose call type contains the text ‘fire’.
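
A rough sketch of that filtering step (the accessor and flag names are assumptions consistent with the text above):

        // hedged sketch: drop records without a usable call type, then optionally keep only fire calls
        JavaDStream<RealTime911> filteredCalls = callData
                .filter(call -> call.getCallType() != null && !call.getCallType().isEmpty());
        if (filterFire) {
            filteredCalls = filteredCalls
                    .filter(call -> call.getCallType().toLowerCase().contains("fire"));
        }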

Micro Batch RDD Computation

The DStream “pairs” are the Java 911 call objects keyed by call type. These are then reduced to lists of call objects keyed by the call type, and returned as the “reduced” DStream.  When written out, this DStream shows a summary of the current micro batch organized by call type:


4RED  2 + 1 + 1 calls: 1
   4RED  2 + 1 + 1                      06/12/2015 11:55:00 PM +0000 3021 Sw Bradford St       (-122.3714, 47.5692)
Aid Response calls: 1
   Aid Response                         06/12/2015 11:52:00 PM +0000 2211 Alaskan Way          (-122.3477, 47.6108)
Medic Response calls: 2
   Medic Response                       06/12/2015 11:58:00 PM +0000 2717 Dexter Av N          (-122.3469, 47.6444)
   Medic Response                       06/13/2015 12:06:00 AM +0000 120 6th Av S              (-122.3264, 47.6015)

 

Shown below is the mapping to unreduced pairs, and a mapping to pairs reduced by call type. The unreduced pairs are separated out to support stateful call type count calculations (see State Computation below).
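
A minimal sketch of those two mappings (variable and accessor names are assumptions consistent with the earlier snippets):

        // hedged sketch: key each call by its call type (the unreduced "pairs" DStream)
        JavaPairDStream<String, RealTime911> pairs = filteredCalls
                .mapToPair(call -> new Tuple2<>(call.getCallType(), call));

        // reduce to a list of calls per call type for the current micro batch
        JavaPairDStream<String, List<RealTime911>> reduced = pairs
                .mapValues(call -> {
                    List<RealTime911> single = new ArrayList<>();
                    single.add(call);
                    return single;
                })
                .reduceByKey((a, b) -> { a.addAll(b); return a; });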


 

The log messages are created by iterating over the RDDs in the “reduced” DStream:
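
A hedged sketch of that output step (the formatting is simplified, and an SLF4J logger is assumed):

        // hedged sketch: collect each micro batch to the driver and log a per-call-type summary
        // (logger is assumed to be an org.slf4j.Logger)
        reduced.foreachRDD(rdd -> {
            for (Tuple2<String, List<RealTime911>> entry : rdd.collect()) {
                logger.info("{} calls: {}", entry._1(), entry._2().size());
                entry._2().forEach(call -> logger.info("   {}", call));
            }
        });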

To output a summary of the micro batch, DStream’s most generic output operator, foreachRDD, is provided a closure that performs formatting and logging of the micro batch RDD. This closure executes in Spark’s driver process, and could also be used to save RDDs to a database or send RDDs to external consumers.

State Computation

For the current running total of each call type, the reducedState DStream is computed and used to show a summary of call types received so far:


---------------------------------------------------------------
          State Total: 
---------------------------------------------------------------
  4RED  2 + 1 + 1                           total received: 2
  Aid Response                              total received: 32
  Aid Response Yellow                       total received: 1
  Auto Fire Alarm                           total received: 1
  Automatic Fire Alarm False                total received: 1
  Automatic Medical Alarm                   total received: 1
  Investigate Out Of Service                total received: 1
  MVI  Motor Vehicle Incident               total received: 2
  MVI Freeway                               total received: 1
  Medic Response                            total received: 10
  Medic Response 7 per Rule                 total received: 2
  Trans to AMR                              total received: 1

 

The state computation is shown below:
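
A minimal sketch of that computation (names are assumptions; Optional here is org.apache.spark.api.java.Optional):

        // hedged sketch: keep a running total of calls per call type across micro batches
        JavaPairDStream<String, Integer> reducedState = pairs
                .mapValues(call -> 1)
                .updateStateByKey((List<Integer> batchCounts, Optional<Integer> previous) -> {
                    int total = previous.orElse(0);
                    for (Integer count : batchCounts) {
                        total += count;
                    }
                    return Optional.of(total);
                });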

The stateful Spark transformation updateStateByKey updates the state with a Java lambda expression that takes the list of call counts from the current micro batch together with an Optional holding the previously accumulated total, and returns the updated total for each call type.

These log messages are output by iterating over the reducedState DStream, similar to the code above.

Getting the Code and Running the Demo

To download or clone the project, go to the Github project. The project README includes instructions on building and running the project code.

Conclusion

This blog described Apache Spark directly consuming a Kafka topic, and performing analysis using Spark’s DStream abstraction. Spark Streaming offers many choices for processing and analyzing a data stream. In addition to Scala and Java, Python and R language APIs are available. Furthermore, other Spark libraries can be used with Spark Streaming, including the SQL library to process the stream with SQL constructs, and the machine learning algorithms provided by Spark’s MLlib.
