
Implementing a Data-Ingestion Program with the Kafka Java API

Updated: 2025-01-19. From: SLTechnology News&Howtos (Shulou.com), 06/01

This article explains how to implement a data-ingestion ("storage") program with the Kafka Java API. The material is short and straightforward, so follow along step by step.

Description

Maven dependency

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.3.0</version>
    </dependency>

Connecting to Kafka

    Properties props = new Properties();
    // wait until all in-sync replicas have acknowledged the message
    props.put("acks", "all");
    // several broker addresses can be given, separated by commas
    props.put("bootstrap.servers", Config.ipList);
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("retries", "2");
    KafkaProducer<byte[], byte[]> produce = new KafkaProducer<>(props);

Kerberos authentication

Kerberos is the security authentication mechanism commonly used on big data platforms. Authentication can be set up once, in advance, when the project starts. Two approaches are introduced here.

Method one

Specify the authentication files

    // load the Kerberos configuration file
    System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
    // load the Kerberos user (JAAS) file
    System.setProperty("java.security.auth.login.config", "/etc/kafka/conf/kafka_jaas.conf");

Method two

Sometimes users are switched, or different machines carry different user information, and setting each one through a configuration file becomes tedious. In that case, consider generating the JAAS file as a temporary file when the Java program starts (admittedly, mostly to show off).

    // load the Kerberos configuration file
    System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
    // keytab file and authenticating user
    KafkaUtil.configureJAAS(Config.tabFile, Config.principal);

    /**
     * Generate a temporary jaas.conf file.
     * @param keyTab    location of the keytab authentication file
     * @param principal authenticating user
     */
    public static void configureJAAS(String keyTab, String principal) {
        String JAAS_TEMPLATE = "KafkaClient {\n"
                + "com.sun.security.auth.module.Krb5LoginModule required\n"
                + "useKeyTab=true\n"
                + "keyTab=\"%1$s\"\n"
                + "principal=\"%2$s\";\n"
                + "};";
        String content = String.format(JAAS_TEMPLATE, keyTab, principal);
        File jaasConf = null;
        PrintWriter writer = null;
        try {
            jaasConf = File.createTempFile("jaas", ".conf");
            writer = new PrintWriter(jaasConf);
            writer.println(content);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                writer.close();
            }
            // guard against createTempFile having failed
            if (jaasConf != null) {
                jaasConf.deleteOnExit();
            }
        }
        if (jaasConf != null) {
            System.setProperty("java.security.auth.login.config", jaasConf.getAbsolutePath());
        }
    }

Application

In production use, the following optimizations should be made for the efficiency and reliability of data transfer:

Make the sender a thread class managed by a thread pool, to increase throughput.

Upload data in batches.

Add a Callback handler so that failed sends are detected and the data is not silently lost.
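The three points above can be sketched without any Kafka dependency. This is only an illustration under stated assumptions: `RecordSender`, `BatchUploadSketch`, `partition`, `upload` and `retryQueue` are hypothetical names that do not appear in the article, and `RecordSender` merely stands in for a producer's asynchronous send-with-callback call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for a producer's send(record, callback); not a Kafka API. */
interface RecordSender {
    void send(byte[] record, BiConsumer<byte[], Exception> onCompletion);
}

final class BatchUploadSketch {
    /** Failed records are parked here so that a later pass can resend them. */
    static final Queue<byte[]> retryQueue = new ConcurrentLinkedQueue<>();

    /** Splits the input into consecutive batches of at most batchSize records. */
    static <T> List<List<T>> partition(List<T> records, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < records.size(); from += batchSize) {
            int to = Math.min(from + batchSize, records.size());
            batches.add(new ArrayList<>(records.subList(from, to)));
        }
        return batches;
    }

    /** Hands each batch to a pool thread and waits (up to one minute) for the pool to finish. */
    static void upload(List<byte[]> records, int batchSize, int threads, RecordSender sender) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (List<byte[]> batch : partition(records, batchSize)) {
            pool.submit(() -> {
                for (byte[] record : batch) {
                    sender.send(record, (sent, error) -> {
                        if (error != null) {
                            retryQueue.add(sent); // keep the data instead of dropping it
                        }
                    });
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }

    public static void main(String[] args) {
        List<byte[]> records = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            records.add(new byte[] {(byte) i});
        }
        // Fake sender for the demo: every record whose first byte is odd "fails".
        RecordSender flaky = (record, cb) ->
                cb.accept(record, record[0] % 2 == 1 ? new RuntimeException("simulated") : null);
        upload(records, 4, 2, flaky);
        System.out.println("batches=" + partition(records, 4).size()
                + ", queued for retry=" + retryQueue.size());
    }
}
```

With a real producer the callback would run later on the client's I/O thread, which is why the sketch uses a thread-safe queue for the records that need to be resent.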

The upload thread class is as follows.

    // Config, KafkaUtil, ControlTask and LogModel are the author's own classes; only KafkaUtil is shown in this article.
    // ThroughputThrottler is not part of kafka-clients. The article does not say where it comes from
    // (Kafka's tools module appears to ship a class with this name).
    public class Performance extends Thread {
        private final static Logger log = LoggerFactory.getLogger(Performance.class);
        private List<ProducerRecord<byte[], byte[]>> recordList;

        public Performance(List<ProducerRecord<byte[], byte[]>> recordList) {
            this.recordList = recordList;
        }

        /** Test method */
        public static void test() {
            log.info("Kafka Tool Test");
            try {
                /* parse args */
                String topicName = "test40";
                /* total number of records */
                long numRecords = 10000000000L;
                /* record size in bytes */
                int recordSize = 1500;
                /* maximum send rate (records per second) */
                int throughput = 1000000000;
                Properties props = new Properties();
                props.put("acks", "1");
                props.put("bootstrap.servers", "ip:6667,ip:6667");
                props.put("sasl.kerberos.service.name", "kafka");
                props.put("security.protocol", "SASL_PLAINTEXT");
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
                props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
                KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
                /* create test data */
                byte[] payload = new byte[recordSize];
                Random random = new Random(0);
                for (int i = 0; i < payload.length; ++i)
                    payload[i] = (byte) (random.nextInt(26) + 65);
                /* create the test record to send */
                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicName, payload);
                /* statistics model: total number of records, reporting interval in ms */
                Stats stats = new Stats(numRecords, 5000);
                /* start time */
                long startMs = System.currentTimeMillis();
                /* throttler that limits the producer's send rate: target rate, start time */
                ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
                /* long counter: numRecords exceeds Integer.MAX_VALUE, so an int would overflow */
                for (long i = 0; i < numRecords; i++) {
                    long sendStartMs = System.currentTimeMillis();
                    Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats, record.topic(), record.value());
                    producer.send(record, cb);
                    if (throttler.shouldThrottle(i, sendStartMs)) {
                        throttler.throttle();
                    }
                }
                /* finish the task */
                producer.close();
                stats.printTotal();
            } catch (Exception e) {
                log.error("Test Error", e);
            }
        }

        /** The actual ingestion method */
        @Override
        public void run() {
            super.run();
            KafkaUtil kafkaUtil = new KafkaUtil();
            KafkaProducer<byte[], byte[]> produce = kafkaUtil.create();
            /* total number of records */
            long size = recordList.size();
            /* maximum send rate (records per second) */
            int throughput = 900000;
            /* statistics model: total number of records, reporting interval in ms */
            Stats stats = new Stats(size, 5000);
            /* start time */
            long startMs = System.currentTimeMillis();
            /* throttler that limits the producer's send rate: target rate, start time */
            ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
            int i = 0;
            for (ProducerRecord<byte[], byte[]> record : recordList) {
                long sendStartMs = System.currentTimeMillis();
                // arguments: send start time, data length, statistics model, topic, data
                Callback cb = stats.nextCompletion(sendStartMs, record.value().length, stats, record.topic(), record.value());
                produce.send(record, cb);
                if (throttler.shouldThrottle(i, sendStartMs)) {
                    throttler.throttle();
                }
                i++;
            }
            produce.close();
            // stats.printTotal();
            log.info("Finish Data To Send");
            LogModel.sendNum++;
        }

        private static class Stats {
            private long start;
            private long windowStart;
            private int[] latencies;
            private int sampling;
            private int iteration;
            private int index;
            private long count;
            private long bytes;
            private int maxLatency;
            private long totalLatency;
            private long windowCount;
            private int windowMaxLatency;
            private long windowTotalLatency;
            private long windowBytes;
            private long reportingInterval;

            public Stats(long numRecords, int reportingInterval) {
                this.start = System.currentTimeMillis();
                this.windowStart = System.currentTimeMillis();
                this.index = 0;
                this.iteration = 0;
                /* keep at most about 500000 latency samples */
                this.sampling = (int) (numRecords / Math.min(numRecords, 500000));
                this.latencies = new int[(int) (numRecords / this.sampling) + 1];
                this.maxLatency = 0;
                this.totalLatency = 0;
                this.windowCount = 0;
                this.windowMaxLatency = 0;
                this.windowTotalLatency = 0;
                this.windowBytes = 0;
                this.reportingInterval = reportingInterval;
            }

            public void record(int iter, int latency, int bytes, long time) {
                this.count++;
                this.bytes += bytes;
                this.totalLatency += latency;
                this.maxLatency = Math.max(this.maxLatency, latency);
                this.windowCount++;
                this.windowBytes += bytes;
                this.windowTotalLatency += latency;
                this.windowMaxLatency = Math.max(windowMaxLatency, latency);
                if (iter % this.sampling == 0) {
                    this.latencies[index] = latency;
                    this.index++;
                }
                /* maybe report the recent perf */
                if (time - windowStart >= reportingInterval) {
                    printWindow();
                    newWindow();
                }
            }

            public Callback nextCompletion(long start, int bytes, Stats stats, String topic, byte[] data) {
                Callback cb = new PerfCallback(this.iteration, start, bytes, stats, topic, data);
                this.iteration++;
                return cb;
            }

            /** Transfer rate for the current reporting window */
            public void printWindow() {
                long elapsed = System.currentTimeMillis() - windowStart;
                double recsPerSec = 1000.0 * windowCount / (double) elapsed;
                double mbPerSec = 1000.0 * this.windowBytes / (double) elapsed / (1024.0 * 1024.0);
                System.out.printf("%d spend time,%d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f max latency.\n",
                        elapsed, windowCount, recsPerSec, mbPerSec,
                        windowTotalLatency / (double) windowCount, (double) windowMaxLatency);
            }

            public void newWindow() {
                this.windowStart = System.currentTimeMillis();
                this.windowCount = 0;
                this.windowMaxLatency = 0;
                this.windowTotalLatency = 0;
                this.windowBytes = 0;
            }

            /** Overall transfer rate */
            public void printTotal() {
                long elapsed = System.currentTimeMillis() - start;
                double recsPerSec = 1000.0 * count / (double) elapsed;
                double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0);
                int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
                System.out.printf("%d spend time,%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.\n",
                        elapsed, count, recsPerSec, mbPerSec,
                        totalLatency / (double) count, (double) maxLatency,
                        percs[0], percs[1], percs[2], percs[3]);
            }

            private static int[] percentiles(int[] latencies, int count, double... percentiles) {
                int size = Math.min(count, latencies.length);
                Arrays.sort(latencies, 0, size);
                int[] values = new int[percentiles.length];
                for (int i = 0; i < percentiles.length; i++) {
                    int index = (int) (percentiles[i] * size);
                    values[i] = latencies[index];
                }
                return values;
            }
        }

        private static final class PerfCallback implements Callback {
            private final long start;
            private final int iteration;
            private final int bytes;
            private final Stats stats;
            private final String topic;
            private final byte[] data;

            public PerfCallback(int iter, long start, int bytes, Stats stats, String topic, byte[] data) {
                this.start = start;
                this.stats = stats;
                this.iteration = iter;
                this.bytes = bytes;
                this.topic = topic;
                this.data = data;
            }

            public void onCompletion(RecordMetadata metadata, Exception exception) {
                long now = System.currentTimeMillis();
                int latency = (int) (now - start);
                this.stats.record(iteration, latency, bytes, now);
                if (exception != null) {
                    ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, data);
                    // put the data back on the data queue so that it is uploaded again
                    ControlTask.recordList.add(record);
                    log.error("Send Error And Second To Send", exception);
                }
            }
        }
    }
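The percentile lookup used by `Stats.printTotal()` can be tried in isolation. The sketch below uses the same indexing, `(int) (p * size)` into the sorted samples, but it is not the article's code: `PercentileSketch` is a hypothetical class name, and two things are deliberately different, namely that it sorts a copy instead of the caller's array and that it clamps the index so that a fraction of 1.0 or an empty sample cannot read past the end.

```java
import java.util.Arrays;

final class PercentileSketch {
    /**
     * For each fraction p, returns the value at index (int) (p * size) of the sorted
     * first `count` samples, with the index clamped to the last valid position.
     */
    static int[] percentiles(int[] latencies, int count, double... fractions) {
        int size = Math.min(count, latencies.length);
        int[] sorted = Arrays.copyOf(latencies, size); // copy so the caller's array keeps its order
        Arrays.sort(sorted);
        int[] values = new int[fractions.length];
        for (int i = 0; i < fractions.length; i++) {
            int index = Math.min((int) (fractions[i] * size), Math.max(size - 1, 0));
            values[i] = size == 0 ? 0 : sorted[index]; // report 0 when there are no samples
        }
        return values;
    }

    public static void main(String[] args) {
        int[] latenciesMs = new int[100];
        for (int i = 0; i < latenciesMs.length; i++) {
            latenciesMs[i] = 100 - i; // 100, 99, ..., 1 (unsorted on purpose)
        }
        int[] p = percentiles(latenciesMs, latenciesMs.length, 0.5, 0.75);
        System.out.println("p50=" + p[0] + " ms, p75=" + p[1] + " ms"); // prints "p50=51 ms, p75=76 ms"
    }
}
```

Because only every `sampling`-th latency is stored, the reported percentiles are estimates over the sampled records rather than over every record sent.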

KafkaUtil.java

    public class KafkaUtil {
        // private final static Logger log = LoggerFactory.getLogger(KafkaUtil.class);
        private KafkaProducer<byte[], byte[]> produce;

        /**
         * Create the connection.
         * @return the producer
         */
        public KafkaProducer<byte[], byte[]> create() {
            Properties props = new Properties();
            props.put("acks", "all");
            props.put("bootstrap.servers", Config.ipList);
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            // props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 120000); // increase the waiting time
            props.put("retries", "2");
            // Kerberos security authentication
            if (Config.kerberos == 0) {
                props.put("security.protocol", "SASL_PLAINTEXT");
                props.put("sasl.mechanism", "GSSAPI");
                props.put("sasl.kerberos.service.name", "kafka");
            }
            produce = new KafkaProducer<>(props);
            return produce;
        }

        /**
         * Send data.
         * @param record the record to send
         * @param cb     completion callback
         */
        public void send(ProducerRecord<byte[], byte[]> record, Callback cb) {
            produce.send(record, cb);
        }

        /** Close the connection. */
        public void close() {
            produce.flush();
            produce.close();
        }

        /**
         * Generate a temporary jaas.conf file.
         * @param keyTab    location of the keytab authentication file
         * @param principal authenticating user
         */
        public static void configureJAAS(String keyTab, String principal) {
            String JAAS_TEMPLATE = "KafkaClient {\n"
                    + "com.sun.security.auth.module.Krb5LoginModule required\n"
                    + "useKeyTab=true\n"
                    + "keyTab=\"%1$s\"\n"
                    + "principal=\"%2$s\";\n"
                    + "};";
            String content = String.format(JAAS_TEMPLATE, keyTab, principal);
            File jaasConf = null;
            PrintWriter writer = null;
            try {
                jaasConf = File.createTempFile("jaas", ".conf");
                writer = new PrintWriter(jaasConf);
                writer.println(content);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (writer != null) {
                    writer.close();
                }
                // guard against createTempFile having failed
                if (jaasConf != null) {
                    jaasConf.deleteOnExit();
                }
            }
            if (jaasConf != null) {
                System.setProperty("java.security.auth.login.config", jaasConf.getAbsolutePath());
            }
        }
    }

Thank you for reading. That covers the implementation of a data-ingestion program with the Kafka Java API. The details of any particular deployment still need to be verified in practice.
