Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

1. The basic principle and use of Flume-- data collector

2025-03-29 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/03 Report--

1. Overview 1. What is flume

1) Flume provides a distributed, reliable service that collects, aggregates and moves logs with large amounts of data efficiently. Flume can only run in Linux environment.

2) Flume is based on streaming architecture, with strong fault tolerance, flexibility and simple architecture.

3) Flume and Kafka are used for real-time data collection, Spark and Storm are used for real-time data processing, and impala is used for real-time query.

2. The basic structure of flume

​ figure 1.1 flume architecture

When it comes to the structure of flume, it is enough to take the picture of the official website directly.

First, a flume agent is deployed on each data source, and this agent is used to take the data.

This agent consists of three components: source,channel,sink. In flume, the basic unit of data transmission is event. Let's talk about these concepts.

(1) source

It is used to collect data from the data source and transmit the data in channel. Source supports multiple data source collection methods. For example, the monitoring port collects data, collects data from files, collects from directories, collects data from http services, and so on.

(2) channel

Located between source and sink, it is a temporary storage area of data. In general, the rate of data outflow from source is different from that of sink. So you need a space to temporarily store the data that can't be transferred to sink for processing. So channel is similar to a buffer, a queue.

(3) sink

Get the data from the channel and write the data to the destination source. A variety of destination sources are supported, such as local files, hdfs, kafka, source of the next flume agent, etc.

(4) event

Transmission unit, the basic unit of flume transmission, including headers and body, header can add some header information, body is data.

3. Flume transmission process

Based on the above concept, the process is basically clear. Source monitors the data source. If new data is generated, the data is obtained and encapsulated into an event, then the event is transferred to channel, and then sink pulls the data from channel and writes it to the target source.

Second, the use of flume 1. Flume deployment

The deployment of the flume program itself is very simple

(1) deploy jdk1.8

(2) decompress the flume program to the specified directory, and then add the environment variable.

(3) modify the configuration file

Cd / opt/modules/apache-flume-1.8.0-bin renames the template configuration file copy to the formal configuration file cp conf/flume-env.sh.template conf/flume-env.sh adds the jdk home directory variable vim conf/flume-env.sh plus this export JAVA_HOME=/opt/modules/jdk1.8.0_144

This is the completion of the configuration, basically no difficulty. The use of flume focuses on the writing of the configuration file of agent, which varies according to different business scenarios. To put it simply, it is actually the configuration of the working properties of the three major components of source,channel,sink.

2. Agent definition process

The configuration of agent is actually the configuration of source, channel and sink. There are five main steps. Let's take a look at what the process looks like.

# 1. The name of the defined agent, and the name of the specified source sinks channels used # can have multiple source sinks channels. .sources = .sinks = .operations = # 2, define source working properties. The basic format is agent name .sources.source name. Parameter name = value# the first parameter is type, which specifies the .sources type of source type = xxxx.sources..=xxxx.sources..=xxxx.# 3, and sets the channel working property. The format is similar # the first parameter is type, that is, specify the channel type .channels.. type = xxxxx.channels..=xxxxx.channels..=xxxxx.# 4, set the sink working property # the first parameter is type, specify the sink type = xxxxx.sinks..=xxxxx.sinks..=xxxxx.# 5, set the source and the channel used by the sink Connect the two via channel. Sources.. channels = .sinks.. channel =

This is the complete process defined by agent. Source, channel, and sink each have different types, and the parameters defined by each type are different. Let's take a look at the types commonly used in source, channel and sink (if you want to see all the complete types, check out the official website)

