In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-30 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/01 Report--
This article mainly explains "storm how to build topology code", the content of the article is simple and clear, easy to learn and understand, the following please follow the editor's ideas slowly in depth, together to study and learn "storm how to build topology code" bar!
1. Build topology code
Package demo;import backtype.storm.topology.TopologyBuilder;import backtype.storm.tuple.Fields;public class AreaAmtTopo {public static void main (String [] args) {TopologyBuilder builder = new TopologyBuilder (); builder.setSpout ("spout", new OrderBaseSpout (KafkaProperties.Order_topic), 5); builder.setBolt ("filter", new AreaFilterBolt (), 5). ShuffleGrouping ("spout"); builder.setBolt ("areabolt", new AreaAmtBolt (), 2). FieldsGrouping ("filter", new Fields ("area_id")) Builder.setBolt ("rsltbolt", new AreaRsltBolt (), 1). ShuffleGrouping ("areabolt");}
two。 Primary filter bolt
Package demo;import java.util.Map;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values / / filter boltpublic class AreaFilterBolt implements IBasicBolt {@ Override public void declareOutputFields (OutputFieldsDeclarer declarer) {/ / TODO Auto-generated method stub declarer.declare (new Fields ("area_id", "order_amt", "create_time")); / / the corresponding name} @ Override public Map getComponentConfiguration () {/ / TODO Auto-generated method stub return null of each value in the tuple } @ Override public void cleanup () {/ / TODO Auto-generated method stub} @ Override public void execute (Tuple input, BasicOutputCollector collector) {/ / order_id,order_amt,create_time,area_id String order=input.getString (0); / / fetch the first value if in the collection values {String orderArr [] = order.split ("\\ t") Collector.emit (new Values (orderArr [3], orderArr [1], DateFmt.getCountDate (orderArr [2], DateFmt.date_short)); / / area_id,order_amt,create_time}} @ Override public void prepare (Map arg0, TopologyContext arg1) {/ / TODO Auto-generated method stub}}
3. Partial summary bolt (by date and region and summary)
Package demo;import java.util.HashMap;import java.util.Map;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;// partial summary public class AreaAmtBolt implements IBasicBolt {Map countsMap=null @ Override public void declareOutputFields (OutputFieldsDeclarer declarer) {declarer.declare (new Fields ("date_area", "amt"));} @ Override public Map getComponentConfiguration () {/ / TODO Auto-generated method stub return null;} @ Override public void prepare (Map paramMap, TopologyContext paramTopologyContext) {/ / TODO Auto-generated method stub countsMap = new HashMap () } @ Override public void execute (Tuple input, BasicOutputCollector collector) {if (input null null) / / if there is no data on the spout side, a null value will be sent, so make a judgment and send {String area_id=input.getString (0); Double order_amt=input.getDouble (1); String order_date=input.getStringByField ("order_date"). Double count=countsMap.get (area_id+ "_" + order_date); if (count==null) {count= 0.0;} count+=order_amt; countsMap.put (area_id+ "_" + order_date,count); System.err.println ("areaAmtBolt" + order_date+ "_" + area_id+ "=" + count) Collector.emit (new Values (area_id+ "_" + order_date,count));} @ Override public void cleanup () {countsMap.clear ();}}
4. The final result is written to Hbase
Package demo;import java.util.HashMap;import java.util.HashSet;import java.util.Map;import java.util.Set;import backtype.storm.task.TopologyContext;import backtype.storm.topology.BasicOutputCollector;import backtype.storm.topology.IBasicBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;// results are regularly written to boltpublic class AreaRsltBolt implements IBasicBolt {Map countsMap=null; long beginTime=System.currentTimeMillis (); long endTime=0L; HBaseDao dao=null of hbase. @ Override public void declareOutputFields (OutputFieldsDeclarer paramOutputFieldsDeclarer) {/ / TODO Auto-generated method stub} @ Override public Map getComponentConfiguration () {/ / TODO Auto-generated method stub return null;} @ Override public void prepare (Map paramMap, TopologyContext paramTopologyContext) {countsMap = new HashMap (); dao=new HBaseDAOImp () } @ Override public void execute (Tuple input, BasicOutputCollector paramBasicOutputCollector) {String date_areaid=input.getString (0); double order_amt=input.getDouble (1); countsMap.put (date_areaid,order_amt); endTime=System.currentTimeMillis () If (endTime-beginTime > = 5x1000) {for (String key:countsMap.keySet ()) {/ / put into hbase / / 2014-05-05-0551) System.err.println ("rsltBolt put hbase: key=" + key+ "; order_amt=" + countsMap.get (key)) } beginTime=System.currentTimeMillis ();} @ Override public void cleanup () {/ / TODO Auto-generated method stub}}
5. DateFmt code
Package demo;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Calendar;import java.util.Date;public class DateFmt {public static final String date_long= "yyyy-MM-dd HH:mm:ss"; public static final String date_short= "yyyy-MM-dd"; public static SimpleDateFormat sdf=new SimpleDateFormat (date_short); public static String getCountDate (String date,String patton) {SimpleDateFormat sdf=new SimpleDateFormat (patton); Calendar cal = Calendar.getInstance () If (dateworthy null) {try {cal.setTime (sdf.parse (date));} catch (ParseException e) {e.printStackTrace ();}} return sdf.format (cal.getTime ()) } public static Date parseDate (String dateStr) throws Exception {return sdf.parse (dateStr);} public static void main (String [] args) {System.out.println (DateFmt.getCountDate ("2015-09-08 09:09:08", DateFmt.date_long)) }} Thank you for reading, the above is the content of "how to build topology code for storm". After the study of this article, I believe you have a deeper understanding of how to build topology code for storm, and the specific use needs to be verified in practice. Here is, the editor will push for you more related knowledge points of the article, welcome to follow!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.