相关文章推荐
没人理的爆米花  ·  PROCESSLIST | PingCAP ...·  1 年前    · 
绅士的斑马  ·  CSV无法连接到ArcGIS属性表_arcg ...·  1 年前    · 
曾经爱过的汉堡包  ·  NumPy 从已有的数组创建数组 | 菜鸟教程·  2 年前    · 
逃跑的企鹅  ·  用户对问题“不同终端架构上的BitConve ...·  2 年前    · 
一身肌肉的台灯  ·  七款从HTML文档提取文本的工具-提取htm ...·  2 年前    · 
Code  ›  Structured Streaming教程(3) —— 与Kafka的集成开发者社区
kafka string topic 数据集成
https://cloud.tencent.com/developer/article/1172203
有情有义的葡萄酒
2 年前
作者头像
用户1154259
0 篇文章

Structured Streaming教程(3) —— 与Kafka的集成

前往专栏
腾讯云
开发者社区
文档 意见反馈 控制台
首页
学习
活动
专区
工具
TVP
文章/答案/技术大牛
发布
首页
学习
活动
专区
工具
TVP
返回腾讯云官网
社区首页 > 专栏 > xingoo, 一个梦想做发明家的程序员 > Structured Streaming教程(3) —— 与Kafka的集成

Structured Streaming教程(3) —— 与Kafka的集成

作者头像
用户1154259
发布 于 2018-07-31 17:31:26
1.1K 0
发布 于 2018-07-31 17:31:26
举报

Structured Streaming最主要的生产环境应用场景就是配合kafka做实时处理,不过在Strucured Streaming中kafka的版本要求相对搞一些,只支持0.10及以上的版本。就在前一个月,我们才从0.9升级到0.10,终于可以尝试structured streaming的很多用法,很开心~

引入

如果是maven工程,直接添加对应的kafka的jar包即可:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

读取kafka的数据

以流的形式查询

读取的时候,可以读取某个topic,也可以读取多个topic,还可以指定topic的通配符形式:

读取一个topic

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

读取多个topic

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

读取通配符形式的topic组

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

以批的形式查询

关于Kafka的offset,structured streaming默认提供了几种方式:

设置每个分区的起始和结束值

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

配置起始和结束的offset值(默认)

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Schema信息

读取后的数据的Schema是固定的,包含的列如下:

Column

Type

说明

key

binary

信息的key

value

binary

信息的value(我们自己的数据)

topic

string

主题

partition

int

分区

offset

long

偏移值

timestamp

long

时间戳

timestampType

int

类型

source相关的配置

无论是流的形式,还是批的形式,都需要一些必要的参数:

  • kafka.bootstrap.servers kafka的 服务器 配置,host:post形式,用逗号进行分割,如host1:9000,host2:9000
  • assign,以json的形式指定topic信息
  • subscribe,通过逗号分隔,指定topic信息
  • subscribePattern,通过java的正则指定多个topic assign、subscribe、subscribePattern同时之中能使用一个。

其他比较重要的参数有:

  • startingOffsets, offset开始的值,如果是earliest,则从最早的数据开始读;如果是latest,则从最新的数据开始读。默认流是latest,批是earliest
  • endingOffsets,最大的offset,只在批处理的时候设置,如果是latest则为最新的数据
  • failOnDataLoss,在流处理时,当数据丢失时(比如topic被删除了,offset在指定的范围之外),查询是否报错,默认为true。这个功能可以当做是一种告警机制,如果对丢失数据不感兴趣,可以设置为false。在批处理时,这个值总是为true。
  • kafkaConsumer.pollTimeoutMs,excutor连接kafka的超时时间,默认是512ms
  • fetchOffset.numRetries,获取kafka的offset信息时,尝试的次数;默认是3次
  • fetchOffset.retryIntervalMs,尝试重新读取kafka offset信息时等待的时间,默认是10ms
  • maxOffsetsPerTrigger,trigger暂时不会用,不太明白什么意思。Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.

写入数据到Kafka

Apache kafka 仅支持“至少一次”的语义,因此,无论是流处理还是批处理,数据都有可能重复。比如,当出现失败的时候,structured streaming会尝试重试,但是不会确定broker那端是否已经处理以及持久化该数据。但是如果query成功,那么可以断定的是,数据至少写入了一次。比较常见的做法是,在后续处理kafka数据时,再进行额外的去重,关于这点,其实structured streaming有专门的解决方案。

保存数据时的schema:

  • key,可选。如果没有填,那么key会当做null,kafka针对null会有专门的处理(待查)。
  • value,必须有
  • topic,可选。(如果配置option里面有topic会覆盖这个字段)

下面是sink输出必须要有的参数:

  • kafka.bootstrap.servers,kafka的集群地址,host:port格式用逗号分隔。

流处理的数据写入

// 基于配置指定topic
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()
// 在字段中包含topic
val ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()

批处理的数据写入

跟流处理其实一样

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()
 
推荐文章
没人理的爆米花  ·  PROCESSLIST | PingCAP Docs
1 年前
绅士的斑马  ·  CSV无法连接到ArcGIS属性表_arcgis导入csv文件出现空值-CSDN博客
1 年前
曾经爱过的汉堡包  ·  NumPy 从已有的数组创建数组 | 菜鸟教程
2 年前
逃跑的企鹅  ·  用户对问题“不同终端架构上的BitConvert.IsLittleEndianon”的回答 - 问答 - 腾讯云开发者社区-腾讯云
2 年前
一身肌肉的台灯  ·  七款从HTML文档提取文本的工具-提取html中的文本
2 年前
今天看啥   ·   Py中国   ·   codingpro   ·   小百科   ·   link之家   ·   卧龙AI搜索
删除内容请联系邮箱 2879853325@qq.com
Code - 代码工具平台
© 2024 ~ 沪ICP备11025650号