Example Analysis of TeraSort of Hadoop

Updated 2025-02-24. Source: SLTechnology News&Howtos (shulou), Servers.

This article walks through the Hadoop TeraSort example by following its source code alongside the console output it prints before the Map phase.

The TeraSort source code contains many Java files, which can be divided into three parts: TeraGen, TeraSort and TeraValidate.

TeraGen generates the random data to be sorted, and TeraValidate verifies the sorted result.

The Java files involved in the sort itself are TeraSort.java, TeraInputFormat.java, TeraOutputFormat.java and TeraScheduler.java.

After compiling the source code and setting the run parameters, generate 500 MB of data with TeraGen and then run TeraSort. (Add job.setNumReduceTasks(10) to the code to set the number of Reduce Tasks to 10; otherwise the default is 1.) Before the Map phase starts, you will see the following lines of output:

Spent 251ms computing base-splits. (1)

Spent 27ms computing TeraScheduler splits. (2)

Computing input splits took 280ms (3)

Sampling 10 splits of 15 (4)

Making 10 from 100000 sampled records (5)

Computing parititions took 1216ms (6)

Spent 1534ms computing partitions. (7)

Next, we follow the code in the order in which these lines are printed.

First, take a look at the main method of TeraSort:

    int res = ToolRunner.run(new Configuration(), new TeraSort(), args);
    System.exit(res);

Ctrl-click into the ToolRunner.run method to see:

    public static int run(Configuration conf, Tool tool, String[] args) throws Exception {
        if (conf == null) {
            conf = new Configuration();
        }
        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        // set the configuration back, so that Tool can configure itself
        tool.setConf(conf);
        // get the args w/o generic hadoop args
        String[] toolArgs = parser.getRemainingArgs();
        return tool.run(toolArgs);
    }

That is, the new TeraSort() above is passed in as the Tool tool parameter, and return tool.run(toolArgs) runs TeraSort and hands back its return value. TeraSort's run method returns 0 if the job completes successfully and 1 otherwise, and System.exit(res) then exits with that code.
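The exit-code hand-off can be sketched without Hadoop on the classpath. MiniTool, MiniRunner and ExitCodeDemo below are hypothetical stand-ins for Tool, ToolRunner and TeraSort, not Hadoop classes, and the sketch assumes the usual convention that 0 means success and any other value means failure.

```java
// Hypothetical stand-in for Hadoop's Tool interface (illustration only).
interface MiniTool {
    int run(String[] args);
}

// Hypothetical stand-in for ToolRunner: delegate to the tool, hand back its code.
class MiniRunner {
    static int run(MiniTool tool, String[] args) {
        return tool.run(args);
    }
}

class ExitCodeDemo {
    // Maps a job outcome to a process exit code: 0 on success, 1 on failure.
    static int exitCode(boolean jobSucceeded) {
        return jobSucceeded ? 0 : 1;
    }

    public static void main(String[] args) {
        // The lambda plays the role of TeraSort.run for a job that succeeded.
        int res = MiniRunner.run(a -> exitCode(true), args);
        System.out.println("exit code: " + res);
        // A real main method would finish with System.exit(res).
    }
}
```

The point of the pattern is that the runner never interprets the result; it only passes the tool's return value through to the caller, which hands it to System.exit.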

Now look at the run method in TeraSort.java. After the various set calls comes one key statement:

    TeraInputFormat.writePartitionFile(job, partitionFile);

This calls the writePartitionFile method in TeraInputFormat, so we turn to that method and see:

    final TeraInputFormat inFormat = new TeraInputFormat();
    final TextSampler sampler = new TextSampler();
    int partitions = job.getNumReduceTasks();
    long sampleSize = conf.getLong(SAMPLE_SIZE, 100000);
    final List<InputSplit> splits = inFormat.getSplits(job);

The last statement gets the splits of the input files. Each Map Task processes one split of the input, so how does getSplits work?

Stepping into the method, we see:

    t1 = System.currentTimeMillis();
    lastContext = job;
    lastResult = super.getSplits(job);
    t2 = System.currentTimeMillis();
    System.out.println("Spent " + (t2 - t1) + "ms computing base-splits.");
    if (job.getConfiguration().getBoolean(TeraScheduler.USE, true)) {
        TeraScheduler scheduler = new TeraScheduler(
            lastResult.toArray(new FileSplit[0]), job.getConfiguration());
        lastResult = scheduler.getNewFileSplits();
        t3 = System.currentTimeMillis();
        System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits.");
    }

So TeraInputFormat does not compute the splits itself: it obtains them from the getSplits method of its parent class FileInputFormat, via super.getSplits(job). This is also where the first two of the seven output lines are printed, which confirms that the code executes in the order we are reading it. What, then, is the TeraScheduler that follows? Its code is hard to follow at first reading, so let's look at the description of what it does:

Solve the schedule and modify the FileSplit array to reflect the new schedule. It will move placed splits to front and unplacable splits to the end.

In other words, it post-processes the input splits to optimize scheduling. What does this gain over the default scheduling (or none)? According to an answer on Stack Overflow, this scheduling method can:

1. keep the sort as local as possible;

2. distribute the work evenly across machines.

Knowing this, let's go back to the writePartitionFile method in TeraInputFormat.java and move on:

    final List<InputSplit> splits = inFormat.getSplits(job);
    long t2 = System.currentTimeMillis();
    System.out.println("Computing input splits took " + (t2 - t1) + "ms");
    int samples = Math.min(conf.getInt(NUM_PARTITIONS, 10), splits.size());
    System.out.println("Sampling " + samples + " splits of " + splits.size());
    final long recordsPerSample = sampleSize / samples;
    final int sampleStep = splits.size() / samples;

After getSplits finishes, the third and fourth output lines are printed. The third line reports how long it took to compute the input splits, and the fourth reports how many of the splits.size() splits are sampled. How large is splits.size()? Stepping into the parent class's getSplits method, we find these lines:

    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);
    blkLocations = fs.getFileBlockLocations(file, 0, length);
    long splitSize = computeSplitSize(blockSize, minSize, maxSize);

In other words, the number of splits is determined by these parameters, which can be set in the Hadoop configuration files, so it varies with the configuration. Here, the 500 MB of data yields 15 splits, of which 10 are sampled. Why 10? The answer is here:

    int samples = Math.min(conf.getInt(NUM_PARTITIONS, 10), splits.size());

We set the number of Reduce Tasks to 10, and the number of Partitions is the same as the number of Reduce Tasks (10 is also the fallback value in the getInt call), so samples is the minimum of 10 and 15, that is, 10.
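The split and sample arithmetic above can be reproduced in a small standalone sketch. The article does not state its block or split-size settings, so the 128 MB block size, the 32 MB maximum split size and the single 500,000,000-byte input file below are assumptions chosen for illustration. The class and method names are hypothetical, and the clamp formula and the 1.1 slack factor reflect my understanding of FileInputFormat rather than anything quoted in the article.

```java
class SplitMath {
    // Assumed slack factor: a final chunk of up to 1.1 split sizes stays one split.
    static final double SPLIT_SLOP = 1.1;

    // Clamp the block size between the minimum and maximum split size.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Number of splits for one splittable file of the given length.
    static int countSplits(long length, long splitSize) {
        int splits = 0;
        long remaining = length;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            remaining -= splitSize;
            splits++;
        }
        if (remaining != 0) {
            splits++; // the leftover bytes form the last split
        }
        return splits;
    }

    // Returns {samples, recordsPerSample, sampleStep} as in writePartitionFile.
    static long[] samplePlan(int numPartitions, int numSplits, long sampleSize) {
        int samples = Math.min(numPartitions, numSplits);
        long recordsPerSample = sampleSize / samples;
        int sampleStep = numSplits / samples; // integer division
        return new long[] {samples, recordsPerSample, sampleStep};
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // Assumed settings, not taken from the article.
        long splitSize = computeSplitSize(128 * mb, 1, 32 * mb);
        int numSplits = countSplits(500_000_000L, splitSize);
        long[] plan = samplePlan(10, numSplits, 100000L);
        System.out.println("splits=" + numSplits + " samples=" + plan[0]
            + " recordsPerSample=" + plan[1] + " sampleStep=" + plan[2]);
    }
}
```

Under these assumed settings the sketch yields 15 splits, 10 sampled splits, 10000 records per sampled split and a step of 1, which matches the "Sampling 10 splits of 15" line. Other settings can produce the same counts, so this does not pin down the configuration actually used.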

After this, the first round of sampling is carried out by a SamplerThreadGroup. Why the first? Because there is a second one, as we see just after the first sampling:

    for (Text split : sampler.createPartitions(partitions)) {
        split.write(writer);
    }

This loop completes the second round of sampling: from the 100000 records (the default) collected in the first round, it selects the split points that divide the keys into 10 partitions (the number of Partitions) and writes them out. It calls the createPartitions method, so let's take a look at it:

    Text[] createPartitions(int numPartitions) {
        int numRecords = records.size();
        System.out.println("Making " + numPartitions + " from " + numRecords + " sampled records");
        if (numPartitions > numRecords) {
            throw new IllegalArgumentException("Requested more partitions than input keys ("
                + numPartitions + " > " + numRecords + ")");
        }
        new QuickSort().sort(this, 0, records.size());
        float stepSize = numRecords / (float) numPartitions;
        Text[] result = new Text[numPartitions - 1];
        for (int i = 1; i < numPartitions; ++i) {
            result[i - 1] = records.get(Math.round(stepSize * i));
        }
        return result;
    }

Sure enough, this is where the fifth output line is printed; records.size() here is 100000, the number of records collected by the first round of sampling. The method returns an array of nine sampled keys. Why nine? Because dividing the keys into 10 Partitions takes one fewer split point than there are Partitions. That completes the second round of sampling, so we go back to the writePartitionFile method and continue:

    writer.close();
    long t3 = System.currentTimeMillis();
    System.out.println("Computing parititions took " + (t3 - t2) + "ms");

This prints the sixth output line, the time spent on the two rounds of sampling, and that is the end of writePartitionFile. Back in TeraSort.java, we read on:

    long end = System.currentTimeMillis();
    System.out.println("Spent " + (end - start) + "ms computing partitions.");
    job.setPartitionerClass(TotalOrderPartitioner.class);

This prints the seventh and last output line: the total time taken by the writePartitionFile step.
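The second round of sampling (createPartitions) can be sketched with plain strings in place of Hadoop's Text keys. This is a minimal illustration under stated assumptions: CutPointSketch is a hypothetical class name, Collections.sort stands in for Hadoop's QuickSort, and the 100 three-digit keys are made-up sample data.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Random;

class CutPointSketch {
    // Sort the sampled keys, then pick numPartitions - 1 evenly spaced cut points.
    static String[] createPartitions(List<String> sampledKeys, int numPartitions) {
        int numRecords = sampledKeys.size();
        if (numPartitions > numRecords) {
            throw new IllegalArgumentException("Requested more partitions than input keys ("
                + numPartitions + " > " + numRecords + ")");
        }
        List<String> sorted = new ArrayList<>(sampledKeys);
        Collections.sort(sorted); // stand-in for Hadoop's QuickSort (assumption)
        float stepSize = numRecords / (float) numPartitions;
        String[] result = new String[numPartitions - 1];
        for (int i = 1; i < numPartitions; ++i) {
            result[i - 1] = sorted.get(Math.round(stepSize * i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample keys "000".."099" in random order.
        List<String> keys = new ArrayList<>();
        for (int k = 0; k < 100; k++) {
            keys.add(String.format(Locale.ROOT, "%03d", k));
        }
        Collections.shuffle(keys, new Random(42));
        String[] cuts = createPartitions(keys, 10);
        System.out.println(cuts.length + " cut points: " + String.join(",", cuts));
    }
}
```

Because the cut points are taken at equal steps through the sorted sample, each partition should receive roughly the same share of keys, provided the sample is representative of the full input.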

After this preparatory work (splitting, scheduling, sampling and so on), the MapReduce phase begins. The job sets TotalOrderPartitioner.class as its Partitioner. Looking carefully, TeraSort actually defines two Partitioners: the TotalOrderPartitioner we have just seen, and an inconspicuous SimplePartitioner. What is SimplePartitioner for? In practice it is of little use, because it only looks at the prefix of the key and does not achieve even relative load balancing. So in general useSimplePartitioner = false, that is, TotalOrderPartitioner is used as the Partitioner. What does it do?

Reading the code, we learn that TeraInputFormat stores the nine sampled cut points in the file _partition.lst. TotalOrderPartitioner first reads the cut points from this file and then builds a three-level trie (a dictionary tree) from them.
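How sorted cut points map a key to a partition can be illustrated with a plain binary search in place of the trie. This is a swapped-in technique for the sake of a short example, not the TeraSort implementation. PartitionLookup and the three single-letter cut points are hypothetical, and the sketch assumes that a key equal to a cut point goes to the partition on that cut point's right.

```java
import java.util.Arrays;

class PartitionLookup {
    // Returns the partition index (0..cutPoints.length) for a key.
    // Assumption: a key equal to cut point i belongs to partition i + 1.
    static int findPartition(String[] sortedCutPoints, String key) {
        int pos = Arrays.binarySearch(sortedCutPoints, key) + 1;
        // If the key is absent, binarySearch returns -(insertionPoint) - 1,
        // so pos is -(insertionPoint) and negating it gives the partition.
        return pos < 0 ? -pos : pos;
    }

    public static void main(String[] args) {
        String[] cutPoints = {"d", "m", "t"}; // made-up cut points, 4 partitions
        for (String key : new String[] {"apple", "dog", "mango", "zebra"}) {
            System.out.println(key + " -> partition " + findPartition(cutPoints, key));
        }
    }
}
```

Every key assigned to a lower-numbered partition sorts before every key in a higher-numbered one, so concatenating the sorted reducer outputs in partition order gives a globally sorted result. A trie answers the same question by inspecting only the first few bytes of the key.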

When the prefix of a key is less than or equal to a node, the key is assigned to the Partition corresponding to that node, which guarantees a global order (Total Order) across Partitions. Does it really work as advertised? To find out, we add the following to the getPartition method (which queries the trie built by buildTrie):

    BufferedWriter bf;
    try {
        // append the partition number chosen for this key to our log file
        bf = new BufferedWriter(new FileWriter(file, true));
        bf.append(String.valueOf(trie.findPartition(key)));
        bf.newLine();
        bf.close();
    } catch (IOException e) {
        e.printStackTrace();
    }

This writes the Partition number assigned to each of the 5 million records to a file of our choosing, and the occurrences are then counted with a MATLAB function. Since we set 10 Reduce Tasks, the Partition numbers run from 0 to 9. The number and proportion of records for Partitions 0 to 9 are as follows:

Partition   Number   Proportion (%)
0           501736   10.0347
1           510323   10.2065
2           501848   10.0370
3           502895   10.0579
4           499158    9.9832
5           495228    9.9046
6           494499    9.8900
7           496448    9.9290
8           502608   10.0522
9           495257    9.9051

As the figures show, the distribution is quite uniform.
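The counting step, done in MATLAB in the article, could equally be done in Java. The sketch below is one possible way under stated assumptions: PartitionTally and its methods are hypothetical names, and main reuses the record counts reported above instead of reading the log file.

```java
import java.util.Locale;

class PartitionTally {
    // Count how many records were assigned to each partition number.
    static long[] tally(int[] partitionIds, int numPartitions) {
        long[] counts = new long[numPartitions];
        for (int id : partitionIds) {
            counts[id]++;
        }
        return counts;
    }

    // Convert per-partition counts into percentages of the total.
    static double[] percentages(long[] counts) {
        long total = 0;
        for (long c : counts) {
            total += c;
        }
        double[] pct = new double[counts.length];
        for (int i = 0; i < counts.length; i++) {
            pct[i] = 100.0 * counts[i] / total;
        }
        return pct;
    }

    public static void main(String[] args) {
        // Per-partition record counts as reported in the article.
        long[] counts = {501736, 510323, 501848, 502895, 499158,
                         495228, 494499, 496448, 502608, 495257};
        double[] pct = percentages(counts);
        for (int i = 0; i < counts.length; i++) {
            System.out.println(String.format(Locale.ROOT, "%d  %d  %.4f", i, counts[i], pct[i]));
        }
    }
}
```

The ten counts add up to exactly 5,000,000, and every partition falls within about 0.21 percentage points of the ideal 10%.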
