Apache Beam is a relatively new framework that aims to deliver a unified, parallel processing model for data. Apache Beam with Google DataFlow can be used in various data processing scenarios, such as ETL (Extract, Transform, Load) jobs, data migrations and machine learning pipelines. This post explains how to run an Apache Beam Python pipeline using Google DataFlow and then how to deploy this pipeline to App Engine so that it runs on a schedule, e.g. daily, using the App Engine CRON service. There are some tutorials explaining how to create basic pipelines and how to run them using the Apache Beam DataFlow Runner, but there is not much information on how to deploy those pipelines so that they run without user interaction, which should be the first thing to do, since no one wants to trigger big data pipelines by hand.

As an example pipeline, I will show a wordcount over a few of Shakespeare’s books.

TL;DR

Use App Engine CRON to launch a Google DataFlow pipeline written with the Apache Beam Python SDK. Link to GitHub: https://github.com/marrrcin/python-beam-dataflow-cron

Prerequisites

  • Python 2.7.12+
  • Apache Beam SDK for Python 2.0.0
  • Google Cloud Platform account to run the pipeline in DataFlow and store results in Cloud Storage
  • Google Cloud SDK for Python
  • basic knowledge of Apache Beam

The pipeline

The idea behind the wordcount pipeline in Apache Beam is simple - it’s almost the same as in MapReduce, but simpler to implement thanks to Apache Beam’s capabilities and expressiveness:

  1. read all input files
  2. tokenize the text
  3. for every token, emit tuple (word, 1) - mapping phase
  4. group tuples by word
  5. sum values for every group - aggregation phase
  6. format and save output
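To make these steps concrete before bringing in Beam, here is a minimal plain-Python sketch of the same logic, assuming the input is simply a list of strings held in memory. The function name count_words and the sample lines are made up for illustration; the real pipeline distributes these steps across DataFlow workers instead of running them in one process.

```python
import re
from collections import defaultdict


def count_words(lines):
    """Plain-Python illustration of the wordcount steps (not Beam code)."""
    # Steps 1-2: take the input lines and split them into tokens
    tokens = [word for line in lines for word in re.findall(r"[A-Za-z']+", line)]
    # Step 3: mapping phase, emit a (word, 1) tuple for every token
    pairs = [(word, 1) for word in tokens]
    # Step 4: group the tuples by word
    groups = defaultdict(list)
    for word, one in pairs:
        groups[word].append(one)
    # Step 5: aggregation phase, sum the ones in every group
    counts = {word: sum(ones) for word, ones in groups.items()}
    # Step 6: format the output lines (sorted only to make the result stable)
    return sorted("{0}: {1}".format(word, count) for word, count in counts.items())


sample = ["to be or not to be", "that is the question"]
formatted = count_words(sample)
print(formatted)
```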

Pipeline implementation

Here is the Python code for the pipeline described above. An explanation of the code follows below.

from __future__ import print_function, absolute_import
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
import logging
logging.basicConfig(level=logging.INFO)


class FindWords(beam.DoFn):
    def process(self, element):
        import re as regex
        return regex.findall(r"[A-Za-z\']+", element)


class CountWordsTransform(beam.PTransform):
    def expand(self, p_collection):
        # p_collection is collection of loaded lines
        return (p_collection
                | "Find words" >> (beam.ParDo(FindWords()).with_input_types(unicode))
                | "Pair With One" >> beam.Map(lambda word: (word, 1))
                | "Group By" >> beam.GroupByKey()
                | "Aggregate Groups" >> beam.Map(lambda (word, ones): (word, sum(ones))))


def run():
    import time
    gcs_path = "gs://marcin-playground/dataflow"
    pipeline = beam.Pipeline(runner="DataflowRunner", argv=[
        "--project", "your-project-name",
        "--staging_location", ("%s/staging_location" % gcs_path),
        "--temp_location", ("%s/temp" % gcs_path),
        "--output", ("%s/output" % gcs_path),
        "--setup_file", "./setup.py"
    ])
    (pipeline
     | "Load" >> ReadFromText("gs://marcin-playground/books/*.txt")
     | "Count Words" >> CountWordsTransform()
     | "Format Output" >> beam.Map(lambda (word, count): "{0}: {1}".format(word, count))
     | "Save" >> WriteToText("{0}/output/wordcount{1}".format(gcs_path, int(time.time())))
     )
    pipeline.run()

Besides the required imports, the first important part is the run() function - it’s the entry point to the pipeline. What you can observe is that it does not follow the convention from the Apache Beam documentation, where beam.Pipeline objects are wrapped inside a Python with statement. I did it on purpose - thanks to that, the code will not wait for the pipeline to complete, but will instead dispatch the job to Google DataFlow and return (see “DataflowRunner” in the constructor). The other configuration parameters are self-descriptive and are required by the Beam SDK.

First, the pipeline reads all book files (a link to those files is at the end of this post) from Google Cloud Storage - you need to create a GCS bucket and put the input files into a folder in it in order to use them in the pipeline.

Count Words is the second step. It uses my implementation of the beam.PTransform class, which can be used to join multiple transforms into a single high-level piece. The implementation itself consists of 4 steps:

  1. Finding the words - for simplicity, I assume that a word contains only lower and upper case letters and/or apostrophes. In the FindWords class, which is a custom implementation of beam.DoFn, the process(self, element) method is responsible for transforming each line of the books that were read (the element parameter) into something else - in this case, a list of words.
  2. Next, each word is transformed into a tuple of the form (word, 1); it’s just a matter of invoking the built-in beam.Map transform
  3. The third step is grouping the tuples by word, so that we can then aggregate (count) them
  4. The last step is to actually aggregate them; since we’ve emitted tuples with 1, we can just sum them to obtain the total number of occurrences of each word
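The tokenization rule from step 1 can be tried on a single line outside of any pipeline. This is only a quick sketch: the sample sentence is invented for illustration, and the regular expression is the one used in FindWords.

```python
import re

# A made-up line; FindWords applies the same pattern to every line it receives
line = "The king's crown, the King's word!"
words = re.findall(r"[A-Za-z']+", line)
# Punctuation is dropped, while apostrophes stay attached to the word
print(words)
```

Note that the match is case-sensitive, so The and the (or king's and King's) end up as separate keys unless the tokens are lowercased first.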

Format Output does only one simple thing - it transforms each tuple of the form (word, number of occurrences) into a readable string in the format: my word: 123.

The last step is to Save the results into a file. So that the pipeline can be run multiple times without overwriting earlier results, the output file name prefix contains the current timestamp.
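The naming scheme can be sketched in isolation. The helper name build_output_prefix, the bucket path and the fixed timestamp below are assumptions made for the example; the pipeline itself builds the same string inline with time.time().

```python
import time


def build_output_prefix(gcs_path, now=None):
    """Return a per-run prefix: <gcs_path>/output/wordcount<epoch seconds>."""
    # A fixed "now" can be passed in to make the result reproducible
    timestamp = int(time.time() if now is None else now)
    return "{0}/output/wordcount{1}".format(gcs_path, timestamp)


# Hypothetical bucket and timestamp, used only for illustration
prefix = build_output_prefix("gs://your-bucket/dataflow", now=1498486456)
print(prefix)
```

WriteToText treats this value as a prefix and, by default, appends a shard suffix such as -00000-of-00003 to each file it writes.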

Deploying Python DataFlow pipeline to Google App Engine

Now that the pipeline is ready, we can prepare a Python web application which will trigger DataFlow jobs. For this task, I will use the Flask framework.

Project structure

Start by creating the project structure:

Google DataFlow Python App Engine project structure

Description of the structure:

  • app.yaml - definition of Google App Engine application
  • cron.yaml - definition of Google App Engine CRON job
  • main.py - entry point for the web application
  • dataflow_pipeline module - the pipeline described above, wrapped inside the python module
  • requirements.txt - file for pip package manager, which contains list of all required packages to run the application and the pipeline
  • appengine_config.py - standard App Engine python configuration file
  • lib - local folder with all pip-installed packages from requirements.txt file
  • setup.py - setup file, which is required for DataFlow to be able to run the code from the pipeline. When you use libraries like pandas in your pipeline code, you must tell DataFlow to install them on every worker machine that will be running the pipeline, and setup.py is where you do that
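Since the setup.py file itself is not shown in this post, here is a minimal sketch of what it could contain. The package name, version and the pandas dependency are assumptions chosen for illustration (pandas is only the example mentioned above; the wordcount pipeline does not need it). See the linked repository for the actual file.

```python
# setup.py (illustrative sketch, not the file from the repository)

# Hypothetical metadata and dependencies; adjust them to your own pipeline
SETUP_KWARGS = {
    "name": "dataflow_pipeline",
    "version": "0.0.1",
    # Packages that DataFlow should pip-install on every worker
    "install_requires": ["pandas"],
}


def main():
    # Imported here so the metadata above can be inspected without setuptools
    import setuptools

    # find_packages() picks up local modules such as dataflow_pipeline
    setuptools.setup(packages=setuptools.find_packages(), **SETUP_KWARGS)
```

In a real setup.py, main() would be called at the bottom of the file so that the setup runs when DataFlow builds the package for the workers.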

Configuration

What is really important is the requirements.txt file. For my pipeline, its contents are:

Flask==0.12.2
apache-beam[gcp]==2.0.0
gunicorn==19.7.1
google-cloud-dataflow==2.0.0

For the App Engine application to work, all of the required packages need to be deployed with the application. That’s why you need to install them locally by invoking:

pip install -r requirements.txt -t lib

Also note that the apache-beam package is installed with the [gcp] extra, which adds the capability to run the pipeline in DataFlow.

The configuration file for the application, app.yaml, looks like this:

runtime: python
env: flex

entrypoint: gunicorn -b :$PORT main:app

runtime_config:
  python_version: 2

The file states that we want to use the App Engine Flex environment with the Python 2.7 runtime.

IMPORTANT: Apache Beam pipelines will not work in the App Engine Standard environment, due to some filesystem I/O operations. You will most likely encounter this exception there:

IOError: [Errno 30] Read-only file system: '/dev/null'
at <module> (/base/data/home/apps/project/20170626t141416.402244965699518249/lib/dill/dill.py:167)
at <module> (/base/data/home/apps/project/20170626t141416.402244965699518249/lib/dill/__init__.py:27)

The solution is simply to use the Flex environment.

Flask application for starting DataFlow pipeline

Here is the implementation of the Flask application with a single endpoint: /start-dataflow.

from __future__ import print_function, absolute_import
from flask import Flask, request
import logging
import dataflow_pipeline.wordcountpipeline as pipeline

app = Flask(__name__)


@app.route("/start-dataflow")
def start_dataflow():
    is_cron = request.headers.get('X-Appengine-Cron', False)
    if not is_cron:
        return 'Bad Request', 400

    try:
        pipeline.run()
        return "Pipeline started", 200
    except Exception as e:
        logging.exception(e)
        return "Error: <pre>{}</pre>".format(e), 500


@app.errorhandler(500)
def server_error(e):
    logging.exception('An error occurred during a request.')
    return """
    An internal error occurred: <pre>{}</pre>
    See logs for full stacktrace.
    """.format(e), 500


if __name__ == '__main__':
    app.run(host='127.0.0.1', port=8080, debug=True)

In the code above, the most important part is the start_dataflow() function. It checks for the presence of the X-Appengine-Cron HTTP request header. Thanks to that, the app knows whether the request came from the App Engine CRON service or from an external call. For debugging purposes you can comment out this check, but remember to NOT DEPLOY the application without it, because then any user will be able to trigger your pipeline and potentially increase your bill a lot!
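The header check can also be pulled out into a small helper, which makes it easy to exercise without starting the web application. This is only a sketch: the name is_cron_request is hypothetical, and it is slightly stricter than the check in the endpoint, since it compares against the value true that App Engine sends instead of accepting any non-empty value.

```python
def is_cron_request(headers):
    """Return True when the request carries the header App Engine adds to cron calls."""
    # App Engine sets "X-Appengine-Cron: true" on cron-triggered requests
    return headers.get("X-Appengine-Cron", "").lower() == "true"


# Plain dicts stand in for Flask's request.headers in this sketch
print(is_cron_request({"X-Appengine-Cron": "true"}))
print(is_cron_request({}))
```

Inside the Flask endpoint, the helper would be called as is_cron_request(request.headers), which also supports .get().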

After asserting the presence of the HTTP header, the Apache Beam pipeline is started by calling the run() function defined in our pipeline above. If everything goes fine, the web application returns status code 200. If something goes wrong, you get status code 500 with the error message in the response body (the stack trace is written to the logs), which you can use to hunt down the issue.

Defining App Engine CRON service to trigger python DataFlow pipeline

The last thing to do is to define the cron.yaml file with the CRON job description:

cron:
- description: trigger dataflow test
  url: /start-dataflow
  schedule: every 30 mins
  target: default

It’s a simple file which tells App Engine to send a GET request to the /start-dataflow endpoint every 30 minutes. Of course, you can modify the schedule as needed.

Deployment

  1. Deploy the web application by invoking:
gcloud app deploy app.yaml

Note that for large pipelines with lots of external libraries, deploying an App Engine Flex environment application might take a while.

  2. Deploy the CRON service by invoking:
gcloud app deploy cron.yaml

  3. Go to the Google Cloud Platform Console (https://console.cloud.google.com) and open the App Engine tab. You should see your app there.

  4. Under the App Engine tab, open Task queues > Cron Jobs - the CRON service should be visible there. You can trigger it manually by using the [Run now] button.

Python Google DataFlow CRON service

Executing the pipeline

After a successful job dispatch, you can open the GCP Console and navigate to the DataFlow tab to see all of your finished / running jobs:

Google DataFlow jobs dashboard

For each job, you can view its execution statistics, logs and data flow visualization:

Google DataFlow job execution details

Summary

That’s it! I hope this end-to-end example will help you deploy your jobs to Google DataFlow. Please share or leave a comment if you like it!

Pro tips

  • when you want to use a library in an Apache Beam pipeline, e.g. in your beam.DoFn implementation, always use local imports (in the method body)
  • if you use custom Python libraries (installed through pip), you must create a setup.py file with all the packages listed there, because Google DataFlow workers are deployed with only a limited number of preinstalled packages
