2025-04-14 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
MapReduce concept
MapReduce is a programming model for parallel computation over large datasets (larger than 1 TB). The concepts "Map" and "Reduce", and their main ideas, are borrowed from functional programming languages, along with features borrowed from vector programming languages. It makes it much easier for programmers to run their programs on a distributed system without writing distributed parallel code themselves. In the current software implementation, the user specifies a Map (mapping) function, which maps a set of key-value pairs into a new set of key-value pairs, and a concurrent Reduce (reduction) function, which ensures that all mapped key-value pairs sharing the same key are grouped together.
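To make the Map and Reduce roles concrete, here is a minimal single-process sketch in plain Java. It does not use Hadoop at all; the class name MapReduceSketch and the word-count task are assumptions chosen only for illustration. It shows the three steps described above: map each input into key-value pairs, group the pairs by key, then reduce each group.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of the model only; no Hadoop classes are used.
class MapReduceSketch {

    // "Map": turn one input line into (word, 1) pairs.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(Map.entry(word, 1));
            }
        }
        return pairs;
    }

    // "Reduce": collapse all values that share one key into a single value.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // Driver: map every line, group the pairs by key, reduce each group.
    static Map<String, Integer> run(List<String> lines) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> pair : map(line)) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                       .add(pair.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
            result.put(group.getKey(), reduce(group.getKey(), group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a b a", "b c"))); // prints {a=2, b=2, c=1}
    }
}
```

In a real cluster the grouping step is carried out by the framework across many machines; the user only writes the two functions.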
MapReduce provides the following main functions:
1) data partitioning and computing task scheduling:
The system automatically splits the large input data of a job (Job) into many data blocks, each of which corresponds to one computing task (Task), and automatically schedules computing nodes to process the corresponding blocks. Job and task scheduling is mainly responsible for allocating and scheduling computing nodes (Map nodes or Reduce nodes), monitoring the execution status of these nodes, and controlling the synchronization of Map node execution.
2) data/code co-location:
To reduce data communication, a basic principle is localized data processing: a computing node processes, as far as possible, the data stored on its own local disk, which amounts to moving the code to the data. When local processing is not possible, the system looks for another available node and transfers the data to it over the network (moving the data to the code), preferring available nodes in the same rack as the data in order to reduce communication latency.
3) system optimization:
To reduce data communication overhead, intermediate results are merged before they enter the Reduce nodes. The data processed by one Reduce node may come from multiple Map nodes, so to avoid data dependencies in the Reduce phase, the intermediate results output by the Map nodes must be partitioned with a suitable strategy that guarantees related data is sent to the same Reduce node. In addition, the system performs some performance optimizations, such as running multiple backup copies of the slowest tasks and taking the result of whichever copy finishes first.
4) error detection and recovery:
In a large MapReduce cluster built from low-end commodity servers, node hardware failures (host, disk, memory, and so on) and software errors are the norm, so MapReduce must be able to detect and isolate failed nodes and schedule new nodes to take over their computing tasks. The system also maintains the reliability of data storage: it uses a redundant storage mechanism with multiple replicas, and it detects and recovers corrupted data promptly.
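The partitioning requirement from item 3), that records with the same key must reach the same Reduce node, can be sketched with a hash-based rule. The class and method names below are hypothetical and the code is plain Java; the formula is the one used by Hadoop's default HashPartitioner (hash code with the sign bit masked off, modulo the number of reduce tasks).

```java
// Plain-Java sketch of hash partitioning; PartitionSketch and partitionFor
// are illustrative names, not Hadoop API.
class PartitionSketch {

    // Mask off the sign bit so the result is never negative, then take the
    // remainder. Equal keys have equal hash codes, so they always land in
    // the same partition.
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        String[] keys = {"tom", "nancy", "ketty", "tom"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

With a single reduce task, as in the job later in this article, every key maps to partition 0, which is why the final output is one globally sorted file.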
Test text:
tom 20 8000
nancy 22 8000
ketty 22 9000
stone 19 10000
green 19 11000
white 39 29000
socrates 30 40000
In MapReduce, partitioning, sorting, and grouping are all performed by key.
For keys of the built-in types, such as IntWritable (for int), LongWritable (for long), and Text, MapReduce sorts in ascending order by default.
Why customize the sort order? Some requirements call for a custom key type with its own sorting rule. For example: sort people by salary in descending order and, when salaries are equal, by age in ascending order.
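Before involving Hadoop, the rule itself (salary descending, then age ascending) can be checked with a plain-Java Comparator applied to the test rows above. This is only a sketch: the names SortRuleSketch and Row are hypothetical, and no Hadoop types are used.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java check of the sorting rule; SortRuleSketch and Row are
// illustrative names only.
class SortRuleSketch {

    static final class Row {
        final String name;
        final int age;
        final int salary;

        Row(String name, int age, int salary) {
            this.name = name;
            this.age = age;
            this.salary = salary;
        }
    }

    // Salary descending first; ties are broken by age ascending.
    static final Comparator<Row> SALARY_DESC_AGE_ASC =
            Comparator.comparingInt((Row r) -> r.salary)
                    .reversed()
                    .thenComparingInt(r -> r.age);

    // The rows from the test text, in input order.
    static List<Row> sample() {
        List<Row> rows = new ArrayList<>();
        rows.add(new Row("tom", 20, 8000));
        rows.add(new Row("nancy", 22, 8000));
        rows.add(new Row("ketty", 22, 9000));
        rows.add(new Row("stone", 19, 10000));
        rows.add(new Row("green", 19, 11000));
        rows.add(new Row("white", 39, 29000));
        rows.add(new Row("socrates", 30, 40000));
        return rows;
    }

    static List<String> sortedNames(List<Row> rows) {
        List<Row> copy = new ArrayList<>(rows);
        copy.sort(SALARY_DESC_AGE_ASC);
        List<String> names = new ArrayList<>();
        for (Row r : copy) {
            names.add(r.name);
        }
        return names;
    }

    public static void main(String[] args) {
        // prints [socrates, white, green, stone, ketty, tom, nancy]
        System.out.println(sortedNames(sample()));
    }
}
```

tom and nancy share the same salary, so the age tie-breaker places tom (20) ahead of nancy (22).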
Take the Text type as an example:
The Text class implements the WritableComparable interface and has write(), readFields(), and compareTo() methods.
readFields() method: used for deserialization
write() method: used for serialization
So a custom key type that is to be sorted must provide these methods as well.
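The contract between the two serialization methods is that fields are read back in exactly the order they were written. A minimal sketch of that round trip, using only java.io streams (which implement the same DataOutput and DataInput interfaces that Hadoop passes in), might look like the following; the class name RoundTripSketch and its helper methods are assumptions for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Plain-Java round trip through DataOutput/DataInput; RoundTripSketch is an
// illustrative name, not part of Hadoop.
class RoundTripSketch {

    // Serialize the three fields: name, then age, then salary.
    static byte[] write(String name, int age, int salary) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(name);
            out.writeInt(age);
            out.writeInt(salary);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Deserialize in the same order the fields were written.
    static String read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String name = in.readUTF();
            int age = in.readInt();
            int salary = in.readInt();
            return salary + " " + age + " " + name;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(read(write("nancy", 22, 8000))); // prints 8000 22 nancy
    }
}
```

If the read order differed from the write order (for example, reading an int before the string), the fields would be decoded from the wrong bytes, which is why the order must match.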
Custom class code:
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Person implements WritableComparable<Person> {
    private String name;
    private int age;
    private int salary;

    // Hadoop needs a no-argument constructor to create the key by reflection
    public Person() {
    }

    public Person(String name, int age, int salary) {
        this.name = name;
        this.age = age;
        this.salary = salary;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
    public int getSalary() { return salary; }
    public void setSalary(int salary) { this.salary = salary; }

    @Override
    public String toString() {
        return this.salary + " " + this.age + " " + this.name;
    }

    // Compare salary first, highest first; if equal, the smaller age comes first
    @Override
    public int compareTo(Person o) {
        // Integer.compare avoids the overflow that plain subtraction can cause
        int compareResult1 = Integer.compare(this.salary, o.salary);
        if (compareResult1 != 0) {
            return -compareResult1;
        } else {
            return Integer.compare(this.age, o.age);
        }
    }

    // Serialize the key into binary using the DataOutput stream
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(name);
        dataOutput.writeInt(age);
        dataOutput.writeInt(salary);
    }

    // Read the fields in the same order as they were written in write()
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.name = dataInput.readUTF(); // read the string
        this.age = dataInput.readInt();
        this.salary = dataInput.readInt();
    }
}
MapReduce program:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;

public class SecondarySort {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "hadoop2.7");
        Configuration configuration = new Configuration();
        // Set the local MapReduce program jar package
        configuration.set("mapreduce.job.jar", "C:\\Users\\tanglei1\\IdeaProjects\\Hadooptang\\target\\com.kaikeba.hadoop-1.0-SNAPSHOT.jar");
        Job job = Job.getInstance(configuration, SecondarySort.class.getSimpleName());

        // Delete the output directory if it already exists
        FileSystem fileSystem = FileSystem.get(new URI(args[1]), configuration);
        if (fileSystem.exists(new Path(args[1]))) {
            fileSystem.delete(new Path(args[1]), true);
        }

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        job.setMapperClass(MyMap.class);
        job.setMapOutputKeyClass(Person.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Set the number of reduce tasks
        job.setNumReduceTasks(1);
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Person.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }

    // LongWritable: input key type, Text: input value type
    // Person: output key type, NullWritable: output value type
    public static class MyMap extends Mapper<LongWritable, Text, Person, NullWritable> {
        // The map output is a key-value pair; NullWritable means the value is not used
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // key: offset of the line within the input, value: one line of text
            // (Hadoop reads the text input line by line)
            // fields: the columns of one line; the delimiter is assumed to be whitespace
            String[] fields = value.toString().split("\\s+");
            // Example row: nancy 22 8000
            String name = fields[0];
            // Convert the strings to int
            int age = Integer.parseInt(fields[1]);
            int salary = Integer.parseInt(fields[2]);
            // The comparison is done in the custom key class
            Person person = new Person(name, age, salary);
            context.write(person, NullWritable.get());
        }
    }

    public static class MyReduce extends Reducer<Person, NullWritable, Person, NullWritable> {
        @Override
        protected void reduce(Person key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}
Running result:
40000 30 socrates
29000 39 white
11000 19 green
10000 19 stone
9000 22 ketty
8000 20 tom
8000 22 nancy