本文为您介绍如何使用对象存储OSS连接器。

阿里云 对象存储OSS (Object Storage Service)是一款海量、安全、低成本和高可靠的云存储服务,可提供99.9999999999%(12个9)的数据持久性,99.995%的数据可用性。多种存储类型供选择,全面优化存储成本。

类别

详情

支持类型

源表和结果表

运行模式

批模式和流模式

数据格式

Orc、Parquet、Avro、Csv、JSON和Raw

说明

仅实时计算引擎VVR 6.0.7及以上版本支持读取Parquet格式的数据。

特有监控指标

暂无

API种类

Datastream和SQL

是否支持更新或删除结果表数据

不支持更新和删除结果表数据,只支持插入数据。

使用限制

  • 通用

    • 仅Flink计算引擎VVR 4.0.14及以上版本支持读取或写入OSS。

    • 仅支持读取或写入相同账号下的OSS。

  • 结果表独有

    • 对于写入OSS,目前暂不支持写Avro、CSV、JSON和Raw此类行存的格式,具体原因请参见 FLINK-30635

    • 仅Flink计算引擎VVR6.0.6及以上版本支持写入 OSS-HDFS服务 ,具体请参见 写OSS-HDFS

语法结构

CREATE TABLE OssTable (
  column_name1 INT,
  column_name2 STRING,
  datetime STRING,
  `hour` STRING
) PARTITIONED BY (datetime, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = '...'
);

元信息列

您可以在源表中读取信息列,以获取读取OSS数据对应的元信息。例如,如果在OSS源表中定义了元信息列 file.path ,则该列的值为该行数据所在的文件路径。元信息列的使用示例如下。

CREATE TABLE MyUserTableWithFilepath (
  column_name1 INT,
  column_name2 STRING,
  `file.path` STRING NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'oss://<bucket>/path',
  'format' = 'json'
)

下表列出了OSS源表所支持的元信息列:

Key

数据类型

说明

file.path

STRING NOT NULL

该行数据所在的文件路径。

file.name

STRING NOT NULL

该行数据所在的文件名,即距离文件根路径最远的元素。

file.size

BIGINT NOT NULL

该行数据所在的文件的字节数。

file.modification-time

TIMESTAMP_LTZ(3) NOT NULL

