In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-20 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Development >
Share
Shulou(Shulou.com)06/03 Report--
This article introduces the relevant knowledge of "RocketMq advanced source code learning how to achieve producer sending messages". In the operation of actual cases, many people will encounter such a dilemma. Next, let the editor lead you to learn how to deal with these situations. I hope you can read it carefully and be able to achieve something!
Producers of RocketMq advanced source code learning send messages
On the production side of RocketMq, the most important thing is to send messages. Producers send messages in three modes: synchronous / asynchronous / one-way, and the processing methods of each mode are also different. The most important thing is synchronous / asynchronous processing, and one-way application scenarios are rare (generally suitable for scenarios that do not require high message reliability, such as sending logs). This paper will mainly analyze synchronous / asynchronous.
The old rule, starting with an instance, is the simplest code to send a message.
DefaultMQProducer producer = new DefaultMQProducer ("ProducerGroupName"); producer.start (); Message msg=new Message (); / synchronously send SendResult sendResult = producer.send (msg); / / send the message asynchronously, and the result is returned to the client through callback. Producer.sendAsync (msg, new SendCallback () {@ Override public void onSuccess (final SendResult sendResult) {/ / message was sent successfully. System.out.println ("send message success. Topic=" + sendResult.getTopic () + ", msgId=" + sendResult.getMessageId ());} @ Override public void onException (OnExceptionContext context) {/ / the message failed and needs to be retried. You can resend the message or persist the data for compensation processing. System.out.println ("send message failed. Topic=" + context.getTopic () + ", msgId=" + context.getMessageId ());}})
First look at the synchronization, enter the send method, through N send chain calls, enter the real logic processing method. Looking back at the asynchronous sending, in fact, except for passing in a SendCallback at the beginning and making it clear that CommunicationMode is Async in the first few chained calls, we finally enter the next method, and both Async/Sync/Oneway are handled here.
The treatment here is 1. First check whether producer has started 2. 0. Then check whether the content of message is compliant (for example, whether Topic is empty, whether the Topic character is legal, etc.). I have made a mistake in this character verification. The content of Topic cannot contain spaces. Be sure to check the brokerClusterName of the configuration file. When responding to request messages, you will default to clusterName on the value of Topic) 3. According to the value of Topic to find the routing information of Topic 4. Check the send mode. If it is synchronous, it will be sent only once. Asynchronous will retry if the transmission fails. The maximum number of retries to try to configure is + 1.5. Select one of the multiple MessageQueue in Topic to send a message
Private SendResult sendDefaultImpl (Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback/**sendCallback runs from beginning to end to receive the result of the actual message sent * /, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {/ * * check whether producer has been started * / this.makeSureStateOK () / * * verify the contents of message * / Validators.checkMessage (msg, this.defaultMQProducer); final long invokeID = random.nextLong (); long beginTimestampFirst = System.currentTimeMillis (); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; / * * find topic information * / TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo (msg.getTopic ()) If (topicPublishInfo! = null & & topicPublishInfo.ok ()) {boolean callTimeout = false; MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; / * * Asynchronous can retry up to the number of retries of 1 + configuration, and synchronization can only try once at most * / int timesTotal = communicationMode = = CommunicationMode.SYNC? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed (): 1 Int times = 0; / * * record which broker*/ String [] brokersSent = new String [timesTotal]; for (; times
< timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); /**所以发送消息的负载均衡实在客户端实现的,默认是从Topic的所有的messageQueue中指定一个 * 这里使用了ThreadLocal,将messageQueue的index存在threadLocal中,这样之后如果重试消息, * 还能拿到之前的index,然后基于之前的选择的messageQueue重新选择,默认是每次index+1*/ MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times >Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.