Storm's fault-tolerance mechanism, the Acker, explained with a hands-on example

Updated 2025-03-29. Source: SLTechnology News&Howtos (shulou), Internet Technology


Storm has a special kind of executor called the acker. Ackers track the tuple tree of every tuple that a spout emits. When an acker finds that a tuple tree has been fully processed, it tells the framework to call the spout's ack(); if the tree fails or times out, the framework calls the spout's fail() instead.

The acker's tracking algorithm is one of Storm's main breakthroughs. However large a tuple tree grows, tracking it takes a constant amount of memory, about 20 bytes.
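The constant-size tracking can be illustrated with the XOR trick that Storm's documentation describes: the acker keeps a single 64-bit value per spout tuple, XORs in the ID of every tuple when it is created and again when it is acked, and treats a value of zero as "tree complete". The following is a minimal plain-Java sketch of that idea, not Storm's actual acker code. The class XorAckTracker and its methods are hypothetical names, and the sketch uses small fixed IDs where Storm uses random 64-bit ones.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of XOR-based ack tracking (hypothetical; not a Storm class). */
class XorAckTracker {
    // One 64-bit "ack val" per spout tuple, regardless of how large its tree grows.
    private final Map<Long, Long> ackVals = new HashMap<>();

    /** The spout emitted a root tuple; in this toy model its tuple ID is also the root ID. */
    void track(long rootId) {
        ackVals.put(rootId, rootId);
    }

    /**
     * A bolt acked the tuple ackedId after emitting the anchored tuples in newIds.
     * Every ID is XORed in once when created and once when acked, so the value
     * returns to zero exactly when all tuples in the tree have been acked.
     */
    boolean ack(long rootId, long ackedId, long... newIds) {
        long val = ackVals.getOrDefault(rootId, 0L) ^ ackedId;
        for (long id : newIds) {
            val ^= id;
        }
        if (val == 0L) {
            ackVals.remove(rootId); // tree complete: nothing left to track
            return true;
        }
        ackVals.put(rootId, val);
        return false;
    }

    public static void main(String[] args) {
        XorAckTracker tracker = new XorAckTracker();
        tracker.track(7L);
        System.out.println(tracker.ack(7L, 7L, 11L, 13L)); // root acked, two children pending
        System.out.println(tracker.ack(7L, 11L));          // one child still pending
        System.out.println(tracker.ack(7L, 13L));          // last child acked
    }
}
```

The three calls in main print false, false and true. With random 64-bit IDs the value could in principle hit zero early by coincidence, but the probability is negligible.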

Ideally, when a bolt fails to process a tuple, the spout would resend it. Unfortunately, the framework does not resend automatically; we have to code the replay ourselves. A hands-on example follows.

What is a Tuple tree?
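In Storm's terms, a tuple tree is rooted at a tuple emitted by a spout. Every tuple that a bolt emits anchored to an input tuple becomes a child of that input, and the tree counts as fully processed only when every tuple in it has been acked. The following is a minimal plain-Java sketch of that structure. TupleNode is a hypothetical class used only for illustration; Storm does not store the tree this way.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of one tuple in a tuple tree; not a Storm class. */
class TupleNode {
    final String name;
    final List<TupleNode> children = new ArrayList<>();
    boolean acked = false;

    TupleNode(String name) {
        this.name = name;
    }

    /** Models an anchored emit: the new tuple becomes a child of this one. */
    TupleNode emitAnchored(String childName) {
        TupleNode child = new TupleNode(childName);
        children.add(child);
        return child;
    }

    /** The tree is fully processed only when this tuple and all descendants are acked. */
    boolean fullyProcessed() {
        if (!acked) {
            return false;
        }
        for (TupleNode child : children) {
            if (!child.fullyProcessed()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        TupleNode line = new TupleNode("order line");            // emitted by the spout
        TupleNode parsed = line.emitAnchored("date, orderAmt");  // emitted by a bolt
        line.acked = true;
        System.out.println(line.fullyProcessed()); // false: the child is still pending
        parsed.acked = true;
        System.out.println(line.fullyProcessed()); // true
    }
}
```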

The Spout class code is as follows:

    package les19.Ack_Fail;

    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.InputStreamReader;
    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichSpout;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    public class AckSpout implements IRichSpout {

        private static final long serialVersionUID = 1L;

        FileInputStream fis;
        InputStreamReader isr;
        BufferedReader br;

        // Thread-safe map of emitted tuples that have not been acked yet
        private ConcurrentHashMap<Object, Values> _pending;
        // Failed tuples and their failure counts
        private ConcurrentHashMap<Object, Integer> fail_pending;

        SpoutOutputCollector collector = null;
        String str = null;

        @Override
        public void nextTuple() {
            try {
                // Emits every remaining line in one call. Storm invokes nextTuple(), ack()
                // and fail() on the same thread, so callbacks wait until this loop ends.
                while ((str = this.br.readLine()) != null) {
                    // Parse the line
                    UUID msgId = UUID.randomUUID();
                    String[] arr = str.split("\t");
                    String date = arr[2].substring(0, 10);
                    String orderAmt = arr[1];
                    Values val = new Values(date, orderAmt);
                    this._pending.put(msgId, val);
                    // Emitting with a message ID is what enables ack/fail tracking for the tuple
                    collector.emit(val, msgId);
                    System.out.println("_pending.size()=" + _pending.size());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void close() {
            try {
                br.close();
                isr.close();
                fis.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        // Initialization
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            try {
                this.collector = collector;
                this.fis = new FileInputStream("order.log");
                this.isr = new InputStreamReader(fis, "UTF-8");
                this.br = new BufferedReader(isr);
                _pending = new ConcurrentHashMap<>();
                fail_pending = new ConcurrentHashMap<>();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("date", "orderAmt"));
        }

        @Override
        public Map getComponentConfiguration() {
            return null;
        }

        @Override
        public void ack(Object msgId) {
            System.out.println("_pending size total:" + _pending.size());
            System.out.println("spout ack:" + msgId.toString() + "--" + msgId.getClass());
            this._pending.remove(msgId);
            // Also forget any failures recorded for a tuple that eventually succeeded
            this.fail_pending.remove(msgId);
            System.out.println("_pending size remaining:" + _pending.size());
        }

        @Override
        public void activate() {
        }

        @Override
        public void deactivate() {
        }

        @Override
        public void fail(Object msgId) {
            System.out.println("spout fail:" + msgId.toString());
            Integer fail_count = fail_pending.get(msgId); // failures recorded so far for this tuple
            if (fail_count == null) {
                fail_count = 0;
            }
            fail_count++;
            Values val = this._pending.get(msgId);
            if (fail_count >= 3 || val == null) {
                // Retry limit reached (or tuple unknown): give up and stop tracking it,
                // so that _pending does not grow without bound
                fail_pending.remove(msgId);
                this._pending.remove(msgId);
            } else {
                // Record the failure count of the tuple
                fail_pending.put(msgId, fail_count);
                // Resend with the same message ID
                this.collector.emit(val, msgId);
            }
        }
    }
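The replay bookkeeping in ack() and fail() does not depend on Storm, so it can be pulled out and exercised on its own. The following is a minimal sketch under that assumption. RetryTracker and its method names are hypothetical and not part of Storm or of the spout above, and the limit of 3 failures mirrors the value hard-coded in fail().

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper that mirrors the spout's pending/failure bookkeeping. */
class RetryTracker<K, V> {
    private final Map<K, V> pending = new ConcurrentHashMap<>();
    private final Map<K, Integer> failCounts = new ConcurrentHashMap<>();
    private final int maxFailures;

    RetryTracker(int maxFailures) {
        this.maxFailures = maxFailures;
    }

    /** Remember a payload when it is first emitted. */
    void onEmit(K id, V payload) {
        pending.put(id, payload);
    }

    /** A successful tuple no longer needs tracking. */
    void onAck(K id) {
        pending.remove(id);
        failCounts.remove(id);
    }

    /** Returns the payload to re-emit, or empty once the tuple is given up on. */
    Optional<V> onFail(K id) {
        int failures = failCounts.merge(id, 1, Integer::sum);
        V payload = pending.get(id);
        if (failures >= maxFailures || payload == null) {
            pending.remove(id);
            failCounts.remove(id);
            return Optional.empty();
        }
        return Optional.of(payload);
    }

    int pendingSize() {
        return pending.size();
    }

    public static void main(String[] args) {
        RetryTracker<String, String> tracker = new RetryTracker<>(3);
        tracker.onEmit("msg-1", "2014-01-07\t20.5");
        System.out.println(tracker.onFail("msg-1").isPresent()); // true: first retry
        System.out.println(tracker.onFail("msg-1").isPresent()); // true: second retry
        System.out.println(tracker.onFail("msg-1").isPresent()); // false: limit reached
        System.out.println(tracker.pendingSize());               // 0
    }
}
```

With a limit of 3, a tuple is re-emitted after its first and second failure and dropped after the third, which matches the fail_count >= 3 check in the spout.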

The Bolt class is as follows:

    package les19.Ack_Fail;

    import java.util.Map;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    public class AckBolt implements IRichBolt {

        private static final long serialVersionUID = 1L;

        OutputCollector collector = null;
        TopologyContext context = null;

        @Override
        public void cleanup() {
        }

        int num = 0;
        String url = null;
        String session_id = null;
        String date = null;
        String province_id = null;

        @Override
        public void execute(Tuple input) {
            try {
                date = input.getStringByField("date");
                Double orderAmt = Double.parseDouble(input.getStringByField("orderAmt"));
                // Note the parameters: the first argument is the input tuple itself,
                // which anchors the new tuple to it
                collector.emit(input, new Values(date, orderAmt));
                collector.ack(input);
                // Thread.sleep(300);
            } catch (Exception e) {
                collector.fail(input);
                e.printStackTrace();
            }
        }

        // Initialization, the counterpart of the spout's open()
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.context = context;
            this.collector = collector;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("date", "orderAmt"));
        }

        @Override
        public Map getComponentConfiguration() {
            return null;
        }
    }

The topology class is as follows:

    package les19.Ack_Fail;

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;

    public class Ack_FailTopo {

        /**
         * @param args optional topology name; when present, the topology is submitted to a cluster
         */
        // "throws Exception" covers newer Storm releases, where LocalCluster's constructor
        // declares a checked exception; it is harmless on older releases.
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new AckSpout(), 1);
            builder.setBolt("bolt", new AckBolt(), 1).shuffleGrouping("spout");

            Config conf = new Config();
            // Setting the number of acker executors to 0 turns tuple tracking off:
            // conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
            conf.setDebug(false);

            if (args.length > 0) {
                // Cluster mode: the first argument is the topology name
                try {
                    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } else {
                // Local mode for testing
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("mytopology", conf, builder.createTopology());
            }
        }
    }

To learn more, see my Storm video tutorial on 51CTO: http://edu.51cto.com/course/course_id-9041.html

This section covers lectures 18-19 of that course.
