Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

How to configure the Transactional Topology of Storm

2025-03-26 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >

Share

Shulou(Shulou.com)05/31 Report--

This article mainly explains "how to configure Transactional Topology in Storm". The content in the article is simple and clear, and it is easy to learn and understand. Please follow the editor's train of thought to study and learn "how to configure Transactional Topology in Storm".

1. What is Transactional Topology?

○ is a framework in which each tuple is processed only once.

○ was introduced by Storm0.7, abandoned in Storm0.9 and replaced by triden

The underlying ○ relies on a feature of spout\ bolt\ topology\ stream abstraction

2. Transactional Topology Design ideas ○ only deals with tuple once at a time

Based on the fact that Storm will replay when it fails to process tuple, how to ensure that the records of replay are not repeated, in other words, how to ensure that tuple is processed only once, which depends on an idea called strong sequentiality.

Strong orderability: each tuple is associated with a transaction id. Transaction id is actually a number, and each tuple has a sequential transaction id (for example, tuple1 has a transaction id of 1 and transaction id of 2. And so on), only after the current tuple is processed and stored, the next tuple (in waiting state) can be stored. When the tuple is stored, it is stored together with the transaction id. Consider two cases:

When tuple processing fails: resend a transaction id exactly the same as the original

When the tuple processing is successful: the sent transaction id is compared with the stored transaction id. If there is no transaction id, it means the first record is stored directly; if it is found to exist, the tuple is ignored.

This idea was put forward by Kafka developers.

○ processes one batch of tuple at a time

Based on one of the above optimizations, a batch of tuple is packaged directly into a batch, and then a transaction id is assigned to ensure strong sequence between batch and batch, and the tuples within the batch can be parallel.

How is ○ Storm adopted?

Two steps:

1. Number of tuple in parallel computing batch

2. Batch strongly sequential storage

While storing batch strongly sequentially, let other batch waiting for storage perform parallel operations without waiting for the next batch storage to perform internal operations.

The two steps above Storm are represented as the processing phase and the commit phase.

3. Some design details

When using Transactional Topology, storm provides the following:

○ Management status

Put the status information that needs to be processed, such as transaction id, batch meta, etc., in zookeeper.

○ coordinate transaction

Specify a certain period of time to perform processing and commit operations

○ error detection

Storm uses the acking framework to automatically detect the success or failure of batch processing, and then resend it accordingly (replay)

○ built-in batch API

By packaging ordinary bolt, it provides a set of API and coordination work for batch processing (that is, dealing with a certain processing or commit at a certain time), and storm automatically clears the intermediate results.

Transactional Topology is a message queuing system that can completely resend a specific batch, which is exactly what is required in Kakfa, so Storm implements a transactional spout for Kafka in the Storm-Kafka in storm-contrib.

4. Examples from Storm-Starter.jar

Count the number of tuple from the input stream

MemoryTransactionalSpout spout = new MemoryTransactionalSpout (DATA, new Fields ("word"), PARTITION_TAKE_PER_BATCH); TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder ("global-count", "spout", spout, 3); builder.setBolt ("partial-count", new BatchCount (), 5) .shuffleGrouping ("spout"); builder.setBolt ("sum", new UpdateGlobalCount ()) .globalGrouping ("partial-count")

○ builds Transactional through the TransactionalTopologyBuilder class

Parameters:

The ID of Transaction ID:transactional topology, which is used to save the progress state in zookeeper. When you restart topology, you can start execution directly from the progress of execution without having to repeat it from beginning to end.

Spout ID: the ID of the Spout located throughout the Topology

Spout object in Spout Object:Transactional

Number of parallelism of Spout in Spout:Trasactional

○ MemoryTransactionalSpout is used to read data from a memory variable

DATA: data

Tuple fields: field

TupleNum: the maximum number of tuple in batch

○ Bolts

The first Bolt is randomly distributed to each task in a random grouping.

