Dataflow and Airflow Integration Challenges for Beginners
by Monica Lambate on Sep 20, 2023 5:20:55 PM
First, let me briefly introduce Dataflow and what it offers as a Google Cloud Platform (GCP) service.
What is Dataflow?
Dataflow is a fully managed data processing service on GCP that helps organizations build and execute batch and streaming data processing pipelines. With Dataflow, data engineers can focus on building business logic without worrying about infrastructure management, as the service manages provisioning and scaling computing resources on demand. Dataflow also provides a high level of fault tolerance, ensuring that data processing pipelines are resilient to failures and can recover automatically from errors.
Dataflow is equipped to solve a large variety of use cases, including:
- ETL and Batch Processing: Dataflow enables efficient extract, transform, and load (ETL) workflows, allowing users to process large volumes of data in batch mode, perform data transformations, and load it into target systems.
- Real-time Analytics: Dataflow supports streaming data processing, making it suitable for real-time analytics use cases. It can handle continuous data streams, perform computations on the fly, and provide real-time insights and alerts.
- Data Pipelines: Dataflow facilitates the construction of complex data pipelines, where data can be processed, transformed, and routed across various stages or systems.
- Machine Learning: Dataflow integrates with Google Cloud's AI and machine learning services, enabling efficient data preparation, feature engineering, and model training pipelines.
- IoT Data Processing: Dataflow can handle high-volume, low-latency data from Internet of Things (IoT) devices, allowing for real-time processing, filtering, and aggregation.
- Clickstream Analysis: Dataflow can process clickstream data, perform event-based computations, and derive insights for customer behavior analysis, personalized recommendations, and targeted advertising.
In addition to traditional batch and stream processing use cases, Dataflow is flexible enough to handle custom use cases. We worked on a project that took customer inputs in a predefined format and used them to build and trigger multiple pipelines. In this project, we built complex data pipelines based on customer-defined JSON metadata; these pipelines could have multiple tasks involving one or more GCP services. One such task used Dataflow. This task generates a custom query from runtime parameters (taken from the user metadata), runs the query, and exports the resulting data to a custom location in GCS (Google Cloud Storage).
While working on this task, we faced quite a few challenges. This article describes those challenges and how we overcame them.
Below is the high-level architecture of the use case we were building.
Some of the challenges that I faced while working on Dataflow:
- Passing parameters to an operator
- Reading the parameters in Apache Beam code
- Handling the dependencies in Apache Beam
- Using other GCP services from within the Dataflow pipeline
- Passing custom dependencies required in Dataflow
Challenge 1: Passing parameters to operator
Dataflow can be used in two ways: with Dataflow SQL or with Apache Beam. We opted for the latter, and our Dataflow job was triggered from Airflow using BeamRunPythonPipelineOperator [1]. At first, the operator's options seemed to match the requirements of our Beam code well, but the dataflow_config parameter caused trouble. We were passing its value from a Jinja variable, and the operator did not resolve it; it received the Jinja expression as is. To work around this, we read the dataflow_config value in a Python operator, stored it as an Airflow Variable using Variable.set(), and then pulled it back with Variable.get() when declaring the BeamRunPythonPipelineOperator.
Note: A small caution point here is that since these variables are exposed to all the Airflow users, we need to ensure that we don’t use any variables that contain confidential information.
Challenge 2: Reading the parameters in Apache Beam code
The basic/default parameters that were needed for pipeline building are passed using pipeline_options. They were declared as below:
import argparse

parser = argparse.ArgumentParser()
# No arguments are declared yet at this point, so known_args is empty and
# pipeline_args holds every command-line flag unchanged.
known_args, pipeline_args = parser.parse_known_args()
parser.add_argument("--runner", help="Runner for Dataflow pipeline")
parser.add_argument("--labels", help="Labels for the Dataflow job")
parser.add_argument("--project", help="Dataflow project")
parser.add_argument("--region", help="Region")
parser.add_argument("--num_workers", help="Number of workers")
parser.add_argument("--max_num_workers", help="Maximum number of workers")
These arguments would be used for setting up the pipeline arguments using PipelineOptions.
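The order of the calls matters here. The standalone sketch below uses only the standard library and shows what parse_known_args() returns when it is called before and after an argument is declared. The argv list and the region value are made up for illustration; they are not taken from the project.

```python
import argparse

# Hypothetical command line, standing in for what the operator would pass.
argv = ["--runner", "DataflowRunner", "--region", "us-central1", "--num_workers", "1"]

# Called before any add_argument(): nothing is "known" yet, so every token
# is returned in the leftover list.
parser = argparse.ArgumentParser()
known_before, leftover_before = parser.parse_known_args(argv)

# Called after declaring --runner: that flag is parsed into the namespace,
# and only the remaining tokens are left over.
parser.add_argument("--runner", help="Runner for Dataflow pipeline")
known_after, leftover_after = parser.parse_known_args(argv)

print(vars(known_before))   # {}
print(leftover_before)      # the full argv list
print(known_after.runner)   # DataflowRunner
print(leftover_after)       # ['--region', 'us-central1', '--num_workers', '1']
```

Whichever list is handed to PipelineOptions must still contain the flags Beam needs (runner, project, region, and so on), so it is worth checking which of the two behaviours a given script relies on.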
However, there were a few parameters that we needed to pass from the DAG into the Beam code for use in the pipeline logic itself. They were passed in the same manner as the arguments above, but since we needed their values in code, we had to parse them as shown below. A few of these parameters were dictionary objects, which needed extra handling.
Adding the arguments:
parser.add_argument("--input", help="Input from the dag")
parser.add_argument("--source_config", help="query_bucket")
Parsing the arguments to use in the code:
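import json

args = parser.parse_args()
arguments = vars(args)
# Renamed from "input" to avoid shadowing Python's built-in input().
input_arg = arguments['input']
source_config = arguments['source_config']
# The dictionaries arrive as strings with single quotes, which is not valid
# JSON, so swap the quotes before parsing.
source_config = json.loads(source_config.replace("'", '"'))
batch = json.loads(input_arg.replace("'", '"'))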
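Swapping quotes works as long as no value contains an apostrophe or a double quote of its own. A slightly more defensive variant, sketched below, tries strict JSON first and falls back to ast.literal_eval for Python-style dictionaries. This is an alternative, not what the project used; the helper name parse_dict_arg and the sample values are hypothetical.

```python
import ast
import json


def parse_dict_arg(raw):
    """Parse a dictionary that was passed on the command line as a string.

    Tries strict JSON first, then falls back to a Python literal such as
    the output of str(some_dict).
    """
    try:
        value = json.loads(raw)
    except json.JSONDecodeError:
        # literal_eval only evaluates literals (dicts, lists, strings,
        # numbers, ...), so it does not execute arbitrary code.
        value = ast.literal_eval(raw)
    if not isinstance(value, dict):
        raise ValueError("expected a dictionary, got %s" % type(value).__name__)
    return value


# Python-style string with an apostrophe inside a value (hypothetical data).
from_repr = parse_dict_arg("{'query_bucket': 'my-bucket', 'note': \"it's fine\"}")
# Plain JSON string.
from_json = parse_dict_arg('{"query_bucket": "my-bucket"}')

print(from_repr["note"])          # it's fine
print(from_json["query_bucket"])  # my-bucket
```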
Challenge 3: Handling the dependencies in Apache Beam
The Dataflow documentation includes a very good example [2] of packaging dependencies with a setup.py file. The issue was how to pass this setup.py file to Airflow. The solution was to keep setup.py at the base of Airflow's dags folder, store the file's location in a variable in our DAG (as shown below), and pass that variable in the operator parameters.
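import os

from airflow import configuration
from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

# setup.py sits at the base of the dags folder.
LOCAL_SETUP_FILE = os.path.join(
    configuration.get('core', 'dags_folder'), 'setup.py')

# main_py_file and dag are defined elsewhere in the DAG file.
dataflow_task = BeamRunPythonPipelineOperator(
    py_file=main_py_file,
    pipeline_options={},
    runner=BeamRunnerType.DataflowRunner,
    task_id='dataflow_task',
    dag=dag,
    dataflow_config=DataflowConfiguration(),
    default_pipeline_options={
        "num_workers": "1",
        "max_num_workers": "5",
        "setup_file": LOCAL_SETUP_FILE
    })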
Challenge 4: Using other GCP services
Our Beam code used several other GCP services, such as GCS, BigQuery, Firestore, Cloud Logging, and Cloud Monitoring. We therefore had to explicitly list these service packages, with the versions to install, in our setup.py (see REQUIRED_PACKAGES below).
This setup file works much like a requirements.txt file. If we need to update the versions in the future, we update setup.py and upload it again to Composer's dags folder.
If CI/CD is enabled in your project, simply changing the versions and pushing the change to Git would work.
import setuptools

# Packages (with pinned versions) to install on the Dataflow workers.
REQUIRED_PACKAGES = [
    "apache-beam[gcp]==2.43.0",
    "google-cloud-storage==2.5.0",
    "google-cloud-bigquery[pandas]==2.34.3",
    "google-cloud-firestore==2.7.3",
    "pypika",
    "google-cloud-monitoring==2.14.1",
    "google-cloud-logging==3.4.0",
    "croniter==1.3.8",
    "firebase-admin==5.3.0",
]

setuptools.setup(
    name='shared',
    version='0.0.1',
    description='shared code',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages())
Also, these dependencies are installed on the Dataflow workers and not on the master (the machine that launches the pipeline). We therefore need to place the import statements inside the ParDo, that is, inside the DoFn's methods, and not at the global level.
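The deferred-import pattern can be sketched as follows. To keep the example runnable without Beam installed, the class below is a plain Python stand-in: in a real pipeline it would subclass beam.DoFn, and the modules imported in setup() would be the GCP client libraries rather than the standard-library csv and io modules used here. The class name FormatRow is hypothetical.

```python
class FormatRow:  # in a real pipeline: class FormatRow(beam.DoFn)
    """Formats each element as a CSV line, importing its helpers lazily."""

    def setup(self):
        # Deferred import: this runs where the elements are processed, so
        # the module only has to be installed there, not on the machine
        # that builds and submits the pipeline.
        import csv
        import io

        self._csv = csv
        self._io = io

    def process(self, element):
        buffer = self._io.StringIO()
        self._csv.writer(buffer).writerow(element)
        yield buffer.getvalue().strip()


# Local usage, calling the methods by hand in the order a runner would.
fn = FormatRow()
fn.setup()
lines = list(fn.process(["a", 1]))
print(lines)  # ['a,1']
```

Beam's DoFn provides a setup() method that is called once per DoFn instance, which makes it a natural place for such imports; importing at the top of process() also works.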
Challenge 5: Passing custom dependencies
So, this was quite a big challenge!
We had some common code in our project, and we also had to use it in our Beam code. However, the challenge was understanding where to keep the shared folder in GCS.
What worked was keeping it at the same level as setup.py. As setup.py was kept in the dags folder, we kept the common code folder at the same location so that setup.py could load it.
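One detail worth checking if the common code does not reach the workers: setuptools.find_packages() only picks up folders that contain an __init__.py file. The sketch below recreates the layout in a temporary directory and lists which top-level folders qualify. It uses only the standard library as a rough stand-in for that rule; the helper top_level_packages and the notes folder are hypothetical.

```python
import os
import tempfile


def top_level_packages(base_dir):
    """Return the sub-folders of base_dir that contain an __init__.py."""
    found = []
    for name in sorted(os.listdir(base_dir)):
        path = os.path.join(base_dir, name)
        if os.path.isdir(path) and os.path.isfile(os.path.join(path, "__init__.py")):
            found.append(name)
    return found


with tempfile.TemporaryDirectory() as dags_folder:
    # setup.py and the shared folder sit side by side.
    open(os.path.join(dags_folder, "setup.py"), "w").close()
    os.makedirs(os.path.join(dags_folder, "shared"))
    open(os.path.join(dags_folder, "shared", "__init__.py"), "w").close()
    # A folder without __init__.py is not treated as a package.
    os.makedirs(os.path.join(dags_folder, "notes"))
    packages = top_level_packages(dags_folder)

print(packages)  # ['shared']
```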
Our setup.py is as shown below.
Note: shared was the name of the folder containing the common code.
import setuptools

# Packages (with pinned versions) to install on the Dataflow workers.
REQUIRED_PACKAGES = [
    "apache-beam[gcp]==2.43.0",
    "google-cloud-storage==2.5.0",
    "google-cloud-bigquery[pandas]==2.34.3",
    "google-cloud-firestore==2.7.3",
    "pypika",
    "google-cloud-monitoring==2.14.1",
    "google-cloud-logging==3.4.0",
    "croniter==1.3.8",
    "firebase-admin==5.3.0",
]

setuptools.setup(
    name='shared',
    version='0.0.1',
    description='shared code',
    install_requires=REQUIRED_PACKAGES,
    # find_packages() discovers the shared folder next to setup.py.
    packages=setuptools.find_packages())
Conclusion
In conclusion, working with Dataflow has been an enriching experience despite the challenges that came with it. Learning something new is always rewarding, and the difficulties make the journey memorable. Google's Dataflow service offers powerful tools that can be immensely valuable for organizations seeking to extract insights and value from their data. By proactively addressing the challenges that inevitably arise, you can make Dataflow a key component of a successful data management strategy. Whether you're just starting or looking to expand your existing data analytics capabilities, I highly recommend exploring the potential of Dataflow and seeing how it can help you achieve your goals.
References
[1] - BeamRunPythonPipelineOperator
[2] - Juliaset
[3] - Dataflow Documentation