This article describes how to install Flume and integrate it with Kafka. It is meant as a practical reference; follow along to see the details.
Flume
Communication between Flume agent (reference books)
Flume has built-in RPC sink-source pairs for transferring data between agents. A source is the component that receives data into a Flume agent; examples include the Avro source, Thrift source, HTTP source, Spooling Directory source, Syslog source, Exec source, and JMS source. A channel is the buffer between source and sink and is the key to ensuring data is not lost. A sink reads events from a channel; each sink can only read events from one channel, and every sink must be configured with a channel, otherwise it is removed from the agent.
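As an illustration of the RPC sink-source pairing described above, one agent's avro sink can be pointed at another agent's avro source to chain them. The following is a minimal sketch only, with hypothetical agent names (sender, receiver), a hypothetical collector hostname and port, and a hypothetical log file:

# agent "sender": reads a local file and forwards events over Avro RPC
sender.sources = src1
sender.channels = ch1
sender.sinks = k1
sender.sources.src1.type = exec
sender.sources.src1.command = tail -F /var/log/messages
sender.sources.src1.channels = ch1
sender.channels.ch1.type = memory
sender.sinks.k1.type = avro
sender.sinks.k1.hostname = collector-host
sender.sinks.k1.port = 4545
sender.sinks.k1.channel = ch1

# agent "receiver" (running on collector-host): accepts the events and logs them
receiver.sources = src2
receiver.channels = ch2
receiver.sinks = k2
receiver.sources.src2.type = avro
receiver.sources.src2.bind = 0.0.0.0
receiver.sources.src2.port = 4545
receiver.sources.src2.channels = ch2
receiver.channels.ch2.type = memory
receiver.sinks.k2.type = logger
receiver.sinks.k2.channel = ch2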
Install flume
Download and install
cd /data/
wget http://mirrors.hust.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
tar axf apache-flume-1.8.0-bin.tar.gz
cd apache-flume-1.8.0-bin
Modify environment variables
vim /etc/profile
# FLUME
export FLUME_HOME=/data/apache-flume-1.8.0-bin
export PATH=$PATH:${FLUME_HOME}/bin
export HADOOP_HOME=/data/hadoop
source /etc/profile
Modify the configuration file
cd ${FLUME_HOME}/conf/
cp flume-env.sh.template flume-env.sh
Modify flume-env.sh
export JAVA_HOME=/usr/local/jdk
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
export HADOOP_HOME=/data/hadoop
Verify installation
flume-ng version
Using flume single-node agent to transmit information
cd ${FLUME_HOME}/conf/
Add the configuration file
vim avro.conf
# Name the components on this agent
agent.sources = avroSrc
agent.channels = avroChannel
agent.sinks = avroSink

# Describe/configure the source
agent.sources.avroSrc.type = netcat
agent.sources.avroSrc.bind = localhost
agent.sources.avroSrc.port = 62000

# Describe the sink
agent.sinks.avroSink.type = logger

# Use a channel that buffers events in memory
agent.channels.avroChannel.type = memory
agent.channels.avroChannel.capacity = 1000
agent.channels.avroChannel.transactionCapacity = 100

# Bind the source and sink to the channel
agent.sources.avroSrc.channels = avroChannel
agent.sinks.avroSink.channel = avroChannel
"# Test agent.sources.avroSrc.type with avro and report an error
# org.apache.avro.AvroRuntimeException: Excessively large list # allocation request detected: 1863125517 items! Connection # closed "
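A likely cause (an assumption based on the error message, not verified here) is that the telnet/netcat test sends plain text, while an avro source expects the Avro RPC protocol. If you do want to exercise an avro source, the bundled avro-client sends events in that protocol; a minimal sketch, assuming the source is bound to localhost:62000 and /tmp/test.txt is a hypothetical input file:

flume-ng avro-client -H localhost -p 62000 -F /tmp/test.txt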
Run flume agent
flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/avro.conf -n agent -Dflume.root.logger=INFO,console
Use telnet to test the connection
telnet localhost 62000
View the output
Use an exec source to monitor local files
cd ${FLUME_HOME}/conf/
Add the configuration file
vim exec.conf
# example.conf: a single-node Flume configuration
# Name the components on this agent
agentexec.sources = avroexec
agentexec.sinks = sinkexec
agentexec.channels = channelexec

# Describe/configure the source
agentexec.sources.avroexec.bind = localhost
agentexec.sources.avroexec.port = 630000
agentexec.sources.avroexec.type = exec
agentexec.sources.avroexec.command = tail -F /tmp/testexec.log

# Describe the sink
agentexec.sinks.sinkexec.type = logger

# Use a channel which buffers events in memory
agentexec.channels.channelexec.type = memory
agentexec.channels.channelexec.capacity = 100000
agentexec.channels.channelexec.transactionCapacity = 10000

# Bind the source and sink to the channel
agentexec.sources.avroexec.channels = channelexec
agentexec.sinks.sinkexec.channel = channelexec
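Note: the bind and port entries above appear to be carried over from an avro example; an exec source only acts on its type and command settings, so they should have no effect here.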
Run flume agent
flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/exec.conf --name agentexec -Dflume.root.logger=INFO,console
Test
Awkwardly, only part of the file's content was captured (no solution for now).
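One possible explanation (an assumption, not verified here) is that tail -F only emits the last ten lines of a file that already has content, and the exec source offers no delivery guarantees. To replay the whole file from its first line, the command could be changed as in this sketch:

agentexec.sources.avroexec.command = tail -n +1 -F /tmp/testexec.log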
Spooldir source integrated with kafka to monitor logs
Prerequisite: install kafka cluster
cd ${FLUME_HOME}/conf/
Add the configuration file
vim single_agent.conf
# agent name: a1
a1.sources = source1
a1.channels = channel1
a1.sinks = sink1

# set source
# (for this test the data is put under the /tmp directory; note the setting)
a1.sources.source1.type = spooldir
a1.sources.source1.spoolDir = /tmp/spooldir
a1.sources.source1.fileHeader = false

# set sink
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.sink1.topic = spooldir

# set channel
# (for this test the data is put under the /tmp directory; note the setting)
a1.channels.channel1.type = file
a1.channels.channel1.checkpointDir = /tmp/flume_data/checkpoint
a1.channels.channel1.dataDirs = /tmp/flume_data/data

# bind
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1
Create a file storage directory
mkdir -pv /tmp/spooldir
mkdir -pv /tmp/flume_data/checkpoint
mkdir -pv /tmp/flume_data/data
(all nodes) start the kafka cluster
kafka-server-start.sh /data/kafka_2.11-1.0.0/config/server.properties
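If you would rather not keep a terminal occupied on every node, the same start script also accepts a -daemon flag to run the broker in the background; a sketch of the equivalent command:

kafka-server-start.sh -daemon /data/kafka_2.11-1.0.0/config/server.properties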
Create a topic for kafka
kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic spooldir --replication-factor 1 --partitions 3
View topic
kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181
Create a consumer for kafka
kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic spooldir --from-beginning
(In a new window) start the flume agent
flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/single_agent.conf --name a1 -Dflume.root.logger=INFO,console
Write test
[root@master conf]# echo "hello, test flume spooldir source" >> /tmp/spooldir/spool.txt
Flume-ng output
Kafka consumer output
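As an extra sanity check, the spooldir source renames every file it has finished ingesting by appending the default .COMPLETED suffix, so listing the spool directory shows whether the test file was picked up; a sketch:

ls /tmp/spooldir
# spool.txt.COMPLETED should appear once the file has been handed to the channel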
Write log information to hbase
Prerequisite: install hbase cluster
cd ${FLUME_HOME}/conf/
mkdir hbase && cd hbase
Add the configuration files; two agents are needed
hbase-back.conf is used to collect local data, and hbase-front.conf is used to write the data into hbase
vim hbase-back.conf
agent.sources = backsrc
agent.channels = memoryChannel
agent.sinks = remotesink

# Describe the sources
agent.sources.backsrc.type = exec
agent.sources.backsrc.command = tail -F /tmp/test/data/data.txt
agent.sources.backsrc.checkperiodic = 1000
agent.sources.backsrc.channels = memoryChannel

# Describe the channels
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.keep-alive = 30
agent.channels.memoryChannel.capacity = 1000
agent.channels.memoryChannel.transactionCapacity = 1000

# Describe the sinks
agent.sinks.remotesink.type = avro
agent.sinks.remotesink.hostname = master
agent.sinks.remotesink.port = 9999
agent.sinks.remotesink.channel = memoryChannel
vim hbase-front.conf
agent.sources = frontsrc
agent.channels = memoryChannel
agent.sinks = fileSink

# Describe the sources
agent.sources.frontsrc.type = avro
agent.sources.frontsrc.bind = master
agent.sources.frontsrc.port = 9999
agent.sources.frontsrc.channels = memoryChannel

# Describe the channels
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.keep-alive = 30
agent.channels.memoryChannel.capacity = 1000
agent.channels.memoryChannel.transactionCapacity = 1000

# Describe the sinks
agent.sinks.fileSink.type = hbase
agent.sinks.fileSink.channel = memoryChannel
agent.sinks.fileSink.table = access_log
agent.sinks.fileSink.columnFamily = t
agent.sinks.fileSink.batchSize = 50
agent.sinks.fileSink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
agent.sinks.fileSink.zookeeperQuorum = master:2181,slave1:2181,slave2:2181
agent.sinks.fileSink.znodeParent = /hbase
agent.sinks.fileSink.timeout = 90000
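The sink above uses RegexHbaseEventSerializer, which by default matches the whole event body with (.*) and writes it into a single column named payload under the configured column family. If the log lines have a known structure, the serializer can split them into several columns; a sketch only, assuming a hypothetical "<ip> <message>" line format and hypothetical column names:

agent.sinks.fileSink.serializer.regex = ([^ ]*) (.*)
agent.sinks.fileSink.serializer.colNames = ip,msg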
Create local files and directories
mkdir -pv /tmp/test/data && touch /tmp/test/data/data.txt
Create a table in hbase
hbase shell
Create a table
create 'access_log','t'
View the tables
list
Start back agent
flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/hbase/hbase-back.conf --name agent -Dflume.root.logger=INFO,console
An error will be reported after startup.
18-01-22 22:29:28 WARN sink.AbstractRpcSink: Unable to create Rpc client using hostname: 192.168.3.58, port: 9999
org.apache.flume.FlumeException: NettyAvroRpcClient { host: master, port: 9999 }: RPC connection error
This is because the avro connection has not been established yet: only the sink side (the back agent) has been started, not the source side. Once the front agent starts, the connection will be established.
Start front agent
flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/hbase/hbase-front.conf --name agent -Dflume.root.logger=INFO,console
Append content to the local file, and then view it in hbase
Echo "hello, test flush to hbase" > > / tmp/test/data/data.txt
Neither agent prints log output during the write.
View data in hbase
hbase shell
scan "access_log"
There will be a certain delay for flume to write logs to hbase.
Write log to hadoop
The principle is the same as writing hbase. If you understand the hbase writing process, you can easily understand writing to other services. Please refer to the official documentation for detailed configuration.
Prerequisite: install hadoop cluster
cd ${FLUME_HOME}/conf/
mkdir hdfs && cd hdfs
Add the configuration files; two agents are needed
hadoop-back.conf is used to collect local data, and hadoop-front.conf is used to write the data into hadoop
vim hadoop-back.conf
# Name the components
hadoop.sources = backsrc
hadoop.sinks = fileSink
hadoop.channels = memoryChannel

# Source
hadoop.sources.backsrc.type = spooldir
hadoop.sources.backsrc.spoolDir = /tmp/data/hadoop
hadoop.sources.backsrc.channels = memoryChannel
hadoop.sources.backsrc.fileHeader = true

# Channel
hadoop.channels.memoryChannel.type = memory
hadoop.channels.memoryChannel.keep-alive = 30
hadoop.channels.memoryChannel.capacity = 1000
hadoop.channels.memoryChannel.transactionCapacity = 1000

# Sink
hadoop.sinks.fileSink.type = avro
hadoop.sinks.fileSink.hostname = master
hadoop.sinks.fileSink.port = 10000
hadoop.sinks.fileSink.channel = memoryChannel
vim hadoop-front.conf
# Name the components
hadoop.sources = frontsrc
hadoop.channels = memoryChannel
hadoop.sinks = remotesink

# Source
hadoop.sources.frontsrc.type = avro
hadoop.sources.frontsrc.bind = master
hadoop.sources.frontsrc.port = 10000
hadoop.sources.frontsrc.channels = memoryChannel

# Channel
hadoop.channels.memoryChannel.type = memory
hadoop.channels.memoryChannel.keep-alive = 30
hadoop.channels.memoryChannel.capacity = 1000
hadoop.channels.memoryChannel.transactionCapacity = 1000

# Sink
hadoop.sinks.remotesink.type = hdfs
hadoop.sinks.remotesink.hdfs.path = hdfs://master/flume
hadoop.sinks.remotesink.hdfs.rollInterval = 0
hadoop.sinks.remotesink.hdfs.idleTimeout = 10000
hadoop.sinks.remotesink.hdfs.fileType = DataStream
hadoop.sinks.remotesink.hdfs.writeFormat = Text
hadoop.sinks.remotesink.hdfs.threadsPoolSize = 20
hadoop.sinks.remotesink.channel = memoryChannel
Create a local directory and modify permissions
mkdir -pv /tmp/data/hadoop && chmod -R 777 /tmp/data/
Create directories in hdfs and modify permissions
hadoop fs -mkdir /flume
hadoop fs -chmod 777 /flume
hadoop fs -ls /
Write a file to the local directory
Echo "hello, test hadoop" > > / tmp/data/hadoop/hadoop.logecho "hello, test flume" > > / tmp/data/hadoop/flume.logecho "hello, test helloworld" > / tmp/data/hadoop/helloworld.log
View files and file information in hdfs
hadoop fs -ls /flume
hadoop fs -cat /flume/FlumeData.1516634328510.tmp
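The .tmp suffix means the hdfs sink still has the file open; it is renamed to its final name once the file is rolled or closed. With rollInterval = 0 as configured above, closing an idle file is governed by hdfs.idleTimeout (in seconds), so a smaller value makes files appear under their final names sooner; a sketch of an assumed alternative setting:

hadoop.sinks.remotesink.hdfs.idleTimeout = 60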
Thank you for reading! This concludes the article on how to install Flume and integrate it with Kafka. I hope the content above is helpful and lets you learn more; if you found the article useful, feel free to share it so more people can see it.