2025-02-25 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article looks at how workqueue in client-go works. Many people are not very familiar with it, so the main points are summarized below.
The following focuses on workqueue in client-go. First, take a look at the overall data flow of client-go:
[Figure: overall data flow of client-go]
In this flow, workqueue is used mainly by the listener: the listener receives data over a chan and puts it into the work queue for processing. A plain chan is too simple for the scenarios Kubernetes has to handle, which is why workqueue was built.
Characteristics
Ordering
Deduplication
Concurrency
Delayed processing
Rate limiting
There are currently three types of workqueue
Basic queue
Delay queue
Rate-limiting queue
The delay queue is built on the basic queue, and the rate-limiting queue is built on the delay queue.
Basic queue
Take a look at the interface of the basic queue
    // client-go source path: util/workqueue/queue.go
    type Interface interface {
        // Add adds a new element; it can be any object
        Add(item interface{})
        // Len returns the current length of the queue
        Len() int
        // Get blocks until the head element is available (first in, first out)
        // and returns the element and whether the queue is shut down
        Get() (item interface{}, shutdown bool)
        // Done explicitly marks an element as finished processing
        Done(item interface{})
        // ShutDown closes the queue
        ShutDown()
        // ShuttingDown reports whether the queue is closing
        ShuttingDown() bool
    }
Take a look at the data structure of the basic queue. Only the three key fields are shown; the rest are omitted.
    type Type struct {
        // queue holds the elements in order and guarantees ordered processing
        queue []t
        // dirty holds all elements waiting to be processed; set is built on a map
        // whose values are empty structs, which guarantees deduplication
        dirty set
        // processing holds the elements currently being processed
        processing set
        ...
    }

    type empty struct{}
    type t interface{}
    type set map[t]empty
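The Add and Get listings in this article call has, insert, and delete on the set type, but those helpers are not shown. A minimal sketch of how such a map-backed set could look follows; the method names come from those calls, while the bodies are an assumption rather than a copy of the client-go source.

```go
package main

import "fmt"

type empty struct{}
type t interface{}
type set map[t]empty

// has reports whether item is in the set.
func (s set) has(item t) bool {
	_, exists := s[item]
	return exists
}

// insert adds item to the set. Inserting the same item twice leaves a
// single entry, which is what provides deduplication.
func (s set) insert(item t) {
	s[item] = empty{}
}

// delete removes item from the set (the call inside is the map builtin).
func (s set) delete(item t) {
	delete(s, item)
}

func main() {
	dirty := set{}
	dirty.insert("hello")
	dirty.insert("hello") // duplicate: the set still holds one entry
	fmt.Println(len(dirty), dirty.has("hello"))

	dirty.delete("hello")
	fmt.Println(len(dirty), dirty.has("hello"))
}
```

Using an empty struct as the map value means the set stores only keys and spends no memory on values.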
The hello world of the basic queue is also very simple
    wq := workqueue.New()
    wq.Add("hello")
    v, _ := wq.Get()
Basic queue Add
    func (q *Type) Add(item interface{}) {
        q.cond.L.Lock()
        defer q.cond.L.Unlock()
        // If the queue is shutting down, do not accept new elements
        if q.shuttingDown {
            return
        }
        // If the element is already waiting to be processed, do not add it again
        if q.dirty.has(item) {
            return
        }
        // Record the element in metrics
        q.metrics.add(item)
        // Add it to the set of elements waiting to be processed
        q.dirty.insert(item)
        // If the element is currently being processed, do not add it to the queue
        if q.processing.has(item) {
            return
        }
        q.queue = append(q.queue, item)
        q.cond.Signal()
    }
Basic queue Get
    func (q *Type) Get() (item interface{}, shutdown bool) {
        q.cond.L.Lock()
        defer q.cond.L.Unlock()
        // Block while there are no elements and the queue is not shutting down
        for len(q.queue) == 0 && !q.shuttingDown {
            q.cond.Wait()
        }
        ...
        item, q.queue = q.queue[0], q.queue[1:]
        q.metrics.get(item)
        // Add the element to the processing set
        q.processing.insert(item)
        // Remove it from the dirty (waiting) set
        q.dirty.delete(item)
        return item, false
    }
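To see how queue, dirty, and processing interact, here is a stripped-down sketch using only the standard library. miniQueue and its constructor are hypothetical names; metrics and shutdown handling are left out, and the Done body is an assumption about how a re-added element gets back into the queue, since the article does not show it.

```go
package main

import (
	"fmt"
	"sync"
)

// miniQueue is a hypothetical, simplified stand-in for the basic queue.
type miniQueue struct {
	cond       *sync.Cond
	queue      []interface{}
	dirty      map[interface{}]struct{}
	processing map[interface{}]struct{}
}

func newMiniQueue() *miniQueue {
	return &miniQueue{
		cond:       sync.NewCond(&sync.Mutex{}),
		dirty:      map[interface{}]struct{}{},
		processing: map[interface{}]struct{}{},
	}
}

// Add queues item unless it is already pending or currently being processed.
func (q *miniQueue) Add(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	if _, pending := q.dirty[item]; pending {
		return // already waiting: deduplicated
	}
	q.dirty[item] = struct{}{}
	if _, busy := q.processing[item]; busy {
		return // being processed: remembered in dirty only
	}
	q.queue = append(q.queue, item)
	q.cond.Signal()
}

// Get blocks until an element is available and moves it to processing.
func (q *miniQueue) Get() interface{} {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	for len(q.queue) == 0 {
		q.cond.Wait()
	}
	item := q.queue[0]
	q.queue = q.queue[1:]
	q.processing[item] = struct{}{}
	delete(q.dirty, item)
	return item
}

// Done marks item as processed. Assumed behavior: if the item was added
// again while it was being processed, it is queued once more.
func (q *miniQueue) Done(item interface{}) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	delete(q.processing, item)
	if _, pending := q.dirty[item]; pending {
		q.queue = append(q.queue, item)
		q.cond.Signal()
	}
}

// Len returns the number of elements waiting in the queue.
func (q *miniQueue) Len() int {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	return len(q.queue)
}

func main() {
	q := newMiniQueue()
	q.Add("a")
	q.Add("a") // dropped: "a" is already pending
	fmt.Println("after two adds:", q.Len())

	v := q.Get() // "a" moves from dirty to processing
	q.Add("a")   // arrives while "a" is being processed: held back
	fmt.Println("while processing:", q.Len())

	q.Done(v) // the held-back "a" is queued again
	fmt.Println("after Done:", q.Len())
}
```

The point of the two sets is that the same element is never handed to two workers at once: an element added during processing waits in dirty until Done is called.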
Basic queue instantiation
    func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
        t := &Type{
            clock:                      c,
            dirty:                      set{},
            processing:                 set{},
            cond:                       sync.NewCond(&sync.Mutex{}),
            metrics:                    metrics,
            unfinishedWorkUpdatePeriod: updatePeriod,
        }
        // Start a goroutine that updates metrics
        go t.updateUnfinishedWorkLoop()
        return t
    }

    func (q *Type) updateUnfinishedWorkLoop() {
        t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
        defer t.Stop()
        for range t.C() {
            if !func() bool {
                q.cond.L.Lock()
                defer q.cond.L.Unlock()
                if !q.shuttingDown {
                    q.metrics.updateUnfinishedWork()
                    return true
                }
                return false
            }() {
                return
            }
        }
    }
Delay queue
The main idea behind the delay queue is to use a priority queue to store the elements whose addition is delayed. Each pass checks whether the element with the shortest remaining delay is ready to join the queue (its delay time is up). If so, it is added and the next element is checked, until no elements remain or the remaining elements still need to wait.
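The loop described above can be sketched with container/heap from the standard library. The waitFor name appears in the delay queue struct in this article, but its fields (data, readyAt), the waitHeap type, and the popReady and runExample helpers are assumptions made for this illustration.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// waitFor pairs an element with the time at which it may join the queue.
type waitFor struct {
	data    string
	readyAt time.Time
}

// waitHeap is a min-heap ordered by readyAt, so the element that becomes
// ready first is always at index 0.
type waitHeap []waitFor

func (h waitHeap) Len() int            { return len(h) }
func (h waitHeap) Less(i, j int) bool  { return h[i].readyAt.Before(h[j].readyAt) }
func (h waitHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *waitHeap) Push(x interface{}) { *h = append(*h, x.(waitFor)) }
func (h *waitHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// popReady removes and returns every element whose readyAt is not after
// now, earliest first. It stops at the first element that must still wait.
func popReady(h *waitHeap, now time.Time) []string {
	var ready []string
	for h.Len() > 0 {
		next := (*h)[0]
		if next.readyAt.After(now) {
			break
		}
		heap.Pop(h)
		ready = append(ready, next.data)
	}
	return ready
}

// runExample delays three elements by 30s, 5s, and 10s, then checks the
// heap 12 seconds after the start.
func runExample() (ready []string, waiting int) {
	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	h := &waitHeap{}
	heap.Push(h, waitFor{data: "c", readyAt: start.Add(30 * time.Second)})
	heap.Push(h, waitFor{data: "a", readyAt: start.Add(5 * time.Second)})
	heap.Push(h, waitFor{data: "b", readyAt: start.Add(10 * time.Second)})

	ready = popReady(h, start.Add(12*time.Second))
	return ready, h.Len()
}

func main() {
	ready, waiting := runExample()
	fmt.Println("ready:", ready, "still waiting:", waiting)
}
```

Because the heap keeps the earliest deadline on top, each check only has to look at one element before deciding whether anything is due.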
Take a look at the data structure of the delay queue
    type delayingType struct {
        Interface
        ...
        // waitingForAddCh holds the elements whose addition is delayed
        waitingForAddCh chan *waitFor
        ...
    }
The chan mainly holds the elements whose addition is delayed; elements get there through the AddAfter method. Take a look at the details.
    // Delay queue interface
    type DelayingInterface interface {
        Interface
        // AddAfter adds an item to the workqueue after the indicated duration has passed
        AddAfter(item interface{}, duration time.Duration)
    }

    func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
        ...
        // If the delay is less than or equal to 0, add directly to the queue
        if duration <= 0 {
            q.Add(item)
            return
        }
        ...
    }

    ...

        // Tail of a rate limiter's delay calculation: the computed backoff is capped at r.maxDelay
        if backoff > math.MaxInt64 {
            return r.maxDelay
        }
        calculated := time.Duration(backoff)
        if calculated > r.maxDelay {
            return r.maxDelay
        }
        return calculated
    }
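The fragment ending in return calculated caps a computed backoff at maxDelay, but the part that computes backoff is not visible. The sketch below assumes the delay doubles with every failure starting from a base delay; backoffDelay, exampleDelays, and the 5 ms and 1 s values are hypothetical names and numbers chosen for illustration.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// backoffDelay returns baseDelay * 2^failures, capped at maxDelay.
// The exponential formula is an assumption; only the two cap checks
// below appear in the article's fragment.
func backoffDelay(baseDelay, maxDelay time.Duration, failures int) time.Duration {
	backoff := float64(baseDelay.Nanoseconds()) * math.Pow(2, float64(failures))
	// Guard against overflow before converting back to time.Duration.
	if backoff > math.MaxInt64 {
		return maxDelay
	}
	calculated := time.Duration(backoff)
	if calculated > maxDelay {
		return maxDelay
	}
	return calculated
}

// exampleDelays computes delays for the given failure counts with a
// 5 ms base delay and a 1 s cap.
func exampleDelays(failures ...int) []time.Duration {
	delays := make([]time.Duration, 0, len(failures))
	for _, f := range failures {
		delays = append(delays, backoffDelay(5*time.Millisecond, time.Second, f))
	}
	return delays
}

func main() {
	for i, d := range exampleDelays(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) {
		fmt.Printf("failures=%d delay=%v\n", i, d)
	}
}
```

The float64 comparison comes first because converting a value larger than math.MaxInt64 to time.Duration would not produce a meaningful duration.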
Counter mode
Counter mode is available through workqueue.NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int), which takes three parameters:
fastDelay: the fast (short) delay
slowDelay: the slow (long) delay
maxFastAttempts: the number of attempts that use the fast delay
It works like this. Assume fastDelay is 1 second, slowDelay is 10 seconds, and maxFastAttempts is 3, and the same element is added five times in quick succession within one rate-limiting period. The first three additions join the queue after the 1-second fast delay. From the fourth addition on, slowDelay applies, so the element joins the queue 10 seconds later. Every later addition also waits 10 seconds, until the rate-limiting period ends.
Take a look at the source code.
    func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
        r.failuresLock.Lock()
        defer r.failuresLock.Unlock()
        // Count this attempt
        r.failures[item] = r.failures[item] + 1
        // While the count does not exceed maxFastAttempts, use fastDelay; otherwise use slowDelay
        if r.failures[item] <= r.maxFastAttempts {
            return r.fastDelay
        }
        return r.slowDelay
    }
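The worked example from the text (1-second fast delay, 10-second slow delay, 3 fast attempts, 5 additions of the same element) can be replayed with a stand-alone sketch. fastSlowLimiter and fastSlowDelays are hypothetical names; the counting rule follows the description above and is not a copy of the client-go type.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fastSlowLimiter is a stand-alone sketch of the counter mode.
type fastSlowLimiter struct {
	mu              sync.Mutex
	failures        map[interface{}]int
	fastDelay       time.Duration
	slowDelay       time.Duration
	maxFastAttempts int
}

// When counts the attempt and returns fastDelay for the first
// maxFastAttempts attempts of an item, slowDelay afterwards.
func (r *fastSlowLimiter) When(item interface{}) time.Duration {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.failures[item]++
	if r.failures[item] <= r.maxFastAttempts {
		return r.fastDelay
	}
	return r.slowDelay
}

// fastSlowDelays asks for the delay of the same item n times, using the
// values from the text: 1 s fast, 10 s slow, 3 fast attempts.
func fastSlowDelays(n int) []time.Duration {
	r := &fastSlowLimiter{
		failures:        map[interface{}]int{},
		fastDelay:       1 * time.Second,
		slowDelay:       10 * time.Second,
		maxFastAttempts: 3,
	}
	delays := make([]time.Duration, 0, n)
	for i := 0; i < n; i++ {
		delays = append(delays, r.When("item"))
	}
	return delays
}

func main() {
	for i, d := range fastSlowDelays(5) {
		fmt.Printf("attempt %d: wait %v\n", i+1, d)
	}
}
```

The count is kept per element, so a different element added in the same period starts again from the fast delay.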