Airflow — pools and mutexes.
Although the ideal data pipeline is made of idempotent and independent tasks, there are cases when setting up a mutex (i.e., a part of the job that cannot run concurrently) is necessary. Fortunately, Airflow supports such cases and offers a few tools of varying complexity to implement such a pipeline.
In this article, we will look at the following DAG in Airflow. The graph itself is relatively simple; the catch is that the load_1 and load_2 operators cannot have concurrently running task instances. We will treat the loads both separately and as a group.
Therefore, there are three scenarios we will look into:
- Only one instance of an operator can be running at a time.
- An instance of an operator cannot run before the older instances have finished successfully.
- Only one task instance from a group of operators can be running at a time.
The examples use the decorator (TaskFlow) syntax of Airflow 2. Still, the concept stays the same for the 1.0-compatible approach: instead of passing the parameters to the decorator, pass them to any Operator class constructor.
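For illustration, here is the same parameter passed both ways; retries stands in for any of the parameters discussed below, and the task id and callable are placeholders:

from airflow.decorators import task
from airflow.operators.python import PythonOperator

# Airflow 2.x decorator (TaskFlow) style
@task(retries=3)
def load_1():
    ...

# 1.0-compatible style: the same parameter goes straight into the
# operator constructor (inside a DAG definition in practice)
load_1_classic = PythonOperator(
    task_id="load_1",
    python_callable=lambda: None,  # placeholder callable
    retries=3,
)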
Mutex on an operator
The first scenario is that only one instance of the operator can be running at a time. If there are multiple runs ready to be scheduled, it does not matter which one goes first. One solution is to use the max_active_tis_per_dag parameter (known as task_concurrency before Airflow 2.2) with a value of 1.
@task(
    max_active_tis_per_dag=1
)
def load_1():
    ...
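To see the flag in context, here is a minimal sketch of a DAG; the dag id and hourly schedule are illustrative, and the schedule parameter assumes Airflow 2.4+ naming:

from airflow.decorators import dag, task
import pendulum

@dag(
    dag_id="mutex_on_operator",  # illustrative id
    schedule="@hourly",          # schedule_interval on older 2.x
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=True,
)
def mutex_on_operator():
    @task(max_active_tis_per_dag=1)
    def load_1():
        # At most one running instance of load_1 across all DAG runs;
        # the scheduler picks any of the queued runs to go first.
        ...

    load_1()

mutex_on_operator()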
Dependency on past runs
The second example assumes that the nth batch cannot start before the (n-1)th one has completed successfully (or has at least been marked so in Airflow). For this use case, Airflow offers the depends_on_past flag. Be careful here and pay attention to the state of the latest runs: one failed, upstream-failed, or waiting task can halt all future DAG runs.
@task(
    depends_on_past=True
)
def load_1():
    ...
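A minimal sketch under the same assumptions as above; the comments spell out how this differs from the previous scenario:

from airflow.decorators import dag, task
import pendulum

@dag(
    dag_id="sequential_loads",  # illustrative id
    schedule="@daily",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=True,
)
def sequential_loads():
    @task(depends_on_past=True)
    def load_1():
        # Unlike max_active_tis_per_dag, this enforces strict order:
        # batch n is scheduled only after batch n-1 of this same task
        # succeeded. The very first run has no past and runs normally.
        ...

    load_1()

sequential_loads()

If one instance fails, all later runs simply wait until it is cleared or marked successful, which is the halting behaviour mentioned above.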
Pools – mutex across multiple operators
The most complex approach is required if we need a group of operators to share a mutex. One way to do it is to put them all into one custom pool and limit it to accommodate only one task at a time: either create the pool with a single slot, or assign each operator a number of required slots (the pool_slots parameter) high enough to fill the pool on its own.
from airflow.models import Pool

load_pool = Pool.create_or_update_pool(
    name="load_pool",
    slots=1,
    description="Pool for data load tasks.",
)
@task(
    pool=load_pool.pool
)
def load_1():
    ...
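Putting it together, both load tasks reference the pool by name; a sketch assuming the pool already exists (the dag id and schedule are illustrative):

from airflow.decorators import dag, task
import pendulum

@dag(
    dag_id="pooled_loads",  # illustrative id
    schedule="@hourly",
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
)
def pooled_loads():
    @task(pool="load_pool")  # pools are referenced by name
    def load_1():
        ...

    @task(pool="load_pool")
    def load_2():
        ...

    load_1()
    load_2()

pooled_loads()

With a single slot in load_pool, at most one of load_1 and load_2, across all DAG runs, executes at any given moment. The pool can also be created in the UI (Admin → Pools) or via the CLI: airflow pools set load_pool 1 "Pool for data load tasks".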
Source
The source code for all examples and the Docker environment to run them is available on GitHub.