
How to understand workqueue in client-go


Today I will talk about how to understand the workqueue in client-go. Many people may not know much about it, so I have summarized the following in the hope that you will get something out of this article.

The following mainly covers the workqueue in client-go, starting from the overall data flow of client-go.

Within that flow, the workqueue is used by the listener: the listener receives data over a chan and puts it into a work queue for processing. A plain chan is too simple to satisfy Kubernetes's requirements, which is why the workqueue was introduced.
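As a minimal sketch of that pattern, the snippet below wires an informer's event handler to a rate-limiting workqueue and drains keys in a worker loop. The kubeconfig location (clientcmd.RecommendedHomeFile) and the 30-second resync period are illustrative assumptions, not part of the original article.

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/workqueue"
)

func main() {
    // assumption: kubeconfig lives at ~/.kube/config; adjust for your environment
    config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    clientset := kubernetes.NewForConfigOrDie(config)

    // rate-limiting workqueue fed by the informer's event handlers
    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

    factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
    podInformer := factory.Core().V1().Pods().Informer()
    podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            // enqueue the namespace/name key rather than the full object
            if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
                queue.Add(key)
            }
        },
    })

    stopCh := make(chan struct{})
    defer close(stopCh)
    factory.Start(stopCh)
    factory.WaitForCacheSync(stopCh)

    // worker loop: drain keys from the queue and mark them done
    for {
        key, shutdown := queue.Get()
        if shutdown {
            return
        }
        fmt.Println("processing", key)
        queue.Done(key)
    }
}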

Characteristics

Ordering

Deduplication

Concurrency

Delayed processing

Rate limiting

There are currently three types of workqueue:

Basic queue

Delay queue

Rate-limiting queue

The delay queue is built on top of the basic queue, and the rate-limiting queue is built on top of the delay queue, as sketched below.
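A rough sketch of how the three types relate; the constructors are the ones exported by client-go's workqueue package, and DefaultControllerRateLimiter is just a convenient default limiter:

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    // basic queue: FIFO with deduplication
    basic := workqueue.New()
    basic.Add("a")

    // delay queue: wraps a basic queue and adds AddAfter
    delaying := workqueue.NewDelayingQueue()
    delaying.AddAfter("b", time.Second)

    // rate-limiting queue: wraps a delay queue and adds AddRateLimited/Forget/NumRequeues
    rateLimited := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
    rateLimited.AddRateLimited("c")

    fmt.Println(basic.Len(), delaying.Len(), rateLimited.Len())
}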

Basic queue

Take a look at the interface of the basic queue

// client-go source path: util/workqueue/queue.go
type Interface interface {
    // Add adds a new element, which can be any object
    Add(item interface{})
    // Len returns the current length of the queue
    Len() int
    // Get blocks until an element is available, then returns the head element (first in, first out)
    // and whether the queue has been shut down
    Get() (item interface{}, shutdown bool)
    // Done marks the element as finished processing
    Done(item interface{})
    // ShutDown closes the queue
    ShutDown()
    // ShuttingDown reports whether the queue is shutting down
    ShuttingDown() bool
}

Take a look at the data structure of the basic queue. Only the three key fields are relevant here; the rest are omitted.

type Type struct {
    // queue holds all elements in order; it guarantees ordering
    queue []t
    // dirty is the set of all elements waiting to be processed; set is implemented
    // as a map whose values are empty structs, which guarantees deduplication
    dirty set
    // processing is the set of elements currently being processed
    processing set
    ...
}

type empty struct{}
type t interface{}
type set map[t]empty

The hello world of the basic queue is also very simple

wq := workqueue.New()
wq.Add("hello")
v, _ := wq.Get()
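For completeness, a minimal runnable sketch of that hello world, including the Done and ShutDown calls; it only assumes k8s.io/client-go is available as a module dependency:

package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    wq := workqueue.New()
    wq.Add("hello")

    v, shutdown := wq.Get()  // blocks until an element is available
    fmt.Println(v, shutdown) // hello false

    wq.Done(v)    // mark the element as processed
    wq.ShutDown() // close the queue

    _, shutdown = wq.Get()
    fmt.Println(shutdown) // true: the queue is empty and shut down
}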

Basic queue Add

func (q *Type) Add(item interface{}) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    // if the queue is currently shutting down, do not accept new elements
    if q.shuttingDown {
        return
    }
    // if the element is already waiting to be processed, do not add it again
    if q.dirty.has(item) {
        return
    }
    // record the add in metrics
    q.metrics.add(item)
    // mark the element as pending
    q.dirty.insert(item)
    // if the element is currently being processed, do not add it to the queue yet
    if q.processing.has(item) {
        return
    }
    q.queue = append(q.queue, item)
    q.cond.Signal()
}

Basic queue Get

func (q *Type) Get() (item interface{}, shutdown bool) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    // if there are currently no elements and the queue is not shutting down, block
    for len(q.queue) == 0 && !q.shuttingDown {
        q.cond.Wait()
    }
    ...
    item, q.queue = q.queue[0], q.queue[1:]
    q.metrics.get(item)
    // add the element to the processing set
    q.processing.insert(item)
    // remove it from the waiting (dirty) set
    q.dirty.delete(item)
    return item, false
}
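To see what the dirty and processing sets buy you, here is a small sketch. The re-queue in the last step comes from the Done method, which is not shown above: it removes the item from processing and, if the item has become dirty again in the meantime, appends it back onto the queue.

package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    wq := workqueue.New()

    // adding the same pending element twice only queues it once (dirty set)
    wq.Add("a")
    wq.Add("a")
    fmt.Println(wq.Len()) // 1

    item, _ := wq.Get() // "a" moves from queue/dirty into processing

    // re-adding while it is being processed parks it in dirty, not in the queue
    wq.Add("a")
    fmt.Println(wq.Len()) // 0

    // Done removes it from processing and, because it is dirty again, re-queues it
    wq.Done(item)
    fmt.Println(wq.Len()) // 1
}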

Basic queue instantiation

func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
    t := &Type{
        clock:                      c,
        dirty:                      set{},
        processing:                 set{},
        cond:                       sync.NewCond(&sync.Mutex{}),
        metrics:                    metrics,
        unfinishedWorkUpdatePeriod: updatePeriod,
    }
    // start a goroutine that keeps the metrics up to date
    go t.updateUnfinishedWorkLoop()
    return t
}

func (q *Type) updateUnfinishedWorkLoop() {
    t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
    defer t.Stop()
    for range t.C() {
        if !func() bool {
            q.cond.L.Lock()
            defer q.cond.L.Unlock()
            if !q.shuttingDown {
                q.metrics.updateUnfinishedWork()
                return true
            }
            return false
        }() {
            return
        }
    }
}

Delay queue

The main idea of the delay queue is to use a priority queue to hold the elements that need to be delayed. On each loop iteration it checks whether the element with the earliest ready time has reached its deadline (its delay has elapsed); if so, that element is added to the queue and the next one is checked, until there are no elements left or the remaining elements still need to wait.
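A minimal usage sketch of the delay queue; the 2-second delay is just an example value:

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    dq := workqueue.NewDelayingQueue()

    start := time.Now()
    dq.AddAfter("hello", 2*time.Second) // becomes visible roughly 2 seconds later

    v, _ := dq.Get() // blocks until the delayed element is ready
    fmt.Println(v, time.Since(start).Round(time.Second)) // hello 2s
    dq.Done(v)
}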

Take a look at the data structure of the delay queue

type delayingType struct {
    Interface
    ...
    // waitingForAddCh holds the elements to be added after their delay
    waitingForAddCh chan *waitFor
    ...
}

The chan is used to hand over the elements that should be added after a delay, and this is done through the AddAfter method. Take a look at the details.

// delay queue interface, client-go source path: util/workqueue/delaying_queue.go
type DelayingInterface interface {
    Interface
    // AddAfter adds an item to the workqueue after the indicated duration has passed
    AddAfter(item interface{}, duration time.Duration)
}

func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
    ...
    // if the delay is less than or equal to 0, add the item to the queue directly
    if duration <= 0 {
        q.Add(item)
        return
    }
    // otherwise the item is wrapped together with its ready time and sent to waitingForAddCh
    ...
}

Rate-limiting queue

The rate-limiting queue builds on the delay queue: a rate limiter decides, via its When(item) method, how long an item must wait, and the item is then handed to AddAfter with that duration.

Exponential backoff mode

Exponential backoff can be used through workqueue.NewItemExponentialFailureRateLimiter(baseDelay, maxDelay time.Duration): each time the same element is re-added, its delay doubles, capped at maxDelay. The cap is applied at the end of its When method, where exp is the number of times the element has been counted so far:

backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
    return r.maxDelay
}
calculated := time.Duration(backoff)
if calculated > r.maxDelay {
    return r.maxDelay
}
return calculated
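A usage sketch for the exponential mode; the baseDelay and maxDelay values are arbitrary examples:

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    rl := workqueue.NewItemExponentialFailureRateLimiter(time.Millisecond, 10*time.Second)

    // each call for the same item doubles the delay, up to maxDelay
    fmt.Println(rl.When("a")) // 1ms
    fmt.Println(rl.When("a")) // 2ms
    fmt.Println(rl.When("a")) // 4ms

    rl.Forget("a")            // reset the counter for the item
    fmt.Println(rl.When("a")) // 1ms again
}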

Counter mode

Counter mode can be used through workqueue.NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int), which takes three parameters:

fastDelay: the fast (short) rate-limit delay

slowDelay: the slow (long) rate-limit delay

maxFastAttempts: the number of attempts that use the fast delay

The principle is as follows. Suppose fastDelay is set to 1 second, slowDelay to 10 seconds, and maxFastAttempts to 3, and the same element is added five times in quick succession within one rate-limit period. The first three adds are delayed by the fast limit of 1 second. From the fourth add onward the slow limit applies, so the element joins the queue 10 seconds later, and all subsequent adds keep using the 10-second delay until the rate-limit period for that element ends (that is, until Forget is called for it and its counter is reset).

Take a look at the source code.

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()
    // count this attempt
    r.failures[item] = r.failures[item] + 1
    // while the count is within maxFastAttempts, use fastDelay as the delay; otherwise use slowDelay
    if r.failures[item] <= r.maxFastAttempts {
        return r.fastDelay
    }
    return r.slowDelay
}
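A short sketch that reproduces the 1s/1s/1s/10s/10s behavior described above; the parameter values match that example:

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    rl := workqueue.NewItemFastSlowRateLimiter(time.Second, 10*time.Second, 3)

    // the first maxFastAttempts calls use fastDelay, the rest use slowDelay
    for i := 0; i < 5; i++ {
        fmt.Println(rl.When("x")) // 1s, 1s, 1s, 10s, 10s
    }

    rl.Forget("x") // reset the counter so "x" starts fast again
}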
