
Hudi Getting Started

Hudi is a data layer similar to Carbondata: it supports streaming writes and reads, can sync its metadata to Hive, and exposes SQL queries through Spark, Hive, and Presto.

Software versions
Spark 2.4.0.cloudera1
Hadoop 2.6.0-cdh5.11.1
Hive 1.1.0-cdh5.11.1
Hudi 0.4.7

Build

You can follow the Quickstart in the official documentation to build the project:

Quickstart

Clone the latest tag from GitHub

  • Java 8+ is supported
  • Hive 1+ is supported (Hive 2+ reportedly has quite a few problems; search the GitHub issues)
  • Spark 2.x+ is supported
  • Apache and CDH Hadoop are supported

Build options

cd incubator-hudi-hoodie-0.4.7

mvn clean install -DskipITs -DskipTests -Dhadoop.version=2.6.0-cdh5.11.1 -Dhive.version=1.1.0-cdh5.11.1

The -DskipITs flag skips the integration tests; they include Docker-based tests, so the build cannot pass without a Docker environment.

Related issue for -DskipITs

  • Build succeeded
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for Hoodie 0.4.7:
[INFO]
[INFO] Hoodie ............................................. SUCCESS [ 2.894 s]
[INFO] hoodie-common ...................................... SUCCESS [ 20.807 s]
[INFO] hoodie-hadoop-mr ................................... SUCCESS [ 2.372 s]
[INFO] hoodie-hive ........................................ SUCCESS [ 1.500 s]
[INFO] hoodie-timeline-service ............................ SUCCESS [ 15.561 s]
[INFO] hoodie-client ...................................... SUCCESS [ 5.584 s]
[INFO] hoodie-spark ....................................... SUCCESS [ 36.749 s]
[INFO] hoodie-utilities ................................... SUCCESS [ 35.936 s]
[INFO] hoodie-cli ......................................... SUCCESS [ 14.495 s]
[INFO] hoodie-hadoop-mr-bundle ............................ SUCCESS [ 2.126 s]
[INFO] hoodie-hive-bundle ................................. SUCCESS [ 23.160 s]
[INFO] hoodie-spark-bundle ................................ SUCCESS [02:43 min]
[INFO] hoodie-presto-bundle ............................... SUCCESS [ 23.024 s]
[INFO] hoodie-hadoop-docker ............................... SUCCESS [ 0.616 s]
[INFO] hoodie-hadoop-base-docker .......................... SUCCESS [ 0.491 s]
[INFO] hoodie-hadoop-namenode-docker ...................... SUCCESS [ 0.079 s]
[INFO] hoodie-hadoop-datanode-docker ...................... SUCCESS [ 0.076 s]
[INFO] hoodie-hadoop-history-docker ....................... SUCCESS [ 0.072 s]
[INFO] hoodie-hadoop-hive-docker .......................... SUCCESS [ 0.763 s]
[INFO] hoodie-hadoop-sparkbase-docker ..................... SUCCESS [ 0.095 s]
[INFO] hoodie-hadoop-sparkmaster-docker ................... SUCCESS [ 0.089 s]
[INFO] hoodie-hadoop-sparkworker-docker ................... SUCCESS [ 0.087 s]
[INFO] hoodie-hadoop-sparkadhoc-docker .................... SUCCESS [ 0.085 s]
[INFO] hoodie-integ-test .................................. SUCCESS [ 1.102 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 05:51 min
[INFO] Finished at: 2019-08-09T15:46:24+08:00
[INFO] ------------------------------------------------------------------------

Write With Spark

Approach:

  1. Query data from an existing Hive table
  2. Convert it into a DataFrame and write it out to a new Hudi table
  3. Sync the new table's metadata to Hive

Write Code

import java.io.File;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import com.uber.hoodie.DataSourceWriteOptions;
import com.uber.hoodie.config.HoodieWriteConfig;

String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
        .builder()
        .config("spark.sql.warehouse.dir", warehouseLocation)
        .enableHiveSupport()
        .appName("Spark Hudi Write Test")
        .getOrCreate();

Dataset<Row> hiveQuery = spark.sql("select * from dw.xxx where part_date='2019-08-02'");

hiveQuery.write()
        .format("com.uber.hoodie")
        .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY(), true)
        .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), "jdbc:hive2://xxx:10000")
        .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), "dw")
        .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), true)
        .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), "hoodie_wxm_test")
        .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), "part_date")
        .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "key")
        .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "part_date")
        .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "event_time")
        .option(HoodieWriteConfig.TABLE_NAME, "hoodie_wxm_test")
        .mode(SaveMode.Append)
        .save("hudi data path");
Options                              Description
HIVE_ASSUME_DATE_PARTITION_OPT_KEY   Whether to assume the Hive table is date-partitioned with the default format yyyy/mm/dd
HIVE_URL_OPT_KEY                     JDBC URL of the HiveServer2 used for the sync
HIVE_DATABASE_OPT_KEY                Hive database to sync to
HIVE_SYNC_ENABLED_OPT_KEY            Whether to sync the Hudi table's metadata to Hive
HIVE_TABLE_OPT_KEY                   Hive table name to sync to
HIVE_PARTITION_FIELDS_OPT_KEY        Column(s) from which the Hive partition values are extracted
RECORDKEY_FIELD_OPT_KEY              Column from which the Hudi record key is extracted
PARTITIONPATH_FIELD_OPT_KEY          Column from which the Hudi partition path is extracted
PRECOMBINE_FIELD_OPT_KEY             Column used to pre-combine (deduplicate) records in Hudi
HoodieWriteConfig.TABLE_NAME         Name of the Hudi table

For more options, see Configurations
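
Beyond the Hive-sync options above, the write operation and storage type can also be set explicitly. The following is a minimal sketch, not taken from the original job: it assumes the 0.4.x constants OPERATION_OPT_KEY / UPSERT_OPERATION_OPT_VAL and STORAGE_TYPE_OPT_KEY / COW_STORAGE_TYPE_OPT_VAL on com.uber.hoodie.DataSourceWriteOptions, so verify the names against the Configurations page for your version.

// Sketch only: reuses the hiveQuery Dataset from the write example above.
// The option constants are assumed from com.uber.hoodie.DataSourceWriteOptions (0.4.x).
hiveQuery.write()
        .format("com.uber.hoodie")
        // upsert: update existing records by record key, insert the rest
        .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL())
        // copy-on-write storage: data files are rewritten as Parquet on each commit
        .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY(), DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL())
        .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "key")
        .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "part_date")
        .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "event_time")
        .option(HoodieWriteConfig.TABLE_NAME, "hoodie_wxm_test")
        .mode(SaveMode.Append)
        .save("hudi data path");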

Submit Job

// hoodie-hadoop-mr-0.4.7.jar, hoodie-spark-0.4.7.jar, hoodie-hive-0.4.7.jar and hoodie-common-0.4.7.jar must be added as --jars dependencies when submitting the Spark job

spark2-submit --jars basePath/hoodie-hadoop-mr-0.4.7.jar,basePath/hoodie-spark-0.4.7.jar,basePath/hoodie-hive-0.4.7.jar,basePath/hoodie-common-0.4.7.jar \
--class com.wxmimperio.spark.hudi.HudiWrite --master yarn --deploy-mode cluster \
--driver-memory 4g --executor-memory 2g --executor-cores 1 xxxx.jar

Custom date partition rule

With HIVE_ASSUME_DATE_PARTITION_OPT_KEY set, the default date partition format is yyyy/mm/dd, while my use case partitions by yyyy-mm-dd, so the extractor has to be rewritten (you can implement the PartitionValueExtractor interface yourself to match your own partition scheme).

import java.util.List;

import com.google.common.collect.Lists;
import com.uber.hoodie.hive.PartitionValueExtractor;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class DayPartitionValueExtractor implements PartitionValueExtractor {

    private transient DateTimeFormatter dtfOut;

    public DayPartitionValueExtractor() {
        this.dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd");
    }

    private DateTimeFormatter getDtfOut() {
        // dtfOut is transient, so it is rebuilt after deserialization
        if (dtfOut == null) {
            dtfOut = DateTimeFormat.forPattern("yyyy-MM-dd");
        }
        return dtfOut;
    }

    @Override
    public List<String> extractPartitionValuesInPath(String partitionPath) {
        // the partition path is expected to be in the form yyyy-mm-dd
        String[] splits = partitionPath.split("-");
        if (splits.length != 3) {
            throw new IllegalArgumentException(
                    "Partition path " + partitionPath + " is not in the form yyyy-mm-dd ");
        }
        int year = Integer.parseInt(splits[0]);
        int mm = Integer.parseInt(splits[1]);
        int dd = Integer.parseInt(splits[2]);
        DateTime dateTime = new DateTime(year, mm, dd, 0, 0);
        return Lists.newArrayList(getDtfOut().print(dateTime));
    }
}
  • Add the configuration
.option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(), "com.wxmimperio.spark.hudi.DayPartitionValueExtractor")

Running the Job

From the Spark job logs you can observe the following:

1. By default Hudi writes its output as Parquet; ORC file support is tracked in the related issue HUDI-57 support orc file

2. Parquet files are generated first, then the table is created with Hive CLI commands, and finally the data is synced

After syncing to Hive, the table schema gains several extra Hudi columns:

`_hoodie_commit_time` string, 
`_hoodie_commit_seqno` string,
`_hoodie_record_key` string,
`_hoodie_partition_path` string,
`_hoodie_file_name` string,

3. The partition file path differs from that of a normal Hive table

Normally a Hive partition path is in key=value form:

insert overwrite table new_table partition(part_date='2019-08-02') select xxx from old_table;

// hdfs file path: /wxm/hudi/data/hoodie_test/part_date=2019-08-02

By default Hudi writes to hudi data path/2019-08-02. In addition, when the table is created for the first time, the current partition is not added automatically; on the second run the partition is added and the data is loaded. I am not sure whether this is a problem with my test or simply how Hudi works.

I filed a related issue: Synchronizing to hive partition is incorrect
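
To check whether the partition actually made it into the Hive metastore after a given run, a quick check (my own sketch, not from the original post; the table name is taken from the write example above) is to list the partitions Hive knows about from Spark:

// Hypothetical verification snippet: list the partitions of the synced table.
// An empty result after the first run matches the behaviour described above.
spark.sql("show partitions dw.hoodie_wxm_test").show(false);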

4. java.lang.ClassNotFoundException: com.uber.hoodie.hadoop.HoodieInputFormat

During my first few job runs, the logs showed that the Parquet files had been generated, but the Hive sync failed with a ClassNotFoundException for com.uber.hoodie.hadoop.HoodieInputFormat

The class is in hoodie-hadoop-mr-0.4.7.jar, and even though I had added that jar to the dependencies when submitting the Spark job, the error still occurred

The problem is that the Hive table Hudi creates specifies com.uber.hoodie.hadoop.HoodieInputFormat as its InputFormat, so Hive itself needs this class on its classpath when executing the DDL:

INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: com.uber.hoodie.hive.HoodieHiveSyncException: Failed in executing SQL CREATE EXTERNAL TABLE  IF NOT EXISTS dw.wxm_hoodie_test( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `age` string, `key` string, `name` string, `timestamp` string) PARTITIONED BY (part string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'com.uber.hoodie.hadoop.HoodieInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION '/wxm/hudi/data'

The fix is to put hoodie-hadoop-mr-0.4.7.jar and hoodie-common-0.4.7.jar into $HIVE_HOME/lib and restart the Hive cluster

I filed a related issue: java.lang.ClassNotFoundException: com.uber.hoodie.hadoop.HoodieInputFormat

Read With Spark

Approach:

  1. Query the table synced to Hive directly with Spark SQL (not covered here; it is the same as reading a normal Hive table)

  2. Read the Parquet files through the client

Read Code

import org.apache.spark.sql.SparkSession;

import com.uber.hoodie.DataSourceReadOptions;

SparkSession spark = SparkSession
        .builder()
        .appName("Spark Hudi Read Test")
        .getOrCreate();

spark.read()
        .format("com.uber.hoodie")
        .option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY(), DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL())
        .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), "20190809120720")
        .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY(), "20190809120729")
        .load("hudi file path")
        .limit(10)
        .show();
Options                     Description
BEGIN_INSTANTTIME_OPT_KEY   Start commit timestamp used to filter data (exclusive)
END_INSTANTTIME_OPT_KEY     End commit timestamp used to filter data

For more options, see Configurations
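
The read example above uses the incremental view. To read the latest snapshot instead, the view type can be switched to the read-optimized view. A minimal sketch, assuming DataSourceReadOptions.VIEW_TYPE_READ_OPTIMIZED_OPT_VAL() is available in 0.4.7 (check the Configurations page) and noting that the load path may need a glob down to the partition directories depending on your layout:

// Sketch only: read-optimized (snapshot) view instead of the incremental view.
spark.read()
        .format("com.uber.hoodie")
        .option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY(), DataSourceReadOptions.VIEW_TYPE_READ_OPTIMIZED_OPT_VAL())
        // the path may need a glob over partitions, e.g. "hudi file path/*"
        .load("hudi file path")
        .limit(10)
        .show();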

Query With Hive CLI

Querying through the Hive CLI requires adding two jars:

add jar file:///basePath/hoodie-hive-bundle-0.4.7.jar;
add jar file:///basePath/hoodie-hadoop-mr-bundle-0.4.7.jar;

Query With Presto

One jar needs to be added to the Presto Hive plugin:

cp basePath/packaging/hoodie-presto-bundle/target/hoodie-presto-bundle-0.4.7.jar <presto_install>/plugin/hive-hadoop2/

// then restart presto cluster

Please credit the source when reposting: https://github.com/imperio-wxm

