In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-15 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >
Share
Shulou(Shulou.com)05/31 Report--
This article mainly introduces "how to configure the Topology of Storm". In the daily operation, I believe that many people have doubts about how to configure the Topology of Storm. The editor consulted all kinds of materials and sorted out simple and easy-to-use methods of operation. I hope it will be helpful to answer the doubts of "how to configure Topology of Storm". Next, please follow the editor to study!
Data flow group
One of the most important things you need to do when designing a topology is to define how data is exchanged between components (how data flow is consumed by bolts). A data flow group specifies which data streams are consumed by each bolt and how they are consumed.
NOTE: a node can publish more than one data stream, and a data stream group allows us to choose which one to receive.
The data flow group is set when defining the topology, as we saw in Chapter 2:
Builder.setBolt ("word-normalizer", new WordNormalizer ()) .shuffleGrouping ("word-reader");
In the previous code block, a bolt is set by the TopologyBuilder object, and then a random data stream group is used to specify the data source. Data flow groups usually take the ID of the data source component as a parameter, depending on the type of data flow group and other optional parameters.
NOTE: each InputDeclarer can have more than one data source, and each data source can be divided into different groups.
Random data flow group (random grouping)
Random flow groups are the most commonly used data flow groups. It has only one parameter (the data source component), and the data source sends tuples to randomly selected bolt, ensuring that each consumer receives an approximate number of tuples.
Random data stream groups are used for atomic operations such as mathematical calculations. However, if operations cannot be randomly assigned, as in the example of word counting in Chapter 2, you need to consider other grouping methods.
Domain data flow group (field grouping)
Domain data flow groups allow you to control how tuples are sent to bolts based on one or more domains of tuples. It guarantees that a set of values with the same combination of fields is sent to the same bolt. Going back to the example of the word counter, if you use the word field to group data streams, word-normalizer bolt will only send tuples of the same words to the same word-counterbolt instance.
Builder.setBolt ("word-counter", new WordCounter (), 2) .fieldsGrouping ("word-normalizer", new Fields ("word"))
NOTE: all domain collections in the domain data flow group must exist in the domain declaration of the data source.
All data flow groups (all grouped)
All data flow groups, making a copy of the tuple for each instance that receives the data. This grouping method is used to send signals to the bolts. For example, if you want to refresh the cache, you can send a cache refresh signal to all bolts. In the case of the word counter, you can use a full data flow group and add the ability to clear the counter cache (see topology example)
Public void execute (Tuple input) {String str = null; try {if (input.getSourceStreamId (). Equals ("signals")) {str = input.getStringByField ("action"); if ("refreshCache" .equals (str)) counters.clear () }} catch (IllegalArgumentException e) {/ / nothing}}
We added an if branch to check the source data stream. Storm allows us to declare named data streams (if you don't send tuples to a named data stream, by default to a data stream named "default"). This is an excellent way to identify tuples, just like in this case, we want to identify signals. In the topology definition, you add a second data stream to word-counter bolt to receive each tuple sent from the signals-spout data stream to all bolt instances.
Builder.setBolt ("word-counter", new WordCounter (), 2) .fieldsGroupint ("word-normalizer", new Fields ("word")) .allGrouping ("signals-spout", "signals")
Please refer to the git repository for the implementation of signals-spout.
Custom data flow groups (custom grouping)
You can create custom data flow groups by implementing the backtype.storm.grouping.CustormStreamGrouping interface and let you decide which bolt receives which tuples.
Let's modify the word counter example so that words with the same initials are received by the same bolt.
Public class ModuleGrouping mplents CustormStreamGrouping, Serializable {int numTasks = 0; @ Override public List chooseTasks (List values) {List boltIds = new ArrayList (); if (values.size () > 0) {String str = values.get (0) .toString (); if (str.isEmpty ()) {boltIds.add (0) } else {boltIds.add (str.charAt (0)% numTasks);}} return boltIds;} @ Override public void prepare (TopologyContext context, Fields outFields, List targetTasks) {numTasks = targetTasks.size ();}}
This is a simple implementation of CustomStreamGrouping, where we use the integer value of the word's first letter character and the remainder of the number of tasks to decide to receive the bolt of the tuple.
This custom data stream group can be used by word-normalizer modification in the following way.
Builder.setBolt ("word-normalizer", new WordNormalizer ()) .customGrouping ("word-reader", new ModuleGrouping ()); direct data stream group (direct grouping)
This is a special data flow group that the data source can use to determine which component receives the tuple. Similar to the previous example, the data source determines which bolt receives the tuple based on the first letter of the word. To use a direct data flow group, in WordNormalizer bolt, use the emitDirect method instead of emit.
Public void execute (Tuple input) {... For (String word: words) {if (! word.isEmpty ()) {... Collector.emitDirect (getWordCountIndex (word), new Values (word));}} / / reply collector.ack (input) to a pair of tuples;} public Integer getWordCountIndex (String word) {word = word.trim (). ToUpperCase (); if (word.isEmpty ()) {return 0;} else {return word.charAt (0)% numCounterTasks }}
Calculate the number of tasks in prepare method
Public void prepare (Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector; this.numCounterTasks = context.getComponentTasks ("word-counter");}
Specify in the topology definition that the data flow will be grouped directly:
Builder.setBolt ("word-counter", new WordCounter (), 2) .directGrouping ("word-normalizer"); global data flow group (global grouping)
The global data flow group sends tuples created by all data sources to a single target instance (that is, the task with the lowest ID).
No grouping (no grouping)
At the time of writing this book (Stom0.7.1 version), this data stream group is equivalent to a random data stream group. That is, when using this data flow group, you don't care how the data streams are grouped.
LocalCluster VS StormSubmitter
So far, you have used a tool called LocalCluster to run a topology on your local machine. Storm's basic tools enable you to easily run and debug different topologies on your computer. But how do you submit your topology to a running Storm cluster? Storm has an interesting feature that makes it easy to run your own topology on a real cluster. To do this, you need to replace LocalCluster with StormSubmitter and implement the submitTopology method, which is responsible for sending the topology to the cluster.
The following is the modified code:
/ / LocalCluster cluster = new LocalCluster (); / / cluster.submitTopology ("Count-Word-Topology-With-Refresh-Cache", conf, / / builder.createTopology ()); StormSubmitter.submitTopology ("Count-Word-Topology-With_Refresh-Cache", conf, builder.createTopology ()); / / Thread.sleep (1000); / / cluster.shutdown ()
NOTE: when you use StormSubmitter, you can't control the cluster through code as you did with LocalCluster.
Next, compress the source code into a jar package and run the Storm client command to submit the topology to the cluster. If you have already used Maven, you just need to go to the source directory on the command line and run: mvn package.
Now that you have generated a jar package, use the storm jar command to submit the topology (see Appendix An on how to install the Storm client). Command format: storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3.
For this example, run under the topology project directory:
Storm jar target/Topologies-0.0.1-SNAPSHOT.jar countword.TopologyMain src/main/resources/words.txt
With these commands, you publish the topology on the cluster.
If you want to stop or kill it, run:
Storm kill Count-Word-Topology-With-Refresh-Cache
NOTE: topology names must be unique.
NOTE: how to install the Storm client, refer to Appendix A
DRPC topology
There is a special type of topology called distributed remote procedure call (DRPC), which takes advantage of the distributed nature of Storm to perform remote procedure call (RPC) (see figure below). Storm provides some tools for implementing DRPC. The first is the DRPC server, which acts like a connector between the client and the Storm topology as a data source for the spout of the topology. It receives a function and function arguments to be executed, and then for each block of data operated by the function, the server allocates a request ID through the topology to identify the RPC request. When the topology executes the final bolt, it must assign the RPC request ID and the result so that the DRPC server returns the result to the correct client.
NOTE: a single instance DRPC server can perform many functions. Each function is identified by a unique name.
The second tool Storm provides (which has been used in the example) is LineDRPCTopologyBuilder, an abstract concept that assists in building DRPC topologies. The resulting topology creates a DRPCSpouts-- that connects to the DRPC server and distributes data to the rest of the topology-- and wraps the bolts so that the result is returned from the last bolt. Execute all the bolts added to the LinearDRPCTopologyBuilder object in turn.
As an example of this type of topology, we create a process that performs addition operations. Although this is a simple example, the concept can be extended to complex distributed computing.
Bolt declares the output as follows:
Public void declareOutputFields (OutputFieldsDeclarer declarer) {declarer.declare (new Fields ("id", "result");}
Because this is the only bolt in the topology, it must publish the RPC ID and the results. The execute method is responsible for performing addition operations.
Public void execute (Tuple input) {String [] numbers = input.getString (1). Split ("\\ +"); Integer added = 0; if (numbers.length
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.