Lu Chunli's work notes: who says programmers can't have literary flair?
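TextInputFormat handles text files. The input is divided into FileSplit instances (FileSplit extends InputSplit), each split is read and parsed by a new LineRecordReader, and every parsed line is passed to the map() function of the Mapper as a <key, value> pair: the key is the byte offset of the line within the file and the value is the content of the line.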
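To make the shape of those <key, value> pairs concrete, here is a minimal plain-Java sketch that does not use any Hadoop classes. The names LineRecords and LineRecord are hypothetical and exist only for this illustration; the sketch assumes UTF-8 text with "\n" line endings and ignores what the real LineRecordReader does at split boundaries.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: mimics the <byte offset, line> records
// that a text input format hands to map(), without any Hadoop classes.
final class LineRecords {

    // One record: the byte offset where the line starts, and the line text.
    static final class LineRecord {
        final long offset;
        final String line;

        LineRecord(long offset, String line) {
            this.offset = offset;
            this.line = line;
        }
    }

    // Splits content on '\n' and records the starting byte offset of each line.
    static List<LineRecord> read(String content) {
        List<LineRecord> records = new ArrayList<>();
        long offset = 0;
        int pos = 0;
        while (pos < content.length()) {
            int newline = content.indexOf('\n', pos);
            boolean lastLine = newline < 0;
            String line = content.substring(pos, lastLine ? content.length() : newline);
            records.add(new LineRecord(offset, line));
            // Advance by the encoded length of the line plus its terminator, if any.
            offset += line.getBytes(StandardCharsets.UTF_8).length + (lastLine ? 0 : 1);
            pos = lastLine ? content.length() : newline + 1;
        }
        return records;
    }

    public static void main(String[] args) {
        for (LineRecord r : read("ab\ncde\nf")) {
            System.out.println(r.offset + " -> " + r.line);
        }
    }
}
```

In a real job the key would be a LongWritable and the value a Text; the sketch only shows why keys are offsets rather than line numbers.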
Application example: generate 100 random decimals and find the maximum value.
The input formats that ship with MapReduce all read from HDFS. This example does not read from HDFS; instead it generates 100 decimals in memory and computes their maximum.
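Before bringing Hadoop into it, the overall idea can be sketched in plain Java: cut the array into contiguous ranges, take the maximum of each range (the "map" step), then take the maximum of those partial results (the "reduce" step). The class and method names below are hypothetical, and the fixed seed is an assumption made only so that runs are repeatable.

```java
import java.util.Random;

// Hypothetical, Hadoop-free sketch of the job's logic:
// per-range maxima first, then the maximum of those maxima.
final class MaxOfRandomSketch {

    // Generates count floats in [0, 1).
    static float[] generate(int count, long seed) {
        Random random = new Random(seed);
        float[] values = new float[count];
        for (int i = 0; i < count; i++) {
            values[i] = random.nextFloat();
        }
        return values;
    }

    // "Map" step: maximum of values[start..end], both bounds inclusive.
    static float maxOfRange(float[] values, int start, int end) {
        float max = values[start];
        for (int i = start + 1; i <= end; i++) {
            if (values[i] > max) {
                max = values[i];
            }
        }
        return max;
    }

    // Cuts the array into numSplits ranges (1 <= numSplits <= values.length),
    // computes one partial maximum per range, then reduces them to one value.
    static float maxViaSplits(float[] values, int numSplits) {
        int length = values.length / numSplits;
        float[] partial = new float[numSplits];
        int begin = 0;
        for (int s = 0; s < numSplits; s++) {
            // The last range absorbs any remainder so no element is skipped.
            int end = (s == numSplits - 1) ? values.length - 1 : begin + length - 1;
            partial[s] = maxOfRange(values, begin, end);
            begin = end + 1;
        }
        // "Reduce" step: all partial maxima are combined into a single result.
        return maxOfRange(partial, 0, numSplits - 1);
    }

    public static void main(String[] args) {
        float[] values = generate(100, 42L);
        System.out.println("max = " + maxViaSplits(values, 2));
    }
}
```

Splitting never changes the answer, which is what makes the maximum a convenient first example: the partial results can be combined in any order.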
Custom InputFormat
package com.lucl.hadoop.mapreduce.rand;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * @author luchunli
 * @description Custom InputFormat
 */
public class RandomInputFormat extends InputFormat<IntWritable, ArrayWritable> {
    public static float[] floatValues = null;

    /** Custom split rules */
    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
        // Initialize the length of the array
        int NumOfValues = context.getConfiguration().getInt("lucl.random.nums", 100);
        floatValues = new float[NumOfValues];
        Random random = new Random();
        for (int i = 0; i < NumOfValues; i++) {
            floatValues[i] = random.nextFloat();
        }
        System.out.println("The generated random values are as follows:");
        for (float f : floatValues) {
            System.out.println(f);
        }
        System.out.println("====================");

        // The code below assigns two map tasks to the 100 decimals, 50 per map task.
        // Initialize the number of splits. The number of splits equals the number of map tasks,
        // but it can also be specified through the configuration parameter mapred.map.tasks.
        // How the number of map tasks is determined when that parameter and the number of
        // splits disagree will be analyzed later through the code.
        int NumSplits = context.getConfiguration().getInt("mapreduce.job.maps", 2);
        int begin = 0;
        // Math.floor rounds down. Here 100 divides evenly; with 99 the value of Math.floor would be 49.0
        // 50
        int length = (int) Math.floor(NumOfValues / NumSplits);
        // end = 49, so the first split covers 0~49
        int end = length - 1;
        // The getSplits method of the default FileInputFormat splits by the number of files and the blocksize:
        // a file larger than one block is divided into several splits; otherwise each file is one split
        List<InputSplit> splits = new ArrayList<InputSplit>();
        for (int i = 0; i < NumSplits - 1; i++) { // 2 splits, numbered 0 and 1
            RandomInputSplit split = new RandomInputSplit(begin, end);
            splits.add(split);
            // begin is the position right after the previous split
            begin = end + 1; // 50
            // The split length stays the same. The end position is the start position plus the
            // split length, and because array indexes start at 0 it is begin + length - 1
            end = begin + (length - 1); // 50 + (50 - 1) = 99
        }
        // The last split runs to the end of the array so that a remainder is not dropped
        end = NumOfValues - 1;
        RandomInputSplit split = new RandomInputSplit(begin, end);
        splits.add(split);
        /**
         * With the default TextInputFormat, two small files give splits = 2; see:
         * http://luchunli.blog.51cto.com/2368057/1676185
         */
        return splits;
    }

    @Override
    public RecordReader<IntWritable, ArrayWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new RandomRecordReader();
    }
}

Custom InputSplit

package com.lucl.hadoop.mapreduce.rand;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

/**
 * @author luchunli
 * @description
 * Custom InputSplit, modeled on {@link org.apache.hadoop.mapreduce.lib.input.FileSplit}
 *
 * FileSplit is the implementation for files on HDFS, so its fields include the absolute
 * file path (Path), the split start position (start), the split length (length) and the
 * replica information (the array of hosts that hold the block replicas).
 *
 * The custom InputSplit works on array data held in memory, so it does not need to record
 * a file path or replica information; it only records the start position and the length
 * of the array slice.
 */
public class RandomInputSplit extends InputSplit implements Writable {
    private int start;
    private int end;
    private ArrayWritable floatArray = new ArrayWritable(FloatWritable.class);

    public RandomInputSplit() {}

    /**
     * Constructs a split
     *
     * @param start
     * @param end
     */
    public RandomInputSplit(int start, int end) {
        this.start = start;
        this.end = end;

        int len = this.end - this.start + 1;
        int index = start;
        FloatWritable[] result = new FloatWritable[len];
        for (int i = 0; i < len; i++) {
            float f = RandomInputFormat.floatValues[index];
            FloatWritable fw = new FloatWritable(f);
            result[i] = fw;
            index++;
        }
        floatArray.set(result);

        // System.out.println("Split data:");
        // for (FloatWritable fw : (FloatWritable[]) floatArray.toArray()) {
        //     System.out.println(fw.get());
        // }
        // System.out.println("=====================");
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
        // Both bounds are inclusive
        return this.end - this.start + 1;
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
        return new String[] {"dnode1", "dnode2"};
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.start = in.readInt();
        this.end = in.readInt();
        this.floatArray.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.getStart());
        out.writeInt(this.getEnd());
        this.floatArray.write(out);
    }

    public int getStart() {
        return start;
    }

    public void setStart(int start) {
        this.start = start;
    }

    public int getEnd() {
        return end;
    }

    public void setEnd(int end) {
        this.end = end;
    }

    public ArrayWritable getFloatArray() {
        return floatArray;
    }

    @Override
    public String toString() {
        return this.getStart() + "-" + this.getEnd();
    }
}

Custom RecordReader

package com.lucl.hadoop.mapreduce.rand;

import java.io.IOException;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * @author luchunli
 * @description Custom RecordReader
 */
public class RandomRecordReader extends RecordReader<IntWritable, ArrayWritable> {
    private int start;
    private int end;
    private int index;
    private IntWritable key = null;
    private ArrayWritable value = null;
    private RandomInputSplit rsplit = null;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.rsplit = (RandomInputSplit) split;
        this.start = this.rsplit.getStart();
        this.end = this.rsplit.getEnd();
        this.index = this.start;
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (null == key) {
            key = new IntWritable();
        }
        if (null == value) {
            value = new ArrayWritable(FloatWritable.class);
        }
        if (this.index

[... the rest of RandomRecordReader and the beginning of the Mapper are missing from the source; the Mapper resumes below ...]

            maxValue) {
                maxValue = tmp;
            }
        }
        // The keys written by all map tasks must be identical here. Otherwise reduce treats them
        // as different data, the shuffle produces several groups, and the result is one maximum
        // per map task
        context.write(one, new FloatWritable(maxValue));
    }
}

Implementing the Reducer

package com.lucl.hadoop.mapreduce.rand;

import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @author luchunli
 * @description Reducer
 */
public class RandomReducer extends Reducer<IntWritable, FloatWritable, Text, FloatWritable> {
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddhhmmss");
        // To see which machine the current reduce runs on, create a timestamped file on that machine
        File file = new File("/home/hadoop", "Reducer-" + format.format(new Date()));
        if (!file.exists()) {
            file.createNewFile();
        }
    }

    @Override
    protected void reduce(IntWritable key, Iterable<FloatWritable> value, Context context)
            throws IOException, InterruptedException {
        Iterator<FloatWritable> it = value.iterator();
        float maxValue = 0;
        float tmp = 0;
        if (it.hasNext()) {
            maxValue = it.next().get();
        } else {
            context.write(new Text("The max value is : "), new FloatWritable(maxValue));
            return;
        }
        while (it.hasNext()) {
            tmp = it.next().get();
            if (tmp > maxValue) {
                maxValue = tmp;
            }
        }
        context.write(new Text("The max value is : "), new FloatWritable(maxValue));
    }
}
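The write and readFields methods of RandomInputSplit only work if both sides agree on the field order, because the split is turned into bytes on the client and rebuilt inside the map task. The following Hadoop-free sketch shows the same round trip with java.io.DataOutput and java.io.DataInput. The class RangeSplitSketch is hypothetical, and writing the array as a length followed by its elements is an assumption chosen for the illustration rather than a statement about the exact bytes ArrayWritable produces.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a Writable split: start, end and the slice values.
final class RangeSplitSketch {
    int start;
    int end;
    float[] values = new float[0];

    // A no-argument constructor is needed so an empty instance can be filled by readFields.
    RangeSplitSketch() {}

    RangeSplitSketch(int start, int end, float[] values) {
        this.start = start;
        this.end = end;
        this.values = values;
    }

    // Fields are written in a fixed order: start, end, array length, elements.
    void write(DataOutput out) throws IOException {
        out.writeInt(start);
        out.writeInt(end);
        out.writeInt(values.length);
        for (float v : values) {
            out.writeFloat(v);
        }
    }

    // Fields must be read back in exactly the order they were written.
    void readFields(DataInput in) throws IOException {
        start = in.readInt();
        end = in.readInt();
        values = new float[in.readInt()];
        for (int i = 0; i < values.length; i++) {
            values[i] = in.readFloat();
        }
    }

    // Serializes to a byte array and deserializes into a fresh instance.
    static RangeSplitSketch roundTrip(RangeSplitSketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                original.write(out);
            }
            RangeSplitSketch copy = new RangeSplitSketch();
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        RangeSplitSketch copy = roundTrip(new RangeSplitSketch(50, 99, new float[] {0.25f, 0.75f}));
        System.out.println(copy.start + "-" + copy.end + ", " + copy.values.length + " values");
    }
}
```

This also suggests why the article's split carries its slice of the array with it: the static floatValues array is filled where getSplits runs, so a map task in another JVM would presumably only see the values that travel inside the serialized split.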
Define the driver class
package com.lucl.hadoop.mapreduce.rand;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author luchunli
 * @description
 * The input classes that ship with MapReduce are all based on HDFS. Instead of reading from
 * HDFS, the following example code randomly generates 100 float decimals in the range 0-1
 * in memory and then finds the maximum of those decimals.
 */
public class RandomDriver extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new RandomDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        // Number of random values to generate, read back in RandomInputFormat.getSplits
        conf.set("lucl.random.nums", "100");
        // Number of splits, and therefore of map tasks
        conf.set("mapreduce.job.maps", "2");

        Job job = Job.getInstance(getConf(), this.getClass().getSimpleName());
        job.setJarByClass(RandomDriver.class);

        job.setInputFormatClass(RandomInputFormat.class);

        job.setMapperClass(RandomMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(FloatWritable.class);

        job.setReducerClass(RandomReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);

        // The only command-line argument is the output directory
        FileOutputFormat.setOutputPath(job, new Path(args[0]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Package and run
[hadoop@nnode code]$ hadoop jar RandomMR.jar /201512020027
15/12/02 00:28:07 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
The generated random values are as follows:
0.020075738
0.700349
0.9617876
0.8286018
0.03357637
...
INFO mapreduce.JobSubmitter: number of splits:2
15/12/02 00:28:08 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448981819300_0014
15/12/02 00:28:09 INFO impl.YarnClientImpl: Submitted application application_1448981819300_0014
15/12/02 00:28:09 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1448981819300_0014/
15/12/02 00:28:09 INFO mapreduce.Job: Running job: job_1448981819300_0014
15/12/02 00:28:38 INFO mapreduce.Job: Job job_1448981819300_0014 running in uber mode : false
15/12/02 00:28:38 INFO mapreduce.Job:  map 0% reduce 0%
15/12/02 00:29:13 INFO mapreduce.Job:  map 100% reduce 0%
INFO mapreduce.Job:  map 100% reduce 100%
15/12/02 00:29:32 INFO mapreduce.Job: Job job_1448981819300_0014 completed successfully
15/12/02 00:29:32 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=26
        FILE: Number of bytes written=323256
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=520
        HDFS: Number of bytes written=31
        HDFS: Number of read operations=7
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=2
        Launched reduce tasks=1
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=64430
        Total time spent by all reduces in occupied slots (ms)=16195
        Total time spent by all map tasks (ms)=64430
        Total time spent by all reduce tasks (ms)=16195
        Total vcore-seconds taken by all map tasks=64430
        Total vcore-seconds taken by all reduce tasks=16195
        Total megabyte-seconds taken by all map tasks=65976320
        Total megabyte-seconds taken by all reduce tasks=16583680
    Map-Reduce Framework
        Map input records=2
        Map output records=2
        Map output bytes=16
        Map output materialized bytes=32
        Input split bytes=520
        Combine input records=0
        Combine output records=0
        Reduce input groups=1
        Reduce shuffle bytes=32
        Reduce input records=2
        Reduce output records=1
        Spilled Records=4
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=356
        CPU time spent (ms)=1940
        Physical memory (bytes) snapshot=513851392
        Virtual memory (bytes) snapshot=2541506560
        Total committed heap usage (bytes)=257171456
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=0
    File Output Format Counters
        Bytes Written=31
[hadoop@nnode code]$
View the output result
[hadoop@nnode code]$ hdfs dfs -ls /201512020027
Found 2 items
-rw-r--r--   2 hadoop hadoop          0 2015-12-02 00:29 /201512020027