Apache Cassandra, a scalable and high-availability platform, is a good choice for high volume event management applications, such as large deployments of sensors. Applications include telematics data for large fleets, smart meter telemetry in electric, gas or water utility systems, and wide area weather station reporting. By analyzing this raw event data, system level intelligence can be extracted to discover trends and clustering along dimensions such as space, time and environmental parameters. Apache Spark enables this analysis, connecting directly to Cassandra and performing fault-tolerant processing with an architecture that scales out with Cassandra clustering.
The focus of this blog is showing how to connect Spark to Cassandra, analyze event data from Cassandra, and store the results of the analysis in S3, making them available for reporting or further analysis. The example uses 911 call event data collected over a number of weeks. For an example of Spark processing this same data from a Kafka-based event stream, see this earlier blog, based on the same 911 call event data set.
To provide a demonstration that can be run locally, this blog describes a runnable Spark application available from GitHub, along with Docker images that provide a standalone (single-node) Cassandra cluster and a local S3 object store. The Docker images are described at the end of this blog.
Initializing Cassandra with demo data
To provide a repeatable demo with minimal pre-conditions, the Cassandra schema is created and populated with demo data on application startup.
The createSchema method drops the demo Cassandra keyspace if it exists, then creates the keyspace and a table in it. These commands are CQL statements (CQL, the Cassandra Query Language, is Cassandra’s form of SQL) loaded into the application from an application.yml file.
Once the keyspace and table are created, the application loads the 911 call data from a gzipped CSV file and inserts it into Cassandra, using the Cassandra session to execute a CQL prepared statement.
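As a rough illustration of the loading step, the sketch below reads rows from a gzipped CSV stream using only the JDK. It is not the application's code: the class name GzipCsvReader, the column names in INSERT_CQL, and the naive comma split are all assumptions made for the example. Only the keyspace and table names come from this blog.

```java
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class GzipCsvReader {

    // Hypothetical prepared-statement text: the column names are assumptions,
    // only the keyspace (testkeyspace) and table (rt911) come from the blog.
    static final String INSERT_CQL =
        "INSERT INTO testkeyspace.rt911 (address, calltype, calltime) VALUES (?, ?, ?)";

    /** Reads gzip-compressed CSV text and returns one String[] per data row. */
    static List<String[]> readRows(InputStream gzipped) {
        List<String[]> rows = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(gzipped), StandardCharsets.UTF_8))) {
            reader.readLine(); // skip the header row
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue; // ignore blank lines
                }
                // Naive split: assumes no quoted fields that contain commas.
                rows.add(line.split(",", -1));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return rows;
    }

    /** Compresses text with gzip; used here only to build sample input. */
    static byte[] gzip(String text) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

In the real application, each returned row would be bound to the prepared statement and executed through the Cassandra session. Data with quoted fields would need a proper CSV parser instead of the plain split.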
When finished, the table ‘rt911’ in keyspace ‘testkeyspace’ contains call event data.
Connecting Spark with Cassandra
DataStax provides a ready-to-use Spark to Cassandra connector. This connector exposes Cassandra data in terms of Spark structures, such as RDDs, supports writing Spark data to Cassandra, and allows CQL queries to be made from a Spark application. A quick start guide for the connector is also available. The static method javaFunctions of the Spark Cassandra Connector’s Java API utility class is used to read data from Cassandra. The data is read and transformed into Java objects that represent 911 calls, all in a single statement.
Analyzing the event data with Spark
The 911 call data is filtered by event type based on simple classification using a contains match on the event type (in this case, event types that contain ‘Fire’).
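The classification rule can be sketched with plain Java streams standing in for the Spark RDD. The Call class and its fields are hypothetical stand-ins for the application's 911 call object. In a Spark job the same predicate would typically be passed to JavaRDD.filter.

```java
import java.util.List;
import java.util.stream.Collectors;

final class FireCallFilter {

    /** Hypothetical stand-in for the application's 911 call object. */
    static final class Call {
        final String eventType;
        final String date; // MM/dd/yyyy

        Call(String eventType, String date) {
            this.eventType = eventType;
            this.date = date;
        }
    }

    /** Keeps only calls whose event type contains the text "Fire". */
    static List<Call> fireCalls(List<Call> calls) {
        return calls.stream()
            // A null event type cannot match, so it is filtered out.
            .filter(c -> c.eventType != null && c.eventType.contains("Fire"))
            .collect(Collectors.toList());
    }
}
```

Note that a contains match is case-sensitive, so an event type such as "fire alarm" in lower case would not be selected by this sketch.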
The data is then mapped to key/value pairs, keyed by week of year, using a dedicated class to do the mapping.
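One way such a mapping could look is sketched below with java.time. The class name WeekOfYearKey and the choice of ISO-8601 week numbering are assumptions. The application's own mapping class may define weeks differently. In Spark, the resulting key/value pair would usually be a Tuple2 produced inside mapToPair.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.IsoFields;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

final class WeekOfYearKey {

    // Date format used by the call data in this blog, e.g. 08/03/2015.
    private static final DateTimeFormatter DATE =
        DateTimeFormatter.ofPattern("MM/dd/yyyy");

    /** Returns the ISO-8601 week number (weeks start on Monday). */
    static int weekOfYear(String date) {
        return LocalDate.parse(date, DATE).get(IsoFields.WEEK_OF_WEEK_BASED_YEAR);
    }

    /** Pairs any call object with the week-of-year key for its date. */
    static <T> Map.Entry<Integer, T> keyByWeek(String date, T call) {
        return new SimpleImmutableEntry<>(weekOfYear(date), call);
    }
}
```

Under ISO numbering, 08/03/2015 (a Monday) falls in week 32. A locale-based definition from WeekFields could give a different number for dates near the start or end of a year.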
The event object data is then grouped by date (MM/dd/yyyy – e.g. 08/03/2015):
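A minimal sketch of the grouping step, again using Java collections in place of Spark. Here each pair is a hypothetical (date, event type) entry. In the Spark application the equivalent operation on a pair RDD would be groupByKey.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class GroupByDate {

    /** Builds a (date, eventType) pair; the date is in MM/dd/yyyy form. */
    static Map.Entry<String, String> pair(String date, String eventType) {
        return new SimpleImmutableEntry<>(date, eventType);
    }

    /** Collects all event types that share the same date key. */
    static Map<String, List<String>> groupByDate(List<Map.Entry<String, String>> pairs) {
        return pairs.stream().collect(Collectors.groupingBy(
            Map.Entry::getKey,
            TreeMap::new, // sorted by the date string, for stable output
            Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
    }
}
```

The TreeMap orders keys as strings, which matches calendar order only within a single year for MM/dd/yyyy dates.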
The pair data is then transformed from Java objects into JSON documents, one for each date group, and written to a Map with the date as key and the JSON as the value:
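The transformation to JSON might be sketched as follows. To keep the example dependency-free, it builds the JSON by hand rather than using a JSON library, which is a substitution made for illustration. The document shape (date, count, events) is also an assumption and not the application's actual output format.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class DateGroupJson {

    /** Escapes quotes, backslashes and control characters for a JSON string. */
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use a four-digit hex escape.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Builds one JSON document for a date group (hypothetical shape). */
    static String toJson(String date, List<String> eventTypes) {
        String events = eventTypes.stream()
            .map(e -> "\"" + escape(e) + "\"")
            .collect(Collectors.joining(","));
        return "{\"date\":\"" + escape(date) + "\",\"count\":" + eventTypes.size()
            + ",\"events\":[" + events + "]}";
    }

    /** Maps each date key to the JSON document for its group. */
    static Map<String, String> toJsonByDate(Map<String, List<String>> grouped) {
        Map<String, String> out = new TreeMap<>();
        grouped.forEach((date, events) -> out.put(date, toJson(date, events)));
        return out;
    }
}
```

The resulting map has the same shape as the one described above: the date as key and a JSON string as value, ready to be written as one object per date.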
To store the data into S3, the AWS Java SDK is used to create an S3 client to the Scality S3 server.
Note that this client is written specifically for the Scality S3 server, which uses “accessKey1” as the access key and “verySecretKey1” as the secret key. These are the default AWSCredentials for a Scality S3 server.
The application then saves the JSON documents to S3. It first removes the bucket if it already exists, then creates the S3 bucket and writes key/value pairs to it, storing one JSON document per date for which Cassandra has data. Removing the bucket keeps the demo repeatable, so that it neither depends on nor conflicts with previous runs.
The end-to-end process of extracting data, computing new information from it, and storing the results in S3 for reporting, business intelligence analysis and other repurposing is a pattern applicable to a wide variety of needs.
Installing Scality S3 server Docker image
The S3 object store is based on the Scality S3 Docker image, which provides an S3 instance without requiring Amazon Web Services (AWS) credentials, and is a convenient way to develop against S3 without using AWS credits.
Installing and running Scality’s S3 server as a Docker image is described at https://hub.docker.com/r/scality/s3server
Installing Cassandra Docker image
Instructions for installing a Docker image of Cassandra are available at https://hub.docker.com/_/cassandra/
They cover installing the server image, starting a cluster locally, and running Cassandra’s Query Language shell (cqlsh) as a client to the Cassandra server.