How to Periodically Clear the State of Spark Streaming Applications

This article introduces the problem of periodically clearing the state of stateful Spark Streaming computations and two ways to solve it.
In Spark Streaming programs, we often need stateful streams to compute cumulative metrics, such as the PV (page views) of each product. A simplified example using the mapWithState() operator:
val productPvStream = stream.mapPartitions(records => {
  val result = new ListBuffer[(String, Int)]         // scala.collection.mutable.ListBuffer
  for (record <- records) {
    result += ((record.key(), 1))                    // assumes the product ID is the record key
  }
  result.iterator
}).reduceByKey(_ + _).mapWithState(
  StateSpec.function((productId: String, pv: Option[Int], state: State[Int]) => {
    val sum = pv.getOrElse(0) + state.getOption().getOrElse(0)
    state.update(sum)
    (productId, sum)
  })
).stateSnapshots()
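Note that stateful operators such as mapWithState() require checkpointing to be enabled on the StreamingContext, as the restart loop in the second method below also shows. A minimal sketch (the checkpoint path is illustrative):

val ssc = new StreamingContext(sc, Seconds(BATCH_INTERVAL))
ssc.checkpoint("hdfs:///checkpoints/product-pv")   // any reliable, fault-tolerant storage works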
The problem is that the PV should not accumulate indefinitely: it must return to zero every day and be recounted. To clear the state at 0:00 (midnight), there are two approaches.
Method 1: Write a script to restart the Streaming program
Use crontab, Azkaban, or a similar scheduler to execute the following shell script at 0:00 every day:
stream_app_name='com.xyz.streaming.MallForwardStreaming'
# Proceed only if exactly one driver process of this application is running
cnt=`ps aux | grep SparkSubmit | grep ${stream_app_name} | wc -l`
if [ ${cnt} -eq 1 ]; then
    pid=`ps aux | grep SparkSubmit | grep ${stream_app_name} | awk '{print $2}'`
    kill -9 ${pid}
    sleep 20
    # Restart only after the old process is really gone
    cnt=`ps aux | grep SparkSubmit | grep ${stream_app_name} | wc -l`
    if [ ${cnt} -eq 0 ]; then
        nohup sh /path/to/streaming/bin/mall_forward.sh > /path/to/streaming/logs/mall_forward.log 2>&1 &
    fi
fi
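For example, if the script above were saved as an illustrative /path/to/streaming/bin/restart_streaming.sh, a crontab entry to run it at midnight could look like this:

0 0 * * * sh /path/to/streaming/bin/restart_streaming.sh >> /path/to/streaming/logs/restart.log 2>&1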
This approach is the simplest and requires no change to the program itself. But as more and more Streaming jobs run concurrently, maintaining these scripts becomes increasingly cumbersome.
Method 2: Set a timeout for the StreamingContext
Before starting the program, compute the number of milliseconds from the current point in time until 0:00 of the next day:
def msTillTomorrow: Long = {
  val now = new Date()
  // Note: these java.util.Date methods are deprecated but still work
  val tomorrow = new Date(now.getYear, now.getMonth, now.getDate + 1)
  tomorrow.getTime - now.getTime
}
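For reference, an equivalent computation with the non-deprecated java.time API (an alternative sketch, not part of the original program) looks like this:

import java.time.{Duration, LocalDate, LocalDateTime}

def msTillTomorrow: Long = {
  // Milliseconds from now until the start of the next calendar day
  val nextMidnight = LocalDate.now().plusDays(1).atStartOfDay()
  Duration.between(LocalDateTime.now(), nextMidnight).toMillis
}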
Then put the main logic of the Streaming program inside a while (true) loop, and instead of calling StreamingContext.awaitTermination() as usual, use awaitTerminationOrTimeout():
while (true) {
  val ssc = new StreamingContext(sc, Seconds(BATCH_INTERVAL))
  ssc.checkpoint(CHECKPOINT_DIR)
  // ... processing logic ...
  ssc.start()
  ssc.awaitTerminationOrTimeout(msTillTomorrow)
  ssc.stop(false, true)   // keep the SparkContext, stop gracefully
  Thread.sleep(BATCH_INTERVAL * 1000)
}
After msTillTomorrow milliseconds, awaitTerminationOrTimeout() returns and stop() is called. Note its two parameters: stopSparkContext indicates whether to also stop the associated SparkContext, and stopGracefully indicates whether to wait for in-flight batches to finish. The StreamingContext is thus stopped and then recreated with empty state on the next loop iteration.
Both methods still rely on Spark Streaming's own state mechanism. If other conditions permit, we can also abandon mapWithState() and maintain the state ourselves in external storage. For example, design the Redis key as product_pv:[product_id]:[date] and issue an INCRBY command in each Spark Streaming batch; because the date is embedded in the key, each day's PV starts from zero and no timed clearing logic is needed at all.
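A minimal sketch of this Redis-based approach, assuming a Jedis client and a DStream of per-batch (productId, count) pairs (the stream name and connection parameters are illustrative):

import java.text.SimpleDateFormat
import java.util.Date
import redis.clients.jedis.Jedis

productPvCounts.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val jedis = new Jedis("redis-host", 6379)      // illustrative; use a connection pool in production
    val date = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
    partition.foreach { case (productId, count) =>
      // The date in the key makes each day's counter start from zero automatically
      jedis.incrBy(s"product_pv:$productId:$date", count)
    }
    jedis.close()
  }
}

If storage is a concern, keys for past days can be removed or given a TTL with Redis's EXPIRE command.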