Spark2.3中,StructuredStreaming目前支持的sink只有FileSink、KafkaSink、ConsoleSink、MemorySink和ForeachSink,
如果有其他的需求我们只有通过ForeachSink自定义sink,这篇文件主要以写入到Redis和Mysql为例。
要使用ForeachSink自定义sink,必须实现
ForeachWriter[T](),
包括open(),process(),close()三个方法:
class redisSink extends ForeachWriter[Row](){
override def open(partitionId: Long, version: Long): Boolean ={
//这个方法进行一些初始化,如redis,获取连接
override def process(value: Row): Unit ={
//具体的处理逻辑,写数据到数据库中
override def close(errorOrNull: Throwable): Unit = {
//关闭连接
在每个batch中,这三个方法各调用一次,相当每批数据调用一次。下面是写入到redis和mysql的具体实现:
写入到Redis:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
import java.sql.Timestamp
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
object writeToRedis {
def main(args:Array[String]):Unit= {
//获取sparkSession对象
val spark: SparkSession = SparkSession.builder
.appName("continuousTrigger")
.master("local[2]")
.getOrCreate()
//设置日志输出级别
spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
var batchId: Long = 0
//对查询添加一个监听,获取每个批次的处理信息
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
val progress: StreamingQueryProgress = event.progress
batchId = progress.batchId
val inputRowsPerSecond: Double = progress.inputRowsPerSecond
val processRowsPerSecond: Double = progress.processedRowsPerSecond
val numInputRows: Long = progress.numInputRows
println("batchId=" + batchId, " numInputRows=" + numInputRows + " inputRowsPerSecond=" + inputRowsPerSecond +
" processRowsPerSecond=" + processRowsPerSecond)
override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
//使用structuredStreaming自带的Source产生数据
//|-- timestamp: timestamp (nullable = true)
// |-- value: long (nullable = true)
val rateSource: DataFrame = spark.readStream
.format("rate")
.option("rowsPerSecond", 100)
.load()
//增加一列 batchId
val addDF: DataFrame = rateSource.as[(Timestamp, Long)].map(x => {
val tuple: (Long, Timestamp, Long) = (batchId, x._1, x._2)
tuple
}).toDF("batchId","timestamp","num")
val resultDS: Dataset[Row] = addDF.filter("num%2=0")
resultDS.writeStream
.outputMode("complete")
.foreach(new redisSink())
//也可以这样
/*.foreach(new ForeachWriter[Row](){
override def open(partitionId: Long, version: Long): Boolean ={。。。。。}
override def process(value: Row): Unit ={。。。。。。}
override def close(errorOrNull: Throwable): Unit ={。。。。。。}
.start()
.awaitTermination()
class redisSink extends ForeachWriter[Row](){
var jedis:Jedis=null
override def open(partitionId: Long, version: Long): Boolean ={
val config: JedisPoolConfig = new JedisPoolConfig()
config.setMaxTotal(20)
config.setMaxIdle(5)
config.setMaxWaitMillis(1000)
config.setMinIdle(2)
config.setTestOnBorrow(false)
val jedisPool = new JedisPool(config,"127.0.0.1",6379)
jedis=jedisPool.getResource()
return true
override def process(value: Row): Unit ={
//写入数据到redis
jedis.rpush("rate",value.get(0)+" "+value.get(1)+" "+value.get(2))
override def close(errorOrNull: Throwable): Unit = {
//关闭连接
jedis.close()
写入到Mysql:
import java.sql.{Connection, PreparedStatement, Timestamp}
import com.mchange.v2.c3p0.ComboPooledDataSource
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
object writeToMysql {
def main(args:Array[String]):Unit= {
val rows = 100
//获取sparkSession对象
val spark: SparkSession = SparkSession.builder
.appName("continuousTrigger")
.master("local[2]")
.getOrCreate()
//设置日志输出级别
spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
var batchId: Long = 0
//对查询添加一个监听,获取每个批次的处理信息
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
val progress: StreamingQueryProgress = event.progress
batchId = progress.batchId
val inputRowsPerSecond: Double = progress.inputRowsPerSecond
val processRowsPerSecond: Double = progress.processedRowsPerSecond
val numInputRows: Long = progress.numInputRows
println("batchId=" + batchId, " numInputRows=" + numInputRows + " inputRowsPerSecond=" + inputRowsPerSecond +
" processRowsPerSecond=" + processRowsPerSecond)
override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
//读取数据源
val rateSource: DataFrame = spark.readStream
.format("rate")
.option("rowsPerSecond", rows)
.load()
rateSource.printSchema()
val addDF: DataFrame = rateSource.as[(Timestamp, Long)].map(x => {
val tuple: (Long, Timestamp, Long) = (batchId, x._1, x._2)
tuple
}).toDF("batchId","timestamp","num")
val resultDS: Dataset[Row] = addDF.filter("num%2=0")
resultDS.writeStream
.foreach(new mysqlSink())
.start()
.awaitTermination()
class mysqlSink extends ForeachWriter[Row](){
var conn:Connection=null
var ps:PreparedStatement=null
var dataSource:ComboPooledDataSource=_
val sql="insert into rate(batchId,InputTimestamp,num) values(?,?,?)"
override def open(partitionId: Long, version: Long): Boolean ={
dataSource = new ComboPooledDataSource()
dataSource.setDriverClass("com.mysql.jdbc.Driver")
dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/entrobus")
dataSource.setUser("root")
dataSource.setPassword("root")
dataSource.setInitialPoolSize(40)
dataSource.setMaxPoolSize(100)
dataSource.setMinPoolSize(10)
dataSource.setAcquireIncrement(5)
conn= dataSource.getConnection
ps= conn.prepareStatement(sql)
return true
override def process(value: Row): Unit ={
ps.setObject(1,value.get(0))
ps.setObject(2,value.get(1))
ps.setObject(3,value.get(2))
val i: Int = ps.executeUpdate()
println(i+" "+value.get(0)+" "+value.get(1)+" "+value.get(2))
override def close(errorOrNull: Throwable): Unit = {
dataSource.close()
很久没用mysql数据库了,不知道连接池那块这样写合不合理,不过数据写入到mysql数据库是没问题的。
Spark2.3中,StructuredStreaming目前支持的sink只有FileSink、KafkaSink、ConsoleSink、MemorySink和ForeachSink,如果有其他的需求我们只有通过ForeachSink自定义sink,这篇文件主要以写入到Redis和Mysql为例。 要使用ForeachSink自定义sink,必须实现ForeachWrite...
Spark Structured Streaming
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
简单来说Spark Structured Streaming提供了流数据的快速、可靠、容错、端对端的精确一次处理语义,它是建立在SparkSQL基础之上的一个流数据处理引擎;
我们依然可以使用Spark SQL的Dataset/DataFrame API操作处理流数据(操作方式类似于Spark SQL的批数据处理); 默认情况下,Spark Structured Strea
class TestForeachWriter extends ForeachWriter[Row] with Serializable {
var connection:Connection = _
var statement:Statement = _
val ip...
import java.sql.{Connection, DriverManager, Statement}
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
object StructruedForeach {
def main(args: Array[String]): Unit = {
val spark = SparkSession.b.
使用官方提供的格式调用Structured Streaming 的foreachBatch案例输出时报异常,下面是案例的代码
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format(...).save(...) // location 1
batchDF.write.for
在 Micro-batch 模式下,每个微批次作为一个job调度单元,官网描述其带来的延迟在 100ms
在 Continus 连续处理模式下,延迟低于1ms
好的架构在于其巧妙的构思,而最终形式又是极其精简的。
编程模型:可将流看做是不断地往一个表上append内容的过程,查询操...
Structured streaming默认支持的sink类型有File sink,Foreach sink,Console sink,Memory sink。
特别的说明一下Foreach sink的用法(ps:以通过Foreach sink写入外部redis为例)。
lastEtlData.writeStream().foreach(new TestForeachWriter()).o...
基于python的StructuredStream开发示例如下:
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.types import *
from pyspar...
大数据-玩转数据-Spark-Structured Streaming 容错(python版)
由于网络问题,链路中断,系统崩溃,JVM故障都会导致数据流的运行结果出现错误,Spark设计了输入源,执行引擎和接收器多个松散耦合组件隔离故障。
输入源通过位置偏移量来记录目前所处位置,引擎通过检查点保存中间状态,接收器使用“幂等”的接收器来保障输出的稳定性。
我们希望数据是它产生的时间,而不是到达的时间,Spark模型当中,事件时间是数据中的一列,为了避免存储空间无限扩大,同时还引入“水印”机制,将超过
package org.sun.IndustryBigDataAnalyticsPartform
import org.apache.spark.sql.SparkSession
import java.io._
import org.apache.spark.sql.ForeachWriter
import
Spark Structured streaming API支持的输出源有:Console、Memory、File和Foreach。其中Console在前两篇博文中已有详述,而Memory使用非常简单。本文着重介绍File和Foreach两种方式,并介绍如何在源码基本扩展新的输出方式。
1. File
Structured Streaming支持将数据以File形式保存起来,...
Structured Streaming 是 Apache Spark 的一种流式处理引擎,它可以帮助我们对流式数据进行转换、处理和聚合。Structured Streaming 使用了基于 SQL 的语法,因此我们可以使用熟悉的 SQL 语句来对流式数据进行操作。例如,我们可以使用以下 SQL 语句对流式数据进行过滤:
SELECT * FROM stream WHERE value > 5
或者使用以下 SQL 语句对流式数据进行分组并计算每组的平均值:
SELECT key, AVG(value) FROM stream GROUP BY key
Structured Streaming 还支持联机处理,即可以对流式数据进行实时处理,并将结果实时输出。这使得我们可以使用 Structured Streaming 实现各种实时数据处理任务,如实时统计、实时分析等。