Creating dynamic tasks using Apache Airflow

7 min read
Apr 24, 2018

What's Airflow?

Apache Airflow is an open source scheduler built on Python. It uses a topological sorting mechanism, called a DAG ( Directed Acyclic Graph) to generate dynamic tasks for execution according to dependency, schedule, dependency task completion, data partition and/or many other possible criteria. This essentially means that the tasks that Airflow generates in a DAG have execution dependencies that define their ordering, and that the workflow has no cycles in it... there's a sequential progression of tasks that result in the ultimate completion of a complex workflow. Note that tasks can also be executed in parallel as long as they share the same prerequisite dependencies, which could be 'no dependencies'. The product consists of a web server with UI, CLI and configuration database (API is coming), a collection of Gunicorn workers to process tasks in parallel and a scheduler service that processes DAGs and feeds the workers task instances to run via a task queue (usually run on a message queue service, such as RabbitMQ... many options exist). What's all that mean? It means we can write Python code that generates and schedules dynamic tasks in the environment. A task is simply a chunk of code for Airflow to run, and can be created using a static file, by parameter passing to a template command, or by actual dynamic generation of the code in Python. In our example below, we will demonstrate the latter two options, since writing static code is kind of boring.  

Why is this interesting?

Creating pipeline tasks dynamically allows us to create automation on a large scale, and also to manage dynamically-provisioned environments. For instance, we could have a DAG that queries a configuration API and creates dynamic daily backup tasks for each pipeline in our data services environment. This is what we're going to do below. Since we have Python at our fingertips we can envision many other interactions with APIs are possible, for instance we could have Airflow manage scaling and synthetic service checks, create dynamic data processing pipelines from template, or execute snapshot backups of our cloud instances. Let's not get ahead of ourselves.  

Airflow installation and configuration

Go here... ...we installed Airflow with Celery, using a RabbitMQ task queue and MySQL DB backend. This blog isn't about installation, but let's understand what we have for our example setup:
  1. A web server with Airflow UI and MySQL backend... DAGs are displayed here, along with task execution records and Connections, Variables and Xcoms.
  2. A RabbitMQ message queue with the Airflow configuration pointed at a configured vhost and Celery Executor configured.
  3. A scheduler service that polls the DAGs directory, processes the code and manages resulting task schedules.
  4. A worker service consisting of a configurable pool of gunicorn task executor threads. Workers are assigned DAG processing and task execution jobs by the scheduler, which enqueues tasks for worker instances to pick up for processing.
Python3 and Virtualenv
I would recommend that you use a Python Virtualenv for Airflow and for your Python working directory. Most systems still use Python 2.7, which is olde. Basically, install Python3 and Virtualenv for your platform. Create a Virtualenv using Python3, with:  
 ~$ virtualenv -p python3 myenv
  ~$ cd myenv && source bin/activate
  (myenv) ~$ python -V
  Python 3.5.4
  (myenv) ~$
The Airflow service environment should be configured in the same way, then run your pip installation from inside the virtualenv. If you wish to make the environment reproducible, run a 'pip freeze' inside the environment and then save the output in a requirements file. You can then add '-r requirements.txt' to your virtualenv command to reconfigure the same way. We also need to install the 'requests' package in the Airflow environment to support our DAG... 'pip install -U requests'. The requests package is a wrapper for fetching HTTP URL, including headers and content. It's often used with RESTful API's (here also).  

Building the DAG

