Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is the MapReduce programming model?

2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

This article mainly explains "what is the MapReduce programming model". The content in the article is simple and clear, and it is easy to learn and understand. Please follow the editor's train of thought to study and learn what the MapReduce programming model is.

MapReduce: simple data processing on large clusters

Abstract

MapReduce is not only a design model, but also a related implementation of processing and generating massive data. The user specifies an map function that processes a key-value pair to produce an intermediate result in the form of a set of key/value pairs, and a reduce function that merges key-value pairs with the same key as the intermediate result. Many real-world tasks can satisfy this model, as shown in this article.

Programs implemented using this functional form can be executed in parallel on a large number of ordinary machines. The system that runs the program is concerned with the following details: the partition of the input data, the execution of the scheduler on a set of machines, the handling of machine failures, and the management of communications within the machine as needed. This enables programmers who have no experience in parallel processing or distributed systems to take advantage of the resources of this large distributed system.

Our MapReduce implementation runs on a large cluster of ordinary machines and is highly scalable: a typical MapReduce computing processes a lot of TB data on thousands of machines. Programmers find the system easy to use: hundreds of MapReduce programs have been implemented, and more than a thousand MapReduce jobs are running on Google clusters every day.

I. introduction

In the past five years, the author and many Google programmers have implemented hundreds of special-purpose computing programs, dealing with huge amounts of raw data, including crawled documents, web request logs, etc., and calculate all kinds of derived data. Such as reverse indexing, various representations of the graphical structure of web documents, the total number of pages crawled under each host, the collection of the most frequent queries within a given date, and so on. Most of these calculations are conceptually clear. However, the input data is usually large, and the calculations must be distributed over hundreds or thousands of machines to ensure that they are completed in a reasonable time. The problems of how to calculate in parallel, distribute data, deal with errors and so on make this calculation very simple at first, but it becomes very complicated by adding a lot of code to deal with these problems.

In order to solve this complex problem, we design a new abstract model, which allows us to simply express the calculations we want to perform, while hiding troublesome details such as parallel computing, fault tolerance, data distribution and load balancing. Our abstract concept is inspired by map and reduce, which first appeared in lisp and other structural languages. We realize that most calculations involve performing a map operation on each logical record in the input data to obtain a set of intermediate key/value pairs, and then performing a reduce operation on all intermediate values containing the same key to properly merge the previously derived data. The functional model of user-specified map and reduce operations allows us to simply perform parallel massive computing and use re-execution as the main fault-tolerant mechanism.

The greatest contribution of this work is to provide a simple and powerful interface that enables us to automatically carry out parallel and distributed large-scale computing and merge by implementing high-performance interfaces on a large-scale cluster composed of ordinary PC.

The second chapter describes the basic programming model and gives several examples. The third chapter describes an implementation of MapReduce interface customized for our clustering computing environment. The fourth chapter describes several optimizations that we find useful for the program model. Chapter 6 explores the use of MapReduce within Google, including some of our experience in using it as the basis for rewriting production index systems. Chapter 7 discusses the related and future work.

Second, programming model

This calculation inputs a set of key/value pairs to produce a set of output key/value pairs. Users of the MapReduce library identify the calculation by two functions: Map and Reduce.

Map, written by the user, receives an input pair and produces a set of intermediate key/value pairs. The MapReduce library aggregates those with the same intermediate key I and sends them to the Reduce function.

Reduce, also written by the user, receives a collection of intermediate key I and the values of this key, combining these values to form a collection as small as possible. Typically, each Reduce call produces only 0 or 1 output value. These intermediate values are provided to the user through a reduce function provided by an iterator. This allows us to deal with linked lists of values that cannot be loaded into memory because the amount of data is too large.

2.1 exampl

Considering the number of occurrences of each word in a massive file set, the user will write a pseudo code similar to the following:

The Map function adds a corresponding number of occurrences to each word (only "1" in this case). The Reduce function adds up all the counts of a specified word.

In addition, the user writes code with the names of the input and output files and optional tuning parameters to populate an mapreduce specification object, then calls the MapReduce function and passes the object to it. The user's code is linked to the MapReduce library (C++ implementation). Appendix A contains the entire program for this example.

2.2 Typ

Although the input and output in string format were used in the previous pseudo code, conceptually, the user-defined map and reduce functions require associated types:

