Building a Real-Time Data Lake with Flink 1.14 + Iceberg 0.13: A Hands-On Guide



Table of Contents

  1. Configuring Iceberg for the Flink SQL Client

  2. Java/Scala pom.xml Configuration

  3. Catalog
    3.1 Hive Catalog
    3.2 HDFS Catalog

  4. DDL Commands for Databases and Tables
    4.1 Creating a Database
    4.2 Creating a Table (primary keys etc. not supported)
    4.3 Altering a Table
    4.4 Dropping a Table

  5. Inserting Data into a Table
    5.1 insert into
    5.2 insert overwrite (Batch mode only; overwrite granularity is partition)

  6. Querying Data
    Reading Iceberg table metadata through Flink SQL is not supported yet, but it can be read via the Java API

1. Configuring Iceberg for the Flink SQL Client

The Flink cluster must use the Scala 2.12 build.

  1. Download the Iceberg runtime jar into the lib directory on every server of the Flink cluster, then restart Flink:

[root@flink1 ~]# wget -P /root/flink-1.14.3/lib https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.0/iceberg-flink-runtime-1.14-0.13.0.jar
[root@flink1 ~]#
[root@flink1 ~]# scp /root/flink-1.14.3/lib/iceberg-flink-runtime-1.14-0.13.0.jar root@flink2:/root/flink-1.14.3/lib
iceberg-flink-runtime-1.14-0.13.0.jar                100%   23MB  42.9MB/s   00:00
[root@flink1 ~]# scp /root/flink-1.14.3/lib/iceberg-flink-runtime-1.14-0.13.0.jar root@flink3:/root/flink-1.14.3/lib
iceberg-flink-runtime-1.14-0.13.0.jar                100%   23MB  35.4MB/s   00:00
[root@flink1 ~]#

Iceberg supports the Hadoop catalog out of the box. To use the Hive catalog, also place flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar in the lib directory on every server of the Flink cluster, then restart Flink.

  2. Then start the SQL Client (bin/sql-client.sh) and you are ready to go.

2. Java/Scala pom.xml Configuration

Add the following dependency:

<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-flink</artifactId>
    <version>0.13.0</version>
    <scope>provided</scope>
</dependency>

3. Catalog

3.1 Hive Catalog

Note: in our tests, querying the table data from Hive returned nothing, while querying from Trino returned the data.

The Hive metastore stores the metadata, while HDFS stores the table data.

Flink SQL> create catalog hive_catalog with (
>     'type'='iceberg',
>     'catalog-type'='hive',
>     'property-version'='1',
>     'cache-enabled'='true',
>     'uri'='thrift://hive1:9083',
>     'clients'='5',
>     'warehouse'='hdfs://nnha/user/hive/warehouse',
>     'hive-conf-dir'='/root/flink-1.14.3/hive_conf'
> );
[INFO] Execute statement succeed.

Flink SQL>
  • property-version: kept for backward compatibility in case the property format changes; set it to 1 for now

  • cache-enabled: whether to enable catalog caching; enabled by default

  • clients: the size of the Hive metastore client pool that hive_catalog exposes to clients; the default is 2

  • warehouse: an HDFS path reachable from the Flink cluster, where databases and tables under hive_catalog store their data

  • hive-conf-dir: the Hive cluster's configuration directory; it must be a local path on the Flink cluster. The warehouse HDFS path parsed from hive-site.xml must also live on the HDFS that Flink works with

  • warehouse takes precedence over hive-conf-dir
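
Before moving on, a quick sanity check that the new catalog is visible from the SQL Client (a minimal sketch; the exact output depends on your environment):

Flink SQL> show catalogs;
Flink SQL> use catalog hive_catalog;
Flink SQL> show databases;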

3.2 HDFS Catalog

HDFS stores both the metadata and the table data. warehouse is an HDFS path reachable from the Flink cluster.

Flink SQL> create catalog hadoop_catalog with (
>     'type'='iceberg',
>     'catalog-type'='hadoop',
>     'property-version'='1',
>     'cache-enabled'='true',
>     'warehouse'='hdfs://nnha/user/iceberg/warehouse'
> );
[INFO] Execute statement succeed.

Flink SQL>

In theory a permanent catalog can be configured through conf/sql-cli-defaults.yaml, but this did not take effect in our tests (see the initialization-file alternative below):

[root@flink1 ~]# cat /root/flink-1.14.3/conf/sql-cli-defaults.yaml
catalogs:
  - name: hadoop_catalog
    type: iceberg
    catalog-type: hadoop
    property-version: 1
    cache-enabled: true
    warehouse: hdfs://nnha/user/iceberg/warehouse
[root@flink1 ~]#
[root@flink1 ~]# chown 501:games /root/flink-1.14.3/conf/sql-cli-defaults.yaml
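
That the YAML file is ignored is likely expected: since FLIP-163, the Flink 1.14 SQL Client no longer reads YAML environment files, and an initialization SQL script is the recommended replacement. A sketch of that alternative, reusing the hadoop_catalog settings above (the file path /root/flink-1.14.3/conf/init.sql is hypothetical):

-- /root/flink-1.14.3/conf/init.sql (hypothetical path)
-- re-creates the catalog every time the SQL Client starts
create catalog hadoop_catalog with (
    'type'='iceberg',
    'catalog-type'='hadoop',
    'property-version'='1',
    'cache-enabled'='true',
    'warehouse'='hdfs://nnha/user/iceberg/warehouse'
);
use catalog hadoop_catalog;

Start the client with bin/sql-client.sh -i /root/flink-1.14.3/conf/init.sql and the catalog should be available in every session.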

In the rest of this article we focus on the Hadoop catalog for the walkthrough.

4. DDL Commands for Databases and Tables

4.1 Creating a Database

Each catalog comes with a default database named default.

Flink SQL> create database hadoop_catalog.iceberg_db;
[INFO] Execute statement succeed.

Flink SQL> use hadoop_catalog.iceberg_db;
[INFO] Execute statement succeed.

Flink SQL>
  • This creates an iceberg_db subdirectory under the warehouse directory on HDFS

  • Dropping the database removes the iceberg_db subdirectory from HDFS (a sketch of the drop statement follows below)
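
For reference, a sketch of dropping the database again (illustration only; Flink SQL refuses to drop a non-empty database unless cascade is given):

-- fails if the database still contains tables
drop database hadoop_catalog.iceberg_db;
-- also drops all tables inside; use with care
drop database hadoop_catalog.iceberg_db cascade;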

4.2 Creating a Table (primary keys etc. not supported)

Flink SQL> create table hadoop_catalog.iceberg_db.my_user (
>     user_id bigint comment 'user ID',
>     user_name string,
>     birthday date,
>     country string
> ) comment 'user table'
> partitioned by (birthday, country) with (
>     'write.format.default'='parquet',
>     'write.parquet.compression-codec'='gzip'
> );
[INFO] Execute statement succeed.

Flink SQL>
  • Computed columns, primary keys, and watermarks are not supported on these tables yet

  • Partition transforms are not supported in Flink DDL (only identity partitioning on columns, as above), although Iceberg itself supports hidden partitioning through transforms

  • The files generated by creating the table are as follows:

[root@flink1 ~]#
[root@flink1 ~]# hadoop fs -ls hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata
Found 2 items
-rw-r--r--   1 root supergroup       2115 2022-02-13 22:01 hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v1.metadata.json
-rw-r--r--   1 root supergroup          1 2022-02-13 22:01 hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/version-hint.text
[root@flink1 ~]#

Inspecting v1.metadata.json shows "current-snapshot-id" : -1, meaning the table has no snapshot yet.

Flink SQL> create table hadoop_catalog.iceberg_db.my_user_copy
>     like hadoop_catalog.iceberg_db.my_user;
[INFO] Execute statement succeed.

Flink SQL>
  • The copied table has the same schema, partitioning, and table properties as the original (this can be checked as shown below)
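
A quick way to check what the copy inherited (a sketch; we did not capture its output in our tests):

Flink SQL> describe hadoop_catalog.iceberg_db.my_user_copy;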

4.3 Altering a Table

Changing table properties:

Flink SQL> alter table hadoop_catalog.iceberg_db.my_user_copy set (
>     'write.format.default'='avro',
>     'write.avro.compression-codec'='gzip'
> );
[INFO] Execute statement succeed.

Flink SQL>
  • Currently Flink only supports changing Iceberg table properties

Renaming a table:

Flink SQL> alter table hadoop_catalog.iceberg_db.my_user_copy
>     rename to hadoop_catalog.iceberg_db.my_user_copy_new;
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Cannot rename Hadoop tables

Flink SQL>
  • Tables in a Hadoop catalog cannot be renamed (a Hive-catalog sketch follows below)
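
Since only Hadoop tables reject renames, the same statement is expected to succeed in a Hive catalog. A sketch with hypothetical table names, not run in our test environment:

-- assumes the hive_catalog from section 3.1; some_db.some_table is hypothetical
alter table hive_catalog.some_db.some_table
    rename to hive_catalog.some_db.some_table_new;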

4.4 Dropping a Table

Flink SQL> drop table hadoop_catalog.iceberg_db.my_user_copy;
[INFO] Execute statement succeed.

Flink SQL>
  • This removes the my_user_copy subdirectory from HDFS

5. Inserting Data into a Table

5.1 insert into

  1. insert into … values …

  2. insert into … select …

Flink SQL> insert into hadoop_catalog.iceberg_db.my_user (
>     user_id, user_name, birthday, country
> ) values (1, 'zhang_san', date '2022-02-01', 'china'),
>     (2, 'li_si', date '2022-02-02', 'japan');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: f1aa8bee0be5bda8b166cc361e113268

Flink SQL> insert into hadoop_catalog.iceberg_db.my_user select (user_id + 1), user_name, birthday, country from hadoop_catalog.iceberg_db.my_user;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c408e324ca3861b39176c6bd15770aca

Flink SQL>

The resulting HDFS directory layout:

hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-01/country=china/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00001.parquet
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-01/country=china/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e4-00001.parquet
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-02/country=japan/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00002.parquet
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-02/country=japan/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e4-00002.parquet

5.2 insert overwrite (Batch mode only; overwrite granularity is partition)

Only Flink's Batch mode supports insert overwrite; Streaming mode does not.

insert overwrite replaces one or more whole partitions, not individual rows. If the table is not partitioned, the whole table is replaced, as shown below:

Flink SQL> set 'execution.runtime-mode' = 'batch';
[INFO] Session property has been set.

Flink SQL> insert overwrite hadoop_catalog.iceberg_db.my_user values (4, 'wang_wu', date '2022-02-02', 'japan');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 63cf6c27060ec9ebdce75b785cc3fa3a

Flink SQL> set 'sql-client.execution.result-mode' = 'tableau';
[INFO] Session property has been set.

Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name |   birthday | country |
+---------+-----------+------------+---------+
|       1 | zhang_san | 2022-02-01 |   china |
|       4 |   wang_wu | 2022-02-02 |   japan |
|       2 | zhang_san | 2022-02-01 |   china |
+---------+-----------+------------+---------+
3 rows in set

The data files under the birthday=2022-02-02/country=japan partition are now the following; the insert overwrite also just added a new file:

birthday=2022-02-02/country=japan/00000-0-1d0ff907-60a7-4062-93a3-9b443626e383-00001.parquet
birthday=2022-02-02/country=japan/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00002.parquet
birthday=2022-02-02/country=japan/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e4-00002.parquet

insert overwrite … partition replaces the specified partition:

Flink SQL> insert overwrite hadoop_catalog.iceberg_db.my_user partition (birthday='2022-02-02', country='japan') select 5, 'zhao_liu';
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 97e9ba4131028c53461e739b34108ae0

Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name |   birthday | country |
+---------+-----------+------------+---------+
|       1 | zhang_san | 2022-02-01 |   china |
|       5 |  zhao_liu | 2022-02-02 |   japan |
|       2 | zhang_san | 2022-02-01 |   china |
+---------+-----------+------------+---------+
3 rows in set

Flink SQL>

6. Querying Data

Batch mode:

Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name |   birthday | country |
+---------+-----------+------------+---------+
|       1 | zhang_san | 2022-02-01 |   china |
|       5 |  zhao_liu | 2022-02-02 |   japan |
|       2 | zhang_san | 2022-02-01 |   china |
+---------+-----------+------------+---------+
3 rows in set

Flink SQL>

Streaming mode:

First look up the latest metadata version (which leads us to the latest snapshot-id):

[root@flink1 conf]# hadoop fs -cat hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/version-hint.text

We ran one create table, two inserts, and two insert overwrites, so the latest version number is 5. Next, inspect the metadata JSON file for that version:

[root@flink1 ~]# hadoop fs -cat hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v5.metadata.json
{
  "format-version" : 1,
  "table-uuid" : "84a5e90d-7ae9-4dfd-aeab-c74f07447513",
  "location" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user",
  "last-updated-ms" : 1644761481488,
  "last-column-id" : 4,
  "schema" : {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [
      { "id" : 1, "name" : "user_id", "required" : false, "type" : "long" },
      { "id" : 2, "name" : "user_name", "required" : false, "type" : "string" },
      { "id" : 3, "name" : "birthday", "required" : false, "type" : "date" },
      { "id" : 4, "name" : "country", "required" : false, "type" : "string" }
    ]
  },
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [
      { "id" : 1, "name" : "user_id", "required" : false, "type" : "long" },
      { "id" : 2, "name" : "user_name", "required" : false, "type" : "string" },
      { "id" : 3, "name" : "birthday", "required" : false, "type" : "date" },
      { "id" : 4, "name" : "country", "required" : false, "type" : "string" }
    ]
  } ],
  "partition-spec" : [
    { "name" : "birthday", "transform" : "identity", "source-id" : 3, "field-id" : 1000 },
    { "name" : "country", "transform" : "identity", "source-id" : 4, "field-id" : 1001 }
  ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [
      { "name" : "birthday", "transform" : "identity", "source-id" : 3, "field-id" : 1000 },
      { "name" : "country", "transform" : "identity", "source-id" : 4, "field-id" : 1001 }
    ]
  } ],
  "last-partition-id" : 1001,
  "default-sort-order-id" : 0,
  "sort-orders" : [ { "order-id" : 0, "fields" : [ ] } ],
  "properties" : {
    "write.format.default" : "parquet",
    "write.parquet.compression-codec" : "gzip"
  },
  "current-snapshot-id" : 138573494821828246,
  "snapshots" : [ {
    "snapshot-id" : 8012517928892530314,
    "timestamp-ms" : 1644761130111,
    "summary" : {
      "operation" : "append",
      "flink.job-id" : "8f228ae49d34aafb4b2887db3149e3f6",
      "flink.max-committed-checkpoint-id" : "9223372036854775807",
      "added-data-files" : "2", "added-records" : "2", "added-files-size" : "2487",
      "changed-partition-count" : "2", "total-records" : "2", "total-files-size" : "2487",
      "total-data-files" : "2", "total-delete-files" : "0",
      "total-position-deletes" : "0", "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-8012517928892530314-1-5c33451b-48ab-4ce5-be7a-2c2d2dc9e11d.avro",
    "schema-id" : 0
  }, {
    "snapshot-id" : 453371561664052237,
    "parent-snapshot-id" : 8012517928892530314,
    "timestamp-ms" : 1644761150082,
    "summary" : {
      "operation" : "append",
      "flink.job-id" : "813b7a17c21ddd003e1a210b1366e0c5",
      "flink.max-committed-checkpoint-id" : "9223372036854775807",
      "added-data-files" : "2", "added-records" : "2", "added-files-size" : "2487",
      "changed-partition-count" : "2", "total-records" : "4", "total-files-size" : "4974",
      "total-data-files" : "4", "total-delete-files" : "0",
      "total-position-deletes" : "0", "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-453371561664052237-1-bc0e56ec-9f78-4956-8412-4d8ca70ccc19.avro",
    "schema-id" : 0
  }, {
    "snapshot-id" : 6410282459040239217,
    "parent-snapshot-id" : 453371561664052237,
    "timestamp-ms" : 1644761403566,
    "summary" : {
      "operation" : "overwrite",
      "replace-partitions" : "true",
      "flink.job-id" : "f7085f68e5ff73c1c8aa1f4f59996068",
      "flink.max-committed-checkpoint-id" : "9223372036854775807",
      "added-data-files" : "1", "deleted-data-files" : "2",
      "added-records" : "1", "deleted-records" : "2",
      "added-files-size" : "1244", "removed-files-size" : "2459",
      "changed-partition-count" : "1", "total-records" : "3", "total-files-size" : "3759",
      "total-data-files" : "3", "total-delete-files" : "0",
      "total-position-deletes" : "0", "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-6410282459040239217-1-2b20c57e-5428-4483-9f7b-928b980dd50d.avro",
    "schema-id" : 0
  }, {
    "snapshot-id" : 138573494821828246,
    "parent-snapshot-id" : 6410282459040239217,
    "timestamp-ms" : 1644761481488,
    "summary" : {
      "operation" : "overwrite",
      "replace-partitions" : "true",
      "flink.job-id" : "d434d6d4f658d61732d7e9a0a85279fc",
      "flink.max-committed-checkpoint-id" : "9223372036854775807",
      "added-data-files" : "1", "deleted-data-files" : "1",
      "added-records" : "1", "deleted-records" : "1",
      "added-files-size" : "1251", "removed-files-size" : "1244",
      "changed-partition-count" : "1", "total-records" : "3", "total-files-size" : "3766",
      "total-data-files" : "3", "total-delete-files" : "0",
      "total-position-deletes" : "0", "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-138573494821828246-1-b243b39e-7122-4571-b6fa-c902241e36a8.avro",
    "schema-id" : 0
  } ],
  "snapshot-log" : [
    { "timestamp-ms" : 1644761130111, "snapshot-id" : 8012517928892530314 },
    { "timestamp-ms" : 1644761150082, "snapshot-id" : 453371561664052237 },
    { "timestamp-ms" : 1644761403566, "snapshot-id" : 6410282459040239217 },
    { "timestamp-ms" : 1644761481488, "snapshot-id" : 138573494821828246 }
  ],
  "metadata-log" : [
    { "timestamp-ms" : 1644760911017, "metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v1.metadata.json" },
    { "timestamp-ms" : 1644761130111, "metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v2.metadata.json" },
    { "timestamp-ms" : 1644761150082, "metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v3.metadata.json" },
    { "timestamp-ms" : 1644761403566, "metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v4.metadata.json" }
  ]
}
[root@flink1 ~]#

可以看到 "current-snapshot-id" : 138573494821828246,,表示当前的snapshot-id

Flink SQL> set 'execution.runtime-mode' = 'streaming';
[INFO] Session property has been set.

Flink SQL> select * from hadoop_catalog.iceberg_db.my_user
> /*+ options(
>     'streaming'='true',
>     'monitor-interval'='5s'
> ) */;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op |              user_id |                      user_name |   birthday |                        country |
+----+----------------------+--------------------------------+------------+--------------------------------+
| +I |                    5 |                       zhao_liu | 2022-02-02 |                          japan |
| +I |                    2 |                      zhang_san | 2022-02-01 |                          china |
| +I |                    1 |                      zhang_san | 2022-02-01 |                          china |

The query emits the data of the latest snapshot. We can also start reading from a specific snapshot:

Flink SQL> select * from hadoop_catalog.iceberg_db.my_user
> /*+ options(
>     'streaming'='true',
>     'monitor-interval'='5s',
>     'start-snapshot-id'='138573494821828246'
> ) */;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op |              user_id |                      user_name |   birthday |                        country |
+----+----------------------+--------------------------------+------------+--------------------------------+

Only the snapshot id of the last insert overwrite operation, or a later snapshot id, may be specified here; otherwise the job throws an exception in the background and stays stuck in the restarting state:

java.lang.UnsupportedOperationException: Found overwrite operation, cannot support incremental data in snapshots (8012517928892530314, 138573494821828246]

In this example, snapshot id 138573494821828246 is the last snapshot id and also the snapshot id of the last insert overwrite operation. If we now insert two more rows, only the incremental data is visible:

Flink SQL> insert into hadoop_catalog.iceberg_db.my_user (
>     user_id, user_name, birthday, country
> ) values (6, 'zhang_san', date '2022-02-01', 'china');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 8eb279e61aed66304d78ad027eaf8d30

Flink SQL> insert into hadoop_catalog.iceberg_db.my_user (
>     user_id, user_name, birthday, country
> ) values (7, 'zhang_san', date '2022-02-01', 'china');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 70a050e455d188d0d3f3adc2ba367fb6

Flink SQL> select * from hadoop_catalog.iceberg_db.my_user
> /*+ options(
>     'streaming'='true',
>     'monitor-interval'='30s',
>     'start-snapshot-id'='138573494821828246'
> ) */;

This time only the two newly inserted rows (user_id 6 and 7) are emitted.