I. Overview of MapReduce
Hadoop MapReduce is a distributed computing framework for writing batch applications. Programs written with it can be submitted to a Hadoop cluster for parallel processing of large-scale data sets.
A MapReduce job splits the input data set into independent blocks, which are processed by map tasks in parallel; the framework sorts the output of map and then feeds it into reduce. The MapReduce framework is dedicated to processing key-value pairs: it treats the input of a job as a set of <key, value> pairs and produces a set of <key, value> pairs as output. Both the input and output keys and values must implement the Writable interface.
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

II. Brief Introduction to the MapReduce Programming Model
Here we take word frequency statistics as an example. The process of MapReduce processing is as follows:
Input: read the text file;
Splitting: split the file by line; K1 represents the line number (offset) and V1 represents the text content of the corresponding line;
Mapping: split each line on spaces in parallel, producing List(K2, V2), where K2 is each word; since this is word frequency counting, V2 is 1, representing one occurrence;
Shuffling: since the Mapping operations may be processed in parallel on different machines, records with the same key need to be distributed to the same node and merged through shuffling so the final result can be computed; this produces K2, List(V2), where K2 is each word and List(V2) is an iterable collection whose elements are the V2 values produced by Mapping;
Reducing: the goal in this case is to count the total number of occurrences of each word, so Reducing sums List(V2) and outputs the final result.
In the MapReduce programming model, the splitting and shuffling operations are implemented by the framework; only mapping and reducing need to be implemented by ourselves, which is where the name MapReduce comes from.
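To make these stages concrete, here is a minimal hand-worked trace, assuming a two-line input made of words from the sample data later in this article (the byte offsets 0 and 13 are only illustrative):

Input/Splitting:  (0, "Hadoop Spark")   (13, "Hadoop Hive")
Mapping:          ("Hadoop", 1), ("Spark", 1)   and   ("Hadoop", 1), ("Hive", 1)
Shuffling:        ("Hadoop", [1, 1])   ("Spark", [1])   ("Hive", [1])
Reducing:         ("Hadoop", 2)   ("Spark", 1)   ("Hive", 1)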
III. Combiner & Partitioner
3.1 InputFormat & RecordReaders
InputFormat splits the input file into multiple InputSplits, and RecordReaders convert each InputSplit into standard key-value pairs as the input of map. The significance of this step is that only after a logical split is performed and the data is converted into the standard key-value format can multiple map tasks be given input for parallel processing.
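For plain text input this is handled by TextInputFormat, whose LineRecordReader emits (byte offset, line content) pairs; these become the (KEYIN, VALUEIN) of the Mapper described later. Below is a minimal sketch of making that choice explicit when assembling a job; the class name InputFormatConfigSketch is hypothetical, and the word count in this article does not actually need this call because TextInputFormat is already the default:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import java.io.IOException;

public class InputFormatConfigSketch {

    public static Job newJob() throws IOException {
        Job job = Job.getInstance(new Configuration());
        // TextInputFormat performs the logical split of the input into InputSplits;
        // its LineRecordReader turns each split into (LongWritable offset, Text line)
        // key-value pairs that are fed to the map function in parallel.
        job.setInputFormatClass(TextInputFormat.class);
        return job;
    }
}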
3.2 Combiner
The Combiner is an optional operation after the map operation. It is essentially a localized reduce: after map produces its intermediate output, the combiner merges records that share the same key. Take word frequency statistics as an example:
Whenever map encounters the word hadoop it emits a record with count 1, but hadoop may appear n times in the article, so the map output contains a lot of redundancy. If records with the same key are merged before the reduce computation, the amount of data that needs to be transferred is reduced and transmission efficiency improves.
However, not all scenarios are suitable for a combiner. The principle is that using a combiner must not change the final input to the reduce computation. For example, a combiner can be used when computing totals, maximums, and minimums, but not when computing averages.
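A quick worked example of why averages break, using made-up numbers: suppose one map task emits (hadoop, 1) and (hadoop, 2) and another emits (hadoop, 3). Without a combiner the reducer sees (1, 2, 3) and computes the average (1 + 2 + 3) / 3 = 2. If a combiner pre-averaged each map task's output, the reducer would instead see (1.5, 3) and compute (1.5 + 3) / 2 = 2.25, which is wrong. Sums, maximums, and minimums do not suffer from this, because applying them in stages gives the same result as applying them once.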
Without the combiner:
With the combiner:
You can see that with the combiner, the data that needs to be transferred to the reducer is reduced from 12 keys to 10 keys. The extent of the reduction depends on how often your keys repeat. The word frequency statistics case below shows that the combiner can reduce the transmission volume by hundreds of times.
3.3 Partitioner
The Partitioner can be understood as a classifier: it routes the map output to the corresponding reducer according to the key. It supports custom implementations, which will be demonstrated in the example below.
IV. MapReduce Word Frequency Statistics Case

4.1 Project Introduction
Here is a classic case of word frequency statistics: counting the number of occurrences of each word in the following sample data.
Spark HBase
Hive Flink Storm Hadoop
HBase Spark
Flink
HBase Storm
HBase Hadoop Hive Flink
HBase Flink Hive Storm
Hive Flink Hadoop
HBase Hive
Hadoop Spark HBase Storm
HBase Hadoop Hive Flink
HBase Flink Hive Storm
Hive Flink Hadoop
HBase Hive
For convenience, the project source code includes a utility class, WordCountDataUtils, which is used to generate sample data for word frequency statistics. The generated file can be output to the local file system or written directly to HDFS.
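The real utility ships with the project source code linked below; purely as an illustration, a minimal sketch of such a generator might look like the following. The method names buildData and generateDataOnHDFS are hypothetical; only WORD_LIST is referenced later in this article, and the word set is taken from the sample above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class WordCountDataUtils {

    // The six words that make up the sample data
    public static final List<String> WORD_LIST =
            Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    // Build lineCount lines, each containing a few randomly chosen words separated by
    // tabs (the Mapper shown later splits on "\t")
    private static String buildData(int lineCount) {
        Random random = new Random();
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < lineCount; i++) {
            int wordsPerLine = 1 + random.nextInt(4);
            for (int j = 0; j < wordsPerLine; j++) {
                builder.append(WORD_LIST.get(random.nextInt(WORD_LIST.size())));
                builder.append(j == wordsPerLine - 1 ? "\n" : "\t");
            }
        }
        return builder.toString();
    }

    // Write the generated sample directly to HDFS
    public static void generateDataOnHDFS(String hdfsUrl, String user, String path, int lineCount) throws Exception {
        FileSystem fileSystem = FileSystem.get(new URI(hdfsUrl), new Configuration(), user);
        try (FSDataOutputStream out = fileSystem.create(new Path(path))) {
            out.write(buildData(lineCount).getBytes(StandardCharsets.UTF_8));
        }
        fileSystem.close();
    }
}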
Project code download address: hadoop-word-count
4.2 Project Dependencies

To do MapReduce programming, you need to import the hadoop-client dependency:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>

4.3 WordCountMapper
Splits each line of data according to the specified delimiter. Note that Hadoop-defined types must be used in MapReduce, because Hadoop's predefined types are all serializable and comparable: they all implement the WritableComparable interface.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] words = value.toString().split("\t");
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
WordCountMapper corresponds to the Mapping operation of the following figure:
WordCountMapper inherits from the Mapper class, which is a generic class defined as follows:

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    ...
}

KEYIN: the type of the mapping input key, i.e. the offset of each line (the position of the line's first character in the entire text); Long type, corresponding to the LongWritable type in Hadoop;
VALUEIN: the type of the mapping input value, i.e. each line of data; String type, corresponding to the Text type in Hadoop;
KEYOUT: the type of the key output by the mapping, i.e. each word; String type, corresponding to the Text type in Hadoop;
VALUEOUT: the type of the value output by the mapping, i.e. the number of occurrences of each word; here int, corresponding to the IntWritable type in Hadoop.

4.4 WordCountReducer
Count the number of word occurrences in Reduce:
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
As shown in the figure below, the output of shuffling is the input of reduce. Here key is each word, and values is an iterable collection, similar to (1, 1, 1, ...).
4.5 WordCountApp
Assemble the MapReduce job and submit it to the server to run, as follows:
/**
 * Assemble the job and submit it to the cluster to run
 */
public class WordCountApp {

    // Hard-coded here so the parameters are easy to see; in real development they can be passed in as external arguments
    private static final String HDFS_URL = "hdfs://192.168.0.107:8020";
    private static final String HADOOP_USER_NAME = "root";

    public static void main(String[] args) throws Exception {

        // The input path and output path are specified by external arguments
        if (args.length < 2) {
            System.out.println("Input and output paths are necessary!");
            return;
        }

        // The Hadoop user name must be specified, otherwise creating directories on HDFS may throw a permission exception
        System.setProperty("HADOOP_USER_NAME", HADOOP_USER_NAME);

        Configuration configuration = new Configuration();
        // Specify the HDFS address
        configuration.set("fs.defaultFS", HDFS_URL);

        // Create a Job
        Job job = Job.getInstance(configuration);

        // Set the main class to run
        job.setJarByClass(WordCountApp.class);

        // Set the Mapper and Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Set the Mapper output key and value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Set the Reducer output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // If the output directory already exists, it must be deleted first, otherwise re-running the program throws an exception
        FileSystem fileSystem = FileSystem.get(new URI(HDFS_URL), configuration, HADOOP_USER_NAME);
        Path outputPath = new Path(args[1]);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        // Set the job's input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, outputPath);

        // Submit the job to the cluster and wait for it to finish; passing true prints the job progress
        boolean result = job.waitForCompletion(true);

        // Close the FileSystem created earlier
        fileSystem.close();

        // Terminate the JVM with an exit code that reflects the job result
        System.exit(result ? 0 : -1);
    }
}

Note that if the output types of the Mapper are not set, the program assumes by default that they are the same as the output types of the Reducer.

4.6 Submitting to the Server to Run

In actual development, you can configure a Hadoop development environment on your local machine and run tests directly from the IDE. Here we mainly introduce packaging the project and submitting it to the server. Since this project has no third-party dependencies other than Hadoop, it can be packaged directly:

# mvn clean package

Submit the job with the following command:

hadoop jar /usr/appjar/hadoop-word-count-1.0.jar \
com.heibaiying.WordCountApp \
/wordcount/input.txt /wordcount/output/WordCountApp

After the job completes, view the directory generated on HDFS:

# view the output directory
hadoop fs -ls /wordcount/output/WordCountApp

# view the statistics
hadoop fs -cat /wordcount/output/WordCountApp/part-r-00000

V. Word Frequency Statistics Case Advanced: Combiner

5.1 Code Implementation

To enable the combiner, you only need to add the following line when assembling the job:

// Set the Combiner
job.setCombinerClass(WordCountReducer.class);

5.2 Execution Results

The statistics do not change after adding the combiner, but its effect can be seen from the printed logs.

Log output without the combiner:

Log output with the combiner:

Here there is only one input file and it is smaller than 128 MB, so there is only one map task to process it. You can see that with the combiner, the number of records is reduced from 3519 to 6 (there are only six distinct words in the sample). In this use case, the combiner can greatly reduce the amount of data that needs to be transferred.
VI. Word Frequency Statistics Case Advanced: Partitioner

6.1 Default Partitioner
Suppose we need to output the statistics for different words to different files. This kind of requirement is actually quite common: for example, when counting product sales, the results may need to be split by product type. To do this, a custom Partitioner is needed.
Let's first introduce MapReduce's default partitioning rule: when building a job, if no partitioner is specified, the default is HashPartitioner, which hashes the key and takes the remainder modulo numReduceTasks. Its implementation is as follows:
public class HashPartitioner<K, V> extends Partitioner<K, V> {

    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

6.2 Custom Partitioner
Here we extend Partitioner to define a custom partitioning rule, which partitions by word:
public class CustomPartitioner extends Partitioner<Text, IntWritable> {

    public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
        return WordCountDataUtils.WORD_LIST.indexOf(text.toString());
    }
}
Specify our custom partitioning rule when building the job, and set the number of reduce tasks:
// Set the custom partition rule
job.setPartitionerClass(CustomPartitioner.class);
// Set the number of reduce tasks
job.setNumReduceTasks(WordCountDataUtils.WORD_LIST.size());

6.3 Execution Results
The execution results are as follows: six output files are generated, each containing the statistics for the corresponding word:
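With the default output naming, each of the six reduce tasks writes its own part file, so the output directory would be expected to contain part-r-00000 through part-r-00005, one per word; each can be viewed with hadoop fs -cat in the same way as shown earlier.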
For more articles in the big data series, please see the GitHub open source project: Big Data Getting Started Guide.