
How to Implement Auxiliary Sorting in Hadoop


This article shows how to implement auxiliary sorting in Hadoop: a secondary sort on a composite key, used here to perform a reduce-side join of weather station metadata with weather records. The example is easy to follow; work through it below.

1. Sample data

Weather station file (tab-separated: station ID, station name):

011990-99999	SIHCCAJAVRI
012650-99999	TYNSET-HANSMOEN

Weather record file (tab-separated: station ID, observation time, air temperature):

012650-99999	194903241200	111
012650-99999	194903241800	78
011990-99999	195005150700	0
011990-99999	195005151200	22
011990-99999	195005151800	-11

2. Requirement

Join the two files on station ID, so that every weather record is output together with the name of the station it came from.

3. Approach and code

Send the weather station record and all weather records sharing the same station ID to the same reducer, and guarantee that the station record arrives first. The reduce() function then reads the station name from the first value and, from the second value onward, emits each weather record joined with that name. The intermediate ordering for one station is illustrated below.
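To make the ordering concrete, here is an illustration (derived from the sample data above) of the tagged intermediate pairs the reducer sees for station 011990-99999. The tag "0" sorts the station record ahead of the weather records, which carry tag "1":

(011990-99999, 0) -> SIHCCAJAVRI
(011990-99999, 1) -> 195005150700	0
(011990-99999, 1) -> 195005151200	22
(011990-99999, 1) -> 195005151800	-11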

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Composite key, used here for auxiliary sorting: holds the weather station ID and a "tag".
 * The tag is a synthetic field whose sole purpose is to order the records so that the
 * station record arrives before the weather records. Although the arrival order could be
 * left unspecified and each group's records buffered in memory, that should be avoided:
 * any one group may contain far more records than fit in the reducer's memory.
 */
public class TextPair implements WritableComparable<TextPair> {
    private Text first;
    private Text second;

    public TextPair() {
        set(new Text(), new Text());
    }

    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    public TextPair(Text first, Text second) {
        set(first, second);
    }

    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public Text getSecond() {
        return second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public int hashCode() {
        return first.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof TextPair) {
            TextPair tp = (TextPair) obj;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }

    @Override
    public int compareTo(TextPair o) {
        int cmp = first.compareTo(o.first);
        if (cmp == 0) {
            cmp = second.compareTo(o.second);
        }
        return cmp;
    }

    // RawComparator allows records to be compared directly in the byte stream, without
    // first deserializing them into objects, avoiding the overhead of object creation.
    // WritableComparator is a general-purpose RawComparator implementation for
    // WritableComparable classes.
    public static class FirstComparator extends WritableComparator {
        private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();

        public FirstComparator() {
            super(TextPair.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            try {
                // firstL1 and firstL2 are the lengths of the first Text field in each byte stream
                int firstL1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
                int firstL2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2);
                return TEXT_COMPARATOR.compare(b1, s1, firstL1, b2, s2, firstL2);
            } catch (IOException e) {
                throw new IllegalArgumentException(e);
            }
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            if (a instanceof TextPair && b instanceof TextPair) {
                return ((TextPair) a).first.compareTo(((TextPair) b).first);
            }
            return super.compare(a, b);
        }
    }
}
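As a quick check of the ordering the tag provides, here is a minimal sketch (the class name TextPairOrderDemo is ours; it assumes the TextPair class above is on the classpath):

public class TextPairOrderDemo {
    public static void main(String[] args) {
        // Same station ID; tag "0" (station record) must sort before tag "1" (weather record)
        TextPair station = new TextPair("011990-99999", "0");
        TextPair record = new TextPair("011990-99999", "1");
        System.out.println(station.compareTo(record) < 0); // prints true
    }
}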

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Tags weather station records with "0" so they sort ahead of the weather records.
 */
public class JoinStationMapper extends Mapper<LongWritable, Text, TextPair, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] val = value.toString().split("\\t");
        if (val.length == 2) {
            context.write(new TextPair(val[0], "0"), new Text(val[1]));
        }
    }
}

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Tags weather records with "1" so they sort after the station record.
 */
public class JoinRecordMapper extends Mapper<LongWritable, Text, TextPair, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] val = value.toString().split("\\t");
        if (val.length == 3) {
            context.write(new TextPair(val[0], "1"), new Text(val[1] + "\t" + val[2]));
        }
    }
}

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class JoinReducer extends Reducer<TextPair, Text, Text, Text> {
    @Override
    protected void reduce(TextPair key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Iterator<Text> iter = values.iterator();
        // The station record arrives first; copy its value, because Hadoop reuses the
        // Text object between iterations (never write Text stationName = iter.next();)
        Text stationName = new Text(iter.next());
        while (iter.hasNext()) {
            Text record = iter.next();
            Text outValue = new Text(stationName.toString() + "\t" + record.toString());
            context.write(key.getFirst(), outValue);
        }
    }
}

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class JoinRecordWithStationName {

    static class KeyPartitioner extends Partitioner<TextPair, Text> {
        @Override
        public int getPartition(TextPair textPair, Text text, int numPartitions) {
            return (textPair.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err.println("Wrong number of arguments; expected three: <ncdc input> <station input> <output>");
            System.exit(-1);
        }
        Path ncdcInputPath = new Path(otherArgs[0]);
        Path stationInputPath = new Path(otherArgs[1]);
        Path outputPath = new Path(otherArgs[2]);
        // conf.set("fs.defaultFS", "hdfs://vmnode.zhch:9000");

        Job job = Job.getInstance(conf, "JoinRecordWithStationName");
        // job.setJar("F:/workspace/AssistRanking/target/AssistRanking-1.0-SNAPSHOT.jar");
        job.setJarByClass(JoinRecordWithStationName.class);

        MultipleInputs.addInputPath(job, ncdcInputPath, TextInputFormat.class, JoinRecordMapper.class);
        MultipleInputs.addInputPath(job, stationInputPath, TextInputFormat.class, JoinStationMapper.class);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Partition and group on first (the station ID) only: records in the same partition
        // go to the same reducer, and records in the same group are handled in the same
        // reduce() call.
        job.setPartitionerClass(KeyPartitioner.class);
        job.setGroupingComparatorClass(TextPair.FirstComparator.class);

        job.setMapOutputKeyClass(TextPair.class);
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
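A usage sketch: the driver expects the weather record path first, then the station path, then the output path. The jar name below comes from the commented-out setJar() line; the input file names are hypothetical:

hadoop jar AssistRanking-1.0-SNAPSHOT.jar JoinRecordWithStationName input/records.txt input/stations.txt output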

4. Result
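With the sample data above, the job should produce output along these lines (with a single reducer the station IDs come out in sorted key order; the order of records within one station is not guaranteed):

011990-99999	SIHCCAJAVRI	195005150700	0
011990-99999	SIHCCAJAVRI	195005151200	22
011990-99999	SIHCCAJAVRI	195005151800	-11
012650-99999	TYNSET-HANSMOEN	194903241200	111
012650-99999	TYNSET-HANSMOEN	194903241800	78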

That is the full content of "How to Implement Auxiliary Sorting in Hadoop". Thank you for reading! Hopefully the walkthrough has cleared up any doubts; to learn more, follow the industry information channel.
