
How to simplify data processing on large MapReduce clusters

2025-04-20 Update From: SLTechnology News&Howtos


This article looks at how MapReduce simplifies data processing on large clusters. It covers the programming model, one implementation of it, and measured performance.

Summary:

MapReduce is a programming model and an associated implementation for processing and generating large data sets, and it is suitable for a wide variety of real-world tasks. Users specify the computation in terms of a map function and a reduce function. The underlying runtime system automatically parallelizes the computation across large clusters of machines, handles machine failures, and schedules inter-machine communication to make efficient use of the network and disks. Programmers find the system easy to use: in the past year, more than 10,000 distinct MapReduce programs have been implemented within Google, and an average of 100,000 MapReduce jobs are executed on Google's clusters every day, processing more than 20 PB of data per day.

1 Introduction

Before MapReduce was developed, the authors and many other Google employees implemented hundreds of special-purpose computations that process large amounts of raw data (crawled documents, Web request logs, and so on) to compute various kinds of derived data, such as inverted indices, various representations of the graph structure of Web documents, summaries of the number of pages crawled per host, and the set of most frequent queries on a given day. Most of these computations are conceptually simple, but the input data is usually very large. To finish in a reasonable amount of time, the computations have to be distributed across hundreds of machines. The issues of how to parallelize the computation, distribute the data, and handle failures tend to bury the original simple computation under large amounts of complex code.

To deal with this complexity, we designed a new abstraction that lets us express the simple computations we are trying to perform while hiding the messy details of parallelization, fault tolerance, data distribution, and load balancing in a library. The abstraction is inspired by the map and reduce primitives found in Lisp and many other functional languages. We realized that most of our computations involve applying a map operation to each logical record in the input to compute a set of intermediate key/value pairs, and then applying a reduce operation to all values that share the same key in order to combine the derived data appropriately. This functional model, with user-specified map and reduce operations, lets us parallelize large computations easily and use re-execution as the primary mechanism for fault tolerance.

The main contribution of this work is a simple and powerful interface that enables automatic parallelization and distribution of large-scale computations, combined with an implementation of this interface that achieves high performance on large clusters of commodity PCs. The programming model can also be used to parallelize computations across multiple cores of the same machine.

Section 2 describes the basic programming model and gives several examples. Section 3 describes an implementation of the MapReduce interface tailored to our cluster-based computing environment. Section 4 describes several refinements of the programming model that we have found useful. Section 5 presents performance measurements of our implementation for a variety of tasks. Section 6 explores the use of MapReduce within Google, including our experience using it as the basis for a rewrite of our production indexing system. Section 7 discusses related and future work.

2 Programming model

The computation takes a set of input key/value pairs and produces a set of output key/value pairs. The user of the MapReduce library expresses the computation as two functions: map and reduce.

Map, written by the user, takes an input pair and produces a set of intermediate key/value pairs. The MapReduce library groups together all intermediate values associated with the same intermediate key I and passes them to the reduce function.

The reduce function, also written by the user, accepts an intermediate key I and a set of values for that key. It merges these values together to form a possibly smaller set of values. Typically just zero or one output value is produced per reduce invocation. The intermediate values are supplied to the user's reduce function via an iterator. This allows us to handle lists of values that are too large to fit in memory.

2.1 Example

Consider the problem of counting the number of times each word appears in a large collection of documents. The user will write code similar to the following pseudo code:

    map(String key, String value):
        // key: document name
        // value: document contents
        for each word w in value:
            EmitIntermediate(w, "1");

    reduce(String key, Iterator values):
        // key: a word
        // values: a list of counts
        int result = 0;
        for each v in values:
            result += ParseInt(v);
        Emit(AsString(result));

The map function emits each word plus an associated count of occurrences (just 1 in this simple example). The reduce function sums together all counts emitted for a particular word.
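As a rough illustration, the word-count pseudocode can be turned into a runnable Python sketch. The names map_fn, reduce_fn, and run_word_count are hypothetical, and the in-memory grouping below is only a stand-in for the work the MapReduce library does across machines.

```python
from collections import defaultdict
from typing import Dict, Iterator, List, Tuple


def map_fn(key: str, value: str) -> Iterator[Tuple[str, str]]:
    # key: document name, value: document contents
    for word in value.split():
        yield word, "1"


def reduce_fn(key: str, values: Iterator[str]) -> Iterator[str]:
    # key: a word, values: an iterator over its counts
    yield str(sum(int(v) for v in values))


def run_word_count(documents: Dict[str, str]) -> Dict[str, str]:
    # Assumption: everything fits in memory on one machine, so a dict
    # replaces the distributed grouping done by the real library.
    groups: Dict[str, List[str]] = defaultdict(list)
    for name, contents in documents.items():
        for word, count in map_fn(name, contents):
            groups[word].append(count)
    return {w: next(reduce_fn(w, iter(c))) for w, c in groups.items()}


counts = run_word_count({"a.txt": "to be or not to be", "b.txt": "to do"})
print(counts["to"], counts["be"])
```

The values stay strings, as in the pseudocode, so the reduce step parses them before summing.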

In addition, the user writes code to fill in a MapReduce specification object with the names of the input and output files and optional tuning parameters. The user then invokes the MapReduce function, passing it the specification object. The user's code is linked together with the MapReduce library (implemented in C++). The original MapReduce paper [8] contains the full program text for this example.

2.2 Types

Even though the previous pseudocode is written in terms of string inputs and outputs, conceptually the map and reduce functions supplied by the user have associated types.

    map    (k1, v1)       --> list(k2, v2)
    reduce (k2, list(v2)) --> list(v2)

That is, the input keys and values are drawn from a different domain than the output keys and values. Furthermore, the intermediate keys and values are from the same domain as the output keys and values.
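These type relationships can be written down with Python type variables. This is a sketch only: the aliases MapFn and ReduceFn, the driver map_reduce, and the example functions are all hypothetical names, and the driver runs on a single machine.

```python
from typing import Callable, Dict, Iterable, Iterator, List, Tuple, TypeVar

K1 = TypeVar("K1")  # input key domain
V1 = TypeVar("V1")  # input value domain
K2 = TypeVar("K2")  # intermediate and output key domain
V2 = TypeVar("V2")  # intermediate and output value domain

# map    (k1, v1)       --> list(k2, v2)
MapFn = Callable[[K1, V1], Iterable[Tuple[K2, V2]]]
# reduce (k2, list(v2)) --> list(v2)
ReduceFn = Callable[[K2, Iterator[V2]], Iterable[V2]]


def map_reduce(inputs: Iterable[Tuple[K1, V1]], map_fn: MapFn,
               reduce_fn: ReduceFn) -> Dict[K2, List[V2]]:
    # Single-machine stand-in: group intermediate values by key, then reduce.
    grouped: Dict[K2, List[V2]] = {}
    for k1, v1 in inputs:
        for k2, v2 in map_fn(k1, v1):
            grouped.setdefault(k2, []).append(v2)
    return {k2: list(reduce_fn(k2, iter(vs))) for k2, vs in grouped.items()}


def word_lengths(line_no: int, line: str) -> Iterator[Tuple[str, int]]:
    # Input domain is (int, str); intermediate domain is (str, int).
    for word in line.split():
        yield word[0], len(word)


def total(letter: str, lengths: Iterator[int]) -> Iterator[int]:
    yield sum(lengths)


result = map_reduce([(1, "map maps keys"), (2, "reduce merges")],
                    word_lengths, total)
```

Here the input pairs are (line number, text) while the intermediate and output pairs are (first letter, length), which mirrors the statement that the input domain differs from the shared intermediate and output domain.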

3 Implementation

Many different implementations of the MapReduce interface are possible. The right choice depends on the environment. For example, one implementation may be suitable for a small shared-memory machine, another for a large NUMA multiprocessor, and yet another for an even larger collection of networked machines. Since our original article was published, several open source implementations of MapReduce have been developed [1,2], and the applicability of MapReduce to a variety of problem domains has been studied [7,16].

This section describes one of our MapReduce implementations, targeted at the computing environment in wide use at Google: large clusters of PCs connected by switched Gigabit Ethernet [4]. In this environment, machines typically run Linux on dual-core x86 processors with 4 to 8 GB of memory. Individual machines typically have 1 Gbit/s of network bandwidth, but the overall bisection bandwidth available per machine is considerably less than 1 Gbit/s. A computing cluster contains thousands of machines, so machine failures are common. Storage is provided by inexpensive IDE disks attached directly to individual machines. GFS, a distributed file system developed in-house at Google, is used to manage the data stored on these disks. The file system uses replication to provide availability and reliability on top of unreliable hardware.

Users submit jobs to a scheduling system. Each job consists of a set of tasks and is mapped by the scheduler to a set of available machines within a cluster.

3.1 Execution overview

The map invocations are distributed across multiple machines by automatically partitioning the input data into a set of M splits. The input splits can be processed in parallel by different machines. Reduce invocations are distributed by partitioning the intermediate key space into R pieces using a partitioning function (for example, hash(key) mod R). The number of partitions R and the partitioning function are specified by the user.
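A partitioning function of the form hash(key) mod R might look like the following Python sketch. The function names are hypothetical, and zlib.crc32 is an assumed choice of hash: Python's built-in hash() for strings is randomized per process, so it would not give the same region on different workers.

```python
import zlib
from typing import List, Tuple


def partition(key: str, num_reduce_tasks: int) -> int:
    # hash(key) mod R, using CRC32 as a hash that is stable across processes.
    return zlib.crc32(key.encode("utf-8")) % num_reduce_tasks


def split_into_regions(pairs: List[Tuple[str, str]],
                       num_reduce_tasks: int) -> List[List[Tuple[str, str]]]:
    # Distribute intermediate key/value pairs into R regions.
    regions: List[List[Tuple[str, str]]] = [[] for _ in range(num_reduce_tasks)]
    for key, value in pairs:
        regions[partition(key, num_reduce_tasks)].append((key, value))
    return regions


pairs = [("apple", "1"), ("banana", "1"), ("apple", "1"), ("cherry", "1")]
regions = split_into_regions(pairs, 3)
```

Because the function depends only on the key, every occurrence of a key lands in the same region and is therefore handled by the same reduce task.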

Figure 1 shows the overall flow of a MapReduce operation in our implementation. When the user program calls the MapReduce function, the following sequence of actions occurs (the numbered labels in Figure 1 correspond to the numbers in the list below).

1. The MapReduce library in the user program first splits the input files into M pieces, typically 16 to 64 MB per piece (controllable by the user via an optional parameter). It then starts up many copies of the program on a cluster of machines.

2. One of the copies is special: the master. The rest are workers that are assigned work by the master. There are M map tasks and R reduce tasks to assign. The master picks idle workers and assigns each one a map task or a reduce task.

3. A worker that is assigned a map task reads the contents of the corresponding input split. It parses key/value pairs out of the input data and passes each pair to the user-defined map function. The intermediate key/value pairs produced by the map function are buffered in memory.

4. Periodically, the buffered pairs are written to local disk, partitioned into R regions by the partitioning function. The locations of these buffered pairs on the local disk are passed back to the master, which is responsible for forwarding them to the reduce workers.

5. When a reduce worker is notified by the master about these locations, it uses remote procedure calls to read the buffered data from the local disks of the map workers. When a reduce worker has read all the intermediate data for its partition, it sorts it by the intermediate keys so that all occurrences of the same key are grouped together. The sorting is needed because typically many different keys map to the same reduce task. If the amount of intermediate data is too large to fit in memory, an external sort is used.

6. The reduce worker iterates over the sorted intermediate data and, for each unique intermediate key encountered, passes the key and the corresponding set of intermediate values to the user's reduce function. The output of the reduce function is appended to a final output file for this reduce partition.

7. When all map tasks and reduce tasks have been completed, the master wakes up the user program. At this point, the MapReduce call in the user program returns to the user code.

After successful completion, the output of the MapReduce execution is available in the R output files (one per reduce task, with file names specified by the user). Typically, users do not need to combine these R output files into one file; they often pass them as input to another MapReduce call or use them from another distributed application that can deal with input partitioned into multiple files.
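The sort-and-group work done by a reduce worker in the flow above can be sketched in Python. This is an in-memory approximation under stated assumptions: run_reduce_task and sum_counts are hypothetical names, and the external sort used for data that does not fit in memory is not shown.

```python
from itertools import groupby
from operator import itemgetter
from typing import Callable, Iterable, Iterator, List, Tuple


def run_reduce_task(
    intermediate_pairs: Iterable[Tuple[str, int]],
    reduce_fn: Callable[[str, Iterator[int]], Iterable[int]],
) -> List[Tuple[str, int]]:
    # Sort by intermediate key so all occurrences of a key are adjacent.
    ordered = sorted(intermediate_pairs, key=itemgetter(0))
    output: List[Tuple[str, int]] = []
    # Visit each unique key once and hand its values to reduce as an iterator.
    for key, group in groupby(ordered, key=itemgetter(0)):
        values = (value for _, value in group)
        output.extend((key, result) for result in reduce_fn(key, values))
    return output


def sum_counts(key: str, values: Iterator[int]) -> Iterator[int]:
    yield sum(values)


output = run_reduce_task([("b", 1), ("a", 2), ("b", 3), ("a", 4)], sum_counts)
```

Passing a generator rather than a list to the reduce function mirrors the iterator interface described in the programming model.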

3.2 Master data structures

The master keeps several data structures. For each map task and reduce task, it stores the state (idle, in-progress, or completed) and the identity of the worker machine (for non-idle tasks).

The master is the conduit through which the locations of intermediate file regions are propagated from map tasks to reduce tasks. Therefore, for each completed map task, the master stores the locations and sizes of the R intermediate file regions produced by that map task. The master receives updates to this location and size information as map tasks are completed. The information is pushed incrementally to workers that have in-progress reduce tasks.
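One way to picture this bookkeeping is the following Python sketch. The class and method names (TaskState, Task, MasterState, locations_for_reduce) and the example paths are hypothetical; the source describes what the master stores, not how.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional, Tuple


class TaskState(Enum):
    IDLE = "idle"
    IN_PROGRESS = "in-progress"
    COMPLETED = "completed"


@dataclass
class Task:
    state: TaskState = TaskState.IDLE
    worker: Optional[str] = None  # worker identity, set for non-idle tasks
    # For a completed map task: (location, size in bytes) of each of R regions.
    regions: List[Tuple[str, int]] = field(default_factory=list)


@dataclass
class MasterState:
    map_tasks: List[Task]
    reduce_tasks: List[Task]

    def assign(self, task: Task, worker: str) -> None:
        task.state = TaskState.IN_PROGRESS
        task.worker = worker

    def complete_map(self, task: Task, regions: List[Tuple[str, int]]) -> None:
        task.state = TaskState.COMPLETED
        task.regions = list(regions)

    def locations_for_reduce(self, r: int) -> List[Tuple[str, int]]:
        # Region r of every completed map task, to forward to reduce task r.
        return [t.regions[r] for t in self.map_tasks
                if t.state is TaskState.COMPLETED]


master = MasterState(map_tasks=[Task(), Task()], reduce_tasks=[Task(), Task()])
master.assign(master.map_tasks[0], "worker-7")
master.complete_map(master.map_tasks[0],
                    [("worker-7:/tmp/m0-r0", 1024), ("worker-7:/tmp/m0-r1", 2048)])
```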

3.3 Fault tolerance

Because the MapReduce library is designed to help use hundreds of machines to process large amounts of data, it must gracefully tolerate machine failures.

Handling worker failures

The master pings every worker periodically. If no response is received from a worker within a certain amount of time, the master marks the worker as failed. Any map tasks completed by that worker are reset to their initial idle state and therefore become eligible for scheduling on other workers. Similarly, any map task or reduce task in progress on a failed worker is reset to idle and becomes eligible for rescheduling.

Completed map tasks on the failed worker must be re-executed because their output is stored on the local disk of the failed machine and is therefore inaccessible. Completed reduce tasks do not need to be re-executed because their output is stored in a global file system.

When a map task is executed first by worker A and later by worker B (because A failed), all workers executing reduce tasks are notified of the re-execution. Any reduce task that has not already read the data from worker A will read it from worker B.

MapReduce is resilient to large-scale worker failures. For example, during one MapReduce operation, network maintenance on a running cluster caused a group of 80 machines to become unreachable for several minutes. The MapReduce master simply re-executed the work done by the unreachable worker machines and continued to make forward progress, eventually completing the MapReduce operation.
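The rescheduling rules for a failed worker can be condensed into a short Python sketch. The Task record and handle_worker_failure are hypothetical names; detecting the failure (the periodic ping and timeout) is assumed to have happened already.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Task:
    kind: str                     # "map" or "reduce"
    state: str = "idle"           # "idle", "in-progress", or "completed"
    worker: Optional[str] = None


def handle_worker_failure(tasks: List[Task], failed_worker: str) -> List[Task]:
    """Reset the tasks that must run again and return them."""
    rescheduled = []
    for task in tasks:
        if task.worker != failed_worker:
            continue
        # Completed map output lives on the failed machine's local disk.
        lost_map_output = task.kind == "map" and task.state == "completed"
        if task.state == "in-progress" or lost_map_output:
            task.state = "idle"
            task.worker = None
            rescheduled.append(task)
        # Completed reduce output is in the global file system: keep it.
    return rescheduled


tasks = [
    Task("map", "completed", "A"),
    Task("reduce", "completed", "A"),
    Task("reduce", "in-progress", "A"),
    Task("map", "completed", "B"),
]
rescheduled = handle_worker_failure(tasks, "A")
```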

Semantics in the presence of failures

When the user-supplied map and reduce operators are deterministic functions of their input values, our distributed implementation produces the same output as would have been produced by a non-faulting sequential execution of the entire program.

We rely on atomic commits of map and reduce task outputs to achieve this property. Each in-progress task writes its output to private temporary files. A reduce task produces one such file, and a map task produces R such files (one per reduce task). When a map task completes, the worker sends a message to the master that includes the names of the R temporary files. If the master receives a completion message for an already completed map task, it ignores the message. Otherwise, it records the names of the R files in a master data structure.

When a reduce task completes, the reduce worker atomically renames its temporary output file to the final output file. If the same reduce task is executed on multiple machines, multiple rename calls will be executed for the same final output file. We rely on the atomic rename operation provided by the underlying file system to guarantee that the final file system state contains only the data produced by one execution of the reduce task.
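The write-to-temporary-then-rename pattern can be illustrated on a local file system in Python. This is only an analogy: commit_reduce_output is a hypothetical name, and os.replace on a local disk stands in for the atomic rename of the distributed file system, whose API is not described in this text.

```python
import os
import tempfile
from typing import List


def commit_reduce_output(final_path: str, records: List[str]) -> None:
    # Write to a private temporary file in the same directory, so the
    # rename stays within one file system.
    directory = os.path.dirname(os.path.abspath(final_path))
    fd, temp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-reduce-")
    with os.fdopen(fd, "w", encoding="utf-8") as temp_file:
        temp_file.writelines(line + "\n" for line in records)
    # Atomically replace the final file: readers see either the old
    # contents or the complete new contents, never a partial file.
    os.replace(temp_path, final_path)


with tempfile.TemporaryDirectory() as out_dir:
    out_path = os.path.join(out_dir, "out-00000")
    # Two executions of the same reduce task commit to the same final name.
    commit_reduce_output(out_path, ["a\t1", "b\t2"])
    commit_reduce_output(out_path, ["a\t1", "b\t2"])
    with open(out_path, encoding="utf-8") as f:
        final_contents = f.read()
    remaining_files = os.listdir(out_dir)
```

After both commits the directory holds a single output file containing the data of exactly one execution, with no leftover temporary files.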

The vast majority of our map and reduce operators are deterministic, and the fact that our semantics are equivalent to a sequential execution in this case makes it very easy for programmers to reason about their program's behavior. When the map and/or reduce operators are non-deterministic, we provide weaker but still reasonable semantics. In the presence of non-deterministic operators, the output of a particular reduce task R1 is equivalent to the output for R1 produced by a sequential execution of the non-deterministic program. However, the output for a different reduce task R2 may correspond to the output for R2 produced by a different sequential execution of the non-deterministic program.

Consider map task M and reduce tasks R1 and R2. Let e(Ri) be the execution of Ri that committed (there is exactly one such execution). The weaker semantics arise because e(R1) may have read the output produced by one execution of M, and e(R2) may have read the output produced by a different execution of M.

3.4 Locality

In our computing environment, network bandwidth is a relatively scarce resource. We conserve network bandwidth by taking advantage of the fact that the input data (managed by GFS [10]) is stored on the local disks of the machines that make up our cluster. GFS divides each file into 64 MB blocks and stores several copies of each block (typically 3) on different machines. The MapReduce master takes the location information of the input files into account and attempts to schedule a map task on a machine that contains a replica of the corresponding input data. Failing that, it attempts to schedule the map task near a replica of that task's input data (for example, on a worker machine that is on the same network switch as the machine containing the data). When running large MapReduce operations on a significant fraction of the workers in a cluster, most input data is read locally and consumes no network bandwidth.
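The placement preference described here (a machine holding a replica, then a machine on the same switch, then anything idle) can be sketched in Python. The function pick_worker and the host-to-switch map are hypothetical; the real scheduler's inputs and tie-breaking are not specified in this text.

```python
from typing import Dict, List, Optional, Set


def pick_worker(replica_hosts: Set[str], idle_workers: List[str],
                switch_of: Dict[str, str]) -> Optional[str]:
    # 1. Prefer an idle worker that stores a replica of the input split.
    for worker in idle_workers:
        if worker in replica_hosts:
            return worker
    # 2. Otherwise prefer a worker on the same network switch as a replica.
    replica_switches = {switch_of[h] for h in replica_hosts if h in switch_of}
    for worker in idle_workers:
        if switch_of.get(worker) in replica_switches:
            return worker
    # 3. Fall back to any idle worker; the input is then read over the network.
    return idle_workers[0] if idle_workers else None


switch_of = {"m1": "s1", "m2": "s1", "m3": "s2", "m4": "s3"}
local_choice = pick_worker({"m1", "m3"}, ["m4", "m3"], switch_of)
nearby_choice = pick_worker({"m1"}, ["m4", "m2"], switch_of)
remote_choice = pick_worker({"m1"}, ["m4"], switch_of)
```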

3.5 Task granularity

We subdivide the map phase into M pieces and the reduce phase into R pieces, as described earlier. Ideally, M and R should be much larger than the number of worker machines. Having each worker perform many different tasks improves dynamic load balancing and also speeds up recovery when a worker fails: the many map tasks it has completed can be spread out across all the other worker machines for re-execution.

There are practical bounds on how large M and R can be in our implementation, since the master must make O(M + R) scheduling decisions and keep O(M × R) state in memory, as described earlier. (The constant factors for memory usage are small, however: the O(M × R) piece of the state consists of approximately one byte of data per map task/reduce task pair.)

Furthermore, R is often constrained by users, because the output of each reduce task ends up in a separate output file. In practice, we tend to choose M so that each individual task has roughly 16 MB to 64 MB of input data (so that the locality optimization described earlier is most effective), and we make R a small multiple of the number of worker machines we expect to use. We often perform MapReduce computations with M = 200,000 and R = 5,000, using 2,000 worker machines.
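The arithmetic behind these choices is easy to check with a short Python sketch. The helper names are hypothetical. The input size is taken from the grep benchmark in Section 5.2 (10^10 records of 100 bytes), while R = 4000 is an illustrative value chosen as twice an assumed 2000 workers.

```python
MB = 2**20


def num_map_tasks(input_bytes: int, split_bytes: int) -> int:
    # Ceiling division: one map task per input split.
    return -(-input_bytes // split_bytes)


def pair_state_bytes(m: int, r: int, bytes_per_pair: int = 1) -> int:
    # Roughly one byte of master state per map task/reduce task pair.
    return m * r * bytes_per_pair


input_bytes = 10**10 * 100                        # about 1 TB of input
m_64 = num_map_tasks(input_bytes, 64 * MB)        # 64 MB splits
m_16 = num_map_tasks(input_bytes, 16 * MB)        # 16 MB splits
state = pair_state_bytes(m_64, 4000)              # illustrative R
print(m_64, m_16, state)
```

With 64 MB splits this gives 14902 map tasks, in line with the M of about 15000 quoted for the grep run, and the pair state for the illustrative R comes to roughly 57 MB.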

3.6 Backup tasks

One of the common causes that lengthens the total time taken for a MapReduce operation is a straggler, that is, a machine that takes an unusually long time to complete one of the last few map or reduce tasks in the computation. Stragglers can arise for a whole host of reasons. For example, a machine with a bad disk may experience frequent correctable errors that slow its read performance from 30 MB/s to 1 MB/s. The cluster scheduling system may have scheduled other tasks on the machine, causing it to execute the MapReduce code more slowly due to competition for CPU, memory, local disk, or network bandwidth. A recent problem we experienced was a bug in machine initialization code that caused processor caches to be disabled: computations on affected machines slowed down by a factor of about one hundred.

We have a general mechanism to alleviate the problem of stragglers. When a MapReduce operation is close to completion, the master schedules backup executions of the remaining in-progress tasks. The task is marked as completed whenever either the primary or the backup execution completes. We have tuned this mechanism so that it typically increases the computational resources used by the operation by no more than a few percent. We have found that this significantly reduces the time to complete large MapReduce operations. As an example, the sort program described in Section 5.3 takes 44% longer to complete when the backup task mechanism is disabled.
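A toy model in Python shows why backup executions help. Every number below is made up for illustration, and the functions completion_time and job_time are hypothetical: the source does not say when backups start or how long they take.

```python
from typing import List, Tuple


def completion_time(primary: float, backup_start: float,
                    backup_duration: float) -> float:
    # A backup is only launched for tasks still running at backup_start.
    if primary <= backup_start:
        return primary
    # The task completes when either execution finishes first.
    return min(primary, backup_start + backup_duration)


def job_time(primaries: List[float], backup_start: float,
             backup_duration: float) -> Tuple[float, float]:
    # The job ends when its slowest task ends.
    without_backup = max(primaries)
    with_backup = max(completion_time(p, backup_start, backup_duration)
                      for p in primaries)
    return without_backup, with_backup


# Three normal tasks and one straggler (seconds, illustrative values).
without_backup, with_backup = job_time([10, 11, 12, 100],
                                       backup_start=15, backup_duration=10)
```

In this model only the straggler gets a backup, so the extra work is one additional task execution while the job time drops from 100 to 25 seconds.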

4 Refinements

Although the basic functionality provided by simply writing map and reduce functions is sufficient for most needs, we have found a few extensions useful. These include:

User-specified partitioning functions that determine how intermediate key/value pairs are mapped to the R reduce shards.

Ordering guarantees: our implementation guarantees that within each of the R reduce partitions, the intermediate key/value pairs are processed in increasing key order.

User-specified combiner functions that partially combine the intermediate values generated for the same key within the same map task, to reduce the amount of intermediate data that must be transferred across the network.

Custom input and output types, for reading new input formats and producing new output formats.

A mode for execution on a single machine, for simple debugging and small-scale testing.

These items are discussed in detail in [8].
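Of the refinements listed, the combiner is the easiest to show in a few lines of Python. The function names are hypothetical, and word counting is used as the example because addition can safely be applied to partial groups of values.

```python
from collections import Counter
from typing import List, Tuple


def map_without_combiner(document: str) -> List[Tuple[str, int]]:
    # One intermediate pair per word occurrence.
    return [(word, 1) for word in document.split()]


def map_with_combiner(document: str) -> List[Tuple[str, int]]:
    # Counts for the same word are merged inside the map task, so fewer
    # pairs have to cross the network to the reduce workers.
    return sorted(Counter(document.split()).items())


doc = "the cat and the hat and the bat"
plain = map_without_combiner(doc)
combined = map_with_combiner(doc)
print(len(plain), len(combined))
```

The reduce function is unchanged: it still sums the values it receives, which are now partial counts rather than individual ones.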

5 Performance

In this section, we measure the performance of MapReduce on two computations running on a large cluster of machines. One computation searches through approximately 1 TB of data looking for a particular pattern. The other computation sorts approximately 1 TB of data. These two programs are representative of a large subset of the real programs written by users of MapReduce: one class of programs shuffles data from one representation to another, and another class extracts a small amount of interesting data from a large data set.

5.1 Cluster configuration

All of the programs were executed on a cluster of approximately 1,800 machines. Each machine had two 2 GHz Intel Xeon processors with Hyper-Threading enabled, 4 GB of memory, two 160 GB IDE disks, and a gigabit Ethernet link. The machines were arranged in a two-level tree-shaped switched network with approximately 100 to 200 Gbps of aggregate bandwidth available at the root. All of the machines were in the same hosting facility, so the round-trip time between any pair of machines was less than a millisecond.

Of the 4 GB of memory, about 1 to 1.5 GB was reserved by other tasks running on the cluster. The programs were executed on a weekend afternoon, when the CPUs, disks, and network were mostly idle.

5.2 Grep

The grep program scans through 10^10 100-byte records, searching for a relatively rare three-character pattern (the pattern occurs in 92,337 records). The input is split into pieces of approximately 64 MB (M = 15000), and the entire output is placed in one file (R = 1).

Figure 2 shows the progress of the computation over time. The Y-axis shows the rate at which the input data is scanned. The rate gradually picks up as more machines are assigned to this MapReduce computation and peaks when 1764 workers have been assigned. As the map tasks finish, the rate starts dropping and hits zero about 80 seconds into the computation. The entire computation takes approximately 150 seconds from start to finish. This includes about a minute of startup overhead. The overhead is due to the propagation of the program to all worker machines and to delays interacting with GFS to open the set of 1000 input files and to get the information needed for the locality optimization.
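For reference, a grep-style computation fits the programming model with a very small pair of functions. The Python sketch below is an assumption about how such a job could be written, not the benchmark's actual code: grep_map, grep_reduce, and the record offsets used as keys are hypothetical.

```python
import re
from typing import Iterator, Tuple


def grep_map(offset: int, record: str,
             pattern: "re.Pattern[str]") -> Iterator[Tuple[int, str]]:
    # Emit the record only if it contains the pattern.
    if pattern.search(record):
        yield offset, record


def grep_reduce(key: int, values: Iterator[str]) -> Iterator[str]:
    # Identity reduce: copy the matching records to the output.
    yield from values


pattern = re.compile("xyz")
records = ["aaxyzbb", "nothing here", "xyz"]
matches = [pair for i, rec in enumerate(records)
           for pair in grep_map(i, rec, pattern)]
```

Because only matching records are emitted, the intermediate data is tiny compared with the input, which is consistent with the whole output fitting in one file (R = 1).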

