Detailed introduction of Storm Trident


This article gives a detailed introduction to Storm Trident. The approach is simple, fast and practical; interested readers may wish to follow along.

I. Overview

1.1 Storm

Storm is a reliable, distributed, real-time stream computing framework.

A typical real-time big-data computing scenario makes the point: read messages from a Kafka message queue (logs, clicks, sensor data, and so on); preprocess them with Storm, for example by computing aggregations; and persist the results to a NoSQL database or HDFS for further analysis.

1.2 Trident

Trident is a higher-level abstraction over Storm. Besides providing an easy-to-use stream-processing API, it processes data in units of batches (groups of tuples), which makes some kinds of processing simpler and more efficient.

Keeping a Bolt's running state only in memory is unreliable: if a node dies, the tasks on that node are reassigned, but their previous state cannot be restored. The smarter approach is therefore to persist Storm's computation state to a database. This is exactly where Trident becomes important: when writing big-data results to a database, we usually batch the operations to avoid putting pressure on it, and Trident processes data precisely in batches and provides aggregation APIs over them.

II. Trident API in Practice

Trident is essentially a set of APIs, but at present there is not much material on what each API function means and how to use it. Below, I describe the main Trident API functions in detail, based on some English-language material and my own understanding.

2.1 The each() method

Purpose: operates on every tuple in a batch; generally used together with a Filter or a Function.

Let's introduce the each() method through an example. Suppose we have a FakeTweetsBatchSpout that simulates a stream by randomly generating messages; we can change the spout's batch size through the constructor argument of the spout class.
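The article never shows FakeTweetsBatchSpout itself, so here is a minimal sketch of what such a spout could look like, assuming Storm's storm.trident.spout.IBatchSpout interface. The field names match the later examples, but the class body is my own illustration:

import java.util.Map;
import java.util.Random;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;

// Sketch only: randomly generated batches are not replay-safe, which is fine for a demo.
public class FakeTweetsBatchSpout implements IBatchSpout {
    private final int batchSize;  // number of tuples per batch, set via the constructor
    private static final String[] ACTORS = {"dave", "pere", "nathan"};
    private static final String[] LOCATIONS = {"USA", "Spain", "UK", "France"};
    private final Random random = new Random();

    public FakeTweetsBatchSpout(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public void open(Map conf, TopologyContext context) {}

    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        // Emit one batch of randomly generated "tweets".
        for (int i = 0; i < batchSize; i++) {
            String actor = ACTORS[random.nextInt(ACTORS.length)];
            String location = LOCATIONS[random.nextInt(LOCATIONS.length)];
            collector.emit(new Values(actor, "tweet text " + i, location));
        }
    }

    @Override
    public void ack(long batchId) {}

    @Override
    public void close() {}

    @Override
    public Map getComponentConfiguration() { return null; }

    @Override
    public Fields getOutputFields() {
        return new Fields("actor", "text", "location");
    }
}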

2.1.1 Filter class: filtering tuples

A Filter that filters messages by their actor field:

public static class PerActorTweetsFilter extends BaseFilter {
    String actor;

    public PerActorTweetsFilter(String actor) {
        this.actor = actor;
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        return tuple.getString(0).equals(actor);
    }
}

Topology:

topology.newStream("spout", spout)
    .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
    .each(new Fields("actor", "text"), new Utils.PrintFilter());

As you can see from the example above, the each() method takes several arguments:

The first argument acts as a field selector: a tuple may have many fields, and by specifying fields here we hide the others and receive only the named ones (the other fields are actually still there).

The second is a Filter: here it filters out every message whose actor is not "dave".
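Incidentally, the Utils.PrintFilter helper used throughout these examples is never shown in the article. A plausible minimal version (an assumption on my part, not the article's actual code) is simply a filter that prints each tuple and keeps everything:

// Assumed imports: storm.trident.operation.BaseFilter, storm.trident.tuple.TridentTuple.
public static class PrintFilter extends BaseFilter {
    @Override
    public boolean isKeep(TridentTuple tuple) {
        System.out.println(tuple);  // print the tuple for inspection
        return true;                // keep every tuple; printing is the only effect
    }
}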

2.1.2 Function class: processing tuple content

A Function that uppercases the text content of a tuple:

public static class UppercaseFunction extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        collector.emit(new Values(tuple.getString(0).toUpperCase()));
    }
}

Topology:

topology.newStream("spout", spout)
    .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
    .each(new Fields("text", "actor"), new UppercaseFunction(), new Fields("uppercased_text"))
    .each(new Fields("actor", "text", "uppercased_text"), new Utils.PrintFilter());

First, the input to UppercaseFunction is Fields("text", "actor"), and the function uppercases the content of the "text" field, its first input field.

Second, unlike a Filter, a Function has an output fields declaration: after each tuple is processed by the Function, the emitted values are appended to the tuple as new fields. In this case the tuple gains an extra "uppercased_text" field, which comes last.
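Since a Function may emit zero or more tuples per input, here is a small illustrative sketch (my own, not from the article) that splits the "text" field into words, emitting one tuple per word:

// Assumed imports: storm.trident.operation.BaseFunction,
// storm.trident.operation.TridentCollector, storm.trident.tuple.TridentTuple,
// backtype.storm.tuple.Values.
public static class SplitWordsFunction extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // A Function can emit 0..n tuples for each input tuple: one emit per word here.
        for (String word : tuple.getString(0).split(" ")) {
            collector.emit(new Values(word));  // appends the declared output field
        }
    }
}

Used as .each(new Fields("text"), new SplitWordsFunction(), new Fields("word")), each input tuple yields one output tuple per word, with the "word" field appended.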

2.1.3 Field selectors and project()

Note that the field selector in each each() call above only hides the contents of the unspecified fields; the hidden fields are still in the tuple. If we want to get rid of them completely, we need the project() method.

The projection operation keeps only the specified fields of a Stream. For example, if a Stream contains the fields ["a", "b", "c", "d"] and we run the following code:

mystream.project(new Fields("b", "d"));

then the output stream contains only the ["b", "d"] fields.
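Putting the pieces together, a short sketch (reusing the classes from the earlier examples) of how project() strips the hidden fields after a Function has appended a new one:

topology.newStream("spout", spout)
    .each(new Fields("text"), new UppercaseFunction(), new Fields("uppercased_text"))
    .project(new Fields("uppercased_text"))  // now only "uppercased_text" remains in the tuple
    .each(new Fields("uppercased_text"), new Utils.PrintFilter());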

2.2 The parallelismHint() and partitionBy() methods

2.2.1 parallelismHint()

This specifies the parallelism of a topology stage, i.e. how many threads execute the task. We can change our Filter slightly so that it tells us which thread it runs on, by printing the partitionIndex of the current task.

Filter:

public static class PerActorTweetsFilter extends BaseFilter {
    private int partitionIndex;
    private String actor;

    public PerActorTweetsFilter(String actor) {
        this.actor = actor;
    }

    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        this.partitionIndex = context.getPartitionIndex();
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        boolean filter = tuple.getString(0).equals(actor);
        if (filter) {
            System.err.println("I am partition [" + partitionIndex + "] and I have kept a tweet by: " + actor);
        }
        return filter;
    }
}

Topology:

topology.newStream("spout", spout)
    .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
    .parallelismHint(5)
    .each(new Fields("actor", "text"), new Utils.PrintFilter());

If we specify that five threads execute the Filter task, what will the result be? Take a look at our test results:

I am partition [4] and I have kept a tweet by: dave

I am partition [3] and I have kept a tweet by: dave

I am partition [0] and I have kept a tweet by: dave

I am partition [2] and I have kept a tweet by: dave

I am partition [1] and I have kept a tweet by: dave

We can clearly see that a total of five threads are executing the Filter.

What if we want 2 spout threads and 5 filter threads? As the following code shows, this is simple to express.

topology.newStream("spout", spout)
    .parallelismHint(2)
    .shuffle()
    .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
    .parallelismHint(5)
    .each(new Fields("actor", "text"), new Utils.PrintFilter());

2.2.2 partitionBy() and repartitioning operations

Notice the shuffle() call in the example above; shuffle() is a repartitioning operation. So what is a repartitioning operation? It defines how tuples are routed to the next processing layer, which may of course have a different parallelism. shuffle() routes tuples randomly across the threads of the next layer, while partitionBy() routes them by the specified fields using consistent hashing: with partitionBy(), tuples with the same values in those fields are always routed to the same thread.

For example, if we change shuffle() in the code above to partitionBy(new Fields("actor")), guess what happens?

I am partition [2] and I have kept a tweet by: dave

I am partition [2] and I have kept a tweet by: dave

I am partition [2] and I have kept a tweet by: dave

I am partition [2] and I have kept a tweet by: dave

As described above, tuples with the same field value are routed to the same partition.

There are several kinds of repartitioning operations:

shuffle: distributes tuples evenly across the partitions using a random allocation algorithm.

broadcast: every tuple is broadcast to all partitions; this is very useful in DRPC, for example to run a stateQuery on every partition.

partitionBy: partitions by the specified list of fields, taking the hash of those field values modulo the number of partitions, which guarantees that tuples with the same field values go to the same partition (see the conceptual sketch after this list).

global: all tuples are sent to a single partition, which processes the entire Stream.

batchGlobal: all tuples in one batch are sent to the same partition; different batches may go to different partitions.

partition: partitions through a custom function that implements backtype.storm.grouping.CustomStreamGrouping.
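To make the partitionBy semantics concrete, here is a conceptual illustration of hash-mod partition selection (my own simplification, not Trident's actual source code):

import java.util.Arrays;
import java.util.List;

public class PartitionByIllustration {
    // Pick a partition from the hash of the selected field values, modulo the
    // partition count (ignoring the Integer.MIN_VALUE edge case for brevity).
    static int choosePartition(List<Object> fieldValues, int numPartitions) {
        return Math.abs(fieldValues.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        // The same "actor" value always maps to the same partition index,
        // which is why all of dave's tweets ended up in partition [2] above.
        System.out.println(choosePartition(Arrays.<Object>asList("dave"), 5));
        System.out.println(choosePartition(Arrays.<Object>asList("dave"), 5));
    }
}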

2.3 Aggregation

As mentioned earlier, one of the most important features of Trident is that it handles tuples in batches. The most basic operation on a batch is aggregation, and Trident provides an aggregate API for batches. Let's look at an example.

2.3.1 Aggregator

public static class LocationAggregator extends BaseAggregator<Map> {
    @Override
    public Map init(Object batchId, TridentCollector collector) {
        return new HashMap();
    }

    @Override
    public void aggregate(Map val, TridentTuple tuple, TridentCollector collector) {
        String location = tuple.getString(0);
        val.put(location, MapUtils.getInteger(val, location, 0) + 1);
    }

    @Override
    public void complete(Map val, TridentCollector collector) {
        collector.emit(new Values(val));
    }
}

Topology:

topology.newStream("spout", spout)
    .aggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts"))
    .each(new Fields("location_counts"), new Utils.PrintFilter());

This aggregator is simple: it counts the occurrences of each location within a batch. The example also shows the three methods of the Aggregator interface:

init(): called when a batch first arrives, before any of its tuples are processed.

aggregate(): called once for each tuple in the batch.

complete(): called when all tuples of the batch have been processed.

We said earlier that the aggregate() method is a repartitioning operation: it sends the whole batch to a single, randomly chosen partition, where a separate thread performs the aggregation.

Let's take a look at the test results:

[{USA=3, Spain=1, UK=1}]

[{USA=3, Spain=2}]

[{France=1, USA=4}]

[{USA=4, Spain=1}]

[{USA=5}]

Looking at the printed results, the counts in each map sum to 5, because we set our spout to emit 5 tuples per batch, so each aggregation covers exactly one such batch. Trident also provides two other aggregator interfaces, CombinerAggregator and ReducerAggregator; see the Trident API documentation for details, and the sketch below for a taste.
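Here is a minimal CombinerAggregator sketch that counts tuples, mirroring how Trident's built-in Count works (the class itself is my own illustration; the three-method interface is Trident's):

// Assumed imports: storm.trident.operation.CombinerAggregator,
// storm.trident.tuple.TridentTuple.
public static class CountAgg implements CombinerAggregator<Long> {
    @Override
    public Long init(TridentTuple tuple) {
        return 1L;            // each tuple contributes 1 to the count
    }

    @Override
    public Long combine(Long val1, Long val2) {
        return val1 + val2;   // partial counts are simply added together
    }

    @Override
    public Long zero() {
        return 0L;            // identity value, used when a partition is empty
    }
}

A CombinerAggregator can perform partial aggregation on each partition before the final combine, so it usually transfers less data over the network than a full Aggregator.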

2.3.2 partitionAggregate()

If we modify the topology above a little, can you guess what the result will be?

topology.newStream("spout", spout)
    .partitionBy(new Fields("location"))
    .partitionAggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts"))
    .parallelismHint(3)
    .each(new Fields("location_counts"), new Utils.PrintFilter());

Let's analyze it. First, partitionBy() routes tuples to the next processing layer by their location field, so tuples with the same location are guaranteed to be handled by the same thread. Second, partitionAggregate(), unlike aggregate(), is not a repartitioning operation: it simply aggregates each batch within the current partition. Since we partitioned by location and the test data contains 4 distinct locations but there are only 3 partitions, we can predict that one partition will handle the batches of two locations. The final test results are as follows:

[{France=10, Spain=5}]

[{USA=63}]

[{UK=22}]

It is worth repeating: although partitionAggregate() is also an aggregation operation, unlike aggregate() above it is not a repartitioning operation.

2.4 groupBy

As the test results of the previous examples show, what we usually actually want is the count per location. How do we get that? Look at the following topology:

topology.newStream("spout", spout)
    .groupBy(new Fields("location"))
    .aggregate(new Fields("location"), new Count(), new Fields("count"))
    .each(new Fields("location", "count"), new Utils.PrintFilter());

Let's first take a look at the execution results:

[France, 25]

[UK, 2]

[USA, 25]

[Spain, 44]

[France, 26]

[UK, 3]

The above code counts the occurrences of each location, even though we gave the Count function no explicit parallelism. That is the effect of groupBy(): it creates a GroupedStream keyed on the specified fields, and tuples with the same field values are routed together into one group. The aggregate that follows groupBy() then aggregates each group separately, rather than the whole batch as before. You can also think of groupBy as splitting the Stream into grouped streams by the specified fields, with each group treated like a batch.

Note, however, that groupBy() by itself is not a repartitioning operation; whether repartitioning happens depends on what follows it. groupBy() followed by aggregate() repartitions and aggregates each group globally, whereas groupBy() followed by partitionAggregate() only groups tuples within each partition.
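A sketch contrasting the two behaviours, using the examples already shown in sections 2.3.2 and 2.4:

// groupBy + aggregate: repartitions by "location", then aggregates each group
// across the whole batch, yielding one count per location.
topology.newStream("spout", spout)
    .groupBy(new Fields("location"))
    .aggregate(new Fields("location"), new Count(), new Fields("count"))
    .each(new Fields("location", "count"), new Utils.PrintFilter());

// partitionBy + partitionAggregate: also routes equal locations together, but the
// aggregation stays inside each partition, so one partition may emit a map that
// covers several locations (as seen in section 2.3.2).
topology.newStream("spout", spout)
    .partitionBy(new Fields("location"))
    .partitionAggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts"))
    .each(new Fields("location_counts"), new Utils.PrintFilter());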

III. Summary

Storm is a real-time stream computing framework, and Trident is a higher-level abstraction over Storm. Trident's defining feature is that it processes streams in batches.

The most basic operations are Filter and Function: a Filter filters tuples out of the stream, while a Function transforms tuple content, emits zero or more tuples, and appends new fields to the tuple.

Aggregation is done through partitionAggregate() and the aggregator interfaces. partitionAggregate() aggregates the tuples within the current partition and is not a repartitioning operation. There are three aggregator interfaces: CombinerAggregator, ReducerAggregator, and Aggregator; the aggregate() method that uses them is a repartitioning operation, redirecting the stream to a single partition for the aggregation.

Repartitioning operations change how the data flows but not its content, and they incur network transfer, which can cost some efficiency. Filter, Function and partitionAggregate(), by contrast, are local operations and generate no network traffic.

groupBy() splits the whole stream into grouped streams by the specified fields; an aggregation on a grouped stream then happens per group rather than over the whole batch. Whether groupBy() repartitions depends on what follows it: with aggregate() it is a repartitioning, per-group global aggregation; with partitionAggregate() it stays within each partition.
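To close, a hedged end-to-end sketch showing how the pieces above could be wired up and run locally. It assumes the FakeTweetsBatchSpout sketched in section 2.1 and the article's Utils.PrintFilter helper; the LocalCluster submission is standard Storm usage, not code from the article:

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.tuple.Fields;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;

public class LocationCountTopology {
    public static void main(String[] args) throws Exception {
        TridentTopology topology = new TridentTopology();
        // Count tweets per location, as in section 2.4.
        topology.newStream("spout", new FakeTweetsBatchSpout(5))
                .groupBy(new Fields("location"))
                .aggregate(new Fields("location"), new Count(), new Fields("count"))
                .each(new Fields("location", "count"), new Utils.PrintFilter());

        // Run the topology in-process for a quick local test.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("trident-demo", new Config(), topology.build());
        Thread.sleep(10000);  // let the topology process a few batches
        cluster.shutdown();
    }
}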

At this point, you should have a deeper understanding of Storm Trident; the best way to consolidate it is to try these examples yourself.
