Study notes: join processing in Hadoop

Updated: 2025-01-15. From: SLTechnology News&Howtos (shulou)

Join method

Requirement: join two files, input1 and input2. Both files use the same id as the key but hold different values, and the records that share an id must be merged. input1 stores id and name, while input2 stores id and various other information. Each line has the form id,value, with a comma after the id.

Method 1:

package org.robby.join;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MyReduceJoin {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
        // Intermediate variables reused across map() calls
        private Text key = new Text();
        private Text value = new Text();
        private String[] keyValue = null;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split on the first comma only: id, then the rest of the line
            keyValue = value.toString().split(",", 2);
            if (keyValue.length < 2) {
                return; // skip lines without a comma
            }
            this.key.set(keyValue[0]);
            this.value.set(keyValue[1]);
            context.write(this.key, this.value);
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        private Text value = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder valueStr = new StringBuilder();
            // The shuffle phase partitions, sorts, and groups the map output
            // before reduce runs, so all values with the same id arrive
            // together in one group and can be iterated here.
            for (Text val : values) {
                valueStr.append(val);
                valueStr.append(",");
            }
            // Drop the trailing comma
            this.value.set(valueStr.deleteCharAt(valueStr.length() - 1).toString());
            context.write(key, this.value);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyReduceJoin.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        // Output types of reduce
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);
        // Delete the output directory if it already exists
        outputPath.getFileSystem(conf).delete(outputPath, true);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Disadvantage of this method: the order of the values is not guaranteed. For a given id, either the value from the first file or the value from the second file may come first.
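The ordering problem can be reproduced without a cluster. The following is a minimal plain-Java sketch, not the author's code: the class name GroupByKeySketch, the method group, and the sample records are all hypothetical. It groups lines by id and concatenates the values in arrival order, which is what the Method 1 reducer does with whatever order the shuffle delivers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupByKeySketch {

    // Groups "id,rest" lines by id and joins the values with commas,
    // in the order the lines arrive (hypothetical helper, no Hadoop needed).
    public static Map<String, String> group(List<String> lines) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] kv = line.split(",", 2);
            if (kv.length < 2) {
                continue; // skip lines without a comma
            }
            grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        Map<String, String> joined = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            joined.put(e.getKey(), String.join(",", e.getValue()));
        }
        return joined;
    }

    public static void main(String[] args) {
        // Hypothetical records: names as in input1, other information as in input2.
        List<String> namesFirst = Arrays.asList("1,tom", "2,ann", "1,beijing", "2,paris");
        List<String> infoFirst = Arrays.asList("1,beijing", "2,paris", "1,tom", "2,ann");
        System.out.println(group(namesFirst)); // {1=tom,beijing, 2=ann,paris}
        System.out.println(group(infoFirst));  // {1=beijing,tom, 2=paris,ann}
    }
}
```

The same records produce different joined lines depending only on arrival order, so the output cannot be parsed reliably unless each value carries a marker for its source file.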

Method 2:

This method introduces a custom value type:

package org.robby.join;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

// Custom type: it implements the WritableComparable interface.
public class CombineValues implements WritableComparable<CombineValues> {
    // All fields use the Text type that ships with Hadoop
    private Text joinKey;
    private Text flag;
    private Text secondPart;

    public CombineValues() {
        // Create the fields in the constructor; fill them later with the setters
        this.joinKey = new Text();
        this.flag = new Text();
        this.secondPart = new Text();
    }

    public void setJoinKey(Text joinKey) {
        this.joinKey = joinKey;
    }

    public void setFlag(Text flag) {
        this.flag = flag;
    }

    public void setSecondPart(Text secondPart) {
        this.secondPart = secondPart;
    }

    public Text getFlag() {
        return flag;
    }

    public Text getSecondPart() {
        return secondPart;
    }

    public Text getJoinKey() {
        return joinKey;
    }

    // Serialization and deserialization work on the streams Hadoop passes in.
    @Override
    public void write(DataOutput out) throws IOException {
        // The fields are Hadoop Text objects, so each one can serialize
        // itself: just hand it the output stream.
        this.joinKey.write(out);
        this.flag.write(out);
        this.secondPart.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Read the fields back in the same order they were written
        this.joinKey.readFields(in);
        this.flag.readFields(in);
        this.secondPart.readFields(in);
    }

    @Override
    public int compareTo(CombineValues o) {
        return this.joinKey.compareTo(o.getJoinKey());
    }

    @Override
    public String toString() {
        return "[flag=" + this.flag.toString()
                + ", joinKey=" + this.joinKey.toString()
                + ", secondPart=" + this.secondPart.toString() + "]";
    }
}

Processing: in the mapper phase, the context reveals which file the current record comes from, so records from the two files can be tagged and handled separately.

package org.robby.join;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MyReduceJoin1 {

    public static class Map extends Mapper<LongWritable, Text, Text, CombineValues> {
        private CombineValues combineValues = new CombineValues();
        private Text flag = new Text();
        private Text key = new Text();
        private String[] keyValue = null;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // A FileSplit is a block of a file. Through the context we can
            // find out which file the current record belongs to.
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();

            // Tag the record by file name: "0" for input1.txt, "1" otherwise
            if (pathName.endsWith("input1.txt")) {
                flag.set("0");
            } else {
                flag.set("1");
            }
            combineValues.setFlag(flag);

            keyValue = value.toString().split(",", 2);
            if (keyValue.length < 2) {
                return; // skip lines without a comma
            }
            combineValues.setJoinKey(new Text(keyValue[0]));
            combineValues.setSecondPart(new Text(keyValue[1]));
            this.key.set(keyValue[0]);

            // Emit the wrapped record. The key is the id, used for
            // partitioning, sorting, and grouping. The value is the custom
            // class, which must be declared in the main function.
            context.write(this.key, combineValues);
        }
    }

    public static class Reduce extends Reducer<Text, CombineValues, Text, Text> {
        private Text left = new Text();
        private Text right = new Text();

        @Override
        protected void reduce(Text key, Iterable<CombineValues> values, Context context)
                throws IOException, InterruptedException {
            // Reset both sides so values from the previous id do not leak in
            left.clear();
            right.clear();

            // Records with the same key arrive in one group
            for (CombineValues val : values) {
                System.out.println("val:" + val.toString());
                Text secondPar = new Text(val.getSecondPart().toString());
                // Use the flag to decide whether this is the left or right side
                if (val.getFlag().toString().equals("0")) {
                    System.out.println("left :" + secondPar);
                    left.set(secondPar);
                } else {
                    System.out.println("right:" + secondPar);
                    right.set(secondPar);
                }
            }

            // Combine both sides and write the result
            Text output = new Text(left.toString() + "," + right.toString());
            context.write(key, output);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyReduceJoin1.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // Declare the map output types explicitly, because the map output
        // value (CombineValues) differs from the final output value (Text)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CombineValues.class);

        // Output types of reduce
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Input and output formats of the job
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Input and output paths of the job
        Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);
        // Delete the output path on HDFS if it already exists
        outputPath.getFileSystem(conf).delete(outputPath, true);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

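Disadvantage: this method assumes exactly one record per id in each file. If the two files hold different numbers of records for the same id, for example several input2 records for one name, the reducer keeps only one of them, although all records with the same id still need to be merged.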
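The difference between keeping one slot per side and keeping a list can be imitated in plain Java. This is only a sketch under stated assumptions: the class TaggedReduceSketch, its two methods, and the sample values are hypothetical, and each tagged value is modeled as a two-element array of flag and payload rather than as a CombineValues object.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TaggedReduceSketch {

    // One slot per side: a later right-hand value overwrites an earlier one.
    public static List<String> reduceSingleSlot(List<String[]> tagged) {
        String left = "";
        String right = "";
        for (String[] v : tagged) {
            if (v[0].equals("0")) {
                left = v[1];
            } else {
                right = v[1];
            }
        }
        List<String> out = new ArrayList<>();
        out.add(left + "," + right);
        return out;
    }

    // A list for the right side: every right-hand value yields its own row.
    public static List<String> reduceWithList(List<String[]> tagged) {
        String left = "";
        List<String> rights = new ArrayList<>();
        for (String[] v : tagged) {
            if (v[0].equals("0")) {
                left = v[1];
            } else {
                rights.add(v[1]);
            }
        }
        List<String> out = new ArrayList<>();
        for (String r : rights) {
            out.add(left + "," + r);
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical group for one id: one name, two information records.
        List<String[]> group = Arrays.asList(
                new String[] {"0", "tom"},
                new String[] {"1", "beijing"},
                new String[] {"1", "engineer"});
        System.out.println(reduceSingleSlot(group)); // [tom,engineer]
        System.out.println(reduceWithList(group));   // [tom,beijing, tom,engineer]
    }
}
```

With a single slot the "beijing" record is silently lost; collecting the right-hand values in a list keeps every record at the cost of buffering one group in memory.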

Method 3:

package org.robby.join;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MyReduceJoin2 {

    public static class Map extends Mapper<LongWritable, Text, Text, CombineValues> {
        private CombineValues combineValues = new CombineValues();
        private Text flag = new Text();
        private Text key = new Text();
        private String[] keyValue = null;

        // The map logic is the same as before: tag each record with a flag
        // according to its source file, then emit it.
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
            if (pathName.endsWith("input1.txt")) {
                flag.set("0");
            } else {
                flag.set("1");
            }
            combineValues.setFlag(flag);

            keyValue = value.toString().split(",", 2);
            if (keyValue.length < 2) {
                return; // skip lines without a comma
            }
            combineValues.setJoinKey(new Text(keyValue[0]));
            combineValues.setSecondPart(new Text(keyValue[1]));
            this.key.set(keyValue[0]);
            context.write(this.key, combineValues);
        }
    }

    public static class Reduce extends Reducer<Text, CombineValues, Text, Text> {
        private Text left = new Text();
        private ArrayList<Text> right = new ArrayList<Text>();

        @Override
        protected void reduce(Text key, Iterable<CombineValues> values, Context context)
                throws IOException, InterruptedException {
            // Reset state left over from the previous id
            left.clear();
            right.clear();

            // One id may now have several records to merge
            for (CombineValues val : values) {
                System.out.println("val:" + val.toString());
                Text secondPar = new Text(val.getSecondPart().toString());
                if (val.getFlag().toString().equals("0")) {
                    left.set(secondPar);
                } else {
                    // File 1 holds the name and file 2 holds the various
                    // information records, so collect those in a list
                    right.add(secondPar);
                }
            }

            // Emit one output line per information record
            for (Text t : right) {
                Text output = new Text(left.toString() + "," + t.toString());
                context.write(key, output);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyReduceJoin2.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CombineValues.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);
        outputPath.getFileSystem(conf).delete(outputPath, true);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Other approaches:

Map-side join in the mapper using the distributed cache

The idea is to override the setup method of the mapper, use the context to read the cache file registered on the job, and store its contents in a field of the mapper object. Every call to the map method can then look up this preloaded data.

With setup preloading input1, the map method of the mapper only has to process input2. This works as long as input1 is small enough to fit in the memory of each mapper.

package org.robby.join;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MapJoinWithCache {

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        private Text key = new Text();
        private Text value = new Text();
        private String[] keyValue = null;
        // keyMap holds the cached file data shared by all map() calls
        private HashMap<String, String> keyMap = null;

        // map() is called once per input line and looks up keyMap each time.
        // setup() has already loaded input1, so only input2 is handled here.
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            keyValue = value.toString().split(",", 2);
            if (keyValue.length < 2) {
                return; // skip lines without a comma
            }
            // name is null when the id does not occur in the cached file
            String name = keyMap.get(keyValue[0]);
            this.key.set(keyValue[0]);
            String output = name + "," + keyValue[1];
            this.value.set(output);
            context.write(this.key, this.value);
        }

        // setup() runs once when the mapper is initialized
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // The context provides the URIs of the cache files
            URI[] localPaths = context.getCacheFiles();
            keyMap = new HashMap<String, String>();
            for (URI url : localPaths) {
                // Open the HDFS file system by URI
                // (hdfs://hadoop1:9000 is the address of the author's cluster)
                FileSystem fs = FileSystem.get(URI.create("hdfs://hadoop1:9000"),
                        context.getConfiguration());
                // Open the HDFS file through a Path object to get a stream
                FSDataInputStream in = fs.open(new Path(url.getPath()));
                BufferedReader br = new BufferedReader(new InputStreamReader(in));
                try {
                    String s1 = null;
                    while ((s1 = br.readLine()) != null) {
                        keyValue = s1.split(",", 2);
                        if (keyValue.length < 2) {
                            continue; // skip lines without a comma
                        }
                        keyMap.put(keyValue[0], keyValue[1]);
                        System.out.println(s1);
                    }
                } finally {
                    br.close();
                }
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        // The join is already done in the mapper, so reduce only writes out
        // the grouped records unchanged.
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(key, val);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MapJoinWithCache.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);
        outputPath.getFileSystem(conf).delete(outputPath, true);

        // Everything else is the same as before. Here the full path of the
        // file to cache is added to the job. Several files can be added.
        job.addCacheFile(new Path(args[2]).toUri());

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
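Stripped of the Hadoop plumbing, the map-side join above is a hash lookup: load the small table once, then probe it for every record of the large input. The following plain-Java sketch illustrates only that idea. The class MapSideJoinSketch and the sample records are hypothetical, and unlike the job above it assumes an inner join, so records without a match are dropped instead of being written with a null name.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapSideJoinSketch {
    // Lookup table built once, playing the role of keyMap in the mapper
    private final Map<String, String> lookup = new HashMap<>();

    // Plays the role of setup(): load the small table (id,name) into memory.
    public void setup(List<String> smallTable) {
        for (String line : smallTable) {
            String[] kv = line.split(",", 2);
            if (kv.length == 2) {
                lookup.put(kv[0], kv[1]);
            }
        }
    }

    // Plays the role of map(): called once per record of the large input.
    // Returns the joined value, or null when the line is malformed or the
    // id has no match (an assumption: inner-join behavior).
    public String map(String line) {
        String[] kv = line.split(",", 2);
        if (kv.length < 2) {
            return null;
        }
        String name = lookup.get(kv[0]);
        if (name == null) {
            return null;
        }
        return name + "," + kv[1];
    }

    public static void main(String[] args) {
        MapSideJoinSketch mapper = new MapSideJoinSketch();
        // Hypothetical data standing in for input1 and input2
        mapper.setup(Arrays.asList("1,tom", "2,ann"));
        for (String line : Arrays.asList("1,beijing", "1,engineer", "3,unknown")) {
            String joined = mapper.map(line);
            if (joined != null) {
                System.out.println(joined); // tom,beijing then tom,engineer
            }
        }
    }
}
```

Because each record is joined as soon as it is read, no shuffle is needed for the join itself. The trade-off is that the whole small table must fit in memory on every mapper.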

Other Linux commands:

[root@hadoop1 dataFile]# wc test*
 6 14 35 test2.txt
 7 16 41 test.txt
13 30 76 total

wc shows the number of lines, words, and bytes in each file, followed by a total.

Attachment: http://down.51cto.com/data/2366171
