2025-01-18 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/01 Report--
If you have not read the previous article on the Flume-ng startup process, it is worth reading that one first.
1 Interface introduction
The components are analyzed in the startup order described in the previous article: Channel first, then Sink, and finally Source. Before looking at the component source code, let's look at two important interfaces: LifecycleAware and NamedComponent.
1.1 LifecycleAware
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface LifecycleAware {
    public void start();
    public void stop();
    public LifecycleState getLifecycleState();
}
It has just three simple methods: start(), stop() and getLifecycleState(). Many Flume classes implement this interface, including the PollingPropertiesFileConfigurationProvider mentioned in the startup-process article. Anything that has a life cycle implements it, and of course the components do too!
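To make the contract concrete, here is a minimal sketch of a class with a life cycle. SketchLifecycleState, SketchLifecycleAware and HeartbeatComponent are hypothetical names declared locally so that the example compiles without Flume on the classpath; they only mirror the shape of the interface above and are not Flume classes.

```java
// Hypothetical stand-ins for Flume's LifecycleState and LifecycleAware,
// declared locally so the sketch compiles without Flume on the classpath.
enum SketchLifecycleState { IDLE, START, STOP, ERROR }

interface SketchLifecycleAware {
    void start();
    void stop();
    SketchLifecycleState getLifecycleState();
}

// A made-up component: start() and stop() only record the new state.
class HeartbeatComponent implements SketchLifecycleAware {
    private volatile SketchLifecycleState state = SketchLifecycleState.IDLE;

    @Override
    public void start() {
        state = SketchLifecycleState.START;
    }

    @Override
    public void stop() {
        state = SketchLifecycleState.STOP;
    }

    @Override
    public SketchLifecycleState getLifecycleState() {
        return state;
    }
}
```

Whatever supervises such a component only needs these three methods: it calls start(), polls getLifecycleState(), and calls stop() on shutdown.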
1.2 NamedComponent
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface NamedComponent {
    public void setName(String name);
    public String getName();
}
There is not much to say here: it sets and returns the name of a component.
2 Channel
Channel is one of the three core components of Flume, so let's take a look at how it is put together:
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Channel extends LifecycleAware, NamedComponent {
    public void put(Event event) throws ChannelException;
    public Event take() throws ChannelException;
    public Transaction getTransaction();
}
From the interface above, we can see that the main functions of Channel are put() and take(), so let's look at an implementation. We choose MemoryChannel as the example, but MemoryChannel is too long to show in full, so here is an abridged version.
public class MemoryChannel extends BasicChannelSemantics {
    private static Logger LOGGER = LoggerFactory.getLogger(MemoryChannel.class);
    private static final Integer defaultCapacity = Integer.valueOf(...);
    private static final Integer defaultTransCapacity = Integer.valueOf(...);

    public MemoryChannel() {}
    ...
}
We see that it extends BasicChannelSemantics. The name suggests that this class provides the basic semantics of a Channel, so let's continue with its implementation.
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class BasicChannelSemantics extends AbstractChannel {

    private ThreadLocal<BasicTransactionSemantics> currentTransaction =
        new ThreadLocal<BasicTransactionSemantics>();
    private boolean initialized = false;

    protected void initialize() {}

    protected abstract BasicTransactionSemantics createTransaction();

    @Override
    public void put(Event event) throws ChannelException {
        BasicTransactionSemantics transaction = currentTransaction.get();
        Preconditions.checkState(transaction != null, "No transaction exists for this thread");
        transaction.put(event);
    }

    @Override
    public Event take() throws ChannelException {
        BasicTransactionSemantics transaction = currentTransaction.get();
        Preconditions.checkState(transaction != null, "No transaction exists for this thread");
        return transaction.take();
    }

    @Override
    public Transaction getTransaction() {
        // Lazily run initialize() exactly once.
        if (!initialized) {
            synchronized (this) {
                if (!initialized) {
                    initialize();
                    initialized = true;
                }
            }
        }
        // Reuse this thread's transaction unless it is missing or already closed.
        BasicTransactionSemantics transaction = currentTransaction.get();
        if (transaction == null || transaction.getState().equals(BasicTransactionSemantics.State.CLOSED)) {
            transaction = createTransaction();
            currentTransaction.set(transaction);
        }
        return transaction;
    }
}
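The per-thread bookkeeping in getTransaction() is easier to see in isolation. The following is a minimal sketch of that pattern only, assuming a hypothetical SketchTx that does nothing but remember whether it was closed; PerThreadTransactions is likewise an illustrative name, not a Flume class.

```java
// Hypothetical minimal transaction: it only tracks whether it has been closed.
class SketchTx {
    private boolean closed = false;

    void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

// Mirrors the getTransaction() logic: each thread sees its own transaction,
// and a new one is created only when none exists or the old one is closed.
class PerThreadTransactions {
    private final ThreadLocal<SketchTx> current = new ThreadLocal<>();

    SketchTx getTransaction() {
        SketchTx tx = current.get();
        if (tx == null || tx.isClosed()) {
            tx = new SketchTx();
            current.set(tx);
        }
        return tx;
    }
}
```

Calling getTransaction() twice on one thread returns the same object until it is closed, while a second thread always gets its own instance. That is why put() and take() can look up "the" transaction without receiving it as a parameter.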
After a long search we finally find put() and take(), but on closer inspection they only delegate to the put() and take() of BasicTransactionSemantics. A little disappointing. Let's move on to BasicTransactionSemantics.
public abstract class BasicTransactionSemantics implements Transaction {

    private State state;
    private long initialThreadId;

    protected void doBegin() throws InterruptedException {}
    protected abstract void doPut(Event event) throws InterruptedException;
    protected abstract Event doTake() throws InterruptedException;
    protected abstract void doCommit() throws InterruptedException;
    protected abstract void doRollback() throws InterruptedException;
    protected void doClose() {}

    protected BasicTransactionSemantics() {
        state = State.NEW;
        initialThreadId = Thread.currentThread().getId();
    }

    protected void put(Event event) {
        Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
            "put() called from different thread than getTransaction()!");
        Preconditions.checkState(state.equals(State.OPEN),
            "put() called when transaction is %s!", state);
        Preconditions.checkArgument(event != null,
            "put() called with null event!");
        try {
            doPut(event);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ChannelException(e.toString(), e);
        }
    }

    protected Event take() {
        Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,
            "take() called from different thread than getTransaction()!");
        Preconditions.checkState(state.equals(State.OPEN),
            "take() called when transaction is %s!", state);
        try {
            return doTake();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    protected State getState() {
        return state;
    }

    ...
    // Only put and take are discussed here, so the methods that are not
    // involved are omitted for now. Interested readers can look at the source.

    protected static enum State {
        NEW, OPEN, COMPLETED, CLOSED
    }
}
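The listing follows the template-method pattern: the base class checks the thread and the state, then hands the real work to an abstract hook. Below is a minimal sketch of that shape. SketchTransaction and ListTransaction are hypothetical names, events are reduced to strings, plain exceptions replace Guava's Preconditions, and the begin() method is an assumption about one of the parts omitted from the listing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base class: guards live here, the actual work is in doPut().
abstract class SketchTransaction {
    enum State { NEW, OPEN, COMPLETED, CLOSED }

    private State state = State.NEW;
    private final long ownerThreadId = Thread.currentThread().getId();

    // The hook that concrete transactions implement.
    protected abstract void doPut(String event);

    // Assumed counterpart of the omitted begin(): NEW -> OPEN.
    void begin() {
        require(state == State.NEW, "begin() called when transaction is " + state);
        state = State.OPEN;
    }

    void put(String event) {
        require(Thread.currentThread().getId() == ownerThreadId,
            "put() called from a different thread than the creator");
        require(state == State.OPEN, "put() called when transaction is " + state);
        if (event == null) {
            throw new IllegalArgumentException("put() called with null event");
        }
        doPut(event);
    }

    private static void require(boolean ok, String message) {
        if (!ok) {
            throw new IllegalStateException(message);
        }
    }
}

// A made-up concrete transaction that simply collects events in a list.
class ListTransaction extends SketchTransaction {
    final List<String> events = new ArrayList<>();

    @Override
    protected void doPut(String event) {
        events.add(event);
    }
}
```

A subclass therefore never repeats the checks: calling put() on a ListTransaction before begin() fails in the base class, and doPut() only runs once the transaction is open.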
BasicTransactionSemantics is another abstract class. Its put() and take() call the abstract methods doPut() and doTake() internally. At this point I suspect impatient readers have given up, but only one step remains. Since it is an abstract class, the Channel must ultimately use a concrete subclass. So we go back to MemoryChannel, where we started, and look inside for clues. Sure enough, there is an inner class hidden in MemoryChannel.
private class MemoryTransaction extends BasicTransactionSemantics {

    private LinkedBlockingDeque<Event> takeList;
    private LinkedBlockingDeque<Event> putList;
    private final ChannelCounter channelCounter;
    private int putByteCounter = 0;
    private int takeByteCounter = 0;

    public MemoryTransaction(int transCapacity, ChannelCounter counter) {
        putList = new LinkedBlockingDeque<Event>(transCapacity);
        takeList = new LinkedBlockingDeque<Event>(transCapacity);
        channelCounter = counter;
    }

    @Override
    protected void doPut(Event event) throws InterruptedException {
        channelCounter.incrementEventPutAttemptCount();
        int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
        // offer() returns false instead of blocking when putList is full.
        if (!putList.offer(event)) {
            throw new ChannelException(
                "Put queue for MemoryTransaction of capacity " + putList.size() +
                " full, consider committing more frequently, " +
                "increasing capacity or increasing thread count");
        }
        putByteCounter += eventByteSize;
    }

    @Override
    protected Event doTake() throws InterruptedException {
        channelCounter.incrementEventTakeAttemptCount();
        if (takeList.remainingCapacity() == 0) {
            throw new ChannelException(
                "Take list for MemoryTransaction, capacity " + takeList.size() +
                " full, consider committing more frequently, " +
                "increasing capacity, or increasing thread count");
        }
        // Wait up to keepAlive seconds for an event to become available.
        if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
            return null;
        }
        Event event;
        synchronized (queueLock) {
            event = queue.poll();
        }
        Preconditions.checkNotNull(event,
            "Queue.poll returned NULL despite semaphore " +
            "signalling existence of entry");
        takeList.put(event);
        int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
        takeByteCounter += eventByteSize;
        return event;
    }

    // ... methods that are not needed for now are again omitted
}
In this class we find the implementations of doPut() and doTake(), and so we see that put() and take() of MemoryChannel end up calling doPut() and doTake() of MemoryTransaction.
You might think the analysis ends here. In fact, two more important classes work with Channel: ChannelProcessor and ChannelSelector. Bear with me while I go through them.
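The take side of MemoryTransaction combines three pieces: a bounded takeList, a semaphore that counts stored events, and a lock around the shared queue. Here is a rough sketch of that combination, with events reduced to strings. MemoryTakeSketch and its store() method are hypothetical; store() stands in for whatever makes an event visible in the shared queue, which the abridged listing does not show.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class MemoryTakeSketch {
    private final Object queueLock = new Object();
    private final Queue<String> queue = new ArrayDeque<>();
    // One permit per event currently sitting in the shared queue.
    private final Semaphore queueStored = new Semaphore(0);
    // Events taken by this transaction, bounded by the transaction capacity.
    private final LinkedBlockingDeque<String> takeList;

    MemoryTakeSketch(int transCapacity) {
        takeList = new LinkedBlockingDeque<>(transCapacity);
    }

    // Hypothetical helper: makes an event available to take().
    void store(String event) {
        synchronized (queueLock) {
            queue.add(event);
        }
        queueStored.release();
    }

    // Returns null when nothing arrives within the timeout.
    String take(long timeoutMillis) {
        if (takeList.remainingCapacity() == 0) {
            throw new IllegalStateException(
                "Take list of capacity " + takeList.size() + " is full");
        }
        try {
            if (!queueStored.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return null;
            }
            String event;
            synchronized (queueLock) {
                event = queue.poll();
            }
            takeList.put(event);
            return event;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

The semaphore is what lets take() wait with a timeout without holding the queue lock, and the bounded takeList is what produces the "full, consider committing more frequently" error once a transaction has taken as many events as its capacity allows.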
3 ChannelProcessor
The purpose of ChannelProcessor is to perform put operations, that is, to put data into channels. Each ChannelProcessor instance has a ChannelSelector that determines which channels an event should be put into.
public class ChannelProcessor implements Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(ChannelProcessor.class);
    private final ChannelSelector selector;
    private final InterceptorChain interceptorChain;

    public ChannelProcessor(ChannelSelector selector) {
        this.selector = selector;
        this.interceptorChain = new InterceptorChain();
    }

    public void initialize() {
        this.interceptorChain.initialize();
    }

    public void close() {
        this.interceptorChain.close();
    }

    public void configure(Context context) {
        this.configureInterceptors(context);
    }

    private void configureInterceptors(Context context) {
        // configure the interceptors
    }

    public ChannelSelector getSelector() {
        return this.selector;
    }

    public void processEventBatch(List<Event> events) {
        ...
        while (i$.hasNext()) {
            // Decompiler-generated names: optChannel holds the current event.
            Event optChannel = (Event) i$.next();
            List tx = this.selector.getRequiredChannels(optChannel);
            ... // put the event into the queue of each required channel
            List t1 = this.selector.getOptionalChannels(optChannel);
            Object eventQueue ...
            ... // put the event into the queue of each optional channel
        }
        ... // dispatch the queued events
    }

    public void processEvent(Event event) {
        event = this.interceptorChain.intercept(event);
        if (event != null) {
            List requiredChannels = this.selector.getRequiredChannels(event);
            Iterator optionalChannels = requiredChannels.iterator();
            ... // dispatch the event to the required channels
            List optionalChannels1 = this.selector.getOptionalChannels(event);
            Iterator i$1 = optionalChannels1.iterator();
            ... // dispatch the event to the optional channels
        }
    }
}
To keep the code short, I removed some parts and left only what needs explaining. The point is that both write methods in ChannelProcessor, processEventBatch() and processEvent(), get the target channels from the selector that was passed to the constructor and then put the event into those channels.
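The difference between required and optional channels can be sketched as follows. This is a simplification under stated assumptions, not the ChannelProcessor code: SketchChannel and DispatchSketch are hypothetical names, events are plain strings, transactions and interceptors are left out, and the rule shown (a failing required channel fails the whole call, a failing optional channel is ignored) reflects my understanding of Flume's behavior rather than anything in the abridged listing.

```java
import java.util.List;

// Hypothetical one-method channel: put() may throw if the channel is unavailable.
interface SketchChannel {
    void put(String event);
}

class DispatchSketch {
    private final List<SketchChannel> required;
    private final List<SketchChannel> optional;

    DispatchSketch(List<SketchChannel> required, List<SketchChannel> optional) {
        this.required = required;
        this.optional = optional;
    }

    void processEvent(String event) {
        // Required channels: any failure propagates to the caller.
        for (SketchChannel channel : required) {
            channel.put(event);
        }
        // Optional channels: delivery is best effort, failures are swallowed.
        for (SketchChannel channel : optional) {
            try {
                channel.put(event);
            } catch (RuntimeException e) {
                // A real implementation would log this instead of ignoring it.
            }
        }
    }
}
```

In this sketch the two lists are fixed at construction time; in ChannelProcessor they come from the selector for each event, which is what makes per-event routing possible.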
4 ChannelSelector
ChannelSelector is an interface, and its implementations are created through ChannelSelectorFactory. Flume provides two implementation classes, MultiplexingChannelSelector and ReplicatingChannelSelector.
public interface ChannelSelector extends NamedComponent, Configurable {
    void setChannels(List<Channel> var1);
    List<Channel> getRequiredChannels(Event var1);
    List<Channel> getOptionalChannels(Event var1);
    List<Channel> getAllChannels();
}
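As a rough illustration of how the two kinds of selector differ, here is a sketch in which channels are plain names and an event is reduced to its header map. SketchSelector, ReplicatingSketch and MultiplexingSketch are hypothetical names, and the routing rules shown (copy to every channel versus pick channels by the value of one header) are my summary of the two strategies, not the Flume implementations.

```java
import java.util.List;
import java.util.Map;

// Hypothetical simplification of the selector contract.
interface SketchSelector {
    List<String> getRequiredChannels(Map<String, String> headers);
}

// Replicating: every event goes to every configured channel.
class ReplicatingSketch implements SketchSelector {
    private final List<String> channels;

    ReplicatingSketch(List<String> channels) {
        this.channels = channels;
    }

    @Override
    public List<String> getRequiredChannels(Map<String, String> headers) {
        return channels;
    }
}

// Multiplexing: the value of one header decides which channels get the event.
class MultiplexingSketch implements SketchSelector {
    private final String headerName;
    private final Map<String, List<String>> mapping;
    private final List<String> defaultChannels;

    MultiplexingSketch(String headerName,
                       Map<String, List<String>> mapping,
                       List<String> defaultChannels) {
        this.headerName = headerName;
        this.mapping = mapping;
        this.defaultChannels = defaultChannels;
    }

    @Override
    public List<String> getRequiredChannels(Map<String, String> headers) {
        String value = headers.get(headerName);
        List<String> chosen = (value == null) ? null : mapping.get(value);
        return (chosen != null) ? chosen : defaultChannels;
    }
}
```

With a mapping from "CA" to one channel and a default list for everything else, an event whose header has the value "CA" is routed to that one channel, while the replicating variant would return the full list for every event.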
A selector is created through ChannelSelectorFactory.create(), which calls getSelectorForType() to obtain a selector of the implementation class that matches the type given in the configuration file.
public class ChannelSelectorFactory {

    private static final Logger LOGGER = LoggerFactory.getLogger(ChannelSelectorFactory.class);

    public static ChannelSelector create(List<Channel> channels, Map<String, String> config) {...}

    public static ChannelSelector create(List<Channel> channels, ChannelSelectorConfiguration conf) {
        String type = ChannelSelectorType.REPLICATING.toString();
        if (conf != null) {
            type = conf.getType();
        }
        ChannelSelector selector = getSelectorForType(type);
        selector.setChannels(channels);
        Configurables.configure(selector, conf);
        return selector;
    }

    private static ChannelSelector getSelectorForType(String type) {
        // No type configured: fall back to the replicating selector.
        if (type == null || type.trim().length() == 0) {
            return new ReplicatingChannelSelector();
        }
        String selectorClassName = type;
        ChannelSelectorType selectorType = ChannelSelectorType.OTHER;
        try {
            selectorType = ChannelSelectorType.valueOf(type.toUpperCase(Locale.ENGLISH));
        } catch (IllegalArgumentException ex) {
            LOGGER.debug("Selector type {} is a custom type", type);
        }
        if (!selectorType.equals(ChannelSelectorType.OTHER)) {
            selectorClassName = selectorType.getChannelSelectorClassName();
        }
        ChannelSelector selector = null;
        try {
            @SuppressWarnings("unchecked")
            Class ...
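The name resolution at the start of getSelectorForType() can be sketched on its own. SketchSelectorType, SelectorTypeResolver and the class names stored in the enum are hypothetical placeholders; only the lookup logic (blank type falls back to replicating, a known type name maps to a built-in class, anything else is treated as a custom class name) follows the listing above.

```java
import java.util.Locale;

// Hypothetical stand-in for ChannelSelectorType; the class names are placeholders.
enum SketchSelectorType {
    OTHER(null),
    REPLICATING("example.ReplicatingSketchSelector"),
    MULTIPLEXING("example.MultiplexingSketchSelector");

    private final String className;

    SketchSelectorType(String className) {
        this.className = className;
    }

    String className() {
        return className;
    }
}

class SelectorTypeResolver {
    // Returns the name of the class that would be instantiated for a configured type.
    static String resolveClassName(String type) {
        if (type == null || type.trim().length() == 0) {
            return SketchSelectorType.REPLICATING.className();
        }
        String selectorClassName = type;
        SketchSelectorType selectorType = SketchSelectorType.OTHER;
        try {
            selectorType = SketchSelectorType.valueOf(type.toUpperCase(Locale.ENGLISH));
        } catch (IllegalArgumentException ex) {
            // Not a built-in type name: keep the configured value as a class name.
        }
        if (selectorType != SketchSelectorType.OTHER) {
            selectorClassName = selectorType.className();
        }
        return selectorClassName;
    }
}
```

The enum lookup doubles as the test for "is this a built-in type": valueOf() throws for any string that is not a constant name, and the catch block simply leaves the configured string in place so that it can be loaded as a user-supplied class.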