该行数据所在的文件的修改时间。

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值为 filesystem

    path

    文件系统路径。

    String

    URI格式,例如 oss://my_bucket/my_path

    format

    文件格式。

    String

    参数取值如下:

    • csv

    • json

    • avro

    • parquet

    • orc

    • raw

  • 源表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    source.monitor-interval

    设置新文件的监控时间间隔,并且必须设置>0的值。

    Duration

    如果未设置此配置项,则提供的路径仅会被扫描一次,因此源将是有界的。

    每个文件都由其路径唯一标识,一旦发现新文件,就会处理一次。

    已处理的文件在source的整个生命周期内存储在state中。因此,source的state在checkpoint和savepoint时进行保存。更短的时间间隔文件会被更快地发现,但也会更频繁地遍历文件系统或对象存储。

  • 结果表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    partition.default-name

    在分区字段值为NULL或空字符串时,分区的名称。

    String

    _DEFAULT_PARTITION__

    无。

    sink.rolling-policy.file-size

    滚动前,文件大小的最大值。

    MemorySize

    128 MB

    写入目录下的数据被分割到part文件中。每个分区对应sink的收到数据的subtask都至少会为该分区生成一个part文件。根据可配置的滚动策略,当前in-progress part文件将被关闭,生成新的part文件。该策略基于大小和指定的文件可被打开的最大timeout时长,来滚动part文件。

    sink.rolling-policy.rollover-interval

    滚动前,part文件处于打开状态的最大时长。

    Duration

    30min

    检查频率是由 sink.rolling-policy.check-interval 属性控制。

    sink.rolling-policy.check-interval

    基于时间滚动策略的检查间隔。

    Duration

    1min

    该属性控制了基于 sink.rolling-policy.rollover-interval 属性的检查文件是否该被滚动了。

    auto-compaction

    在流式结果表中是否开启自动合并功能。数据首先会被写入临时文件。当checkpoint完成后,该检查点产生的临时文件会被合并,临时文件在合并前不可见。

    Boolean

    false

    如果启用文件合并功能,会根据目标文件大小,将多个小文件合并成大文件。在生产环境中使用文件合并功能时,需要注意:

    • 只有checkpoint内部的文件才会被合并,会至少生成与checkpoint个数相同的文件个数。

    • 合并前文件不可见,文件的可见时间是 checkpoint间隔时长+合并时长

    • 合并时间过长,将导致反压,延长checkpoint所需时间。

    compaction.file-size

    合并目标文件大小。

    MemorySize

    128 MB

    默认值与滚动文件大小 sink.rolling-policy.file-size 相同。

    sink.partition-commit.trigger

    分区提交触发器类型。

    String

    process-time

    对于写分区表,Flink提供了两种类型分区提交触发器,类型如下两种:

    • process-time:分区提交触发器基于分区创建时间和当前系统时间,既不需要分区时间提取器,也不需要watermark生成器。一旦当前系统时间超过了分区创建系统时间和 sink.partition-commit.delay 之和,立即提交分区。这种触发器更具通用性,但不是很精确。例如,数据延迟或故障将导致过早提交分区。

    • partition-time:基于提取的分区时间,需要watermark生成。这需要Job支持watermark生成,分区是根据时间来切割的,例如,按小时或按天分区。一旦watermark超过了分区创建系统时间和 sink.partition-commit.delay 之和立即提交分区。

    sink.partition-commit.delay

    分区被提交的最大延迟时间。表明该延迟时间之前分区不会被提交。

    Duration

    0s

    • 如果是按天分区,可以设置为 1 d

    • 如果是按小时分区,应设置为 1 h

    sink.partition-commit.watermark-time-zone

    解析Long类型的watermark到TIMESTAMP类型时所采用的时区,解析得到的watermark的TIMESTAMP会跟分区时间进行比较,以判断是否该分区是否需要被提交。

    String

    UTC

    仅当 sink.partition-commit.trigger 被设置为partition-time时有效。

    • 如果设置得不正确,例如,在TIMESTAMP_LTZ类型的列上定义了source rowtime,如果没有设置该属性,那么用户可能会在若干个小时后才看到分区的提交。默认值为UTC,意味着watermark是定义在TIMESTAMP类型的列上或者没有定义watermark。

    • 如果watermark定义在TIMESTAMP_LTZ类型的列上,watermark时区必须是会话时区。该属性的可选值要么是完整的时区名(例如'America/Los_Angeles'),要么是自定义时区(例如'GMT-08:00')。

    partition.time-extractor.kind

    从分区字段中提取时间的时间提取器。

    String

    default

    参数取值如下:

    • default(默认):默认情况下,可以配置timestamp pattern或formatter。

    • custom:应指定提取器类。

    partition.time-extractor.class

    实现PartitionTimeExtractor接口的提取器类。

    String

    无。

    partition.time-extractor.timestamp-pattern

    允许用户使用分区字段来获取合法的timestamp pattern的默认construction方式。

    String

    默认支持第一个字段按 yyyy-MM-dd hh:mm:ss 这种模式提取。

    • 如果需要从一个分区字段'dt'提取timestamp,可以配置:$dt。

    • 如果需要从多个分区字段,比如year、month和day和hour提取timestamp,可以配置成: $year-$month-$day $hour:00:00

    • 如果需要从两个分区字段dt和hour提取timestamp,可以配置成: $dt $hour:00:00

    partition.time-extractor.timestamp-formatter

    转换分区timestamp字符串值为timestamp的formatter,分区timestamp字符串值通过 partition.time-extractor.timestamp-pattern 属性表达。

    String

    yyyy-MM-dd HH:mm:ss

    例如,分区timestamp提取来自多个分区字段,比如year、month和day,可以配置 partition.time-extractor.timestamp-pattern 属性为 $year$month$day ,并且配置 partition.time-extractor.timestamp-formatter 属性为yyyyMMdd。默认的formatter是 yyyy-MM-dd HH:mm:ss 。这里的timestamp-formatter和Java的 DateTimeFormatter 是通用的。

    sink.partition-commit.policy.kind

    分区提交策略类型。

    String

    分区提交策略通知下游某个分区,该分区已经写入完毕可以被读取。参数取值如下:

    • success-file:在目录中增加_success文件。

    • custom:通过指定的类来创建提交策略。支持同时指定多个提交策略。

    sink.partition-commit.policy.class

    实现PartitionCommitPolicy接口的分区提交策略类。

    String

    该类只有在custom提交策略下才能使用。

    sink.partition-commit.success-file.name

    使用success-file分区提交策略时的文件名。

    String

    _SUCCESS

    无。

    sink.parallelism

    将文件写入外部文件系统的parallelism。

    Integer

    默认情况下,该sink parallelism与上游chained operator的parallelism一样。当配置了跟上游的chained operator不一样的parallelism时,写文件的算子会使用指定的sink parallelism,如果开启了文件合并,文件合并的算子也会使用指定的sink parallelism。

    说明

    这个值应该大于0,否则将抛出异常。