Map (K1, v1)-- > list (K2, v2)

Reduce (K2, list (v2))-- > list (v2)

That is, the input keys and values and the output keys and values come from different fields. In addition, the keys and values of the intermediate results have the same fields as the output keys and values.

MapReduce's C++ implementation and user-defined functions use string types to pass parameters, leaving the conversion work to the user's code.

2.3 more examples

Here are a few simple and interesting programs that can be easily expressed using MapReduce calculations.

Distributed string lookup (Distributed Grep): the map function finds lines that match a pattern. The Reduce function is an identity function that simply copies the intermediate value to the output.

URL access frequency count (Count of URL Access Frequency): the map function processes the log of the web page request and outputs it. The Reduce function adds up the values of the same URL to generate a pair.

Flip the page link diagram (Reverse Web-Link Graph): the map function outputs pairs for each link to the target (target) URL in a page called source. The Reduce function concatenates all the source URLs associated with a given target URL into a linked list and generates pairs of:.

Host key Vector Index (Term-Vector per Host): a search word vector will appear in a document or a set of documents and the most important words will be summarized as a linked list. The Map function produces one for each input document (hostname comes from the URL in the document). The Reduce function receives all the document retrieval word vectors for a given hostname, adds these vectors together, throws away the rare vectors, and generates a final pair.

Inverted index (Inverted Index): the map function parses each document and generates a sequence. The Reduce function receives all the key-value pairs of a given word, and all the output pairs form a simple inverted index. You can keep track of the location of words by making changes to the calculation.

Distributed sorting (Distributed Sort): the map function extracts the key of each record and generates a pair. The Reduce function does not change any key-value pairs. This calculation relies on the partitioning functionality mentioned in Section 4.1 and the sorting properties mentioned in Section 4.2.

III. Realization

There are many different implementations of the MapReduce interface, and appropriate choices need to be made according to the environment. For example, one implementation may be suitable for a small shared memory machine, while another implementation may be suitable for a large NUMA multiprocessor machine, and another may be suitable for a larger collection of network machines.

This chapter mainly describes an implementation of a computing environment that is widely used within Google: a cluster that connects a large number of ordinary PC together through switched Ethernet. In our environment:

(1) the machine is usually a dual-core x86 processor, runs the Linux operating system, and has 2-4G memory.

(2) using ordinary network hardware-usually 100Mb/s or 1Gb/s machine bandwidth, but the average is much less than half the bandwidth.

(3) A cluster of hundreds or thousands of machines, so machine failures are common.

(4) Storage is provided by cheap IDE disks installed directly on different machines. An internal distributed file system is used to manage and store data on these disks. File systems use replica mechanisms on unreliable hardware to provide availability and reliability.

(5) the user submits the work to a scheduling system, and each work consists of a task set, which is mapped to the set of available machines in the cluster by the scheduler.

3.1 Executive Overview

By automatically partitioning the input data into M shards, Map calls are assigned to run on multiple machines. Slicing of data can be processed in parallel on different machines. The key of the intermediate result is partitioned into R fragments using a partition function (for example, hash (key) mod R), and the Reduce call is also assigned to run on multiple machines. The number of partitions (R) and the partition function are specified by the user.

The counter values of independent working machines are periodically passed to master (attached to the ping response) master summarizes the counter values obtained from successful map and reduce tasks and returns them to the user's code when the MapReduce operation is complete. The current counter value is also displayed on the master status page, allowing people to see the progress of the current calculation. When summarizing counter values, master prevents repeated counting by removing the impact of multiple executions of the same map or reduce task. (repeated execution may occur when we use alternate tasks and re-execute failed tasks. )

Some counter values are automatically maintained by the MapReduce library, such as the number of input key/value pairs processed and the number of output key/value pairs that have been generated.

Users find counters useful for checking the behavior of MapReduce operations. For example, in some MapReduce operations, user code may want to ensure that the number of output pairs generated is exactly equal to the number of input pairs processed, or whether the number of documents processed in Germany is tolerated in the total number of documents processed.

V. performance

In this chapter, we test the performance of two MapReduce computations running on a large cluster. One calculation performs specific pattern matching in the data about 1TB, and the other sorts the data about 1TB.

