1)Structured Streaming更新broadcast
val enSpark = enSparkSession.session()
enSpark.streams.addListener(new StreamingQueryListener {
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
val mins = sdf.format(new Date()).substring(14, 16).toInt
if (mins % 5 == 0 && broadcastWrapper.rulebroadcast != null) {
broadcastWrapper.update(enSpark.sparkContext, true)
override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
* 广播变量 Wrapper
class BroadcastWrapper extends Serializable{
var rulebroadcast: Broadcast[ArrayBuffer[(Int,String,String,String,String,String,String,String)]] = _
* 更新 instance
* @param sc spark context
* @param blocking unpersist by default
def update(sc: SparkContext, blocking: Boolean = false): Broadcast[ArrayBuffer[(Int,String,String,String,String,String,String,String)]] = {
if (rulebroadcast != null) {
rulebroadcast.unpersist(blocking)
synchronized {
rulebroadcast = sc.broadcast(new JdbcUtil().getRuleBroadcast)
rulebroadcast
* 初始化 instance
* @param sc spark context
* @return
def getInstance(sc: SparkContext): Broadcast[ArrayBuffer[(Int,String,String,String,String,String,String,String)]] = {
if (rulebroadcast == null) {
synchronized {
if (rulebroadcast == null) {
rulebroadcast = sc.broadcast(new JdbcUtil( ).getRuleBroadcast)
rulebroadcast
2)Spark Streaming更新broadcast
def sparkStreaming(): Unit = {
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.
val conf = new SparkConf().setMaster("local[*]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(15))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream(ipAddr, 19999)
val mro = lines.map(row => {
val fields = row.split(",")
Mro(fields(0), fields(1))
val cellJoinMro = mro.transform(row => {
if (1 < 3) {
println("更新broadcast..." + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new java.util.Date()))
BroadcastWrapper.update(ssc.sparkContext)
var broadcastCellRes = BroadcastWrapper.getInstance(ssc.sparkContext)
row.map(row => {
val int_id: String = row.int_id
val rsrp: String = row.rsrp
val findResult: String = String.join(",", broadcastCellRes.value.get(int_id).get)
val timeStamps: String = String.join(",", findResult)
CellJoinMro(int_id, rsrp, timeStamps)
cellJoinMro.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
object BroadcastWrapper {
@volatile private var instance: Broadcast[Map[String, java.util.List[String]]] = null
private val baseDir = "/user/my/streaming/test/"
def loadData(): Map[String, java.util.List[String]] = {
val files = HdfsUtil.getFiles(baseDir)
var latest: String = null
for (key <- files.keySet) {
if (latest == null) latest = key
else if (latest.compareTo(key) <= 0) latest = key
val filePath = baseDir + latest
val map = HdfsUtil.getFileContent(filePath)
def update(sc: SparkContext, blocking: Boolean = false): Unit = {
if (instance != null)
instance.unpersist(blocking)
instance = sc.broadcast(loadData())
def getInstance(sc: SparkContext): Broadcast[Map[String, java.util.List[String]]] = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.broadcast(loadData)
instance
import java.io.{BufferedReader, InputStreamReader}
import java.text.SimpleDateFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import scala.collection.mutable
object HdfsUtil {
private val sdf = new SimpleDateFormat("yyyy-MM-dd 00:00:00")
def getFiles(path: String): mutable.Map[String, String] = {
val fileItems = new mutable.LinkedHashMap[String, String]
val fs = FileSystem.get(new Configuration())
val files = fs.listStatus(new Path(path))
var pathStr: String = ""
for (file <- files) {
if (file.isFile) {
pathStr = file.getPath().getName()
fileItems.put(pathStr.split("/")(pathStr.split("/").length - 1), pathStr)
fs.close()
fileItems
def getFileContent(filePath: String): Map[String, java.util.List[String]] = {
val map = new mutable.LinkedHashMap[String, java.util.List[String]]
val fs = FileSystem.get(new Configuration())
val path = new Path(filePath)
if (fs.exists(path)) {
val bufferedReader = new BufferedReader(new InputStreamReader(fs.open(path)))
var line: String = null
line = bufferedReader.readLine()
while (line != null) {
val fields: Array[String] = line.split(",")
val int_id: String = fields(0)
val date = new java.util.Date(java.lang.Long.valueOf(fields(2)))
val time = sdf.format(date)
System.out.println(line + "(" + time + ")")
if (!map.keySet.contains(int_id))
map.put(int_id, new java.util.ArrayList[String])
map.get(int_id).get.add(time)
line = bufferedReader.readLine()
map.toMap
} else {
throw new RuntimeException("the file do not exists")
一、广播变量的创建与使用
spark 的广播变量允许在每个工作节点缓存一个只读的变量,这样做的好处是避免任务为每一个Task共享的数据单独创建拷贝,大大节省了运算空间占用,在Java中通过JavaSparkContext.broadcast(v)方法,Scala中通过SparkContext.broadcast(v) 方法对变量v进行包装和分发操作,使用时调用broadcast.value(...
实时NLP服务,需要及时识别新添加的领域词与停用词;
实时风控服务,需要根据业务情况调整触发警告的规则;
通过spark的广播变量机制可以做到,配置的实时更新 ;不需要配置每次变化都去手动修改代码,然后重启作业。做到代码的灵活。
那么该如何更新广播变量?
我们知道广播变量是只读的,driver缓存一个只读的变量在每台worker上面,而不是每个任务保存一份拷贝。广播出
Spark流处理中定时更新广播变量值
在实际项目应用上,某些需求会有更新静态规则表的情况,如消息过滤规则、风控规则等。通常这样的表数据量不会大,在spark中使用广播变量的形式使用,而广播变量是不支持更新的,怎样在流处理过程中更新,下面分别论述Spark streaming和Structured streaming的场景。
一、Spark streaming
可以利用单例模式定时的删除已经广播的值,同时获取新的变量值重新广播,假如要广播的是RDS中的表,代码示例如下:
注意事项:
spark stre
1、Spark Streaming更新广播变量的方式
在Driver端通过累加器数据来一条就判断是否需要更新广播变量,通过这种方式就可以实现定时更新广播变量的方式。
lines.foreachRDD(rdd=>{
// 这里单例模式实例化广播变量
val dimTable = DimTable.getInstance(rdd.sparkContext)
// 这里使用累加器保存上一次更新广播变量的时间
val currentAccumulatorI
在实际的项目中,我们一般都会把配置信息放在配置文件或者存到第三方存储中,然后在程序中去读取,但是有的时候我们想修改这些信息,修改完必须要重启job才能生效,那是不是太麻烦了,那有没有办法修改完不重启job就能生效呢?其实我们可以用sparkstreaming的动态广播变量,比如某个配置需要十分钟更新一次,那我们可以在driver端初始化这个变量,在excetors端获取这个变量(注意exceto...
【前言:Spark目前提供了两种有限定类型的共享变量:广播变量和累加器,今天主要介绍一下基于Spark2.4版本的广播变量。先前的版本比如Spark2.1之前的广播变量有两种实现:HttpBroadcast和TorrentBroadcast,但是鉴于HttpBroadcast有各种弊端,目前已经舍弃这种实现,本篇文章也主要阐述TorrentBroadcast】
广播变量概述
广播变量是一个只读...
最近在使用Spark Streaming进行流式计算过程中,遇到在过滤函数中需要用到外部过滤条件列表,且列表会随时更新,一开始只是在main函数中获取过滤条件列表,但是后来发现streaming程序每次触发并非重新执行一遍main函数,部分代码(个人理解为非spark DAG有向图中rdd依赖链中的代码,也就是在driver端执行的这一部分)只会在streaming程序启动的时候执行一次,因此也就...
1. 广播变量我们知道spark 的广播变量允许缓存一个只读的变量在每台机器上面,而不是每个任务保存一份拷贝。常见于spark在一些全局统计的场景中应用。通过广播变量,能够以一种更有效率的方式将一个大数据量输入集合的副本分配给每个节点。Spark也尝试着利用有效的广播算法去分配广播变量,以减少通信的成本。
一个广播变量可以通过调用SparkContext.broadcast(v)方法从一个初始变量
我需要使用Python在Spark Structured Streaming中随时间更新广播变量(例如,在定义的时间间隔内)。好多资料都是用Scala或Java。 用Python编写Broadcast Wrapper类 如下:
import time
from datetime import datetime
from pyspark import SparkConf, SparkContext
conf = SparkConf() \
.setMaster("local") \
本次此时是在SPARK2,3 structured streaming下测试,不过这种方案,在spark2.2 structured streaming下应该也可行(请自行测试)。以下是我测试结果:
成功测试结果:
准备工作:创建maven项目,并在pom.xml导入一下依赖配置:
<properties>
<project.build.sou...
最近写的一个流式的程序需要从redis 中获取变量信息,并广播,其中redis里面的信息是变动的,要求广播变量也要跟着改变,下面是测试代码:val dStream = KafkaUtils.createDirectStream[String, String](
PreferConsistent,
Subscribe[String, String](topic...
这两天在使用spark中的用到了广播变量,大致逻辑是从Redis中读取黑名单配置,然后广播到各个节点用于异常监控,但是在使用过程中总是报空指针异常,后面百度了很多资料,发现有说Yarn集群中不支持广播变量的,有说Sparkstreaming不支持广播变量更新的,有说是spark闭包问题的等等各种,最后笔者去查了sparkstreaming官方文档才学会了广播变量的正确使用方法,并将过程记录下来。
通过之前文章的介绍,大家都知道广播变量是只读的,那么在Spark流式处理中如何进行动态更新广播变量?
既然无法更新,那么只能动态生成,应用场景有实时风控中根据业务情况调整规则库、实时日志ETL服务中获取最新的日志格式以及字段变更等。
/** @author : 公众号:大数据学习与分享 */
@volatile private var instance: Broadcast[Array[Int]] = null
//获取广播变量单例对象
def getInstance(sc: SparkContext,