2025-04-10 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
1 Flume Overview
1.1 Definition
Flume is a highly available, highly reliable, distributed system for collecting, aggregating, and transporting massive volumes of log data. It was originally provided by Cloudera and is now an Apache project.
Flume is built on a streaming architecture, which keeps it flexible and simple.
1.2 Features
It can be integrated with a wide range of storage systems.
When the input data rate exceeds the rate at which the destination storage can be written, Flume buffers the data, which reduces the pressure on HDFS.
Transactions in Flume are built around the Channel. Two transaction models (sender and receiver) ensure that messages are delivered reliably.
Flume uses two separate transactions for event delivery: one from Source to Channel and one from Channel to Sink. Only when all the data in a transaction has been committed to the Channel does the Source consider that data read. Likewise, only data that the Sink has successfully written out is removed from the Channel.
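The two-transaction idea can be sketched in plain Java. This is a toy model, not Flume's API: ToyChannel, commitPut, take, and rollbackTake are hypothetical names chosen for illustration. It only shows that a put batch becomes visible as a whole, and that a failed take returns its events to the channel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Toy in-memory channel (hypothetical, not Flume's API) with separate put and take transactions. */
class ToyChannel {
    private final Deque<String> queue = new ArrayDeque<>();

    /** Put transaction commit: the whole batch becomes visible to sinks at once. */
    synchronized void commitPut(List<String> batch) {
        queue.addAll(batch);
    }

    /** Take transaction: remove up to max events and hand them to the sink. */
    synchronized List<String> take(int max) {
        List<String> taken = new ArrayList<>();
        while (taken.size() < max && !queue.isEmpty()) {
            taken.add(queue.pollFirst());
        }
        return taken;
    }

    /** Take transaction rollback: return the events to the head of the queue in their original order. */
    synchronized void rollbackTake(List<String> taken) {
        for (int i = taken.size() - 1; i >= 0; i--) {
            queue.addFirst(taken.get(i));
        }
    }

    synchronized int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        ToyChannel channel = new ToyChannel();
        channel.commitPut(Arrays.asList("e1", "e2", "e3")); // source side commits a batch
        List<String> batch = channel.take(2);               // sink side takes a batch
        channel.rollbackTake(batch);                        // simulate a failed write downstream
        System.out.println("events still in channel: " + channel.size());
    }
}
```

In this sketch nothing is lost when the sink fails, because the taken events are put back before anyone else can observe a gap. A real channel also has to handle capacity limits and concurrent transactions, which are left out here.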
1.3 Component architecture
1.3.1 Agent
An Agent is a JVM process that moves data from source to destination in the form of events.
An Agent consists mainly of a Source, a Channel, and a Sink.
1.3.2 Source
The Source is the component that receives data into the Agent. It can handle many input types, including avro, thrift, exec, jms, spooling directory, netcat, sequence generator, syslog, http, and legacy.
1.3.3 Channel
The Channel is a buffer between Source and Sink, which lets the Source and the Sink operate at different rates. A Channel is thread-safe and can handle writes from several Sources and reads from several Sinks at the same time.
Flume comes with two kinds of Channel:
Memory Channel: an in-memory queue. It is fast and suits situations where data loss is not a concern.
File Channel: all events are written to disk, so data is not lost when the process exits or the machine goes down.
1.3.4 Sink
The Sink continuously polls the Channel for events, removes them in batches, and writes each batch to a storage or indexing system or sends it to another Flume Agent.
The Sink is fully transactional. Before removing a batch of events from the Channel, each Sink starts a transaction with the Channel. Once the batch has been successfully written to the storage system or to the next Flume Agent, the Sink commits the transaction. After the commit, the Channel deletes those events from its internal buffer.
Sink destinations include hdfs, logger, avro, thrift, ipc, file, null, HBase, solr, and custom implementations.
1.3.5 Event
The Event is the basic unit of data transfer in Flume. Data travels from source to destination in the form of Events.
An Event consists of an optional header and a byte array that carries the data. The header is a HashMap of key-value string pairs.
Usually one record (for example, one line of a log) becomes one Event. Line-oriented sources cap the length of a line: the line deserializer, for instance, defaults to a maximum of 2048 characters, and the rest of a longer line goes into a subsequent Event.
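The shape of an Event can be illustrated with a small stand-in class. ToyEvent and fromLine are hypothetical names, not part of Flume, and the "host" header key is only an example of metadata a source might attach.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for an event (hypothetical): string headers plus an opaque byte-array body. */
class ToyEvent {
    private final Map<String, String> headers = new HashMap<>();
    private final byte[] body;

    ToyEvent(byte[] body) {
        this.body = body;
    }

    Map<String, String> getHeaders() {
        return headers;
    }

    byte[] getBody() {
        return body;
    }

    /** Wraps one log line in an event and tags it with the host it came from. */
    static ToyEvent fromLine(String line, String host) {
        ToyEvent event = new ToyEvent(line.getBytes(StandardCharsets.UTF_8));
        event.getHeaders().put("host", host);
        return event;
    }

    public static void main(String[] args) {
        ToyEvent event = fromLine("hello flume", "hadoop102");
        System.out.println(event.getHeaders() + " body bytes: " + event.getBody().length);
    }
}
```

The body is kept as raw bytes because the transport does not need to interpret it; only the headers are meant for routing decisions.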
1.4 Topology
In the serial mode, multiple Flume agents are connected in sequence, from the first Source to the storage system that the final Sink writes to. Chaining too many agents is not recommended: it lowers the transmission rate, and if any agent in the chain goes down, the whole pipeline is affected.
Flume supports sending events to one or more destinations. In the replicating mode, the source data is copied to multiple Channels, each Channel holds the same data, and each Sink can send to a different destination.
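A minimal sketch of the replicating idea, assuming each channel is modelled as a plain list of strings. ReplicatingSketch and replicate are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy replicating fan-out (hypothetical): every channel receives a copy of every event. */
class ReplicatingSketch {

    /** Appends all events to each channel, so all channels end up holding identical data. */
    static void replicate(List<String> events, List<List<String>> channels) {
        for (List<String> channel : channels) {
            channel.addAll(events);
        }
    }

    public static void main(String[] args) {
        List<String> c1 = new ArrayList<>();
        List<String> c2 = new ArrayList<>();
        replicate(Arrays.asList("line-1", "line-2"), Arrays.asList(c1, c2));
        System.out.println("c1=" + c1 + " c2=" + c2);
    }
}
```

Each channel can then be drained by its own sink at its own pace, which is why the destinations may differ.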
Flume supports logically grouping multiple Sinks into a Sink group. Flume distributes data across the Sinks in the group, mainly to provide load balancing and failover.
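How a Sink group might combine load balancing with failover can be sketched as a round-robin picker that skips sinks marked as failed. This is an illustrative model only: SinkGroupSketch, pick, markFailed, and markHealthy are hypothetical names, and Flume's own processors add backoff timing that is omitted here.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy sink group (hypothetical): round-robin selection that skips failed sinks. */
class SinkGroupSketch {
    private final List<String> sinks;
    private final Set<String> failed = new HashSet<>();
    private int next = 0;

    SinkGroupSketch(List<String> sinks) {
        this.sinks = sinks;
    }

    void markFailed(String sink) {
        failed.add(sink);
    }

    void markHealthy(String sink) {
        failed.remove(sink);
    }

    /** Returns the next healthy sink in round-robin order, or null if every sink has failed. */
    String pick() {
        for (int tried = 0; tried < sinks.size(); tried++) {
            String candidate = sinks.get(next);
            next = (next + 1) % sinks.size();
            if (!failed.contains(candidate)) {
                return candidate;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        SinkGroupSketch group = new SinkGroupSketch(Arrays.asList("k1", "k2"));
        System.out.println(group.pick() + " " + group.pick()); // alternates while both are healthy
        group.markFailed("k2");
        System.out.println(group.pick() + " " + group.pick()); // traffic fails over to the healthy sink
    }
}
```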
The aggregation mode is the most common and the most practical. Web applications are typically spread across hundreds, thousands, or even tens of thousands of servers, and handling the logs they produce is troublesome. Aggregation solves this well: each server runs a Flume agent that collects local logs and forwards them to a central collecting Flume agent, which then uploads the data to hdfs, hive, hbase, jms, and so on for log analysis.
1.5 Agent principle
2 Flume deployment
1. Extract apache-flume-1.7.0-bin.tar.gz to the /opt/module directory.
2. Rename apache-flume-1.7.0-bin to flume.
3. Rename flume-env.sh.template under flume/conf to flume-env.sh, and set JAVA_HOME in flume-env.sh.
3 Enterprise development cases
3.1 Monitoring port data
Requirements Analysis:
Flume listens on port 44444 of the local machine.
The netcat tool is used to send messages to port 44444.
Flume displays the received data on the console.
Implementation steps:
1. Create the Agent configuration file flume-netcat-logger.conf under the job folder.
[djm@hadoop102 job]$ vim flume-netcat-logger.conf
2. Add the following:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Start the task
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
Parameter description:
--conf conf/ (short form -c): the configuration files are stored in the conf/ directory.
--name a1 (short form -n): names the Agent a1.
--conf-file job/flume-netcat-logger.conf (short form -f): the configuration file Flume reads for this run is flume-netcat-logger.conf under the job folder.
-Dflume.root.logger=INFO,console: -D overrides the flume.root.logger property at runtime, setting the log level to INFO and sending the output to the console.
3.2 Read local files to HDFS in real time
Requirements Analysis:
Monitor Hive logs in real time and upload them to HDFS
Implementation steps:
1. Create the Agent configuration file flume-file-hdfs.conf under the job folder.
[djm@hadoop102 job]$ vim flume-file-hdfs.conf
2. Add the following:
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
# Prefix of the uploaded files
a2.sinks.k2.hdfs.filePrefix = logs-
# Whether to roll folders by time
a2.sinks.k2.hdfs.round = true
# How many time units pass before a new folder is created
a2.sinks.k2.hdfs.roundValue = 1
# The time unit
a2.sinks.k2.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# How many Events to accumulate before flushing to HDFS
a2.sinks.k2.hdfs.batchSize = 100
# File type; compression is supported
a2.sinks.k2.hdfs.fileType = DataStream
# How often to roll a new file (seconds)
a2.sinks.k2.hdfs.rollInterval = 60
# Roll size of each file (about 128 MB)
a2.sinks.k2.hdfs.rollSize = 134217700
# File rolling is independent of the number of Events
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
3. Start the task
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/flume-file-hdfs.conf
Note:
To read a file on a Linux system, Flume has to run a command that follows the rules of Linux commands. Because the Hive log lives on a Linux system, the source type chosen is exec (short for execute), which means a Linux command is run to read the file.
3.3 Read directory files to HDFS in real time
Requirements Analysis:
Use Flume to monitor the files in an entire directory.
Implementation steps:
1. Create the Agent configuration file flume-dir-hdfs.conf under the job folder.
[djm@hadoop102 job]$ vim flume-dir-hdfs.conf
2. Add the following:
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# Ignore all files ending in .tmp; do not upload them
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
# Prefix of the uploaded files
a3.sinks.k3.hdfs.filePrefix = upload-
# Whether to roll folders by time
a3.sinks.k3.hdfs.round = true
# How many time units pass before a new folder is created
a3.sinks.k3.hdfs.roundValue = 1
# The time unit
a3.sinks.k3.hdfs.roundUnit = hour
# Whether to use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# How many Events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# File type; compression is supported
a3.sinks.k3.hdfs.fileType = DataStream
# How often to roll a new file (seconds)
a3.sinks.k3.hdfs.rollInterval = 60
# Roll size of each file (about 128 MB)
a3.sinks.k3.hdfs.rollSize = 134217700
# File rolling is independent of the number of Events
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
3. Start the task
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/flume-dir-hdfs.conf
Note:
Do not create a file in the monitored directory and then keep modifying it.
3.4 Single data source, multiple outlets (selector)
Requirements Analysis:
Flume-1 monitors file changes and passes the changed content to Flume-2.
Flume-2 stores the data in HDFS.
At the same time, Flume-1 passes the changed content to Flume-3, and Flume-3 writes it to the local file system.
Implementation steps:
1. Create the Agent configuration file flume-file-flume.conf under the group1 folder.
[djm@hadoop102 group1]$ vim flume-file-flume.conf
2. Add the following:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Replicate the data flow to all channels
a1.sources.r1.selector.type = replicating

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
# On the sink side, avro acts as a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
3. Create the Agent configuration file flume-flume-hdfs.conf under the group1 folder
[djm@hadoop102 group1]$ vim flume-flume-hdfs.conf
4. Add the following:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
# On the source side, avro acts as a data receiving service
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H
# Prefix of the uploaded files
a2.sinks.k1.hdfs.filePrefix = flume2-
# Whether to roll folders by time
a2.sinks.k1.hdfs.round = true
# How many time units pass before a new folder is created
a2.sinks.k1.hdfs.roundValue = 1
# The time unit
a2.sinks.k1.hdfs.roundUnit = hour
# Whether to use the local timestamp
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# How many Events to accumulate before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
# File type; compression is supported
a2.sinks.k1.hdfs.fileType = DataStream
# How often to roll a new file (seconds)
a2.sinks.k1.hdfs.rollInterval = 600
# Roll size of each file (about 128 MB)
a2.sinks.k1.hdfs.rollSize = 134217700
# File rolling is independent of the number of Events
a2.sinks.k1.hdfs.rollCount = 0

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
5. Create the Agent configuration file flume-flume-dir.conf under the group1 folder
[djm@hadoop102 group1]$ vim flume-flume-dir.conf
6. Add the following:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
7. Start the task
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf
Note:
Avro is a language-independent data serialization and RPC framework.
The local output directory must already exist. If it does not exist, it will not be created.
Start the receiving agents (a3 and a2) before the sending agent (a1), so that the Avro sinks have a listening Avro source to connect to.
3.5 Single data source, multiple outlets (Sink group)
Requirements Analysis:
Flume-1 monitors port data and passes the changes to Flume-2.
Flume-2 displays the data on the console.
At the same time, Flume-1 passes the changes to Flume-3, and Flume-3 also displays the data on the console.
Implementation steps:
1. Create the Agent configuration file flume-netcat-flume.conf under the group2 folder.
2. Add the following:
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin
a1.sinkgroups.g1.processor.selector.maxTimeOut = 10000

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
3. Create the Agent configuration file flume-flume-console1.conf under the group2 folder
4. Add the following:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

# Describe the sink
a2.sinks.k1.type = logger

# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
5. Create the Agent configuration file flume-flume-console2.conf under the group2 folder
6. Add the following:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
7. Start the task
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.conf
3.6 Multiple data source aggregation
Requirements Analysis:
Flume-1 on hadoop103 monitors the file /opt/module/group.log.
Flume-2 on hadoop102 monitors the data flow on a port.
Flume-1 and Flume-2 send their data to Flume-3 on hadoop104, and Flume-3 prints the final data to the console.
Implementation steps:
1. Create the Agent configuration file flume1-logger-flume.conf under the group3 folder.
[djm@hadoop102 group3]$ vim flume1-logger-flume.conf
2. Add the following:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/group.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Create the Agent configuration file flume2-netcat-flume.conf under the group3 folder
[djm@hadoop102 group3]$ vim flume2-netcat-flume.conf
4. Add the following:
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = netcat
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
5. Create the Agent configuration file flume3-flume-logger.conf under the group3 folder
[djm@hadoop102 group3]$ vim flume3-flume-logger.conf
6. Add the following:
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 4141

# Describe the sink
a3.sinks.k1.type = logger

# Describe the channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
7. Distribute configuration files
[djm@hadoop102 group3]$ xsync /opt/module/flume/job
8. Start the task
[djm@hadoop104 flume]$ bin/flume-ng agent -c conf/ -n a3 -f job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a2 -f job/group3/flume2-netcat-flume.conf
[djm@hadoop103 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/group3/flume1-logger-flume.conf
4 Ganglia deployment
1. Install httpd service and php
yum -y install httpd php
2. Install other dependencies
yum -y install rrdtool perl-rrdtool rrdtool-devel
3. Install ganglia
rpm -Uvh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
yum -y install ganglia-gmetad ganglia-gmond ganglia-web
4. Modify ganglia configuration file
vim /etc/httpd/conf.d/ganglia.conf
#
# Ganglia monitoring system php web frontend
#
Alias /ganglia /usr/share/ganglia
<Location /ganglia>
  # Require local
  Require all granted
  # Require ip 10.1.2.3
  # Require host example.org
</Location>
Special note: the following configuration does not work:
Order deny,allow
Allow from all
5. Modify gmetad configuration file
vim /etc/ganglia/gmetad.conf
data_source "hadoop102" 192.168.1.102
6. Modify gmond configuration file
vim /etc/ganglia/gmond.conf
cluster {
  # name = "unspecified"
  name = "hadoop102"
  owner = "unspecified"
  latlong = "unspecified"
  url = "unspecified"
}
udp_send_channel {
  # bind_hostname = yes # Highly recommended, soon to be default.
  # This option tells gmond to use a source address
  # that resolves to the machine's hostname. Without
  # this, the metrics may appear to come from any
  # interface and the DNS names associated with
  # those IPs will be used to create the RRDs.
  # mcast_join = 239.2.11.71
  host = 192.168.10.102
  port = 8649
  ttl = 1
}
/* You can specify as many udp_recv_channels as you like as well. */
udp_recv_channel {
  # mcast_join = 239.2.11.71
  port = 8649
  # bind = 239.2.11.71
  bind = 192.168.10.102
  retry_bind = true
  # Size of the UDP buffer. If you are handling lots of metrics you really
  # should bump it up to e.g. 10MB or even higher.
  # buffer = 10485760
}
7. View SELinux status
sestatus
If it is not disabled, edit the following configuration file and set SELINUX=disabled (this takes effect after a reboot):
vim /etc/selinux/config
Or temporarily turn off SELinux enforcement:
setenforce 0
8. Start ganglia
systemctl start httpd
systemctl start gmetad
systemctl start gmond
9. Open a browser and visit
http://hadoop102/ganglia/
If a permission error still appears after completing the steps above, try changing the permissions of the /var/lib/ganglia directory:
chmod -R 777 /var/lib/ganglia
5 Custom Source
Requirements Analysis:
Coding implementation:
1. Introduce dependency
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>
2. Code writing
package com.djm.flume;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.util.HashMap;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    // Fields that are read from the agent configuration file
    private Long delay;
    private String field;

    /**
     * Generates the data, wraps each item in an event, and writes it to the channel.
     *
     * @return READY on success, BACKOFF if interrupted
     * @throws EventDeliveryException
     */
    public Status process() throws EventDeliveryException {
        HashMap<String, String> headerMap = new HashMap<>();
        SimpleEvent event = new SimpleEvent();
        try {
            for (int i = 0; i < 5; i++) {
                event.setHeaders(headerMap);
                event.setBody((field + i).getBytes());
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
        return Status.READY;
    }

    public long getBackOffSleepIncrement() {
        return 0;
    }

    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    /**
     * Reads the configuration file.
     *
     * @param context the source's configuration
     */
    public void configure(Context context) {
        delay = context.getLong("delay");
        field = context.getString("field", "hello");
    }
}
3. Packaging and testing
Use Maven to package and upload to the / opt/module/flume/lib directory
Create the Agent configuration file mysource.conf under the job folder
[djm@hadoop102 job]$ vim mysource.conf
Add the following:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = com.djm.flume.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = djm

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the task
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console
6 Custom Sink
Requirements Analysis:
Coding implementation:
1. Introduce dependency
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.7.0</version>
</dependency>
2. Code writing
package com.djm.flume;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(MySink.class);

    // Fields that are read from the agent configuration file
    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        try {
            Event event;
            transaction.begin();
            // Wait until the channel has an event to take
            while ((event = channel.take()) == null) {
                Thread.sleep(200);
            }
            LOG.info(prefix + new String(event.getBody()) + suffix);
            transaction.commit();
            status = Status.READY;
        } catch (Throwable e) {
            // Roll back so the event stays in the channel
            transaction.rollback();
            status = Status.BACKOFF;
            if (e instanceof Error)
                throw (Error) e;
        } finally {
            transaction.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        prefix = context.getString("prefix");
        suffix = context.getString("suffix");
    }
}
3. Packaging and testing
Use Maven to package and upload to the / opt/module/flume/lib directory
Create the Agent configuration file mysink.conf under the job folder
[djm@hadoop102 job]$ vim mysink.conf
Add the following:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = com.djm.flume.MySink
a1.sinks.k1.prefix = djm:
a1.sinks.k1.suffix = :end

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Start the task
[djm@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
7 Flume parameter tuning
7.1 Source
Increasing the number of Sources increases the capacity to read data. For example, when one directory produces too many files, split it into several directories and configure one Source per directory so that the Sources can keep up with newly generated data.
The batchSize parameter determines how many Events a Source ships to the Channel in one batch. Raising it appropriately improves Source-to-Channel throughput.
7.2 Channel
Setting type to memory gives the best Channel performance, but data may be lost if the Flume process dies unexpectedly.
Setting type to file gives better fault tolerance, at lower performance than the Memory Channel. With the File Channel, pointing dataDirs at directories on different disks can improve performance.
The capacity parameter determines the maximum number of Events the Channel can hold. The transactionCapacity parameter determines the maximum number of Events a Source writes to the Channel, or a Sink reads from the Channel, in a single transaction. transactionCapacity must be no smaller than the batchSize of the Source and the Sink.
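The sizing rule can be expressed as a small check. This is a sketch, assuming the four values have already been read from an agent configuration. ChannelSizingCheck and check are hypothetical names, and the additional requirement that transactionCapacity not exceed capacity reflects how the Memory Channel is commonly described rather than anything stated in this article.

```java
/** Toy validator (hypothetical) for the relationship between channel and batch sizes. */
class ChannelSizingCheck {

    /** Returns null when the sizes are consistent, otherwise a description of the first problem found. */
    static String check(int capacity, int transactionCapacity, int sourceBatchSize, int sinkBatchSize) {
        if (transactionCapacity > capacity) {
            return "transactionCapacity exceeds capacity";
        }
        if (sourceBatchSize > transactionCapacity) {
            return "source batchSize exceeds transactionCapacity";
        }
        if (sinkBatchSize > transactionCapacity) {
            return "sink batchSize exceeds transactionCapacity";
        }
        return null;
    }

    public static void main(String[] args) {
        // capacity = 1000, transactionCapacity = 100, both batch sizes = 100
        System.out.println(check(1000, 100, 100, 100));
        // a sink batch of 1000 cannot fit in a transaction of 100
        System.out.println(check(1000, 100, 100, 1000));
    }
}
```

Running a check like this before starting an agent catches a batch size that could never fit into a single channel transaction.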
7.3 Sink
Increasing the number of Sinks increases the capacity to consume Events, but more is not always better: too many Sinks occupy system resources and waste them unnecessarily.
The batchSize parameter determines how many Events a Sink reads from the Channel in one batch. Raising it appropriately improves the rate at which the Sink drains Events from the Channel.