In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-16 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/03 Report--
Description of four phenomena of Spark accumulator commonly used in production description val acc = sc.accumulator (0, "Error Accumulator") val data = sc.parallelize (1 to 10) val newData = data.map (x = > {if (x% 2 = = 0) {accum + = 1}}) newData.countacc.valuenewData.foreach (println) acc.value
The above phenomenon will cause the final value of acc.value to become 10.
Cause analysis
A series of transform operations in Spark are constructed into a long chain of tasks, which needs to be triggered by an action operation (a feature of lazy), as is the case with accumulator.
So after an action operation, calling the value method to check, there is no change. After the first action operation, the call to the value method to view becomes 5. After the second action operation, the call to the value method to check becomes 10.
The reason is that in the second action operation, the accumulator operation is performed again, and the same accumulator adds 5 to the original, thus becoming 10.
Solution
Through the above description of the phenomenon, we can quickly know the solution: do only one action operation. Based on this, we just have to cut off the dependencies between tasks and use cache and persist. After this operation, subsequent accumulator operations will not be affected by the previous transform operation
Related cases
Demand
Use Accumulators to count the number of NULL occurrences in the emp table and the number of normal data & print information about normal data
data
7369 SMITH CLERK 7902 1980-12-17 800.00 207499 ALLEN SALESMAN 7698 1981-2-20 1600.00 300.00 307521 WARD SALESMAN 7698 2-22 1250.00 500.00 307566 JONES MANAGER 7839 7369-4-2 2975.00 207654 MARTIN SALESMAN 76981981-9-28 1250.00 1400.00 307698 BLAKE MANAGER 7839 1981-5-1 2850.00 307782 CLARK MANAGER 7839 1981-6-9 2450 .00 107788 SCOTT ANALYST 7566 1987-4-19 3000.00 207839 KING PRESIDENT 1981-11-17 5000.00 107844 TURNER SALESMAN 7698 1981-9-8 1500.00 307876 ADAMS CLERK 77881987-5-23 1100.00 207900 JAMES CLERK 76981981-12-3 950.00 307902 FORD ANALYST 75661981-12-3 3000.00 207934 MILLER CLERK 77882-1-23 1300.00 10
The pit encountered & the solution
Phenomenon description & cause analysis:
As we all know, a series of transform operations in spark form a long chain of tasks, which needs to be triggered by an action operation; accumulator is the same, and the execution of accumulator is triggered only when the action operation is executed. Therefore, before an action operation, we cannot call the value method of accumulator to check its value, and there must be no change. So after the foreach operation on normalData, that is, after the action operation, we will find that the value of the accumulator becomes 11; then, after another count operation on normalData, that is, after another action operation, we actually perform the previous transform operation again; so the value of the accumulator increases by 11, to 22.
Solution:
After the above analysis, we can know that when using the accumulator, we can only use one action operation to ensure the accuracy of the results. therefore, we have a way to deal with this situation, which is to cut off their dependence on each other, so we can use the cache method on normalData, and when the RDD is calculated for the first time, it will be directly cached and called again. The same calculation will not be recalculated again.
Import org.apache.spark. {SparkConf SparkContext} / * use Spark Accumulators to complete the data volume processing of Job * count the number of NULL occurrences and the number of normal data in the emp table & print information on normal data * / object AccumulatorsApp {def main (args: Array [String]): Unit = {val conf = new SparkConf (). SetMaster ("local [2]"). SetAppName ("AccumulatorsApp") val sc = new SparkContext (conf) val lines = sc.textFile ("E:/emp.txt") ) / / Accumulator value of type long val nullNum = sc.longAccumulator ("NullNumber") val normalData = lines.filter (line = > {var flag = trueval splitLines = line.split ("\ t") for (splitLine)
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.