Status

State: Draft
Discussion Thread: [DISCUSS] AIP-14 Create composable operators using common interfaces
JIRA: AIRFLOW-2651
Created:



Motivation

This is an idea that I have used in the past for my own work, that I have seen in this PR (https://github.com/apache/airflow/pull/3526), and that now comes up again in Ash's user survey (https://ash.berlintaylor.com/writings/2019/02/airflow-user-survey-2019/):

More Operators: 11 comments

Requests for more operators/sensors. One good request was to have “composable” operators to avoid an explosion of XtoY operators. Ed: this would be nice! If someone wants to start an Airflow Improvement Proposal for this that would be ace.

So, I created this AIP because I think it would be great to have this in Airflow.

Considerations

To prevent a proliferation of A-to-B operators, we could create hooks that are accessible via a common interface. This allows for interchangeable operators, e.g. a CopyOperator which takes two such hooks to copy from system A using hook A to system B using hook B. PR https://github.com/apache/airflow/pull/3526 already created a collection of filesystem hooks using Python's file object API. This is explained in more detail in the corresponding JIRA ticket, AIRFLOW-2651.
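
To make this concrete, here is a minimal sketch of what such a common interface could look like. The FsHook name and methods below are illustrative, not the exact API from the PR:

    import abc

    class FsHook(abc.ABC):
        """Illustrative base class for hooks exposing a common file system interface."""

        @abc.abstractmethod
        def open(self, path, mode='rb'):
            """Return a Python file-like object for the given path."""

        @abc.abstractmethod
        def exists(self, path):
            """Return True if the given path exists."""

        @abc.abstractmethod
        def delete(self, path):
            """Delete the file at the given path."""

Each concrete hook would then implement these methods against its own backend.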

The result is a few filesystem hooks:

  • ftp
  • hdfs
  • local
  • s3
  • sftp

And a few operators which accept any hook adhering to the file object interface, e.g. (not included in the PR; see the sketch after this list):

  • CopyFileOperator
  • DeleteFileOperator
  • CopyTreeOperator
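
As an illustration, a copy operator over this interface could simply stream bytes between any two hooks. The sketch below uses hypothetical constructor arguments and assumes the FsHook interface sketched earlier:

    import shutil

    from airflow.models import BaseOperator

    class CopyFileOperator(BaseOperator):
        """Copies a single file between any two file system hooks."""

        def __init__(self, src_hook, src_path, dest_hook, dest_path, **kwargs):
            super().__init__(**kwargs)
            self._src_hook = src_hook
            self._src_path = src_path
            self._dest_hook = dest_hook
            self._dest_path = dest_path

        def execute(self, context):
            # Both hooks expose Python file objects, so the copy logic is
            # independent of the underlying storage systems.
            with self._src_hook.open(self._src_path, 'rb') as src_file:
                with self._dest_hook.open(self._dest_path, 'wb') as dest_file:
                    shutil.copyfileobj(src_file, dest_file)

A DAG could then pair any two hooks (e.g. an S3 hook as source and an SFTP hook as destination) without needing a dedicated S3-to-SFTP operator.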

This is specific to filesystems, but I think this idea can be extended to, among others, databases using the Python DB API (PEP 249).
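
As a rough sketch of that extension (illustrative code, not an existing Airflow operator), any two PEP 249 connections could be wired together like this:

    def copy_rows(src_conn, dest_conn, select_query, insert_stmt, batch_size=1000):
        """Copy rows between any two PEP 249 (Python DB API) connections."""
        src_cursor = src_conn.cursor()
        dest_cursor = dest_conn.cursor()
        src_cursor.execute(select_query)
        while True:
            rows = src_cursor.fetchmany(batch_size)
            if not rows:
                break
            # The placeholder style in insert_stmt must match the destination
            # driver's paramstyle (e.g. '?' for sqlite3, '%s' for psycopg2).
            dest_cursor.executemany(insert_stmt, rows)
        dest_conn.commit()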

Since the PR above went stale for various reasons, I think it would be wise to discuss whether this is desirable before somebody puts a lot of time and effort into it, and if so, whether the work from PR 3526 can be used as a basis or whether large changes are required.

4 Comments

  1. As the author of PR 3526, I am (of course) a fan of this idea. I have also been working on improving the code involved in the PR, but intend to release it as a package separate from Airflow (similar to the idea behind AIP-8) to avoid the issues we encountered in the PR.

  2. Hi,

    I believe this is a great principle for developing operators. Take my use case as an example: I wrote a FileTransferOperator which can transfer files from A to B, where A and B may be the local file system, S3, (S)FTP or any other cloud storage. With such a protocol, it is much easier to extend the functionality when adding a new source/destination. Since we have AIP-8 Split Providers into Separate Packages for Airflow 2.0, those operators would need to live in their own repos, because operators are more about business logic. Such abstraction or composition may be good for your case, but not good enough for every case.

    IMO, this proposal could serve as a principle and be applied in operator development.

  3. Thanks for putting this together. We have in fact implemented this for our own use internally already, and it would be great to see something like this in Airflow! I'd like to share some of the learnings from our own work on this.

    First, we have deliberately designed our interface to be closer to key-value based storage like S3 and GCS, because that is our primary use-case. One consequence is that we use proxy-objects for individual storage-objects. That is because the underlying APIs are modeled like this, and often obtaining a reference to an object requires an additional HTTP(S) request. So our interface looks something like this when used:

    conn = hook.get_conn()
    f = conn.get('some/path/or/key')  # proxy object; may require an extra HTTP(S) request
    f.download('/tmp/local/path')     # fetch the object's contents to a local file

    There are also some features that are only supported by these types of storage, like content-type and content-disposition metadata. It would be great to accommodate this in these interfaces as well. Our implementations also all offer direct access to the underlying API, so that any code that wants to use implementation specific features can always do so.

    We've also taken care to make our hooks stateless, and keep all connection state in the connection objects. In the referenced PR, all the hooks internally cache the connection returned by get_conn. This is problematic when you want to write operators that use multi-threading for more efficient I/O (e.g. when operating on many small files). In our implementation the hook literally only has a get_conn method and nothing else - all the logic is in the connection object.
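
    A minimal sketch of this separation, with illustrative names rather than our actual code:

    import concurrent.futures

    class StatelessHook:
        """Hook that holds no connection state of its own."""

        def __init__(self, conn_id):
            self._conn_id = conn_id

        def get_conn(self):
            # No caching: every caller gets a fresh connection object, so
            # connections are never shared between threads.
            return create_connection(self._conn_id)  # hypothetical factory

    def download_many(hook, keys, local_dir):
        # Each download opens its own connection, which is safe because the
        # hook itself carries no mutable state.
        def _download(key):
            conn = hook.get_conn()
            conn.get(key).download('%s/%s' % (local_dir, key.rsplit('/', 1)[-1]))

        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as pool:
            list(pool.map(_download, keys))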

    I hope this is of some use to this discussion.

  4. Building on the work initially proposed in AIRFLOW-2651, we have developed a separate package for composable file system hooks and operators. In contrast to Björn's work, our interface was designed to mimic the file system API used by Python for opening/copying/deleting files.

    See our repository here for more details: https://github.com/jrderuiter/airflow-fs.