~$ virtualenv -p python3 myenv
~$ cd myenv && source bin/activate
(myenv) ~$ python -V
Python 3.5.4
(myenv) ~$

The Airflow service environment should be configured in the same way; then run your pip installation from inside the virtualenv. If you wish to make the environment reproducible, run 'pip freeze' inside the environment and save the output to a requirements file. You can then pass '-r requirements.txt' to 'pip install' in a new virtualenv to reconfigure it the same way. We also need to install the 'requests' package in the Airflow environment to support our DAG: 'pip install -U requests'. The requests package is a wrapper for fetching HTTP URLs, including headers and content. It's often used with RESTful APIs, as it is here.
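For reference, the reproducibility steps described above boil down to something like this; the second command would be run inside a fresh virtualenv on the Airflow host, and the requirements filename is arbitrary:

(myenv) ~$ pip freeze > requirements.txt
(myenv) ~$ pip install -r requirements.txt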
# Airflow imports
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.models import Variable
from airflow.hooks.base_hook import BaseHook

# Metadata API imports
import requests
import json

These declarations import all the libraries we need to process our DAG.
connection = BaseHook.get_connection("source_bucket")
source_bucket = connection.extra
connection = BaseHook.get_connection("target_bucket")
target_bucket = connection.extra
metadataurl = 'https://myweb.ka.com/api/v1/metadata/'
First, this collects our bucket names from our Airflow Connections. We have two connections, named 'source_bucket' and 'target_bucket', with the bucket names saved in the 'extra' field of each connection object. We're using the BaseHook library to fetch them from Airflow. This is equivalent to setting "source_bucket = 'some_bucket_name'". Second, we set a simple target for our metadata service, which is our pipeline configuration store. This could also be loaded from a connection object, as the bucket names were. The metadata API is a simple datastore that keeps metadata about the namespaces, pipelines and data sources configured in our environment. When we query the service, it returns a JSON list of these objects and their attributes. We will use the 'name' key of each record in the data set in our task construction.
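For reference, the two Connections can be created through the Airflow UI (Admin -> Connections) or programmatically. Below is a minimal sketch of the programmatic route; the bucket names and the 'google_cloud_platform' connection type are placeholder assumptions, not values from this POC:

from airflow import settings
from airflow.models import Connection

session = settings.Session()
# bucket names go in the 'extra' field, matching what the DAG reads back
session.add(Connection(conn_id="source_bucket",
                       conn_type="google_cloud_platform",  # placeholder type
                       extra="my-source-bucket-name"))     # placeholder bucket
session.add(Connection(conn_id="target_bucket",
                       conn_type="google_cloud_platform",
                       extra="my-target-bucket-name"))
session.commit()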
# DAG arguments
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 4, 12),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=10)
}
These parameters are defined in a handy object that will be included in the DAG specification. Normally, we would also include an 'on_failure_callback' parameter pointing at a custom Python function, configured to page on a failed task execution. This is an excellent pipeline monitoring strategy, but it isn't needed for our POC and is out of scope here. Airflow will still record task execution failures in the database and display them in the UI.
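For illustration, here is a minimal sketch of such a callback; Airflow hands it the task context dict, and the real paging call is replaced with a log line since the notification mechanism is site-specific:

import logging

def page_on_failure(context):
    # Airflow passes the task context; swap the log line for a pager or Slack call
    ti = context['task_instance']
    logging.error("Task %s failed for %s", ti.task_id, context['execution_date'])

# wired in via: default_args['on_failure_callback'] = page_on_failure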
# DAG definition
daily_incremental = DAG(
    'daily_incremental',
    catchup=False,
    concurrency=16,
    default_args=default_args,
    schedule_interval=timedelta(1))

The code here creates our DAG object, assigns the arguments from the previous code block, and establishes a task concurrency limit and schedule interval (timedelta's first argument is days, and this is a daily backup). Our DAG is called 'daily_incremental'. Note that Airflow is smart enough to execute daily tasks at midnight and use yesterday's date; you don't have to handle any offset logic. All of the parameters are documented DAG arguments except the name 'daily_incremental', which is a static string. The 'timedelta()' function comes from Python's standard datetime module, imported above.
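As an aside, the same daily cadence can be expressed with one of Airflow's cron presets instead of a timedelta; this is an equivalent alternative, not what the DAG above uses:

# Equivalent definition using a cron preset for the schedule
daily_incremental = DAG(
    'daily_incremental',
    catchup=False,
    concurrency=16,
    default_args=default_args,
    schedule_interval='@daily')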
# Command template
gsutil_cp_command = "gsutil -m cp -r gs://{{ params.source_path }}/{{ ds_nodash }} gs://{{ params.dest_path }}/{{ ds_nodash }}/"

With this line we create our template command that invokes gsutil to copy the data we wish to back up between buckets. Notice that our source and destination path parameters are rendered via Jinja; we will define these parameters below. Note that Airflow Variables can be accessed in the same manner: use 'Variable.get()' instead of 'BaseHook.get_connection()' and assign the value directly. Also, we are using the Airflow builtin 'ds_nodash' to add our date partition, which takes the format 'YYYYMMDD'. This gives each daily task execution a dated directory to back up, recursively, and a dated target directory to copy it into.
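As a quick illustration of the Variables alternative mentioned above, the lookups at the top of the file would become direct assignments; this assumes Variables named 'source_bucket' and 'target_bucket' have been created:

# Variable-based alternative to the BaseHook connection lookups
source_bucket = Variable.get("source_bucket")
target_bucket = Variable.get("target_bucket")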
# Fetch the namespaces list from the Metadata API
# constructing the URI for our REST API call
ns_resp = requests.get(metadataurl + 'namespaces')
ns_json = json.loads(ns_resp.text)

Here we use the Python 'requests' library to query our API for the configured namespaces and load the resulting JSON into a Python object that we can query. Of course, not everyone has a RESTful service to query for data, so we could replace our 'metadataurl' variable with a reference to a file that contains the relevant JSON records and read from that instead. We could also query the cloud storage API itself for available paths. It's Python, so the sky's the limit here. The main point is that we grab our dynamic data from someplace and use it to generate tasks.
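For reference, a minimal sketch of the file-based alternative mentioned above; the path is a placeholder, and the file would hold the same structure the API returns, a list of records each carrying at least a 'name' key (e.g. [{"name": "com.ka.weatherdata"}, {"name": "com.ka.newsfeeds"}]):

# File-based alternative to the REST call above
with open('/path/to/namespaces.json') as f:  # placeholder path
    ns_json = json.load(f)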
# Iterate over the namespaces
for i in range(0, len(ns_json)):
    # Fetch the pipelines configured for each namespace
    # constructing the URI again
    pl_resp = requests.get(metadataurl + 'ns/' + ns_json[i]['name'] + '/pipelines')
    pl_json = json.loads(pl_resp.text)
    # Iterate over the pipelines
    for j in range(0, len(pl_json)):
        # constructs a task name, 'namespace_pipeline'
        task_name = str(ns_json[i]['name']).replace(".", "_") + '_' + str(pl_json[j]['name'])
        # constructs a bucket path, '/namespace/archive/pipeline'
        task_path = '/' + str(ns_json[i]['name']) + '/archive/' + str(pl_json[j]['name'])
In this nested iteration we're finally rendering all the inputs for our tasks. The first loop iterates through each namespace, and the second collects the list of pipelines that our application has configured within that namespace. We then generate a unique name and path on each iteration and assign them to the variables task_name and task_path, respectively.
        # Create our dynamic tasks
        gsutil_cp_pipeline = BashOperator(
            task_id=task_name,
            bash_command=gsutil_cp_command,
            params={'source_path': source_bucket + task_path,
                    'dest_path': target_bucket},
            dag=daily_incremental)
Note that the bash_command parameter in our BashOperator uses the command template defined earlier and passes the 'source_path' and 'dest_path' parameters to it for rendering. This block constructs one task for each unique namespace and pipeline pairing, using the 'source_bucket', 'target_bucket', 'task_name' and 'task_path' variables as input.
(myenv) ~$ airflow list_tasks daily_incremental
com_ka_weatherdata_chicago_weather
com_ka_weatherdata_losangeles_weather
com_ka_weatherdata_newyork_weather
com_ka_weatherdata_washington_weather
com_ka_newsfeeds_chicago_news
com_ka_newsfeeds_losangeles_news
com_ka_newsfeeds_newyorkcity_news
com_ka_newsfeeds_winnipeg_news
(myenv) ~$ airflow unpause daily_incremental
Dag: <DAG: daily_incremental>, paused: 0
(myenv) ~$
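Before relying on the schedule, you can also exercise a single generated task in isolation with the 'airflow test' command, which runs one task instance for a given date without recording state in the database (shown here with one of the task IDs listed above):

(myenv) ~$ airflow test daily_incremental com_ka_weatherdata_chicago_weather 2018-04-12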
Keep in mind that we have the power of Python at our fingertips here. I used JSON from a REST API, together with an Airflow Connection and a static variable, but your data inputs can come from any source: your cloud provider's API, a metrics database for scaling tasks, an RDBMS, YAML or XML files... anything. A great learning exercise would be to deploy a DAG that works in your own environment and uses your own sources.

Commands and outputs can change too, and this is the power of Airflow: creating dynamic tasks. I construct a file path, but yours might create a database connection string and execute an update to a materialized view, invoke a mysqldump command to back up an RDBMS data source, or run a Spark job in a data pipeline. Airflow can do it all, and it also allows you to build dependencies, instrumentation and workflow processing rules right into your DAGs (for instance, don't run task_2 until all instances of task_1 have executed successfully). Leveraging a little knowledge of Python to interact with your application environment allows you to be dynamic with your scheduled tasks and to update your configuration centrally. If done well, a DAG can be fully dynamic and execute tasks based on the target environment's configuration details without you having to write new code for every instance of the same task. Cool.