What is Trident in Storm

2025-01-19 Update From: SLTechnology News&Howtos

Shulou (Shulou.com) 05/31

This article explains what Trident is in Storm. It goes into some detail and should be a useful reference for interested readers.

Brief introduction

Storm is a real-time stream computing framework, and Trident is a higher-level abstraction built on top of Storm. Trident's defining feature is that it processes a stream in batches.

The most basic operations are Filter and Function. A Filter filters out tuples. A Function processes the content of a tuple, emits zero or more tuples, and appends new fields to the tuple.

Aggregation is done through partitionAggregate and aggregate. partitionAggregate aggregates the tuples in the current partition and is not a repartitioning (redirect) operation. aggregate accepts one of three aggregator interfaces, CombinerAggregator, ReducerAggregator, and Aggregator, and is a repartitioning operation: it redirects the stream to a single partition and aggregates there.

A repartitioning operation changes how the data flows but not the data itself. It involves network transfer, which can cost some efficiency. Filter, Function, and partitionAggregate, by contrast, are local operations and involve no network transfer.

groupBy splits the whole stream into grouped streams according to the specified fields. An aggregation applied to a grouped stream runs on each group rather than on the whole batch. If groupBy is followed by aggregate, a repartitioning operation takes place; if it is followed by partitionAggregate, it does not.

Trident has five main types of operations:

1. Local operations, which involve no network transfer.

2. Repartitioning operations, which redistribute the stream without changing its content but involve network transfer.

3. Aggregation operations, which may involve network transfer.

4. Operations on grouped streams.

5. Merges and joins.

Partition

Concept

Some people equate a partition with a task in Storm, that is, the basic unit of concurrent execution. A closer analogy is a partition in a database: the data of a batch is divided into multiple partitions (think of them as sub-batches), and those partitions can then be processed concurrently. The key difference is that a partition is data, not executing code. Partitioning the data (tuples) is useless unless there are multiple tasks to process the partitions concurrently. The relationship is therefore as follows: first comes the batch, because Trident is implemented internally in terms of batches; the batch is then divided into partitions; and once the partitions have been assigned a degree of concurrency, they can be processed concurrently. The degree of concurrency is assigned with parallelismHint.

Operation

Since there are partitions, there are also operations on partitions. The partition operations Trident provides are similar to the stream groupings in Storm. They are as follows.

A repartitioning operation runs a function that changes how tuples are distributed across tasks, and it can also change the number of partitions (for example, when the parallelism is increased after repartitioning). Repartitioning requires network transfer. The repartitioning functions are:

shuffle: uses a random round-robin algorithm to distribute tuples evenly across all target partitions.

broadcast: copies every tuple to all target partitions. This is useful in DRPC, for example when you need to run a stateQuery against the data of every partition.

partitionBy: takes a set of input fields and partitions semantically on them. The target partition is chosen by hashing those fields and taking the result modulo the number of target partitions. partitionBy guarantees that tuples with the same values in those fields go to the same target partition.

global: sends all tuples to the same partition, and that same partition is chosen for every batch of the stream.

batchGlobal: sends all tuples of a batch to the same target partition, while different batches may go to different target partitions.

partition: takes a custom partitioning function, which must implement the backtype.storm.grouping.CustomStreamGrouping interface.
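The partitionBy rule can be pictured with a small plain-Java sketch. It does not use Storm at all: the class PartitionBySketch and the method targetPartition are hypothetical names, and hashing the list of selected field values is an assumption made for illustration, not necessarily how Trident computes the hash internally.

```java
import java.util.Arrays;
import java.util.List;

final class PartitionBySketch {
    // Maps the selected field values to a partition index in [0, numPartitions).
    // Assumption: the hash of the value list stands in for Trident's own hashing.
    static int targetPartition(List<Object> groupingValues, int numPartitions) {
        // floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(groupingValues.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        for (String actor : Arrays.asList("dave", "alice", "dave", "bob")) {
            List<Object> key = Arrays.<Object>asList(actor);
            // Both "dave" tuples print the same partition index.
            System.out.println(actor + " -> partition " + targetPartition(key, numPartitions));
        }
    }
}
```

Because the index depends only on the selected field values, equal values always land on the same partition, which is the guarantee partitionBy gives.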

Note that besides the partition operations listed explicitly here, the aggregate() function in Trident also implies a partition operation: it uses global(). This is covered later, in the section on aggregation.

API

each() method

Purpose: operates on each tuple in a batch. It is generally used with a Filter or a Function.

Let's introduce each() through an example. Suppose we have a FakeTweetsBatchSpout that simulates a stream by generating random messages. The batch size of the spout can be changed through the constructor parameters of the spout class.

1. Filter class: filtering tuples

A Filter that filters messages by the actor field:

    // Keeps only the tuples whose first selected field (the actor) equals the given name.
    public static class PerActorTweetsFilter extends BaseFilter {
        String actor;

        public PerActorTweetsFilter(String actor) {
            this.actor = actor;
        }

        @Override
        public boolean isKeep(TridentTuple tuple) {
            return tuple.getString(0).equals(actor);
        }
    }

Topology:

    topology.newStream("spout", spout)
        .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
        .each(new Fields("actor", "text"), new Utils.PrintFilter());

As the example above shows, each() takes several parameters.

The first parameter is a field selector. A tuple may have many fields; by specifying Fields we receive only the selected fields and hide the others (the other fields are still there).

The second parameter is a Filter, used here to drop every message whose actor is not "dave".

2. Function class: processing tuple content

A Function that upper-cases the text content of a tuple:

    // Emits an upper-cased copy of the first selected input field.
    public static class UppercaseFunction extends BaseFunction {
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            collector.emit(new Values(tuple.getString(0).toUpperCase()));
        }
    }

Topology:

    topology.newStream("spout", spout)
        .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
        .each(new Fields("text", "actor"), new UppercaseFunction(), new Fields("uppercased_text"))
        .each(new Fields("actor", "text", "uppercased_text"), new Utils.PrintFilter());

First, the input to UppercaseFunction is Fields("text", "actor"). The function reads the first selected field, "text", and emits its content in upper case.

Second, compared with the Filter call there is one more parameter, the output fields. After each tuple has been processed by the Function, the output fields are appended to the tuple. In this case the tuple gains an extra "uppercased_text" field after the Function, and this field comes last.
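The append-only behaviour of a Function can be modelled in plain Java without Storm. In this sketch a tuple is an ordered map from field name to value; the class EachFunctionSketch and the method applyUppercase are hypothetical names invented for the illustration, not part of the Trident API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class EachFunctionSketch {
    // Applies an uppercase "function" to the first selected input field and
    // appends the result under outputField; the existing fields are untouched.
    static Map<String, Object> applyUppercase(Map<String, Object> tuple,
                                              List<String> inputFields,
                                              String outputField) {
        // The function only sees the selected fields, in the selected order.
        List<Object> selected = new ArrayList<>();
        for (String field : inputFields) {
            selected.add(tuple.get(field));
        }
        String upper = ((String) selected.get(0)).toUpperCase();

        // Copy the full tuple (hidden fields included) and append the new field last.
        Map<String, Object> result = new LinkedHashMap<>(tuple);
        result.put(outputField, upper);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("actor", "dave");
        tuple.put("text", "hello storm");
        // prints {actor=dave, text=hello storm, uppercased_text=HELLO STORM}
        System.out.println(applyUppercase(tuple, Arrays.asList("text", "actor"), "uppercased_text"));
    }
}
```

Selecting ("text", "actor") changes only what the function sees at positions 0 and 1; the output tuple still carries every original field plus the appended one.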

3. Field selector and project

Note that the first parameter of each each() call above merely hides the fields that are not selected; the hidden fields are still in the tuple. To remove them completely, use the project() method.

A projection keeps only the specified fields of the stream. For example, suppose a stream contains the fields ["a", "b", "c", "d"] and we run the following code:

    mystream.project(new Fields("b", "d"))

The output stream contains only the ["b", "d"] fields.
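For contrast with the field selector, here is a plain-Java model of a projection. It is a sketch that does not use Storm; ProjectSketch and project are hypothetical names, and a tuple is again modelled as an ordered map.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ProjectSketch {
    // Returns a new tuple holding only the requested fields, in the requested order.
    static Map<String, Object> project(Map<String, Object> tuple, List<String> keep) {
        Map<String, Object> projected = new LinkedHashMap<>();
        for (String field : keep) {
            projected.put(field, tuple.get(field));
        }
        return projected;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("a", 1);
        tuple.put("b", 2);
        tuple.put("c", 3);
        tuple.put("d", 4);
        System.out.println(project(tuple, Arrays.asList("b", "d"))); // prints {b=2, d=4}
    }
}
```

Unlike the field selector of each(), the dropped fields are really gone from the result.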

Aggregation

There are two kinds of aggregation operations: partitionAggregate() and aggregate().

1. partitionAggregate

partitionAggregate() works on partitions: when the tuples of a batch are divided into multiple partitions, each partition runs the specified aggregation separately. In other words, partition aggregation runs a function on each partition of a batch of tuples. Unlike with a Function, the tuples emitted by partitionAggregate replace the input tuples rather than being appended to them. Take a look at the following example:

    mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))

Suppose the input stream contains two fields, "a" and "b", and its tuples are partitioned as follows:

    Partition 0: ["a", 1] ["b", 2]
    Partition 1: ["a", 3] ["c", 8]
    Partition 2: ["e", 1] ["d", 9] ["d", 10]

Running the line of code above outputs the following tuples, which contain only the single field "sum":

    Partition 0: [3]
    Partition 1: [11]
    Partition 2: [20]
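The arithmetic of this example can be checked with a plain-Java model that sums field "b" separately on each partition. The sketch does not use Storm; PartitionSumSketch, Tuple, and partitionAggregateSum are hypothetical names introduced only for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PartitionSumSketch {
    // A two-field tuple ["a", "b"], where "b" is numeric.
    static final class Tuple {
        final String a;
        final long b;

        Tuple(String a, long b) {
            this.a = a;
            this.b = b;
        }
    }

    // Sums field "b" separately on every partition. Each partition yields
    // exactly one output value, which replaces its input tuples.
    static List<Long> partitionAggregateSum(List<List<Tuple>> partitions) {
        List<Long> sums = new ArrayList<>();
        for (List<Tuple> partition : partitions) {
            long sum = 0;
            for (Tuple tuple : partition) {
                sum += tuple.b;
            }
            sums.add(sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<List<Tuple>> partitions = Arrays.asList(
            Arrays.asList(new Tuple("a", 1), new Tuple("b", 2)),
            Arrays.asList(new Tuple("a", 3), new Tuple("c", 8)),
            Arrays.asList(new Tuple("e", 1), new Tuple("d", 9), new Tuple("d", 10)));
        System.out.println(partitionAggregateSum(partitions)); // prints [3, 11, 20]
    }
}
```

No tuple crosses a partition boundary here, which is why partitionAggregate counts as a local operation.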

2. aggregate

aggregate() implies a global partition operation; that is, it performs a global aggregation computed over the entire batch.

Both aggregation operations accept different aggregators to implement a specific aggregation task. Trident has three aggregator interfaces: CombinerAggregator, ReducerAggregator, and Aggregator.

The CombinerAggregator interface is defined as follows:

    public interface CombinerAggregator<T> extends Serializable {
        T init(TridentTuple tuple);
        T combine(T val1, T val2);
        T zero();
    }

A CombinerAggregator returns a single tuple with a single field. It runs the init function on each input tuple and then uses the combine function to combine the resulting values until only one value is left. If the partition contains no tuples, the CombinerAggregator emits the value returned by the zero function. For example, here is the implementation of the Count aggregator:

    public class Count implements CombinerAggregator<Long> {
        public Long init(TridentTuple tuple) {
            return 1L;
        }

        public Long combine(Long val1, Long val2) {
            return val1 + val2;
        }

        public Long zero() {
            return 0L;
        }
    }

The ReducerAggregator interface is defined as follows:

    public interface ReducerAggregator<T> extends Serializable {
        T init();
        T reduce(T curr, TridentTuple tuple);
    }

A ReducerAggregator obtains an initial value from the init function and then calls the reduce method for each input tuple to update that value, producing a single tuple as output. For example, the ReducerAggregator implementation of Count is as follows:

    public class Count implements ReducerAggregator<Long> {
        public Long init() {
            return 0L;
        }

        public Long reduce(Long curr, TridentTuple tuple) {
            return curr + 1;
        }
    }
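The way init and reduce are driven can be sketched in plain Java as a left fold over the tuples of one batch. This is a model only: SimpleReducer is a hypothetical stand-in for Trident's ReducerAggregator with tuples simplified to strings, and ReducerFoldSketch and run are names invented for the illustration.

```java
import java.util.Arrays;
import java.util.List;

final class ReducerFoldSketch {
    // Hypothetical stand-in for ReducerAggregator<T>; a tuple is just a String here.
    interface SimpleReducer<T> {
        T init();
        T reduce(T curr, String tuple);
    }

    // Count expressed against the stand-in interface.
    static final SimpleReducer<Long> COUNT = new SimpleReducer<Long>() {
        @Override
        public Long init() {
            return 0L;
        }

        @Override
        public Long reduce(Long curr, String tuple) {
            return curr + 1;
        }
    };

    // Starts from init() and threads the running value through reduce() once per tuple.
    static <T> T run(SimpleReducer<T> reducer, List<String> tuples) {
        T state = reducer.init();
        for (String tuple : tuples) {
            state = reducer.reduce(state, tuple);
        }
        return state;
    }

    public static void main(String[] args) {
        System.out.println(run(COUNT, Arrays.asList("a", "b", "c"))); // prints 3
    }
}
```

An empty input simply returns the initial value, so no separate zero function is needed in this interface.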

The most general aggregator interface is Aggregator, which is defined as follows:

    public interface Aggregator<T> extends Operation {
        T init(Object batchId, TridentCollector collector);
        void aggregate(T state, TridentTuple tuple, TridentCollector collector);
        void complete(T state, TridentCollector collector);
    }

An Aggregator can emit any number of tuples with any number of fields, and it can emit them at any point during execution. It is executed as follows:

The init method is called before a batch is processed. Its return value is an object representing the state of the aggregation, which is passed to the aggregate and complete methods.

The aggregate method is called for each input tuple in the batch partition. It can update the state and optionally emit tuples.

The complete method is called when all tuples of the batch partition have been processed by aggregate.

The following is a Count aggregator implemented with the Aggregator interface:

    public class CountAgg extends BaseAggregator<CountState> {
        static class CountState {
            long count = 0;
        }

        public CountState init(Object batchId, TridentCollector collector) {
            return new CountState();
        }

        public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
            state.count += 1;
        }

        public void complete(CountState state, TridentCollector collector) {
            collector.emit(new Values(state.count));
        }
    }
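The init, aggregate, complete sequence can be made concrete with a plain-Java driver. This is a sketch under simplifying assumptions: SimpleAggregator is a hypothetical stand-in for Trident's Aggregator, tuples are strings, and the collector is modelled as a plain list of emitted values. None of these names come from Storm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class AggregatorLifecycleSketch {
    // Hypothetical stand-in for Aggregator<T>: the collector is a list of emitted values.
    interface SimpleAggregator<S> {
        S init(Object batchId, List<Object> collector);
        void aggregate(S state, String tuple, List<Object> collector);
        void complete(S state, List<Object> collector);
    }

    static final class CountState {
        long count = 0;
    }

    static final class CountAgg implements SimpleAggregator<CountState> {
        @Override
        public CountState init(Object batchId, List<Object> collector) {
            return new CountState();
        }

        @Override
        public void aggregate(CountState state, String tuple, List<Object> collector) {
            state.count += 1;
        }

        @Override
        public void complete(CountState state, List<Object> collector) {
            collector.add(state.count);
        }
    }

    // Drives one batch partition: init once, aggregate per tuple, complete once.
    static <S> List<Object> runOnBatchPartition(SimpleAggregator<S> agg,
                                                Object batchId,
                                                List<String> tuples) {
        List<Object> emitted = new ArrayList<>();
        S state = agg.init(batchId, emitted);
        for (String tuple : tuples) {
            agg.aggregate(state, tuple, emitted);
        }
        agg.complete(state, emitted);
        return emitted;
    }

    public static void main(String[] args) {
        // prints [3]
        System.out.println(runOnBatchPartition(new CountAgg(), "1:0", Arrays.asList("a", "a", "d")));
    }
}
```

Because every method receives the collector, an implementation is free to emit from any of the three steps, which is what makes this interface the most flexible of the three.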

Sometimes we need to run several aggregators at the same time. This can be done with the following chained call:

    mystream.chainedAgg()
        .partitionAggregate(new Count(), new Fields("count"))
        .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
        .chainEnd()

The code above runs the Count and Sum aggregators on each partition, and the output is a single tuple containing the fields "count" and "sum".

The most important difference among the three interfaces concerns CombinerAggregator: when it is used with aggregate(), it first computes partial aggregates on each partition, then uses global to repartition those partial results into a single partition, and combines them there into the final result.
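The two phases of a combiner-style aggregation can be modelled in plain Java. This is a sketch, not Trident code: SimpleCombiner is a hypothetical stand-in for CombinerAggregator with tuples simplified to strings, and the fold starts from zero(), which assumes that zero() is an identity element for combine().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class CombinerTwoPhaseSketch {
    // Hypothetical stand-in for CombinerAggregator<T>; a tuple is just a String here.
    interface SimpleCombiner<T> {
        T init(String tuple);
        T combine(T val1, T val2);
        T zero();
    }

    static final SimpleCombiner<Long> COUNT = new SimpleCombiner<Long>() {
        @Override
        public Long init(String tuple) {
            return 1L;
        }

        @Override
        public Long combine(Long val1, Long val2) {
            return val1 + val2;
        }

        @Override
        public Long zero() {
            return 0L;
        }
    };

    // Phase 1: partial aggregate of one partition (zero() for an empty partition).
    static <T> T partial(SimpleCombiner<T> combiner, List<String> partition) {
        T acc = combiner.zero();
        for (String tuple : partition) {
            acc = combiner.combine(acc, combiner.init(tuple));
        }
        return acc;
    }

    // Phase 2: only the partial results travel to a single partition and are combined there.
    static <T> T aggregate(SimpleCombiner<T> combiner, List<List<String>> partitions) {
        List<T> partials = new ArrayList<>();
        for (List<String> partition : partitions) {
            partials.add(partial(combiner, partition));
        }
        T total = combiner.zero();
        for (T value : partials) {
            total = combiner.combine(total, value);
        }
        return total;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = new ArrayList<>();
        partitions.add(Arrays.asList("a", "a"));
        partitions.add(Arrays.asList("d"));
        partitions.add(new ArrayList<String>());
        System.out.println(aggregate(COUNT, partitions)); // prints 3
    }
}
```

The point of the design is that one value per partition crosses the network instead of every tuple, which is only possible because combine can merge two partial results.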

groupBy() grouping operation

groupBy() involves two operations: a partition operation and a grouping operation.

If it is followed by partitionAggregate(), only the grouping takes place: tuples are grouped within each partition, and the aggregation runs on each group.

If it is followed by aggregate(), the stream is first repartitioned with partitionBy on the grouping fields, tuples are then grouped within each partition, and the aggregation runs on each group.
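The difference between the two cases can be modelled in plain Java with a per-group count. The sketch does not use Storm; the class and method names are hypothetical, a tuple is reduced to its single grouping value, and hashing with String.hashCode over the same number of partitions is an assumption standing in for the partitionBy step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class GroupByAggregateSketch {
    // groupBy + partitionAggregate: group and count inside each partition; no data moves.
    static List<Map<String, Integer>> groupThenPartitionAggregate(List<List<String>> partitions) {
        List<Map<String, Integer>> result = new ArrayList<>();
        for (List<String> partition : partitions) {
            Map<String, Integer> counts = new TreeMap<>();
            for (String word : partition) {
                counts.merge(word, 1, Integer::sum);
            }
            result.add(counts);
        }
        return result;
    }

    // groupBy + aggregate: first move each tuple to the partition chosen by hashing
    // its grouping value (the partitionBy step), then group and count as above.
    static List<Map<String, Integer>> groupThenAggregate(List<List<String>> partitions) {
        int numPartitions = partitions.size();
        List<List<String>> repartitioned = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            repartitioned.add(new ArrayList<String>());
        }
        for (List<String> partition : partitions) {
            for (String word : partition) {
                repartitioned.get(Math.floorMod(word.hashCode(), numPartitions)).add(word);
            }
        }
        return groupThenPartitionAggregate(repartitioned);
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
            Arrays.asList("a", "d"),
            Arrays.asList("a", "a"));
        System.out.println(groupThenPartitionAggregate(partitions)); // prints [{a=1, d=1}, {a=2}]
        System.out.println(groupThenAggregate(partitions));          // prints [{d=1}, {a=3}]
    }
}
```

In the first result the count for "a" is split across two partitions; in the second all "a" tuples have been brought together before counting.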

Concurrency with parallelismHint

parallelismHint sets the degree of concurrency of all operations before it, back to the nearest repartitioning operation.

    topology.newStream("spout", spout)
        .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
        .parallelismHint(5)
        .each(new Fields("actor", "text"), new Utils.PrintFilter());

This means that the spout and the each before parallelismHint both run with a concurrency of 5; yes, five spout instances emit data at the same time. In fact, the each after parallelismHint also runs with a concurrency of 5, because partition operations are what mark the boundaries between bolts, and there is none in this topology.

What if you want to set the spout's concurrency separately? Add a parallelismHint after the spout and before the bolt operations, followed by a partition operation:

    topology.newStream("spout", spout)
        .parallelismHint(2)
        .shuffle()
        .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
        .parallelismHint(5)
        .each(new Fields("actor", "text"), new Utils.PrintFilter());

Many people set only the spout's concurrency without calling a partition operation. That has no effect, because Trident does not repartition automatically. As described earlier, you partition first and then set the degree of concurrency. If the spout has no concurrency set and only shuffle is added, the spout defaults to a concurrency of 1, and the concurrency of 5 set later does not affect the spout, because the effect of a parallelismHint stops at the shuffle partition operation.

Examples

GroupBy+aggregate+parallelismHint

    package com.demo;

    import java.util.HashMap;
    import java.util.Map;

    import backtype.storm.tuple.Values;
    import storm.trident.operation.BaseAggregator;
    import storm.trident.operation.TridentCollector;
    import storm.trident.operation.TridentOperationContext;
    import storm.trident.tuple.TridentTuple;

    public class MyAgg extends BaseAggregator<Map<String, Integer>> {
        private static final long serialVersionUID = 1L;

        /** Which partition this instance belongs to. */
        private int partitionId;
        /** Number of partitions. */
        private int numPartitions;
        private String batchId;

        @SuppressWarnings("rawtypes")
        @Override
        public void prepare(Map conf, TridentOperationContext context) {
            partitionId = context.getPartitionIndex();
            numPartitions = context.numPartitions();
        }

        public void aggregate(Map<String, Integer> val, TridentTuple tuple, TridentCollector collector) {
            String word = tuple.getString(0);
            Integer value = val.get(word);
            if (value == null) {
                value = 0;
            }
            value++;
            // Save the count in the map that holds the aggregation state.
            val.put(word, value);
            System.err.println("I am partition [" + partitionId + "] and I have kept a tweet by: "
                    + numPartitions + " " + word + " " + batchId);
        }

        public void complete(Map<String, Integer> val, TridentCollector collector) {
            collector.emit(new Values(val));
        }

        public Map<String, Integer> init(Object arg0, TridentCollector arg1) {
            this.batchId = arg0.toString();
            return new HashMap<String, Integer>();
        }
    }

Topology:

    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
            new Values("a"), new Values("a"), new Values("a"),
            new Values("d"), new Values("e"), new Values("f"));
    spout.setCycle(false);
    TridentTopology tridentTopology = new TridentTopology();
    tridentTopology
        .newStream("spout", spout)
        .shuffle()
        .groupBy(new Fields("sentence"))
        .aggregate(new Fields("sentence"), new MyAgg(), new Fields("Map"))
        .parallelismHint(2);

Output:

    I am partition [0] and I have kept a tweet by: 2 a 1:0
    I am partition [0] and I have kept a tweet by: 2 a 1:0
    I am partition [0] and I have kept a tweet by: 2 a 2:0
    I am partition [1] and I have kept a tweet by: 2 d 2:0
    I am partition [0] and I have kept a tweet by: 2 e 3:0
    I am partition [1] and I have kept a tweet by: 2 f 3:0

GroupBy+partitionAggregate+parallelismHint

    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
            new Values("a"), new Values("a"), new Values("a"),
            new Values("d"), new Values("e"), new Values("f"));
    spout.setCycle(false);
    TridentTopology tridentTopology = new TridentTopology();
    tridentTopology
        .newStream("spout", spout)
        .shuffle()
        .groupBy(new Fields("sentence"))
        .partitionAggregate(new Fields("sentence"), new MyAgg(), new Fields("Map"))
        .toStream()
        .parallelismHint(2);

Output:

    I am partition [0] and I have kept a tweet by: 2 a 1:0
    I am partition [1] and I have kept a tweet by: 2 a 1:0
    I am partition [0] and I have kept a tweet by: 2 a 2:0
    I am partition [1] and I have kept a tweet by: 2 d 2:0
    I am partition [0] and I have kept a tweet by: 2 e 3:0
    I am partition [1] and I have kept a tweet by: 2 f 3:0

Because shuffle has already distributed the tuples evenly across the partitions (two here, given parallelismHint(2)), groupBy+partitionAggregate triggers no partitionBy repartitioning. The aggregation runs directly on each partition, so each partition aggregates whatever tuples it received, and tuples with the same value (the "a" tuples of the first batch) end up on different partitions.

With groupBy+aggregate the stream is also shuffled first, but because of the partitionBy repartitioning, tuples with the same value are all sent to the same partition. As a result, each partition aggregates a different set of values.

Aggregate+parallelismHint (no groupBy)

    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
            new Values("a"), new Values("a"), new Values("a"),
            new Values("d"), new Values("e"), new Values("f"));
    spout.setCycle(false);
    TridentTopology tridentTopology = new TridentTopology();
    tridentTopology
        .newStream("spout", spout)
        .shuffle()
        .aggregate(new Fields("sentence"), new MyAgg(), new Fields("Map"))
        .parallelismHint(2);

Output:

    I am partition [1] and I have kept a tweet by: 2 a 1:0
    I am partition [1] and I have kept a tweet by: 2 a 1:0
    I am partition [0] and I have kept a tweet by: 2 a 2:0
    I am partition [0] and I have kept a tweet by: 2 d 2:0
    I am partition [1] and I have kept a tweet by: 2 e 3:0
    I am partition [1] and I have kept a tweet by: 2 f 3:0

PartitionAggregate+parallelismHint (no groupBy operation)

    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
            new Values("a"), new Values("a"), new Values("a"),
            new Values("d"), new Values("e"), new Values("f"));
    spout.setCycle(false);
    TridentTopology tridentTopology = new TridentTopology();
    tridentTopology
        .newStream("spout", spout)
        .shuffle()
        .partitionAggregate(new Fields("sentence"), new MyAgg(), new Fields("Map"))
        .toStream()
        .parallelismHint(2);

Output:

    I am partition [1] and I have kept a tweet by: 2 a 1:0
    I am partition [0] and I have kept a tweet by: 2 a 1:0
    I am partition [1] and I have kept a tweet by: 2 a 2:0
    I am partition [0] and I have kept a tweet by: 2 d 2:0
    I am partition [0] and I have kept a tweet by: 2 e 3:0
    I am partition [1] and I have kept a tweet by: 2 f 3:0

We can see that partitionAggregate distributes tuples the same way with or without groupBy: groupBy causes no repartitioning for partitionAggregate. For aggregate, by contrast, adding groupBy turns the global aggregation into a per-group aggregation; without groupBy, a global aggregation is performed.

If the spout sets a parallelism but no shuffle is added, the setting has no effect and the number of partitions defaults to 1. If neither a parallelism is set nor a shuffle added, the number of partitions also defaults to 1.

Merge and Joins

The last part of the API is how to combine streams. The simplest way is to merge several streams into one:

    topology.merge(stream1, stream2, stream3);

Trident names the output fields of the merged stream after the fields of the first stream, stream1.

Another way to combine streams is a join. A standard join, like the kind in SQL, requires finite input, so it does not make sense on an unbounded stream. Joins in Trident therefore apply only within each small batch that comes off the spout.

In the following example, stream1 contains the three fields key, val1, and val2, and stream2 contains the two fields x and val1:

    topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));

The key field of stream1 is joined with the x field of stream2. In addition, Trident requires all output fields of the new stream to be named, because the input streams may contain fields with the same name. The tuples emitted by the joined stream contain:

A list of the join fields. In the example above, the field key corresponds to key of stream1 and x of stream2.

A list of all non-join fields from all streams, in the order in which the streams were passed to the join method. In the example above, fields a and b correspond to val1 and val2 of stream1, and c corresponds to val1 of stream2.
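The shape of the joined tuples can be illustrated with a plain-Java inner join over the tuples of one batch. This is a model only, not Trident code: tuples are lists of values, BatchJoinSketch and join are hypothetical names, and the hash-map lookup is just one simple way to pair matching keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BatchJoinSketch {
    // Inner-joins two batches. Left tuples are [key, val1, val2], right tuples are
    // [x, val1]. Output tuples are [key, a, b, c]: the join field first, then the
    // non-join fields of the left stream, then those of the right stream.
    static List<List<Object>> join(List<List<Object>> left, List<List<Object>> right) {
        // Index the right batch by its join field x.
        Map<Object, List<List<Object>>> rightByKey = new HashMap<>();
        for (List<Object> r : right) {
            rightByKey.computeIfAbsent(r.get(0), k -> new ArrayList<>()).add(r);
        }
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> l : left) {
            List<List<Object>> matches =
                rightByKey.getOrDefault(l.get(0), Collections.<List<Object>>emptyList());
            for (List<Object> r : matches) {
                out.add(Arrays.asList(l.get(0), l.get(1), l.get(2), r.get(1)));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Object>> stream1Batch = Arrays.asList(
            Arrays.<Object>asList("k1", 1, 2),
            Arrays.<Object>asList("k2", 3, 4));
        List<List<Object>> stream2Batch = Arrays.asList(
            Arrays.<Object>asList("k1", "x1"),
            Arrays.<Object>asList("k3", "x3"));
        System.out.println(join(stream1Batch, stream2Batch)); // prints [[k1, 1, 2, x1]]
    }
}
```

Only tuples of the same batch are paired here, which mirrors the statement that a Trident join works per batch rather than over the whole stream.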

When the joined streams come from different spouts, those spouts are synchronized in how they emit batches, so that a batch contains tuples from each spout.
