Apache Airflow fundamentals

Start the project #

Astronomer CLI quickstart guide

To prevent errors like this:

Env file ".env" found. Loading...
buildkit not supported by daemon
Error: command 'docker build -t astronomer_f5dd6c/airflow:latest failed: failed to execute cmd: exit status 1

You need to update your Docker config with this piece of code:

"features": {
    "buildkit": false
}

Source of the solution

mkdir <directory-name> && cd <directory-name>
astro dev init
astro dev start

It starts 3 containers:

  • Postgres: Airflow’s Metadata Database
  • Webserver: The Airflow component responsible for rendering the Airflow UI
  • Scheduler: The Airflow component responsible for monitoring and triggering tasks

The essentials #

Airflow is an orchestrator for creating dynamic data pipelines and executing tasks in the right order at the right time.

Airflow benefits:

  • dynamic - data pipelines are described as Python scripts.
  • scalable - you can choose the required type of executor (Kubernetes, for example) to run as many tasks as you need in a distributed environment
  • interactive - there are 3 ways to interact with Airflow:
    • Web UI
    • CLI
    • REST API
  • extensible - you can customize Airflow by creating new plugins

NB: Airflow IS NOT:

  • a streaming solution
  • a data processing framework (if you need to process TBs of data, use Airflow to trigger a Spark job)

Core components #

  • Web Server - Flask server with Gunicorn, serving the Web UI
  • Scheduler - the heart of Airflow; responsible for scheduling and triggering tasks; for HA purposes you can run multiple schedulers
  • Metadata Database - you can use any SQLAlchemy-compatible DB (even MariaDB, but with limitations, for example you can’t run multiple schedulers)

Additional components:

  • Executor - defines how your tasks are going to be executed by Airflow
    • Queue
  • Worker - defines where your tasks are actually executed; it is the process in which a task runs

2 Common Architectures #

Single node #

Node:

  • Web Server
  • Metastore
  • Scheduler
  • Executor
    • Queue

Workflow:

  • Web Server interacts with the Metastore and fetches required data:
    • statuses of tasks
    • users
    • permissions
    • etc
  • If the task is ready to be scheduled, the Scheduler will:
    • change the status of the task in the Metastore
    • create the instance object of this task
    • send the task object to the Queue of the Executor
  • After that the task is ready to be fetched by the Worker
  • Executor interacts with Metastore and updates the task status

The Web Server doesn’t interact directly with the Scheduler or the Executor.

Multi nodes (Celery) #

Node1:

  • Web Server
  • Scheduler
  • Executor

Node2:

  • Metastore
  • Queue (in this case we need dedicated queue like RabbitMQ or Redis)

Worker Node 1-n

  • Airflow Worker

Workflow:

  • Web Server interacts with the Metastore like in the Single Node installation
  • Scheduler interacts with the Metastore and the Executor
  • When a task is ready to be scheduled, it is sent to the Executor
  • The Executor pushes the task to the Queue
  • The task is then pulled from the Queue and executed by one of the Workers

Core Concepts #

DAG - a data pipeline in Airflow.

Operator - an object for creating tasks in a DAG.

Types of operators:

  • action - allows you to execute something in your data pipeline; examples:
    • Python operator
    • Bash operator
    • Postgres operator
    • etc
  • transfer - allows you to transfer data from a source to a destination; example:
    • mysql-to-presto operator
  • sensor - a kind of trigger for tasks; example:
    • file sensor - waits until a file is placed in a specific directory

Task - an instance of an operator in a DAG. When a task is ready to be scheduled, it becomes a:

Task instance object - represents a specific run of a task: DAG + Task + point in time

Dependencies - describe the relations between operators in a DAG

Workflow - combination of all concepts described above.

+---------------------------------+
| DAG                             |
| +----------+       +----------+ |
| | Operator |       | Operator | |
| | +------+ |       | +------+ | |
| | | Task | |------>| | Task | | |
| | +------+ |       | +------+ | |
| +----------+       +----------+ |
+---------------------------------+
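
To make these concepts concrete, here is a minimal runnable sketch (the DAG and task names are illustrative) with two tasks and one dependency:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id='concepts_demo', start_date=datetime(2021, 1, 1), schedule_interval='@daily') as dag:

    # Each operator instantiated inside the DAG context becomes a task
    extract = BashOperator(task_id='extract', bash_command='echo extracting')
    load = BashOperator(task_id='load', bash_command='echo loading')

    # Dependency: load runs only after extract succeeds
    extract >> load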

Task Lifecycle #

When you add a new DAG file to the DAGs folder, it is parsed by:

  • Web Server - it parses DAGs every 30 seconds by default
  • Scheduler - parses NEW files every 5 minutes by default

After that the Scheduler creates a DagRun object in the Metastore. This object is an instance of your DAG.

When the Task is ready to be scheduled, the Scheduler creates a TaskInstance object in the Metastore with the status Scheduled.

When the Scheduler assigns the Task to the Executor, the status becomes Queued.

Once the Task has the Queued status, the Executor is ready to take it and execute it on a Worker. Now the status is Running.

While the task executes, the Executor updates its status in the Metastore, and when it is done, the status changes to Success.

The Scheduler checks the status of the TaskInstance object; if it is Success and there are no more tasks to run, the DagRun is considered done.

Only after that does the Web Server update the UI.

Extras and providers #

Extras packages extend the functionality of Airflow’s core, for example allowing you to run Airflow in Celery mode.

Providers allow you to add new functionality on top of Airflow. Providers are separated from Airflow’s core. For example, the PostgreSQL provider.
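
As an illustration, once the PostgreSQL provider package (apache-airflow-providers-postgres) is installed, its operators become importable like any core operator. A minimal sketch (the connection id, table and SQL are assumptions):

from datetime import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG(dag_id='postgres_provider_demo', start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:

    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='postgres_default',  # connection defined in Admin -> Connections
        sql='CREATE TABLE IF NOT EXISTS demo (id INT);'
    )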

Upgrading Airflow #

  1. DB
     • create a backup of the Metastore
  2. DAGs
     • make sure there are no deprecated features in your DAGs
     • pause all the DAGs and make sure no tasks are running
  3. Upgrade Airflow
  4. Upgrade the DB:
     airflow db upgrade
  5. Restart:
     • Scheduler
     • Web Server
     • Workers

Interacting with Apache Airflow #

CLI #

airflow db init # Initialize the Metastore DB and generate required files and directories
airflow db upgrade # Upgrade DB's schema
airflow db reset # Erase the DB

airflow webserver # Start the Web Server
airflow scheduler # Start the Scheduler
airflow celery worker # Start the Worker

airflow dags (un)pause # (Un)pause a DAG
airflow dags trigger # Trigger a DAG
airflow dags list # List all DAGs

airflow tasks list ${DAG_ID} # List all tasks of the specific DAG
airflow tasks test ${DAG_ID} ${TASK_ID} ${EXECUTION_DATE} # Useful when adding new tasks; it allows you to execute a particular task regardless of its dependencies; date format is YYYY-MM-DD

airflow dags backfill -s ${YYYY-MM-DD} -e ${YYYY-MM-DD} --reset-dagruns ${DAG_ID} # Allows you to rerun past DagRuns

REST API #

Stable API documentation
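
For example, a minimal sketch of calling the stable REST API with the requests library (the URL and credentials are assumptions, and the basic auth API backend must be enabled):

import requests

# List all DAGs known to this Airflow instance
response = requests.get(
    'http://localhost:8080/api/v1/dags',
    auth=('admin', 'admin')  # requires the basic_auth API backend
)
print(response.json())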

DAGs and Tasks #

DAG skeleton #

Example of the simplest valid empty DAG:

from airflow import DAG

with DAG(dag_id='dag_name') as dag:
    pass

You just need to put this file into the DAGs folder.

By default the DAG has no owner and it is scheduled to execute every day at 00:00:00.

Demystifying DAG Scheduling #

  • start_date - defines the date at which your DAG starts being scheduled
  • schedule_interval - defines the frequency at which your DAG will be triggered; 24 hours by default
    • NB: the DAG is triggered at start_date + schedule_interval
  • execution_date - the beginning of the execution period (for the first run it equals start_date)
  • for each subsequent run, the execution_date is shifted forward by schedule_interval
  • end_date - the date at which your DAG won’t be scheduled anymore

from airflow import DAG
from airflow.operators.dummy import DummyOperator

from datetime import datetime

# Define the start_date for the whole DAG
with DAG(dag_id='simple_dag', schedule_interval="*/10 * * * *", start_date=datetime(2021, 1, 1)) as dag:
    task_1 = DummyOperator(
        task_id='task_1',
        # Define the start_date for the specific task, but it's a bad practice
        start_date=datetime(2021, 1, 2)
    )

By default datetimes are in UTC, and all dates in Airflow are stored in UTC.

If you define the start_date in the past, by default Airflow will trigger all non-triggered DagRuns between the start_date and the current date.

NB: NEVER use datetime.now() as a start_date: because the start_date keeps shifting forward before each scheduling cycle, start_date + schedule_interval is never reached and the DAG will never run.

The schedule_interval parameter may be set to (see the sketch after this list):

  • a cron string (defines an absolute schedule):
    • */10 * * * *
  • a preset value like:
    • @daily
    • @weekly
  • a timedelta object (defines a schedule relative to the previous run):
    • timedelta(hours=7)
    • timedelta(days=1)
  • None (in this case the DAG will never be triggered automatically by the scheduler)
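
A quick sketch of the same options expressed in code (the DAG ids are illustrative):

from datetime import datetime, timedelta

from airflow import DAG

start = datetime(2021, 1, 1)

# Cron string: every 10 minutes
dag_cron = DAG(dag_id='cron_dag', schedule_interval='*/10 * * * *', start_date=start)

# Preset: once a day at midnight
dag_daily = DAG(dag_id='daily_dag', schedule_interval='@daily', start_date=start)

# timedelta: relative to the previous run
dag_delta = DAG(dag_id='delta_dag', schedule_interval=timedelta(hours=7), start_date=start)

# None: only triggered manually or via the CLI/API
dag_manual = DAG(dag_id='manual_dag', schedule_interval=None, start_date=start)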

Backfilling and Catchup #

Backfilling allows you to run or rerun past non triggered or already triggered dagRuns.

By default Airflow will run all non-triggered DagRuns between the start_date and the current date.

We can enable or disable this behaviour with the catchup: bool = True parameter of the DAG() object. If you set this parameter to False, only the latest non-triggered DagRun will be scheduled.

In addition, you can set max_active_runs: int to specify the max number of DagRuns of a given DAG that are allowed to run at the same time. With catchup=True this parameter lets you limit the number of simultaneously running DagRuns.

Even if catchup=False, you can still start a backfilling process from the CLI.
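
A minimal sketch of a DAG with catchup disabled and the number of concurrent runs capped (values are illustrative):

from datetime import datetime

from airflow import DAG

with DAG(
    dag_id='no_catchup_dag',
    schedule_interval='@daily',
    start_date=datetime(2021, 1, 1),
    catchup=False,        # only the latest non-triggered DagRun is scheduled
    max_active_runs=1     # at most one DagRun of this DAG at a time
) as dag:
    pass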

Focus on operators #

An operator is a task in your DAG.

If you have more than one business task, you shouldn’t put all of them into a single operator, because if one of them fails, the retry will rerun all of them.

In other words, you should create a separate operator for each task. For example, data loading and data cleaning should be executed in separate operators.

Operators should be idempotent: running them again with the same input should produce the same result.
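
As an illustration, a sketch with made-up file paths: the second version can be rerun for the same execution date without corrupting the output.

from pathlib import Path

# Non-idempotent: every rerun for the same date appends the data again
def export_non_idempotent(ds, rows):
    with open('/tmp/sales.csv', 'a') as f:
        f.writelines(rows)

# Idempotent: the output is keyed by the execution date and fully overwritten on rerun
def export_idempotent(ds, rows):
    Path(f'/tmp/sales_{ds}.csv').write_text(''.join(rows))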

You can set default arguments for every task with the default_args: dict parameter of the DAG object:

defaults_args = {
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

with DAG(dag_id='simple_dag', default_args=defaults_args, ...) as dag:

    task_1 = DummyOperator(
        task_id='task_1',
    )

    task_2 = DummyOperator(
        task_id='task_2',
        # this parameter overrides the default
        retries=3
    )

    task_3 = DummyOperator(
        task_id='task_3',
    )

Documentation for the BaseOperator class.

Executing python functions #

The PythonOperator is the most commonly used operator in Airflow.

from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

defaults_args = {
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

def _downloading_data(my_param, ds, **kwargs):
    print('Data is downloaded')
    # print all context kwargs
    print(kwargs)
    # print one specific context parameter (ds is the execution date as YYYY-MM-DD)
    print(ds)
    # print a custom parameter, defined in the operator's op_kwargs argument
    print(my_param)

with DAG(dag_id='simple_dag', default_args=defaults_args, ...) as dag:
    downloading_data = PythonOperator(
        task_id='downloading_data',
        python_callable=_downloading_data,
        # Here you can specify custom params
        op_kwargs={'my_param': 42}
    )

To get access to the DAG run context, you just need to pass **kwargs to the Python function, or the name of the specific context parameter (like ds above).

Putting DAG on hold #

FileSensor - a special type of operator that waits for a file to appear in a specific place.

In UI:

Admin -> Connections -> +

  • conn id - the ID you will use in your DAG code
  • conn type - if the required connection type is unavailable, you can add it by installing the corresponding provider
  • extra - this field is not secure; it takes a JSON value, and for the File conn type this JSON should contain only the path to the directory where the file is expected:
{
  "path": "/tmp/"
}

How to use the FileSensor operator:

from airflow.sensors.filesystem import FileSensor

with DAG(...) as dag:

    ...

    waiting_file = FileSensor(
        task_id='waiting_for_data',
        fs_conn_id='fs_default',
        filepath='my_file.txt',
        # Determines how often the sensor checks for the file; 60 seconds by default
        poke_interval=15
    )

Executing Bash commands #

from airflow.operators.bash import BashOperator

with DAG(...) as dag:

    ...

    processing_data = BashOperator(
        task_id = 'processing_data',
        bash_command = 'exit 0'
    )

Define the path #

After all tasks are implemented, you need to define the dependencies between them.

There are two ways to define the dependencies between tasks:

  • with set_upstream or set_downstream methods
with DAG(...) as dag:

    ...

    downloading_data.set_downstream(waiting_file)
    processing_data.set_upstream(waiting_file)
  • with the right (>>, equivalent of set_downstream) or left (<<, equivalent of set_upstream) bitshift operators (the more commonly used option)
with DAG(...) as dag:

    ...

    downloading_data >> waiting_file >> processing_data

chain() function #

You can build exactly the same DAG with the chain() function:

from airflow.models.baseoperator import chain

with DAG(...) as dag:

    ...

    chain(downloading_data, waiting_file, processing_data)

Multiple tasks #

To execute multiple tasks on the same DAG level (i.e. simultaneously), you just need to put them into a list.

with DAG(...) as dag:

    ...

    downloading_data >> [ waiting_file, processing_data ]

Cross dependencies #

You can also define cross dependencies between tasks:


from airflow.models.baseoperator import cross_downstream

with DAG(...) as dag:

    ...

    cross_downstream([downloading_data, some_other_task], [waiting_file, processing_data])

Here we will get:

downloading_data >> [waiting_file, processing_data] AND some_other_task >> [waiting_file, processing_data]

This function is required because you can’t create dependencies directly between two lists of tasks.

Exchanging Data #

XCom (cross-communication) - the mechanism that allows you to exchange small amounts of data between tasks.

In the UI you can access all the XComs via Admin->XComs menu.

The first way to create an XCom is to return data from the function.

To access that XCom from another task, we use the DagRun context and its ti (task instance) object. With the help of this object we can pull the required XCom:

def _first_task():
  ...
  return 42

def _second_task(ti):
  my_xcom = ti.xcom_pull(key='return_value', task_ids=['first_task'])
  ...

You can also push a value to XCom without returning it from the function:


def _another_task(ti):
  ...
  ti.xcom_push(key='my_key', value=42)
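
For completeness, here is a sketch of wiring these callables into a DAG (assuming the functions above live in the same file; the DAG id is illustrative), so that the task_ids value passed to xcom_pull matches the pushing task's task_id:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(dag_id='xcom_demo', start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:

    first_task = PythonOperator(task_id='first_task', python_callable=_first_task)
    second_task = PythonOperator(task_id='second_task', python_callable=_second_task)

    # second_task pulls the XCom pushed (returned) by first_task
    first_task >> second_task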

NB: XComs are stored in the Metadata DB and they are limited in size, depending on the DB you use:

  • SQLite - up to 2Gb
  • PostgreSQL - up to 1 Gb
  • MySQL - up to 64 Kb (sic!)

Failure #

If a task finishes with an error, Airflow automatically tries to run it again (according to the configured number of retries).

You can rerun a task manually from the UI by clicking the Clear button for the required task instance. To apply it to multiple tasks at the same time, use the Browse -> Task Instances menu and the search filters on that page.

You can enable email notifications by setting:

default_args = {
  ...
  'email_on_failure': True,
  'email_on_retry': True,
  'email': 'admin@example.com'
}

You can also add custom logic on failure by specifying the on_failure_callback argument of the operator, as sketched below.
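
A minimal sketch of such a callback (the alerting logic here is just a print; DAG and task names are illustrative):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

def _on_failure(context):
    # The context dict contains the task instance, the execution date, the exception, etc.
    print(f"Task {context['task_instance'].task_id} failed: {context.get('exception')}")

with DAG(dag_id='failure_demo', start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:

    processing_data = BashOperator(
        task_id='processing_data',
        bash_command='exit 1',  # always fails, just to trigger the callback
        on_failure_callback=_on_failure
    )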

Executors #

The default executor #

The executor defines how your tasks are going to be executed in your Airflow instance.

The default executor is the SequentialExecutor and it is based on SQLite, so it executes tasks only one after another.

The SequentialExecutor is useful for debugging purposes or for experiments.

You may configure the executor in Airflow’s main config: /usr/local/airflow/airflow.cfg (in the scheduler container).

For configuration you should pay attention to these parameters:

  • executor
  • sql_alchemy_conn

Concurrency. The parameters you must know. #

Parameters:

  • parallelism (default is 32) - defines the max number of tasks that can run at the same time across the whole Airflow instance
  • dag_concurrency (16) - defines the max number of tasks that can run in parallel for a single DAG, across all of its DagRuns
  • max_active_runs_per_dag (16) - defines the max number of DagRuns that can run at the same time for a given DAG
  • max_active_runs - DAG-level parameter: the max number of simultaneous DagRuns for a specific DAG
  • concurrency - DAG-level parameter: the max number of tasks running at the same time for a specific DAG (see the sketch after this list)
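
The first three parameters are set in airflow.cfg, while the last two are passed to the DAG object. A minimal sketch of the DAG-level ones (values are illustrative):

from datetime import datetime

from airflow import DAG

with DAG(
    dag_id='limited_dag',
    start_date=datetime(2021, 1, 1),
    max_active_runs=1,   # at most one DagRun of this DAG at a time
    concurrency=4        # at most 4 tasks of this DAG running at the same time
) as dag:
    pass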

Start scaling the Airflow #

Local executor - allows you to execute multiple tasks at the same time on a single machine. Each triggered task is executed in a subprocess.

To use it, you need to change the executor parameter to LocalExecutor and add a connection to a DB like PostgreSQL (sql_alchemy_conn).

Scaling to infinity #

Celery executor #

Celery is a kind of distributed task queue.

Example of an architecture:

+----------------+       +------------------+      +----------+
| Node 1         |       | Node 2           |   +--| Worker 1 |
| +------------+ |       | +--------------+ |   |  +----------+
| | Web Server | |       | | Metadata DB* | |  2|  +----------+
| +------------+ |       | +--------------+ |   |  | Worker 2 |
| +------------+ |   1   | +--------------+ |   |  +----------+
| | Scheduler  |-|-------|>|    Queue**   |<|---+  +----------+
| +------------+ |       | +--------------+ |      | Worker 3 |
+----------------+       +------------------+      +----------+

* - for example - PostgreSQL
** - RabbitMQ or Redis

1 - a task is pushed by the executor to the queue
2 - one of the workers pulls the task from the queue and executes it

Each worker machine must have Airflow installed, as well as all the dependencies of every DAG.
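
With the CeleryExecutor you can also route a task to a specific queue so that only workers listening on that queue pick it up; a minimal sketch (the queue name is made up, and a worker has to be started for it, e.g. with airflow celery worker -q high_cpu):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id='celery_queue_demo', start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:

    heavy_processing = BashOperator(
        task_id='heavy_processing',
        bash_command='echo crunching',
        queue='high_cpu'  # only workers subscribed to this queue will execute this task
    )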