In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-30 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >
Share
Shulou(Shulou.com)05/31 Report--
This article mainly introduces "how to achieve word counting in Storm". In daily operation, I believe many people have doubts about how to achieve word counting in Storm. The editor consulted all kinds of materials and sorted out simple and easy-to-use operation methods. I hope it will be helpful for you to answer the doubts about "how to achieve word counting in Storm". Next, please follow the editor to study!
1. Create a project using the mvn command
Mvn archetype:generate-DgroupId=storm.test-DartifactId=Storm01-DpackageName=com.zhch.v1
Then edit the configuration file pom.xml to add storm dependencies
Org.apache.storm storm-core 0.9.4
Finally, compile the project with the following command, and import it into IDE after the compilation is completed correctly.
Mvn install
Of course, you can also install the maven plug-in in IDE to create a maven project directly in IDE
two。 Implement the data source and simulate the data source with repeated static statements
Package storm.test.v1;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import java.util.Map Public class SentenceSpout extends BaseRichSpout {private String [] sentences = {"storm integrates with the queueing", "and database technologies you already use", "a storm topology consumes streams of data", "and processes those streams in arbitrarily complex ways", "repartitioning the streams between each stage of the computation however needed"}; private int index = 0; private SpoutOutputCollector collector @ Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (new Fields ("sentence"));} @ Override public void open (Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {this.collector = spoutOutputCollector;} @ Override public void nextTuple () {this.collector.emit (new Values); index++; if (index > = sentences.length) {index = 0 } try {Thread.sleep (1);} catch (InterruptedException e) {}
3. Implement statement segmentation bolt
Package storm.test.v1;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.util.Map;public class SplitSentenceBolt extends BaseRichBolt {private OutputCollector collector; @ Override public void prepare (Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.collector = outputCollector @ Override public void execute (Tuple tuple) {String sentence = tuple.getStringByField ("sentence"); String [] words = sentence.split (""); for (String word: words) {this.collector.emit (new Values (word));}} @ Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (new Fields ("word"));}}
4. Implement word count bolt
Package storm.test.v1;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.util.HashMap;import java.util.Map;public class WordCountBolt extends BaseRichBolt {private OutputCollector collector; private HashMap counts = null @ Override public void prepare (Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.collector = outputCollector; this.counts = new HashMap ();} @ Override public void execute (Tuple tuple) {String word = tuple.getStringByField ("word"); Long count = this.counts.get (word); if (count = = null) {count = 0L;} count++ This.counts.put (word, count); this.collector.emit (new Values (word, count));} @ Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (new Fields ("word", "count"));}}
5. Implement to report to bolt
Package storm.test.v1;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Tuple;import java.util.ArrayList;import java.util.Collections;import java.util.HashMap;import java.util.List;import java.util.Map;public class ReportBolt extends BaseRichBolt {private HashMap counts = null @ Override public void prepare (Map map, TopologyContext topologyContext, OutputCollector outputCollector) {counts = new HashMap ();} @ Override public void execute (Tuple tuple) {String word = tuple.getStringByField ("word"); Long count = tuple.getLongByField ("count"); this.counts.put (word, count) } @ Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {} @ Override public void cleanup () {/ / Local mode, when terminating topology, you can guarantee that cleanup () will be executed System.out.println ("- FINAL COUNTS -"); List keys = new ArrayList (); keys.addAll (this.counts.keySet ()); Collections.sort (keys) For (String key: keys) {System.out.println (key + ":" + this.counts.get (key));} System.out.println ("-");}}
6. Implement word count topology
Package storm.test.v1;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.topology.TopologyBuilder;import backtype.storm.tuple.Fields;public class WordCountTopology {private static final String SENTENCE_SPOUT_ID = "sentence-spout"; private static final String SPLIT_BOLT_ID = "split-bolt"; private static final String COUNT_BOLT_ID = "count-bolt"; private static final String REPORT_BOLT_ID = "report-bolt" Private static final String TOPOLOGY_NAME = "word-count-topology"; public static void main (String [] args) {SentenceSpout spout = new SentenceSpout (); SplitSentenceBolt spiltBolt = new SplitSentenceBolt (); WordCountBolt countBolt = new WordCountBolt (); ReportBolt reportBolt = new ReportBolt (); TopologyBuilder builder = new TopologyBuilder (); builder.setSpout (SENTENCE_SPOUT_ID, spout) / / register the data source builder.setBolt (SPLIT_BOLT_ID, spiltBolt) / / register bolt .shuffleGrouping (SENTENCE_SPOUT_ID); / / the bolt subscribes to the data stream builder.setBolt (COUNT_BOLT_ID, countBolt) sent randomly and evenly from spout. FieldsGrouping (SPLIT_BOLT_ID, new Fields ("word")) / / the bolt subscribes to the data stream sent by spiltBolt and ensures that the tuple with the same value of "word" field will be routed to the same countBolt builder.setBolt (REPORT_BOLT_ID, reportBolt) .globalGrouping (COUNT_BOLT_ID); / / the bolt will subscribe to the data stream sent by countBolt, and all tuple will be routed to a unique reportBolt Config config = new Config () / / Local mode startup LocalCluster cluster = new LocalCluster (); cluster.submitTopology (TOPOLOGY_NAME, config, builder.createTopology ()); try {Thread.sleep (5 * 1000);} catch (InterruptedException e) {} cluster.killTopology (TOPOLOGY_NAME); cluster.shutdown ();}}
7. Running result:
-FINAL COUNTS-a: 302already: 302and: 604arbitrarily: 302complex: 302computation: 302consumes: 302data: 302database: 302each: 302however: 302in: 302integrates: 302needed: 302of: 604processes: 302queueing: 302repartitioning: 302stage: 302storm: 604streams: 906technologies: 302the: 906those: 302topology: 302use: 302ways: 302with: 302you: 302-this ends the study of "how Storm counts words". I hope you can solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 230
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.