This article introduces how to implement a Kafka producer client that publishes records to a Kafka cluster. Many readers have questions about this topic, so the following walks through a simple, practical example and explains the configuration options that control batching, reliability, and memory use.
The producer is thread-safe, and sharing a single producer instance across multiple threads is usually faster than giving each thread its own instance.
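As a quick illustration of that sharing pattern, here is a minimal sketch (not from the original article; the topic name, thread count, and class name are arbitrary) in which several worker threads submit records through one shared producer:

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SharedProducerSketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // One producer instance shared by every worker thread.
        Producer<String, String> producer = new KafkaProducer<>(props);
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int t = 0; t < 4; t++) {
            final int worker = t;
            pool.submit(() -> producer.send(
                    new ProducerRecord<>("my-topic", "worker-" + worker, "hello from " + worker)));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        producer.close(); // close only after all senders are done
    }
}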
Here is a simple example of using the producer to send string data, with both a key and a value.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();

A producer consists of two main parts: a buffer pool that holds records not yet sent, and a background I/O thread that performs the actual sending. When you are finished with a producer, always call close(); otherwise resources will leak.

The send() method is asynchronous. When called, it adds the record to the buffer and returns immediately, which allows the producer to batch records together for efficiency.

acks has three possible values. 0: the client does not wait for any response from the server; 1: the partition leader responds to the client as soon as it has written the record to its own log, without waiting for the followers to replicate it; all: the server responds only after both the leader and the followers have written the record to their logs. all is the slowest of the three but offers the best reliability.

If a request fails, the producer can retry automatically, but since we have set retries = 0 here, no retries will happen. Note that enabling retries can lead to duplicate records.

The producer maintains a buffer of unsent records for each partition, whose size is set by the batch.size configuration. Increasing it allows larger batches per request, but also requires more memory (since there is generally one such buffer per active partition).

By default, a buffer can be sent immediately, even if it still has unused space. If you want to reduce the number of requests, however, you can set linger.ms > 0: the producer then waits up to that many milliseconds before sending, in the hope of filling a larger batch. For example, in the snippet above, with linger.ms = 1 the 100 records would likely be sent in one batch. But if no more data reaches the buffer within that millisecond, the wait only adds latency without any benefit. Note that if a large amount of data arrives in a short period of time, batching will occur even with linger.ms = 0.
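Because send() returns immediately, completion is observed through the Future it returns or through a Callback. The sketch below is a minimal illustration (not from the original article; it reuses the producer from the snippet above, and the key and value strings are arbitrary) that logs where the record landed once the broker acknowledges it per the acks setting:

producer.send(
    new ProducerRecord<>("my-topic", "key-1", "value-1"),
    (metadata, exception) -> {
        if (exception != null) {
            // The send failed; with retries = 0 this is terminal.
            exception.printStackTrace();
        } else {
            // The broker acknowledged the record.
            System.out.printf("stored in %s-%d at offset %d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
        }
    });

// Alternatively, block on the returned Future for synchronous behavior:
// RecordMetadata m = producer.send(record).get();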
buffer.memory controls the total amount of buffer memory available to the producer. If records are written into the buffer faster than they can be transmitted to the server, this memory will be exhausted, and further send() calls will block. If a call stays blocked for longer than max.block.ms, it throws a TimeoutException.
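A sketch of that blocking path, assuming the same props as above (the small buffer.memory and max.block.ms values here are deliberately artificial, chosen only to make the timeout easy to trigger):

// Applied before constructing the producer.
props.put("buffer.memory", 32768);  // total buffer: two 16 KB batches
props.put("max.block.ms", 2000);    // block at most 2 seconds when full

Producer<String, String> producer = new KafkaProducer<>(props);
try {
    for (int i = 0; i < 1_000_000; i++) {
        // Once records arrive faster than the I/O thread can drain them,
        // send() blocks waiting for buffer space; after max.block.ms
        // it gives up and throws.
        producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), "payload"));
    }
} catch (org.apache.kafka.common.errors.TimeoutException e) {
    System.err.println("buffer stayed full past max.block.ms: " + e.getMessage());
} finally {
    producer.close();
}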
key.serializer and value.serializer specify how the key and value of a record are converted into byte arrays, and Kafka ships with a set of simple serializer classes, such as the StringSerializer used above.
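If the built-in serializers do not fit your types, you can implement the org.apache.kafka.common.serialization.Serializer interface yourself. A minimal sketch, assuming a recent Kafka client where configure() and close() have default implementations (the User type, the encoding, and the com.example package are hypothetical):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

// Hypothetical domain type, for illustration only.
class User {
    final String name;
    final int age;
    User(String name, int age) { this.name = name; this.age = age; }
}

public class UserSerializer implements Serializer<User> {
    @Override
    public byte[] serialize(String topic, User user) {
        if (user == null) {
            return null;
        }
        // Naive CSV encoding for illustration; real code would more
        // likely use JSON, Avro, or Protobuf.
        return (user.name + "," + user.age).getBytes(StandardCharsets.UTF_8);
    }
}

// Registered exactly like the built-in serializers:
// props.put("value.serializer", "com.example.UserSerializer");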
At this point, the study of how to implement a Kafka client that publishes records to a Kafka cluster is complete. Combining the theory above with hands-on practice is the best way to learn, so go and try it! If you want to keep learning, stay tuned for more practical articles.