
How to install Flume and integrate it with Kafka

2025-03-28 Update From: SLTechnology News&Howtos shulou

Shulou(Shulou.com)06/01 Report--

This article covers how to install Flume and integrate it with Kafka. It is shared here as a practical reference.

Flume

Communication between Flume agents (from reference books)

Flume has built-in RPC sink-source pairs that handle data transfer between agents. A source is the component that receives data into a Flume agent; sources include Avro Source, Thrift Source, HTTP Source, Spooling Directory Source, Syslog Source, Exec Source, JMS Source, and others. A channel is a buffer between source and sink and is the key to ensuring that data is not lost. A sink reads events from a channel. Each sink can read events from only one channel, and every sink must be configured with a channel; otherwise it is removed from the agent.

Install flume

Download and install

cd /data/

wget http://mirrors.hust.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz

tar axf apache-flume-1.8.0-bin.tar.gz

cd apache-flume-1.8.0-bin

Modify environment variables

vim /etc/profile

# FLUME
export FLUME_HOME=/data/apache-flume-1.8.0-bin
export PATH=$PATH:${FLUME_HOME}/bin
export HADOOP_HOME=/data/hadoop

source /etc/profile
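Running the setup a second time would append the same export lines to /etc/profile again. A minimal sketch of an idempotent append, assuming a POSIX shell; the helper name append_once is hypothetical, and the demonstration writes to a scratch file instead of the real /etc/profile:

```shell
# Append a line to a file only if that exact line is not already present.
# Usage: append_once '<line>' <file>
append_once() {
  line="$1"
  file="$2"
  # -q quiet, -x whole-line match, -F fixed string (no regex interpretation)
  grep -qxF -e "$line" "$file" 2>/dev/null || printf '%s\n' "$line" >> "$file"
}

# Demonstration against a scratch file (assumption: stands in for /etc/profile)
profile="$(mktemp)"
append_once 'export FLUME_HOME=/data/apache-flume-1.8.0-bin' "$profile"
append_once 'export PATH=$PATH:${FLUME_HOME}/bin' "$profile"
# Third call repeats the first line, so nothing is added
append_once 'export FLUME_HOME=/data/apache-flume-1.8.0-bin' "$profile"
cat "$profile"
```

The single quotes keep $PATH and ${FLUME_HOME} unexpanded, so the profile file receives them literally and they are evaluated when the profile is sourced.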

Modify the configuration file

cd ${FLUME_HOME}/conf/

cp flume-env.sh.template flume-env.sh

Modify flume-env.sh

export JAVA_HOME=/usr/local/jdk
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
export HADOOP_HOME=/data/hadoop

Verify installation

flume-ng version
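If flume-ng is not found at this point, the usual suspect is the FLUME_HOME export. A small hedged check, assuming the installation layout used above (an executable bin/flume-ng under FLUME_HOME); the function name flume_home_ok is hypothetical:

```shell
# Return 0 if the given directory looks like a usable Flume installation:
# the path is non-empty and contains an executable bin/flume-ng.
flume_home_ok() {
  [ -n "$1" ] && [ -x "$1/bin/flume-ng" ]
}

if flume_home_ok "${FLUME_HOME:-}"; then
  echo "FLUME_HOME looks usable: $FLUME_HOME"
else
  echo "FLUME_HOME is unset or has no executable bin/flume-ng" >&2
fi
```

This only inspects the file system; it does not start Flume or check the Java setup.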

Using flume single-node agent to transmit information

cd ${FLUME_HOME}/conf/

Add a configuration file

vim avro.conf

# Name the components on this agent
agent.sources = avroSrc
agent.channels = avroChannel
# Describe/configure the source
agent.sources.avroSrc.type = netcat
agent.sources.avroSrc.bind = localhost
agent.sources.avroSrc.port = 62000
# Describe the sink
agent.sinks.avroSink.type = logger
# Use a channel that buffers events in memory
agent.channels.avroChannel.type = memory
agent.channels.avroChannel.capacity = 1000
agent.channels.avroChannel.transactionCapacity = 100
# Bind the source and sink to the channel
agent.sinks = avroSink
agent.sources.avroSrc.channels = avroChannel
agent.sinks.avroSink.channel = avroChannel

# Note: testing with agent.sources.avroSrc.type = avro reported an error
# (presumably because an Avro source expects Avro RPC frames rather than plain text):
# org.apache.avro.AvroRuntimeException: Excessively large list allocation request detected: 1863125517 items! Connection closed
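As noted earlier, a sink without a channel is removed from the agent, so it can help to check a configuration before starting it. The following is a rough sketch, not part of Flume itself: unbound_sinks is a hypothetical helper, and spareSink is an invented sink added only to show a failing case.

```shell
# Print every sink declared for an agent that has no ".channel" property.
# Usage: unbound_sinks <config-file> <agent-name>
unbound_sinks() {
  conf="$1"
  agent="$2"
  # Take the "<agent>.sinks = a b c" line and emit one sink name per line
  sed -n "s/^[[:space:]]*$agent\.sinks[[:space:]]*=[[:space:]]*//p" "$conf" |
    tr -s ' \t' '\n\n' |
    while read -r sink; do
      [ -n "$sink" ] || continue
      # A bound sink has a line "<agent>.sinks.<sink>.channel = <channel>"
      grep -Eq "^[[:space:]]*$agent\.sinks\.$sink\.channel[[:space:]]*=" "$conf" ||
        echo "$sink"
    done
}

# Demonstration with a scratch config; spareSink is deliberately left unbound
conf="$(mktemp)"
cat > "$conf" <<'EOF'
agent.sources = avroSrc
agent.channels = avroChannel
agent.sinks = avroSink spareSink
agent.sinks.avroSink.type = logger
agent.sinks.avroSink.channel = avroChannel
agent.sinks.spareSink.type = logger
EOF
unbound_sinks "$conf" agent
```

The helper treats the agent and sink names as plain text inside a regular expression, which is adequate for simple alphanumeric names like the ones in this article.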

Run flume agent

flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/avro.conf -n agent -Dflume.root.logger=INFO,console

Test the connection with telnet (use the port configured above)

telnet localhost 62000

View

Exec monitors local files

cd ${FLUME_HOME}/conf/

Add a configuration file

vim exec.conf

# example.conf: a single-node Flume configuration
# Name the components on this agent
agentexec.sources = avroexec
agentexec.sinks = sinkexec
agentexec.channels = channelexec
# Describe/configure the source
# (bind and port are not read by the exec source; the original listed port 630000, which is not a valid port number)
agentexec.sources.avroexec.bind = localhost
agentexec.sources.avroexec.port = 63000
agentexec.sources.avroexec.type = exec
agentexec.sources.avroexec.command = tail -F /tmp/testexec.log
# Describe the sink
agentexec.sinks.sinkexec.type = logger
# Use a channel which buffers events in memory
agentexec.channels.channelexec.type = memory
agentexec.channels.channelexec.capacity = 100000
agentexec.channels.channelexec.transactionCapacity = 10000
# Bind the source and sink to the channel
agentexec.sources.avroexec.channels = channelexec
agentexec.sinks.sinkexec.channel = channelexec

Run flume agent

flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/exec.conf --name agentexec -Dflume.root.logger=INFO,console

Test

Unfortunately, only part of the data came through (no solution for the time being).
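The article leaves the partial output unexplained. Two possible causes, neither confirmed by the source: tail prints only the last 10 lines of an existing file before it starts following, and Flume's logger sink by default prints only the first 16 bytes of each event body (its maxBytesToLog property). The sketch below illustrates the first point with a scratch file, assuming a standard POSIX tail; it omits -F so that the commands terminate.

```shell
# Build a 20-line sample log in a scratch file
log="$(mktemp)"
i=1
while [ "$i" -le 20 ]; do
  echo "line $i" >> "$log"
  i=$((i + 1))
done

# Default tail: only the last 10 existing lines are emitted
tail "$log" | wc -l

# tail -n +1: output starts at the first line, so all 20 lines are emitted
tail -n +1 "$log" | wc -l
```

If this turns out to be the cause, a source command such as tail -n +1 -F /tmp/testexec.log would replay the file from its first line when the agent starts.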

Spooldir integrates kafka monitoring logs

Prerequisite: install kafka cluster

cd ${FLUME_HOME}/conf/

Add a configuration file

vim single_agent.conf

# agent name: a1
a1.sources = source1
a1.channels = channel1
a1.sinks = sink1
# set source
# For testing, the data is placed under /tmp; adjust this setting as needed
a1.sources.source1.type = spooldir
a1.sources.source1.spoolDir = /tmp/spooldir
a1.sources.source1.fileHeader = false
# set sink
a1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.sink1.kafka.bootstrap.servers = master:9092,slave1:9092,slave2:9092
a1.sinks.sink1.topic = spooldir
# set channel
# For testing, the data is placed under /tmp; adjust this setting as needed
a1.channels.channel1.type = file
a1.channels.channel1.checkpointDir = /tmp/flume_data/checkpoint
a1.channels.channel1.dataDirs = /tmp/flume_data/data
# bind
a1.sources.source1.channels = channel1
a1.sinks.sink1.channel = channel1

Create a file storage directory

mkdir -pv /tmp/spooldir
mkdir -pv /tmp/flume_data/checkpoint
mkdir -pv /tmp/flume_data/data

Start the kafka cluster (on all nodes)

kafka-server-start.sh /data/kafka_2.11-1.0.0/config/server.properties

Create a topic for kafka

kafka-topics.sh --zookeeper master:2181,slave1:2181,slave2:2181 --create --topic spooldir --replication-factor 1 --partitions 3

View topic

kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181

Create a consumer for kafka

kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic spooldir --from-beginning

Start the flume agent (in a new window)

flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/single_agent.conf --name a1 -Dflume.root.logger=INFO,console
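The value passed to --name has to match the agent prefix used in the configuration file exactly, including case (a1 in single_agent.conf). A hedged sketch for listing the agent names a file defines before starting it; agent_names and has_agent are hypothetical helpers, and the scratch config stands in for single_agent.conf:

```shell
# List the distinct agent names in a Flume properties file, i.e. the prefix
# in front of ".sources", ".channels" or ".sinks" on the declaration lines.
agent_names() {
  sed -n -E 's/^[[:space:]]*([^#.[:space:]]+)\.(sources|channels|sinks)[[:space:]]*=.*/\1/p' "$1" |
    sort -u
}

# Return 0 if the file defines an agent with exactly this name (case-sensitive)
has_agent() {
  agent_names "$1" | grep -qxF -e "$2"
}

# Demonstration with a scratch config
conf="$(mktemp)"
cat > "$conf" <<'EOF'
a1.sources = source1
a1.channels = channel1
a1.sinks = sink1
a1.sources.source1.type = spooldir
EOF
agent_names "$conf"
has_agent "$conf" a1 && echo "a1 is defined"
has_agent "$conf" A1 || echo "A1 is not defined"
```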

Write test

[root@master conf]# echo "hello, test flume spooldir source" >> /tmp/spooldir/spool.txt

Flume-ng information

Kafka information
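The spooling directory source expects files to be complete and unchanged once they appear in the spool directory, and it does not accept a reused file name, so appending to the same file repeatedly is fragile. A minimal sketch of a safer pattern: write to a staging directory first, then move the finished file in. The helper name spool_drop and the scratch directories are assumptions for illustration; on a real system the spool directory would be /tmp/spooldir and the staging directory should sit on the same file system.

```shell
# Write a message to a staging file first, then move it into the spool directory.
# Usage: spool_drop <spool-dir> <staging-dir> <message>
spool_drop() {
  spool_dir="$1"
  staging_dir="$2"
  message="$3"
  # mktemp gives each event file a unique name
  tmp="$(mktemp "$staging_dir/event.XXXXXX")" || return 1
  printf '%s\n' "$message" > "$tmp"
  # Within one file system mv is a rename, so the source never sees a half-written file
  mv "$tmp" "$spool_dir/$(basename "$tmp").log"
}

# Demonstration with scratch directories
spool="$(mktemp -d)"
staging="$(mktemp -d)"
spool_drop "$spool" "$staging" "hello, test flume spooldir source"
ls "$spool"
```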

Write log information to hbase

Prerequisite: install hbase cluster

cd ${FLUME_HOME}/conf/

mkdir hbase && cd hbase

Add the configuration files; two agents are needed

hbase-back.conf is used to collect local data, and hbase-front.conf is used to write data to hbase

vim hbase-back.conf

agent.sources = backsrc
agent.channels = memoryChannel
agent.sinks = remotesink
# Describe the sources
agent.sources.backsrc.type = exec
agent.sources.backsrc.command = tail -F /tmp/test/data/data.txt
agent.sources.backsrc.checkperiodic = 1000
agent.sources.backsrc.channels = memoryChannel
# Describe the channels
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.keep-alive = 30
agent.channels.memoryChannel.capacity = 1000
agent.channels.memoryChannel.transactionCapacity = 1000
# Describe the sinks
agent.sinks.remotesink.type = avro
agent.sinks.remotesink.hostname = master
agent.sinks.remotesink.port = 9999
agent.sinks.remotesink.channel = memoryChannel

vim hbase-front.conf

agent.sources = frontsrc
agent.channels = memoryChannel
agent.sinks = fileSink
# Describe the sources
agent.sources.frontsrc.type = avro
agent.sources.frontsrc.bind = master
agent.sources.frontsrc.port = 9999
agent.sources.frontsrc.channels = memoryChannel
# Describe the channels
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.keep-alive = 30
agent.channels.memoryChannel.capacity = 1000
agent.channels.memoryChannel.transactionCapacity = 1000
# Describe the sinks
agent.sinks.fileSink.type = hbase
agent.sinks.fileSink.channel = memoryChannel
agent.sinks.fileSink.table = access_log
agent.sinks.fileSink.columnFamily = t
agent.sinks.fileSink.batchSize = 50
agent.sinks.fileSink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
agent.sinks.fileSink.zookeeperQuorum = master:2181,slave1:2181,slave2:2181
agent.sinks.fileSink.znodeParent = /hbase
agent.sinks.fileSink.timeout = 90000

Create local files and directories

mkdir -pv /tmp/test/data && touch /tmp/test/data/data.txt

Create a table in hbase

hbase shell

Create a table

create 'access_log','t'

View

list

Start back agent

flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/hbase/hbase-back.conf --name agent -Dflume.root.logger=INFO,console

An error is reported after startup.

18-01-22 22:29:28 WARN sink.AbstractRpcSink: Unable to create Rpc client using hostname: 192.168.3.58, port: 9999

org.apache.flume.FlumeException: NettyAvroRpcClient {host: master, port: 9999}: RPC connection error

This is because the avro connection cannot be established yet: only the sink side (the back agent) is running, and the source side (the front agent) has not been started. Once the front agent is started, the connection is reported as established.
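One way to avoid the initial connection error is to start the front agent first and launch the back agent only once the avro port accepts connections. A rough sketch under stated assumptions: wait_for_port is a hypothetical helper, and it relies on bash's /dev/tcp redirection, so it needs bash rather than a plain POSIX sh.

```shell
# Return 0 once host:port accepts a TCP connection, or 1 after the given
# number of attempts (default 30, one second apart).
# Uses bash's /dev/tcp pseudo-device, so run it with bash.
wait_for_port() {
  host="$1"
  port="$2"
  attempts="${3:-30}"
  n=0
  while [ "$n" -lt "$attempts" ]; do
    # The subshell opens and immediately drops the connection
    if (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; then
      return 0
    fi
    n=$((n + 1))
    sleep 1
  done
  return 1
}

# Example use (commented out because it needs the running front agent):
# wait_for_port master 9999 && \
#   flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/hbase/hbase-back.conf --name agent -Dflume.root.logger=INFO,console
```

Starting the back agent first also works, as the article shows; the sink simply logs the warning until the front agent is up.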

Start front agent

flume-ng agent -f /data/apache-flume-1.8.0-bin/conf/hbase/hbase-front.conf --name agent -Dflume.root.logger=INFO,console

Append content to the local file, and then view it in hbase

echo "hello, test flush to hbase" >> /tmp/test/data/data.txt

Neither agent prints any log output during the write.

View data in hbase

hbase shell

scan "access_log"

There will be a certain delay for flume to write logs to hbase.

Write log to hadoop

The principle is the same as writing hbase. If you understand the hbase writing process, you can easily understand writing to other services. Please refer to the official documentation for detailed configuration.

Prerequisite: install hadoop cluster

cd ${FLUME_HOME}/conf/

mkdir hdfs && cd hdfs

Add the configuration files; two agents are needed

hadoop-back.conf is used to collect local data, and hadoop-front.conf is used to write data to hadoop

vim hadoop-back.conf

# Name the components
hadoop.sources = backsrc
hadoop.sinks = fileSink
hadoop.channels = memoryChannel
# Source
hadoop.sources.backsrc.type = spooldir
hadoop.sources.backsrc.spoolDir = /tmp/data/hadoop
hadoop.sources.backsrc.channels = memoryChannel
hadoop.sources.backsrc.fileHeader = true
# Channel
hadoop.channels.memoryChannel.type = memory
hadoop.channels.memoryChannel.keep-alive = 30
hadoop.channels.memoryChannel.capacity = 1000
hadoop.channels.memoryChannel.transactionCapacity = 1000
# Sink
hadoop.sinks.fileSink.type = avro
hadoop.sinks.fileSink.hostname = master
hadoop.sinks.fileSink.port = 10000
hadoop.sinks.fileSink.channel = memoryChannel

vim hadoop-front.conf

# Name the components
hadoop.sources = frontsrc
hadoop.channels = memoryChannel
hadoop.sinks = remotesink
# Source
hadoop.sources.frontsrc.type = avro
hadoop.sources.frontsrc.bind = master
hadoop.sources.frontsrc.port = 10000
hadoop.sources.frontsrc.channels = memoryChannel
# Channel
hadoop.channels.memoryChannel.type = memory
hadoop.channels.memoryChannel.keep-alive = 30
hadoop.channels.memoryChannel.capacity = 1000
hadoop.channels.memoryChannel.transactionCapacity = 1000
# Sink
hadoop.sinks.remotesink.type = hdfs
hadoop.sinks.remotesink.hdfs.path = hdfs://master/flume
hadoop.sinks.remotesink.hdfs.rollInterval = 0
hadoop.sinks.remotesink.hdfs.idleTimeout = 10000
hadoop.sinks.remotesink.hdfs.fileType = DataStream
hadoop.sinks.remotesink.hdfs.writeFormat = Text
hadoop.sinks.remotesink.hdfs.threadsPoolSize = 20
hadoop.sinks.remotesink.channel = memoryChannel

Create a local directory and modify permissions

mkdir -pv /tmp/data/hadoop && chmod -R 777 /tmp/data/

Create directories in hdfs and modify permissions

hadoop fs -mkdir /flume
hadoop fs -chmod 777 /flume
hadoop fs -ls /

Write files to the local directory

echo "hello, test hadoop" >> /tmp/data/hadoop/hadoop.log
echo "hello, test flume" >> /tmp/data/hadoop/flume.log
echo "hello, test helloworld" > /tmp/data/hadoop/helloworld.log

View files and file information in hdfs

hadoop fs -ls /flume
hadoop fs -cat /flume/FlumeData.1516634328510.tmp

Thank you for reading! This concludes the article on how to install Flume and integrate it with Kafka. I hope the content above is of some help. If you found the article useful, feel free to share it so that more people can see it.
