2025-03-01 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article explains the overall process of Flume: how an agent starts, how its components are initialized and stopped, and how they are supervised at run time.
Overall process
Both Source and Sink depend on Channel, so at startup the Channel must be started first, and only then the Source and Sink.
There are two ways to start Flume: embed it in a Java application with EmbeddedAgent, or start a separate process with Application. This article analyzes the Application path.
First, enter the main method of org.apache.flume.node.Application:
// 1. Set the default startup options and whether each one is required
Options options = new Options();
Option option = new Option("n", "name", true, "the name of this agent");
option.setRequired(true);
options.addOption(option);

option = new Option("f", "conf-file", true, "specify a config file (required if -z missing)");
option.setRequired(false);
options.addOption(option);

// 2. Parse the command-line arguments
CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(options, args);
String agentName = commandLine.getOptionValue('n');
boolean reload = !commandLine.hasOption("no-reload-conf");

if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
  isZkConfigured = true;
}

if (isZkConfigured) {
  // 3. If configured through ZooKeeper, start with the ZooKeeper parameters.
  //    Omitted here; this walkthrough follows the configuration-file path.
} else {
  // 4. Open the configuration file; throw an exception if it does not exist
  File configurationFile = new File(commandLine.getOptionValue('f'));
  if (!configurationFile.exists()) {
    throw new ParseException(
        "The specified configuration file does not exist: " + configurationFile.getPath());
  }
  List<LifecycleAware> components = Lists.newArrayList();

  if (reload) {
    // 5.1 If the configuration file must be reloaded periodically, use the EventBus provided by Guava
    EventBus eventBus = new EventBus(agentName + "-event-bus");
    // 5.2 Read the configuration file with a polling strategy; by default it polls once every 30 s
    PollingPropertiesFileConfigurationProvider configurationProvider =
        new PollingPropertiesFileConfigurationProvider(agentName, configurationFile, eventBus, 30);
    components.add(configurationProvider);
    application = new Application(components);
    // 5.3 Register the application with the EventBus, which picks up the
    //     methods in Application that are annotated with @Subscribe
    eventBus.register(application);
  } else {
    // 6.1 The configuration file is not reloaded periodically
    PropertiesFileConfigurationProvider configurationProvider =
        new PropertiesFileConfigurationProvider(agentName, configurationFile);
    application = new Application();
    // 6.2 Initialize the Flume components directly from the configuration file
    application.handleConfigurationEvent(configurationProvider.getConfiguration());
  }
}

// 7. Start the Flume application
application.start();

// 8. Register a JVM shutdown hook that calls Application's stop method when the JVM shuts down
final Application appReference = application;
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
  @Override
  public void run() {
    appReference.stop();
  }
});
The code above shows only the core of the process; the ZooKeeper implementation, for example, is omitted entirely. The Flume startup process is as follows:
1. Read command line parameters
2. Read the configuration file
3. Initialize Flume, with a different strategy depending on whether reload is required. If reload is required, Guava's event bus is used: Application's handleConfigurationEvent is the event subscriber and PollingPropertiesFileConfigurationProvider is the event publisher. The provider polls periodically to check whether the file has changed. If it has, the provider re-reads the configuration file and publishes a configuration change event; handleConfigurationEvent receives the change and reinitializes the components.
4. Start Application and register the JVM shutdown hook.
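The polling reload in step 3 can be sketched with the standard library alone. This is a minimal illustration, not Flume's implementation: PollingConfigWatcher is a hypothetical class, a plain Consumer stands in for the Guava EventBus subscriber, and the file's last-modified timestamp is assumed to be the change signal.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical stand-in for a polling configuration provider (not Flume's class). */
class PollingConfigWatcher {
    private final Path file;
    private final Consumer<Properties> subscriber; // plays the role of the event subscriber
    private long lastSeenModified = -1L;           // -1 means nothing has been loaded yet

    PollingConfigWatcher(Path file, Consumer<Properties> subscriber) {
        this.file = file;
        this.subscriber = subscriber;
    }

    /** One polling round: reload and publish only if the file's timestamp changed. */
    synchronized boolean poll() throws IOException {
        long modified = Files.getLastModifiedTime(file).toMillis();
        if (modified == lastSeenModified) {
            return false; // unchanged, nothing to publish
        }
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(file)) {
            props.load(in);
        }
        lastSeenModified = modified;
        subscriber.accept(props); // "publish" the new configuration
        return true;
    }

    /** Runs poll() with a fixed delay, for example every 30 seconds as in the text. */
    ScheduledExecutorService startPolling(long intervalSeconds) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(() -> {
            try {
                poll();
            } catch (IOException e) {
                System.err.println("Could not reload " + file + ": " + e);
            }
        }, 0, intervalSeconds, TimeUnit.SECONDS);
        return executor;
    }
}
```

A caller would construct the watcher with a subscriber that stops and restarts its components, then call startPolling(30). Keeping a single poll() round separate from the scheduling makes the change detection easy to exercise without waiting for the timer.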
The handleConfigurationEvent method is relatively simple: it first calls stopAllComponents to stop all components, then calls startAllComponents to initialize all components from the configuration:
@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
  stopAllComponents();
  startAllComponents(conf);
}
MaterializedConfiguration stores the components Flume needs at run time: Source, Channel, Sink, SourceRunner, SinkRunner, and so on. It is built by a ConfigurationProvider; for example, PollingPropertiesFileConfigurationProvider reads the configuration file and initializes the components.
The startAllComponents implementation is roughly as follows:
// 1. Start the Channels first
supervisor.supervise(Channels, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);

// 2. Wait until every Channel has started
for (Channel ch : materializedConfiguration.getChannels().values()) {
  while (ch.getLifecycleState() != LifecycleState.START
      && !supervisor.isComponentInErrorState(ch)) {
    try {
      Thread.sleep(500);
    } catch (InterruptedException e) {
      Throwables.propagate(e);
    }
  }
}

// 3. Start the SinkRunners
supervisor.supervise(SinkRunners, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);

// 4. Start the SourceRunners
supervisor.supervise(SourceRunners, new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);

// 5. Initialize the monitoring service
this.loadMonitoring();
As the code above shows, the Channels are prepared first, because the Sources and Sinks operate on them; if a Channel fails to initialize, the whole process fails. The SinkRunners are started next so that the consumers are ready, and the SourceRunners are started last to begin collecting logs. Two separate components appear here: LifecycleSupervisor, which supervises the components, and MonitorService, which provides monitoring. The supervisor watches over the components, and its default policy is to restart them automatically if something goes wrong.
The stopAllComponents implementation is roughly as follows:
// 1. Stop the SourceRunners first
supervisor.unsupervise(SourceRunners);

// 2. Then stop the SinkRunners
supervisor.unsupervise(SinkRunners);

// 3. Then stop the Channels
supervisor.unsupervise(Channels);

// 4. Finally stop the MonitorService
monitorServer.stop();
The stop order is therefore Source, Sink, Channel: stop production first, then stop consumption, and finally stop the pipeline. This is the reverse of the start order.
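The two orderings can be put side by side in a small sketch. OrderedLifecycle and its Component interface are hypothetical names used only for illustration; the sketch records the order of calls and leaves out the supervision that Flume applies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of the start/stop ordering; these are not Flume's classes. */
class OrderedLifecycle {
    /** Minimal stand-in for a component with a start/stop lifecycle. */
    interface Component {
        void start();
        void stop();
    }

    private final List<Component> channels;
    private final List<Component> sinkRunners;
    private final List<Component> sourceRunners;

    OrderedLifecycle(List<Component> channels,
                     List<Component> sinkRunners,
                     List<Component> sourceRunners) {
        this.channels = channels;
        this.sinkRunners = sinkRunners;
        this.sourceRunners = sourceRunners;
    }

    /** Pipeline first, then consumers, then producers. */
    void startAll() {
        channels.forEach(Component::start);
        sinkRunners.forEach(Component::start);
        sourceRunners.forEach(Component::start);
    }

    /** Reverse order: producers first, then consumers, then the pipeline. */
    void stopAll() {
        sourceRunners.forEach(Component::stop);
        sinkRunners.forEach(Component::stop);
        channels.forEach(Component::stop);
    }

    /** Creates a component that only records its lifecycle calls in the given log. */
    static Component logging(String name, List<String> log) {
        return new Component() {
            @Override public void start() { log.add("start " + name); }
            @Override public void stop() { log.add("stop " + name); }
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        OrderedLifecycle agent = new OrderedLifecycle(
                Collections.singletonList(logging("channel", log)),
                Collections.singletonList(logging("sink", log)),
                Collections.singletonList(logging("source", log)));
        agent.startAll();
        agent.stopAll();
        log.forEach(System.out::println);
    }
}
```

Starting the consumer before the producer means that no event is accepted before something is ready to drain it, and stopping the producer first means that no new event arrives while the rest is shutting down.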
The code implementation of the start method in Application is as follows:
public synchronized void start() {
  for (LifecycleAware component : components) {
    supervisor.supervise(component,
        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
  }
}
It iterates over the components registered with Application and places each one under the supervisor, whose default policy is to restart a component automatically if it fails. If configuration reload is enabled, PollingPropertiesFileConfigurationProvider was registered as a component when Application was created, so it too is supervised and restarted automatically on failure.
When Application stops, it performs the following actions:
public synchronized void stop() {
  supervisor.stop();
  if (monitorServer != null) {
    monitorServer.stop();
  }
}
That is, it stops the supervisor and the monitoring service.
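This stop method is what the shutdown hook registered in main calls when the JVM exits. The following sketch shows that wiring in isolation; ShutdownHookDemo and Stoppable are hypothetical names, and only the thread name "agent-shutdown-hook" is taken from the code above.

```java
/** Hypothetical sketch of registering a shutdown hook; not Flume's code. */
class ShutdownHookDemo {
    /** Stand-in for an application that exposes a stop() method. */
    interface Stoppable {
        void stop();
    }

    /** Registers a hook that calls app.stop() when the JVM shuts down. */
    static Thread registerShutdownHook(Stoppable app) {
        Thread hook = new Thread(app::stop, "agent-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        registerShutdownHook(() ->
                System.out.println("stopping supervisor and monitoring service"));
        System.out.println("running; the hook fires when the JVM exits");
    }
}
```

Returning the hook thread lets a caller remove it again with Runtime.getRuntime().removeShutdownHook if the application is stopped by other means.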
This concludes the basic analysis of Application. What remains open is how the supervisor itself is implemented.
The overall process can be summarized as follows:
1. Parse the command-line options.
2. Read the configuration file.
3. Initialize the components in the configuration file according to whether reload is required; if it is, changes are published and subscribed to through the Guava event bus.
4. Create Application and its supervisor, stop all components, and then start all components. Start order: Channel, SinkRunner, SourceRunner; these components are registered with the supervisor and the monitoring service is initialized. Stop order: SourceRunner, SinkRunner, Channel.
5. If the configuration file requires periodic reload, register Polling***ConfigurationProvider with the supervisor.
6. Finally, register the JVM shutdown hook, which stops the supervisor and the monitoring service.
The polling implementations of SourceRunner and SinkRunner each create a thread to do their work, as described previously. Next, let's look at how the supervisor is implemented.
First create a LifecycleSupervisor:
// 1. Stores the supervised components
supervisedProcesses = new HashMap<LifecycleAware, Supervisoree>();
// 2. Stores the monitoring tasks of the supervised components
monitorFutures = new HashMap<LifecycleAware, ScheduledFuture<?>>();
// ...
  ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
      monitorRunnable, 0, 3, TimeUnit.SECONDS);
  monitorFutures.put(lifecycleAware, future);
}
When a component no longer needs to be supervised, call unsupervise:
public synchronized void unsupervise(LifecycleAware lifecycleAware) {
  synchronized (lifecycleAware) {
    Supervisoree supervisoree = supervisedProcesses.get(lifecycleAware);
    // 1.1 Mark the supervised component as discarded
    supervisoree.status.discard = true;
    // 1.2 Set the component's desired lifecycle state to STOP
    this.setDesiredState(lifecycleAware, LifecycleState.STOP);
    // 1.3 Stop the component
    lifecycleAware.stop();
  }
  // 2. Remove the component from the supervised components
  supervisedProcesses.remove(lifecycleAware);
  // 3. Cancel the component's scheduled monitoring task
  monitorFutures.get(lifecycleAware).cancel(false);
  // 3.1 Tell the Purger that cleanup is needed; it periodically removes cancelled tasks
  needToPurge = true;
  monitorFutures.remove(lifecycleAware);
}
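The purge step exists because cancelling a scheduled task does not, by default, remove it from the executor's queue. The sketch below shows this with a plain ScheduledThreadPoolExecutor; CancelAndPurgeDemo is a hypothetical name, and the assumption that the Purger relies on the executor's purge() method is an inference from the comment above, not something this excerpt shows.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of cancel-then-purge on a scheduled executor. */
class CancelAndPurgeDemo {
    /** Returns the queue size before cancel, after cancel, and after purge. */
    static int[] run() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        try {
            // A long delay keeps the task waiting in the queue for the whole demo.
            ScheduledFuture<?> future = executor.scheduleWithFixedDelay(
                    () -> { }, 1, 1, TimeUnit.HOURS);
            int before = executor.getQueue().size();

            // cancel(false) marks the task cancelled without interrupting a running one.
            future.cancel(false);
            int afterCancel = executor.getQueue().size();

            // purge() drops cancelled tasks that are still sitting in the queue.
            executor.purge();
            int afterPurge = executor.getQueue().size();

            return new int[] {before, afterCancel, afterPurge};
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] sizes = run();
        System.out.println("before=" + sizes[0]
                + " afterCancel=" + sizes[1]
                + " afterPurge=" + sizes[2]);
    }
}
```

An alternative is setRemoveOnCancelPolicy(true) on the executor, which removes a task at cancel time instead of relying on a later purge.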
Next, let's take a look at the implementation of MonitorRunnable, which is responsible for component state migration or component failure recovery:
public void run() {
  long now = System.currentTimeMillis();
  try {
    if (supervisoree.status.firstSeen == null) {
      supervisoree.status.firstSeen = now; // 1. Record the time of the first status check
    }
    supervisoree.status.lastSeen = now;    // 2. Record the time of the latest status check
    synchronized (lifecycleAware) {
      // 3. If the supervised component was discarded or is in error, return immediately
      if (supervisoree.status.discard || supervisoree.status.error) {
        return;
      }
      // 4. Update the last observed state
      supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();
      // 5. If the component's state differs from the desired state, reconcile it
      if (!lifecycleAware.getLifecycleState().equals(supervisoree.status.desiredState)) {
        switch (supervisoree.status.desiredState) {
          case START: // 6. The desired state is START, so start the component
            try {
              lifecycleAware.start();
            } catch (Throwable e) {
              if (e instanceof Error) {
                supervisoree.status.desiredState = LifecycleState.STOP;
                try {
                  lifecycleAware.stop();
                } catch (Throwable e1) {
                  supervisoree.status.error = true;
                  if (e1 instanceof Error) {
                    throw (Error) e1;
                  }
                }
              }
              supervisoree.status.failures++;
            }
            break;
          case STOP: // 7. The desired state is STOP, so stop the component
            try {
              lifecycleAware.stop();
            } catch (Throwable e) {
              if (e instanceof Error) {
                throw (Error) e;
              }
              supervisoree.status.failures++;
            }
            break;
          default:
        }
      }
    }
  } catch (Throwable t) {
  }
}
The code above has been simplified. The overall logic is to check each component's state periodically; if the component's actual state differs from the desired state recorded by the supervisor, the component is started or stopped as needed. In other words, the supervisor ensures that a component is started again automatically after a failure. The default policy is to always restart after failure; another policy starts the component only once.
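The reconcile-on-every-check idea can be reduced to a few lines. The sketch below is an illustration under stated assumptions, not LifecycleSupervisor itself: MiniSupervisor, its Component interface, and its Status fields are hypothetical, Error handling and policies are left out, and checkAll() is called by hand where Flume uses a scheduled task.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, much reduced supervisor; illustrates desired-state reconciliation only. */
class MiniSupervisor {
    enum State { IDLE, START, STOP }

    /** Stand-in for a lifecycle-aware component. */
    interface Component {
        void start() throws Exception;
        void stop() throws Exception;
        State state();
    }

    /** Bookkeeping kept per supervised component. */
    static final class Status {
        volatile State desired;
        volatile int failures;
        volatile boolean discard;
    }

    private final Map<Component, Status> supervised = new ConcurrentHashMap<>();

    /** Registers a component together with the state it should reach. */
    Status supervise(Component component, State desired) {
        Status status = new Status();
        status.desired = desired;
        supervised.put(component, status);
        return status;
    }

    /** One monitoring round over all supervised components. */
    void checkAll() {
        supervised.forEach(this::check);
    }

    private void check(Component component, Status status) {
        synchronized (component) {
            if (status.discard || component.state() == status.desired) {
                return; // nothing to reconcile
            }
            try {
                switch (status.desired) {
                    case START: component.start(); break;
                    case STOP:  component.stop();  break;
                    default:    break;
                }
            } catch (Exception e) {
                // Always-restart behaviour: count the failure and retry on the next round.
                status.failures++;
            }
        }
    }
}
```

Running checkAll() from a scheduler with a fixed delay gives the restart behaviour described above: a component whose start fails stays out of the desired state, so the next round simply tries again.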