Typical application scenarios of Flume

Updated: 2025-02-22. From: SLTechnology News&Howtos


Shulou(Shulou.com)06/03 Report--

1. Flume configuration files for different sources and sinks

(1) Source---spool

  The spooling directory source watches a directory, which should not contain subdirectories, and collects the files placed in it. Once a file has been fully collected, it is renamed with the suffix .COMPLETED.
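The behavior described above can be pictured with a small Python sketch. This is not Flume's implementation: `drain_spool_dir` and `handle_line` are hypothetical names, and the sketch only mimics the two rules stated here, namely that subdirectories are skipped and that a fully read file is renamed with the .COMPLETED suffix.

```python
import os


def drain_spool_dir(spool_dir, handle_line, suffix=".COMPLETED"):
    """Read every pending file in spool_dir once, then mark it as completed.

    Illustrative only: a hypothetical helper, not part of Flume.
    """
    processed = []
    for name in sorted(os.listdir(spool_dir)):
        path = os.path.join(spool_dir, name)
        # Skip subdirectories and files that were already collected.
        if not os.path.isfile(path) or name.endswith(suffix):
            continue
        with open(path, "r", encoding="utf-8") as fh:
            for line in fh:
                handle_line(line.rstrip("\n"))
        # Renaming marks the file so that it is not collected twice.
        os.rename(path, path + suffix)
        processed.append(name)
    return processed
```

Calling the function a second time on the same directory collects nothing, because every file it already read now carries the suffix.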

Configuration file:

# Name the components on this agent.
# a1 is the agent name and can be customized, but agents on the same node must not share a name.
# Define the aliases of the sources, sinks, and channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Specify the source type and its parameters
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/flumedata
# Set the channel
a1.channels.c1.type = memory
# Set the sink
a1.sinks.k1.type = logger
# Bind the source and sink to the channel
# Set the source's channel
a1.sources.r1.channels = c1
# Set the sink's channel
a1.sinks.k1.channel = c1

(2) Source---netcat

  A NetCat Source is used to listen on a specified port and convert each line of received data into an event.
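As a rough illustration of "one line, one event", the Python sketch below splits a received byte buffer into events. `Event` and `lines_to_events` are hypothetical stand-ins written for this example; they are not Flume classes.

```python
from dataclasses import dataclass, field


@dataclass
class Event:
    """Hypothetical stand-in for a Flume event: a byte body plus optional headers."""
    body: bytes
    headers: dict = field(default_factory=dict)


def lines_to_events(data: bytes) -> list:
    """Turn newline-delimited bytes into one Event per line."""
    # splitlines() handles both \n and \r\n and drops the line terminators.
    return [Event(body=line) for line in data.splitlines()]
```

For instance, the buffer `b"hello\nworld\n"` yields two events whose bodies are `b"hello"` and `b"world"`.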

Data source: netcat (listens over TCP)

Channel: memory

Data destination: console

Configuration file

# Specify the agent components
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Specify the source
a1.sources.r1.channels = c1
# Specify the source type
a1.sources.r1.type = netcat
# Specify the host to listen on
a1.sources.r1.bind = 192.168.191.13
# Specify the port to listen on
a1.sources.r1.port = 321
# Specify the channel
a1.channels.c1.type = memory
# The sink writes data to the logger
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger

(3) Source---avro

  The Avro source listens on an Avro port and accepts event streams from external Avro clients. Avro sources make multi-hop, fan-out, and fan-in flows possible. They can also receive log data sent through the Avro client that ships with Flume.

Data source: avro

Channel: memory

Data destination: console

Configuration file

# Specify the agent components
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Specify the source
a1.sources.r1.channels = c1
# Specify the source type
a1.sources.r1.type = avro
# Specify the hostname to listen on
a1.sources.r1.bind = hadoop03
# Specify the port to listen on
a1.sources.r1.port = 321
# Specify the channel
a1.channels.c1.type = memory
# Specify the sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = logger

(4) Collecting log files to HDFS

Data source: exec (runs a Linux command: tail -F)

Channel: memory

Data destination: hdfs

Note: if the Hadoop cluster is deployed in high-availability mode, copy core-site.xml and hdfs-site.xml into Flume's conf directory.
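The HDFS sink configuration for this scenario builds its target directory from time escapes in hdfs.path and, with hdfs.round enabled, rounds the timestamp down first. The Python sketch below approximates that behavior for illustration only. `round_down` and `bucket_path` are hypothetical helpers, not Flume APIs, and the sketch relies on Python's strftime sharing the %y, %m, %d, %H, and %M escapes.

```python
from datetime import datetime


def round_down(ts: datetime, value: int, unit: str) -> datetime:
    """Round ts down to a multiple of `value` within the given unit."""
    if unit == "second":
        return ts.replace(second=ts.second - ts.second % value, microsecond=0)
    if unit == "minute":
        return ts.replace(minute=ts.minute - ts.minute % value,
                          second=0, microsecond=0)
    if unit == "hour":
        return ts.replace(hour=ts.hour - ts.hour % value,
                          minute=0, second=0, microsecond=0)
    raise ValueError(f"unsupported unit: {unit}")


def bucket_path(pattern: str, ts: datetime, value: int, unit: str) -> str:
    """Expand the time escapes in `pattern` using the rounded timestamp."""
    return round_down(ts, value, unit).strftime(pattern)


ts = datetime(2025, 2, 22, 14, 37, 45)
# Events stamped 14:37:45 land in the 14:30 bucket when rounding to 10 minutes.
print(bucket_path("/flume/%y-%m-%d/%H_%M", ts, 10, "minute"))  # /flume/25-02-22/14_30
```

Rounding keeps the number of directories bounded: with a value of 10 and a unit of minute, at most six directories are created per hour, however many events arrive.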

Configuration file:

a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Specify the source
a1.sources.r1.channels = c1
# Specify the source type
a1.sources.r1.type = exec
# Specify the exec command
a1.sources.r1.command = tail -F /home/hadoop/flumedata/zy.log
# Specify the channel
a1.channels.c1.type = memory
# Specify the sink: write to HDFS
a1.sinks.k1.channel = c1
a1.sinks.k1.type = hdfs
# Path of the files generated on HDFS: year-month-day/hour_minute
a1.sinks.k1.hdfs.path = /flume/%y-%m-%d/%H_%M
# Enable timestamp rounding
a1.sinks.k1.hdfs.round = true
# Rounding value (controls how often a new directory is started)
a1.sinks.k1.hdfs.roundValue = 2
# Time unit
a1.sinks.k1.hdfs.roundUnit = hour
# File rolling
# Roll the current file after this interval (in seconds)
a1.sinks.k1.hdfs.rollInterval = 1
# Roll the file once it reaches this size (in bytes)
a1.sinks.k1.hdfs.rollSize = 102
# Roll the file after this many events
a1.sinks.k1.hdfs.rollCount = 1
# Time source (true means use the local time)
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# File type stored on HDFS (DataStream, text)
a1.sinks.k1.hdfs.fileType = DataStream
# File prefix
a1.sinks.k1.hdfs.filePrefix = zzy
# File suffix
a1.sinks.k1.hdfs.fileSuffix = .log

2. Typical Flume usage scenarios

(1) Multi-agent flow

  Data is passed from the Flume agent on the first machine to the Flume agent on the second machine.

Example:

Planning:

hadoop02: tail-avro.properties

    Uses exec "tail -F /home/hadoop/testlog/welog.log" to collect the data

    Uses an avro sink to send the data to the next agent

hadoop03: avro-hdfs.properties

    Uses an avro source to receive the collected data

    Uses an hdfs sink to write the data to its destination

Configuration file

# tail-avro.properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/testlog/date.log
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
# Host of the downstream agent (hadoop03 runs avro-hdfs.properties in the plan above)
a1.sinks.k1.hostname = hadoop03
a1.sinks.k1.port = 4141
a1.sinks.k1.batch-size = 2
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# avro-hdfs.properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
# Describe k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://myha01/testlog/flume-event/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = date_
a1.sinks.k1.hdfs.maxOpenFiles = 5000
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollSize = 102400
a1.sinks.k1.hdfs.rollCount = 1000000
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(2) Multiplexed collection

  A single agent has multiple channels and multiple sinks, and each sink writes to a different file or file system.
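The configuration for this scenario uses a replicating channel selector, which hands every event to each channel so that each sink sees the full stream. A minimal Python sketch of that idea follows; `replicate` and the deque-based channels are illustrative assumptions, not Flume code.

```python
from collections import deque


def replicate(event, channels):
    """Append the same event to every channel, as a replicating selector would."""
    for channel in channels:
        channel.append(event)


# Two in-memory channels, one per sink.
c1, c2 = deque(), deque()
for line in ["GET /index", "GET /login"]:
    replicate(line, [c1, c2])

# Each sink drains its own channel independently and sees every event.
hdfs_batch = list(c1)
console_batch = list(c2)
```

Because each sink reads from its own channel, a slow HDFS write does not hold back the console output in this model.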

Planning:

hadoop02: tail-hdfsandlogger.properties

    Uses exec "tail -F /home/hadoop/testlog/datalog.log" to collect the data

    Uses sink1 to store the data in HDFS

    Uses sink2 to print all the data to the console

Configuration file

# tail-hdfsandlogger.properties
# Configuration file with 2 channels and 2 sinks
# Name the components on this agent
a1.sources = s1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the tail -F source s1
a1.sources.s1.type = exec
a1.sources.s1.command = tail -F /home/hadoop/logs/catalina.out
# Rule for fanning the source out to multiple channels
a1.sources.s1.selector.type = replicating
a1.sources.s1.channels = c1 c2
# Use channels which buffer events in memory
# Specify channel c1
a1.channels.c1.type = memory
# Specify channel c2
a1.channels.c2.type = memory
# Describe the sinks
# Settings for k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://myha01/flume_log/%y-%m-%d/%H-%M
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.maxOpenFiles = 5000
a1.sinks.k1.hdfs.batchSize = 100
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollSize = 102400
a1.sinks.k1.hdfs.rollCount = 1000000
a1.sinks.k1.hdfs.rollInterval = 60
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.channel = c1
# Settings for k2
a1.sinks.k2.type = logger
a1.sinks.k2.channel = c2

(3) Highly available collection deployment

  Data is first collected from three web servers and then handed to the collectors. The collector tier is highly available: collect01 is the primary, and all collected data is sent to it, while collect02 is a hot standby and receives no data. When collect01 goes down, collect02 takes over and receives the data, which is finally written to HDFS or Kafka.

Deployment of agents and collectors

  Data from Agent1 and Agent2 flows into Collector1 and Collector2. Flume NG provides a failover mechanism that switches over and recovers automatically. Collector1 and Collector2 then write the data to HDFS.
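To make the failover idea concrete, here is a minimal Python sketch of a prioritized sink group: the live sink with the highest priority receives each event, and a sink that fails is set aside for a penalty period before it is tried again. `FailoverSinkGroup` and the `deliver` callback are hypothetical names, and the doubling backoff (capped at `max_penalty_ms`) is an assumption made for this sketch rather than Flume's exact schedule.

```python
class FailoverSinkGroup:
    """Illustrative failover selection; not Flume's implementation."""

    def __init__(self, priorities, max_penalty_ms=10000):
        self.priorities = dict(priorities)   # sink name -> priority
        self.max_penalty_ms = max_penalty_ms
        self.failures = {}                   # sink name -> consecutive failures
        self.retry_at = {}                   # sink name -> earliest retry time (ms)

    def active(self, now_ms):
        """Return the live sink with the highest priority, or None."""
        live = [n for n in self.priorities if self.retry_at.get(n, 0) <= now_ms]
        return max(live, key=self.priorities.get) if live else None

    def send(self, event, deliver, now_ms):
        """Try sinks from highest priority down; return the sink that accepted."""
        while (name := self.active(now_ms)) is not None:
            try:
                deliver(name, event)
            except ConnectionError:
                count = self.failures.get(name, 0) + 1
                self.failures[name] = count
                # Assumed backoff: 1 s, 2 s, 4 s, ... capped at max_penalty_ms.
                penalty = min(self.max_penalty_ms, 1000 * 2 ** (count - 1))
                self.retry_at[name] = now_ms + penalty
                continue
            # Success clears the sink's failure history.
            self.failures.pop(name, None)
            self.retry_at.pop(name, None)
            return name
        raise RuntimeError("no sink available")
```

With priorities of 10 for k1 and 1 for k2, events go to k1 while it is reachable, fall over to k2 when k1 fails, and return to k1 once its penalty has expired and it accepts events again.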

Schematic diagram

Configuration file:

# ha_agent.properties
# agent name: agent1
agent1.channels = c1
agent1.sources = r1
agent1.sinks = k1 k2
# set group
agent1.sinkgroups = g1
# set channel
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
agent1.sources.r1.channels = c1
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /home/hadoop/testlog/testha.log
agent1.sources.r1.interceptors = i1 i2
agent1.sources.r1.interceptors.i1.type = static
agent1.sources.r1.interceptors.i1.key = Type
agent1.sources.r1.interceptors.i1.value = LOGIN
agent1.sources.r1.interceptors.i2.type = timestamp
# set sink1
agent1.sinks.k1.channel = c1
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = hadoop02
agent1.sinks.k1.port = 52020
# set sink2
agent1.sinks.k2.channel = c1
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname = hadoop03
agent1.sinks.k2.port = 52020
# set sink group
agent1.sinkgroups.g1.sinks = k1 k2
# set failover
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.k1 = 10
agent1.sinkgroups.g1.processor.priority.k2 = 1
agent1.sinkgroups.g1.processor.maxpenalty = 10000

# ha_collector.properties
# set agent name
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# set channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# other node, nna to nns
a1.sources.r1.type = avro
## set this to the hostname of the current host
a1.sources.r1.bind = hadoop03
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
## set this to the hostname of the current host
a1.sources.r1.interceptors.i1.value = hadoop03
a1.sources.r1.channels = c1
# set sink to hdfs
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://myha01/flume_ha/loghdfs
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = TEXT
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d

Finally, start the collectors and agents:

# Start the collector role on hadoop02 and hadoop03 first:
bin/flume-ng agent -c conf -f agentconf/ha_collector.properties -n a1 -Dflume.root.logger=INFO,console
# Then start the agent role on hadoop01 and hadoop02:
bin/flume-ng agent -c conf -f agentconf/ha_agent.properties -n agent1 -Dflume.root.logger=INFO,console
