E-MapReduce的Flink Table Store服务支持通过Flink SQL对Flink Table Store进行读写操作。本文通过示例为您介绍如何通过Flink SQL对Flink Table Store进行读写操作。

使用限制

仅EMR-3.45.0版本、EMR-5.11.0版本的集群,支持通过Flink SQL对Flink Table Store进行读写操作。

操作步骤

步骤一:启动集群

本文以Session模式为例,其余模式请参见 基础使用

执行以下命令,启动YARN Session。

yarn-session.sh --detached

步骤二:创建Catalog

Flink Table Store将数据和元数据都保存在文件系统或对象存储中,存储的根路径由 warehouse 参数指定。如果指定的warehouse路径不存在,将会自动创建该路径;如果指定的warehouse路径存在,您可以通过该Catalog访问路径中已有的表。

您还可以将元数据额外同步到Hive或DLF中,方便其它服务访问Flink Table Store。

  • 创建Filesystem Catalog。

    Filesystem Catalog仅将元数据保存在文件系统或对象存储中。

    1. 执行以下命令,启动Flink SQL。

      sql-client.sh -l /opt/apps/FLINK-TABLE-STORE/flink-table-store-current/lib/flink/
    2. 执行以下Flink SQL语句,创建Filesystem Catalog。

      CREATE CATALOG test_catalog WITH (
          'type' = 'table-store',
          'metastore' = 'filesystem',
          'warehouse' = 'oss://oss-bucket/warehouse'
      );
  • 创建Hive Catalog

    Hive Catalog会同步元数据到Hive MetaStore中。在Hive Catalog中创建的表可以直接在Hive中查询。

    Hive查询Flink Table Store,详情请参见 Flink Table Store与Hive集成

    • EMR-3.45.1及之后版本、EMR-5.11.1及之后版本

      1. 执行以下命令,启动Flink SQL。

        sql-client.sh -l /opt/apps/FLINK-TABLE-STORE/flink-table-store-current/lib/flink/ -l /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/
        重要

        即使您使用的是Hive3,也无需修改启动命令。

      2. 执行以下Flink SQL语句,创建Hive Catalog。

        CREATE CATALOG test_catalog WITH (
            'type' = 'table-store',
            'metastore' = 'hive',
            'uri' = 'thrift://master-1-1:9083', -- 指向Hive MetaStore Service的地址。
            'warehouse' = 'oss://oss-bucket/warehouse'
        );
    • EMR-3.45.0、EMR-5.11.0

      1. 执行以下命令,启动Flink SQL。

        sql-client.sh -l /opt/apps/FLINK-TABLE-STORE/flink-table-store-current/lib/flink/ -l /opt/apps/FLINK-TABLE-STORE/flink-table-store-current/lib/catalog/hive2/ -l /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/
      2. 执行以下Flink SQL语句,创建Hive Catalog。

        CREATE CATALOG test_catalog WITH (
            'type' = 'table-store',
            'metastore' = 'hive',
            'uri' = 'thrift://master-1-1:9083', -- 指向Hive MetaStore Service的地址。
            'warehouse' = 'oss://oss-bucket/warehouse'
        );
  • 创建DLF Catalog

    DLF Catalog会同步元数据到DLF中。

    重要

    创建集群时, 元数据 须为 DLF统一元数据

    • EMR-3.45.1及之后版本、EMR-5.11.1及之后版本

      1. 执行以下命令,启动Flink SQL。

        sql-client.sh -l /opt/apps/FLINK-TABLE-STORE/flink-table-store-current/lib/flink/ -l /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/
      2. 执行以下Flink SQL语句,创建DLF Catalog。

        CREATE CATALOG test_catalog WITH (
            'type' = 'table-store',
            'metastore' = 'dlf',
            'hive-conf-dir' = '/etc/taihao-apps/flink-conf',
            'warehouse' = 'oss://oss-bucket/warehouse'
        );
    • EMR-3.45.0、EMR-5.11.0

      1. 执行以下命令,启动Flink SQL。

        sql-client.sh -l /opt/apps/FLINK-TABLE-STORE/flink-table-store-current/lib/flink/ -l /opt/apps/FLINK-TABLE-STORE/flink-table-store-current/lib/catalog/dlf/
      2. 执行以下Flink SQL语句,创建DLF Catalog。

        CREATE CATALOG test_catalog WITH (
            'type' = 'table-store',
            'metastore' = 'dlf',
            'hive-conf-dir' = '/etc/taihao-apps/flink-conf',
            'warehouse' = 'oss://oss-bucket/warehouse'
        );

步骤三 :流作业读写Flink Table Store

执行以下Flink SQL语句,在Catalog中创建一张表,并读写表中的数据。

-- 设置为流作业。
SET 'execution.runtime-mode' = 'streaming';
-- Flink Table Store在流作业中需要设置checkpoint。
SET 'execution.checkpointing.interval' = '10s';
-- 使用前一步骤中创建的Catalog。
USE CATALOG test_catalog;
-- 创建并使用一个测试DATABASE。
CREATE DATABASE test_db;
USE test_db;
-- 创建一个产生随机数据的datagen源表。
CREATE TEMPORARY TABLE datagen_source (
    uuid int,
    kind int,
    price int
) WITH (
    'connector' = 'datagen',
    'fields.kind.min' = '0',
    'fields.kind.max' = '9',
    'rows-per-second' = '10'
-- 创建Flink Table Store表。
CREATE TABLE test_tbl (
    uuid int,
    kind int,
    price int,
    PRIMARY KEY (uuid) NOT ENFORCED
-- 向Flink Table Store表中写入数据。
INSERT INTO test_tbl SELECT * FROM datagen_source;
-- 读取表中的数据。
-- 流式查询作业运行的过程中,上面触发的流式写入作业仍在运行。
-- 您需要保证集群有足够的资源(task slot)同时运行两个作业,否则无法查到数据。
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;

步骤四:OLAP查询Flink Table Store

执行以下Flink SQL语句,对刚才创建的表进行OLAP查询。

-- 设置为批作业。
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';
-- 使用tableau展示模式,在命令行中直接打印出结果。
SET 'sql-client.execution.result-mode' = 'tableau';
-- 查询表中数据。
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;

步骤五:清理资源

重要

完成测试后,请手动停止流式写入Flink Table Store的作业,防止资源泄漏。

停止作业后,执行以下Flink SQL语句,删除刚才创建的表。

DROP TABLE test_tbl;