2025-04-04 Update From: SLTechnology News&Howtos
Shulou (Shulou.com) 05/31 Report
This article shows how to write a custom grouping implementation for Twitter Storm's Stream Grouping. The walkthrough is concise and easy to follow; I hope you get something out of the detailed introduction.
## Custom Grouping Test
Storm supports custom groupings, and this article explores how to write a custom grouping in Storm and how that grouping dispatches data.
Here is a custom grouping I wrote that always dispatches the data to the first Task:
```java
public class MyFirstStreamGrouping implements CustomStreamGrouping {
    private static Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping.class);
    private List<Integer> tasks;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.tasks = targetTasks;
        log.info(tasks.toString());
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        log.info(values.toString());
        return Arrays.asList(tasks.get(0));
    }
}
```
As you can see from the code, this custom grouping always returns the first Task, via Arrays.asList(tasks.get(0)); in other words, whenever a tuple arrives, it is dispatched to the first Task.
Test the code:
```java
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 2);
// Wire the bolt to the spout with the custom grouping
builder.setBolt("exclaim1", new DefaultStringBolt(), 3)
       .customGrouping("words", new MyFirstStreamGrouping());
```
As in the earlier test cases, the Spout keeps emitting words from the list new String[] {"nathan", "mike", "jackson", "golda", "bertels"}. Let's run and verify:
```
11878 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
11943 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [nathan]
11944 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
11979 [Thread-29-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]
11980 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
12045 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]
12045 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
12080 [Thread-29-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]
12081 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
12145 [Thread-41-words] INFO cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]
12146 [Thread-25-exclaim1] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
```
The log shows that data is always dispatched to a single Bolt thread, Thread-25-exclaim1. Because I am testing locally, each Task runs as a thread, so the thread name identifies the Task; meanwhile the data arrives from multiple Spout threads. The test behaves as expected: every tuple is sent to one Task, and that Task is the first one.
## Understanding the Custom Grouping Implementation
Is it difficult to implement a custom grouping yourself? Not really: if you understand Hadoop's Partitioner, Storm's CustomStreamGrouping is essentially the same thing.
After the Map phase of a Hadoop MapReduce job completes, the intermediate results of the Map are written to disk. Before the write, a thread first divides the data into partitions according to the Reducer each record will eventually be sent to, and each partition then goes to a different Reduce. Let's first look at how Hadoop groups data; Partitioner has a single method:
```java
public class Partitioner<K, V> {
    public int getPartition(K key, V value, int numReduceTasks) {
        return 0;
    }
}
```
In the code above, data emitted by Map passes through the getPartition() method to determine its grouping. numReduceTasks is the number of Reduce tasks in the Job, and the return value decides which Reduce the record goes to. The return value must be greater than or equal to 0 and less than numReduceTasks, otherwise an error is reported. Returning 0 means the record goes to the first Reduce. To spread records evenly across reducers, the method can be implemented like this:
```java
public int getPartition(K key, V value, int numReduceTasks) {
    // Mask off the sign bit so the result is non-negative, then take the modulus.
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
```
In fact, this is exactly how Hadoop's default hash partitioning strategy is implemented. The advantage is that the data stays roughly load-balanced across the whole cluster.
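To make the formula concrete, here is a self-contained sketch in plain Java, with no Hadoop dependency; the class name HashPartitionDemo and the String-keyed signature are my own simplifications, not Hadoop's actual API:

```java
import java.util.Arrays;
import java.util.List;

// Stand-alone sketch of Hadoop-style hash partitioning.
class HashPartitionDemo {
    // Same formula as above: mask the sign bit, then take the modulus.
    static int getPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("nathan", "mike", "jackson", "golda", "bertels");
        for (String k : keys) {
            // Each key deterministically lands in a partition in [0, numReduceTasks).
            System.out.println(k + " -> partition " + getPartition(k, 3));
        }
    }
}
```

Note that the same key always maps to the same partition, which is what lets a Reduce task see all records for a given key.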
Having figured out Hadoop's Partitioner, let's take a look at Storm's CustomStreamGrouping.
This is the source code for the CustomStreamGrouping class:
```java
public interface CustomStreamGrouping extends Serializable {
    void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);
    List<Integer> chooseTasks(int taskId, List<Object> values);
}
```
By the same token, targetTasks is how the Storm runtime tells you which target Tasks are available to choose from, each identified by a number. And chooseTasks(int taskId, List<Object> values) is where you make the choice: which target Tasks do you want to process this tuple, values?
In the custom grouping implementation at the beginning of this article, I always choose the first Task to handle the data: return Arrays.asList(tasks.get(0));. Unlike Hadoop, Storm allows one piece of data to be processed by multiple Tasks, so the return value is a List: you may select any number of Tasks (at least one) from the provided targetTasks to process the data.
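To see how a more balanced chooseTasks might behave, here is a minimal round-robin chooser with the same contract. It is a sketch in plain Java: Storm's WorkerTopologyContext, GlobalStreamId, and taskId parameters are left out so the snippet stays self-contained and runnable, and the class name RoundRobinChooser is my own:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of a round-robin grouping, mirroring CustomStreamGrouping's
// prepare/chooseTasks contract without the Storm-specific parameters.
class RoundRobinChooser {
    private List<Integer> tasks;
    private int next = 0;

    // Mirrors prepare(...): remember the candidate target tasks.
    void prepare(List<Integer> targetTasks) {
        this.tasks = new ArrayList<>(targetTasks);
    }

    // Mirrors chooseTasks(...): rotate through the tasks so load is spread evenly.
    List<Integer> chooseTasks(List<Object> values) {
        int chosen = tasks.get(next);
        next = (next + 1) % tasks.size();
        return Arrays.asList(chosen);
    }
}
```

Returning more than one element of tasks here would fan a tuple out to several Tasks at once, which is exactly the flexibility the List return type gives you over Hadoop's single-int getPartition().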
That is how to write a custom grouping implementation for Twitter Storm's Stream Grouping. I hope you picked up some useful knowledge or skills from it.