Apache Airflow · 19 March 2024

Airflow — pools and mutexes.

Author
Amadeusz Kosik

Big Data Engineer


Although the ideal data pipeline is made of idempotent and independent tasks, there are cases when setting up a mutex (i.e. a part of the job that must not run concurrently) is necessary. Fortunately, Airflow supports such cases and offers a few tools of varying complexity to implement such a pipeline.

In this article, we will look at the following DAG in Airflow. The graph itself is relatively simple; the catch is that the load_1 and load_2 operators cannot have concurrently running task instances. We will look at treating the loads separately and then as a group.
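Since the DAG diagram itself is not reproduced here, below is a minimal TaskFlow sketch of such a pipeline. Only the task names load_1 and load_2 come from the article; the extract step, the schedule, and the task bodies are illustrative assumptions.

from airflow.decorators import dag, task
from pendulum import datetime


@dag(schedule="@daily", start_date=datetime(2024, 1, 1), catchup=False)
def load_pipeline():
    @task
    def extract():
        # Placeholder payload; a real pipeline would read from a source system.
        return {"rows": 42}

    @task
    def load_1(payload):
        print(f"loading into target 1: {payload}")

    @task
    def load_2(payload):
        print(f"loading into target 2: {payload}")

    data = extract()
    load_1(data)
    load_2(data)


load_pipeline()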

Therefore, there are three scenarios we will look into:

  1. Only one instance of the operator can be running.
  2. An instance of the operator cannot run before the older ones have completed successfully.
  3. Only one instance of a group of operators can be running.

The examples use the decorator (TaskFlow) syntax for Airflow. Still, the concept stays the same for the classic, 1.x-compatible approach: instead of decorator parameters, pass the same arguments to any Operator class constructor.

Mutex on an operator

The first scenario is that only one instance of the operator can be running at a time. If there are multiple runs ready to be scheduled, it does not matter which one goes first. One solution is to use the max_active_tis_per_dag option with a value of 1.

@task(
   max_active_tis_per_dag=1
)
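A fuller sketch of this scenario, assuming the load_1 task from the pipeline above; only the max_active_tis_per_dag=1 argument comes from the article, the rest is illustrative.

from airflow.decorators import task


@task(
    # At most one task instance of load_1 may run at a time,
    # no matter how many DAG runs are waiting to be scheduled.
    max_active_tis_per_dag=1
)
def load_1(payload):
    print(f"loading into target 1: {payload}")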

Dependency on past runs

The second example assumes that the nth batch cannot start before the (n-1)th one has completed successfully (or has at least been marked so in Airflow). For this use case, Airflow offers the depends_on_past flag. Be careful here and pay attention to the state of the latest runs: one failed, upstream-failed, or waiting task can halt all future DAG runs.

@task(
   depends_on_past=True
)
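Again a hedged sketch of how the flag sits on a task, reusing the assumed load_1 from above; only depends_on_past=True comes from the article.

from airflow.decorators import task


@task(
    # The scheduler will not start this task instance until the same task
    # in the previous DAG run has finished successfully.
    depends_on_past=True
)
def load_1(payload):
    print(f"loading into target 1: {payload}")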

Pools – mutex across multiple operators

The most complex approach is required if we need to group multiple operators so that they share a mutex. One way to do it is to put them into one custom pool and limit it to accommodate only one task at a time: either set up the pool with a single slot, or make each operator claim all of the pool's slots via the pool_slots parameter.

load_pool = Pool.create_or_update_pool(
   name="load_pool",
   slots=1,
   description="Pool for data load tasks."
)

@task(
   pool=load_pool.pool
)
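Wiring both load tasks to the pool then looks roughly like the sketch below. Referencing the pool by its name string is equivalent to the load_pool.pool attribute used above; the task names follow the article, while the bodies are illustrative assumptions.

from airflow.decorators import task


@task(pool="load_pool")
def load_1(payload):
    print(f"loading into target 1: {payload}")


@task(pool="load_pool")
def load_2(payload):
    # load_1 and load_2 share the single slot of load_pool,
    # so only one of them runs at any given moment.
    print(f"loading into target 2: {payload}")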

Source

The source code for all examples and the docker environment to run them is available on GitHub.
