
Introduction and simple operation of Flume


What is Flume?

Flume is a highly available, highly reliable, distributed system for collecting, aggregating, and transporting large volumes of log data, originally developed at Cloudera. Flume supports custom data senders in a logging system for collecting data; it can also perform simple processing on the data and write it out to a variety of (customizable) data receivers.

Flume is built from three core components:

source: responsible for generating or collecting data

channel: a short-lived storage container, responsible for buffering and persisting data in transit

sink: responsible for forwarding data onward

Flume workflow diagrams (the original article illustrated the following data flow models; a configuration sketch of the multi-agent model follows below):

Data flow model

Multi-agent model

Merge (consolidation) model

Hybrid model
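As an illustration of the multi-agent model, here is a minimal sketch in which one agent's avro sink hands events to a second agent's avro source on another host (agent names, host name, port, and file path are made up for illustration):

# agent1, running where the logs are produced
agent1.sources = execSrc
agent1.channels = ch1
agent1.sinks = avroSink
agent1.sources.execSrc.type = exec
agent1.sources.execSrc.command = tail -F /var/log/app.log
agent1.sources.execSrc.channels = ch1
agent1.sinks.avroSink.type = avro
agent1.sinks.avroSink.hostname = collector-host
agent1.sinks.avroSink.port = 4545
agent1.sinks.avroSink.channel = ch1
agent1.channels.ch1.type = memory

# agent2, running on collector-host
agent2.sources = avroSrc
agent2.channels = ch2
agent2.sinks = loggerSink
agent2.sources.avroSrc.type = avro
agent2.sources.avroSrc.bind = 0.0.0.0
agent2.sources.avroSrc.port = 4545
agent2.sources.avroSrc.channels = ch2
agent2.sinks.loggerSink.type = logger
agent2.sinks.loggerSink.channel = ch2
agent2.channels.ch2.type = memory

The merge (consolidation) model is the same idea with several first-tier agents all pointing their avro sinks at a single collector agent.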

Installation of Flume

Download the installation package and unzip it

wget http://www.apache.org/dyn/closer.lua/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
tar -zxvf apache-flume-1.8.0-bin.tar.gz
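Note that the closer.lua URL is Apache's mirror-selector page, so wget may fetch an HTML page rather than the tarball itself; if that happens, downloading directly from the Apache archive should work (URL assumed to follow the standard archive layout):

wget https://archive.apache.org/dist/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz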

Configure environment variables

vim ~/.bashrc
export FLUME_HOME=/usr/local/src/apache-flume-1.8.0-bin
export PATH=$PATH:$FLUME_HOME/bin
source ~/.bashrc
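To confirm the environment variables took effect, a quick sanity check is to ask flume-ng for its version (this subcommand is part of the standard flume-ng launcher):

flume-ng version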

Simple operation of Flume

netcat mode

Go to the conf directory and write the netcat.conf file as follows:

agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = netcat
agent.sources.netcatSource.bind = localhost
agent.sources.netcatSource.port = 11111
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
agent.channels.memoryChannel.transactionCapacity = 10
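For reference, the two channel numbers control buffering; annotated below with their meanings from the Flume memory channel documentation:

agent.channels.memoryChannel.capacity = 100             # max events the channel can hold at once
agent.channels.memoryChannel.transactionCapacity = 10   # max events per put/take transaction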

Start an instance

(py27) [root@master conf]# pwd
/usr/local/src/apache-flume-1.8.0-bin/conf
(py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./netcat.conf --name agent -Dflume.root.logger=INFO,console

Started successfully

18/10/24 11:26:35 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
18/10/24 11:26:35 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:./flume_netcat.conf
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Processing:loggerSink
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Processing:loggerSink
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Added sinks: loggerSink Agent: agent
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for Agents: [agent]
18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Creating channels
18/10/24 11:26:35 INFO channel.DefaultChannelFactory: Creating instance of channel memoryChannel type memory
18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Created channel memoryChannel
18/10/24 11:26:35 INFO source.DefaultSourceFactory: Creating instance of source netcatSource, type netcat
18/10/24 11:26:35 INFO sink.DefaultSinkFactory: Creating instance of sink: loggerSink, type: logger
18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Channel memoryChannel connected to [netcatSource, loggerSink]
18/10/24 11:26:35 INFO node.Application: Starting new configuration:{ sourceRunners:{netcatSource=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:netcatSource,state:IDLE} }} sinkRunners:{loggerSink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@262b92ac counterGroup:{ name:null counters:{} } }} channels:{memoryChannel=org.apache.flume.channel.MemoryChannel{name: memoryChannel}} }
18/10/24 11:26:35 INFO node.Application: Starting Channel memoryChannel
18/10/24 11:26:35 INFO node.Application: Waiting for channel: memoryChannel to start. Sleeping for 500 ms
18/10/24 11:26:36 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memoryChannel: Successfully registered new MBean.
18/10/24 11:26:36 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started
18/10/24 11:26:36 INFO node.Application: Starting Sink loggerSink
18/10/24 11:26:36 INFO node.Application: Starting Source netcatSource
18/10/24 11:26:36 INFO source.NetcatSource: Source starting
18/10/24 11:26:36 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/172.16.155.120:11111]

Then open a new terminal and send data.

(py27) [root@master apache-flume-1.8.0-bin]# telnet localhost 11111
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
1
OK

View received data

18/10/24 11:30:15 INFO sink.LoggerSink: Event: { headers:{} body: 31 0D  1. }

Note: if the telnet tool is not installed, install it first with yum install telnet.
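Alternatively, any TCP client can talk to the netcat source; for example, with the nc utility the same test would be:

nc localhost 11111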

Exec mode

Write the configuration file exec.conf:

agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = exec
agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
agent.channels.memoryChannel.transactionCapacity = 10
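A small robustness note: tail -F (capital F) is usually preferable to tail -f here, since it keeps following the file by name across truncation and log rotation, which matters when the file is recreated as in the demo below:

agent.sources.netcatSource.command = tail -F /home/master/FlumeTest/test_data/exec.log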

Start the instance

(py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./flume_exec.conf --name agent -Dflume.root.logger=INFO,console

After a successful start, create the exec.log file at the path given in the configuration file:

(py27) [root@master test_data]# ls
exec.log
(py27) [root@master test_data]# pwd
/home/master/FlumeTest/test_data

Then simulate log generation with the echo command:

(py27) [root@master test_data]# echo 'Hello World!!!' > exec.log

View received logs

18/10/25 09:19:52 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64 21 21 21 Hello World!!! }

How to save the log to HDFS

Modify the configuration file

agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = exec
agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = hdfs
agent.sinks.loggerSink.hdfs.path = /flume/%y-%m-%d/%H%M/
agent.sinks.loggerSink.hdfs.filePrefix = exec_hdfs_
agent.sinks.loggerSink.hdfs.round = true
agent.sinks.loggerSink.hdfs.roundValue = 1
agent.sinks.loggerSink.hdfs.roundUnit = minute
agent.sinks.loggerSink.hdfs.rollInterval = 3
agent.sinks.loggerSink.hdfs.rollSize = 20
agent.sinks.loggerSink.hdfs.rollCount = 5
agent.sinks.loggerSink.hdfs.useLocalTimeStamp = true
agent.sinks.loggerSink.hdfs.fileType = DataStream
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
agent.channels.memoryChannel.transactionCapacity = 10
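The bucketing and rolling settings are what shape the output files; annotated per the Flume HDFS sink documentation (the values are the demo's deliberately tiny ones):

agent.sinks.loggerSink.hdfs.round = true           # round the timestamp used in %H%M down ...
agent.sinks.loggerSink.hdfs.roundValue = 1         # ... to 1 ...
agent.sinks.loggerSink.hdfs.roundUnit = minute     # ... minute buckets
agent.sinks.loggerSink.hdfs.rollInterval = 3       # close the current file after 3 seconds, or
agent.sinks.loggerSink.hdfs.rollSize = 20          # after 20 bytes, or
agent.sinks.loggerSink.hdfs.rollCount = 5          # after 5 events, whichever comes first
agent.sinks.loggerSink.hdfs.fileType = DataStream  # write plain text instead of a SequenceFile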

Then start the instance

(py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./flume_exec_hdfs.conf --name agent -Dflume.root.logger=INFO,console

You can then see it writing the log from exec.log to HDFS:

18/10/25 09:54:26 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
18/10/25 09:54:26 INFO hdfs.BucketWriter: Creating /flume/18-10-25/0954/exec_hdfs_.1540475666623.tmp
18/10/25 09:54:32 INFO hdfs.BucketWriter: Closing /flume/18-10-25/0954/exec_hdfs_.1540475666623.tmp
18/10/25 09:54:32 INFO hdfs.BucketWriter: Renaming /flume/18-10-25/0954/exec_hdfs_.1540475666623.tmp to /flume/18-10-25/0954/exec_hdfs_.1540475666623
18/10/25 09:54:32 INFO hdfs.HDFSEventSink: Writer callback called.

Going into HDFS, we can see the contents of the log:

(py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25/0954
Found 1 items
-rw-r--r--   3 root supergroup         15 2018-10-25 09:54 /flume/18-10-25/0954/exec_hdfs_.1540475666623
(py27) [root@master sbin]# hadoop fs -text /flume/18-10-25/0954/exec_hdfs_.1540475666623
Hello World!!!

Then we append some new log lines and check again:

// write new log entries
(py27) [root@master test_data]# echo 'test001' >> exec.log
(py27) [root@master test_data]# echo 'test002' >> exec.log
// look in the HDFS directory
(py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25
Found 2 items
drwxr-xr-x   - root supergroup          0 2018-10-25 09:54 /flume/18-10-25/0954
drwxr-xr-x   - root supergroup          0 2018-10-25 09:56 /flume/18-10-25/0956
(py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25/0956
Found 1 items
-rw-r--r--   3 root supergroup         16 2018-10-25 09:56 /flume/18-10-25/0956/exec_hdfs_.1540475766338
(py27) [root@master sbin]# hadoop fs -text /flume/18-10-25/0956/exec_hdfs_.1540475766338
test001
test002

Failover instance

First, you need three machines: master, slave1, and slave2. Configure and start an agent on each; the agent on master sends logs, while slave1 and slave2 receive them.

Master configuration:

agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink1 loggerSink2
agent.sinkgroups = group

agent.sources.netcatSource.type = exec
agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink1.type = avro
agent.sinks.loggerSink1.hostname = slave1
agent.sinks.loggerSink1.port = 52020
agent.sinks.loggerSink1.channel = memoryChannel

agent.sinks.loggerSink2.type = avro
agent.sinks.loggerSink2.hostname = slave2
agent.sinks.loggerSink2.port = 52020
agent.sinks.loggerSink2.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000

agent.sinkgroups.group.sinks = loggerSink1 loggerSink2
agent.sinkgroups.group.processor.type = failover
agent.sinkgroups.group.processor.priority.loggerSink1 = 10
agent.sinkgroups.group.processor.priority.loggerSink2 = 1
agent.sinkgroups.group.processor.maxpenalty = 10000
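With processor.type = failover, the sink group always routes events to the live sink with the highest priority (loggerSink1 here), falling back to loggerSink2 only when slave1 is unreachable. For comparison, if the goal were to spread events across both receivers instead, Flume's documented load_balance processor could be swapped in (a sketch, not part of the original setup):

agent.sinkgroups.group.processor.type = load_balance
agent.sinkgroups.group.processor.backoff = true
agent.sinkgroups.group.processor.selector = round_robin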

Slave1 configuration

agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = avro
agent.sources.netcatSource.bind = slave1
agent.sources.netcatSource.port = 52020
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000

Slave2 configuration

agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = avro
agent.sources.netcatSource.bind = slave2
agent.sources.netcatSource.port = 52020
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000
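The agents can then be launched with the usual flume-ng command on each machine; the config file names below are illustrative, and the receiving agents on slave1 and slave2 should be started before the sender on master so its avro sinks can connect:

# on slave1 and slave2 (each with its own config file)
flume-ng agent --conf conf --conf-file ./flume_slave.conf --name agent -Dflume.root.logger=INFO,console
# on master
flume-ng agent --conf conf --conf-file ./flume_master.conf --name agent -Dflume.root.logger=INFO,console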

Start the agents on master, slave1, and slave2 as above, then write a log entry on master and observe which machine receives it.

// master
(py27) [root@master test_data]# echo 'hello' >> exec.log
// slave1
18/10/25 10:53:53 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F  hello }
// slave2
18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726

slave1 received the data, as expected from its higher priority. Next we shut down slave1's agent and sent a log entry again:

// master
(py27) [root@master test_data]# echo '11111' >> exec.log
// slave2
18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726
INFO sink.LoggerSink: Event: { headers:{} body: 31 31 31 31 31  11111 }

Then start slave1's agent again and send another log entry:

// master
(py27) [root@master test_data]# echo '22222' >> exec.log
// slave1
18/10/25 10:58:21 INFO sink.LoggerSink: Event: { headers:{} body: 32 32 32 32 32  22222 }
// slave2 (nothing new; its last event is still the earlier 11111)
18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726
INFO sink.LoggerSink: Event: { headers:{} body: 31 31 31 31 31  11111 }

Once slave1 is back online, the failover processor routes events to it again, because it holds the higher priority.
