How does Storm handle messages reliably?

2025-01-31 Update From: SLTechnology News&Howtos

4.1 Introduction

Storm guarantees that every message emitted by a spout is fully processed. This chapter describes how Storm achieves this and how developers should use these mechanisms to process data reliably.

4.2 What it means for a message to be fully processed

A single message (tuple) emitted by a spout may trigger the creation of hundreds of further messages derived from it.

Let's consider a streaming "word count" example:

The Storm topology reads one complete English sentence at a time from the data source (a Kestrel queue), splits the sentence into individual words, and outputs each word in real time together with the number of times it has appeared so far.

In this case, each message emitted by the spout (each English sentence) triggers the creation of many new messages: the words split out of the sentence.

These messages form a tree structure, which we call a "tuple tree", as shown in Figure 1:

Figure 1. Example tuple tree

Under what conditions does Storm consider a message emitted by a spout to be fully processed? Both of the following conditions must hold:

The tuple tree no longer grows

Every message in the tree has been marked as processed

If the tuple tree derived from a message is not fully processed within the specified time, the message is considered to have failed. This timeout can be configured with the topology-level parameter Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS; the default is 30 seconds.
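
To make the timeout rule concrete, here is a minimal plain-Java sketch. It is not Storm code: the class TimeoutTracker and its methods are hypothetical, and the clock is passed in explicitly so the behaviour is easy to follow. It only models the rule that a spout message whose tuple tree is not completed within the timeout is reported as failed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not part of Storm: remembers when each spout message was
// emitted and reports the ones whose tuple tree did not complete in time.
class TimeoutTracker {
    private final long timeoutMillis;
    // spout message ID -> emission time in milliseconds
    private final Map<String, Long> pending = new LinkedHashMap<>();

    TimeoutTracker(long timeoutSecs) {
        this.timeoutMillis = timeoutSecs * 1000L;
    }

    void emitted(String rootId, long nowMillis) {
        pending.put(rootId, nowMillis);
    }

    // Called when the whole tuple tree of rootId has been processed.
    void completed(String rootId) {
        pending.remove(rootId);
    }

    // Removes and returns every spout message that has been pending for at least the timeout.
    List<String> expire(long nowMillis) {
        List<String> failed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMillis - e.getValue() >= timeoutMillis) {
                failed.add(e.getKey());
                it.remove();
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        TimeoutTracker t = new TimeoutTracker(30); // 30 s, the default mentioned above
        t.emitted("sentence-1", 0L);
        t.emitted("sentence-2", 0L);
        t.completed("sentence-1");
        // After 31 s only sentence-2 is still pending, so only it is reported.
        System.out.println(t.expire(31_000L));
    }
}
```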

4.3 Lifecycle of a message

What does Storm do once a message has been fully processed, or has failed? To answer this, let's look at the life cycle of a message emitted by a spout. A spout implements the ISpout interface, whose methods include open, close, nextTuple, ack, and fail.

First, Storm requests a message (tuple) from the spout by calling the nextTuple() method of the spout instance. In response, the spout emits one or more messages to its output streams through the SpoutOutputCollector that was passed to its open method. For each message it emits, the spout supplies a message ID, which is later used to identify that message.

Suppose we read messages from a Kestrel queue. The spout uses the ID that Kestrel assigned to a message as that message's ID. When emitting, the spout calls emit on the SpoutOutputCollector with the tuple's values as the first argument and the message ID as the second.

Next, the messages are sent to the bolts for further processing, and Storm tracks every new message derived from them. When Storm detects that the tuple tree derived from a message has been fully processed, it calls the ack method of the spout and passes the message ID as the argument. Similarly, if processing of a message times out, Storm calls the fail method of the spout that emitted it, again passing the message ID as the argument.

Note: ack or fail is only ever called on the spout task that emitted the message. If a spout runs as multiple tasks, a message is acked or failed only by the task that created it, never by any other spout task.

Let's continue with the example of reading messages from a Kestrel queue to illustrate what a spout must do to be reliable (assume the spout is named KestrelSpout).

Let's start with a brief overview of Kestrel message queues:

When KestrelSpout reads a message from a Kestrel queue, it "opens" the message. The message is not actually deleted from the queue; instead it is put into a "pending" state and is deleted only after the client confirms it. Messages in the "pending" state are not visible to other clients. If a client disconnects unexpectedly, all messages it has "opened" are put back into the queue. When a message is "opened", Kestrel also assigns it a unique identifier.

KestrelSpout uses this unique identifier as the message ID of the tuple. Later, when ack or fail is called, KestrelSpout forwards the ack or fail to the Kestrel queue along with the message ID, and Kestrel then either removes the message from the queue (on ack) or puts it back into the queue (on fail).
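
The interplay between the queue's "pending" state and the spout's ack and fail methods can be sketched in plain Java. Everything below is hypothetical: PendingQueue is an in-memory stand-in, not the Kestrel client API, and ReliableSpoutModel is not a Storm spout. The sketch only shows the queue's ID doubling as the message ID, with ack confirming the read and fail returning the item to the queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical spout-side logic (not a Storm ISpout): the ID handed out by the
// queue is reused as the message ID.
class ReliableSpoutModel {
    private final PendingQueue queue;

    ReliableSpoutModel(PendingQueue queue) { this.queue = queue; }

    // Mirrors nextTuple(): returns the message ID that would accompany the emit, or -1.
    long nextTuple() { return queue.openNext(); }

    void ack(long msgId) { queue.confirm(msgId); }

    void fail(long msgId) { queue.abort(msgId); }

    public static void main(String[] args) {
        PendingQueue q = new PendingQueue();
        q.put("the cow jumped over the moon");
        ReliableSpoutModel spout = new ReliableSpoutModel(q);
        long id = spout.nextTuple();
        spout.fail(id); // processing failed or timed out, so the item is queued again
        System.out.println("ready=" + q.readyCount() + " open=" + q.openCount());
    }
}

// Hypothetical in-memory stand-in for a queue with open / confirm / abort reads.
class PendingQueue {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> open = new HashMap<>();
    private long nextId = 1;

    void put(String item) { ready.addLast(item); }

    // "Opens" the next item: it is hidden from other readers but not yet deleted.
    // Returns the ID assigned to the opened item, or -1 if nothing is ready.
    long openNext() {
        String item = ready.pollFirst();
        if (item == null) return -1;
        long id = nextId++;
        open.put(id, item);
        return id;
    }

    String peekOpen(long id) { return open.get(id); }

    // Ack path: the item is deleted for good.
    void confirm(long id) { open.remove(id); }

    // Fail path: the item goes back into the queue.
    void abort(long id) {
        String item = open.remove(id);
        if (item != null) ready.addLast(item);
    }

    int readyCount() { return ready.size(); }

    int openCount() { return open.size(); }
}
```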

4.4 The reliability API

To use the reliable processing features provided by Storm, we need to do two things:

Whenever a new node is added to the tuple tree, we need to notify Storm explicitly.

Whenever we finish processing a single message, we need to tell Storm that the state of the tuple tree has changed.

With these two steps, Storm can detect when a tuple tree has been fully processed and call the appropriate ack or fail method. Storm provides a concise API for both steps.

Adding a new node under a specified node of the tuple tree is called anchoring. Anchoring happens at the moment a message is emitted. As an example, consider a bolt named SplitSentence that splits a message containing a whole sentence into a series of messages, each containing one word.

Each word message is anchored by passing the input message as the first argument of the emit method. Because the word message is anchored to the input message, and the input message belongs to a tuple tree rooted at a spout message, that spout message is replayed if processing of any word message fails.

By contrast, consider what happens when the word message is emitted without passing the input message to the emit method.

A message emitted this way is not anchored. If processing fails further down its tuple tree, the root message that the tree was derived from is not replayed. Depending on the fault-tolerance guarantees the topology needs, emitting an unanchored message is sometimes appropriate.

An output message can be anchored to one or more input messages, which is useful for joins and aggregations. If processing of a multi-anchored message fails, every spout message associated with it is replayed. Multi-anchoring is done by passing a list of input messages, rather than a single message, to the emit method.

A multi-anchored message is added to several tuple trees.

Note: multi-anchoring can break the tree structure and produce a DAG (directed acyclic graph), as shown in Figure 2:

Figure 2. Diamond structure produced by multi-anchoring

Storm's implementation handles DAGs just as it handles trees.
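
The effect of anchoring, not anchoring, and multi-anchoring can be modelled in a few lines of plain Java. This is a sketch under stated assumptions, not Storm's Tuple or OutputCollector: the class AnchoredMessage and its methods are hypothetical, and each message simply carries the IDs of the spout messages whose trees it belongs to.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of anchoring: a message remembers the IDs of the spout
// (root) messages that would have to be replayed if it failed.
class AnchoredMessage {
    final String value;
    final Set<Long> rootIds;

    private AnchoredMessage(String value, Set<Long> rootIds) {
        this.value = value;
        this.rootIds = Collections.unmodifiableSet(rootIds);
    }

    // A message emitted by a spout is the root of its own tree.
    static AnchoredMessage fromSpout(long rootId, String value) {
        Set<Long> roots = new TreeSet<>();
        roots.add(rootId);
        return new AnchoredMessage(value, roots);
    }

    // Anchored emit: the new message joins the trees of all of its anchors.
    // With an empty anchor list the message is unanchored and belongs to no tree.
    static AnchoredMessage emit(List<AnchoredMessage> anchors, String value) {
        Set<Long> roots = new TreeSet<>();
        for (AnchoredMessage a : anchors) {
            roots.addAll(a.rootIds);
        }
        return new AnchoredMessage(value, roots);
    }

    // Spout messages that would be replayed if this message failed.
    Set<Long> replayedOnFailure() { return rootIds; }

    public static void main(String[] args) {
        AnchoredMessage s1 = fromSpout(1L, "sentence one");
        AnchoredMessage s2 = fromSpout(2L, "sentence two");
        AnchoredMessage word = emit(Arrays.asList(s1), "one");
        AnchoredMessage joined = emit(Arrays.asList(s1, s2), "joined");
        AnchoredMessage loose = emit(Collections.<AnchoredMessage>emptyList(), "one");
        System.out.println(word.replayedOnFailure());   // roots of the single anchor
        System.out.println(joined.replayedOnFailure()); // roots of both anchors
        System.out.println(loose.replayedOnFailure());  // no roots: nothing is replayed
    }
}
```

A message derived from a multi-anchored message inherits all of its roots, which is why the diamond in Figure 2 needs no special handling in this model.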

Anchoring specifies how a message is added to a tuple tree. The other half of the reliability API specifies what to do when we finish processing a single message in the tree, which is done through the ack and fail methods of OutputCollector. Looking back at the SplitSentence example, the input message representing the sentence is acked once all the word messages have been emitted.

Every message that is processed must be either acked or failed. Storm uses memory to track each message, so if messages are never acked or failed, the tasks tracking them will eventually run out of memory.

Many bolts follow the same pattern: read a message, emit the messages derived from it, and ack the input message at the end of execute. Filters and simple processing functions fall into this category. Storm provides the IBasicBolt interface, which encapsulates this pattern, and the SplitSentence example can be rewritten with it.

Written this way, the bolt is somewhat simpler than before but behaves identically. Messages emitted to the BasicOutputCollector are automatically anchored to the input message, and the input message is automatically acked when execute returns.
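
The "emit anchored, then ack" pattern itself is small enough to model in plain Java. The sketch below is hypothetical and uses none of Storm's classes: AutoAckRunner stands in for the wrapper around the bolt's logic and merely logs what would be emitted and acked.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of the pattern described above; it does not use Storm's classes.
class AutoAckRunner {
    final List<String> log = new ArrayList<>();

    // Runs the bolt logic on one input: every output is emitted anchored to the
    // input, and the input is acked once the logic has finished.
    void execute(String input, Function<String, List<String>> logic) {
        for (String out : logic.apply(input)) {
            log.add("emit " + out + " anchored to [" + input + "]");
        }
        log.add("ack [" + input + "]");
    }

    public static void main(String[] args) {
        AutoAckRunner runner = new AutoAckRunner();
        // The split logic never mentions anchoring or acking.
        runner.execute("the cow", sentence -> Arrays.asList(sentence.split(" ")));
        runner.log.forEach(System.out::println);
    }
}
```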

In many cases, such as aggregations and joins, a message must be acked later rather than immediately. The input messages can be acked only after a result has been computed from the whole set of them. In addition, aggregations and joins usually multi-anchor their output messages. IBasicBolt cannot handle these cases.

4.5 Efficient implementation of tuple trees

Storm has a special set of tasks called "ackers", which are responsible for tracking every message in the DAG (directed acyclic graph). Whenever an acker finds that a DAG has been fully processed, it sends a signal to the spout task that created the root message. The parallelism of acker tasks in a topology is set with the configuration parameter Config.TOPOLOGY_ACKERS. The default parallelism is 1; when the system carries a large number of messages, the parallelism of the acker tasks should be increased accordingly.

To understand Storm's reliability mechanism, we start with the life cycle of a message and the management of its tuple tree. When a message is created (whether in a spout or in a bolt), the system assigns it a random 64-bit value as its ID. Ackers use these random IDs to track the tuple tree derived from each spout message.

Each message knows the ID of the root message of the tuple tree it belongs to. Whenever a bolt generates a new message, the message ID of the corresponding root message is copied into it. When a message is acked, it sends information about the change in the tuple tree to the acker that tracks the tree. In effect it tells the acker: this message has been processed, but some new messages were derived from it, so please track those as well.

For example, suppose messages D and E are derived from message C. When C is acked, it is removed from the tuple tree and D and E are added in the same step. Because the additions and the removal happen together, the tuple tree is never prematurely considered fully processed.

Let's take a closer look at how Storm tracks tuple trees. As mentioned earlier, a topology can have any number of ackers, so how does a message know which acker to notify when it is created or acked?

Storm hashes the message ID of the spout message to decide which acker tracks the tuple tree derived from that message. Because every message knows the message ID of its root message, it knows which acker to communicate with.
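
A minimal sketch of this routing idea, assuming a simple "hash, then take the remainder" scheme. The class AckerRouter is hypothetical and is not Storm's internal code; the point is only that every message carrying the same root ID resolves to the same acker.

```java
// Hypothetical helper, not Storm's internal code: picks an acker task by hashing
// the root message ID and taking the remainder modulo the number of acker tasks.
class AckerRouter {
    private final int numAckers;

    AckerRouter(int numAckers) {
        if (numAckers <= 0) {
            throw new IllegalArgumentException("need at least one acker");
        }
        this.numAckers = numAckers;
    }

    // Every message in a tree carries the same root ID, so all of them
    // resolve to the same acker index in the range [0, numAckers).
    int ackerFor(long rootId) {
        return Math.floorMod(Long.hashCode(rootId), numAckers);
    }

    public static void main(String[] args) {
        AckerRouter router = new AckerRouter(4);
        long rootId = 0x5DEECE66DL; // stands in for a random 64-bit root ID
        System.out.println(router.ackerFor(rootId) == router.ackerFor(rootId)); // prints true
    }
}
```

Math.floorMod is used instead of the % operator so that a negative hash code still yields a valid index.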

When a spout emits a message, it notifies the corresponding acker that a new root message has been created, and the acker starts tracking a new tuple tree. When the acker finds that the tree has been fully processed, it notifies the corresponding spout task.

How are tuples tracked? A system may carry thousands of messages, and if an explicit tree were built for every message emitted by a spout, memory would soon run out. A different strategy is therefore needed. Thanks to its tracking algorithm, Storm needs only a fixed amount of memory (about 20 bytes) per tree. This algorithm is at the core of Storm's correct operation and is one of its major breakthroughs.

The acker task stores a map from each spout message ID to a pair of values. The first value is the ID of the spout task, which tells the acker which spout task to notify when processing completes. The second value is a 64-bit number called the "ack val", which is the XOR of the random IDs of all messages in the tree. The ack val represents the state of the whole tree: no matter how large the tree is, this one fixed-size number is enough to track it. The same message ID is sent to the acker and XORed into the ack val twice, once when the message is created and once when it is acked, so the two occurrences cancel each other out.
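
The bookkeeping described above fits in a few lines of plain Java. This is a sketch under stated assumptions, not Storm's acker: the class AckTracker and its method names are hypothetical, and for simplicity the spout message's own ID is used both as the map key and as the first value XORed into the ack val.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical model of the acker's bookkeeping, not Storm's actual acker code.
class AckTracker {
    // One fixed-size entry per spout message, however large its tree grows.
    private static final class Entry {
        final int spoutTaskId;
        long ackVal;

        Entry(int spoutTaskId, long ackVal) {
            this.spoutTaskId = spoutTaskId;
            this.ackVal = ackVal;
        }
    }

    private final Map<Long, Entry> trees = new HashMap<>();

    // The spout announces a new root message; its ID enters the ack val for the first time.
    void init(long rootId, int spoutTaskId) {
        trees.put(rootId, new Entry(spoutTaskId, rootId));
    }

    // A message is acked and reports the IDs of the messages anchored to it.
    // Returns the spout task to notify if the tree is now complete.
    OptionalInt ack(long rootId, long ackedId, long... childIds) {
        Entry e = trees.get(rootId);
        if (e == null) {
            return OptionalInt.empty();
        }
        long delta = ackedId;        // second occurrence of ackedId cancels the first
        for (long child : childIds) {
            delta ^= child;          // first occurrence of each child ID
        }
        e.ackVal ^= delta;
        if (e.ackVal == 0) {
            trees.remove(rootId);
            return OptionalInt.of(e.spoutTaskId);
        }
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        long c = 0x1111L, d = 0x2222L, e = 0x4444L; // stand-ins for random 64-bit IDs
        AckTracker acker = new AckTracker();
        acker.init(c, 7);                                    // spout task 7 emits C
        System.out.println(acker.ack(c, c, d, e).isPresent()); // false: D and E are pending
        System.out.println(acker.ack(c, d).isPresent());       // false: E is pending
        System.out.println(acker.ack(c, e).getAsInt());        // 7: ack val reached 0
    }
}
```

Acking C removes C and adds D and E in a single XOR update, which is why the ack val cannot pass through 0 while children are still outstanding, apart from the rare ID collision discussed next.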

Whenever an acker finds that a tree's ack val is 0, it knows that the tree has been fully processed. Because message IDs are random 64-bit values, the chance that the ack val becomes 0 before the tree is actually finished is extremely small. At 10,000 acks per second, it would take about 50 million years before such an error could be expected. Even then, data is lost only if the affected message actually fails in the topology.
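
The order of magnitude of that estimate can be checked with a short calculation. It assumes that each ack independently has a probability of 2^-64 of producing a false zero, and it reads "10,000 messages per second" as 10,000 acks per second; both are simplifying assumptions made here, not statements from Storm's documentation.

```latex
\begin{align*}
N_{\text{acks}} &= 2^{64} \approx 1.84 \times 10^{19} \\
r &= 10^{4}\ \text{s}^{-1} \times 3.16 \times 10^{7}\ \text{s/yr} \approx 3.16 \times 10^{11}\ \text{yr}^{-1} \\
T &= \frac{N_{\text{acks}}}{r} \approx \frac{1.84 \times 10^{19}}{3.16 \times 10^{11}} \approx 5.8 \times 10^{7}\ \text{years}
\end{align*}
```

The expected waiting time of roughly 58 million years is consistent with the figure of at least 50 million years quoted above.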

4.6 Choosing the appropriate reliability level

Acker tasks are lightweight, so a topology does not need many of them. You can observe the throughput of the acker tasks in the Storm UI; if it looks insufficient, add more ackers.

If you do not require every message to be processed (that is, losing some messages during processing is acceptable), you can turn off reliable message processing to get better performance. Doing so roughly halves the number of messages in the system, because an ack no longer has to be sent for each message. It also reduces the size of each message, because a tuple no longer has to carry its root ID, which saves bandwidth.

There are three ways to turn off or relax reliable message processing:

Set the parameter Config.TOPOLOGY_ACKERS to 0. With this setting, the spout's ack method is called immediately after the spout emits a message.

Emit the message from the spout without specifying a message ID. Use this method when you want to turn off reliability for specific messages only.

Finally, if you do not care about the reliability of the descendants of a particular message, emit the messages derived from it without anchoring them, that is, without passing the input message to the emit method. Because these descendants are not anchored in any tuple tree, their failure does not cause any spout message to be replayed.

4.7 Fault tolerance at every level of the cluster

By now you understand Storm's reliability mechanism and know how to choose a reliability level that meets your requirements. Next, let's look at how Storm ensures that data is not lost under various kinds of failure.

4.7.1 Task-level failure

A bolt task crashes, so the messages it was processing are never acked. In this case, all messages in the acker that are associated with this bolt task fail by timeout, and the fail method of the corresponding spout is called.

An acker task fails. If an acker task itself fails, all messages it was tracking at the time fail by timeout, and the fail method of the spout is called.

A spout task fails. In this case, the external source that the spout reads from (such as a message queue) is responsible for the integrity of the messages. For example, when a client fails, the Kestrel queue puts all of that client's pending messages back into the queue.

4.7.2 Task slot failure

A worker fails. Each worker contains several bolt (or spout) tasks. The supervisor monitors the workers, and when a worker fails, the supervisor tries to restart it on the same machine.

The supervisor fails. The supervisor is stateless, so its failure does not affect running tasks as long as it is restarted promptly. The supervisor does not restart itself; an external monitor is needed to restart it in time.

Nimbus fails. Nimbus is stateless, so its failure does not affect running tasks as long as it is restarted promptly, although no new tasks can be submitted while Nimbus is down. Nimbus does not restart itself; an external monitor is needed to restart it in time.

4.7.3 Cluster node (machine) failure

A node in the Storm cluster fails. In this case, Nimbus moves all tasks that were running on that machine to other available machines.

A node in the ZooKeeper cluster fails. ZooKeeper keeps operating normally as long as fewer than half of its machines are down; the failed machines should be repaired promptly.
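
The "fewer than half" rule is simple majority arithmetic, sketched below. The class name Quorum is illustrative only, and the methods assume a cluster of at least one machine.

```java
// Majority-quorum arithmetic; the class name is illustrative only.
class Quorum {
    // Smallest number of machines that forms a strict majority of n.
    static int majority(int n) {
        return n / 2 + 1;
    }

    // Largest number of machines that can be down while a majority remains.
    static int tolerableFailures(int n) {
        return (n - 1) / 2;
    }

    public static void main(String[] args) {
        for (int n = 3; n <= 5; n++) {
            System.out.println(n + " machines: majority " + majority(n)
                    + ", tolerates " + tolerableFailures(n) + " failure(s)");
        }
    }
}
```

A four-machine cluster tolerates no more failures than a three-machine one, which is why odd cluster sizes are the usual choice.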
