What is Flume?
Flume is a highly available, highly reliable, distributed system for collecting, aggregating, and moving massive amounts of log data, originally developed at Cloudera. Flume lets you plug custom data senders into a logging system to collect data; at the same time, it can apply simple processing to the data and write it out to a variety of (customizable) data receivers.
Flume's core components:
source - responsible for generating or collecting data
channel - a short-lived staging container, responsible for buffering data in transit
sink - responsible for forwarding data on to its destination
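As a sketch of how these three components are wired together by name in a Flume properties file (the component names here are illustrative, not from the original):

# declare the components that make up the flow
agent.sources = mySource
agent.channels = myChannel
agent.sinks = mySink
# bind source -> channel (note the plural "channels": a source may feed several)
agent.sources.mySource.channels = myChannel
# bind channel -> sink (singular "channel": a sink drains exactly one)
agent.sinks.mySink.channel = myChannel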
Flume data flow models:
Multi-agent model: agents are chained in series, typically with an Avro sink on one agent feeding an Avro source on the next.
Consolidation (merge) model: many agents fan in to a single aggregating agent.
Multiplexing (hybrid) model: a single source fans events out to multiple channels and sinks.
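A minimal sketch of the multi-agent model (the host name collector-host and port 4545 are assumptions for illustration): agent a1 tails a local file and forwards events over Avro; agent a2, on another machine, receives and logs them.

# --- a1, the upstream agent ---
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/app.log
a1.sources.r1.channels = c1
# Avro sink: serializes events and ships them to the downstream agent
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = collector-host
a1.sinks.k1.port = 4545
a1.sinks.k1.channel = c1
a1.channels.c1.type = memory

# --- a2, the downstream (collector) agent ---
a2.sources = r2
a2.channels = c2
a2.sinks = k2
# Avro source: listens for events shipped by a1's Avro sink
a2.sources.r2.type = avro
a2.sources.r2.bind = 0.0.0.0
a2.sources.r2.port = 4545
a2.sources.r2.channels = c2
a2.sinks.k2.type = logger
a2.sinks.k2.channel = c2
a2.channels.c2.type = memory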
Installation of Flume
Download the installation package and unzip it
wget http://www.apache.org/dyn/closer.lua/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
tar -zxvf apache-flume-1.8.0-bin.tar.gz
Configure environment variables
vim ~/.bashrc

export FLUME_HOME=/usr/local/src/apache-flume-1.8.0-bin
export PATH=$PATH:$FLUME_HOME/bin

source ~/.bashrc
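To check that the installation is on the PATH, a quick sanity check (assuming the steps above succeeded):

(py27) [root@master ~]# flume-ng version
# should print the version banner, e.g. "Flume 1.8.0"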
Simple Flume operations: netcat mode

Go to the conf directory and write the flume_netcat.conf file as follows:

agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = netcat
agent.sources.netcatSource.bind = localhost
agent.sources.netcatSource.port = 11111
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
agent.channels.memoryChannel.transactionCapacity = 10
Start an instance
(py27) [root@master conf]# pwd
/usr/local/src/apache-flume-1.8.0-bin/conf
(py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./flume_netcat.conf --name agent -Dflume.root.logger=INFO,console
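For reference, what each flag means (these are the standard flume-ng agent options):

flume-ng agent \
  --conf conf \                        # directory holding flume-env.sh and logging config
  --conf-file ./flume_netcat.conf \    # the agent configuration written above
  --name agent \                       # must match the property prefix used in the file ("agent.")
  -Dflume.root.logger=INFO,console     # send the agent's own log output to the terminal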
Started successfully
18/10/24 11:26:35 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
18/10/24 11:26:35 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:./flume_netcat.conf
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Processing:loggerSink
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Processing:loggerSink
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Added sinks: loggerSink Agent: agent
18/10/24 11:26:35 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [agent]
18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Creating channels
18/10/24 11:26:35 INFO channel.DefaultChannelFactory: Creating instance of channel memoryChannel type memory
18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Created channel memoryChannel
18/10/24 11:26:35 INFO source.DefaultSourceFactory: Creating instance of source netcatSource, type netcat
18/10/24 11:26:35 INFO sink.DefaultSinkFactory: Creating instance of sink: loggerSink, type: logger
18/10/24 11:26:35 INFO node.AbstractConfigurationProvider: Channel memoryChannel connected to [netcatSource, loggerSink]
18/10/24 11:26:35 INFO node.Application: Starting new configuration:{ sourceRunners:{netcatSource=EventDrivenSourceRunner: { source:org.apache.flume.source.NetcatSource{name:netcatSource,state:IDLE} }} sinkRunners:{loggerSink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@262b92ac counterGroup:{ name:null counters:{} } }} channels:{memoryChannel=org.apache.flume.channel.MemoryChannel{name: memoryChannel}} }
18/10/24 11:26:35 INFO node.Application: Starting Channel memoryChannel
18/10/24 11:26:35 INFO node.Application: Waiting for channel: memoryChannel to start. Sleeping for 500 ms
18/10/24 11:26:36 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: memoryChannel: Successfully registered new MBean.
18/10/24 11:26:36 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: memoryChannel started
18/10/24 11:26:36 INFO node.Application: Starting Sink loggerSink
18/10/24 11:26:36 INFO node.Application: Starting Source netcatSource
18/10/24 11:26:36 INFO source.NetcatSource: Source starting
18/10/24 11:26:36 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/172.16.155.120:11111]
Then open a new terminal and send data.
(py27) [root@master apache-flume-1.8.0-bin]# telnet localhost 11111
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
1
OK
View received data
18/10/24 11:30:15 INFO sink.LoggerSink: Event: { headers:{} body: 31 0D  1. }
Note: if the telnet tool is missing, install it first: yum install telnet
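As an alternative to telnet (an aside, not from the original), netcat can also push data into the source, assuming the nc utility is installed:

(py27) [root@master ~]# echo "hello flume" | nc localhost 11111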
Exec mode
Write the configuration file flume_exec.conf:

agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = exec
agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
agent.channels.memoryChannel.transactionCapacity = 10
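A hedged aside on the command choice (not from the original): tail -F is generally preferred over tail -f for log collection, because it keeps retrying and re-opens the file if it is rotated or recreated:

agent.sources.netcatSource.command = tail -F /home/master/FlumeTest/test_data/exec.log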
Start the instance
(py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./flume_exec.conf --name agent -Dflume.root.logger=INFO,console
After a successful startup, create the exec.log file at the path referenced in the configuration:
(py27) [root@master test_data]# ls
exec.log
(py27) [root@master test_data]# pwd
/home/master/FlumeTest/test_data
Then simulate log generation with the echo command:
(py27) [root@master test_data]# echo 'Hello World!!!' > exec.log
View received logs
18/10/25 09:19:52 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 57 6F 72 6C 64 21 21 21 Hello World!!! }
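One caution worth adding (an assumption based on standard tail behavior, not from the original): echo ... > exec.log truncates the file, and tail -f can behave unpredictably when the file it follows is truncated. Appending with >> is the safer way to simulate new log lines, as the later examples do:

(py27) [root@master test_data]# echo 'another line' >> exec.log    # 'another line' is illustrative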
How to save the log to HDFS
Modify the configuration file
agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = exec
agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = hdfs
agent.sinks.loggerSink.hdfs.path = /flume/%y-%m-%d/%H%M/
agent.sinks.loggerSink.hdfs.filePrefix = exec_hdfs_
agent.sinks.loggerSink.hdfs.round = true
agent.sinks.loggerSink.hdfs.roundValue = 1
agent.sinks.loggerSink.hdfs.roundUnit = minute
agent.sinks.loggerSink.hdfs.rollInterval = 3
agent.sinks.loggerSink.hdfs.rollSize = 20
agent.sinks.loggerSink.hdfs.rollCount = 5
agent.sinks.loggerSink.hdfs.useLocalTimeStamp = true
agent.sinks.loggerSink.hdfs.fileType = DataStream
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 100
agent.channels.memoryChannel.transactionCapacity = 10
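To unpack the HDFS sink settings above (the annotations follow the standard semantics of Flume's HDFS sink; the values are the ones from the configuration):

# path escapes (%y-%m-%d/%H%M) are resolved from the event timestamp
agent.sinks.loggerSink.hdfs.path = /flume/%y-%m-%d/%H%M/
# round the timestamp down to 1-minute buckets, so each minute gets its own directory
agent.sinks.loggerSink.hdfs.round = true
agent.sinks.loggerSink.hdfs.roundValue = 1
agent.sinks.loggerSink.hdfs.roundUnit = minute
# roll (close) the current file after 3 s, 20 bytes, or 5 events, whichever comes first
agent.sinks.loggerSink.hdfs.rollInterval = 3
agent.sinks.loggerSink.hdfs.rollSize = 20
agent.sinks.loggerSink.hdfs.rollCount = 5
# use the agent's local clock for the escapes (no timestamp header needed on events)
agent.sinks.loggerSink.hdfs.useLocalTimeStamp = true
# write plain text output instead of the default SequenceFile
agent.sinks.loggerSink.hdfs.fileType = DataStream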
Then start the instance
(py27) [root@master conf]# flume-ng agent --conf conf --conf-file ./flume_exec_hdfs.conf --name agent -Dflume.root.logger=INFO,console
You can then watch it write the contents of exec.log to HDFS:
18/10/25 09:54:26 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
18/10/25 09:54:26 INFO hdfs.BucketWriter: Creating /flume/18-10-25/0954//exec_hdfs_.1540475666623.tmp
18/10/25 09:54:32 INFO hdfs.BucketWriter: Closing /flume/18-10-25/0954//exec_hdfs_.1540475666623.tmp
18/10/25 09:54:32 INFO hdfs.BucketWriter: Renaming /flume/18-10-25/0954/exec_hdfs_.1540475666623.tmp to /flume/18-10-25/0954/exec_hdfs_.1540475666623
18/10/25 09:54:32 INFO hdfs.HDFSEventSink: Writer callback called.
Going into HDFS, we can see the contents of the log:
(py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25/0954
Found 1 items
-rw-r--r--   3 root supergroup         15 2018-10-25 09:54 /flume/18-10-25/0954/exec_hdfs_.1540475666623
(py27) [root@master sbin]# hadoop fs -text /flume/18-10-25/0954/exec_hdfs_.1540475666623
Hello World!!!
Then append some new log lines and check again:
// write new logs
(py27) [root@master test_data]# echo 'test001' >> exec.log
(py27) [root@master test_data]# echo 'test002' >> exec.log
// list the HDFS directory
(py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25
Found 2 items
drwxr-xr-x   - root supergroup          0 2018-10-25 09:54 /flume/18-10-25/0954
drwxr-xr-x   - root supergroup          0 2018-10-25 09:56 /flume/18-10-25/0956
(py27) [root@master sbin]# hadoop fs -ls /flume/18-10-25/0956
Found 1 items
-rw-r--r--   3 root supergroup         16 2018-10-25 09:56 /flume/18-10-25/0956/exec_hdfs_.1540475766338
(py27) [root@master sbin]# hadoop fs -text /flume/18-10-25/0956/exec_hdfs_.1540475766338
test001
test002

Failover instance
This requires three machines: master, slave1, and slave2. Configure an agent on each and start them; the agent on master sends logs, and slave1 and slave2 receive them.
Master configuration:

agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink1 loggerSink2
agent.sinkgroups = group

agent.sources.netcatSource.type = exec
agent.sources.netcatSource.command = tail -f /home/master/FlumeTest/test_data/exec.log
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink1.type = avro
agent.sinks.loggerSink1.hostname = slave1
agent.sinks.loggerSink1.port = 52020
agent.sinks.loggerSink1.channel = memoryChannel

agent.sinks.loggerSink2.type = avro
agent.sinks.loggerSink2.hostname = slave2
agent.sinks.loggerSink2.port = 52020
agent.sinks.loggerSink2.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000

agent.sinkgroups.group.sinks = loggerSink1 loggerSink2
agent.sinkgroups.group.processor.type = failover
agent.sinkgroups.group.processor.priority.loggerSink1 = 10
agent.sinkgroups.group.processor.priority.loggerSink2 = 1
agent.sinkgroups.group.processor.maxpenalty = 10000
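How the failover sink group behaves (a sketch of the standard failover processor semantics, annotated):

# failover: all traffic goes to the highest-priority healthy sink
agent.sinkgroups.group.processor.type = failover
# loggerSink1 (priority 10) is preferred over loggerSink2 (priority 1)
agent.sinkgroups.group.processor.priority.loggerSink1 = 10
agent.sinkgroups.group.processor.priority.loggerSink2 = 1
# upper bound (ms) on the backoff applied to a failed sink before retrying it
agent.sinkgroups.group.processor.maxpenalty = 10000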
Slave1 configuration
agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = avro
agent.sources.netcatSource.bind = slave1
agent.sources.netcatSource.port = 52020
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000
Slave2 configuration
agent.sources = netcatSource
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.netcatSource.type = avro
agent.sources.netcatSource.bind = slave2
agent.sources.netcatSource.port = 52020
agent.sources.netcatSource.channels = memoryChannel

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel

agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10000
agent.channels.memoryChannel.transactionCapacity = 1000
Start the agents on master, slave1, and slave2 respectively, then write a log entry on master and observe which slave receives it.
// master
(py27) [root@master test_data]# echo 'hello' >> exec.log

// slave1
18/10/25 10:53:53 INFO sink.LoggerSink: Event: { headers:{} body: 68 65 6C 6C 6F hello }

// slave2 (connected, but no event received)
18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726
Slave1 received the data. Next we shut down slave1's agent and send a log again:
// master
(py27) [root@master test_data]# echo '11111' >> exec.log

// slave2
18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726
18/10/25 ... INFO sink.LoggerSink: Event: { headers:{} body: 31 31 31 31 31 11111 }
Then start slave1's agent again and write another log:
// master
(py27) [root@master test_data]# echo '2222' >> exec.log

// slave1
18/10/25 10:58:21 INFO sink.LoggerSink: Event: { headers:{} body: 32 32 32 32 2222 }

// slave2 (no new event)
18/10/25 10:43:00 INFO ipc.NettyServer: [id: 0x8da012e3, /172.16.155.120:39726 => /172.16.155.122:52020] CONNECTED: /172.16.155.120:39726

Because loggerSink1 has the higher priority, traffic fails back to slave1 as soon as its agent is restarted.