Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

[summary] MultipleOutputs practice in Hadoop

2025-01-17 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

本例子采用hadoop1.1.2版本,附件中有例子的数据文件

采用气象数据作为处理数据

1、MultipleOutputs例子,具体解释在代码中有注释

package StationPatitioner;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * MultipleOutputs example for Hadoop 1.1.2 (old {@code mapred} API).
 *
 * <p>The mapper keys every weather record by its station id; the reducer then
 * writes each station's records through a {@link MultipleOutputs} multi named
 * output called {@code "station"}, so records are partitioned by station.
 *
 * @author 巧克力黑
 */
public class PatitionByStationUsingMultipleOutputs extends Configured implements Tool {

    /** Job counters reported by the mapper. */
    enum Counter {
        /** Number of input lines skipped because they could not be parsed. */
        LINESKIP
    }

    /** Emits {@code (station id, original record)} for every parseable input line. */
    static class StationMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {

        private final NcdcRecordParser parser = new NcdcRecordParser();

        /**
         * Parses one input line and forwards it keyed by station id.
         *
         * @param key      input key supplied by the input format (unused)
         * @param value    the raw record line
         * @param output   collector receiving {@code (station id, record)}
         * @param reporter used to bump {@link Counter#LINESKIP} on bad lines
         * @throws IOException if writing the map output fails
         */
        @Override
        public void map(LongWritable key, Text value,
                        OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            try {
                parser.parse(value);
            } catch (RuntimeException e) {
                // Malformed line (too short, non-numeric temperature, ...):
                // count it and move on instead of failing the task.
                reporter.getCounter(Counter.LINESKIP).increment(1);
                return;
            }
            // Kept outside the try block on purpose: an IOException while
            // writing is a real failure and must not be counted as a bad line.
            output.collect(new Text(parser.getStationid()), value);
        }
    }

    /** Writes every record of a station through the {@code "station"} named output. */
    static class MultipleOutputReducer extends MapReduceBase
            implements Reducer<Text, Text, NullWritable, Text> {

        private MultipleOutputs multipleOutputs;

        /** Creates the {@link MultipleOutputs} helper from the job configuration. */
        @Override
        public void configure(JobConf jobconf) {
            multipleOutputs = new MultipleOutputs(jobconf);
        }

        /**
         * Sends all records of one station to that station's collector.
         *
         * @param key      the station id
         * @param values   the raw records belonging to that station
         * @param output   the job's regular collector (not used here)
         * @param reporter progress reporter handed to {@link MultipleOutputs}
         * @throws IOException if writing a record fails
         */
        @Override
        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<NullWritable, Text> output, Reporter reporter)
                throws IOException {
            // Hyphens are stripped from the station id before it is used as the
            // multi-name part; presumably because MultipleOutputs only accepts
            // letters and digits there -- confirm against the Hadoop Javadoc.
            String multiName = key.toString().replace("-", "");

            // getCollector returns a raw OutputCollector; the key/value types
            // match what run() registers for the "station" named output.
            @SuppressWarnings("unchecked")
            OutputCollector<NullWritable, Text> collector =
                    multipleOutputs.getCollector("station", multiName, reporter);

            while (values.hasNext()) {
                collector.collect(NullWritable.get(), values.next());
            }
        }

        /** Closes the {@link MultipleOutputs} helper. */
        @Override
        public void close() throws IOException {
            multipleOutputs.close();
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param as command-line arguments left after generic option parsing (unused)
     * @return 0 when the job completes; a failed job surfaces as an exception
     *         thrown by {@link JobClient#runJob(JobConf)}
     * @throws Exception if job submission or execution fails
     */
    @Override
    public int run(String[] as) throws Exception {
        // The local (Windows) user differs from the user on the Linux cluster;
        // setting this avoids permission-related errors.
        System.setProperty("HADOOP_USER_NAME", "root");

        // Start from the configuration injected by ToolRunner so generic
        // options are honoured; fall back to a fresh JobConf when run() is
        // invoked without a prior setConf().
        JobConf conf = getConf() == null
                ? new JobConf(PatitionByStationUsingMultipleOutputs.class)
                : new JobConf(getConf(), PatitionByStationUsingMultipleOutputs.class);

        conf.setMapperClass(StationMapper.class);
        conf.setReducerClass(MultipleOutputReducer.class);
        conf.setMapOutputKeyClass(Text.class);
        conf.setOutputKeyClass(NullWritable.class);
        // The regular job output is discarded; data is written only through
        // the "station" named output registered below.
        conf.setOutputFormat(NullOutputFormat.class);

        // Input path.
        FileInputFormat.setInputPaths(conf, new Path("hdfs://ubuntu:9000/sample1.txt"));
        // Output path.
        FileOutputFormat.setOutputPath(conf, new Path("hdfs://ubuntu:9000/temperature"));

        MultipleOutputs.addMultiNamedOutput(
                conf, "station", TextOutputFormat.class, NullWritable.class, Text.class);

        JobClient.runJob(conf);
        return 0;
    }

    /** Entry point: runs the tool through {@link ToolRunner} and exits with its status. */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new PatitionByStationUsingMultipleOutputs(), args);
        System.exit(exitCode);
    }
}

2、解析气象数据的类

package StationPatitioner;import org.apache.hadoop.io.Text;public class NcdcRecordParser { private static final int MISSING_TEMPERATURE = 9999; private String year; private int airTemperature; private String quality; private String stationid; public void parse(String record) { stationid = record.substring(0, 5); year = record.substring(15, 19); String airTemperatureString; // Remove leading plus sign as parseInt doesn't like them if (record.charAt(87) == '+') { airTemperatureString = record.substring(88, 92); } else { airTemperatureString = record.substring(87, 92); } airTemperature = Integer.parseInt(airTemperatureString); quality = record.substring(92, 93); } public String getStationid(){ return stationid; } public void parse(Text record) { parse(record.toString()); } public boolean isValidTemperature() { return airTemperature != MISSING_TEMPERATURE && quality.matches("[01459]"); } public String getYear() { return year; } public int getAirTemperature() { return airTemperature; }}

Subscribe to "Shulou Technology Information" to get the latest news, interesting stories, and hot topics in the IT industry, and to keep up with the latest Internet news, technology news, and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report