Motivation

The core motivator of this proposal is the ability to efficiently perform setup and teardown activities in a DAG. One simple example of this is provisioning an external resource for a DAG run, like an EMR cluster: create a cluster on DagRun before the first task, and to easily and cleanly tear it down after the last task has finished, even in case of failures – Airflow itself would benefit from this ability in our "system"/integration tests!

This setup and teardown feature also plays a role when performing data processing inside Airflow. So far data handling with Airflow tasks has been relatively simplistic. Prior to AIP-48 Airflow has depended on a feature called XCom for passing data between tasks. This data is persisted in the Airflow metadatabase in the course of DAG execution and was originally intended for small data elements between tasks within the same DAG. As part of the Airflow 1.10.12 release, this feature was extended to support custom XCom data handlers to enable persistence of XCom data in external data sources such as S3 and Google Cloud Storage, thereby enabling large data sets to be passed between tasks using XCom as the core abstraction. Further, since the Airflow 2.0 release, users can pass data between tasks within a DAG through the TaskFlow API.

However, the XCom data is never cleaned up by Airflow. As this data grows larger, system performance can degrade and costs can increase over time. The current mitigation is to use maintenance DAGs or “airflow db clean” to clean up older XCom data, but this can be problematic when working with external data stored using custom XCom backends.

Especially as there are growing needs to share large data sets across tasks and across DAGs, using XCom and hopefully soon with Datasets as envisioned with AIP-48, this problem will only get worse. This should be cleanly handled by Airflow and adding this capability to build this is the core motivation for this proposal.

What's the TL;DR?

Add a new syntax for marking tasks as setup/teardown that:

  • Teardown tasks will still run even if the upstream tasks have otherwise failed
  • Teardown tasks failing don't always result in the DagRun being marked as failed (up to DAG author to choose failure mode)
  • Automatically clear setup/teardown tasks when clearing a dependent task

Setup and Teardown Task Semantics

The behavior of setup and teardown tasks is governed by just a few fundamental principles:

  • a teardown will run if its setup is successful and its upstream tasks are all done
  • the setups "required by" a task are cleared when the task is cleared

Users must define which setups are for which teardowns by setting upstream relationships between them.  For the purpose of clearing, Airflow will infer that a setup is "required by" a work task by looking at at the downstream teardowns.

Let's walk through some examples to clarify the behavior.

Teardown run conditions

Teardown tasks will run if the setup was successful and the upstream tasks are all done.

Consider the following example

[setup1, setup2] >> work1 >> [teardown1, teardown2]
setup1 >> teardown1
setup2 >> teardown2

In this example, work1 will run only if both setup1 and setup2 are successful.  Teardown1 will run if setup1 is successful and work1 is done.  Teardown 2 will run if setup1 is successful and work1 is done.

Clearing a work task clears its setup and teardown

In the above example, If you clear work1, then both setup1 and setup2 will be cleared.  As well, since their respective setup tasks were cleared, teardown1 and teardown2 will also be cleared.

Here's a more interesting example:

setup1 >> work1 >> teardown1
work1 >> work2
setup1 >> teardown1

Here, if we clear work1, then both setup1 and teardown1 will be cleared.  However, if we clear work2, the setup and teardown will not be cleared.  We can infer that work2 does not depend on setup1 because teardown1 is attached to setup1 and may run in parallel with work2.

Teardown "leaves" are ignored when setting deps on a group

The most important is that task group teardowns are ignored when arrowing from task group to task.

Consider this example:

with TaskGroup("my_group") as tg:
	s1 = setup1()
    t1 = teardown1()
    s1 >> work1() >> t1
	s1 >> t1
tg >> work2()

When applying tg >> work2 , airflow will ignore the teardown leaves of tg and will recurse to find the last non-teardown task.  So, tg >> work2  has the effect of work1 >> work2 .  This way, if work1 fails, teardown1 will still run, and work2 will not run, and the dag run will be a failure.  Conversely, if work1 succeeds, then work2 will run even if teardown1 fails, and if work2 succeeds the dag suceeds.

Graph:

More examples

One of the powerful things about setup and teardown is that it allows a user to separate infra components from the actual work in a pipeline.  So if your teardown fails, your dag can keep marching on.  To facilitate this, we've made some enhancements to the authoring API to reflect these new types of tasks.

Task group to task group

We can expand on this by chaining two task groups.

with TaskGroup("my_group1") as tg1:
    setup1 >> work1 >> teardown1
    setup1 >> teardown1
with TaskGroup("my_group2") as tg2:
    setup2 >> work2 >> teardown2
    setup2 >> teardown2
tg1 >> tg2

The above dag is equivalent to the following:

setup1 >> work1 >> teardown1
setup1 >> teardown1
setup2 >> work2 >> teardown2
setup2 >> teardown2
work1 >> setup2

So, setup2 will only run if work1 completes successfully, no matter whether teardown1 is successful.  If work1 is skipped, then setup2, and its downstreams will be skipped.

