2025-03-28 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article explains how to use a transactional topology in Storm by walking through an example topology and the bolts it is built from.
You create a transactional topology with TransactionalTopologyBuilder. Here is the definition of a transactional topology that counts the number of tuples in the input stream. The code comes from TransactionalGlobalCount in storm-starter.
MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
builder.setBolt("partial-count", new BatchCount(), 5)
        .shuffleGrouping("spout");
builder.setBolt("sum", new UpdateGlobalCount())
        .globalGrouping("partial-count");
The TransactionalTopologyBuilder constructor takes the following parameters:
An id for the transactional topology.
The id of the spout within the topology.
A transactional spout.
An optional parallelism for the transactional spout.
The topology id is used to store the topology's progress in ZooKeeper, so a restarted topology can continue from where it left off.
A transactional topology has exactly one TransactionalSpout, which is specified in the TransactionalTopologyBuilder constructor. In this example, MemoryTransactionalSpout reads data from an in-memory variable (DATA). Its second parameter specifies the fields of the tuples the spout emits, and its third parameter specifies the maximum number of tuples per batch. How to write a custom TransactionalSpout is a separate topic.
Now for the bolts. This topology computes the total number of tuples in parallel. The first bolt, BatchCount, receives the input tuples through a shuffle grouping, which distributes them randomly across its tasks, and each task counts the tuples it receives. The second bolt, UpdateGlobalCount, uses a global grouping to sum the partial counts for the batch and then adds that sum to the global count in the database.
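The two-stage counting described above can be simulated without Storm. The following is only a sketch in plain Java: the class name TwoStageCountSketch and its methods are hypothetical and are not part of Storm or storm-starter. It shows that however the shuffle spreads the tuples over the tasks, the partial counts always add up to the batch size.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical simulation of the "partial-count" and "sum" stages; not Storm code.
class TwoStageCountSketch {

    // Stage 1: spread the tuples randomly over `tasks` counters, as a shuffle grouping would.
    static int[] partialCounts(List<String> batch, int tasks, Random random) {
        int[] counts = new int[tasks];
        for (int i = 0; i < batch.size(); i++) {
            counts[random.nextInt(tasks)]++;
        }
        return counts;
    }

    // Stage 2: a single task adds up every partial count, as a global grouping would allow.
    static int sum(int[] partials) {
        return Arrays.stream(partials).sum();
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("cat", "dog", "cat", "bird", "dog", "cat");
        int[] partials = partialCounts(batch, 5, new Random());
        // The split between tasks varies from run to run; the total does not.
        System.out.println(sum(partials)); // prints 6
    }
}
```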
Here is the definition of BatchCount:
public static class BatchCount extends BaseBatchBolt {
    Object _id;
    BatchOutputCollector _collector;

    // Number of tuples this task has seen in the current batch.
    int _count = 0;

    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
        _collector = collector;
        _id = id;
    }

    @Override
    public void execute(Tuple tuple) {
        _count++;
    }

    @Override
    public void finishBatch() {
        // Emit the batch id first, followed by the partial count.
        _collector.emit(new Values(_id, _count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "count"));
    }
}
Storm creates a new BatchCount instance for each batch being processed. These instances run inside BatchBoltExecutor, which is responsible for creating them and cleaning them up.
The prepare method of BatchCount receives the following parameters:
The Storm config.
The topology context.
An output collector.
The id of the batch (txid). In a transactional topology this id is a TransactionAttempt object.
The batch bolt abstraction can also be used in DRPC, where the txid has a different type. BatchBolt is in fact parameterized by the type of the txid, so if you only intend to use a batch bolt in transactional topologies, you can extend BaseTransactionalBolt, which is defined as follows:
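To illustrate why a per-batch instance matters, here is a minimal sketch in plain Java. Counter and run are hypothetical stand-ins for a batch bolt and for the executor's role; they are not Storm's classes, and the real executor does considerably more. Because every batch gets a fresh object, the count always starts at zero and no state leaks between batches.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the per-batch lifecycle; not Storm's BatchBoltExecutor.
class BatchLifecycleSketch {

    // Simplified counterpart of a batch bolt: state lives in instance fields.
    static class Counter {
        private final Object id;
        private int count = 0;

        Counter(Object id) {
            this.id = id;
        }

        void execute(String tuple) {
            count++;
        }

        String finishBatch() {
            return id + ":" + count;
        }
    }

    // Plays the executor's role: one fresh Counter per batch, dropped once the batch finishes.
    static List<String> run(List<List<String>> batches) {
        List<String> emitted = new ArrayList<>();
        int txid = 1;
        for (List<String> batch : batches) {
            Counter counter = new Counter(txid++);
            for (String tuple : batch) {
                counter.execute(tuple);
            }
            emitted.add(counter.finishBatch());
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("a", "b", "c"),
                Arrays.asList("d"));
        System.out.println(run(batches)); // prints [1:3, 2:1]
    }
}
```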
public abstract class BaseTransactionalBolt extends BaseBatchBolt<TransactionAttempt> {
}
Every tuple emitted in a transactional topology must have a TransactionAttempt as its first field, which lets Storm determine which batch each tuple belongs to. Make sure the tuples you emit meet this requirement.
A TransactionAttempt contains two values: a transaction id and an attempt id. The transaction id is unique to each batch and stays the same no matter how many times the batch is replayed. The attempt id is unique to each emission of a batch: when a batch is replayed, its attempt id differs from the one it had before. You can think of the attempt id as identifying a particular replay, and Storm uses it to distinguish the different versions of the tuples emitted for the same batch.
The transaction id increases by one with each batch, so the first batch has transaction id "1", the second has "2", and so on.
The execute method is called for every tuple in a batch. Each time it is called, you should accumulate the batch's state in an instance variable. In this example, execute increments the tuple count.
Finally, when the bolt has received all the tuples of a batch, the finishBatch method is called. In this example, BatchCount emits its local count to its output stream at that point.
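The relationship between the two ids can be modeled in a few lines of plain Java. This is a sketch only: AttemptSketch is a hypothetical class, not Storm's TransactionAttempt, and the field types (BigInteger for the transaction id, long for the attempt id) and the attempt id values are assumptions made for illustration. A replay keeps the transaction id but gets a new attempt id, so the two attempts refer to the same batch without being equal.

```java
import java.math.BigInteger;
import java.util.Objects;

// Hypothetical model of a transaction id plus attempt id pair; not Storm's TransactionAttempt.
class AttemptSketch {
    final BigInteger txid;  // identical for every replay of a batch
    final long attemptId;   // differs between replays of the same batch

    AttemptSketch(BigInteger txid, long attemptId) {
        this.txid = txid;
        this.attemptId = attemptId;
    }

    // Two attempts belong to the same batch when their transaction ids match.
    boolean sameBatch(AttemptSketch other) {
        return txid.equals(other.txid);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof AttemptSketch)) {
            return false;
        }
        AttemptSketch other = (AttemptSketch) o;
        return txid.equals(other.txid) && attemptId == other.attemptId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(txid, attemptId);
    }

    public static void main(String[] args) {
        AttemptSketch first = new AttemptSketch(BigInteger.ONE, 100L);
        AttemptSketch replay = new AttemptSketch(BigInteger.ONE, 101L);
        System.out.println(first.sameBatch(replay)); // prints true
        System.out.println(first.equals(replay));    // prints false
    }
}
```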
Here is the definition of the UpdateGlobalCount class:
public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
    TransactionAttempt _attempt;
    BatchOutputCollector _collector;

    // Sum of the partial counts received for the current batch.
    int _sum = 0;

    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
        _collector = collector;
        _attempt = attempt;
    }

    @Override
    public void execute(Tuple tuple) {
        _sum += tuple.getInteger(1);
    }

    @Override
    public void finishBatch() {
        Value val = DATABASE.get(GLOBAL_COUNT_KEY);
        Value newval;
        // Update only if this transaction id has not already been applied.
        if (val == null || !val.txid.equals(_attempt.getTransactionId())) {
            newval = new Value();
            newval.txid = _attempt.getTransactionId();
            if (val == null) {
                newval.count = _sum;
            } else {
                newval.count = _sum + val.count;
            }
            DATABASE.put(GLOBAL_COUNT_KEY, newval);
        } else {
            // Same transaction id: the batch was already committed, so reuse the stored value.
            newval = val;
        }
        _collector.emit(new Values(_attempt, newval.count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("id", "sum"));
    }
}
UpdateGlobalCount is specific to transactional topologies, so it extends BaseTransactionalBolt. In the execute method, UpdateGlobalCount accumulates the count for the batch. The more interesting part is the finishBatch method.
First, notice that the bolt implements the ICommitter interface. This tells Storm to call finishBatch during the commit phase of the transaction, so calls to finishBatch are strongly ordered by ascending transaction id. The execute method, on the other hand, can run during either the processing phase or the commit phase. Another way to mark a bolt as a committer is to add it with TransactionalTopologyBuilder's setCommitterBolt method instead of setBolt.
The finishBatch method in UpdateGlobalCount first gets the current value from the database and compares the transaction id stored there with the transaction id of the current batch. If they are the same, the batch has already been applied and is skipped. Otherwise, the batch's result is added to the total and the database is updated.
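The effect of that transaction id check can be seen in isolation with a small plain-Java sketch. GlobalCountSketch, its in-memory map, and the commit method are hypothetical stand-ins for the article's DATABASE and finishBatch; using BigInteger for the transaction id is an assumption. Committing the same transaction id twice leaves the total unchanged, which is what makes a replayed batch safe.

```java
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the article's DATABASE and Value; not Storm code.
class GlobalCountSketch {
    static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";

    // The stored value keeps the count together with the txid that last updated it.
    static class Value {
        int count = 0;
        BigInteger txid;
    }

    final Map<String, Value> database = new HashMap<>();

    // Same decision as finishBatch: apply the batch only if its txid differs from the stored one.
    int commit(BigInteger txid, int batchSum) {
        Value val = database.get(GLOBAL_COUNT_KEY);
        if (val == null || !val.txid.equals(txid)) {
            Value newval = new Value();
            newval.txid = txid;
            newval.count = (val == null) ? batchSum : val.count + batchSum;
            database.put(GLOBAL_COUNT_KEY, newval);
            return newval.count;
        }
        // Same txid: this is a replay of a batch that was already committed.
        return val.count;
    }

    public static void main(String[] args) {
        GlobalCountSketch sketch = new GlobalCountSketch();
        System.out.println(sketch.commit(BigInteger.valueOf(1), 4)); // prints 4
        System.out.println(sketch.commit(BigInteger.valueOf(2), 3)); // prints 7
        System.out.println(sketch.commit(BigInteger.valueOf(2), 3)); // prints 7 (replay ignored)
    }
}
```

Note that the check compares only against the most recently stored transaction id, so it relies on the strong ordering of commits described above.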
© 2024 shulou.com SLNews company. All rights reserved.