3 、 Common source type (1) netcat-- gets data from tcp port Common attribute: type: to be specified as netcatbind: listening hostname or ipport: listening Port example: listening at 0.0.0.0tcp Port 6666 a1.sources.r1.type = netcata1.sources.r1.bind = 0.0.0.0a1.sources.r1.port = 6666 (2) exec-- execution command output as a data source commonly used attribute: type : command to be specified as execcommand: run shell: run the shell named required For example, / bin/bash-c example: new content of monitoring file a1.sources.r1.type = execa1.sources.r1.command = tail-F / var/log/securea1.sourcesr.r1.shell = / bin/bash-c (3) commonly used attributes of spooldir-- monitoring directory content: type: set to spooldirspoolDir: directory path of monitoring fileSuffix: uploaded file with specified suffix Default is .COMPLETEDfileHeader: whether to add a key to the header of event to indicate the absolute path of the file. The default is falseignorePattern: regular match. There are many other parameters for ignored files. For example, a3.sources.r3.type = spooldira3.sources.r3.spoolDir = / opt/module/flume1.8.0/uploada3.sources.r3.fileSuffix = .COMPLETEDa3.sources.r3.fileHeader = true# ignores all files ending in .tmp. Do not upload a3.sources.r3.ignorePattern = ([^] *\ .tmp) (4) intermediate format of concatenation between avro--flume

This source is special and is usually used in the format of the sink output of the previous flume and then as the input to the next flume.

Commonly used attributes: type: need to be specified as avrobind: listening hostname or ip. It can only be the ip or hostnameport of the host where the agent resides. Example: a1.sources.r1.type = avroa1.sources.r1.bind = 0.0.0.0a1.sources.r1.port = 4141 (5) TAILDIR-- monitors file or directory content changes (1.7 and later)

​ spoolDir has a bug, that is, the uploaded file. No more content can be appended, otherwise an error will be reported and the new file content cannot be read. So spooldir can only be used to monitor the changes of new files in the directory, not the contents of existing files. In the past, this situation can only use exec sources, and then use tail-f xxxlog to listen for changes in the contents of the file, but this approach is flawed, that is, it is easy to lose data. After flume1.7, there is a new source, called TAILDIR, which can listen for changes in files directly. Take a look at the usage:

Common attributes: type:TAILDIR, remember, all uppercase filegroups: the name of the filegroup to be monitored, there can be multiple filegroups filegroups.: to specify which files are included in the filegroup, and extended regular expressions can be used. Here are some tips / path/.* so that you can listen for changes in the contents of all files in the directory positionFile: this file json format records the inode of each file in the directory. And pos offset fileHeader: whether to add too many header attributes Can be seen as the official website: http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#spooling-directory-source example: a1.sources = r1a1.channels = c1a1.sources.r1.type = TAILDIRa1.sources.r1.channels = c1a1.sources.r1.positionFile = / var/log/flume/taildir_position.jsona1.sources.r1.filegroups = F1 f2 there are two filegroups # Filegroup 1 content a1.sources.r1.filegroups.f1 = / var/ Log/test1/example.loga1.sources.r1.headers.f1.headerKey1 = value1# uses a regular expression to specify the filegroup a1.sources.r1.filegroups.f2 = / var/log/test2/.*log.*a1.sources.r1.headers.f2.headerKey1 = value2a1.sources.r1.headers.f2.headerKey2 = value2-2a1.sources.r1.fileHeader = truea1.sources.ri.maxBatchCount = 1000

Let's talk about the positionFile mentioned above and take a look at its format:

[{"inode": 408241856, "pos": 27550, "file": "/ opt/modules/apache-flume-1.8.0-bin/logs/flume.log.COMPLETED"}, {"inode": 406278032, "pos": 0, "file": "/ opt/modules/apache-flume-1.8.0-bin/logs/words.txt.COMPLETED"}, {"inode": 406278035, "pos": 0 "file": "/ opt/modules/apache-flume-1.8.0-bin/logs/words.txt"}, {"inode": 406278036, "pos": 34, "file": "/ opt/modules/apache-flume-1.8.0-bin/logs/test.txt"}] Analysis: 1. Each file is a json string. Something similar to an array made up of multiple json strings. 2, each json contains: inode: what do you mean by looking at the basics of the file system pos: start listening on the file contents of the initial offset file: file absolute path name 3, tips: (1) if listening to the directory, some files already exist, then flume default is from the last file as the starting point for listening. When the contents of the file are updated, flume gets it, and then sink. The posvalue is then updated. So because of this feature, even if the flume agent suddenly collapses, the next time it starts, it automatically starts listening from the last crashed pos, rather than from the end of the latest file. In this way, the data will not be lost, and the old data will not be read repeatedly. (2) as can be seen from (1), pos is a real-time updated file content listening point. If we want to monitor files from scratch, sometimes we need to transfer all the files under the listening directory to one side. At this point, it's easy to change the pos in the json file to 0. 4. If no positionFile path is specified Default is / USER_HOME/.flume/taildir_position.json4, Commonly used channel type (1) memory-- uses memory as a staging space commonly used properties: type: to be specified as memorycapacity: maximum number of event stored in channel transactionCapacity: maximum number of event transferred at a time example: a1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100 (2) file-- uses disk files as a common attribute for temporary storage space: type: need to be specified as FilecheckpointDir: directory for storing checkpoint files dataDirs: example of a directory for storing data: a1.channels.c1.type = filea1.channels.c1.checkpointDir = / mnt/flume/checkpointa1.channels.c1.dataDirs = / mnt/flume/data (3) SPILLABLEMEMORY-- files + memory as scratch space this type is memory + files as channel Write to the attributes commonly used in the file when the capacity space exceeds memory: type: specified as SPILLABLEMEMORYmemoryCapacity: maximum number of event stored in memory overflowCapacity: maximum number of event stored in the file byteCapacity: maximum capacity of event stored in memory Unit is bytescheckpointDir: directory to store checkpoint files dataDirs: directory to store data example: a1.channels.c1.type = SPILLABLEMEMORYa1.channels.c1.memoryCapacity = 10000a1.channels.c1.overflowCapacity = 1000000a1.channels.c1.byteCapacity = 800000a1.channels.c1.checkpointDir = / mnt/flume/checkpointa1.channels.c1.dataDirs = / mnt/flume/data (4) kafka-- as channel

Flume+kafka is also a common technology stack in production environment, but kafka is generally regarded as the target of sink.

Common attribute: type: server set to org.apache.flume.channel.kafka.KafkaChannelbootstrap.servers:kafka cluster Topicconsumer.group.id in ip:port,ip2:port,....topic:kafka: consumer's groupid example: a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannela1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092 Kafka-3:9092a1.channels.channel1.kafka.topic = channel1a1.channels.channel1.kafka.consumer.group.id = flume-consumer5, commonly used sink types (1) logger-- is output directly as log information Common attributes: type:logger example: a1.sinks.k1.type = logger

This type is relatively simple and is generally used when debugging

(2) Intermediate format of avro-- tandem flume

This type is mainly used to give the next flume as an input format, a byte stream, and a serialized sequence.