If work2 is cleared, then setup2 and teardown2 will be cleared.

Here's the graph:

Teardown without setup

It's permissible to have a teardown with no setup

w1 >> w2 >> t1

In this case, t1's trigger rule will in effect be ALL_DONE, but it will still have the same features i.e. configurable impact on dag run state and ignored as group leaf.

Setup without teardown

It's persmissible to have a setup with no teardown task.  Then all downstream tasks are assumed to require the setup.

s1 >> w1 >> w2

In this example if you clear w1, then s1, w1 and w2 will be cleared.  If you clear w2 then just s1 and w2 are cleared.

If we want to signal that w2 does not require s1, then we can add an empty teardown t1 after w1.

s1 >> w1 >> [w2, t1]
s1 >> t1


Failure of setup tasks

Normal tasks which are downstream of setup tasks will evaluate the failure of the upstream setup tasks according to their trigger rules, just like any other task.  Accordingly, by default a "normal" task will not run if its setup is not successful.  For teardowns, a teardown will not run unless its upstream setup is successful.

Failure of teardown tasks

Teardown tasks are not handled any differently from normal tasks with respect to trigger rules.  Generally it's assumed that teardowns will not have downstreams.  That's not to say that the dag doesn't continue on after the teardown; just that the teardown's success or failure doesn't determine whether the dag continuesn.  For this reason, by default when you arrow task_group >> task , the teardown leaves in task_group  are ignored.  But if for your particular workflow, you need an outer teardown to wait for an inner teardown, you may arrow them and the outer teardown will wait for the inner to complete, and will run even if the inner one fails.  Consider this example:

with dag:
    dag_setup = my_setup("dag_setup")
    dag_work = my_work("dag_work")
    dag_teardown = my_teardown("dag_teardown")
    dag_setup >> dag_work >> dag_teardown
    dag_setup >> dag_teardown
    with TaskGroup("my_group1") as tg1:
        setup1 = my_setup("setup")
        work1 = my_work("work")
        teardown1 = my_teardown("teardown")
        setup1 >> work1 >> teardown1
        setup1 >> teardown1
    dag_setup >> tg1 >> dag_teardown

This results in this graph:

Note that there is no relationship between teardown in group1 and the dag teardown.  This is because generally speaking, task_group >> task  ignores the teardown leaves, because it's assumed that the tasks downstream of a group don't care about that group's teardowns, and the "success" of a group should in effect be driven by the work tasks and not the teardowns.

If however the user doesn't want dag_teardown to run until the group teardown runs, the user may arrow these tasks, and the dag teardown will wait for the group teardown.

Teardown tasks run even when work tasks fail

Compared to a normal task which will examine the state of the upstream tasks, Teardown tasks will always run as long as the setup task (if there is one) completed successfully.

Teardown failure doesn't have to result in DagRun failure

By default, failure of teardown doesn't result in dag run failure, but user may override this.

Suppose you have a dag that loads some data then does some cleanup operations in a teardown task.  For the purpose of determining dag run state, you might only care that the data was loaded.  So, the default behavior, to ignore teardowns for the purpose of determining dag run state, would be appropriate.  But you can override this behavior with the param `on_failure_fail_dagrun=True`.

Clearing automatically clears setup & teardown too

When a task in a DAG with setup or teardown tasks is cleared, the relevant setup and teardown tasks will also be cleared.  This is one key feature that cannot be easily achieved right now in Airflow.

Airflow will determine "relevant" by looking at teardowns: if the task is in between a setup and its teardown, we'll assume it's relevant.

Example 1:

my_setup >> work1 >> my_teardown
my_setup >> my_teardown

In example 1, we can infer that work1 requires my_setup, so it will be cleared. And since my_setup is torn down by my_teardown, we clear it as well.

Example 2:

[setup1, setup2] >> work1 >> teardown1 >> work2 >> teardown2
setup1 >> teardown1
setup2 >> teardown2

Here work1 requires both setup1 and setup2, so both of these plus their teardowns will be cleared.

But work2 does not require teardown1 (since it will run before work2) therefore we can infer that it doesn't need setup1.  So when work2 is cleared, only setup2 and teardown2 will be cleared.


DAG Authoring syntax

Use with Taskflow API

from airflow import DAG, task, setup, teardown
 
 
with DAG(dag_id='test'):
    @setup
    def create_cluster():
        ...
        return cluster_id
 
    @task
    def load(ti):
        # Example:
        cluster_id = ti.xcom_pull(task_id="create_cluster")
 
    def summarize():
        ...
 
    @teardown(on_failure_fail_dagrun=False)
    def teardown_cluster():
        ...
       cluster_id = ti.xcom_pull(task_id="create_cluster")
 
    setup_task = create_cluster()
    teardown_task = teardown_cluster()
    setup_task >> load() >> summarize() >> teardown_task
    setup_task >> teardown_task    

Use with classic operator

These two decorators will be written such that they will also work for classic operators:

with DAG(...):
    job_flow_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
    ).as_setup()

