This article explains how to count words in Storm using a custom stream grouping. The method is simple, fast, and practical; let's walk through it step by step.
Building on the previous word-count example, make the following change: use a custom grouping policy so that words starting with the same letter are sent to the same task for counting.
Implement the custom grouping (CustomStreamGrouping)
package com.zhch.v4;

import backtype.storm.generated.GlobalStreamId;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.task.WorkerTopologyContext;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ModuleGrouping implements CustomStreamGrouping, Serializable {
    private List<Integer> tasks;

    @Override
    public void prepare(WorkerTopologyContext workerContext, GlobalStreamId streamId, List<Integer> targetTasks) {
        this.tasks = targetTasks;
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> taskIds = new ArrayList<Integer>();
        if (values.size() > 0) {
            String str = values.get(0).toString();
            if (str.isEmpty()) {
                // route empty words to the first target task
                taskIds.add(tasks.get(0));
            } else {
                // same first letter => same index => same target task
                int index = str.charAt(0) % tasks.size();
                taskIds.add(tasks.get(index));
            }
        }
        return taskIds;
    }
}
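To see how this routes words, the standalone sketch below (not part of the original code; the task ids 4-7 are made up for illustration) computes the index the same way chooseTasks does:

import java.util.Arrays;
import java.util.List;

public class GroupingDemo {
    public static void main(String[] args) {
        List<Integer> tasks = Arrays.asList(4, 5, 6, 7); // hypothetical target task ids
        for (String word : new String[]{"storm", "simple", "analytics", "and"}) {
            int index = word.charAt(0) % tasks.size(); // same first letter => same index
            System.out.println(word + " -> task " + tasks.get(index));
        }
    }
}

Running it shows "storm" and "simple" landing on the same task, and likewise "analytics" and "and".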
Data source spout
package com.zhch.v4;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class SentenceSpout extends BaseRichSpout {
    private FileReader fileReader = null;
    private boolean completed = false;
    private ConcurrentHashMap<UUID, Values> pending; // tuples emitted but not yet acked
    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
        try {
            this.fileReader = new FileReader(map.get("wordsFile").toString());
        } catch (Exception e) {
            throw new RuntimeException("Error reading file [" + map.get("wordsFile") + "]");
        }
    }

    @Override
    public void nextTuple() {
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            return;
        }
        String line;
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            while ((line = reader.readLine()) != null) {
                Values values = new Values(line);
                UUID msgId = UUID.randomUUID();
                this.pending.put(msgId, values); // remember the tuple so it can be replayed on failure
                this.collector.emit(values, msgId);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    @Override
    public void ack(Object msgId) {
        this.pending.remove(msgId); // fully processed, no need to keep it
    }

    @Override
    public void fail(Object msgId) {
        this.collector.emit(this.pending.get(msgId), msgId); // replay the failed tuple
    }
}
Implement the sentence-splitting bolt
package com.zhch.v4;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            collector.emit(tuple, new Values(word)); // anchor each word to the input tuple
        }
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}
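Note that splitting on a single space yields empty strings whenever the input contains consecutive spaces, which is why ModuleGrouping guards with str.isEmpty(). A standalone sketch (not from the original post) demonstrates this:

import java.util.Arrays;

public class SplitDemo {
    public static void main(String[] args) {
        // two spaces between "is" and "fast" produce an empty token
        String[] words = "Storm is  fast".split(" ");
        System.out.println(Arrays.toString(words)); // prints [Storm, is, , fast]
    }
}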
Implement word count bolt
package com.zhch.v4;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);

        // rewrite the full, sorted result file on every update
        BufferedWriter writer = null;
        try {
            writer = new BufferedWriter(new FileWriter("/home/grid/stormData/result.txt"));
            List<String> keys = new ArrayList<String>();
            keys.addAll(this.counts.keySet());
            Collections.sort(keys);
            for (String key : keys) {
                Long c = this.counts.get(key);
                writer.write(key + ": " + c);
                writer.newLine();
                writer.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                try {
                    writer.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                writer = null;
            }
        }
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}
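Because the topology runs two count-bolt executors, both tasks write to the same result.txt path and can overwrite each other's output. One way to keep the outputs apart (a minimal sketch, not part of the original code; the resultFile field and file naming are assumptions) is to build a per-task path in prepare():

// inside WordCountBolt, assuming a resultFile field is added and used in execute()
private String resultFile;

@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    this.collector = outputCollector;
    this.counts = new HashMap<String, Long>();
    // one file per task, e.g. result-7.txt, so parallel tasks never clobber each other
    this.resultFile = "/home/grid/stormData/result-" + topologyContext.getThisTaskId() + ".txt";
}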
Implement word count topology
package com.zhch.v4;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class WordCountTopology {
    public static final String SENTENCE_SPOUT_ID = "sentence-spout";
    public static final String SPLIT_BOLT_ID = "split-bolt";
    public static final String COUNT_BOLT_ID = "count-bolt";
    public static final String TOPOLOGY_NAME = "word-count-topology-v4";

    public static void main(String[] args) throws Exception {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
        builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2).setNumTasks(4)
                .shuffleGrouping(SENTENCE_SPOUT_ID);
        builder.setBolt(COUNT_BOLT_ID, countBolt, 2)
                .customGrouping(SPLIT_BOLT_ID, new ModuleGrouping()); // use the custom grouping policy

        Config config = new Config();
        config.put("wordsFile", args[0]);

        if (args != null && args.length > 1) {
            // start in cluster mode
            config.setNumWorkers(2);
            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
        } else {
            // start in local mode
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
            }
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        }
    }
}
Submit to Storm cluster
storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v4.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v4
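Since main() only submits to the cluster when a topology name is passed as the second argument, the same jar can also be run in local mode by omitting that argument (assuming the input file exists on the local machine):

storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v4.WordCountTopology /home/grid/stormData/input.txt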
Running result:
[grid@hadoop5 stormData]$ cat result.txt
Apache: 1
ETL: 1
It: 1
Storm: 4
a: 4
analytics: 1
and: 5
any: 1
at: 1
can: 1
cases:: 1
clocked: 1
computation: 2
continuous: 1
easy: 2
guarantees: 1
is: 6
it: 2
machine: 1
makes: 1
many: 1
million: 1
more: 1
of: 2
online: 1
open: 1
operate: 1
over: 1
scalable: 1
second: 1
set: 1
simple: 1
source: 1
streams: 1
system: 1
unbounded: 1
up: 1
use: 1
use:: 1
data: 2
did: 1
distributed: 2
doing: 1
fast:: 1
fault-tolerant: 1
for: 2
free: 1
fun: 1
has: 1
language: 1
learning: 1
lot: 1
node: 1
per: 2
process: 1
processed: 2
processing: 2
programming: 1
realtime: 3
reliably: 1
to: 3
torm: 1
tuples: 1

Note that the listing contains two alphabetically sorted runs, one from each of the two count-bolt tasks. At this point you should have a deeper understanding of how to count words in Storm. Try it out in practice yourself.