Hudi是类似Carbondata的软数据层
,可以支持streaming的写入、读取,同时也支持将metadata同步到hive,提供Spark、Hive、Presto的SQL查询
software | version |
---|---|
Spark | 2.4.0.cloudera1 |
Hadoop | 2.6.0-cdh5.11.1 |
Hive | 1.1.0-cdh5.11.1 |
Hudi | 0.4.7 |
编译
可以查看官方文档的Quickstart进行编译:
Github clone 最新的tag
- 支持Java 8+
- 支持Hive 1+(Hive 2+ 据说有不少问题,可以通过Github Issue查询)
- 支持Spark2.x+
- 支持Apache、CDH Hadoop
编译可选项
1 | cd incubator-hudi-hoodie-0.4.7 |
参数 -DskipITs用来跳过integration test,test中含有docker test,没有docker环境无法编译通过
- 编译成功
1 | [INFO] ------------------------------------------------------------------------ |
Write With Spark
思路:
- 从hive的一张表查询数据
- 转换成DF后写出到Hudi的新表
- 将这个新表sync到hive的metadata
Write Code
1 | String warehouseLocation = new File("spark-warehouse").getAbsolutePath(); |
Options | 描述 |
---|---|
HIVE_ASSUME_DATE_PARTITION_OPT_KEY | 如果hive以日期分区,则日期的format格式,默认:yyyy/mm/dd |
HIVE_URL_OPT_KEY | hive metastore url |
HIVE_DATABASE_OPT_KEY | sync 到hive的库名 |
HIVE_SYNC_ENABLED_OPT_KEY | 是否将hudi表的元数据sync到hive |
HIVE_TABLE_OPT_KEY | sync 到hive 的表名 |
HIVE_PARTITION_FIELDS_OPT_KEY | sync 到hive表的分区键从哪个列中提取 |
RECORDKEY_FIELD_OPT_KEY | hudi 中recordKey从哪个列中提取 |
PARTITIONPATH_FIELD_OPT_KEY | hudi 中分区键从哪个列中提取 |
PRECOMBINE_FIELD_OPT_KEY | hudi 中预合并从哪个列中提取 |
HoodieWriteConfig.TABLE_NAME | hudi 的表名 |
更多Options请参考Configurations
Submit Job
1 | // 需要将 hoodie-hadoop-mr-0.4.7.jar、hoodie-spark-0.4.7.jar、hoodie-hive-0.4.7.jar、hoodie-common-0.4.7.jar 引入到spark shell 提交依赖中 |
自定义日期分区规则
通过HIVE_ASSUME_DATE_PARTITION_OPT_KEY参数设定后,默认的Date分区是yyyy/mm/dd,而我的业务场景下是yyyy-mm-dd,需要重写(此部分可以根据实际业务分区场景自行实现
PartitionValueExtractor
接口)
1 | public class DayPartitionValueExtractor implements PartitionValueExtractor { |
- 添加配置
1 | .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),"com.wxmimperio.spark.hudi.DayPartitionValueExtractor") |
Job运行
通过查看Spark Job logs,可以发现:
1.默认Hudi支持parquet
格式输出,关于ORC File的支持有相关Issue: HUDI-57 support orc file
2.会先生成parquet文件,然后用hive.cli相关命令建表,最后sync数据
sync到hive后,表结构会多几个Hudi的字段:
1 | `_hoodie_commit_time` string, |
3.分区的File Path与正常Hive的不同
正常情况下Hive的分区是key=value形式,
1 | insert overwrite table new_table partition(part_date='2019-08-02') select xxx from old_tabl; |
Hudi默认情况是: hudi data path/2019-08-02
,同时如果是第一次建表,当前分区不会自动添加,第二次运行会自动添加分区并load data,不清楚这是我测试有问题还是Hudi机制就是这样
已经提交了相关Issue:Synchronizing to hive partition is incorrect
4.java.lang.ClassNotFoundException: com.uber.hoodie.hadoop.HoodieInputFormat
在我前几次执行Job的时候,通过logs得知parquet文件已经生成,但是在sync hive的时候会报com.uber.hoodie.hadoop.HoodieInputFormat
类不存在
这个类在hoodie-hadoop-mr-0.4.7.jar中,可是spark提交jar的时候,我已经将相关jar添加到了依赖中还是报这个错
问题在于Hudi建hive表指定的InputFormat是com.uber.hoodie.hadoop.HoodieInputFormat
:
1 | INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: com.uber.hoodie.hive.HoodieHiveSyncException: Failed in executing SQL CREATE EXTERNAL TABLE IF NOT EXISTS dw.wxm_hoodie_test( `_hoodie_commit_time` string, `_hoodie_commit_seqno` string, `_hoodie_record_key` string, `_hoodie_partition_path` string, `_hoodie_file_name` string, `age` string, `key` string, `name` string, `timestamp` string) PARTITIONED BY (part string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'com.uber.hoodie.hadoop.HoodieInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION '/wxm/hudi/data' |
需要将hoodie-hadoop-mr-0.4.7.jar、hoodie-common-0.4.7.jar两个包放到$HIVE_HOME/lib
下,重启hive cluster
已经提交了相关Issue:java.lang.ClassNotFoundException: com.uber.hoodie.hadoop.HoodieInputFormat
Read With Spark
思路:
直接通过spark用sql查询sync到hive的表(不做介绍,和读hive一样)
通过client读取parquet文件
Read Code
1 | SparkSession spark = SparkSession |
Options | 描述 |
---|---|
BEGIN_INSTANTTIME_OPT_KEY | 过滤数据起始时间戳,不包含 |
END_INSTANTTIME_OPT_KEY | 过滤数据终止时间戳 |
更多Options请参考Configurations
Query With Hive Cli
通过hive cli查询需要添加两个依赖
1 | add jar file:///basePath/hoodie-hive-bundle-0.4.7.jar; |
Query With Presto
需要添加一个依赖到presto hive插件中
1 | cp basePath/packaging/hoodie-presto-bundle/target/hoodie-presto-bundle-0.4.7.jar <presto_install>/plugin/hive-hadoop2/ |
转载请注明出处:https://github.com/imperio-wxm