
How to apply the pattern and algorithm of MapReduce

Updated: 2025-01-17. Source: SLTechnology News&Howtos (Shulou.com), Servers, 05/31.

This article covers how to apply common MapReduce patterns and algorithms. Many people run into these problems when working on real cases, so the sections below walk through how each one is handled.

All descriptive text and code use the standard Hadoop MapReduce model, including Mappers, Reducers, Combiners, Partitioners, and sorting. This is shown in the following figure.

Basic MapReduce patterns

Counting and summing

Problem statement: there are many documents, each of which is a set of terms. You need to calculate the total number of occurrences of each term across all documents, or some other function of the terms. For example, given a log file in which each record contains a response time, you need to calculate the average response time.

Solution:

Let's start with a simple example. In the following code snippet, Mapper emits a count of 1 for every term it encounters, and Reducer iterates through the list of counts for each term and adds them up.

class Mapper
    method Map(docid id, doc d)
        for all term t in doc d do
            Emit(term t, count 1)

class Reducer
    method Reduce(term t, counts [c1, c2,...])
        sum = 0
        for all count c in [c1, c2,...] do
            sum = sum + c
        Emit(term t, count sum)

The drawback of this approach is obvious: Mapper emits a large number of trivial counts. The amount of data passed to Reducer can be reduced by first summing the counts for each term within a document:

class Mapper
    method Map(docid id, doc d)
        H = new AssociativeArray
        for all term t in doc d do
            H{t} = H{t} + 1
        for all term t in H do
            Emit(term t, count H{t})

If you want to accumulate counts not only within a single document but across all the documents processed by one Mapper node, use a Combiner:

class Mapper
    method Map(docid id, doc d)
        for all term t in doc d do
            Emit(term t, count 1)

class Combiner
    method Combine(term t, [c1, c2,...])
        sum = 0
        for all count c in [c1, c2,...] do
            sum = sum + c
        Emit(term t, count sum)

class Reducer
    method Reduce(term t, counts [c1, c2,...])
        sum = 0
        for all count c in [c1, c2,...] do
            sum = sum + c
        Emit(term t, count sum)
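The Combiner variant can be tried out in plain Python. This is a minimal single-process sketch, not Hadoop code: the names `map_doc`, `combine`, `reduce_counts`, and `word_count` are hypothetical, and each inner list in `splits` is assumed to stand for the documents handled by one Mapper node.

```python
from collections import Counter, defaultdict

def map_doc(doc_id, text):
    # Emit (term, 1) for every term, as in the Mapper pseudocode.
    for term in text.split():
        yield term, 1

def combine(pairs):
    # Combiner: pre-sum the counts produced on one simulated Mapper node.
    partial = Counter()
    for term, count in pairs:
        partial[term] += count
    return partial.items()

def reduce_counts(term, counts):
    return term, sum(counts)

def word_count(splits):
    # splits: one list of (doc_id, text) per simulated Mapper node.
    shuffled = defaultdict(list)
    for split in splits:
        emitted = [kv for doc_id, text in split for kv in map_doc(doc_id, text)]
        for term, count in combine(emitted):
            shuffled[term].append(count)
    return dict(reduce_counts(t, c) for t, c in shuffled.items())

splits = [[(1, "a b a"), (2, "b c")], [(3, "a c c")]]
totals = word_count(splits)
```

With the Combiner in place, each simulated node sends at most one count per term to the Reducer instead of one count per occurrence.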

Application:

Log analysis, data query

Collating

Problem statement:

There is a set of entries, each with several attributes. You need to save all entries that share the same attribute value to one file, or otherwise group entries by attribute value. The most typical application is building an inverted index.

Solution:

The solution is simple. Mapper takes the desired attribute value of each entry as the key and passes the entry itself to Reducer as the value. Reducer receives the entries grouped by attribute value and can then process or save them. When building an inverted index, each entry is an occurrence of a word in a document: the word is the key and the ID of the document in which it occurs is the value.
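As an illustration of collating, here is a small Python sketch that builds an inverted index in a single process. The functions `map_postings` and `build_index` are hypothetical names, and the in-memory dictionary only stands in for the framework's shuffle and sort.

```python
from collections import defaultdict

def map_postings(doc_id, text):
    # Key = the attribute to group by (the term); value = the document ID.
    for term in set(text.split()):
        yield term, doc_id

def build_index(docs):
    grouped = defaultdict(list)          # stands in for shuffle and sort
    for doc_id, text in docs.items():
        for term, d in map_postings(doc_id, text):
            grouped[term].append(d)
    # Reducer: store the sorted posting list for each term.
    return {term: sorted(ids) for term, ids in grouped.items()}

index = build_index({1: "map reduce", 2: "reduce side join", 3: "map side join"})
```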

Application:

Inverted index, ETL

Filtering (grepping), parsing, and validation

Problem statement:

Suppose there are many records, and you need either to find all records that meet a certain condition or to convert each record into another form. The conversion of each record is independent of all other records. Text parsing, value extraction, and format conversion all belong to the latter use case.

Solution:

Mapper processes the records one at a time and emits either the records that satisfy the condition or their converted form.
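A map-only job of this kind might look like the following Python sketch. The log format, the `LINE` pattern, and the function `map_record` are assumptions made up for the example; the point is only that each record is handled independently.

```python
import re

# Hypothetical log format: "<METHOD> <path> <status> <millis>ms"
LINE = re.compile(r"(?P<method>[A-Z]+) (?P<path>\S+) (?P<status>\d{3}) (?P<ms>\d+)ms")

def map_record(line):
    m = LINE.fullmatch(line.strip())
    if m is None:
        return                          # validation: drop malformed lines
    if int(m["status"]) < 500:
        return                          # filtering: keep server errors only
    yield m["path"], int(m["ms"])       # parsing: emit the converted record

lines = ["GET /a 200 12ms", "GET /b 503 950ms", "garbage", "POST /c 500 40ms"]
errors = [out for line in lines for out in map_record(line)]
```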

Application:

Log analysis, data query, ETL, data validation

Distributed task execution

Problem statement:

A large computation can be divided into multiple parts, and the results of the parts are then merged to obtain the final result.

Solution: split the data into multiple parts and use each part as the input of one Mapper. Every Mapper performs the same operation on its part and produces a result, and Reducer combines the results of all Mappers into one.

Case study: digital communication system simulation

Simulation software for digital communication systems such as WiMAX passes large amounts of random data through a model of the system and then calculates the probability of transmission errors. Each Mapper processes 1/N of the sample data and calculates the error rate for its part, and Reducer then calculates the average error rate.
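The case study can be sketched in Python under strong simplifications. The channel here is assumed to be a binary symmetric channel that flips each bit with probability `flip_prob`, which is far simpler than a real WiMAX model; `map_chunk` and `reduce_error_rate` are hypothetical names.

```python
import random

def map_chunk(seed, n_bits, flip_prob):
    # One Mapper: push n_bits random bits through the noisy channel model and
    # return (errors, bits) so the Reducer can weight the chunks correctly.
    rng = random.Random(seed)
    errors = 0
    for _ in range(n_bits):
        sent = rng.getrandbits(1)
        received = sent ^ (rng.random() < flip_prob)
        errors += sent != received
    return errors, n_bits

def reduce_error_rate(partials):
    # Reducer: merge the partial results into one overall error rate.
    total_errors = sum(e for e, _ in partials)
    total_bits = sum(n for _, n in partials)
    return total_errors / total_bits

partials = [map_chunk(seed, 10_000, 0.1) for seed in range(4)]
error_rate = reduce_error_rate(partials)
```

Because every chunk has the same size, the pooled rate computed here equals the plain average of the per-Mapper error rates.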

Application:

Engineering simulation, numerical analysis, performance testing

Sorting

Problem statement:

There are many records, and you need to sort all the records according to some rule or process them in order.

Solution: simple sorting is easy. Mappers emit the attribute value to sort by as the key and the entire record as the value. In practice, however, sorting is used in more ingenious ways, which is why it is said to be the heart of MapReduce. In particular, composite keys are commonly used to achieve secondary sorting and grouping.

MapReduce itself sorts only by key, but there are techniques that take advantage of Hadoop features to sort by value as well. If you want to know more, you can read this blog.

Note that if MapReduce is used to sort the original data rather than intermediate data, it is often better to keep the data permanently in sorted order, following the BigTable concept. In other words, sorting once when the data is inserted is more efficient than sorting every time the data is queried.
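The composite-key technique for secondary sorting can be illustrated with a short Python sketch. The sensor readings and the names `map_reading` and `secondary_sort` are invented for the example; in Hadoop the two steps would typically be handled by a sort comparator and a grouping comparator rather than by `sorted` and `groupby`.

```python
from itertools import groupby

def map_reading(record):
    sensor, timestamp, value = record
    # Composite key: the natural key plus the field to order by.
    return (sensor, timestamp), value

def secondary_sort(records):
    pairs = sorted(map(map_reading, records))     # framework sort on the full key
    result = {}
    # Grouping looks at the natural key only, so each group arrives time-ordered.
    for sensor, group in groupby(pairs, key=lambda kv: kv[0][0]):
        result[sensor] = [value for _, value in group]
    return result

readings = [("s2", 3, 0.5), ("s1", 2, 7.0), ("s1", 1, 6.5), ("s2", 1, 0.4)]
ordered = secondary_sort(readings)
```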

Application:

ETL, data analysis

Not-so-basic MapReduce patterns

Iterative message passing (graph processing)

Problem statement:

Suppose there is a network of entities with relationships between them. You need to calculate a state for each entity based on the attributes of the entities adjacent to it. This state can represent the distance to other nodes, an indication that a neighbor with specific attributes exists, a characteristic of neighborhood density, and so on.

Solution:

The network is stored as a set of nodes, each containing a list of the IDs of its adjacent nodes. Conceptually, MapReduce jobs run iteratively, and in each iteration every node sends messages to its neighbors. Each neighbor updates its state according to the messages it receives. The iterations stop when some condition is met, such as reaching a maximum number of iterations (for example, the network diameter) or seeing almost no state change between two consecutive iterations. From a technical point of view, Mapper emits messages with the ID of each adjacent node as the key, so all messages are grouped by receiving node, and Reducer can recalculate the state of each node and rewrite the node with its new state. The algorithm is shown below:

class Mapper
    method Map(id n, object N)
        Emit(id n, object N)
        for all id m in N.OutgoingRelations do
            Emit(id m, message getMessage(N))

class Reducer
    method Reduce(id m, [s1, s2,...])
        M = null
        messages = []
        for all s in [s1, s2,...] do
            if IsObject(s) then
                M = s
            else    // s is a message
                messages.add(s)
        M.State = calculateState(messages)
        Emit(id m, item M)

The state of one node can spread quickly across the network: every node "infected" by it goes on to infect its own neighbors, as shown in the following diagram:

Case study: validity propagation through a category tree

Problem statement:

This problem comes from a real e-commerce application. Goods are classified into categories that form a tree: larger categories (such as men, women, children) are subdivided into smaller ones (such as men's trousers or women's clothing) until they cannot be divided further (such as men's blue jeans). These end-of-line categories are either valid (the category contains goods) or invalid (no goods belong to it). A higher-level category is considered valid if it contains at least one valid subcategory. Given the validity of the end-of-line categories, we need to find all valid categories in the tree.

Solution:

This problem can be solved using the framework mentioned in the previous section. We define methods called getMessage and calculateState below:

class N
    State in {True = 2, False = 1, null = 0}, initialized to 1 or 2 for end-of-line categories, 0 otherwise

method getMessage(object N)
    return N.State

method calculateState(state s, data [d1, d2,...])
    return max([d1, d2,...])
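The following Python sketch runs this scheme on a tiny category tree in a single process. The function `propagate`, the `parent` mapping, and the sample categories are hypothetical; each category is assumed to send its state to its parent only, and a node that receives no messages keeps its current state.

```python
from collections import defaultdict

def propagate(parent, state, max_iterations=10):
    # parent: category -> parent category (None for the root).
    # state: category -> 2 (valid), 1 (invalid) or 0 (unknown).
    state = dict(state)
    for _ in range(max_iterations):
        inbox = defaultdict(list)
        for node, p in parent.items():          # Mapper: getMessage = own state
            if p is not None:
                inbox[p].append(state[node])
        new_state = dict(state)
        for node, messages in inbox.items():    # Reducer: calculateState = max
            new_state[node] = max(messages)
        if new_state == state:                  # stop when nothing changes
            break
        state = new_state
    return state

parent = {"clothing": None, "men": "clothing", "women": "clothing",
          "men jeans": "men", "men trousers": "men", "women dresses": "women"}
initial = {"clothing": 0, "men": 0, "women": 0,
           "men jeans": 2, "men trousers": 1, "women dresses": 1}
final = propagate(parent, initial)
```

In this example the state needs two iterations to climb from the leaves to the root; a third iteration detects that nothing changed and stops.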

Case study: breadth-first search

Problem statement: it is necessary to calculate the distance from one node to all other nodes in a graph structure.

Solution: the source node starts with distance 0 and sends a signal to all its neighbors, and each neighbor forwards the signal to its own neighbors, adding 1 to the value every time it is forwarded:

class N
    State is distance, initialized to 0 for the source node, INFINITY for all other nodes

method getMessage(N)
    return N.State + 1

method calculateState(state s, data [d1, d2,...])
    return min([d1, d2,...])
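Here is a single-process Python sketch of the breadth-first search iterations. The function `bfs_distances` and the sample graph are hypothetical. One deliberate difference from the pseudocode is labeled in the code: the node's own state takes part in the minimum, so the source keeps distance 0 even if it has incoming edges.

```python
import math
from collections import defaultdict

def bfs_distances(adjacency, source):
    # adjacency: node -> list of neighbor IDs (the outgoing relations).
    dist = {n: math.inf for n in adjacency}
    dist[source] = 0
    for _ in range(len(adjacency)):                 # at most |V| rounds are needed
        inbox = defaultdict(list)
        for node, neighbors in adjacency.items():   # Mapper
            for m in neighbors:
                inbox[m].append(dist[node] + 1)     # getMessage
        # Reducer. Assumption: the node's own state joins the min.
        new_dist = {n: min([dist[n]] + inbox[n]) for n in adjacency}
        if new_dist == dist:
            break
        dist = new_dist
    return dist

graph = {"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": ["e"], "e": [], "f": ["a"]}
distances = bfs_distances(graph, "a")
```

Node `f` has an edge into the source but is not reachable from it, so its distance stays infinite.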

Case study: PageRank and Mapper-side data aggregation

This algorithm was proposed by Google to calculate the relevance of a web page from the authority (PageRank) of the pages that link to it. The real algorithm is quite complex, but the core idea is that weight is propagated between nodes: each node calculates its own weight from the weights of the nodes that link to it. In the simplified version below, every node divides its weight evenly among its outgoing links, and each node sums the shares it receives.

class N
    State is PageRank

method getMessage(object N)
    return N.State / N.OutgoingRelations.size()

method calculateState(state s, data [d1, d2,...])
    return sum([d1, d2,...])

It is important to point out that the generic scheme above does not take advantage of the fact that the state is a number. In practice, because it is a number, the values can be aggregated on the Mapper side. The following code snippet shows the changed logic (for the PageRank algorithm):

class Mapper
    method Initialize
        H = new AssociativeArray
    method Map(id n, object N)
        p = N.PageRank / N.OutgoingRelations.size()
        Emit(id n, object N)
        for all id m in N.OutgoingRelations do
            H{m} = H{m} + p
    method Close
        for all id n in H do
            Emit(id n, value H{n})

class Reducer
    method Reduce(id m, [s1, s2,...])
        M = null
        p = 0
        for all s in [s1, s2,...] do
            if IsObject(s) then
                M = s
            else
                p = p + s
        M.PageRank = p
        Emit(id m, item M)
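The Mapper-side aggregation can be exercised with a small Python sketch. It follows the simplified propagation described in this section (no damping factor), simulates a single Mapper, and uses a hypothetical three-page graph; `pagerank_step` is an invented name.

```python
from collections import defaultdict

def pagerank_step(graph, rank):
    # graph: node -> outgoing links; rank: node -> current PageRank.
    # Mapper side: accumulate contributions per target in H before emitting.
    H = defaultdict(float)
    for node, links in graph.items():
        if links:
            share = rank[node] / len(links)
            for m in links:
                H[m] += share
    # Reducer side: with one simulated Mapper each target receives a single
    # pre-summed value; a real job would sum one value per Mapper.
    return {node: H[node] for node in graph}

graph = {"a": ["b", "c"], "b": ["c"], "c": ["a"]}
rank = {n: 1 / 3 for n in graph}
for _ in range(50):
    rank = pagerank_step(graph, rank)
```

For this graph the ranks settle near 0.4 for `a` and `c` and 0.2 for `b`, and their sum stays at 1 because every page has at least one outgoing link.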

Application:

Graph analysis, web indexing

Distinct values (unique item counting)

Problem statement: records contain field F and field G. You need to count the number of distinct F values among the records that share the same G value (that is, grouped by G).

This problem can be extended to faceted search (some e-commerce sites call it Narrow Search).

Record 1: F=1, G={a, b}
Record 2: F=2, G={a, d, e}
Record 3: F=1, G={b}
Record 4: F=3, G={a, b}

Result:
a -> 3    // F=1, F=2, F=3
b -> 2    // F=1, F=3
d -> 1    // F=2
e -> 1    // F=2

Solution I:

The first method solves the problem in two stages. In the first stage, Mapper forms a composite key from each pair of F and G values, and Reducer emits each distinct pair exactly once, which guarantees that every F value is counted only once per G value. In the second stage, the pairs are grouped by G value and the number of entries in each group is counted.

The first stage:

class Mapper
    method Map(null, record [value f, categories [g1, g2,...]])
        for all category g in [g1, g2,...]
            Emit(record [g, f], count 1)

class Reducer
    method Reduce(record [g, f], counts [n1, n2,...])
        Emit(record [g, f], null)

The second stage:

class Mapper
    method Map(record [g, f], null)
        Emit(value g, count 1)

class Reducer
    method Reduce(value g, counts [n1, n2,...])
        Emit(value g, sum([n1, n2,...]))
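The two stages can be chained in Python on the four sample records from the problem statement. This is a toy sketch: `run_mapreduce` is a hypothetical in-memory stand-in for the framework, and `map1`, `reduce1`, `map2`, and `reduce2` are invented names.

```python
from collections import defaultdict

def run_mapreduce(records, mapper, reducer):
    # Toy stand-in for the framework: group mapper output by key, then reduce.
    groups = defaultdict(list)
    for record in records:
        for key, value in mapper(record):
            groups[key].append(value)
    return [out for key in sorted(groups) for out in reducer(key, groups[key])]

# Stage 1: one output record per distinct (g, f) pair.
def map1(record):
    f, categories = record
    for g in categories:
        yield (g, f), 1

def reduce1(pair, counts):
    yield pair

# Stage 2: count the distinct pairs per g.
def map2(pair):
    g, f = pair
    yield g, 1

def reduce2(g, counts):
    yield g, sum(counts)

records = [(1, ["a", "b"]), (2, ["a", "d", "e"]), (1, ["b"]), (3, ["a", "b"])]
unique_pairs = run_mapreduce(records, map1, reduce1)
result = dict(run_mapreduce(unique_pairs, map2, reduce2))
```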

Solution II:

The second method needs only one MapReduce job, but it does not scale well. The algorithm is simple: Mapper emits each value together with its categories; Reducer removes duplicates from the list of categories for each value and increments a counter for each remaining category; when the Reducer finishes, it emits the final counts. This method is suitable only when the number of categories is limited and there are not many records with the same F value. An example is processing web logs to classify users: the total number of users is very large, but the number of events per user is limited, and so is the number of resulting categories. It is worth mentioning that in this scheme a Combiner can remove duplicate categories before the data is transferred to Reducer.

class Mapper
    method Map(null, record [value f, categories [g1, g2,...]])
        for all category g in [g1, g2,...]
            Emit(value f, category g)

class Reducer
    method Initialize
        H = new AssociativeArray : category -> count
    method Reduce(value f, categories [g1, g2,...])
        [g1', g2',...] = ExcludeDuplicates([g1, g2,...])
        for all category g in [g1', g2',...]
            H{g} = H{g} + 1
    method Close
        for all category g in H do
            Emit(category g, count H{g})
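A single-job version can be sketched in Python as follows, assuming one Reducer instance sees every F value. With several Reducers, each would hold only partial counts that still have to be added together. `count_distinct` is a hypothetical name, and the records are the sample data from the problem statement.

```python
from collections import Counter, defaultdict

def count_distinct(records):
    by_f = defaultdict(list)                 # shuffle: categories grouped by f
    for f, categories in records:            # Mapper: Emit(f, g)
        for g in categories:
            by_f[f].append(g)
    H = Counter()                            # Reducer state kept across calls
    for f, categories in by_f.items():       # Reduce(f, [g1, g2, ...])
        for g in set(categories):            # ExcludeDuplicates
            H[g] += 1
    return dict(H)                           # Close: emit the totals

records = [(1, ["a", "b"]), (2, ["a", "d", "e"]), (1, ["b"]), (3, ["a", "b"])]
result = count_distinct(records)
```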

Application:

Log analysis, user count

Cross correlation

Problem statement: there are several tuples, each consisting of several items, and you need to calculate, for each pair of items, the number of tuples in which the two items appear together. If the total number of items is N, then N*N values should be calculated.

This is common in text analysis (items are words and tuples are sentences) and market analysis (customers who buy this item tend to also buy that one). If N*N is small enough to fit in the memory of a single machine, the implementation is straightforward.

Pairs approach

The first method is to emit all pairs of items in Mapper and then sum the counts of identical pairs in Reducer. This approach has drawbacks:

The benefit of Combiners is limited, because it is likely that all item pairs are unique.

There is no in-memory accumulation.

class Mapper
    method Map(null, items [i1, i2,...])
        for all item i in [i1, i2,...]
            for all item j in [i1, i2,...]
                Emit(pair [i j], count 1)

class Reducer
    method Reduce(pair [i j], counts [c1, c2,...])
        s = sum([c1, c2,...])
        Emit(pair [i j], count s)
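A Python sketch of the pairs approach on a few hypothetical shopping baskets is shown below. Unlike the pseudocode, it skips pairs of an item with itself and ignores repeated items within one basket; both are choices made for this example only. `map_pairs` and `cooccurrence_pairs` are invented names.

```python
from collections import Counter
from itertools import permutations

def map_pairs(items):
    # Emit every ordered pair of distinct items in the tuple (basket).
    for i, j in permutations(sorted(set(items)), 2):
        yield (i, j), 1

def cooccurrence_pairs(baskets):
    counts = Counter()                    # shuffle and Reducer sum in one step
    for basket in baskets:
        for pair, one in map_pairs(basket):
            counts[pair] += one
    return counts

baskets = [["milk", "bread"], ["milk", "bread", "eggs"], ["eggs", "milk"]]
pairs = cooccurrence_pairs(baskets)
```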

Stripes approach

The second approach is to group data by the first item in the pair and maintain an associative array ("stripe") in which the counters for all adjacent items are accumulated. Reducer receives all stripes for the leading item i, merges them, and emits the same result as the pairs approach.

The number of keys in the intermediate result is relatively small, which reduces the sorting cost.

Combiners can be used effectively.

Accumulation happens in memory, which can cause problems if it is not implemented properly.

The implementation is more complex.

Generally speaking, "stripes" is faster than "pairs".

class Mapper
    method Map(null, items [i1, i2,...])
        for all item i in [i1, i2,...]
            H = new AssociativeArray : item -> counter
            for all item j in [i1, i2,...]
                H{j} = H{j} + 1
            Emit(item i, stripe H)

class Reducer
    method Reduce(item i, stripes [H1, H2,...])
        H = new AssociativeArray : item -> counter
        H = merge-sum([H1, H2,...])
        for all item j in H.keys()
            Emit(pair [i j], H{j})
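The stripes approach can be sketched in Python with `collections.Counter` playing the role of the associative array. The baskets are hypothetical sample data, self-pairs are skipped as a choice made for this example, and `map_stripes` and `cooccurrence_stripes` are invented names.

```python
from collections import Counter, defaultdict

def map_stripes(items):
    unique = set(items)
    for i in unique:
        # One stripe per leading item: neighbor -> count within this tuple.
        yield i, Counter(j for j in unique if j != i)

def cooccurrence_stripes(baskets):
    merged = defaultdict(Counter)
    for basket in baskets:
        for i, stripe in map_stripes(basket):
            merged[i].update(stripe)      # merge-sum; a Combiner could do the same
    # Flatten to the same (pair -> count) form that the pairs approach produces.
    return {(i, j): c for i, stripe in merged.items() for j, c in stripe.items()}

baskets = [["milk", "bread"], ["milk", "bread", "eggs"], ["eggs", "milk"]]
stripes = cooccurrence_stripes(baskets)
```

The intermediate keys are single items rather than pairs, so this run shuffles at most one stripe per item per basket.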

Application:

Text analysis, market analysis

References:

Lin J., Dyer C., Hirst G. Data-Intensive Text Processing with MapReduce.

Expressing relational patterns with MapReduce

In this section we will discuss how to use MapReduce for major relational operations.

Selection (filter)

class Mapper
    method Map(rowkey key, tuple t)
        if t satisfies the predicate
            Emit(tuple t, null)

Projection

Projection is only slightly more complex than selection. In this case Reducer is used to eliminate possible duplicates.

class Mapper
    method Map(rowkey key, tuple t)
        tuple g = project(t)    // extract required fields to tuple g
        Emit(tuple g, null)

class Reducer
    method Reduce(tuple t, array n)    // n is an array of nulls
        Emit(tuple t, null)

Union

All records from both datasets are fed to Mapper, and Reducer eliminates the duplicates.

class Mapper
    method Map(rowkey key, tuple t)
        Emit(tuple t, null)

class Reducer
    method Reduce(tuple t, array n)    // n is an array of one or two nulls
        Emit(tuple t, null)

Intersection

Records from the two datasets to be intersected are fed to Mapper, and Reducer emits only the records that appear twice. This works because each record has a primary key and therefore appears only once in each dataset.

class Mapper
    method Map(rowkey key, tuple t)
        Emit(tuple t, null)

class Reducer
    method Reduce(tuple t, array n)    // n is an array of one or two nulls
        if n.size() = 2
            Emit(tuple t, null)

Difference

Suppose there are two datasets R and S, and we need to find the difference between R and S. Mapper tags every tuple with the name of the set it came from (R or S), and Reducer outputs only the tuples that exist in R but not in S.
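The tagging idea can be shown in a few lines of Python. This sketch assumes, as in the intersection case, that a tuple appears at most once in each set; `difference` and the sample tuples are hypothetical.

```python
from collections import defaultdict

def difference(r_tuples, s_tuples):
    tags = defaultdict(list)
    # Mapper: emit each tuple as the key, tagged with the set it came from.
    for name, tuples in (("R", r_tuples), ("S", s_tuples)):
        for t in tuples:
            tags[t].append(name)
    # Reducer: keep the tuples whose only tag is 'R'.
    return sorted(t for t, names in tags.items() if names == ["R"])

R = [(1, "ann"), (2, "bob"), (3, "cy")]
S = [(2, "bob"), (4, "dee")]
only_in_r = difference(R, S)
```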

class Mapper
    method Map(rowkey key, tuple t)
        Emit(tuple t, string t.SetName)    // t.SetName is either 'R' or 'S'

class Reducer
    method Reduce(tuple t, array n)    // array n can be ['R'], ['S'], ['R', 'S'], or ['S', 'R']
        if n.size() = 1 and n[1] = 'R'
            Emit(tuple t, null)

GroupBy and aggregation

Grouping and aggregation can be done in a single MapReduce job, as follows. Mapper extracts from each tuple the value to group by and the value to aggregate, and Reducer aggregates the values it receives for each group. Typical aggregations such as sum and maximum can be computed in a streaming fashion, so not all values need to be held in memory at the same time. Other scenarios require a two-stage MapReduce; the unique-value pattern described earlier is an example of this type.
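To make the streaming point concrete, here is a minimal Python sketch in which the Reducer keeps only a running sum and maximum per group. The row layout (group-by value first, value to aggregate second) and the names `map_tuple`, `reduce_stream`, and `group_aggregate` are assumptions for this example.

```python
from collections import defaultdict

def map_tuple(row):
    # Extract the group-by value and the value to aggregate.
    group_by, aggregate_by = row[0], row[1]
    return group_by, aggregate_by

def reduce_stream(values):
    # Streaming aggregation: only the running sum and max are kept in memory.
    total, biggest = 0, None
    for v in values:
        total += v
        biggest = v if biggest is None else max(biggest, v)
    return total, biggest

def group_aggregate(rows):
    shuffled = defaultdict(list)      # stands in for the framework's shuffle
    for row in rows:
        key, value = map_tuple(row)
        shuffled[key].append(value)
    return {key: reduce_stream(iter(values)) for key, values in shuffled.items()}

rows = [("x", 3, "extra"), ("y", 10, "extra"), ("x", 5, "extra"), ("y", 2, "extra")]
aggregates = group_aggregate(rows)
```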

class Mapper
    method Map(null, tuple [value GroupBy, value AggregateBy, value ...])
        Emit(value GroupBy, value AggregateBy)

class Reducer
    method Reduce(value GroupBy, [v1, v2,...])
        Emit(value GroupBy, aggregate([v1, v2,...]))    // aggregate(): sum(), max(),...

Joining

The MapReduce framework handles joins well, but there are different techniques depending on data volumes and processing efficiency requirements. This section introduces some basic methods; dedicated articles on this topic are listed in the following reference documents.

Repartition join (reduce-side join, sort-merge join)

This algorithm joins datasets R and L on key k. Mapper goes through all tuples in R and L and emits each tuple with k as the key, tagged with the name of the set it came from (R or L). Reducer puts the data for a given k into two containers (R and L) and then runs a nested loop over the two containers to produce their cross join. Each output record contains k, the data from R, and the data from L. This approach has the following disadvantages:

Mapper emits all the data, even for keys that appear in only one of the sets.

Reducer must hold all the data for one key in memory; if the data does not fit in memory, it has to be spilled to disk, which increases disk I/O.

Nevertheless, the repartition join is still the most common method, especially when other optimization techniques are not applicable.

class Mapper
    method Map(null, tuple [join_key k, value v1, value v2,...])
        Emit(join_key k, tagged_tuple [set_name tag, values [v1, v2,...]])

class Reducer
    method Reduce(join_key k, tagged_tuples [t1, t2,...])
        H = new AssociativeArray : set_name -> values
        for all tagged_tuple t in [t1, t2,...]    // separate values into 2 arrays
            H{t.tag}.add(t.values)
        for all values r in H{'R'}    // produce a cross-join of the two arrays
            for all values l in H{'L'}
                Emit(null, [k r l])
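The same join can be tried in Python on two small hypothetical datasets. `repartition_join`, `users`, and `events` are invented for the example, and the dictionary of buckets stands in for the shuffle that groups tagged tuples by join key.

```python
from collections import defaultdict

def repartition_join(R, L):
    # Mapper: tag every tuple with its source and key it by the join key.
    shuffled = defaultdict(lambda: {"R": [], "L": []})
    for tag, dataset in (("R", R), ("L", L)):
        for key, values in dataset:
            shuffled[key][tag].append(values)
    # Reducer: cross-join the two buckets collected for each key.
    joined = []
    for key in sorted(shuffled):
        for r_row in shuffled[key]["R"]:
            for l_row in shuffled[key]["L"]:
                joined.append((key, r_row, l_row))
    return joined

users = [(1, "ann"), (2, "bob"), (3, "cy")]
events = [(1, "login"), (1, "logout"), (2, "login"), (9, "login")]
rows = repartition_join(users, events)
```

Keys 3 and 9 appear in only one dataset, so they are shuffled but produce no output, which illustrates the first disadvantage listed above.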

Replicated join (map-side join, hash join)

In practical applications, it is common to join a small dataset with a large one (such as a list of users with a log). Suppose you want to join two sets R and L, where R is relatively small. R can then be distributed to all Mappers, and each Mapper can load it and index its data by join key. The most common and efficient indexing technique is a hash table. After that, Mapper iterates through the tuples of L and joins each one with the corresponding records of R stored in the hash table. This approach is very efficient because the data in L does not need to be sorted or transmitted over the network, but R must be small enough to be distributed to all Mappers.

class Mapper
    method Initialize
        H = new AssociativeArray : join_key -> tuple from R
        R = loadR()
        for all [join_key k, tuple [r1, r2,...]] in R
            H{k} = H{k}.append([r1, r2,...])
    method Map(join_key k, tuple l)
        for all tuple r in H{k}
            Emit(null, tuple [k r l])
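A Python sketch of the replicated join follows. The small dataset `users` is assumed to fit in memory on every Mapper; `build_hash_table` and `map_join` are hypothetical names that mirror the Initialize and Map methods above.

```python
from collections import defaultdict

def build_hash_table(R):
    # Initialize: load the small dataset and index it by join key.
    H = defaultdict(list)
    for key, values in R:
        H[key].append(values)
    return H

def map_join(H, record):
    # Map: probe the in-memory table for each tuple of the large dataset.
    key, l_row = record
    for r_row in H.get(key, []):
        yield key, r_row, l_row

users = [(1, "ann"), (2, "bob")]
events = [(1, "login"), (2, "login"), (1, "logout"), (9, "login")]
H = build_hash_table(users)
rows = [row for record in events for row in map_join(H, record)]
```

No shuffle or Reducer is involved here: the output keeps the order of the large dataset, and the unmatched key 9 is simply skipped.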
