The technicalities 7 min read 8 February 2024

Passing information between DAGs in Airflow.

Author
Amadeusz Kosik

Big Data Engineer

Some data pipelines need to pass small values between tasks: not complete datasets, but a few kilobytes at most. Airflow itself can handle this. As always, multiple options are available, so let’s review some of them.

In this article, we look at sharing data between DAGs that are connected via run dependencies. Let’s assume that each DAG runs daily and that the first DAG generates some important data for the second DAG.

XCom

XCom is the first and recommended approach. It works well with out-of-the-box features such as ExternalTaskSensor:

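from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id="xcom-sink",
    schedule_interval="@daily",
    start_date=datetime(2023, 7, 1),
    catchup=True,
) as xcom_sink:
    # Wait until the source task for the same logical date has succeeded.
    wait_for_source = ExternalTaskSensor(
        task_id="wait-for-dependency",
        external_dag_id="xcom-source",
        external_task_id="update-hive-table-events-triggers",
    )
    # Pull the value that the source task pushed to XCom and print it.
    show_xcom = BashOperator(
        task_id="show-xcom",
        bash_command="echo {{ ti.xcom_pull(dag_id='xcom-source', task_ids='update-hive-table-events-triggers') }}",
    )
    wait_for_source >> show_xcom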
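
For context, the source side of the pair might look like the sketch below. It is not taken from the article's repository: the payload returned by describe_partition and the builder function build_source_dag are hypothetical, and only the DAG ID and task ID come from the sink example. The sketch relies on PythonOperator pushing the callable's return value to XCom under the default key, which is what the xcom_pull call in the sink reads.

```python
from datetime import datetime


def describe_partition(ds: str) -> dict:
    """Return a small, JSON-serialisable payload for the given run date.

    Hypothetical payload: the table and partition names are made up.
    """
    return {"table": "events_triggers", "partition": f"dt={ds}"}


def build_source_dag():
    """Build the source DAG.

    Airflow is imported lazily so that describe_partition stays testable
    on a machine without Airflow installed.
    """
    from airflow import DAG
    from airflow.operators.python import PythonOperator

    with DAG(
        dag_id="xcom-source",
        schedule_interval="@daily",
        start_date=datetime(2023, 7, 1),
        catchup=True,
    ) as dag:
        # The return value of the callable is pushed to XCom under the
        # default key "return_value". Airflow passes `ds` (the run date as
        # YYYY-MM-DD) because the callable's signature asks for it.
        PythonOperator(
            task_id="update-hive-table-events-triggers",
            python_callable=describe_partition,
        )
    return dag


# In a real DAG file, call the builder at module level so that the
# scheduler can discover the DAG:
# xcom_source = build_source_dag()
```

Keeping the payload builder as a plain function makes it easy to unit-test without a running Airflow instance.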

XCom vs run id

An XCom value is identified by the DAG ID, task ID and run ID. If you want to run a single DAG with a custom run ID, you have to make sure that an XCom value already exists for that run ID. This complicates manual runs.
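
The effect can be illustrated with a toy model. This is not Airflow code: the dictionary stands in for the XCom table, the pull helper is hypothetical, and the run ID format is an assumption based on the "type__ISO date" convention that Airflow 2 uses for generated run IDs.

```python
from datetime import datetime, timezone


def make_run_id(run_type: str, logical_date: datetime) -> str:
    # Assumed convention for generated run IDs: "<type>__<ISO timestamp>".
    return f"{run_type}__{logical_date.isoformat()}"


# Toy stand-in for the XCom table: (dag_id, task_id, run_id) -> value.
xcom_store = {}

day = datetime(2023, 7, 1, tzinfo=timezone.utc)
scheduled = make_run_id("scheduled", day)

# The source DAG's scheduled run pushes a value under its own run ID.
xcom_store[("xcom-source", "update-hive-table-events-triggers", scheduled)] = "dt=2023-07-01"


def pull(dag_id: str, task_id: str, current_run_id: str):
    """Look up a value using the run ID of the task doing the pull."""
    return xcom_store.get((dag_id, task_id, current_run_id))


# A scheduled sink run for the same day shares the run ID and finds the value.
found = pull("xcom-source", "update-hive-table-events-triggers", scheduled)

# A manual sink run with a custom run ID has no matching entry.
missing = pull("xcom-source", "update-hive-table-events-triggers", "manual-fix-1")
```

Here found holds the pushed value while missing is None, which is why a manually triggered run needs a matching source value to exist first.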

When not to XCom?

Rather than discussing cases that XCom is tailored for, let’s focus on examples that are not well supported. Basically, XCom does not work well with Datasets, and you might get some quirky results, as discussed here: https://github.com/apache/airflow/discussions/33069

With datasets, you need to refer to the most recent past value of the XCom (for example, via the include_prior_dates parameter of xcom_pull), effectively losing all the benefits of tight coupling. With that approach, you will need to deal with race conditions: if a sink DAG run other than the latest one is restarted, it will receive an incorrect value from the source. Moreover, let’s consider a design with multiple sources (the events and users datasets) and a single sink (events-with-users):

Suppose the events dataset gets updated twice and the users dataset only once. In that case, the events-with-users DAG will receive only the latest value from the events dataset. It is up to you to decide whether this behaviour is expected or unacceptable.
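
A toy simulation makes the scenario concrete. It is not Airflow code and only assumes the documented rule that a DAG scheduled on several datasets runs once every one of them has been updated at least once since its previous run. The publish helper and the value names are hypothetical.

```python
# Toy model of a sink scheduled on two datasets.
REQUIRED = {"events", "users"}

latest = {}       # dataset name -> latest value published by its source
pending = set()   # datasets updated since the last sink run
sink_runs = []    # snapshot of what each sink run observed


def publish(dataset: str, value: str) -> None:
    """Record a dataset update and run the sink once all sources are ready."""
    latest[dataset] = value
    pending.add(dataset)
    if pending >= REQUIRED:
        # The sink reads the latest past value of each source.
        sink_runs.append(dict(latest))
        pending.clear()


publish("events", "events-v1")
publish("events", "events-v2")   # overwrites events-v1 before the sink runs
publish("users", "users-v1")     # all sources updated: the sink runs once
```

After these three updates the sink has run exactly once and has seen events-v2 only; events-v1 was never consumed.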

Variables

In rare cases, you can use Airflow Variables to solve the issue. Variables are a built-in Airflow mechanism that provides global state (or configuration) that all DAGs can read and write. You can use them to implement the Registry pattern:

  1. Store a hashmap of run date -> metadata in a variable.
  2. Use the execution date (named logical_date in newer versions) as the hashmap key.
  3. Use PythonOperator to update the variable. Reading can be done either in Python code or via templating.
  4. Use sensors or datasets to schedule DAGs in the correct order.
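
The steps above can be sketched roughly as follows, assuming Airflow 2 and a JSON-encoded variable. The variable name events_registry, the metadata shape and all three function names are hypothetical; Variable.get and Variable.set are the standard Airflow model calls, imported lazily so the pure helpers can be tested without Airflow.

```python
import json


def register_run(raw_registry: str, logical_date: str, metadata: dict) -> str:
    """Return the registry JSON with metadata stored under logical_date."""
    registry = json.loads(raw_registry) if raw_registry else {}
    registry[logical_date] = metadata
    return json.dumps(registry, sort_keys=True)


def lookup_run(raw_registry: str, logical_date: str) -> dict:
    """Return the metadata for logical_date, failing loudly if it is absent."""
    registry = json.loads(raw_registry) if raw_registry else {}
    if logical_date not in registry:
        raise KeyError(f"no registry entry for run date {logical_date}")
    return registry[logical_date]


def update_registry_task(ds: str) -> None:
    """Callable for a PythonOperator in the source DAG (hypothetical wiring)."""
    from airflow.models import Variable  # lazy import, Airflow 2 API

    raw = Variable.get("events_registry", default_var="{}")
    updated = register_run(raw, ds, {"partition": f"dt={ds}"})
    Variable.set("events_registry", updated)
```

Note that the read-modify-write in update_registry_task is not atomic, so two source runs finishing at the same moment could overwrite each other's entry. On the reading side, a template along the lines of {{ var.json.events_registry[ds] }} should reach the same entry, though that is worth verifying against your Airflow version.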

Beware!

Before you go with the variable route, please keep in mind that compared to XComs, variables are way more costly to maintain. You might need to keep track of the variables’ sizes, implement error handling in your PythonOperators and be mindful of any unsolicited changes in the variables’ values.
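
As an illustration of that maintenance burden, a registry stored in a variable would probably need a housekeeping helper like the one below. This is a minimal sketch: the function name, the retention of 30 entries and the 16 KiB limit are all hypothetical choices, not Airflow defaults.

```python
import json

MAX_ENTRIES = 30     # hypothetical retention: keep the 30 most recent run dates
MAX_BYTES = 16_384   # hypothetical size budget for the serialised variable


def prune_registry(raw_registry: str, max_entries: int = MAX_ENTRIES) -> str:
    """Validate the registry JSON and keep only the newest entries."""
    if max_entries < 1:
        raise ValueError("max_entries must be at least 1")
    try:
        registry = json.loads(raw_registry)
    except json.JSONDecodeError as exc:
        # Someone may have edited the variable by hand and broken it.
        raise ValueError("registry variable is not valid JSON") from exc
    if not isinstance(registry, dict):
        raise ValueError("registry variable must hold a JSON object")

    # Keys are ISO dates (YYYY-MM-DD), so string order is chronological.
    kept = sorted(registry)[-max_entries:]
    pruned = json.dumps({key: registry[key] for key in kept}, sort_keys=True)

    if len(pruned.encode("utf-8")) > MAX_BYTES:
        raise ValueError("registry variable exceeds the size budget")
    return pruned
```

None of this is needed with XCom, which is part of why the variable route costs more to maintain.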

External system?

There is always the option of synchronising through an external data service, much like with built-in variables. This may take the form of data paths on HDFS, S3 or other storage, batch load dates in a database, or similar. However, this solution results in an inferior design: you end up with the disadvantages of the variables approach plus new implicit dependencies between DAGs and external services.
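
For completeness, the data-path variant could look like the sketch below. The local filesystem stands in for HDFS or S3, and the path layout, the _manifest.json file name and the helper names are all hypothetical conventions chosen for the example.

```python
import json
import tempfile
from pathlib import Path


def manifest_path(base: Path, dataset: str, ds: str) -> Path:
    """Build the per-run manifest location from the dataset name and run date."""
    return base / dataset / f"dt={ds}" / "_manifest.json"


def write_manifest(base: Path, dataset: str, ds: str, metadata: dict) -> Path:
    """Source side: publish the metadata for one run date."""
    path = manifest_path(base, dataset, ds)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(metadata))
    return path


def read_manifest(base: Path, dataset: str, ds: str) -> dict:
    """Sink side: load the metadata, failing loudly if the source has not run."""
    path = manifest_path(base, dataset, ds)
    if not path.exists():
        raise FileNotFoundError(f"no manifest for {dataset} on {ds}")
    return json.loads(path.read_text())


# Round trip in a temporary directory (the row count is made-up sample data).
with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    write_manifest(base, "events", "2023-07-01", {"rows": 1200})
    loaded = read_manifest(base, "events", "2023-07-01")
    relative = manifest_path(base, "events", "2023-07-01").relative_to(base).as_posix()
```

The path convention is exactly the kind of implicit contract mentioned above: both DAGs must agree on it, yet nothing in Airflow records that dependency.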

Summary

Comparison:

XCom + dataset:
  + no tight coupling between DAGs
  – race conditions
  – no guarantee for 1:1 run relationships

XCom + sensor:
  + supports 1:1 task run relationships
  – a bit tighter coupling
  – for manual run one must supply by hand the exec_date

Variables:
  + works with both sensors and datasets
  – handles only data sharing, does not handle orchestration
  – requires way more effort than XCom

External:
  + works even with 3rd party systems
  – requires even more effort
  – hidden dependencies outside AF
  – complicated architecture

Source

The source code for both XCom and Variable examples and the docker environment to run them is available on GitHub.
