Hadoop 2.6.0 study notes (7): MapReduce partitioning

Lu Chunli's work notes. Who says programmers can't have a literary side?

The number of map tasks in a MapReduce job is determined by the input splits. The number of reduce tasks, by contrast, is set explicitly, and which reduce task each record is sent to is decided by the partitioner; that is the MapReduce partitioning discussed here. By default, MapReduce uses HashPartitioner.

/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    /** Use {@link Object#hashCode()} to partition. */
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

In HashPartitioner, the getPartition() method takes three parameters: key and value are the output of the Mapper task, and numReduceTasks is the configured number of Reducer tasks, which defaults to 1. The method takes the hashCode of the key, turns it into a non-negative integer by AND-ing it with Integer.MAX_VALUE, and then takes the remainder modulo numReduceTasks. Since any integer modulo 1 is 0, with the default setting getPartition() always returns 0: every record the Mapper emits is sent to the same single Reducer task and ends up in a single output file.
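
To make this concrete, here is a small standalone sketch (not part of the original notes; the class name HashPartitionDemo is made up for illustration, and it assumes the Hadoop client libraries are on the classpath) that runs the three protocol keys from the example below through HashPartitioner, first with one reducer and then with three:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class HashPartitionDemo {
    public static void main(String[] args) {
        HashPartitioner<Text, Text> partitioner = new HashPartitioner<Text, Text>();
        for (String k : new String[] {"FTP", "HTTP", "HTTPS"}) {
            // With a single reducer, every key maps to partition 0.
            int p1 = partitioner.getPartition(new Text(k), new Text(""), 1);
            // With three reducers, keys are spread by hashCode() % 3.
            int p3 = partitioner.getPartition(new Text(k), new Text(""), 3);
            System.out.println(k + " -> 1 reducer: " + p1 + ", 3 reducers: " + p3);
        }
    }
}

With three reducers the hash spreads keys across partitions, but there is no guarantee that each protocol gets its own partition, which is why the example below defines a custom partitioner.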

Example: statistics on the URLs accessed over different protocols (five-tuple-style access log)

Raw data

[hadoop@nnode code]$ hdfs dfs -text /http_interceptor_20130913.txt
2013-09-13 16:04:08	www.subnetc1.com	192.168.1.7	80	192.168.1.139	18863	FTP	www.subnetc1.com/index.html
2013-09-13 16:04:08	www.subnetc2.com	192.168.1.7	80	192.168.1.159	14100	HTTP	www.subnetc2.com/index.html
2013-09-13 16:04:08	www.subnetc3.com	192.168.1.7	80	192.168.1.130	4927	HTTPS	www.subnetc3.com/index.html
2013-09-13 16:04:08	www.subnetc4.com	192.168.1.7	80	192.168.1.154	39044	HTTP	www.subnetc4.com/index.html
[hadoop@nnode code]$
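
Before looking at the Mapper, it may help to see which fields it picks out of each record. The minimal sketch below is my own illustration (the class name RecordFieldDemo is invented, and it assumes the eight fields are tab-separated, which is what the Mapper's split on "\t" and its length check imply): it splits the first sample line and prints the two fields the job actually uses.

public class RecordFieldDemo {
    public static void main(String[] args) {
        // First record from the sample file, with tab-separated fields.
        String line = "2013-09-13 16:04:08\twww.subnetc1.com\t192.168.1.7\t80"
                + "\t192.168.1.139\t18863\tFTP\twww.subnetc1.com/index.html";
        String[] values = line.split("\t");
        // values[6] is the protocol and values[7] the requested URL,
        // which is exactly what the Mapper emits as key and value.
        System.out.println(values.length + " fields: " + values[6] + " -> " + values[7]);
    }
}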

Implement Mapper

package com.lucl.hadoop.mapreduce.part;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author luchunli
 * @description implements Mapper
 */
public class ProtocolMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] values = value.toString().split("\t");
        if (null == values || values.length != 8) {
            return;
        }
        Text newKey = new Text();
        Text newValue = new Text();
        newKey.set(values[6].trim());
        newValue.set(values[7].trim());
        context.write(newKey, newValue);
    }
}

Implement Reducer

package com.lucl.hadoop.mapreduce.part;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author luchunli
 * @description implements Reducer
 */
public class ProtocolReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuffer sbf = new StringBuffer();
        for (Text text : values) {
            sbf.append(text.toString());
            sbf.append(";");
        }
        context.write(key, new Text(sbf.toString()));
    }
}

Implement Partitioner

package com.lucl.hadoop.mapreduce.part;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author luchunli
 * @description custom partition class
 */
public class ProtocolPartitioner extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        if (key.toString().equals("FTP")) {
            return 0;
        }
        if (key.toString().equals("HTTP")) {
            return 1;
        }
        if (key.toString().equals("HTTPS")) {
            return 2;
        }
        return 0;
    }
}
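
As a quick sanity check (my own snippet, not from the notes; the class name PartitionerCheck is invented), the following feeds a few keys through ProtocolPartitioner and prints which output partition each would land in, assuming the three reduce tasks configured in the driver below:

package com.lucl.hadoop.mapreduce.part;

import org.apache.hadoop.io.Text;

public class PartitionerCheck {
    public static void main(String[] args) {
        ProtocolPartitioner partitioner = new ProtocolPartitioner();
        for (String protocol : new String[] {"FTP", "HTTP", "HTTPS", "SMTP"}) {
            // FTP -> 0, HTTP -> 1, HTTPS -> 2; anything else falls back to 0.
            int partition = partitioner.getPartition(new Text(protocol), new Text(""), 3);
            System.out.println(protocol + " -> part-r-0000" + partition);
        }
    }
}

Note that any key other than FTP, HTTP or HTTPS falls through to partition 0, so records with an unknown protocol would share an output file with FTP.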

Implement the driver class

package com.lucl.hadoop.mapreduce.part;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ProtocolDriver extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new ProtocolDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(ProtocolDriver.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(ProtocolMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // set the number of reduce tasks
        job.setNumReduceTasks(3);
        job.setPartitionerClass(ProtocolPartitioner.class);

        job.setReducerClass(ProtocolReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // job.setOutputFormatClass(ProtocolOutputFormat.class);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}

Call execution

[hadoop@nnode code]$ hadoop jar PartMR.jar /http_interceptor_20130913.txt /2015120500018
15/12/05 21:41:12 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/12/05 21:41:13 INFO input.FileInputFormat: Total input paths to process : 1
15/12/05 21:41:13 INFO mapreduce.JobSubmitter: number of splits:1
15/12/05 21:41:13 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1449302623953_0008
15/12/05 21:41:13 INFO impl.YarnClientImpl: Submitted application application_1449302623953_0008
15/12/05 21:41:14 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1449302623953_0008/
15/12/05 21:41:14 INFO mapreduce.Job: Running job: job_1449302623953_0008
15/12/05 21:41:43 INFO mapreduce.Job: Job job_1449302623953_0008 running in uber mode : false
15/12/05 21:41:43 INFO mapreduce.Job:  map 0% reduce 0%
15/12/05 21:42:12 INFO mapreduce.Job:  map 100% reduce 0%
15/12/05 21:42:32 INFO mapreduce.Job:  map 100% reduce 33%
15/12/05 21:42:52 INFO mapreduce.Job:  map 100% reduce 100%
15/12/05 21:42:55 INFO mapreduce.Job: Job job_1449302623953_0008 completed successfully
15/12/05 21:42:55 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=158
        FILE: Number of bytes written=431827
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=532
        HDFS: Number of bytes written=130
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=6
    Job Counters
        Killed reduce tasks=1
        Launched map tasks=1
        Launched reduce tasks=4
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=26277
        Total time spent by all reduces in occupied slots (ms)=105054
        Total time spent by all map tasks (ms)=26277
        Total time spent by all reduce tasks (ms)=105054
        Total vcore-seconds taken by all map tasks=26277
        Total vcore-seconds taken by all reduce tasks=105054
        Total megabyte-seconds taken by all map tasks=26907648
        Total megabyte-seconds taken by all reduce tasks=107575296
    Map-Reduce Framework
        Map input records=4
        Map output records=4
        Map output bytes=132
        Map output materialized bytes=158
        Input split bytes=109
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=158
        Reduce input records=4
        Reduce output records=3
        Spilled Records=8
        Shuffled Maps =3
        Failed Shuffles=0
        Merged Map outputs=3
        GC time elapsed (ms)=410
        CPU time spent (ms)=4360
        Physical memory (bytes) snapshot=515862528
        Virtual memory (bytes) snapshot=3399213056
        Total committed heap usage (bytes)=167907328
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=423
    File Output Format Counters
        Bytes Written=130
[hadoop@nnode code]$

View the result

[hadoop@nnode code]$ hdfs dfs -ls /2015120500018
Found 4 items
-rw-r--r--   2 hadoop hadoop          0 2015-12-05 21:42 /2015120500018/_SUCCESS
-rw-r--r--   2 hadoop hadoop         33 2015-12-05 21:42 /2015120500018/part-r-00000
-rw-r--r--   2 hadoop hadoop         62 2015-12-05 21:42 /2015120500018/part-r-00001
-rw-r--r--   2 hadoop hadoop         35 2015-12-05 21:42 /2015120500018/part-r-00002
[hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00000
FTP	www.subnetc1.com/index.html
[hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00001
HTTP	www.subnetc4.com/index.html;www.subnetc2.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500018/part-r-00002
HTTPS	www.subnetc3.com/index.html;
[hadoop@nnode code]$

The file names above (part-r-00000 and so on) are generated automatically by MapReduce from the task IDs. To control the output file names ourselves, we can supply a custom OutputFormat.

The custom OutputFormat is shown below. Unlike the earlier MultipleWorkCount example, it writes records directly through FSDataOutputStream instead of delegating to LineRecordWriter.

package com.lucl.hadoop.mapreduce.part;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author luchunli
 * @description Custom OutputFormat
 */
public class ProtocolOutputFormat extends TextOutputFormat<Text, Text> {
    protected static class ProtocolRecordWriter extends RecordWriter<Text, Text> {
        private static final String utf8 = "UTF-8";
        private static final byte[] newline;
        static {
            try {
                newline = "\n".getBytes(utf8);
            } catch (UnsupportedEncodingException uee) {
                throw new IllegalArgumentException("can't find " + utf8 + " encoding");
            }
        }

        protected TaskAttemptContext context = null;
        protected HashMap<Text, DataOutputStream> recordStream = null;
        protected Path workPath = null;

        public ProtocolRecordWriter() {
        }

        public ProtocolRecordWriter(TaskAttemptContext context, Path workPath) {
            this.context = context;
            this.workPath = workPath;
            recordStream = new HashMap<Text, DataOutputStream>();
        }

        @Override
        public void write(Text key, Text value) throws IOException, InterruptedException {
            boolean nullKey = key == null;
            boolean nullValue = value == null;
            if (nullKey && nullValue) {
                return;
            }
            DataOutputStream out = recordStream.get(key);
            if (null == out) {
                Path file = new Path(workPath, key + ".txt");
                out = file.getFileSystem(this.context.getConfiguration()).create(file, false);
                recordStream.put(key, out);
            }
            if (!nullKey) {
                out.write(key.getBytes(), 0, key.getLength());
            }
            if (!(nullKey || nullValue)) {
                out.write("\t".getBytes());
            }
            if (!nullValue) {
                out.write(value.getBytes(), 0, value.getLength());
            }
            out.write(newline);
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            for (DataOutputStream out : recordStream.values()) {
                out.close();
            }
            recordStream.clear();
            recordStream = null;
        }
    }

    @Override
    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        Path workPath = this.getTaskOutputPath(context);
        return new ProtocolRecordWriter(context, workPath);
    }

    private Path getTaskOutputPath(TaskAttemptContext context) throws IOException {
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(context);
        if (committer instanceof FileOutputCommitter) {
            // Get the directory that the task should write results into.
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            // Get the {@link Path} to the output directory for the map-reduce job.
            // context.getConfiguration().get(FileOutputFormat.OUTDIR);
            Path outputPath = super.getOutputPath(context);
            if (null == outputPath) {
                throw new IOException("Undefined job output-path.");
            }
            workPath = outputPath;
        }
        return workPath;
    }
}
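
For the second run below to actually use this class and produce FTP.txt, HTTP.txt and HTTPS.txt, the driver also has to register it; presumably the line that was commented out in ProtocolDriver above is enabled, roughly like this:

// In ProtocolDriver.run(), before setting the output path:
job.setOutputFormatClass(ProtocolOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));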

Run again

[hadoop@nnode code]$ hadoop jar PartMR.jar /http_interceptor_20130913.txt /2015120500020
15/12/05 21:59:28 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/12/05 21:59:30 INFO input.FileInputFormat: Total input paths to process : 1
15/12/05 21:59:30 INFO mapreduce.JobSubmitter: number of splits:1
15/12/05 21:59:30 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1449302623953_0010
15/12/05 21:59:30 INFO impl.YarnClientImpl: Submitted application application_1449302623953_0010
15/12/05 21:59:31 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1449302623953_0010/
15/12/05 21:59:31 INFO mapreduce.Job: Running job: job_1449302623953_0010
15/12/05 22:00:00 INFO mapreduce.Job: Job job_1449302623953_0010 running in uber mode : false
15/12/05 22:00:00 INFO mapreduce.Job:  map 0% reduce 0%
15/12/05 22:00:29 INFO mapreduce.Job:  map 100% reduce 0%
15/12/05 22:00:48 INFO mapreduce.Job:  map 100% reduce 33%
15/12/05 22:01:07 INFO mapreduce.Job:  map 100% reduce 100%
15/12/05 22:01:07 INFO mapreduce.Job: Job job_1449302623953_0010 completed successfully
15/12/05 22:01:07 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=158
        FILE: Number of bytes written=432595
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=532
        HDFS: Number of bytes written=130
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=6
    Job Counters
        Killed reduce tasks=1
        Launched map tasks=1
        Launched reduce tasks=4
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=26075
        Total time spent by all reduces in occupied slots (ms)=92427
        Total time spent by all map tasks (ms)=26075
        Total time spent by all reduce tasks (ms)=92427
        Total vcore-seconds taken by all map tasks=26075
        Total vcore-seconds taken by all reduce tasks=92427
        Total megabyte-seconds taken by all map tasks=26700800
        Total megabyte-seconds taken by all reduce tasks=94645248
    Map-Reduce Framework
        Map input records=4
        Map output records=4
        Map output bytes=132
        Map output materialized bytes=158
        Input split bytes=109
        Combine input records=0
        Combine output records=0
        Reduce input groups=3
        Reduce shuffle bytes=158
        Reduce input records=4
        Reduce output records=3
        Spilled Records=8
        Shuffled Maps =3
        Failed Shuffles=0
        Merged Map outputs=3
        GC time elapsed (ms)=339
        CPU time spent (ms)=4690
        Physical memory (bytes) snapshot=513667072
        Virtual memory (bytes) snapshot=3405312000
        Total committed heap usage (bytes)=167907328
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=423
    File Output Format Counters
        Bytes Written=130
[hadoop@nnode code]$

View the result

[hadoop@nnode code]$ hdfs dfs -ls /2015120500020
Found 4 items
-rw-r--r--   2 hadoop hadoop         33 2015-12-05 22:01 /2015120500020/FTP.txt
-rw-r--r--   2 hadoop hadoop         62 2015-12-05 22:01 /2015120500020/HTTP.txt
-rw-r--r--   2 hadoop hadoop         35 2015-12-05 22:01 /2015120500020/HTTPS.txt
-rw-r--r--   2 hadoop hadoop          0 2015-12-05 22:01 /2015120500020/_SUCCESS
[hadoop@nnode code]$ hdfs dfs -text /2015120500020/FTP.txt
FTP	www.subnetc1.com/index.html
[hadoop@nnode code]$ hdfs dfs -text /2015120500020/HTTP.txt
HTTP	www.subnetc4.com/index.html;www.subnetc2.com/index.html;
[hadoop@nnode code]$ hdfs dfs -text /2015120500020/HTTPS.txt
HTTPS	www.subnetc3.com/index.html;
[hadoop@nnode code]$