As a style point, having setup(MyOperator()) didn't seem to fit conceptually with us, so we propose re-exporting `setup` and `teardown` as `setup_task` and `teardown_task` from the airflow module (lazily, like everything else there).

Use with TaskGroups

To add a setup/teardown task to a task group, instantiate it within an active TG scope:

with TaskGroup("my_tg"):

    @setup
    def my_tg_setup():
        ...


    my_tg_setup()

Use with any decorated operator, python is automatic

The `@setup` and `@teardown` decorators will work against a plain function (which is automatically converted to a `@task.python` function, and against an already decorated function to allow setup tasks to run in Docker etc.

All of these are valid:

@setup
@task
def already_decorated():
    ...


@setup
def automatic_python_task():
    ...


@setup
@task.docker
def docker_setup():
    ...

One-liner syntax

This will mark task1 as a setup task, task3 as a teardown, and arrow the two of them.  This means you don't have to declare a task as a setup or teardown when defining it, which is particularly helpful if using reusable taskflow tasks.

task1 >> task2 >> task3.as_teardown(setups=task1)

Context manager

Automatically configure setup and teardown for a group of tasks:

with my_teardown.as_teardown(setups=my_setup):
    my_work() >> my_other_work()

This would be equivalent to the following:

my_setup >> my_work >> my_other_work >> my_teardown
my_setup >> my_teardown

Implementation details

For teardown run conditions, we add a new trigger rule ALL_DONE_SETUP_SUCCESS and this is what is used by teardown tasks.  This rule will pass if all upstreams are done and at least one of its setups is successful (note: trigger rules only look at direct upstreams).

For clearing behavior, when a task is cleared we recurse upstream to find the "relevant" setups, and if we find any, we clear them and their teardowns. A setup is "relavant" to an object task if it is upstream of the object task and has a teardown that is downstream of the object task (i.e. if the object task is "in the middle" of a setup / teardown pair).  A teardown is "for" a setup if it is a direct downstream.

For proper execution flow, we ignore teardowns in task group leaves, so that task_group >> work ignores teardowns in task_group.

Future Work

  • Automatically convert dag-level callback functions to a teardown task (It means logs for them are now visible in the UI!)
    • This would be nice, but there are some semantics about setup/teardown on failure that make this hard


Notes on difference from original AIP doc

Setup and teardowns are defined at the task level and not at the task group level.  You can have multiple setups and teardowns in a group or at the top level in a dag.

Users must set dependencies explicitly just like normal tasks.

Appendix

Considired but not implemented

Some things considered but not (yet) implemented

Not implemented:  allow to use >>  to add to context

Currently with teardown context manager, need to instantiate in the context or use scope.add_task .

E.g. this won't work 

with t1:
    a

Since a  is not instantiated in the context, there's nothing to trigger it to attache itself to the teardown scope of t1.

But this works:

with t1:
    a()

Or you could do this:

with t1 as scope:
    scope.add_task(a)


Similarly, this doesn't work, for the same reason:

with t1:
    a >> b

But we could make this work by invoking the "attach to scope" logic in >> 

Not implemented: Setup and teardown groups

Prior to AIP-52, when you do task_group >> task , this is equivalent to the following:

for leaf in task_group.get_leaves():
    leaf >> task

But after AIP-52, you would generally want this operation to ignore teardown tasks in task_group.  But it may be desirable to create a group all of whose tasks are setups or teardowns.  And in this case, we would not want teardowns or setups to be ignored when determining roots and leaves.  To this end, you may designate a group as either setup or teardown, and when so designated, setups and teardowns won't be ignored when arrowing the group.  Here's an example dag that illustrates the feature.

with dag:
    with TaskGroup("group1") as tg1:
        setup1 = my_setup("g1_setup")
        work1 = my_work("g1_work")
        teardown1 = my_teardown("g1_teardown")
        setup1 >> work1 >> teardown1
        setup1 >> teardown1
    with TaskGroup("dag_setup", is_setup=True) as dag_setup_group:
        dag_setup1 = my_setup("dag_setup1")
        dag_setup2 = my_setup("dag_setup2")
    with TaskGroup("dag_teardown", is_teardown=True) as dag_teardown_group:
        dag_teardown1 = my_teardown("dag_teardown1")
        dag_teardown2 = my_teardown("dag_teardown2")
    dag_setup_group >> my_work("dag_work1") >> dag_teardown_group
    dag_setup_group >> tg1
    tg1 >> my_work("dag_work2") >> dag_teardown_group
    dag_setup1 >> dag_teardown1
    dag_setup2 >> dag_teardown2

And here's the graph:

Some notes on the behavior of this dag:

  • dag setups 1 and 2 can run in parallel and are the first tasks to run in this dag
  • group1 and dag_work1 won't start until both dag setups are done and successful
  • dag_teardown1 will run if dag_setup1 is successful and all its upstreams are done
  • dag teardowns 1 and 2 can run concurrently with g1_teardown
  • dag_work2 will only run if g1_work is successful