Release History
ID | Date | Author | Comment |
---|---|---|---|
1 | 2020-11 | Zhichao Zhang | Tuning guide for 4.0.0-alpha |
2 | 2022-5 | Shaofeng Shi | Update for 4.0.1 |
Kylin 4 is a major architecture upgrade. As the picture below shows, both the cube building engine and the query engine use Spark as the calculation engine, and cube data is stored in parquet files instead of HBase.
The build and query performance tuning is therefore very different from Kylin 3 tuning (http://kylin.apache.org/docs/howto/howto_optimize_build.html). This article introduces how to improve cube build and query performance in Kylin 4, including some tuning that Kylin 4 performs automatically.
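In Kylin 4, there are two steps in the cube building job:
1. The first step detects how many source files will be built into cube data.
2. The second step builds the snapshot tables (if needed), the global dictionary (if needed), and the cube data as parquet files.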
In the second step, all calculations are relatively heavy operations. So besides using "Joint" or "Hierarchy" on dimensions to reduce the number of cuboids (see the section 'Reduce combinations' in http://kylin.apache.org/docs/tutorial/cube_build_performance.html), it is also very important to use proper Spark resources and configurations to build cube data. There are three key points in this section for improving cube building performance.
Assuming your Spark application runs on YARN, the relevant configurations are as follows:
Key | Description |
---|---|
spark.executor.instances | The number of executors for spark application. |
spark.executor.cores | The number of cores to use on each executor. The value of 'spark.executor.instances' * 'spark.executor.cores' is the maximum parallelism when running the cube building job. |
spark.executor.memory | Amount of memory to use per executor process. As a rule of thumb, use a core-to-memory ratio of 1:4; for example, if you set 'spark.executor.cores' to 4, set 'spark.executor.memory' to 16G. |
spark.executor.memoryOverhead | The amount of off-heap memory to be allocated per executor. This is the memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%). |
spark.sql.shuffle.partitions | Configures the number of partitions to use when shuffling data for joins or aggregations; the default value is 200. A larger value requires more CPU resources, while a smaller value requires more memory resources. |
spark.sql.files.maxPartitionBytes | The maximum number of bytes to pack into a single partition when reading files; the default value is 128M. If there are many small files in the source tables (Hive source), Spark automatically packs several small files into a single partition to avoid too many small tasks. |
You can set these configurations with the 'kylin.engine.spark-conf.' prefix in the 'kylin.properties' file, for example 'kylin.engine.spark-conf.spark.executor.instances'. Kylin 4 then uses them to allocate Spark resources for the cube building job.
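As a rough illustration of the 1:4 core-to-memory guideline and the 'kylin.engine.spark-conf.' prefix, the sketch below derives executor settings and renders them as 'kylin.properties' lines. The helper names are hypothetical, and the overhead rule (10% of executor memory, at least 384MB) is Spark's own default on YARN, not something Kylin computes; treat the output as a starting point under those assumptions rather than a recommendation.

```python
import math

PREFIX = "kylin.engine.spark-conf."


def executor_settings(instances, cores, gb_per_core=4):
    """Hypothetical helper: size executors with the 1:4 core-to-memory guideline."""
    memory_gb = cores * gb_per_core
    # Mirrors Spark's default overhead: 10% of executor memory, at least 384 MB.
    overhead_mb = max(384, math.ceil(memory_gb * 1024 * 0.10))
    return {
        "spark.executor.instances": str(instances),
        "spark.executor.cores": str(cores),
        "spark.executor.memory": f"{memory_gb}g",
        "spark.executor.memoryOverhead": f"{overhead_mb}m",
    }


def max_parallelism(conf):
    """instances * cores bounds how many tasks can run at the same time."""
    return int(conf["spark.executor.instances"]) * int(conf["spark.executor.cores"])


def to_kylin_properties(conf):
    """Render Spark settings as kylin.properties lines with the Kylin prefix."""
    return [f"{PREFIX}{key}={value}" for key, value in sorted(conf.items())]


conf = executor_settings(instances=10, cores=4)
print("max parallelism:", max_parallelism(conf))  # 40
for line in to_kylin_properties(conf):
    print(line)
```

With 10 executors of 4 cores each, this yields 16g of executor memory, 1639m of overhead, and a maximum parallelism of 40 tasks.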
As with general Spark + Parquet tuning, you may find problems through the Spark UI and change some configurations to improve performance. Many articles describe how to improve performance with Spark + Parquet, such as http://spark.apache.org/docs/2.4.6/sql-performance-tuning.html and http://spark.apache.org/docs/2.4.6/tuning.html.
If you don't know how to set these configurations properly, Kylin 4 automatically sets Spark resources and configurations using the allocation rules below. All of them are set according to the maximum size of the source files and whether the cube has an accurate count distinct measure; this is why the first step detects how many source files will be built. You can see these allocation rules in the class 'SparkConfHelper':
If ${the maximum file size} >= 100G and ${exist accurate count distinct}, then set 'spark.executor.memory' to 20G;
If ${the maximum file size} >= 100G or (${the maximum file size} >= 10G and ${exist accurate count distinct}), then set 'spark.executor.memory' to 16G;
If ${the maximum file size} >= 10G or (${the maximum file size} >= 1G and ${exist accurate count distinct}), then set 'spark.executor.memory' to 10G;
If ${the maximum file size} >= 1G or ${exist accurate count distinct}, then set 'spark.executor.memory' to 4G;
Otherwise set 'spark.executor.memory' to 1G.
If ${the maximum file size} >= 1G or ${exist accurate count distinct}, then set 'spark.executor.cores' to 5;
Otherwise set 'spark.executor.cores' to 1.
If ${the maximum file size} >= 100G and ${exist accurate count distinct}, then set 'spark.executor.memoryOverhead' to 6G, so in this case the total memory per executor is 20G + 6G = 26G;
If ${the maximum file size} >= 100G or (${the maximum file size} >= 10G and ${exist accurate count distinct}), then set 'spark.executor.memoryOverhead' to 4G;
If ${the maximum file size} >= 10G or (${the maximum file size} >= 1G and ${exist accurate count distinct}), then set 'spark.executor.memoryOverhead' to 2G;
If ${the maximum file size} >= 1G or ${exist accurate count distinct}, then set 'spark.executor.memoryOverhead' to 1G;
Otherwise set 'spark.executor.memoryOverhead' to 512M.
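To make the precedence of these rules explicit, here they are restated as a small Python function. It is a sketch written from the rules above, not the actual 'SparkConfHelper' code; the function name and measuring the file size in GB are assumptions. Because the memory and memoryOverhead rules share the same conditions, they can be evaluated together, and the first matching rule wins.

```python
def allocate_executor_conf(max_file_size_gb, has_accurate_count_distinct):
    """Restatement of the allocation rules above; the first matching rule wins."""
    size, cd = max_file_size_gb, has_accurate_count_distinct

    # spark.executor.memory and spark.executor.memoryOverhead share the same conditions.
    if size >= 100 and cd:
        memory, overhead = "20G", "6G"
    elif size >= 100 or (size >= 10 and cd):
        memory, overhead = "16G", "4G"
    elif size >= 10 or (size >= 1 and cd):
        memory, overhead = "10G", "2G"
    elif size >= 1 or cd:
        memory, overhead = "4G", "1G"
    else:
        memory, overhead = "1G", "512M"

    cores = 5 if (size >= 1 or cd) else 1
    return {
        "spark.executor.memory": memory,
        "spark.executor.memoryOverhead": overhead,
        "spark.executor.cores": str(cores),
    }


print(allocate_executor_conf(150, True))
```

For example, a 5G source file with an accurate count distinct measure lands in the third tier (10G memory, 2G overhead, 5 cores), while the same file without such a measure gets 4G, 1G, and 5 cores.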
The steps to set 'spark.executor.instances' are as follows:
After all the rules above are applied, you can find log messages like the following in the 'kylin.log' file:
If cube building performance issues remain with the values Kylin has adjusted automatically, you can try changing these configurations appropriately, for example:
The general adjustment strategy is to double the value. If the problem is solved, you can then decrease it appropriately to avoid wasting resources. If there is still a serious memory problem after increasing the memory per executor, consider setting 'spark.executor.cores' to 1. Each executor then runs only one task at a time, and that task has the executor's memory to itself; execution efficiency is relatively low, but this avoids build failure.
The general adjustment strategy is to double the value. If the problem is solved, you can then decrease it appropriately to avoid wasting resources.
This job took 3.0 min, but its stages took only 17s + 2s = 19s in total: stage 204 waited more than 2.0 min to be scheduled.
In this case, you need to increase the number of cores of the Spark application.
If the cube has accurate "count distinct" measures, Kylin 4.0 builds the global dictionary for these measure columns in the second step, using Spark for distributed encoding. This reduces the pressure on any single node and can break through the global dictionary's maximum-integer limit. For the detailed design, please refer to https://cwiki.apache.org/confluence/display/KYLIN/Global+Dictionary+on+Spark. There is one configuration for tuning global dictionary building:
kylin.dictionary.globalV2-threshold-bucket-size (default value is 500000)
Reducing this value reduces the amount of data each partition handles when building the global dictionary, which speeds up dictionary building.
If there are snapshot tables to build, Kylin 4.0 builds them in parallel in the second step, because the configuration 'kylin.snapshot.parallel-build-enabled' defaults to true; this speeds up snapshot table building.
You can also reduce the value of 'kylin.snapshot.shard-size-mb' (default 128MB) to increase the parallelism of snapshot table building. Based on the size of the source table, keep the number of building tasks within 3 times the number of cores of the Spark cube building application.
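A quick way to apply the 3-times-cores guideline is sketched below. It assumes, as a simplification, that the number of snapshot building tasks is roughly the table size divided by 'kylin.snapshot.shard-size-mb', rounded up; the helper names are hypothetical and the real task count may differ.

```python
import math


def estimated_snapshot_tasks(table_size_mb, shard_size_mb=128):
    """Assumption: roughly one building task per shard_size_mb of source table."""
    return max(1, math.ceil(table_size_mb / shard_size_mb))


def within_guideline(tasks, total_cores, factor=3):
    """Keep the number of tasks within factor * cores of the application."""
    return tasks <= factor * total_cores


def smallest_shard_size_mb(table_size_mb, total_cores, factor=3):
    """Smallest shard size (MB) whose estimated task count stays within the guideline."""
    return max(1, math.ceil(table_size_mb / (factor * total_cores)))


# A 2 GB table on an application with 20 cores (e.g. 4 executors x 5 cores):
tasks = estimated_snapshot_tasks(2048)      # 16 tasks at the default 128MB
print(tasks, within_guideline(tasks, 20))
print(smallest_shard_size_mb(2048, 20))     # 35
```

Under these assumptions, lowering the shard size from 128MB to 35MB raises the estimated task count from 16 to 59, just under the limit of 60.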
In Kylin 4.0, the query engine (called SparderContext) also uses Spark as its calculation engine. It is a truly distributed query engine, and its performance is better than Apache Calcite's, especially for complex queries. However, there are still many key performance points that need to be optimized.
Besides setting proper calculation resources as mentioned above, this includes reducing small or uneven files, setting proper partitions, and pruning as many parquet files as possible. Kylin 4.0 and Spark provide several optimization strategies to improve query performance.
Reading too many small files, or a few very large files, at query time leads to poor performance. To avoid this, Kylin 4.0 repartitions parquet files according to the following strategy when building cube data, reducing small or uneven parquet files.
If the following conditions are met:
If any one of the conditions above is met, the parquet files are repartitioned, and the number of partitions is calculated as follows:
${fileLengthRepartitionNum} = Math.ceil(${the parquet files size in MB} / ${kylin.storage.columnar.shard-size-mb})
${rowCountRepartitionNum} = Math.ceil(${the total row count of parquet files} / ${kylin.storage.columnar.shard-rowcount})
If the cuboid has an accurate count distinct measure, 'kylin.storage.columnar.shard-countdistinct-rowcount' is used instead of 'kylin.storage.columnar.shard-rowcount'.
The final number of partitions is:
Math.ceil((${fileLengthRepartitionNum} + ${rowCountRepartitionNum}) / 2)
Key | Default value | Description |
---|---|---|
kylin.storage.columnar.shard-size-mb | 128MB | The max size of each parquet file for shard by column, in MB. |
kylin.storage.columnar.shard-rowcount | 2500000 | The max row count of each parquet file: by default, each parquet file contains at most 2.5 million rows. |
kylin.storage.columnar.shard-countdistinct-rowcount | 1000000 | Because Bitmaps are large, a separate max row count can be specified for cuboids containing Bitmaps. By default, each such parquet file contains at most 1.0 million rows. |
kylin.storage.columnar.repartition-threshold-size-mb | 128MB | The max size of each parquet file, in MB. |
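The partition calculation can be checked by hand with a short sketch that uses the default values from the table above. The function name and parameters are hypothetical; it only reproduces the formulas shown here and does not evaluate the conditions that decide whether repartitioning happens.

```python
import math


def repartition_num(parquet_size_mb, total_rows, has_accurate_count_distinct,
                    shard_size_mb=128, shard_rowcount=2_500_000,
                    shard_countdistinct_rowcount=1_000_000):
    """Restatement of the partition-count formulas (defaults from the table above)."""
    file_length_num = math.ceil(parquet_size_mb / shard_size_mb)
    # Cuboids with an accurate count distinct (Bitmap) measure use the smaller row limit.
    row_limit = shard_countdistinct_rowcount if has_accurate_count_distinct else shard_rowcount
    row_count_num = math.ceil(total_rows / row_limit)
    return math.ceil((file_length_num + row_count_num) / 2)


print(repartition_num(1024, 10_000_000, False))  # 6
print(repartition_num(1024, 10_000_000, True))   # 9
```

For a 1024MB cuboid with 10 million rows, this gives (8 + 4) / 2 = 6 partitions, or (8 + 10) / 2 = 9 if the cuboid has an accurate count distinct measure. Raising the row-count settings lowers the second term, which is why increasing them reduces the final number of partitions.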
You can use this command to find the repartition info messages in the kylin.log file after building cube data:
grep "Before repartition, cuboid" logs/kylin.log
From the log messages, you can see that the final number of partitions is too large, which hurts both building and query performance. After increasing 'kylin.storage.columnar.shard-rowcount' or 'kylin.storage.columnar.shard-countdistinct-rowcount' and rebuilding, the log messages are as follows:
The final number of partitions dropped from 809 to 3, and the cube building time dropped from 58 min to 24 min:
Query performance improved as well:
Querying the cube with too many partitions took 1.7s, and the query engine scanned 58 files.
Querying a cube with a proper number of partitions took only 0.4s, and the query engine scanned only 4 files.
In Kylin 4.0, the directory structure of parquet file storage is as follows:
At query time, the query engine can filter out segment-level directories by the date partition column and cuboid-level directories by the hit cuboid. If a cuboid-level directory still contains many parquet files at this point, you can use a shard by column to prune parquet files further.
From Cube Designer → Advanced Setting → Rowkeys in Kylin UI, you can specify a shard by column when creating a cube:
After a shard by column is specified, Kylin repartitions parquet files by this column when building cube data (if none is specified, all columns are used for repartitioning).
When a query uses the shard by column as a filter condition, the query engine prunes parquet files according to the value of the shard by column, for example:
Each of these two cubes, kylin_sales_cube_non_shardby and kylin_sales_cube_shardby, has two parquet files in its cuboid 131084 directory.
Querying cube 'kylin_sales_cube_non_shardby', which does not specify a shard by column, scans 2 files.
Querying cube 'kylin_sales_cube_shardby', which specifies a shard by column, scans only 1 file.
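The pruning idea can be sketched in a few lines. This model assumes that rows are assigned to files by hashing the shard by column value (or, without a shard by column, all column values together) modulo the number of files. 'zlib.crc32' is a hypothetical stand-in for the Murmur3 hash Spark actually uses, and the helper names and the SELLER_ID/ITEM_COUNT sample rows are illustrative only.

```python
import zlib


def shard_of(key, num_files):
    # Hypothetical stand-in hash (CRC32); Spark's hash partitioning uses Murmur3.
    return zlib.crc32(repr(key).encode("utf-8")) % num_files


def write_cuboid(rows, num_files, shard_by=None):
    """Assign each row to a parquet 'file' by hashing its shard key."""
    files = [[] for _ in range(num_files)]
    for row in rows:
        # Without a shard by column, hash all column values together.
        key = row[shard_by] if shard_by is not None else tuple(sorted(row.items()))
        files[shard_of(key, num_files)].append(row)
    return files


def files_to_scan(num_files, filter_column, filter_value, shard_by=None):
    """Return the file indexes a query with 'filter_column = filter_value' must read."""
    if shard_by is not None and filter_column == shard_by:
        # Every row with this value was hashed into exactly one file.
        return [shard_of(filter_value, num_files)]
    return list(range(num_files))


rows = [{"SELLER_ID": i % 5, "ITEM_COUNT": i} for i in range(50)]
files = write_cuboid(rows, 2, shard_by="SELLER_ID")
print(files_to_scan(2, "SELLER_ID", 3, shard_by="SELLER_ID"))  # one file index
print(files_to_scan(2, "SELLER_ID", 3))                         # [0, 1]
```

With two files per cuboid, as in the example above, the query without a shard by column must read both files, while the query filtering on the shard by column reads only the one file its value hashes to.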
Because currently only one shard by column can be specified per cube, it is best to choose a column with high cardinality that is often used as a filter condition, such as a mandatory dimension column. If the shard by column is not a mandatory dimension, some cuboids cannot use it; for example, the shard by column is A, but a cuboid contains only columns B, C, and D.
When creating a cube, you can specify the order of the dimension columns. When cube data is saved, the first dimension column of each cuboid is used to sort the data. The purpose is to filter out as much unwanted data as possible through the min-max index of the parquet files when a query filters on the sort by column.
From Cube Designer → Advanced Setting → Rowkeys in Kylin UI, you can drag the columns to adjust the order:
For example, if a cuboid includes the three columns BUYER_ID, TRANS_ID, and LEAF_CATEG_ID, in that order, the data in each partition is sorted by BUYER_ID when this cuboid's data is saved.
Note: Apache Spark 2.4.6, which Kylin 4.0 uses, currently only supports filtering out unwanted data through the min-max index of RowGroups in parquet files. If a parquet file contains several RowGroups, Spark can filter out unwanted data using each RowGroup's min-max index, but if a parquet file contains only one RowGroup, the filter does not take effect.
When some already-built segments contain many small files, you can set 'spark.sql.files.maxPartitionBytes' (default 128MB) to a proper value so that Spark packs several small files into a single partition instead of creating too many small tasks, for example:
This query scanned 2 parquet files but it used one task to handle these two files:
Conversely, if there are enough resources, you can reduce 'spark.sql.files.maxPartitionBytes' to increase the number of parallel tasks. In that case you also need to reduce 'spark.hadoop.parquet.block.size' (default 128MB) when building cube data, because the smallest split unit of a parquet file is the RowGroup, and 'spark.hadoop.parquet.block.size' sets the maximum size of one RowGroup.
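The packing behavior can be approximated with the sketch below, a simplified Python restatement of the "Next Fit Decreasing" file-splitting logic in Spark 2.4's FileSourceScanExec. Here 'open_cost_in_bytes' stands for 'spark.sql.files.openCostInBytes' (default 4MB) and 'default_parallelism' for the SparkContext's default parallelism; the function name is hypothetical. The sketch splits files purely by byte ranges; in reality a parquet split only does useful work if it contains a RowGroup, which is why 'spark.hadoop.parquet.block.size' matters too.

```python
MB = 1024 * 1024


def plan_partitions(file_sizes, max_partition_bytes=128 * MB,
                    open_cost_in_bytes=4 * MB, default_parallelism=1):
    """Simplified model of how Spark 2.4 packs files into read partitions."""
    # Every file is charged an extra "open cost" when estimating total work.
    total_bytes = sum(size + open_cost_in_bytes for size in file_sizes)
    bytes_per_core = total_bytes // default_parallelism
    max_split_bytes = min(max_partition_bytes, max(open_cost_in_bytes, bytes_per_core))

    # Cut each (splittable) file into chunks of at most max_split_bytes.
    splits = []
    for size in file_sizes:
        for offset in range(0, size, max_split_bytes):
            splits.append(min(max_split_bytes, size - offset))
    splits.sort(reverse=True)  # largest chunks first

    # Next Fit Decreasing: start a new partition when the current one would overflow.
    partitions, current, current_size = [], [], 0
    for length in splits:
        if current_size + length > max_split_bytes and current:
            partitions.append(current)
            current, current_size = [], 0
        current_size += length + open_cost_in_bytes
        current.append(length)
    if current:
        partitions.append(current)
    return partitions


print(len(plan_partitions([10 * MB, 10 * MB])))                       # 1
print(len(plan_partitions([200 * MB])))                               # 2
print(len(plan_partitions([200 * MB], max_partition_bytes=64 * MB)))  # 4
```

Two 10MB files end up in a single partition (one task), and lowering the maximum partition size from 128MB to 64MB turns one 200MB file into 4 tasks instead of 2.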
Spark can operate directly on off-heap memory, which reduces unnecessary memory overhead and frequent GC, and improves processing performance.
Relevant configurations:
Key | Description |
---|---|
spark.memory.offHeap.enabled | If set to 'true', Spark uses off-heap memory for certain operations, e.g. shuffle. |
spark.memory.offHeap.size | The amount of off-heap memory. It must be positive when 'spark.memory.offHeap.enabled' is true. |
Currently, all queries share one SparkSession, so they all share the same configurations, even though different queries have different scenarios and could benefit from different configurations. Therefore, we plan to clone a thread-level SparkSession for each query, set its configurations, and then execute the query. For example, 'spark.sql.shuffle.partitions' could be set to different values according to the amount of data each query reads, to achieve optimal query performance.