Status

State: Done
Discussion Thread:
Vote Thread: https://lists.apache.org/thread/ycy3vh2h23t792zgrystv5k39nk9086n
Vote Result Thread:
Progress Tracking (PR/GitHub Project/Issue Label): https://github.com/apache/airflow/pull/37005
Date Created: 2024-01-02
Version Released:
Authors: TP Chung

Abstract

This document proposes formats for Dataset URIs of various schemes, so that URIs of those schemes carry standard semantics. The proposed URI formats are inspired by those of OpenLineage.

Motivation

With AIP-48 Data Dependency Management and Data Driven Scheduling, it is now possible for a DAG to be scheduled based on a task outlet describing a data resource. However, AIP-48 does not specify the semantics of the URI, nor how two URIs should be compared, even though a Dataset is internally identified by its URI. This raises a few issues. First, DAG authors in the same organisation must internally agree on a format to use. Second, it is more difficult for tools to interact with Datasets; since there is no standard semantic for the URI, tools are left with either imposing a format, which may then conflict with a user’s internally agreed one, or receiving less information from the Dataset than they ideally could.

Airflow Connection has a URI format that can be used to represent a Connection object in a simple string form. However, the format, by design, specifies only the connection, without distinguishing the resources that can be fetched through it, even across different resource types. For example, a connection containing AWS credentials can be used to connect to AWS, but carries no information on exactly which service, or what resources inside a particular service, it targets.

The recent Airflow Object Storage (AIP-58) introduces new URI formats that can be used to directly reference an object (“file”) in a storage. While this fits directly with the use case described here, it does not specify the format used, but leaves it as an implementation detail of each storage. It also, by design, only covers file-like storages. This document explicitly specifies those rules, using the same formats currently used by the Object Storage.

Considerations

This AIP is informational. Users may choose not to follow the proposed rules if they do not expect the URI to be semantically interpreted by Airflow (i.e. it is only understood as a literal string), or do not intend to interchange the value with libraries, including but not limited to providers and plugins by third-party authors.

What change do you propose to make?

A URI format is defined for a resource that can logically trigger a downstream workflow a la Dataset events. A resource type can be a database (Postgres, MySQL, etc.), an object storage service (Amazon S3, Azure Object Storage, etc.), or the local filesystem.

When describing URI sections, the terminology used by Python’s urllib.parse.urlsplit function is used. Although RFC 3986 is generally followed as the standard URI format, various URI parsers are known to deviate slightly in edge cases. Python’s implementation is therefore used as the de facto reference, since it is the easiest function for both Airflow and various provider and plugin packages to reach for, and is considered stable enough to be relied on. If a value cannot be parsed as a URI matching any of the known rules, it is treated as a simple literal string carrying no semantics.
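
For reference, this is how urlsplit decomposes a URI into the sections referenced below (standard library behaviour; the example URI is illustrative, and only the “protocol” name is this document’s own term for urlsplit’s scheme):

from urllib.parse import urlsplit

parts = urlsplit("postgres://localhost:5432/my_db/public/my_data?x=1")
parts.scheme    # 'postgres' -- called "protocol" in this document
parts.netloc    # 'localhost:5432' -- host and optional port
parts.hostname  # 'localhost'
parts.port      # 5432
parts.path      # '/my_db/public/my_data'
parts.query     # 'x=1'
parts.fragment  # '' -- not allowed in the canonical form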

The URI formats should generally follow OpenLineage’s Dataset naming scheme. For file-like resources, this conveniently matches the URI format used by the Object Storage API (AIP-58). Others may require slight adjustments so they look more familiar to Airflow users. Most significantly, we use slashes (/) to denote hierarchy in a database, similar to how Airflow Connection formats them, instead of the dots (.) used by OpenLineage.

Certain sections in the URI should either be normalised or dropped entirely, so that a resource can be canonically identified by exactly one URI. Each section should be escaped (“quoted”) according to the rules so a valid URI can be generated to represent the target resource.

In the long run, users are not expected to hand-craft the URIs, but to use convenience functions or Dataset subclasses to create a Dataset with an appropriate URI. This document does not cover those APIs.

What problem does it solve?

With a standard URI format in place, features can be designed to rely on the interoperability. Most significantly, Dataset events can be emitted automatically by operators, whether developed in Airflow core, providers, or third parties, based on operator arguments, similar to how OpenLineage events are, without relying on the user to manually specify outlets. While this is already possible now, having documented URI formats means operators can be more confident in not stepping on other operators’ toes, and users can more easily discover such features and schedule around the emitted events.

A standard URI format also unblocks tools from implementing more efficient Dataset features based on semantics. For example, it would be possible for the scheduler to support triggering on files matching a glob pattern, without needing to emit a separate triggering event for each pattern being listened on.

How are users affected by the change?

Users already using Datasets may need to review the values they currently use, and change any that conflict with the defined formats. This is expected to be a minor annoyance, since the protocol (scheme) part is generally enough to distinguish between unrelated URI formats. The formats defined in this document are also quite straightforward and considered established enough that users of those services should already be using mostly, if not entirely, compatible formats internally, even without a standard.

Again, since this AIP is informational, users are still allowed to disregard the rules defined here, as long as they do not access any of the new features enabled by the standard. The new features should be strictly opt-in and only be enabled by explicit user requests (e.g. by setting a flag to True on the operator) for backward compatibility.

What defines this AIP as "done"?

The AIP should be considered done when a process for a service to add a new URI format is established. This is done by allowing services to record the format in the documentation, likely alongside the corresponding operator and hook pages. Existing example DAGs on Datasets will be modified to use standard URI formats to demonstrate the best practice, and additional examples will also be created to demonstrate the additional rules proposed in this document.


Specifications

NOTICE: This AIP will not be updated after acceptance to track rules newly added in later Airflow versions. The Airflow documentation should describe the up-to-date URI formats and be the canonical source instead.

A URI should be formatted as:

{protocol}://{netloc}{path}{query}
  • All parts are required unless explicitly specified.
  • The airflow protocol is reserved and should not be used by any service.
  • Protocol values with a x- prefix are reserved for user-defined Dataset values. Airflow core and official providers should not define protocols in this namespace. Third-party libraries are also strongly discouraged from defining such protocols.
  • Unless specifically mentioned, the netloc part should be of form {host}[:{port}], where the port part, if missing, should be normalised into a value specified for each protocol. If a default port value is not specified, the port part is mandatory for the protocol.
  • Unless specifically mentioned, the netloc part MUST NOT contain auth information in the canonical URI form.
  • The path part starts from (and includes) the first / (forward slash). The final / in path is stripped, unless it is the only character present. This matches the behaviour of Python’s pathlib.
    • /foo/bar/ → /foo/bar

    • / → /

  • Each path component (path separated by /) is percent-encoded if needed.
  • The canonical URI form should not contain a fragment (the part containing and after #).
  • Order of items in query (starting with and including the ?) is insignificant, and should be normalised by ordering entries by the key’s Unicode code point value (ascending).

For example, the URIs in each pair below are semantically equivalent, with the latter being canonical.

s3://s3_default@some_bucket/order_data
s3://some_bucket/order_data

postgres://localhost/my_db/public/my_data
postgres://localhost:5432/my_db/public/my_data

service://token@location/path/to/data?y=2,3&x=1
service://location/path/to/data?x=1&y=2,3
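
As a minimal sketch of how the query part could be normalised (the helper name _normalize_query is illustrative and not part of any proposed API; safe="," keeps the comma from the example above unescaped):

from urllib.parse import parse_qsl, urlencode

def _normalize_query(query: str) -> str:
    # Sort query parameters by key (Unicode code point, ascending).
    return urlencode(sorted(parse_qsl(query)), safe=",")

_normalize_query("y=2,3&x=1")  # -> 'x=1&y=2,3'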

Known Standard URI Formats

NOTICE: This AIP will not be updated after acceptance to track rules newly added in later Airflow versions. The Airflow documentation should describe the up-to-date URI formats and be the canonical source instead.

This section specifies how a Dataset URI should be written for various services.

Local file

  • Format: file://{host}{path} 
  • The host part can be empty, which is normalised into localhost.
  • The port part is not in the canonical form. It is ignored if present.
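
For example, the following pair is equivalent, with the latter being canonical (the path is illustrative):

file:///tmp/order_data
file://localhost/tmp/order_data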

Amazon Simple Storage Service

  • Format: s3://{bucket_name}{path}
  • The netloc part denotes the bucket name, which is unique across an S3 partition. While it is technically possible for a bucket name to appear in multiple partitions (currently three: standard, China, and US Gov), the use case of mixing partitions in an Airflow setup is considered too niche and therefore ignored. We may introduce additional protocol values in the future if the need arises.
  • The path part always starts with a slash (/), followed by the S3 object key name (which should never start with a slash). We include the leading slash in path to match the definition in RFC 3986, but the same format can be equivalently expressed as s3://{bucket_name}/{key_name}.

Google Cloud Storage

  • Format: gcs://{bucket_name}{path}
  • The gs protocol is an alias, normalised to gcs.
  • The netloc part denotes the bucket name, which is unique across all GCS.
  • The path part always starts with a slash (/), followed by the GCS object name (which should never start with a slash). We include the leading slash in path to match the definition in RFC 3986, but the same format can be equivalently expressed as gcs://{bucket_name}/{object_name}.
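
For example, the following pair is equivalent, with the latter being canonical:

gs://some_bucket/order_data
gcs://some_bucket/order_data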

Postgres

  • Format: postgres://{host}[:{port}]/{database}/{schema}/{table}
  • The postgresql protocol is an alias, normalised to postgres.
  • Default port is 5432.
  • The schema value is required since it is not possible to infer the value statically without introspection.
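
For example, the following pair is equivalent, with the latter being canonical:

postgresql://localhost/my_db/public/my_data
postgres://localhost:5432/my_db/public/my_data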

MySQL

  • Format: mysql://{host}[:{port}]/{database}/{table}
  • Default port is 3306.
  • MariaDB also uses this form. The mariadb protocol is an alias, normalised to mysql.
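
For example, the following pair is equivalent, with the latter being canonical (names illustrative):

mariadb://localhost/my_db/my_data
mysql://localhost:3306/my_db/my_data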

Google BigQuery

  • Format: bigquery://{project_id}/{dataset}/{table}
  • The logical netloc part denotes the project ID, which is unique across all BigQuery projects according to Google documentation. Note that this is different from the “project name,” which is only unique to the user, or the “project number,” which although unique, is a serial number and not human-readable.

Trino

  • Format: trino://{host}[:{port}]/{catalog}/{schema}/{table}
  • Default port is 8080.
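
For example, the following pair is equivalent, with the latter being canonical (names illustrative):

trino://localhost/my_catalog/my_schema/my_table
trino://localhost:8080/my_catalog/my_schema/my_table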

Implementation

Datasets are still created by the airflow.datasets.Dataset class. Core Airflow and each provider would register classes to handle the protocols they understand, and functions to normalise URIs using them.

When a Dataset is created with a string that looks like a URI, it is parsed with urlsplit; Airflow checks whether the protocol is registered, and calls the corresponding function to turn the value into a canonical URI for comparison. The URI will continue to be stored as a raw string in both Dataset and DatasetModel.

The normalisation function takes a urllib.parse.SplitResult and returns a str.
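
As an illustration, a normaliser for the postgres protocol might look like the following (a sketch only; the function name is hypothetical, and it applies the rules listed in the Postgres section above):

import urllib.parse

def normalize_postgres_uri(parts: urllib.parse.SplitResult) -> str:
    # Using hostname (not netloc) drops any auth information.
    host = parts.hostname
    port = parts.port or 5432  # Default Postgres port.
    path = parts.path.rstrip("/") or "/"  # Strip the final slash.
    # postgresql:// is an alias protocol, normalised to postgres://.
    return f"postgres://{host}:{port}{path}"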

Quasi-pseudo implementation:

# The existing Dataset class.
class Dataset:
    def __init__(self, uri: str, extra=None):
        self.extra = extra  # Not related to this proposal.
        self.uri = _normalize_uri(uri)

def _normalize_uri(uri: str) -> str:
    try:
        parts = urllib.parse.urlsplit(uri)
    except ValueError:
        return uri  # Can't parse, treat as free-form text.
    try:
        # Handlers registered by core and providers, keyed by protocol.
        normalizer = ProvidersManager().dataset_uri_handlers[parts.scheme]
    except KeyError:
        return uri  # No normalization logic, treat as free-form.
    try:
        return normalizer(parts)
    except Exception:
        log.exception("Failed to normalize %s", uri)
        return uri
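
Assuming a normaliser like the postgres sketch above is registered for both the postgres and postgresql protocols, creating a Dataset with a non-canonical URI would yield the canonical form:

Dataset("postgresql://localhost/my_db/public/my_data").uri
# -> 'postgres://localhost:5432/my_db/public/my_data'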

Comments

  1. Yeah. I think it's generally cool and great to have it hashed out. No big comments except some nits.

  2. This is very useful from OpenLineage perspective - will have easy mapping between those URLs and OpenLineage ones.

  3. I very much like the idea, just a few comments. Besides the AIP I see at least some documentation should end-up in the codebase describing best practices and I'd propose to adjust examples following the best practices as well.

  4. Agree with the docs being there (what Jens + Maciej said). I think it's something we did not do well in the past that the docs were an "afterthought" after feature implementation, which was bad IMHO. I am always a big fan of treating documentation as part of implementation and simply not approving PRs that implement a feature if it is not documented well (or at least if there is no parallel PR with documentation). I think when documenting we can also find a number of problems with the implementation. For example, when you are not able to document something clearly and explain it well for the users, then it often means that the implementation is not very good.

    I believe the whole AIP here is "documentation only" and it will only get "completed" when it is properly documented in the Airflow Docs (the AIP is not enough) - for me that's the completion criteria for this AIP. So I personally take documentation here as "given".


  5. Great idea to standardize the URIs, like connections.
    +1 on documentation being a must requirement for this AIP's completion. More details on how migration issues will be surfaced for existing dataset users would be helpful as well.

  6. +1 for the standardization, I think it is a great idea.

    It is crucial to keep the Datasets URI conventions and format the same as the ones used in OpenLineage, so a data resource processed through Airflow can always be referred to with a single identifier.

  7. It's super exciting to see advances in this area and the standardization of URIs.

    > The URI formats should generally follow OpenLineage’s Dataset naming scheme. For file-like resources, this conveniently matches the URI format used by the Object Storage API (AIP-58). Others may require slight adjustments so they look more familiar to Airflow users. Most significantly, we use slashes (/) to denote hierarchy in a database, similar to how Airflow Connection formats them, instead of the dots (.) used by OpenLineage.

    > Certain sections in the URI should either be normalised or dropped entirely, so that a resource can be canonically identified by exactly one URI. Each section should be escaped (“quoted”) according to the rules so a valid URI can be generated to represent the target resource.

    Why aren't we using the OpenLineage Dataset naming scheme? It would be awesome if we could use an established standard, rather than creating a new one. Could we have examples of where and when the OpenLineage dataset naming does not meet our needs?

    While it's nice to be consistent with Connections, that feels like an internal part of Airflow. Ideally we'd aim to have URIs that can interoperate with other systems. Being compliant with OpenLineage would be a big win.

    I understand that there may be situations where Airflow may want to include additional data in the URI (such as time ranges and partitions). As of now, these don't seem to be covered by the OpenLineage URI naming convention. That said, we can both influence the standard and, if it takes long, Airflow can add extra information to the URIs as an extension, not a replacement.

  8. From the OpenLineage side, the crucial difference between the proposed format and the OpenLineage one is that OpenLineage naming cannot contain connection names, as in

    s3://s3_default@some_bucket/order_data
    service://token@location/path/to/data?y=2,3&x=1

    That is also why we don't use Airflow Connection URIs (smile)

    However, connection names could be included as part of a custom facet, like

    Dataset(namespace="snowflake://xy123456.us-west-1.aws", name="database.schema.table", facets={"airflow_connection": AirflowConnectionFacet(name="snowflake_connection")}) 

    This would not be a very good API for the users though: for example, it does not hint at how to format the name and namespace directly.
    If the proposal were to use OpenLineage naming directly, it would be advisable to provide helper functions/classes as part of the user-facing API, for example

    SnowflakeDataset(conn=conn, account=account, database=database, schema=schema, table=table)

    could generate dataset shown above.

    On the other hand, OL could be used with the proposed URI naming scheme, given conversion methods between the two formats provided by hooks, as hooks have all the necessary context regarding the dataset naming scheme:

    from airflow.datasets import Dataset
    from openlineage.client.run import Dataset as OpenLineageDataset

    # Hook methods; the hook holds the connection context needed for naming.
    def ol_to_airflow_dataset(self, ol_dataset: OpenLineageDataset) -> Dataset:
        raise NotImplementedError()

    def airflow_to_ol_dataset(self, dataset: Dataset) -> OpenLineageDataset:
        raise NotImplementedError()