相关文章推荐
狂野的荒野  ·  Caused by: ...·  3 周前    · 
非常酷的豆腐  ·  Django model distinct ...·  1 年前    · 
爱逃课的葡萄酒  ·  Grails Error: No ...·  1 年前    · 

用Flink SQL实现数据流式导入Elasticsearch

通过 上一文 我们知道要如果用Elasticsearch实现全文检索,需要先将待检索的语料数据导入Elasticsearch集群。我们可以采用定期批量导入,但这样很多时候并不能满足业务需求, 比如新出来的新闻,或者新上传的文档,商家新上传的商品等,我们希望能实时能被检索到,这就需要这些数据导入Elasticsearch的过程是流式的。
Flink作为当前火热的开源流式计算引擎,具有性能好与使用广泛的优势,尤其是flink sql api,可以使得开发者可以像操作关系型数据一样使用简单的sql创建流式计算任务, 本文将介绍如何用flink sql将数据流式到Elasticsearch.

任务描述

假设我们的数据来自kafka中名为person的topic,为了方便陈述,假设仅包含两个字段,id与doc,类型均为string。 现在需要将其导入Elsaticsearch的某个index下面。 可以用PUT请求创建一个名为test_index的index用于存放导入的数据。

创建Source Connector

flink sql api提供了关键字用于创建kafka connector,相应的kafka配置参数通过DDL语句的with参数传入,示例如下

CREATE TABLE person_kafka_source (
  id STRING,
  doc STRING
) WITH (
  -- declare the external system to connect to
  'connector.type' = 'kafka',
  'connector.version' = '0.10',
  'connector.topic' = 'topic_name',
  'connector.startup-mode' = 'earliest-offset',
  'connector.properties.zookeeper.connect' = 'localhost:2181',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  -- specify the update-mode for streaming tables
  'update-mode' = 'append',
  -- declare a format for this system
  'format.type' = 'avro',
  'format.avro-schema' = '{
                            "namespace": "org.myorganization",
                            "type": "record",
                            "name": "person",
                            "fields": [
                                {"name": "id", "type": "string"},
                                {"name": "doc", "type": "string"}
)

这里数据源的scheme是通过format.avro-schema参数声明的,参数类型为要求json字符串。

创建Elasticserach Connector

同样也是通过DDL来创建

create table person_dup_es_sink (
    id          VARCHAR,
    data        VARCHAR
)  with (
    'connector.type' = 'elasticsearch',
    'connector.version' = '6',
    'connector.hosts.0.hostname' = 'remote host',
    'connector.hosts.0.port' = '9200',
    'connector.hosts.0.protocol' = 'http',
    'connector.index' = 'test_index',
    'connector.document-type' = 'person',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.json-schema' = '{"type":"object", "properties": {"id": {"type": "string"}, "data":{"type":"string"}}}'
;

与创建kafka connector的DDL类似,也需要在with参数中传入数据的schema。此外还需要用connector.document-type参数指定将数据导入到Elasticsearch哪个type,如果改type不存在的话,会自动创建。

Insert语句

可以使用INSERT语句实现数据的导入

INSERT INTO person_dup_es_sink
SELECT id as id
    ,doc AS data
FROM person_kafka_data

采用这种方式导入的话,每条数据在Elasticsearch中的document ID是通过自增的方式生成的。 如果你需要将id指定为document ID,则需要将Elasticserach connector声明为插入模式,即将上面DDL中update-mode 参数设为'upsert'。 并对id进行group by以保证其唯一性

INSERT INTO person_dup_es_sink
SELECT id as id
    ,FIRST_VALUE(doc) AS data