Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to understand the parallelism, Grouping policy and reliable message processing mechanism of Storm

2025-01-16 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)06/01 Report--

How to understand the parallelism of Storm, Grouping strategy and reliable message processing mechanism, I believe that many inexperienced people do not know what to do. Therefore, this paper summarizes the causes and solutions of the problem. Through this article, I hope you can solve this problem.

Concept:

Workers (JVMs): one or more independent JVM processes can be run on a node. A Topology can contain one or more worker (running in parallel on different machine), so the worker process is to execute a subset of the topology, and the worker can only correspond to one topology.

Executors (threads): there are multiple Java threads running in a worker JVM process. An executor thread can execute one or more tasks. However, by default, there is only one task per executor. A worker can contain one or more executor, and each component (spout or bolt) corresponds to at least one executor, so it can be said that executor executes a subset of compenent, while an executor can only correspond to one component.

Tasks (bolt/spout instances): Task is a specific processing logical object, and each Spout and Bolt are executed as many task throughout the cluster. Each task corresponds to a thread, while stream grouping defines how to emit tuple from one pile of task to another task. You can call TopologyBuilder.setSpout and TopBuilder.setBolt to set the parallelism-that is, how many task there are.

Configure parallelism

The concurrency can be configured in multiple places in storm. The priority is: defaults.yaml < storm.yaml < topology-specific configuration < internal component-specific configuration < external component-specific configuration.

The number of worker processes can be configured in the configuration file and code. Worker is the execution process, so considering the effect of concurrency, the number should at least be larger than the number of machines.

The number of executor, the number of concurrent threads of component, can only be configured in the code (through the parameters of setBolt and setSpout), for example, setBolt ("green-bolt", new GreenBolt (), 2)

The number of tasks, can not be configured, default and executor1:1, can also be configured through setNumTasks ()

The number of worker for Topology is set by config, that is, the number of worker (java) processes executing the topology. It can be adjusted arbitrarily through the storm rebalance command.

Config conf = new Config (); conf.setNumWorkers (2); / / use two worker processestopologyBuilder.setSpout ("blue-spout", new BlueSpout (), 2); / / set parallelism hint to 2topologyBuilder.setBolt ("green-bolt", new GreenBolt (), 2) .setNumTasks (4). ShuffleGrouping ("blue-spout"); / / set tasks number to 4topologyBuilder.setBolt ("yellow-bolt", new YellowBolt (), 6). ShuffleGrouping ("green-bolt"); StormSubmitter.submitTopology ("mytopology", conf, topologyBuilder.createTopology ())

Dynamic change of parallelism

Storm supports dynamically changing (increasing or decreasing) the number of worker processes and the number of executors without restart topology, called rebalancing. Through Storm web UI, or through the storm rebalance command:

Storm rebalance mytopology-n 5-e blue-spout=3-e yellow-bolt=10 flow packet strategy-Stream Grouping

Stream Grouping, telling topology how to send tuple between two components

One of the steps in defining a topology is to define what streams each bolt receives as input. Stream grouping is used to define a stream if data is assigned to multiple tasks on the bolts.

There are seven types of stream grouping in Storm. You can also implement custom flow grouping by implementing the CustomStreamGrouping interface.

1. Shuffle Grouping

Randomly group and randomly distribute the tuple in the stream to ensure that each bolt task receives roughly the same number of tuple.

2. Fields Grouping

Grouping by field, for example, by "user-id" field, then tuple with the same "user-id" will be assigned to a task in the same Bolt, while different "user-id" may be assigned to a different task.

3. All Grouping

Broadcast transmission, for every tuple, all bolts will be received

4. Global Grouping

Global grouping, the entire stream is assigned to one of the task of one of the bolt in the storm. More specifically, it is assigned to the task with the lowest id value.

5. None Grouping

