This article shows how to approach the integration of Flume and Kafka. The content is quite detailed; interested readers can use it as a reference and will hopefully find it helpful.
Integration of Flume and Kafka
I. Concepts
1. Flume: a distributed log collection system developed by Cloudera. It is a distributed, reliable, and highly available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple, flexible architecture based on streaming data flows, a tunable reliability mechanism plus many failover and recovery mechanisms, and strong fault tolerance. It uses a simple, extensible data model that allows for online analytical applications. Flume comes in OG and NG lines; the last release of Flume OG was 0.94.0, after which only the NG line continued.
2. Kafka: runs as a cluster on one or more servers that can span multiple data centers. In Kafka, communication between clients and servers is done through a simple, high-performance, language-agnostic TCP protocol. The protocol is versioned and maintains backward compatibility with older versions. Kafka ships a Java client, but clients are available in many languages.
3. Kafka is usually used for two broad classes of applications:
A. building real-time streaming data pipelines that reliably move data between systems or applications;
B. building real-time streaming applications that transform or react to streams of data.
In Kafka, each record consists of a key, a value, and a timestamp.
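As a small illustration of this record structure (the broker address and topic follow the test environment used later in this article), the Kafka console producer can send keyed records:

# send records as key:value pairs; parse.key and key.separator are standard console-producer properties
kafka-console-producer.sh \
--broker-list 192.168.137.132:9092 \
--topic test \
--property parse.key=true \
--property key.separator=: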
II. Background
Driven by the constant need in the big data domain to collect log data and transmit it between systems, we set out to verify the fan-in and fan-out integration of flume+kafka, including functional tests of the replicating stream and the multiplexing stream. Later, according to actual needs, this will be extended to the integration of kafka and spark streaming.
Note: this document covers functional testing only; performance tuning should be adjusted according to your actual situation.
III. Deployment and installation
1. Test environment description:
Operating system: CentOS 7
Flume version: flume-ng-1.6.0-cdh6.7.0
Kafka version: kafka_2.11-0.10.0.1
JDK version: JDK1.8.0
Scala version: 2.11.8
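The versions above can be confirmed on each host before proceeding, for example:

# quick environment sanity checks
java -version
scala -version
flume-ng version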
2. Test steps:
2.1. Flume deployment
2.1.1. Download the installation media and decompress it:
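The download commands were collapsed in the source page. As a minimal sketch (the mirror URL is an assumption; the paths match the rest of this article):

# download and unpack the CDH build of Flume; the URL is an assumed Cloudera archive mirror
cd /app
wget http://archive.cloudera.com/cdh6/cdh/6/flume-ng-1.6.0-cdh6.7.0.tar.gz
tar -xzf flume-ng-1.6.0-cdh6.7.0.tar.gz
export FLUME_HOME=/app/apache-flume-1.6.0-cdh6.7.0-bin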
cd /app/apache-flume-1.6.0-cdh6.7.0-bin
vi netcatOrKafka-memory-logger.conf
# agent components
netcatagent.sources = netcat_sources
netcatagent.channels = c1 c2
netcatagent.sinks = logger_sinks kafka_sinks

# netcat source listening on all interfaces, port 44444
netcatagent.sources.netcat_sources.type = netcat
netcatagent.sources.netcat_sources.bind = 0.0.0.0
netcatagent.sources.netcat_sources.port = 44444

# two memory channels, one per sink
netcatagent.channels.c1.type = memory
netcatagent.channels.c1.capacity = 1000
netcatagent.channels.c1.transactionCapacity = 100
netcatagent.channels.c2.type = memory
netcatagent.channels.c2.capacity = 1000
netcatagent.channels.c2.transactionCapacity = 100

# logger sink (console) and Kafka sink
netcatagent.sinks.logger_sinks.type = logger
netcatagent.sinks.kafka_sinks.type = org.apache.flume.sink.kafka.KafkaSink
netcatagent.sinks.kafka_sinks.topic = test
netcatagent.sinks.kafka_sinks.brokerList = 192.168.137.132:9092,192.168.137.133:9092,192.168.137.134:9092
netcatagent.sinks.kafka_sinks.requiredAcks = 0
## netcatagent.sinks.kafka_sinks.batchSize = 20
netcatagent.sinks.kafka_sinks.producer.type = sync
netcatagent.sinks.kafka_sinks.custom.encoding = UTF-8
netcatagent.sinks.kafka_sinks.partition.key = 0
netcatagent.sinks.kafka_sinks.serializer.class = kafka.serializer.StringEncoder
netcatagent.sinks.kafka_sinks.partitioner.class = org.apache.flume.plugins.SinglePartition
netcatagent.sinks.kafka_sinks.max.message.size = 1000000

# replicate every event to both channels, then bind sinks to channels
netcatagent.sources.netcat_sources.selector.type = replicating
netcatagent.sources.netcat_sources.channels = c1 c2
netcatagent.sinks.logger_sinks.channel = c1
netcatagent.sinks.kafka_sinks.channel = c2
2.4.2. Run the test commands:
A. Start the Flume agent (on 192.168.137.130):
flume-ng agent --name netcatagent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/netcatOrKafka-memory-logger.conf \
-Dflume.root.logger=INFO,console
B. Launch the Kafka console consumer (on 192.168.137.132):
kafka-console-consumer.sh \
--zookeeper 192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181/kafka \
--from-beginning --topic test
C. Test delivery (from 192.168.137.130 and 192.168.137.132):
(screenshot: result of the telnet send)
(screenshot: Kafka consumer output)
(screenshot: final logger sink output)
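As a minimal sketch of step C (host and port follow the netcat source configured above), a telnet session looks like this; the Flume netcat source answers OK for each line it receives:

telnet 192.168.137.130 44444
hello flume and kafka
OK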
At this point, the flume+kafka fan-out replicating-stream test (fan-in source: netcat; outputs: Kafka + Flume logger) has been completed and verified.
2.5. Flume+kafka fan-out multiplexing-stream test (fan-in source: netcat; outputs: Kafka + Flume logger)
Not yet done; to be added in a follow-up. A sketch of what the selector change could look like follows.
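As an illustrative sketch only (not from the original test): a multiplexing selector routes events by a header value instead of copying them to every channel. The header name and mapping values below are assumptions:

# route on the value of the hypothetical "type" event header
netcatagent.sources.netcat_sources.selector.type = multiplexing
netcatagent.sources.netcat_sources.selector.header = type
netcatagent.sources.netcat_sources.selector.mapping.log = c1
netcatagent.sources.netcat_sources.selector.mapping.kafka = c2
netcatagent.sources.netcat_sources.selector.default = c1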
IV. Problems encountered during deployment, installation, and verification
1. During the flume+kafka fan-in test (fan-in source: netcat+kafka; output: Flume logger), Kafka data was never received.
The main cause was what had been written in the Kafka configuration file (server.properties):
zookeeper.connect=192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181
However, when creating the topic, the following was used:
kafka-topics.sh --create \
--zookeeper 192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181/kafka \
--replication-factor 3 --partitions 3 --topic test
That is, the /kafka chroot path was not appended to zookeeper.connect in the Kafka configuration file, but /kafka was appended when the topic was created.
Finally, running:
kafka-console-producer.sh \
--broker-list 192.168.137.132:9092,192.168.137.133:9092,192.168.137.134:9092 \
--topic test
and finding no topic information revealed the problem.
Solution: make the two settings consistent, i.e. use the same ZooKeeper chroot path in both places.
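To confirm a chroot mismatch like this, listing the topics visible under each path is a quick check (addresses as above):

# topics registered under the /kafka chroot used at creation time
kafka-topics.sh --list --zookeeper 192.168.137.132:2181/kafka
# topics registered under the root path that server.properties pointed to
kafka-topics.sh --list --zookeeper 192.168.137.132:2181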
2. During the flume+kafka fan-in test (fan-in source: netcat+kafka; output: Flume logger), starting the Flume agent reported an error:
2018-03-31 10:43:15 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:142)] Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to load source type: org.apache.flume.source.kafka,KafkaSource, class: org.apache.flume.source.kafka,KafkaSource
at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:69)
at org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:42)
at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:322)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flume.source.kafka,KafkaSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:67)
... 11 more
Solution: this stems from an error in the official documentation; org.apache.flume.source.kafka,KafkaSource must not contain a comma. Change it to org.apache.flume.source.kafka.KafkaSource (see the official website for details). For reference, a corrected source definition is sketched below.
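A minimal source definition with the corrected class name might look like the following (the agent and channel names follow the examples above; the property set is what flume-ng 1.6 expects and should be treated as version-dependent):

# Kafka source with the corrected class name (Flume 1.6-style properties)
netcatagent.sources.kafka_sources.type = org.apache.flume.source.kafka.KafkaSource
netcatagent.sources.kafka_sources.zookeeperConnect = 192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181/kafka
netcatagent.sources.kafka_sources.topic = test
netcatagent.sources.kafka_sources.groupId = flume
netcatagent.sources.kafka_sources.channels = c1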
That concludes this look at the integration of Flume and Kafka. I hope the above content is helpful and that you learned something from it. If you found the article useful, please share it with others.