
How to write MapReduce programs


This article introduces how to write MapReduce programs. Many people run into difficulties with the cases covered here, so work through them carefully and you should pick up something useful.

5. Combiner programming

A Combiner is essentially a Reducer used in a different context, with the same kind of function. Each map task may produce a large amount of output, and the Combiner's job is to merge that output on the map side first, reducing the amount of data transferred to the reducers. It is the most basic way to aggregate keys locally and behaves like a local reduce. Without a Combiner, all of the map output is shipped to the reducers before being reduced, which is less efficient and consumes more network I/O; with one, output is aggregated locally as soon as a map finishes, which speeds the job up.

In short, a Combiner performs local aggregation by key: the map output is sorted by key and the values for each key are iterated over, which is exactly a local reduce.

Case 3: Combiner programming based on wordcount

1. Write the Combiner implementation class. It extends Reducer directly, and its content is almost identical to the reduce method.

package cn.itcast.yun10;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Accept the key's values; same logic as the reducer: sum the counts.
        String word = key.toString();
        long count = 0L;
        for (LongWritable v : values) {
            count += v.get();
        }
        context.write(new Text(word), new LongWritable(count));
    }
}

2. Specify Combiner
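The driver code for this step is not shown here, so below is a minimal sketch of where the combiner is registered. Everything other than job.setCombinerClass(...) is a standard wordcount driver, and WordcountDriver, WordcountMapper and WordcountReducer are assumed names rather than classes defined in this article.

package cn.itcast.yun10;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordcountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "wordcount");
        job.setJarByClass(WordcountDriver.class);

        // WordcountMapper / WordcountReducer are the assumed classes from the
        // earlier wordcount case; only the setCombinerClass line is new here.
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        // Register the combiner: it runs on the map side over the sorted spill output.
        job.setCombinerClass(WordcountCombiner.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}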

Two things to note about programming with Combiner:

a. Don't assume that the Combiner will run just because you set one when writing the MapReduce job. Is that really the case? No. The Hadoop documentation states that Combiners may or may not be executed. So when is it skipped? For example, if the cluster is busy, the job may not run the Combiner even though it is set.

b. The output of the Combiner becomes the input of the Reducer, so adding a Combiner must never change the final result. Combiners should only be used when the Reducer's input key/value types match its output key/value types and applying the operation early does not affect the final result, such as summation or taking a maximum. They do not apply to operations like averaging: for example, averaging the per-map averages of (1, 2) and (3) gives 2.25, while the true average of 1, 2, 3 is 2.

As for when the Combiner actually executes, that is easier to explain after analyzing the Shuffle.

6. Shuffle Process Analysis

MapReduce guarantees that the input to every Reducer is sorted by key. The process that sorts the map output and transfers it to the reducer inputs is called the Shuffle. Understanding how the shuffle works helps with understanding and optimizing MapReduce programs. The shuffle code is continually being optimized and improved, and it changes from version to version; there is a saying in Hadoop that the shuffle is the heart of MapReduce, where the miracle happens.

Map side: after the map function.

When the map function starts producing output, it is not simply written to disk. The process is more involved: output is first buffered in memory, and for efficiency it is pre-sorted there.

Each map task has a circular memory buffer that holds the task's output. By default the buffer is 100 MB, configurable via the io.sort.mb property. Once the buffer reaches the spill threshold (io.sort.spill.percent, 80% by default), a background thread begins spilling its contents to disk. Map output continues to be written to the buffer during the spill, but if the buffer fills up in the meantime, the map blocks until the spill completes. Spill files are written, in round-robin fashion, to a job-specific subdirectory under the directories listed in the mapred.local.dir property, with a new spill file created for each spill.
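To make those two properties concrete, here is a small hedged sketch of how they could be set on a job configuration using the Hadoop 1.x names quoted above; the values are arbitrary examples, not recommendations.

import org.apache.hadoop.conf.Configuration;

public class SpillBufferTuning {
    public static Configuration tunedConf() {
        // Illustration only: the Hadoop 1.x property names quoted above,
        // with arbitrary example values (not tuning advice).
        Configuration conf = new Configuration();
        conf.setInt("io.sort.mb", 200);                // in-memory sort buffer, in MB (default 100)
        conf.setFloat("io.sort.spill.percent", 0.80f); // spill threshold as a fraction of the buffer (default 0.80)
        return conf;
    }
}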

Before being written to disk, the data is first divided into partitions and sorted by key within each partition. If a Combiner is configured, it runs on this sorted output; its timing is discussed below.

Map output can also be compressed as it is written to disk; the compression codec is specified by mapred.map.output.compression.codec. Compression is explained in more detail later.

When the last record has been written, all spill files are merged into a single partitioned and sorted file. The io.sort.factor property controls the maximum number of streams merged at a time, with a default of 10. This is the merge step.

In fact, the Combiner may run either before or after the map-side merge completes, and the timing is governed by the min.num.spills.for.combine property (default 3): if at least that many spill files were produced, the Combiner is run again while the spills are merged into the final local output file; with fewer spills, it is not invoked again at this point. Running the Combiner here reduces the amount of data written to local disk and the number of disk reads and writes when there are many spill files, which helps optimize the job. In other words, once the number of spill files reaches three, the Combiner function runs again, and then the merge proceeds.
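Pulling together the map-side spill settings from the last few paragraphs, here is another hedged configuration sketch using the same old-style (Hadoop 1.x) property names; again, the values are only illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;

public class MapSideShuffleTuning {
    public static void apply(Configuration conf) {
        // Compress map output before it is written to disk (off by default).
        conf.setBoolean("mapred.compress.map.output", true);
        conf.setClass("mapred.map.output.compression.codec",
                GzipCodec.class, CompressionCodec.class);

        // Maximum number of spill streams merged at once (default 10).
        conf.setInt("io.sort.factor", 10);

        // Minimum number of spill files before the Combiner runs again during the merge (default 3).
        conf.setInt("min.num.spills.for.combine", 3);
    }
}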

The reducers fetch the map output (their partitions of each map's output file) over HTTP. The number of worker threads used on the map side to serve these file partitions is controlled by the tasktracker.http.threads property, with a default of 40.

Reducer side: before the reduce function

Before the reduce task can run, it needs its partition of the output of every map task in the cluster, but map tasks finish at different times. So as soon as a map task completes, the reduce task starts copying its output; this is the copy phase. The reduce side runs several copier threads in parallel, and their number is controlled by the mapred.reduce.parallel.copies property, with a default of 5.

The copied map output may go into memory or onto disk. The in-memory buffer size is controlled by the mapred.job.shuffle.input.buffer.percent property, expressed as a percentage of the reduce task's heap. Once the buffer reaches its threshold, its contents are merged and spilled to disk, and as the copied files on disk accumulate, they are in turn merged into larger files.
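For completeness, the two reduce-side copy properties just mentioned can be set the same way; this is only an illustrative sketch, not tuning advice.

import org.apache.hadoop.conf.Configuration;

public class ReduceSideShuffleTuning {
    public static void apply(Configuration conf) {
        // Number of parallel copier threads that fetch map output (default 5).
        conf.setInt("mapred.reduce.parallel.copies", 5);

        // Fraction of the reduce task's heap used to buffer copied map output (default 0.70).
        conf.setFloat("mapred.job.shuffle.input.buffer.percent", 0.70f);
    }
}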

Then comes the sort phase, which is more accurately a merge phase, since the sorting was already done on the map side. Merging happens in rounds and is itself fairly involved.

Finally, the merged data is fed into the reduce function; the last merge round may read from memory, from disk, or from both.

7. Custom Sort Programming---Inverted Index

Case 4: There are three files, a.txt, b.txt and c.txt, with the following contents:

a.txt file

--------------------------------

hello world

hello tom

how are you

how do you do

-----------------------------------

b.txt file

hello is fool

i say hi

how do you think

---------------------------------------

c.txt file

you are all handsome

i am the superman

how do you think

---------------------------------------

Build an inverted index over these files, in a format like the following:

hello --> a.txt,2 b.txt,1

how --> a.txt,2 b.txt,1 c.txt,1

The idea is as follows: Perform MapReduce twice to get the desired result.

Code omitted...
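Since the code is omitted above, here is a minimal, hedged sketch of the first of the two MapReduce passes: it counts each word per source file by emitting keys of the form word:filename. A second job (not shown) would split that key on ':', group by word, and concatenate the filename,count pairs into a line such as hello --> a.txt,2 b.txt,1. The class names are illustrative, not from the original article.

package cn.itcast.yun10;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Pass 1 of a two-pass inverted index (sketch): count each word per source file.
public class InvertedIndexStepOne {

    public static class StepOneMapper
            extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Name of the file this split comes from (a.txt, b.txt or c.txt).
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            for (String word : value.toString().split(" ")) {
                context.write(new Text(word + ":" + fileName), new LongWritable(1));
            }
        }
    }

    public static class StepOneReducer
            extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long count = 0;
            for (LongWritable v : values) {
                count += v.get();
            }
            context.write(key, new LongWritable(count)); // e.g. "hello:a.txt  2"
        }
    }
}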

Experimental results:

8. Common MapReduce algorithms

Word count, data de-duplication, sorting, Top K, selection, projection, grouping, multi-table joins and single-table self-joins can all be implemented with MapReduce. Being familiar with these patterns is also a great help when learning Hive later.

Here's an example of a multi-table join.

Case 5: There are two tables, A and B, related through a common id. Assuming both tables are stored as text files, implement the SQL statement select a.id, b.name from a, b where a.id = b.id in MapReduce and write the result to a file.

The idea is as follows:

Code omitted.
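Since the code is omitted here as well, below is a hedged sketch of one common approach, a reduce-side join: the mapper tags every record with the table it came from and keys it by id, and the reducer pairs the tagged records up. The file names, field layout (tab-separated, with name in the second column of b) and class names are all assumptions for illustration.

package cn.itcast.yun10;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Reduce-side join sketch for: select a.id, b.name from a, b where a.id = b.id.
// Assumes tab-separated text files named a.txt (id, ...) and b.txt (id, name).
public class TableJoin {

    public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            String[] fields = value.toString().split("\t");
            String id = fields[0];
            // Tag each record with its source table so the reducer can tell them apart.
            if (fileName.startsWith("a")) {
                context.write(new Text(id), new Text("A#" + id));
            } else {
                context.write(new Text(id), new Text("B#" + fields[1])); // fields[1] = name
            }
        }
    }

    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> aIds = new ArrayList<String>();
            List<String> bNames = new ArrayList<String>();
            for (Text v : values) {
                String s = v.toString();
                if (s.startsWith("A#")) {
                    aIds.add(s.substring(2));
                } else {
                    bNames.add(s.substring(2));
                }
            }
            // Emit the cross product of matching rows: (a.id, b.name).
            for (String id : aIds) {
                for (String name : bNames) {
                    context.write(new Text(id), new Text(name));
                }
            }
        }
    }
}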

9. Split Principle and Source Code Analysis

Hadoop divides the MapReduce input data into fixed-length pieces called input splits; this came up earlier with the InputFormat input classes. Each split is processed by one map task. As for split size, in practice it tends to default to the HDFS block size.

From the configured minimum and maximum split sizes and the block size, the split size can be computed, as sketched below.
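As a rough illustration (paraphrasing the new-API FileInputFormat rather than quoting its source), the split size is essentially the HDFS block size clamped between a configured minimum and maximum, which is why a split usually corresponds to one block.

public class SplitSizeSketch {

    // Paraphrase of the split-size rule in the new-API FileInputFormat:
    // clamp the block size between the configured minimum and maximum split sizes.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 64L * 1024 * 1024; // example: 64 MB block, the old HDFS default
        System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE)); // prints 67108864
    }
}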

"How to write MapReudce program" content is introduced here, thank you for reading. If you want to know more about industry-related knowledge, you can pay attention to the website. Xiaobian will output more high-quality practical articles for everyone!