No grouping, this grouping means that stream doesn't care how it is grouped. At present, this grouping has the same effect as Shuffle grouping, except that storm will put the bolt using none grouping into the same thread of the bolt subscriber (if possible).

6. Direct Grouping

Directed grouping, which is a special grouping method, which means that the sender of the message (tuple) specifies which task of the message receiver will process the message. Only message flows declared as Direct Stream can declare this grouping method. And the message tuple must be emitted using the emitDirect method. The message handler can get the id of the task that processes its message through TopologyContext (the OutputCollector.emit method also returns the id of the task)

7. Local or shuffle grouping

Grouped locally or randomly. If the target bolt has one or more task in the same working process as the task of the source bolt, the tuple will be randomly sent to these tasks in the same process. Otherwise, it is consistent with the normal Shuffle Grouping behavior.

Reliable processing mechanism of messages

In storm, a reliable information processing mechanism starts with spout. A spout that provides a reliable processing mechanism needs to record the tuple it sends out, and the spout can retransmit when the downstream bolt fails to process the tuple or sub-tuple.

Storm sends a tuple by calling Spout's nextTuple (). To achieve reliable message processing, first take a unique ID to each emitted tuple and pass ID as an argument to the emit () method of SoputOutputCollector: collector.emit ("value1", "value2"), msgId); assign ID to tuple to tell the Storm system that spout receives notifications from all nodes on the tuple tree regardless of success or failure. If the processing is successful, the ack () method of spout will acknowledge the message numbered msgId; if the processing fails or times out, the fail () method will be called.

There are two steps for bolt to realize a reliable information processing mechanism: 1. When emitting a derived tuple, you need to anchor the read tuple;2. The reply or error is confirmed when the message is processed successfully or failed, respectively.

Anchoring a tuple means establishing a correspondence between the read-in tuple and the derived tuple so that the downstream bolt can be added to the tuple tree structure through reply acknowledgement, error, or timeout. You can anchor one or a group of tuple:collector.emit (tuple, new Values (word)) by calling an overloaded function of emit () of OutputCollector.

Tuple that is not anchored (collector.emit (new Values (word));) has no effect on the reliability of the data flow. If an unanchored tuple fails downstream processing, the original root tuple will not be resent.

The timeout can be configured with the task-level parameter Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, and the default timeout value is 30 seconds.

There is a special set of tasks called "acker" in the Storm system, which are responsible for tracking every message in DAG (directed acyclic graph). The acker task saves the mapping of spout messages id to a pair of values. The first value is spout's task id, through which id,acker you know which spout task to notify when message processing is complete. The second value is a number of 64bit, which we call "ack val", which is the result of the XOR calculation of the random id of all messages in the tree. Ack val represents the state of the whole tree, no matter how big the tree is, you only need this fixed size number to track the whole tree. When a message is created and answered, the same message, id, is sent to do XOR.

Whenever acker finds that a tree has an ack value of 0, it knows that the tree has been completely processed. Because the random ID of the message is a value of 64bit, there is very little chance that ack val will be set to 0 before the tree is finished processing. Assuming that you send 10,000 messages per second, it will take at least 50 million years to have a chance to make an error. Even so, there will be data loss only if the message does fail to be processed!

There are three ways to remove the reliability of a message:

1. Set the parameter Config.TOPOLOGY_ACKERS to 0. With this method, when Spout sends a message, its ack method will be called immediately

2. When Spout sends a message, the messageID of the message is not specified. You can use this method when you need to turn off specific message reliability.

3. Finally, if you don't care about the reliability of the descendant message derived from a message, the submessage derived from this message should not be anchored when it is sent, that is, the input message is not specified in the emit method. Because these descendant messages are not anchored in any tuple tree, their failure does not cause any spout to resend the message.

After reading the above, have you mastered how to understand the parallelism of Storm, Grouping strategy, and reliable message processing mechanism? If you want to learn more skills or want to know more about it, you are welcome to follow the industry information channel, thank you for reading!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report