Building a Real-Time Data Lake with Flink 1.14 + Iceberg 0.13
Table of Contents

1. Configuring Iceberg for the Flink SQL Client
2. Java/Scala pom.xml configuration
3. Catalog
   3.1 Hive Catalog
   3.2 HDFS Catalog
4. Database and table DDL commands
   4.1 Creating a database
   4.2 Creating a table (primary key etc. not supported)
   4.3 Altering a table
   4.4 Dropping a table
5. Inserting data into a table
   5.1 insert into
   5.2 insert overwrite (Batch mode only; overwrite granularity is the partition)
6. Querying data

Reading an Iceberg table's metadata through Flink SQL is not yet supported; it can be read through the Java API instead (see the sketches later in this article).
1. Configuring Iceberg for the Flink SQL Client
The Flink cluster must use a Scala 2.12 build.
- Download the Iceberg runtime jar into the lib directory of every server in the Flink cluster, then restart Flink:
[root@flink1 ~]# wget -P /root/flink-1.14.3/lib https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.0/iceberg-flink-runtime-1.14-0.13.0.jar
[root@flink1 ~]#
[root@flink1 ~]# scp /root/flink-1.14.3/lib/iceberg-flink-runtime-1.14-0.13.0.jar root@flink2:/root/flink-1.14.3/lib
iceberg-flink-runtime-1.14-0.13.0.jar    100%   23MB  42.9MB/s   00:00
[root@flink1 ~]# scp /root/flink-1.14.3/lib/iceberg-flink-runtime-1.14-0.13.0.jar root@flink3:/root/flink-1.14.3/lib
iceberg-flink-runtime-1.14-0.13.0.jar    100%   23MB  35.4MB/s   00:00
[root@flink1 ~]#
Iceberg supports the Hadoop Catalog out of the box. To use the Hive Catalog, additionally place flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar into the lib directory of every server in the Flink cluster, then restart Flink.
- Then simply start the SQL Client, e.g. with bin/sql-client.sh.
2. Java/Scala pom.xml configuration
Add the following dependency (for Iceberg 0.13 the Flink module is versioned per Flink release, matching the iceberg-flink-runtime-1.14 jar used above):
<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink-1.14</artifactId>
    <version>0.13.0</version>
    <scope>provided</scope>
</dependency>
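With this dependency on the classpath (alongside the usual Flink dependencies such as flink-streaming-java), an Iceberg table can also be read from Java code through the DataStream API. The following is a minimal, illustrative sketch, not a verified job from this article's test run; the class and job names are made up, and it points at the Hadoop Catalog table created later in this article:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class IcebergBatchReadJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Load the table straight from its HDFS location (Hadoop Catalog directory layout).
        TableLoader tableLoader = TableLoader.fromHadoopTable(
                "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user");

        // streaming(false) gives a bounded scan of the table's current snapshot.
        DataStream<RowData> rows = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(false)
                .build();

        rows.print();
        env.execute("iceberg-batch-read-sketch");
    }
}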
3. Catalog
3.1 Hive Catalog
Note: during testing, querying the table's data from Hive itself returned nothing, but the same data could be queried from Trino.
The Hive Catalog stores metadata in Hive's metastore, while the table data lives on HDFS.
Flink SQL> create catalog hive_catalog with (
         >   'type'='iceberg',
         >   'catalog-type'='hive',
         >   'property-version'='1',
         >   'cache-enabled'='true',
         >   'uri'='thrift://hive1:9083',
         >   'clients'='5',
         >   'warehouse'='hdfs://nnha/user/hive/warehouse',
         >   'hive-conf-dir'='/root/flink-1.14.3/hive_conf'
         > );
[INFO] Execute statement succeed.

Flink SQL>
- property-version: version of the property format, kept for backward compatibility in case the format changes; set it to 1 for now.
- cache-enabled: whether to enable catalog caching; enabled by default.
- clients: the size of the Hive metastore client pool used by hive_catalog; the default is 2.
- warehouse: an HDFS path on the cluster Flink runs against; the location where databases and tables under hive_catalog store their data.
- hive-conf-dir: the Hive cluster's configuration directory. It must be a local path on the Flink machines; the HDFS path parsed out of hive-site.xml is resolved against the HDFS cluster Flink runs against.
- warehouse takes precedence over the location derived from hive-conf-dir.
3.2 HDFS Catalog
The HDFS Catalog stores both the metadata and the table data on HDFS. warehouse is an HDFS path on the cluster Flink runs against.
Flink SQL> create catalog hadoop_catalog with (
         >   'type'='iceberg',
         >   'catalog-type'='hadoop',
         >   'property-version'='1',
         >   'cache-enabled'='true',
         >   'warehouse'='hdfs://nnha/user/iceberg/warehouse'
         > );
[INFO] Execute statement succeed.

Flink SQL>
A permanent catalog can supposedly be configured in conf/sql-cli-defaults.yaml, but this did not take effect in testing. This is likely because Flink 1.14 dropped support for the legacy SQL Client YAML file; an initialization SQL script passed via sql-client.sh -i is the current replacement.
[root@flink1 ~]# cat /root/flink-1.14.3/conf/sql-cli-defaults.yaml
catalogs:
  - name: hadoop_catalog
    type: iceberg
    catalog-type: hadoop
    property-version: 1
    cache-enabled: true
    warehouse: hdfs://nnha/user/iceberg/warehouse
[root@flink1 ~]#
[root@flink1 ~]# chown 501:games /root/flink-1.14.3/conf/sql-cli-defaults.yaml
The rest of this article uses the Hadoop Catalog for its examples.
4. Database and table DDL commands
4.1 Creating a database
Every catalog contains a default database named default.
Flink SQL> create database hadoop_catalog.iceberg_db;
[INFO] Execute statement succeed.

Flink SQL> use hadoop_catalog.iceberg_db;
[INFO] Execute statement succeed.

Flink SQL>
- Creating the database creates an iceberg_db subdirectory under the warehouse directory on HDFS.
- Dropping the database removes that iceberg_db subdirectory from HDFS.
4.2 Creating a table (primary key etc. not supported)
Flink SQL> create table hadoop_catalog.iceberg_db.my_user (
         >   user_id bigint comment 'user ID',
         >   user_name string,
         >   birthday date,
         >   country string
         > ) comment 'user table'
         > partitioned by (birthday, country) with (
         >   'write.format.default'='parquet',
         >   'write.parquet.compression-codec'='gzip'
         > );
[INFO] Execute statement succeed.

Flink SQL>
- Tables created from Flink currently do not support computed columns, primary key, or watermark.
- Partitioning by a transform expression ("hidden partitioning") is not supported from Flink, although Iceberg itself supports it.
- Creating the table produces the following files:
[root@flink1 ~]# hadoop fs -ls hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata
Found 2 items
-rw-r--r--   1 root supergroup       2115 2022-02-13 22:01 hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v1.metadata.json
-rw-r--r--   1 root supergroup          1 2022-02-13 22:01 hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/version-hint.text
[root@flink1 ~]#
Looking at v1.metadata.json, you can see "current-snapshot-id" : -1, meaning the table has no snapshot yet.
Flink SQL> create table hadoop_catalog.iceberg_db.my_user_copy
         >   like hadoop_catalog.iceberg_db.my_user;
[INFO] Execute statement succeed.

Flink SQL>
- The copied table has the same schema, partitioning, and table properties as the original.
4.3 Altering a table
Modifying table properties:
Flink SQL> alter table hadoop_catalog.iceberg_db.my_user_copy set (
         >   'write.format.default'='avro',
         >   'write.avro.compression-codec'='gzip'
         > );
[INFO] Execute statement succeed.

Flink SQL>
- Currently Flink can only change an Iceberg table's properties.
Renaming a table:
Flink SQL> alter table hadoop_catalog.iceberg_db.my_user_copy
         >   rename to hadoop_catalog.iceberg_db.my_user_copy_new;
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Cannot rename Hadoop tables

Flink SQL>
- Tables in a Hadoop Catalog cannot be renamed.
4.4 Dropping a table
Flink SQL> drop table hadoop_catalog.iceberg_db.my_user_copy;
[INFO] Execute statement succeed.

Flink SQL>
- Dropping the table removes the my_user_copy subdirectory from HDFS.
5. Inserting data into a table
5.1 insert into
- insert into … values …
- insert into … select …
Flink SQL> insert into hadoop_catalog.iceberg_db.my_user (
         >   user_id, user_name, birthday, country
         > ) values (1, 'zhang_san', date '2022-02-01', 'china'),
         >   (2, 'li_si', date '2022-02-02', 'japan');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: f1aa8bee0be5bda8b166cc361e113268

Flink SQL> insert into hadoop_catalog.iceberg_db.my_user select (user_id + 1), user_name, birthday, country from hadoop_catalog.iceberg_db.my_user;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c408e324ca3861b39176c6bd15770aca

Flink SQL>
The resulting HDFS directory layout:
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-01/country=china/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00001.parquet
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-01/country=china/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e4-00001.parquet
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-02/country=japan/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00002.parquet
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-02/country=japan/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e4-00002.parquet
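For reference, the same insert into semantics are available from Java through FlinkSink. The sketch below is illustrative only and was not part of this article's test run; the row values and job name are made up, and Flink's internal RowData format stores DATE columns as days since the Unix epoch:

import java.time.LocalDate;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class IcebergAppendJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // One row matching my_user's schema: (user_id, user_name, birthday, country).
        RowData row = GenericRowData.of(
                8L,
                StringData.fromString("li_si"),
                (int) LocalDate.of(2022, 2, 2).toEpochDay(),
                StringData.fromString("japan"));
        DataStream<RowData> input = env.fromElements(row);

        TableLoader tableLoader = TableLoader.fromHadoopTable(
                "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user");

        // append() gives insert into semantics; .overwrite(true) would give insert overwrite semantics.
        FlinkSink.forRowData(input)
                .tableLoader(tableLoader)
                .append();

        env.execute("iceberg-append-sketch");
    }
}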
5.2 insert overwrite (Batch mode only; overwrite granularity is the partition)
Only Flink's Batch execution mode is supported, not Streaming mode.
insert overwrite replaces whole partitions rather than individual rows; on an unpartitioned table it replaces the entire table, as shown below:
Flink SQL> set 'execution.runtime-mode' = 'batch';
[INFO] Session property has been set.

Flink SQL> insert overwrite hadoop_catalog.iceberg_db.my_user values (4, 'wang_wu', date '2022-02-02', 'japan');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 63cf6c27060ec9ebdce75b785cc3fa3a

Flink SQL> set 'sql-client.execution.result-mode' = 'tableau';
[INFO] Session property has been set.

Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name |   birthday | country |
+---------+-----------+------------+---------+
|       1 | zhang_san | 2022-02-01 |   china |
|       4 |   wang_wu | 2022-02-02 |   japan |
|       2 | zhang_san | 2022-02-01 |   china |
+---------+-----------+------------+---------+
3 rows in set
The birthday=2022-02-02/country=japan partition now contains the files below; note that insert overwrite also works by adding a new data file:
birthday=2022-02-02/country=japan/00000-0-1d0ff907-60a7-4062-93a3-9b443626e383-00001.parquet
birthday=2022-02-02/country=japan/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00002.parquet
birthday=2022-02-02/country=japan/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e4-00002.parquet
insert overwrite … partition replaces a specified partition:
Flink SQL> insert overwrite hadoop_catalog.iceberg_db.my_user partition (birthday='2022-02-02', country='japan') select 5, 'zhao_liu';
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 97e9ba4131028c53461e739b34108ae0

Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name |   birthday | country |
+---------+-----------+------------+---------+
|       1 | zhang_san | 2022-02-01 |   china |
|       5 |  zhao_liu | 2022-02-02 |   japan |
|       2 | zhang_san | 2022-02-01 |   china |
+---------+-----------+------------+---------+
3 rows in set

Flink SQL>
6. Querying data
Batch mode:
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name |   birthday | country |
+---------+-----------+------------+---------+
|       1 | zhang_san | 2022-02-01 |   china |
|       5 |  zhao_liu | 2022-02-02 |   japan |
|       2 | zhang_san | 2022-02-01 |   china |
+---------+-----------+------------+---------+
3 rows in set

Flink SQL>
Streaming mode:
First, check the latest metadata version number:
[root@flink1 conf]# hadoop fs -cat hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/version-hint.text
5
We created the table, ran two inserts, and ran two insert overwrites, so the latest version number is 5. Now look at the metadata JSON file corresponding to that version:
[root@flink1 ~]# hadoop fs -cat hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v5.metadata.json
"format-version":1,
"table-uuid":"84a5e90d-7ae9-4dfd-aeab-c74f07447513",
"location":"hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user",
"last-updated-ms":1644761481488,
"last-column-id":4,
"schema":{
"type":"struct",
"schema-id":0,
"fields":[{
"id":1,
"name":"user_id",
"required":false,
"type":"long"
"id":2,
"name":"user_name",
"required":false,
"type":"string"
"id":3,
"name":"birthday",
"required":false,
"type":"date"
"id":4,
"name":"country",
"required":false,
"type":"string"
"current-schema-id":0,
"schemas":[{
"type":"struct",
"schema-id":0,
"fields":[{
"id":1,
"name":"user_id",
"required":false,
"type":"long"
"id":2,
"name":"user_name",
"required":false,
"type":"string"
"id":3,
"name":"birthday",
"required":false,
"type":"date"
"id":4,
"name":"country",
"required":false,
"type":"string"
"partition-spec":[{
"name":"birthday",
"transform":"identity",
"source-id":3,
"field-id":1000
"name":"country",
"transform":"identity",
"source-id":4,
"field-id":1001
"default-spec-id":0,
"partition-specs":[{
"spec-id":0,
"fields":[{
"name":"birthday",
"transform":"identity",
"source-id":3,
"field-id":1000
"name":"country",
"transform":"identity",
"source-id":4,
"field-id":1001
"last-partition-id":1001,
"default-sort-order-id":0,
"sort-orders":[{
"order-id":0,
"fields":[]
"properties":{
"write.format.default":"parquet",
"write.parquet.compression-codec":"gzip"
"current-snapshot-id":138573494821828246,
"snapshots":[{
"snapshot-id":8012517928892530314,
"timestamp-ms":1644761130111,
"summary":{
"operation":"append",
"flink.job-id":"8f228ae49d34aafb4b2887db3149e3f6",
"flink.max-committed-checkpoint-id":"9223372036854775807",
"added-data-files":"2",
"added-records":"2",
"added-files-size":"2487",
"changed-partition-count":"2",
"total-records":"2",
"total-files-size":"2487",
"total-data-files":"2",
"total-delete-files":"0",
"total-position-deletes":"0",
"total-equality-deletes":"0"
"manifest-list":"hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-8012517928892530314-1-5c33451b-48ab-4ce5-be7a-2c2d2dc9e11d.avro",
"schema-id":0
"snapshot-id":453371561664052237,
"parent-snapshot-id":8012517928892530314,
"timestamp-ms":1644761150082,
"summary":{
"operation":"append",
"flink.job-id":"813b7a17c21ddd003e1a210b1366e0c5",
"flink.max-committed-checkpoint-id":"9223372036854775807",
"added-data-files":"2",
"added-records":"2",
"added-files-size":"2487",
"changed-partition-count":"2",
"total-records":"4",
"total-files-size":"4974",
"total-data-files":"4",
"total-delete-files":"0",
"total-position-deletes":"0",
"total-equality-deletes":"0"
"manifest-list":"hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-453371561664052237-1-bc0e56ec-9f78-4956-8412-4d8ca70ccc19.avro",
"schema-id":0
"snapshot-id":6410282459040239217,
"parent-snapshot-id":453371561664052237,
"timestamp-ms":1644761403566,
"summary":{
"operation":"overwrite",
"replace-partitions":"true",
"flink.job-id":"f7085f68e5ff73c1c8aa1f4f59996068",
"flink.max-committed-checkpoint-id":"9223372036854775807",
"added-data-files":"1",
"deleted-data-files":"2",
"added-records":"1",
"deleted-records":"2",
"added-files-size":"1244",
"removed-files-size":"2459",
"changed-partition-count":"1",
"total-records":"3",
"total-files-size":"3759",
"total-data-files":"3",
"total-delete-files":"0",
"total-position-deletes":"0",
"total-equality-deletes":"0"
"manifest-list":"hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-6410282459040239217-1-2b20c57e-5428-4483-9f7b-928b980dd50d.avro",
"schema-id":0
"snapshot-id":138573494821828246,
"parent-snapshot-id":6410282459040239217,
"timestamp-ms":1644761481488,
"summary":{
"operation":"overwrite",
"replace-partitions":"true",
"flink.job-id":"d434d6d4f658d61732d7e9a0a85279fc",
"flink.max-committed-checkpoint-id":"9223372036854775807",
"added-data-files":"1",
"deleted-data-files":"1",
"added-records":"1",
"deleted-records":"1",
"added-files-size":"1251",
"removed-files-size":"1244",
"changed-partition-count":"1",
"total-records":"3",
"total-files-size":"3766",
"total-data-files":"3",
"total-delete-files":"0",
"total-position-deletes":"0",
"total-equality-deletes":"0"
"manifest-list":"hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-138573494821828246-1-b243b39e-7122-4571-b6fa-c902241e36a8.avro",
"schema-id":0
"snapshot-log":[{
"timestamp-ms":1644761130111,
"snapshot-id":8012517928892530314
"timestamp-ms":1644761150082,
"snapshot-id":453371561664052237
"timestamp-ms":1644761403566,
"snapshot-id":6410282459040239217
"timestamp-ms":1644761481488,
"snapshot-id":138573494821828246
"metadata-log":[{
"timestamp-ms":1644760911017,
"metadata-file":"hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v1.metadata.json"
"timestamp-ms":1644761130111,
"metadata-file":"hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v2.metadata.json"
"timestamp-ms":1644761150082,
"metadata-file":"hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v3.metadata.json"
"timestamp-ms":1644761403566,
"metadata-file":"hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v4.metadata.json"
}[root@flink1~]#
Note "current-snapshot-id" : 138573494821828246, which identifies the table's current snapshot.
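As mentioned in the table of contents, this metadata cannot yet be queried through Flink SQL, but it can be read with the Iceberg Java API. A minimal sketch (the class name and print format are illustrative; it assumes the Hadoop configuration for the nnha nameservice is on the classpath):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class ReadMyUserMetadata {
    public static void main(String[] args) {
        // Same warehouse path as the hadoop_catalog created earlier.
        HadoopCatalog catalog = new HadoopCatalog(
                new Configuration(), "hdfs://nnha/user/iceberg/warehouse");
        Table table = catalog.loadTable(TableIdentifier.of("iceberg_db", "my_user"));

        System.out.println("current-snapshot-id: " + table.currentSnapshot().snapshotId());
        for (Snapshot s : table.snapshots()) {
            System.out.printf("snapshot=%d operation=%s committed-at=%d%n",
                    s.snapshotId(), s.operation(), s.timestampMillis());
        }
    }
}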
Flink SQL> set 'execution.runtime-mode' = 'streaming';
[INFO] Session property has been set.

Flink SQL>
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user
         > /*+ options(
         >   'streaming'='true',
         >   'monitor-interval'='5s'
         > ) */;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op |              user_id |                      user_name |   birthday |                        country |
+----+----------------------+--------------------------------+------------+--------------------------------+
| +I |                    5 |                       zhao_liu | 2022-02-02 |                          japan |
| +I |                    2 |                      zhang_san | 2022-02-01 |                          china |
| +I |                    1 |                      zhang_san | 2022-02-01 |                          china |
The query returns the data of the latest snapshot.
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user
         > /*+ options(
         >   'streaming'='true',
         >   'monitor-interval'='5s',
         >   'start-snapshot-id'='138573494821828246'
         > ) */;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op |              user_id |                      user_name |   birthday |                        country |
+----+----------------------+--------------------------------+------------+--------------------------------+
The start snapshot can only be the snapshot id of the last insert overwrite operation, or a later snapshot id; otherwise the job throws the exception below in the background and stays in a restarting loop:
java.lang.UnsupportedOperationException: Found overwrite operation, cannot support incremental data in snapshots (8012517928892530314, 138573494821828246]
In this example, snapshot id 138573494821828246 is the latest snapshot and also the snapshot of the last insert overwrite. If we now insert two more rows, the incremental query returns only those new rows:
Flink SQL> insert into hadoop_catalog.iceberg_db.my_user (
         >   user_id, user_name, birthday, country
         > ) values (6, 'zhang_san', date '2022-02-01', 'china');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 8eb279e61aed66304d78ad027eaf8d30

Flink SQL> insert into hadoop_catalog.iceberg_db.my_user (
         >   user_id, user_name, birthday, country
         > ) values (7, 'zhang_san', date '2022-02-01', 'china');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 70a050e455d188d0d3f3adc2ba367fb6

Flink SQL>
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user
         > /*+ options(
         >   'streaming'='true',
         >   'monitor-interval'='30s',
         >   'start-snapshot-id'='138573494821828246'
         > ) */;
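The same incremental read can also be expressed with the DataStream API. Below is a hedged sketch mirroring the SQL hints above; it was not part of this test run, and the class and job names are illustrative:

import java.time.Duration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class IcebergIncrementalReadJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        TableLoader tableLoader = TableLoader.fromHadoopTable(
                "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user");

        // Unbounded scan: read incrementally after the given snapshot and poll
        // for new snapshots every 30 seconds, as the SQL hints above do.
        DataStream<RowData> rows = FlinkSource.forRowData()
                .env(env)
                .tableLoader(tableLoader)
                .streaming(true)
                .startSnapshotId(138573494821828246L)
                .monitorInterval(Duration.ofSeconds(30))
                .build();

        rows.print();
        env.execute("iceberg-incremental-read-sketch");
    }
}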