Implementing a Google DataFlow Pipeline

Stream processing frameworks are quickly gaining traction as an efficient way to analyze, decorate, and direct high-volume data.  The power of stream processing comes from the idea that we can perform calculations on data as we receive it.  There are a number of options available to help write streaming services, but for this blog post I wanted to experiment with Google DataFlow.  My goal is to show how to build a simple, runnable data streaming pipeline using the DataFlow SDK, Gradle, and Spring Boot.

All code is available on GitHub, and the README explains how to run each pipeline.

DataFlow Terminology

To start with, it’s important to understand a few key concepts from the DataFlow framework:

Pipeline:  This is the actual stream “processor” itself.  You define a pipeline by configuring the input source to read data from, any number of transformations to apply to the data, and an output sink to write data to.

PCollection: This is the most important class in the DataFlow SDK.  It’s a special type of collection which can potentially contain an unlimited number of elements.  A PCollection can be bounded (a fixed-size dataset, such as a file) or unbounded (a source that keeps producing data, such as a message queue).  When your pipeline is processing too much data to hold in memory at once, PCollection implementations can be backed by file systems or databases.

As data passes through your pipeline, it’s stored in a PCollection.  The DataFlow framework imposes restrictions on how we access data in PCollections to ensure our pipeline is able to stream data efficiently.  Specifically, you can’t simply iterate over a PCollection like it’s a List or Set.  Instead, you must apply “transforms” to manipulate the data, which we’ll cover next.

Transform: A step in your pipeline.  From the DataFlow documentation: “A transform takes one or more PCollections as input, performs a processing function that you provide on the elements of that PCollection, and produces an output PCollection.”  Transforms are how we specify the calculations we want to perform on our data.
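Here is a small plain-Java analogy of the PCollection/transform relationship. It is not the DataFlow SDK: FakePCollection and Transform are hypothetical stand-ins, and an ordinary java.util.List does the storage. It mirrors the rule described above. Apart from a demo-only accessor, there is no way to iterate over the collection directly; you can only apply a transform and get a new collection back.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Plain-Java analogy (NOT the DataFlow SDK API). FakePCollection and
 * Transform are hypothetical stand-ins used only for illustration.
 */
final class TransformAnalogy {

    /** Hypothetical stand-in for PCollection: no iterator, no get(). */
    static final class FakePCollection<T> {
        private final List<T> elements;

        FakePCollection(List<T> elements) {
            this.elements = List.copyOf(elements); // immutable snapshot
        }

        /** The only way to work with the data: apply a transform, get a new collection. */
        <R> FakePCollection<R> apply(Transform<T, R> transform) {
            return new FakePCollection<>(transform.process(elements));
        }

        /** Demo/test-only escape hatch; a real PCollection offers nothing like it. */
        List<T> materializeForDemo() {
            return elements;
        }
    }

    /** Hypothetical stand-in for a transform: a whole collection in, a new one out. */
    interface Transform<I, O> {
        List<O> process(List<I> input);

        /** Builds an element-wise transform from a per-element function. */
        static <I, O> Transform<I, O> mapEach(Function<I, O> fn) {
            return input -> input.stream().map(fn).collect(Collectors.toList());
        }
    }

    public static void main(String[] args) {
        FakePCollection<String> lines = new FakePCollection<>(List.of("off", "parked", "driving"));
        Transform<String, String> upper = Transform.mapEach(String::toUpperCase);
        Transform<String, Integer> length = Transform.mapEach(String::length);
        // Chain transforms; each apply() returns a brand-new collection.
        FakePCollection<Integer> lengths = lines.apply(upper).apply(length);
        System.out.println(lengths.materializeForDemo());
    }
}
```

Each apply() call leaves its input untouched and returns a new collection. DataFlow transforms chain together in much the same way.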

Pipeline 1 – Count Engine Statuses

Now that we’ve got the terminology down, let’s put it into practice.  Allow me to propose a contrived use case:

Suppose you’ve installed a sensor in a vehicle that can detect when the vehicle’s engine switches from one “status” to another (OFF, PARKED, DRIVING, or MALFUNCTION).  Whenever the engine status changes, the sensor records the timestamp and sends an event to your backend system.  Our goal is to set up a pipeline that reads in these events, counts how many times the engine switched to each status, and writes that data to a text file.  Note that this use case could easily be extended to track engine changes across a fleet of vehicles instead of just one.

For our purposes, I’m going to represent the potentially unlimited stream of engine status events as a “bounded” dataset: the stream processor we’ll create will read data from a simple .csv file.  In a real-world scenario where we need to read events off a message queue or a Kafka topic, DataFlow makes it simple to replace the source without changing the core transformations you’re applying to the data.  Here is the code needed:

This snippet depicts how to create a default Pipeline object, apply some transforms, and execute it.  Let’s take it line by line.

  • Line 1 – Create a default Pipeline.
  • Line 3 – Begin applying data transforms to our pipeline.  Each transform takes a PCollection as its input, applies some logic, and returns a PCollection as the output.  This particular line applies a root transform that configures the source of our data: a text file called “input.csv”.  The resulting PCollection will contain a String element for each line in the file.
  • Line 4 – Custom transform which converts each String element in the PCollection to an EngineStatusEvent object.
  • Line 5 – Custom transform which extracts the actual EngineStatus (PARKED, DRIVING, etc) from each EngineStatusEvent element in the PCollection.
  • Line 6 – A DataFlow-supplied transform which counts how many times each element appears in our PCollection, and returns this aggregation as a PCollection of key-value pairs.  The keys will be an EngineStatus, and the values will be the count for that status.
  • Line 7 – Custom transform which converts each key-value pair into a human readable String.
  • Line 8 – A DataFlow-supplied transform to write our data to the configured sink, in this case a text file called “output.txt”.
  • Line 10 – Run the actual pipeline.  Up to this point we have only been configuring how our pipeline operates; it doesn’t start reading, processing, or writing data until we execute it.
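Here is a rough single-machine equivalent of the same sequence of steps, written with java.util.stream instead of the SDK. It is only a sketch. It assumes (hypothetically) that each line of input.csv looks like 2016-06-01T10:30:12Z,PARKED. The EngineStatus and EngineStatusEvent types are minimal stand-ins for the ones in the repository. The comments point at the line from the list above that each step corresponds to.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Single-machine analog of the engine-status pipeline (NOT DataFlow code).
 * Assumes a hypothetical "timestamp,status" CSV layout, e.g.
 * "2016-06-01T10:30:12Z,PARKED". Types below are illustrative stand-ins.
 */
final class CountEngineStatuses {

    enum EngineStatus { OFF, PARKED, DRIVING, MALFUNCTION }

    record EngineStatusEvent(Instant timestamp, EngineStatus status) {}

    /** Everything between reading and writing: map, extract, count, format. */
    static List<String> process(List<String> lines) {
        Map<EngineStatus, Long> counts = lines.stream()
                .filter(line -> !line.isBlank())              // skip empty lines
                .map(CountEngineStatuses::toEvent)            // Line 4: String -> event
                .map(EngineStatusEvent::status)               // Line 5: event -> status
                .collect(Collectors.groupingBy(               // Line 6: count per element
                        s -> s,
                        () -> new EnumMap<EngineStatus, Long>(EngineStatus.class),
                        Collectors.counting()));
        return counts.entrySet().stream()                     // Line 7: human-readable text
                .map(e -> "KV{" + e.getKey() + ", " + e.getValue() + "}")
                .collect(Collectors.toList());
    }

    static EngineStatusEvent toEvent(String line) {
        String[] fields = line.split(",");
        return new EngineStatusEvent(Instant.parse(fields[0].trim()),
                EngineStatus.valueOf(fields[1].trim()));
    }

    public static void main(String[] args) throws IOException {
        // Each step runs eagerly here; a DataFlow pipeline does nothing until it is run.
        List<String> lines = Files.readAllLines(Path.of("input.csv"));  // Line 3: read
        Files.write(Path.of("output.txt"), process(lines));             // Line 8: write
    }
}
```

One difference is worth noting. Every step here runs immediately, whereas the DataFlow version only builds a description of the work until the pipeline is run. That deferred execution is what lets a runner decide how to distribute the work.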

That’s all the code needed to set up and run a pipeline.  The real work is in writing your transforms.  Here’s a look at the code for the EngineStatusEventMapper from Line 4.  This transform extends SimpleFunction, and all it does is take a String input (representing a line of text) and convert it into an EngineStatusEvent output.  We can then apply this function to every element in the PCollection using the supplied MapElements transform.
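Here is a minimal sketch of what a mapper like this might look like. To keep it runnable without the SDK, it implements java.util.function.Function instead of extending SimpleFunction. In both cases the work happens in a single apply() method that turns one input into one output. The timestamp,status line format and the error handling are my assumptions, not taken from the repository.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Hypothetical plain-Java version of a String -> EngineStatusEvent mapper.
 * With the SDK, the class would extend SimpleFunction and override apply();
 * here java.util.function.Function plays that role. The "timestamp,status"
 * CSV layout is an assumption.
 */
final class EngineStatusEventMapper
        implements Function<String, EngineStatusEventMapper.EngineStatusEvent> {

    enum EngineStatus { OFF, PARKED, DRIVING, MALFUNCTION }

    record EngineStatusEvent(Instant timestamp, EngineStatus status) {}

    @Override
    public EngineStatusEvent apply(String line) {
        String[] fields = line.split(",");
        if (fields.length != 2) {
            throw new IllegalArgumentException("Expected 'timestamp,status' but got: " + line);
        }
        try {
            return new EngineStatusEvent(
                    Instant.parse(fields[0].trim()),                        // ISO-8601 instant
                    EngineStatus.valueOf(fields[1].trim().toUpperCase()));  // enum by name
        } catch (DateTimeParseException | IllegalArgumentException e) {
            throw new IllegalArgumentException("Malformed event line: " + line, e);
        }
    }

    public static void main(String[] args) {
        // Stream.map stands in for MapElements: the same function, applied to each element.
        List<EngineStatusEvent> events = List.of(
                        "2016-06-01T10:30:12Z,PARKED",
                        "2016-06-01T10:31:05Z,driving")
                .stream()
                .map(new EngineStatusEventMapper())
                .collect(Collectors.toList());
        events.forEach(System.out::println);
    }
}
```

Because apply() looks at one line at a time and keeps no state between calls, a framework is free to run it on many elements in parallel.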

Here’s some example output from running the pipeline.  Each key-value pair element in our PCollection is written as a line in the output text file:

        KV{PARKED, 5}
        KV{DRIVING, 4}
        KV{MALFUNCTION, 1}
        KV{OFF, 2}

Pipeline 2 – Count Engine Statuses (windowed)

To recap, we read in a complete text file representing a stream of events and counted how many events occurred for each distinct engine status.  The problem is that, at this point, the pipeline is more “batch” than “stream”: it wouldn’t be very useful when the input source provides constantly flowing data.  With a source that never ends, we can’t hold every event in memory, or wait for the last one to arrive, before counting how many times each engine status occurred since the beginning of time.  That’s where windowing comes in.

Windowing:   From the docs: “The Dataflow SDK uses a concept called Windowing to subdivide a PCollection according to the timestamps of its individual elements. Dataflow transforms that aggregate multiple elements, process each PCollection as a succession of multiple, finite windows, even though the entire collection itself may be of infinite size (unbounded).”

The idea is that no matter how large the dataset, we can always divide it into finite windows and calculate meaningful results within each one.  To accomplish this, the DataFlow framework needs each data point to carry a timestamp, which it uses to determine the window that event belongs to.  If a new piece of data arrives for a window that has already been processed, DataFlow is capable of “triggering” a reprocessing of that window to produce up-to-date output.  I’ll refer you to the Streaming 101 and Streaming 102 articles linked below for a more in-depth discussion.
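The window lookup itself is simple arithmetic. The sketch below assumes fixed windows aligned to the Unix epoch, which is the shape of the one-minute windows used in the next pipeline. A timestamp t with window size s lands in the window that starts at t - (t mod s) and ends at that start plus s. It is plain Java for illustration, not DataFlow's own windowing code.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Illustrative fixed-window assignment (not DataFlow code): every timestamp
 * belongs to exactly one window [start, start + size), aligned to the epoch.
 */
final class FixedWindowMath {

    record Window(Instant start, Instant end) {
        @Override
        public String toString() {
            return "[" + start + ".." + end + ")"; // end is exclusive
        }
    }

    static Window windowFor(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long ts = timestamp.toEpochMilli();
        // floorMod keeps the result correct for pre-1970 (negative) timestamps too.
        long start = ts - Math.floorMod(ts, sizeMillis);
        return new Window(Instant.ofEpochMilli(start), Instant.ofEpochMilli(start + sizeMillis));
    }

    public static void main(String[] args) {
        Duration oneMinute = Duration.ofMinutes(1);
        System.out.println(windowFor(Instant.parse("2016-06-01T10:30:59.999Z"), oneMinute));
        System.out.println(windowFor(Instant.parse("2016-06-01T10:31:00Z"), oneMinute));
    }
}
```

With one-minute windows, an event at 10:30:59.999 falls into [10:30:00..10:31:00), while one at exactly 10:31:00 opens the next window. Each window's end is exclusive, which is why the windows in the output below are written with a closing parenthesis.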

For our purposes, I want to continue working with a bounded dataset, but let’s try breaking up the data into windows.

Notice the differences:

  • Line 5 – A custom transform that assigns each event its timestamp.  Windows are determined by element timestamps, and the lines read from our text file don’t come with meaningful ones, so we have to set them ourselves.  We’ll look closer at this one in a minute.
  • Line 6 – Apply one-minute fixed windows to our pipeline.  Now when the PCollection is passed downstream, transforms that aggregate elements operate on each window separately instead of on the complete set.  For example, the countEngineStatuses step will count distinct engine statuses PER window.  You’ll see what I mean in the final output.
  • Line 9 – A slightly different text formatter which includes window information.
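Here is a single-machine sketch in plain Java that combines the timestamping, windowing, and formatting steps from Lines 5, 6, and 9 (this is not DataFlow code). The input is assumed to use a hypothetical timestamp,status line format such as 2016-06-01T10:30:05Z,PARKED. Window times are printed in UTC. The countEngineStatuses method borrows the name of the pipeline step but is my own stand-in. Its counting logic is what you would run over the whole dataset; only the grouping into windows before it is new.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/**
 * Hypothetical single-machine analog of the windowed pipeline (NOT DataFlow
 * code): timestamp each event, assign it to a fixed window, then run the
 * same per-status count inside each window. CSV layout is assumed.
 */
final class WindowedEngineStatusCounts {

    enum EngineStatus { OFF, PARKED, DRIVING, MALFUNCTION }

    private static final DateTimeFormatter HH_MM_SS =
            DateTimeFormatter.ofPattern("HH:mm:ss").withZone(ZoneOffset.UTC);

    /** The unchanged "WHAT": count how often each status occurs. */
    static Map<EngineStatus, Long> countEngineStatuses(List<EngineStatus> statuses) {
        return statuses.stream().collect(Collectors.groupingBy(
                s -> s,
                () -> new EnumMap<EngineStatus, Long>(EngineStatus.class),
                Collectors.counting()));
    }

    /** The new "HOW": split events into windows before counting. */
    static List<String> process(List<String> lines, Duration windowSize) {
        long size = windowSize.toMillis();
        Map<Long, List<EngineStatus>> byWindow = new TreeMap<>(); // window start -> statuses
        for (String line : lines) {
            if (line.isBlank()) continue;
            String[] fields = line.split(",");
            long ts = Instant.parse(fields[0].trim()).toEpochMilli(); // event timestamp (Line 5)
            long windowStart = ts - Math.floorMod(ts, size);          // fixed window (Line 6)
            byWindow.computeIfAbsent(windowStart, k -> new ArrayList<>())
                    .add(EngineStatus.valueOf(fields[1].trim()));
        }
        List<String> output = new ArrayList<>();
        byWindow.forEach((start, statuses) -> {
            String window = "[" + HH_MM_SS.format(Instant.ofEpochMilli(start)) + ".."
                    + HH_MM_SS.format(Instant.ofEpochMilli(start + size)) + ")";
            // Window-aware formatting (Line 9).
            countEngineStatuses(statuses).forEach((status, count) ->
                    output.add("window: " + window + " - KV{" + status + ", " + count + "}"));
        });
        return output;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "2016-06-01T10:30:05Z,PARKED",
                "2016-06-01T10:30:40Z,DRIVING",
                "2016-06-01T10:31:10Z,PARKED");
        process(lines, Duration.ofMinutes(1)).forEach(System.out::println);
    }
}
```

Summing each status's count across all windows gives back the totals from the unwindowed pipeline, because windowing only changes which elements are counted together.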

Here’s the code for the timestamp function itself.  Notice that this time it doesn’t extend SimpleFunction, but DoFn instead.  That’s because the function needs access to the “ProcessContext” in order to call the outputWithTimestamp() method, which emits each event together with the timestamp DataFlow will use to assign its window.

Below is example output from our pipeline.  Notice that there is a “PARKED” count for 3 different windows: [10:30-10:31], [10:31-10:32], and [10:33-10:34].  No matter how much data we’re processing, we can always divide it into manageable windows and perform calculations like this within each time range.  The crucial point is that changing “HOW” we process the data (in windows instead of all at once) did not change “WHAT” processing we were doing: we were able to use the same core transform (countEngineStatuses) in both pipelines.

        window: [10:30:00..10:31:00) - KV{PARKED, 3}
        window: [10:30:00..10:31:00) - KV{DRIVING, 2}        
        window: [10:30:00..10:31:00) - KV{OFF, 2}        
        window: [10:31:00..10:32:00) - KV{PARKED, 1}
        window: [10:32:00..10:33:00) - KV{DRIVING, 1}
        window: [10:33:00..10:34:00) - KV{PARKED, 1}
        window: [10:34:00..10:35:00) - KV{MALFUNCTION, 1} 
        window: [10:35:00..10:36:00) - KV{DRIVING, 1} 

Hopefully this gives you an idea of what Google DataFlow can be used for and how to implement a pipeline with it.  I found it a little daunting at first, but once you get used to the API it can be very powerful.

As a footnote, I added one more pipeline to my GitHub repository which counts how LONG the engine stayed in each state, per window.  This pipeline uses some more advanced aggregation techniques, which seemed to work pretty well.

Additional Resources

Streaming 101 by Tyler Akidau

Streaming 102 by Tyler Akidau

Google DataFlow Documentation
