Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to customize data grouping in storm

2025-01-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)06/01 Report--

Today, I will talk to you about how to customize data grouping in storm, which may not be well understood by many people. in order to make you understand better, the editor has summarized the following for you. I hope you can get something according to this article.

Data flow group

One of the most important things you need to do when designing a topology is to define how data is exchanged between components (how data flow is consumed by bolts). A data flow group specifies which data streams are consumed by each bolt and how they are consumed.

Storm comes with data flow groups

Random data flow group

Random flow groups are the most commonly used data flow groups. It has only one parameter (the data source component), and the data source sends tuples to randomly selected bolt, ensuring that each consumer receives an approximate number of tuples.

Builder.setBolt ("word-counter", new WordCounter ()) .shuffleGrouping ("word-normalizer")

Domain data flow group

Domain data flow groups allow you to control how tuples are sent to bolts based on one or more domains of tuples. It guarantees that a set of values with the same combination of fields is sent to the same bolt. Going back to the example of the word counter, if you use the word field to group data streams, word-normalizer bolt will only send tuples of the same words to the same word-counterbolt instance.

Builder.setBolt ("word-counter", new WordCounter (), 2) .fieldsgrouping ("word-normalizer", new Fields ("word"))

All data flow groups

All data flow groups, making a copy of the tuple for each instance that receives the data. This grouping method is used to send signals to the bolts. For example, if you want to refresh the cache, you can send a cache refresh signal to all bolts. In the case of the word counter, you can use a full data flow group to add the ability to clear the counter cache.

Builder.setBolt ("word-counter", new WordCounter (), 2) .fieldsGroupint ("word-normalizer", new Fields ("word")) .allGrouping ("signals-spout", "signals")

Direct data flow group

This is a special data stream group that the data source can use to determine which component receives the tuple

Builder.setBolt ("word-counter", new WordCounter (), 2) .directGrouping ("word-normalizer")

. Similar to the previous example, the data source determines which bolt receives the tuple based on the first letter of the word. To use a direct data flow group, in WordNormalizer bolt, use the emitDirect method instead of emit.

Public void execute (Tuple input) {... For (String word: words) {if (! word.isEmpty ()) {... Collector.emitDirect (getWordCountIndex (word), new Values (word));}} / / reply collector.ack (input) to a pair of tuples;} public Integer getWordCountIndex (String word) {word = word.trim (). ToUpperCase (); if (word.isEmpty ()) {return 0;} else {return word.charAt (0)% numCounterTasks }}

Calculate the number of tasks in prepare method

Public void prepare (Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector; this.numCounterTasks = context.getComponentTasks ("word-counter");}

Global data flow group

The global data flow group sends tuples created by all data sources to a single target instance (that is, the task with the lowest ID).

No grouping

This data flow group is equivalent to a random data flow group. That is, when using this data flow group, you don't care how the data streams are grouped.

Custom data flow group

The storm custom data stream group is very similar to the hadoop Partitioner grouping. To implement the CustomStreamGrouping API for storm custom grouping, the source codes of the API are as follows:

Public interface CustomStreamGrouping extends Serializable {void prepare (WorkerTopologyContext context, GlobalStreamId stream, List targetTasks); List chooseTasks (int taskId, List values);}

TargetTasks is the Storm runtime that tells you that there are currently several target Task to choose from, each numbered. And chooseTasks (int taskId, List values); is for you to choose, which target values do you want to process for this piece of data Task?

This is a custom grouping I wrote that always divides the data into the first Task:

Public class MyFirstStreamGrouping implements CustomStreamGrouping {private static Logger log = LoggerFactory.getLogger (MyFirstStreamGrouping. Class); private List tasks; @ Override public void prepare (WorkerTopologyContext context, GlobalStreamId stream, List targetTasks) {this .tasks = targetTasks; log.info (tasks.toString ());} @ Override public List chooseTasks (int taskId, List values) {log.info (values.toString ()); return Arrays.asList (tasks.get (0));}}

As you can see from the above code, this custom grouping merges the data into the first TaskArrays.asList (tasks.get (0)); that is, when the data arrives, it is always dispatched to the first group. Unlike Hadoop, Storm allows a piece of data to be processed by multiple Task, so the return value is List. It is up to you to select any number of (must be at least one) List targetTasks' Task from the 'Task provided' to process the data.

In the second custom grouping, words with the same initials in wordcount are handed over to the same bolt for processing:

Public class ModuleGrouping implements CustormStreamGrouping {int numTasks = 0; @ Override public List chooseTasks (List values) {List boltIds = new ArrayList (); if (values.size () > 0) {String str = values.get (0) .toString (); if (str.isEmpty ()) {boltIds.add (0) } else {boltIds.add (str.charAt (0)% numTasks);}} return boltIds;} @ Override public void prepare (TopologyContext context, Fields outFields, List targetTasks) {numTasks = targetTasks.size ();}}

This is a simple implementation of CustomStreamGrouping, where we use the integer value of the word's first letter character and the remainder of the number of tasks to decide to receive the bolt of the tuple.

Builder.setBolt ("word-normalizer", new WordNormalizer ()) .customGrouping ("word-reader", new ModuleGrouping ()); after reading the above, do you have any further understanding of how to customize data grouping in storm? If you want to know more knowledge or related content, please follow the industry information channel, thank you for your support.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report