How to write custom MapReduce output to a database in Hadoop 2.2.0

Updated 2025-03-31. Source: SLTechnology News&Howtos

This article shows how a Hadoop 2.2.0 MapReduce job can write its output to a database through a custom output format. Redis is used as the example database.

The example counts the number of visits in each hour of each day recorded in a log file. Each log line has the format:

2014-02-10 04:52:34 127.0.0.1 xxx
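As a minimal sketch of how one such line maps to an hourly key, the plain-Java helper below splits the line on whitespace and keeps the date and the hour. The class name LogLineKey and the method hourKey are illustrative and do not appear in the original job; the null return for malformed lines is also an assumption about how bad input should be handled.

```java
public class LogLineKey {
    /**
     * Turns a log line such as "2014-02-10 04:52:34 127.0.0.1 xxx"
     * into an hourly key of the form yyyy-MM-dd-HH.
     * Returns null when the line lacks a date field or an HH:mm time field.
     */
    public static String hourKey(String line) {
        if (line == null) return null;
        String[] fields = line.trim().split("\\s+");
        if (fields.length < 2) return null;      // blank or truncated line
        String[] timeParts = fields[1].split(":");
        if (timeParts.length < 2) return null;   // second field is not a time
        return fields[0] + "-" + timeParts[0];
    }

    public static void main(String[] args) {
        // prints 2014-02-10-04
        System.out.println(hourKey("2014-02-10 04:52:34 127.0.0.1 xxx"));
    }
}
```

Every line that falls in the same hour of the same day yields the same key, which is what lets the reducer aggregate visits per hour.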

When writing a MapReduce job, we configure the input and output and then write the mapper and reducer classes. By default, Hadoop writes job output to files on HDFS through a FileOutputFormat subclass such as TextOutputFormat:

    job.setOutputFormatClass(TextOutputFormat.class);

To send the results of the job to a database (Redis here) instead, extend FileOutputFormat with a class of your own. The code:

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import redis.clients.jedis.Jedis;

    public class LoginLogOutputFormat<K, V> extends FileOutputFormat<K, V> {

        /**
         * The key piece is a custom RecordWriter: each record emitted
         * by reduce is written to the database.
         */
        protected static class RedisRecordWriter<K, V> extends RecordWriter<K, V> {
            private Jedis jedis; // Redis client instance

            public RedisRecordWriter(Jedis jedis) {
                this.jedis = jedis;
            }

            @Override
            public void write(K key, V value) throws IOException, InterruptedException {
                boolean nullKey = key == null;
                boolean nullValue = value == null;
                if (nullKey || nullValue) return;

                // The reduce key has the form yyyy-MM-dd-HH
                String[] sKey = key.toString().split("-");
                // The zset key is yyyy-MM-dd_login_stat
                String outKey = sKey[0] + "-" + sKey[1] + "-" + sKey[2] + "_login_stat";
                // zadd with a fixed score of -1; the member format is hour:count
                jedis.zadd(outKey.getBytes("UTF-8"), -1,
                        (sKey[3] + ":" + value).getBytes("UTF-8"));
            }

            @Override
            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                if (jedis != null) jedis.disconnect(); // close the connection
            }
        }

        @Override
        public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
                throws IOException, InterruptedException {
            // RedisClient is the author's own helper (not shown here);
            // build the database connection object as your setup requires.
            Jedis jedis = RedisClient.newJedis();
            // System.out.println("build RedisRecordWriter");
            return new RedisRecordWriter<K, V>(jedis);
        }
    }
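The string handling inside write can be tried without Hadoop or Redis. The sketch below isolates it, assuming reduce keys of the form yyyy-MM-dd-HH as produced by the mapper in this article. The class RedisKeyLayout and its methods are hypothetical names introduced only for illustration, and the count 17 is a made-up value.

```java
public class RedisKeyLayout {
    /** Splits a reduce key of the form yyyy-MM-dd-HH into its four parts. */
    private static String[] parts(String reduceKey) {
        String[] p = reduceKey.split("-");
        if (p.length != 4) {
            throw new IllegalArgumentException("expected yyyy-MM-dd-HH: " + reduceKey);
        }
        return p;
    }

    /** Sorted-set key for the day: yyyy-MM-dd_login_stat. */
    public static String zsetKey(String reduceKey) {
        String[] p = parts(reduceKey);
        return p[0] + "-" + p[1] + "-" + p[2] + "_login_stat";
    }

    /** Sorted-set member for the hour: HH:count. */
    public static String member(String reduceKey, int count) {
        return parts(reduceKey)[3] + ":" + count;
    }

    public static void main(String[] args) {
        System.out.println(zsetKey("2014-02-10-04"));    // prints 2014-02-10_login_stat
        System.out.println(member("2014-02-10-04", 17)); // prints 04:17
    }
}
```

Because the count is part of the member string, running the job again with a different count for the same hour would add a second member rather than overwrite the first. Storing the hour as the member and the count as the score is one alternative layout.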

Here is the entire job implementation:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class LoginLogStatTask extends Configured implements Tool {

        public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                if (value == null) return;
                String line = value.toString().trim();
                if (line.isEmpty()) return;

                // Parse the line, such as: 2014-02-10 04:52:34 127.0.0.1 xxx
                String[] fields = line.split(" ");
                if (fields.length < 2) return; // skip malformed lines
                String date = fields[0];
                String time = fields[1];
                String hour = time.split(":")[0];
                String outKey = date + "-" + hour;
                context.write(new Text(outKey), new IntWritable(1));
            }
        }

        public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int count = 0;
                for (IntWritable v : values) { // every mapper value is 1, so the sum is the count
                    count += v.get();
                }
                context.write(key, new IntWritable(count));
            }
        }

        @Override
        public int run(String[] args) throws Exception {
            Configuration conf = getConf();
            List<Path> inputs = new ArrayList<Path>();
            String inputPath = args[0];
            if (inputPath.endsWith("/")) { // a directory
                // HdfsUtil is the author's own helper (not shown here)
                inputs.addAll(HdfsUtil.listFiles(inputPath, conf));
            } else {                       // a single file
                inputs.add(new Path(inputPath));
            }

            long ts = System.currentTimeMillis();
            String jobName = "login_logs_stat_job_" + ts;
            Job job = Job.getInstance(conf, jobName);
            job.setJarByClass(LoginLogStatTask.class);

            // Add the input file paths
            for (Path p : inputs) {
                FileInputFormat.addInputPath(job, p);
            }

            // Set the output path, using jobName.out. It is still set here
            // because LoginLogOutputFormat extends FileOutputFormat.
            Path out = new Path(jobName + ".out");
            FileOutputFormat.setOutputPath(job, out);

            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(LoginLogOutputFormat.class); // the custom output format
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            job.waitForCompletion(true);
            return job.isSuccessful() ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            int res = ToolRunner.run(conf, new LoginLogStatTask(), args);
            System.exit(res);
        }
    }
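To check the counting logic before submitting anything to a cluster, the map and reduce steps can be imitated in memory. The following is only a local sketch under that assumption, not a replacement for the job: the class LocalHourlyCount and the three sample lines are made up for illustration, and the second field of each line is assumed to be an HH:mm:ss time.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LocalHourlyCount {
    /** Groups log lines by yyyy-MM-dd-HH and counts the lines in each group. */
    public static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.trim().split("\\s+");
            if (fields.length < 2) continue; // skip blank or truncated lines
            // "map" step: derive the hourly key from the date and the hour
            String key = fields[0] + "-" + fields[1].split(":")[0];
            // "reduce" step: add 1 to the running total for that key
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "2014-02-10 04:52:34 127.0.0.1 xxx",
                "2014-02-10 04:59:01 127.0.0.1 yyy",
                "2014-02-10 05:00:12 127.0.0.1 zzz");
        // prints {2014-02-10-04=2, 2014-02-10-05=1}
        System.out.println(count(lines));
    }
}
```

Each entry of the resulting map corresponds to one record that the reducer would hand to the custom RecordWriter.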

After the job runs, Redis contains one sorted set per day, keyed yyyy-MM-dd_login_stat (for example 2014-02-10_login_stat), whose members have the form hour:count.
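Once the members of a day's sorted set have been fetched by a client (for instance with the Redis ZRANGE command), they still need to be split back into hours and counts. The sketch below shows one way to do that in plain Java, assuming members in the hour:count form that the RecordWriter writes. The class LoginStatMembers and the sample member list are hypothetical, and the Redis call itself is deliberately left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LoginStatMembers {
    /** Parses members of the form HH:count into an hour -> count map sorted by hour. */
    public static Map<String, Integer> parse(List<String> members) {
        Map<String, Integer> byHour = new TreeMap<>();
        for (String m : members) {
            int sep = m.indexOf(':');
            if (sep < 0) continue; // not in HH:count form
            byHour.put(m.substring(0, sep), Integer.parseInt(m.substring(sep + 1)));
        }
        return byHour;
    }

    public static void main(String[] args) {
        // Made-up members, as a client might return them for 2014-02-10_login_stat
        List<String> members = Arrays.asList("05:1", "04:2");
        // prints {04=2, 05=1}
        System.out.println(parse(members));
    }
}
```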
