In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-27 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >
Share
Shulou(Shulou.com)06/01 Report--
This article is to share with you about how to implement MemoryChannel transactions in Flume architecture. The editor thinks it is very practical, so I share it with you to learn. I hope you can get something after reading this article.
Flume provides reliable log collection function, and its high reliability is realized by transaction mechanism. For Channel transactions, we will introduce the implementation of MemoryChannel and FileChannel.
First, let's take a look at the BasicChannelSemantics implementation:
Public abstract class BasicChannelSemantics extends AbstractChannel {/ / 1, transactions use ThreadLocal storage to ensure transaction thread safety private ThreadLocal currentTransaction = new ThreadLocal (); private boolean initialized = false; / / 2, do some initialization work protected void initialize () {} / / 3, provide callback protected abstract BasicTransactionSemantics createTransaction () for creating transactions of the implementation class / / 4. Put Event into Channel, and its put method implementation @ Override public void put (Event event) throws ChannelException {BasicTransactionSemantics transaction = currentTransaction.get (); Preconditions.checkState (transaction! = null, "No transaction exists for this thread"); transaction.put (event);} / / 5. Obtain Event from Channel, which is also the take method implementation @ Override public Event take () throws ChannelException {BasicTransactionSemantics transaction = currentTransaction.get () directly delegated to the transaction. Preconditions.checkState (transaction! = null, "No transaction exists for this thread"); return transaction.take ();} / / 6. Get the transaction. Initialize it first if the instance is not initialized. Otherwise, get the transaction from ThreadLocal first, and if it is not available or closed, create one and bind to ThreadLocal. @ Override public Transaction getTransaction () {if (! initialized) {synchronized (this) {if (! initialized) {initialize (); initialized = true;}} BasicTransactionSemantics transaction = currentTransaction.get (); if (transaction = = null | | transaction.getState (). Equals (BasicTransactionSemantics.State.CLOSED)) {transaction = createTransaction (); currentTransaction.set (transaction) } return transaction;}} MemoryChannel transaction implementation
First of all, let's take a look at the implementation of MemoryChannel, which is a pure memory Channel implementation, and the entire transaction operation is done in memory. First, take a look at its memory structure:
1. First, a Channel Queue is used to store the Event data of the entire Channel.
2. Each transaction has a Take Queue and a Put Queue for storing transaction-related fetch data and playback data, respectively. When the transaction commits, it is fully synchronized to Channel Queue, or failed to roll back the fetch data to Channel Queue.
MemoryChannel is designed with two capacities in mind: Channel Queue capacity and transaction capacity, which involve quantity capacity and byte capacity.
In addition, because multiple transactions need to operate Channel Queue and consider the dynamic expansion of Channel Queue, MemoryChannel uses locks to implement, while the capacity problem uses semaphores to achieve.
Some parameters are initialized in the configure method, such as capacity, Channel Queue and so on. First, take a look at how the capacity of Channel Queue is calculated:
Try {capacity = context.getInteger ("capacity", defaultCapacity);} catch (NumberFormatException e) {capacity = defaultCapacity;} if (capacity 0) {queueRemaining.release (remainingChange);} / / 6, ChannelCounter some data counts if (puts > 0) {channelCounter.addToEventPutSuccessCount (puts);} if (takes > 0) {channelCounter.addToEventTakeSuccessCount (takes);} channelCounter.setChannelSize (queue.size ());}
Two semaphores are involved here:
QueueStored indicates that Channel Queue has stored event capacity (number of events stored),-1 when the queue fetches events, + N when events are successfully put in, and-N when fetching fails, that is, how many events are stored in Channel Queue. The queueStored semaphore defaults to 0. Reduce one queueStored semaphore when doTake fetches Event, increase queueStored semaphore of putList queue size when doCommit commits transaction, and reduce queueStored semaphore of takeList queue size when doRollback rollback transaction.
QueueRemaining means that Channel Queue can store event capacity (the number of events that can be stored), + N when the event is successfully fetched, and-N when the event is successful. The queueRemaining signal quantity defaults to Channel Queue capacity. When submitting a transaction, it first calculates the number of change events that need to be added by remainingChange = takeList.size ()-putList.size (). If less than 0 means that more events are put in than taken out, the-queueRemaining semaphore should be reduced; if it is greater than 0, the-queueRemaining semaphore should be reduced; if it is greater than 0, it means that more events are taken out than put in, indicating that there are queueRemaining events taken out, and queueRemaining semaphores should be increased at this time. That is, the semaphore is reduced during the consumption event and increased during the production event.
BytesRemaining is a byte capacity semaphore, and if the capacity is exceeded, the transaction will be rolled back.
Finally, take a look at the rollback transaction:
Protected void doRollback () {int takes = takeList.size (); synchronized (queueLock) {/ / must lock queueLock / / 1 when operating Channel Queue, precondition judgment, check whether there is enough capacity to roll back transaction Preconditions.checkState (queue.remainingCapacity () > = takeList.size (), "Not enough space in memory channel" + "queue to rollback takes. This should never happen, please report "); / / 2. Roll back the takeList queue of the transaction to Channel Queue while (! takeList.isEmpty ()) {queue.addFirst (takeList.removeLast ());} putList.clear ();} / / 3, release putByteCounter bytesRemaining semaphores bytesRemaining.release (putByteCounter); / / 4, counter reset putByteCounter = 0; takeByteCounter = 0 / / 5. Release takeList queue size queueStored.release (takes); channelCounter.setChannelSize (queue.size ());}}
In other words, when rolling back, you need to roll back the events temporarily stored in takeList to Channel Queue, and roll back the queueStored semaphore.
The above is how to implement MemoryChannel transactions in Flume architecture. The editor believes that there are some knowledge points that we may see or use in our daily work. I hope you can learn more from this article. For more details, please follow the industry information channel.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.