These two programs can represent a large number of MapReduce programs written by users, one kind of program converts data from one representation to another; the other is to extract a small part of interesting data from the sea data set.

5.1 Cluster configuration

All the programs run on a cluster of nearly 1800 machines. Each machine has two 2GHz, hyper-threaded Intel Xeon processors, 4GB memory, two 160GB IDE disks, and an 1Gbps Ethernet link. These machines are deployed in a two-tier tree-switched network with approximately 100-200Gbps bandwidth at the root node. All machines have the same deployment, so the RTT between any two machines is less than 1ms.

In 4GB memory, there are nearly 1-1.5GB for other tasks running on the cluster. The program starts on a weekend afternoon, when the host's CPU, disk, and network are basically idle.

5.2 string lookup (Grep)

The grep program scans about 1010 100-byte records for a 3-character pattern with a relatively low probability of occurrence (this pattern appears in 92337 records). The input is divided into slices close to 64MB (MF15000), and the entire output is put into a file (Run1).

Figure 3: data transfer rate over time for different execution processes of the sorter

Figure 3 (a) shows the normal execution of the sorter. The figure at the top left shows the rate of input reads, which peaked at about 13GB/s because all map tasks were completed and the rate dropped to zero 200 seconds ago. Note that the input rate here is smaller than that of string lookups, because the sorter's map task takes about half of the processing time and IBO bandwidth to output the termination results to their local disk, and the corresponding intermediate output of string lookups is almost negligible.

The middle figure on the left shows the rate at which data is sent over the network from the map task to the reduce task. This slow data movement starts as soon as the first map task is completed. The first peak in the figure is the start of the first batch of about 1700 reduce tasks (the entire MapReduce is assigned to about 1700 machines, with each machine performing at most one reduce task at a time). After about 300 seconds of this calculation, some of the first reduce tasks are completed, and we begin to execute the remaining reduce tasks for data processing. All processing is completed about 600 seconds after the start of the calculation.

The figure at the bottom left shows the rate at which the reduce task likes to write the sorted data to the final output file. There is a delay between the completion of the first processing cycle and the start of the write cycle because the machine is busy sorting intermediate data. The rate of writes will continue on 2-4GB/s for some time. All write operations will be completed about 850 seconds after the calculation begins. Including the startup cost, the entire calculation takes 891 seconds, which is similar to the best record of 1057 seconds in TeraSort benchmark.

A few things to note: because of our location optimization strategy, most of the data is read from the local disk, bypassing the display of network bandwidth, so the input rate is higher than the processing rate and output rate. The processing rate is higher than the output rate because the output process writes the sorted data to two copies (we write the data to two copies for reliability and availability). We write the data to two copies because our underlying file system provides mechanisms for reliability and availability. If the underlying file system uses fault-tolerant encoding (erasure coding) instead of replication, the network bandwidth requirement for writing data is reduced.

5.4 role of standby tasks

In figure 3 (b), we show the execution of a sorter that disables alternate tasks. The process performed is similar to that shown in 3 (a), except for a very long tail, during which little write occurs. After 960 seconds, all but five reduce tasks are completed. However, these laggards did not complete their execution until 300 seconds later. The entire computing task took 1283 seconds, an increase of about 44% of the time.

5.5 Machine failure

In figure 3 (c), we show the execution of a sorter. A few minutes after the beginning of the calculation, we deliberately kill 200 of the 1746 worker processes. The underlying scheduler quickly restarts new working processes on these machines (because only the process is killed and the machine itself is working properly).

When the worker process dies, there will be a negative input rate because some previously completed map work disappears (because the map worker process in Hong Kong has been dropped by kill) and needs to be re-executed. This map task will be re-executed fairly quickly. The entire calculation process was completed in 933 seconds, including startup overhead (only 5% more time than normal).

VI. Experience

We completed the first version of the MapReduce library in February 2003 and made major improvements in August 2003, including location optimization, dynamic load balancing of tasks on the work machine, and so on. Since then, we have been pleasantly surprised to find that the MapReduce library can be widely used for a variety of problems in our work. It has been used in a wide range of areas within Google, including:

Large scale machine learning problem

Cluster issues of Google News and Froogle products

Product reports that extract data for public inquiry

Extract features from the web pages of a large number of new applications and products (for example, geolocation information from a large number of location query pages)

