Status

State: Draft
Discussion Thread:
Vote Thread:
Vote Result Thread:
Progress Tracking (PR/GitHub Project/Issue Label):
Date Created: 2024-03-05
Version Released:
Authors:

This AIP is part of AIP-63, which aims to add DAG Versioning to Airflow.

This AIP is very much still a work in progress. It is the next (and likely final) step on the path to supporting DAG Versioning in Airflow. All of this is subject to change, but it shows the high-level vision of what this feature will allow.

Motivation

Today, Airflow always executes a task using the latest DAG code. While in some cases this is what users want, in others it may be desirable to complete a whole DAG run on a single version of a DAG's code, or, when clearing tasks in older runs, to rerun those tasks on the DAG code from the time of the original run.

Considerations

We will introduce the concept of a DAG bundle (a collection of DAG files and other files) and allow Airflow to choose which version of a DAG bundle to use for a given task try. This means that a DAG run can continue running on the same DAG code for the entire run, even if the DAG is changed midway through, because the worker can retrieve a specific DAG bundle when running a task.

This will require Airflow to support a different way of finding DAG bundles - it can no longer simply expect them on local disk. This will be done in a pluggable way by building in support for DAG bundle backends, with optional versioning support, so the ecosystem can evolve as time goes on.

By default, DAG runs will execute on the same DAG bundle for the whole run, assuming the DAG bundle backend supports versioning. However, DAGs can opt to keep the existing behavior of running on the latest DAG code instead.
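
As a sketch of what that opt-out might look like at the DAG level (the parameter name below is purely hypothetical; this AIP does not yet define the interface, and this flag does not exist in Airflow today):

```python
from airflow import DAG

# Hypothetical flag; the real name and mechanism are still to be designed.
with DAG(
    dag_id="always_run_latest",
    # Opt out of version pinning: every task try uses the latest bundle.
    run_on_latest_bundle=True,  # illustrative only
):
    ...
```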

Pluggable DAG bundle backends

DAG bundle backends will allow Airflow to retrieve DAG bundles at a given version at any point in time. Since it’s common that DAGs are not simply contained in a single file, we require a way to version a whole set of files at the same time. How this is accomplished will depend on the backend. Airflow will only operate on complete DAG bundles - it will never attempt to identify or fetch a subset of a DAG bundle.
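
To make the shape of this pluggable interface concrete, here is a minimal sketch; none of these names are settled, and the real interface still needs to be ironed out:

```python
from __future__ import annotations

import abc


class DagBundleBackend(abc.ABC):
    """Illustrative only - the actual backend interface is TBD."""

    @abc.abstractmethod
    def supports_versioning(self) -> bool:
        """Whether this backend can retrieve historical bundle versions."""

    @abc.abstractmethod
    def latest_version(self, bundle_name: str) -> str | None:
        """Return the newest version identifier, or None if unversioned."""

    @abc.abstractmethod
    def fetch(self, bundle_name: str, version: str | None = None) -> str:
        """Materialize the complete bundle locally and return its path.

        Airflow always operates on whole bundles - no partial fetches.
        """
```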

While we are still determining which DAG bundle backends will be available initially, it is likely that we will have support for git and versioned buckets (like S3) from day one.
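
As an illustration of how a git backend could keep its version logic isolated (using `git worktree`, an idea also raised in the comments below), here is a sketch reusing the hypothetical interface above; all names are illustrative:

```python
from __future__ import annotations

import os
import subprocess
import tempfile


class GitDagBundleBackend(DagBundleBackend):
    """Sketch of a git-backed bundle backend; not a real implementation."""

    def __init__(self, repo_path: str):
        self.repo_path = repo_path

    def supports_versioning(self) -> bool:
        return True

    def latest_version(self, bundle_name: str) -> str | None:
        # The commit hash doubles as the bundle version identifier.
        return subprocess.check_output(
            ["git", "-C", self.repo_path, "rev-parse", "HEAD"], text=True
        ).strip()

    def fetch(self, bundle_name: str, version: str | None = None) -> str:
        version = version or self.latest_version(bundle_name)
        # Check the commit out into a throwaway worktree; task tries pinned
        # to the same commit could share this directory to save IO.
        path = os.path.join(tempfile.mkdtemp(prefix="dag-bundle-"), version[:8])
        subprocess.check_call(
            ["git", "-C", self.repo_path, "worktree", "add", "--detach", path, version]
        )
        return path
```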

It will also be possible to have a DAG bundle backend that does not support versioning. This is primarily to allow a native local dev experience, but it also opens the door to other remote backends that may not meet Airflow's versioning requirements.

By default, Airflow will configure a local-filesystem-backed DAG bundle backend that does not support versioning, to support the local dev experience. This backend will expose DAGs from the currently configured DAG directory to Airflow, maintaining backward compatibility.
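
Under the same illustrative interface sketched above, this default unversioned backend could be as simple as pointing at the existing DAG directory:

```python
from __future__ import annotations


class LocalDagBundleBackend(DagBundleBackend):
    """Sketch of the default, unversioned local-disk backend."""

    def __init__(self, dags_folder: str):
        self.dags_folder = dags_folder

    def supports_versioning(self) -> bool:
        return False

    def latest_version(self, bundle_name: str) -> str | None:
        return None  # plain local disk keeps no version history

    def fetch(self, bundle_name: str, version: str | None = None) -> str:
        if version is not None:
            raise ValueError("local filesystem bundles are not versioned")
        return self.dags_folder  # files are already in place
```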

Parsing and the scheduler

Parsing will always happen on the latest version of the DAG bundles, as is the case today. The scheduler will also use the latest version when dealing with new DAG runs.

The scheduler will also be able to schedule and send to the executor a task that runs on a prior version of a DAG bundle.

Worker

The executor will now tell the worker which DAG bundle a task needs to run against. The worker can then use the DAG bundle backend to fetch the DAGs and execute the task.
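
To tie the pieces together, the worker-side flow might look roughly like the sketch below (the function name and its wiring into the executor are hypothetical, reusing the illustrative DagBundleBackend interface from above):

```python
from __future__ import annotations


def run_task_from_bundle(
    backend: DagBundleBackend,
    bundle_name: str,
    bundle_version: str | None,
    dag_id: str,
    task_id: str,
) -> None:
    # The executor tells the worker which bundle (and, if the backend
    # supports versioning, which version) the task try is pinned to.
    bundle_path = backend.fetch(bundle_name, version=bundle_version)
    # The worker then parses the DAG from that exact code and runs the task.
    print(f"executing {dag_id}.{task_id} from {bundle_path}")
```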

11 Comments

  1. What if we bring in the idea of Prefect Deployments? It has some overhead (or a lot, I never measured it, TBH) when it starts the flow, because it has to copy the entire source, which could also be remote, into a temporary directory.

    1. I think that's exactly what "bundle" is - there are different names. I named them "snapshots", but the concept is the same: they should contain (obtained one way or another) sources + variables + other environment, like connections, that we can restore in a somewhat reproducible way. And I think "pluggable DAG bundle" will mean that we have to find a way to efficiently switch between snapshots/bundles/deployments (whatever you call them) for execution. What is interesting is that there are also various "levels" of those snapshots - for example, in some cases you might want 'almost' the same bundle (connections will need to be updated as passwords change since the last run, while your connections generally stay the same +/- credentials).

      1. Fwiw I prefer "snapshots" over bundle – way more intuitive and conveys the connection to DAG versioning instantly. I was confused about what bundle meant in this context until I read the AIP. 

    2. I've left this intentionally vague. It's much easier to scope it to today's DAG dir only, and that is where this draft draws the line now. But Jarek's idea of bringing in "everything" is definitely on the table too. It raises questions like "should deployment managers be responsible for managing provider versions, or DAG authors?". Definitely a lot to consider when we get closer to this being a reality.

  2. I very much like this as the major "big thing" of the DAG versioning "bundle" of AIPs!

    I like the abstract way of describing a bundle and especially that you see the "bundle" of files together, which also considers cross-file dependencies for utilities and generator stuff.

    I assume the most common backend used for versioning today is Git. I assume it would be really complex and error-prone to try to re-implement SCM-like logic. Also, you need the Python source files available in a file-system-like manner in order to cross-`import` referenced files in the Python module loader.

    Being biased towards Git, I don't know if you are aware of `git worktree` (https://git-scm.com/docs/git-worktree), which allows a temporary checkout (or many) from one Git repo. If I think of this and assume I have a LocalExecutor or Celery worker, then there might be 16 task slots operating in parallel, all on different task versions and potentially different version hashes. So DAG code must be available in the file system in potentially multiple versions in parallel.
    In such cases, if a Git repo is cloned and available, the individual versioned file tree could be checked out via `git worktree` into a TMP folder prior to task execution. Multiple tasks with the same version might even share the file tree to save IO. If the version hash corresponds to the Git commit hash, then 90% of the complexity could be pushed down to logic in Git. And you would have the benefit of 100% traceability of "what code was executed", as well as the ability, via the Git hash, to reproduce the code view on demand in the webserver (if code versions are not kept as blob copies in the DB... though the DB is needed so the webserver does not require a Git clone).

    1. Yep, basically exactly what I was thinking for git.

      A key part, though, is that these will be pluggable, so all the git-specific logic and, say, versioned-S3-bucket-specific logic can be isolated, and all "core" Airflow needs to do is ask for version x. The specifics of that interface still need to be ironed out, but I think it's important to have that flexibility long term.

  3. Like the idea of addressing this huge missing piece in Airflow!


    Have you considered taking this as an opportunity to abstract the way DAGs are defined, together with the definition of a bundle? For example, this could be achieved by abstracting away how the DagBag is populated - for a given bundle, it would defer the logic of presenting a DAG of a certain version to a pluggable bundle provider. The trivial, default implementation of a bundle provider would read the /dags directory and parse it. A Git-based one would check out a version with worktree and parse it. A custom backend that doesn't use Python but, for example, YAML files to define its workflows would create DAGs from these directly (instead of having a special fake DAG file that reads the YAML files to create DAGs).


    Time and time again I see unsuccessful attempts to contribute visual DAG editors (like the one Pinterest was proposing some time ago) and users implementing their own hacky solutions for non-Python-file-managed DAGs within Airflow's constraints. The right implementation of DAG bundles can reduce the number of workarounds in these solutions and will be a huge step towards the feasibility of visual DAG editing being part of the core Airflow experience one day.


    Looking forward to having this one polished and shipped!

    1. No, I didn't consider changing that. My initial reaction is that that wouldn't need to be coupled with this proposal either. I see that being more of a "pluggable parser" than a "pluggable source of files" that the parser looks at.

      That said, I do understand the desire for more flexibility in how Airflow discovers and parses DAGs. I'll definitely keep it in mind and make sure we don't make that type of thing harder to achieve.

  4. Referring to the Airflow 3 discussion: I'd say we should be much bolder than fsspec or a common abstraction over file-system versioning.

    I think we should consider a MUCH more decoupled architecture for Airflow 3 that might simplify a lot and be even more opinionated in how we actually implement versioning internally. If we decide to break with Airflow 2 DAG compatibility, I think we should consider a world where the Worker and Triggerer do not need DAG folder access at all. If we consider "Task Execution Isolation" by splitting off DB access and communication, we should also seriously consider sending not only "secrets" but also "DAG code" to the tasks. If we sign and verify the payload that we send to the Worker and Triggerer, and if we are able to extract only the files that are needed to parse and execute a given DAG (either automatically or via annotations), we should be able to store versions of DAGs locally on the Scheduler / DAG file processor side and send the code over to the task to execute.

    In this case we do not need to focus on fsspec, or the underlying file-system capability of the worker or triggerer, and all the complexity of using a distributed file system as part of the deployment. We could mandate (and be opinionated about) how the DAG file processor/scheduler stores the history (here I would side with Jens Scheffler that Git is a good solution), and we could simply treat it as an internal implementation detail of how the scheduler/DAG file processor stores the DAG folder history (we could use Git for storing the history, independently of how DAG files get delivered to the DAG folder available to the DAG file processor).

    That would be my bold proposal if we limit DAG versioning to Airflow 3 and break some compatibilities.


    1. Bold indeed. I like the general idea, but my initial reaction is that I'm a bit hesitant about Airflow trying to build the "payload" itself. I think having users (maybe with the help of Airflow somehow) build the "payload" themselves makes for a much clearer line. Say, in Docker: "create a container with this image" vs "create a container from this single Dockerfile in a sea of Dockerfiles" (not a great analogy, but yeah). I think one benefit of that is, coupled with a "clear interface for Task Execution", it opens the door for different DAGs to use different dependencies too.

      Anyways, lots to think through for this now that breaking changes are possible.

      1. > Anyways, lots to think through for this now that breaking changes are possible.

        Precisely. I think (and I had the same "revelation" during the AIP-67 multi-team discussions and voting) that IF we allow for breaking changes, then we can make very different decisions and make a number of simplifications or abstractions that would not be possible otherwise but would be way more future-proof. That's why I really started the discussion on Airflow 3, because it might make a HUGE difference whether we decide this one to be part of Airflow 2, Airflow 3, or both.