DAGs are picked up from the ${AIRFLOW_HOME}/dags directory by default. We can pretty much just drop our code into this directory, and the scheduler service will process it. If you drop a bad DAG, it will show as an error in the logs and also display a broken DAG warning in the interface. If you drop a healthy DAG in, it will execute the code and the resulting (dynamic, in our case) task instances will be created and scheduled. At this point we can manage the DAG via the Airflow CLI, to enable or disable it, trigger a backfill of tasks in a time range, or configure Connection and Variable objects for the DAG to use when generating task parameters. There are several basic components to a DAG, and they are simply outlined here... (note the templating with jinja section, as we'll use this approach in our DAG). So, here's code, in chunks:
# Airflow imports
 from airflow import DAG
 from airflow.operators.bash_operator import BashOperator
 from datetime import datetime, timedelta
 from airflow.models import Variable
 from airflow.hooks.base_hook import BaseHook
 # Metadata API imports
 import requests, json, string
These declarations import all the libs we need to process our DAG.
Inputs and arguments
connection = BaseHook.get_connection("source_bucket")
 source_bucket = connection.extra
 connection = BaseHook.get_connection("target_bucket")
 target_bucket = connection.extra
 metadataurl = ''
  First this collects our bucket URLs from our Airflow Connections. We have two connections, named 'source_bucket' and 'target_bucket', with the bucket names saved in the 'extra' field in the connection object. We're using the BaseHook library to fetch it from Airflow. This is the same as setting "source_bucket = 'some_bucket_name'". Second, we set a simple target for our metadata service, which is our pipeline configuration store. This could also be loaded as a connection object as the bucket names were. The metadata API is a simple datastore that keeps metadata about namespaces, pipelines and data sources configured in our environment. When we query the service, it returns a JSON list of these objects and their attributes. We will use the 'name' key of each record in the data set in our task construction.  
# DAG arguments
 default_args = {
  'owner': 'airflow',
  'depends_on_past': False,
  'start_date': datetime(2018, 4, 12),
  'email': [''],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  'retry_delay': timedelta(minutes=10)
These parameters are defined in a handy object that will be included in the DAG specification. Normally, we also include an 'on_failure_callback' param, pointing at a custom Python function, which is configured to page on a failed task execution. This is an excellent pipeline monitoring strategy, but not needed for our POC and out of scope. Airflow will record task execution failures in the database, and display them in the UI.
DAG definition
# DAG definition
 daily_incremental = DAG(
The code here creates our DAG object, assigns our arguments from the previous code block, establishes a task concurrency limit and schedule interval (default is days, and this is a daily backup). Our DAG is called 'daily_incremental'. Note, Airflow is smart enough to execute daily tasks at midnight and use yesterday's date; you don't have to handle any offset logic. All of the parameters are documented DAG arguments, except the name 'daily_incremental', a static string. The 'timedelta()' function is also a documented Airflow feature.
Command template
# Command template
 gsutil_cp_command = "gsutil -m cp -r gs:// gs:///"
With this line we create our template command to invoke gsutil to copy the data we wish to back up between buckets. Notice we have our bucket name parameters being rendered via Jinja. We will define these parameters below. Note that Airflow Variables can also be accessed in the same manner... use 'Variable.get()' instead of 'BaseHook.get_connection()', directly assigned. Also, we are using the Airflow builtin to add our date partition, which defaults to the format 'YYYYMMDD'. This is the target directory for each daily task execution to back up the dated directory, recursively.
Data fetch and iteration
# Fetch the namespaces list from the Metadata API
 #constructing the URI for our REST API call
 ns_resp = requests.get(metadataurl + 'namespaces')
 ns_json = json.loads(ns_resp.text)
Here we use the Python 'requests' library to query our API for configured namespaces and load the resulting JSON into a python object that we can query. Of course not everyone has a RESTful service to query for data, and so we could replace our 'metadataurl' variable with a filename reference that contains the relevant JSON records, and then query the file. Also, we could query the cloud storage API itself for available paths. It's Python, so sky's the limit here. The main point is, we grab our dynamic data from someplace and use it to generate tasks.  
# Iterate over the namespaces
 for i in range(0, len(ns_json)):
  # Fetch the pipelines configured for each namespace
  #constructing the URI again
  pl_resp = requests.get(metadataurl + 'ns/' + ns_json[i]['name'] + '/pipelines')
  pl_json = json.loads(pl_resp.text)
  # Iterate over the pipelines
  for j in range(0, len(pl_json)):
  #constructs a task name, 'namespace_pipeline'
  task_name = str(ns_json[i]['name']).replace(".", "_") + '_' + str(pl_json[j]['name'])
  #constructs a bucket path, 'namespace/archive/pipeline/'
  task_path = '/' + str(ns_json[i]['name']) + '/archive/' + str(pl_json[j]['name'])
In this nested iteration we're finally rendering all the inputs for our tasks. Our first iterates through each namespace and the second collects a list of pipelines that our application has configured within each namespace. We then generate a dynamic list of unique names and paths on each iteration and assign the to variables task_name and task_path, respectively.
Dynamic task definition
 # Create our dynamic tasks
  gsutil_cp_pipeline = BashOperator(
  params={'source_path': source_bucket + task_path,
  'dest_path': target_bucket
Note above that the bash_command parameter in our BashOperator object uses the command template from above, and passes the 'source_path' and 'dest_path' parameters to it. This block constructs a task from the result of each unique namespace and pipeline pairing, using the 'source_bucket', 'target_bucket', 'task_name' and 'task_path' variables as input.
When the loop is finished processing (Airflow scheduler will assign it to a worker for DAG processing and task rendering), you can query the DAG using the Airflow CLI to see the list of tasks it generated, and then enable the DAG (it deploys paused by default).  
(myenv) ~$ airflow list_tasks daily_incremental
 (myenv) ~$ airflow unpause daily_incremental
 Dag: <DAG: daily_incremental>, paused: 0
 (myenv) ~$


Keep in mind that we have the power of Python at our fingertips here. I use JSON and a REST API with an Airflow Connection and a static variable. Your data inputs can come from any source however, including your Cloud Provider's API, a metrics database for scaling tasks, an RDBMS, yaml or xml files... anything. A great learning exercise would be to deploy a DAG that works in your own environment and using your own sources. Commands and outputs can change too... and this is the power of Airflow, to create dynamic tasks. I construct a file path, but yours might create a database connection string and execute an update to a materialized view, or invoke a mysqldump command to back up an RDBMS datasource, or run a Spark job in a data pipeline. Airflow can do it all, and also allows you to build dependencies, instrumentation and workflow processing rules right into your DAGs (for instance, don't run task_2 until all instances of task_1 are successfully executed). Leveraging a little knowledge of Python to interact with your application environment allows you to be dynamic with your scheduled tasks, and also to update your configuration centrally. If done well, a DAG can be fully dynamic and execute tasks based on the target environment's configuration details without have to write new code for every instance of the same task. Cool.

Get Email Notifications

No Comments Yet

Let us know what you think