Shulou (Shulou.com), SLTechnology News&Howtos, 06/02 report (updated 2025-04-09)
This article introduces how to use the Kafka client. Many people run into questions about the Kafka client in day-to-day work, so the editor went through a range of materials and put together simple, practical usage notes in the hope of answering them. Follow along below.
The Kafka producer client publishes records (messages) to a Kafka cluster.
The new producer (KafkaProducer) is thread-safe: sharing a single producer instance across threads is generally faster than using multiple instances.
Here is a simple example that uses the producer to send records whose keys and values are strings containing sequential numbers. It can be run directly in a Java main method.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close(); // flushes buffered records and releases the producer's resources
The producer consists of a buffer pool that holds records not yet sent to the server, plus a background I/O thread that turns those records into requests and transmits them to the cluster. Failing to close the producer after use will leak these resources.
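To make the buffer-plus-background-thread structure concrete, here is a toy model in plain Java. `ToyProducer` is a hypothetical class, not part of the Kafka client and not how Kafka implements it; it only mimics the shape described above: send() enqueues a record, a background thread drains the queue, and close() flushes what is left and stops that thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical toy producer (NOT Kafka's implementation): a background "I/O"
// thread drains a buffer of pending records. close() flushes what is left and
// stops that thread; a producer that is never closed keeps the thread alive.
class ToyProducer implements AutoCloseable {
    // Sentinel compared by identity, so no user record can be mistaken for it.
    private static final String CLOSE_MARKER = new String("close");
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private final List<String> delivered = Collections.synchronizedList(new ArrayList<>());
    private final Thread ioThread;

    ToyProducer() {
        ioThread = new Thread(() -> {
            try {
                while (true) {
                    String record = buffer.take();      // wait for the next pending record
                    if (record == CLOSE_MARKER) return; // everything queued before the marker is done
                    delivered.add(record);              // stands in for "sent to the cluster"
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "toy-producer-io");
        ioThread.start();
    }

    // Asynchronous like the real send(): enqueue and return immediately.
    void send(String record) {
        buffer.add(record);
    }

    // Snapshot of what the background thread has "sent" so far.
    List<String> delivered() {
        synchronized (delivered) {
            return new ArrayList<>(delivered);
        }
    }

    @Override
    public void close() {
        buffer.add(CLOSE_MARKER); // queued behind all earlier records, so they flush first
        try {
            ioThread.join();      // wait for the I/O thread to finish and exit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Since the real Producer interface extends java.io.Closeable, a try-with-resources block is a convenient way to make sure close() always runs, for the toy and for KafkaProducer alike.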
The send() method is asynchronous: it adds the record to a buffer of pending records and returns immediately. This lets the producer batch individual records together for efficiency.
acks controls the criteria under which a request is considered complete (that is, successfully sent). The "all" setting used here blocks until the record is fully committed; it is the slowest setting but also the most durable.
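As a toy restatement of the acks values, the sketch below returns how many replica acknowledgements a produce request waits for, assuming a partition whose in-sync replica set (leader included) has `inSyncReplicas` members. `AcksModel` is a hypothetical helper, not Kafka code; it only encodes the meaning of "0", "1" and "all" (which Kafka also accepts as "-1").

```java
// Toy model (not Kafka code): acknowledgements required before a produce
// request counts as complete. inSyncReplicas includes the partition leader.
class AcksModel {
    static int requiredAcks(String acks, int inSyncReplicas) {
        switch (acks) {
            case "0":   // fire and forget: the producer does not wait at all
                return 0;
            case "1":   // only the leader's local write is required
                return 1;
            case "all":
            case "-1":  // every in-sync replica must have the record
                return inSyncReplicas;
            default:
                throw new IllegalArgumentException("unknown acks value: " + acks);
        }
    }
}
```

Waiting on more replicas is exactly why "all" is the slowest but most durable choice.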
retries: if a request fails, the producer can retry it automatically. Since we set retries to 0, it will not. Enabling retries also opens up the possibility of duplicate records.
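Why can retries create duplicates? A common case is that the broker writes the record but its acknowledgement never reaches the producer, so the producer retries a write that already succeeded. The toy simulation below (hypothetical `RetryDuplicateDemo`, not Kafka code) models exactly that scenario and returns the resulting broker log.

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation (not Kafka code): the broker appends the record every time it
// is received, but the first `lostAcks` acknowledgements never reach the
// producer. The producer treats a lost ack as a failure and, while it has
// retries left, resends the same record, leaving a duplicate in the log.
class RetryDuplicateDemo {
    static List<String> sendWithRetries(String record, int retries, int lostAcks) {
        List<String> brokerLog = new ArrayList<>();
        for (int attempt = 0; ; attempt++) {
            brokerLog.add(record);                // the write itself succeeds
            boolean ackLost = attempt < lostAcks; // ...but the response may not arrive
            if (!ackLost || attempt == retries) {
                return brokerLog;                 // acked, or out of retries (error reported)
            }
        }
    }
}
```

With retries = 0 and one lost ack the log holds the record once (and the producer reports an error); with retries = 3 the same lost ack leaves the record in the log twice.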
The producer maintains a buffer of unsent records for each partition. The size of these buffers is set by batch.size. A larger value produces larger batches but requires more memory, since there is generally one buffer per "active" partition.
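A minimal sketch of per-partition batching, assuming a simplified rule: each partition has one open batch, and that batch is closed once the next record would push it past batchSize bytes. `BatchModel` is hypothetical and much simpler than Kafka's internal accumulator, but it shows why memory grows with the number of active partitions.

```java
import java.util.HashMap;
import java.util.Map;

// Toy accumulator (not Kafka's implementation): one open batch per partition;
// a batch is closed once the next record would push it past batchSize bytes.
class BatchModel {
    private final int batchSize;
    private final Map<Integer, Integer> openBytes = new HashMap<>();   // partition -> bytes in open batch
    private final Map<Integer, Integer> fullBatches = new HashMap<>(); // partition -> batches closed so far

    BatchModel(int batchSize) {
        this.batchSize = batchSize;
    }

    void append(int partition, int recordBytes) {
        int current = openBytes.getOrDefault(partition, 0);
        if (current > 0 && current + recordBytes > batchSize) {
            fullBatches.merge(partition, 1, Integer::sum); // batch is full: hand it to the sender
            current = 0;                                   // start a new batch with this record
        }
        openBytes.put(partition, current + recordBytes);
    }

    int closedBatches(int partition) {
        return fullBatches.getOrDefault(partition, 0);
    }

    // Each active partition holds its own open batch, hence its own buffer.
    int activePartitions() {
        return openBytes.size();
    }
}
```

With batchSize = 100, three 40-byte records on partition 0 close one batch (80 + 40 > 100), and a single record on partition 1 adds a second active buffer.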
By default a buffer is available to send immediately, even if it still has unused space. To reduce the number of requests, set linger.ms to a value greater than 0. This tells the producer to wait up to that many milliseconds before sending a request, in the hope that more records will arrive to fill the batch. This is analogous to Nagle's algorithm in TCP. In the snippet above, for example, all 100 records are likely to go out in a single request because linger.ms is 1 millisecond; if the buffer does not fill up, however, this setting adds 1 millisecond of latency while the producer waits for more records. Note that records arriving close together in time will generally be batched together even with linger.ms=0, so under heavy load batching happens regardless of this setting. When the load is lighter, setting it above 0 gives fewer, more efficient requests at the cost of a small amount of latency.
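The effect of linger.ms on request count can be sketched with a toy model, assuming batches never fill up: a batch opens when its first record arrives and is sent lingerMs later, and every record arriving by then joins it. `LingerModel` is hypothetical; with lingerMs = 0 only records arriving in the same millisecond share a request, a rough stand-in for "records arriving close together batch anyway".

```java
// Toy model (not Kafka code) of linger.ms, assuming batches never fill up:
// a batch opens when its first record arrives and is sent lingerMs later;
// every record arriving by that moment joins the same request.
class LingerModel {
    // arrivalsMs must be sorted ascending; returns the number of requests sent.
    static int requestCount(long[] arrivalsMs, long lingerMs) {
        int requests = 0;
        int i = 0;
        while (i < arrivalsMs.length) {
            long sendAt = arrivalsMs[i] + lingerMs; // deadline of the batch opened by record i
            requests++;
            while (i < arrivalsMs.length && arrivalsMs[i] <= sendAt) {
                i++;                                // this record makes it into the batch
            }
        }
        return requests;
    }
}
```

For arrivals at 0, 0, 0, 5, 5 and 20 ms, lingerMs = 0 yields three requests while lingerMs = 10 yields two; 100 records arriving within the same millisecond, as in the loop above, fit into one request with lingerMs = 1.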
buffer.memory controls the total amount of memory the producer can use for buffering. If records are sent faster than they can be transmitted to the server, this buffer space is exhausted. Once it is exhausted, further send() calls block; the maximum blocking time is set by max.block.ms, after which a TimeoutException is thrown.
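The blocking behaviour of buffer.memory and max.block.ms can be modelled with a semaphore holding one permit per byte. `BufferPoolModel` is a hypothetical sketch, not Kafka's buffer pool, and it throws the JDK's java.util.concurrent.TimeoutException, whereas Kafka throws its own org.apache.kafka.common.errors.TimeoutException.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy model (not Kafka's implementation) of buffer.memory + max.block.ms:
// each record must reserve its bytes from a fixed pool; if the pool stays
// exhausted for longer than maxBlockMs, the caller gets a TimeoutException.
// NOTE: this is java.util.concurrent.TimeoutException, not Kafka's class.
class BufferPoolModel {
    private final Semaphore freeBytes;
    private final long maxBlockMs;

    BufferPoolModel(int bufferMemory, long maxBlockMs) {
        this.freeBytes = new Semaphore(bufferMemory);
        this.maxBlockMs = maxBlockMs;
    }

    // Blocks for up to maxBlockMs waiting for enough free bytes.
    void reserve(int recordBytes) throws InterruptedException, TimeoutException {
        if (!freeBytes.tryAcquire(recordBytes, maxBlockMs, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException("Failed to allocate " + recordBytes
                    + " bytes within " + maxBlockMs + " ms");
        }
    }

    // Called once the record has been transmitted, returning its bytes to the pool.
    void release(int recordBytes) {
        freeBytes.release(recordBytes);
    }

    int available() {
        return freeBytes.availablePermits();
    }
}
```

With a 100-byte pool, reserving 80 bytes succeeds, and a further 30-byte reservation waits maxBlockMs and then fails, unless something releases memory in the meantime.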
key.serializer and value.serializer specify how to turn the key and value objects the user supplies with each ProducerRecord into bytes. You can use the bundled ByteArraySerializer or StringSerializer for simple byte-array or string types.
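As a rough illustration of what a serializer does, here is a simplified, hypothetical interface shaped like Kafka's `Serializer<T>` (whose core method is `byte[] serialize(String topic, T data)`), with two implementations that behave like StringSerializer (UTF-8 by default) and ByteArraySerializer (pass-through). These toy classes are not the Kafka classes themselves.

```java
import java.nio.charset.StandardCharsets;

// Simplified, hypothetical mirror of a key/value serializer: turns the user's
// object into the bytes that actually go over the wire.
interface ToySerializer<T> {
    byte[] serialize(String topic, T data);
}

// Behaves like StringSerializer with its default UTF-8 encoding.
class Utf8StringSerializer implements ToySerializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }
}

// Behaves like ByteArraySerializer: the value already is bytes, so pass it through.
class PassThroughSerializer implements ToySerializer<byte[]> {
    @Override
    public byte[] serialize(String topic, byte[] data) {
        return data;
    }
}
```

The topic parameter is unused here; it exists so that a serializer can, if needed, vary its output per topic.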
send()
public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)
Asynchronously sends a record to a topic and invokes the provided callback once the send has been acknowledged.
The send is asynchronous: this method returns as soon as the record has been stored in the buffer of records waiting to be sent. This allows many records to be sent in parallel without blocking to wait for the response after each one.
The result of the send is a RecordMetadata specifying the partition the record was sent to, the offset it was assigned, and its timestamp. If the topic uses CreateTime, the timestamp is the one provided by the user, or the send time if the user did not specify one. If the topic uses LogAppendTime, the timestamp is the broker's local time when the message was appended.
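The timestamp rule above fits in a few lines. `TimestampModel` is a hypothetical restatement, not Kafka code; its enum constants merely echo the names of Kafka's `TimestampType` values.

```java
// Toy restatement (not Kafka code) of which timestamp ends up in RecordMetadata.
class TimestampModel {
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    // userTimestamp is null when the producer did not set one on the record.
    static long resolve(TimestampType topicType, Long userTimestamp,
                        long sendTimeMs, long brokerAppendTimeMs) {
        if (topicType == TimestampType.LOG_APPEND_TIME) {
            return brokerAppendTimeMs; // broker's local clock when the message was appended
        }
        // CreateTime: the user's timestamp if given, otherwise the send time
        return userTimestamp != null ? userTimestamp : sendTimeMs;
    }
}
```

Under LogAppendTime a user-supplied timestamp is effectively replaced by the broker's append time.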
Because the send call is asynchronous, it returns a Future for the RecordMetadata that will be assigned to this record. Calling get() on this future blocks until the associated request completes, then returns the record's metadata or throws any exception that occurred while sending it.
To simulate a simple blocking call, call get() immediately:
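// assumes a producer configured with ByteArraySerializer for keys and values
byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>("my-topic", key, value);
producer.send(record).get(); // blocks until the broker responds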
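The difference between "send returns a Future" and "get() blocks" can be seen without a broker in a toy model. `FutureSendModel` is hypothetical, not Kafka code: send() hands back a `Future<Long>` at once, a background thread fills in an "offset" after a simulated round trip, and calling get() right away waits for it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

// Toy model (not Kafka code): send() returns a Future immediately, and the
// "offset" is filled in later by a background thread. Calling get() right
// away turns the asynchronous send into a blocking one.
class FutureSendModel implements AutoCloseable {
    private final ExecutorService io = Executors.newSingleThreadExecutor();
    private final AtomicLong nextOffset = new AtomicLong();

    // The value is ignored; only the returned offset matters in this model.
    Future<Long> send(String value) {
        return CompletableFuture.supplyAsync(() -> {
            sleepQuietly(10);                    // pretend network round trip
            return nextOffset.getAndIncrement(); // offset assigned by the "broker"
        }, io);
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void close() {
        io.shutdown();
    }
}
```

Calling `send("a").get()` and then `send("b").get()` returns offsets 0 and 1, each call waiting for its own round trip, which is exactly the throughput cost that the asynchronous API avoids.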
If you want fully non-blocking usage, use the Callback parameter to supply a callback that is invoked when the request completes.
ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>("the-topic", key, value);
producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null)
            e.printStackTrace(); // the send failed
        else
            System.out.println("The offset of the record we just sent is: " + metadata.offset());
    }
});
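The callback style can also be modelled without Kafka. In this sketch `ToyCallback` is a hypothetical interface shaped after `Callback#onCompletion` (offset instead of RecordMetadata), and `CallbackSendModel` is a toy sender that invents a failure for null values purely so both branches can be shown. As in the real API, a null callback means "no callback".

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical callback shape, modelled on Kafka's Callback#onCompletion.
interface ToyCallback {
    void onCompletion(long offset, Exception error); // offset is -1 when error != null
}

// Toy model (not Kafka code): send() returns at once; the callback runs later
// on the background thread with either the assigned offset or an error.
class CallbackSendModel implements AutoCloseable {
    private final ExecutorService io = Executors.newSingleThreadExecutor();
    private long nextOffset = 0; // only touched on the single io thread

    // The returned future is only there so callers (e.g. tests) can wait.
    CompletableFuture<Void> send(String value, ToyCallback callback) {
        return CompletableFuture.runAsync(() -> {
            // Invented failure rule for this toy: null values are rejected.
            Exception error = value == null ? new IllegalArgumentException("value must not be null") : null;
            long offset = error == null ? nextOffset++ : -1;
            if (callback != null) {
                callback.onCompletion(offset, error);
            }
        }, io);
    }

    @Override
    public void close() {
        io.shutdown();
    }
}
```

The caller's thread never waits on the result; all follow-up work happens inside the callback.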
Callbacks for records sent to the same partition are guaranteed to execute in order. That is, in the following example, callback1 is guaranteed to execute before callback2:
producer.send(new ProducerRecord<byte[], byte[]>(topic, partition, key1, value1), callback1);
producer.send(new ProducerRecord<byte[], byte[]>(topic, partition, key2, value2), callback2);
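One way to picture the per-partition ordering guarantee is a single-threaded "lane" per partition, so completions for one partition run strictly in send order. `PartitionOrderModel` is a hypothetical stand-in; Kafka gets its guarantee from how it sends and completes each partition's batches, not from one thread per partition.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model (not Kafka code): each partition's completions go through one
// single-threaded lane, so callbacks for the same partition fire in the order
// the records were sent. Nothing is promised across different partitions.
class PartitionOrderModel implements AutoCloseable {
    private final Map<Integer, ExecutorService> lanes = new ConcurrentHashMap<>();

    CompletableFuture<Void> send(int partition, Runnable callback) {
        ExecutorService lane = lanes.computeIfAbsent(partition, p -> Executors.newSingleThreadExecutor());
        return CompletableFuture.runAsync(callback, lane);
    }

    @Override
    public void close() {
        lanes.values().forEach(ExecutorService::shutdown);
    }
}
```

Two sends to partition 0 with callback1 and then callback2 always record callback1 first in this model.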
Note: callbacks generally execute in the producer's I/O thread, so they should be reasonably fast; otherwise they delay the sending of messages from other threads. If you need to run blocking or computationally expensive callbacks, it is recommended to use your own Executor in the callback body to parallelize the processing.
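A minimal sketch of that advice: the callback body only submits the heavy work to an application-owned executor and returns at once. `OffloadingCallback`, its `onCompletion(long, Exception)` signature and the sleep that stands in for "expensive" work are all hypothetical; in real code the same pattern would sit inside a Kafka `Callback` implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Sketch (hypothetical names): keep the callback cheap by handing expensive
// work to an application-owned executor, so the I/O thread is freed at once.
class OffloadingCallback {
    private final ExecutorService workers;

    OffloadingCallback(ExecutorService workers) {
        this.workers = workers;
    }

    // Runs on the I/O thread: only schedules the work and returns.
    Future<String> onCompletion(long offset, Exception error) {
        return workers.submit(() -> expensiveProcessing(offset, error));
    }

    private static String expensiveProcessing(long offset, Exception error) throws InterruptedException {
        Thread.sleep(50); // stands in for blocking I/O or heavy computation
        return error != null ? "failed: " + error.getMessage() : "processed offset " + offset;
    }
}
```

The executor's size then bounds how much callback work runs in parallel, independently of the producer's single I/O thread.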
Specified by:
send in interface Producer<K,V>
Parameters:
record - the record to send
callback - a user-supplied callback to execute when the record has been acknowledged by the server (null indicates no callback)
Throws:
InterruptException - if the thread is interrupted while blocked
SerializationException - if the key or value is not a valid object given the configured serializers
TimeoutException - if the time taken to fetch metadata or to allocate memory for the record exceeds max.block.ms
KafkaException - if a Kafka-related error occurs that does not belong to the public API exceptions
That concludes this look at how to use the Kafka client. Theory works best when paired with practice, so go and try it yourself!
© 2024 shulou.com SLNews company. All rights reserved.