Spark Job Server for Persistent Contexts and Low Latency Jobs

While building the back-end datastore for a large analytics platform on Spark and Kudu, we needed to give our front-end application low-latency query performance. We used Spark Job Server to provide persistent Spark contexts, so each call to a Spark job reuses a context that is already running instead of paying the startup cost of a new context for every query. Spark Job Server also includes features for executing Spark jobs and managing jar files.

REST API

A primary feature of Spark Job Server is a simple REST API that exposes functionality around contexts, jars, jobs and data. It makes ad-hoc interaction with Spark Job Server easy via curl.

For example, to list current spark jobs and their statuses:
curl -X GET spark_job_server_host:8090/jobs

To list the jars that have been uploaded:
curl -X GET spark_job_server_host:8090/jars

Create persistent contexts

The key feature of Spark Job Server for us was persistent spark contexts. These contexts stay running indefinitely and are reusable – so a call to a spark job won’t incur the overhead of starting a context on each execution.

The Spark Job Server REST api makes it simple to create persistent spark contexts. Certain configuration properties of the context can be specified at create time by passing key/value pairs.

For example:
curl -X POST 'spark_job_server_host:8090/contexts/opi-context?num-cpu-cores=8&mem-per-node=4G'
OK

Verify the context was created:
curl -X GET spark_job_server_host:8090/contexts
["opi-context"]

Because contexts created by Spark Job Server are persistent, weigh the CPU and memory available to your Spark cluster when choosing these values. Use the resources you have, but leave room for ad-hoc Spark shells and other contexts. On our project we ran a number of persistent contexts, each configured for a different use (some jobs were memory intensive, others CPU intensive), and the resources available to the cluster varied by environment; it took some iteration to arrive at a good CPU and memory configuration for each context in each environment.
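When the same contexts have to be created with different values in each environment, it can help to generate the URL from a small table of settings rather than typing it by hand. The following is a minimal Python sketch under that assumption; the helper name context_url and the underscore-to-hyphen convention are ours for illustration and are not part of Spark Job Server.

```python
from urllib.parse import quote, urlencode


def context_url(base_url, name, **settings):
    """Build the URL used to POST a new named context (hypothetical helper)."""
    # Parameter names such as num-cpu-cores contain hyphens, which are not
    # valid in Python keyword arguments, so callers use underscores instead.
    params = {key.replace("_", "-"): value for key, value in settings.items()}
    url = f"{base_url}/contexts/{quote(name, safe='')}"
    return f"{url}?{urlencode(params)}" if params else url


if __name__ == "__main__":
    print(context_url("http://spark_job_server_host:8090", "opi-context",
                      num_cpu_cores=8, mem_per_node="4G"))
```

Run as a script, this prints the same URL used in the curl example above, with an explicit http:// scheme. Keeping the per-environment values in one place made it easier for us to iterate on them.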

Upload a jar

Our spark jobs were built and packaged into a jar, and that jar was copied to Spark Job Server. The Spark Job Server api exposes methods for managing such jars.

For example:
curl --data-binary @job-server-tests/target/scala-2.10/job-server-tests_2.10-0.6.2.jar spark_job_server_host:8090/jars/sjs-test-jar
OK

The name specified when uploading the jar to Spark Job Server will be used in the appName parameter when calling jobs from that jar. In this example, we uploaded a jar containing test jobs that come with the Spark Job Server source, and we named the jar sjs-test-jar.

We can see the jars that have been uploaded:
curl -X GET spark_job_server_host:8090/jars
{
"sjs-test-jar": "2017-02-28T17:31:03.095Z"
}

After uploading a new jar, restart any running contexts so that they can access it. The Spark Job Server API has no restart operation; to restart a context, delete it and recreate it.
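The delete-and-recreate step is easy to script. Below is a minimal Python sketch using only the standard library; restart_context and its send parameter are hypothetical names of ours, and the sketch assumes the context is removed with a DELETE request to the same /contexts/<name> path used to create it.

```python
from urllib.parse import urlencode
from urllib.request import Request, urlopen


def restart_context(base_url, name, settings, send=urlopen):
    """Delete a context, then recreate it with the given settings.

    `send` is injectable so the function can be exercised without a server;
    by default it performs real HTTP calls with urlopen.
    """
    url = f"{base_url}/contexts/{name}"
    # Step 1: remove the running context.
    send(Request(url, method="DELETE"))
    # Step 2: create it again; settings travel as query parameters.
    query = urlencode(settings)
    target = f"{url}?{query}" if query else url
    send(Request(target, data=b"", method="POST"))
```

A call might look like restart_context("http://spark_job_server_host:8090", "opi-context", {"num-cpu-cores": 8, "mem-per-node": "4G"}). This sketch does not wait between the two requests; a real script may need to pause or retry if the server has not finished tearing down the old context.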

Low latency jobs

Because they ran in a persistent Spark context, most of our Spark jobs could serve as low-latency queries. To run such a job synchronously through Spark Job Server, pass sync=true and, optionally, a timeout parameter.

For example:
curl -d "" 'spark_job_server_host:8090/jobs?appName=sjs-test-jar&classPath=spark.jobserver.VeryShortDoubleJob&context=opi-context&sync=true'

In this example, the value of the appName parameter is the name that we provided when we uploaded the jar. The classPath value contains the fully qualified name of the class that contains the Spark job we want to run. The value of the context parameter is the name of the persistent context we want the job to run in.
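For a caller that builds these requests in code, the query string can be assembled from the same three values. Here is a minimal Python sketch; job_url is a hypothetical helper of ours, and the parameter name timeout is an assumption based on the timeout option mentioned above, so check it against your Spark Job Server version.

```python
from urllib.parse import urlencode


def job_url(base_url, app_name, class_path, context, sync=False, timeout=None):
    """Build the URL used to POST a job to a named context (hypothetical helper)."""
    params = {"appName": app_name, "classPath": class_path, "context": context}
    if sync:
        # Ask the server to hold the connection open and return the result.
        params["sync"] = "true"
    if timeout is not None:
        # Assumed parameter name; the unit is whatever the server expects.
        params["timeout"] = timeout
    return f"{base_url}/jobs?{urlencode(params)}"


if __name__ == "__main__":
    print(job_url("http://spark_job_server_host:8090", "sjs-test-jar",
                  "spark.jobserver.VeryShortDoubleJob", "opi-context", sync=True))
```

Run as a script, this prints the URL from the curl example above with an explicit http:// scheme.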

Spark jobs that are expected to run for a long time can be invoked asynchronously by passing sync=false or by omitting the sync parameter altogether.

For example:
curl -d "" 'spark_job_server_host:8090/jobs?appName=sjs-test-jar&classPath=spark.jobserver.LongPiJob&context=opi-context'
{
"status": "STARTED",
"result": {
"jobId": "97f231d9-4daf-488c-9b99-f08bc5766157",
"context": "opi-context"
}
}

Jobs invoked as asynchronous will return a job id, which can subsequently be used to query the status of the job.

For example:
curl -X GET spark_job_server_host:8090/jobs/97f231d9-4daf-488c-9b99-f08bc5766157
{
"duration": "5.015 secs",
"classPath": "spark.jobserver.LongPiJob",
"startTime": "2017-02-28T18:54:56.523Z",
"context": "opi-context",
"result": 3.1480378548895898,
"status": "FINISHED",
"jobId": "97f231d9-4daf-488c-9b99-f08bc5766157"
}
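A client that starts an asynchronous job usually wants to poll this endpoint until the job is done. The sketch below shows one way to do that in Python. The names wait_for_job and fetch_status are hypothetical: fetch_status stands for any function that returns the JSON text of GET /jobs/<jobId>. The sketch treats STARTED and RUNNING as "still in progress"; STARTED appears in the response above, while RUNNING is an assumption about what the status endpoint reports mid-job.

```python
import json
import time

# Statuses taken to mean the job has not finished yet (RUNNING is assumed).
IN_PROGRESS = ("STARTED", "RUNNING")


def wait_for_job(fetch_status, job_id, interval=1.0, max_polls=60, sleep=time.sleep):
    """Poll a job until it leaves an in-progress status, then return its JSON body."""
    for _ in range(max_polls):
        body = json.loads(fetch_status(job_id))
        if body["status"] not in IN_PROGRESS:
            # FINISHED or any other terminal status: hand it back to the caller.
            return body
        sleep(interval)
    raise TimeoutError(f"job {job_id} still in progress after {max_polls} polls")
```

The caller should still inspect the returned status, since any status other than the in-progress ones ends the loop, including failures.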

Other thoughts

Any Spark job you intend to run via Spark Job Server must implement the spark.jobserver.SparkJob trait.

Memory leaks in your code will become apparent over time in a persistent context. One example we dealt with: if your spark job caches any dataframes via dataFrame.cache(), be sure to call dataFrame.unpersist() on them before ending the job.
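One way to make the unpersist call hard to forget is to tie it to a scope, so that it runs even when the job fails partway through. Our jobs were written in Scala, where a try/finally block does this; the same shape is sketched below in Python as a context manager. The name cached is hypothetical, and the sketch assumes only that the object passed in has cache() and unpersist() methods, as Spark DataFrames do.

```python
from contextlib import contextmanager


@contextmanager
def cached(data_frame):
    """Cache a DataFrame for the duration of a with-block, then release it."""
    data_frame.cache()
    try:
        yield data_frame
    finally:
        # Runs on normal exit and on exceptions, so the cached data does not
        # outlive the job inside a long-running context.
        data_frame.unpersist()
```

Inside a job this would read as: with cached(some_frame) as df: followed by the queries that reuse df.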

Spark Job Server state can occasionally get out of sync with the Spark cluster. For example, we would see Spark Job Server report that a context existed when it did not. The correct state can be seen via the Spark master UI. A restart of Spark Job Server was the only way we found to remedy the situation.

The REST API makes Spark Job Server easy to integrate into a CI build process. When a build succeeded, our build process uploaded the new jars and bounced certain contexts to keep our dev environment up to date.

Wrap up

Using Spark Job Server to create and manage persistent spark contexts allowed us to provide low latency spark job execution to our front end application. Other features of Spark Job Server helped it become an integral component of our overall architecture. Hopefully this blog post has shed some light on the capabilities of Spark Job Server and will help you make use of this powerful tool.

Give it a try with the docker image and the example code provided by Spark Job Server.
