Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

Analysis of the implementation process of ES Learning Notes-AvgAggregation

2025-01-18 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

When we need to look at the statistics of the data, the mean is one of the most important features.

For large amounts of data, this kind of simple aggregate ES can be returned in seconds. Aggregation is a feature of ES.

So how does ES achieve this function?

We know that the data of ES is stored in each node, so the basic idea of implementing AvgAggregation in ES is to count each node first, and then summarize it.

First learn how ES counts individual nodes: refer to AvgAggregator

@ Override public LeafBucketCollector getLeafCollector (LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {if (valuesSource = = null) {return LeafBucketCollector.NO_OP_COLLECTOR;} final BigArrays bigArrays = context.bigArrays (); final SortedNumericDoubleValues values = valuesSource.doubleValues (ctx) Return new LeafBucketCollectorBase (sub, values) {@ Override public void collect (int doc, long bucket) throws IOException {counts = bigArrays.grow (counts, bucket + 1); sums = bigArrays.grow (sums, bucket + 1); values.setDocument (doc); final int valueCount = values.count (); counts.increment (bucket, valueCount) Double sum = 0; for (int I = 0; I

< valueCount; i++) { sum += values.valueAt(i); } sums.increment(bucket, sum); } }; } 即实现Collector类的collect()方法。然后通过doc_values机制获取文档相关字段的值,分别汇入counts和sums两个变量中。 收集完成counts和sums过后,就需要汇总各个节点的值, 这在搜索的第二阶段。 从第一阶段到第二阶段,整个链路如下: s1: 前端请求发送到集群某一节点的TransportSearchAction.doExecute()方法中。 switch(searchRequest.searchType()) { ..... case QUERY_THEN_FETCH: searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener); break; ...... } searchAsyncAction.start(); 见到start()方法,我以为这个是另启一个线程,后面发现原来不是的。 这个start()方法把整个查询过程分为两个阶段: 阶段一: performFirstPhase(), 即把请求分发到各个节点,然后记录节点处理的结果。如果返回的分片是最后一个分片,则转入阶段二。 阶段二: performFirstPhase() ->

OnFirstPhaseResult ()-> innerMoveToSecondPhase ()-> moveToSecondPhase (). The template design pattern is used here. In phase 2, a request is made to each node again to get the document content through docId.

S2: for aggregation, the most important link in phase 2 is moveToSecondPhase ()-> executeFetch ()-> finishHim ()-> searchPhaseController.merge (). Merge () contains the following business logic: merge hits, merge suggest, merge addAggregation, and so on. Here we focus on aggregation.

The entry method for aggregation is InternalAggregations.reduce (), and if you are familiar with hadoop, the execution logic of the reduce method can also be understood in part by the name. The Chinese translation of "induction" of reduce is very vivid. The entrance to the entire link is InternalAvg.doReduce ().

@ Override public InternalAvg doReduce (List aggregations, ReduceContext reduceContext) {long count = 0; double sum = 0; for (InternalAggregation aggregation: aggregations) {count + = ((InternalAvg) aggregation) .count; sum + = ((InternalAvg) aggregation) .sum;} return new InternalAvg (getName (), sum, count, valueFormatter, pipelineAggregators (), getMetaData ());}

The logic is quite simple, count addition, sum addition. The end result is

Public double getValue () {return sum / count;}

The above describes the key nodes that the ES distribution will summarize, so what is the business logic that is distributed to each node?

First, locate the entrance:

Class SearchQueryTransportHandler extends TransportRequestHandler {@ Override public void messageReceived (ShardSearchTransportRequest request, TransportChannel channel) throws Exception {QuerySearchResultProvider result = searchService.executeQueryPhase (request); channel.sendResponse (result);}}

Then navigate to QueryPhrase.execute (), and at this stage of QueryPhrase, the main things to do are as follows:

AggregationPhase.preProcess (searchContext): parses the syntax of ES and generates Collector.

Execute: before calling the API of Lucene to query data, combine each Collecotr, collector = MultiCollector.wrap (subCollectors), and then query the Lucene index. For AvgAggregator, the key logic is:

Override public void collect (int doc, long bucket) throws IOException {counts = bigArrays.grow (counts, bucket + 1); sums = bigArrays.grow (sums, bucket + 1); values.setDocument (doc); final int valueCount = values.count (); counts.increment (bucket, valueCount); double sum = 0 For (int I = 0; I < valueCount; iTunes +) {sum + = values.valueAt (I);} sums.increment (bucket, sum);}

This is the second time it has appeared, and its function is to collect doc-related information for each hit query. The value corresponding to each docId is obtained here, which is based on the forward index of doc_value.

This is the implementation process of the whole Avg Aggregation. Through the source code, it can be confirmed that AvgAggregation is accurate and reliable. There are several aggregate functions whose ideas are the same as those of AvgAggregation, so I won't go into details. They are: Max, Min, Sum, ValueCount, Stats.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report