Large-scale graphic computing

Table 1: MapReduce tasks run in August 2004

At the end of each work, the MapReduce library counts the computing resources used by the work. In Table 1, we see some statistics about the MapReduce work running within Google in August 2004.

6.1 large-scale index

So far, one of the most important applications of MapReduce is to rewrite the production index system, which generates data structures for Google web search services. The input data of the index system is a large number of documents retrieved by our crawling system and stored as a collection of GFS files. The original contents of these files also contain more data than 20TB. The indexer is a sequence of 5-10 MapReduce operations. Using MapReduce, which replaces adhoc distributed processing in previous versions of the indexing system, has several advantages:

The indexer code is simple, short, and easy to understand because fault tolerance, distributed, and parallel processing are hidden in the MapReduce library. For example, the size of a computing program has been reduced from nearly 3800 lines of C++ code to about 700 lines of code using MapReduce.

The performance of the MapReduce library is so good that it is possible to separate conceptually unrelated calculations instead of mixing them together to avoid additional data processing. This makes the indexing program easy to change. For example, it may take months to make a change to the previous index system, while it only takes a few days to make a change to the new system.

Indexing programs become easier to operate because most of the problems caused by machine failures, slow machine processing and instant network congestion are automatically dealt with by the MapReduce library without human intervention.

VII. Related work

Many systems provide limited program models and limit the use of automatic parallel computing. For example, an associative function can use parallel prefixes on an array of N elements on N processors in logN time to get all prefixes [6, 9, 13]. MapReduce is considered to be the simplification and essence of these models based on our experience in large-scale work computing. More importantly, we provide a fault-tolerant implementation on thousands of processors. On the contrary, most parallel processing systems are only implemented on a small scale and hand over the details of machine failure to the program developer.

Bulk Synchronous Programming and some MPI stem from providing a higher level of abstraction that makes it easier for developers to write parallel programs. A key difference between these systems and MapReduce is that MapReduce develops a limited program model to automatically execute user programs in parallel and provides a transparent fault-tolerant mechanism.

Our location optimization mechanism is inspired by mobile disk technology, which calculates data that is close to the local disk and reduces the number of times data is transferred on the IWeiO subsystem or network. Our system runs on an ordinary machine with several disks mounted, rather than on a disk processor, but the general approach is similar.

Our standby task mechanism is similar to the eager scheduling mechanism used in the Charlotte system. A drawback of the simple Eager scheduling mechanism is that if a given task causes repeated failures, the entire calculation will end in failure. We have solved some of this problem by skipping the mechanism of damage calculation.

The MapReduce implementation relies on an internal cluster management system, which is responsible for distributing and running user tasks in a large collection of shared machines. Although it is not the focus of this article, the cluster management system is essentially similar to other systems like Condor.

The sorting function is part of the MapReduce library, similar to the operation in NOW-Sort. The source machine (map worker process) partitions the data to be sorted and sends it to one of the R Reduce worker processes. Each reduce worker process sorts the data locally (in memory if possible). Of course, NOW-Sort does not have the user-defined Map and Reduce functions that make the MapReduce library widely available.

River provides a programming model for processing processes to communicate by sending data on distributed queues. Like MapReduce, River systems try to provide better average performance even in the case of uneven hardware or system bumps. The River system balances the completion time through careful disk and network transfer scheduling. By limiting the programming model, the MapReduce framework can decompose the problem into many fine-grained tasks that are dynamically scheduled on available workers, so that the faster the worker processes, the more tasks. This limited programming model also allows us to schedule redundant tasks for processing when the work is about to end, which reduces the completion time in uneven situations.

BAD-FS has a completely different programming model from MapReduce, unlike MapReduce, which is used to perform work over a wide area network. However, they have two basic similarities. (1) both systems use the method of reexecution to deal with the data lost due to failure. (2) both systems are locally limited scheduling principles to reduce the number of data transmitted on the network link.

TASCC is a high-availability network service used to simplify the structure. Like MapReduce, it relies on re-execution as a fault-tolerant mechanism.

Thank you for reading, the above is the content of "what is the MapReduce programming model". After the study of this article, I believe you have a deeper understanding of what the MapReduce programming model is, and the specific use needs to be verified in practice. Here is, the editor will push for you more related knowledge points of the article, welcome to follow!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report