Storm has a special kind of Executor called the acker, whose job is to track the Tuple tree of every Tuple emitted by a Spout. When an acker determines that a Tuple tree has been completely processed, it has the framework call back the Spout's ack(); if processing fails, it calls back the Spout's fail() instead.
The acker's tracking algorithm is one of Storm's main breakthroughs: no matter how large a Tuple tree grows, tracking it takes only a constant of about 20 bytes, because the acker does not remember the tree itself, only a running XOR of the 64-bit ids of the tuples in it.
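To see why constant space is enough, here is a toy sketch of the XOR idea (an illustration of the principle only, not Storm's actual acker code; the class name is made up). Every tuple id gets XOR-ed into one running value once when the tuple is created and once when it is acked; since x ^ x = 0, the running value returns to zero exactly when every tuple in the tree has been accounted for, however many there were:
import java.util.Random;
public class XorTrackingSketch {
    public static void main(String[] args) {
        Random rnd = new Random();
        long ackVal = 0L;            // the acker's constant-size state for one spout tuple
        long[] ids = new long[1000]; // pretend these are 1000 tuples in one tree
        for (int i = 0; i < ids.length; i++) {
            ids[i] = rnd.nextLong();
            ackVal ^= ids[i];        // XOR in at emit time
        }
        for (long id : ids) {
            ackVal ^= id;            // XOR in again at ack time
        }
        System.out.println(ackVal == 0); // true: the whole tree has been acked
    }
}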
What we would like is that when a Tuple fails in a Bolt, the Spout resends it. Unfortunately, the framework does not resend automatically; we have to implement that ourselves, and the hands-on example below does exactly that.
What is a Tuple tree? When a Bolt processes a Tuple and emits new Tuples anchored to it, those children (and their children in turn) form a tree rooted at the original Spout Tuple; the tree counts as fully processed only once every Tuple in it has been acked.
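Inside a Bolt's execute(), the first argument to emit() decides whether the new tuple joins the tree. A minimal sketch (collector and input are the same objects used in AckBolt below):
// anchored emit: the new tuple is added to input's Tuple tree, so a
// failure anywhere downstream propagates back to the Spout's fail()
collector.emit(input, new Values(date, orderAmt));
// unanchored emit: the new tuple belongs to no tree; if it is lost
// downstream, the Spout never hears about it
collector.emit(new Values(date, orderAmt));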
The Spout class code is as follows:
package les19.Ack_Fail;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class AckSpout implements IRichSpout {

    private static final long serialVersionUID = 1L;

    FileInputStream fis;
    InputStreamReader isr;
    BufferedReader br;
    // thread-safe map holding every emitted tuple until it is acked
    private ConcurrentHashMap<UUID, Values> _pending;
    // failed tuples and how many times each has failed
    private ConcurrentHashMap<UUID, Integer> fail_pending;
    SpoutOutputCollector collector = null;
    String str = null;

    @Override
    public void nextTuple() {
        try {
            while ((str = this.br.readLine()) != null) {
                // filter action
                UUID msgId = UUID.randomUUID();
                String[] arr = str.split("\t");
                String date = arr[2].substring(0, 10);
                String orderAmt = arr[1];
                Values val = new Values(date, orderAmt);
                this._pending.put(msgId, val);
                // emitting with a msgId is what turns on ack/fail tracking for this tuple
                collector.emit(val, msgId);
                System.out.println("_pending.size() = " + _pending.size());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() {
        try {
            br.close();
            isr.close();
            fis.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // initialization, called once when the spout task starts
    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        try {
            this.collector = collector;
            this.fis = new FileInputStream("order.log");
            this.isr = new InputStreamReader(fis, "UTF-8");
            this.br = new BufferedReader(isr);
            _pending = new ConcurrentHashMap<UUID, Values>();
            fail_pending = new ConcurrentHashMap<UUID, Integer>();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date", "orderAmt"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("_pending size total: " + _pending.size());
        System.out.println("spout ack: " + msgId.toString() + " --- " + msgId.getClass());
        // the tuple tree is fully processed, so stop tracking it
        this._pending.remove(msgId);
        System.out.println("_pending size remaining: " + _pending.size());
    }

    @Override
    public void activate() {
    }

    @Override
    public void deactivate() {
    }

    @Override
    public void fail(Object msgId) {
        System.out.println("spout fail: " + msgId.toString());
        Integer fail_count = fail_pending.get(msgId); // how many times this tuple has failed so far
        if (fail_count == null) {
            fail_count = 0;
        }
        fail_count++;
        if (fail_count >= 3) {
            // retry limit reached: give up on this tuple and stop tracking it
            fail_pending.remove(msgId);
            this._pending.remove(msgId); // also drop the cached values so _pending does not leak
        } else {
            // record the tuple's failure count
            fail_pending.put((UUID) msgId, fail_count);
            // re-emit the cached values with the same msgId
            this.collector.emit(this._pending.get(msgId), msgId);
        }
    }
}
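A note on the input format: nextTuple() above assumes each line of order.log is tab-separated, with the order amount in the second field (arr[1]) and a timestamp whose first 10 characters are the date in the third field (arr[2]). The real file is not shown in the lecture, but a hypothetical line (invented here for illustration, fields separated by tab characters) could look like:
10001	315.0	2019-04-09 10:23:45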
The Bolt class code is as follows:
package les19.Ack_Fail;

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class AckBolt implements IRichBolt {

    private static final long serialVersionUID = 1L;

    OutputCollector collector = null;
    TopologyContext context = null;
    String date = null;

    @Override
    public void cleanup() {
    }

    @Override
    public void execute(Tuple input) {
        try {
            date = input.getStringByField("date");
            double orderAmt = Double.parseDouble(input.getStringByField("orderAmt"));
            // note the first argument: passing the input Tuple anchors the new tuple to its tree
            collector.emit(input, new Values(date, orderAmt));
            // report success so the acker can check this tuple off the tree
            collector.ack(input);
            // Thread.sleep(300);
        } catch (Exception e) {
            // report failure so the framework calls back the spout's fail()
            collector.fail(input);
            e.printStackTrace();
        }
    }

    // initialization, the counterpart of the spout's open()
    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.context = context;
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date", "orderAmt"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
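One more failure path worth knowing about before the topology code: fail() is not only triggered by an explicit collector.fail() in a Bolt. If a Tuple tree is not fully acked within Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS (30 seconds by default), Storm fails the Spout tuple as well. If a bolt can legitimately take longer, raise the timeout on the topology's Config, e.g.:
conf.setMessageTimeoutSecs(60); // fail un-acked tuple trees after 60s instead of the default 30s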
The TOPO class is as follows:
package les19.Ack_Fail;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class Ack_FailTopo {

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new AckSpout(), 1);
        builder.setBolt("bolt", new AckBolt(), 1).shuffleGrouping("spout");

        Config conf = new Config();
        // conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
        conf.setDebug(false);

        if (args.length > 0) {
            // a topology name was given: submit to a real cluster
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            // no arguments: run in local mode for testing
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mytopology", conf, builder.createTopology());
        }
    }
}
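A note on the commented-out line in main(): setting Config.TOPOLOGY_ACKER_EXECUTORS to 0 starts no acker tasks at all. Storm then acks every Spout tuple the moment it is emitted, so fail() is never called back and the retry logic in AckSpout never runs; you trade reliability for a little throughput. The equivalent, more readable form is:
conf.setNumAckers(0); // no ackers: tuples are acked on emit, at-most-once processing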
To learn more, see my Storm video tutorial on 51CTO: http://edu.51cto.com/course/course_id-9041.html. This section comes from lectures 18 and 19.