Sample Analysis of Kafka Java Client Code

2025-02-24 Update From: SLTechnology News&Howtos


Shulou(Shulou.com)06/02 Report--

This article walks through sample Kafka Java client code in detail: a message producer, a consumer group, and a custom partitioner. It is shared as a reference; after reading it, you should have a working understanding of the relevant APIs.

Kafka is a high-throughput distributed publish/subscribe messaging system.

Kafka is a distributed message queue that LinkedIn uses for log processing. LinkedIn produces a large volume of log data but does not require high delivery reliability. Its logs mainly cover user behavior (logins, page views, clicks, shares, likes) and system operation (CPU, memory, disk, network, and system and process status).

Many message queuing services guarantee reliable delivery and assume immediate consumption by default, which makes them a poor fit for offline processing.

Highly reliable delivery is not necessary for LinkedIn's logs, so Kafka trades some reliability for performance. By running as a distributed cluster and allowing messages to accumulate in the system, Kafka supports both offline and online log processing.
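This reliability/throughput trade-off shows up concretely in the producer's acknowledgement setting, which the sample code below sets via `request.required.acks`. The following is a minimal sketch of the two ends of that trade-off, expressed as the 0.8-era producer property used in this article (the class name `AcksDemo` is mine, not from the source; newer clients rename the setting to `acks`):

```java
import java.util.Properties;

public class AcksDemo {
    // Fastest: fire-and-forget, no broker acknowledgement; messages can be lost.
    public static Properties lowLatencyConfig() {
        Properties props = new Properties();
        props.put("request.required.acks", "0");
        return props;
    }

    // Safer: wait for the partition leader to acknowledge each message.
    public static Properties saferConfig() {
        Properties props = new Properties();
        props.put("request.required.acks", "1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(lowLatencyConfig().getProperty("request.required.acks")); // prints 0
        System.out.println(saferConfig().getProperty("request.required.acks"));      // prints 1
    }
}
```

For log-style workloads like LinkedIn's, `0` or `1` is usually enough; `-1` (wait for all replicas) exists for data that genuinely cannot be lost.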

Test environment

Kafka_2.10-0.8.1.1: a 3-node cluster

Zookeeper-3.4.5: a single-instance node

Code example

Sample message producer code

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * For more information, see
 * https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
 *
 * @author Fung
 */
public class ProducerDemo {
    public static void main(String[] args) {
        Random rnd = new Random();
        int events = 100;

        // Set configuration properties
        Properties props = new Properties();
        // Broker list; the port was truncated in the source text,
        // so 9092 (the Kafka default) is assumed here
        props.put("metadata.broker.list", "172.168.63.221:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // key.serializer.class defaults to serializer.class
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // Optional; if not set, the default partitioner is used
        props.put("partitioner.class", "com.catt.kafka.demo.PartitionerDemo");
        // Enable the acknowledgement mechanism; otherwise fire-and-forget
        // (a value of 0) may lose data.
        // See http://kafka.apache.org/08/configuration.html
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

        // Create the producer
        Producer<String, String> producer = new Producer<String, String>(config);

        // Generate and send messages
        long start = System.currentTimeMillis();
        for (long i = 0; i < events; i++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + i; // rnd.nextInt(255);
            String msg = runtime + ",www.example.com," + ip;
            // If the topic does not exist, it is created automatically
            // with the default replication-factor of 1
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(
                    "page_visits", ip, msg);
            producer.send(data);
        }
        System.out.println("Elapsed: " + (System.currentTimeMillis() - start));
        // Close the producer
        producer.close();
    }
}

Sample message consumer code

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * For details, see:
 * https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
 *
 * @author Fung
 */
public class ConsumerDemo {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null)
            consumer.shutdown();
        if (executor != null)
            executor.shutdown();
    }

    public void run(int numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // Now launch all the threads
        executor = Executors.newFixedThreadPool(numThreads);

        // Now create an object to consume the messages
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerMsgTask(stream, threadNumber));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper,
            String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    public static void main(String[] arg) {
        String[] args = { "172.168.63.221:2188", "group-1", "page_visits", "12" };
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic);
        demo.run(threads);

        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
        }
        demo.shutdown();
    }
}

Message handler class

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerMsgTask implements Runnable {
    private KafkaStream<byte[], byte[]> m_stream;
    private int m_threadNumber;

    public ConsumerMsgTask(KafkaStream<byte[], byte[]> stream, int threadNumber) {
        m_threadNumber = threadNumber;
        m_stream = stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": "
                    + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

Partitioner class example

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class PartitionerDemo implements Partitioner {
    public PartitionerDemo(VerifiableProperties props) {
    }

    @Override
    public int partition(Object obj, int numPartitions) {
        int partition = 0;
        if (obj instanceof String) {
            String key = (String) obj;
            // Route by the digits after the last '.' in the key (the last
            // octet of the IP address used as the message key)
            int offset = key.lastIndexOf('.');
            if (offset > 0) {
                partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;
            }
        } else {
            partition = obj.toString().length() % numPartitions;
        }
        return partition;
    }
}

This concludes the sample analysis of the Kafka Java client code. I hope the content above is helpful to you. If you found the article useful, feel free to share it so more people can see it.
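As a postscript, the routing rule inside the Partitioner example above can be exercised on its own, without a broker. The sketch below is a hypothetical standalone rewrite of that logic (the class `PartitionRule` is mine, not part of the original sample): messages keyed by an IP land on the partition given by the last octet modulo the partition count, so all traffic from one host stays on one partition.

```java
public class PartitionRule {
    // Same rule as PartitionerDemo above: use the digits after the
    // last '.' when present, otherwise fall back to the key length.
    public static int partition(String key, int numPartitions) {
        int offset = key.lastIndexOf('.');
        if (offset > 0) {
            return Integer.parseInt(key.substring(offset + 1)) % numPartitions;
        }
        return key.length() % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(partition("192.168.2.17", 5)); // 17 % 5 = 2
        System.out.println(partition("nokey", 5));        // length 5 % 5 = 0
    }
}
```

One caveat worth noting: because the sample producer generates keys `192.168.2.0` through `192.168.2.99`, a last-octet rule distributes load evenly only when the octets are spread out; a skewed key population would produce hot partitions.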


© 2024 shulou.com SLNews company. All rights reserved.
