In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-23 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly explains "what are the negative effects of Spark's production operation fault tolerance ability". The explanation in this article is simple and clear, easy to learn and understand. Please follow the ideas of Xiaobian to study and learn "what are the negative effects of Spark's production operation fault tolerance ability" together.
1. Spark TaskLocality
In Spark, data locality is represented by TaskLocality, which has several levels:
PROCESS_LOCAL
NODE_LOCAL
NO_PREF
RACK_LOCAL
ANY
Data locality decreases from top to bottom.
Spark calculates the Locality of a Task by partitioning the data before execution. A Task is always assigned preferentially to the node where the data is to be calculated to minimize network IO. This calculation is controlled by spark.locality.wait, which defaults to 3s.
2. Spark internal fault tolerance
The principle is not detailed here, in short, retry. Spark specifies the maximum number of consecutive failures (spark.stage.maxConsecutiveAttempts) for the same Stage in the same Job, which defaults to 4. It also specifies the number of failures (spark.task.maxFailures) for the same Task in a Stage, which defaults to 4. When any one of these thresholds reaches the upper limit, Spark causes the entire Job to fail, stopping possible "meaningless" retries.
3. Conflict between data locality and fault tolerance
Let's start with an example, as shown in the figure, which shows a detailed view of the Task Page under the Spark Stage page.
The first column indicates that the Task has been retried four times, so the Job corresponding to this Task has failed.
The third column represents the data locality of the Task, both at NODE_LOCAL level, which is clearly optimal for a task reading data from HDFS
The fourth column represents the Executor ID, and we can see that the retry of our task is assigned to two Executors with IDs 5 and 6.
The fifth column indicates the physical machine address of the Executor where we run these retry Tasks, and we can see that they are all dispatched to the same machine.
The last column represents the error stack for each retry failure
3.1 Question 1: Why does a single Task retry fail?
Combined with hardware level troubleshooting, it is found that/mnt/dfs/4 is hung on the NodeManager physical node. Hardware failure causes disk read-only. When ShuffleMapTask is about to be completed, FileNotFoundException occurs when index file and data file are committed and index temporary file is obtained.
java.io.FileNotFoundException: /mnt/dfs/4/yarn/local/usercache/da_haitao/appcache/application_1568691584183_1953115/blockmgr-1b6553f2-a564-4b31-a4a6-031f21c9c30f/0a/shuffle_96_2685_0.index.82594412-1f46-465e-a067-2c5e386a978e (No such file or directory) at java.io.FileOutputStream.open0(Native Method) at java.io.FileOutputStream.open(FileOutputStream.java:270) at java.io.FileOutputStream. (FileOutputStream.java:213) at java.io.FileOutputStream. (FileOutputStream.java:162) at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:144) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:245) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:190) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at org.apache.spark.scheduler.Task.run(Task.scala:109)
3.2 Question 2: Why are all four retries of this Task on the same physical node?
This is because the Driver performs data locality operations when scheduling the Task, and successfully obtains data locality at NODE_LOCAL level within the time constraint of spark.locality.wait defaulted to 3s, so both are scheduled to the same NodeManger physical node.
3.3 Question 3: Why is it always "local retry" instead of "remote retry"? Logically speaking, this process is no longer "local retry," but "remote retry." This can be judged from the Executor ID of the 4 retries, the 0th, 1st, and 3rd on ID 6, and the 2nd on ID 5. But since IDs 5 and 6 are both in the same NodeManger node, we look like a "local retry." Another reason is the successful resolution of the data locality mentioned above, so each retry of these tasks has a high probability of coming to this node.
All retries at the Spark Task level should logically be "remote retries," and they all need to be rescheduled to a new Executor via the Driver for retry. The "local" and "foreign" we observe belong to "phenomenon" rather than "essence." The conditions affecting this phenomenon include the following (not necessarily comprehensive): 1. Data Locality 2. Executor Due to NodeLabel restrictions, only a limited number of physical machines are allocated 3. ResourceManager assigns all executors to a node when scheduling. 3.4 Question 5: Why did you fail to operate the same bad disk four times? The NodeManger actually has/mnt/dfs/{0-11}, a total of 12 disks. From the physical inspection point of view, only/mnt/dfs/4 has abnormal alarms during the whole process. Then why is Spark so stupid? So many good plates do not use, pick a bad plate to die? We can first look at the error file, we package this file into five parts to see, 1. /mnt/dfs/4/yarn/local/2. usercache/da_haitao/appcache/application_1568691584183_1953115/ blockmgr-1b6553f2-a564-4b31-a4a6-031f21c9c30f/3. 0a/4. shuffle_96_2685_0.index5. .82594412-1f46-465e-a067-2c5e386a978e The first line is a part of LOCAL_DIR configured by Yarn NodeManger. The complete line should include 12 disks. The second line is one of the root directories of BlockManger generated by Spark. There is also a similar directory under other drive letters. The third line is a first-level subdirectory under a root directory. The number is controlled by spark.diskStore.subDirectories by default to 64. The fourth line One of the two important files produced by the Spark Shuffle process is the end of the data file.data, and the other is the corresponding.index file. 96 is ShuffleID table identifies which Shuffle process, 2685 is MapID corresponds to an RDD, so there is a sequence number in one of the partitions, and 0 is a fixed value, which originally means ReduceID, Spark Sort Based Shuffle implementation does not need to rely on this value, so it is fixed to 0. With Shuffle ID and MapId, Shuffle Write phase can generate files like shuffle_96_2685_0.index, and Shuffle Read phase can also locate this file with two IDs. The fifth line is the UUID identifier of the Index file corresponding to the temporary file. Based on this logic, the final output file name of a partition of a Shuffle process is actually predictable and fixed. For example, in our case, the file name of the index file of the partition 2685 of the 96th shuffle is shuffle_96_2685_0.index. Spark guarantees dependencies based on the same location logic (algorithm) when writing and reading the file,
The first step is to determine the root directory. Spark determines the root directory by the absolute hash value of the file name modulo the number of letters as an index scala> math.abs("shuffle_96_2685_0.index".hashCode) % 12res0: Int = 6. The array of root directories is deterministic for the life cycle of an Executive. It is a fixed array that breaks up all paths by a simple random algorithm. Therefore, once the file name is determined and the Executor is not changed, the root directory must be determined. So all fixed to access/mnt/dfs/4 this bad disk.
But this only explains why an Executor assigned a Task failed, and our Task was tried on different executors. 3.5 Question 5: Why did retry fail on both executors? In fact, this problem is only a matter of probability. Spark uses an algorithm similar to the following to disrupt the configuration of all LOCAL_DIRS. For example, the following simple test shows that the probability of such collision is still extremely high. The localDirs(6) contained in DiskBlockManager under the Executor of ID 5 and 6 should correspond to the bad disk/mnt/dfs/4. scala> def randomizeInPlace[T](arr: Array[Int], rand: java.util.Random = new java.util.Random): Array[Int] = { | for (i randomizeInPlace(res11)res23: Array[Int] = Array(3, 2, 4, 1)
scala> randomizeInPlace(res11)res24: Array[Int] = Array(2, 3, 4, 1)
scala> randomizeInPlace(res11)res25: Array[Int] = Array(2, 1, 3, 4)
scala> randomizeInPlace(res11)res26: Array[Int] = Array(4, 2, 1, 3)
scala> randomizeInPlace(res11)res27: Array[Int] = Array(2, 3, 4, 1) Thank you for reading, the above is the content of "What are the negative effects of Spark's production job fault tolerance ability", after learning this article, I believe that everyone has a deeper understanding of the negative effects of Spark's production job fault tolerance ability. The specific use situation still needs to be verified by practice. Here is, Xiaobian will push more articles related to knowledge points for everyone, welcome to pay attention!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.