This article analyzes, through worked examples, several ways of implementing joins in MapReduce. It is shared here as a reference, and I hope you will come away with a good grasp of the relevant knowledge after reading it.
I. Overview
You are no doubt very familiar with join operations in an RDBMS. Writing such SQL requires great attention to detail: a small slip can create a serious performance bottleneck that takes a long time to track down, and join operations written against the MapReduce framework in Hadoop are just as time-consuming to get right. Because of Hadoop's distributed design philosophy, however, joins in MapReduce have some particularities of their own. This article analyzes in detail several ways of implementing a join between tables in the MapReduce framework, and illustrates them with examples I have run into in actual development work.
II. Implementation principles
1. Reduce-side join
A reduce-side join is the most common pattern for joining tables in the MapReduce framework. The implementation principle is as follows:
The main work on the map side: tag the key/value pairs coming from different tables (files) so that records from different sources can be told apart; then emit the join field as the key, and the remaining fields together with the newly added tag as the value.
The main work on the reduce side: grouping by the join key has already been done by the time records reach the reduce side, so within each group we only need to separate the records coming from different files (using the tags added in the map phase) and then take their Cartesian product. The principle is very simple; let's look at an example:
(1) Define a custom value type:
package com.mr.reduceSizeJoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class CombineValues implements WritableComparable<CombineValues> {
    // private static final Logger logger = LoggerFactory.getLogger(CombineValues.class);
    private Text joinKey;    // join key
    private Text flag;       // source-file flag
    private Text secondPart; // all fields other than the join key

    public void setJoinKey(Text joinKey) {
        this.joinKey = joinKey;
    }

    public void setFlag(Text flag) {
        this.flag = flag;
    }

    public void setSecondPart(Text secondPart) {
        this.secondPart = secondPart;
    }

    public Text getFlag() {
        return flag;
    }

    public Text getSecondPart() {
        return secondPart;
    }

    public Text getJoinKey() {
        return joinKey;
    }

    public CombineValues() {
        this.joinKey = new Text();
        this.flag = new Text();
        this.secondPart = new Text();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        this.joinKey.write(out);
        this.flag.write(out);
        this.secondPart.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.joinKey.readFields(in);
        this.flag.readFields(in);
        this.secondPart.readFields(in);
    }

    @Override
    public int compareTo(CombineValues o) {
        return this.joinKey.compareTo(o.getJoinKey());
    }

    @Override
    public String toString() {
        return "[flag=" + this.flag.toString() + ", joinKey=" + this.joinKey.toString()
                + ", secondPart=" + this.secondPart.toString() + "]";
    }
}
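To see what this Writable contract buys us, here is a small, hypothetical round-trip check; it is not part of the original article (the class name CombineValuesRoundTrip is made up) and only demonstrates how the framework uses write()/readFields() to move CombineValues instances between the map and reduce sides during the shuffle.

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;

// Illustrative sketch only: serializes a CombineValues and reads it back,
// exactly the way the framework does when shipping map output values.
public class CombineValuesRoundTrip {
    public static void main(String[] args) throws Exception {
        CombineValues cv = new CombineValues();
        cv.setJoinKey(new Text("1"));
        cv.setFlag(new Text("0"));
        cv.setSecondPart(new Text("Changchun\t1\t901\t1"));

        // Serialize with write(), as the map side would.
        DataOutputBuffer out = new DataOutputBuffer();
        cv.write(out);

        // Deserialize into a fresh instance, as the reduce side would.
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        CombineValues copy = new CombineValues();
        copy.readFields(in);

        System.out.println(copy); // [flag=0, joinKey=1, secondPart=Changchun ...]
    }
}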
(2) The main map and reduce code:
package com.mr.reduceSizeJoin;

import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * @author zengzhaozheng
 *
 * Usage description: a left outer join implemented as a reduce-side join.
 * The two input files represent two tables; the join field is the id field of table1
 * and the cityID field of table2.
 *
 * table1 (left table): tb_dim_city (id int, name string, orderid int, city_code, is_show)
 * Content of the tb_dim_city.dat file, delimited by "|":
 * id  name       orderid  city_code  is_show
 * 0   other      9999     9999       0
 * 1   Changchun  1        901        1
 * 2   Jilin      2        902        1
 * 3   Siping     3        903        1
 * 4   Songyuan   4        904        1
 * 5   Tonghua    5        905        1
 * 6   Liaoyuan   6        906        1
 * 7   Baicheng   7        907        1
 * 8   Baishan    8        908        1
 * 9   Yanji      9        909        1
 * --------------------------------------------------------------------
 * table2 (right table): tb_user_profiles (userID int, userName string, network string, flow double, cityID int)
 * Content of the tb_user_profiles.dat file, delimited by "|":
 * userID  network  flow  cityID
 * 1       2G       123   1
 * 2       3G       333   2
 * 3       3G       555   1
 * 4       2G       777   3
 * 5       3G       666   4
 * --------------------------------------------------------------------
 * Expected results:
 * 1 Changchun 1 901 1 1 2G 123
 * 1 Changchun 1 901 1 3 3G 555
 * 2 Jilin 2 902 1 2 3G 333
 * 3 Siping 3 903 1 4 2G 777
 * 4 Songyuan 4 904 1 5 3G 666
 */
public class ReduceSideJoin_LeftOuterJoin extends Configured implements Tool {
    private static final Logger logger = LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);

    public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, CombineValues> {
        private CombineValues combineValues = new CombineValues();
        private Text flag = new Text();
        private Text joinKey = new Text();
        private Text secondPart = new Text();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // obtain the input file path of this split
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
            // records from the tb_dim_city.dat file are tagged with flag "0"
            if (pathName.endsWith("tb_dim_city.dat")) {
                String[] valueItems = value.toString().split("\\|");
                // filter malformed records
                if (valueItems.length != 5) {
                    return;
                }
                flag.set("0");
                joinKey.set(valueItems[0]);
                secondPart.set(valueItems[1] + "\t" + valueItems[2] + "\t" + valueItems[3] + "\t" + valueItems[4]);
                combineValues.setFlag(flag);
                combineValues.setJoinKey(joinKey);
                combineValues.setSecondPart(secondPart);
                context.write(combineValues.getJoinKey(), combineValues);
            }
            // records from tb_user_profiles.dat are tagged with flag "1"
            else if (pathName.endsWith("tb_user_profiles.dat")) {
                String[] valueItems = value.toString().split("\\|");
                // filter malformed records
                if (valueItems.length != 4) {
                    return;
                }
                flag.set("1");
                joinKey.set(valueItems[3]);
                secondPart.set(valueItems[0] + "\t" + valueItems[1] + "\t" + valueItems[2]);
                combineValues.setFlag(flag);
                combineValues.setJoinKey(joinKey);
                combineValues.setSecondPart(secondPart);
                context.write(combineValues.getJoinKey(), combineValues);
            }
        }
    }
    public static class LeftOutJoinReducer extends Reducer<Text, CombineValues, Text, Text> {
        // buffers the left-table records within one group
        private ArrayList<Text> leftTable = new ArrayList<Text>();
        // buffers the right-table records within one group
        private ArrayList<Text> rightTable = new ArrayList<Text>();
        private Text secondPar = null;
        private Text output = new Text();

        /**
         * reduce() is called once per group (i.e. once per join key)
         */
        @Override
        protected void reduce(Text key, Iterable<CombineValues> value, Context context)
                throws IOException, InterruptedException {
            leftTable.clear();
            rightTable.clear();
            /*
             * Separate the elements of the group according to their source file.
             * Caveats of this approach:
             * if a group contains too many elements, the reduce phase may run out of memory (OOM);
             * it is best to understand the data distribution before tackling such distributed problems,
             * since an appropriate strategy can effectively prevent OOM and severe data skew.
             */
            for (CombineValues cv : value) {
                secondPar = new Text(cv.getSecondPart().toString());
                // left table: tb_dim_city
                if ("0".equals(cv.getFlag().toString().trim())) {
                    leftTable.add(secondPar);
                }
                // right table: tb_user_profiles
                else if ("1".equals(cv.getFlag().toString().trim())) {
                    rightTable.add(secondPar);
                }
            }
            logger.info("tb_dim_city:" + leftTable.toString());
            logger.info("tb_user_profiles:" + rightTable.toString());
            for (Text leftPart : leftTable) {
                for (Text rightPart : rightTable) {
                    output.set(leftPart + "\t" + rightPart);
                    context.write(key, output);
                }
            }
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf(); // get the configuration object
        Job job = new Job(conf, "LeftOutJoinMR");
        job.setJarByClass(ReduceSideJoin_LeftOuterJoin.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // set the map input path
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // set the reduce output path
        job.setMapperClass(LeftOutJoinMapper.class);
        job.setReducerClass(LeftOutJoinReducer.class);
        job.setInputFormatClass(TextInputFormat.class);   // set the file input format
        job.setOutputFormatClass(TextOutputFormat.class); // use the default output format
        // set the map output key and value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CombineValues.class);
        // set the reduce output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        try {
            int returnCode = ToolRunner.run(new ReduceSideJoin_LeftOuterJoin(), args);
            System.exit(returnCode);
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
    }
}
For the detailed analysis and the input and output data, please refer to the comments in the code, which cover them quite thoroughly. Here we mainly look at the shortcomings of the reduce-side join. The reason this approach exists is clear from the above: because the overall data set is split, each map task processes only part of the data and cannot see all the records that share a join key, so all records with the same join key must be brought together as one group on the reduce side; that is how the reduce-side join came about. Its drawback is equally obvious: a large amount of data has to be transferred between the map and reduce sides, that is, during the shuffle phase, so efficiency is quite low.
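To make that shuffle cost concrete, below is a minimal map-side join sketch, the usual way to avoid the transfer when one of the tables is small enough to fit in memory. It is only an illustration and not part of the original code: the class name MapSideJoinMapper is made up, and it assumes the small table tb_dim_city.dat has already been shipped to every map task (for example via the distributed cache with a symlink) so that it can be opened as a local file in setup().

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Illustrative sketch only: assumes tb_dim_city.dat is available as a local file
// in the task's working directory (e.g. distributed beforehand via the distributed cache).
public class MapSideJoinMapper extends Mapper<Object, Text, Text, Text> {
    private final Map<String, String> cityTable = new HashMap<String, String>();
    private final Text outKey = new Text();
    private final Text outValue = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Load the small (left) table into memory once per map task.
        BufferedReader reader = new BufferedReader(new FileReader("tb_dim_city.dat"));
        String line;
        while ((line = reader.readLine()) != null) {
            String[] fields = line.split("\\|");
            if (fields.length == 5) {
                // key: city id; value: the remaining columns, tab-separated
                cityTable.put(fields[0], fields[1] + "\t" + fields[2] + "\t" + fields[3] + "\t" + fields[4]);
            }
        }
        reader.close();
    }

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input record comes from the big (right) table, tb_user_profiles.dat.
        String[] fields = value.toString().split("\\|");
        if (fields.length != 4) {
            return;
        }
        // fields[3] is cityID; look it up in the in-memory table instead of shuffling.
        String cityInfo = cityTable.get(fields[3]);
        if (cityInfo != null) {
            outKey.set(fields[3]);
            outValue.set(cityInfo + "\t" + fields[0] + "\t" + fields[1] + "\t" + fields[2]);
            context.write(outKey, outValue);
        }
    }
}

Because the join here is completed entirely in the map phase, such a job can run with zero reduce tasks, so nothing needs to be shuffled across the network for the join itself.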
This concludes the example-based analysis of the various join implementations in MapReduce. I hope the content above has been helpful and has taught you something new. If you found the article useful, please share it so that more people can see it.