Status

StateDraft
Discussion Thread
Vote Thread
Vote Result Thread
Progress Tacking (PR/GitHub Project/Issue Label)
Date Created

2024-03-05

Version Released
Authors

Jedidiah Cunningham Constance Martineau

This AIP is part of AIP-63, which aims to add DAG Versioning to Airflow.

Motivation

DAGs can change over time as business needs evolve. A key challenge faced by Airflow users today is understanding how a DAG was run in the past. When someone replaces a DAG in Airflow, Airflow will only represent the current (latest) version of the DAG within the user interface (UI), without references to prior versions.

In practice, this means that when trying to understand how an older DAG run ran, you are missing:

  • An accurate representation of the DAG structure;
  • The code used to generate the DAG structure;
  • Access to Task Instance logs for tasks that no longer exist;

At first glance, this sounds like a relatively simple problem to solve. However when you consider that a DAG can change while running, the problem becomes a lot more complex. 

The scope of this AIP is to make sure that the UI of Airflow accurately reflects how a DAG was run in the past, without changing the execution behavior. Execution behavior will continue to be based on the most recent version of a DAG. While this AIP attempts to make the UI more usable for edge cases that come up when DAGs change mid-execution, the goal is to set the stage for AIP-66 to allow Airflow to execute an entire DAG using the same DAG code.

Requirements

For every instantiation of a task run (i.e a task instance run), we will show the following information in a consolidated place:

  • The DAG structure at the time of the task instance run
  • The DAG file code used to generate the DAG structure at the time of the task instance run
  • Access to logs for the task instance run from the Airflow UI

This information will flow up to the dag run and task instance levels where it makes sense, defaulting to the most recent DAG version.

The UI we show in the following scenarios shows the general idea of the functionality this AIP will add. The final product will be different, as we adapt during development.

Scenarios

We will consider 4 different scenarios (in increasing order of complexity):

  1. DAG changes between two DAG runs
  2. DAG changes in between two Task Instances in the same DAG run
  3. DAG changes in between two Task Instance runs for the same Task Instance
  4. Clearing DAG runs and Task Instances for older DAG versions

Scenario 1: DAG changes in between two DAG runs

If a DAG changes between DAG runs, Airflow will be able to display the structure and details of the earlier DAG runs.

Imagine that we went from 2 to 3 tasks. This is problematic today since the old DagRun assumes there was the 3rd task:

Note the new “welcome” task is shown in the graph of the first dag run.

It gets worse when we reduce back down to 2 tasks, as the 3rd task disappears completely from the 2nd run. You don’t even need a DagRun to lose the historical view of the task:

In the worst case scenario, if your dag structure is completely different, you lose all value from the UI. Again, with no DagRun required:


The good news is when the DAG changes between DAG runs, remedying the UI is fairly straightforward - each DAG run has a single structure. In the grid, we can show a caret which indicates that a new dag version was used. In the graph and code views, we can represent the DAG structure and code that was used for each DAG run.

The following shows what the grid would look like once the welcome task has been removed:


And if all of the existing tasks were replaced with a new byebye task:




The DAG version - which is represented as a partial checksum for illustrative purposes only - can be shown within DAG, DAG run, and Task Instance details.


For tasks that are removed, we will keep the task in the grid, but no status box will be added for newer DAG runs (similar to what we do today for old DAG runs whenever a task is added). If a task were to move from one task-group to another, that task will be shown twice in the grid, within both task groups. The task will show no status box for applicable runs to show that it is no longer part of the old or new task group. The task may not show in the old task group, if the task group is no longer present in the current DAG. In that case, the task will simply be at the bottom of the list in the grid view (see the next section for more details).

You will also be able to exclude removed tasks from the grid with a toggle.

Sort Order in Grid

We will default to the latest version structure, with removed tasks at the bottom. When clicking on an older DAG run, we will animate rows moving to their correct positions to make it obvious that there is a change. There will be a toggle to control whether the tasks are reordered when an older DAG run is selected.

Clearing removed tasks

If a task is no longer in the latest version of the DAG, users will be unable to clear that task. The clear button will be disabled, and the tooltip will give the user an explanation of why is it disabled.

Scenario 2: DAG changes in between two Task Instances in the same DAG run

Non-Structural DAG Changes

When there are non-structural dag code changes (meaning, the task_ids, operators used, and relationships remain unchanged), there is no impact to the grid or graph. To highlight that there was a DAG version change between task instances, we will also include a caret overlaid on the task instance and show the version change in the tooltip. 

Things get a bit more complicated if the task has upstream tasks that were run on different versions. We will reflect that in the tooltip as well, and users can utilize the graph to determine the various versions.

On the graph, a caret will also be included to indicate when in the DAG run the version changed:

On the code views, users will also be able to toggle between versions used in that DAG run:

Structural DAG Changes

When there are structural changes, there are additional considerations. 

Imagine that the DAG structure (task_ids, operators, and/or relationships) changes while a DAG Run is ongoing.

For example, let’s say that in Dag Version 1, a dag looks like this: 

Task A >> Task B >> Task C

Task A and Task B run.

Now Dag Version 2 is deployed. It looks like this:

Task A >> Task C >> Task D

Task C and Task D run.

Neither dag structures are representative of what ran. The Dag Run would look like: 

Task A >> Task B >> Task C >> Task D

Today, similar to all the other scenarios, we just ignore this possibility and the latest wins. 

Note: Ideally, Airflow could continue executing the remaining tasks using the old code, but that brings in significant execution complexity and would introduce behavioral differences. Therefore, it is out of scope for this proposal.

We will handle the grid as described in the non-structure DAG changes section. For the graph, we can use the same toggle to flip between both dag structures.

In version 1, Task A and Task B will show that it has run, while Task C will have no status:

In version 2, Task A will be empty, and Task C and Task D will show it has run:

We will not attempt to merge the graphs of the varying versions together. The user will only be able to view the graph for a single DAG version at a time.

At the DAG run level, we will show all of the versions that were used in the DAG run.




Scenario 3: DAG changes in between two Task Instance runs for the same Task Instance

Although we normally think of a task instance as a single “run”, there can be multiple task instance runs. This happens when a task is cleared, or when a task is retried. Today, with the exception of task logs, we only show data for the most recent task instance “run”. 

We will need to show the DAG structure, logs and code for the task instance run.

Navigating between versions will be similar to what is described in the prior sections. In addition, there will be a button to view the version history for the task instance. Similar to logs, in the code and graph and code views, there will be “attempt” numbers. Based on the attempt selected, the views will be updated with the version artifacts accordingly. 




Scenario 4: Clearing DAG runs and task instances from older DAG versions

Instead of creating a new DAG run or queuing up new task instances, users will sometimes clear older DAG runs and/or task instances whenever they want to run part of a DAG, and/or use the same xcom or data interval values from a previous task instance run. This is how we’ll handle clearing behavior.

Clearing an Entire DAG Run

If all tasks are cleared for an entire DAG run, the DAG will run using the most recent code version and DAG structure. We will need to make this obvious to the user. The UX will ultimately not be different from the above scenarios.

Clearing Individual Task Instances

If we are clearing an individual task instance, we will run the task using the most recent DAG version code. The UX will be the same as the task retry scenario. If applicable, downstream task instances will be queued using the new dag structure - similar to how we handle dag changes in between task instance retries. If you decide to only clear a specific task instance and leave downstream tasks as is, the visualization may not be obvious. We may trigger a warning to push users to include downstream tasks, and if they still decide to proceed, will have a caret before and after the task instance to show that the specific task ran on a different version. 

We will not allow users to clear removed tasks as the task would just fail. Users will only be able to clear existing tasks, or all tasks in an entire dag run

Out of scope

Some things will be out of scope in this AIP.

The DAG Dependencies page will continue to show the relationships between the latest version of the DAGs, with no way to see past relationships.

The dashboard will also reflect the latest version of the DAG (task counts, schedule, etc), even if DAG runs that started on older versions are still running.

Execution behavior will continue to be based on the most recent version of a DAG. AIP-66 will allow Airflow to execute an entire DAG using the same DAG code.

Backend Changes

Determining DAG version

To calculate the DAG version, we will simply hash the serialized DAG json. This is easy for us to calculate and already covers the things in the DAG used by the scheduler and webserver.

We will explore having a more comprehensive version detection method (e.g. if the code inside of a PythonOperators `python_callable` changes, or in functions that that callable calls), but we will at least be able to version based on the serialized DAG json if that proves problematic/slow.

We will also explore allowing users to provide their own version string for a DAG. This would (ideally) be meaningful to the user, and we can show the provided version string in the UI. We would use it in combination with the version we calculated above. This means users aren’t in direct control of the versioning, but they can force a new version on demand.

DAG version tracked at TaskInstance try level

We will track the DAG version at the TaskInstance try level so we have the granularity to allow the UI to handle the mixed version situations detailed above. AIP-64 will have added the necessary granularity to TaskInstance to enable this.

SerializedDAG and DAGCode will be versioned

In order to support the UI showing history for prior tries, both SerializedDag and DagCode will have version added as part of the primary key.


1 Comment

  1. This is the most important piece of the whole DAG versioning (smile)

    I think the idea is cool and sound. But there will be a lot of edge cases to handle that we will only find out diring implementing.