Hadoop IV: Data Collection with Flume

2025-02-22 Update From: SLTechnology News&Howtos

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data from many different sources to a centralized data store. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant, with failover and recovery mechanisms. It uses a simple, extensible data model that supports analytic applications. The data that Flume transports can come from network traffic, media sources, and so on.

Apache Flume is a top-level project of the Apache Software Foundation.

Terminology: the three components are the source, the sink (rendered as "receiver" in some translations), and the channel.

Flume is a high-performance, highly available distributed log collection system originally developed by Cloudera.

The core job of Flume is to collect data from a data source and deliver it to a destination. To make sure delivery succeeds, the data is buffered before it is sent, and the buffered copy is deleted only after the data has actually arrived at the destination.

The basic unit of data that Flume transports is the event. For a text file, an event is usually one line of the file. The event is also the basic unit of a transaction.

The core of a running Flume deployment is the agent. An agent is a complete data collection tool with three core components: source, channel, and sink. Through these components, events flow from one place to another.

Apache Flume system requirements:

Java Runtime Environment: Java 1.6 or later (Java 1.7 recommended)

Memory: sufficient memory for the configurations used by sources, channels, or sinks

Disk space: sufficient disk space for the configurations used by channels or sinks

Directory permissions: read/write permissions for the directories used by the agent

Data stream

A Flume event is defined as a unit of data flow that has a byte payload and an optional set of string attributes (headers). A Flume agent is a JVM process that hosts the components through which events flow from an external source to the next destination (hop).
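The event structure described above can be pictured with a small Python sketch. This is only an illustration of the shape of an event (a byte payload plus optional string headers), not Flume's own Java class; the names Event and hex_dump, and the header value web01, are made up for this example.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class Event:
    """Illustrative stand-in for a Flume event (hypothetical, not the Flume API)."""
    body: bytes                                              # the byte payload
    headers: Dict[str, str] = field(default_factory=dict)    # optional string attributes


def hex_dump(body: bytes) -> str:
    """Render a payload as space-separated uppercase hex bytes."""
    return " ".join(f"{byte:02X}" for byte in body)


event = Event(body=b"Hello world!\r", headers={"host": "web01"})
print(event.headers)
print(hex_dump(event.body))
```

Keeping the body as raw bytes matters: Flume does not interpret the payload, so any text encoding is a concern of the client and of whatever reads the data at the destination.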

[Figure: multi-agent mode]

[Figure: one-to-many output (fan-out) model]

Source:

The source receives data from clients. Flume supports Avro, log4j, syslog, and HTTP POST (with a JSON body). Applications can talk directly to an existing source such as AvroSource or SyslogTcpSource. You can also write a custom source that your application reaches through IPC or RPC. Both Avro and Thrift are supported (NettyAvroRpcClient and ThriftRpcClient each implement the RpcClient interface), and Avro is the default RPC protocol. For code-level details of client data access, refer to the official manual.

The least invasive option is to read the log files that the application already writes. This integrates seamlessly and requires no changes to the existing program.

There are two sources for reading files directly:

ExecSource: runs a Linux command such as tail -F filename and continuously emits the newest output. The file name must be specified. ExecSource can collect logs in real time, but if Flume is not running or the command fails, log data is not collected and the completeness of the log data cannot be guaranteed.

SpoolSource: monitors the configured directory for new files and reads the data in them. Two points to note: a file copied into the spool directory must not be opened for editing afterwards, and the spool directory must not contain subdirectories. SpoolSource cannot collect data in real time, but it can come close if log files are rolled every minute. If the application cannot roll its log files every minute, the two collection methods can be combined. In practice SpoolSource works well together with log4j: set log4j to roll the file once a minute and copy the rolled files into the spool directory. Log4j has a time-based rolling plug-in that can move the rolled files to the spool directory, which gives near-real-time monitoring. After a file has been transferred, Flume renames it with the suffix .COMPLETED (the suffix can be changed in the configuration file).
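The spooling behaviour can be sketched in a few lines of Python. This is a simplified illustration of the idea (read each finished file line by line, then rename it with a completion suffix), not how Flume implements it; the function name drain_spool_dir and the file name app.log are hypothetical.

```python
import os
import tempfile
from typing import Iterator


def drain_spool_dir(spool_dir: str, suffix: str = ".COMPLETED") -> Iterator[bytes]:
    """Yield one event body per line of each unprocessed file, then mark the file done."""
    for name in sorted(os.listdir(spool_dir)):
        path = os.path.join(spool_dir, name)
        # Skip subdirectories and files that were already processed.
        if not os.path.isfile(path) or name.endswith(suffix):
            continue
        with open(path, "rb") as handle:
            for line in handle:
                yield line.rstrip(b"\r\n")
        # Renaming marks the file as fully ingested so it is not read twice.
        os.rename(path, path + suffix)


# Small demonstration in a throwaway directory.
with tempfile.TemporaryDirectory() as demo_dir:
    with open(os.path.join(demo_dir, "app.log"), "wb") as log_file:
        log_file.write(b"first\nsecond\n")
    print(list(drain_spool_dir(demo_dir)))
    print(os.listdir(demo_dir))
```

The sketch also shows why a file must not be edited after it lands in the directory: it is read once from start to finish, and anything appended later would never be picked up.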

Channel:

Several channel implementations are available: MemoryChannel, JDBC Channel, RecoverableMemoryChannel, and FileChannel.

MemoryChannel achieves high throughput but cannot guarantee that data survives a failure.

RecoverableMemoryChannel is deprecated; the official documentation recommends FileChannel instead.

FileChannel guarantees the integrity and consistency of data. When configuring a FileChannel, it is recommended to put the FileChannel directories and the application's log files on different disks to improve efficiency.

Sink:

A sink stores data in its destination, which can be a file system, a database, or Hadoop. When the volume of log data is small, the data can be stored in the file system and saved at a fixed time interval. When the volume is large, the log data can be stored in Hadoop to make later analysis easier.

Events generated by an external source, such as a web server, are consumed by a Flume source. The external source sends events to Flume in a format that the target Flume source recognizes. For example, an Avro Flume source can receive Avro events from Avro clients or from other Flume agents whose Avro sink sends events over an Avro link. When a Flume source receives an event, it stores it in one or more channels. The channel is a passive store that holds the event until a Flume sink consumes it. The JDBC Channel is one example: it uses an embedded database backed by the file system. The sink removes the event from the channel and puts it into an external repository such as HDFS, or forwards it to the Flume source of the next agent (next hop) in the flow. The source and sink within an agent run asynchronously, with the events staged in the channel.

Complex data flow:

Flume lets the user build multi-hop flows in which events travel through multiple agents before reaching the final destination. It also supports fan-in and fan-out flows, contextual routing, and backup routes (failover) for failed hops.

Reliability:

Flume uses a transactional approach to guarantee reliable event delivery. A sink removes an event from a channel only after the event has been stored in the channel of the next agent or in the external destination. This guarantees reliability for events in a flow, whether within a single agent or across multiple agents, because the transactions ensure that each event is stored successfully. Channel implementations differ in recoverability, however, so they guarantee reliability to different degrees. For example, Flume supports a file channel that persists events on the local file system, while the memory channel keeps events in an in-memory queue, which is fast but cannot be recovered if the events are lost.

Take a closer look at transactions. The source and sink wrap the put and take operations that the channel provides for events in transactions. The transaction process is shown in a sequence diagram:

[Figure: transaction sequence diagram]

Each channel implementation includes a transaction implementation, and every source and sink that works with a channel must obtain a transaction object. All state changes of an event are completed within a transaction, and the states of the transaction correspond to the states in the sequence diagram.
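The put/take transaction idea can be illustrated with a toy in-memory channel in Python. This is a sketch of the semantics only, under the assumption that a rollback returns taken events to the head of the queue; the class names ToyChannel and ToyTransaction are invented for the example and do not mirror Flume's Java interfaces.

```python
from collections import deque
from typing import Deque, List, Optional


class ToyChannel:
    """In-memory queue of event bodies with a fixed capacity (illustrative only)."""

    def __init__(self, capacity: int = 1000) -> None:
        self.capacity = capacity
        self.queue: Deque[bytes] = deque()

    def transaction(self) -> "ToyTransaction":
        return ToyTransaction(self)


class ToyTransaction:
    def __init__(self, channel: ToyChannel) -> None:
        self.channel = channel
        self.puts: List[bytes] = []    # staged by a source, not yet visible to sinks
        self.takes: List[bytes] = []   # handed to a sink, delivery not yet confirmed

    def put(self, event: bytes) -> None:
        self.puts.append(event)

    def take(self) -> Optional[bytes]:
        if not self.channel.queue:
            return None
        event = self.channel.queue.popleft()
        self.takes.append(event)
        return event

    def commit(self) -> None:
        # Staged puts become visible; taken events are gone for good.
        if len(self.channel.queue) + len(self.puts) > self.channel.capacity:
            raise OverflowError("channel is full")
        self.channel.queue.extend(self.puts)
        self.puts.clear()
        self.takes.clear()

    def rollback(self) -> None:
        # Staged puts are dropped; taken events go back to the head in order.
        self.channel.queue.extendleft(reversed(self.takes))
        self.puts.clear()
        self.takes.clear()


channel = ToyChannel()
tx = channel.transaction()
tx.put(b"line 1")
tx.commit()                      # the source has handed the event to the channel

tx = channel.transaction()
tx.take()
tx.rollback()                    # delivery failed, so the event stays in the channel
print(len(channel.queue))        # prints 1

tx = channel.transaction()
tx.take()
tx.commit()                      # delivery succeeded, so the event is removed
print(len(channel.queue))        # prints 0
```

The point of the sketch is the ordering: an event leaves the channel permanently only when the sink's transaction commits, which is what lets a failed delivery be retried.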

Failure recovery:

Events are staged in the channel of each agent, and the channel is responsible for recovery after a failure. Flume supports a durable file channel that is backed by the local file system. The memory channel simply stores events in an in-memory queue, which is faster, but any events still in memory when the agent process dies cannot be recovered.

Configuring an agent

The Flume agent configuration is stored in a local configuration file. This is a text file that follows the Java properties file format. Configurations for one or more agents can be specified in the same file. The file includes the properties of each source, sink, and channel in an agent and how they are wired together to form data flows.

Configuring individual components

Each component (source, sink, or channel) in the flow has a name, a type, and a set of properties that are specific to the type and instance. For example, an Avro source needs a hostname (or IP address) and a port number to receive data on. A memory channel can have a maximum queue size ("capacity"). An HDFS sink needs to know the file system URI, the path under which to create files, the file rotation frequency ("hdfs.rollInterval"), and so on. All such properties of a component need to be set in the properties file of the hosting Flume agent.

Wiring the pieces together

The agent needs to know which components to load and how they are connected to form the flow. This is done by listing the names of each of the sources, sinks, and channels in the agent, and then specifying the connecting channel for each sink and source. For example, an agent flows events from an Avro source called avroWeb to an HDFS sink called hdfs-cluster1 through a JDBC channel called jdbc-channel. The configuration file contains the names of these components and sets jdbc-channel as the shared channel for both the avroWeb source and the hdfs-cluster1 sink.

Starting an agent

An agent is started with a shell script called flume-ng, which is located in the bin directory of the Flume distribution. You need to specify the agent name, the config directory, and the config file on the command line:

$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template

The agent then starts running the sources and sinks configured in the given properties file.

A simple example

Here is an example configuration file that describes a single-node Flume deployment. This configuration lets a user generate events and subsequently logs them to the console.

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

This configuration defines a single agent named a1. a1 has a source that listens for data on port 44444, a channel that buffers event data in memory, and a sink that logs event data to the console. The configuration file names the various components and then describes their types and configuration parameters. A given configuration file may define several named agents; when a Flume process is launched, a flag is passed that tells it which named agent to run.
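Because component names in a properties file are case-sensitive, a source or sink that refers to a channel under a differently spelled name will not be wired up. The following Python sketch checks that kind of wiring for a configuration shaped like the example. It is a hypothetical helper, not a Flume tool: the functions parse_properties and check_wiring are made up here, and the parser handles only simple key = value lines.

```python
from typing import Dict, List

EXAMPLE_CONF = """
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
"""


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple 'key = value' lines, ignoring blank lines and # comments."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_wiring(props: Dict[str, str], agent: str) -> List[str]:
    """Return a list of problems where a source or sink names an undeclared channel."""
    problems: List[str] = []
    declared = set(props.get(f"{agent}.channels", "").split())
    for source in props.get(f"{agent}.sources", "").split():
        used = props.get(f"{agent}.sources.{source}.channels", "").split()
        if not used:
            problems.append(f"source {source} has no channel")
        for name in used:
            if name not in declared:
                problems.append(f"source {source} uses undeclared channel {name}")
    for sink in props.get(f"{agent}.sinks", "").split():
        name = props.get(f"{agent}.sinks.{sink}.channel")
        if name is None:
            problems.append(f"sink {sink} has no channel")
        elif name not in declared:
            problems.append(f"sink {sink} uses undeclared channel {name}")
    return problems


print(check_wiring(parse_properties(EXAMPLE_CONF), "a1"))  # prints []
```

Running the same check on a file where the sink is bound to C1 instead of c1 would report the sink as using an undeclared channel.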

Given this configuration file, we can start Flume as follows:

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

Note that in a full deployment we would typically include one more option: --conf=<conf-dir>. The <conf-dir> directory would contain a shell script, flume-env.sh, and possibly a log4j properties file. In this example, we pass a Java option to force Flume to log to the console.

From a separate terminal, we can then telnet to port 44444 and send Flume an event:

$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world!
OK

The original Flume terminal outputs the event in a log message:

12-06-19 15:32:19 INFO source.NetcatSource: Source starting

12-06-19 15:32:19 INFO source.NetcatSource: Created serverSocket: sun.nio.ch.ServerSocketChannelImpl[ / 127.0.0.1:44444]

12-06-19 15:32:34 INFO sink.LoggerSink: Event: {headers: {} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D Hello world!. }

At this point, you have successfully configured and deployed a flume agent! Subsequent chapters cover more detailed agent configuration.
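The telnet step can also be scripted. The following is a minimal Python sketch, assuming an agent is listening with a netcat source on localhost:44444 and acknowledges each line with OK, as the telnet session shows. The function name send_event and the timeout value are illustrative and not part of Flume.

```python
import socket


def send_event(line: str, host: str = "localhost", port: int = 44444,
               timeout: float = 5.0) -> str:
    """Send one newline-terminated line to a TCP listener and return its reply."""
    with socket.create_connection((host, port), timeout=timeout) as connection:
        connection.sendall(line.encode("utf-8") + b"\n")
        # The listener is expected to answer with a short acknowledgement.
        return connection.recv(1024).decode("utf-8").strip()


# Example call (requires a running agent, so it is left commented out):
# print(send_event("Hello world!"))
```

Each line sent this way becomes one event, which then shows up in the agent's console log in the same way as the line typed into telnet.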

Data acquisition

Flume supports mechanisms for fetching data from external data sources.

RPC

The Flume distribution includes an Avro client that can send a given file to a Flume Avro source using the Avro RPC mechanism:

$ bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10

The above command sends the contents of /usr/logs/log.10 to the Flume source listening on that port.

Executing commands

There is also an exec source that executes a given command and consumes its output one "line" at a time. A line is text followed by a carriage return ('\r'), a line feed ('\n'), or both.

Note: Flume does not support tail as a source, but the tail command can be wrapped in an exec source to stream a file.

Network streams

Flume supports the following mechanisms to read data from popular log flow types

Avro

Syslog

Netcat

Define flow

To define the flow within a single agent, you need to link the sources and sinks through a channel. List the sources, sinks, and channels for the given agent, and then point each source and sink to a channel. A source instance can specify multiple channels, but a sink instance can specify only one channel. The format is as follows:

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>

For example, an agent named agent_foo reads data from an external Avro client and sends it to HDFS through a memory channel. The configuration file weblog.config might look like this:

# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1

# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1

# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1

This makes events flow from avro-appserver-src-1 to hdfs-sink-1 through the memory channel mem-channel-1. When the agent is started with weblog.config as its configuration file, it instantiates that flow.

Configure a single component

After defining the flow, you need to set the properties of each source, sink, and channel. The property values of each component are set separately:

# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue>

# properties for channels
<Agent>.channels.<Channel>.<someProperty> = <someValue>

# properties for sinks
<Agent>.sinks.<Sink>.<someProperty> = <someValue>

The "type" property must be set for each component so that Flume knows what kind of object it needs to be. Each source, sink, and channel type has its own set of properties that it requires to function as intended. All of these must be set as needed. In the previous example, events flow from an Avro source to an HDFS sink through the memory channel mem-channel-1. Here is an example that shows the configuration of these components, with the source named avro-AppSrv-source and the sink named hdfs-Cluster1-sink:

agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1

# set channel for sources, sinks
agent_foo.sources.avro-AppSrv-source.channels = mem-channel-1
agent_foo.sinks.hdfs-Cluster1-sink.channel = mem-channel-1

# properties of avro-AppSrv-source
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000

# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100

# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata

#...

Add multiple streams to a single agent

A single Flume agent can contain several independent flows. You can list multiple sources, sinks, and channels in a configuration file. These components can be linked to form multiple flows.

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

Then you can link the sources and sinks to their corresponding channels to set up two different flows. For example, to set up two flows in a weblog agent, one from an external Avro client to HDFS and the other from the output of tail to an Avro sink, use a configuration like this:

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1 exec-tail-source2
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2

# flow #1 configuration
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1

# flow #2 configuration
agent_foo.sources.exec-tail-source2.channels = file-channel-2
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

Configuring a multi-agent flow

To set up a multi-tier flow, the Avro sink of the first hop must point to the Avro source of the next hop. The first Flume agent then forwards events to the next Flume agent. For example, if you periodically send files (one file per event) with the Avro client to a local Flume agent, this local agent can forward them to another agent that is attached to the storage.

Weblog agent config:

# list sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = avro-forward-sink
agent_foo.channels = file-channel

# define the flow
agent_foo.sources.avro-AppSrv-source.channels = file-channel
agent_foo.sinks.avro-forward-sink.channel = file-channel

# avro sink properties
agent_foo.sinks.avro-forward-sink.type = avro
agent_foo.sinks.avro-forward-sink.hostname = 10.1.1.100
agent_foo.sinks.avro-forward-sink.port = 10000

# configure other pieces
#...

HDFS agent config:

# list sources, sinks and channels in the agent
agent_foo.sources = avro-collection-source
agent_foo.sinks = hdfs-sink
agent_foo.channels = mem-channel

# define the flow
agent_foo.sources.avro-collection-source.channels = mem-channel
agent_foo.sinks.hdfs-sink.channel = mem-channel

# avro source properties
agent_foo.sources.avro-collection-source.type = avro
agent_foo.sources.avro-collection-source.bind = 10.1.1.100
agent_foo.sources.avro-collection-source.port = 10000

# configure other pieces
#...

Here we link the avro-forward-sink of the weblog agent to the avro-collection-source of the HDFS agent. As a result, events coming from the external appserver source are eventually stored in HDFS.
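The two hops only connect if the Avro sink of the first agent targets the address and port that the Avro source of the second agent listens on. A small Python sketch of that consistency check follows. It assumes the sink properties are stored under the sinks prefix and the source properties under the sources prefix; the helper names endpoint and hop_matches are hypothetical.

```python
from typing import Dict, Tuple

# Properties of the forwarding sink on the weblog agent.
WEBLOG_AGENT = {
    "agent_foo.sinks.avro-forward-sink.type": "avro",
    "agent_foo.sinks.avro-forward-sink.hostname": "10.1.1.100",
    "agent_foo.sinks.avro-forward-sink.port": "10000",
}

# Properties of the collecting source on the HDFS agent.
HDFS_AGENT = {
    "agent_foo.sources.avro-collection-source.type": "avro",
    "agent_foo.sources.avro-collection-source.bind": "10.1.1.100",
    "agent_foo.sources.avro-collection-source.port": "10000",
}


def endpoint(props: Dict[str, str], prefix: str, host_key: str) -> Tuple[str, int]:
    """Read the (host, port) pair of a component from its properties."""
    return props[f"{prefix}.{host_key}"], int(props[f"{prefix}.port"])


def hop_matches(upstream: Dict[str, str], sink_prefix: str,
                downstream: Dict[str, str], source_prefix: str) -> bool:
    """True if the sink's target equals the source's listening address and port."""
    sink_host, sink_port = endpoint(upstream, sink_prefix, "hostname")
    bind_host, bind_port = endpoint(downstream, source_prefix, "bind")
    # A source bound to 0.0.0.0 listens on every interface of its host.
    return sink_port == bind_port and bind_host in (sink_host, "0.0.0.0")


print(hop_matches(WEBLOG_AGENT, "agent_foo.sinks.avro-forward-sink",
                  HDFS_AGENT, "agent_foo.sources.avro-collection-source"))  # prints True
```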

Fan out flow

Flume supports fanning out the flow from one source to multiple channels. There are two modes of fan-out: replicating and multiplexing. In a replicating flow, each event is sent to all the configured channels. In a multiplexing flow, an event is sent to only a subset of the available channels. To fan out the flow, you specify a list of channels for the source and the policy for fanning out. This is done by adding a channel "selector", which can be replicating or multiplexing. If it is a multiplexing selector, you then specify the selection rules. If you do not specify a selector, it is replicating by default.

# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating

The multiplexing selector has a further set of properties to split the flow. This requires mapping the values of an event attribute to sets of channels. The selector checks the configured attribute in the header of each event. If the attribute matches a specified value, the event is sent to all the channels mapped to that value. If there is no match, the event is sent to the channels configured as the default.

# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...

<Agent>.sources.<Source1>.selector.default = <Channel2>

The mapping allows channels to overlap between values. The default can contain any number of channels. In the following example, a single flow is multiplexed to two paths. The agent has a single Avro source and two channels that are linked to two sinks.

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2

# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2

# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

The selector checks the header called "State". If the value is "CA", the event is sent to mem-channel-1; if it is "AZ", it goes to file-channel-2; if it is "NY", it goes to both. If the "State" header is not set or does not match any of the three, the event goes to mem-channel-1, which is designated as the default.

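The routing rule of the multiplexing selector is essentially a dictionary lookup with a fallback. The Python sketch below mirrors the State example; it illustrates the selection logic only and is not Flume code. The function name select_channels is made up for this example.

```python
from typing import Dict, List

# Mapping taken from the selector example: header value -> channels.
MAPPING: Dict[str, List[str]] = {
    "CA": ["mem-channel-1"],
    "AZ": ["file-channel-2"],
    "NY": ["mem-channel-1", "file-channel-2"],
}
DEFAULT: List[str] = ["mem-channel-1"]


def select_channels(headers: Dict[str, str], header: str = "State") -> List[str]:
    """Return the channels for an event, falling back to the default set."""
    value = headers.get(header)          # None when the header is not set
    return MAPPING.get(value, DEFAULT)


print(select_channels({"State": "NY"}))  # prints ['mem-channel-1', 'file-channel-2']
print(select_channels({"State": "TX"}))  # prints ['mem-channel-1']
print(select_channels({}))               # prints ['mem-channel-1']
```

A replicating selector would correspond to returning every configured channel regardless of the headers.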
Flume Sources

Avro Source

The Avro source listens on an Avro port and receives events from external Avro client streams. When paired with the built-in AvroSink on another (previous hop) Flume agent, it can create tiered collection topologies.

Example for agent named a1 (this example uses the legacy Avro source class, which takes host and port properties):

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.avroLegacy.AvroLegacySource
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

Exec Source

The exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard output (stderr is simply discarded unless the property logStdErr is set to true). If the process exits for any reason, the source also exits and produces no further data.

Note: ExecSource cannot guarantee that the client learns about it when an event fails to be put into the channel. In such cases the data is lost.

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

Using the shell property (needed for commands that rely on shell features such as wildcards or loops):

agent_foo.sources.tailsource-1.type = exec
agent_foo.sources.tailsource-1.shell = /bin/bash -c
agent_foo.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
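What an exec-style source does can be approximated in Python with a subprocess whose standard output is read line by line. This is a rough sketch of the idea rather than Flume's implementation; the function name exec_source is hypothetical, and the demonstration runs a short Python one-liner instead of tail so that it terminates on its own.

```python
import subprocess
import sys
from typing import Iterator, List


def exec_source(command: List[str]) -> Iterator[bytes]:
    """Run a command and yield each line of its standard output as an event body."""
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,   # stderr is discarded, as described above
    )
    try:
        assert process.stdout is not None
        for line in process.stdout:
            yield line.rstrip(b"\r\n")
    finally:
        # Stop the child if the consumer stops early, then release resources.
        process.terminate()
        if process.stdout is not None:
            process.stdout.close()
        process.wait()


# Demonstration with a command that prints two lines and exits.
demo_command = [sys.executable, "-c", "print('a'); print('b')"]
print(list(exec_source(demo_command)))
```

The sketch makes the data-loss caveat visible: lines exist only in the pipe between the two processes, so anything produced while the reader is down, or emitted after a failed hand-off, is not replayed.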

JMS source:

Example for agent named a1:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE

Spooling Directory Source

Example for an agent named agent-1:

agent-1.channels = ch-1
agent-1.sources = src-1

agent-1.sources.src-1.type = spooldir
agent-1.sources.src-1.channels = ch-1
agent-1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
agent-1.sources.src-1.fileHeader = true

Writing the agent configuration

The key to using Flume is writing the agent configuration file. The agent configuration is a plain text file that stores settings as key-value pairs, and it can hold the settings of more than one agent. The configuration covers the sources, channels, and sinks. Each source, channel, and sink has a name, a type, and a number of type-specific properties.

The configuration file should be written like this:

# list the sources, sinks and channels for the agent
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>

# set channel for source
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...

# set channel for sink
<Agent>.sinks.<Sink>.channel = <Channel1>

# properties for sources
<Agent>.sources.<Source>.<someProperty> = <someValue>

# properties for channels
<Agent>.channels.<Channel>.<someProperty> = <someValue>

# properties for sinks
<Agent>.sinks.<Sink>.<someProperty> = <someValue>

# The following is an example.
# agent1 is the agent name. It has one source named src1, one sink named sink1, and one channel named ch3.
agent1.sources = src1
agent1.sinks = sink1
agent1.channels = ch3

# Configure the spooling directory source. It monitors the directory (which must exist) for new files.
# File names must be unique, otherwise Flume reports an error.
agent1.sources.src1.type = spooldir
agent1.sources.src1.channels = ch3
agent1.sources.src1.spoolDir = /root/hmbbs
agent1.sources.src1.fileHeader = false
agent1.sources.src1.interceptors = i1
agent1.sources.src1.interceptors.i1.type = timestamp

# Configure a memory channel (ch2 is not listed in agent1.channels above, so it is not used as written)
agent1.channels.ch2.type = memory
agent1.channels.ch2.capacity = 1000
agent1.channels.ch2.transactionCapacity = 1000
agent1.channels.ch2.byteCapacityBufferPercentage = 20
agent1.channels.ch2.byteCapacity = 800000

# Configure a file channel
agent1.channels.ch3.type = file
agent1.channels.ch3.checkpointDir = /root/flumechannel/checkpoint
agent1.channels.ch3.dataDirs = /root/flumechannel/data

# Configure the HDFS sink
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.channel = ch3
agent1.sinks.sink1.hdfs.path = hdfs://hadoop0:9000/flume/%Y-%m-%d/
agent1.sinks.sink1.hdfs.rollInterval = 1
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat = Text

# Configure an HBase sink (sink2 is not listed in agent1.sinks above, and its channel, channel1, is not defined in this file)
agent1.sinks.sink2.type = hbase
agent1.sinks.sink2.channel = channel1
agent1.sinks.sink2.table = hmbbs
agent1.sinks.sink2.columnFamily = cf
# serializer is assigned twice; in a Java properties file the later assignment overrides the earlier one
agent1.sinks.sink2.serializer = flume.HmbbsHbaseEventSerializer
agent1.sinks.sink2.serializer.suffix = timestamp
agent1.sinks.sink2.serializer = org.apache.flume.sink.hbase.SimpleHbaseEventSerializer

The agent is started with the flume-ng agent script. You need to specify the agent name, the configuration directory, and the configuration file:

-n specifies the agent name

-c specifies the configuration directory

-f specifies the configuration file

-Dflume.root.logger=DEBUG,console sets the log level to DEBUG and logs to the console

So the complete startup command looks like this:

bin/flume-ng agent -n agent1 -c conf -f conf/example -Dflume.root.logger=DEBUG,console

After a successful start, you can put files into the /root/hmbbs directory. Flume detects each new file and uploads it to the /flume directory in HDFS.
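The %Y-%m-%d escapes in hdfs.path are resolved from the timestamp header of each event, which is why the example attaches a timestamp interceptor to the source. The Python sketch below shows the substitution for those three escapes only. It is an illustration, not Flume code: the function name expand_path is made up, and it assumes UTC so that the result does not depend on the machine's time zone, whereas a real deployment may use local time.

```python
from datetime import datetime, timezone
from typing import Dict


def expand_path(template: str, headers: Dict[str, str]) -> str:
    """Replace %Y, %m and %d using the event's 'timestamp' header (epoch milliseconds)."""
    millis = int(headers["timestamp"])
    # Assumption for this sketch: interpret the timestamp in UTC.
    moment = datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
    for code in ("%Y", "%m", "%d"):
        template = template.replace(code, moment.strftime(code))
    return template


headers = {"timestamp": "1340107200000"}  # 2012-06-19 12:00:00 UTC
print(expand_path("hdfs://hadoop0:9000/flume/%Y-%m-%d/", headers))
# prints hdfs://hadoop0:9000/flume/2012-06-19/
```

With this layout, files dropped into /root/hmbbs on different days end up in different date-named directories under /flume.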