Common attribute: type:avrohostname: the hostname or ip of the output target, which can be any host Not limited to native ip: Port output to example: a1.sinks.k1.type = avroa1.sinks.k1.hostname = 10.10.10.10a1.sinks.k1.port = 4545 (3) hdfs-- is written directly to hdfs common attribute: type:hdfshdfs.path: storage path Hdfs://namenode:port/PATHhdfs.filePrefix: prefix of uploaded file (additional) hdfs.round: whether to scroll folders by time hdfs.roundValue: scrolling time value hdfs.roundUnit: scrolling time unit hdfs.userLocalTimeStamp: whether to use local timestamp True or falsehdfs.batchSize: how many event are accumulated before flush to hdfs once hdfs.fileType: file type, DataStream (normal file), SequenceFile (binary format, default), CompressedStream (compressed format) hdfs.rollInterval: how often a new file is generated, in seconds hdfs.rollSize: file scroll size, unit is byteshdfs.rollCount: whether file scrolling is related to the number of event True or falsehdfs.minBlockReplicas: minimum number of copies example: # specify the type of sink stored in hdfs a2.sinks.k2.type = hdfs# path named hourly a2.sinks.k2.hdfs.path = hdfs://bigdata121:9000/flume/%H# upload prefix a2.sinks.k2.hdfs.filePrefix = king-# whether to scroll the folder a2.sinks.k2.hdfs.round = true# in time units of time Create a new folder a2.sinks.k2.hdfs.roundValue = redefine time units a2.sinks.k2.hdfs.roundUnit = whether hour# uses a local timestamp a2.sinks.k2.hdfs.useLocalTimeStamp = how many Event is accumulated by true# before flush to HDFS once a2.sinks.k2.hdfs.batchSize = 100 sets the file type Support compression a2.sinks.k2.hdfs.fileType = DataStream# how often to generate a new file, in seconds a2.sinks.k2.hdfs.rollInterval = 60 seconds set the scroll size of each file Unit: bytesa2.sinks.k2.hdfs.rollSize = 13421770 scrolling of files has nothing to do with the number of Event a2.sinks.k2.hdfs.rollCount = minimum number of copies a2.sinks.k2.hdfs.minBlockReplicas = 1 (4) file_roll-- is stored to the local file system commonly used attributes: type:file_rollsink.directory: storage path example: a1.sinks.k1.type = file_rolla1.sinks.k1.sink.directory = / var/log/flum (5) Kafka-- is stored in kafka cluster commonly used attributes: tpye:org.apache.flume.sink.kafka.KafkaSinkkafka.topic:kafka topic name kafka.bootstrap.servers: cluster server list Kafka.flumeBatchSize separated by commas: number of event written to kafka kafka.producer.acks: when ack information is returned when received Minimum number of copies written kafka.producer.compression.type: compression type example: a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.kafka.topic = mytopica1.sinks.k1.kafka.bootstrap.servers = localhost:9092a1.sinks.k1.kafka.flumeBatchSize = 20a1.sinks.k1.kafka.producer.acks = 1a1.sinks.k1.kafka.producer.compression.type = snappy6, interceptor interceptors common type

Interceptor interceptors is not required, it is a component that works between source and channel to filter data from source and output it to channel.

Use format:

First specify the name of the interceptor, and then configure the working properties for each interceptor. Sources.. interceptors = .sources.. interceptors.. = xxxx (1) timestamp timestamp interceptor

Add a field to the header of event to indicate the timestamp such as: headers: {timestamp:111111}.

Common attribute: type:timestampheaderName: key name in header. Default is timestamp example: a1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = timestamp (2) host hostname interceptor

Add a field to the header of event to indicate the host stamp, such as: headers: {host:bigdata121}.

Common attribute: type:hosthostHeader: key name in header. Default is hostuseIP: ip or hostname example: a1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = host (3) UUID interceptor

Add a field to the header of event to indicate the uuid such as: headers: {id:111111}.

Common attribute: type:org.apache.flume.sink.solr.morphline.UUIDInterceptor$BuilderheadName: key name in header, default is idprefix: add prefix to each UUID (4) search_replace query replacement

Use regular matching, and then replace the specified character

Common attributes: type:search_replacesearchPattern: matching regular replaceString: replaced string charset: character set, default UTF-8 example: delete the string beginning with a specific character a1.sources.avroSrc.interceptors = search-replacea1.sources.avroSrc.interceptors.search-replace.type = search_replacea1.sources.avroSrc.interceptors.search-replace.searchPattern = ^ [A-Za-z0-9 _] + a1.sources.avroSrc.interceptors.search-replace.replaceString = (5) regex_filter regular filtering

Regular matching, discarding or leaving the matches

