Status

State: Completed
Discussion Thread:
Vote Thread: https://lists.apache.org/thread/rhq63f27xkmqbc1owhmkrp3smbp0yhrz
Vote Result Thread: https://lists.apache.org/thread/fnswoymp35pfgmkjzmx9cd9c9m9h1lbd
Progress Tracking (PR/GitHub Project/Issue Label): https://github.com/orgs/apache/projects/229

In Release: 2.7.0
Authors: Julien Le Dem

Motivation

Operational lineage collection is a common need to understand dependencies between data pipelines and track end-to-end provenance of data. It enables many use cases from ensuring reliable delivery of data through observability to compliance and cost management.

Publishing operational lineage is a core Airflow capability to enable troubleshooting and governance.

OpenLineage is a project of the LF AI & Data Foundation that provides a spec standardizing operational lineage collection and sharing across the data ecosystem. While it provides plugins for popular open source projects, its intent is very similar to that of OpenTelemetry (also under the Linux Foundation umbrella): to remain a spec for lineage exchange that projects, open source or proprietary, implement.

Built-in OpenLineage support in Airflow will make it easier and more reliable for Airflow users to publish their operational lineage through the OpenLineage ecosystem.

The current external plugin maintained in the OpenLineage project depends on Airflow and operator internals, and it breaks when those internals change. A built-in integration provides better, first-class support for exposing lineage: it is tested alongside other changes and is therefore more stable.

Today, OpenLineage consumers in the ecosystem include: Egeria (bank compliance), Marquez (building your own metadata platform, for example for compliance), Microsoft Purview (governance, …), Astro (data observability), and Amundsen. AWS recently blogged about using OpenLineage in the AWS ecosystem. Other projects are at various levels of progress.

On the producer side, there is support for open source projects like Airflow, dbt, Spark, Flink, and Great Expectations, and for proprietary warehouses like Snowflake, BigQuery, and Redshift through API integration or SQL parsing.

Examples of users talking about their usage of OpenLineage can be found on the OpenLineage blog.

This integration will also stimulate the continued growth of the OpenLineage ecosystem and create more value for Airflow users. 

Considerations

What change do you propose to make?

First, we propose to create a new Airflow-OpenLineage provider in the Airflow project. This provider is created by moving the openlineage-airflow package out of the OpenLineage project.

This provider would be included in the base Airflow docker image to be easily enabled by configuration.

As a follow-up, the lineage extraction logic currently included in the Extractors of that package would move into each corresponding provider package, along with its unit tests. For this purpose, a new optional API for Operators (get_openlineage_facets_on_{start(), complete(ti), failure(ti)}) is documented here. This removes the need for extractors moving forward. (More events, for example to track pause/resume, could be added in the future.)

Eventually, there should be no need to write an Extractor unless one does not control the operator they are using and needs to add lineage extraction externally. This keeps the lineage contract stable within each operator.
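To make the optional operator contract concrete, here is a minimal sketch of an operator implementing the three methods. It does not import Airflow: `Dataset`, `OperatorLineage`, `CopyTableOperator`, and the `rowCount` key are hypothetical stand-ins for illustration, not the provider's actual classes, and the returned structure is simplified.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


# Hypothetical stand-ins for an OpenLineage Dataset and the lineage container
# returned by the operator methods; the real provider defines its own classes.
@dataclass(frozen=True)
class Dataset:
    namespace: str
    name: str


@dataclass
class OperatorLineage:
    inputs: List[Dataset] = field(default_factory=list)
    outputs: List[Dataset] = field(default_factory=list)
    run_facets: Dict[str, Any] = field(default_factory=dict)


class CopyTableOperator:
    """Hypothetical operator that copies one Postgres table into another."""

    def __init__(self, conn_uri: str, source: str, target: str) -> None:
        self.conn_uri = conn_uri
        self.source = source
        self.target = target
        self.rows_copied: Optional[int] = None

    def execute(self, context: Dict[str, Any]) -> None:
        # The actual copy is omitted; record a row count for the completion event.
        self.rows_copied = 42

    def get_openlineage_facets_on_start(self) -> OperatorLineage:
        # Inputs and outputs are known from the operator's arguments before it runs.
        return OperatorLineage(
            inputs=[Dataset(self.conn_uri, self.source)],
            outputs=[Dataset(self.conn_uri, self.target)],
        )

    def get_openlineage_facets_on_complete(self, ti: Any) -> OperatorLineage:
        # After a successful run, also report facts learned during execution.
        lineage = self.get_openlineage_facets_on_start()
        lineage.run_facets["rowCount"] = self.rows_copied
        return lineage

    def get_openlineage_facets_on_failure(self, ti: Any) -> OperatorLineage:
        # On failure, report the datasets the run was meant to touch.
        return self.get_openlineage_facets_on_start()
```

Because the lineage logic sits next to the operator logic, the start event can report datasets from the operator's arguments, while the completion event can add information that only exists after execution.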

The following illustration shows the current packages and the proposed future state:

  • Top: current state implementation
  • Bottom: proposed future state

What problem does it solve?

The new airflow-openlineage provider and the new optional get_openlineage_facets operator methods significantly simplify exposing lineage in Airflow operators.

Lineage collection through OpenLineage becomes stable, because the contract is implemented and tested in each provider instead of in a separate project that can get out of sync.

Airflow users will be able to turn on lineage collection, and send it to the consumer(s) of their choice, much more easily.

As discussed in the motivation section, there are multiple tools able to ingest OpenLineage. As an illustration, here is a screenshot of the Marquez UI. Marquez is the reference implementation of the OpenLineage spec and allows visualizing lineage emitted by this provider.

Why is it needed?

Today the openlineage-airflow package is an external project depending on internals of Airflow and its providers (through the Extractors). 

Being external makes the integration brittle and prone to breakage for each new release of Airflow or providers. 

Users with custom operators and third-party providers have to maintain separate lineage extraction logic. This proposal makes it easier for custom operators to expose lineage natively.

Being external also means extra maintenance and setup for administrators who want to collect operational lineage from the Airflow instances they administer.

Are there any downsides to this change?

The Airflow project assumes maintenance of the OpenLineage Airflow integration. This is an upside from a stability perspective, but it shifts work to the Airflow community.

The maintenance responsibility is decomposed as follows:

  • Contributors of new Operators are expected to include lineage support as part of the contract and to add corresponding unit tests. The Operator how-to guide will explain how to do this. The OpenLineage documentation on exposing lineage in operators, and the corresponding example, will be moved accordingly.
  • The current maintainers of the openlineage-airflow package will continue maintaining that code and be active contributors.
  • For existing operators that don’t have lineage support yet, adding this feature could be strongly encouraged as regular maintenance happens on them.

This provider will have additional dependencies, such as the OpenLineage SQL parser package, but those should be fairly limited.

Which users are affected by the change and how are users affected by the change? (e.g. DB upgrade required?)

Users of the OpenLineage integration will no longer need to install an external package.

Users creating custom Operators will find it easier to expose lineage in this framework.

Users who are not using OpenLineage will not be affected.

Implementation Details

This effort is decomposed into a few independent steps, whose details are described in the following sections.

We expect to implement those steps in the following sequence:

  • Create the Airflow-OpenLineage provider in the Airflow project.
  • Move lineage logic from Extractors into the Operators themselves.
  • Add lineage coverage to the TaskFlow API.

OpenLineage Provider in Airflow

Create a new OpenLineage provider in the Airflow project.

  • The DAG and Task listeners are moved from the openlineage-airflow package to the Airflow-OpenLineage-provider in the Airflow project.
  • The openlineage-airflow package is deprecated in OpenLineage in favor of the new provider. Support for older versions of Airflow will stay there for some time. We can turn the openlineage-airflow package into a shell that depends on the new provider.
  • We add the OpenLineage configuration to the Airflow configuration. This would be the preferred configuration mechanism over environment variables.
  • The new Airflow-OpenLineage-provider is added to the Airflow docker image.
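The preference for Airflow configuration over environment variables can be sketched as a simple lookup order. This is a minimal sketch assuming a hypothetical `url` option in an `[openlineage]` section (the provider's real option names may differ) and the `OPENLINEAGE_URL` environment variable as the fallback; it uses only the standard library's `configparser`.

```python
import configparser
import os
from typing import Mapping, Optional

# Hypothetical section and option names; the provider's real options may differ.
SECTION = "openlineage"
OPTION = "url"
ENV_VAR = "OPENLINEAGE_URL"  # fallback environment variable


def resolve_openlineage_url(
    cfg: configparser.ConfigParser,
    env: Mapping[str, str] = os.environ,
) -> Optional[str]:
    """Return the lineage endpoint, preferring Airflow configuration over env vars."""
    # 1. A non-empty value in the [openlineage] section of the Airflow config wins.
    if cfg.has_option(SECTION, OPTION):
        value = cfg.get(SECTION, OPTION).strip()
        if value:
            return value
    # 2. Otherwise fall back to the environment variable, if set and non-empty.
    return env.get(ENV_VAR) or None


# Usage: the config value takes precedence over the environment variable.
cfg = configparser.ConfigParser()
cfg.read_string("[openlineage]\nurl = http://marquez:5000\n")
print(resolve_openlineage_url(cfg, {ENV_VAR: "http://other:5000"}))  # prints http://marquez:5000
```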

OpenLineage support in providers:

As part of this new provider, we define a new optional contract for operators, get_openlineage_facets*, documented here.

  • Moving forward, supporting lineage in more operators does not require writing an extractor. A good example of this is described here. Extractors were needed only because the lineage logic lived in a separate package from the Operator. When the logic is in the same package, the preferred approach is to add get_openlineage_facets* methods to the Operator and let the DefaultExtractor collect lineage.
  • Existing Extractors logic is moved to their respective Operators.
  • Lineage and operator logic are tested together, which avoids breaking lineage extraction when operators change.
  • The lineage support is tested as part of the Operator tests.
  • In this API, we will reuse a dictionary of entities (PostgresTable, GCSEntity, AWSS3Entity, …) defined in Airflow to describe Datasets that are mapped to OpenLineage Datasets and can describe their specific facets.
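The sketch below shows how Airflow-side entities could map to OpenLineage Datasets, and how a DefaultExtractor-style function could delegate to the operator's own methods instead of operator-specific extractors. The entity fields, `default_extract`, and `ExportOperator` are hypothetical; the namespace/name formats follow the OpenLineage dataset naming conventions for Postgres, GCS, and S3. For brevity, the operator methods here return a list of entities rather than a full lineage object.

```python
from dataclasses import dataclass
from typing import Any, List, Optional


@dataclass(frozen=True)
class Dataset:
    """Stand-in for an OpenLineage Dataset: a namespace plus a name."""
    namespace: str
    name: str


# Hypothetical Airflow-side entities, each knowing how to map itself to a Dataset.
@dataclass(frozen=True)
class PostgresTable:
    host: str
    port: int
    database: str
    schema: str
    table: str

    def to_openlineage_dataset(self) -> Dataset:
        return Dataset(f"postgres://{self.host}:{self.port}",
                       f"{self.database}.{self.schema}.{self.table}")


@dataclass(frozen=True)
class GCSEntity:
    bucket: str
    path: str

    def to_openlineage_dataset(self) -> Dataset:
        return Dataset(f"gs://{self.bucket}", self.path)


@dataclass(frozen=True)
class AWSS3Entity:
    bucket: str
    key: str

    def to_openlineage_dataset(self) -> Dataset:
        return Dataset(f"s3://{self.bucket}", self.key)


def default_extract(operator: Any, event: str, ti: Any = None) -> Optional[List[Dataset]]:
    """Call the operator's own get_openlineage_facets_on_<event> method, if it has one."""
    method = getattr(operator, f"get_openlineage_facets_on_{event}", None)
    if method is None:
        return None  # the operator does not implement this part of the optional contract
    entities = method() if event == "start" else method(ti)
    return [entity.to_openlineage_dataset() for entity in entities]


# Usage: an operator that only reports lineage on completion.
class ExportOperator:
    def get_openlineage_facets_on_complete(self, ti: Any) -> list:
        return [PostgresTable("db", 5432, "shop", "public", "orders"),
                AWSS3Entity("lake", "exports/orders.parquet")]


print(default_extract(ExportOperator(), "complete"))
```

Because the lineage logic lives on the operator, its unit tests can assert on the extracted Datasets directly, which is how lineage and operator behavior get tested together.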

We intend to contribute this support to the most commonly used operators. Moving forward, new Operators are expected to expose lineage natively, and maintainers of external providers are expected to implement this API. If there are gaps, users can query their OpenLineage repository to find Operators that are missing lineage and open issues in the corresponding repositories.

The OpenLineage SQL parser stays a shared component used by other integrations.

OpenLineage support in TaskFlow and Python operators

Currently, the TaskFlow API and PythonOperator are opaque, because there is limited lineage we can extract from arbitrary code: they do not expose lineage unless users provide inlets and outlets for their tasks. As an improvement, we propose adding an API that lets hooks expose lineage. At the operator level (in the PythonOperator, or in the DecoratedPythonOperator used by TaskFlow), this lineage is then collected and returned through the new get_openlineage_facets* APIs.

The DbApiHook is the common API for all SQL hooks and can be the central point for collecting lineage, similar to how SQL-based Operators collect lineage.

High level proposal:

  1. The Operator registers a Listener with the DbApiHook in get_openlineage_facets_on_start().
  2. While the Operator runs, the DbApiHook notifies the Listener whenever it accesses datasets, and the Operator accumulates this information.
  3. The Operator exposes the collected lineage in get_openlineage_facets_on_complete(ti).

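listener = CollectionListener()

def get_openlineage_facets_on_start(self):
    # Proposed API: the hook notifies this listener whenever it accesses a dataset
    DbApiHook.setListener(listener)

def get_openlineage_facets_on_complete(self, ti):
    # Return the lineage accumulated while the task ran
    return listener.get_consolidated_lineage()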
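The listener flow described in the three steps above can be sketched end to end in plain Python. `CollectionListener`, `FakeDbApiHook.set_listener`, and `LineageAwarePythonOperator` are hypothetical stand-ins for the proposed API, not existing Airflow classes; the sketch only shows how hook-level notifications could accumulate into operator-level lineage.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, ClassVar, Dict, List, Optional, Set


@dataclass
class CollectionListener:
    """Accumulates the datasets reported by hooks while a task runs."""
    inputs: Set[str] = field(default_factory=set)
    outputs: Set[str] = field(default_factory=set)

    def on_dataset_accessed(self, name: str, is_write: bool) -> None:
        (self.outputs if is_write else self.inputs).add(name)

    def get_consolidated_lineage(self) -> Dict[str, List[str]]:
        # Sets deduplicate repeated accesses; sorting gives a stable result.
        return {"inputs": sorted(self.inputs), "outputs": sorted(self.outputs)}


class FakeDbApiHook:
    """Stand-in for DbApiHook with the proposed listener registration."""
    _listener: ClassVar[Optional[CollectionListener]] = None

    @classmethod
    def set_listener(cls, listener: Optional[CollectionListener]) -> None:
        cls._listener = listener

    def read_table(self, table: str) -> None:
        if self._listener is not None:
            self._listener.on_dataset_accessed(table, is_write=False)

    def write_table(self, table: str) -> None:
        if self._listener is not None:
            self._listener.on_dataset_accessed(table, is_write=True)


class LineageAwarePythonOperator:
    """Runs arbitrary user code and reports the datasets its hooks touched."""

    def __init__(self, python_callable: Callable[[], Any]) -> None:
        self.python_callable = python_callable
        self.listener = CollectionListener()

    def get_openlineage_facets_on_start(self) -> None:
        # Step 1: register the listener before user code runs.
        FakeDbApiHook.set_listener(self.listener)

    def execute(self) -> Any:
        # Step 2: hooks used by the callable notify the listener.
        return self.python_callable()

    def get_openlineage_facets_on_complete(self, ti: Any) -> Dict[str, List[str]]:
        # Step 3: stop collecting and expose the accumulated lineage.
        FakeDbApiHook.set_listener(None)
        return self.listener.get_consolidated_lineage()


# Usage: a TaskFlow-style function that only talks to the hook.
def summarize_orders() -> None:
    hook = FakeDbApiHook()
    hook.read_table("public.orders")
    hook.read_table("public.customers")
    hook.write_table("public.order_summary")


op = LineageAwarePythonOperator(summarize_orders)
op.get_openlineage_facets_on_start()
op.execute()
print(op.get_openlineage_facets_on_complete(ti=None))
```

Registering the listener at class level keeps the sketch short but assumes one running task per process; a real implementation would need to scope registration more carefully.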

Other considerations?

What defines this AIP as "done"?

This AIP is done once the first version of the Airflow-OpenLineage provider is released, replacing the openlineage-airflow package.

Scope of the initial implementation

Move the openlineage-airflow code base into an Airflow-OpenLineage provider, renaming packages and refactoring accordingly.