Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

The operation method of MongoDB aggregation by hour and day based on Morphia

2025-03-29 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Database >

Share

Shulou(Shulou.com)06/01 Report--

MongoDB aggregates by day or hour

Demand

Recently received demand, it is necessary to aggregate the equipment status under the user account by day and hour respectively, and draw the equipment status trend chart on this basis.

The realization idea is to start the timing task, aggregate the equipment status data of each user according to the hour and day, and store it in the database for subsequent query.

The technology stacks involved are: Spring Boot,MongoDB,Morphia.

Data model

@ Data@Builder@Entity (value = "rawDevStatus", noClassnameStored = true) / / device status index @ Indexes ({/ / set data timeout (TTL) MongoDB performs data deletion operations in the background according to TTL) @ Index (fields = @ Field ("time"), options = @ IndexOptions (expireAfterSeconds = 3600 * 24 * 72), @ Index (fields = {@ Field ("userId"), @ Field (value = "time", type = IndexType.DESC)}) public class RawDevStatus {@ Id @ JsonProperty (access = JsonProperty.Access.WRITE_ONLY) private ObjectId objectId Private String userId; private Instant time; @ Embedded ("points") List protocolPoints; @ Data @ AllArgsConstructor public static class Point {/ * * Protocol Type * / private Protocol protocol; / * Total number of devices * / private Integer total; / * number of devices online * / private Integer onlineNum / * number of enabled devices * / private Integer enableNum;}}

The above code is a device status entity class, in which the device status data is distinguished according to the protocol to which the device belongs.

@ Data@Builder@Entity (value = "aggregationDevStatus", noClassnameStored = true) @ Indexes ({@ Index (fields = @ Field ("expireAt"), options = @ IndexOptions (expireAfterSeconds = 0)), @ Index (fields = {@ Field ("userId"), @ Field (value = "time", type = IndexType.DESC)}) public class AggregationDevStatus {@ Id @ JsonProperty (access = JsonProperty.Access.WRITE_ONLY) private ObjectId objectId; / * * user ID * / private String userId / * * Total number of devices * / private Double total; / * number of devices online * / private Double onlineNum; / * number of enabled devices * / private Double enableNum; / * aggregation type (aggregate by hour or by day) * / @ Property ("aggDuration") private AggregationDuration aggregationDuration; private Instant time / * dynamically set document expiration time * / private Instant expireAt;}

The above code is the expected aggregate result, in which two indexes are built: (1) timeout index and (2) compound index, the program will query the device status aggregation result according to user name and time.

Introduction to aggregation operator

The aggregation operation is similar to the pipeline, in which the intermediate result produced by each operation in the pipeline is used as the input source of the next step, and the aggregate result is finally output.

This aggregation mainly involves the following operations:

$project: specifies the fields in the output document.

$unwind: split the array in the data

Match: select document data to process

Group: group and aggregate the results according to key.

Original aggregate statement

Db.getCollection ('raw_dev_status') .aggregate ([{$match: {time: {$gte: ISODate ("2019-06-27T00:00:00Z")},}, {$unwind: "$points"}, {$project: {userId:1,points:1, tmp: {$dateToString: {format: "% Y:%m:%dT%H:00:00Z", date: "$time"} {$project: {userId:1,points:1, groupTime: {$dateFromString: {dateString: "$tmp", format: "% Y:%m:%dT%H:%M:%SZ",}, {$group: {_ id: {user_id:'$userId', cal_time:'$groupTime'}, devTotal: {'$avg':'$points.total'} OnlineTotal: {'$avg':'$points.onlineNum'}, enableTotal: {'$avg':'$points.enableNum'}},])

The above code aggregates data on an hourly basis to introduce processing ideas step by step:

(1) $match

According to the hourly aggregate data, because only need to obtain nearly 24 hours of aggregation results, so the data for preliminary screening.

(2) $unwind

The device status in raw_dev_status is an array distinguished by protocol, so it needs to be expanded for further filtering.

(3) $project

{$project: {userId:1,points:1, tmp: {$dateToString: {format: "% Y:%m:%dT%H:00:00Z", date: "$time"}

Select the data to be exported, which are: userId,points and tmp.

Note that in order to aggregate according to time, the $time attribute is manipulated, and the% Y:%m:%dT%H time information is extracted to $tmp as the basis for the next step of aggregation.

If daily aggregation is required, the format data can be modified to:% Y:%m:%dT00:00:00Z to meet the requirements.

(4) $project

{$project: {userId:1,points:1, groupTime: {$dateFromString: {dateString: "$tmp", format: "% Y:%m:%dT%H:%M:%SZ",}

Because in the previous project operation, tmp is string data, the final aggregate result requires a timestamp (mainly lazy, do not want to convert in the program).

Therefore, $tmp is operated on here and converted to time-type data, that is, groupTime.

(5) $group

Classify the aggregation results and generate the final output results.

{$group: {# Group operation according to _ id, based on `groupTime` and `$ groupTime` _ id: {user_id:'$userId', cal_time:'$groupTime'}, # calculate the average number of devices devTotal: {'$avg':'$points.total'}, # calculate the average number of devices online onlineTotal: {'$avg':'$points.onlineNum'}, #... EnableTotal: {'$avg':'$points.enableNum'}

Code writing

Here ODM chooses Morphia, or you can use MongoTemplate, with a similar principle.

/ * create aggregation condition * * @ param pastTime past time period * @ param dateToString format string (% Y:%m:%dT%H:00:00Z or% Y:%m:%dT00:00:00Z) * @ return aggregation condition * / private AggregationPipeline createAggregationPipeline (Instant pastTime, String dateToString, String stringToDate) {Query query = datastore.createQuery (RawDevStatus.class) Return datastore.createAggregation (RawDevStatus.class) .match (query.field ("time") .greaterThanOrEq (pastTime)) .unwind ("points", new UnwindOptions () .preserveNullAndEmptyArrays (false)) .match (query.field ("points.protocol"). Equal ("ALL") .project (Projection.projection ("userId"), Projection.projection ("points"), Projection.projection ("convertTime") Projection._expression ("$dateToString", new BasicDBObject ("format", dateToString) .append ("date", "$time") .project (Projection.projection ("userId"), Projection.projection ("points"), Projection.projection ("convertTime") Projection._expression ("$dateFromString", new BasicDBObject ("format", stringToDate) .append ("dateString", "$convertTime") .group (Group.id (Group.grouping ("userId"), Group.grouping ("convertTime")), Group.grouping ("total") Group.average ("points.total"), Group.grouping ("onlineNum", Group.average ("points.onlineNum"), Group.grouping ("enableNum", Group.average ("points.enableNum") } / * get aggregation result * * @ param pipeline aggregation condition * @ return aggregation result * / private List getAggregationResult (AggregationPipeline pipeline) {List statuses = new ArrayList (); Iterator resultIterator = pipeline.aggregate (AggregationMidDevStatus.class, AggregationOptions.builder (). AllowDiskUse (true). Build ()); while (resultIterator.hasNext ()) {statuses.add (resultIterator.next ());} return statuses } / /. / / get the aggregate result (omitting some codes) AggregationPipeline pipeline = createAggregationPipeline (pastTime, dateToString, stringToDate); List midStatuses = getAggregationResult (pipeline); if (CollectionUtils.isEmpty (midStatuses)) {log.warn ("Can not get dev status aggregation result."); return;}

Summary

The above is the operation method of realizing MongoDB by hour and day based on Morphia, which is introduced by the editor. I hope it will be helpful to you. If you have any questions, please leave me a message and the editor will reply to you in time. Thank you very much for your support to the website!

If you think this article is helpful to you, you are welcome to reprint it, please indicate the source, thank you!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Database

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report