Lab name: Datajoin data connection
Experimental purpose:
1. To record the process of my Hadoop experiment. I am a student of NCU HANG TIAN BAN. Complete, runnable code is attached. The framework of the program is a template found on Baidu (it also appears in the textbook), but the important algorithms are written by myself and are marked as such. The frame template I referred to: http://blog.csdn.net/wawmg/article/details/8759076
2. For a quick overview, see the marked parts [1], [2], [3], [4].
Experimental requirements:
Task 1. Inner join of multiple data sources
[data sample]
Input:
Factory:
Factoryname addressID
Beijing Red Star 1
Shenzhen Thunder 3
Guangzhou Honda 2
Beijing Rising 1
Guangzhou Development Bank 2
Tencent 3
Bank of Beijing 1
Nanchang Univ 5
Address:
AddressID addressname
1 Beijing
2 Guangzhou
3 Shenzhen
4 Xian
Output:
Factoryname addressID addressname
Bank of Beijing 1 Beijing
Beijing Red Star 1 Beijing
Beijing Rising 1 Beijing
Guangzhou Development Bank 2 Guangzhou
Guangzhou Honda 2 Guangzhou
Shenzhen Thunder 3 Shenzhen
Tencent 3 Shenzhen
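Before the MapReduce code, it may help to see the join itself in miniature: this is an inner join of Factory and Address on addressID, which is why "Nanchang Univ 5" is dropped (there is no address with ID 5). Below is a minimal in-memory sketch of the same join in plain Java, independent of Hadoop, with the sample data hard-coded for illustration:

import java.util.HashMap;
import java.util.Map;

public class InMemoryJoinSketch {
    public static void main(String[] args) {
        // address table: addressID -> addressname
        Map<String, String> address = new HashMap<String, String>();
        address.put("1", "Beijing");
        address.put("2", "Guangzhou");
        address.put("3", "Shenzhen");
        address.put("4", "Xian");
        // factory table: factoryname, addressID
        String[][] factory = {
            {"Bank of Beijing", "1"}, {"Beijing Red Star", "1"},
            {"Beijing Rising", "1"}, {"Guangzhou Development Bank", "2"},
            {"Guangzhou Honda", "2"}, {"Shenzhen Thunder", "3"},
            {"Tencent", "3"}, {"Nanchang Univ", "5"}
        };
        // inner join: a factory row with no matching addressID is dropped
        for (String[] f : factory) {
            String name = address.get(f[1]);
            if (name != null) {
                System.out.println(f[0] + " " + f[1] + " " + name);
            }
        }
    }
}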
[code starts] [1]
// First, the TaggedWritable class, copied unchanged.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

/* TaggedMapOutput is an abstract data type that encapsulates a tag and a record.
   Used here as the output value type of DataJoinMapperBase, it must implement
   the Writable interface, so the two serialization methods are implemented.
   Custom input type. */
public class TaggedWritable extends TaggedMapOutput {
    private Writable data;

    public TaggedWritable() {
        this.tag = new Text();
    }

    public TaggedWritable(Writable data) { // constructor
        this.tag = new Text(); // the tag can be set via the setTag() method
        this.data = data;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        tag.readFields(in);
        String dataClz = in.readUTF();
        if (this.data == null
                || !this.data.getClass().getName().equals(dataClz)) {
            try {
                this.data = (Writable) ReflectionUtils.newInstance(
                        Class.forName(dataClz), null);
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        }
        data.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        tag.write(out);
        out.writeUTF(this.data.getClass().getName());
        data.write(out);
    }

    @Override
    public Writable getData() {
        return data;
    }
}
// http://blog.csdn.net/wawmg/article/details/8759076
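As a quick sanity check of the serialization logic (my own addition, not part of the template; it assumes the TaggedWritable class above and the Hadoop jars are on the classpath):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.Text;

public class TaggedWritableCheck {
    public static void main(String[] args) throws IOException {
        TaggedWritable tw = new TaggedWritable(new Text("Beijing Red Star 1"));
        tw.setTag(new Text("factory.txt"));
        // write() records the tag, then the data class name, then the data
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        tw.write(new DataOutputStream(bos));
        // readFields() re-creates the data object from the recorded class
        // name via reflection, then deserializes into it
        TaggedWritable copy = new TaggedWritable();
        copy.readFields(new DataInputStream(
                new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(copy.getTag() + " / " + copy.getData());
        // expected: factory.txt / Beijing Red Star 1
    }
}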
[2] The Map phase algorithm, written by myself.
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;

public class JoinMapper extends DataJoinMapperBase {

    // Called at the beginning of the task to generate the tag;
    // the file name is used as the tag here.
    @Override
    protected Text generateInputTag(String inputFile) {
        System.out.println("inputFile = " + inputFile);
        return new Text(inputFile);
    }

    // The delimiter is fixed as a space here; more generally, the user
    // should be able to specify the separator and the group key.
    // Sets the group key.
    @Override
    protected Text generateGroupKey(TaggedMapOutput record) {
        String tag = ((Text) record.getTag()).toString();
        if (tag.indexOf("factory") != -1) {
            String line = ((Text) record.getData()).toString();
            String[] tokens = line.split(" ");
            int len = tokens.length - 1;
            return new Text(tokens[len]); // the last token is the addressID
        } else {
            String line = ((Text) record.getData()).toString();
            String[] tokens = line.split(" ");
            return new Text(tokens[0]); // the first token is the addressID
        }
    }

    // Returns a TaggedWritable carrying whatever Text tag we want.
    @Override
    protected TaggedMapOutput generateTaggedMapOutput(Object value) {
        TaggedWritable retv = new TaggedWritable((Text) value);
        retv.setTag(this.inputTag); // don't forget to tag the current record
        return retv;
    }
}
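To see what generateGroupKey extracts from the sample records, here is a standalone trace using the same split logic (not part of the job, just an illustration):

public class GroupKeyTrace {
    public static void main(String[] args) {
        // factory record: the last space-separated token is the addressID
        String factoryLine = "Beijing Red Star 1";
        String[] tokens = factoryLine.split(" ");
        System.out.println(tokens[tokens.length - 1]); // prints "1"
        // address record: the first token is the addressID
        String addressLine = "1 Beijing";
        System.out.println(addressLine.split(" ")[0]); // prints "1"
        // both records therefore land in the same reduce group, keyed by "1"
    }
}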
[3] The reduce phase algorithm, also written by myself.
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;

public class JoinReducer extends DataJoinReducerBase {

    // The two parameter arrays are always the same size, at most equal to
    // the number of data sources.
    @Override
    protected TaggedMapOutput combine(Object[] tags, Object[] values) {
        if (tags.length < 2) return null; // this step implements the inner join
        String joinedStr = "";
        String dd = "";
        for (int i = 0; i < values.length; i++) {
            // concatenate the matching records from the two data sources
            TaggedWritable tw = (TaggedWritable) values[i];
            String line = ((Text) tw.getData()).toString();
            // split the address record and drop its group key,
            // keeping only the address name
            if (i == 0) {
                String[] tokens = line.split(" ");
                dd += tokens[1];
            }
            if (i == 1) {
                joinedStr += line;
                System.out.println(joinedStr);
            }
        }
        joinedStr += dd;
        TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
        retv.setTag((Text) tags[1]); // the group key of this retv becomes the final output key
        return retv;
    }
}
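A trace of what combine() builds for group key "1", assuming the address record arrives at index 0 and the factory record at index 1 (which is what the i == 0 / i == 1 branches rely on):

public class CombineTrace {
    public static void main(String[] args) {
        String addressLine = "1 Beijing";          // values[0], from the address file
        String factoryLine = "Beijing Red Star 1"; // values[1], from the factory file
        String dd = addressLine.split(" ")[1];     // "Beijing": drop the group key, keep the name
        String joinedStr = factoryLine + dd;       // "Beijing Red Star 1Beijing"
        System.out.println(joinedStr);
    }
}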
[4] The Driver class, copied unchanged.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DataJoinDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Path in = new Path("hdfs://localhost:9000/user/c/input/*.txt");
        Path out = new Path("hdfs://localhost:9000/user/c/output2");
        JobConf job = new JobConf(conf, DataJoinDriver.class);
        job.setJobName("DataJoin");
        FileSystem hdfs = FileSystem.get(conf);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TaggedWritable.class);
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DataJoinDriver(),
                args);
        System.exit(res);
    }
}
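A usage note: the input glob above must match both the factory file and the address file, and the factory file's name must contain "factory", since the mapper tells the two sources apart by file name (the tag). The job is then launched with the standard hadoop jar command, e.g. hadoop jar DataJoin.jar DataJoinDriver (the jar name here is illustrative; the hadoop-datajoin contrib jar must also be on the classpath).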
Finally: the output has one small problem, namely that it is not sorted.