In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-29 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/03 Report--
This article introduces the knowledge of "how to use distributed timing tasks". In the operation of actual cases, many people will encounter such a dilemma, so let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!
Introduction to dq
Dq encapsulates underlying beanstalkd operations, distributed storage, latency, and timing settings. The restart service can be restarted, but the message is not lost because the processing of the message is left to beanstalkd.
You can see that it is very easy to use, and the use of redis setnx in dq ensures that each message is consumed only once. However, redis is not used for message storage on the producer side, which is consistent with the previous description.
With a brief introduction to the overall architecture of dq, the following is a formal exploration: hammer:
Producer examplefunc main () {producer: = dq.NewProducer ([] dq.Beanstalk {{Endpoint: "localhost:11300", Tube: "tube",}, {Endpoint: "localhost:11301" Tube: "tube",},}) for I: = 1000 I
< 1005; i++ { // Delay:延迟执行 _, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5) // At:在某一个时刻执行 //_, err := producer.At([]byte(strconv.Itoa(i)), time.Now().Add(time.Second*5)) if err != nil { fmt.Println(err) } }} 从使用上,简单分为两步: NewProducer(opts):将本地队列的端口配置和主题配置传入生产者; producer.Delay():使用刚创建好的 生产者,调用它的 Delay() 。将需要异步发送的消息传入,Delay 还需要传入延迟执行的时间。 需要说明的是:创建的 producer 是一个接口,Delay() 只是接口其中的一个方法。后续会其他的方法和内部设计。那我们就继续往下探索吧~~~ 深入生产者执行流程 下面从 example 的代码进去,看整个函数的调用链。 初始化dq.NewProducer([]dq.Beanstalk{{opt1}, {opt2}, ...}) // 初始化生产者 |- NewProducerNode(endpoint, tube) // endpoint,tube 来自传入的配置数组 紧接着就到 producerNode.go ,这个部分就会牵涉到 beanstalk 的初始化: NewProducerNode(endpoint, tube) |- conn: newConnection(endpoint, tube) |- return &connection{} 这就涉及到 beanstalk:connection.conn ->* beanstalk.Conn.
However, beanstalk.Conn is not initialized in newConnection (), which belongs to delayed initialization.
Delay
First of all, the producer calls producer.Delay (data, timesecond) to insert the message into the internal queue, and timesecond is the time to delay execution. Let's see what Delay () has done.
P.Delay (data, timesecond) |-p.wrap (data, time) / / package data and time together |-p.insert (nodeFn) |-node.Delay () / / for rangre p.node each node executes `Delay () `
And p.insert is to pass the encapsulated data in the previous step to each node of p {cluster} to execute node.Delay.
As mentioned in the previous initialization, the conn was not initialized at the beginning, so now if you want to insert data, you have to initialize the conn.
Each node in the node.Delay () / / configuration executes `Delay () `|-node.conn.get () / / gets the conn [conn==nil] in the node Just initialize a conn] |-_, err: = conn.Put (data, deplay, opts...) |-node.conn.reset () / / in the case of err, such as OOM/Timeout-> close conn to prevent leakage
So in the end, Delay actually executes tube.Put (data, delay):
Tube.Put (data, delay) |-tube.Conn.cmd ("put",...) / / producer publishes job
Here is the Put operation of beanstalk: first, take a look at the producer Put instruction parameter description:
Put
: priority. The smaller the value, the higher the priority. The default is 1024.
Number of seconds of delay ready. During this period, job is delayed.
: time to run, the maximum number of seconds allowed for worker execution. If worker cannot delete,release,bury job during this period, when job times out, the server will automatically release this job
: length of job body, excluding\ r\ n
: job body data
OK . So if the job is inserted successfully, what does it respond to?
INSERTED\ r\ n
The id returned is the task ID for inserting the job. At this point, after the Put analysis, follow the code:
Tube.Put (data, priority, daley, ttr) |-tube.Conn.cmd ("put",...) |-tube.Conn.readResp ("INSERTED id") |-return id, err / / returns id
This completes the actions performed by producers that we can see directly in example. Above, the picture is easier to talk about:
Producer interface
So in addition to Delay () used in example, there are several other methods:
Producer interface {At (body [] byte, at time.Time) (string, error) Close () error Delay (body [] byte, delay time.Duration) (string, error) Revoke (ids string) error}
At: specify a certain time to execute [essentially execute Delay ()]
Close: close the connection to all node
Delay: delay execution. The time of the incoming delay.
Revoke: essentially when there is a minimum write node
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.