From Cassandra to S3, with Spark

Apache Cassandra, a scalable and highly available database, is a good choice for high-volume event management applications, such as large deployments of sensors. Applications include telematics data for large fleets, smart meter telemetry in electric, gas or water utility systems, and wide-area weather station reporting. Analyzing this raw event data yields system-level intelligence, such as trends and clustering along dimensions like space, time and environmental parameters. Apache Spark enables this analysis, connecting directly to Cassandra and performing fault-tolerant processing with an architecture that scales out with Cassandra clustering.

The focus of this blog is showing how to connect Spark to Cassandra, analyze event data from Cassandra, and store the results of the analysis in S3, making them available for reporting or further analysis. The example uses 911 call event data collected over a number of weeks. For an example of Spark processing the same 911 call event data set from a Kafka-based event stream, see this earlier blog.

Demonstration components
To provide a demonstration that can be run locally, this blog describes a runnable Spark application available from GitHub, along with Docker images that provide a standalone (single-node) Cassandra cluster and a local S3 object store. The Docker images are described at the end of this blog.

Initializing Cassandra with demo data
To provide a repeatable demo with minimal pre-conditions, the Cassandra schema is created and populated with demo data on application startup.

    private void createSchema() {
        try {
            session.execute(dropKeyspaceCommand);
        } catch (Exception e) {
            System.out.println(e.getMessage());
            System.exit(-1);
        }

        session.execute(createKeyspaceCommand);
        session.execute(createRT911TableCommand);
    }

 
The createSchema method drops the demo Cassandra keyspace if it exists, then creates the keyspace and a table in that keyspace. These commands are CQL statements (Cassandra’s form of SQL) loaded into the application from an application.yml file:

---
# cassandra
cassandra:
  defaultQueryConsistency: ONE
  defaultUpdateConsistency: ONE
  updateTimeoutMillis: 5000
  compression: LZ4
  nodeAddress: localhost
  host: 127.0.0.1

  keyspaces:
    - name: rt911
      createCommand: "CREATE KEYSPACE testkeyspace WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};"
      dropCommand: "DROP KEYSPACE IF EXISTS testkeyspace;"
      truncateCommand: "TRUNCATE testkeyspace.rt911"
      tables:
        - name: calls
          createCommand:
              CREATE TABLE testkeyspace.rt911 (
                address varchar,
                calltype varchar,
                calltime varchar,
                latitude varchar,
                longitude varchar,
                location varchar,
                id varchar PRIMARY KEY);
          insertPreparedStatementCommand:
              INSERT INTO testkeyspace.rt911 (address, calltype, calltime, latitude, longitude, location, id)
                VALUES ( ?, ?, ?, ?, ?, ?, ? );

Once the keyspace and table are created, the application reads the 911 call data from a gzipped CSV file and populates Cassandra with it. The application uses the Cassandra session to execute this CQL prepared statement:

            insertPreparedStatementCommand:
              INSERT INTO testkeyspace.rt911 (address, calltype, calltime, latitude, longitude, location, id)
                VALUES ( ?, ?, ?, ?, ?, ?, ? );

 
When finished, the table ‘rt911’ in keyspace ‘testkeyspace’ contains call event data.
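
The load step described above can be sketched with the standard library alone. This is a minimal illustration, not the application's actual loader: the class name GzipCsvReader, the header-row assumption, and the sample row are all hypothetical, and the naive comma split assumes no quoted fields with embedded commas (a real CSV parser would be needed otherwise). Each returned row would then be bound to the prepared statement shown above.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper, not part of the demo project.
class GzipCsvReader {

    /** Reads a gzipped CSV stream, skipping the first (header) row; one String[] per data row. */
    static List<String[]> readRows(InputStream gzipped) {
        List<String[]> rows = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new GZIPInputStream(gzipped), StandardCharsets.UTF_8))) {
            String line = reader.readLine(); // assumption: first line is a header, so it is skipped
            while ((line = reader.readLine()) != null) {
                if (!line.isEmpty()) {
                    // naive split: assumes no quoted fields containing commas
                    rows.add(line.split(",", -1));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return rows;
    }

    /** Demo helper: gzips a string in memory so the example needs no file on disk. */
    static byte[] gzip(String text) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        // Made-up sample data for illustration only.
        String csv = "address,calltype,calltime\n"
                   + "100 Main St,Aid Response,08/03/2015 10:21:00 PM\n";
        List<String[]> rows = readRows(new ByteArrayInputStream(gzip(csv)));
        System.out.println(rows.size() + " row(s), first call type: " + rows.get(0)[1]);
    }
}
```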

Connecting Spark with Cassandra
DataStax provides a ready-to-use Spark Cassandra Connector. This connector exposes Cassandra data in terms of Spark structures, such as RDDs, supports writing Spark data to Cassandra, and allows CQL queries to be made from a Spark application. A quick start guide for the connector is also available. The static method javaFunctions, from the connector’s Java API utility class, is used to read data from Cassandra. The data is read and transformed into Java objects that represent 911 calls, all in a single statement:

        JavaRDD<RealTime911> callData = javaFunctions(sc)
                .cassandraTable("testkeyspace", "rt911")
                .map(new Map911Call());

 

Analyzing the event data with Spark
The 911 call data is filtered by call type using a simple classification: a case-insensitive, whole-word regular expression match on the call type (in this case, call types that contain the word ‘Fire’).

callData = callData.filter( c -> (c.getCallType().matches("(?i:.*\\bFire\\b.*)")));
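
To see what this pattern accepts and rejects without starting Spark, the same regular expression can be applied to plain strings. This is a small sketch; the class name FireCallFilter and the sample call types are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the demo project.
class FireCallFilter {

    // Same pattern as the Spark filter: case-insensitive, "Fire" as a whole word.
    static final String FIRE_PATTERN = "(?i:.*\\bFire\\b.*)";

    static boolean isFireCall(String callType) {
        return callType != null && callType.matches(FIRE_PATTERN);
    }

    public static void main(String[] args) {
        // Made-up call types for illustration only.
        List<String> callTypes = Arrays.asList(
                "Auto Fire Alarm", "fire in building", "Aid Response", "Firearm Discharge");
        List<String> fireCalls = callTypes.stream()
                .filter(FireCallFilter::isFireCall)
                .collect(Collectors.toList());
        System.out.println(fireCalls);
    }
}
```

The word boundaries (\b) are what keep a value like "Firearm Discharge" out of the result, while the (?i: ...) group makes "fire in building" match regardless of case.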

 
The data is then mapped to key/value pairs, keyed by call date:

        MapByCallDate mapByCallDate = new MapByCallDate();
        return callData.mapToPair(mapByCallDate);

 
which uses this class to do the mapping:

import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class MapByCallDate implements PairFunction<RealTime911, String, RealTime911> {

    @Override
    public Tuple2<String, RealTime911> call(RealTime911 realTime911) throws Exception {
        // create time bucket to group by dates (no time) - use MM/dd/yyyy
        String timeBucket = realTime911.getDateTime().substring(0,10);
        return new Tuple2<>(timeBucket, realTime911);
    }
}

 
The event object data is then grouped by date (MM/dd/yyyy – e.g. 08/03/2015):

        JavaPairRDD<String, Iterable<RealTime911>> groupedCalls = callsByCallDate.groupByKey();
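
The bucketing and grouping steps can be illustrated in plain Java, without Spark, to show the shape of the result: one entry per date, each holding that date's calls. This is only a sketch. The Call class is a hypothetical stand-in for RealTime911, and the sample timestamps assume a date-time string that begins with MM/dd/yyyy.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the demo project.
class CallDateGrouping {

    /** Hypothetical, minimal stand-in for the RealTime911 call object. */
    static final class Call {
        final String dateTime;
        final String callType;

        Call(String dateTime, String callType) {
            this.dateTime = dateTime;
            this.callType = callType;
        }
    }

    /** Keeps the first 10 characters (MM/dd/yyyy) and drops the time of day. */
    static String dateBucket(String dateTime) {
        return dateTime.substring(0, 10);
    }

    /** Groups calls by date bucket; the TreeMap keeps keys in string order. */
    static Map<String, List<Call>> groupByDate(List<Call> calls) {
        return calls.stream().collect(
                Collectors.groupingBy(c -> dateBucket(c.dateTime), TreeMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        // Made-up sample calls for illustration only.
        List<Call> calls = Arrays.asList(
                new Call("08/03/2015 10:21:00 PM", "Auto Fire Alarm"),
                new Call("08/03/2015 11:05:00 PM", "Brush Fire"),
                new Call("08/04/2015 01:15:00 AM", "Fire in Building"));
        groupByDate(calls).forEach((date, group) ->
                System.out.println(date + " -> " + group.size() + " call(s)"));
    }
}
```

Note that substring(0, 10) throws an exception for values shorter than ten characters, so malformed timestamps would need handling before this step.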

 
The pair data is then transformed from Java objects into JSON documents, one for each date group, and written to a Map with the date as key and the JSON as the value:

        Map<String, Iterable<RealTime911>> groupedCallMap = groupedCalls.collectAsMap();
        Set<String> keys = groupedCallMap.keySet();

        ObjectMapper mapper = new ObjectMapper();

        Map<String, String> s3BucketData = new HashMap<>();
        for(String key: keys) {
            List<String> jsonArrayElements = new ArrayList<>();
            Iterable<RealTime911> iterable = groupedCallMap.get(key);
            Iterator<RealTime911> iterator = iterable.iterator();
            while(iterator.hasNext()) {
                RealTime911 rt911 = iterator.next();
                LOG.debug(rt911.getDateTime() + " " + rt911.getCallType());
                try {
                    String jsonRT911 = mapper.writeValueAsString(rt911);
                    jsonArrayElements.add(jsonRT911);
                } catch (JsonProcessingException e) {
                    LOG.error(e.getMessage());
                }
            }

            StringJoiner joiner = new StringJoiner(",");
            jsonArrayElements.forEach(joiner::add);
            s3BucketData.put(key, "[" + joiner.toString() + "]");
        }
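
The final step of the loop, joining already-serialized JSON objects into one JSON array string, can be isolated as a small helper. This sketch is a variation rather than the application's code: it uses the three-argument StringJoiner constructor so the brackets are supplied as prefix and suffix instead of being concatenated by hand. The class name JsonArrayJoiner and the sample documents are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper, not part of the demo project.
class JsonArrayJoiner {

    /** Wraps pre-serialized JSON documents in [ ] and separates them with commas. */
    static String toJsonArray(List<String> jsonDocuments) {
        StringJoiner joiner = new StringJoiner(",", "[", "]");
        jsonDocuments.forEach(joiner::add);
        return joiner.toString();
    }

    public static void main(String[] args) {
        // Made-up documents for illustration only.
        List<String> docs = Arrays.asList(
                "{\"callType\":\"Auto Fire Alarm\"}",
                "{\"callType\":\"Brush Fire\"}");
        System.out.println(toJsonArray(docs));
    }
}
```

An empty input list produces "[]", which is still a valid JSON array.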

 
To store the data in S3, the AWS Java SDK is used to create an S3 client for the Scality S3 server.

@Component
public class S3Client {

    private AmazonS3Client s3 = null;

    public S3Client() {
        s3 = getClient();
    }

    private AmazonS3Client getClient() {
        if(null == s3) {
            System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
            BasicAWSCredentials credentials = new BasicAWSCredentials("accessKey1", "verySecretKey1");
            s3 = new AmazonS3Client(credentials);
            S3ClientOptions options = S3ClientOptions.builder().setPathStyleAccess(true).build();
            s3.setS3ClientOptions(options);
            s3.setEndpoint("http://127.0.0.1:8000/");
        }
        return s3;
    }

    // ... the complete class can be viewed in the GitHub project.
}

 
Note that this client is written specifically for the Scality S3 server, which uses “accessKey1” as the access key and “verySecretKey1” as the secret key. These are the default AWSCredentials for a Scality S3 server.

This code saves the JSON documents to S3. First it removes the bucket if it already exists, then it creates the S3 bucket and writes key/value pairs to the bucket, storing one JSON document per date for which Cassandra has data. It then reads each stored object back and logs it, and finally removes the bucket again. The bucket removal provides a repeatable demo, one that does not depend on, or conflict with, previous runs.

        try {
            // remove the S3 bucket, this removes all objects in the bucket first
            s3Client.removeBucket(bucketName);
            LOG.info("S3 bucket " + bucketName + " deleted");
        } catch (Exception e) {
            // bucket not deleted, may not have been there
        }

        try {
            // create the bucket to start fresh
            s3Client.createBucket(bucketName);
            LOG.info("S3 bucket " + bucketName + " created");

            Set<String> bucketKeys = s3BucketData.keySet();
            // save to S3
            for(String key: bucketKeys) {
                s3Client.storeString(bucketName, key, s3BucketData.get(key));
            }
            LOG.info("finished saving JSON to S3");

            LOG.info("displaying all JSON objects and their keys saved to " + bucketName + "\n");
            for(String key: bucketKeys) {
                String storedObject = s3Client.readS3Object(bucketName, key);
                LOG.info("key: " + key + " value: " + storedObject);
            }
        } catch (Exception e) {
            LOG.error(e.getMessage());
        } finally {
            // clean up
            s3Client.removeBucket(bucketName);
        }

 
The end-to-end process of extracting event data, computing new information from it, and storing the results in S3 for reporting, business intelligence analysis and other repurposing is a pattern applicable to a wide variety of needs.

Installing Scality S3 server Docker image
The S3 object store is based on the Scality S3 Docker image, which provides an S3 instance without requiring Amazon Web Services (AWS) credentials, and is a convenient way to develop against S3 without using AWS credits.

Instructions for installing and running Scality’s S3 server as a Docker image are available at https://hub.docker.com/r/scality/s3server

Installing Cassandra Docker image
Instructions for installing a Docker image of Cassandra are available at https://hub.docker.com/_/cassandra/

They cover installing the server image, starting a cluster locally, and running Cassandra’s Query Language Shell (cqlsh) as a client to the Cassandra server.
