The technicalities 15 min read 1 September 2023

Managing inter-DAG dependencies in Airflow

Author
Amadeusz Kosik

Big Data Engineer


In the real world, data pipelines rarely come as completely independent sequences of operations. Usually, they depend on one another: sometimes through simple 1:1 relationships, sometimes in more complicated ways. Here is a short list of what Apache Airflow has to offer for handling those relationships.

The general setting

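We will look into a situation where parent data pipelines (called DAGs in Airflow terminology) create data that is consumed by child pipelines. The dependency looks like this: the events and users pipelines feed events-with-users, which in turn feeds reports-1 and reports-2.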

The implicit assumption is that we cannot simply look at metadata (file creation time, the last row's creation timestamp or similar), as it may show that data is being loaded without telling us whether it has been fully loaded. Therefore, we wait for the entire dataset to be available.
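Conceptually, this is a small dependency graph in which a child may start only after all of its parents have fully loaded their data. A minimal sketch in plain Python using the standard library's `graphlib`; the pipeline names mirror the examples below, and the `run_order` and `ready` helpers are hypothetical illustrations, not Airflow APIs.

```python
from graphlib import TopologicalSorter

# Hypothetical map: pipeline -> pipelines whose data it consumes (its parents).
DEPENDENCIES: dict[str, set[str]] = {
    "events": set(),
    "users": set(),
    "events-with-users": {"events", "users"},
    "reports-1": {"events-with-users"},
    "reports-2": {"events-with-users"},
}


def run_order(deps: dict[str, set[str]]) -> list[str]:
    """One valid order in which the pipelines can run (parents first)."""
    return list(TopologicalSorter(deps).static_order())


def ready(deps: dict[str, set[str]], finished: set[str]) -> set[str]:
    """Unfinished pipelines whose parents have all fully loaded their data."""
    return {
        name
        for name, parents in deps.items()
        if name not in finished and parents <= finished
    }


if __name__ == "__main__":
    print(run_order(DEPENDENCIES))
    print(ready(DEPENDENCIES, {"events"}))  # {'users'}
```

Every mechanism discussed in this article is a different way of answering the `ready` question: who signals that a parent is done, and who checks it.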

External tools

Using an external tool is always an option, and not only for Airflow data pipelines. The most straightforward implementation is an empty (0-byte) marker file on NFS, HDFS or any other shared filesystem that signals success for a given dataset.

This example is based on HDFS, but you could also use:

  • NFS,
  • FTP/SFTP location,
  • metadata stored in a SQL or NoSQL database,
  • any other accessible, non-Airflow data storage.
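Whatever the storage, the contract has two halves: the producer writes the complete data first and creates the empty marker last, while the consumer checks only for the marker. A minimal local-filesystem sketch of that contract, assuming a hypothetical `<root>/marker/<dataset>/<YYYY-MM-DD>.success` layout that mirrors the HDFS paths used below; on HDFS or SFTP the same steps map to the respective tools' commands.

```python
import tempfile
from pathlib import Path


def marker_path(root: Path, dataset: str, ds: str) -> Path:
    """Hypothetical layout: <root>/marker/<dataset>/<YYYY-MM-DD>.success."""
    return root / "marker" / dataset / f"{ds}.success"


def publish(root: Path, dataset: str, ds: str, rows: list[str]) -> None:
    """Producer side: write the full dataset first, create the marker last."""
    data_file = root / "data" / dataset / f"{ds}.csv"
    data_file.parent.mkdir(parents=True, exist_ok=True)
    data_file.write_text("\n".join(rows))
    marker = marker_path(root, dataset, ds)
    marker.parent.mkdir(parents=True, exist_ok=True)
    marker.touch()  # empty file, the local analogue of `hdfs dfs -touch`


def is_ready(root: Path, dataset: str, ds: str) -> bool:
    """Consumer side: rely on the marker only, never on partial data files."""
    return marker_path(root, dataset, ds).exists()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        print(is_ready(root, "events", "2023-07-01"))  # False
        publish(root, "events", "2023-07-01", ["id,value", "1,a"])
        print(is_ready(root, "events", "2023-07-01"))  # True
```

Because the marker is created only after the data write returns, a consumer that sees the marker never sees a half-written dataset.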

Source code

At the time of writing, the current version (4.1.0) of apache-airflow-providers-apache-hdfs offers no HDFS operator, only sensors. Therefore, a workaround (e.g. a BashOperator) is needed to create the marker file.

from __future__ import annotations


from datetime import datetime


from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.apache.hdfs.sensors.web_hdfs import WebHdfsSensor
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.sftp.sensors.sftp import SFTPSensor


HDFS_CONNECTION_ID = "hdfs-14"
SFTP_CONNECTION_ID = "sftp-21"


with DAG(
	dag_id="external-events",
	schedule="@daily",
	start_date=datetime(2023, 7, 1),
) as dag_events:
	SFTPSensor(
    		task_id="wait-for-csv",
	    	path="/inbox/events/{{ dag_run.logical_date | ds }}.csv",
    		sftp_conn_id=SFTP_CONNECTION_ID
	) >> SparkSubmitOperator(
	    	task_id="csv-to-parquet",
    		application="/spark/applications/csv-to-parquet.py"
	) >> SparkSubmitOperator(
	    	task_id="update-hive-table",
    		application="/spark/applications/update-hive-table.py"
	) >> BashOperator(
	    	task_id="create-hdfs-marker-file",
    		bash_command="hdfs dfs -touch /marker/events/{{ dag_run.logical_date | ds }}.success"
	)


with DAG(
	dag_id="external-users",
	schedule="@daily",
	start_date=datetime(2023, 7, 1),
) as dag_users:
	SFTPSensor(
    		task_id="wait-for-csv",
	    	path="/inbox/users/{{ dag_run.logical_date | ds }}.csv",
    		sftp_conn_id=SFTP_CONNECTION_ID
	) >> SparkSubmitOperator(
	    	task_id="csv-to-parquet",
    		application="/spark/applications/csv-to-parquet.py"
	) >> SparkSubmitOperator(
	    	task_id="update-hive-table",
    		application="/spark/applications/update-hive-table.py"
	) >> BashOperator(
	    	task_id="create-hdfs-marker-file",
    		bash_command="hdfs dfs -touch /marker/users/{{ dag_run.logical_date | ds }}.success"
	)


with DAG(
	dag_id="external-events-with-users",
	schedule="@daily",
	start_date=datetime(2023, 7, 1),
) as dag_events_with_users:
	[
    	WebHdfsSensor(
        	task_id="wait-for-marker-events",
        	filepath="/marker/events/{{ dag_run.logical_date | ds }}.success",
        	webhdfs_conn_id=HDFS_CONNECTION_ID
    	),
    	WebHdfsSensor(
        	task_id="wait-for-marker-users",
        	filepath="/marker/users/{{ dag_run.logical_date | ds }}.success",
        	webhdfs_conn_id=HDFS_CONNECTION_ID
    	)
	] >> SparkSubmitOperator(
	    	task_id="compute-events-with-users",
    		application="/spark/applications/compute-events-with-users.py"
	) >> SparkSubmitOperator(
	    	task_id="update-hive-table",
    		application="/spark/applications/update-hive-table.py"
	) >> BashOperator(
	    	task_id="create-hdfs-marker-file",
    		bash_command="hdfs dfs -touch /marker/events-with-users/{{ dag_run.logical_date | ds }}.success"
	)


with DAG(
	dag_id="external-reports-1",
	schedule="@daily",
	start_date=datetime(2023, 7, 1),
) as dag_reports_1:
	WebHdfsSensor(
	    	task_id="wait-for-events-with-users",
    		filepath="/marker/events-with-users/{{ dag_run.logical_date | ds }}.success",
	    	webhdfs_conn_id=HDFS_CONNECTION_ID
	) >> SparkSubmitOperator(
	    	task_id="compute-report",
    		application="/spark/applications/compute-report-1.py"
	)


with DAG(
	dag_id="external-reports-2",
	schedule="@daily",
	start_date=datetime(2023, 7, 1),
) as dag_reports_2:
	WebHdfsSensor(
    		task_id="wait-for-events-with-users",
	    	filepath="/marker/events-with-users/{{ dag_run.logical_date | ds }}.success",
    		webhdfs_conn_id=HDFS_CONNECTION_ID
	) >> SparkSubmitOperator(
    		task_id="compute-report",
	    	application="/spark/applications/compute-report-2.py"
	)

This is a helpful technique when Airflow has to be integrated with other tools. On the other hand, the dependency is very implicit: any refactoring (for example, changing marker paths) must be done carefully to maintain the contract between data pipelines.
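One way to make that contract harder to break is to build the marker paths in a single shared module imported by both producer and consumer DAG files, instead of repeating string literals. A sketch, assuming such a hypothetical shared module; the helpers return exactly the templated strings used in the listing above, to be passed as `bash_command` of the `BashOperator` and `filepath` of the `WebHdfsSensor`.

```python
MARKER_ROOT = "/marker"
# Jinja expression rendered by Airflow to the run's logical date as YYYY-MM-DD.
DS_TEMPLATE = "{{ dag_run.logical_date | ds }}"


def marker_file(dataset: str) -> str:
    """Templated marker path shared by the producer and consumer DAGs."""
    return f"{MARKER_ROOT}/{dataset}/{DS_TEMPLATE}.success"


def touch_command(dataset: str) -> str:
    """bash_command creating the marker in the producer DAG."""
    return f"hdfs dfs -touch {marker_file(dataset)}"


if __name__ == "__main__":
    print(touch_command("events"))
    print(marker_file("events-with-users"))
```

Renaming a dataset or moving the marker root then touches one place, and both sides of the contract change together.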

Airflow datasets

Since version 2.4, Airflow has offered data-aware scheduling based on the concept of datasets. The general idea is:

1. Operators declare the datasets they publish data to (via their outlets parameter). In Airflow, a dataset is just metadata: Airflow does not handle the data itself.

2. DAGs specify the datasets they depend on. Once every dataset a DAG depends on has been updated (once or many times) since its last run, the dependent DAG is triggered. This setting replaces the time-based schedule.

Although this feature gives no control over execution time and does not tie specific runs together (there is no guarantee that the dependent DAG runs once per upstream run, as several upstream runs may be squashed into a single dependent run), users get a friendly, out-of-the-box UI for browsing datasets and the relationships between DAGs.
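To make the squashing behaviour concrete, here is a toy model of the triggering rule described above: a consumer runs once every dataset in its schedule has been updated at least once since its last run, and repeated updates of the same dataset in between collapse into that single run. This is a simplified illustration under that assumption, not Airflow's scheduler code.

```python
from dataclasses import dataclass, field


@dataclass
class DatasetScheduledDag:
    """Toy model of a DAG with schedule=[dataset, ...] (not Airflow code)."""

    dag_id: str
    schedule: frozenset[str]
    pending: set[str] = field(default_factory=set)
    runs: int = 0

    def on_dataset_update(self, uri: str) -> bool:
        """Record one dataset update; return True if it triggers a new run."""
        if uri not in self.schedule:
            return False
        self.pending.add(uri)  # repeated updates of one dataset collapse here
        if self.pending == self.schedule:
            self.pending.clear()
            self.runs += 1
            return True
        return False


if __name__ == "__main__":
    dag = DatasetScheduledDag(
        "dataset-events-with-users",
        frozenset({"af://datasets/events", "af://datasets/users"}),
    )
    for uri in ["af://datasets/events", "af://datasets/events", "af://datasets/users"]:
        print(uri, dag.on_dataset_update(uri))
    print(dag.runs)  # 1: two events updates were squashed into one run
```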

Source code

from __future__ import annotations


from airflow import DAG, Dataset
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from pendulum import datetime




dataset_users = Dataset("af://datasets/users")
dataset_events = Dataset("af://datasets/events")
dataset_events_with_users = Dataset("af://datasets/events-with-users")
dataset_reports_1 = Dataset("af://datasets/reports-1")
dataset_reports_2 = Dataset("af://datasets/reports-2")


SFTP_CONNECTION_ID = "sftp-21"


with DAG(
	dag_id="dataset-events",
	catchup=True,
	start_date=datetime(2023, 7, 1, tz="UTC"),
	schedule="@daily",
	max_active_runs=1,
) as dag_producer_events:
	SFTPSensor(
        	task_id="wait-for-csv",
    	    	path="/inbox/events/{{ dag_run.logical_date | ds }}.csv",
    	    	sftp_conn_id=SFTP_CONNECTION_ID
	) >> SparkSubmitOperator(
    	    	task_id="csv-to-parquet",
    	    	application="/spark/applications/csv-to-parquet.py"
	) >> SparkSubmitOperator(
    	    	task_id="update-hive-table",
    	    	application="/spark/applications/update-hive-table.py",
    	    	outlets=[dataset_events]
	)


with DAG(
	dag_id="dataset-users",
	catchup=True,
	start_date=datetime(2023, 7, 1, tz="UTC"),
	schedule="@daily",
	max_active_runs=1,
) as dag_producer_users:
	SFTPSensor(
        	task_id="wait-for-csv",
    	    	path="/inbox/users/{{ dag_run.logical_date | ds }}.csv",
    	    	sftp_conn_id=SFTP_CONNECTION_ID
	) >> SparkSubmitOperator(
    	    	task_id="csv-to-parquet",
    	    	application="/spark/applications/csv-to-parquet.py"
	) >> SparkSubmitOperator(
    	    	task_id="update-hive-table",
    	    	application="/spark/applications/update-hive-table.py",
    	    	outlets=[dataset_users]
	)


with DAG(
	dag_id="dataset-events-with-users",
	catchup=True,
	start_date=datetime(2023, 7, 1, tz="UTC"),
	schedule=[dataset_events, dataset_users],
	max_active_runs=1,
) as dag_processor_events_with_users:
	SparkSubmitOperator(
		task_id="compute-events-with-users",
    	    	application="/spark/applications/compute-events-with-users.py"
	) >> SparkSubmitOperator(
    	    	task_id="update-hive-table",
    	    	application="/spark/applications/update-hive-table.py",
    	    	outlets=[dataset_events_with_users]
	)


with DAG(
	dag_id="dataset-reports-1",
	catchup=True,
	start_date=datetime(2023, 7, 1, tz="UTC"),
	schedule=[dataset_events_with_users],
	max_active_runs=1,
) as dag_processor_reports_1:
	SparkSubmitOperator(
    	    	task_id="compute-report",
    	    	application="/spark/applications/compute-report-1.py",
    	    	outlets=[dataset_reports_1]
	)


with DAG(
	dag_id="dataset-reports-2",
	catchup=True,
	start_date=datetime(2023, 7, 1, tz="UTC"),
	schedule=[dataset_events_with_users],
	max_active_runs=1,
) as dag_processor_reports_2:
	SparkSubmitOperator(
    	    	task_id="compute-report",
    	    	application="/spark/applications/compute-report-2.py",
    	    	outlets=[dataset_reports_2]
	)

Trigger the external DAG

Suppose the functionality of datasets is too limited, or an older version of Airflow is used. In that case, there are alternative ways to orchestrate the data pipelines. TriggerDagRunOperator represents the push approach: it triggers a run of a specified DAG, optionally passing custom parameters. Obviously, this does not allow creating a dependency on more than one upstream DAG. However, one upstream DAG can still trigger multiple dependents.

Since this approach uses the DAG name (id) to identify the dependent pipeline, you must be careful when refactoring those names. One way to look up downstream (and upstream) DAGs and tasks is the Browse > DAG Dependencies tab, which shows a graph of trigger and sensor relationships.
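Because the link is just a string, a cheap safeguard is a check (e.g. in a CI test) that every trigger target still points to an existing DAG id. A sketch, assuming the trigger relationships have already been collected into a plain dictionary; how you extract them from your DAG files is left open, and the helper names are hypothetical.

```python
# Hypothetical summary: dag_id -> dag_ids it triggers via TriggerDagRunOperator.
TRIGGERS: dict[str, list[str]] = {
    "trigger-events-with-users": ["trigger-reports-1", "trigger-reports-2"],
    "trigger-reports-1": [],
    "trigger-reports-2": [],
}


def dangling_triggers(triggers: dict[str, list[str]]) -> list[tuple[str, str]]:
    """(upstream, missing_target) pairs for triggers pointing at no known DAG."""
    known = set(triggers)
    return [
        (upstream, target)
        for upstream, targets in triggers.items()
        for target in targets
        if target not in known
    ]


def triggered_by(triggers: dict[str, list[str]], dag_id: str) -> list[str]:
    """Reverse lookup: which DAGs trigger the given one."""
    return sorted(up for up, targets in triggers.items() if dag_id in targets)


if __name__ == "__main__":
    print(dangling_triggers(TRIGGERS))  # []
    print(triggered_by(TRIGGERS, "trigger-reports-2"))
```

Renaming `trigger-reports-1` without updating its caller would make `dangling_triggers` report the broken pair instead of failing silently at run time.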

Source code

from __future__ import annotations


from datetime import datetime
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule




SFTP_CONNECTION_ID = "sftp-21"




with DAG(
	dag_id="trigger-events-with-users",
	schedule="@daily",
	start_date=datetime(2023, 7, 1),
) as dag_events_with_users:
	with TaskGroup(group_id="gather-events") as group_events:
		SFTPSensor(
			task_id="wait-for-csv",
			path="/inbox/events/{{ dag_run.logical_date | ds }}.csv",
			sftp_conn_id=SFTP_CONNECTION_ID
		) >> SparkSubmitOperator(
			task_id="csv-to-parquet",
			application="/spark/applications/csv-to-parquet.py"
		) >> SparkSubmitOperator(
			task_id="update-hive-table",
			application="/spark/applications/update-hive-table.py"
		)


	with TaskGroup(group_id="gather-users") as group_users:
		SFTPSensor(
			task_id="wait-for-csv",
			path="/inbox/users/{{ dag_run.logical_date | ds }}.csv",
			sftp_conn_id=SFTP_CONNECTION_ID
		) >> SparkSubmitOperator(
			task_id="csv-to-parquet",
			application="/spark/applications/csv-to-parquet.py"
		) >> SparkSubmitOperator(
			task_id="update-hive-table",
			application="/spark/applications/update-hive-table.py"
		)


	[group_events, group_users] >> SparkSubmitOperator(
    	    	task_id="compute-events-with-users",
    	    	application="/spark/applications/compute-events-with-users.py"
	) >> SparkSubmitOperator(
    	    	task_id="update-hive-table",
    	    	application="/spark/applications/update-hive-table.py"
	) >> [
    	    	TriggerDagRunOperator(
        	    	task_id="trigger-reports-1",
    	        	trigger_dag_id="trigger-reports-1",
        	    	trigger_rule=TriggerRule.ALL_SUCCESS,
    	    	),
    	    	TriggerDagRunOperator(
        	    	task_id="trigger-reports-2",
    	        	trigger_dag_id="trigger-reports-2",
           	 	trigger_rule=TriggerRule.ALL_SUCCESS,
    	    	)
	]


with DAG(
	dag_id="trigger-reports-1",
	schedule=None,  # no timetable: runs only when triggered by TriggerDagRunOperator
	start_date=datetime(2023, 7, 1),
) as dag_processor_reports_1:
	SparkSubmitOperator(
    	    	task_id="compute-report",
    	    	application="/spark/applications/compute-report-1.py",
	)


with DAG(
	dag_id="trigger-reports-2",
	schedule=None,  # no timetable: runs only when triggered by TriggerDagRunOperator
	start_date=datetime(2023, 7, 1),
) as dag_processor_reports_2:
	SparkSubmitOperator(
    	    	task_id="compute-report",
    	    	application="/spark/applications/compute-report-2.py",
	)

The sensor for external DAG

Using a sensor is the exact opposite of triggering another DAG. In this approach, we tell a DAG to pause and wait until some other DAG (or a specific task in it) has completed. However, we still need to specify appropriate schedules for both pipelines.

The sensor approach is very flexible. Unlike with datasets, each sensor instance can wait for a specific DAG/task/execution date combination, which allows waiting with a specific time offset or modelling aggregation relationships (hourly to daily, daily to weekly, etc.). Unlike with triggering, a DAG can wait for all of its preconditions.

Just as with DAG triggering, sensor dependencies can be inspected on the Browse > DAG Dependencies tab.
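The time-offset flexibility boils down to simple date arithmetic: with `execution_delta`, ExternalTaskSensor looks for the external run whose logical date is its own logical date minus the delta, and for the same logical date when no delta is set. A plain-Python sketch of that rule for reasoning about offsets without a running Airflow instance; the helper name is hypothetical.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def sensed_logical_date(
    logical_date: datetime, execution_delta: timedelta | None = None
) -> datetime:
    """Logical date of the external run a sensor with this delta waits for.

    Models the execution_delta rule: logical_date - execution_delta,
    or the same logical date when no delta is configured.
    """
    if execution_delta is None:
        return logical_date
    return logical_date - execution_delta


if __name__ == "__main__":
    # A DAG scheduled daily at 05:00 waiting for a DAG scheduled at midnight.
    print(sensed_logical_date(datetime(2023, 7, 2, 5), timedelta(hours=5)))
```

Without the five-hour delta in that example, the sensor would look for an upstream run at 05:00 that never exists and keep waiting until it times out.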

Source code

from __future__ import annotations


from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.sensors.external_task import ExternalTaskSensor




SFTP_CONNECTION_ID = "sftp-21"




with DAG(
    	dag_id="sensor-events",
    	schedule_interval="@daily",
    	start_date=datetime(2023, 7, 1),
    	catchup=True,
) as sensor_events:
	SFTPSensor(
    	    	task_id="wait-for-csv",
    	    	path="/inbox/events/{{ dag_run.logical_date | ds }}.csv",
    	    	sftp_conn_id=SFTP_CONNECTION_ID
	) >> SparkSubmitOperator(
    	    	task_id="csv-to-parquet",
    	    	application="/spark/applications/csv-to-parquet.py"
	) >> SparkSubmitOperator(
    	    	task_id="update-hive-table",
    	    	application="/spark/applications/update-hive-table.py"
	)


with DAG(
    	dag_id="sensor-users",
    	schedule_interval="@daily",
    	start_date=datetime(2023, 7, 1),
    	catchup=True,
) as sensor_users:
	SFTPSensor(
    	    	task_id="wait-for-csv",
    	    	path="/inbox/users/{{ dag_run.logical_date | ds }}.csv",
    	    	sftp_conn_id=SFTP_CONNECTION_ID
	) >> SparkSubmitOperator(
    	    	task_id="csv-to-parquet",
    	    	application="/spark/applications/csv-to-parquet.py"
	) >> SparkSubmitOperator(
    	    	task_id="update-hive-table",
    	    	application="/spark/applications/update-hive-table.py"
	)


with DAG(
    	dag_id="sensor-events-with-users",
    	schedule_interval="@daily",
    	start_date=datetime(2023, 7, 1),
    	catchup=True,
) as dag_daily:
	[
    	    	ExternalTaskSensor(
         	   	task_id="wait-for-events",
        	    	external_dag_id="sensor-events",
    	        	check_existence=True
    	    	),
    	    	ExternalTaskSensor(
         	   	task_id="wait-for-users",
        	    	external_dag_id="sensor-users",
    	        	check_existence=True
    	    	)
	] >> SparkSubmitOperator(
    	    	task_id="compute-events-with-users",
    	    	application="/spark/applications/compute-events-with-users.py"
	) >> SparkSubmitOperator(
    	    	task_id="update-hive-table",
    	    	application="/spark/applications/update-hive-table.py"
	)


with DAG(
    	dag_id="sensors-reports-1",
    	schedule="@daily",
    	start_date=datetime(2023, 7, 1),
) as dag_reports_1:
	ExternalTaskSensor(
        		task_id="wait-for-events-with-users",
    	    	external_dag_id="sensor-events-with-users",
    	    	check_existence=True
	) >> SparkSubmitOperator(
    	    	task_id="compute-report",
    	    	application="/spark/applications/compute-report-1.py"
	)


with DAG(
    	dag_id="sensors-reports-2",
    	schedule="@daily",
    	start_date=datetime(2023, 7, 1),
) as dag_reports_2:
	ExternalTaskSensor(
    	    	task_id="wait-for-events-with-users",
    	    	external_dag_id="sensor-events-with-users",
    	    	check_existence=True
	) >> SparkSubmitOperator(
    	    	task_id="compute-report",
    	    	application="/spark/applications/compute-report-2.py"
	)

Short comparison

| | External | Dataset | Trigger | Sensor |
|---|---|---|---|---|
| Multiple triggers for a single DAG | Yes | No | Yes | Yes (although a bit obscure) |
| Multiple requirements to run a single DAG | Yes | Yes | No | Yes |
| Dependencies on a different time interval | Yes | No | No | Yes |
| Strong points | Integration with 3rd-party tools. | Good lineage support in the UI. Push scheduling. | Push scheduling. | Allows execution time manipulation (daily to weekly DAGs). |
| Weak points | Sensitive to refactoring. | No control over schedule intervals. | Weak error handling. Any DAG can depend on at most one DAG. | Pull scheduling. |

The options compared

Example scenarios

The end goal of running Airflow is to orchestrate actual pipelines, so let's discuss some real-life examples where DAG dependencies are necessary.

Breaking down complex DAG

The first example is a complex DAG that calculates multiple outputs (exports) from several external inputs. For clarity, all operations are split into waiting (sensors), preprocessing and report computation. The screenshot shows them in Airflow's Graph view.

The assumption is that all operations consume whole input datasets, i.e. they are not restricted to, e.g., the current hour only. Otherwise, sensors or external markers would be the right option.

This is where datasets (data-aware scheduling, in the documentation's terms) show good results. For each branching point, we define an outlet: an abstraction over the dataset produced by each operator (remember that it is only metadata; handling the actual data is not done by Airflow and never should be). Then we turn each chain of operations that consumes those datasets into a separate DAG with the datasets as its schedule. As a result, the DAGs are short and simple, and we get a bit of data lineage in the Datasets tab of Airflow for free:
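The Datasets view essentially answers the reverse question of the schedule lists: which DAGs fire when a given dataset is updated? A small sketch of that inversion over the report DAG schedules from the listing below, handy for estimating the impact of a change before deploying it; it illustrates the idea and is not how Airflow builds its view.

```python
from collections import defaultdict

# schedule lists of the report DAGs in the example below (dataset URIs only)
SCHEDULES: dict[str, list[str]] = {
    "complex-dag-report-clients": ["af://dataset/clients", "af://dataset/events"],
    "complex-dag-report-users": ["af://dataset/events", "af://dataset/users"],
    "complex-dag-report-full": [
        "af://dataset/events",
        "af://dataset/transactions",
        "af://dataset/users",
    ],
}


def consumers_by_dataset(schedules: dict[str, list[str]]) -> dict[str, list[str]]:
    """Invert DAG -> datasets into dataset -> consuming DAGs (sorted)."""
    index: dict[str, list[str]] = defaultdict(list)
    for dag_id, uris in schedules.items():
        for uri in uris:
            index[uri].append(dag_id)
    return {uri: sorted(dags) for uri, dags in index.items()}


if __name__ == "__main__":
    for uri, dags in consumers_by_dataset(SCHEDULES).items():
        print(uri, "->", dags)
```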

Source code

from __future__ import annotations


from datetime import datetime


from airflow import Dataset
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.sensors.bash import BashSensor


dataset_clients = Dataset("af://dataset/clients")
dataset_events = Dataset("af://dataset/events")
dataset_transactions = Dataset("af://dataset/transactions")
dataset_users = Dataset("af://dataset/users")


with DAG(
	dag_id="complex-dag-clients",
	schedule_interval="@daily",
	start_date=datetime(2023, 7, 1),
) as complex_dag_clients:
	sensor_clients = BashSensor(task_id="wait-clients", bash_command="true")


	op_preprocess_step_1 = EmptyOperator(task_id="preprocess-step-1")
	op_preprocess_step_2 = EmptyOperator(task_id="preprocess-step-2")
	op_preprocess_step_3 = EmptyOperator(task_id="preprocess-step-3")
	op_preprocess_step_4 = EmptyOperator(task_id="preprocess-step-4", outlets=[dataset_clients])


	# A list of tasks runs in parallel; `[a >> b]` would leave step 2 without any upstream.
	sensor_clients >> op_preprocess_step_1 >> \
		[op_preprocess_step_2, op_preprocess_step_3] >> \
		op_preprocess_step_4


with DAG(
	dag_id="complex-dag-events",
	schedule_interval="@daily",
	start_date=datetime(2023, 7, 1),
) as complex_dag_events:
	sensor_events = BashSensor(task_id="wait-events", bash_command="true")


	op_preprocess_step_1 = EmptyOperator(task_id="preprocess-step-1")
	op_preprocess_step_2 = EmptyOperator(task_id="preprocess-step-2")
	op_preprocess_step_3 = EmptyOperator(task_id="preprocess-step-3", outlets=[dataset_events])


	sensor_events >> op_preprocess_step_1 >> op_preprocess_step_2 >> op_preprocess_step_3


with DAG(
	dag_id="complex-dag-transactions",
	schedule_interval="@daily",
	start_date=datetime(2023, 7, 1),
) as complex_dag_transactions:
	sensor_gateway_01 = BashSensor(task_id="wait-gateway-01", bash_command="true")
	sensor_gateway_02 = BashSensor(task_id="wait-gateway-02", bash_command="true")
	sensor_gateway_03 = BashSensor(task_id="wait-gateway-03", bash_command="true")
	sensor_gateway_04 = BashSensor(task_id="wait-gateway-04", bash_command="true")
	sensor_gateway_05 = BashSensor(task_id="wait-gateway-05", bash_command="true")


	op_preprocess_step_1 = EmptyOperator(task_id="preprocess-step-1")
	op_preprocess_step_2 = EmptyOperator(task_id="preprocess-step-2")
	op_preprocess_step_3 = EmptyOperator(task_id="preprocess-step-3")
	op_preprocess_step_4 = EmptyOperator(task_id="preprocess-step-4")
	op_preprocess_step_5 = EmptyOperator(task_id="preprocess-step-5", outlets=[dataset_transactions])


	[
    	sensor_gateway_01,
    	sensor_gateway_02,
    	sensor_gateway_03,
    	sensor_gateway_04,
    	sensor_gateway_05
	] >> op_preprocess_step_1 >> op_preprocess_step_2 >> op_preprocess_step_3 >> \
    	op_preprocess_step_4 >> op_preprocess_step_5


with DAG(
	dag_id="complex-dag-users",
	schedule_interval="@daily",
	start_date=datetime(2023, 7, 1),
) as complex_dag_users:
	sensor_users = BashSensor(task_id="wait-users", bash_command="true")


	op_preprocess_step_1 = EmptyOperator(task_id="preprocess-step-1")
	op_preprocess_step_2 = EmptyOperator(task_id="preprocess-step-2")
	op_preprocess_step_3 = EmptyOperator(task_id="preprocess-step-3")
	op_preprocess_step_4 = EmptyOperator(task_id="preprocess-step-4")
	op_preprocess_step_5 = EmptyOperator(task_id="preprocess-step-5")
	op_preprocess_step_6 = EmptyOperator(task_id="preprocess-step-6", outlets=[dataset_users])


	sensor_users >> [op_preprocess_step_1, op_preprocess_step_2] >> op_preprocess_step_3 >> \
    	[op_preprocess_step_4, op_preprocess_step_5] >> op_preprocess_step_6


with DAG(
	dag_id="complex-dag-report-clients",
	schedule=[dataset_clients, dataset_events],
	start_date=datetime(2023, 7, 1),
) as complex_dag_report_clients:
	op_rich_clients = EmptyOperator(task_id="enrich-clients")
	op_report_clients = BashOperator(task_id="report-clients", bash_command="sleep 3")


	op_rich_clients >> op_report_clients


with DAG(
	dag_id="complex-dag-report-users",
	schedule=[dataset_events, dataset_users],
	start_date=datetime(2023, 7, 1),
) as complex_dag_report_users:
	op_enrich_users = EmptyOperator(task_id="enrich-users")
	op_report_users = BashOperator(task_id="report-users", bash_command="sleep 3")


	op_enrich_users >> op_report_users


with DAG(
	dag_id="complex-dag-report-full",
	schedule=[dataset_events, dataset_transactions, dataset_users],
	start_date=datetime(2023, 7, 1),
) as complex_dag_report_full:
	op_report_full = BashOperator(task_id="report-full", bash_command="sleep 3")

Daily to weekly transition

This example covers any time-based aggregation of the data. Regardless of the actual time intervals, the general idea is that multiple runs of one DAG are used as input for the other one:

  • 24 runs of the hourly pipeline for the daily one,
  • 7 runs of the daily DAG for the weekly one.
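In both cases the downstream run covers exactly the upstream runs whose logical dates fall inside its own data interval. A minimal sketch of that arithmetic, assuming both schedules are aligned (hourly runs on the hour, daily runs at midnight) and using Airflow's convention that a logical date marks the start of an interval; the helper name is hypothetical.

```python
from datetime import datetime, timedelta


def upstream_logical_dates(
    start: datetime, end: datetime, step: timedelta
) -> list[datetime]:
    """Logical dates of upstream runs falling in the interval [start, end)."""
    dates = []
    current = start
    while current < end:
        dates.append(current)
        current += step
    return dates


if __name__ == "__main__":
    hourly = upstream_logical_dates(
        datetime(2023, 7, 2), datetime(2023, 7, 3), timedelta(hours=1)
    )
    daily = upstream_logical_dates(
        datetime(2023, 7, 2), datetime(2023, 7, 9), timedelta(days=1)
    )
    print(len(hourly), len(daily))  # 24 7
```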

This setting limits the available options to using an external tool or implementing DAG-run sensors. Let's take a look at the latter.

The idea behind this diagram is simple: the daily pipeline is unaffected by the relationship, while the weekly one starts with seven ExternalTaskSensor instances, each with a different execution date offset. They wait for the daily DAG runs from 1 to 7 days ago. Apart from that set of sensors, the DAG is similar to the others.
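Getting these offsets right is the subtle part, because a logical date marks the start of a run's data interval. For example, the `@weekly` run whose logical date is Sunday 2023-07-02 covers the daily runs of 2023-07-02 to 2023-07-08 and only starts once that week is over. Since the sensor looks up `logical_date - execution_delta`, the deltas pointing into that week are zero or negative, while positive deltas of 1 to 7 days point at the previous week. A plain-Python check of this arithmetic, assuming standard cron-based `@daily` and `@weekly` schedules; the helper names are hypothetical.

```python
from datetime import datetime, timedelta

DAY = timedelta(days=1)


def weekly_execution_deltas() -> list[timedelta]:
    """Deltas so that logical_date - delta hits each day of the weekly interval.

    Assumes the daily runs inside a weekly run have logical dates
    logical_date + 0 days ... logical_date + 6 days, hence zero or negative deltas.
    """
    return [-(k * DAY) for k in range(7)]


def sensed_dates(weekly_logical_date: datetime, deltas: list[timedelta]) -> list[datetime]:
    """Logical dates of the daily runs the sensors would wait for, in order."""
    return sorted(weekly_logical_date - delta for delta in deltas)


if __name__ == "__main__":
    start = datetime(2023, 7, 2)  # a Sunday, i.e. a possible @weekly logical date
    print(sensed_dates(start, weekly_execution_deltas())[-1])  # 2023-07-08 00:00:00
    naive = [timedelta(days=d) for d in range(1, 8)]
    print(sensed_dates(start, naive)[0])  # 2023-06-25 00:00:00
```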

Source code

from __future__ import annotations


from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.bash import BashSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup


with DAG(
    	dag_id="dag-daily",
    	schedule_interval="@daily",
    	start_date=datetime(2023, 7, 1),
    	catchup=True,
) as dag_daily:
	BashSensor(task_id="some-input-wait", bash_command="true") >> \
    	BashOperator(task_id="some-data-processing-1", bash_command="sleep 3") >> \
    	BashOperator(task_id="some-data-processing-2", bash_command="sleep 3") >> \
    	BashOperator(task_id="exporting-to-hdfs", bash_command="sleep 3")


with DAG(
    	dag_id="dag-weekly",
    	schedule_interval="@weekly",
    	start_date=datetime(2023, 7, 1),
    	catchup=True,
) as dag_weekly:
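	# TaskGroup is purely optional, but makes the DAG a bit clearer in the UI.
	with TaskGroup(group_id="dag-daily-sensors") as sensors:
		# The weekly logical date is the start of its week and the sensor checks
		# logical_date - execution_delta, so deltas of -6..0 days target the daily
		# runs from 1 to 7 days before the weekly run actually starts.
		[
			ExternalTaskSensor(
				task_id=f"wait-{days_offset}d",
				external_dag_id="dag-daily",
				timeout=24 * 60 * 60,
				mode="reschedule",
				execution_delta=timedelta(days=days_offset - 7)
			) for days_offset in range(1, 8)
		]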


	sensors >> \
    		BashOperator(task_id="some-data-processing", bash_command="sleep 3") >> \
	    	BashOperator(task_id="exporting-to-hdfs", bash_command="sleep 3")

The source code for all examples and the Docker environment to run them is available on GitHub.