写OSS-HDFS

首先需要在实时计算控制台的Flink配置页面上添加上如下配置:

fs.oss.jindo.buckets: xxx
fs.oss.jindo.accessKeyId: xxx
fs.oss.jindo.accessKeySecret: xxx

其中涉及到的参数解释如下表所示:

配置项

说明

fs.oss.jindo.buckets

写入OSS-HDFS服务中的Bucket名称,可配置多个,以分号分隔。当Flink写一个OSS路径时,如果其对应的bucket包含在fs.oss.jindo.buckets中,则会写入OSS-HDFS服务中。

fs.oss.jindo.accessKeyId

阿里云账号的Access Key。获取方法请参见 查看RAM用户的AccessKey信息

fs.oss.jindo.accessKeySecret

阿里云账号的AccessKey Secret。获取方法请参见 查看RAM用户的AccessKey信息

此外,还需要配置OSS-HDFS的EndPoint。目前支持两种方式来配置OSS-HDFS的EndPoint:

  • 在实时计算控制台的Flink配置页面上添加如下配置项来配置OSS-HDFS的EndPoint

    fs.oss.jindo.endpoint: xxx
  • 在OSS的路径中配置OSS-HDFS的EndPoint

    通过如下的路径来进行配置

    oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>

    其中user-defined-oss-hdfs-bucket为对应的bucket的名字,oss-hdfs-endpoint为OSS-HDFS的EndPoint ;此时配置项fs.oss.jindo.buckets需要包含<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>。

    例如,假设bucket名字为jindo-test,其oss-hdfs的endpoint为

    cn-beijing.oss-dls.aliyuncs.com。则OSS路径需为oss://jindo-test.cn-beijing.oss-dls.aliyuncs.com/<user-defined-dir>,配置项fs.oss.jindo.buckets需包含jindo-test.cn-beijing.oss-dls.aliyuncs.com。

使用示例

  • 源表

    CREATE TEMPORARY TABLE fs_table_source (
      `id` INT,
      `name` VARCHAR
    ) WITH (
      'connector'='filesystem',
      'path'='oss://<bucket>/path',
      'format'='parquet'
    CREATE TEMPORARY TABLE blackhole_sink(
      `id` INT,
      `name` VARCHAR
    ) with (
      'connector' = 'blackhole'
    INSERT INTO blackhole_sink SELECT * FROM fs_table_source ;
  • 结果表

    • 写分区表

      CREATE TABLE datagen_source (
        user_id STRING,
        order_amount DOUBLE,
        ts BIGINT, -- 以毫秒为单位的时间
        ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
        WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- 在 TIMESTAMP_LTZ 列上定义 watermark
      ) WITH (
        'connector' = 'datagen'
      CREATE TEMPORARY TABLE fs_table_sink (
        user_id STRING,
        order_amount DOUBLE,
        dt STRING,
        `hour` STRING
      ) PARTITIONED BY (dt, `hour`) WITH (
        'connector'='filesystem',
        'path'='oss://<bucket>/path',
        'format'='parquet',
        'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
        'sink.partition-commit.delay'='1 h',
        'sink.partition-commit.trigger'='partition-time',
        'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- 假设用户配置的时区为 'Asia/Shanghai'
        'sink.partition-commit.policy.kind'='success-file'
      -- 流式 sql,插入文件系统表
      INSERT INTO fs_table_sink 
      SELECT 
        user_id, 
        order_amount, 
        DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
        DATE_FORMAT(ts_ltz, 'HH') 
      FROM datagen_source;
    • 写非分区表

      CREATE TABLE datagen_source (
        user_id STRING,
        order_amount DOUBLE
      ) WITH (
        'connector' = 'datagen'
      CREATE TEMPORARY TABLE fs_table_sink (
        user_id STRING,
        order_amount DOUBLE
      ) WITH (
        'connector'='filesystem',
        'path'='oss://<bucket>/path',
        'format'='parquet'
      INSERT INTO fs_table_sink SELECT * FROM datagen_source;