In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-03-29 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Servers >
Share
Shulou(Shulou.com)05/31 Report--
This article mainly shows you "Flume how to integrate kafka", the content is easy to understand, clear, hope to help you solve your doubts, the following let the editor lead you to study and learn "Flume how to integrate kafka" this article.
Using Kafka with Flume
In CDH 5.2.0 and later, Flume includes a Kafka source and sink. Use them to flow data from Kafka to Hadoop or from any Flume source to Kafka.
Important: you cannot configure a Kafka source to send data to a Kafka sink. If you do, the Kafka source sets the topic in the event header, overriding the sink configuration and creating an infinite loop, sending messages back and forth between the source and sink. If you need to use both a source and a sink, use an interceptor to modify the event header and set a different topic.
Kafka Source
Use Kafka source to flow data from Kafka topics to Hadoop. The Kafka source can be merged with any Flume sink, making it easy to write data from Kafka to HDFS, HBase, and Solr.
The following example of a Flume configuration uses Kafka source to send data to HDFS sink:
Tier1.sources = source1 tier1.channels = channel1 tier1.sinks = sink1 tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource tier1.sources.source1.zookeeperConnect = zk01.example.com:2181 tier1.sources.source1.topic = weblogs tier1.sources.source1.groupId = flume tier1.sources.source1.channels = channel1 tier1.sources.source1.interceptors = i1 tier1.sources.source1.interceptors.i1.type = timestamp tier1.sources.source1.kafka.consumer.timeout.ms = 100tier1.channels.channel1.type = memory tier1.channels.channel1.capacity = 10000 tier1.channels.channel1.transactionCapacity = 1000 tier1.sinks.sink1.type = hdfs tier1.sinks.sink1.hdfs.path = / tmp/kafka/% {topic} /% y-%m-%d tier1.sinks.sink1.hdfs.rollInterval = 5 tier1.sinks.sink1.hdfs.rollSize = 0 tier1.sinks.sink1.hdfs.rollCount = 0 tier1.sinks.sink1.hdfs.fileType = DataStream tier1.sinks.sink1.channel = channel1
For higher throughput, you can configure multiple Kafka sources to read a topic. If all sources are configured with the same groupID, and topic has multiple partitions, setting each source to read data from different partitions can improve efficiency.
The following list describes the parameters supported by Kafka source; required parameters are listed in bold.
Table 1. Kafka Source Properties
Property NameDefault ValueDescriptiontype must be set to org.apache.flume.source.kafka.KafkaSource.zookeeperConnectThe URI of the ZooKeeper server or quorum used by Kafka. This can be a single node (for example, zk01.example.com:2181) or a comma-separated list of nodes in a ZooKeeper quorum (for example, zk01.example.com:2181,zk02.example.com:2181, zk03.example.com:2181) .topicsource reads the Kafka topic of the message. Flume supports only one topic per source. GroupIDflumeThe unique identifier of the Kafka consumer group. Maximum number of messages written by Set the same groupID in all sources to indicate that they belong to the same consumer group.batchSize1000 to channel the maximum time (in milliseconds) that batchDurationMillis1000 wrote to channel. Other properties supported by Kafka consumer configure Kafka consumer through Kafka source. You can use any property supported by consumer. Prepend the consumer property name with the prefix kafka. (for example, kafka.fetch.min.bytes) See the Kafka documentation for the full list of Kafka consumer properties.
Tuning
Kafka source overrides two properties of Kafka consumer:
Auto.commit.enable is set to false by the source, and every batch is committed. To improve performance, set to true and use kafka.auto.commit.enable instead. This may lose data if the source goes down before committing.
Consumer.timeout.ms is set to 10, so when Flume polls Kafka for new data, it waits no more than 10 ms for the data to be available. Setting this to a higher value can reduce CPU utilization due to less frequent polling, but introduces latency in writing batches to the channel.
Kafka Sink
Use Kafka sink to send data from a Flume source to Kafka. You can use the Kafka sink in addition to Flume sinks such as HBase or HDFS.
The following Flume configuration example uses a Kafka sink with an exec source:
Tier1.sources = source1 tier1.channels = channel1 tier1.sinks = sink1 tier1.sources.source1.type = exec tier1.sources.source1.command = / usr/bin/vmstat 1 tier1.sources.source1.channels = channel1 tier1.channels.channel1.type = memory tier1.channels.channel1.capacity = 10000 tier1.channels.channel1.transactionCapacity = 1000 tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink tier1.sinks.sink1.topic = sink1 tier1.sinks.sink1.brokerList = kafka01.example.com:9092 Kafka02.example.com:9092 tier1.sinks.sink1.channel = channel1 tier1.sinks.sink1.batchSize = 20
The following table describes parameters the Kafka sink supports; required properties are listed in bold.
Table 2. Kafka Sink Properties
Property NameDefault ValueDescriptiontype must be set to: org.apache.flume.sink.kafka.KafkaSink.brokerListThe brokers the Kafka sink uses to discover topic partitions, formatted as a comma-separated list of hostname:port entries. You do not need to specify the entire list of brokers, but Cloudera recommends that you specify at least two for high availability.topicdefault-flume-topicThe Kafka topic to which messages are published by default. If the event header contains a topic field, the event is published to the designated topic, overriding the configured topic.batchSize100The number of messages to process in a single batch. Specifying a larger batchSize can improve throughput and increase latency.requiredAcks1The number of replicas that must acknowledge a message before it is written successfully. Possible values are 0 (do not wait for an acknowledgement), 1 (wait for the leader to acknowledge only), and-1 (wait for all replicas to acknowledge). To avoid potential loss of data in case of a leader failure, set this to-1. Other properties supported by Kafka producer Used to configure the Kafka producer used by the Kafka sink. You can use any producer properties supported by Kafka. Prepend the producer property name with the prefix kafka. (for example, kafka.compression.codec) See the Kafka documentation for the full list of Kafka producer properties.
Kafka sink uses topic and key properties from the FlumeEvent headers to determine where to send events in Kafka. If the header contains the topic property, that event is sent to the designated topic, overriding the configured topic. If the header contains the key property, that key is used to partition events within the topic. Events with the same key are sent to the same partition. If the key parameter is not specified, events are distributed randomly to partitions. Use these properties to control the topics and partitions to which events are sent through the Flume source or interceptor.
Kafka Channel
CDH 5. 3 and later include a Kafka channel to Flume in addition to the existing memory and file channels. You can use Kafka channel:
To write to Hadoop directly from Kafka without using a source. Write data directly from Kafka to hadoop without using source.
To write to Kafka directly from Flume sources without additional buffering. Writes data directly from Flume source to Kafka without using additional buffers.
As a reliable and highly available channel for any source/sink combination. Can be combined with any source/sink.
The following Flume configuration uses a Kafka channel and an exec source and hdfs sink:
Tier1.sources = source1tier1.channels = channel1tier1.sinks = sink1tier1.sources.source1.type = exectier1.sources.source1.command = / usr/bin/vmstat 1tier1.sources.source1.channels = channel1tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChanneltier1.channels.channel1.capacity = 10000tier1.channels.channel1.transactionCapacity = 1000tier1.channels.channel1.brokerList = kafka02.example.com:9092 Kafka03.example.com:9092tier1.channels.channel1.topic = channel2tier1.channels.channel1.zookeeperConnect = zk01.example.com:2181tier1.channels.channel1.parseAsFlumeEvent = truetier1.sinks.sink1.type = hdfstier1.sinks.sink1.hdfs.path = / tmp/kafka/channeltier1.sinks.sink1.hdfs.rollInterval = 5tier1.sinks.sink1.hdfs.rollSize = 0tier1.sinks.sink1.hdfs.rollCount = 0tier1.sinks.sink1.hdfs.fileType = DataStreamtier1.sinks.sink1.channel = channel1
The following list describes the parameters supported by Kafka channel; bold is required.
Table 3. Kafka Channel Properties
Property NameDefault ValueDescriptiontype must be set to: org.apache.flume.channel.kafka.KafkaChannel.brokerListThe brokers the Kafka channel uses to discover topic partitions, formatted as a comma-separated list of hostname:port entries. You do not need to specify the entire list of brokers, but Cloudera recommends that you specify at least two for high availability.zookeeperConnectThe URI of the ZooKeeper server or quorum used by Kafka. This can be a single node (for example, zk01.example.com:2181) or a comma-separated list of nodes in a ZooKeeper quorum (for example, zk01.example.com:2181,zk02.example.com:2181, zk03.example.com:2181). Topicflume-channelThe Kafka topic the channel will use.groupIDflumeThe unique identifier of the Kafka consumer group the channel uses to register with Kafka.parseAsFlumeEventtrueSet to true if a Flume source is writing to the channel and expects AvroDataums with the FlumeEvent schema (org.apache.flume.source.avro.AvroFlumeEvent) in the channel. Set to false if other producers are writing to the topic that the channel is using.readSmallestOffsetfalseIf true, reads all data in the topic. If false, reads only data written after the channel has started. The interval between Only used when parseAsFlumeEvent is false.kafka.consumer.timeout.ms100 polls when writing data to sink. Other properties supported by Kafka producer Used to configure the Kafka producer. You can use any producer properties supported by Kafka. Prepend the producer property name with the prefix kafka. (for example, kafka.compression.codec) See the Kafka documentation for the full list of Kafka producer properties.
>
Terms and Conditions Privacy Policy
The above is all the content of the article "how Flume integrates kafka". Thank you for reading! I believe we all have a certain understanding, hope to share the content to help you, if you want to learn more knowledge, welcome to follow the industry information channel!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.