2025-01-16 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
This article introduces the basic principles of MapReduce: what the framework is, its advantages and disadvantages, how its architecture changed between 1.x and 2.x, and how its workflow and input slicing work.
First, a basic overview of MapReduce
1. Definition
MapReduce is a programming framework for distributed computing programs. Its core function is to combine the business logic code written by the user with the framework's default components into a complete distributed program that runs concurrently on a Hadoop cluster.
2. Advantages and disadvantages
(1) advantages
1 > Easy to program: by writing ordinary program code against the interfaces that MapReduce provides, you can quickly produce a distributed program.
2 > Good scalability: when computing resources run short, computing power can be expanded simply by adding machines.
3 > High fault tolerance: if the node running a task dies, the task is automatically transferred to another node for execution (automatic failover). The framework handles this internally, without human intervention.
4 > Suitable for offline processing of data at the PB scale and above.
(2) shortcomings
1 > Not suited to real-time computing: results cannot be returned in milliseconds or seconds, as they can from a database such as MySQL.
2 > Not suited to streaming computing: the input of a streaming computation is dynamic, while MapReduce requires input data that is static and already persisted to storage.
3 > Not suited to DAG (directed acyclic graph) computing: when multiple applications depend on one another, the input of each application is the output of the previous one. MapReduce performs poorly in this case because the output of every phase is written to disk first, and the large volume of disk I/O causes a sharp drop in performance.
3. The core idea of MapReduce
The core idea is divided into two stages: map and reduce.
1) First, the input data is sliced, and each slice is assigned to a separate map task. The map task processes its slice according to the business logic, and map tasks do not affect one another.
2) Then the output of all the map tasks becomes the input of the reduce tasks (the number of reduce tasks is related to the partitioning, which is discussed in more detail later). The reduce tasks summarize the local statistics of each map task into global statistics and write the final result.
3) In the MapReduce programming model, a job can contain only one map phase and one reduce phase. Logic that needs more stages has to be split across multiple MapReduce programs, and those can only be run serially, not in parallel.
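The two stages can be illustrated with a small in-memory word count. This is only a sketch in plain Java, not code written against the Hadoop API: the class name WordCountSketch and its methods are hypothetical, each string stands in for one input slice, and the map step pre-aggregates its own slice so that the "local statistics" and "global statistics" described above are visible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory illustration; not the Hadoop Mapper/Reducer API.
class WordCountSketch {

    // Map stage: one call per input slice, producing local word counts for that slice.
    static Map<String, Integer> mapTask(String slice) {
        Map<String, Integer> local = new TreeMap<>();
        for (String word : slice.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                local.merge(word, 1, Integer::sum);
            }
        }
        return local;
    }

    // Reduce stage: summarize the local statistics of every map task into global statistics.
    static Map<String, Integer> reduceTask(List<Map<String, Integer>> mapOutputs) {
        Map<String, Integer> global = new TreeMap<>();
        for (Map<String, Integer> local : mapOutputs) {
            local.forEach((word, count) -> global.merge(word, count, Integer::sum));
        }
        return global;
    }

    static Map<String, Integer> run(List<String> slices) {
        List<Map<String, Integer>> mapOutputs = new ArrayList<>();
        for (String slice : slices) {
            // On a cluster these calls would run in parallel on different nodes.
            mapOutputs.add(mapTask(slice));
        }
        return reduceTask(mapOutputs);
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("hello hadoop", "hello mapreduce hello")));
    }
}
```

Because each mapTask call only touches its own slice, the map calls are independent of one another, which is what allows the real framework to spread them across machines.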
Second, the basic architecture of MapReduce
1. The architecture of MapReduce 1.x
Basic Overview:
1) After writing an MR job, we submit it through JobClient, and the submitted information is sent to the JobTracker module. JobTracker is one of the cores of the first-generation MapReduce framework: it maintains heartbeats with the other nodes in the cluster, allocates resources to submitted jobs, and manages their execution (handling failures, restarts, and so on).
2) The other core component of first-generation MapReduce is TaskTracker, which is installed on every worker node. Its main function is to monitor the resource usage of its own node.
3) TaskTracker also monitors the tasks running on its node, both Map Tasks and Reduce Tasks; the Reduce Tasks finally write their results to HDFS. The specific process is shown in steps 1-7 of the figure. While monitoring, TaskTracker sends this information to JobTracker through the heartbeat mechanism. JobTracker collects it and uses it when allocating resources to newly submitted jobs, so that the same resources are not allocated twice.
Disadvantages:
1) JobTracker is the entry point of first-generation MapReduce. If the JobTracker service goes down, the whole service is paralyzed, so it is a single point of failure.
2) JobTracker is responsible for too much and consumes too many resources. When the number of jobs is very large, it uses a lot of memory and easily becomes a performance bottleneck.
3) On the TaskTracker side, representing resources simply as a number of tasks is too crude, because it ignores CPU and memory usage. If several memory-hungry tasks are scheduled onto the same node, out-of-memory errors are likely.
4) In addition, TaskTracker rigidly divides resources into map task slots and reduce task slots. When only one kind of task (map or reduce) is running, the slots of the other kind sit idle and resource utilization is low. In other words, resources are allocated statically.
2. The structure of MapReduce2.x
The biggest difference between V2 and V1 is the addition of YARN.
The basic idea of the refactoring is to split the two core functions of JobTracker into separate components. The separated components are resource management (Applications Manager) and task scheduling (Resource Scheduler). The new Resource Manager manages resource allocation for the whole system, while an App Master (Application Master), running on a node managed by a Node Manager, handles scheduling and coordination for its own application (each MapReduce job has its own App Master). In practice, the App Master obtains resources from the Resource Manager and works with the Node Managers to run and monitor tasks.
Compared with MR V1, the monitoring of tasks, including things such as restarts, is handled by the App Master, while the Resource Manager provides central services and is responsible for resource allocation and scheduling. Each Node Manager maintains the status of its Containers, reports the collected information to the Resource Manager, and maintains a heartbeat with it.
Advantages:
1) It reduces the resource consumption of the central component and makes the monitoring of each job more distributed.
2) With YARN, more programming models are supported, such as Spark.
3) Describing resources in terms of the amount of memory is more reasonable than the slots of V1, and resources are allocated dynamically.
4) Resource scheduling and allocation are more hierarchical: the RM is responsible for overall resource management and scheduling, and each App Master is responsible for requesting resources and scheduling tasks for its own application.
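The difference between V1's static slots and V2's memory-based allocation can be shown with a little arithmetic. The sketch below is a deliberately simplified model, not YARN's scheduler: the class SlotVsMemorySketch, its methods, and all the numbers are hypothetical, and CPU is ignored.

```java
// Hypothetical toy model of one worker node; all numbers are made up for illustration.
class SlotVsMemorySketch {

    // V1 model: map slots and reduce slots are separate pools that cannot be lent to each other.
    static int runningWithStaticSlots(int mapSlots, int reduceSlots,
                                      int pendingMaps, int pendingReduces) {
        return Math.min(mapSlots, pendingMaps) + Math.min(reduceSlots, pendingReduces);
    }

    // V2 model (simplified): one pool of memory; any kind of task may use whatever is free.
    static int runningWithMemoryPool(int nodeMemoryMb, int taskMemoryMb, int pendingTasks) {
        return Math.min(nodeMemoryMb / taskMemoryMb, pendingTasks);
    }

    public static void main(String[] args) {
        // A node with 4 map slots and 4 reduce slots, during a phase with 8 map tasks and no reduce tasks.
        System.out.println("static slots: " + runningWithStaticSlots(4, 4, 8, 0));
        // The same node modelled as 8192 MB of memory with 1024 MB requested per task.
        System.out.println("memory pool:  " + runningWithMemoryPool(8192, 1024, 8));
    }
}
```

In this model the static layout runs only 4 of the 8 pending map tasks while the 4 reduce slots sit idle, whereas the pooled layout can run all 8.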
Third, the principle of the MapReduce framework
1. Workflow
Steps 7 to 16 of the workflow are together called the shuffle mechanism:
1) The map task collects the key-value pairs output by our map() method and puts them in an in-memory buffer.
2) The buffer is continually spilled to files on the local disk, possibly producing multiple spill files.
3) The multiple spill files are merged into one large spill file.
4) During spilling and merging, the partitioner is called to partition the data, and the data is sorted by key.
5) Each reduce task fetches the data of its own partition, identified by its partition number, from the machine of every map task.
6) A reduce task thus obtains result files for the same partition from different map tasks, and it merges these files again (merge sort).
7) Once they are merged into one large file, the shuffle is over and the reduce task's own logic begins: it takes one group of key-value pairs at a time from the file and calls the user-defined reduce() method.
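The partition, sort, fetch and merge steps above can be sketched in memory as follows. This is an illustration under assumptions, not Hadoop's implementation: the class ShuffleSketch and its methods are hypothetical, the spill to disk is left out entirely, and a TreeMap stands in for the sorting done during spill and merge. The partition formula is the one used by Hadoop's default HashPartitioner.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the shuffle; real map tasks spill to and merge files on disk.
class ShuffleSketch {

    static Map.Entry<String, Integer> kv(String key, int value) {
        return new SimpleEntry<>(key, value);
    }

    // Same formula as Hadoop's default HashPartitioner.
    static int partitionFor(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Map side: divide one map task's output into partitions, each sorted and grouped by key.
    static List<TreeMap<String, List<Integer>>> mapSide(
            List<Map.Entry<String, Integer>> mapOutput, int numReduceTasks) {
        List<TreeMap<String, List<Integer>>> partitions = new ArrayList<>();
        for (int p = 0; p < numReduceTasks; p++) {
            partitions.add(new TreeMap<>());
        }
        for (Map.Entry<String, Integer> pair : mapOutput) {
            int p = partitionFor(pair.getKey(), numReduceTasks);
            partitions.get(p).computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return partitions;
    }

    // Reduce side: fetch one partition from every map task and merge it into a single sorted view.
    static TreeMap<String, List<Integer>> reduceSide(
            List<List<TreeMap<String, List<Integer>>>> allMapOutputs, int partition) {
        TreeMap<String, List<Integer>> merged = new TreeMap<>();
        for (List<TreeMap<String, List<Integer>>> oneMapTask : allMapOutputs) {
            oneMapTask.get(partition).forEach(
                (key, values) -> merged.computeIfAbsent(key, k -> new ArrayList<>()).addAll(values));
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<TreeMap<String, List<Integer>>>> all = new ArrayList<>();
        all.add(mapSide(Arrays.asList(kv("b", 1), kv("a", 1), kv("a", 1)), 2));
        all.add(mapSide(Arrays.asList(kv("a", 1)), 2));
        for (int p = 0; p < 2; p++) {
            System.out.println("reduce task " + p + " receives " + reduceSide(all, p));
        }
    }
}
```

Each entry of the merged map corresponds to one key-value group that would be passed to the user-defined reduce() method.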
2. Working mechanism of slicing
As you can see from the workflow of MapReduce, the number of map tasks depends on the number of slices, so let's look at how slicing works.
(1) slicing code analysis
In the MapReduce workflow, the data is sliced before the map operation, and each slice is handed to an independent map task for processing. So how does a map task find the class that implements slicing?
First, MapTask starts the map task with the run method as its entry point.
/* MapTask.java */
public void run(JobConf job, TaskUmbilicalProtocol umbilical)
        throws IOException, ClassNotFoundException, InterruptedException {
    // A lot of code is omitted here. Look directly at this branch,
    // which keeps the old and new APIs compatible.
    if (useNewApi) {
        this.runNewMapper(job, this.splitMetaInfo, umbilical, reporter);
    } else {
        this.runOldMapper(job, this.splitMetaInfo, umbilical, reporter);
    }
    this.done(umbilical, reporter);
}

// Below is the runNewMapper method
private void runNewMapper(JobConf job, TaskSplitIndex splitIndex,
        TaskUmbilicalProtocol umbilical, TaskReporter reporter)
        throws IOException, ClassNotFoundException, InterruptedException {
    // ...
    // Here we can see that the key to getting the InputFormat implementation class
    // is the taskContext object, whose class is TaskAttemptContextImpl.
    InputFormat inputFormat =
        (InputFormat) ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
}

/* TaskAttemptContextImpl.java extends the JobContextImpl class.
   JobContextImpl implements the JobContext interface, which defines a number of
   set and get methods used to configure the job context object. */
public class JobContextImpl implements JobContext {
    public Class<? extends InputFormat<?, ?>> getInputFormatClass() throws ClassNotFoundException {
        // The InputFormat class is read from the conf object. The default value is TextInputFormat.
        return this.conf.getClass("mapreduce.job.inputformat.class", TextInputFormat.class);
    }
}
As we can see, the class that handles input data by default is TextInputFormat, but this class does not implement the slicing method, which is implemented in its parent class, FileInputFormat:
/* FileInputFormat.java */
public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = (new StopWatch()).start();
    long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // This list stores the slice information
    List<InputSplit> splits = new ArrayList<InputSplit>();
    // Get all files in the input path
    List<FileStatus> files = this.listStatus(job);

    for (FileStatus file : files) {
        Path path = file.getPath();
        long length = file.getLen();
        if (length != 0L) {
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
                // Get the file's block information
                blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
                FileSystem fs = path.getFileSystem(job.getConfiguration());
                blkLocations = fs.getFileBlockLocations(file, 0L, length);
            }

            // Slicing officially starts here
            if (this.isSplitable(job, path)) {
                long blockSize = file.getBlockSize();
                // Get the slice size
                long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
                long bytesRemaining;
                int blkIndex;

                // Loop to slice the file. The condition checks whether the rest of
                // the file is larger than 1.1 times the slice size.
                for (bytesRemaining = length;
                        (double) bytesRemaining / (double) splitSize > 1.1D;
                        bytesRemaining -= splitSize) {
                    blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                    // Record the file, the start position and size of the slice, and the hosts
                    // holding the slice's block into the slice list as slice information.
                    splits.add(this.makeSplit(path, length - bytesRemaining, splitSize,
                            blkLocations[blkIndex].getHosts(),
                            blkLocations[blkIndex].getCachedHosts()));
                }

                // Add whatever is left of the file as the last slice in the slice plan
                if (bytesRemaining != 0L) {
                    blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                    splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining,
                            blkLocations[blkIndex].getHosts(),
                            blkLocations[blkIndex].getCachedHosts()));
                }
            } else {
                // The file cannot be split: the whole file is one slice
                splits.add(this.makeSplit(path, 0L, length,
                        blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));
            }
        } else {
            // Empty file: add an empty slice with no hosts
            splits.add(this.makeSplit(path, 0L, length, new String[0]));
        }
    }

    job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long) files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Total # of splits generated by getSplits: " + splits.size()
                + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits;
}

/* The computeSplitSize method determines the size of the slice. To put it simply,
   it mainly depends on the sizes of maxSize and blockSize:
   if maxSize > blockSize, then splitSize = blockSize
   if maxSize < blockSize, then splitSize = maxSize
   if minSize > blockSize, then splitSize = minSize */
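The split-size rule and the 1.1 threshold in the slicing loop can be tried out with a small standalone sketch. The class SplitSizeSketch and the method splitLengths are hypothetical names; computeSplitSize is written as a clamp of blockSize between minSize and maxSize, which to my knowledge is also how Hadoop's FileInputFormat computes it, but treat that as an assumption and check it against the source of your Hadoop version.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone model of the slicing arithmetic; it does not touch HDFS.
class SplitSizeSketch {
    static final double SPLIT_SLOP = 1.1; // the 1.1 threshold from the slicing loop

    // Clamp blockSize between minSize and maxSize.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Length of every slice produced for one splittable file of the given length.
    static List<Long> splitLengths(long fileLength, long splitSize) {
        List<Long> lengths = new ArrayList<>();
        long bytesRemaining = fileLength;
        while ((double) bytesRemaining / (double) splitSize > SPLIT_SLOP) {
            lengths.add(splitSize);
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0L) {
            lengths.add(bytesRemaining); // the remainder becomes the last slice
        }
        return lengths;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long splitSize = computeSplitSize(128 * mb, 1L, Long.MAX_VALUE);
        System.out.println("260 MB file: " + splitLengths(260 * mb, splitSize).size() + " slices");
        System.out.println("130 MB file: " + splitLengths(130 * mb, splitSize).size() + " slice(s)");
    }
}
```

With a 128 MB slice size, a 260 MB file yields two slices (128 MB and 132 MB), and a 130 MB file stays as a single slice because 130/128 does not exceed 1.1. The threshold keeps a small tail from being scheduled as its own map task.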