How is program code executed on the driver and the executors in a Spark closure? This article analyzes that question in detail, in the hope of helping readers who face the same problem find a simpler way to solve it.
Closures in Spark
A closure can be understood as follows: a function can access variables defined outside of it, but modifications made to those variables inside the function are not visible outside of it; that is, they do not affect the external variables.
In fact, one of the harder points to grasp when learning Spark is the scope and life cycle of variables and methods in cluster mode. Confusion easily arises when operating on an RDD, for example when calling map or foreach and accessing external variables inside those functions: why does my program run fine locally and produce the right result, yet fail to produce the expected result when run on a cluster?
First, we can see the difference between local mode and cluster mode for the same code through the following example, which sums the elements of an RDD:
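The original code listing did not survive in this capture of the article, so the following is a minimal Scala sketch of the summing example it describes (the counter variable and the foreach call referenced below); the application and variable names are assumptions, not the article's own code.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical reconstruction of the summing example discussed in the text.
object ClosureCounterExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("closure-counter").setMaster("local[2]")
    val sc   = new SparkContext(conf)

    var counter = 0
    val rdd = sc.parallelize(Seq(1, 2, 3))

    // Each task receives a serialized copy of the closure, including `counter`,
    // so the additions below happen on the executors' copies of the variable.
    rdd.foreach(x => counter += x)

    // The text reports 6 in local mode (shared JVM) and 0 in cluster mode;
    // note that recent Spark versions may serialize the closure even locally.
    println("Counter value: " + counter)

    sc.stop()
  }
}
```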
To run a job, Spark breaks the processing of the RDD into multiple tasks, and each task is executed by an executor. Before execution, Spark computes the task's closure, that is, the variables and methods that must be visible to the executor in order to perform its computation on the RDD, such as the counter variable and the foreach function in the example above. This closure is serialized and sent to each executor.

In cluster mode, the driver and the executors run in different JVM processes, so the variables in the closure sent to each executor are copies of the variables on the driver side. When counter is referenced inside the foreach function, it is no longer the counter on the driver node. A counter still exists in the memory of the driver node, but it is not visible to the executors; they only see the copy from the serialized closure. Therefore the final value of counter printed in the example above is still 0, because every operation on counter only touches the value inside the serialized closure.

In local mode, the driver and the executors usually run in the same JVM process, so the closure is shared: the counter operated on by the executors and the counter held by the driver are the same variable, and the final value of counter after processing is 6.
In production, however, our jobs run in cluster mode. How can we handle this kind of business scenario?
This leads to a concept we will focus on later: the Accumulator. Accumulators in Spark are designed to provide a mechanism for safely updating a variable when execution is split across the worker nodes of a cluster.
In general, closures (constructs such as loops or locally defined methods) should not be used to mutate global state, and Spark does not define or guarantee the behavior of updates to objects referenced from outside a closure. Code written this way may produce the desired result in local mode but will misbehave in a distributed environment. If you need global aggregation, use an accumulator instead (see the sketch below); for other business scenarios, consider introducing external storage systems, broadcast variables, and so on.
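As a minimal sketch of the accumulator approach (assumed names, not the article's own code), the same sum can be computed safely with a LongAccumulator:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: replacing the closure-captured counter with an accumulator.
object AccumulatorSumExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("accumulator-sum").setMaster("local[2]"))

    val rdd = sc.parallelize(Seq(1, 2, 3))

    // Accumulators are Spark's mechanism for safely updating a shared value from executors.
    val sum = sc.longAccumulator("sum")
    rdd.foreach(x => sum.add(x))

    // Reading .value on the driver yields 6 in both local and cluster mode.
    println("Sum: " + sum.value)

    sc.stop()
  }
}
```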
What happens to a closure function between its creation and its execution on an executor?
First, RDD operations take a closure function, and if that function accesses externally defined variables, those variables must meet certain conditions (for example, they must be serializable); otherwise a runtime exception is thrown. Before the closure function is finally executed on an executor, it goes through the following steps:
1. The driver uses reflection at runtime to find the variables accessed by the closure, encapsulates them into an object, and serializes that object.
2. The serialized object is transferred to the worker nodes over the network.
3. The worker node deserializes the closure object.
4. The executor on the worker node executes the closure function.
In short, the function is transmitted over the network and then executed, which involves serialization and deserialization, so the variables it carries must be serializable and deserializable; otherwise an exception such as "Task not serializable: java.io.NotSerializableException" is thrown (a typical case is calling a method defined outside the closure on a class instance rather than on a singleton object). These steps are followed even when running locally, which is also why an RDD cannot be manipulated directly inside another RDD's operations (SparkContext does not support serialization). Likewise, externally defined variables that are modified inside these operator closures are not propagated back to the driver.
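As a hedged sketch of this requirement (the class and variable names here are illustrative, not from the article), referencing a non-serializable object from a closure fails when the task is serialized, while capturing only the needed value does not:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative, non-serializable helper (hypothetical name).
class Multiplier {
  def factor: Int = 10
}

object ClosureSerializationSketch {
  def main(args: Array[String]): Unit = {
    val sc  = new SparkContext(new SparkConf().setAppName("closure-serialization").setMaster("local[2]"))
    val rdd = sc.parallelize(Seq(1, 2, 3))

    val helper = new Multiplier

    // Referencing `helper` inside the closure pulls the whole (non-serializable) object in,
    // which throws org.apache.spark.SparkException: Task not serializable.
    // rdd.map(x => x * helper.factor).collect()

    // Capturing only the needed primitive keeps the closure serializable.
    val factor = helper.factor
    println(rdd.map(x => x * factor).collect().mkString(","))

    sc.stop()
  }
}
```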
Driver & executor
The driver is where the application's main() function runs; it is responsible for building the DAG, dividing the work, and generating and scheduling tasks. The generation of jobs, stages, and tasks is inseparable from the RDD itself, and operations on an RDD cannot happen without the SparkSession/SparkContext on the driver side.
The executor is where tasks actually execute, and task execution cannot be separated from the concrete data. The results of these tasks may be intermediate shuffle output, may be persisted to external storage systems, or, as is common, may be aggregated (results, status, and so on) back to the driver. At present, however, executors cannot communicate with each other directly; data sharing or communication between them requires a third party.

Does Spark program code run on the driver side or the executor side?

Let's start with a simple example: when testing a program locally, we usually print the data in an RDD. In local mode, calling rdd.foreach(println) or rdd.map(println) directly on a single machine prints all RDD elements as expected. In cluster mode, however, the output produced by the executors is written to each executor's stdout, not to the driver's stdout, so nothing appears in the driver's output. To print all the elements on the driver side, you can first bring the RDD's data to the driver node with collect() and then call foreach(println). (Note that because this loads all the elements of the RDD onto the driver, it may cause the driver to run out of memory; if you only need some of the elements, consider the take or top methods instead.)

In short, the elements of an RDD are concrete data, and operations on that data are handled by the executors responsible for running the tasks, so if you want to output that data on the driver side, you must first bring it to the driver, as in the sketch below.
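A minimal sketch of these printing options (the example RDD and names are assumptions) might look like this:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch of the printing options described above.
object PrintRddElements {
  def main(args: Array[String]): Unit = {
    val sc  = new SparkContext(new SparkConf().setAppName("print-rdd").setMaster("local[2]"))
    val rdd = sc.parallelize(Seq("a", "b", "c"))

    // Runs on the executors: in cluster mode the output goes to each executor's stdout.
    rdd.foreach(println)

    // Brings every element to the driver first; safe only for small RDDs (OOM risk otherwise).
    rdd.collect().foreach(println)

    // Pulls only a sample of elements to the driver.
    rdd.take(2).foreach(println)

    sc.stop()
  }
}
```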
Finally, to sum up: operations on the concrete data inside an RDD are performed on the executors, while operations on the RDD itself are performed on the driver. For example, foreach and foreachPartition work on the data inside an RDD, so the functions passed to these operators run on the executor side. In contrast, operators such as foreachRDD and transform work on the RDD itself, so their parameter functions run on the driver side and can use driver-side variables internally, for example to manage offsets in a Spark Streaming program or to dynamically update broadcast variables.
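As a hedged sketch of this distinction in a Spark Streaming job (the socket source and all names here are assumptions, not from the article):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative sketch of where code runs in a streaming job.
object DriverVsExecutorSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("where-code-runs").setMaster("local[2]")
    val ssc  = new StreamingContext(conf, Seconds(5))

    val lines = ssc.socketTextStream("localhost", 9999)

    lines.foreachRDD { rdd =>
      // This function receives the RDD itself and runs on the driver once per batch,
      // so driver-side work (offset management, broadcast refreshes, ...) belongs here.
      println(s"Batch contains ${rdd.count()} lines")

      rdd.foreachPartition { partition =>
        // This inner function works on the data and runs on the executors.
        partition.foreach(line => println(line))
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```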
That is the answer to the question of how driver-side and executor-side program code is executed in Spark closures. I hope the content above has been of some help.