How to analyze the Integration of Flume and Kafka

Updated: 2025-03-29. Source: SLTechnology News&Howtos (shulou.com)

This article describes in detail how to integrate Flume with Kafka. Interested readers can use it as a reference.

Integration of Flume and Kafka

I. Concepts

1. Flume: a distributed log collection system developed by Cloudera. It is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple, flexible architecture based on streaming data flows, and it is robust and fault tolerant, with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple, extensible data model that allows for online analytic applications. Flume comes in two generations, OG and NG: the last Flume OG release was 0.9.4, and all later releases are NG.

2. Kafka: runs as a cluster on one or more servers, which can span multiple data centers. Clients and servers communicate over a simple, high-performance, language-agnostic TCP protocol. The protocol is versioned and maintains backward compatibility with older versions. Kafka provides a Java client, and clients are available in many other languages.

3. Kafka is generally used for two broad classes of applications:

A. Building real-time streaming data pipelines that reliably move data between systems or applications

B. Building real-time streaming applications that transform or react to streams of data

Each Kafka record consists of a key, a value, and a timestamp.
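
To make the record structure concrete, here is a minimal Python sketch. It is not Kafka client code: the `Record` class and the `partition_for` helper are hypothetical names introduced for illustration, and the CRC32 hash is only a stand-in for the hash a real Kafka client uses. It shows the three parts of a record and the idea that records with the same key map to the same partition.

```python
import time
import zlib
from dataclasses import dataclass, field


def _now_ms() -> int:
    """Current wall-clock time in milliseconds."""
    return int(time.time() * 1000)


@dataclass(frozen=True)
class Record:
    """Illustrative stand-in for a Kafka record (hypothetical class)."""
    key: bytes
    value: bytes
    timestamp_ms: int = field(default_factory=_now_ms)


def partition_for(record: Record, num_partitions: int) -> int:
    """Map a record to a partition by hashing its key.

    Assumption: CRC32 is used here only for illustration; real Kafka
    clients use a different hash function.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    return zlib.crc32(record.key) % num_partitions
```

For example, two records built with the key `b"host-1"` land in the same partition no matter what their values are, which is what keeps per-key ordering possible.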

II. Background

Big data workloads need continuous log collection and data transmission. To meet that need, we set out to integrate Flume with Kafka and test fan-in and fan-out, including functional tests of replicating and multiplexing flows. Later, depending on actual needs, we will extend this to integrating Kafka with Spark Streaming.

Note: this document covers functional testing only. Tune performance according to your actual environment.

III. Deployment and installation

1. Test environment description:

Operating system: CentOS 7

Flume version: flume-ng-1.6.0-cdh6.7.0

Kafka version: kafka_2.11-0.10.0.1

JDK version: JDK1.8.0

Scala version: 2.11.8

2. Test steps:

2.1. Flume deployment

2.1.1. Download the installation media and decompress it:

cd /app/apache-flume-1.6.0-cdh6.7.0-bin

vi netcatOrKafka-memory-logger.conf

# Name the source, channels, and sinks of this agent
netcatagent.sources = netcat_sources
netcatagent.channels = c1 c2
netcatagent.sinks = logger_sinks kafka_sinks

# Netcat source: listens for lines of text on port 44444
netcatagent.sources.netcat_sources.type = netcat
netcatagent.sources.netcat_sources.bind = 0.0.0.0
netcatagent.sources.netcat_sources.port = 44444

# Two in-memory channels, one per sink
netcatagent.channels.c1.type = memory
netcatagent.channels.c1.capacity = 1000
netcatagent.channels.c1.transactionCapacity = 100

netcatagent.channels.c2.type = memory
netcatagent.channels.c2.capacity = 1000
netcatagent.channels.c2.transactionCapacity = 100

# Logger sink: prints events to the agent log
netcatagent.sinks.logger_sinks.type = logger

# Kafka sink: publishes events to the topic "test".
# Use the port your brokers actually listen on (the kafka-console-producer
# command later in this article uses 9092).
netcatagent.sinks.kafka_sinks.type = org.apache.flume.sink.kafka.KafkaSink
netcatagent.sinks.kafka_sinks.topic = test
netcatagent.sinks.kafka_sinks.brokerList = 192.168.137.132:9082,192.168.137.133:9082,192.168.137.134:9082
netcatagent.sinks.kafka_sinks.requiredAcks = 0
# netcatagent.sinks.kafka_sinks.batchSize = 20
netcatagent.sinks.kafka_sinks.producer.type = sync
netcatagent.sinks.kafka_sinks.custom.encoding = UTF-8
netcatagent.sinks.kafka_sinks.partition.key = 0
netcatagent.sinks.kafka_sinks.serializer.class = kafka.serializer.StringEncoder
netcatagent.sinks.kafka_sinks.partitioner.class = org.apache.flume.plugins.SinglePartition
netcatagent.sinks.kafka_sinks.max.message.size = 1000000

# Replicating selector: every event is copied to both channels
netcatagent.sources.netcat_sources.selector.type = replicating

# Bind the source and the sinks to their channels
netcatagent.sources.netcat_sources.channels = c1 c2
netcatagent.sinks.logger_sinks.channel = c1
netcatagent.sinks.kafka_sinks.channel = c2
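
The replicating selector in the configuration above copies every event from the source into each listed channel, so the logger sink and the Kafka sink each see the full stream. The following Python sketch only simulates that behaviour in memory. `MemoryChannel` and `replicate` are hypothetical names, not Flume classes, and the capacity check is a simplification of how a real memory channel enforces its limit.

```python
from collections import deque
from typing import Deque, Iterable, Optional


class MemoryChannel:
    """Toy bounded queue standing in for a Flume memory channel."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self._events: Deque[bytes] = deque()

    def put(self, event: bytes) -> None:
        # Reject the event when the channel is full (simplified behaviour).
        if len(self._events) >= self.capacity:
            raise OverflowError("channel is full")
        self._events.append(event)

    def take(self) -> Optional[bytes]:
        # Return the oldest event, or None when the channel is empty.
        return self._events.popleft() if self._events else None


def replicate(event: bytes, channels: Iterable[MemoryChannel]) -> None:
    """Copy one event into every channel, as a replicating selector does."""
    for channel in channels:
        channel.put(event)


# c1 feeds the logger sink and c2 feeds the Kafka sink in the configuration.
c1, c2 = MemoryChannel(1000), MemoryChannel(1000)
replicate(b"hello flume", [c1, c2])
```

After the call, both `c1` and `c2` hold their own copy of the event, which is why the same line shows up in the agent log and in the Kafka topic.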

2.4.2. Run the test commands:

A. Start the Flume agent (on 192.168.137.130):

flume-ng agent --name netcatagent \
  --conf $FLUME_HOME/conf \
  --conf-file $FLUME_HOME/conf/netcat-memory-loggerToKafka.conf \
  -Dflume.root.logger=INFO,console

B. Start the Kafka console consumer (on 192.168.137.132):

kafka-console-consumer.sh \
  --zookeeper 192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181/kafka \
  --from-beginning --topic test

C. Test delivery (on 192.168.137.130 and 192.168.137.132):

Result of sending data through telnet

Result of Kafka consumption

Result finally received by the logger
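
As an alternative to typing into telnet, test lines can be sent to the netcat source from a small script. This is a sketch under stated assumptions: `send_lines` is a hypothetical helper, and it assumes the listener replies with one line per event, which is the netcat source's default acknowledgement behaviour (it answers `OK`).

```python
import socket
from typing import List, Sequence


def send_lines(host: str, port: int, lines: Sequence[str],
               timeout: float = 5.0) -> List[str]:
    """Send each line over TCP and collect the one-line reply to each.

    Assumption: the server answers every line with exactly one reply line.
    """
    replies: List[str] = []
    with socket.create_connection((host, port), timeout=timeout) as sock:
        with sock.makefile("r", encoding="utf-8", newline="\n") as reader:
            for line in lines:
                sock.sendall((line + "\n").encode("utf-8"))
                replies.append(reader.readline().strip())
    return replies
```

With the agent from step A running, calling `send_lines("192.168.137.130", 44444, ["hello flume", "hello kafka"])` should deliver two events. The same lines should then appear in the console consumer from step B and in the agent's logger output.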

At this point, the flume+kafka fan-out replicating stream test (source: netcat; outputs: Kafka and the Flume logger) is complete and verified.

2.5. Flume+kafka fan-out multiplexing stream test (source: netcat; outputs: Kafka and the Flume logger)

Not yet available. It will be added later.
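
Until that test is written up, the routing rule of a multiplexing selector can be sketched in a few lines of Python. This is an illustration of the idea only, not Flume code and not a test result. A multiplexing selector reads one event header, sends the event to the channels mapped to that header's value, and falls back to a default when nothing matches. The function name `select_channels`, the header name `type`, and the mapping values are all hypothetical.

```python
from typing import Dict, List, Mapping


def select_channels(headers: Mapping[str, str],
                    header: str,
                    mapping: Dict[str, List[str]],
                    default: List[str]) -> List[str]:
    """Pick target channels from the value of a single event header.

    Events whose header is missing or unmapped go to the default channels.
    """
    return mapping.get(headers.get(header), default)


# Hypothetical routing table: "app" events go to c1, "audit" events to c2.
routing = {"app": ["c1"], "audit": ["c2"]}
chosen = select_channels({"type": "audit"}, "type", routing, default=["c1"])
```

Unlike the replicating selector, each event here reaches only the channels chosen for its header value, so the logger and Kafka sinks would see different subsets of the stream.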

IV. Problems encountered during deployment, installation, and verification

1. During the flume+kafka fan-in test (sources: netcat and Kafka; output: the Flume logger), no data arrived from Kafka.

The main cause: in the Kafka configuration file (server.properties), the ZooKeeper connection was written as:

zookeeper.connect=192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181

However, the topic was created with:

kafka-topics.sh --create \
  --zookeeper 192.168.137.132:2181,192.168.137.133:2181,192.168.137.134:2181/kafka \
  --replication-factor 3 --partitions 3 --topic test

That is, the /kafka path was not included in zookeeper.connect in the Kafka configuration file, but it was included when the topic was created.

The problem was finally found by running:

kafka-console-producer.sh \
  --broker-list 192.168.137.132:9092,192.168.137.133:9092,192.168.137.134:9092 \
  --topic test

This command reported that no information for the topic could be found.

Solution: make the two ZooKeeper connection strings consistent.
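
This kind of mismatch is easy to miss by eye, so a small check can help. The sketch below assumes the usual ZooKeeper connection string layout: comma-separated host:port pairs followed by an optional path suffix such as /kafka. The helper names `split_zk_connect` and `same_zk_namespace` are hypothetical.

```python
from typing import List, Tuple


def split_zk_connect(connect: str) -> Tuple[List[str], str]:
    """Split 'h1:2181,h2:2181/path' into (sorted hosts, path).

    A connection string without a path suffix is reported as path '/'.
    """
    hosts_part, _, path_part = connect.strip().partition("/")
    hosts = sorted(h.strip() for h in hosts_part.split(",") if h.strip())
    path = "/" + path_part.strip("/") if path_part.strip("/") else "/"
    return hosts, path


def same_zk_namespace(first: str, second: str) -> bool:
    """True when both strings name the same hosts and the same path."""
    return split_zk_connect(first) == split_zk_connect(second)


broker_setting = ("192.168.137.132:2181,192.168.137.133:2181,"
                  "192.168.137.134:2181")
topic_command = broker_setting + "/kafka"
mismatch = not same_zk_namespace(broker_setting, topic_command)
```

Here `mismatch` is true, which reproduces the situation described above: the brokers register under the ZooKeeper root while the topic is created under /kafka.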

2. During the flume+kafka fan-in test (sources: netcat and Kafka; output: the Flume logger), starting the Flume agent reported an error:

2018-03-31 10:43:15 (conf-file-poller-0) [ERROR - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:142)] Failed to load configuration data. Exception follows.
org.apache.flume.FlumeException: Unable to load source type: org.apache.flume.source.kafka,KafkaSource, class: org.apache.flume.source.kafka,KafkaSource
    at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:69)
    at org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:42)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:322)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flume.source.kafka,KafkaSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:67)
    ... 11 more

Solution: the example on the official website contains an error. The source type org.apache.flume.source.kafka,KafkaSource must not contain a comma. Change it to org.apache.flume.source.kafka.KafkaSource. See the official website for details.
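
A typo like this only surfaces when the agent starts, so it can be worth scanning a configuration file beforehand. The following sketch is a hypothetical helper, not part of Flume. It flags any `.type` value that is neither a plain alias such as `netcat` nor a well-formed dotted Java class name.

```python
import re
from typing import List, Tuple

# A Java-style identifier, optionally repeated with dots (a.b.ClassName).
_CLASS_NAME = re.compile(r"^[A-Za-z_$][\w$]*(\.[A-Za-z_$][\w$]*)*$")


def invalid_type_values(conf_text: str) -> List[Tuple[int, str]]:
    """Return (line number, value) for each malformed '.type' setting."""
    problems: List[Tuple[int, str]] = []
    for lineno, line in enumerate(conf_text.splitlines(), start=1):
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue  # skip blank lines and comments
        key, sep, value = stripped.partition("=")
        if not sep or not key.strip().endswith(".type"):
            continue  # only '<component>.type = ...' lines are checked
        if not _CLASS_NAME.match(value.strip()):
            problems.append((lineno, value.strip()))
    return problems


sample = (
    "agent.sources.k.type = org.apache.flume.source.kafka,KafkaSource\n"
    "agent.channels.c1.type = memory\n"
)
found = invalid_type_values(sample)
```

For the sample above, `found` contains only the first line, because the comma makes the value an invalid class name while `memory` is accepted as an alias.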
