How to use kafka Technology

2025-03-26 Update From: SLTechnology News&Howtos

Shulou (Shulou.com) 06/03

This article explains how to use Kafka from Java. It covers the producer and consumer APIs (both the deprecated and the newer ones), custom partitioners, producer interceptors, and a small Kafka Streams data-cleaning example.

Environment preparation

1) Create a Java project in Eclipse.

2) Create a lib folder in the root directory of the project.

3) Unpack the Kafka installation package, copy the jar files from its libs directory into the project's lib directory, and add them to the build path.

4) Start the ZooKeeper and Kafka clusters, then open a console consumer on the Kafka cluster. The --zookeeper option belongs to the older Kafka releases this article is based on; newer releases take --bootstrap-server with a broker address instead.

[root@hadoop102 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first

Kafka producer Java API

Create a producer (deprecated API)

    package com.root.kafka;

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class OldProducer {

        @SuppressWarnings("deprecation")
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put("metadata.broker.list", "hadoop102:9092");
            properties.put("request.required.acks", "1");
            properties.put("serializer.class", "kafka.serializer.StringEncoder");

            Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(properties));

            KeyedMessage<Integer, String> message = new KeyedMessage<Integer, String>("first", "hello world");
            producer.send(message);

            // Release the producer's resources
            producer.close();
        }
    }

Create a producer (new API)

    package com.root.kafka;

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class NewProducer {

        public static void main(String[] args) {
            Properties props = new Properties();
            // Host name and port of the Kafka broker
            props.put("bootstrap.servers", "hadoop103:9092");
            // Wait for acknowledgement from all replica nodes
            props.put("acks", "all");
            // Maximum number of retries when sending a message
            props.put("retries", 0);
            // Size of one batch of messages, in bytes
            props.put("batch.size", 16384);
            // Delay (ms) before a request is sent, so that more records can join the batch
            props.put("linger.ms", 1);
            // Size of the send buffer, in bytes
            props.put("buffer.memory", 33554432);
            // Key serializer
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Value serializer
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(props);

            for (int i = 0; i < 50; i++) {
                producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i));
            }

            producer.close();
        }
    }

Create a producer with a callback (new API)

    package com.root.kafka;

    import java.util.Properties;

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class CallBackProducer {

        public static void main(String[] args) {
            Properties props = new Properties();
            // Host name and port of the Kafka broker
            props.put("bootstrap.servers", "hadoop103:9092");
            // Wait for acknowledgement from all replica nodes
            props.put("acks", "all");
            // Maximum number of retries when sending a message
            props.put("retries", 0);
            // Size of one batch of messages, in bytes
            props.put("batch.size", 16384);
            // Delay (ms) before a request is sent
            props.put("linger.ms", 1);
            // Size of the send buffer, in bytes
            props.put("buffer.memory", 33554432);
            // Key serializer
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Value serializer
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

            for (int i = 0; i < 50; i++) {
                kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() {

                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        // metadata is non-null when the send succeeded
                        if (metadata != null) {
                            System.err.println(metadata.partition() + "---" + metadata.offset());
                        }
                    }
                });
            }

            kafkaProducer.close();
        }
    }

Producer with a custom partitioner

0) Requirement: store all data in partition 0 of the topic.

1) Define a class that implements the Partitioner interface and override its method (deprecated API)

    package com.root.kafka;

    import kafka.producer.Partitioner;

    public class CustomPartitioner implements Partitioner {

        public CustomPartitioner() {
            super();
        }

        @Override
        public int partition(Object key, int numPartitions) {
            // Always choose partition 0
            return 0;
        }
    }

2) Custom partitioner (new API)

    package com.root.kafka;

    import java.util.Map;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    public class CustomPartitioner implements Partitioner {

        @Override
        public void configure(Map<String, ?> configs) {
        }

        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            // Always choose partition 0
            return 0;
        }

        @Override
        public void close() {
        }
    }

3) Use it in the producer

    package com.root.kafka;

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class PartitionerProducer {

        public static void main(String[] args) {
            Properties props = new Properties();
            // Host name and port of the Kafka broker
            props.put("bootstrap.servers", "hadoop103:9092");
            // Wait for acknowledgement from all replica nodes
            props.put("acks", "all");
            // Maximum number of retries when sending a message
            props.put("retries", 0);
            // Size of one batch of messages, in bytes
            props.put("batch.size", 16384);
            // Delay (ms) before a request is sent
            props.put("linger.ms", 1);
            // Size of the send buffer, in bytes
            props.put("buffer.memory", 33554432);
            // Key serializer
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Value serializer
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Custom partitioner
            props.put("partitioner.class", "com.root.kafka.CustomPartitioner");

            Producer<String, String> producer = new KafkaProducer<>(props);
            producer.send(new ProducerRecord<String, String>("first", "1", "root"));

            producer.close();
        }
    }

4) Test

(1) On hadoop102, watch the log files of the three partitions of topic first under /opt/module/kafka/logs/:

[root@hadoop102 first-0]$ tail -f 00000000000000000000.log
[root@hadoop102 first-1]$ tail -f 00000000000000000000.log
[root@hadoop102 first-2]$ tail -f 00000000000000000000.log

(2) All the data is stored in the specified partition.

Kafka consumer Java API

0) Start a console producer and send a message:

[root@hadoop104 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
>hello world

1) Create a consumer (deprecated API)

    package com.root.kafka.consume;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class CustomConsumer {

        @SuppressWarnings("deprecation")
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put("zookeeper.connect", "hadoop102:2181");
            properties.put("group.id", "G1");
            properties.put("zookeeper.session.timeout.ms", "500");
            properties.put("zookeeper.sync.time.ms", "250");
            properties.put("auto.commit.interval.ms", "1000");

            // Create a consumer connector
            ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

            // Ask for one stream for topic "first"
            HashMap<String, Integer> topicCount = new HashMap<>();
            topicCount.put("first", 1);

            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCount);
            KafkaStream<byte[], byte[]> stream = consumerMap.get("first").get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();

            while (it.hasNext()) {
                System.out.println(new String(it.next().message()));
            }
        }
    }

2) Official example with automatic offset commits (new API)

    package com.root.kafka.consume;

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class CustomNewConsumer {

        public static void main(String[] args) {
            Properties props = new Properties();
            // Address of the Kafka service; it is not necessary to list every broker
            props.put("bootstrap.servers", "hadoop102:9092");
            // Consumer group
            props.put("group.id", "test");
            // Whether to commit offsets automatically
            props.put("enable.auto.commit", "true");
            // Interval between automatic offset commits, in ms
            props.put("auto.commit.interval.ms", "1000");
            // Key deserializer
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // Value deserializer
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            // Create the consumer
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

            // Topics to subscribe to; more than one is allowed
            consumer.subscribe(Arrays.asList("first", "second", "third"));

            while (true) {
                // Read data with a timeout of 100 ms
                ConsumerRecords<String, String> records = consumer.poll(100);

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }
    }

Kafka producer interceptors

Interceptor principle

Producer interceptors were introduced in Kafka 0.10 and are mainly used to implement custom control logic on the client side.

An interceptor gives the user a chance to apply custom logic to a message, such as modifying it, before the message is sent and before the producer's callback runs. The producer also lets the user specify several interceptors that act on the same message in order, forming an interceptor chain. The interface to implement is org.apache.kafka.clients.producer.ProducerInterceptor, and it defines the following methods:

(1) configure(configs)

Called when the interceptor receives its configuration; use it to read settings and initialize data.

(2) onSend(ProducerRecord)

This method is invoked from inside KafkaProducer.send, so it runs in the user's main thread. The producer guarantees that it is called before the message is serialized and before its partition is computed. The method may do anything with the message, but it is best not to change the topic or the partition the message belongs to, because doing so affects the computation of the target partition.

(3) onAcknowledgement(RecordMetadata, Exception)

This method is called when a message is acknowledged or when sending it fails, usually before the producer's callback is triggered. It runs in the producer's I/O thread, so do not put heavy logic in it, otherwise the producer's sending throughput drops.

(4) close

Closes the interceptor. It is mainly used to clean up resources.

As noted above, an interceptor may run in more than one thread (onSend in the user's thread, onAcknowledgement in the I/O thread), so the implementation has to ensure its own thread safety. In addition, if several interceptors are specified, the producer calls them in the specified order, and it only catches any exception an interceptor throws and writes it to the error log instead of passing it up. Keep this in mind when using interceptors, because a failing interceptor does not stop the send.
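The chain behaviour described above can be modelled without any Kafka classes. The following is a minimal plain-Java sketch, not the producer's actual implementation: the class name InterceptorChainSketch, the errorLog field, and the use of UnaryOperator<String> as a stand-in for an interceptor's onSend are all assumptions made for illustration. It applies each step in the configured order and, when a step throws, logs the error and carries on with the value it had.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model of an interceptor chain; not part of the Kafka API.
final class InterceptorChainSketch {

    // Errors are recorded here (and printed) instead of being rethrown.
    final List<String> errorLog = new ArrayList<>();
    private final List<UnaryOperator<String>> steps;

    InterceptorChainSketch(List<UnaryOperator<String>> steps) {
        this.steps = steps;
    }

    // Runs every step in order; a failing step is logged and skipped.
    String onSend(String value) {
        String current = value;
        for (UnaryOperator<String> step : steps) {
            try {
                current = step.apply(current);
            } catch (RuntimeException e) {
                errorLog.add(e.getMessage());
                System.err.println("interceptor failed: " + e.getMessage());
            }
        }
        return current;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> steps = new ArrayList<>();
        steps.add(v -> "A:" + v);
        steps.add(v -> { throw new IllegalStateException("boom"); });
        steps.add(v -> v + ":C");

        InterceptorChainSketch chain = new InterceptorChainSketch(steps);
        // The second step fails, but the first and third still apply.
        System.out.println(chain.onSend("msg"));
    }
}
```

In this model the message still reaches the later steps after a failure, which is why a broken interceptor can go unnoticed unless the error log is checked.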

Interceptor case

1) requirements:

Implement a simple interceptor chain made of two interceptors. The first prepends a timestamp to the message value before the message is sent; the second counts the messages that were sent successfully and the ones that failed.
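The two behaviours in this requirement can be tried out in plain Java before they are wired into Kafka. The sketch below is an illustration under assumptions, not the case code: InterceptorLogicSketch, prefixWithTimestamp and record are hypothetical names, and AtomicInteger is used for the counters as one possible way to follow the thread-safety advice given earlier.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper showing the logic of the two interceptors without Kafka.
final class InterceptorLogicSketch {

    private final AtomicInteger successCounter = new AtomicInteger();
    private final AtomicInteger errorCounter = new AtomicInteger();

    // First interceptor: put "<timestamp>," in front of the message value.
    static String prefixWithTimestamp(long timestampMillis, String value) {
        return timestampMillis + "," + value;
    }

    // Second interceptor: a null exception means the send was acknowledged.
    void record(Exception exception) {
        if (exception == null) {
            successCounter.incrementAndGet();
        } else {
            errorCounter.incrementAndGet();
        }
    }

    int successes() {
        return successCounter.get();
    }

    int failures() {
        return errorCounter.get();
    }

    public static void main(String[] args) {
        InterceptorLogicSketch counters = new InterceptorLogicSketch();
        System.out.println(prefixWithTimestamp(System.currentTimeMillis(), "message0"));
        counters.record(null);
        counters.record(new RuntimeException("send failed"));
        System.out.println("Successful sent: " + counters.successes());
        System.out.println("Failed sent: " + counters.failures());
    }
}
```

Atomic counters cost little here and stay correct even if acknowledgements and close were to run on different threads.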

2) case practice

(1) add timestamp interceptor

    package com.root.kafka.interceptor;

    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class TimeInterceptor implements ProducerInterceptor<String, String> {

        @Override
        public void configure(Map<String, ?> configs) {
        }

        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            // Create a new record with the timestamp written in front of the message value
            return new ProducerRecord<String, String>(record.topic(), record.partition(), record.timestamp(),
                    record.key(), System.currentTimeMillis() + "," + record.value());
        }

        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        }

        @Override
        public void close() {
        }
    }

(2) Count the messages sent successfully and the ones that failed, and print both counters when the producer is closed

    package com.root.kafka.interceptor;

    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class CounterInterceptor implements ProducerInterceptor<String, String> {

        private int errorCounter = 0;
        private int successCounter = 0;

        @Override
        public void configure(Map<String, ?> configs) {
        }

        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            return record;
        }

        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            // Count successes and failures
            if (exception == null) {
                successCounter++;
            } else {
                errorCounter++;
            }
        }

        @Override
        public void close() {
            // Print the result
            System.out.println("Successful sent: " + successCounter);
            System.out.println("Failed sent: " + errorCounter);
        }
    }

(3) Producer main program

    package com.root.kafka.interceptor;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class InterceptorProducer {

        public static void main(String[] args) throws Exception {
            // 1 Set the configuration
            Properties props = new Properties();
            props.put("bootstrap.servers", "hadoop102:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // 2 Build the interceptor chain
            List<String> interceptors = new ArrayList<>();
            interceptors.add("com.root.kafka.interceptor.TimeInterceptor");
            interceptors.add("com.root.kafka.interceptor.CounterInterceptor");
            props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

            String topic = "first";
            Producer<String, String> producer = new KafkaProducer<>(props);

            // 3 Send messages
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i);
                producer.send(record);
            }

            // 4 Always close the producer; only then is each interceptor's close method called
            producer.close();
        }
    }

3) Test

(1) Start a consumer on Kafka, then run the Java client program.

[root@hadoop102 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --from-beginning --topic first
1501904047034,message0
1501904047225,message1
1501904047230,message2
1501904047234,message3
1501904047236,message4
1501904047240,message5
1501904047243,message6
1501904047246,message7
1501904047249,message8
1501904047252,message9

(2) The Java console prints:

Successful sent: 10
Failed sent: 0

Kafka Streams

Overview

Kafka Streams is part of the Apache Kafka open-source project. It is a powerful, easy-to-use library for building highly distributed, scalable, fault-tolerant applications on Kafka.

Features of Kafka Streams

1) Powerful: highly scalable, elastic, fault-tolerant.

2) Lightweight: no dedicated cluster is needed; it is a library, not a framework.

3) Fully integrated: 100% compatible with Kafka 0.10.0; easy to integrate into existing applications.

4) Real-time: millisecond latency; not micro-batching; windows allow out-of-order data and late-arriving data.

Why Kafka Streams

Many stream processing systems already exist. The best known and most widely used open-source ones are Spark Streaming and Apache Storm. Apache Storm has been developed for years, is widely used, offers record-level processing, and now also supports SQL on streams. Spark Streaming is based on Apache Spark, integrates easily with graph computation and SQL processing, is powerful, and has a low barrier to entry for users already familiar with Spark development. In addition, mainstream Hadoop distributions such as Cloudera and Hortonworks bundle Apache Storm and Apache Spark, which makes deployment easier.

With so many advantages on the side of Spark and Storm, why is Kafka Streams still needed? The main reasons are as follows.

First, Spark and Storm are stream processing frameworks, while Kafka Streams is a stream processing library built on Kafka. A framework requires developers to write their logic in a prescribed way so that the framework can call it. Developers find it hard to see how the framework actually runs, which makes debugging expensive and restricts how it can be used. As a library, Kafka Streams gives developers concrete classes to call, and the way the application runs is mostly under the developer's control, which makes it easier to use and debug.

Second, although Cloudera and Hortonworks simplify the deployment of Storm and Spark, deploying these frameworks is still relatively complex. Kafka Streams, as a library, can be embedded in an application very easily and places almost no requirements on how the application is packaged and deployed.

Third, nearly every stream processing system supports Kafka as a data source. Storm has a dedicated kafka-spout, and Spark provides a dedicated spark-streaming-kafka module. Kafka is in effect the standard data source for mainstream stream processing systems. In other words, most streaming setups have Kafka deployed already, so the cost of adopting Kafka Streams is very low.

Fourth, Storm and Spark Streaming need resources reserved for the framework's own processes, such as Storm's supervisor and the node manager for Spark on YARN. Even within an application instance the framework takes part of the resources; Spark Streaming, for example, reserves memory for shuffle and storage. Kafka Streams, being a library, needs no such framework-level resources.

Fifth, because Kafka itself persists data, Kafka Streams supports rolling deployment, rolling upgrades, and recomputation.

Sixth, thanks to the Kafka consumer rebalance mechanism, Kafka Streams can adjust its parallelism dynamically while running.

Kafka Streams data-cleaning case

0) Requirement: process in real time the messages that carry a ">>" prefix marker, keeping only what follows the marker. For example, the input "root>>ximenqing" is processed into "ximenqing".

1) Requirement analysis: the application reads each record from the input topic first, cleans it, and writes the result to the output topic second.

2) Case practice

(1) Create a project and add the jar packages.

(2) Create the main class

    package com.root.kafka.stream;

    import java.util.Properties;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorSupplier;
    import org.apache.kafka.streams.processor.TopologyBuilder;

    public class Application {

        public static void main(String[] args) {
            // Input topic
            String from = "first";
            // Output topic
            String to = "second";

            // Settings
            Properties settings = new Properties();
            settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
            settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");

            StreamsConfig config = new StreamsConfig(settings);

            // Build the topology: SOURCE -> PROCESS -> SINK
            TopologyBuilder builder = new TopologyBuilder();

            builder.addSource("SOURCE", from)
                    .addProcessor("PROCESS", new ProcessorSupplier<byte[], byte[]>() {

                        @Override
                        public Processor<byte[], byte[]> get() {
                            // The processor that does the actual cleaning
                            return new LogProcessor();
                        }
                    }, "SOURCE")
                    .addSink("SINK", to, "PROCESS");

            // Create and start the Kafka Streams application
            KafkaStreams streams = new KafkaStreams(builder, config);
            streams.start();
        }
    }

(3) The business logic

    package com.root.kafka.stream;

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class LogProcessor implements Processor<byte[], byte[]> {

        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(byte[] key, byte[] value) {
            String input = new String(value);

            // If the input contains ">>", keep only the content after the marker
            if (input.contains(">>")) {
                input = input.split(">>")[1].trim();
                // Forward to the next node, which writes to the output topic
                context.forward("logProcessor".getBytes(), input.getBytes());
            } else {
                context.forward("logProcessor".getBytes(), input.getBytes());
            }
        }

        @Override
        public void punctuate(long timestamp) {
        }

        @Override
        public void close() {
        }
    }

(4) Run the program.

(5) Start a producer on hadoop104 and type some input:

[root@hadoop104 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
>hello>>world
>h>>root

(6) Start a consumer on hadoop103:

[root@hadoop103 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --from-beginning --topic second
world
root
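The cleaning rule of the Kafka Streams case can also be checked on its own, without a running cluster. This is a minimal sketch, assuming the marker is ">>" as in the processor code; LogCleanerSketch and clean are hypothetical names. Unlike the case code it uses indexOf rather than split, so a line that ends with the marker yields an empty string instead of an exception, and any later markers are left in the result.

```java
// Hypothetical stand-alone version of the cleaning rule; not part of Kafka Streams.
final class LogCleanerSketch {

    private static final String MARKER = ">>";

    // Returns the trimmed text after the first marker,
    // or the input unchanged when it contains no marker.
    static String clean(String input) {
        int at = input.indexOf(MARKER);
        if (at < 0) {
            return input;
        }
        return input.substring(at + MARKER.length()).trim();
    }

    public static void main(String[] args) {
        System.out.println(clean("hello>>world"));
        System.out.println(clean("h>>root"));
        System.out.println(clean("plain"));
    }
}
```

Keeping the rule in a small static method like this makes it easy to unit test, and a Processor's process method could then simply call it before forwarding the result.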
