2025-02-22 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article analyzes Spark broadcast variables and shows how to update them dynamically. Both topics are very practical, so let's get straight to it.
Today we'll focus on broadcast variables as implemented in Spark 2.4. Earlier versions (Spark 1.x) had two implementations of broadcast variables, HttpBroadcast and TorrentBroadcast, but HttpBroadcast had various drawbacks and was removed in Spark 2.0. This article therefore mainly gives an overview of TorrentBroadcast.
A broadcast variable is a read-only variable that lets us cache a shared data set or a large variable once on each machine in the Spark cluster instead of shipping a copy with every task. Later computations reuse the cached copy, which reduces network traffic during data transmission and improves efficiency. Unlike Hadoop's distributed cache, broadcast content can be shared across jobs. The broadcast data must be immutable, must be serializable and deserializable, and must be declared on the driver side; it should be neither too large nor too small (generally tens of MB or more). Broadcasting is best suited to data shared by multiple stages. The storage level is currently MEMORY_AND_DISK.
Broadcast variables are currently stored in BlockManager, the distributed storage system implemented by Spark. Shuffle data and the blocks produced when loading HDFS data are also stored in BlockManager, but that is not today's topic, so I will not go into detail here.
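To make the "one cached copy per machine instead of one copy per task" idea concrete, here is a minimal sketch that broadcasts a small lookup table and reads it inside a map. The object name, the resolve helper, the sample data, and the local[2] master are assumptions made for illustration; only SparkContext.broadcast, Broadcast.value, and Broadcast.destroy are actual Spark API calls.

```scala
import org.apache.spark.sql.SparkSession

object BroadcastLookupExample {
  // Hypothetical helper: resolves country codes through a broadcast lookup table.
  def resolve(spark: SparkSession, codes: Seq[String]): Array[String] = {
    val sc = spark.sparkContext
    // Sample data for illustration; a real table would be much larger.
    val countryNames = Map("CN" -> "China", "US" -> "United States")
    // Shipped to each executor once, not once per task.
    val lookup = sc.broadcast(countryNames)
    try {
      sc.parallelize(codes)
        .map(code => lookup.value.getOrElse(code, "unknown"))
        .collect()
    } finally {
      // Release the broadcast data on the driver and the executors.
      lookup.destroy()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("broadcast-lookup")
      .master("local[2]") // assumption: local run for demonstration only
      .getOrCreate()
    try resolve(spark, Seq("CN", "US", "FR")).foreach(println)
    finally spark.stop()
  }
}
```

The closure passed to map captures only the lightweight broadcast handle; the table itself is fetched on the executor the first time lookup.value is called.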
How to create and acquire broadcast variables
//Create broadcast variables
val broadcastVar = sparkSession.sparkContext.broadcast(Array(1, 2, 3))
//Get broadcast variables
broadcastVar.value
Broadcast variable instantiation procedure
1. First call val broadcastVar = sparkSession.sparkContext.broadcast(Array(1, 2, 3))
2. This calls the newBroadcast method of BroadcastManager: val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
3. BroadcastManager in turn creates the variable through the newBroadcast method of the broadcast factory:
broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
The broadcast factory has already been initialized (initialize method) by the time BroadcastManager's newBroadcast method is called, so we only need to look at how the BroadcastFactory implementation instantiates TorrentBroadcast:
new TorrentBroadcast[T](value_, id)
4. When TorrentBroadcast is constructed, the broadcast data is written into BlockManager:
1) The driver stores the broadcast value in its own BlockManager, so that tasks running on the driver side do not need to create another copy. It then splits the serialized value into multiple blocks and stores those blocks in the BlockManager as well (see the writeBlocks method of TorrentBroadcast for details).
2) When an executor reads the broadcast variable, it first looks in its local BlockManager. If the data is not there, it fetches it from the driver or from other executors and then saves the fetched data in its own BlockManager.
3) The block size defaults to 4 MB: conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024
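The splitting in step 4 can be pictured with a small standalone sketch. This is not Spark's code: the blockify helper is hypothetical, it uses plain Java serialization, and it skips the compression that Spark can apply. It only shows how a serialized value is cut into chunks of at most blockSize bytes.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object BlockifySketch {
  // Hypothetical helper: serialize a value and cut the bytes into fixed-size blocks.
  def blockify(value: AnyRef, blockSize: Int): Array[Array[Byte]] = {
    require(blockSize > 0, "blockSize must be positive")
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    // Every block has exactly blockSize bytes except possibly the last one.
    buffer.toByteArray.grouped(blockSize).toArray
  }

  def main(args: Array[String]): Unit = {
    val blockSize = 4 * 1024 * 1024           // 4 MB, the default mentioned above
    val data = Array.fill(3 * 1024 * 1024)(1) // roughly 12 MB of Int values
    val blocks = blockify(data, blockSize)
    println(s"blocks=${blocks.length}, sizes=${blocks.map(_.length).mkString(",")}")
  }
}
```

Small blocks let an executor fetch different pieces from different sources, which is what makes the torrent-style distribution described later possible.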
Broadcast variable initialization procedure
1. First call broadcastVar.value
2. The lazy variable _value of TorrentBroadcast is initialized by calling readBroadcastBlock()
3. readBroadcastBlock() reads from the cache first and pattern matches the result; on a match, the value is returned directly
4. If nothing is found, the data is read through readBlocks(): the blocks are fetched from the driver or from other executors, and the resulting object is stored locally and put into the cache, which is created as
new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
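The read path above follows a "lazy value, local lookup first, remote fetch as fallback" pattern. The sketch below imitates that pattern in plain Scala. The Executor and LazyBroadcast classes are hypothetical stand-ins, not Spark classes, and a mutable map plays the role of the local BlockManager.

```scala
import scala.collection.mutable

object LazyReadSketch {
  // Hypothetical stand-in for an executor's local store plus a remote source.
  final class Executor(remote: Long => Array[Int]) {
    private val localStore = mutable.Map.empty[Long, Array[Int]]
    var remoteFetches = 0

    def read(id: Long): Array[Int] = synchronized {
      localStore.get(id) match {
        case Some(local) => local // found locally: return directly
        case None =>
          remoteFetches += 1
          val fetched = remote(id)    // fetch from the driver or another executor
          localStore.put(id, fetched) // keep it for later readers
          fetched
      }
    }
  }

  final class LazyBroadcast(id: Long, executor: Executor) {
    // Evaluated only on the first access to `value`.
    private lazy val _value: Array[Int] = executor.read(id)
    def value: Array[Int] = _value
  }

  def main(args: Array[String]): Unit = {
    val executor = new Executor(_ => Array(1, 2, 3))
    val first = new LazyBroadcast(0L, executor)
    val second = new LazyBroadcast(0L, executor)
    println(first.value.mkString(","))
    println(second.value.mkString(","))
    println(s"remote fetches: ${executor.remoteFetches}")
  }
}
```

Two handles with the same id trigger only one remote fetch, because the second read is served from the local store.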
Comparison of two broadcast variables in Spark
As mentioned in the preface, HttpBroadcast has been dropped from later versions of Spark. Some companies still run older versions, though, so questions about both implementations may still come up in interviews. Here is a brief introduction.
HttpBroadcast stores the broadcast variable object in the BlockManager on the driver side and serializes the broadcast variable to a file. Every request for broadcast data goes to the driver, so the driver is both a single point of failure and a network I/O bottleneck.
TorrentBroadcast also stores the broadcast variable object in the BlockManager on the driver side, but it additionally splits the serialized object into blocks (4 MB by default) and stores them in the BlockManager. The location information of each block is kept in the BlockManagerMaster on the driver side. Data requests are not concentrated on the driver, which avoids the single point of failure and the overload of the driver's network and disk I/O.
On the executor side, TorrentBroadcast stores the object and the blocks it has fetched in the local BlockManager and reports the block storage information to the BlockManagerMaster on the driver side.
When requesting data, an executor first obtains all storage locations of a block and then fetches the block from a randomly chosen BlockManager among them, so that requests are not concentrated at one point.
In short, HttpBroadcast concentrates all requests for broadcast variables on the driver, which easily causes single-point failures there and network I/O high enough to hurt performance. With TorrentBroadcast, requests can be served by either the driver or other executors, which avoids these problems. Of course, this is only the main optimization point.
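The effect of fetching from a random holder can be illustrated with a toy simulation. Everything here is hypothetical (the simulate function, the host names, the single-block simplification); it is not how Spark tracks locations internally. Each executor picks a random source among the current holders and then becomes a holder itself.

```scala
import scala.util.Random

object RandomSourceSketch {
  // Hypothetical simulation: returns how many fetches each holder served.
  def simulate(executors: Seq[String], rng: Random): Map[String, Int] = {
    var holders = Vector("driver") // initially only the driver has the block
    var served = Map.empty[String, Int]
    for (executor <- executors) {
      // Pick a random source among everyone that already holds the block.
      val source = holders(rng.nextInt(holders.length))
      served = served.updated(source, served.getOrElse(source, 0) + 1)
      // After fetching, the executor reports itself as a new holder.
      holders = holders :+ executor
    }
    served
  }

  def main(args: Array[String]): Unit = {
    val executors = (1 to 8).map(i => s"executor-$i")
    simulate(executors, new Random(42)).toSeq.sortBy(_._1).foreach {
      case (host, count) => println(s"$host served $count fetch(es)")
    }
  }
}
```

With HttpBroadcast-style distribution every one of these fetches would hit the driver; here the driver necessarily serves only the first one and the rest are spread over the growing set of holders.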
Dynamically updating broadcast variables
From the introduction above we know that broadcast variables are read-only, so how can a broadcast variable be updated dynamically in Spark Streaming?
Since an existing broadcast variable cannot be modified, the only option is to create a new one on the fly. Typical scenarios include adjusting the rule base according to business conditions in real-time risk control, and picking up the latest log format and field changes in a real-time log ETL service.
import java.util.Date
// Assumption: FastDateFormat comes from commons-lang3
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object BroadcastWrapper {
  @volatile private var instance: Broadcast[Array[Int]] = null

  // Get the broadcast variable singleton, creating it on first use
  def getInstance(sc: SparkContext): Broadcast[Array[Int]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(fetchLastestData())
        }
      }
    }
    instance
  }

  // Load the data to broadcast and replace the broadcast variable
  def updateBroadCastVar(sc: SparkContext, blocking: Boolean = false): Unit = synchronized {
    if (instance != null) {
      // Delete the broadcast copies cached on executors, optionally blocking until done.
      // unpersist keeps the driver-side copy; the underlying layer can also remove it (destroy).
      instance.unpersist(blocking)
      instance = sc.broadcast(fetchLastestData())
    }
  }

  def fetchLastestData(): Array[Int] = {
    // Get the data that needs updating dynamically
    // This is pseudocode
    Array(1, 2, 3)
  }
}

val dataFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
...
...
stream.foreachRDD { rdd =>
  val current_time = dataFormat.format(new Date())
  // Characters 14-15 of the formatted timestamp are the minutes
  val new_time = current_time.substring(14, 16).toLong
  // Updated every 10 minutes
  if (new_time % 10 == 0) {
    BroadcastWrapper.updateBroadCastVar(rdd.sparkContext, true)
  }
  // Resolve the broadcast on the driver so the closure captures the handle,
  // not the singleton field (which is null on executors)
  val broadcast = BroadcastWrapper.getInstance(rdd.sparkContext)
  rdd.foreachPartition { records =>
    broadcast.value
    ...
  }
}
Note: the code above is pseudocode that only sketches the idea, and it needs some tuning before production use. The approach also has a drawback: because the broadcast data is refreshed periodically, it always lags somewhat behind the source. The refresh period should not be too short, and the load on the external storage system that holds the data to be broadcast should be taken into account. The right choice depends on the business scenario. If the real-time requirements are not particularly strict, this approach is adequate; you can also look at how Flink implements dynamic broadcasting.
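One possible tuning, offered only as a sketch: a check on the wall-clock minute fires in every batch that falls inside a matching minute and can be skipped entirely if no batch lands in one. Tracking the time of the last refresh avoids both effects. The RefreshTimer class below is hypothetical and independent of Spark; the caller would pass System.currentTimeMillis() on the driver.

```scala
object RefreshPolicySketch {
  // Hypothetical helper: answers "has the refresh interval elapsed since the last refresh?"
  final class RefreshTimer(intervalMs: Long, startMs: Long) {
    require(intervalMs > 0, "intervalMs must be positive")
    private var lastRefreshMs = startMs

    // Returns true at most once per interval and records the refresh time.
    def shouldRefresh(nowMs: Long): Boolean = synchronized {
      if (nowMs - lastRefreshMs >= intervalMs) {
        lastRefreshMs = nowMs
        true
      } else {
        false
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val tenMinutes = 10L * 60 * 1000
    val timer = new RefreshTimer(tenMinutes, startMs = 0L)
    // Simulated batch times: 5, 10, 10.5 and 21 minutes after start.
    Seq(5L, 10L, 10L, 21L).zip(Seq(0L, 0L, 30000L, 0L)).foreach { case (min, extraMs) =>
      val now = min * 60 * 1000 + extraMs
      println(s"t=${now}ms refresh=${timer.shouldRefresh(now)}")
    }
  }
}
```

Inside foreachRDD, the minute check could then be replaced by a call such as timer.shouldRefresh(System.currentTimeMillis()) before updating the broadcast variable.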
Why singleton patterns are used in Spark streaming programs
1. Broadcast variables are read-only. In a Spark Streaming program, a singleton avoids the overhead of creating a new broadcast variable every time a job is generated and executed.
2. The broadcast variable singleton also needs synchronization. Under FIFO scheduling no concurrency problem occurs. However, if you switch to another mode such as fair scheduling and allow more than one job to run in parallel, for example by setting spark.streaming.concurrentJobs=4, you must add synchronization code.
3. Concurrency issues also arise when multiple output streams share a broadcast variable and fair scheduling is enabled. It is recommended to broadcast through local variables inside foreachRDD or transform, so that different jobs do not affect each other under fair scheduling.
The same applies to accumulators, not only to broadcast variables. Under the hood, Spark Streaming generates one job per output stream; these jobs form a job set that is submitted to a thread pool for concurrent execution.
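For reference, the configuration described in point 2 might be set up as follows. This is a minimal sketch: the object name and application name are placeholders, and the value 4 is simply the example from the text. Note that spark.streaming.concurrentJobs is not listed in the official configuration page, so treat it as an internal setting.

```scala
import org.apache.spark.SparkConf

object ConcurrentStreamingConf {
  // Builds a configuration under which the singleton needs synchronization.
  def build(): SparkConf =
    new SparkConf()
      .setAppName("dynamic-broadcast-demo") // placeholder name
      // Fair scheduling instead of the default FIFO
      .set("spark.scheduler.mode", "FAIR")
      // Allow up to 4 streaming jobs to run at the same time
      .set("spark.streaming.concurrentJobs", "4")

  def main(args: Array[String]): Unit =
    build().getAll.sortBy(_._1).foreach { case (k, v) => println(s"$k=$v") }
}
```

With these settings several output-stream jobs can call the singleton's methods at the same time, which is why the update path has to be synchronized.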
That covers how Spark broadcast variables work and how to update them dynamically. Some of these points are likely to come up in day-to-day work, and I hope this article helps you understand them better.