Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to use distributed Task + message queuing Framework go-queue

2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >

Share

Shulou(Shulou.com)06/03 Report--

This article mainly introduces "how to use distributed tasks + message queuing framework go-queue". In daily operations, I believe many people have doubts about how to use distributed tasks + message queuing framework go-queue. The editor consulted all kinds of materials and sorted out simple and easy-to-use operation methods. I hope it will be helpful to answer the doubts of "how to use distributed tasks + message queuing framework go-queue". Next, please follow the editor to study!

Why write this library?

Before we start to develop our own go-queue, we investigate the current open source queuing scheme based on the following:

Beanstalkd

Beanstalkd has some special features: support task priority, delay (delay), overtime retransmission (time-to-run) and reservation (buried), and can well support distributed background tasks and timed task processing. Here are the basic parts of beanstalkd:

Job: task unit

Tube: task queue, which stores uniform type job. Producer and consumer operands

Producer:job producer, adding job to a tube through put

Consumer:job consumers, using reserve/release/bury/delete to obtain job or change the state of job

Fortunately, the official go client: https://github.com/beanstalkd/go-beanstalk is available.

But for go developers who are not familiar with beanstalkd operations, there is a learning cost.

Kafka

Similar to the scheme based on kafka message queue as storage, the storage unit is the message. If you want to achieve delayed execution, the solution you can think of is to use the delayed execution time as topic. In this way, in large-scale messaging systems, there are a large number of one-time topic (dq_1616324404788, dq_1616324417622). When time is scattered, it is easy to cause random disk writes.

And in the go ecology,

At the same time, consider the following factors:

Support for deferred tasks

High availability to ensure that data is not lost

Scalable resources and performanc

So we developed go-queue based on the above two basic components:

Dq is developed based on beanstalkd, which supports timing and delay operation. At the same time, join redis to ensure the uniqueness of consumption.

Kq is developed based on kafka, which simplifies the development of API for producers and consumers. At the same time, batch writing is used in writing kafka to save IO.

The overall design is as follows:

Application scenario

First of all, in the consumption scenario, one is for the task queue and the other is the message queue. And the biggest difference between the two:

There are no order constraints for tasks; messages need

While the task is joining or waiting, there may be a status update (or cancellation), while the message is a single storage.

Therefore, the selection of infrastructure behind it is also based on this consumption scenario.

Dq: depending on beanstalkd, suitable for delayed and scheduled task execution

Kq: dependent on kafka, suitable for asynchronous, batch task execution

It can also be seen from the API of dq:

/ / delayed task execution-dq.Delay (msg, delayTime); / / scheduled task execution-dq.At (msg, atTime)

And inside us:

If it is asynchronous message consumption / push, it will choose to use kq:kq.Push (msg)

If it is a 15-minute reminder / send a text message at noon tomorrow, etc., use dq

How to use

Describe the use of dq and kq respectively:

Dq// [Producer] producer: = dq.NewProducer ([] dq.Beanstalk {{Endpoint: "localhost:11300", Tube: "tube",}, {Endpoint: "localhost:11301", Tube: "tube",},}) for I: = 1000; I < 1005 ITunes + {_, err: = producer.Delay ([] byte (strconv.Itoa (I)), time.Second*5) if err! = nil {fmt.Println (err)}} / / [Consumer] consumer: = dq.NewConsumer (dq.DqConf {Beanstalks: [] dq.Beanstalk {{Endpoint: "localhost:11300", Tube: "tube",} {Endpoint: "localhost:11301", Tube: "tube",},}, Redis: redis.RedisConf {Host: "localhost:6379", Type: redis.NodeType,},}) consumer.Consume (func (body [] byte) {/ / your consume logic fmt.Println (string (body))})

Similar to the normal producer-consumer model, developers only need to focus on the following:

Developers only need to pay attention to their own task type "delay / timing"

Consumption logic on the consumer side

Kq

Producer.go:

/ message structuretype message struct {Key string `json: "key" `Value string `json: "value" `Payload string `json: "message" `} pusher: = kq.NewPusher ([] string {"127.0.0.1json 19092", "127.0.0.119093", "127.0.0.119094",}, "kq") ticker: = time.NewTicker (time.Millisecond) for round: = 0; round < 3 Round++ {select {case

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Development

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report