Status

State: Draft
Discussion Thread: https://lists.apache.org/thread/8hlltm9brdxqyf8jyw1syrfb11hl52k5
Vote Thread: https://lists.apache.org/thread/tyfsrpjn12sz9dw50pbg16dsv6lmj610
Vote Result Thread:
Progress Tracking (PR/GitHub Project/Issue Label):
Date Created: 2024-04-15
Version Released:
Authors: Jens Scheffler

Motivation

The world is moving to the cloud! No... see, a lot is still running on-prem! Oh, we need to be hybrid. Damn, we actually have a distributed system. We want it in the cloud but have these couple of on-prem servers behind our corporate proxy... how can we add them to Airflow? The SSH Operator is hardly possible... and our IT security will kill us for the idea of incoming connections... Oh yeah, Airflow needs to be distributed!

In today's (Airflow 2.9) setup most users run either a cloud-native or a dockerized setup in Kubernetes. This allows flexibility and distribution of the components. Alternatively, you can set up the environment on-prem privately. Using the CeleryExecutor you can distribute your workers to different locations and link them into a central instance, and most standard features can be set via the Helm chart. But... help! If you need a worker far, far away from the main deployment (e.g. crossing security perimeters, needing an HTTP proxy to connect...), you need a good "extension cord". A distributed setup of Celery workers can be complex, and as workers need to be able to connect to the metadata database, latency influences performance. You need to link Redis, the Airflow meta DB and at least a good Git sync to execute code at a remote distance. UDP is also needed if you want statsd reporting for metrics.

This AIP proposes to add a "Remote Executor" (probably via a provider package) allowing users to create a distributed worker setup. It takes into account the planned extensions of AIP-61 Hybrid Execution, which would allow selective distribution depending on the task.

Considerations

We already have a couple of executors in Airflow, and they have a big user base. We know that the Dask Executor was removed because its user base was quite small, and we know that maintaining multiple executors requires capacity. Nevertheless, for a distributed scheduling setup that crosses a wide infrastructure, the executors currently shipped with Airflow are complex to maintain. Also, only if your "remote" machines are open to the central cloud and trust incoming connections can you use SSH to push jobs (including to Windows machines).

You could think about adding such Remote Executor capability as an external solution, but most probably adjustments in the core (not finally planned yet...) are needed to distribute tasks without DB access and with a codebase cut off from the core.

What change do you propose to make?

It is proposed to implement a Remote Executor as an additional executor option for Airflow in order to distribute tasks to remote machines. A remote machine receiving a job to execute would be a "Remote Worker".

On a high level this would be similar to GitHub self-hosted runners, GitLab (custom) runners or Azure DevOps self-hosted agents: a very thin remote deployment which connects to a central pipeline to execute jobs. Compared to the current distribution and connectivity options, the core difference is that a Remote Worker always connects to the central setup and pulls jobs; the assumption is that a Remote Worker cannot be connected to directly and might sit behind corporate proxies and/or firewalls. A minimal sketch of such a pull loop is shown below.
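
To illustrate the pull principle, the following sketch shows what such a worker loop could look like. Endpoint paths, payloads and credentials are illustrative assumptions only; the real API surface would be defined during implementation.

```python
import time

import requests

# Hypothetical base URL of the central Airflow site (illustrative only).
BASE_URL = "https://airflow.example.com/remote_executor/v1"


def poll_for_job(session: requests.Session) -> dict | None:
    """Ask the central site for the next job; returns None if nothing is queued."""
    resp = session.get(f"{BASE_URL}/jobs/next", params={"queues": "remote"}, timeout=30)
    resp.raise_for_status()
    return resp.json() or None


def main() -> None:
    session = requests.Session()
    session.auth = ("remote-worker-01", "worker-secret")  # placeholder credentials
    # requests honors the HTTP(S)_PROXY environment variables, so a
    # corporate proxy works without extra code.
    while True:
        try:
            job = poll_for_job(session)
            if job is None:
                time.sleep(5)  # idle back-off between polls
                continue
            # ... execute the task, stream logs, report the result ...
        except requests.RequestException:
            time.sleep(30)  # sustain a temporary connection loss, then retry


if __name__ == "__main__":
    main()
```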

Requirements that a Remote Executor must provide (Criteria for first release/MVP)

  • The Remote Executor is an optional dependency, delivered as a provider package. It consists of an "Executor" part in the central Airflow deployment with some administrative endpoints, integration with the Scheduler, and authentication/authorization mechanisms, plus an additional package deployed on the "Remote Worker", which should be lean (just some Python packages and a file tree; no database, Docker, or other infrastructure requirements).
  • All connectivity from the Remote Worker to the central Airflow setup must be made via HTTPS. Authentication must be made via current Airflow credentials (as provided by the security manager component). The Remote Worker must be able to use a proxy to connect to the central Airflow setup. No connection from the central Airflow site to the remote site is assumed (pull/request only, potentially via WebSockets or long polling).
  • A Remote Worker must be able to sustain a temporary connection loss or interruption and should automatically re-connect. A temporary connection loss shall not terminate a job execution.
  • A small dashboard must be provided such that an administrator can view:
    • The connectivity state of each Remote Worker (similar to Flower in Celery), whether it is free or busy, and which job/task it is working on
    • A short overview of the state of the queues (number of jobs per queue and their age)
  • Similar to Celery queues, each Remote Worker should be configurable to execute jobs from one or multiple queues. This allows distribution to different remote endpoints.
  • Logs of task execution (stdout/stderr) must be uploaded during and at the end of a job execution so that progress can be seen in the central web UI. A local copy of the logs must be kept for local troubleshooting (including log rotation/cleanup).
  • A Remote Worker needs to send regular heartbeats (a sketch of heartbeat and log-upload calls follows this list).
  • If a task is terminated from the central side (timeout, "set as success/fail" in the UI), the remote side must also terminate the job.
  • At least 100 Remote Worker nodes should be supported.
  • Documentation for setup/start of the Remote Worker
  • From a workflow perspective, a task directed to a Remote Worker should be able to be modeled like every other task: it can be templated, receive parameters and XCom, and will be able to push XCom.
    • Some restrictions are to be assumed: potentially, due to the lack of DB/internal APIs, not all operators might be suitable. Details need to be elaborated during the implementation.
  • Support for local development via Breeze is, of course, to be included
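
As a rough illustration of the heartbeat and log-upload requirements above, the following sketch shows the kind of calls a Remote Worker could make. The endpoints and payload fields are hypothetical.

```python
import requests

BASE_URL = "https://airflow.example.com/remote_executor/v1"  # illustrative only


def heartbeat(session: requests.Session, worker_id: str, state: str) -> None:
    """Report liveness and free/busy state so the central dashboard can show it."""
    session.post(
        f"{BASE_URL}/workers/{worker_id}/heartbeat",
        json={"state": state},  # e.g. "idle" or "busy"
        timeout=10,
    )


def push_log_chunk(session: requests.Session, job_id: str, chunk: str) -> None:
    """Upload logs incrementally so progress is visible in the central web UI
    while the job is still running; a local copy is kept for troubleshooting."""
    session.post(f"{BASE_URL}/jobs/{job_id}/logs", json={"chunk": chunk}, timeout=10)
```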

Requirements that a Remote Executor should provide (can be incrementally added after the first release)

  • (Depending on a general rework of the task execution API?) Secrets and other needed internal endpoints (e.g. DAG/task context) are provided by the central instance "on demand" and on a "need to know" basis, as connections are defined for the task. The Remote Worker should not have access to all other credentials. The remote setup should only be trusted with the scope of information and code needed to execute the job. Access to a Remote Worker must not create a new attack vector into the system.
  • DAG Python task code will be pushed to the Remote Worker (no separate Git sync should be needed by default). This might impose some limitations on which code can be executed, similar to the PythonVirtualenvOperator.
  • Authentication must be made either via current Airflow credentials (as provided by the security manager component) or, preferably, with an API token. An individual API token per Remote Worker is recommended for production and security (see the sketch after this list).
  • Enrollment might be automated via IT distribution tooling, e.g. Ansible
  • The small dashboard is extended by
    • Option to restart/update a Remote Worker (update: push a new package if the central Airflow/provider package is upgraded)
    • Option to pause distribution to a worker instance for planned maintenance
    • General worker logs (stdout/stderr) should be streamed to the central site such that they can be shown on the dashboard and in the Grid view, like other worker logs
    • Note: All central options on the dashboard must be available via API as well
  • A Remote Worker is integrated into standard Airflow telemetry/metrics back-ends.
  • A Remote Worker should be able to consume multiple jobs in parallel (similar to a Celery Worker)
  • The deployment of the Remote Worker should be a thin install with minimal Python package dependencies. Assuming a Python runtime is available, it can be deployed on Linux/Windows/macOS. We would assume that the Python version and package versions match the central setup.
  • At least 1000 Remote Worker nodes are supported
  • Documentation for setup includes help for deployment of the Remote Worker (e.g. including systemd setup scripts and extensions to the Helm chart)
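
For the API-token authentication and the "all dashboard options available via API" items above, a sketch of the client side could look as follows; the token variable, endpoints and payload are assumptions.

```python
import os

import requests

BASE_URL = "https://airflow.example.com/remote_executor/v1"  # illustrative only

session = requests.Session()
# Hypothetical per-worker API token, e.g. issued at enrollment time and
# placed into the environment by deployment tooling such as Ansible.
session.headers["Authorization"] = f"Bearer {os.environ['REMOTE_WORKER_TOKEN']}"

# Pause distribution to a worker for planned maintenance -- mirroring the
# dashboard option as an API call.
session.patch(
    f"{BASE_URL}/workers/remote-worker-01",
    json={"paused": True},
    timeout=10,
)
```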

Technical Design

The following implementation architecture is proposed; details might change during the implementation.

From the architecture the following components need to be implemented:

  • "Remote" Provider package
    • DB model extensions (an illustrative sketch follows this list)
    • RemoteExecutor implementation
    • Plugin with:
      • API endpoints for the Remote Worker
      • UI extension to show status
    • CLI to start Remote Worker
  • Support in Breeze for development
  • Potential adjustments in the log viewer in the web server (to be checked during implementation)
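
As an illustration of the "DB model extensions" item, one possible shape of a job-state table is sketched below with SQLAlchemy. Table name and columns are assumptions to be settled during implementation.

```python
from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.orm import declarative_base

# In Airflow this would derive from the metadata base; a plain declarative
# base is used here to keep the sketch self-contained.
Base = declarative_base()


class RemoteJob(Base):
    """Hypothetical model keeping the state of jobs handed to Remote Workers."""

    __tablename__ = "remote_job"

    id = Column(Integer, primary_key=True)
    task_instance_key = Column(String(250), nullable=False)  # maps back to the task instance
    queue = Column(String(64), default="default")  # label for a group of workers
    state = Column(String(20))  # queued / running / success / failed
    worker_name = Column(String(64))  # which Remote Worker picked the job up
    last_heartbeat = Column(DateTime)  # liveness tracking for the dashboard
```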

What problem does it solve?

  • If a worker in current Airflow is to be maintained outside of the core setup, it is hard to (manually) integrate it, e.g. via Celery. A Remote Executor would allow easy integration.
  • Currently, due to Python limitations, it is not possible to distribute jobs to Windows machines unless a worker setup is made via WSL. A Remote Executor could strip off a lot of Airflow core dependencies and could be made thin, with fewer dependencies, so that it is deployable on Windows as well. With just a few MB of Python dependencies, a remote machine could be looped into the setup.
  • Today, a lot of internal APIs and DB endpoints must be exposed to operate a remote worker. This opens a door for security problems; for example, a VPN or connection encryption needs to be set up from the remote location to the central site. A Remote Executor could be designed to make this easy, with less fear of disclosing secrets on a remote or even badly managed machine.
  • Long-distance connections typically cross firewall boundaries. In enterprise networks this is often restricted by the need for a proxy connection, and on-premise setups also need HTTP proxies to reach a cloud instance. A Remote Executor can be made in a way that only HTTP(S) is needed, compared to the communication protocols used today.
  • Worker status and logs are currently made visible in the UI by calling the worker via HTTP. In a distributed setup this might be a challenge (security and firewalls). The Remote Executor could send logs and bridge this gap.

Why is it needed?

It is hard with Airflow (e.g. Celery) to deploy and host a distributed worker system in a different security perimeter. The only workaround in most cases is something like an SSHOperator (which requires direct inbound access to the remote machine) or an exposed HTTP REST API which is called by Airflow, which also requires direct network access and a lot of custom code. Alternatively, a message bus could bridge multiple locations (e.g. a distributed Kafka queue), but then the end-to-end workflow must be split into multiple sub-flows.

Are there any downsides to this change?

A new provider package might be needed. If rejected by the community, it could also be an external provider outside of core Airflow.

Depending on technical details, the possibilities for task execution might be limited, assuming that the full Airflow core codebase will not be available.

The design must take security concerns into account. It opens the risk of potential vulnerabilities and attack vectors; concepts need careful review.

Which users are affected by the change?

None. It would be an optional package.

How are users affected by the change? (e.g. DB upgrade required?)

No user is affected. Database tables need a small extension to keep the job status.

Other considerations?

As there is a parallel discussion about Airflow 3 ongoing, this AIP might influence the design for Airflow 3 planning. Depending on the requirements discussed, it could be considered as a replacement for the CeleryExecutor, where we know a lot of features would need to be added to reach the same level. The target is not to directly compete with the CeleryExecutor, but it might act as a point of discussion for the future.

This AIP is planned to be added as a "non-breaking change", an extension to the Airflow 2 branch. It would be marked as experimental after the MVP until critical demanded features are included, allowing (Remote Executor internal) interfaces to change flexibly. Adding it to the Airflow 2 branch allows us to learn, rework, and include the outcomes of the implementation in a future Airflow 3 version. It is explicitly assumed that a rework would be needed if Airflow 3 internal structures change. (But it would also be very valuable to have a learning curve before the Airflow 3 design.)

What defines this AIP as "done"?

  • MVP: If not all tasks can be made to run on a reduced Remote Worker, at least a Bash/shell operator, a Python operator and a Docker operator should be usable.
  • An Airflow deployment manager can deploy the package to enable and configure the Remote Executor
  • A Remote Worker (agent) can be joined to an Airflow installation
  • A job/task can be distributed via the executor attribute to a Remote Worker. The queue field might be used as a label to distribute to a set of remotes (see the example after this list).
  • Documentation is available
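
As an example of the modeling described above, and assuming the per-task executor selection from AIP-61 lands as planned, a task routed to a Remote Worker could look like this; the executor name and queue label are illustrative.

```python
import pendulum

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="remote_example",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,
):
    BashOperator(
        task_id="run_on_factory_server",
        bash_command="./collect_sensor_data.sh",
        executor="RemoteExecutor",  # route via the proposed Remote Executor
        queue="factory-site-1",     # queue acts as a label for a worker group
    )
```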

Comments

  1. Jens Scheffler, great work on the AIP; I am in full support of the ideas posted here.

    I have some comments from my first review.


  2. I'm still not quite sure what changes this AIP is actually proposing yet? I think I understand the core ask, but I'm just not sure how we get there?

    1. Thanks for the feedback. At the moment I have no fixed solution, but wanted to first check for feedback on the requirements.

      Do you expect a technical design for the next round of feedback, or are specific points unclear?

      1. Gotcha gotcha. Not sure. I do have one idea that relates to this, about changing the task execution "API" (I'm doing a bad job of describing it in text, but I think it will help with this.)