Dataflow is a fully managed data processing service on GCP that helps organizations build and execute batch and streaming data processing pipelines. With Dataflow, data engineers can focus on building business logic without worrying about infrastructure management, as the service manages provisioning and scaling computing resources on demand. Dataflow also provides a high level of fault tolerance, ensuring that data processing pipelines are resilient to failures and can recover automatically from errors.
Dataflow is equipped to solve a large variety of use cases.
In addition to traditional batch and stream processing use cases, Dataflow provides great flexibility to handle custom use cases. We worked on a project where we would take customer inputs in a predefined format, which would then build and trigger multiple pipelines. In this project, we built complex data pipelines based on customer-defined JSON metadata; these pipelines could have multiple tasks involving one or more GCP services. One such task used Dataflow: it generates a custom query from runtime parameters (taken from the user metadata), runs the query, and exports the data produced by the query to a custom location in GCS (Google Cloud Storage).
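To make the task concrete, here is a minimal sketch of what such a pipeline can look like in Beam, assuming BigQuery as the query engine; the function, query handling, and output path are simplified assumptions, not the project's actual code:
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(query, output_path, pipeline_args):
    """Run a generated query and export the rows to GCS as JSON lines."""
    options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=options) as p:
        (
            p
            | "RunQuery" >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)
            | "ToJson" >> beam.Map(lambda row: json.dumps(row, default=str))
            | "ExportToGcs" >> beam.io.WriteToText(output_path, file_name_suffix=".json")
        )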
While working on this task, we faced quite a few challenges. This article describes those challenges and how we overcame them.
Below is the high-level architecture of the use case we were building.
Here are some of the challenges we faced while working on Dataflow.
Dataflow can be used in two ways: either with Dataflow SQL or with Apache Beam. We opted for the latter, and our Dataflow job was triggered from Airflow using BeamRunPythonPipelineOperator [1]. Initially, the parameters required by the Beam code and the options exposed by the operator seemed to match well, but the dataflow_config parameter could have been handled a lot better. Since we were passing its value from a Jinja variable, the operator would not resolve the template and received the Jinja expression as-is. To work around this, we read the dataflow_config value in a PythonOperator, stored it as an Airflow Variable using Variable.set(), and then pulled it with Variable.get() while declaring the BeamRunPythonPipelineOperator.
Note: A small caution point here is that these Variables are visible to all Airflow users, so we need to make sure we don't store any confidential information in them.
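For illustration, here is a minimal sketch of that workaround, assuming the Dataflow configuration arrives as a Jinja-templated value in dag_run.conf; the task id, Variable key, and template expression are placeholders rather than the project's actual code:
from airflow.models import Variable
from airflow.operators.python import PythonOperator


def store_dataflow_config(**context):
    # templates_dict is rendered by Airflow before the callable runs, so the
    # Jinja expression is already resolved here; persist it as an Airflow Variable.
    Variable.set("dataflow_config_json", context["templates_dict"]["dataflow_config"])


store_config = PythonOperator(
    task_id="store_dataflow_config",
    python_callable=store_dataflow_config,
    templates_dict={"dataflow_config": "{{ dag_run.conf['dataflow_config'] }}"},
    dag=dag,  # assumes an existing DAG object, as in the operator snippet later on
)

# When declaring BeamRunPythonPipelineOperator, the stored value is read back
# with Variable.get() and used to build the DataflowConfiguration.
stored_config = Variable.get("dataflow_config_json", default_var="{}")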
The basic/default parameters that were needed for pipeline building are passed using pipeline_options. They were declared as below:
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--runner", help="Runner for the Dataflow pipeline")
parser.add_argument("--labels", help="Labels for the Dataflow job")
parser.add_argument("--project", help="Dataflow project")
parser.add_argument("--region", help="Region")
parser.add_argument("--num_workers", help="Number of workers")
parser.add_argument("--max_num_workers", help="Maximum number of workers")

# Parse after all the default arguments are registered; anything not recognised
# here is collected in pipeline_args and forwarded to Beam.
known_args, pipeline_args = parser.parse_known_args()
These arguments are then used to set up the pipeline options via PipelineOptions.
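For context, here is a minimal sketch (not the project's actual pipeline code) of how the parsed values and the leftover pipeline_args can feed into PipelineOptions:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Values parsed above are forwarded as keyword options; any flags argparse did
# not recognise (pipeline_args) are passed through to Beam unchanged.
pipeline_options = PipelineOptions(
    pipeline_args,
    runner=known_args.runner,
    project=known_args.project,
    region=known_args.region,
)

with beam.Pipeline(options=pipeline_options) as pipeline:
    ...  # the query/export transforms go here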
However, there were a few parameters we needed to pass from the DAG into the Beam code for use later in the pipeline logic. They were passed in the same manner as the arguments above, but since we needed their values in code, we had to parse them as shown below. A few of these parameters were dictionary objects, so they needed extra handling.
Adding the arguments:
parser.add_argument("--input", help="Input from the dag")
parser.add_argument("--source_config", help="query_bucket")
Parsing the arguments to use in the code:
import json

# Re-parse so the custom arguments are included; parse_known_args still
# tolerates the standard Beam/Dataflow flags.
args, _ = parser.parse_known_args()
arguments = vars(args)
input = arguments['input']
source_config = arguments['source_config']
# Normalize the single-quoted dictionary strings before loading them as JSON.
source_config = json.loads(source_config.replace("'", '"'))
batch = json.loads(input.replace("'", '"'))
Dataflow provides a very good example [2] of this in its documentation. The issue, however, was how to pass this setup.py file to Airflow. The solution was to keep setup.py at the base of Airflow's dags folder, read the file's location into a variable in our DAG (as shown below), and pass it in the operator parameters.
import os

from airflow import configuration
from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

LOCAL_SETUP_FILE = os.path.join(
    configuration.get('core', 'dags_folder'), 'setup.py')

dataflow_task = BeamRunPythonPipelineOperator(
    task_id='dataflow_task',
    py_file=main_py_file,  # path to the Beam pipeline file, defined elsewhere in the DAG
    runner=BeamRunnerType.DataflowRunner,
    pipeline_options={},
    default_pipeline_options={
        "num_workers": "1",
        "max_num_workers": "5",
        "setup_file": LOCAL_SETUP_FILE,
    },
    dataflow_config=DataflowConfiguration(),
    dag=dag,
)
Our Beam code used a few other GCP services such as GCS, BigQuery, Firestore, Cloud Logging, and Cloud Monitoring, so we needed to explicitly pin the versions of these client packages in our setup.py (as shown below).
This setup file works much like a requirements.txt file: if we need to update the versions in the future, we update setup.py and load it again into the Composer dags folder.
If CI/CD is enabled in your project, simply changing the versions and pushing the change to Git would work.
import setuptools

REQUIRED_PACKAGES = [
    "apache-beam[gcp]==2.43.0",
    "google-cloud-storage==2.5.0",
    "google-cloud-bigquery[pandas]==2.34.3",
    "google-cloud-firestore==2.7.3",
    "pypika",
    "google-cloud-monitoring==2.14.1",
    "google-cloud-logging==3.4.0",
    "croniter==1.3.8",
    "firebase-admin==5.3.0",
]

setuptools.setup(
    name='shared',
    version='0.0.1',
    description='shared code',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
)
Also, these dependencies are installed on the workers and not on the machine that launches the job, so the corresponding import statements need to live inside the ParDo (DoFn) rather than at the global level.
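For example, here is a minimal sketch of keeping such an import inside the DoFn; the bucket and object names are placeholders, not the project's actual values:
import apache_beam as beam


class ExportToGcs(beam.DoFn):
    def process(self, element):
        # Imported here so the package is resolved on the worker,
        # where the setup.py dependencies are installed.
        from google.cloud import storage

        client = storage.Client()
        bucket = client.bucket("example-output-bucket")  # placeholder bucket
        bucket.blob("exports/output.json").upload_from_string(str(element))
        yield element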
So, this was quite a big challenge!
We had some common code in our project that we also needed to use in our Beam code. The challenge was figuring out where to keep this shared folder in GCS.
What worked was keeping it at the same level as setup.py. Since setup.py was kept in the dags folder, we placed the common code folder in the same location so that setup.py could pick it up and package it.
Our setup.py is the same one shown above. Note: shared (the name passed to setuptools.setup) was the name of the folder containing the common code.
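For reference, a rough sketch of the resulting dags folder layout (file names other than setup.py are illustrative) and of how the shared package is then imported in the Beam code:
# Illustrative layout of the Composer dags folder:
#
#   dags/
#   ├── setup.py          # packages the shared folder for the Dataflow workers
#   ├── dataflow_dag.py   # DAG declaring BeamRunPythonPipelineOperator
#   └── shared/
#       ├── __init__.py
#       └── helpers.py
#
# With setuptools.find_packages() picking up shared/, the Beam code can import
# it like any other installed package:
from shared.helpers import build_query  # hypothetical helper in the shared code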
In conclusion, working with Dataflow has been an enriching experience despite the challenges that came with it. Learning something new is always rewarding, and the difficulties make the journey memorable. Google's Dataflow service offers powerful tools that can be immensely valuable for organizations seeking to extract insights and value from their data. With the challenges that inevitably arise addressed proactively, Dataflow can be a key component of a successful data management strategy. Whether you're just starting or looking to expand your existing data analytics capabilities, I highly recommend exploring the potential of Dataflow and seeing how it can help you achieve your goals.
[1] - BeamRunPythonPipelineOperator
[2] - Juliaset
[3] - Dataflow Documentation