MapReduce: map join and reduce join

2025-01-17 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/02 Report--

I. Map join

1. Applicable scenarios:

One table is very large and the other is very small (small enough to be held in memory).

2. Solution:

Cache the small table(s) on the map side and perform the join logic there. This moves work to the map side, reduces the data pressure on the reduce side, and reduces data skew as much as possible.

3. Implementation: use the distributed cache

(1) In the setup phase of the mapper, read the file into an in-memory collection.

(2) Register the cache file in the driver: job.addCacheFile(new URI("file:/e:/mapjoincache/pd.txt")); // caches an ordinary file on the node where the task runs

4. Examples

// order.txt
order id    goods id    quantity
1001        01          1
1002        02          2
1003        03          3
1004        01          4
1005        02          5
1006        03          6

// pd.txt
goods id    name
01          Xiaomi
02          Huawei
03          Gree

The goal is to replace the goods id in each order record with the goods name. To do this, cache the small table pd.txt and look up each order's goods id in it.
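The lookup described above can be sketched in plain Java, without any Hadoop classes. This is only an illustration: the class and method names (MapSideJoinSketch, loadProducts, joinRecord) are hypothetical, and the fields are assumed to be tab-separated, as the split("\t") calls in the mapper imply.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of a map-side join; no Hadoop classes are used.
// All names in this class are hypothetical.
class MapSideJoinSketch {

    // Plays the role of setup(): load the small table (goods id -> name) once.
    static Map<String, String> loadProducts(List<String> pdLines) {
        Map<String, String> products = new HashMap<>();
        for (String line : pdLines) {
            if (line.isEmpty()) {
                continue; // skip blank lines
            }
            String[] fields = line.split("\t");
            products.put(fields[0], fields[1]);
        }
        return products;
    }

    // Plays the role of map(): swap the goods id of one order for the goods name.
    static String joinRecord(String orderLine, Map<String, String> products) {
        String[] fields = orderLine.split("\t");
        String name = products.get(fields[1]);
        return fields[0] + "\t" + name + "\t" + fields[2];
    }

    public static void main(String[] args) {
        Map<String, String> products = loadProducts(
                Arrays.asList("01\tXiaomi", "02\tHuawei", "03\tGree"));
        List<String> orders = Arrays.asList(
                "1001\t01\t1", "1002\t02\t2", "1003\t03\t3",
                "1004\t01\t4", "1005\t02\t5", "1006\t03\t6");
        for (String order : orders) {
            System.out.println(joinRecord(order, products));
        }
    }
}
```

For the sample data, the first printed line is 1001, Xiaomi, 1 (tab-separated). No shuffle is involved: every order record is resolved against the in-memory map, which is why the job can run with zero reduce tasks.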

Mapper:

package MapJoin;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.*;
import java.util.HashMap;
import java.util.Map;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // goods id -> goods name, loaded from the small table pd.txt
    Map<String, String> productMap = new HashMap<>();
    Text k = new Text();

    /**
     * Load pd.txt into the HashMap. setup() runs only once per map task.
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // BufferedReader needs a Reader, so wrap the byte stream in an InputStreamReader
        BufferedReader productReader = new BufferedReader(new InputStreamReader(
                new FileInputStream(new File("G:\\test\\A\\mapjoin\\pd.txt")), "UTF-8"));
        String line;
        while (StringUtils.isNotEmpty(line = productReader.readLine())) {
            String[] fields = line.split("\t");
            productMap.put(fields[0], fields[1]);
        }
        productReader.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");
        // Look up the goods name by goods id (fields[1])
        String productName = productMap.get(fields[1]);
        k.set(fields[0] + "\t" + productName + "\t" + fields[2]);
        context.write(k, NullWritable.get());
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }
}

Driver:

package MapJoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class MapJoinDriver {

    public static void main(String[] args)
            throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        args = new String[] {"G:\\test\\A\\mapjoin\\order.txt", "G:\\test\\A\\mapjoin\\join2\\"};

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MapJoinDriver.class);
        job.setMapperClass(MapJoinMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Load the small, reused file into the cache
        job.addCacheFile(new URI("file:///G:/test/A/mapjoin/pd.txt"));

        // The join is done entirely on the map side, so no reduce tasks are needed
        job.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}

II. Reduce join

1. Approach

Use the join key, here the goods id, as the map output key. Records from both tables that satisfy the join condition are tagged with the file they came from and sent to the same reduce task, where they are joined.

The input data is the same as in the map join above, and the output is similar.
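Before the Hadoop classes, the idea can be sketched in plain Java. In this illustration a TreeMap stands in for the shuffle that groups records by key. All names (ReduceSideJoinSketch, mapRecord, reduceGroup, join) are hypothetical, and the flags "0" for order records and "1" for product records follow the convention used in the code below.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of a reduce-side join; the shuffle is simulated by a TreeMap.
// All names are hypothetical and no Hadoop classes are used.
class ReduceSideJoinSketch {

    // "Map" step: key each record by goods id and tag it with its source table.
    static void mapRecord(String line, boolean fromOrderFile,
                          Map<String, List<String[]>> shuffle) {
        String[] f = line.split("\t");
        String key = fromOrderFile ? f[1] : f[0];
        String[] tagged = fromOrderFile
                ? new String[] {"0", f[0], f[2]}   // flag, order id, quantity
                : new String[] {"1", f[1]};        // flag, goods name
        shuffle.computeIfAbsent(key, k -> new ArrayList<>()).add(tagged);
    }

    // "Reduce" step: within one key, attach the goods name to every order record.
    static List<String> reduceGroup(List<String[]> values) {
        List<String[]> orders = new ArrayList<>();
        String name = null;
        for (String[] v : values) {
            if ("0".equals(v[0])) {
                orders.add(v);
            } else {
                name = v[1];
            }
        }
        List<String> out = new ArrayList<>();
        for (String[] o : orders) {
            out.add(o[1] + "\t" + name + "\t" + o[2]);
        }
        return out;
    }

    static List<String> join(List<String> orderLines, List<String> pdLines) {
        Map<String, List<String[]>> shuffle = new TreeMap<>();
        for (String line : orderLines) {
            mapRecord(line, true, shuffle);
        }
        for (String line : pdLines) {
            mapRecord(line, false, shuffle);
        }
        List<String> result = new ArrayList<>();
        for (List<String[]> group : shuffle.values()) {
            result.addAll(reduceGroup(group));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> orders = Arrays.asList(
                "1001\t01\t1", "1002\t02\t2", "1003\t03\t3",
                "1004\t01\t4", "1005\t02\t5", "1006\t03\t6");
        List<String> products = Arrays.asList("01\tXiaomi", "02\tHuawei", "03\tGree");
        join(orders, products).forEach(System.out::println);
    }
}
```

Unlike the map join, every record of both tables passes through the grouping step, and all orders for one goods id are handled by a single reduce call. That is where a heavily used key can cause data skew.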

Bean:

package ReduceJoin;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class OrderBean implements Writable {

    private String orderID;
    private String productID;
    private int amount;
    private String productName;
    private String flag; // "0" = order record, "1" = product record

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.orderID);
        dataOutput.writeUTF(this.productID);
        dataOutput.writeInt(this.amount);
        dataOutput.writeUTF(this.productName);
        dataOutput.writeUTF(this.flag);
    }

    // Fields must be read in the same order in which write() wrote them
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.orderID = dataInput.readUTF();
        this.productID = dataInput.readUTF();
        this.amount = dataInput.readInt();
        this.productName = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.orderID);
        sb.append("\t");
        sb.append(this.productName);
        sb.append("\t");
        sb.append(this.amount);
        sb.append("\t");
        sb.append(this.flag);
        return sb.toString();
    }
}
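The serialization contract of the bean can be checked without Hadoop. The sketch below is a stand-alone illustration: OrderRecordSketch is a hypothetical class that copies the write/readFields shape but does not implement the Hadoop Writable interface, so it runs on the JDK alone. It also shows why unused string fields are set to empty strings: DataOutput.writeUTF throws a NullPointerException for a null argument.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the bean: same field order, no Hadoop dependency.
class OrderRecordSketch {
    // Strings default to "" because writeUTF(null) would throw NullPointerException
    String orderID = "";
    String productID = "";
    int amount;
    String productName = "";
    String flag = "";

    void write(DataOutput out) throws IOException {
        out.writeUTF(orderID);
        out.writeUTF(productID);
        out.writeInt(amount);
        out.writeUTF(productName);
        out.writeUTF(flag);
    }

    // Must read the fields in exactly the order write() produced them
    void readFields(DataInput in) throws IOException {
        orderID = in.readUTF();
        productID = in.readUTF();
        amount = in.readInt();
        productName = in.readUTF();
        flag = in.readUTF();
    }

    // Serialize to a byte array and deserialize into a fresh object
    static OrderRecordSketch roundTrip(OrderRecordSketch src) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            src.write(new DataOutputStream(bytes));
            OrderRecordSketch copy = new OrderRecordSketch();
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        OrderRecordSketch order = new OrderRecordSketch();
        order.orderID = "1001";
        order.productID = "01";
        order.amount = 1;
        order.flag = "0";
        OrderRecordSketch copy = roundTrip(order);
        System.out.println(copy.orderID + "\t" + copy.productID + "\t" + copy.amount);
    }
}
```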

Map:

package ReduceJoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class OrderMapper extends Mapper<LongWritable, Text, Text, OrderBean> {

    Text k = new Text();
    OrderBean v = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");

        // Find out which input file this record comes from
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String fileName = inputSplit.getPath().getName();

        // In both cases the key is the productID
        if (fileName.startsWith("order")) {
            // Order record: order id, goods id, quantity
            k.set(fields[1]);
            v.setOrderID(fields[0]);
            v.setProductID(fields[1]);
            v.setAmount(Integer.parseInt(fields[2]));
            v.setFlag("0");
            v.setProductName("");
        } else {
            // Product record: goods id, name
            k.set(fields[0]);
            v.setOrderID("");
            v.setAmount(0);
            v.setProductID(fields[0]);
            v.setProductName(fields[1]);
            v.setFlag("1");
        }
        context.write(k, v);
    }
}

Reduce:

package ReduceJoin;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

public class OrderReducer extends Reducer<Text, OrderBean, OrderBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<OrderBean> values, Context context)
            throws IOException, InterruptedException {
        // The key is the productID. Order records and the product record with the
        // same productID share a key, so they are merged into the same reduce call.
        ArrayList<OrderBean> orderBeans = new ArrayList<>();
        OrderBean pdBean = new OrderBean();

        for (OrderBean bean : values) {
            if ("0".equals(bean.getFlag())) {
                // Order record. Hadoop reuses the value object between iterations,
                // so copy it into a new bean instead of calling orderBeans.add(bean).
                // The copy must be created inside the loop, otherwise every list
                // entry would point to the same object.
                OrderBean tmp = new OrderBean();
                try {
                    BeanUtils.copyProperties(tmp, bean);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
                orderBeans.add(tmp);
            } else {
                // Product record
                try {
                    BeanUtils.copyProperties(pdBean, bean);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }

        // Fill in the productName for every order of this key and write it out
        for (OrderBean o : orderBeans) {
            o.setProductName(pdBean.getProductName());
            context.write(o, NullWritable.get());
        }
    }
}
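A reducer that buffers its values has to copy each one, because Hadoop's reduce-side iterator hands out the same value object on every iteration and only refills its fields. The plain-Java sketch below reproduces that effect. The reusing() helper is a hypothetical stand-in for the values iterable, and Bean is a simplified placeholder, not the OrderBean class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Sketch of the object-reuse pitfall. All names here are hypothetical.
class ValueReuseSketch {

    static class Bean {
        String orderID;

        Bean() {
        }

        Bean(Bean other) {
            this.orderID = other.orderID;
        }
    }

    // An Iterable that returns one shared instance, refilled on each next()
    static Iterable<Bean> reusing(final List<String> ids) {
        final Bean shared = new Bean();
        return () -> new Iterator<Bean>() {
            int i = 0;

            @Override
            public boolean hasNext() {
                return i < ids.size();
            }

            @Override
            public Bean next() {
                shared.orderID = ids.get(i++);
                return shared;
            }
        };
    }

    static List<String> collectWithoutCopy(List<String> ids) {
        List<Bean> beans = new ArrayList<>();
        for (Bean b : reusing(ids)) {
            beans.add(b); // stores the same reference every time
        }
        return idsOf(beans);
    }

    static List<String> collectWithCopy(List<String> ids) {
        List<Bean> beans = new ArrayList<>();
        for (Bean b : reusing(ids)) {
            beans.add(new Bean(b)); // fresh copy per record
        }
        return idsOf(beans);
    }

    static List<String> idsOf(List<Bean> beans) {
        List<String> out = new ArrayList<>();
        for (Bean b : beans) {
            out.add(b.orderID);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("1001", "1004");
        System.out.println(collectWithoutCopy(ids)); // [1004, 1004]
        System.out.println(collectWithCopy(ids));    // [1001, 1004]
    }
}
```

The same reasoning applies to the copy itself: a single temporary bean created outside the loop and added repeatedly would alias in exactly the same way, so each buffered record needs its own new instance.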

Driver:

package ReduceJoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OrderDriver {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        args = new String[] {"G:\\test\\A\\mapjoin\\", "G:\\test\\A\\reducejoin12\\"};

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(OrderDriver.class);

        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(OrderBean.class);

        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
