Airflow Scheduler: the very basics

Tags: Renew Refresh Republish, Big Data, Technical Track, Cloud, Google Cloud Platform (GCP), Python, Amazon Web Services (AWS), Airflow
Apache Airflow is a great tool for scheduling jobs. It has a nice UI out of the box, and it lets you create a directed acyclic graph (DAG) of tasks and their dependencies. You can easily see how jobs are currently doing and how they have performed in the past. If a job fails, you can configure retries or manually re-trigger it through the Airflow CLI or the Airflow UI. Overall, it is a great tool to run your pipeline. However, if you are just getting started with Airflow, the scheduler can be fairly confusing.

Let's start at the beginning and keep things very simple. Our goal is to schedule a DAG that runs every day at 2:00:00 UTC, starting from today (you already have yesterday's data). A DAG, among other parameters, is instantiated with a schedule_interval, a start_date and an end_date. Let's say we created a DAG called... Foo (I know, so original!) on 2018-06-07 with the configuration below:
```python
dag_params = {
    ...
    'start_date': datetime(2018, 6, 6),
    'end_date': None,  # Explicitly makes the DAG open-ended: run forever
    ...
}

with DAG('Foo', default_args=dag_params, schedule_interval='0 2 * * *') as dag:
    ...  # a bunch of tasks
```

The scheduler periodically scans the DAG folder and registers any new DAG it finds. By default, the scheduler puts new DAGs in the paused state at creation, unless a developer has changed dags_are_paused_at_creation in airflow.cfg. BEFORE unpausing the DAG, there are several things we need to understand about how the scheduler decides whether a DAG should be run.
The first run
Let's pretend that we unpause the DAG. What will the scheduler do with the Foo DAG? The scheduler periodically goes through all of the DAGs and checks whether each one needs to be run. For our example DAG, it will first look for a previous run in its database; since this is a brand-new DAG, it won't find any. In that case, the scheduler calculates the date of the next run from the start_date the DAG was instantiated with:

```python
def following_schedule(self, dttm):
    ...
    cron = croniter(self._schedule_interval, dttm)
    following = timezone.make_aware(cron.get_next(datetime), self.timezone)
    ...

next_run_date = following_schedule(start_date)
# start_date=datetime(2018, 6, 6). For our DAG this yields datetime(2018, 6, 6, 2)
```

Once next_run_date is known, the scheduler calculates period_end:
```python
period_end = following_schedule(next_run_date)  # period_end=datetime(2018, 6, 7, 2)
```

A DAG run for our DAG will be created if both of the following conditions are met:
- next_run_date <= timezone.utcnow() # next_run_date is execution_date
- period_end <= timezone.utcnow()
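To make the mechanics concrete, here is a minimal, self-contained sketch of this first-run logic. It is a simplification for illustration only: it hardcodes the daily '0 2 * * *' schedule as "the next 02:00 UTC" instead of using croniter, and stands in a fixed "now" for timezone.utcnow().

```python
from datetime import datetime, timedelta

def following_schedule(dttm):
    # Simplified stand-in for croniter with '0 2 * * *':
    # returns the next 02:00 UTC strictly after dttm.
    candidate = dttm.replace(hour=2, minute=0, second=0, microsecond=0)
    if candidate <= dttm:
        candidate += timedelta(days=1)
    return candidate

start_date = datetime(2018, 6, 6)               # from dag_params
next_run_date = following_schedule(start_date)  # the future execution_date
period_end = following_schedule(next_run_date)

# Pretend we unpaused Foo at 2018-06-07T14:00:00 UTC
now = datetime(2018, 6, 7, 14)
should_create_dag_run = next_run_date <= now and period_end <= now
print(next_run_date, period_end, should_create_dag_run)
```

At 14:00 on 2018-06-07 both conditions hold (next_run_date is 02:00 on the 6th, period_end is 02:00 on the 7th), so the scheduler would create the first DAGRun.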
DAGRun
A DAGRun is an object in Airflow that represents an execution/instance of a specific DAG. Among other fields, it contains the execution_date, start_date and end_date. For our first DAG run, the scheduler will create a DAGRun object with the following properties:

```python
DAGRun(
    ...
    execution_date=next_run_date,  # NOTE THIS HERE!
    start_date=timezone.utcnow(),
    ...
)
```

If we were to unpause the Foo DAG at 2018-06-07T14:00:00, the scheduler would schedule the first DAGRun with the following params:
```python
DAGRun(
    ...
    execution_date=datetime(2018, 6, 6, 2),
    # the start_date will be slightly after 2018-06-07T14:00:00 because the
    # scheduler only periodically checks whether a DAGRun should run
    start_date=datetime(2018, 6, 7, 14, 0, 1, 45),
    ...
)
```

Key takeaway: the execution_date of a DAGRun is not when the DAG starts. The general rule of thumb is: the execution_date is one cron iteration prior to when the DAGRun is scheduled to run. For example, if a job runs every hour, the execution_date of the DAGRun created at approximately 2 PM will be 1 PM.
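As a quick sanity check on that rule of thumb, here is a small sketch for a hypothetical hourly schedule ('0 * * * *'); the specific timestamps are made up for illustration.

```python
from datetime import datetime, timedelta

# The scheduler creates the run shortly after the 2 PM schedule boundary.
run_created_at = datetime(2018, 6, 7, 14, 0, 5)

schedule_boundary = run_created_at.replace(minute=0, second=0)  # 2 PM
execution_date = schedule_boundary - timedelta(hours=1)         # one cron iteration earlier

print(execution_date)  # 2018-06-07 13:00:00 -> the run carries the 1 PM execution_date
```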
Subsequent DAGRuns
The scheduler will be able to find a previous run for all subsequent DAGRuns. The execution_date for each new DAGRun is therefore simply:

```python
next_dag_run = following_schedule(last_scheduled_run)
```
The two conditions above still have to be met. From here onwards, the Foo DAG will run every day at 2 AM, with the previous day's 2 AM as the execution_date, forever!
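This steady state can be sketched as follows (again hardcoding the daily interval as a simplification, instead of croniter): each new execution_date is exactly one schedule interval after the previous one.

```python
from datetime import datetime, timedelta

def following_schedule(dttm):
    return dttm + timedelta(days=1)  # simplified daily cron step for '0 2 * * *'

last_scheduled_run = datetime(2018, 6, 6, 2)  # the first run's execution_date
execution_dates = []
for _ in range(3):
    last_scheduled_run = following_schedule(last_scheduled_run)
    execution_dates.append(last_scheduled_run)

print(execution_dates)
# [2018-06-07 02:00, 2018-06-08 02:00, 2018-06-09 02:00]
```

Each of these DAGRuns is actually created shortly after the following day's 2 AM, once its period_end has passed, which is exactly the "one cron iteration behind" behavior described above.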