
An Example Analysis of Multiple Join Implementations in MapReduce


This article explains in detail several ways to implement joins in MapReduce, illustrated with a worked example. It is shared here as a practical reference; I hope you will have a solid understanding of the relevant techniques after reading it.

I. Overview

You are no doubt familiar with join operations in an RDBMS: when writing SQL you must pay close attention to detail, since a small mistake can turn into a serious performance bottleneck that takes a long time to track down. Join operations implemented on top of the MapReduce framework in Hadoop are similarly expensive, and because of Hadoop's distributed design they also have some peculiarities of their own. This article analyzes in detail several ways to implement a join between tables in the MapReduce framework, explained through an example I encountered in actual development work.

II. Implementation principles

1. Reduce-side join

The reduce-side join is the most common pattern for joining tables in the MapReduce framework. The implementation principle is as follows:

The map side's main job is to tag key/value pairs from the different tables (files) so that records from different sources can be distinguished. It then emits the join field as the key, with the remaining fields plus the newly added tag as the value.

The reduce side's main job is simpler, since grouping by the join key has already been done by the shuffle: within each group, separate the records coming from the different files (tagged in the map phase), and then take their Cartesian product. The principle is very simple. Let's look at an example:
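To make the data flow concrete first, consider the sample tables used in the code below (tb_dim_city joined to tb_user_profiles on the city id). For join key "1", the map phase emits one tagged record from the left table and two from the right (shown here in the toString format of the value class defined next, with tab-separated fields):

1 -> [flag=0, joinKey=1, secondPart=Changchun 1 901 1]

1 -> [flag=1, joinKey=1, secondPart=1 2G 123]

1 -> [flag=1, joinKey=1, secondPart=3 3G 555]

The shuffle delivers all three to the same reduce group, where the single left record is crossed with the two right records, yielding the two joined rows "1 Changchun 1 901 1 1 2G 123" and "1 Changchun 1 901 1 3 3G 555".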

(1) First, define a custom value type:

package com.mr.reduceSizeJoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class CombineValues implements WritableComparable<CombineValues> {

    // private static final Logger logger = LoggerFactory.getLogger(CombineValues.class);

    private Text joinKey;    // the join key
    private Text flag;       // flag marking which file the record came from
    private Text secondPart; // all fields other than the join key

    public void setJoinKey(Text joinKey) {
        this.joinKey = joinKey;
    }

    public void setFlag(Text flag) {
        this.flag = flag;
    }

    public void setSecondPart(Text secondPart) {
        this.secondPart = secondPart;
    }

    public Text getFlag() {
        return flag;
    }

    public Text getSecondPart() {
        return secondPart;
    }

    public Text getJoinKey() {
        return joinKey;
    }

    public CombineValues() {
        this.joinKey = new Text();
        this.flag = new Text();
        this.secondPart = new Text();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        this.joinKey.write(out);
        this.flag.write(out);
        this.secondPart.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.joinKey.readFields(in);
        this.flag.readFields(in);
        this.secondPart.readFields(in);
    }

    @Override
    public int compareTo(CombineValues o) {
        return this.joinKey.compareTo(o.getJoinKey());
    }

    @Override
    public String toString() {
        return "[flag=" + this.flag.toString() + ", joinKey=" + this.joinKey.toString()
                + ", secondPart=" + this.secondPart.toString() + "]";
    }
}

(2) Next, the Mapper, Reducer, and driver code:

package com.mr.reduceSizeJoin;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author zengzhaozheng
 *
 * Usage: a left outer join implemented as a reduce-side join.
 * The two files below represent two tables, joined on table1's id field
 * and table2's cityID field.
 *
 * table1 (left table): tb_dim_city (id int, name string, orderid int, city_code, is_show)
 * Content of tb_dim_city.dat, delimited by "|":
 * id name orderid city_code is_show
 * 0 other 9999 9999 0
 * 1 Changchun 1 901 1
 * 2 Jilin 2 902 1
 * 3 Siping 3 903 1
 * 4 Songyuan 4 904 1
 * 5 Tonghua 5 905 1
 * 6 Liaoyuan 6 906 1
 * 7 Baicheng 7 907 1
 * 8 Baishan 8 908 1
 * 9 Yanji 9 909 1
 * ----------------------------------------------------------------
 * table2 (right table): tb_user_profiles (userID int, userName string, network string, flow double, cityID int)
 * Content of tb_user_profiles.dat, delimited by "|":
 * userID network flow cityID
 * 1 2G 123 1
 * 2 3G 333 2
 * 3 3G 555 1
 * 4 2G 777 3
 * 5 3G 666 4
 * ----------------------------------------------------------------
 * Result:
 * 1 Changchun 1 901 1 1 2G 123
 * 1 Changchun 1 901 1 3 3G 555
 * 2 Jilin 2 902 1 2 3G 333
 * 3 Siping 3 903 1 4 2G 777
 * 4 Songyuan 4 904 1 5 3G 666
 */

public class ReduceSideJoin_LeftOuterJoin extends Configured implements Tool {

    private static final Logger logger = LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);

    public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, CombineValues> {
        private CombineValues combineValues = new CombineValues();
        private Text flag = new Text();
        private Text joinKey = new Text();
        private Text secondPart = new Text();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // obtain the input file path
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
            // data from the tb_dim_city.dat file gets the flag "0"
            if (pathName.endsWith("tb_dim_city.dat")) {
                String[] valueItems = value.toString().split("\\|");
                // filter out malformed records
                if (valueItems.length != 5) {
                    return;
                }
                flag.set("0");
                joinKey.set(valueItems[0]);
                secondPart.set(valueItems[1] + "\t" + valueItems[2] + "\t"
                        + valueItems[3] + "\t" + valueItems[4]);
                combineValues.setFlag(flag);
                combineValues.setJoinKey(joinKey);
                combineValues.setSecondPart(secondPart);
                context.write(combineValues.getJoinKey(), combineValues);
            }
            // data from tb_user_profiles.dat gets the flag "1"
            else if (pathName.endsWith("tb_user_profiles.dat")) {
                String[] valueItems = value.toString().split("\\|");
                // filter out malformed records
                if (valueItems.length != 4) {
                    return;
                }
                flag.set("1");
                joinKey.set(valueItems[3]);
                secondPart.set(valueItems[0] + "\t" + valueItems[1] + "\t" + valueItems[2]);
                combineValues.setFlag(flag);
                combineValues.setJoinKey(joinKey);
                combineValues.setSecondPart(secondPart);
                context.write(combineValues.getJoinKey(), combineValues);
            }
        }
    }

    public static class LeftOutJoinReducer extends Reducer<Text, CombineValues, Text, Text> {
        // holds the left-table records of the current group
        private ArrayList<Text> leftTable = new ArrayList<Text>();
        // holds the right-table records of the current group
        private ArrayList<Text> rightTable = new ArrayList<Text>();
        private Text secondPar = null;
        private Text output = new Text();

        /**
         * reduce is called once per group
         */
        @Override
        protected void reduce(Text key, Iterable<CombineValues> value, Context context)
                throws IOException, InterruptedException {
            leftTable.clear();
            rightTable.clear();
            /**
             * Separate the elements of the group by source file.
             * A caveat with this approach: if a group contains too many elements,
             * the reduce phase may run out of memory (OOM). It is best to
             * understand the data distribution before tackling a distributed
             * problem; an appropriate processing strategy is an effective guard
             * against OOM and severe data skew.
             */
            for (CombineValues cv : value) {
                secondPar = new Text(cv.getSecondPart().toString());
                // left table: tb_dim_city
                if ("0".equals(cv.getFlag().toString().trim())) {
                    leftTable.add(secondPar);
                }
                // right table: tb_user_profiles
                else if ("1".equals(cv.getFlag().toString().trim())) {
                    rightTable.add(secondPar);
                }
            }
            logger.info("tb_dim_city:" + leftTable.toString());
            logger.info("tb_user_profiles:" + rightTable.toString());
            for (Text leftPart : leftTable) {
                for (Text rightPart : rightTable) {
                    output.set(leftPart + "\t" + rightPart);
                    context.write(key, output);
                }
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf(); // get the configuration object
        Job job = new Job(conf, "LeftOutJoinMR");
        job.setJarByClass(ReduceSideJoin_LeftOuterJoin.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // set the map input path
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // set the reduce output path
        job.setMapperClass(LeftOutJoinMapper.class);
        job.setReducerClass(LeftOutJoinReducer.class);
        job.setInputFormatClass(TextInputFormat.class);   // set the input format
        job.setOutputFormatClass(TextOutputFormat.class); // use the default output format
        // set the map output key and value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CombineValues.class);
        // set the reduce output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        try {
            int returnCode = ToolRunner.run(new ReduceSideJoin_LeftOuterJoin(), args);
            System.exit(returnCode);
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
    }
}
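Both .dat files go into the same input directory, which is what lets the mapper tell them apart by file name. A typical invocation would look like the following (the jar name and HDFS paths here are placeholders, not from the original code):

hadoop jar reduce-side-join.jar com.mr.reduceSizeJoin.ReduceSideJoin_LeftOuterJoin /user/hadoop/join_in /user/hadoop/join_out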

The specifics of the analysis and of the input and output data are spelled out in the code comments above. Here we focus on the main shortcoming of the reduce join. The reason this pattern exists at all is clear: because the input data is split, each map task processes only part of the data and cannot see all records sharing a join key, so records with the same join key must be grouped together on the reduce side and processed there; hence the reduce join. Its obvious disadvantage is that it pushes a large amount of data from the map side to the reduce side, that is, through the shuffle phase, which makes it very inefficient.
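When one of the tables is small enough to fit in memory, this shuffle cost can be avoided with a map-side join: ship the small table to every mapper through the distributed cache, load it into a hash map, and probe it while streaming the big table through map(), with no reduce phase at all. The following is a minimal sketch of such a mapper under that assumption, written against the same Hadoop API as the code above; the class name, package, and field layout mirror the example tables but are illustrative, not part of the original code.

package com.mr.mapSideJoin;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapSideJoinMapper extends Mapper<Object, Text, Text, Text> {
    // in-memory copy of the small table: id -> "name\torderid\tcity_code\tis_show"
    private HashMap<String, String> cityInfo = new HashMap<String, String>();
    private Text outKey = new Text();
    private Text outValue = new Text();

    @Override
    protected void setup(Context context) throws IOException {
        // read the small table (tb_dim_city.dat), shipped to this node via the distributed cache
        Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        BufferedReader reader = new BufferedReader(new FileReader(cacheFiles[0].toString()));
        String line;
        while ((line = reader.readLine()) != null) {
            String[] items = line.split("\\|");
            if (items.length == 5) {
                cityInfo.put(items[0], items[1] + "\t" + items[2] + "\t" + items[3] + "\t" + items[4]);
            }
        }
        reader.close();
    }

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // every input record is a row of the big table, tb_user_profiles.dat
        String[] valueItems = value.toString().split("\\|");
        if (valueItems.length != 4) {
            return;
        }
        // probe the in-memory table; valueItems[3] is cityID
        String city = cityInfo.get(valueItems[3]);
        if (city != null) {
            outKey.set(valueItems[3]);
            outValue.set(city + "\t" + valueItems[0] + "\t" + valueItems[1] + "\t" + valueItems[2]);
            context.write(outKey, outValue);
        }
    }
}

The driver would register the small file with DistributedCache.addCacheFile(...) and call job.setNumReduceTasks(0), so joined rows are written directly by the mappers and the shuffle disappears entirely. Note that as sketched this gives inner-join semantics; emitting unmatched rows as well would produce the outer-join variants.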

That concludes the example analysis of the various join implementations in MapReduce. I hope the content above has been helpful; if you found the article useful, please share it so that more people can see it.
