2025-01-16 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/02 Report--
This article explains how Hystrix collects metrics in Spring Cloud: how events are reported, how they are aggregated, and how the circuit breaker uses the result.
How Hystrix works
The previous article introduced how Hystrix implements circuit breaking and degradation, walking through Hystrix's own capabilities alongside the code. It mentioned metrics only in passing. This chapter focuses on how the metrics that trigger the circuit breaker are collected and calculated.
The official flow diagram shows the following steps:
The user first constructs a HystrixCommand or HystrixObservableCommand object.
The caller chooses execute() or queue(), which decides whether the call is synchronous or asynchronous.
Hystrix checks whether the request cache already holds an Observable for this commandKey. The cache exists to improve performance; on a hit, the cached result is returned directly.
On a cache miss, the circuit breaker logic runs, starting with a check of whether the breaker is open.
If the breaker is open, the call fails fast and degrades by executing the user-provided fallback() logic.
Otherwise Hystrix checks whether the concurrency limit is exceeded. If it is, the call is rejected with an exception and degrades to the user-provided fallback logic.
Otherwise the user's business logic runs. If it throws an exception or times out, the call degrades to the user-provided fallback logic.
Execution ends.
Whether execution ends normally or with an exception, metrics collection is triggered. The collected results are aggregated and handed to the circuit breaker, which uses them to decide whether to open or close.
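The steps above can be condensed into a small, library-free model. This is only a sketch under stated assumptions: `FlowSketch`, its fields, and the `Outcome` enum are hypothetical names, the request cache and timeouts are left out, and a plain `Semaphore` stands in for the concurrency limit. It is not how Hystrix itself is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical, simplified model of the execution flow; not Hystrix code.
final class FlowSketch {
    enum Outcome { SUCCESS, SHORT_CIRCUITED, SEMAPHORE_REJECTED, FAILURE }

    volatile boolean circuitOpen = false;               // stand-in for the breaker state
    final Semaphore permits;                            // stand-in for the concurrency limit
    final List<Outcome> reported = new ArrayList<>();   // stand-in for metrics reporting

    FlowSketch(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    String execute(Supplier<String> run, Supplier<String> fallback) {
        if (circuitOpen) {                              // breaker open: fail fast
            return finish(Outcome.SHORT_CIRCUITED, fallback.get());
        }
        if (!permits.tryAcquire()) {                    // over the limit: reject
            return finish(Outcome.SEMAPHORE_REJECTED, fallback.get());
        }
        try {
            return finish(Outcome.SUCCESS, run.get());
        } catch (RuntimeException e) {                  // business failure: degrade
            return finish(Outcome.FAILURE, fallback.get());
        } finally {
            permits.release();                          // only reached after a successful acquire
        }
    }

    // Every path ends here, so every outcome is reported exactly once.
    private synchronized String finish(Outcome outcome, String value) {
        reported.add(outcome);
        return value;
    }

    static List<Outcome> demo() {
        FlowSketch cmd = new FlowSketch(1);
        cmd.execute(() -> "ok", () -> "fallback");
        cmd.execute(() -> { throw new IllegalStateException("boom"); }, () -> "fallback");
        cmd.circuitOpen = true;
        cmd.execute(() -> "ok", () -> "fallback");
        return cmd.reported;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the sketch is the single `finish` exit: success, rejection, and failure all pass through the same reporting step, which mirrors the last step in the list above.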
Implementation of metrics collection
This part covers three aspects: metric reporting, metric calculation, and metric use. Along the way it touches on concurrent writes from multiple threads, ordered delivery of messages, and the implementation of the sliding window.
Metric reporting
Each request thread creates an ExecutionResult instance that carries basic information such as the start time, execution latency, and event counts. In other words, data is collected through metrics reporting throughout the Hystrix life cycle. The reported events are:
1.1. executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); // the circuit breaker is not open and the concurrency limit is not exceeded, so the execution start time is recorded
1.2. executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS); // execution succeeded: the SUCCESS event and its latency are recorded
1.3. HystrixEventType.SHORT_CIRCUITED // the circuit breaker is open: the short-circuit event and its latency are recorded
1.4. HystrixEventType.SEMAPHORE_REJECTED // concurrency exceeds the limit in semaphore mode: the event and its latency are recorded
1.5. HystrixEventType.THREAD_POOL_REJECTED // the thread pool is unavailable (concurrency exceeds the limit): the event and its latency are recorded
1.6. HystrixEventType.TIMEOUT // execution timed out: the event and its latency are recorded
1.7. HystrixEventType.BAD_REQUEST // invalid parameters or state: the event and its latency are recorded
These events fall into two categories, success and failure. Depending on the outcome of the user's code, if an exception occurs, the exception event and latency are collected and circuitBreaker.markNonSuccess() is executed; otherwise circuitBreaker.markSuccess() is executed.
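The reassignment pattern in 1.1 and 1.2 (`executionResult = executionResult.addEvent(...)`) suggests that each call returns a new result object instead of mutating the old one. A minimal sketch of that idea follows; `ResultSketch` and its reduced `EventType` enum are hypothetical stand-ins, not Hystrix's `ExecutionResult`, and the success rule is an assumption made for illustration.

```java
import java.util.EnumMap;

// Hypothetical immutable per-request result; not Hystrix's ExecutionResult.
final class ResultSketch {
    enum EventType { SUCCESS, FAILURE, TIMEOUT, SHORT_CIRCUITED, SEMAPHORE_REJECTED }

    private final EnumMap<EventType, Integer> counts;
    private final int latencyMs;

    private ResultSketch(EnumMap<EventType, Integer> counts, int latencyMs) {
        this.counts = counts;
        this.latencyMs = latencyMs;
    }

    static ResultSketch empty() {
        return new ResultSketch(new EnumMap<>(EventType.class), -1);
    }

    // Returns a new instance, which is why callers write: result = result.addEvent(...)
    ResultSketch addEvent(int latencyMs, EventType type) {
        EnumMap<EventType, Integer> copy = new EnumMap<>(counts);
        copy.merge(type, 1, Integer::sum);
        return new ResultSketch(copy, latencyMs);
    }

    int count(EventType type) {
        return counts.getOrDefault(type, 0);
    }

    int latencyMs() {
        return latencyMs;
    }

    // Assumption of this sketch: a request is a success only if SUCCESS is its sole event.
    boolean isSuccess() {
        return counts.size() == 1 && count(EventType.SUCCESS) > 0;
    }

    public static void main(String[] args) {
        ResultSketch r = empty().addEvent(12, EventType.SUCCESS);
        System.out.println(r.isSuccess() + " " + r.latencyMs());
    }
}
```

Because each instance is immutable, a result can be handed to the reporting path without any locking, whichever thread ends up reading it.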
In addition, there are only two paths that switch the circuit breaker between open and closed, as shown in the following figure:
Metric calculation
The following briefly introduces the multithreaded concurrency involved in each step and the calculation of the sliding window:
2.1 Concurrency (ThreadLocal & SerializedSubject)
When the same API receives multiple requests, those requests hit the same commandKey (metrics are tracked per key). Each request runs on its own thread, and each thread generates several events. The events from one thread are first assembled into a HystrixCommandCompletion, which is then reported. The stream computation processes one HystrixCommandCompletion at a time, so events from different threads are never mixed during the calculation. How this is guaranteed is explained below.
2.1.1: reporters are isolated per thread through ThreadLocal
Hystrix holds a static ThreadLocal. When a client request ends, normally or abnormally, its status is reported: handleCommandEnd runs and obtains the current thread's HystrixThreadEventStream from the ThreadLocal. The code is as follows:
    private void handleCommandEnd(boolean commandExecutionStarted) {
        // some code omitted
        if (executionResultAtTimeOfCancellation == null) {
            // report metrics
            metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted);
        } else {
            metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted);
        }
    }
    void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) {
        // HystrixThreadEventStream lives in a ThreadLocal; because initialValue() is overridden,
        // getInstance() can be called directly without a prior set()
        HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey);
        if (executionStarted) {
            concurrentExecutionCount.decrementAndGet();
        }
    }

    // get the current thread's event stream from the ThreadLocal
    public static HystrixThreadEventStream getInstance() {
        return threadLocalStreams.get();
    }

    // definition of the ThreadLocal; initialValue() is overridden, so set() never needs to be called
    private static final ThreadLocal<HystrixThreadEventStream> threadLocalStreams = new ThreadLocal<HystrixThreadEventStream>() {
        @Override
        protected HystrixThreadEventStream initialValue() {
            return new HystrixThreadEventStream(Thread.currentThread());
        }
    };
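The per-thread isolation described above can be tried in isolation with the JDK alone. In this sketch `ThreadLocalSketch` and `PerThreadStream` are hypothetical names, and `ThreadLocal.withInitial` is used in place of overriding `initialValue()`; the effect is the same lazy, per-thread creation on the first `get()`.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo of per-thread instances; not Hystrix code.
final class ThreadLocalSketch {
    // Stand-in for a per-thread event stream; it only records which thread created it.
    static final class PerThreadStream {
        final String owner = Thread.currentThread().getName();
    }

    // The first get() on each thread creates that thread's instance; set() is never called.
    private static final ThreadLocal<PerThreadStream> STREAMS =
            ThreadLocal.withInitial(PerThreadStream::new);

    static PerThreadStream getInstance() {
        return STREAMS.get();
    }

    // Returns true when each thread sees its own instance and keeps seeing the same one.
    static boolean demo() {
        PerThreadStream mine = getInstance();
        AtomicReference<PerThreadStream> other = new AtomicReference<>();
        Thread worker = new Thread(() -> other.set(getInstance()), "worker");
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return mine == getInstance()
                && mine != other.get()
                && "worker".equals(other.get().owner);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```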
2.1.2: rate-limited (backpressure-buffered) queue
Each thread has its own HystrixThreadEventStream, because the instance is obtained from the ThreadLocal, and each HystrixThreadEventStream is associated with a queue implemented by a Subject. In other words, each thread has a private queue. The queue limits the flow by applying backpressure: items are delivered at the pace the consumer can handle, and onBackpressureBuffer() holds the surplus in a buffer until the consumer is ready. The code is as follows:
    public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) {
        // wrap executionResult in a HystrixCommandCompletion, the basic unit the stream computation operates on
        HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey);
        // writeOnlyCommandCompletionSubject is the rate-limited queue implemented with RxJava
        writeOnlyCommandCompletionSubject.onNext(event);
    }

    // some code omitted
    writeOnlyCommandCompletionSubject
            .onBackpressureBuffer()                             // turn on backpressure buffering
            .doOnNext(writeCommandCompletionsToShardedStreams)  // the core is the call method of this action
            .unsafeSubscribe(Subscribers.empty());
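The buffering half of this behaviour can be illustrated without RxJava. The sketch below is a loose analogy, not a model of RxJava's request protocol: an unbounded `LinkedBlockingQueue` lets a fast producer hand off items without ever blocking, while a deliberately slow consumer drains them in order. `BufferSketch` and the `DONE` marker are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical analogy for "buffer what the consumer cannot take yet"; not RxJava code.
final class BufferSketch {
    private static final int DONE = -1;   // end-of-stream marker used only by this sketch

    static List<Integer> demo() {
        BlockingQueue<Integer> buffer = new LinkedBlockingQueue<>();   // unbounded buffer
        List<Integer> consumed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                for (int v = buffer.take(); v != DONE; v = buffer.take()) {
                    Thread.sleep(1);       // the consumer is slower than the producer
                    consumed.add(v);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (int i = 0; i < 50; i++) {
            buffer.offer(i);               // never blocks: surplus items wait in the buffer
        }
        buffer.offer(DONE);

        try {
            consumer.join();               // join makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(demo().size());
    }
}
```

An unbounded buffer trades memory for a producer that never stalls, which suits a reporting path that must not slow down the request thread.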
2.2: data stream serialization
Each HystrixCommandCompletion placed in the queue passes through the doOnNext Action, whose call method invokes write on HystrixCommandCompletionStream. All requests with the same commandKey share the same HystrixCommandCompletionStream instance; the per-key instances are kept in a ConcurrentHashMap. Internally, HystrixCommandCompletionStream uses a SerializedSubject to serialize parallel writes of HystrixCommandCompletion events. The code is as follows:
    // the rate-limited queue runs call() for each item it receives; the action is registered through doOnNext
    private static final Action1<HystrixCommandCompletion> writeCommandCompletionsToShardedStreams = new Action1<HystrixCommandCompletion>() {
        @Override
        public void call(HystrixCommandCompletion commandCompletion) {
            // the same commandKey maps to the same serialized stream instance, because the metric events
            // of all threads under that key must be collected together for the statistics to be accurate
            HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey());
            commandStream.write(commandCompletion); // write to the serialized stream; this is the core step

            if (commandCompletion.isExecutedInThread() || commandCompletion.isResponseThreadPoolRejected()) {
                HystrixThreadPoolCompletionStream threadPoolStream = HystrixThreadPoolCompletionStream.getInstance(commandCompletion.getThreadPoolKey());
                threadPoolStream.write(commandCompletion);
            }
        }
    };

    // the write method; note how writeOnlySubject is defined below
    public void write(HystrixCommandCompletion event) {
        writeOnlySubject.onNext(event);
    }

    // definition of writeOnlySubject, which turns parallel writes into serialized ones
    HystrixCommandCompletionStream(final HystrixCommandKey commandKey) {
        this.commandKey = commandKey;
        this.writeOnlySubject = new SerializedSubject<HystrixCommandCompletion, HystrixCommandCompletion>(PublishSubject.<HystrixCommandCompletion>create());
        this.readOnlyStream = writeOnlySubject.share();
    }
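To make "one instance per key, serialized writes" concrete, here is a JDK-only sketch. It combines `ConcurrentHashMap.computeIfAbsent` for the per-key instance with a queue-drain loop guarded by an atomic counter. This is one common way to serialize concurrent emissions and is an assumption for illustration; it is not a description of how `SerializedSubject` is implemented. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical per-key stream that serializes concurrent writes; not Hystrix or RxJava code.
final class SerializedStreamSketch {
    private static final ConcurrentMap<String, SerializedStreamSketch> STREAMS =
            new ConcurrentHashMap<>();

    private final Queue<String> pending = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger();      // work-in-progress counter
    private final List<String> delivered = new ArrayList<>();   // deliberately not thread-safe

    // One shared instance per command key.
    static SerializedStreamSketch getInstance(String commandKey) {
        return STREAMS.computeIfAbsent(commandKey, k -> new SerializedStreamSketch());
    }

    void write(String event) {
        pending.offer(event);
        if (wip.getAndIncrement() != 0) {
            return;                          // another thread is draining and will deliver this event
        }
        do {
            delivered.add(pending.poll());   // only one thread at a time executes this line
        } while (wip.decrementAndGet() != 0);
    }

    int deliveredCount() {
        return delivered.size();
    }

    // Four threads write 1000 events each into a fresh stream; returns how many were delivered.
    static int demo() {
        SerializedStreamSketch stream = new SerializedStreamSketch();
        Thread[] writers = new Thread[4];
        for (int t = 0; t < writers.length; t++) {
            writers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    stream.write("event");
                }
            });
            writers[t].start();
        }
        for (Thread w : writers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return stream.deliveredCount();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Since every event is queued before the counter is incremented, the draining thread always finds an event for each count it decrements. That is what lets a plain `ArrayList` sit safely behind concurrent writers.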
2.3: consuming subscriptions
When a HystrixCommand is created, consumers subscribe to HystrixCommandCompletionStream. The current consumers are:
HealthCountsStream
RollingCommandEventCounterStream
CumulativeCommandEventCounterStream
RollingCommandLatencyDistributionStream
RollingCommandUserLatencyDistributionStream
RollingCommandMaxConcurrencyStream
These consumers compute statistics and metrics over the data using a rolling window. HealthCountsStream serves as a representative example:
    public static HealthCountsStream getInstance(HystrixCommandKey commandKey, HystrixCommandProperties properties) {
        // interval at which health metrics are calculated: metricsHealthSnapshotIntervalInMilliseconds
        final int healthCountBucketSizeInMs = properties.metricsHealthSnapshotIntervalInMilliseconds().get();
        if (healthCountBucketSizeInMs == 0) {
            throw new RuntimeException("You have set the bucket size to 0ms. Please set a positive number, so that the metric stream can be properly consumed");
        }
        // rolling window of the circuit breaker; the default is 10 seconds, so statistics for the last 10 seconds are kept.
        // number of buckets in the window = metricsRollingStatisticalWindowInMilliseconds / metricsHealthSnapshotIntervalInMilliseconds
        final int numHealthCountBuckets = properties.metricsRollingStatisticalWindowInMilliseconds().get() / healthCountBucketSizeInMs;
        return getInstance(commandKey, numHealthCountBuckets, healthCountBucketSizeInMs);
    }

    // inheritance: HealthCountsStream -> BucketedRollingCounterStream -> BucketedCounterStream
    // events are aggregated into buckets in BucketedCounterStream; part of the code is omitted
    this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {
        @Override
        public Observable<Bucket> call() {
            return inputEventStream
                    .observe()
                    .window(bucketSizeInMs, TimeUnit.MILLISECONDS) // bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
                    .flatMap(reduceBucketToSummary)                // for a given bucket, turn it into a long array containing counts of event types
                    .startWith(emptyEventCountsToStart);           // start it with empty arrays to make consumer logic as generic as possible (windows are always full)
        }
    });

    // logic that aggregates events into a bucket
    public static final Func2<long[], HystrixCommandCompletion, long[]> appendEventToBucket = new Func2<long[], HystrixCommandCompletion, long[]>() {
        @Override
        public long[] call(long[] initialCountArray, HystrixCommandCompletion execution) {
            ExecutionResult.EventCounts eventCounts = execution.getEventCounts();
            for (HystrixEventType eventType : ALL_EVENT_TYPES) {
                switch (eventType) {
                    case EXCEPTION_THROWN:
                        break; // this is just a sum of other anyway - don't do the work here
                    default:
                        // add each event type's count to its slot in the bucket
                        initialCountArray[eventType.ordinal()] += eventCounts.getCount(eventType);
                        break;
                }
            }
            return initialCountArray;
        }
    };

    // the rolling metrics are generated in BucketedRollingCounterStream; part of the code is omitted
    this.sourceStream = bucketedStream      // stream broken up into buckets
            .window(numBuckets, 1)          // emit overlapping windows of buckets
            .flatMap(reduceWindowToSummary) // convert a window of bucket-summaries into a single summary
            .doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    isSourceCurrentlySubscribed.set(true);
                }
            }); // later operators in the chain are omitted

    // aggregation used by reduceWindowToSummary
    private static final Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts> healthCheckAccumulator = new Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts>() {
        @Override
        public HystrixCommandMetrics.HealthCounts call(HystrixCommandMetrics.HealthCounts healthCounts, long[] bucketEventCounts) {
            return healthCounts.plus(bucketEventCounts); // focus on this method
        }
    };

    public HealthCounts plus(long[] eventTypeCounts) {
        long updatedTotalCount = totalCount;
        long updatedErrorCount = errorCount;

        long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
        long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
        long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
        long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
        long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];

        // total: the events of all threads summed together
        updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
        // errors: note that only FAILURE + TIMEOUT + THREAD_POOL_REJECTED + SEMAPHORE_REJECTED are counted
        updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
        return new HealthCounts(updatedTotalCount, updatedErrorCount);
    }
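The two-level windowing above (events into fixed-size buckets, then the most recent N buckets into one summary) can be modelled without RxJava. In the sketch below `RollingWindowSketch` is a hypothetical class, each bucket is reduced to a two-element array of total and error counts, and the 500 ms bucket size used in `main` is an assumed example value. Like `startWith(emptyEventCountsToStart)`, the window starts out filled with empty buckets so it is always full.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Hypothetical rolling counter over the last numBuckets bucket summaries; not Hystrix code.
final class RollingWindowSketch {
    private final Deque<long[]> window = new ArrayDeque<>();

    RollingWindowSketch(int numBuckets) {
        for (int i = 0; i < numBuckets; i++) {
            window.addLast(new long[2]);      // start with empty buckets so the window is always full
        }
    }

    // Number of buckets in the window = window length / bucket length (integer division).
    static int bucketCount(int windowMs, int bucketMs) {
        if (bucketMs <= 0) {
            throw new IllegalArgumentException("bucket size must be positive");
        }
        return windowMs / bucketMs;
    }

    // bucket[0] = total requests, bucket[1] = errors observed during one bucket interval.
    // Slides the window by one bucket and returns the summed {total, errors}.
    long[] onBucket(long[] bucket) {
        window.removeFirst();                 // drop the oldest bucket
        window.addLast(bucket.clone());       // append the newest
        long[] summary = new long[2];
        for (long[] b : window) {
            summary[0] += b[0];
            summary[1] += b[1];
        }
        return summary;
    }

    public static void main(String[] args) {
        RollingWindowSketch w = new RollingWindowSketch(bucketCount(10_000, 500));
        System.out.println(Arrays.toString(w.onBucket(new long[] {10, 1})));
    }
}
```

With a 10-second window and the assumed 500 ms buckets, `bucketCount` yields 20 buckets, so each new summary covers the latest 20 bucket intervals.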
Metric use
Using the metrics is straightforward: they control the opening and closing of the circuit breaker. The logic is as follows:
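    public void onNext(HealthCounts hc) {
        if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
            // the request volume in the window is below the threshold: no change to the circuit status
        } else {
            if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                // the error percentage is below the threshold: no change to the circuit status
            } else {
                // the error percentage is too high: move from CLOSED to OPEN and record when
                if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                    circuitOpened.set(System.currentTimeMillis());
                }
            }
        }
    }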
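The decision above reduces to two threshold checks followed by an atomic state change. The sketch below reproduces that shape with the JDK only. `BreakerSketch` is hypothetical, the thresholds of 20 requests and 50 percent are illustrative values, and the integer-truncated percentage formula is an assumption about how the error percentage is derived from the two counts.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the "should the breaker open" decision; not Hystrix code.
final class BreakerSketch {
    enum Status { CLOSED, OPEN }

    private final int requestVolumeThreshold;      // minimum requests in the window
    private final int errorThresholdPercentage;    // error percentage at which the breaker opens
    private final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);
    private final AtomicLong circuitOpened = new AtomicLong(-1L);

    BreakerSketch(int requestVolumeThreshold, int errorThresholdPercentage) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
    }

    // Assumed formula: integer percentage of errors among all requests, 0 when there are none.
    static int errorPercentage(long totalCount, long errorCount) {
        return totalCount > 0 ? (int) ((double) errorCount / totalCount * 100) : 0;
    }

    void onNext(long totalCount, long errorCount) {
        if (totalCount < requestVolumeThreshold) {
            return;                                 // too few requests to judge
        }
        if (errorPercentage(totalCount, errorCount) < errorThresholdPercentage) {
            return;                                 // error rate is acceptable
        }
        // compareAndSet ensures only one caller performs the CLOSED -> OPEN transition
        if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
            circuitOpened.set(System.currentTimeMillis());
        }
    }

    Status status() {
        return status.get();
    }

    long openedAt() {
        return circuitOpened.get();
    }

    public static void main(String[] args) {
        BreakerSketch breaker = new BreakerSketch(20, 50);
        breaker.onNext(20, 10);
        System.out.println(breaker.status());
    }
}
```

The volume check comes first so that a handful of failures in a quiet period cannot open the breaker on their own.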
That concludes this look at how Hystrix collects metrics in Spring Cloud. If you have had similar questions, the analysis above may help answer them.