2025-01-18 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
This article explains how Kafka producers send messages and how the ACK mechanism guarantees reliability. Many people run into these questions in real projects, so let's walk through them step by step. I hope you read it carefully and get something out of it!
Producer message sending process
On the producer side, sending a message is coordinated by two threads: the main thread and the Sender thread (the sending thread).
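To picture the split, here is a minimal, hypothetical model of the handoff in plain Java. The class `TwoThreadSketch`, its queue, and the thread name are assumptions for illustration, not Kafka code: the calling thread only enqueues records, while a background daemon thread, standing in for the Sender, drains them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model: the main thread enqueues, a background "sender" thread drains.
class TwoThreadSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // Records the "sender" has handled (stands in for network I/O).
    final List<String> sent = Collections.synchronizedList(new ArrayList<>());
    private final Thread ioThread;
    private volatile boolean running = true;

    TwoThreadSketch() {
        Runnable sender = () -> {
            // Keep draining until close() is called and the queue is empty.
            while (running || !queue.isEmpty()) {
                try {
                    String record = queue.poll(10, TimeUnit.MILLISECONDS);
                    if (record != null) {
                        sent.add(record);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        };
        // A daemon thread, similar in spirit to new KafkaThread(name, sender, true).
        ioThread = new Thread(sender, "sketch-network-thread");
        ioThread.setDaemon(true);
        ioThread.start();
    }

    // Runs on the caller's (main) thread and returns right after enqueueing.
    void send(String record) {
        queue.add(record);
    }

    // Stop, let the sender drain what is left, and wait for it to finish.
    void close() {
        running = false;
        try {
            ioThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In the real client the handoff goes through the RecordAccumulator and the Sender performs the network I/O; the queue here only illustrates why `send()` can return before anything reaches the broker.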
The Kafka (version 2.6.0) source code shows how this is set up.
Source file: kafka\clients\src\main\java\org\apache\kafka\clients\producer\KafkaProducer.java. Test entry point: KafkaProducerTest.testInvalidGenerationIdAndMemberIdCombinedInSendOffsets()
When the KafkaProducer is created, a Sender object is created (at line 430 of KafkaProducer.java) and an IO thread is started to run it:
```java
this.errors = this.metrics.sensor("errors");
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
```
Interceptor
An interceptor lets you customize messages before they are sent, similar to a Spring interceptor, a MyBatis plug-in, or a Quartz listener.
```java
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}
```
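The `onSend` call above runs every configured interceptor in turn. Below is a minimal sketch of that chaining. Records are simplified to strings and interceptors are modeled as `UnaryOperator<String>`; both are assumptions for illustration, not Kafka's types. Each interceptor receives the previous one's output, and one that throws is skipped, which is how a "does not throw exceptions" guarantee can be kept.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical sketch of an interceptor chain; records are plain Strings here.
class InterceptorChain {
    private final List<UnaryOperator<String>> interceptors;

    InterceptorChain(List<UnaryOperator<String>> interceptors) {
        this.interceptors = interceptors;
    }

    // Each interceptor gets the previous interceptor's output.
    // If one throws, its exception is swallowed and the previous value is kept,
    // so onSend itself never throws.
    String onSend(String record) {
        String intercepted = record;
        for (UnaryOperator<String> interceptor : interceptors) {
            try {
                intercepted = interceptor.apply(intercepted);
            } catch (RuntimeException e) {
                // A real implementation would log the error here.
            }
        }
        return intercepted;
    }
}
```

For example, a chain of "append -a", "always throw", "append -c" turns `msg` into `msg-a-c`: the failing interceptor is simply bypassed.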
A custom interceptor is written by implementing the org.apache.kafka.clients.producer.ProducerInterceptor interface.
Simple customization example:
```java
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CustomInterceptor implements ProducerInterceptor<String, String> {

    // Triggered when a message is sent
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        System.out.println("trigger when sending message");
        return record;
    }

    // Triggered when the server's ACK is received
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("message is received by the server");
    }

    @Override
    public void close() {
        System.out.println("producer shutdown");
    }

    // Triggered when the interceptor is configured with key-value pairs
    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println("configure...");
    }
}
```
Add the interceptor to the producer:
```java
List<String> interceptors = new ArrayList<>();
interceptors.add("com.freecloud.plug.kafka.interceptor.CustomInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
```
Serialization
```java
byte[] serializedKey;
try {
    serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
    throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
            " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
            " specified in key.serializer", cce);
}
byte[] serializedValue;
try {
    serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
    throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
            " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
            " specified in value.serializer", cce);
}
```
Kafka ships serializers for common data types. For a custom format, implement the org.apache.kafka.common.serialization.Serializer interface.
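As a sketch of a custom serializer, assume a hypothetical `Order` type encoded as the UTF-8 text `id:quantity`. To keep the example compilable without the Kafka jar, the class does not declare `implements Serializer<Order>`, but its method has the same shape as `Serializer#serialize(String topic, T data)`. In a real project you would add the `implements` clause and register the class through `key.serializer` or `value.serializer`.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical value type used only for this example.
final class Order {
    final long id;
    final int quantity;

    Order(long id, int quantity) {
        this.id = id;
        this.quantity = quantity;
    }
}

// Standalone sketch. In a real project this class would also declare
// "implements org.apache.kafka.common.serialization.Serializer<Order>".
class OrderSerializer {
    // Same shape as Serializer#serialize(String topic, T data).
    public byte[] serialize(String topic, Order data) {
        if (data == null) {
            return null; // null in, null out, like the built-in StringSerializer
        }
        String text = data.id + ":" + data.quantity;
        return text.getBytes(StandardCharsets.UTF_8);
    }
}
```

A text encoding keeps the sketch readable; a production serializer would more likely use a compact binary or schema-based format.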
Partitioner
```java
int partition = partition(record, serializedKey, serializedValue, cluster);
```
Message accumulator
```java
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
```
RecordAccumulator is essentially a ConcurrentMap:
```java
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
```
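The partitioner and accumulator steps can be modeled together in a few lines. This is a simplified, hypothetical sketch, not Kafka's implementation: partitions are plain ints, batches are lists, a batch counts as full after a fixed number of records (Kafka measures `batch.size` in bytes), and `Arrays.hashCode` stands in for the murmur2 hash that Kafka's default partitioner applies to keyed records. Records without a key follow a different rule that is not modeled here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified model of the partitioner + accumulator steps.
class AccumulatorSketch {
    // Flags named after RecordAppendResult.batchIsFull / newBatchCreated.
    static final class AppendResult {
        final boolean batchIsFull;
        final boolean newBatchCreated;

        AppendResult(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

    private final int numPartitions;
    private final int recordsPerBatch; // record count instead of bytes, for brevity
    // One deque of batches per partition.
    final ConcurrentMap<Integer, Deque<List<byte[]>>> batches = new ConcurrentHashMap<>();

    AccumulatorSketch(int numPartitions, int recordsPerBatch) {
        this.numPartitions = numPartitions;
        this.recordsPerBatch = recordsPerBatch;
    }

    // Keyed record: hash the key, clear the sign bit, take the modulo,
    // so the same key always maps to the same partition.
    int partition(byte[] serializedKey) {
        return (Arrays.hashCode(serializedKey) & 0x7fffffff) % numPartitions;
    }

    // Append to the newest batch of the partition, opening a new batch when needed.
    AppendResult append(int partition, byte[] serializedValue) {
        Deque<List<byte[]>> dq = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        synchronized (dq) {
            List<byte[]> last = dq.peekLast();
            boolean created = false;
            if (last == null || last.size() >= recordsPerBatch) {
                last = new ArrayList<>();
                dq.addLast(last);
                created = true;
            }
            last.add(serializedValue);
            return new AppendResult(last.size() >= recordsPerBatch, created);
        }
    }
}
```

With two records per batch, the first append opens a batch, the second fills it, and the third opens a second batch in the same partition's deque; a caller would wake the sender whenever either flag is set.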
Each partition has its own queue of batches. When a batch is full, or a new batch has just been created, the Sender thread is woken up to send it.
```java
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}
```
Data reliability guarantee: ACK
How can the producer be sure that a message it sends actually reaches the server? If the network fails during sending, or the Kafka server runs into a problem while receiving, the message is lost and the producer never finds out.
So the Kafka server needs a way to respond to the client: only after the server confirms receipt does the producer treat the message as sent; otherwise it resends the data.
When does a message count as successfully received? Messages are stored on brokers, so the broker responds to the producer once the message has been written to the partition's log.
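The resend-until-confirmed idea can be sketched as a bounded retry loop. The attempt function and the loop are hypothetical; in the real client, resending is governed by producer settings such as `retries`. Note that if the ACK itself is lost, a resend can deliver the same message twice.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch: resend until the server acknowledges or retries run out.
class RetryingSend {
    // attempt returns true when the server acknowledged the message.
    // Returns the number of attempts used, or -1 if every attempt failed.
    static int sendWithRetries(BooleanSupplier attempt, int retries) {
        int maxAttempts = retries + 1; // the first try plus the retries
        for (int attemptNo = 1; attemptNo <= maxAttempts; attemptNo++) {
            if (attempt.getAsBoolean()) {
                return attemptNo;
            }
        }
        return -1;
    }
}
```

For instance, an attempt that only succeeds on its third call returns 3 when up to 5 retries are allowed, while an attempt that never succeeds returns -1.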
Server response policy
In a distributed setup it is not enough for the message to be written on one broker: if the partition has multiple replicas, the followers must also have written it.
There are three possible strategies for when the server sends the ACK to the producer:
1. ACK as soon as the leader has received the message. The followers may not yet match the leader, so data can be lost if the leader fails. The client waits the shortest time.
2. ACK once more than half of the followers have synchronized the message. The client waits somewhat longer, but the data survives most failure scenarios.
3. ACK only after all followers have synchronized. The client waits the longest, but a failed node has the least impact because every node holds the complete data.
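To make the three strategies concrete, this hypothetical helper counts how many followers must have synchronized before the leader may ACK, following the wording above: the first strategy needs none, the second needs more than half, the third needs all. For a partition with 4 followers that gives 0, 3 and 4.

```java
// Hypothetical sketch of the three ACK strategies, counted in followers.
class AckStrategies {
    enum Strategy { LEADER_ONLY, MAJORITY, ALL }

    // How many followers must have synchronized the message before the ACK.
    static int requiredFollowerSyncs(Strategy strategy, int followers) {
        switch (strategy) {
            case LEADER_ONLY:
                return 0;                 // the leader's own write is enough
            case MAJORITY:
                return followers / 2 + 1; // more than half of the followers
            case ALL:
                return followers;         // every follower
            default:
                throw new IllegalArgumentException("unknown strategy " + strategy);
        }
    }

    static boolean canAck(Strategy strategy, int followers, int followersSynced) {
        return followersSynced >= requiredFollowerSyncs(strategy, followers);
    }
}
```

The counts show the latency trade-off directly: the more followers the leader must wait for, the later the producer receives its ACK.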
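Kafka lets the producer choose its ACK behavior through the acks parameter. Instead of the majority approach, its strictest setting builds on the wait-for-all strategy, refined with the in-sync replica set described next.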
ISR (in-sync replica set)
The strictest strategy waits until every follower has synchronized the data. What happens if one of them cannot?
Suppose the leader receives data and all followers start synchronizing, but one follower fails and cannot replicate from the leader. Under this rule, the leader must wait indefinitely and can never return the ACK, so a single faulty follower holds up the whole partition.
How can this be solved? Change the rule so that not every follower can make the leader wait: only the followers that are working normally need to synchronize the data.
The leader maintains a dynamic set of the replicas that are keeping up with it, called the in-sync replica set (ISR). Once every follower in the ISR has synchronized the data, the leader can send the ACK to the client.
Followers that keep falling behind are handled by replica.lag.time.max.ms (in milliseconds; the default is 30000, i.e. 30 seconds). A follower that has not caught up with the leader within this time is removed from the ISR.
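A hypothetical model of the ISR rule: the leader remembers when each follower last caught up with its log end offset, drops followers whose last catch-up is older than the lag limit (playing the role of replica.lag.time.max.ms), and may ACK an offset once every follower still in the ISR has it. Names and structure are illustrative only; Kafka's real bookkeeping is more involved.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of a leader tracking its in-sync replica set (ISR).
class IsrSketch {
    private final long lagTimeMaxMs; // plays the role of replica.lag.time.max.ms
    private long leaderEndOffset = 0;
    // Highest offset each follower has replicated.
    private final Map<String, Long> replicatedOffset = new HashMap<>();
    // Last time (ms) each follower had fully caught up with the leader.
    private final Map<String, Long> lastCaughtUpMs = new HashMap<>();

    IsrSketch(long lagTimeMaxMs) {
        this.lagTimeMaxMs = lagTimeMaxMs;
    }

    void addFollower(String follower, long nowMs) {
        replicatedOffset.put(follower, leaderEndOffset);
        lastCaughtUpMs.put(follower, nowMs);
    }

    void leaderAppend(long newEndOffset) {
        leaderEndOffset = newEndOffset;
    }

    // A fetch that reaches the leader's end offset refreshes the catch-up time.
    void followerFetched(String follower, long offset, long nowMs) {
        replicatedOffset.put(follower, offset);
        if (offset >= leaderEndOffset) {
            lastCaughtUpMs.put(follower, nowMs);
        }
    }

    // Followers that caught up within the allowed lag stay in the ISR.
    Set<String> isr(long nowMs) {
        Set<String> inSync = new TreeSet<>();
        for (Map.Entry<String, Long> e : lastCaughtUpMs.entrySet()) {
            if (nowMs - e.getValue() <= lagTimeMaxMs) {
                inSync.add(e.getKey());
            }
        }
        return inSync;
    }

    // The leader may ACK once every follower still in the ISR has the offset
    // (the leader itself always has it).
    boolean canAck(long offset, long nowMs) {
        for (String follower : isr(nowMs)) {
            if (replicatedOffset.get(follower) < offset) {
                return false;
            }
        }
        return true;
    }
}
```

In a run with a 30000 ms limit, a stuck follower first blocks the ACK; once it has lagged for more than 30 seconds it drops out of the ISR and the ACK can go out with only the healthy follower.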
| Parameter | Meaning |
|---|---|
| acks = 0 | The producer does not wait for an ACK from the broker; a record counts as sent as soon as it is handed off, even if the broker has not written it yet. Data may be lost if the broker fails. |
| acks = 1 | The producer waits for the broker's ACK, which the partition leader returns once it has written the message. If the leader fails before the followers have replicated it, the data is lost. |
| acks = -1 (all) | The producer waits for the broker's ACK, which is returned only after the partition leader and all followers in the ISR have written the message. |

Going from acks = 0 to acks = 1 to acks = -1, producer throughput decreases while data durability increases. In actual development, choose the setting that fits each scenario.
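On the producer side the choice is a single setting. Below is a sketch of building the producer properties with the real config keys (`acks` is the value of `ProducerConfig.ACKS_CONFIG`); the helper class name and the broker address `localhost:9092` are placeholders.

```java
import java.util.Properties;

// Sketch: producer settings for the acks trade-off.
class AcksConfigExample {
    static Properties producerProps(String acks) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // "0", "1", or "all" ("all" is equivalent to "-1")
        props.put("acks", acks);
        return props;
    }
}
```

Passing the result to `new KafkaProducer<String, String>(props)` would create a producer that, with `acks` set to `"all"`, waits for every replica in the ISR before a send completes.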
This concludes the introduction to Kafka producers and the ACK reliability mechanism. Thank you for reading!
© 2024 shulou.com SLNews company. All rights reserved.