Common attributes: type:regex_filterregex: regular excludeEvents:true to filter out matches, false to leave examples of matches: a1.sources.r1.interceptors.i1.type = regex_filtera1.sources.r1.interceptors.i1.regex = ^ A. matches # if excludeEvents is set to false, it means to filter out events that does not start with A. If excludeEvents is set to true, events starting with An is filtered out. A1.sources.r1.interceptors.i1.excludeEvents = true (6) regex_extractor regular extraction

Here, we use regular group matching to get multiple matching groups, and then store the matching values of each group in header, which can be customized by key.

A1.sources.r1.type = execa1.sources.r1.channels = c1a1.sources.r1.command = tail-F / opt/Andya1.sources.r1.interceptors = ibinary specifies regular a1.sources.r1.interceptors.i1.regex = hostname is (. *?) whose type is regex_extractora1.sources.r1.interceptors.i1.type = regex_extractor# packet matching. Ip is (. *) # key aliases of the two groups a1.sources.r1.interceptors.i1.serializers = S1 slots set the name of key a1.sources.r1.interceptors.i1.serializers.s1.name = cookieida1.sources.r1.interceptors.i1.serializers.s2.name = ip (7) Custom interceptor

Inherit the interface org.apache.flume.interceptor.Interceptor to implement specific methods, such as:

Import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.interceptor.Interceptor;import java.util.ArrayList;import java.util.List Public class MyInterceptor implements Interceptor {@ Override public void initialize () {} @ Override public void close () {} / * intercept messages sent by source to channel channel * handle a single event * @ param event receive filtered event * @ return event according to event * / @ Override public Event intercept (Event event) { / / get the byte data in the event object byte [] arr = event.getBody () / / convert the acquired data to uppercase event.setBody (new String (arr). ToUpperCase (). GetBytes ()); / return to the message return event;} / / process the event collection @ Override public List intercept (List events) {List list = new ArrayList (); for (Event event: events) {list.add (intercept (event)) } return list;} / / is used to return the interceptor object public static class Builder implements Interceptor.Builder {/ / get the properties of the configuration file @ Override public Interceptor build () {return new MyInterceptor ();} @ Override public void configure (Context context) {}}

Pom.xml dependence

Org.apache.flume flume-ng-core 1.8.0

Specify the interceptor in the configuration file of agent

A1.sources.r1.interceptors = full class name $Buildera1.sources.r1.interceptors.i1.type = ToUpCase.MyInterceptor$Builder

Run the command:

Bin/flume-ng agent-c conf/-n A1-f jar/ToUpCase.conf-C jar/Flume_Andy-1.0-SNAPSHOT.jar-Dflume.root.logger=DEBUG,console-C specifies the path of the additional jar package, which is the interceptor jar package that we wrote ourselves.

You can also put the jar package in the lib directory of the flume program directory

Third, flume case 1, read the file to hdfs# 1. Define the name of agent a2. And define the name of the source,sink,channel in this agent a2.sources = r2a2.sinks = k2a2.channels = c2q2. Define the Source, define the data source # define the source type as exec, execute the command a2.sources.r2.type = exec# command a2.sources.r2.command = tail-F / tmp/access.log# shella2.sources.r2.shell = / bin/bash-clocked 3. Define sink# specifies the type of sink stored in hdfs a2.sinks.k2.type = hdfs# path named hourly a2.sinks.k2.hdfs.path = hdfs://bigdata121:9000/flume/%H# upload file prefix a2.sinks.k2.hdfs.filePrefix = king-# whether to create a new folder a2.sinks by time scrolling folder a2.sinks.k2.hdfs.round = true#. K2.hdfs.roundValue = redefine time unit a2.sinks.k2.hdfs.roundUnit = whether hour# uses a local timestamp a2.sinks.k2.hdfs.useLocalTimeStamp = how many Event is accumulated by true# before flush to HDFS once a2.sinks.k2.hdfs.batchSize = 100 sets the file type Can support compression a2.sinks.k2.hdfs.fileType = DataStream# how often to generate a new file, unit is a2.sinks.k2.hdfs.rollInterval = 60 seconds to set the scroll size of each file, unit is bytesa2.sinks.k2.hdfs.rollSize = 13421770 file scrolling has nothing to do with the number of Event a2.sinks.k2.hdfs.rollCount = "minimum number of copies a2.sinks.k2.hdfs.minBlockReplicas =" 4. Define Channel, type, capacity limit, transmission capacity limit a2.channels.c2.type = memorya2.channels.c2.capacity = 1000a2.channels.c2.transactionCapacity = 10. 5. Link to connect source and sink via channel a2.sources.r2.channels = c2a2.sinks.k2.channel = c2

Start flume-agent:

/ opt/module/flume1.8.0/bin/flume-ng agent\-- conf/ opt/module/flume1.8.0/conf/\ flume configuration directory-- name a2\ agent name-- conf-file / opt/module/flume1.8.0/jobconf/flume-hdfs.conf agent configuration-Dflume.root.logger=INFO,console prints logs to terminal 2, multi-flume federation, one-to-many

Flume1: exporting to flume2 and flume3

Flume2: exporting to local fil

Flume3: exporting to hdfs

Flume1.conf

# Name the components on this agenta1.sources = r1a1.sinks = K1 k2a1.channels = C1 cstores copies the data stream to multiple channel. Start replication mode a1.sources.r1.selector.type = replicating# Describe/configure the sourcea1.sources.r1.type = execa1.sources.r1.command = tail-F / opt/testa1.sources.r1.shell = / bin/bash-c # this is K1 sinka1.sinks.k1.type = avroa1.sinks.k1.hostname = bigdata111a1.sinks.k1.port = 414 cycles and this is K2 sinka1.sinks.k2.type = avroa1.sinks.k2.hostname = bigdata111a1.sinks.k2.port = 414 cycles Describe The channela1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100a1.channels.c2.type = memorya1.channels.c2.capacity = 1000a1.channels.c2.transactionCapacity = 10 give source access to connect two channel. Each channel corresponds to a sinka1.sources.r1.channels = C1 c2a1.sinks.k1.channel = c1a1.sinks.k2.channel = c2

Flume2.conf

# Name the components on this agenta2.sources = r1a2.sinks = k1a2.channels = clocked Describe/configure the sourcea2.sources.r1.type = avroa2.sources.r1.bind = bigdata111a2.sources.r1.port = 414 upload Describe the sinka2.sinks.k1.type = hdfsa2.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume2/%H# upload prefix a2.sinks.k1.hdfs.filePrefix = whether the folder a2.sinks.k1 is scrolled by time. Hdfs.round = true# how many units of time to create a new folder a2.sinks.k1.hdfs.roundValue = "redefine time units a2.sinks.k1.hdfs.roundUnit = whether hour# uses local timestamps a2.sinks.k1.hdfs.useLocalTimeStamp = how many Event is accumulated by true# before flush to HDFS once a2.sinks.k1.hdfs.batchSize = 10 sets the file type Can support compression a2.sinks.k1.hdfs.fileType = DataStream# how long to generate a new file a2.sinks.k1.hdfs.rollInterval = 60 sets the scrolling size of each file is about 128Ma2.sinks.k1.hdfs.rollSize = 13421770 the scrolling of the file is independent of the number of Event a2.sinks.k1.hdfs.rollCount = "minimum number of copies a2.sinks.k1.hdfs.minBlockReplicas =" Describe the channela2.channels.c1.type = memorya2.channels.c1 .capacity = 1000a2.channels.c1.transactionCapacity = 10 percent Bind the source and sink to the channela2.sources.r1.channels = c1a2.sinks.k1.channel = C1

Flume3.conf

# Name the components on this agenta3.sources = r1a3.sinks = k1a3.channels = cased Describe/configure the sourcea3.sources.r1.type = avroa3.sources.r1.bind = bigdata111a3.sources.r1.port = 414 "Describe the sinka3.sinks.k1.type = file_roll# Note: the folder here needs to be created first a3.sinks.k1.sink.directory = / opt/flume3# Describe the channela3.channels.c1.type = memorya3.channels.c1.capacity = 1000a3.channels.c1.transactionCapacity = 10 percent Bind the source and sink to the channela3.sources.r1.channels = c1a3.sinks.k1.channel = C1

When starting, start flume2 and flume3 first, and finally start flume1. The startup command is not repeated.

3. Multi-flume association, many-to-one

There are many scenarios in which logs generated by multiple server need to be monitored and stored together.

Flume1 (listening file) and flume2 (listening port) collect data respectively, and then sink to flume3,flume3 to summarize and write to hdfs.

Flume1.conf

# Name the components on this agenta1.sources = r1a1.sinks = k1a1.channels = cantilever Describe/configure the sourcea1.sources.r1.type = execa1.sources.r1.command = tail-F / opt/Andya1.sources.r1.shell = / bin/bash-c # Describe the sinka1.sinks.k1.type = avroa1.sinks.k1.hostname = bigdata111a1.sinks.k1.port = 414 Describe the channela1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 10 Bind the source And sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = C1

Flume2.conf

# Name the components on this agenta2.sources = r1a2.sinks = k1a2.channels = centering Describe/configure the sourcea2.sources.r1.type = netcata2.sources.r1.bind = bigdata111a2.sources.r1.port = 4444 Describe the sinka2.sinks.k1.type = avroa2.sinks.k1.hostname = bigdata111a2.sinks.k1.port = 414 Use a channel which buffers events in memorya2.channels.c1.type = memorya2.channels.c1.capacity = 1000a2.channels.c1.transactionCapacity = 10 percent Bind the source and sink to the channela2. Sources.r1.channels = c1a2.sinks.k1.channel = C1

Flume3.conf

# Name the components on this agenta3.sources = r1a3.sinks = k1a3.channels = clocked Describe/configure the sourcea3.sources.r1.type = avroa3.sources.r1.bind = bigdata111a3.sources.r1.port = 414 upload Describe the sinka3.sinks.k1.type = hdfsa3.sinks.k1.hdfs.path = hdfs://bigdata111:9000/flume3/%H# upload prefix a3.sinks.k1.hdfs.filePrefix = whether the folder a3.sinks.k1 is scrolled by time. Hdfs.round = true# how many units of time to create a new folder a3.sinks.k1.hdfs.roundValue = "redefine time units a3.sinks.k1.hdfs.roundUnit = whether hour# uses local timestamps a3.sinks.k1.hdfs.useLocalTimeStamp = how many Event is accumulated by true# before flush to HDFS once a3.sinks.k1.hdfs.batchSize = 10 sets the file type Can support compression a3.sinks.k1.hdfs.fileType = DataStream# how long to generate a new file a3.sinks.k1.hdfs.rollInterval = 60 setting the scrolling size of each file is about 128Ma3.sinks.k1.hdfs.rollSize = 13421770 the scrolling of the file is independent of the number of Event a3.sinks.k1.hdfs.rollCount = "minimum redundancy a3.sinks.k1.hdfs.minBlockReplicas =" Describe the channela3.channels.c1.type = memorya3.channels.c1 .capacity = 1000a3.channels.c1.transactionCapacity = 10 percent Bind the source and sink to the channela3.sources.r1.channels = c1a3.sinks.k1.channel = C1

Start flume3 first, and then start flume1 and flume2

$bin/flume-ng agent-- conf conf/-- name a3-- conf-file jobconf/flume3.conf$ bin/flume-ng agent-- conf conf/-- name a2-- conf-file jobconf/flume2.conf$ bin/flume-ng agent-- conf conf/-- name A1-- conf-file jobconf/flume1.conf

The test can send data through telnet bigdata111 44444 port

You can append data to the / opt/Andy file

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report