Public static class BatchCount extends BaseBatchBolt {Object _ id; BatchOutputCollector _ collector; int _ count = 0; @ Override public void prepare (Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {_ collector = collector; _ id = id;} @ Override public void execute (Tuple tuple) {_ count++;} @ Override public void finishBatch () {_ collector.emit (new Values (_ id, _ count)) @ Override public void declareOutputFields (OutputFieldsDeclarer declarer) {declarer.declare (new Fields ("id", "count"));}}

BatchBolt objects run in BatchBoltExecutor, and BatchBoltExecutor is responsible for the creation and cleaning of BatchBolt objects.

The ID of BatchBolt is in the context object, which ID is a TransactionAttempt object.

BatchBolt can also be used in DRPC, but the type of txid is different. If you use BatchBolt in Transactional Topology, you can inherit BaseTransactionalBolt.

In Tranasctional Topology, all Tuple must take TransactionAttempt as the first field, and then storm can judge the BatchBolt to which the Tuple belongs according to the field, so this condition must be satisfied when transmitting Tuple.

There are two properties in the TransactionAttempt object:

Transaction id: strong sequence, the same number no matter how many times it is retransmitted

Attempt id: for the ID identified by each Batch, the value of each retransmission is different. The ID can be used to distinguish different versions of the Tuple for each retransmission.

The second Bolt uses GlobalGrouping to summarize the number of tuple in the batch

Public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {TransactionAttempt _ attempt; BatchOutputCollector _ collector; int _ sum = 0; @ Override public void prepare (Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {_ collector = collector; _ attempt = attempt;} @ Override public void execute (Tuple tuple) {_ sum+=tuple.getInteger (1) } @ Override public void finishBatch () {Valueval = DATABASE.get (GLOBAL_COUNT_KEY); Value newval; if (val==null | |! val.txid.equals (_ attempt.getTransactionId () {newval = new Value (); newval.txid = _ attempt.getTransactionId (); if (val==null) {newval.count = _ sum } else {newval.count = _ sum + val.count;} DATABASE.put (GLOBAL_COUNT_KEY, newval);} else {newval = val;} _ collector.emit (new Values (_ attempt, newval.count)) @ Override public void declareOutputFields (OutputFieldsDeclarer declarer) {declarer.declare (new Fields ("id", "sum"));}}

ICommitter interface: the Bolt that implements this interface will call the finishBatch method during the commit phase, and the method will be called in a strong sequence. In addition, you can also use TransactionalTopologyBuilder's setCommiterBolt to add Bolt to implement the same functions as this interface.

Executor method: can be executed in both the processing phase and the commit phase.

For more transactional topology examples, take a look at the TransactionalWords class in storm-starter, which updates multiple databases in a single transaction

5. Transaction Topology APIBolt class

BaiscBolt: this Bolt does not interact with the tuples in batch, but only generates a new tuple based on a single incoming tuple

BatchBolt: this Bolt handles the tuples in batch. For each tuple, the executor method is called, and the finishBatch method is called when the entire batch is complete.

Bolt marked by Committer: the finishBatch method is called only in the commit phase. Commit has a strong sequence. There are two ways to mark Bolt as the commit phase to execute finishBatch: 1. Implement the ICommiter interface. 2. SetCommiterBolt of TransactionalTopologyBuilder to add Bolt.

Processing phase and Commit phase

The Bolt with a red outline has been marked as commit

Spout sends the entire Batch to Bolt A.

After Bolt A processes the entire Batch, call the finishBatch method to send Batch to Bolt B and Bolt C, respectively.

Bolt B will not call the finishBatch method when it receives the tuple passed by Bolt A for processing (which has not been processed yet).

Although the tuple passed by Bolt C interface Bolt A has finished processing the tuple passed by Bolt A, because Bolt B has not yet commit, Bolt C is waiting for Bolt B commit and will not call the finishBatch method.

Bolt D receives all tuple sent from Bolt C when it calls the executor method

At this point, once Bolt B does commit for finishBatch operation, Bolt C will confirm that all tuple,Bolt C of Bolt B also calls the finishBatch method, and eventually Bolt D also receives all batch from Bolt C.

Although Bolt D is a committer here, it does not need to wait for a second commit signal after receiving the tuple of the entire batch. Because it is the entire batch received during the commit phase, it calls finishBatch to complete the entire transaction.

Acking

Note that when using transactional topology, you don't have to explicitly do any acking or anchoring,storm is done behind the back. (storm highly optimizes the acking mechanism in transactional topolgies)

Failing a transaction

When using a normal bolt, you can fail the tuple tree where the tuple is located by calling the fail method of OutputCollector. Transactional Topology hides the acking framework from users, which provides a different mechanism to fail a batch (thus making the batch replay): just throw a FailedException. Unlike a normal exception, this exception only causes the current batch to be replay, not the entire process to crash.

Transactional spout

The TransactionalSpout interface is completely different from the normal Spout interface. An implementation of TransactionalSpout sends a batch of (batch) tuple, and must ensure that the transaction id of the same batch of tuples is always the same.

When transactional topology is running, transactional spout looks like this structure:

Coordinator is a normal storm spout-- that always emits tuple for transactional batch.

Emitter is like an ordinary storm bolt, which is responsible for all grouping subscriptions to coordinator's "batch emit" stream for each batch actually emitting tuple,emitter.

For details on how to implement a TransactionalSpout, see Javadoc

Partitioned Transactional Spout

A common TransactionalSpout is the kind of tuple that reads data from multiple queue broker and then transmits it. For example, this is how TransactionalKafkaSpout works. IPartitionedTransactionalSpout automates the work of managing the state of each partition to ensure the idempotency of replay. For more information, please see Javadoc.

Configuration

Transactional Topologies has two important configurations:

Zookeeper: by default, transactional topology stores the state information in a zookeeper (the one that coordinates the cluster). You can specify other zookeeper through these two configurations: "transactional.zookeeper.servers" and "transactional.zookeeper.port".

Number of batch active at the same time: you must set the number of batch to be processed at the same time. You can specify it through "topology.max.spout.pending". If you do not specify it, the default is 1.

6. Realization

The implementation of Transactional Topologies is very elegant. Managing the commit protocol, detecting failures, and serial commits seem complex, but abstracting using storm primitives is very simple.

1. Transactional spout is a child topology, which consists of a coordinator spout and an emitter bolt.

2. Coordinator is an ordinary spout, and the degree of parallelism is 1. Bolt is a bolt, and the degree of parallelism is P, which is connected to the "batch" stream of coordinator by all grouping.

3. Coordinator uses an acking framework to determine when a batch is successfully executed (process), and then to determine when a batch is successfully submitted (commit).

Thank you for your reading, the above is the content of "how to configure the Transactional Topology of Storm". After the study of this article, I believe you have a deeper understanding of how to configure the Transactional Topology of Storm, and the specific use needs to be verified in practice. Here is, the editor will push for you more related knowledge points of the article, welcome to follow!

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Servers

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report