In Spark 2.3, the only sinks Structured Streaming supports out of the box are FileSink, KafkaSink, ConsoleSink, MemorySink and ForeachSink.

For any other target we have to build a custom sink through ForeachSink. This article takes writing to Redis and MySQL as examples.

To build a custom sink with ForeachSink, you have to implement ForeachWriter[T], which means providing three methods: open(), process() and close():

class redisSink extends ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = {
    // Do any initialization here, e.g. acquire a Redis connection; return true to process this partition
    true
  }
  override def process(value: Row): Unit = {
    // The actual processing logic: write the record to the external store
  }
  override def close(errorOrNull: Throwable): Unit = {
    // Release the connection
  }
}

Within each batch, open() and close() are called once per partition, and process() is called once for every row in that partition. Below are the concrete implementations for writing to Redis and MySQL:

Writing to Redis:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>
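
Besides Jedis, the examples also assume the Spark SQL / Structured Streaming dependency is on the classpath; for Spark 2.3 built against Scala 2.11 that would look roughly like this (the version is an assumption):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
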
import java.sql.Timestamp
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
object writeToRedis {
  def main(args:Array[String]):Unit= {
    // Create the SparkSession
    val spark: SparkSession = SparkSession.builder
      .appName("continuousTrigger")
      .master("local[2]")
      .getOrCreate()
    // Set the log level
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._
    var batchId: Long = 0
    // Register a listener on the query to collect per-batch progress information
    spark.streams.addListener(new StreamingQueryListener() {
      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        val progress: StreamingQueryProgress = event.progress
        batchId = progress.batchId
        val inputRowsPerSecond: Double = progress.inputRowsPerSecond
        val processRowsPerSecond: Double = progress.processedRowsPerSecond
        val numInputRows: Long = progress.numInputRows
        println("batchId=" + batchId, "  numInputRows=" + numInputRows + "  inputRowsPerSecond=" + inputRowsPerSecond +
          "  processRowsPerSecond=" + processRowsPerSecond)
      override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
    // Use the built-in rate source to generate data; its schema is:
    // |-- timestamp: timestamp (nullable = true)
    // |-- value: long (nullable = true)
    val rateSource: DataFrame = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 100)
      .load()
    // Add a batchId column (batchId is updated on the driver by the listener above)
    val addDF: DataFrame = rateSource.as[(Timestamp, Long)].map(x => {
      val tuple: (Long, Timestamp, Long) = (batchId, x._1, x._2)
      tuple
    }).toDF("batchId","timestamp","num")
    val resultDS: Dataset[Row] = addDF.filter("num%2=0")
    resultDS.writeStream
      .outputMode("append")  // complete mode requires an aggregation; append fits this filter-only query
      .foreach(new redisSink())
      // The writer can also be defined inline:
      /*.foreach(new ForeachWriter[Row]() {
        override def open(partitionId: Long, version: Long): Boolean = {...}
        override def process(value: Row): Unit = {...}
        override def close(errorOrNull: Throwable): Unit = {...}
      })*/
      .start()
      .awaitTermination()
  }
  class redisSink extends ForeachWriter[Row] {
    var jedis: Jedis = null
    override def open(partitionId: Long, version: Long): Boolean = {
      val config: JedisPoolConfig = new JedisPoolConfig()
      config.setMaxTotal(20)
      config.setMaxIdle(5)
      config.setMaxWaitMillis(1000)
      config.setMinIdle(2)
      config.setTestOnBorrow(false)
      val jedisPool = new JedisPool(config, "127.0.0.1", 6379)
      jedis = jedisPool.getResource()
      true
    }
    override def process(value: Row): Unit = {
      // Append the record to the Redis list stored under the key "rate"
      jedis.rpush("rate", value.get(0) + " " + value.get(1) + " " + value.get(2))
    }
    override def close(errorOrNull: Throwable): Unit = {
      // Return the connection to the pool
      jedis.close()
    }
  }
}
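
To spot-check what was written, a minimal sketch along the following lines (assuming the same local Redis instance and the "rate" key used above) reads the list back with Jedis:

import scala.collection.JavaConverters._
import redis.clients.jedis.Jedis

object checkRedis {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("127.0.0.1", 6379)
    // Print the first ten entries that redisSink pushed under the key "rate"
    jedis.lrange("rate", 0, 9).asScala.foreach(println)
    jedis.close()
  }
}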

Writing to MySQL:
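
The MySQL example uses c3p0 for connection pooling together with the MySQL JDBC driver; the Maven coordinates below are assumptions about the versions involved:

<dependency>
    <groupId>com.mchange</groupId>
    <artifactId>c3p0</artifactId>
    <version>0.9.5.2</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>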

import java.sql.{Connection, PreparedStatement, Timestamp}
import com.mchange.v2.c3p0.ComboPooledDataSource
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
object writeToMysql {
  def main(args:Array[String]):Unit= {
    val rows = 100
    // Create the SparkSession
    val spark: SparkSession = SparkSession.builder
      .appName("continuousTrigger")
      .master("local[2]")
      .getOrCreate()
    // Set the log level
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._
    var batchId: Long = 0
    // Register a listener on the query to collect per-batch progress information
    spark.streams.addListener(new StreamingQueryListener() {
      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        val progress: StreamingQueryProgress = event.progress
        batchId = progress.batchId
        val inputRowsPerSecond: Double = progress.inputRowsPerSecond
        val processRowsPerSecond: Double = progress.processedRowsPerSecond
        val numInputRows: Long = progress.numInputRows
        println("batchId=" + batchId, "  numInputRows=" + numInputRows + "  inputRowsPerSecond=" + inputRowsPerSecond +
          "  processRowsPerSecond=" + processRowsPerSecond)
      override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
    // Read from the built-in rate source
    val rateSource: DataFrame = spark.readStream
      .format("rate")
      .option("rowsPerSecond", rows)
      .load()
    rateSource.printSchema()
    val addDF: DataFrame = rateSource.as[(Timestamp, Long)].map(x => {
      val tuple: (Long, Timestamp, Long) = (batchId, x._1, x._2)
      tuple
    }).toDF("batchId","timestamp","num")
    val resultDS: Dataset[Row] = addDF.filter("num%2=0")
    resultDS.writeStream
      .foreach(new mysqlSink())
      .start()
      .awaitTermination()
  }
  class mysqlSink extends ForeachWriter[Row] {
    var conn: Connection = null
    var ps: PreparedStatement = null
    var dataSource: ComboPooledDataSource = _
    val sql = "insert into rate(batchId,InputTimestamp,num) values(?,?,?)"
    override def open(partitionId: Long, version: Long): Boolean = {
      dataSource = new ComboPooledDataSource()
      dataSource.setDriverClass("com.mysql.jdbc.Driver")
      dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/entrobus")
      dataSource.setUser("root")
      dataSource.setPassword("root")
      dataSource.setInitialPoolSize(40)
      dataSource.setMaxPoolSize(100)
      dataSource.setMinPoolSize(10)
      dataSource.setAcquireIncrement(5)
      conn = dataSource.getConnection
      ps = conn.prepareStatement(sql)
      true
    }
    override def process(value: Row): Unit = {
      // Bind the three columns (batchId, InputTimestamp, num) and insert the row
      ps.setObject(1, value.get(0))
      ps.setObject(2, value.get(1))
      ps.setObject(3, value.get(2))
      val i: Int = ps.executeUpdate()
      println(i + " " + value.get(0) + " " + value.get(1) + " " + value.get(2))
    }
    override def close(errorOrNull: Throwable): Unit = {
      // Release the statement, the connection and the pool
      if (ps != null) ps.close()
      if (conn != null) conn.close()
      dataSource.close()
    }
  }
}
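
For reference, the insert statement above assumes a table roughly like the following already exists in the entrobus database (the column types are an assumption based on the values being written):

CREATE TABLE rate (
  batchId        BIGINT,
  InputTimestamp TIMESTAMP,
  num            BIGINT
);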

I haven't used MySQL in quite a while, so I'm not sure the connection-pool part is written in the most reasonable way, but the data does get written into MySQL without any problem.
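
Since open() runs once per partition in every batch, creating a fresh ComboPooledDataSource each time defeats the purpose of pooling. A minimal sketch of one common alternative (the names MysqlPool and pooledMysqlSink are made up here, and the pool settings are assumptions mirroring the ones above) keeps a single lazily initialized pool per executor JVM and only borrows a connection in open():

object MysqlPool {
  // One c3p0 pool per executor JVM, created lazily on first use
  lazy val dataSource: ComboPooledDataSource = {
    val ds = new ComboPooledDataSource()
    ds.setDriverClass("com.mysql.jdbc.Driver")
    ds.setJdbcUrl("jdbc:mysql://localhost:3306/entrobus")
    ds.setUser("root")
    ds.setPassword("root")
    ds.setMaxPoolSize(20)
    ds
  }
}

class pooledMysqlSink extends ForeachWriter[Row] {
  var conn: Connection = null
  var ps: PreparedStatement = null
  override def open(partitionId: Long, version: Long): Boolean = {
    // Borrow a connection from the shared pool instead of building a new pool
    conn = MysqlPool.dataSource.getConnection
    ps = conn.prepareStatement("insert into rate(batchId,InputTimestamp,num) values(?,?,?)")
    true
  }
  override def process(value: Row): Unit = {
    ps.setObject(1, value.get(0))
    ps.setObject(2, value.get(1))
    ps.setObject(3, value.get(2))
    ps.executeUpdate()
  }
  override def close(errorOrNull: Throwable): Unit = {
    if (ps != null) ps.close()
    if (conn != null) conn.close()   // returns the connection to the shared pool
  }
}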
