How to Design and Implement the Golang Goroutine Pool gopool

2025-01-18 Update From: SLTechnology News&Howtos


This article introduces how to design and implement the Golang goroutine pool gopool. It walks through the core data structures and code step by step, so that by the end you should have a clear picture of how the pool works and why it is built this way.

Goroutine

A goroutine is a lightweight thread provided by Golang, often called a "coroutine". Compared with a thread, the cost of creating a goroutine is very low, which is why applications developed in Golang often run thousands of goroutines concurrently.

Advantages of Goroutine:

Goroutines cost less than threads.

Their stacks start at only a few KB and can grow and shrink with the needs of the application; context switches are also very fast. With threads, in contrast, the stack size must be specified up front and remains fixed.

Goroutines are multiplexed onto a smaller number of OS threads.

A program containing thousands of goroutines may use only a handful of OS threads. If a goroutine blocks its thread (for example, waiting for user input), the runtime creates another OS thread and moves the remaining goroutines onto it. All of this is handled by the runtime; as developers we don't have to worry about it, and we get a clean API for concurrency.

Goroutines use channels for communication.

Channels are designed to prevent race conditions when goroutines access shared memory; a channel can be thought of as a pipe through which goroutines communicate.
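As a minimal standalone illustration (the names below are ours, not gopool's), two goroutines can hand partial results back through a channel instead of writing to shared memory:

```go
package main

import "fmt"

// sum sends the total of nums on channel c, so the caller never
// touches the partial sums concurrently.
func sum(nums []int, c chan int) {
	total := 0
	for _, n := range nums {
		total += n
	}
	c <- total // hand the result to the receiving goroutine
}

func main() {
	nums := []int{7, 2, 8, -9, 4, 0}
	c := make(chan int)
	go sum(nums[:len(nums)/2], c)
	go sum(nums[len(nums)/2:], c)
	x, y := <-c, <-c // receive both partial sums
	fmt.Println(x + y)
}
```

Because the result is transferred over the channel, no mutex is needed and no two goroutines ever write the same variable.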

Goroutine pool

In high-concurrency scenarios we may start a huge number of goroutines to handle business logic. A goroutine pool applies pooling: it reuses objects to reduce the frequency of memory allocation and the overhead of goroutine creation, thereby improving execution efficiency.

Recently I took some time to study gopool, the goroutine pool implementation provided by the officially open-sourced gopkg library. The quality is high, the code is simple and clear, and the bottom layer of Kitex also uses gopool to manage goroutines. Let's sort through its design and implementation.

Gopool

As the official README shows, using gopool is very simple: replace the `go func() {...}` we usually write with `gopool.Go(func() {...})`.

gopool will then manage the goroutines you start using its default configuration. You can also configure the pool size and maximum capacity for your own business scenario.

Old:

```go
go func() {
	// do your job
}()
```

New:

```go
import (
	"github.com/bytedance/gopkg/util/gopool"
)

gopool.Go(func() {
	// do your job
})
```

Core implementation

Let's take a look at how gopool implements goroutine pool management.

Pool

Pool is an interface that defines the capabilities of a goroutine pool.

```go
type Pool interface {
	// Name returns the name of the pool
	Name() string
	// SetCap sets the goroutine capacity of the pool
	SetCap(cap int32)
	// Go executes the function f
	Go(f func())
	// CtxGo executes the function f with the given ctx
	CtxGo(ctx context.Context, f func())
	// SetPanicHandler sets the function to be called when a panic occurs
	SetPanicHandler(f func(context.Context, interface{}))
}
```

gopool provides a default implementation of this interface (the pool struct introduced below), which is what we rely on when calling gopool.CtxGo directly.

This design pattern is also common in Kitex: dependencies are designed as interfaces to ease later extension, and the underlying layer exposes a default implementation, which is friendly to callers.

```go
type pool struct {
	// name of the pool
	name string
	// capacity of the pool, i.e. the maximum number of goroutines working concurrently
	cap int32
	// pool configuration
	config *Config
	// task linked list
	taskHead  *task
	taskTail  *task
	taskLock  sync.Mutex
	taskCount int32

	// number of workers currently running
	workerCount int32

	// called when a panic occurs in a worker
	panicHandler func(context.Context, interface{})
}

// NewPool creates a new pool, initializing its name, capacity and configuration.
func NewPool(name string, cap int32, config *Config) Pool {
	p := &pool{
		name:   name,
		cap:    cap,
		config: config,
	}
	return p
}
```

Calling NewPool returns the pool struct in the form of the Pool interface.

Task

```go
type task struct {
	ctx  context.Context
	f    func()
	next *task
}
```

task is a linked-list node representing a job to be executed. It contains the function f that the current node needs to run, along with a pointer to the next task.

Combined with the definition of pool in the previous section, we can see that one goroutine pool corresponds to one list of tasks.

pool maintains two pointers into the list, taskHead and taskTail, as well as the list length taskCount and the corresponding lock taskLock.
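The same head/tail bookkeeping can be sketched as a tiny standalone FIFO queue. The types and names below are our own simplification, not gopool's actual code, but the push/pop logic mirrors how the task list is maintained:

```go
package main

import (
	"fmt"
	"sync"
)

// node mirrors the shape of a task-list entry: a job plus a next pointer.
type node struct {
	f    func()
	next *node
}

// queue keeps head and tail pointers so both push and pop are O(1).
type queue struct {
	mu         sync.Mutex
	head, tail *node
	count      int
}

func (q *queue) push(f func()) {
	q.mu.Lock()
	defer q.mu.Unlock()
	n := &node{f: f}
	if q.head == nil {
		q.head, q.tail = n, n // first element: head and tail coincide
	} else {
		q.tail.next = n // append at the tail
		q.tail = n
	}
	q.count++
}

func (q *queue) pop() func() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.head == nil {
		return nil // empty queue
	}
	n := q.head
	q.head = n.next // advance the head
	q.count--
	return n.f
}

func main() {
	var q queue
	q.push(func() { fmt.Println("first") })
	q.push(func() { fmt.Println("second") })
	for f := q.pop(); f != nil; f = q.pop() {
		f()
	}
}
```

A linked list (rather than a slice or channel) lets the pool grow the backlog without reallocations and recycle nodes individually.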

Worker

```go
type worker struct {
	pool *pool
}
```

A worker is a logical executor that belongs to exactly one pool. When a worker is started, it opens a goroutine that keeps fetching tasks from the pool's task list and executing them.

```go
func (w *worker) run() {
	go func() {
		for {
			// the task about to be executed
			var t *task
			// lock before touching the pool's task list
			w.pool.taskLock.Lock()
			if w.pool.taskHead != nil {
				// take taskHead for execution, then advance the
				// head of the list and update the count
				t = w.pool.taskHead
				w.pool.taskHead = w.pool.taskHead.next
				atomic.AddInt32(&w.pool.taskCount, -1)
			}
			// if the taskHead obtained above is nil, there are no
			// tasks left to execute: clean up and return
			if t == nil {
				w.close()
				w.pool.taskLock.Unlock()
				w.Recycle()
				return
			}
			w.pool.taskLock.Unlock()
			// execute the task, recovering from panics and calling
			// the configured handler
			func() {
				defer func() {
					if r := recover(); r != nil {
						msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())
						logger.CtxErrorf(t.ctx, msg)
						if w.pool.panicHandler != nil {
							w.pool.panicHandler(t.ctx, r)
						}
					}
				}()
				t.f()
			}()
			t.Recycle()
		}
	}()
}
```

As a whole

At this point you can string the whole process together. Let's look at what the external interface CtxGo(context.Context, f func()) does.

```go
func Go(f func()) {
	CtxGo(context.Background(), f)
}

func CtxGo(ctx context.Context, f func()) {
	defaultPool.CtxGo(ctx, f)
}

func (p *pool) CtxGo(ctx context.Context, f func()) {
	// create a task object and assign ctx and the function to execute
	t := taskPool.Get().(*task)
	t.ctx = ctx
	t.f = f
	// append the task to the tail of the pool's linked list and
	// update the count
	p.taskLock.Lock()
	if p.taskHead == nil {
		p.taskHead = t
		p.taskTail = t
	} else {
		p.taskTail.next = t
		p.taskTail = t
	}
	p.taskLock.Unlock()
	atomic.AddInt32(&p.taskCount, 1)
	// create a new worker and start it when both of the following hold:
	// 1. the number of pending tasks exceeds the configured threshold
	// 2. the number of running workers is below the cap
	// (or when no worker is running at all)
	if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
		// worker count + 1
		p.incWorkerCount()
		// create a new worker, assign it the current pool,
		// and start it
		w := workerPool.Get().(*worker)
		w.pool = p
		w.run()
	}
}
```

After reading the code comments, you should be able to understand what happens.

gopool maintains its own defaultPool, a default pool struct that is initialized when the package is imported. When we call gopool.CtxGo() directly, we are essentially calling the method of the same name on defaultPool.

```go
func init() {
	defaultPool = NewPool("gopool.DefaultPool", 10000, NewConfig())
}

const (
	defaultScalaThreshold = 1
)

// Config is used to config pool.
type Config struct {
	// threshold for scaling up: once the number of pending tasks
	// exceeds this value and the worker count has not reached the
	// cap, a new worker is started
	ScaleThreshold int32
}

// NewConfig creates a default Config.
func NewConfig() *Config {
	c := &Config{
		ScaleThreshold: defaultScalaThreshold,
	}
	return c
}
```

The name of defaultPool is gopool.DefaultPool, its capacity is 10000, and its scaling threshold is 1.

When we call CtxGo, gopool updates the task list it maintains and decides whether a new worker needs to be started:

If enough workers are already running (each worker corresponds to one goroutine underneath), there is no need to scale up, and the call returns directly.

If it decides to scale up, it creates a new worker and starts it by calling worker.run(). Each worker asynchronously checks whether the pool's task list has pending tasks and, if so, executes them.
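The scale-up decision can be isolated into a small pure predicate. This is a sketch with a name of our own (`shouldScale`), mirroring the two conditions described above rather than reproducing gopool's actual code:

```go
package main

import "fmt"

// shouldScale reports whether a new worker should be started:
// either the backlog has crossed the threshold while worker capacity
// remains, or no worker is running at all (otherwise the queue
// would stall).
func shouldScale(taskCount, scaleThreshold, workerCount, cap int32) bool {
	return (taskCount >= scaleThreshold && workerCount < cap) || workerCount == 0
}

func main() {
	fmt.Println(shouldScale(5, 1, 2, 4))  // backlog over threshold, capacity left: true
	fmt.Println(shouldScale(5, 1, 4, 4))  // already at capacity: false
	fmt.Println(shouldScale(1, 10, 0, 4)) // no worker running: true
}
```

The `workerCount == 0` clause is what guarantees a freshly enqueued task is always picked up, even when the backlog is below the threshold.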

The responsibilities of the three roles

task is a node representing a job to be executed; it also holds a pointer to the next task, forming a linked list.

worker is the executor that actually runs tasks. It starts a goroutine that asynchronously executes the pending tasks in the pool.

pool is the logical goroutine pool. It corresponds to one task list, is responsible for maintaining the task state, and creates new workers when needed.

Using sync.Pool for performance optimization

Up to this point gopool is already a concise and clear goroutine pool library, but there is still room for performance improvement, so the gopool authors apply sync.Pool in several places to reuse worker and task objects.

It is recommended to look directly at the source; the relevant calls have already appeared in the code above.

Task pooling

```go
var taskPool sync.Pool

func init() {
	taskPool.New = newTask
}

func newTask() interface{} {
	return &task{}
}

func (t *task) Recycle() {
	t.zero()
	taskPool.Put(t)
}
```
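To see why this pays off, here is a self-contained sync.Pool example with a toy type of our own (`buf` and `process` are ours, not gopool's): Get hands back a previously Put object when one is available, avoiding a fresh allocation, and zeroing before Put mirrors the Recycle pattern.

```go
package main

import (
	"fmt"
	"sync"
)

// buf is a small reusable object; zero resets it before it goes back
// into the pool so the next user starts clean.
type buf struct {
	data []byte
}

func (b *buf) zero() { b.data = b.data[:0] }

var bufPool = sync.Pool{
	New: func() interface{} { return &buf{data: make([]byte, 0, 1024)} },
}

func process(payload string) string {
	b := bufPool.Get().(*buf) // reuse a buffer when one is available
	defer func() {
		b.zero() // reset state before returning it to the pool
		bufPool.Put(b)
	}()
	b.data = append(b.data, payload...)
	return fmt.Sprintf("%d bytes processed", len(b.data))
}

func main() {
	fmt.Println(process("hello"))
	fmt.Println(process("gopool"))
}
```

Note that sync.Pool gives no guarantee an object survives between Get calls (the GC may clear the pool), so it is purely an allocation optimization, never a cache of required state.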

Worker pooling

```go
var workerPool sync.Pool

func init() {
	workerPool.New = newWorker
}

func newWorker() interface{} {
	return &worker{}
}

func (w *worker) Recycle() {
	w.zero()
	workerPool.Put(w)
}
```

This concludes the study of how to design and implement the Golang goroutine pool gopool. Pairing the theory with practice will help it stick, so go and try it out!
