Status

  • State: Draft
  • Discussion Thread:
  • Vote Thread:
  • Vote Result Thread:
  • Progress Tracking (PR/GitHub Project/Issue Label):
  • Created:
  • Version Released:
  • Authors:

Motivation

Measure performance changes between Airflow versions. Identify changes that impact performance and the utilization of CPU, memory and disk resources. Improve the transparency of performance changes in the release notes of Apache Airflow.

Deliver Airflow users a tool to measure the performance of their own installations and compare it across different setups or versions.

Considerations

What change do you propose to make?

The framework aims to measure the performance and resource footprint of Airflow components.

The framework does not and should not measure the performance or resource utilization of third-party code (including operators that are part of provider packages).

The tests should be executed as part of the build process for Pull Requests (TODO: how to plug this into the build pipeline is to be decided later).

The test results should be included as part of the PR documentation (if possible).

The proposed framework is based on the following concepts:

  • Instance - definition of Airflow installation setup.
  • Performance DAG - definition of DAG that is executed during the test.
  • Test suite - combination of an Instance and a Performance DAG. A Test suite may include values for the Instance's or Performance DAG's Jinja placeholders.

Instance

Instance is a concept that defines the Airflow setup.

An Instance is configured using JSON files. The schema of the file is specific to the Instance type; therefore, the only field required in every Instance configuration file is instance_type. That field is used by the framework to pick the dedicated test execution solution (e.g. Docker Compose, Cloud Composer, GKE, MWAA, etc.).

The remaining fields of the instance configuration may include information such as:

  • number of schedulers
  • number of CPUs in schedulers
  • worker CPU or memory (for Celery workers)
  • machine types

The Instance configuration file can include Jinja-style placeholders that are populated by the Test suite.
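
For illustration, a minimal sketch of how such an Instance configuration could be rendered with args coming from a Test suite. Only instance_type is mandated by this proposal; all other field names, placeholder names and values below are hypothetical.

import json

from jinja2 import Template

# Hypothetical Instance configuration file content; only "instance_type"
# is required by the framework, the remaining fields are examples.
INSTANCE_SPEC = """
{
    "instance_type": "gke",
    "scheduler_count": {{ scheduler_count }},
    "scheduler_cpu": {{ scheduler_cpu }},
    "worker_memory_gb": {{ worker_memory_gb }},
    "machine_type": "{{ machine_type }}"
}
"""

# "args" as they could appear in a Test suite definition.
args = {
    "scheduler_count": 2,
    "scheduler_cpu": 4,
    "worker_memory_gb": 8,
    "machine_type": "e2-standard-4",
}

# Render the Jinja placeholders and parse the resulting JSON.
instance_config = json.loads(Template(INSTANCE_SPEC).render(**args))
print(instance_config["instance_type"])  # -> "gke"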

Performance DAG

It is a special DAG file that allows creating a simple test scenario by specifying edge conditions (a sketch of how these parameters might be consumed follows the list):

  • PERF_DAG_FILES_COUNT: number of copies of the elastic DAG file that should be uploaded to the Instance. Every copy will use the same configuration for its DAGs (except for the dag_id prefix). Must be a positive int (default: “1”).
  • PERF_DAG_PREFIX: string that will be used as a prefix for filename of every DAG file copy and as a prefix of dag_id for every DAG created (default: “perf_scheduler”).
  • PERF_DAGS_COUNT: number of DAGs that should be created from every copy of DAG file. Must be a positive int (required).
  • PERF_TASKS_COUNT: number of tasks for every DAG. Must be a positive int (required).
  • PERF_START_AGO: a string identifying a duration (eg. 2h13m) that will be subtracted from current time to determine start_date of DAGs (default: “1h”).
  • PERF_START_DATE: a date string in “%Y-%m-%d %H:%M:%S.%f” format identifying the start_date of the DAGs. If not provided, it will be calculated based on the value of PERF_START_AGO and added to the configuration.
  • PERF_SCHEDULE_INTERVAL: schedule_interval for every DAG. Can be either “@once” or an expression following the same format as for PERF_START_AGO (default: “@once”).
  • PERF_SHAPE: a string which determines structure of every DAG. Possible values: “no_structure”, “linear”, “binary_tree”, “star”, “grid” (required).
  • PERF_SLEEP_TIME: a float non-negative value that specifies sleep time in seconds for every task (default: “0”).
  • PERF_OPERATOR_TYPE: a string specifying the type of operator to use: “bash” for BashOperator, “python” for PythonOperator or “big_query_insert_job” for BigQueryInsertJobOperator (default: “bash”).
  • PERF_MAX_RUNS: number of DagRuns that should be created for every DAG. This will be achieved by setting DAGs' end_date according to start_date and schedule_interval. The resulting end_date cannot occur in the future. Must be specified if PERF_SCHEDULE_INTERVAL is provided as a time expression and cannot be specified if it is not. Must be a positive int (default: not specified).
  • PERF_START_PAUSED: boolean value determining whether DAGs should be initially paused.
  • PERF_TASKS_TRIGGER_RULE: value defining the rule by which dependencies are applied for the tasks to get triggered. Options are defined in the class airflow.utils.trigger_rule.TriggerRule.
  • PERF_OPERATOR_EXTRA_KWARGS: JSON value defining extra kwargs for DAG tasks.
  • PERF_DYNAMIC_TASKS_MAPPED: number of tasks to be started with the Dynamic Task Mapping mechanism.
  • PERF_DEFERRED_TASKS: number of deferred tasks to be started.
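
A minimal sketch of how the Performance DAG file might read a few of these parameters, assuming they are provided as environment variables; the parsing helper and derived names below are illustrative, not part of the proposal.

import os
import re
from datetime import datetime, timedelta


def parse_time_delta(value: str) -> timedelta:
    """Parse a duration string such as '2h13m' into a timedelta (illustrative)."""
    match = re.match(r"(?:(\d+)d)?(?:(\d+)h)?(?:(\d+)m)?(?:(\d+)s)?$", value)
    if not match:
        raise ValueError(f"Unsupported duration: {value!r}")
    days, hours, minutes, seconds = (int(g) if g else 0 for g in match.groups())
    return timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)


# Required parameters - no defaults.
dags_count = int(os.environ["PERF_DAGS_COUNT"])
tasks_count = int(os.environ["PERF_TASKS_COUNT"])
shape = os.environ["PERF_SHAPE"]  # e.g. "no_structure", "linear", "binary_tree", ...

# Optional parameters with their documented defaults.
dag_prefix = os.environ.get("PERF_DAG_PREFIX", "perf_scheduler")
sleep_time = float(os.environ.get("PERF_SLEEP_TIME", "0"))
schedule_interval = os.environ.get("PERF_SCHEDULE_INTERVAL", "@once")

# start_date is either given explicitly or derived from PERF_START_AGO.
if "PERF_START_DATE" in os.environ:
    start_date = datetime.strptime(os.environ["PERF_START_DATE"], "%Y-%m-%d %H:%M:%S.%f")
else:
    start_date = datetime.now() - parse_time_delta(os.environ.get("PERF_START_AGO", "1h"))

# Hypothetical dag_id naming: prefix plus an index per generated DAG.
dag_ids = [f"{dag_prefix}_dag_{i}" for i in range(dags_count)]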

Test suite

A Test suite is represented by a configuration file in JSON or YAML format. It includes the following attributes (an illustrative suite definition follows the list):

  • instance - object defining the Instance configuration used in the test run.
    • instance_specification_path - path to the Instance definition file.
    • args - map of variable values replacing the Jinja placeholders of the Instance configuration file.
  • performance_dag - object defining the Performance DAG running in the test run.
    • performance_dag_specification_path - path to the Performance DAG specification file.
    • dag_file_path - alternatively, a path to a DAG file (Python code).
    • args - map of variable values replacing the Jinja placeholders of the Performance DAG configuration file.
  • attempts - number of times the suite should be run. The reason for having multiple attempts is to take multiple measurements in order to increase the stability of the results. By default, the Airflow instance is torn down and recreated between attempts.
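
For illustration, a Test suite definition with the attributes listed above, written as a Python dict for readability; in practice it would live in a JSON or YAML file, and all paths and arg values here are hypothetical.

# Hypothetical Test suite definition mirroring the attributes above;
# in practice this would be the content of a JSON or YAML file.
test_suite = {
    "instance": {
        "instance_specification_path": "instances/gke_small.json",
        "args": {
            "scheduler_count": 2,
            "machine_type": "e2-standard-4",
        },
    },
    "performance_dag": {
        "performance_dag_specification_path": "dags/perf_dag_config.json",
        "args": {
            "PERF_DAGS_COUNT": 10,
            "PERF_TASKS_COUNT": 20,
            "PERF_SHAPE": "linear",
        },
    },
    # Repeat the measurement to increase the stability of the results.
    "attempts": 5,
}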

Consideration: a Test suite could be constructed with a list of instance objects and a list of performance_dag objects to run all combinations of those, multiplied by attempts. For example, having 3 instance objects, 4 performance_dag objects and attempts = 5 would produce 12 combinations and execute each of them 5 times, resulting in 60 runs.
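
If such lists were allowed, the expansion could be a simple Cartesian product, as in the sketch below; the object names are placeholders.

from itertools import product

# Hypothetical lists of instance and performance_dag objects plus attempts.
instances = ["gke_small", "gke_medium", "composer_default"]           # 3 objects
performance_dags = ["linear_10", "linear_100", "grid_50", "star_20"]  # 4 objects
attempts = 5

combinations = list(product(instances, performance_dags))  # 3 * 4 = 12 combinations
runs = [(i, d, attempt) for i, d in combinations for attempt in range(attempts)]
print(len(combinations), len(runs))  # -> 12 60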

Running tests

The tests are executed for each Test suite.

The test runner script may accept the following parameters (a sketch of a possible CLI follows the list):

  • test-suite-path - path of the Test suite definition file.
  • reuse-if-exists - this flag tells the script to re-use the Instance if it exists or create a new one otherwise. If the flag has not been set and the Instance exists, the script is expected to fail.
  • delete-if-exists - this flag will cause the script to delete an existing environment with the same name as the one specified in the specification JSON file and then recreate it.
  • keep-on-finish - this flag will keep the instances created for the test after the test completes. By default, instances created during the tests are deleted.
  • output-path - path of the folder to store the output results in.
  • dry-run - validate the configuration without running the tests.
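
A minimal argparse sketch of how a runner script could expose these parameters; the script itself and the default output path are assumptions, not part of the proposal.

import argparse


def build_parser() -> argparse.ArgumentParser:
    """Command line interface mirroring the parameters listed above (illustrative)."""
    parser = argparse.ArgumentParser(description="Run an Airflow performance Test suite.")
    parser.add_argument("--test-suite-path", required=True,
                        help="Path of the Test suite definition file.")
    parser.add_argument("--reuse-if-exists", action="store_true",
                        help="Re-use the Instance if it exists, otherwise create a new one.")
    parser.add_argument("--delete-if-exists", action="store_true",
                        help="Delete and recreate an existing environment with the same name.")
    parser.add_argument("--keep-on-finish", action="store_true",
                        help="Keep the instances created for the test after it completes.")
    parser.add_argument("--output-path", default="results",
                        help="Folder to store the output results in.")
    parser.add_argument("--dry-run", action="store_true",
                        help="Validate the configuration without running the tests.")
    return parser


if __name__ == "__main__":
    args = build_parser().parse_args()
    print(vars(args))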

A test run is a state machine that walks through the following steps (a minimal sketch follows the list):

  • prepare the Instance (either by reusing or deleting an existing one)
  • generate and upload copies of the Performance DAG
  • wait for all expected Performance DAGs to be parsed by the scheduler
  • unpause the Performance DAGs
  • wait for all the expected Performance Dag Runs to finish (with either a success or failure)
  • write the performance metrics in CSV format to the results file
  • delete the environment (unless the keep-on-finish flag was set).
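
A minimal sketch of how such a state machine could be expressed; the state names follow the steps above, while the handler functions are hypothetical placeholders.

from enum import Enum, auto
from typing import Callable, Dict


class TestRunState(Enum):
    """States matching the steps listed above."""
    PREPARE_INSTANCE = auto()
    UPLOAD_DAGS = auto()
    WAIT_FOR_PARSING = auto()
    UNPAUSE_DAGS = auto()
    WAIT_FOR_DAG_RUNS = auto()
    SAVE_RESULTS = auto()
    DELETE_INSTANCE = auto()   # skipped when keep-on-finish is set
    DONE = auto()


def run_state_machine(handlers: Dict[TestRunState, Callable[[], TestRunState]]) -> None:
    """Walk through the states; each (hypothetical) handler returns the next state."""
    state = TestRunState.PREPARE_INSTANCE
    while state is not TestRunState.DONE:
        state = handlers[state]()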

Creating an Airflow instance is delegated to a dedicated class handling the given instance_type. Initially, the following Instance handlers will be implemented:

  • kubernetes - creates an Airflow instance in a Kubernetes cluster.
  • gke - creates an Airflow instance in Google Kubernetes Engine.
  • composer - creates an Airflow instance with Google Cloud Composer.
  • docker - creates an Airflow instance with Docker Compose.

Additional Instance providers can be included in the test framework and mapped to the corresponding instance_type value. The list above comes with the package from the Cloud Composer team's initial code contribution.
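
A minimal sketch of how instance_type values could be mapped to handler classes; the class and method names are hypothetical.

from abc import ABC, abstractmethod


class InstanceHandler(ABC):
    """Base class for provisioning an Airflow instance of a given type (illustrative)."""

    @abstractmethod
    def create(self, instance_config: dict) -> None: ...

    @abstractmethod
    def delete(self) -> None: ...


class KubernetesHandler(InstanceHandler):
    def create(self, instance_config: dict) -> None:
        ...  # create an Airflow instance in a Kubernetes cluster

    def delete(self) -> None:
        ...  # tear the instance down


# instance_type value from the Instance configuration file -> handler class.
INSTANCE_HANDLERS = {
    "kubernetes": KubernetesHandler,
    # "gke": GkeHandler, "composer": ComposerHandler, "docker": DockerComposeHandler, ...
}


def get_handler(instance_config: dict) -> InstanceHandler:
    return INSTANCE_HANDLERS[instance_config["instance_type"]]()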

Results format

Test results are stored in a folder created for the test run. The folder name is provided as an argument to the test runner script. The folder could be named after the Test suite name and date, but the run can include additional parameters populating Jinja variables.

The resulting folder includes the following files:

  • Instance configuration file with resolved Jinja variables values.
  • Performance DAG configuration file with resolved Jinja variables values.
  • Test suite arguments - full list of arguments used to run the suite including default values applied.
  • Test result metrics CSV file.

The test results metrics CSV file has the following columns (a sketch of writing a results row follows the list):

  • scheduler_memory_average - average utilization of Scheduler memory
  • scheduler_memory_max - max utilization of Scheduler memory
  • scheduler_cpu_average - average utilization of Scheduler cpu
  • scheduler_cpu_max - max utilization of Scheduler cpu
  • worker_memory_average - average utilization of Worker memory
  • worker_memory_max - max utilization of Worker memory
  • triggerer_memory_average - average utilization of Triggerer memory
  • triggerer_memory_max - max utilization of Triggerer memory
  • triggerer_cpu_average - average utilization of Triggerer cpu
  • triggerer_cpu_max - max utilization of Triggerer cpu
  • database_cpu_average - average utilization of metadata database cpu
  • database_cpu_max - max utilization of metadata database cpu
  • database_qps - average number of queries per second to metadata database
  • database_qps_max - max number of queries per second to metadata database
  • test_start_date - earliest start_date of any Dag Runs created as part of the test
  • test_end_date - latest end_date of any Dag Runs created as part of the test
  • test_duration - the difference between test_end_date and test_start_date in seconds
  • dag_run_total_count - total amount of test Dag Runs
  • dag_run_success_count - amount of test Dag Runs that finished with a success
  • dag_run_failed_count - amount of test Dag Runs that finished with a failure
  • dag_run_average_duration - average duration of test Dag Runs, where duration is calculated as difference between Dag Run's end_date and start_date.
  • dag_run_min_duration - minimal duration of any of test Dag Runs
  • dag_run_max_duration - maximal duration of any of test Dag Runs
  • task_instance_total_count - total amount of Task Instances belonging to test Dag Runs
  • task_instance_average_duration - average duration of test Task Instances
  • task_instance_min_duration - minimal duration of any of test Task Instances
  • task_instance_max_duration - maximal duration of any of test Task Instances
  • task_delay_average_duration - average duration of delay, calculated as the difference between execution time and start time
  • task_delay_max_duration - maximal duration of delay, calculated as the difference between execution time and start time
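
A sketch of how a results row with a few of these columns could be computed and written; the input Dag Run records, the column subset and the file name are illustrative only.

import csv
from datetime import datetime

# Illustrative Dag Run records gathered during a test run.
dag_runs = [
    {"start_date": datetime(2023, 1, 1, 12, 0, 0), "end_date": datetime(2023, 1, 1, 12, 5, 0), "state": "success"},
    {"start_date": datetime(2023, 1, 1, 12, 0, 30), "end_date": datetime(2023, 1, 1, 12, 6, 0), "state": "success"},
]

durations = [(r["end_date"] - r["start_date"]).total_seconds() for r in dag_runs]
row = {
    "test_start_date": min(r["start_date"] for r in dag_runs),
    "test_end_date": max(r["end_date"] for r in dag_runs),
    "dag_run_total_count": len(dag_runs),
    "dag_run_success_count": sum(r["state"] == "success" for r in dag_runs),
    "dag_run_average_duration": sum(durations) / len(durations),
    "dag_run_max_duration": max(durations),
}

# Write a single results row in CSV format (hypothetical file name).
with open("results.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=list(row))
    writer.writeheader()
    writer.writerow(row)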

What problem does it solve?

  1. It makes performance changes visible to Airflow users.
  2. It also helps identify changes that have an unacceptably negative impact on Airflow performance or resource utilization.
  3. It helps users adapt to new releases' requirements by adding more resources to their Airflow configurations if necessary, which has a positive impact on the adoption of new versions.
  4. Lastly, it sends the message that the Airflow community takes performance seriously and validates the solution against important metrics.

Why is it needed?

Currently there are no performance tests running for Apache Airflow.

There is no visibility into performance changes and resource utilization (especially when it grows).

Adoption of new versions is exposed to unexpected failures, which has a negative impact on the stability of Airflow.

The tests can be used both during Airflow builds and by users, who can individually test performance with their own setups.


Are there any downsides to this change?

Test runs take additional resources and time, which are required to set up the Airflow instance.

Performance measurement is not perfect and is exposed to transient issues in the infrastructure; therefore, we cannot expect perfectly aligned results. Still, it is worth having the results in order to track the deviation from the baseline based on run history.

The tests do not answer the question of how scalable Airflow is (how many tasks it can run in parallel or how many DAGs can be tracked by the scheduler), although the framework can be adapted to such a purpose in the future.

Which users are affected by the change?

Any user who is conscious of the cost, stability and performance of an orchestration solution based on Apache Airflow running in their organisation.

How are users affected by the change? (e.g. DB upgrade required?)

Users are provided with additional information in the release notes that may be relevant to their setup and to the adoption of new versions.

Other considerations?


What defines this AIP as "done"?

This is done once the first release of Apache Airflow includes performance measurement results.