MongoDB Aggregation Pipeline

Updated 2025-02-22. From: SLTechnology News&Howtos (Shulou.com), Database section

Pipeline concept

Pipelining is one of the most important patterns in POSIX multithreaded programming: a stream of data elements is processed in sequence by a set of threads, each of which handles one step. (The figure that illustrated this architecture in the original article is not reproduced here.)

From an object-oriented point of view, the whole pipeline is a channel through which data flows, and each worker thread is one stage of that channel. The stages are chained together: the thread closest to the input is the earliest stage, and its result feeds the next one. In other words, each stage depends on the output of the previous stage, and the output of one stage becomes the input of the next. This is the defining feature of any pipeline.

To meet users' need for simple data access, MongoDB 2.2 introduced the aggregation framework (Aggregation Framework), a new facility for data aggregation modeled on a data-processing pipeline. Each document passes through a pipeline made up of several nodes, and each node has its own function (grouping, filtering, and so on). Once the documents have passed through the whole pipeline, the result is output. A pipeline has two basic functions:

The first is to "filter" documents, that is, to select the documents that meet the criteria.

The second is to "transform" documents, that is, to change the form in which they are output.

Other features include grouping and sorting by a specified field. Within each stage you can also use expression operators for calculations such as averaging and string concatenation. Pipelines are an alternative to MapReduce: MapReduce is relatively complex to use, whereas a pipeline has a fixed interface (its operators), is comparatively easy to use, and is the preferred method for most aggregation tasks.

The framework uses a declarative pipeline notation to support operations similar to SQL's GROUP BY, so users do not need to write custom JavaScript routines.

Most pipelines start with "$match" right after the "aggregate" call. Together they resemble the FROM and WHERE clauses of SQL, or MongoDB's find function. The "$project" stage likewise resembles the SELECT list in SQL or a projection in MongoDB (although, unlike in SQL, it usually comes at the end of the expression).

The operations described next are specific to the MongoDB aggregation framework. Unlike most relational databases, MongoDB natively stores arrays inside rows/documents. This is convenient for all-or-nothing data access, but it complicates reporting work that has to combine projection, grouping, and filtering. The "$unwind" stage breaks an array into its individual elements and returns each one together with the rest of the document.

The "$group" operation serves the same purpose as SQL's GROUP BY clause, but is used more like the grouping operator in LINQ. Rather than returning flat rows, the result of "$group" can be a nested structure. You can therefore use "$group" to return aggregate information, such as a count or an average over all or part of the documents in each group.

Pipeline operators

A pipeline is made up of functional nodes, which are expressed as pipeline operators. The aggregation pipeline starts with all the documents in a collection; the documents then flow from one node to the next, and each node processes them in turn. A node may create new documents or filter out documents that do not meet its criteria, and the same operator may appear more than once in a pipeline.

Let's take a look at an example of pipeline aggregation (the figure from the original article is not reproduced here).
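
As a rough stand-in for the missing illustration, the sketch below mimics a two-stage pipeline ($match followed by $group) in plain JavaScript, so it runs without a MongoDB server. The orders collection, its fields (cust_id, amount, status), and the helper function names are assumptions made up for illustration; the equivalent shell pipeline is given in the leading comment.

```javascript
// Equivalent shell pipeline (assumed collection and field names):
// db.orders.aggregate([
//   { $match: { status: "A" } },
//   { $group: { _id: "$cust_id", total: { $sum: "$amount" } } }
// ])

const orders = [
  { cust_id: "A123", amount: 500, status: "A" },
  { cust_id: "A123", amount: 250, status: "A" },
  { cust_id: "B212", amount: 200, status: "A" },
  { cust_id: "A123", amount: 300, status: "D" },
];

// Stage 1: keep only the documents whose status matches.
function matchStatus(docs, status) {
  return docs.filter((doc) => doc.status === status);
}

// Stage 2: group by cust_id and sum the amounts.
function groupTotalByCustomer(docs) {
  const totals = new Map();
  for (const doc of docs) {
    totals.set(doc.cust_id, (totals.get(doc.cust_id) || 0) + doc.amount);
  }
  return [...totals].map(([id, total]) => ({ _id: id, total }));
}

// The output of one stage is the input of the next.
const pipelineResult = groupTotalByCustomer(matchStatus(orders, "A"));
console.log(pipelineResult);
```

The document with status "D" never reaches the grouping step, which is the point of filtering before grouping.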

Types of pipeline operators:

| Name | Description |
| --- | --- |
| $project | Reshapes a document stream. $project can rename, add, or remove fields as well as create computed values and sub-documents. |
| $match | Filters the document stream, and only allows matching documents to pass into the next pipeline stage. $match uses standard MongoDB queries. |
| $limit | Restricts the number of documents in an aggregation pipeline. |
| $skip | Skips over a specified number of documents from the pipeline and returns the rest. |
| $unwind | Takes an array of documents and returns them as a stream of documents. |
| $group | Groups documents together for the purpose of calculating aggregate values based on a collection of documents. |
| $sort | Takes all input documents and returns them in a stream of sorted documents. |
| $geoNear | Returns an ordered stream of documents based on proximity to a geospatial point. |

Detailed usage of the pipeline operators

1. $project: data projection, mainly used to rename, add, and remove fields

For example:

    db.article.aggregate(
        { $project: {
            title: 1,
            author: 1
        }}
    );

The result then contains only the _id, title, and author fields. The _id field is included by default; to leave it out, exclude it explicitly:

    db.article.aggregate(
        { $project: {
            _id: 0,
            title: 1,
            author: 1
        }}
    );

You can also use arithmetic expression operators inside $project, for example:

    db.article.aggregate(
        { $project: {
            title: 1,
            doctoredPageViews: { $add: ["$pageViews", 10] }
        }}
    );

Here $add adds 10 to the value of the pageViews field, and the result is assigned to a new field, doctoredPageViews.

Note: the operands of $add must be placed inside square brackets (an array).

In addition, $project can rename fields, including fields of subdocuments:

    db.article.aggregate(
        { $project: {
            title: 1,
            page_views: "$pageViews",
            bar: "$other.foo"
        }}
    );

You can also add subdocuments:

    db.article.aggregate(
        { $project: {
            title: 1,
            stats: {
                pv: "$pageViews",
                foo: "$other.foo",
                dpv: { $add: ["$pageViews", 10] }
            }
        }}
    );

This generates a subdocument named stats that contains three fields: pv, foo, and dpv.
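
To make the reshaping concrete, here is a minimal sketch in plain JavaScript of what such a projection does to a single document. It is not how MongoDB implements $project; the sample document and the function name projectArticle are assumptions for illustration only.

```javascript
// Shell stage being mimicked:
// { $project: { _id: 0, title: 1,
//               stats: { pv: "$pageViews", dpv: { $add: ["$pageViews", 10] } } } }

// Build the output document field by field.
function projectArticle(doc) {
  return {
    title: doc.title,            // title: 1 keeps the field
    stats: {
      pv: doc.pageViews,         // renamed copy of pageViews
      dpv: doc.pageViews + 10,   // computed value, like $add
    },
  };
}

// Made-up sample data.
const sampleArticles = [
  { _id: 1, title: "A book", author: "Jone", pageViews: 5 },
];

const projectedArticles = sampleArticles.map(projectArticle);
console.log(JSON.stringify(projectedArticles));
```

Fields that are not listed (here _id and author) do not appear in the output, and the input documents are left unchanged.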

2. $match: filtering; passes only the matching documents on to the next stage

The syntax of $match is the same as that of a query expression (db.collection.find()).

    db.articles.aggregate([
        { $match: { score: { $gt: 70, $lte: 90 } } },
        { $group: { _id: null, count: { $sum: 1 } } }
    ]);

$match selects the records whose score is greater than 70 and less than or equal to 90, and passes them to the next stage, the $group operator, for processing.

Note:

1. You cannot use the $where expression operator inside $match.

2. Place $match as early in the pipeline as possible, so that documents are filtered out early and the aggregation runs faster.

3. If $match is the first stage, it can use an index to speed up the query.
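
The boundary behaviour of $gt and $lte is easy to get wrong, so the following sketch reproduces the filter-then-count logic in plain JavaScript on made-up scores. The variable names are hypothetical and the code only imitates the two stages; it does not talk to MongoDB.

```javascript
// Shell pipeline being mimicked:
// [ { $match: { score: { $gt: 70, $lte: 90 } } },
//   { $group: { _id: null, count: { $sum: 1 } } } ]

const scoredArticles = [
  { score: 65 }, { score: 70 }, { score: 71 }, { score: 90 }, { score: 95 },
];

// $match: 70 is excluded ($gt), 90 is included ($lte).
const inRange = scoredArticles.filter((doc) => doc.score > 70 && doc.score <= 90);

// $group with _id: null puts every remaining document into one group.
// With no input documents, $group emits nothing at all.
const countResult = inRange.length > 0 ? [{ _id: null, count: inRange.length }] : [];
console.log(countResult);
```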

3. $limit: limits the number of documents that pass through the pipeline

The argument to $limit must be a positive integer.

    db.article.aggregate(
        { $limit: 5 }
    );

After the $limit stage, only the first five documents remain in the pipeline.

4. $skip: skips a given number of documents from the start of the input

The argument to $skip must be a positive integer.

    db.article.aggregate(
        { $skip: 5 }
    );

After the $skip stage, the first five documents have been dropped.
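
On an ordered stream of documents, $limit and $skip behave like array slicing. A minimal sketch, assuming eight made-up documents and hypothetical helper names:

```javascript
// Eight made-up documents, in pipeline order.
const docsInOrder = Array.from({ length: 8 }, (_, i) => ({ n: i + 1 }));

// { $limit: n } keeps the first n documents.
const limitStage = (docs, n) => docs.slice(0, n);

// { $skip: n } drops the first n documents and passes on the rest.
const skipStage = (docs, n) => docs.slice(n);

const firstFive = limitStage(docsInOrder, 5);
const afterSkip = skipStage(docsInOrder, 5);

console.log(firstFive.map((d) => d.n), afterSkip.map((d) => d.n));
```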

5. $unwind: splits the elements of an array into separate documents

For example, the article document has an array field named tags:

    > db.article.find()
    { "_id" : ObjectId("528751b0e7f3eea3d1412ce2"),
      "author" : "Jone", "title" : "A book",
      "tags" : [ "good", "fun", "good" ] }

After applying the $unwind operator:

    > db.article.aggregate({ $project: { author: 1, title: 1, tags: 1 } }, { $unwind: "$tags" })
    {
        "result" : [
            {
                "_id" : ObjectId("528751b0e7f3eea3d1412ce2"),
                "author" : "Jone",
                "title" : "A book",
                "tags" : "good"
            },
            {
                "_id" : ObjectId("528751b0e7f3eea3d1412ce2"),
                "author" : "Jone",
                "title" : "A book",
                "tags" : "fun"
            },
            {
                "_id" : ObjectId("528751b0e7f3eea3d1412ce2"),
                "author" : "Jone",
                "title" : "A book",
                "tags" : "good"
            }
        ],
        "ok" : 1
    }

Note:

a. In {$unwind: "$tags"}, do not forget the $ prefix.

b. If the target field of $unwind does not exist, the document is ignored and filtered out, for example:

    > db.article.aggregate({ $project: { author: 1, title: 1, tags: 1 } }, { $unwind: "$tag" })
    { "result" : [ ], "ok" : 1 }

Because $tags was changed to $tag, a field that does not exist, the document is ignored and the output is empty.

c. If the target field of $unwind is not an array, an error is raised, for example:

    > db.article.aggregate({ $project: { author: 1, title: 1, tags: 1 } }, { $unwind: "$title" })
    Error: Printing Stack Trace
        at printStackTrace (src/mongo/shell/utils.js:37:15)
        at DBCollection.aggregate (src/mongo/shell/collection.js:897:9)
        at (shell):1:12
    Sat Nov 16 19:16:54.488 JavaScript execution failed: aggregate failed: {
        "errmsg" : "exception: $unwind: value at end of field path must be an array",
        "code" : 15978,
        "ok" : 0
    } at src/mongo/shell/collection.js:L898

d. If the target array of $unwind is empty, the document is also ignored.
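
The rules in notes b to d can be summarized in a few lines of plain JavaScript. This is only a sketch of the behaviour described in this article (later MongoDB releases may treat missing or non-array fields differently); the function name unwind and the sample document are assumptions for illustration.

```javascript
// Mimics { $unwind: "$<field>" } as described in notes b to d above.
function unwind(docs, field) {
  return docs.flatMap((doc) => {
    const value = doc[field];
    if (value === undefined) {
      return []; // note b: missing field, document is dropped
    }
    if (!Array.isArray(value)) {
      // note c: a non-array value is an error
      throw new Error("$unwind: value at end of field path must be an array");
    }
    // One output document per element; an empty array yields nothing (note d).
    return value.map((element) => ({ ...doc, [field]: element }));
  });
}

const article = { _id: 1, author: "Jone", title: "A book", tags: ["good", "fun", "good"] };

const unwound = unwind([article], "tags");
console.log(unwound.map((doc) => doc.tags));
```

Every output document repeats the other fields (_id, author, title) unchanged, which is why the same _id appears three times in the shell output shown earlier.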

6. $group: groups documents

You must specify an _id field for $group; you can also include accumulator expressions:

    db.article.aggregate(
        { $group: {
            _id: "$author",
            docsPerAuthor: { $sum: 1 },
            viewsPerAuthor: { $sum: "$pageViews" }
        }}
    );

Note:

1. The output of $group is unordered.

2. $group currently runs in memory, so it cannot be used to group a very large number of documents.
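
The two uses of $sum in this stage (counting with a constant 1, summing a field) can be seen side by side in the sketch below. It is plain JavaScript on made-up data, with a hypothetical function name, and only imitates the grouping logic.

```javascript
// Mimics { $group: { _id: "$author",
//                    docsPerAuthor: { $sum: 1 },
//                    viewsPerAuthor: { $sum: "$pageViews" } } }
function groupByAuthor(docs) {
  const groups = new Map();
  for (const doc of docs) {
    const group = groups.get(doc.author) ||
      { _id: doc.author, docsPerAuthor: 0, viewsPerAuthor: 0 };
    group.docsPerAuthor += 1;               // { $sum: 1 } counts documents
    group.viewsPerAuthor += doc.pageViews;  // { $sum: "$pageViews" } adds up a field
    groups.set(doc.author, group);
  }
  return [...groups.values()];
}

// Made-up sample data.
const authoredArticles = [
  { author: "Jone", pageViews: 5 },
  { author: "Ann", pageViews: 12 },
  { author: "Jone", pageViews: 7 },
];

const perAuthor = groupByAuthor(authoredArticles);
console.log(perAuthor);
```

The sketch happens to return groups in first-seen order; as note 1 says, MongoDB makes no such promise, so add a $sort stage if the order matters.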

7. $sort: sorts documents by the specified fields

Usage:

    db.users.aggregate({ $sort: { age: -1, posts: 1 } });

This sorts by age in descending order and then by posts in ascending order.

Note:

1. If $sort is placed at the start of the pipeline, it can use an index to improve efficiency.

2. MongoDB 2.4 added a memory optimization: when $sort is immediately followed by $limit in the pipeline, $sort only keeps the top $limit documents in memory as it runs, which saves a great deal of memory.

3. $sort runs in memory, and the operation fails with an error if it uses more than 10% of physical memory.
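
A compound sort key means "order by the first field, and break ties with the second". The sketch below shows that rule in plain JavaScript; the user documents are made up for illustration.

```javascript
// Mimics { $sort: { age: -1, posts: 1 } }.
const sampleUsers = [
  { name: "ann", age: 30, posts: 7 },
  { name: "bob", age: 41, posts: 2 },
  { name: "cat", age: 30, posts: 3 },
];

// age descending first; when ages are equal, posts ascending.
const sortedUsers = [...sampleUsers].sort(
  (a, b) => (b.age - a.age) || (a.posts - b.posts)
);

console.log(sortedUsers.map((user) => user.name));
```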

8. $geoNear

$geoNear returns documents sorted from nearest to farthest from the specified point.

The parameters are listed in the following table:

| Field | Type | Description |
| --- | --- | --- |
| near | GeoJSON point or legacy coordinate pairs | The point for which to find the closest documents. |
| distanceField | String | The output field that contains the calculated distance. To specify a field within a subdocument, use dot notation. |
| limit | Number | Optional. The maximum number of documents to return. The default value is 100. See also the num option. |
| num | Number | Optional. The num option provides the same function as the limit option. Both define the maximum number of documents to return. If both options are included, the num value overrides the limit value. |
| maxDistance | Number | Optional. A distance from the center point. Specify the distance in radians. MongoDB limits the results to those documents that fall within the specified distance from the center point. |
| query | Document | Optional. Limits the results to the documents that match the query. The query syntax is the usual MongoDB read operation query syntax. |
| spherical | Boolean | Optional. If true, MongoDB references points using a spherical surface. The default value is false. |
| distanceMultiplier | Number | Optional. The factor to multiply all distances returned by the query. For example, use the distanceMultiplier to convert radians, as returned by a spherical query, to kilometers by multiplying by the radius of the Earth. |
| includeLocs | String | Optional. This specifies the output field that identifies the location used to calculate the distance. This option is useful when a location field contains multiple locations. To specify a field within a subdocument, use dot notation. |
| uniqueDocs | Boolean | Optional. If this value is true, the query returns a matching document once, even if more than one of the document's location fields match the query. If this value is false, the query returns a document multiple times if the document has multiple matching location fields. See $uniqueDocs for more information. |

For example:

    db.places.aggregate([
        {
            $geoNear: {
                near: [40.724, -73.997],
                distanceField: "dist.calculated",
                maxDistance: 0.008,
                query: { type: "public" },
                includeLocs: "dist.location",
                uniqueDocs: true,
                num: 5
            }
        }
    ]);

The result is:

    {
        "result" : [
            {
                "_id" : 7,
                "name" : "Washington Square",
                "type" : "public",
                "location" : [
                    [ 40.731, -73.999 ],
                    [ 40.732, -73.998 ],
                    [ 40.730, -73.995 ],
                    [ 40.729, -73.996 ]
                ],
                "dist" : {
                    "calculated" : 0.0050990195135962296,
                    "location" : [ 40.729, -73.996 ]
                }
            },
            {
                "_id" : 8,
                "name" : "Sara D. Roosevelt Park",
                "type" : "public",
                "location" : [
                    [ 40.723, -73.991 ],
                    [ 40.723, -73.990 ],
                    [ 40.715, -73.994 ],
                    [ 40.715, -73.994 ]
                ],
                "dist" : {
                    "calculated" : 0.006082762530298062,
                    "location" : [ 40.723, -73.991 ]
                }
            }
        ],
        "ok" : 1
    }

Here dist.calculated holds the calculated distance, and dist.location holds the coordinates that were actually used to calculate it.

Note:

1. $geoNear can only be used as the first stage of a pipeline.

2. You must specify distanceField, which names the output field that holds the calculated distance.

3. $geoNear and the geoNear command are similar, but there are some differences: distanceField is required in $geoNear and optional in geoNear; includeLocs is a string in $geoNear and a boolean in geoNear.
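
The numbers in the result above can be checked by hand. With spherical left at its default of false, the distances in this example are consistent with a flat, two-dimensional distance between coordinate pairs. The sketch below recomputes dist.calculated and dist.location for the first result in plain JavaScript; the function names are hypothetical and this is not MongoDB's implementation.

```javascript
const nearPoint = [40.724, -73.997];
const maxDistance = 0.008;

// Straight-line distance between two [x, y] coordinate pairs.
function planarDistance(a, b) {
  return Math.hypot(a[0] - b[0], a[1] - b[1]);
}

// Pick the location closest to the point, as includeLocs reports it.
function closestLocation(locations, point) {
  let best = null;
  for (const location of locations) {
    const calculated = planarDistance(location, point);
    if (best === null || calculated < best.calculated) {
      best = { calculated, location };
    }
  }
  return best;
}

// The "location" array of the Washington Square document shown above.
const washingtonSquare = [
  [40.731, -73.999],
  [40.732, -73.998],
  [40.730, -73.995],
  [40.729, -73.996],
];

const dist = closestLocation(washingtonSquare, nearPoint);
console.log(dist, dist.calculated <= maxDistance);
```

The closest of the four locations is [40.729, -73.996], and its distance agrees with the dist.calculated value in the result to within floating-point rounding.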

Pipeline expressions

The pipeline operator acts as a "key", and the corresponding "value" is called a pipeline expression. For example, in {$match: {status: "A"}} from the example above, $match is the pipeline operator and {status: "A"} is the pipeline expression, which can be regarded as the operand of the operator. Each pipeline expression is a document structure made up of field names, field values, and expression operators. For example, the $group expression in the example above contains the expression operator $sum, which computes a sum.

A pipeline expression can only operate on the document currently being processed; it cannot operate across documents. Expressions are evaluated in memory. Apart from accumulator expressions, expressions are stateless, that is, they retain no context between documents. Accumulator operators are normally used with $group to compute values such as the maximum or minimum within a group. In the example above, the accumulator $sum is used inside $group to compute a total.

Besides $sum, the following expression operators are available:

Group aggregation operators

| Name | Description |
| --- | --- |
| $addToSet | Returns an array of all the unique values for the selected field among the documents in that group. |
| $first | Returns the first value in a group. |
| $last | Returns the last value in a group. |
| $max | Returns the highest value in a group. |
| $min | Returns the lowest value in a group. |
| $avg | Returns an average of all the values in a group. |
| $push | Returns an array of all values for the selected field among the documents in that group. |
| $sum | Returns the sum of all the values in a group. |
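
To show how these accumulators differ, the sketch below applies each of them to the values of a single group in plain JavaScript. The function name accumulate and the sample values are assumptions for illustration; in MongoDB the order of the array built by $addToSet is not guaranteed, whereas this sketch keeps first-seen order.

```javascript
// Apply each group accumulator to the values collected for one group.
function accumulate(values) {
  const sum = values.reduce((total, value) => total + value, 0);
  return {
    push: [...values],               // $push: every value, duplicates kept
    addToSet: [...new Set(values)],  // $addToSet: unique values only
    first: values[0],                // $first: depends on input order
    last: values[values.length - 1], // $last: depends on input order
    max: Math.max(...values),        // $max
    min: Math.min(...values),        // $min
    sum,                             // $sum
    avg: sum / values.length,        // $avg
  };
}

// Made-up values of one field within a single group.
const groupValues = [3, 1, 3, 5];
const accumulated = accumulate(groupValues);
console.log(accumulated);
```

Because $first and $last depend on the order in which documents arrive, they are normally meaningful only after a $sort stage.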

Boolean aggregation operators

| Name | Description |
| --- | --- |
| $and | Returns true only when all values in its input array are true. |
| $or | Returns true when any value in its input array is true. |
| $not | Returns the boolean value that is the opposite of the input value. |

Comparison aggregation operators

| Name | Description |
| --- | --- |
| $cmp | Compares two values and returns the result of the comparison as an integer. |
| $eq | Takes two values and returns true if the values are equivalent. |
| $gt | Takes two values and returns true if the first is larger than the second. |
| $gte | Takes two values and returns true if the first is larger than or equal to the second. |
| $lt | Takes two values and returns true if the second value is larger than the first. |
| $lte | Takes two values and returns true if the second value is larger than or equal to the first. |
| $ne | Takes two values and returns true if the values are not equivalent. |

Arithmetic aggregation operators

| Name | Description |
| --- | --- |
| $add | Computes the sum of an array of numbers. |
| $divide | Takes two numbers and divides the first number by the second. |
| $mod | Takes two numbers and calculates the modulo of the first number divided by the second. |
| $multiply | Computes the product of an array of numbers. |
| $subtract | Takes two numbers and subtracts the second number from the first. |

String aggregation operators

| Name | Description |
| --- | --- |
| $concat | Concatenates two strings. |
| $strcasecmp | Compares two strings and returns an integer that reflects the comparison. |
| $substr | Takes a string and returns a portion of that string. |
| $toLower | Converts a string to lowercase. |
| $toUpper | Converts a string to uppercase. |

Date aggregation operators

| Name | Description |
| --- | --- |
| $dayOfYear | Converts a date to a number between 1 and 366. |
| $dayOfMonth | Converts a date to a number between 1 and 31. |
| $dayOfWeek | Converts a date to a number between 1 and 7. |
| $year | Converts a date to the full year. |
| $month | Converts a date into a number between 1 and 12. |
| $week | Converts a date into a number between 0 and 53. |
| $hour | Converts a date into a number between 0 and 23. |
| $minute | Converts a date into a number between 0 and 59. |
| $second | Converts a date into a number between 0 and 59. May be 60 to account for leap seconds. |
| $millisecond | Returns the millisecond portion of a date as an integer between 0 and 999. |

Conditional aggregation operators

| Name | Description |
| --- | --- |
| $cond | A ternary operator that evaluates one expression and, depending on the result, returns the value of one of the two following expressions. |
| $ifNull | Evaluates an expression and returns its value if it is not null; otherwise returns the value of a replacement expression. |

Note: all of the operators above must be used inside the expression of a pipeline operator.

For details on how to use each expression operator, see:

http://docs.mongodb.org/manual/reference/operator/aggregation-group/

Aggregation pipeline optimization

1. $sort + $skip + $limit sequence optimization

If $sort, $skip, and $limit appear in that order in a pipeline, for example:

    { $sort: { age: -1 } },
    { $skip: 10 },
    { $limit: 5 }

then the order actually executed is:

    { $sort: { age: -1 } },
    { $limit: 15 },
    { $skip: 10 }

$limit is moved ahead of $skip, and the new $limit value is the original $skip value plus the original $limit value (10 + 5 = 15).

This has two advantages:

1. Once the documents have passed the $limit stage, the number of documents in the pipeline is reduced earlier, which saves memory.

2. With $limit moved forward, $sort is immediately followed by $limit, so the sort only needs to keep the first $limit documents as it runs.
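
That the reordered sequence returns the same documents can be checked on an in-memory array. This is a minimal sketch on made-up data, in which array slicing stands in for $skip and $limit.

```javascript
// Forty made-up documents with distinct ages 0..39, in scrambled order.
const people = Array.from({ length: 40 }, (_, i) => ({ age: (i * 7) % 40 }));

// { $sort: { age: -1 } }
const sortedDesc = [...people].sort((a, b) => b.age - a.age);

// As written: { $skip: 10 } then { $limit: 5 }
const asWritten = sortedDesc.slice(10).slice(0, 5);

// As executed: { $limit: 15 } then { $skip: 10 }
const asExecuted = sortedDesc.slice(0, 15).slice(10);

console.log(asWritten.map((p) => p.age), asExecuted.map((p) => p.age));
```

Both variants yield the 11th to 15th documents of the sorted stream, but the second never has to carry more than 15 documents past the sort.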

2. $limit + $skip + $limit + $skip sequence optimization

If the pipeline contains the following repeated sequence:

    { $limit: 100 },
    { $skip: 5 },
    { $limit: 10 },
    { $skip: 2 }

a first, local optimization moves the second $limit forward as described above:

    { $limit: 100 },
    { $limit: 15 },
    { $skip: 5 },
    { $skip: 2 }

A further optimization merges the two $limit stages by taking the smaller value, and merges the two $skip stages by adding their values:

    { $limit: 15 },
    { $skip: 7 }
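
The same result can be derived mechanically by tracking how many documents a run of $limit and $skip stages drops from the front and how many it lets through. The sketch below does this in plain JavaScript and checks it against the worked example; the function names are hypothetical, and this is an illustration of the arithmetic rather than MongoDB's actual optimizer.

```javascript
// Apply a list of $limit / $skip stages to an in-memory array.
function applyStages(docs, stages) {
  return stages.reduce(
    (acc, stage) =>
      "$limit" in stage ? acc.slice(0, stage.$limit) : acc.slice(stage.$skip),
    docs
  );
}

// Collapse a run of $limit / $skip stages into one $limit followed by one $skip.
function coalesceLimitSkip(stages) {
  let skipped = 0;          // documents dropped from the front so far
  let remaining = Infinity; // documents that can still pass through
  for (const stage of stages) {
    if ("$limit" in stage) {
      remaining = Math.min(remaining, stage.$limit);
    } else {
      skipped += stage.$skip;
      remaining = Math.max(0, remaining - stage.$skip);
    }
  }
  return [{ $limit: skipped + remaining }, { $skip: skipped }];
}

const originalStages = [{ $limit: 100 }, { $skip: 5 }, { $limit: 10 }, { $skip: 2 }];
const optimizedStages = coalesceLimitSkip(originalStages);

// 200 made-up "documents", represented by their position in the stream.
const numbered = Array.from({ length: 200 }, (_, i) => i);
console.log(JSON.stringify(optimizedStages));
console.log(applyStages(numbered, originalStages), applyStages(numbered, optimizedStages));
```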

3. Projection optimization

Using $project early to keep only the fields you need and drop the ones you do not use can greatly reduce memory usage.

The $match, $limit, and $skip operators should also be used as early as possible: they reduce the number of documents in the pipeline ahead of time, lower the memory footprint, and improve aggregation efficiency.

In addition, put $match in the first stage of the aggregation whenever possible. There it behaves like an ordinary conditional query, so it can use indexes to speed up the query.

Limitations of the aggregation pipeline

1. Type restrictions

The pipeline cannot operate on data of the types Symbol, MinKey, MaxKey, DBRef, Code, and CodeWScope (version 2.4 lifted the restriction on binary data).

2. Result size limit

The output of the pipeline cannot exceed the BSON document size limit (16 MB); exceeding it raises an error.

3. Memory limit

If a pipeline operator uses more than 10% of the system's memory during execution, an error is raised.

The $sort and $group operators load their entire input into memory. If they use more than 5% of system memory, a warning is written to the log file; if they use more than 10%, an error is raised.

Using the aggregation pipeline on sharded collections

The aggregation pipeline supports aggregation on sharded collections. When an aggregation runs on a sharded collection, the pipeline is split into two parts, which run on the mongod instances and on mongos respectively.

Aggregation pipeline usage examples

First download the test data from http://media.mongodb.org/zips.json and import it into the database.

1. Query the total population of each state

    var connectionString = ConfigurationManager.AppSettings["MongodbConnection"];
    var client = new MongoClient(connectionString);
    var DatabaseName = ConfigurationManager.AppSettings["DatabaseName"];
    string collName = ConfigurationManager.AppSettings["collName"];
    MongoServer mongoDBConn = client.GetServer();
    MongoDatabase db = mongoDBConn.GetDatabase(DatabaseName);
    MongoCollection table = db[collName];

    // $group: sum the population of all zip codes in each state
    var group = new BsonDocument
    {
        { "$group", new BsonDocument
            {
                { "_id", "$state" },
                { "totalPop", new BsonDocument { { "$sum", "$pop" } } }
            }
        }
    };

    // $sort: order the result by state
    var sort = new BsonDocument
    {
        { "$sort", new BsonDocument { { "_id", 1 } } }
    };

    var pipeline = new[] { group, sort };
    var result = table.Aggregate(pipeline);

    // ToDynamic() is assumed to be a helper extension method (see the related
    // article); it does not appear to be part of the driver itself.
    var matchingExamples = result.ResultDocuments.Select(x => x.ToDynamic()).ToList();
    foreach (var example in matchingExamples)
    {
        var message = string.Format("{0}-{1}", example["_id"], example["totalPop"]);
        Console.WriteLine(message);
    }

2. Calculate the average city population for each state

    > db.zipcode.aggregate([
        { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } },
        { $group: { _id: "$_id.state", avCityPop: { $avg: "$pop" } } },
        { $sort: { _id: 1 } }
    ])

    // First $group: total population per (state, city)
    var group1 = new BsonDocument
    {
        { "$group", new BsonDocument
            {
                { "_id", new BsonDocument
                    {
                        { "state", "$state" },
                        { "city", "$city" }
                    }
                },
                { "pop", new BsonDocument { { "$sum", "$pop" } } }
            }
        }
    };

    // Second $group: average city population per state
    var group2 = new BsonDocument
    {
        { "$group", new BsonDocument
            {
                { "_id", "$_id.state" },
                { "avCityPop", new BsonDocument { { "$avg", "$pop" } } }
            }
        }
    };

    // sort is the $sort stage defined in the first example
    var pipeline1 = new[] { group1, group2, sort };
    var result1 = table.Aggregate(pipeline1);
    var matchingExamples1 = result1.ResultDocuments.Select(x => x.ToDynamic()).ToList();
    foreach (var example in matchingExamples1)
    {
        var message = string.Format("{0}-{1}", example["_id"], example["avCityPop"]);
        Console.WriteLine(message);
    }
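
The two-level grouping is the part of this example that is easiest to misread: a city can span several zip codes, so populations are first summed per (state, city) and only then averaged per state. The sketch below walks through that logic in plain JavaScript on a tiny made-up data set; the city names, populations, and function name are hypothetical and are not taken from zips.json.

```javascript
// Made-up zip code documents: city "A" spans two zip codes.
const zips = [
  { city: "A", state: "MA", pop: 100 },
  { city: "A", state: "MA", pop: 300 },
  { city: "B", state: "MA", pop: 200 },
  { city: "C", state: "NY", pop: 50 },
];

// Group documents by a key and sum a value per key.
function groupSum(docs, keyOf, valueOf) {
  const sums = new Map();
  for (const doc of docs) {
    const key = keyOf(doc);
    sums.set(key, (sums.get(key) || 0) + valueOf(doc));
  }
  return sums;
}

// First $group: population per (state, city).
const cityPop = groupSum(zips, (d) => d.state + "|" + d.city, (d) => d.pop);

// Second $group: average of the city populations per state.
const perState = new Map();
for (const [key, pop] of cityPop) {
  const state = key.split("|")[0];
  const entry = perState.get(state) || { total: 0, cities: 0 };
  entry.total += pop;
  entry.cities += 1;
  perState.set(state, entry);
}

// $sort: { _id: 1 }
const avgByState = [...perState]
  .map(([state, entry]) => ({ _id: state, avCityPop: entry.total / entry.cities }))
  .sort((a, b) => (a._id < b._id ? -1 : a._id > b._id ? 1 : 0));

console.log(avgByState);
```

Averaging the zip code populations directly would give 200 for MA; the two-step grouping gives 300, because city A (400) and city B (200) are averaged as cities.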

3. Find the most and least populous cities in each state

    > db.zipcode.aggregate([
        { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } },
        { $sort: { pop: 1 } },
        { $group: { _id: "$_id.state",
                    biggestCity: { $last: "$_id.city" }, biggestPop: { $last: "$pop" },
                    smallestCity: { $first: "$_id.city" }, smallestPop: { $first: "$pop" } } },
        { $project: { _id: 0, state: "$_id",
                      biggestCity: { name: "$biggestCity", pop: "$biggestPop" },
                      smallestCity: { name: "$smallestCity", pop: "$smallestPop" } } }
    ])

    // $sort: order the per-city totals by population, ascending
    var sort1 = new BsonDocument
    {
        { "$sort", new BsonDocument { { "pop", 1 } } }
    };

    // $group: after the sort, $first is the smallest city and $last the biggest
    var group3 = new BsonDocument
    {
        { "$group", new BsonDocument
            {
                { "_id", "$_id.state" },
                { "biggestCity", new BsonDocument { { "$last", "$_id.city" } } },
                { "biggestPop", new BsonDocument { { "$last", "$pop" } } },
                { "smallestCity", new BsonDocument { { "$first", "$_id.city" } } },
                { "smallestPop", new BsonDocument { { "$first", "$pop" } } }
            }
        }
    };

    // $project: reshape the result into nested subdocuments
    var project = new BsonDocument
    {
        { "$project", new BsonDocument
            {
                { "_id", 0 },
                { "state", "$_id" },
                { "biggestCity", new BsonDocument
                    {
                        { "name", "$biggestCity" },
                        { "pop", "$biggestPop" }
                    }
                },
                { "smallestCity", new BsonDocument
                    {
                        { "name", "$smallestCity" },
                        { "pop", "$smallestPop" }
                    }
                }
            }
        }
    };

    // group1 is the per-city $group stage defined in the second example
    var pipeline2 = new[] { group1, sort1, group3, project };
    var result2 = table.Aggregate(pipeline2);
    var matchingExamples2 = result2.ResultDocuments.Select(x => x.ToDynamic()).ToList();
    foreach (var example in matchingExamples2)
    {
        Console.WriteLine(example.ToString());
        // var message = string.Format("{0}-{1}", example["_id"], example["avCityPop"]);
        // Console.WriteLine(message);
    }
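
This example relies on ordering: because the per-city totals are sorted by population before the second grouping, $first picks the smallest city and $last the biggest one in each state. The sketch below reproduces that reasoning in plain JavaScript on made-up per-city totals (the populations are invented for illustration, not taken from zips.json).

```javascript
// Made-up output of the first $group stage: one document per (state, city).
const cityTotals = [
  { _id: { state: "MA", city: "Boston" }, pop: 500 },
  { _id: { state: "NY", city: "Albany" }, pop: 90 },
  { _id: { state: "MA", city: "Salem" }, pop: 40 },
  { _id: { state: "NY", city: "Buffalo" }, pop: 250 },
  { _id: { state: "MA", city: "Lowell" }, pop: 100 },
];

// { $sort: { pop: 1 } }
const byPop = [...cityTotals].sort((a, b) => a.pop - b.pop);

// $group with $first / $last, already shaped like the final $project output.
const extremes = new Map();
for (const doc of byPop) {
  const state = doc._id.state;
  const city = { name: doc._id.city, pop: doc.pop };
  if (!extremes.has(state)) {
    // First document seen for this state: the smallest city so far.
    extremes.set(state, { state, biggestCity: city, smallestCity: city });
  } else {
    // Every later document is at least as large, so it becomes the new "last".
    extremes.get(state).biggestCity = city;
  }
}

const cityExtremes = [...extremes.values()];
console.log(JSON.stringify(cityExtremes, null, 2));
```

Without the $sort stage, $first and $last would simply return whichever cities happened to arrive first and last, so the sort is what gives the two accumulators their meaning here.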

Summary

For most aggregation operations, the aggregation pipeline offers good performance and a consistent interface, and it is easy to use. Like MapReduce, it can run on sharded collections, but its output is returned in a single document and is therefore subject to the BSON document size limit (currently 16 MB).

The pipeline places some restrictions on data types and on the size of the result. It is well suited to simple, fixed aggregation operations; for complex aggregation tasks over large data sets, MapReduce is still the tool to use.

Related articles:

http://mikaelkoskinen.net/mongodb-aggregation-framework-examples-in-c/
