Status

State: Accepted
Discussion Thread:
Vote Thread: https://lists.apache.org/thread/8sfh6pwr24jvgdylfmfm2rf610ll6fcy
Vote Result Thread: https://lists.apache.org/thread/27nqcfbmhvgq90t9lkdyvo1pfphfgn97
Progress Tracking (PR/GitHub Project/Issue Label):
Date Created: 2024-03-05
Version Released:
Authors: Maciej Obuchowski

Abstract

This document proposes instrumenting Airflow Hooks and Object Storage to collect dataset updates automatically. The data will be used to gather OpenLineage lineage from additional operators, including operators running user-defined code, like PythonOperator, and other custom operators. In the future, the gathered datasets could be used for other purposes, like automatically emitting DatasetEvents, making dataset-scheduled DAGs easier to write. The instrumentation feature is proposed as experimental, to be expanded and possibly modified over several Airflow releases due to its scope.

Motivation

Currently, the lineage information gathered by AIP-53 OpenLineage in Airflow is limited to Operators in which it’s explicitly implemented. This is justified by the lack of information that Airflow has about the actual data processing: Airflow is extremely flexible in the variety of tasks it can schedule, all of which are abstracted behind Operators.

The Airflow Survey 2023 suggests that data lineage is the second most requested feature among Airflow users. However, PythonOperator, which has no support for lineage, is the most popular Airflow Operator, and Airflow keeps gaining features that make writing custom code easier, like the TaskFlow API and the Object Storage API. OpenLineage can’t get lineage information from those. Instrumenting these parts would allow OpenLineage to emit lineage, and potentially allow other parts of Airflow to use the collected data in other ways.


Considerations

What change do you propose to make?

There are two general parts to this proposal. The first is directly modifying Hooks and Object Storage so that they can automatically gather consistent, canonically (AIP-60) named datasets. This means instrumenting code that reads, writes or transforms data. We propose to collect this data in the AIP-60 format to potentially enable further use cases connected to scheduling and visualizing Datasets in Airflow. In contrast to Operators, Hooks don’t have a single method that is called once when they run; moreover, an Operator can use multiple hook methods and initialize multiple hooks. For this reason, this AIP proposes a model with an additional component on the worker (currently named HookLineageCollector) that is responsible for receiving the Datasets. It will be a class/module, not a separate process.
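
A minimal sketch of how such a collector could look; the method names and Dataset construction are assumptions, only the HookLineageCollector name itself comes from this proposal:

from airflow.datasets import Dataset


class HookLineageCollector:
    """Worker-side registry of datasets reported by hooks during a single task run."""

    def __init__(self):
        self.inputs: list[Dataset] = []
        self.outputs: list[Dataset] = []

    def add_input_dataset(self, uri: str):
        # Called by a hook when it reads data, e.g. an S3 hook downloading a key.
        self.inputs.append(Dataset(uri))

    def add_output_dataset(self, uri: str):
        # Called by a hook when it writes data, e.g. an S3 hook uploading a file.
        self.outputs.append(Dataset(uri))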

The second is that OpenLineage, and potentially any other component or listener, will be able to access the collected data. If there is no system registered to use the collected datasets, the collection part will be disabled. For the OpenLineage use case, Hooks implementing this feature will also implement a standard API to convert between AIP-60 URIs and OpenLineage URIs, resolving any potential differences between those formats:

from airflow.datasets import Dataset
from openlineage.client.run import Dataset as OpenLineageDataset


def airflow_to_ol_dataset(self, dataset: Dataset) -> OpenLineageDataset:
    """Convert an AIP-60 Airflow Dataset into an OpenLineage Dataset."""
    raise NotImplementedError()
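
For illustration, an OpenLineage integration could then combine this converter with the collector; everything below except the airflow_to_ol_dataset method is an assumption following the sketch above, not an existing API:

def to_openlineage_datasets(hook, collector):
    # Convert every dataset gathered by the worker-side collector using the hook's
    # own converter, so the OpenLineage listener can attach them to the run event.
    inputs = [hook.airflow_to_ol_dataset(ds) for ds in collector.inputs]
    outputs = [hook.airflow_to_ol_dataset(ds) for ds in collector.outputs]
    return inputs, outputs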

There are still limitations to lineage collection for some hook methods. For example, DbApiHook allows you to use a SQLAlchemy engine directly by calling get_sqlalchemy_engine, for example to use with `pandas.read_sql`. To collect lineage from that type of usage, we’d need to integrate directly with SQLAlchemy and/or Pandas, so it won’t necessarily be covered as part of this AIP. Additionally, SQL-based hooks will be able to emit lineage only if a SQL parser (for example, the OpenLineage one) is present, or if it’s possible to get lineage information in some other way (for example, by calling the BigQuery API). We accept that lineage collection can’t always be 100% complete within a system as flexible as Airflow.
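
To illustrate this limitation, consider a hypothetical usage (the connection id and table are made up) where the hook only hands out an engine and the actual read happens inside pandas, outside any instrumented hook method:

import pandas as pd

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="my_postgres")  # hypothetical connection id
engine = hook.get_sqlalchemy_engine()  # hook-level instrumentation visibility ends here
df = pd.read_sql("SELECT * FROM sales", con=engine)  # the read happens in pandas, not the hook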

Below are diagrams of the workflow before and after the proposed changes, in sequence diagram form:

[Attachment: AirflowInstrumentationBefore.png — sequence diagram of the workflow before the proposed changes]

Why is it needed?

It’s impossible to implement something like this without changes to core Airflow, because we need a component that holds the dataset data collected from hooks, so that OpenLineage and other potential plugins and Airflow components can use it.

Additionally, the change touches multiple providers (potentially all that touch data), so it’s good to have consensus on such a topic.

Are there any downsides to this change?

Hooks supporting this feature will contain additional code that needs to be updated, maintained and tested.

There will be no changes affecting users who don’t use OpenLineage or another component that consumes automatically collected lineage data.

Which users are affected by the change?

Only users of OpenLineage, or of any other component that consumes automatically collected lineage data, would be affected.

Hook developers will be affected by the additional code that needs to be updated, maintained and tested. Based on the current experience with OpenLineage in providers, this burden is rather low.

How are users affected by the change? (e.g. DB upgrade required?)

Only users of OpenLineage, or of any other component that consumes automatically collected lineage data, would be affected.

Other considerations?

  • Similar to Operators in AIP-53, not every Hook needs to be instrumented. We’re focusing on those involved in data processing and movement.
  • Special consideration needs to be paid to async tasks. In general, in contrast to operators, hooks don’t know whether they are used in an async or sync context; the same method can be used in both ways, as the async framework is flexible enough. A potential solution would be to collect dataset changes the same way, but instead of emitting dataset events on task deferral, persist them and emit them on actual task completion. This could be done by adding on_task_instance_deferred to the TaskInstance listener; see the sketch after this list.
  • At first, dynamically mapped tasks and task groups would be excluded (as in, lineage won't be collected). This limitation might get lifted in the future, as described in the future work section.
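
A minimal sketch of that deferral handling, assuming the proposed on_task_instance_deferred hook mirrors the signature of the existing on_task_instance_* listener hooks; the storage and the way collected datasets are looked up are illustrative only:

from airflow.listeners import hookimpl

# Illustrative in-memory storage; a real implementation would persist this data.
pending_datasets: dict = {}


@hookimpl
def on_task_instance_deferred(previous_state, task_instance, session):
    # Proposed hook: on deferral, stash whatever the hook-level collector has
    # gathered so far instead of emitting dataset events immediately.
    pending_datasets[task_instance.key] = getattr(task_instance, "hook_lineage", [])  # hypothetical attribute


@hookimpl
def on_task_instance_success(previous_state, task_instance, session):
    # On actual completion, emit everything gathered before and after the deferral.
    for dataset in pending_datasets.pop(task_instance.key, []):
        print("would emit DatasetEvent for", dataset)  # placeholder for real emission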

What defines this AIP as "done"?

Airflow 2.10 ships with the ability to collect hook-level lineage.

There is coverage for Object Storage, as well as a few popular hooks (S3, GCS, DbApi…). Further coverage can be added in regular Provider releases.

What’s the (potential) future work?

The gathered datasets could be visualized directly in the Airflow UI by some additional component or plugin that persists the collected data. This could enhance the data-driven scheduling experience by visualizing potential AIP-60 Datasets that could be used for scheduling.

There could be another conversion interface for the “reverse conversion”, from OpenLineage datasets to AIP-60 URIs:

from airflow.datasets import Dataset
from openlineage.client.run import Dataset as OpenLineageDataset


def ol_to_airflow_dataset(self, ol_dataset: OpenLineageDataset) -> Dataset:
    """Convert an OpenLineage Dataset into an AIP-60 Airflow Dataset."""
    raise NotImplementedError()

Combining this with the scheduling feature below, we’d be able to schedule Airflow DAGs on any OpenLineage event, including those coming from other systems, like Apache Spark, or even user-defined OpenLineage implementations.

Scheduling on automatically collected Datasets

Right now, the features introduced in AIP-48 Data Dependency Management and Data Driven Scheduling allow a DAG to be scheduled based on a dataset update. However, the mechanism is manual: DAG authors have to define which datasets the DAG is scheduled on, and which datasets each task produces. This manual process not only adds complexity to DAG development but also hinders the full potential of data-driven scheduling: users need in-depth knowledge of the data pipeline and have to explicitly declare outputs in DAG code, making it less adaptive to changes and more prone to errors. A natural consequence of automatically gathering Datasets, as potential future work, would be to use them to emit DatasetEvents that schedule DAGs based on defined AIP-60 Datasets. This would potentially be another future AIP. The following points describe this feature in more detail.

This feature would be controlled by DAG-level and Task-level parameters that enable or disable emitting DatasetEvents from the dataset data gathered by that task. The default would be driven by a global setting that is disabled by default. Even if the global opt-in is disabled, we could add a per-DAG or per-Task option to produce automatic events from a particular DAG or Task. This would be useful for testing purposes or for incrementally enabling the feature. It should be visible in the UI whether a DAG or Task produces automatic DatasetEvents; a hypothetical sketch of such an opt-in is shown below.
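
A purely hypothetical sketch of the opt-in model; neither the parameter nor the configuration option exists, and their names would be decided by the future AIP:

import pendulum

from airflow.decorators import dag, task


@dag(
    dag_id="example_automatic_dataset_events",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,
    # emit_automatic_dataset_events=True,  # hypothetical DAG-level opt-in overriding
    #                                      # the (disabled by default) global setting
)
def example_automatic_dataset_events():
    # A hypothetical task-level flag could likewise opt a single task in or out.
    @task
    def write_report():
        ...

    write_report()


example_automatic_dataset_events()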

Additionally, we could add a DAG-level parameter that disables scheduling on automatic dataset updates for that DAG, while leaving manual (outlet-based) dataset scheduling on. However, this would require a database upgrade.

Hierarchical dataset scheduling

It seems that a dataset s3://some_bucket/some_directory should be considered updated on a write to s3://some_bucket/some_directory/some_subdir.

This could be a feature that DAGs or users could opt out from. It would solve the problem of overloaded partitioning-based datasets: the some_subdir above could be a directory specifying a particular day. For generic partitioning, where we don’t know the layout of the data, we probably can’t do better automatically. Thanks to Kacper Muda, we have benchmarked a few naive solutions. The most naive is the LIKE solution:

SELECT uri FROM dataset
WHERE 's3://dir1/dir2/file1.csv' LIKE CONCAT(uri, '%')
ORDER BY LENGTH(uri) DESC LIMIT 1;

However, in practical tests it loses to a solution that enumerates all possible prefixes of the searched URI:

SELECT uri FROM dataset
WHERE uri IN ('s3://bucket/dir/file.csv', 's3://bucket/dir/file.cs', 's3://bucket/dir/file.c', 's3://bucket/dir/file', 's3://bucket/dir/fil', ...)
ORDER BY LENGTH(uri) DESC LIMIT 1;

On a one-million-row table with an index, and a searched URI 400 characters long, the LIKE query took 600 ms, while the IN query took 130 ms. More benchmarking information here.
It needs to be added that those times aren’t expected in Airflow: it’s hard to imagine an Airflow deployment where users schedule DAGs on a million different datasets, especially with such long URIs.
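
For illustration, a minimal sketch (assumed helper name) of how the prefix list fed to the IN query could be generated:

def uri_prefixes(uri: str) -> list[str]:
    # Every prefix of the written URI, longest first, matching the
    # ORDER BY LENGTH(uri) DESC preference for the most specific dataset.
    return [uri[:i] for i in range(len(uri), 0, -1)]


uri_prefixes("s3://bucket/dir/file.csv")[:3]
# ['s3://bucket/dir/file.csv', 's3://bucket/dir/file.cs', 's3://bucket/dir/file.c']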

Scheduling on updates in dynamically mapped tasks and task groups

Dynamic task and task group mapping could update a single dataset multiple times. The current workaround is to add a single task after the dynamically mapped one that updates the dataset once; in the “automatic” model we won’t have this. There are potential heuristics we could use, for example emitting on the first map index, but all of those have possible perils: that one would fail if high concurrency combined with a low worker count caused the whole task to finish much later than the subtask with the first map index.
Currently, the best solution seems to have two parts. The first is to add an annotation for users to decide what should happen: whether we emit a single dataset event after the full task is processed (which seems like it should be the default), emit on each finished subtask, or something else entirely (see the sketch below). The second is to add a separate table (DatasetEventRequests?) that would hold the “partial” data, plus an additional component (possibly running on the triggerer) that processes the data and the annotation and makes the actual decision.
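
A sketch of what such an annotation could look like on a TaskFlow task; the parameter in the comment is purely hypothetical and its name and values would be defined by the future AIP:

from airflow.decorators import task


@task(
    # dataset_event_policy="on_task_complete",  # hypothetical annotation: emit one event after
    #                                           # all mapped subtasks finish (likely default),
    #                                           # vs. "on_each_map_index" for one per subtask
)
def process_file(filename: str):
    ...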

It’s important to define a few of the most common use cases and implement them, since 90-99% of users are expected to use the predefined ones.