Example Analysis of Reliability and Unreliability in Storm

2025-02-24 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)05/31 Report--

This article explains, with examples, how reliable and unreliable message processing work in Storm. I hope you find it useful.

1. Reliability guarantees of the spout

In Storm, reliable message processing begins with the spout. To ensure that data is processed correctly, Storm can track every tuple generated by a spout, which involves ack/fail processing. If a tuple is processed successfully, Storm calls the spout's ack method; if processing fails, it calls the spout's fail method. Every bolt in the topology that processes a tuple uses its OutputCollector to tell Storm whether its processing succeeded.

A spout must be able to track every tuple it emits, together with their child tuples, and re-emit them when processing fails. So how does a spout track tuples? Storm does this with a simple anchoring mechanism (discussed under bolt reliability below).

The spout emits a root tuple, and the root tuple produces child tuples, forming a tuple tree. Every bolt in this tree acks or fails the tuples it handles. If every bolt in the tree acks the tuples passing through it, the spout's ack method is called, indicating that the whole message has been processed. If any bolt in the tree fails a tuple, or the whole process times out, the spout's fail method is called.
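To make the tuple-tree bookkeeping concrete, here is a small Java model of how completion can be detected without storing the tree itself. Storm's documentation describes the acker tracking each tree with a single 64-bit value, the XOR of tuple ids; the class below is a hypothetical, simplified sketch of that idea, not Storm's actual acker code, and the names AckerModel, emitRoot, emitAnchored and ack are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical, simplified model of Storm's acker (not Storm's real classes).
// Every tuple gets a random 64-bit id. Per spout message, the model keeps one
// running XOR: each id is XOR-ed in once when the tuple joins the tree and once
// when it is acked, so the value returns to 0 exactly when the tree is complete.
class AckerModel {
    private final Map<Long, Long> ackVals = new HashMap<>(); // root id -> running XOR
    private final Random random = new Random(42);            // fixed seed for repeatability

    /** The spout emits a root tuple; returns its id. */
    long emitRoot() {
        long rootId = random.nextLong();
        ackVals.put(rootId, rootId);
        return rootId;
    }

    /** A bolt emits a child anchored into rootId's tree; returns the child's id. */
    long emitAnchored(long rootId) {
        long childId = random.nextLong();
        ackVals.merge(rootId, childId, (a, b) -> a ^ b);
        return childId;
    }

    /** A bolt acks one tuple; returns true when the whole tree is complete. */
    boolean ack(long rootId, long tupleId) {
        Long current = ackVals.get(rootId);
        if (current == null) {
            return false; // unknown or already finished tree
        }
        long updated = current ^ tupleId;
        if (updated == 0) {
            ackVals.remove(rootId); // here the spout's ack(msgId) would be called
            return true;
        }
        ackVals.put(rootId, updated);
        return false;
    }

    public static void main(String[] args) {
        AckerModel acker = new AckerModel();
        long sentence = acker.emitRoot();             // spout emits "my dog"
        long my = acker.emitAnchored(sentence);       // split bolt emits "my"
        long dog = acker.emitAnchored(sentence);      // split bolt emits "dog"
        boolean done = acker.ack(sentence, sentence); // split bolt acks the sentence
        System.out.println("after sentence ack: " + done);
        acker.ack(sentence, my);                      // count bolt acks "my"
        done = acker.ack(sentence, dog);              // count bolt acks "dog"
        System.out.println("after last word ack: " + done);
    }
}
```

Because XOR-ing the same id twice cancels it out, the order of emits and acks does not matter: the value reaches 0 only once every tuple that joined the tree has been acked (barring an astronomically unlikely id collision).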

Note that Storm only reports the outcome to the application through the ack/fail mechanism. How to handle success or failure is up to the application, because Storm does not keep the data of a failed tuple. You can still tell which record failed: the spout's ack and fail methods receive a msgId object, so you assign a msgId when first emitting the tuple and then use it in ack/fail to identify the record. There is a catch, though: every bolt must explicitly call ack or fail after processing a tuple, otherwise the tuple is never released, which can eventually cause an out-of-memory error (OOM). I don't know why Storm's original design did not make ack the default behavior for bolts.

Storm's ISpout interface defines three reliability-related methods: nextTuple, ack and fail.

public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void close();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
}

When Storm calls a spout's nextTuple() method, the spout emits a tuple. The first step toward reliable processing is to assign each emitted tuple a unique ID and pass that ID to the emit() method:

collector.emit(new Values("value1", "value2"), msgId);

Assigning a unique ID tells Storm that the spout wants to be notified about the tuple tree generated from this tuple: if the tree is processed successfully, Storm calls the spout's ack() method; if processing fails, it calls the spout's fail() method. In both cases the tuple's ID is passed in.

Note that although the spout provides a reliability mechanism, whether it is used is under our control. IBasicBolt calls ack() for you automatically once execute() returns, which suits relatively simple computations but gives you no direct control over acking. With IRichBolt, if you want anchoring you must call ack yourself, which lets you ensure reliability explicitly.

2. Reliability in bolts

Reliability in a bolt is achieved in two steps:

Anchor the original tuple when emitting derived tuples

Ack or fail each tuple

Anchoring a tuple establishes a link between the input tuple and the tuples derived from it, adding the derived tuples to the tuple tree. A tuple is anchored by passing it as the first argument to emit():

collector.emit(tuple, new Values(word));

If a new tuple is emitted without also passing the input tuple, the new tuple does not take part in the reliability mechanism: its failure will not cause the root tuple to be replayed. This is called unanchoring:

collector.emit(new Values(word));
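The difference between the two emit forms can be modelled in a few lines. This is a hypothetical sketch, not Storm's API: AnchoringModel and its methods are invented names, and a tuple simply records the spout msgId of the tree it belongs to, or null when it was emitted unanchored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not Storm's API) of anchored vs. unanchored emits.
// A tuple remembers the spout msgId of the tree it belongs to, or null if it
// was emitted unanchored and therefore belongs to no tree at all.
class AnchoringModel {
    static final class Tuple {
        final String value;
        final Object rootMsgId; // null for unanchored tuples

        Tuple(String value, Object rootMsgId) {
            this.value = value;
            this.rootMsgId = rootMsgId;
        }
    }

    // msgIds that the spout's fail() would receive (and a reliable spout would replay)
    final List<Object> failedMsgIds = new ArrayList<>();

    Tuple emitFromSpout(String value, Object msgId) {
        return new Tuple(value, msgId);
    }

    // Like collector.emit(input, new Values(word)): the child joins input's tree
    Tuple emitAnchored(Tuple input, String value) {
        return new Tuple(value, input.rootMsgId);
    }

    // Like collector.emit(new Values(word)): the child belongs to no tree
    Tuple emitUnanchored(String value) {
        return new Tuple(value, null);
    }

    // Like collector.fail(tuple): only tuples inside a tree reach the spout
    void fail(Tuple tuple) {
        if (tuple.rootMsgId != null) {
            failedMsgIds.add(tuple.rootMsgId);
        }
    }

    public static void main(String[] args) {
        AnchoringModel model = new AnchoringModel();
        Tuple sentence = model.emitFromSpout("my dog has fleas", "msg-1");
        Tuple unanchored = model.emitUnanchored("my");
        Tuple anchored = model.emitAnchored(sentence, "dog");
        model.fail(unanchored); // invisible to the spout
        model.fail(anchored);   // the spout would replay "msg-1"
        System.out.println(model.failedMsgIds); // [msg-1]
    }
}
```

Only the anchored failure reaches the spout, which is why a reliable bolt passes the input tuple to every emit whose outcome should count.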

To ack or fail a tuple:

this.collector.ack(tuple);
this.collector.fail(tuple);

As mentioned above, an IBasicBolt implementation does not handle ack/fail itself: the BasicOutputCollector passed to its execute method has no ack or fail method for you to call. In effect, the bolt's own ack/fail behavior is taken out of your hands.

In an IRichBolt implementation, if a tuple is emitted with OutputCollector.emit(oldTuple, newTuple) (anchoring), the ack/fail of downstream bolts affects the spout's ack/fail. If it is emitted with collector.emit(newTuple) (unanchoring), downstream bolts no longer influence the spout: the spout's ack/fail is decided by the acks and fails up to and including the current bolt. This is how you can ignore the success or failure of the bolts further downstream.

Within an anchored tree, a fail in a bolt in the middle does not stop later bolts from executing, but it immediately triggers the spout's fail method. It acts like a short circuit: the later bolts still run, but their ack/fail no longer means anything to the spout. In other words, any single fail among the bolts immediately triggers the spout's fail method, whereas the spout's ack method is triggered only after every bolt has acked. Therefore, IBasicBolt is better suited to filters or simple calculations.
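The short-circuit rule above can be sketched as follows. This is a hypothetical model, not Storm's implementation: ShortCircuitModel is an invented name, and, unlike Storm, it assumes the number of tuples in each tree is known in advance.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the rule above (not Storm's implementation): for an
// anchored tree, the first fail reaches the spout immediately, later acks are
// ignored, and ack reaches the spout only after every tuple has been acked.
// Simplification: the size of each tree is known up front.
class ShortCircuitModel {
    private final Map<Object, Integer> outstanding = new HashMap<>(); // msgId -> unacked tuples
    final List<String> spoutCalls = new ArrayList<>();                // calls the spout would see

    void track(Object msgId, int tuplesInTree) {
        outstanding.put(msgId, tuplesInTree);
    }

    void ack(Object msgId) {
        Integer left = outstanding.get(msgId);
        if (left == null) {
            return; // tree already failed or completed: the ack means nothing now
        }
        if (left == 1) {
            outstanding.remove(msgId);
            spoutCalls.add("ack(" + msgId + ")");
        } else {
            outstanding.put(msgId, left - 1);
        }
    }

    void fail(Object msgId) {
        if (outstanding.remove(msgId) != null) {
            spoutCalls.add("fail(" + msgId + ")"); // short circuit: no waiting for other bolts
        }
    }

    public static void main(String[] args) {
        ShortCircuitModel model = new ShortCircuitModel();
        model.track("m1", 3);
        model.ack("m1");
        model.fail("m1"); // a bolt in the middle fails
        model.ack("m1");  // a later bolt still runs and acks, but it no longer matters
        model.track("m2", 2);
        model.ack("m2");
        model.ack("m2");
        System.out.println(model.spoutCalls); // [fail(m1), ack(m2)]
    }
}
```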

3. Summary

The reliability of Storm is determined by both the spout and the bolts. Storm uses the anchoring mechanism to make processing reliable. If a tuple emitted by the spout is fully processed, the spout's ack method is called; if processing fails, its fail method is called. In a bolt, a tuple is anchored with emit(oldTuple, newTuple). If processing succeeds, the bolt must call ack on its collector; if it fails, it must call fail. A tuple and its child tuples form a tuple tree. When every tuple in the tree completes within the specified time, the spout's ack method is called; when any tuple in the tree fails, the spout's fail method is called.

IBasicBolt calls ack/fail automatically, while IRichBolt requires us to call ack/fail manually. The TOPOLOGY_MESSAGE_TIMEOUT_SECS parameter sets how long a tuple tree has to be fully processed; if processing is not finished within that time, the spout's fail method is also called.
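A minimal sketch of the timeout rule, assuming an explicit clock instead of wall time so the result is deterministic. TimeoutModel and its methods are hypothetical names, and Storm's own bookkeeping differs in detail. In a real topology the timeout is usually set with Config.setMessageTimeoutSecs(...), and it defaults to 30 seconds.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of message-timeout handling (not Storm's implementation).
// An explicit clock replaces wall time so the behaviour is deterministic.
class TimeoutModel {
    private final long timeoutSecs; // plays the role of TOPOLOGY_MESSAGE_TIMEOUT_SECS
    private final Map<Object, Long> emittedAt = new LinkedHashMap<>(); // msgId -> emit time
    final List<Object> acked = new ArrayList<>();
    final List<Object> failed = new ArrayList<>();

    TimeoutModel(long timeoutSecs) {
        this.timeoutSecs = timeoutSecs;
    }

    void emit(Object msgId, long nowSecs) {
        emittedAt.put(msgId, nowSecs);
    }

    // The whole tuple tree was acked in time: the spout's ack() would be called
    void treeCompleted(Object msgId) {
        if (emittedAt.remove(msgId) != null) {
            acked.add(msgId);
        }
    }

    // Every message still pending after the timeout is failed, as if a bolt had failed it
    void expire(long nowSecs) {
        Iterator<Map.Entry<Object, Long>> it = emittedAt.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Object, Long> entry = it.next();
            if (nowSecs - entry.getValue() >= timeoutSecs) {
                failed.add(entry.getKey()); // read the key before removing the entry
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        TimeoutModel model = new TimeoutModel(30);
        model.emit("a", 0);
        model.emit("b", 10);
        model.emit("c", 20);
        model.treeCompleted("b"); // finished in time
        model.expire(35);         // "a" has waited 35s, "c" only 15s
        System.out.println("acked=" + model.acked + " failed=" + model.failed);
    }
}
```

Here "a" is failed at time 35 while "c" is still within its window; calling expire(50) would fail "c" as well.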

4. A reliable WordCount example

A reliable spout:

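// Imports assume Apache Storm 1.x or later (org.apache.storm.* packages).
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class ReliableSentenceSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;
    // msgId -> emitted values, kept until the tuple tree is acked so it can be replayed
    private ConcurrentHashMap<UUID, Values> pending;
    private SpoutOutputCollector collector;
    private String[] sentences = {
        "my dog has fleas", "i like cold beverages", "the dog ate my homework",
        "don't have a cow man", "i don't think i like fleas"
    };
    private int index = 0;

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
    }

    public void nextTuple() {
        Values values = new Values(sentences[index]);
        UUID msgId = UUID.randomUUID();
        this.pending.put(msgId, values);
        // Emitting with a msgId makes this tuple the root of a tracked tuple tree
        this.collector.emit(values, msgId);
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        // Utils.waitForMillis(1);
    }

    public void ack(Object msgId) {
        // The whole tree completed: the cached values are no longer needed
        this.pending.remove(msgId);
    }

    public void fail(Object msgId) {
        // Replay the cached values under the same msgId
        this.collector.emit(this.pending.get(msgId), msgId);
    }
}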

A reliable bolt:

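// Imports assume Apache Storm 1.x or later (org.apache.storm.* packages).
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class ReliableSplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            // Anchored emit: each word joins the sentence's tuple tree
            this.collector.emit(tuple, new Values(word));
        }
        // Ack the input only after all derived tuples have been emitted
        this.collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}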

This example makes the topology reliable: if a tuple fails, it is re-emitted until it is processed successfully. The pending map caches every emitted tuple by its msgId so that a failed tuple can be replayed. The topology.max.spout.pending property in Storm is explained as follows:

1. The number of batches active at the same time: you must set how many batches may be processed concurrently, which you specify with "topology.max.spout.pending". If you don't specify it, the default is 1.

2. topology.max.spout.pending limits the number of tuples emitted by the spout that are still pending. When that many tuples have not yet been fully processed by the downstream bolts, the spout stops and waits for them to be consumed. Once the number of pending tuples drops below topology.max.spout.pending, the spout resumes reading messages from the message source. (This property applies only to reliable messages, that is, tuples emitted with a msgId.)

In a transactional topology the value is the number of batches processed at the same time; otherwise it has the second meaning.
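The throttling described in point 2, combined with the pending-map replay from the WordCount spout, can be simulated without Storm. MaxPendingModel is a hypothetical class: tryNextTuple() stands in for Storm deciding whether to call nextTuple(), and msgIds are plain integers instead of UUIDs. In a real topology the limit is typically set with Config.setMaxSpoutPending(...).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Storm-free simulation of a reliable spout throttled by a
// max-pending limit (the role of topology.max.spout.pending); not Storm code.
class MaxPendingModel {
    private final int maxPending;
    private final String[] sentences;
    private int index = 0;
    private int nextMsgId = 0;
    final Map<Integer, String> pending = new LinkedHashMap<>(); // msgId -> payload, like the spout's map
    final List<String> emitted = new ArrayList<>();              // every emit, replays included

    MaxPendingModel(int maxPending, String... sentences) {
        this.maxPending = maxPending;
        this.sentences = sentences;
    }

    // Mirrors the executor: nextTuple() only runs while fewer than maxPending tuples are pending
    boolean tryNextTuple() {
        if (pending.size() >= maxPending) {
            return false; // spout waits for downstream acks
        }
        String sentence = sentences[index];
        index = (index + 1) % sentences.length;
        pending.put(nextMsgId++, sentence);
        emitted.add(sentence);
        return true;
    }

    void ack(int msgId) {
        pending.remove(msgId); // frees a slot
    }

    // Replays the cached payload under the same msgId; it stays pending
    void fail(int msgId) {
        String sentence = pending.get(msgId);
        if (sentence != null) {
            emitted.add(sentence);
        }
    }

    public static void main(String[] args) {
        MaxPendingModel spout = new MaxPendingModel(2,
                "my dog has fleas", "i like cold beverages", "the dog ate my homework");
        spout.tryNextTuple();                  // msgId 0
        spout.tryNextTuple();                  // msgId 1
        boolean third = spout.tryNextTuple();  // blocked: 2 tuples pending
        spout.fail(0);                         // msgId 0 is replayed
        spout.ack(1);                          // msgId 1 completes, freeing a slot
        boolean fourth = spout.tryNextTuple(); // msgId 2
        System.out.println(third + " " + fourth);
        System.out.println(spout.emitted);
    }
}
```

A replayed tuple keeps its slot until it is finally acked, so a burst of failures slows down how fast new sentences are read from the source.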

In short, if you don't need guaranteed processing, have the spout extend BaseRichSpout and the bolt extend BaseBasicBolt; these base classes supply the remaining internal methods (BaseBasicBolt acks automatically), so we don't need to care about ack and fail. If you do need reliability, have the spout implement the IRichSpout interface, emit tuples with a msgId and provide custom ack and fail methods, and have the bolt extend BaseRichBolt, emit tuples anchored to the original tuple and ack manually.

© 2024 shulou.com SLNews company. All rights reserved.
