
Case study of Storm Reliability acker

2025-01-19 Update From: SLTechnology News&Howtos (shulou)

Shulou(Shulou.com)06/01 Report--

This article walks through a case study of the acker, the component behind Storm's reliability guarantees. It first reviews how Storm reacts to process and node failures and how the ack/fail mechanism works, then shows a complete example topology.

Basic principles of Storm Reliability Analysis

The worker process dies

When a worker process dies, the Storm cluster restarts it.

The supervisor process dies

The failure of a supervisor process does not affect topologies that were already submitted, but no further tasks can be assigned to that node, because the node is no longer a member of the cluster.

The nimbus process dies (an HA concern; nimbus fails fast)

The failure of the nimbus process does not affect topologies that were already submitted, but new topologies cannot be submitted to the cluster until it is back. Versions below 1.0 have an HA problem here; this was fixed in 1.0, which allows multiple standby nimbus instances.

Node downtime

Ack/fail message acknowledgement mechanism (to ensure that a tuple is fully processed)

When you emit a tuple from a spout, you must pass a message ID along with it; doing so is what turns on the message acknowledgement mechanism for that tuple.

If your topology handles a large number of tuples, setting the number of ackers somewhat higher can make processing more efficient.

Set the number of ackers in a topology with config.setNumAckers(num). The default value is 1 in older releases; in Storm 1.x, if the value is left unset, Storm uses one acker per worker.

Note: the acker uses a special XOR-based algorithm, so the memory needed to track the status of each spout tuple is constant (about 20 bytes) no matter how large the tuple tree grows. Searching for "storm acker" turns up detailed analyses of the algorithm; an in-depth understanding is not needed for now.
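The constant-memory idea can be illustrated with a small model. This is a minimal sketch, not Storm's source code: the class name `XorAckSketch` and its methods are hypothetical, and it assumes each tuple carries a random-looking 64-bit ID. Every ID is XORed into a single value per spout tuple once when the tuple is emitted and once when it is acked, so the value returns to zero exactly when every tuple in the tree has been acked.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of XOR-based ack tracking (illustrative, not Storm's implementation). */
final class XorAckSketch {
    // One 64-bit value per spout tuple (root), however many tuples the tree contains.
    private final Map<Long, Long> ackVals = new HashMap<>();

    /** Records that a tuple with this ID was emitted into the tree rooted at rootId. */
    void emitted(long rootId, long tupleId) {
        ackVals.merge(rootId, tupleId, (a, b) -> a ^ b);
    }

    /** Records an ack; returns true once the whole tree is fully processed. */
    boolean acked(long rootId, long tupleId) {
        long val = ackVals.merge(rootId, tupleId, (a, b) -> a ^ b);
        if (val == 0L) {
            ackVals.remove(rootId); // every emitted ID has been XORed in twice
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        XorAckSketch tracker = new XorAckSketch();
        long root = 1L;
        tracker.emitted(root, 0xA1L);                   // spout emits the root tuple
        tracker.emitted(root, 0xB2L);                   // a bolt emits two anchored children...
        tracker.emitted(root, 0xC3L);
        System.out.println(tracker.acked(root, 0xA1L)); // false: children still pending
        System.out.println(tracker.acked(root, 0xB2L)); // false
        System.out.println(tracker.acked(root, 0xC3L)); // true: tree fully processed
    }
}
```

In this model the children are registered before the parent is acked, which keeps the value from reaching zero too early.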

Note: if a tuple is not fully processed within the configured timeout (Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 30 seconds by default), the tuple is considered to have failed.
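The timeout rule can be pictured as a table of pending message IDs that is swept periodically. The following is only a sketch of that idea under stated assumptions: `TimeoutSketch` and its methods are hypothetical names, time is passed in explicitly to keep the example deterministic, and Storm's internal bookkeeping is organized differently.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of timeout-based failure detection (illustrative only). */
final class TimeoutSketch {
    private final long timeoutMillis;
    // messageId -> time at which the tuple was emitted
    private final Map<String, Long> pending = new LinkedHashMap<>();

    TimeoutSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    void emitted(String messageId, long nowMillis) {
        pending.put(messageId, nowMillis);
    }

    void acked(String messageId) {
        pending.remove(messageId);
    }

    /** Removes and returns the IDs that were not acked within the timeout; these count as failed. */
    List<String> expire(long nowMillis) {
        List<String> failed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= timeoutMillis) {
                failed.add(entry.getKey());
                it.remove();
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        TimeoutSketch sketch = new TimeoutSketch(30_000L); // mirrors the 30-second default
        sketch.emitted("a", 0L);
        sketch.emitted("b", 10_000L);
        sketch.acked("a");                                 // "a" finished in time
        System.out.println(sketch.expire(29_999L));        // nothing has timed out yet
        System.out.println(sketch.expire(40_000L));        // "b" has been pending for 30 s
    }
}
```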

Fully processing a tuple

In Storm, a tuple is fully processed when the tuple itself and all tuples derived from it have been processed successfully.

Reliability acker case

As mentioned earlier, to use the ack/fail acknowledgement mechanism you need to do the following:

1. Override the ack and fail methods in the spout.
2. Have the spout attach a messageId when it emits a tuple.
3. Have the bolt explicitly call back (ack or fail) after it has processed a tuple successfully or unsuccessfully.
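Step 1 leaves open what fail should actually do. One common option is to keep the emitted payloads keyed by messageId and queue failed ones for re-emission. The sketch below models only that bookkeeping in plain Java; `ReplaySketch` and its method names are hypothetical, and a real spout would call them from its own nextTuple, ack, and fail methods.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Sketch of replay bookkeeping a spout could keep (illustrative; not part of the Storm API). */
final class ReplaySketch {
    private final Map<String, Long> pending = new HashMap<>();   // messageId -> payload
    private final Queue<String> retryQueue = new ArrayDeque<>(); // failed IDs awaiting re-emission

    /** Call when a tuple is emitted with a messageId. */
    void onEmit(String messageId, long payload) {
        pending.put(messageId, payload);
    }

    /** Call from ack: the tuple tree completed, so the payload can be forgotten. */
    void onAck(String messageId) {
        pending.remove(messageId);
    }

    /** Call from fail: schedule the tuple for replay if it is still known. */
    void onFail(String messageId) {
        if (pending.containsKey(messageId)) {
            retryQueue.add(messageId);
        }
    }

    /** Returns the next messageId to re-emit, or null if nothing is waiting. */
    String pollRetry() {
        return retryQueue.poll();
    }

    /** Returns the stored payload for a pending messageId, or null if it is unknown. */
    Long payload(String messageId) {
        return pending.get(messageId);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ReplaySketch spoutState = new ReplaySketch();
        spoutState.onEmit("m1", 1L);
        spoutState.onEmit("m2", 2L);
        spoutState.onAck("m2");   // m2 succeeded and is dropped
        spoutState.onFail("m1");  // m1 failed and is queued for replay
        String retryId = spoutState.pollRetry();
        System.out.println(retryId + " -> " + spoutState.payload(retryId));
    }
}
```

Re-emitting with the same messageId gives at-least-once delivery, so downstream bolts may see the same value more than once.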

Following the three steps above, the program code for this case is as follows:

package cn.xpleaf.bigdata.storm.acker;

import cn.xpleaf.bigdata.storm.utils.StormUtil;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Date;
import java.util.Map;
import java.util.UUID;

/**
 * 1°. A number-summation case: the data source continuously generates increasing numbers,
 * and the generated numbers are accumulated.
 *
 * Storm components: Spout and Bolt; the data is a Tuple; main uses a Topology to link the spout and bolt.
 * MapReduce components: Mapper and Reducer; the data is Writable; main links them through a Job.
 *
 * Adapter pattern (Adapter): BaseRichSpout overrides the methods of the inherited interface that are
 * not always needed, with implementations that do nothing. We call this the adapter pattern.
 *
 * Storm message acknowledgement mechanism (reliability analysis):
 * acker
 * fail
 */
public class AckerSumTopology {

    /**
     * Data source
     */
    static class OrderSpout extends BaseRichSpout {
        private Map conf;                       // configuration of the current component
        private TopologyContext context;        // context object of the current component
        private SpoutOutputCollector collector; // component used to emit tuples

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.conf = conf;
            this.context = context;
            this.collector = collector;
        }

        private long num = 0;

        /**
         * Core method for producing data
         */
        @Override
        public void nextTuple() {
            String messageId = UUID.randomUUID().toString().replaceAll("-", "").toLowerCase();
            // while (true) {
            num++;
            StormUtil.sleep(1000);
            System.out.println("Current time " + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + ", order amount generated: " + num);
            // Emitting with a messageId turns on ack/fail tracking for this tuple
            this.collector.emit(new Values(num), messageId);
            // }
        }

        /**
         * Declares the schema of the emitted data
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("order_cost"));
        }

        @Override
        public void ack(Object msgId) {
            System.out.println(msgId + " the corresponding message was processed successfully");
        }

        @Override
        public void fail(Object msgId) {
            System.out.println(msgId + "----> the corresponding message failed to be processed");
        }
    }

    /**
     * Bolt node that computes the sum
     */
    static class SumBolt extends BaseRichBolt {
        private Map conf;                  // configuration of the current component
        private TopologyContext context;   // context object of the current component
        private OutputCollector collector; // component used to emit tuples

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.conf = stormConf; // the original assigned this.conf to itself
            this.context = context;
            this.collector = collector;
        }

        private Long sumOrderCost = 0L;

        /**
         * Core method for processing data
         */
        @Override
        public void execute(Tuple input) {
            Long orderCost = input.getLongByField("order_cost");
            sumOrderCost += orderCost;
            if (orderCost % 10 == 1) { // simulate a failed message once every 10 messages
                collector.fail(input);
            } else {
                System.out.println("Thread ID: " + Thread.currentThread().getId() + ", total merchandise transaction volume of the mall website as of "
                        + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + ": " + sumOrderCost);
                collector.ack(input);
            }
            StormUtil.sleep(1000);
        }

        /**
         * If the current bolt is the last processing unit, this method can be left empty
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }

    /**
     * Builds the topology, which is equivalent to building a Job in MapReduce
     */
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        /**
         * Set the DAG (directed acyclic graph) of spout and bolt
         */
        builder.setSpout("id_order_spout", new OrderSpout());
        builder.setBolt("id_sum_bolt", new SumBolt(), 1)
                .shuffleGrouping("id_order_spout"); // specify the upstream component through a stream grouping
        // Build the topology using the builder
        StormTopology topology = builder.createTopology();
        String topologyName = AckerSumTopology.class.getSimpleName(); // name of the topology
        Config config = new Config(); // Config inherits from HashMap but encapsulates some basic configuration

        // Start the topology: LocalCluster for a local run, StormSubmitter for a cluster run
        if (args == null || args.length < 1) { // no arguments: local mode; with arguments: cluster mode
            LocalCluster localCluster = new LocalCluster(); // local development mode uses a LocalCluster object
            localCluster.submitTopology(topologyName, config, topology);
        } else {
            StormSubmitter.submitTopology(topologyName, config, topology);
        }
    }
}

After running (locally, or by uploading to the cluster and submitting the job), the output is as follows:

Current time 20180413215706, order amount generated: 1
Current time 20180413215707, order amount generated: 2
7a4ce596fd3a40659f2d7f80a7738f55---->
