Flume Notes

2025-03-26 Update From: SLTechnology News&Howtos

Shulou(Shulou.com)06/03 Report--

About Flume

Flume is a distributed, reliable, and highly available system for collecting, aggregating, and transporting large volumes of log data. It supports custom data senders in a logging system for collecting data, and it can apply simple processing to the data before writing it to a variety of receivers (such as text files, HDFS, and HBase).

Terminology:

Flume OG: Flume original generation, that is, the Flume 0.9.x releases.
Flume NG: Flume next generation, that is, the Flume 1.x releases.

Official website: http://flume.apache.org

Flume architecture

1. Flume has a simple, flexible architecture based on streaming data flows.
2. Flume has a failover mechanism and a load-balancing mechanism.
3. Flume uses a simple, extensible data model (source, channel, sink).

flume-ng can handle data in two ways: avro-client and agent.

avro-client: a client that sends data once to a specified Avro service.
agent: a service that transmits data continuously.

The main components of an agent are Source, Channel, and Sink.

Source: collects the log data, packages it into transactions and events, and puts it into the channel.
Channel: mainly provides a queue that briefly buffers the data supplied by the source.
Sink: takes data out of the channel and stores it in a file system or database, or submits it to a remote server.

The unit of data passed between the components is the Event.

Basic components of Flume

Source

A source is the origin of the data. Its main function is to collect various types of data from the outside world and pass the data to a channel. For example: watch a file and collect new data as soon as it is appended, watch a directory and collect the contents of each new file as soon as it appears, listen on a port, and so on.

Common source types: Exec Source, Avro Source, NetCat Source, Spooling Directory Source, and others.

For details, see http://flume.apache.org/FlumeUserGuide.html#flume-sources or the documentation bundled with the distribution.

What each source does:

Avro Source: listens on an Avro service port and collects Avro-serialized data.
Thrift Source: listens on a Thrift service port and collects Thrift-serialized data.
Exec Source: runs a Unix command and collects the data it writes to standard output. Note the difference between tail -F and tail -f, which decides whether the file can still be read after log4j rotates it.
JMS Source: collects from a Java Message Service data source. JMS is a platform-independent API, and the source supports providers that implement the JMS specification.
Spooling Directory Source: collects new files placed in a directory.
Kafka Source: collects data from a Kafka service.
NetCat Source: binds to a port (TCP or UDP) and turns each line of text received on it into an Event.
HTTP Source: listens for data sent by HTTP POST and GET requests.

Channel

A channel is a data storage pool, an intermediate passage. Its main function is to accept the data emitted by a source and hand it to the destination specified by a sink. Data in a channel is not deleted until it has entered the next channel or reached the final destination. When a sink write fails, the write can be retried automatically without losing data, which is what makes the channel reliable.

There are many channel types, such as in-memory, JDBC data source, and file storage.

Common channel types: Memory Channel, File Channel, Spillable Memory Channel, and others.

Details: http://flume.apache.org/FlumeUserGuide.html#flume-channels

What each channel does:

Memory Channel: stores data in memory. Fast.
File Channel: stores data in files. Safe and reliable.
Spillable Memory Channel: stores data in memory and in files. Data is held in memory first and is flushed to files once the data in memory reaches a threshold.
JDBC Channel: uses a JDBC data source as storage.
Kafka Channel: uses a Kafka service as storage.

Sink

A sink is the final destination of the data. Its main function is to take the data written to the channel and display or store it in the specified form. Sinks take many forms, such as printing to the console, HDFS, Avro services, and files.

Common sink types: HDFS Sink, Hive Sink, Logger Sink, Avro Sink, Thrift Sink, File Roll Sink, HBaseSink, Kafka Sink, and others.

Details: http://flume.apache.org/FlumeUserGuide.html#flume-sinks

HDFS Sink requires the HDFS configuration files and class libraries. Typically, multiple sinks converge on one collector machine, which is responsible for pushing the data to HDFS.

What each sink does:

Logger Sink: writes the data to the log (shown according to the log level set in Flume).
HDFS Sink: transfers the data to an HDFS cluster.
Avro Sink: converts the data to Avro events and sends them to the specified service port.
Thrift Sink: converts the data to Thrift events and sends them to the specified service port.
File Roll Sink: writes the data to local files.
Hive Sink: writes the data to a table in Hive.
IRC Sink: sends the data to the specified IRC service and port.
Null Sink: discards the data, that is, it is not sent to any destination.
HBaseSink: sends the data to an HBase database.
MorphlineSolrSink: sends the data to a Solr search server (cluster).
ElasticSearchSink: sends the data to an Elasticsearch search server (cluster).
Kafka Sink: sends the data to a Kafka service. (Note the required dependency libraries.)

Event

An event is the basic unit of data transmitted by Flume NG and also the basic unit of a transaction. In a text file, one line is usually one event. In a network messaging system, one message is one event. An event has headers and a body. The headers are of type Map<String, String>. We can set custom key:value pairs in the headers at the source and use those headers in some channels and sinks.

Exercise 1: how do you monitor data appended to a file in real time and print it to the console? What if the file grows very quickly?

Avro client

avro client: sends a local source file or directory to the hostname:port of the specified receiver.

bin/flume-ng avro-client --conf conf/ -H master -p 41414 -F /opt/logs/access.log

The receiving side needs to provide an Avro source.

Note the --headerFile option: it appends header information, and the entries in the file are separated by spaces.

bin/flume-ng avro-client --conf conf/ --host slave01 --port 41414 --filename /opt/logs/access.log --headerFile /opt/logs/kv.log

If --dirname is specified, the files in that directory are given the fileSuffix suffix after they are transferred.

Exercise 2: how do you send the new content of a monitored file to a source on another machine?

Flume installation

System requirements:

1. JRE: JDK 1.6+ (1.7 recommended).
2. Memory: no fixed upper or lower limit, as long as it is enough for the configured sources, channels, and sinks.
3. Disk space: the same as item 2.
4. Directory permissions: the agent generally needs read and write access to the directories it works in.

The Flume version used here is 1.8.0, which was the latest release at the time of writing.

Download address: http://archive.apache.org/dist/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz

Installation steps:

Extract:

[uplooking@uplooking01 ~]$ tar -zxvf soft/apache-flume-1.8.0-bin.tar.gz -C app/

Rename:

[uplooking@uplooking01 ~]$ mv app/apache-flume-1.8.0-bin/ app/flume

Add to the environment variables:

vim ~/.bash_profile
export FLUME_HOME=/home/uplooking/app/flume
export PATH=$PATH:$FLUME_HOME/bin

Modify the configuration file:

conf]# cp flume-env.sh.template flume-env.sh

Add JAVA_HOME:

export JAVA_HOME=/opt/jdk

Flume Agent cases

Listening for data on a network port

Define the Flume agent configuration file:

# This Flume agent listens on a socket port that produces a stream of data.
# The agent consists of a source named r1, a sink named k1, and a channel named c1.
# a1 is the name of this Flume agent instance.

# Name of the sources component in agent a1
a1.sources = r1
# Name of the sinks component in agent a1
a1.sinks = k1
# Name of the channels component in agent a1
a1.channels = c1

# Data source: listen on a network port
# The source type netcat reads a network byte stream
a1.sources.r1.type = netcat
# Hostname the source listens on
a1.sources.r1.bind = uplooking01
# Port the source listens on
a1.sources.r1.port = 52019

# Sink: write the collected data out through the log
# The logger sink writes events to the log; the log4j level here is INFO, and the output can go to the console or a file
a1.sinks.k1.type = logger

# Channel: use memory as temporary storage for the data
# The memory channel caches data in memory and is the most common channel type
a1.channels.c1.type = memory
# Capacity of the channel
a1.channels.c1.capacity = 1000
# Maximum capacity of one channel transaction
a1.channels.c1.transactionCapacity = 100

# The source and sink must be connected through the channel to form a pipeline
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
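A quick way to catch wiring mistakes in a file like the one above (for example a sink that points at C1 when the channel is declared as c1) is to parse the properties and compare the names. The following is a minimal sketch, not part of Flume: parse_properties and check_wiring are hypothetical helpers, the parser handles only simple key = value lines, and it treats # as a comment only at the start of a line, which is how Java properties files behave.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines; '#' starts a comment only at line start."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent):
    """Return a list of problems in the source/channel/sink wiring of one agent."""
    problems = []
    channels = set(props.get(f"{agent}.channels", "").split())
    for source in props.get(f"{agent}.sources", "").split():
        used = props.get(f"{agent}.sources.{source}.channels", "").split()
        if not used:
            problems.append(f"source {source} has no channel")
        for name in used:
            if name not in channels:
                problems.append(f"source {source} uses undeclared channel {name!r}")
    for sink in props.get(f"{agent}.sinks", "").split():
        name = props.get(f"{agent}.sinks.{sink}.channel", "")
        if name not in channels:
            problems.append(f"sink {sink} uses undeclared channel {name!r}")
    return problems


# The netcat agent from this section, without the comments.
NETCAT_CONF = """
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = uplooking01
a1.sources.r1.port = 52019
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
"""

if __name__ == "__main__":
    props = parse_properties(NETCAT_CONF)
    print(check_wiring(props, "a1"))
    props["a1.sinks.k1.channel"] = "C1"   # names are compared exactly, so this is flagged
    print(check_wiring(props, "a1"))
```

Note that a source lists its channels in the plural property channels, while a sink names a single channel in the singular property channel.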

Start flume agent:

flume-ng agent -c conf -n a1 -f conf/flume-nc.conf -Dflume.root.logger=INFO,console

-c conf: use the conf directory for configuration
-n a1: set the name of the agent to a1
-f: specify the configuration file

Because the data is written out through the log, the relevant logging options (-Dflume.root.logger=INFO,console) must also be specified.

Send data to the port via telnet or nc

Install telnet or nc:

yum install -y telnet
yum install -y nc

Send data to the port:

# Using telnet
[uplooking@uplooking01 ~]$ telnet uplooking01 52019
Trying 192.168.43.101...
Connected to uplooking01.
Escape character is '^]'.
wo ai ni
OK
sai bei de xue
OK

# Using nc
[uplooking@uplooking01 ~]$ nc uplooking01 52019
heihei
OK
xpleaf
OK

At this point, you can view the output of the flume agent startup terminal:

2018-03-24 20:09:34,390 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/192.168.43.101:52019]
2018-03-24 20:... (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 77 6F 20 61 69 20 6E 69 0D wo ai ni. }
2018-03-24 20:10:... (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 73 61 69 20 62 65 69 20 64 65 20 78 75 65 0D sai bei de xue. }
2018-03-24 20:13:26,190 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 69 68 65 69 heihei }
2018-03-24 20:... (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 78 70 6C 65 61 66 xpleaf }
2018-03-24 20:17:01,694 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F hello }

Listening for new files in a directory

The configuration file is as follows:

# Listen for new files in a directory.
# The agent consists of a source named r1, a sink named k1, and a channel named c1.
# a1 is the name of this Flume agent instance.
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Data source: watch a directory for new files
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/uplooking/data/flume
a1.sources.r1.fileSuffix = .ok
# a1.sources.r1.deletePolicy = immediate
a1.sources.r1.deletePolicy = never
a1.sources.r1.fileHeader = true

# Sink: write the collected data out through the log
a1.sinks.k1.type = logger

# Channel: use memory as temporary storage for the data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 10

# Connect the source and the sink through the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
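To make the effect of fileSuffix, deletePolicy = never, and fileHeader = true concrete, the behavior can be imitated in a few lines. This is only a rough sketch of the idea, assuming one event per line and a single pass over the directory; drain_spool_dir is a hypothetical helper and does not reproduce Flume's actual reader (no tracking of partially read files, no decoding options).

```python
import os
import tempfile


def drain_spool_dir(spool_dir, file_suffix=".ok", file_header=True):
    """Read each unprocessed file once and yield one event per line.

    After a file is fully read it is renamed with file_suffix instead of
    being deleted, which mirrors deletePolicy = never.
    """
    for name in sorted(os.listdir(spool_dir)):
        path = os.path.join(spool_dir, name)
        if name.endswith(file_suffix) or not os.path.isfile(path):
            continue  # already processed, or not a regular file
        with open(path, "rb") as handle:
            for line in handle:
                # fileHeader = true: record the source file in the headers
                headers = {"file": path} if file_header else {}
                yield {"headers": headers, "body": line.rstrip(b"\r\n")}
        os.rename(path, path + file_suffix)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as spool:
        with open(os.path.join(spool, "hello.txt"), "w") as f:
            f.write("hello you\nhello he\nhello me\n")
        for event in drain_spool_dir(spool):
            print(event)
        print(os.listdir(spool))
```

Because completed files keep their content and only gain the suffix, a second pass over the same directory finds nothing new to read.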

Start flume agent:

flume-ng agent -c conf -n a1 -f conf/flume-dir.conf -Dflume.root.logger=INFO,console

Add a file named hello.txt with the following content to the watched directory:

hello you
hello he
hello me

You can see the flume agent terminal output:

2018-03-24 21:23:59,182 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{file=/home/uplooking/data/flume/hello.txt} body: 68 65 6C 6C 6F 20 79 6F 75 hello you }
2018-03-24 21:23:59,182 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{file=/home/uplooking/data/flume/hello.txt} body: 68 65 6C 6C 6F 20 68 65 hello he }
2018-03-24 21:... (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{file=/home/uplooking/data/flume/hello.txt} body: 68 65 6C 6C 6F 20 6D 65 hello me }
2018-03-24 21:... [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:324)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2018-03-24 21:... (pool-3-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:433)] Preparing to move file /home/uplooking/data/flume/hello.txt to /home/uplooking/data/flume/hello.txt.ok
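Each Event entry in the output above pairs the header map with a hex dump of the body followed by its printable text. The following sketch reproduces that layout so the hex columns are easier to read. format_event is a hypothetical helper written for illustration, not Flume code; the 16-byte limit and the exact spacing are assumptions.

```python
def format_event(headers, body, max_bytes=16):
    """Render an event as a header map, a hex dump of the body, and printable text.

    Bytes outside the printable ASCII range are shown as '.', which is why a
    trailing carriage return (0D) from telnet appears as a dot.
    """
    head = body[:max_bytes]
    hex_part = " ".join(f"{byte:02X}" for byte in head)
    text_part = "".join(chr(b) if 32 <= b < 127 else "." for b in head)
    header_part = ", ".join(f"{key}={value}" for key, value in headers.items())
    return f"Event: {{ headers:{{{header_part}}} body: {hex_part} {text_part} }}"


if __name__ == "__main__":
    print(format_event({"file": "/home/uplooking/data/flume/hello.txt"}, b"hello you"))
    print(format_event({}, b"wo ai ni\r"))
```

Reading the dump the other way round is just as simple: 68 65 6C 6C 6F is the ASCII encoding of hello, and 20 is a space.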

The agent output shows that the original text file has been renamed with the .ok suffix. List the files in the data directory to confirm:

[uplooking@uplooking01 flume]$ ls
hello.txt.ok

Listening for new data in a file

The difference between tail -f and tail -F:

In a production environment, to keep log files from growing too large, a new log file is usually started every day. This is done by renaming the current log file and then creating (touch) a new file under the original name:

http-xxx.log
http-xxx.log.2017-03-15
http-xxx.log.2017-03-16

tail -f follows the file it originally opened, so it does not pick up the new file after a rotation, while tail -F follows the file name and keeps monitoring after the file is rotated.
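The rotation problem can be reproduced without tail at all. The sketch below, which assumes a POSIX file system (renaming an open file is allowed), keeps one handle open across a rotation the way tail -f does and reopens the path by name the way tail -F does. The helper names rotate and demo are made up for this illustration.

```python
import os
import tempfile


def rotate(path, rotated_suffix):
    """Rotate a log: rename the current file, then create an empty file under the old name."""
    os.rename(path, path + rotated_suffix)
    open(path, "w").close()


def demo(workdir):
    """Return what a descriptor-based reader and a name-based reader see after rotation."""
    log = os.path.join(workdir, "http-xxx.log")
    with open(log, "w") as f:
        f.write("before rotation\n")

    by_descriptor = open(log)   # like tail -f: stays attached to the file it opened
    by_descriptor.read()        # consume the existing content

    rotate(log, ".2017-03-15")
    with open(log, "a") as f:
        f.write("after rotation\n")

    seen_by_descriptor = by_descriptor.read()   # still reading the renamed file
    by_descriptor.close()
    with open(log) as by_name:                  # like tail -F: reopen by name
        seen_by_name = by_name.read()
    return seen_by_descriptor, seen_by_name


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as workdir:
        print(demo(workdir))
```

The descriptor-based reader sees nothing new because the line was written to the fresh file, which is why the Exec Source in this case uses tail -F.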

Configuration file:

# Listen for new data in a file.
# The agent consists of a source named r1, a sink named k1, and a channel named c1.
# a1 is the name of this Flume agent instance.
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Data source: watch a file for new data
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/uplooking/data/flume/http-flume.log

# Sink: write the collected data out through the log
a1.sinks.k1.type = logger

# Channel: use memory as temporary storage for the data
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000000
a1.channels.c1.transactionCapacity = 1000000

# Connect the source and the sink through the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
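The two channel limits in this configuration play different roles: capacity bounds how many events the channel can hold at once, and transactionCapacity bounds how many events move in a single put or take transaction. A toy model, assuming nothing about Flume's real implementation (ToyMemoryChannel and its exceptions are invented for this sketch), makes the distinction visible:

```python
from collections import deque


class ToyMemoryChannel:
    """A toy in-memory queue with the two limits named in the configuration."""

    def __init__(self, capacity, transaction_capacity):
        if transaction_capacity > capacity:
            raise ValueError("transaction capacity cannot exceed capacity")
        self.capacity = capacity
        self.transaction_capacity = transaction_capacity
        self._queue = deque()

    def put_batch(self, events):
        """Source side: add a batch in one all-or-nothing step."""
        if len(events) > self.transaction_capacity:
            raise ValueError("batch is larger than the transaction capacity")
        if len(self._queue) + len(events) > self.capacity:
            raise OverflowError("channel is full; the source has to retry later")
        self._queue.extend(events)

    def take_batch(self, max_events):
        """Sink side: remove up to max_events events in arrival order."""
        count = min(max_events, self.transaction_capacity, len(self._queue))
        return [self._queue.popleft() for _ in range(count)]


if __name__ == "__main__":
    channel = ToyMemoryChannel(capacity=5, transaction_capacity=3)
    channel.put_batch(["e1", "e2", "e3"])
    print(channel.take_batch(10))
```

With a capacity of 10000000 events held in memory, the heap size of the agent becomes the real limit, which matters once the source outpaces the sink.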

Start flume agent:

flume-ng agent -c conf -n a1 -f conf/flume-data.conf -Dflume.root.logger=INFO,console

Write data to the monitored file:

cat hello.txt.ok > http-flume.log

View the output of the flume agent terminal:

2018-03-25 01:28:39,359 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 79 6F 75 hello you }
2018-03-25 01:28:40,465 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 68 65 hello he }
2018-03-25 01:28:... (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 6D 65 hello me }

Memory overflow caused by excessive data, and its solution

Use the jps -v command to view the amount of memory allocated when Flume was started:

20837 Application -Xmx20m -Dflume.root.logger=INFO,console -Djava.library.path=:/home/uplooking/app/hadoop/lib/native:/home/uplooking/app/hadoop/lib/native

You can see that the maximum heap size is 20 MB. Because the channel keeps its data in memory, the agent runs out of memory once the monitored text data becomes too large. First, use the following script to generate a larger text file. Each line is appended with >> so that the file accumulates all of the lines:

for i in `seq 1 10000000`
do
    echo "${i} I like bigdata, I would like to do something with bigdata." >> /home/uplooking/data/mr/bigData.log
done

Then write the data into the monitored log:

cat bigData.log > ../flume/http-flume.log

You can see the exception in the flume agent terminal:

Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Arrays.copyOfRange(Arrays.java:3664)
    at java.lang.String.<init>(String.java:207)
    at java.lang.StringBuilder.toString(StringBuilder.java:407)
    at sun.net.www.protocol.jar.Handler.parseContextSpec(Handler.java:207)
    at sun.net.www.protocol.jar.Handler.parseURL(Handler.java:153)
    at java.net.URL.<init>(URL.java:622)
    at java.net.URL.<init>(URL.java:490)
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "SinkRunner-PollingRunner-DefaultSinkProcessor"
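A back-of-the-envelope calculation shows why a 20 MB heap has no chance here. The sketch below computes the raw size of the generated text, assuming every one of the 10,000,000 lines ends up in the file and counting one byte per character plus a newline. generated_bytes is a hypothetical helper; the in-memory footprint of the events would be larger than the raw bytes because of per-object overhead, so this is a lower bound on the data volume, not a model of the JVM heap.

```python
def generated_bytes(line_count, suffix=" I like bigdata, I would like to do something with bigdata."):
    """Total size in bytes of the lines '<i><suffix>' plus newline, for i = 1..line_count."""
    total = 0
    digits, first = 1, 1
    while first <= line_count:
        # All numbers from first to last have the same number of digits.
        last = min(line_count, first * 10 - 1)
        total += (last - first + 1) * (digits + len(suffix) + 1)  # +1 for the newline
        digits, first = digits + 1, first * 10
    return total


if __name__ == "__main__":
    size = generated_bytes(10_000_000)
    heap = 20 * 1024 * 1024  # -Xmx20m
    print(f"generated text: {size / 1024 / 1024:.0f} MiB")
    print(f"heap limit:     {heap / 1024 / 1024:.0f} MiB")
    print(f"ratio:          {size / heap:.0f}x")
```

Even though the sink drains the channel while the source fills it, an input that is many times the heap size can fill the in-memory queue faster than the logger sink empties it.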

Solution:

The channel section of the configuration, which uses memory as temporary storage for the data, was set as follows:

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000000
a1.channels.c1.transactionCapacity = 1000000

Running the case that monitors new records in a log file with these settings raises the exception java.lang.OutOfMemoryError: GC overhead limit exceeded, abbreviated as OOM or OOME. There are two solutions.

First solution: give the Flume process more heap memory. The default is -Xmx20m (maximum heap size), which can be raised, for example to -Xmx2000m together with -Xms10m (initial heap size):

flume-ng agent -Xms1000m -Xmx1000m -c conf -n a1 -f conf/flume-data.conf -Dflume.root.logger=INFO,console

Second solution: when the first cannot be used, for example because the machine does not have enough free memory, use a channel backed by other storage, such as disk files or JDBC.

If the text data is not very large, the first solution is sufficient. Once the text data becomes very large, however, the first solution requires allocating a great deal of memory, so the second solution is demonstrated below.

The configuration file is as follows:

# Listen for new data in a file.
# Use files as the channel.
# The agent consists of a source named r1, a sink named k1, and a channel named c1.
# a1 is the name of this Flume agent instance.
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Data source: watch a file for new data
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/uplooking/data/flume/http-flume.log

# Sink: write the collected data out through the log
a1.sinks.k1.type = logger

# Channel: use files as temporary storage for the data
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
a1.channels.c1.transactionCapacity = 1000000
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data

# Connect the source and the sink through the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Note that the following two directories need to be created:

/home/uplooking/data/flume/checkpoint # stores checkpoint data
/home/uplooking/data/flume/data # stores channel data

Now, when you write the data into the monitored file, the events scroll by continuously in the terminal.

Sinking Flume data to an HDFS directory

Finally, the data in the channel can be saved to HDFS. The configuration file is as follows:

# Listen on a network port and write the data to HDFS.
# Use files as the channel.
# The agent consists of a source named r1, a sink named k1, and a channel named c1.
# a1 is the name of this Flume agent instance.
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Data source: listen on a network port
a1.sources.r1.type = netcat
a1.sources.r1.bind = uplooking01
a1.sources.r1.port = 52019

# Sink: store the collected data under a path on HDFS
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://ns1/input/flume/%Y/%m/%d
# Prefix of the generated files
a1.sinks.k1.hdfs.filePrefix = http
# Suffix of the generated files, for example http.1521927418991.log
a1.sinks.k1.hdfs.fileSuffix = .log
# Prefix of files that are still being written
a1.sinks.k1.hdfs.inUsePrefix = xttzm.
# Suffix of files that are still being written, for example xttzm.http.1521927418992.log.zdhm
a1.sinks.k1.hdfs.inUseSuffix = .zdhm
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# The default file type is SequenceFile, which appears serialized when the files are viewed on HDFS
a1.sinks.k1.hdfs.fileType = DataStream
# With the setting above, this also needs to be configured: the data is written as text
a1.sinks.k1.hdfs.writeFormat = Text
# If the configuration option below is not added, rollInterval, rollSize, and rollCount will not take effect
# ...

# Channel: use files as temporary storage for the data
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
a1.channels.c1.transactionCapacity = 1000000
a1.channels.c1.dataDirs = /home/uplooking/data/flume/data

# Connect the source and the sink through the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
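The path and naming options in this configuration combine into the final file location. The sketch below shows that combination under a few stated assumptions: hdfs_target is a hypothetical helper, only the %Y, %m, and %d escapes are handled (via Python's strftime, which happens to share them), the counter values are taken from the comments in the configuration, and the date is an arbitrary example.

```python
from datetime import datetime


def hdfs_target(path_pattern, when, counter, file_prefix="FlumeData", file_suffix="",
                in_use_prefix="", in_use_suffix=".tmp", in_use=False):
    """Build the directory and file name for one output file.

    A finished file is named <prefix>.<counter><suffix>; while it is still
    being written, the in-use prefix and suffix are wrapped around that name.
    """
    directory = when.strftime(path_pattern)  # expands %Y, %m, %d from the timestamp
    name = f"{file_prefix}.{counter}{file_suffix}"
    if in_use:
        name = f"{in_use_prefix}{name}{in_use_suffix}"
    return f"{directory}/{name}"


if __name__ == "__main__":
    when = datetime(2018, 3, 25, 6, 0)  # example timestamp (useLocalTimeStamp = true)
    pattern = "hdfs://ns1/input/flume/%Y/%m/%d"
    print(hdfs_target(pattern, when, 1521927418991, "http", ".log"))
    print(hdfs_target(pattern, when, 1521927418992, "http", ".log",
                      in_use_prefix="xttzm.", in_use_suffix=".zdhm", in_use=True))
```

Distinct in-use markers make it easy for downstream jobs to skip files that the sink has not closed yet.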

Start flume agent:

flume-ng agent -c conf -n a1 -f conf/flume-hdfs-sink.conf -Dflume.root.logger=INFO,console

Send data over nc:

$ nc uplooking01 52019
1
OK
2
OK
3
OK
...
12
OK
13
OK
14
OK
15
OK
16
OK

This produces three finished files in the HDFS directory, plus one temporary file that is still in use:

$ hdfs dfs -ls /input/flume/2018/03/25/
Found 4 items
-rw-r--r--   3 uplooking supergroup         10 2018-03-25 06:00 /input/flume/2018/03/25/http.1521928799720.log
-rw-r--r--   3 uplooking supergroup         11 2018-03-25 06:00 /input/flume/2018/03/25/http.1521928799721.log
-rw-r--r--   3 uplooking supergroup         15 2018-03-25 06:00 /input/flume/2018/03/25/http.1521928799722.log
-rw-r--r--   3 uplooking supergroup          3 2018-03-25 06:00 /input/flume/2018/03/25/xttzm.http.1521928799723.log.zdhm
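The file sizes in this listing follow directly from rollCount = 5 and the sixteen numbers sent over nc. As a check, the following sketch groups the events into files of five and adds up the bytes, assuming each event is written as its text plus one newline. roll_by_count and size_in_bytes are hypothetical helpers for this illustration.

```python
def roll_by_count(events, roll_count):
    """Group events into files of roll_count events each.

    Returns (closed_files, open_file); the open file is the one still being written.
    """
    closed, current = [], []
    for event in events:
        current.append(event)
        if len(current) == roll_count:
            closed.append(current)
            current = []
    return closed, current


def size_in_bytes(file_events):
    """Size of a text file that holds one event per line (body plus newline)."""
    return sum(len(body) + 1 for body in file_events)


if __name__ == "__main__":
    events = [str(n).encode() for n in range(1, 17)]   # the numbers 1 to 16
    closed, still_open = roll_by_count(events, roll_count=5)
    print([size_in_bytes(f) for f in closed], size_in_bytes(still_open))
```

The three closed files hold events 1 to 5, 6 to 10, and 11 to 15, and the in-use file holds only event 16, which is why it is 3 bytes long.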
