
Using the DataJoin classes to implement a reduce-side join of data in different formats

2025-04-10 Update From: SLTechnology News&Howtos


Lab name: DataJoin data join

Purpose of the experiment:

1. To record the process of my Hadoop experiment. I am a student in the NCU HANG TIAN BAN. Complete, runnable code is attached. The framework of the program is a template found via Baidu, which also appears in the book, but the important algorithms were written by me and are marked as such. The template I referred to is at http://blog.csdn.net/wawmg/article/details/8759076.

2. For a quick overview, see the parts marked [1], [2], [3], and [4].

Experimental requirements:

Task 1. Inner join of multiple data sources

[data sample]

Input:

Factory:

factoryname                  addressID
Beijing Red Star             1
Shenzhen Thunder             3
Guangzhou Honda              2
Beijing Rising               1
Guangzhou Development Bank   2
Tencent                      3
Bank of Beijing              1
Nanchang Univ                5

Address:

addressID   addressname
1           Beijing
2           Guangzhou
3           Shenzhen
4           Xian

Output:

factoryname                  addressID   addressname
Bank of Beijing              1           Beijing
Beijing Red Star             1           Beijing
Beijing Rising               1           Beijing
Guangzhou Development Bank   2           Guangzhou
Guangzhou Honda              2           Guangzhou
Shenzhen Thunder             3           Shenzhen
Tencent                      3           Shenzhen
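
The expected output is an ordinary inner join on addressID. As a point of reference, here is a minimal in-memory sketch in plain Java (no Hadoop) that applies the same rule to rows shaped like the sample data. The class and method names are hypothetical and are not part of the lab code, and the fields are assumed to be whitespace-separated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of the original lab code.
final class InnerJoinSketch {

    // Joins "factoryname addressID" rows with "addressID addressname" rows on addressID.
    static List<String> innerJoin(List<String> factoryRows, List<String> addressRows) {
        // Lookup table: addressID -> addressname.
        Map<String, String> nameById = new HashMap<>();
        for (String row : addressRows) {
            String[] tokens = row.trim().split("\\s+", 2);
            nameById.put(tokens[0], tokens[1]);
        }
        List<String> joined = new ArrayList<>();
        for (String row : factoryRows) {
            String[] tokens = row.trim().split("\\s+");
            // The addressID is the last token; everything before it is the factory name.
            String addressId = tokens[tokens.length - 1];
            String factoryName = String.join(" ", Arrays.copyOf(tokens, tokens.length - 1));
            String addressName = nameById.get(addressId);
            if (addressName != null) { // inner join: rows without a match are dropped
                joined.add(factoryName + "\t" + addressId + "\t" + addressName);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String> factories = Arrays.asList("Beijing Red Star 1", "Tencent 3", "Nanchang Univ 5");
        List<String> addresses = Arrays.asList("1 Beijing", "3 Shenzhen", "4 Xian");
        for (String row : innerJoin(factories, addresses)) {
            System.out.println(row);
        }
    }
}
```

Nanchang Univ (addressID 5) and Xian (addressID 4) have no partner on the other side, so neither appears in the result. The MapReduce version achieves the same effect by discarding groups that contain records from only one source.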

[code starts] [1]

// First, the TaggedWritable class, copied from the template without changes.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

/*
 * TaggedMapOutput is an abstract data type that encapsulates a tag and a record.
 * It is used here as the output value type of DataJoinMapperBase, so it must
 * implement the Writable interface, which means implementing the two
 * serialization methods. This class is the custom type.
 */
public class TaggedWritable extends TaggedMapOutput {

    private Writable data;

    public TaggedWritable() {
        this.tag = new Text();
    }

    public TaggedWritable(Writable data) {
        this.tag = new Text(); // the tag can be set later through setTag()
        this.data = data;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.tag.readFields(in);
        String dataClz = in.readUTF();
        // Create the payload object if it is missing or of a different class.
        if (this.data == null
                || !this.data.getClass().getName().equals(dataClz)) {
            try {
                this.data = (Writable) ReflectionUtils.newInstance(
                        Class.forName(dataClz), null);
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        }
        this.data.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Write the tag, then the payload's class name, then the payload itself.
        this.tag.write(out);
        out.writeUTF(this.data.getClass().getName());
        this.data.write(out);
    }

    @Override
    public Writable getData() {
        return data;
    }
}

// http://blog.csdn.net/wawmg/article/details/8759076
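
TaggedWritable writes three things in order: the tag, the class name of the wrapped payload, and the payload itself. readFields reads them back in the same order. The sketch below mimics that layout with plain java.io streams so the round trip can be tried without a Hadoop installation. Strings stand in for Hadoop's Text, and the class `TaggedRecordSketch` and the file name `factory.txt` are hypothetical, introduced only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for TaggedWritable; Strings replace Hadoop's Text.
final class TaggedRecordSketch {
    String tag = "";
    String payloadClass = "";
    String payload = "";

    TaggedRecordSketch() { }

    TaggedRecordSketch(String tag, String payloadClass, String payload) {
        this.tag = tag;
        this.payloadClass = payloadClass;
        this.payload = payload;
    }

    // Mirrors write(): tag first, then the payload's class name, then the payload.
    void write(DataOutputStream out) throws IOException {
        out.writeUTF(tag);
        out.writeUTF(payloadClass);
        out.writeUTF(payload);
    }

    // Mirrors readFields(): fields are read in exactly the order they were written.
    void readFields(DataInputStream in) throws IOException {
        tag = in.readUTF();
        payloadClass = in.readUTF();
        payload = in.readUTF();
    }

    // Serializes a record to bytes and reads it back into a fresh object.
    static TaggedRecordSketch roundTrip(TaggedRecordSketch record) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                record.write(out);
            }
            TaggedRecordSketch copy = new TaggedRecordSketch();
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        TaggedRecordSketch copy = roundTrip(
                new TaggedRecordSketch("factory.txt", "java.lang.String", "Tencent 3"));
        System.out.println(copy.tag + " | " + copy.payloadClass + " | " + copy.payload);
    }
}
```

Writing the class name ahead of the payload is what lets the real readFields create an object of the right type before asking it to deserialize itself.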

[2] The map-phase algorithm, written by myself.

import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;

public class JoinMapper extends DataJoinMapperBase {

    // Called at the start of the task to generate the tag.
    // The input file name is used as the tag here.
    @Override
    protected Text generateInputTag(String inputFile) {
        System.out.println("inputFile = " + inputFile);
        return new Text(inputFile);
    }

    // The field separator is hard-coded here; more generally, the user should
    // be able to specify the separator and the group key.
    // Assumption: fields are whitespace-separated, as in the data sample.
    // Generate the group key (the addressID).
    @Override
    protected Text generateGroupKey(TaggedMapOutput record) {
        String tag = record.getTag().toString();
        String line = record.getData().toString();
        String[] tokens = line.split("\\s+");
        if (tag.indexOf("factory") != -1) {
            // Factory record: the addressID is the last field.
            return new Text(tokens[tokens.length - 1]);
        } else {
            // Address record: the addressID is the first field.
            return new Text(tokens[0]);
        }
    }

    // Wrap the input value in a TaggedWritable that carries the input tag.
    @Override
    protected TaggedMapOutput generateTaggedMapOutput(Object value) {
        TaggedWritable retv = new TaggedWritable((Text) value);
        retv.setTag(this.inputTag); // don't forget to tag the current record
        return retv;
    }
}
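
The group-key rule in the mapper can be tried in isolation. The sketch below is a plain-Java version of the same branch, assuming whitespace-separated fields and input files whose paths contain "factory" or not. The helper class and the file names `factory.txt` and `address.txt` are hypothetical; the source only says that the input matches `*.txt`.

```java
// Hypothetical helper mirroring the branch logic of generateGroupKey.
final class GroupKeySketch {

    // Returns the addressID of a record, depending on which file it came from.
    static String groupKey(String inputTag, String line) {
        String[] tokens = line.trim().split("\\s+");
        if (inputTag.contains("factory")) {
            // Factory record: the addressID is the last field.
            return tokens[tokens.length - 1];
        }
        // Address record: the addressID is the first field.
        return tokens[0];
    }

    public static void main(String[] args) {
        System.out.println(groupKey("/user/c/input/factory.txt", "Beijing Red Star 1"));
        System.out.println(groupKey("/user/c/input/address.txt", "1 Beijing"));
    }
}
```

Both calls yield the same key for matching records, which is what sends a factory and its address to the same reduce group.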

[3] The reduce-phase algorithm, also written by myself.

import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;

public class JoinReducer extends DataJoinReducerBase {

    // The two arrays always have the same length, which is at most the number of data sources.
    @Override
    protected TaggedMapOutput combine(Object[] tags, Object[] values) {
        if (tags.length < 2) return null; // this check is what makes it an inner join
        String joinedStr = "";
        String dd = "";
        for (int i = 0; i < values.length; i++) {
            TaggedWritable tw = (TaggedWritable) values[i];
            String line = tw.getData().toString();
            // Split the first source's record and keep only its second field,
            // dropping the group key.
            if (i == 0) {
                String[] tokens = line.split("\\s+");
                dd += tokens[1];
            }
            // The second source's record is kept whole.
            if (i == 1) {
                joinedStr += line;
                System.out.println(joinedStr);
            }
        }
        // The two parts are concatenated directly, without a delimiter.
        joinedStr += dd;
        TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
        retv.setTag((Text) tags[1]); // tag the result; the group key becomes the final output key
        return retv;
    }
}
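
To see what combine produces for one group, here is a plain-Java sketch of the same logic. It assumes, as the reducer does, that the address record arrives at index 0 and the factory record at index 1, and that combine is called once for every combination of one record per source. The class, the method `joinGroup`, and the tag strings are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone version of the reducer's combine step.
final class CombineSketch {

    // Returns null when only one source contributed to the group (inner join).
    static String combine(String[] tags, String[] values) {
        if (tags.length < 2) {
            return null;
        }
        // values[0] is assumed to be the address record, values[1] the factory record.
        String addressName = values[0].trim().split("\\s+")[1];
        // Like the original, the parts are concatenated without a delimiter.
        return values[1] + addressName;
    }

    // Emulates one reduce group: every address row is paired with every factory row.
    static List<String> joinGroup(List<String> addressRows, List<String> factoryRows) {
        List<String> out = new ArrayList<>();
        for (String address : addressRows) {
            for (String factory : factoryRows) {
                String joined = combine(
                        new String[] {"address.txt", "factory.txt"},
                        new String[] {address, factory});
                if (joined != null) {
                    out.add(joined);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> addresses = List.of("1 Beijing");
        List<String> factories = List.of("Bank of Beijing 1", "Beijing Rising 1");
        joinGroup(addresses, factories).forEach(System.out::println);
    }
}
```

Because nothing is inserted between the factory record and the address name, the address name follows the addressID directly, as in "Bank of Beijing 1Beijing". Adding a separator before the address name would give cleaner columns.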

[4] The Driver class, copied from the template without changes.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DataJoinDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Input and output locations are hard-coded.
        Path in = new Path("hdfs://localhost:9000/user/c/input/*.txt");
        Path out = new Path("hdfs://localhost:9000/user/c/output2");
        JobConf job = new JobConf(conf, DataJoinDriver.class);
        job.setJobName("DataJoin");
        FileSystem hdfs = FileSystem.get(conf);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TaggedWritable.class);
        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new DataJoinDriver(),
                args);
        System.exit(res);
    }
}

Finally: the output has one small problem, which is that it is not sorted.
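
The text does not say which order was expected. Assuming it is the order of the sample output (by addressID, then by factory name), one option is to sort the joined rows after the job has finished. The following is only a sketch of that post-processing step in plain Java; the class name and the row layout {factoryname, addressID, addressname} are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical post-processing step; each row is {factoryname, addressID, addressname}.
final class SortJoinedRowsSketch {

    // Returns a sorted copy: numeric addressID first, then factory name.
    static List<String[]> sortRows(List<String[]> rows) {
        List<String[]> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator
                .comparingInt((String[] row) -> Integer.parseInt(row[1]))
                .thenComparing(row -> row[0]));
        return sorted;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[] {"Tencent", "3", "Shenzhen"},
                new String[] {"Beijing Red Star", "1", "Beijing"},
                new String[] {"Bank of Beijing", "1", "Beijing"});
        for (String[] row : sortRows(rows)) {
            System.out.println(String.join("\t", row));
        }
    }
}
```

This works for small results that fit in memory. For large outputs, sorting inside the job would be the more natural route.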
