用Flink SQL实现数据流式导入Elasticsearch
通过
上一文
我们知道要如果用Elasticsearch实现全文检索,需要先将待检索的语料数据导入Elasticsearch集群。我们可以采用定期批量导入,但这样很多时候并不能满足业务需求, 比如新出来的新闻,或者新上传的文档,商家新上传的商品等,我们希望能实时能被检索到,这就需要这些数据导入Elasticsearch的过程是流式的。
Flink作为当前火热的开源流式计算引擎,具有性能好与使用广泛的优势,尤其是flink sql api,可以使得开发者可以像操作关系型数据一样使用简单的sql创建流式计算任务, 本文将介绍如何用flink sql将数据流式到Elasticsearch.
任务描述
假设我们的数据来自kafka中名为person的topic,为了方便陈述,假设仅包含两个字段,id与doc,类型均为string。 现在需要将其导入Elsaticsearch的某个index下面。 可以用PUT请求创建一个名为test_index的index用于存放导入的数据。
创建Source Connector
flink sql api提供了关键字用于创建kafka connector,相应的kafka配置参数通过DDL语句的with参数传入,示例如下
CREATE TABLE person_kafka_source (
id STRING,
doc STRING
) WITH (
-- declare the external system to connect to
'connector.type' = 'kafka',
'connector.version' = '0.10',
'connector.topic' = 'topic_name',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
-- specify the update-mode for streaming tables
'update-mode' = 'append',
-- declare a format for this system
'format.type' = 'avro',
'format.avro-schema' = '{
"namespace": "org.myorganization",
"type": "record",
"name": "person",
"fields": [
{"name": "id", "type": "string"},
{"name": "doc", "type": "string"}
)
这里数据源的scheme是通过format.avro-schema参数声明的,参数类型为要求json字符串。
创建Elasticserach Connector
同样也是通过DDL来创建
create table person_dup_es_sink (
id VARCHAR,
data VARCHAR
) with (
'connector.type' = 'elasticsearch',
'connector.version' = '6',
'connector.hosts.0.hostname' = 'remote host',
'connector.hosts.0.port' = '9200',
'connector.hosts.0.protocol' = 'http',
'connector.index' = 'test_index',
'connector.document-type' = 'person',
'update-mode' = 'append',
'format.type' = 'json',
'format.json-schema' = '{"type":"object", "properties": {"id": {"type": "string"}, "data":{"type":"string"}}}'
;
与创建kafka connector的DDL类似,也需要在with参数中传入数据的schema。此外还需要用connector.document-type参数指定将数据导入到Elasticsearch哪个type,如果改type不存在的话,会自动创建。
Insert语句
可以使用INSERT语句实现数据的导入
INSERT INTO person_dup_es_sink
SELECT id as id
,doc AS data
FROM person_kafka_data
采用这种方式导入的话,每条数据在Elasticsearch中的document ID是通过自增的方式生成的。 如果你需要将id指定为document ID,则需要将Elasticserach connector声明为插入模式,即将上面DDL中update-mode 参数设为'upsert'。 并对id进行group by以保证其唯一性
INSERT INTO person_dup_es_sink
SELECT id as id
,FIRST_VALUE(doc) AS data