
Dataflow and Airflow Integration Challenges for Beginners

First, let me briefly introduce Dataflow and what it offers as a Google Cloud Platform (GCP) service.

What is Dataflow?

Dataflow is a fully managed data processing service on GCP that helps organizations build and execute batch and streaming data processing pipelines. With Dataflow, data engineers can focus on building business logic without worrying about infrastructure management, as the service manages provisioning and scaling computing resources on demand. Dataflow also provides a high level of fault tolerance, ensuring that data processing pipelines are resilient to failures and can recover automatically from errors.

Dataflow is equipped to solve a large variety of use cases, including: 

  • ETL and Batch Processing: Dataflow enables efficient extract, transform, and load (ETL) workflows, allowing users to process large volumes of data in batch mode, perform data transformations, and load it into target systems.
  • Real-time Analytics: Dataflow supports streaming data processing, making it suitable for real-time analytics use cases. It can handle continuous data streams, perform computations on the fly, and provide real-time insights and alerts.
  • Data Pipelines: Dataflow facilitates the construction of complex data pipelines, where data can be processed, transformed, and routed across various stages or systems.
  • Machine Learning: Dataflow integrates with Google Cloud's AI and machine learning services, enabling efficient data preparation, feature engineering, and model training pipelines.
  • IoT Data Processing: Dataflow can handle high-volume, low-latency data from Internet of Things (IoT) devices, allowing for real-time processing, filtering, and aggregation.
  • Clickstream Analysis: Dataflow can process clickstream data, perform event-based computations, and derive insights for customer behavior analysis, personalized recommendations, and targeted advertising.

In addition to traditional batch and stream processing use cases, Dataflow offers great flexibility for custom use cases. We worked on a project that took customer inputs in a predefined format and used them to build and trigger multiple pipelines. These pipelines were driven by customer-defined JSON metadata and could contain multiple tasks involving one or more GCP services. One such task used Dataflow: it generates a custom query from runtime parameters (taken from the user metadata), runs the query, and exports the results to a custom location in GCS (Google Cloud Storage).

While working on this task, we faced quite a few challenges. This article describes those challenges and how we overcame them.

Below is the high-level architecture of the use case we were building.

[High-level architecture diagram]

Some of the challenges we faced while working on Dataflow:

  1. Passing parameters to an operator
  2. Reading the parameters in Apache Beam code
  3. Handling the dependencies in Apache Beam
  4. Using other GCP services from within the Dataflow pipeline
  5. Passing custom dependencies required in the Dataflow pipeline

Challenge 1: Passing parameters to the operator

Dataflow can be used in two ways: with Dataflow SQL or with Apache Beam. We opted for the latter, and our Dataflow job was triggered from Airflow using the BeamRunPythonPipelineOperator [1]. At first, the operator's options seemed to match the requirements and parameters of our Beam code well, but the dataflow_config parameter proved troublesome. Because we were passing its value from a Jinja variable, the value was never resolved and the operator received the raw Jinja template string instead. To work around this, we read the dataflow_config value in a PythonOperator, stored it in an Airflow Variable using Variable.set(), and then pulled it back with Variable.get() when declaring the BeamRunPythonPipelineOperator.

Note: A small word of caution: because Airflow Variables are visible to all Airflow users, make sure they never contain confidential information.
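Below is a minimal sketch of this workaround, with a few assumptions: the value is assumed to arrive through dag_run.conf, and the task id, the Variable key (dataflow_config_json), the dag object, and main_py_file are illustrative placeholders rather than our exact code.

import json

from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration


def _store_dataflow_config(**context):
    # templates_dict is rendered by Airflow, so the Jinja value is already resolved here.
    Variable.set("dataflow_config_json", context["templates_dict"]["dataflow_config"])


prepare_dataflow_config = PythonOperator(
    task_id="prepare_dataflow_config",
    python_callable=_store_dataflow_config,
    templates_dict={"dataflow_config": "{{ dag_run.conf['dataflow_config'] }}"},
    dag=dag,
)

# At declaration time, pull the stored value back and build the DataflowConfiguration from it.
dataflow_config = json.loads(Variable.get("dataflow_config_json", default_var="{}"))

dataflow_task = BeamRunPythonPipelineOperator(
    task_id="dataflow_task",
    py_file=main_py_file,
    runner="DataflowRunner",
    dataflow_config=DataflowConfiguration(**dataflow_config),
    dag=dag,
)

prepare_dataflow_config >> dataflow_task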

Challenge 2: Reading the parameters in Apache Beam code

The basic/default parameters needed to build the pipeline are passed through pipeline_options. They were declared as shown below:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--runner", help="Runner for the Dataflow pipeline")
parser.add_argument("--labels", help="Labels for the Dataflow job")
parser.add_argument("--project", help="Dataflow project")
parser.add_argument("--region", help="Region")
parser.add_argument("--num_workers", help="Number of workers")
parser.add_argument("--max_num_workers", help="Maximum number of workers")
# Parse only the known arguments; anything else is forwarded to the pipeline.
known_args, pipeline_args = parser.parse_known_args()

These arguments would be used for setting up the pipeline arguments using PipelineOptions.
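For completeness, the sketch below shows roughly how these values feed into PipelineOptions; it is a simplified example rather than our exact code, and the leftover unparsed flags in pipeline_args are passed along as well.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Rebuild the Dataflow options from the parsed arguments, plus any flags
# argparse did not recognise (still sitting in pipeline_args).
pipeline_options = PipelineOptions(
    pipeline_args,
    runner=known_args.runner,
    project=known_args.project,
    region=known_args.region,
)

with beam.Pipeline(options=pipeline_options) as pipeline:
    pipeline | beam.Create(["placeholder"]) | beam.Map(print)  # real transforms go here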

However, there were a few parameters we needed to pass from the DAG into the Beam code for use in our own logic. They were passed in the same way as the arguments above, but since we needed their values inside the code, we had to parse them as shown below. A few of these parameters were dictionary objects, so they also had to be deserialized from JSON.

Adding the arguments:

parser.add_argument("--input", help="Input from the dag")
parser.add_argument("--source_config", help="query_bucket")

Parsing the arguments to use in the code:

import json

# Re-parse after adding the custom arguments; parse_known_args() ignores
# any extra flags that Dataflow itself passes in.
known_args, _ = parser.parse_known_args()
arguments = vars(known_args)
input = arguments['input']
source_config = arguments['source_config']
# The values arrive as stringified Python dicts, so normalise the quotes first.
source_config = json.loads(source_config.replace("\'", '\"'))
batch = json.loads(input.replace("\'", '\"'))
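On the Airflow side, these custom flags can be supplied through the operator's pipeline_options dictionary, where each key becomes a --flag on the Beam program's command line. The sketch below makes a few assumptions: the batch dict and bucket name are illustrative values, and main_py_file and dag are assumed to exist as in the other snippets.

import json

from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

# Illustrative runtime values; in our case they came from the customer metadata.
batch_input = {"table": "orders", "run_date": "2023-01-01"}
source_conf = {"query_bucket": "example-query-bucket"}

dataflow_task = BeamRunPythonPipelineOperator(
    task_id="dataflow_task",
    py_file=main_py_file,
    runner="DataflowRunner",
    pipeline_options={
        # Rendered as --input=... and --source_config=... for the Beam code above.
        "input": json.dumps(batch_input),
        "source_config": json.dumps(source_conf),
    },
    dataflow_config=DataflowConfiguration(),
    dag=dag,
)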

 

Challenge 3: Handling the dependencies in Apache Beam

The Dataflow documentation provides a very good example [2] of packaging dependencies with a setup.py file. The issue for us was how to make this setup.py available to Airflow. The solution was to keep setup.py at the base of Airflow's dags folder, read its location into a variable in our DAG (as shown below), and pass that variable in the operator parameters.

import os

from airflow import configuration

# setup.py sits at the root of the dags folder so the operator can find it.
LOCAL_SETUP_FILE = os.path.join(
    configuration.get('core', 'dags_folder'), 'setup.py')

 

from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

dataflow_task = BeamRunPythonPipelineOperator(
    task_id='dataflow_task',
    py_file=main_py_file,
    runner=BeamRunnerType.DataflowRunner,
    pipeline_options={},
    dataflow_config=DataflowConfiguration(),
    default_pipeline_options={
        "num_workers": "1",
        "max_num_workers": "5",
        "setup_file": LOCAL_SETUP_FILE,
    },
    dag=dag,
)

 

Challenge 4: Using other GCP services

Our Beam code used a few other GCP services such as GCS, BigQuery, Firestore, Cloud Logging, and Cloud Monitoring. We therefore had to explicitly pin the versions of these client packages in our setup.py (as shown below).

This setup file behaves much like a requirements.txt file: if we need to update versions in the future, we update setup.py and upload it to Composer's dags folder again.

If CI/CD is enabled in your project, simply changing the versions and pushing the change to Git is enough.

import setuptools

REQUIRED_PACKAGES = [
    "apache-beam[gcp]==2.43.0",
    "google-cloud-storage==2.5.0",
    "google-cloud-bigquery[pandas]==2.34.3",
    "google-cloud-firestore==2.7.3",
    "pypika",
    "google-cloud-monitoring==2.14.1",
    "google-cloud-logging==3.4.0",
    "croniter==1.3.8",
    "firebase-admin==5.3.0",
]

setuptools.setup(
    name='shared',
    version='0.0.1',
    description='shared code',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages())

Also, note that these dependencies are installed on the workers, not on the machine that launches the pipeline. The corresponding import statements therefore need to live inside the ParDo (the DoFn) rather than at the global level.
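As an illustration, a DoFn along these lines keeps the client-library import local to the workers; the bucket name and the logic itself are placeholders, not our actual code.

import apache_beam as beam


class ExportToGcs(beam.DoFn):
    def process(self, element):
        # Import here so the dependency is only resolved on the Dataflow
        # workers, where setup.py has installed google-cloud-storage.
        from google.cloud import storage

        client = storage.Client()
        bucket = client.bucket("example-output-bucket")  # placeholder bucket name
        blob = bucket.blob(f"exports/{element['name']}.json")
        blob.upload_from_string(str(element))
        yield element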

 

Challenge 5: Passing custom dependencies

So, this was quite a big challenge! 

We had some common code in our project that we also needed in our Beam code. The challenge was figuring out where to keep this shared folder in GCS (Composer's dags folder is backed by a GCS bucket).

What worked was keeping it at the same level as setup.py. Since setup.py lived in the dags folder, we placed the common-code folder in the same location so that setup.py could package it.

Our setup.py is as shown below. 

Note: shared was the name of the folder containing the common code; setuptools.find_packages() picks it up because it sits alongside setup.py.

import setuptools

REQUIRED_PACKAGES = [
    "apache-beam[gcp]==2.43.0",
    "google-cloud-storage==2.5.0",
    "google-cloud-bigquery[pandas]==2.34.3",
    "google-cloud-firestore==2.7.3",
    "pypika",
    "google-cloud-monitoring==2.14.1",
    "google-cloud-logging==3.4.0",
    "croniter==1.3.8",
    "firebase-admin==5.3.0",
]

setuptools.setup(
    name='shared',
    version='0.0.1',
    description='shared code',
    install_requires=REQUIRED_PACKAGES,
    # find_packages() picks up the shared folder sitting next to setup.py,
    # so the common code is shipped to the Dataflow workers as a package.
    packages=setuptools.find_packages())

As with the pinned dependencies, this shared code is only installed on the workers, so its import statements also need to go inside the ParDo rather than at the global level.
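For example, assuming a hypothetical module shared/helpers.py inside the shared folder, the worker-side import looks like this:

import apache_beam as beam


class TransformWithSharedCode(beam.DoFn):
    def process(self, element):
        # shared/ is installed on the workers via setup.py; import it inside the
        # DoFn rather than at module level. helpers is a hypothetical module name.
        from shared import helpers

        yield helpers.transform(element)  # hypothetical helper function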

 

Conclusion

In conclusion, working with Dataflow has been an enriching experience despite the challenges that came with it. Learning something new is always rewarding, and the difficulties make the journey memorable. Google's Dataflow service offers powerful tools that can be immensely valuable for organizations seeking to extract insights and value from their data. By proactively addressing the challenges that inevitably arise, Dataflow can become a key component of a successful data management strategy. Whether you're just starting out or looking to expand your existing data analytics capabilities, I highly recommend exploring what Dataflow can do and how it can help you achieve your goals.

 

References 

[1] - BeamRunPythonPipelineOperator  

[2] - Juliaset

[3] - Dataflow Documentation
