2025-03-28 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
This article explains what machinery can do. The approach introduced here is simple, fast, and practical, so let's walk through the features of machinery.
Characteristics
The above is just a simple example. Task queues apply to a wide range of scenarios. When there are many computing tasks or a large amount of data to insert, the work can be split up and pushed into the task queue in batches, and then processed either serially as a chain or in parallel as a group, which improves both the robustness and the concurrency of the system. Another use is preprocessing: data is periodically synchronized from back-end storage into a cache, so that query requests can be served directly from the cache and respond faster. There are many more scenarios, so I won't list them all here. Coming back to the topic of this article: since we are going to learn machinery, we need to know what features it has.
- Task retry mechanism
- Delayed task support
- Task callback mechanism
- Task result recording
- Workflow modes: Chain, Group, Chord
- Multiple brokers: Redis, AMQP, AWS SQS
- Multiple backends: Redis, Memcache, AMQP, MongoDB

Architecture
A task queue is, in short, a scaled-up producer-consumer model: user requests generate tasks, task producers keep inserting them into the queue, and the queue's processors act as consumers that keep taking tasks off it. With that design idea in mind, let's take a look at the simple architecture of machinery:
- Sender: the business push module. It generates the concrete tasks, which can be split up according to the business logic.
- Broker: stores the serialized tasks. machinery currently supports Redis, AMQP, and SQS.
- Worker: the worker process. It plays the consumer role and processes the tasks.
- Backend: the back-end storage, which holds data about task execution state.

Example
When learning something new, I like to write a demo first: learn to walk before learning to run. So let's start with a very simple example that asynchronously calculates the sum of 1 to 10.
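Before the machinery demo, the four roles described above can be sketched with the standard library alone. This is only an in-process illustration, not how machinery is implemented: the `task` type and the `runQueue` function are hypothetical names, a buffered channel stands in for the broker, and a map stands in for the result backend.

```go
package main

import (
	"fmt"
	"sync"
)

// task is a hypothetical stand-in for a serialized task message.
type task struct {
	ID   string
	Args []int64
}

// runQueue plays all four roles in one process: the caller is the sender,
// the channel is the broker, the goroutines are the workers, and the
// returned map is the result backend.
func runQueue(pending []task, workers int) map[string]int64 {
	broker := make(chan task, len(pending))
	backend := make(map[string]int64)
	var mu sync.Mutex
	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range broker { // consume until the broker is closed
				var sum int64
				for _, a := range t.Args {
					sum += a
				}
				mu.Lock()
				backend[t.ID] = sum // record the result in the backend
				mu.Unlock()
			}
		}()
	}

	for _, t := range pending { // the sender publishes its tasks
		broker <- t
	}
	close(broker)
	wg.Wait()
	return backend
}

func main() {
	results := runQueue([]task{
		{ID: "a", Args: []int64{1, 2, 3}},
		{ID: "b", Args: []int64{4, 5, 6}},
	}, 2)
	fmt.Println(results["a"], results["b"]) // prints "6 15"
}
```

In a real deployment the sender and the workers are separate processes, which is why the broker and backend have to be external services such as Redis.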
Take a look at the configuration file code first:
broker: redis://localhost:6379
default_queue: "asong"
result_backend: redis://localhost:6379
redis:
  max_idle: 3
  max_active: 3
  max_idle_timeout: 240
  wait: true
  read_timeout: 15
  write_timeout: 15
  connect_timeout: 15
  normal_tasks_poll_period: 1000
  delayed_tasks_poll_period: 500
  delayed_tasks_key: "asong"
Here both broker and result_backend are implemented with Redis.
Main code (the full version is available on GitHub):
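package main

import (
	"log"
	"time"

	// Import paths assume machinery v1.
	"github.com/RichardKnop/machinery/v1"
	"github.com/RichardKnop/machinery/v1/config"
	"github.com/RichardKnop/machinery/v1/tasks"
)

func main() {
	// Load the broker and result backend settings from the YAML file.
	cnf, err := config.NewFromYaml("./config.yml", false)
	if err != nil {
		log.Println("config failed", err)
		return
	}
	server, err := machinery.NewServer(cnf)
	if err != nil {
		log.Println("start server failed", err)
		return
	}
	// Register the Sum task function under the unique name "sum".
	err = server.RegisterTask("sum", Sum)
	if err != nil {
		log.Println("reg task failed", err)
		return
	}
	// Start one worker with a concurrency of 1 in the background.
	worker := server.NewWorker("asong", 1)
	go func() {
		// Use a local err so the goroutine does not race with main's err.
		if err := worker.Launch(); err != nil {
			log.Println("start worker error", err)
		}
	}()
	// Task signature: which task to call and with which arguments.
	signature := &tasks.Signature{
		Name: "sum",
		Args: []tasks.Arg{
			{
				Type:  "[]int64",
				Value: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
			},
		},
	}
	asyncResult, err := server.SendTask(signature)
	if err != nil {
		log.Fatal(err)
	}
	// Poll the result backend every 5ms until the result is ready.
	res, err := asyncResult.Get(5 * time.Millisecond)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("get res is %v\n", tasks.HumanReadableResults(res))
}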
Running result:
INFO: 2020-10-31 11:32:15 file.go:19 Successfully loaded config from file ./config.yml
INFO: 2020-10-31 11:32:15 worker.go:58 Launching a worker with the following settings:
INFO: 2020-10-31 11:32:15 worker.go:59 - Broker: redis://localhost:6379
INFO: 2020-10-31 11:32:15 worker.go:61 - DefaultQueue: asong
INFO: 2020-10-31 11:32:15 worker.go:65 - ResultBackend: redis://localhost:6379
INFO: 2020-10-31 11:32:15 redis.go:100 [*] Waiting for messages. To exit press CTRL+C
DEBUG: 2020-10-31 11:32:16 redis.go:342 Received new message: {"UUID": "task_9f01be1f-3237-49f1-8464-eecca2e50597", "Name": "sum", "RoutingKey": "asong", "ETA": null, "GroupUUID": "", "GroupTaskCount": 0, "Args": [{"Name": "", "Type": "[]int64", "Value": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]}], "Headers": {}, "Priority": 0, "Immutable": false, "RetryCount": 0, "RetryTimeout": 0, "OnSuccess": null, "OnError": null, "ChordCallback": null, "BrokerMessageGroupId": "", "SQSReceiptHandle": "", "StopTaskDeletionOnError": false, "IgnoreWhenTaskNotRegistered": false}
DEBUG: 2020-10-31 11:32:16 worker.go:261 Processed task task_9f01be1f-3237-49f1-8464-eecca2e50597. Results = 55
2020-10-31 11:32:16 get res is 55
All right, now let's talk about the code flow above.
- Read the configuration file. This step configures broker and result_backend. I chose Redis because it was already installed on my machine.
- The machinery library must be instantiated before use. This is done by creating a Server instance. Server is the basic object that holds the machinery configuration and the registered tasks.
- Before your workers can consume a task, you need to register it with the server. This is done by assigning the task a unique name.
- To consume tasks, you need one or more workers running. All a worker needs is a Server instance with registered tasks. Each worker only handles registered tasks. For each task in the queue, the Worker.Process() method runs in a goroutine. You can use the second parameter of server.NewWorker to limit the number of Worker.Process() calls that run concurrently (per worker).
- You invoke a task by passing a Signature instance to the Server instance.
- Call the HumanReadableResults method to convert the reflected values into the final result.

More features
1. Delayed task
The code above is just a simple example of using machinery. machinery also supports delaying a task: set the ETA timestamp field on the task signature.
eta := time.Now().UTC().Add(time.Second * 20)
signature.ETA = &eta
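To make the effect of an ETA concrete, here is a minimal sketch of the idea using only the standard library. It is an illustration under assumptions, not machinery's broker code: `delayedTask`, `splitDue`, and the `at` helper are hypothetical names, and a nil ETA is taken to mean "run as soon as possible".

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// delayedTask is a hypothetical task with an optional ETA.
// A nil ETA means the task may run immediately.
type delayedTask struct {
	Name string
	ETA  *time.Time
}

// at returns a fixed instant offset by sec seconds, so the example does
// not depend on the wall clock.
func at(sec int) time.Time {
	base := time.Date(2020, 10, 31, 11, 32, 15, 0, time.UTC)
	return base.Add(time.Duration(sec) * time.Second)
}

// splitDue separates the tasks that may run at now from those that must
// still wait. Waiting tasks are returned sorted by ETA, earliest first.
func splitDue(all []delayedTask, now time.Time) (due, waiting []delayedTask) {
	for _, t := range all {
		if t.ETA == nil || !t.ETA.After(now) {
			due = append(due, t)
		} else {
			waiting = append(waiting, t)
		}
	}
	sort.Slice(waiting, func(i, j int) bool {
		return waiting[i].ETA.Before(*waiting[j].ETA)
	})
	return due, waiting
}

func main() {
	eta := at(20) // 20 seconds after the reference instant
	all := []delayedTask{{Name: "sum-later", ETA: &eta}, {Name: "sum-now"}}

	due, waiting := splitDue(all, at(0))
	fmt.Println(len(due), len(waiting)) // prints "1 1"

	due, waiting = splitDue(all, at(20))
	fmt.Println(len(due), len(waiting)) // prints "2 0"
}
```

A poller that calls something like `splitDue` on a fixed interval is one plausible reading of the delayed_tasks_poll_period setting in the configuration file.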
2. Retry the task
You can set a number of retries before the task is declared failed. Retries are spaced out over time using the Fibonacci sequence. There are two ways to do this. The first is to set the RetryTimeout and RetryCount fields directly on the task signature; the retry interval then grows along the Fibonacci sequence.
// task signature
signature := &tasks.Signature{
	Name: "sum",
	Args: []tasks.Arg{
		{
			Type:  "[]int64",
			Value: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
		},
	},
	RetryTimeout: 100,
	RetryCount:   3,
}
Alternatively, you can return tasks.ErrRetryTaskLater from the task and specify the duration after which it should be retried.
func Sum(args []int64) (int64, error) {
	sum := int64(0)
	for _, arg := range args {
		sum += arg
	}
	// Returning this error asks for the task to be retried after 4 seconds.
	return sum, tasks.NewErrRetryTaskLater("I said he was wrong", 4*time.Second)
}
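The Fibonacci spacing mentioned for the first method can be sketched as follows. This is a sketch under an assumption: that each retry advances the timeout to the next Fibonacci number above the current value, measured in seconds. `nextFib` and `retryDelays` are hypothetical helpers written for this illustration, not machinery functions.

```go
package main

import "fmt"

// nextFib returns the smallest Fibonacci number strictly greater than n.
func nextFib(n int) int {
	a, b := 0, 1
	for b <= n {
		a, b = b, a+b
	}
	return b
}

// retryDelays lists the delay (assumed to be in seconds) before each
// retry, starting from the configured retry timeout.
func retryDelays(retryTimeout, retryCount int) []int {
	var delays []int
	for i := 0; i < retryCount; i++ {
		retryTimeout = nextFib(retryTimeout)
		delays = append(delays, retryTimeout)
	}
	return delays
}

func main() {
	// The values from the signature above: RetryTimeout 100, RetryCount 3.
	fmt.Println(retryDelays(100, 3)) // prints "[144 233 377]"
}
```

Under this assumption, the waits get longer with every failed attempt, which gives a struggling downstream service more time to recover.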
3. Workflow
Everything above runs a single asynchronous task. In a real project, a requirement often needs several asynchronous tasks to be executed in an orchestrated way, and machinery's workflows are designed for that.
3.1 Groups
A Group is a set of tasks that will be executed independently of each other in parallel. Let's draw a picture so that it looks clearer:
Let's look at a simple example:
// group
group, err := tasks.NewGroup(signature1, signature2, signature3)
if err != nil {
	log.Println("add group failed", err)
	return
}
// The last argument limits how many tasks are sent concurrently.
asyncResults, err := server.SendGroupWithContext(context.Background(), group, 10)
if err != nil {
	log.Println(err)
	return
}
for _, asyncResult := range asyncResults {
	results, err := asyncResult.Get(time.Millisecond * 5)
	if err != nil {
		log.Println(err)
		continue
	}
	// Print the first argument of each task next to its result.
	log.Printf(
		"%v %v\n",
		asyncResult.Signature.Args[0].Value,
		tasks.HumanReadableResults(results),
	)
}
Tasks in a group are executed in parallel.
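The group semantics can be illustrated with plain goroutines. This is a standard-library sketch, not machinery code: `sumTask` and `runGroup` are hypothetical names, and each argument list plays the role of one signature in the group.

```go
package main

import (
	"fmt"
	"sync"
)

// sumTask is a hypothetical task that adds up its arguments.
func sumTask(args []int64) int64 {
	var total int64
	for _, a := range args {
		total += a
	}
	return total
}

// runGroup runs the task once per argument list, all in parallel and
// independently of each other, and returns the results in input order.
func runGroup(task func([]int64) int64, argSets [][]int64) []int64 {
	results := make([]int64, len(argSets))
	var wg sync.WaitGroup
	for i, args := range argSets {
		wg.Add(1)
		go func(i int, args []int64) {
			defer wg.Done()
			results[i] = task(args) // each goroutine writes only its own slot
		}(i, args)
	}
	wg.Wait()
	return results
}

func main() {
	fmt.Println(runGroup(sumTask, [][]int64{{1, 2}, {3, 4}, {5, 6}})) // prints "[3 7 11]"
}
```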
3.2 Chords
Projects often need callbacks, and machinery takes this into account. A chord lets you specify a callback task that is executed after all tasks in the group have finished.
Let's look at a piece of code:
callback := &tasks.Signature{
	Name: "call",
}
group, err := tasks.NewGroup(signature1, signature2, signature3)
if err != nil {
	log.Printf("Error creating group: %s", err.Error())
	return
}
chord, err := tasks.NewChord(group, callback)
if err != nil {
	log.Printf("Error creating chord: %s", err)
	return
}
chordAsyncResult, err := server.SendChordWithContext(context.Background(), chord, 0)
if err != nil {
	log.Printf("Could not send chord: %s", err.Error())
	return
}
results, err := chordAsyncResult.Get(time.Millisecond * 5)
if err != nil {
	log.Printf("Getting chord result failed with error: %s", err.Error())
	return
}
log.Printf("%v\n", tasks.HumanReadableResults(results))
The above example executes task1, task2, and task3 in parallel, aggregates their results and passes them to the callback task.
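The same idea in a standard-library sketch: run the group in parallel, wait for all of it, then hand the collected results to the callback. `sum` and `runChord` are hypothetical names chosen for this illustration, and the sketch does not reproduce how machinery tracks group completion through the backend.

```go
package main

import (
	"fmt"
	"sync"
)

// sum is a hypothetical task that adds up its arguments.
func sum(args ...int64) int64 {
	var total int64
	for _, a := range args {
		total += a
	}
	return total
}

// runChord runs task once per argument list in parallel, waits for all of
// them, and then passes the aggregated results to callback.
func runChord(argSets [][]int64, task, callback func(...int64) int64) int64 {
	partial := make([]int64, len(argSets))
	var wg sync.WaitGroup
	for i, args := range argSets {
		wg.Add(1)
		go func(i int, args []int64) {
			defer wg.Done()
			partial[i] = task(args...)
		}(i, args)
	}
	wg.Wait() // the callback only runs after the whole group has finished
	return callback(partial...)
}

func main() {
	// Group results are 3, 7 and 11; the callback sums them up.
	fmt.Println(runChord([][]int64{{1, 2}, {3, 4}, {5, 6}}, sum, sum)) // prints "21"
}
```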
3.3 Chains
A chain is a set of tasks executed one after another, and each successful task triggers the next task in the chain.
Look at this piece of code:
// chain
chain, err := tasks.NewChain(signature1, signature2, signature3, callback)
if err != nil {
	log.Printf("Error creating chain: %s", err.Error())
	return
}
chainAsyncResult, err := server.SendChainWithContext(context.Background(), chain)
if err != nil {
	log.Printf("Could not send chain: %s", err.Error())
	return
}
results, err := chainAsyncResult.Get(time.Millisecond * 5)
if err != nil {
	log.Printf("Getting chain result failed with error: %s", err.Error())
	return
}
log.Printf("%v\n", tasks.HumanReadableResults(results))
The above example executes task1, then task2, then task3. When a task completes successfully, the result is appended to the end of the parameter list of the next task in chain, and the callback task is finally executed.
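The rule that each result is appended to the end of the next task's parameter list can be sketched without machinery. The `step` type and the `add`, `multiply`, and `runChain` functions are hypothetical names for this illustration, and error handling is left out to keep it short.

```go
package main

import "fmt"

// step is a hypothetical chain element: a function plus its own arguments.
type step struct {
	Fn   func(args ...int64) int64
	Args []int64
}

// add returns the sum of its arguments.
func add(args ...int64) int64 {
	var total int64
	for _, a := range args {
		total += a
	}
	return total
}

// multiply returns the product of its arguments.
func multiply(args ...int64) int64 {
	product := int64(1)
	for _, a := range args {
		product *= a
	}
	return product
}

// runChain executes the steps one after another. The result of each step
// is appended to the end of the next step's argument list.
func runChain(steps []step) int64 {
	var prev []int64 // empty for the first step
	var out int64
	for _, s := range steps {
		args := append(append([]int64{}, s.Args...), prev...)
		out = s.Fn(args...)
		prev = []int64{out}
	}
	return out
}

func main() {
	result := runChain([]step{
		{Fn: add, Args: []int64{1, 2}},    // 1 + 2 = 3
		{Fn: add, Args: []int64{3, 4}},    // 3 + 4 + 3 = 10
		{Fn: multiply, Args: []int64{5}},  // 5 * 10 = 50
	})
	fmt.Println(result) // prints "50"
}
```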
At this point, you should have a better understanding of what machinery can do. The best next step is to try it out in practice.