What is the Shuffle process?

Updated 2025-01-16. From: SLTechnology News&Howtos (shulou)


This article answers the question "what is the Shuffle process" by stepping through the map side of a WordCount job in a debugger, from the moment the mapper writes a key-value pair to the moment the final merged output file is on disk. The line numbers in the comments refer to the source files as they appeared during that debugging session.

1. After the map method in the WordCountMapper class writes a key-value pair (kv), the shuffle process begins:
   context.write(outK, outV);
   Step into the write() method in TaskInputOutputContext.
   Step into mapContext.write(key, value) in WrappedMapper.java. // line 112
   Step into output.write(key, value) in TaskInputOutputContextImpl.java. // line 89
   Execution finally lands in the write() method of MapTask.

2. The key step: the collector object collects the kv into the buffer, and the partition number of the kv is calculated before collection.
   collector.collect(key, value, partitioner.getPartition(key, value, partitions));
   On the first pass through this method, because the number of reduce tasks has not been set, the call always returns partition 0.

3. Navigate into the collect method in the MapTask class. // line 1082
   bufferRemaining -= METASIZE; // calculates the remaining size of the buffer
   The code before this line checks the kv types. If bufferRemaining < 0, the spill logic starts; internally it performs some validation and calculations on the data.

4. Navigate to startSpill(). // line 1126
   This call is triggered only when the buffered data reaches 80% of the buffer. WordCountMapper keeps writing data into the buffer, and the spill starts once the 80% spill condition is met.

5. Step into the startSpill() method. // MapTask line 1590
   spillReady.signal(); // line 1602: thread communication, notifies the spill thread to start working
   The spill thread (SpillThread, an inner class of MapTask) executes its run method, which calls the sortAndSpill() method in MapTask$MapOutputBuffer. Go straight to that sort-and-spill method, sortAndSpill(). // MapTask line 1605

6. Navigate to line 1615.
   final SpillRecord spillRec = new SpillRecord(partitions); // creates the spill record object from the number of partitions; sorting is done by partition and spilling is done by partition
   final Path filename = mapOutputFile.getSpillFileForWrite(numSpills, size); // gets the spill file name
   /tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_0001/attempt_local1440922619_0001_m_000000_0/output/(spill0.out)
   At this point the spill file does not exist yet, only the directory.
   out = rfs.create(filename); // after this line executes, the spill file spill0.out is created in the directory above

7. Continue down to line 1625 of the MapTask class.
   sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); // sort before spilling

8. Navigate to line 1629 and enter the for loop, which spills partition by partition.

9. Analyze the code inside the for loop to see the actual spill process.
   9.1 A writer object is created first; the data is spilled through it.
       writer = new Writer(job, partitionOut, keyClass, valClass, codec, ...
   9.2 The code checks whether a combinerRunner object has been set. If it has, the data is processed by the configured combinerRunner logic; if not, the default spill path is used.

10. Execution reaches line 1667, writer.close(). This spill is now complete, and the spill file spill0.out contains data.

11. if (totalIndexCacheMemory >= indexCacheMemoryLimit) {} // MapTask line 1685; the limit is 1 MB
    If the index data exceeds the specified memory size, it must also be spilled to a file. (This rarely happens in practice.)

12. After this spill, control returns to context.write(outk, outv) in the map method of the WordCountMapper class.
    Note: because we are debugging in local mode, we see only serial behavior, not parallel behavior. The spill starts when the buffer is 80% full. In a real run the mapper does not stop during the spill; it keeps writing into the remaining 20% of the buffer, but that cannot be observed in the debugger.

13. As described above, the spill process happens N times within one MapTask, where N depends on the amount of data. If the last data written by the map lands in the buffer without meeting the 80% spill condition, the buffer still has to be written to disk (the final spill). That final spill happens when the NewOutputCollector object is closed in MapTask, at this line:
    output.close(mapperContext); // MapTask line 805

14. Step into output.close(mapperContext). // MapTask line 732
    Navigate to collector.flush(). // line 735
    This writes the buffered data to disk by running the sortAndSpill() method one last time.
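
The collect and sort steps above can be pictured with a small toy model. This is only a sketch, not Hadoop's MapOutputBuffer: the class CollectSketch, its Entry type, and the collect and sortForSpill helpers are hypothetical names introduced for illustration. The partition formula is the hash-based one used by Hadoop's default HashPartitioner, and the sort order (partition first, then key) follows the walkthrough's statement that sorting is done by partition.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy model of "compute the partition, collect, then sort before spilling".
// All names here are hypothetical; this is not Hadoop's MapOutputBuffer.
class CollectSketch {
    // One buffered record. The partition is fixed at collection time.
    static final class Entry {
        final int partition;
        final String key;
        final int value;

        Entry(int partition, String key, int value) {
            this.partition = partition;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "p" + partition + ":" + key + "=" + value;
        }
    }

    // Hash-based partitioning, as in Hadoop's default HashPartitioner.
    static int partitionFor(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Stand-in for collector.collect(key, value, partition).
    static void collect(List<Entry> buffer, String key, int value, int numReduceTasks) {
        buffer.add(new Entry(partitionFor(key, numReduceTasks), key, value));
    }

    // Before a spill, order records by partition first and by key second.
    static void sortForSpill(List<Entry> buffer) {
        buffer.sort(Comparator
                .comparingInt((Entry e) -> e.partition)
                .thenComparing((Entry e) -> e.key));
    }

    public static void main(String[] args) {
        List<Entry> buffer = new ArrayList<>();
        for (String word : new String[] {"spark", "hadoop", "hive", "hadoop"}) {
            collect(buffer, word, 1, 2); // assume two reduce tasks for the demo
        }
        sortForSpill(buffer);
        buffer.forEach(System.out::println);
    }
}
```

With a single reduce task the formula always yields partition 0, which matches what the debugger shows on the first call.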

In the process above, each spill produces one small spill file, and the data inside each spill file is sorted. Once all the data has been written to disk, there are multiple spill files on disk, for example spill0.out, spill1.out, ..., spillN.out.

15. After all spills have completed, the merge operation begins in the mergeParts() method. // MapTask line 1527
    Step into the method and navigate to line 1844 of MapTask:
    filename[0]: /tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_0001/attempt_local1440922619_0001_m_000000_0/output/spill0.out
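
To make the "N spills plus one final flush" behavior concrete, here is a minimal counting sketch. It is a simplification under stated assumptions: sizes are abstract units rather than Hadoop's byte accounting, the spill is modeled as instantaneous (a real spill runs in a background thread while the mapper keeps writing), and SpillSketch and countSpills are hypothetical names.

```java
// Toy model of the 80% spill rule. Names and units are hypothetical;
// this does not reproduce Hadoop's buffer accounting.
class SpillSketch {
    static final int SPILL_PERCENT = 80; // threshold named in the walkthrough

    // Returns how many spill files would be written for the given record sizes.
    static int countSpills(int bufferCapacity, int[] recordSizes) {
        int threshold = bufferCapacity * SPILL_PERCENT / 100;
        int used = 0;
        int spills = 0;
        for (int size : recordSizes) {
            used += size;
            if (used >= threshold) {
                spills++;   // spill condition met: one more spill file
                used = 0;   // modeled as instantaneous for simplicity
            }
        }
        if (used > 0) {
            spills++;       // final flush when the collector is closed
        }
        return spills;
    }

    public static void main(String[] args) {
        int[] records = new int[25];
        java.util.Arrays.fill(records, 10);
        // Capacity 100 gives a threshold of 80, so every 8 records trigger a spill.
        // 24 records produce 3 spills; the 25th is written by the final flush.
        System.out.println(countSpills(100, records)); // prints 4
    }
}
```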

16. Continue down to line 1880 of MapTask.
    Path finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize); // path of the final output file after merging
    /tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_0001/attempt_local1440922619_0001_m_000000_0/output/file.out

17. Continue down to line 1882 of MapTask.
    Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize); // index file of the final output file after merging
    /tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_0001/attempt_local1440922619_0001_m_000000_0/output/file.out.index

18. Create the file.out file.
    FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);

19. for (int parts = 0; parts < partitions; parts++) {} // line 1925
    Merging and sorting are done partition by partition.

20. The actual merge operation inside the for loop: // line 1950
    RawKeyValueIterator kvIter = Merger.merge(job, rfs, keyClass, valClass, codec, segmentList, mergeFactor, new Path(mapId.toString()), job.getOutputKeyComparator(), reporter, sortSegments, null, spilledRecordsCounter, sortPhase.phase(), TaskType.MAP);

21. Write the merged data to the file.
    Writer writer = new Writer(job, finalPartitionOut, keyClass, valClass, codec, spilledRecordsCounter); // line 1961
    The merge can also use the combiner, but only if a combiner is set and the number of spills is greater than or equal to minSpillsForCombine (3):
    if (combinerRunner == null || numSpills < minSpillsForCombine) { Merger.writeFile(kvIter, writer, reporter, job); } else { combineCollector.setWriter(writer); combinerRunner.combine(kvIter, combineCollector); }

22. The merge is complete.
    writer.close(); // line 1972
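
The per-partition merge and the combiner condition can be illustrated with a small k-way merge over sorted in-memory runs. This is a sketch under assumptions, not Hadoop's Merger: MergeSketch, useCombiner, and merge are hypothetical names, each "run" stands for one partition's sorted records from one spill file, and the combiner is assumed to be a WordCount-style sum.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Toy k-way merge of sorted runs for one partition. Hypothetical names;
// this is not Hadoop's Merger or CombinerRunner.
class MergeSketch {
    static final int MIN_SPILLS_FOR_COMBINE = 3; // value noted in the walkthrough

    // Same decision as the if condition in the walkthrough, stated positively.
    static boolean useCombiner(boolean combinerSet, int numSpills) {
        return combinerSet && numSpills >= MIN_SPILLS_FOR_COMBINE;
    }

    // Merges sorted runs into one sorted list; optionally sums equal keys.
    static List<Map.Entry<String, Integer>> merge(
            List<List<Map.Entry<String, Integer>>> runs, boolean combine) {
        // Each queue element is a cursor: {run index, position within the run}.
        PriorityQueue<int[]> heads = new PriorityQueue<>(
                Comparator.comparing((int[] c) -> runs.get(c[0]).get(c[1]).getKey()));
        for (int r = 0; r < runs.size(); r++) {
            if (!runs.get(r).isEmpty()) {
                heads.add(new int[] {r, 0});
            }
        }
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        while (!heads.isEmpty()) {
            int[] c = heads.poll();
            Map.Entry<String, Integer> rec = runs.get(c[0]).get(c[1]);
            int last = out.size() - 1;
            if (combine && last >= 0 && out.get(last).getKey().equals(rec.getKey())) {
                // Assumed combiner: sum the values of equal keys.
                int sum = out.get(last).getValue() + rec.getValue();
                out.set(last, new SimpleEntry<String, Integer>(rec.getKey(), sum));
            } else {
                out.add(new SimpleEntry<String, Integer>(rec.getKey(), rec.getValue()));
            }
            if (c[1] + 1 < runs.get(c[0]).size()) {
                heads.add(new int[] {c[0], c[1] + 1}); // advance this run's cursor
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Map.Entry<String, Integer>>> runs = new ArrayList<>();
        runs.add(Arrays.<Map.Entry<String, Integer>>asList(
                new SimpleEntry<String, Integer>("a", 1), new SimpleEntry<String, Integer>("c", 1)));
        runs.add(Arrays.<Map.Entry<String, Integer>>asList(
                new SimpleEntry<String, Integer>("a", 1), new SimpleEntry<String, Integer>("b", 1)));
        runs.add(Arrays.<Map.Entry<String, Integer>>asList(
                new SimpleEntry<String, Integer>("b", 1), new SimpleEntry<String, Integer>("c", 1)));
        boolean combine = useCombiner(true, runs.size());
        System.out.println(merge(runs, combine)); // prints [a=2, b=2, c=2]
    }
}
```

With fewer than three runs, or with no combiner set, useCombiner returns false and the merged output keeps every record, which corresponds to the Merger.writeFile branch.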

23. Write out the index file.
    spillRec.writeToFile(finalIndexFile, job); // line 1986

24. Delete all the spill files (spill0.out, spill1.out, ..., spillN.out), keeping only the final output file.
    for (int i = 0; i < numSpills; i++) { rfs.delete(filename[i], true); }
