This article introduces how to use Spark Streaming, Broadcast and Accumulator together. Many people run into exactly this kind of problem in real projects, so let the editor walk you through how to handle these situations. I hope you read it carefully and come away with something useful!
Broadcast variables can carry custom content, and combining Broadcast with Accumulator makes it possible to implement fairly complex business logic.
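Before the full streaming example, here is a minimal sketch of the pattern on a plain RDD. It assumes an existing SparkContext named sc and uses the Spark 1.x accumulator API, the same API the full listing below relies on:

// Minimal sketch: broadcast a read-only blacklist, count hits with an accumulator.
// Assumes an existing SparkContext named sc (Spark 1.x accumulator API).
val blackList = sc.broadcast(List("Hadoop", "Mahout", "Hive")) // read-only copy cached on each Executor
val hits = sc.accumulator(0, "BlackListHits")                  // Executors add to it; only the Driver reads it

sc.parallelize(Seq("Hadoop", "Spark", "Hive", "Scala")).foreach { word =>
  if (blackList.value.contains(word)) hits += 1 // runs on Executors; no data shuffled back
}
println("blacklist hits: " + hits.value)        // read the aggregated value on the Driver

Broadcast avoids shipping the list with every task, and the accumulator aggregates the per-Executor counts back on the Driver.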
The following code listens on local port 9999 and sends words to every connected client, including the blacklisted words Hadoop, Mahout and Hive.
package org.scala.opt

import java.io.{IOException, PrintWriter}
import java.net.{ServerSocket, Socket, SocketException}

case class ServerThread(socket: Socket) extends Thread("ServerThread") {
  override def run(): Unit = {
    val ptWriter = new PrintWriter(socket.getOutputStream)
    try {
      var count = 0
      var totalCount = 0
      var isThreadRunning: Boolean = true
      val batchCount = 1
      val words = List("Java Scala C C++ C# Python JavaScript",
        "Hadoop Spark Nginx MFC Net Mahout Hive")
      while (isThreadRunning) {
        words.foreach(ptWriter.println)
        count += 1
        if (count >= batchCount) {
          totalCount += count
          count = 0
          println("batch " + batchCount + " totalCount => " + totalCount)
          Thread.sleep(1000)
        }
        // Methods of PrintWriter never throw an I/O exception, although some of its
        // constructors may. The client can call checkError() to see whether an error occurred.
        if (ptWriter.checkError()) {
          isThreadRunning = false
          println("ptWriter error then close socket")
        }
      }
    } catch {
      case e: SocketException =>
        println("SocketException: " + e)
      case e: IOException =>
        e.printStackTrace()
    } finally {
      if (ptWriter != null) ptWriter.close()
      println("Client " + socket.getInetAddress + " disconnected")
      if (socket != null) socket.close()
    }
    println(Thread.currentThread().getName + " Exit")
  }
}
object SocketServer {
  def main(args: Array[String]): Unit = {
    try {
      val listener = new ServerSocket(9999)
      println("Server is started, waiting for client connect...")
      while (true) {
        val socket = listener.accept()
        println("Client: " + socket.getLocalAddress + " connected")
        new ServerThread(socket).start()
      }
      listener.close()
    } catch {
      case e: IOException =>
        System.err.println("Could not listen on port: 9999.")
        System.exit(-1)
    }
  }
}
The following code receives the words sent on local port 9999 and counts how many blacklisted words appear.
package com.dt.spark.streaming_scala

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{Accumulator, SparkConf}
import org.apache.spark.broadcast.Broadcast

/**
 * Lesson 103: online blacklist filtering and counting with Spark Streaming, Broadcast and Accumulator, hands-on.
 * Contents of this lesson:
 * 1. Spark Streaming combined with Broadcast and Accumulator
 * 2. Online blacklist filtering and counting
 */
object _103SparkStreamingBroadcastAccumulator {

  @volatile private var broadcastList: Broadcast[List[String]] = null
  @volatile private var accumulator: Accumulator[Int] = null

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[5]").setAppName("_103SparkStreamingBroadcastAccumulator")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")

    /**
     * Use Broadcast to distribute the blacklist to every Executor.
     */
    broadcastList = ssc.sparkContext.broadcast(Array("Hadoop", "Mahout", "Hive").toList)

    /**
     * Global counter that reports how many blacklisted words have been filtered online.
     */
    accumulator = ssc.sparkContext.accumulator(0, "OnlineBlackListCounter")

    ssc.socketTextStream("localhost", 9999).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        rdd.filter { wordPair =>
          if (broadcastList.value.contains(wordPair._1)) {
            println("BlackList word %s appeared".format(wordPair._1))
            accumulator.add(wordPair._2)
            false
          } else {
            true
          }
        }.collect()
        println("BlackList appeared: %d times".format(accumulator.value))
      }
    }

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
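Note that the accumulator is never reset between batches, so the count printed inside foreachRDD is a running total of all blacklist hits since the application started, not a per-batch figure.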
The Server side log keeps printing the running total of batches sent.
The Spark Streaming side prints the blacklisted words together with the number of times they have appeared.
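To reproduce this locally, start SocketServer first so port 9999 is listening, then launch the streaming application from the IDE or, assuming the job has been packaged into a jar, with spark-submit --class com.dt.spark.streaming_scala._103SparkStreamingBroadcastAccumulator followed by the jar path. The application connects to localhost:9999 and prints a summary every 5 seconds.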
This is the end of "how to use Spark Streaming, Broadcast and Accumulator together". Thank you for reading. If you want to learn more about the industry, you can follow the website, where the editor will keep putting out more high-quality practical articles for you!