2025-02-28 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article explains how to change parallelism in Storm. It walks through a small word-count topology (two bolts and a main class), runs it locally, and then shows what happens when the number of bolt instances and the grouping are changed.
    package bolts;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class WordNormalizer implements IRichBolt {
        private OutputCollector collector;

        public void cleanup() {}

        /**
         * The bolt receives a line from the words file and normalizes it:
         * the line is split into words and each word is converted to lower case.
         */
        public void execute(Tuple input) {
            String sentence = input.getString(0);
            String[] words = sentence.split(" ");
            for (String word : words) {
                word = word.trim();
                if (!word.isEmpty()) {
                    word = word.toLowerCase();
                    // Emit the word, anchored to the input tuple
                    List<Tuple> a = new ArrayList<Tuple>();
                    a.add(input);
                    collector.emit(a, new Values(word));
                }
            }
            // Acknowledge the tuple
            collector.ack(input);
        }

        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * The bolt only emits the field "word"
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }
Tip: in this class, a single call to the execute() method can emit several tuples. For example, when execute() receives the sentence "This is the Storm book", it emits five new tuples, one per word.
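To see why that sentence yields five tuples, the normalization step can be tried outside Storm. The following is a minimal sketch that uses only the Java standard library; the class name NormalizeSketch and the method normalize are illustrative names, not part of Storm or of the example project.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone helper that mirrors the logic of WordNormalizer.execute().
class NormalizeSketch {

    // Splits a line on single spaces, trims and lower-cases each piece,
    // and skips empty pieces. Each returned word corresponds to one emitted tuple.
    static List<String> normalize(String sentence) {
        List<String> words = new ArrayList<>();
        for (String word : sentence.split(" ")) {
            word = word.trim();
            if (!word.isEmpty()) {
                words.add(word.toLowerCase());
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // Prints [this, is, the, storm, book]
        System.out.println(normalize("This is the Storm book"));
    }
}
```

In the real bolt each of these words is wrapped in a Values object and emitted with the input tuple as its anchor.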
The second bolt, WordCounter, is responsible for counting the words. When the topology ends (that is, when the cleanup() method is called), the count for each word is displayed.
Tip: the second bolt does not emit anything. In this example the data is kept in a Map object, but in a real application the bolt could store the data in a database.
    package bolts;

    import java.util.HashMap;
    import java.util.Map;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Tuple;

    public class WordCounter implements IRichBolt {
        Integer id;
        String name;
        Map<String, Integer> counters;
        private OutputCollector collector;

        /**
         * When the topology ends (when the cluster is shut down),
         * show the word counters.
         */
        @Override
        public void cleanup() {
            System.out.println("--Word Counter [" + name + "-" + id + "]--");
            for (Map.Entry<String, Integer> entry : counters.entrySet()) {
                System.out.println(entry.getKey() + ": " + entry.getValue());
            }
        }

        /**
         * Count each incoming word.
         */
        @Override
        public void execute(Tuple input) {
            String str = input.getString(0);
            // If the word is not in the map yet, add it with a count of 1;
            // otherwise increment its count.
            if (!counters.containsKey(str)) {
                counters.put(str, 1);
            } else {
                Integer c = counters.get(str) + 1;
                counters.put(str, c);
            }
            // Acknowledge the tuple
            collector.ack(input);
        }

        /**
         * On create
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.counters = new HashMap<String, Integer>();
            this.collector = collector;
            this.name = context.getThisComponentId();
            this.id = context.getThisTaskId();
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {}
    }
The execute() method uses a Map to collect the words and count how often each one occurs. When the topology ends, the cleanup() method is called and the counter map is printed. (This is just an example. In general, you should use the cleanup() method to close active connections and other resources when the topology shuts down.)
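The counting step can likewise be exercised without a cluster. The sketch below assumes the words arrive already normalized; CountSketch and count are hypothetical names used only for illustration. It uses Map.merge (Java 8 and later) as a compact equivalent of the containsKey/put branch in the bolt.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone helper that mirrors the counting done in WordCounter.execute().
class CountSketch {

    // Returns a map from each word to the number of times it appears in the list.
    static Map<String, Integer> count(List<String> words) {
        Map<String, Integer> counters = new HashMap<>();
        for (String word : words) {
            // Insert 1 for a new word, otherwise add 1 to the existing count.
            counters.merge(word, 1, Integer::sum);
        }
        return counters;
    }

    public static void main(String[] args) {
        Map<String, Integer> counters = count(Arrays.asList("storm", "is", "great", "storm"));
        System.out.println(counters.get("storm")); // prints 2
    }
}
```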
Main class
In the main class, you create the topology and a LocalCluster object, which lets you test and debug the topology locally. Together with the Config object, LocalCluster allows you to try out different cluster configurations. For example, if you accidentally use a global variable or class variable, you will discover the error when you test the topology with different numbers of workers. (More on Config objects in Chapter 3.)
Tip: all topology nodes should be able to run independently, without sharing data between processes (that is, no global or class variables), because when the topology runs on a real cluster these processes may be running on different machines.
You use TopologyBuilder to create the topology. It tells Storm how the nodes are arranged and how they exchange data.
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("word-reader", new WordReader());
    builder.setBolt("word-normalizer", new WordNormalizer())
        .shuffleGrouping("word-reader");
    builder.setBolt("word-counter", new WordCounter(), 2)
        .fieldsGrouping("word-normalizer", new Fields("word"));
In this example, a shuffle grouping (shuffleGrouping) connects the spout to the word-normalizer bolt. It tells Storm to distribute messages from the source node randomly across the instances of the destination node.
Next, create a Config object that contains topology configuration information, which is merged with the cluster configuration information at run time and sent to all nodes through the prepare () method.
    Config conf = new Config();
    conf.put("wordsFile", args[0]);
    conf.setDebug(false);
Set the wordsFile property to the name of the file the spout will read (the file name is passed in the args parameter). The snippet above sets debug to false; while developing, you can set it to true, in which case Storm prints all messages exchanged between nodes along with other debug data, which helps you understand how the topology works.
As mentioned earlier, you will use LocalCluster to run topology. In a production environment, topology runs continuously, but in this case, you only need to run topology for a few seconds to see the results.
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
    Thread.sleep(1000);
    cluster.shutdown();
Use createTopology and submitTopology to create and run the topology, sleep for one second (the topology runs in different threads), and then stop the topology by shutting down the cluster.
Example 2-3 puts the above code together.
Example 2-3. src/main/java/TopologyMain.java
    import spouts.WordReader;
    import bolts.WordCounter;
    import bolts.WordNormalizer;

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;

    public class TopologyMain {
        public static void main(String[] args) throws InterruptedException {
            // Topology definition
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("word-reader", new WordReader());
            builder.setBolt("word-normalizer", new WordNormalizer())
                .shuffleGrouping("word-reader");
            builder.setBolt("word-counter", new WordCounter(), 2)
                .fieldsGrouping("word-normalizer", new Fields("word"));

            // Configuration
            Config conf = new Config();
            conf.put("wordsFile", args[0]);
            conf.setDebug(false);

            // Topology run
            conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
            Thread.sleep(1000);
            cluster.shutdown();
        }
    }

Run this project
Now get ready to run the first topology! If you create a new text file (src/main/resources/words.txt) with one word per line, you can run the topology with the following command:
    mvn exec:java -Dexec.mainClass="TopologyMain" -Dexec.args="src/main/resources/words.txt"
For example, if you use the following words.txt file:
Storm
Test
Are
Great
Is
An
Storm
Simple
Application
But
Very
Powerful
Really
Storm
Is
Great
In the log, you will see information similar to the following:
Is: 2
Application: 1
But: 1
Great: 1
Test: 1
Simple: 1
Storm: 3
Really: 1
Are: 1
Great: 1
An: 1
Powerful: 1
Very: 1
In this case, only a single instance of each node is used. What if you need to count the words in a very large log file? You can easily change the number of instances that work in parallel, for example by creating two instances of WordCounter:
    builder.setBolt("word-counter", new WordCounter(), 2)
        .shuffleGrouping("word-normalizer");
Rerun the program and you will see:
--Word Counter [word-counter-2]--
Application: 1
Is: 1
Great: 1
Are: 1
Powerful: 1
Storm: 3
--Word Counter [word-counter-3]--
Really: 1
Is: 1
But: 1
Great: 1
Test: 1
Simple: 1
An: 1
Very: 1
Great! Changing the parallelism is that easy (of course, in a real deployment the instances may run on different machines). On closer inspection, however, there is a problem: the words "is" and "great" are counted once in each WordCounter instance. Why? With a shuffle grouping (shuffleGrouping), Storm sends each message to a randomly chosen bolt instance, so occurrences of the same word can end up in different instances. In this example, it is better to send the same word to the same WordCounter instance every time. To do this, change shuffleGrouping("word-normalizer") to fieldsGrouping("word-normalizer", new Fields("word")). Make the change and rerun the program to confirm the results.
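The difference between the two groupings can be illustrated without Storm. The sketch below is a simplified model, not Storm's implementation: it assumes a fields grouping chooses the target task from a hash of the word modulo the number of tasks, and it models the shuffle grouping as a round-robin over the tasks (the real shuffle grouping is randomized). The names GroupingSketch, fieldsTask, and route are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of how tuples are routed to bolt tasks.
class GroupingSketch {

    // Fields-grouping model: the same word always maps to the same task index.
    static int fieldsTask(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    // Routes words to per-task counters. With byField == false, words are dealt
    // out round-robin, a deterministic stand-in for the shuffle grouping.
    static List<Map<String, Integer>> route(List<String> words, int numTasks, boolean byField) {
        List<Map<String, Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            tasks.add(new HashMap<>());
        }
        int next = 0;
        for (String word : words) {
            int target = byField ? fieldsTask(word, numTasks) : next++ % numTasks;
            tasks.get(target).merge(word, 1, Integer::sum);
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("storm", "is", "great", "is", "storm", "great");
        System.out.println("shuffle-like: " + route(words, 2, false));
        System.out.println("fields:       " + route(words, 2, true));
    }
}
```

In the round-robin run, the two occurrences of "great" land on different tasks, which is the kind of split seen in the output above. With hash-based routing, every word has exactly one owning task, so its full count ends up in one place.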
That concludes this walkthrough of how to change parallelism in Storm. Try the changes yourself to confirm the results.