In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >
Share
Shulou(Shulou.com)05/31 Report--
This article mainly introduces "how to use Direct Grouping grouping strategy in Storm". In daily operation, I believe many people have doubts about how to use Direct Grouping grouping strategy in Storm. The editor consulted all kinds of materials and sorted out simple and easy-to-use operation methods. I hope it will be helpful to answer the doubts about "how to use Direct Grouping grouping strategy in Storm". Next, please follow the editor to study!
Use the Direct Grouping grouping strategy to send words with the same initials to the same task count
Data source spout
Package com.zhch.v3;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichSpout;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import java.io.BufferedReader;import java.io.FileReader;import java.util.Map;import java.util.UUID;import java.util.concurrent.ConcurrentHashMap;public class SentenceSpout extends BaseRichSpout {private FileReader fileReader = null; private boolean completed = false; private ConcurrentHashMap pending Private SpoutOutputCollector collector; @ Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (new Fields ("sentence"));} @ Override public void open (Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {this.collector = spoutOutputCollector; this.pending = new ConcurrentHashMap (); try {this.fileReader = new FileReader (map.get ("wordsFile"). ToString ()) } catch (Exception e) {throw new RuntimeException ("Error reading file [" + map.get ("wordsFile") + "]");} @ Override public void nextTuple () {if (completed) {try {Thread.sleep (1000);} catch (InterruptedException e) {} String line BufferedReader reader = new BufferedReader (fileReader); try {while ((line = reader.readLine ())! = null) {Values values = new Values (line); UUID msgId = UUID.randomUUID (); this.pending.put (msgId, values); this.collector.emit (values, msgId) } catch (Exception e) {throw new RuntimeException ("Error reading tuple", e);} finally {completed = true;}} @ Override public void ack (Object msgId) {this.pending.remove (msgId);} @ Override public void fail (Object msgId) {this.collector.emit (this.pending.get (msgId), msgId) }}
Implement statement segmentation bolt
Package com.zhch.v3;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import backtype.storm.tuple.Values;import java.util.List;import java.util.Map;public class SplitSentenceBolt extends BaseRichBolt {private OutputCollector collector; private List numCounterTasks @ Override public void prepare (Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.collector = outputCollector; / / get the taskId list of downstream bolt this.numCounterTasks = topologyContext.getComponentTasks (WordCountTopology.COUNT_BOLT_ID);} @ Override public void execute (Tuple tuple) {String sentence = tuple.getStringByField ("sentence"); String [] words = sentence.split ("") For (String word: words) {Integer taskId = this.numCounterTasks.get (this.getWordCountIndex (word)); collector.emitDirect (taskId, tuple, new Values (word));} this.collector.ack (tuple);} public Integer getWordCountIndex (String word) {word = word.trim (). ToUpperCase (); if (word.isEmpty ()) return 0 Else (0)% numCounterTasks.size ();} @ Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (new Fields ("word"));}
Package com.zhch.v3;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.topology.base.BaseRichBolt;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Tuple;import java.io.BufferedWriter;import java.io.FileWriter;import java.util.HashMap;import java.util.Iterator;import java.util.Map;public class WordCountBolt extends BaseRichBolt {private OutputCollector collector; private HashMap counts = null @ Override public void prepare (Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.collector = outputCollector; this.counts = new HashMap ();} @ Override public void execute (Tuple tuple) {String word = tuple.getStringByField ("word"); Long count = this.counts.get (word); if (count = = null) {count = 0L;} count++ This.counts.put (word, count); BufferedWriter writer = null; try {writer = new BufferedWriter (new FileWriter ("/ home/grid/stormData/result.txt")); Iterator keys = this.counts.keySet (). Iterator (); while (keys.hasNext ()) {String w = keys.next () Long c = this.counts.get (w); writer.write (w + ":" + c); writer.newLine (); writer.flush ();}} catch (Exception e) {e.printStackTrace () } finally {if (writer! = null) {try {writer.close ();} catch (Exception e) {e.printStackTrace ();} writer = null;}} this.collector.ack (tuple) @ Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (new Fields ("word", "count"));}}
Implement word count topology
Package com.zhch.v3;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.StormSubmitter;import backtype.storm.topology.TopologyBuilder;public class WordCountTopology {public static final String SENTENCE_SPOUT_ID = "sentence-spout"; public static final String SPLIT_BOLT_ID = "split-bolt"; public static final String COUNT_BOLT_ID = "count-bolt"; public static final String TOPOLOGY_NAME = "word-count-topology-v3" Public static void main (String [] args) throws Exception {SentenceSpout spout = new SentenceSpout (); SplitSentenceBolt spiltBolt = new SplitSentenceBolt (); WordCountBolt countBolt = new WordCountBolt (); TopologyBuilder builder = new TopologyBuilder (); builder.setSpout (SENTENCE_SPOUT_ID, spout, 2); builder.setBolt (SPLIT_BOLT_ID, spiltBolt, 2) .setNumTasks (4) .shuffleGrouping (SENTENCE_SPOUT_ID) Builder.setBolt (COUNT_BOLT_ID, countBolt, 2) .directGrouping (SPLIT_BOLT_ID); / using Direct Grouping grouping policy Config config = new Config (); config.put ("wordsFile", args [0]); if (args! = null & & args.length > 1) {config.setNumWorkers (2) / / start StormSubmitter.submitTopology (args [1], config, builder.createTopology ()) in cluster mode;} else {LocalCluster cluster = new LocalCluster (); cluster.submitTopology (TOPOLOGY_NAME, config, builder.createTopology ()); try {Thread.sleep (5 * 1000) } catch (InterruptedException e) {} cluster.killTopology (TOPOLOGY_NAME); cluster.shutdown ();}
Submit to Storm cluster
Storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v3.WordCountTopology / home/grid/stormData/input.txt word-count-topology-v3
Running result:
[grid@hadoop5 stormData] $cat result.txt second: 1can: 1set: 1simple: 1use: 1used: 1It: 1Storm: 4online: 1cases:: 1open: 1Apache: 1of: 2over: 1more: 1clocked: 1easy: 2scalable: 1any: 1guarantees: 1ETL: 1million: 1continuous: 1is: 6with: 1it: 2makes: 1your: 1a: 4at: 1machine: 1analytics: 1up: 1and: 5many: 1system: 1system: 1system: 1system: 1system: 1source: 1source: 1what:: : 1free: 1programming: 1reliably: 1fast: 1processing: 2be: 2Hadoop: 1did: 1fun: 1learning: 1torm: 1process: 1RPC: 1node: 1processed: 2per: 2realtime: 3benchmark: 1batch: 1doing: 1lot: 1language: 1tuples: 1fault-tolerant: 1 so far The study on "how to use Direct Grouping grouping strategy in Storm" is over. I hope to be able to solve your doubts. The collocation of theory and practice can better help you learn, go and try it! If you want to continue to learn more related knowledge, please continue to follow the website, the editor will continue to work hard to bring you more practical articles!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.