Status

Discussion thread
Vote thread
JIRA
Release

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

We want to improve the out-of-the-box experience of the Flink-Hive integration.

  1. Flink-Hive integration is important to many Flink and Hive users in two dimensions:
    1. For Flink metadata: HiveCatalog is the only persistent catalog for managing Flink tables. With Flink 1.10 supporting more DDL, the persistent catalog will play an even more critical role in users' workflows.
    2. For Flink data: the Hive data connector (source/sink) helps both Flink and Hive users unlock new use cases in streaming, near-real-time/real-time data warehousing, backfill, etc.
  2. Currently users have to go through a really tedious process to get started, because it requires lots of extra jars that are absent from Flink's lean distribution. Many users on the public mailing list, in private emails, and in DingTalk groups have been frustrated by spending lots of time figuring out the jars themselves. They would rather have a "right out of the box" quickstart experience and play with the catalog and source/sink without hassle.
  3. At present, we support too many Hive dependency versions, and the dependencies of each version may differ. This confuses users, and it is hard for us to provide pre-bundled jars or distributions for so many versions.

Spark and Presto already build in Hive dependencies, so getting started with them is very simple.

This has been discussed in [1].

We have documented detailed dependency information [2], but some inconveniences remain:

  • Too many versions: users need to pick one out of 8 versions.
  • Too many versions are not friendly to our developers either: whenever there is a problem/exception, we need to look at eight different versions of the Hive client code, which often differ from each other.
  • Too many jars: for example, Hive 1.x requires downloading 4+ jars.
  • The version in Yaml/HiveCatalog needs to be consistent with the dependency version. The version appears in three places: the metastore, the dependencies, and Yaml/HiveCatalog, so it is easy for users to make mistakes.

Public Interfaces

Starting the Hive integration should only require:

  • Providing Hadoop classes. [3]
  • Downloading one pre-bundled Hive jar from flink-web. (There will be only a few pre-bundled Hive versions.)
  • Providing the Hive conf directory path. (Users should not need to provide anything related to the Hive version.)

Users can enjoy their Flink-Hive trip.
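
Below is a minimal sketch of what this could look like in a Table API program once the pre-bundled jar is in flink/lib. Note that the version-less HiveCatalog constructor shown here is the target state of this proposal rather than an existing API, and the catalog name, database, and conf path are placeholders:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveQuickstart {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // No Hive version argument: the version is derived from the pre-bundled jar.
        HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", "/opt/hive-conf");
        tableEnv.registerCatalog("myhive", hiveCatalog);
        tableEnv.useCatalog("myhive");
        // From here on, Hive tables can be queried directly with SQL.
    }
}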

Proposed Change

We provide one major pre-bundled jar to cover all supported Hive versions.

And we provide two minor pre-bundled jars to cover newer features.

Major version (Cover all supported versions):

Provide hive-1.2.2 pre-bundled dependencies for Hive metastore versions 1.0.0 - 3.1.2:

  • flink-shaded-hive-uber-1.2.2.jar
    • 1.0.0
    • 1.0.1
    • 1.1.0
    • 1.1.1
    • 1.2.0
    • 1.2.1
    • 1.2.2
    • 2.0.0
    • 2.0.1
    • 2.1.0
    • 2.1.1
    • 2.2.0
    • 2.3.1
    • 2.3.2
    • 2.3.3
    • 2.3.4
    • 2.3.5
    • 2.3.6
    • 3.0.0
    • 3.1.0
    • 3.1.1
    • 3.1.2

Minor version (+primary key):

Provide hive-2.3.6 pre-bundled dependencies for Hive metastore versions 2.3.4 - 2.3.6:

  • flink-shaded-hive-uber-2.3.6.jar
    • 2.3.4
    • 2.3.5
    • 2.3.6

Minor version (+not null, +unique):

Provide hive-3.1.2 pre-bundled dependencies for Hive metastore versions 3.0.0 - 3.1.2:

  • flink-shaded-hive-uber-3.1.2.jar
    • 3.0.0
    • 3.1.0
    • 3.1.1
    • 3.1.2

Because flink-shaded cannot contain Flink dependencies/classes, we should pack flink-connector-hive into flink/lib for a better out-of-the-box experience.

We should pack only Flink classes, without dependencies; the dependencies should be in flink-shaded.

The flink-connector-hive jar should also shade flink-orc/flink-orc-nohive/flink-parquet for vectorized reading.

Hide hiveVersion

In HiveCatalog and HiveModule, there is a field named “hiveVersion” that must be passed by users.

Actually, it is not the version of the remote metastore; it is the version of the Hive dependencies. This field is hard for users to understand, and they may pass a wrong value.

After we provide pre-bundled dependencies, we can obtain the Hive dependency version from “HiveVersionInfo.getVersion()”, as sketched below.
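
A rough sketch of how this could look internally; the helper class is illustrative, while HiveVersionInfo ships with the bundled Hive classes:

import org.apache.hive.common.util.HiveVersionInfo;

/** Illustrative helper: derive the Hive version from the bundled dependencies. */
public final class BundledHiveVersion {
    private BundledHiveVersion() {}

    public static String detect() {
        // Returns the version of the Hive classes on the classpath, e.g. "1.2.2".
        String version = HiveVersionInfo.getVersion();
        if (version == null || version.isEmpty()) {
            throw new IllegalStateException(
                    "Could not detect the Hive version. Is the pre-bundled Hive jar on the classpath?");
        }
        return version;
    }
}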

We should modify the document too.

Details of unifying the version

New features

Primary key support was added in Hive 2.1.0, and unique and not null support were added in Hive 3.0.0.

Solution: 

  • We provide new versions of the bundled jar (the minor versions above) to support these features.

Alter table statistics

In Hive versions before 1.2.1, altering table stats is not effective. In Flink 1.10, we can refuse to perform this operation on old versions by detecting the Hive version. But after unifying to one dependency version, we can no longer detect it this way. As a result, we may invoke the HMS API but it does not take effect.

Solution: 

  • We need to document this.
  • We can check the stats after updating (see the sketch below).
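
A rough sketch of such a check against the Hive metastore client API; treating the read-back table parameters as the signal for whether the update took effect is an assumption of this sketch:

import java.util.Map;

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;

/** Illustrative check: alter the table and verify that the stats were actually persisted. */
public class AlterTableStatsCheck {

    static void alterTableAndVerifyStats(
            IMetaStoreClient client, String dbName, String tableName, Table newTable)
            throws Exception {
        client.alter_table(dbName, tableName, newTable);

        // Older metastores may silently ignore the stats parameters, so read them back.
        Map<String, String> expected = newTable.getParameters();
        Map<String, String> actual = client.getTable(dbName, tableName).getParameters();

        if (!actual.entrySet().containsAll(expected.entrySet())) {
            throw new UnsupportedOperationException(
                    "Altering table statistics is not supported by this Hive metastore version.");
        }
    }
}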

Alter column statistics

Date-type column statistics are supported since Hive 1.2.0. If we pass such data to an older HMS, will it report an error? Can we judge from the error information whether it is due to the date statistics?

Solution:

  • We need to document this.
  • It will throw an exception due to the missing DateColumnStatsData thrift class; we can improve the exception message (see the sketch below).
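
A rough sketch of improving the message; the exact exception type surfaced by the metastore client here is an assumption:

import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.thrift.TException;

/** Illustrative wrapper: give a clearer message when date column stats hit an old HMS. */
public class AlterColumnStatsWithBetterError {

    static void updateColumnStats(IMetaStoreClient client, ColumnStatistics stats)
            throws Exception {
        try {
            client.updateTableColumnStatistics(stats);
        } catch (TException e) {
            throw new RuntimeException(
                    "Failed to alter column statistics. Note that DATE column statistics "
                            + "(DateColumnStatsData) require Hive metastore 1.2.0 or later.",
                    e);
        }
    }
}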

Migration Plan and Compatibility

The Flink 1.10 way of providing Hive dependencies should continue to work in the new release.

Test plan

We should provide a Hive E2E Test.

We should test the pre-bundled jars against multiple metastore versions to cover the major versions that we want to support.

We should port the major tests and ITCases of the Hive connector to E2E tests.

Rejected Alternatives

Flink distribution built-in Hive dependencies

Have separate Flink distributions with built-in Hive dependencies.

We have had much trouble in the past from "too deep, too custom" integrations that everyone got out of the box, e.g., Hadoop. Flink has such a broad spectrum of use cases; if we had a custom build for every other framework in that spectrum, we would be in trouble.

Export HIVE_CLASSPATH

  • hive/lib contains 200+ jars, but we only need hive-exec.jar plus two or three others; introducing so many jars could cause serious conflicts.
  • hive/lib is not available on every machine, so we would need to upload many jars.
  • A separate classloader may also be hard to make work: flink-connector-hive needs the Hive jars, so we might need to handle the flink-connector-hive jar specially as well.

This is not better than the pre-bundled way.

[1]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-have-separate-Flink-distributions-with-built-in-Hive-dependencies-td35918.html

[2]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/#dependencies

[3]https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/hadoop.html#providing-hadoop-classes


Discussion in Google doc: https://docs.google.com/document/d/14C4933qfPXeHK34rERPCJ514MTrk_MC2beNUSgQO_Uo/edit?usp=sharing