2025-04-01 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article explains how RabbitMQ's RpcClient sends a message and then receives the reply synchronously.
The code that uses RpcClient to send a message and receive the reply synchronously is simple:
RpcClient client = new RpcClient(channel, exchange, routingKey);
String msg = "hello world!";
byte[] result = client.primitiveCall(msg.getBytes());
After primitiveCall is called, the current thread blocks until the reply message from the receiver arrives.
[Figure: the complete flow of sending a message and receiving the reply]
The whole process is explained in detail below:
Create the RpcClient:
RpcClient client = new RpcClient(channel, exchange, routingKey);
Creating an RpcClient does two things:
A: It creates a reply queue. Whoever receives a message sent by this RpcClient sends the reply to this replyQueue, where the RpcClient picks it up.
_replyQueue = setupReplyQueue();
protected String setupReplyQueue() throws IOException {
    // The queue name is empty, so the RabbitMQ server generates a unique queue name,
    // which getQueue() returns. The method called has this signature:
    // Queue.DeclareOk queueDeclare(String queueName, boolean passive, boolean durable,
    //                              boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
    return _channel.queueDeclare("", false, false, true, true, null).getQueue();
}
B: It creates a consumer that receives the reply messages.
_consumer = setupConsumer();
protected DefaultConsumer setupConsumer() throws IOException {
    // Create a DefaultConsumer instance that receives the replies.
    DefaultConsumer consumer = new DefaultConsumer(_channel) {
        @Override // called back when a shutdown occurs
        public void handleShutdownSignal(String consumerTag,
                                         ShutdownSignalException signal) {
            synchronized (_continuationMap) {
                // Hand the shutdown signal to every caller still waiting for a reply.
                for (Entry<String, BlockingCell<Object>> entry : _continuationMap.entrySet()) {
                    entry.getValue().set(signal);
                }
                _consumer = null;
            }
        }
        @Override // handles message delivery
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body)
                throws IOException {
            // Together with primitiveCall, this turns asynchronous reception into a synchronous call.
            synchronized (_continuationMap) {
                String replyId = properties.getCorrelationId();
                BlockingCell<Object> blocker = _continuationMap.get(replyId);
                _continuationMap.remove(replyId);
                blocker.set(body);
            }
        }
    };
    // Register the consumer on replyQueue. This is asynchronous with respect to the main thread:
    // whenever a message arrives on replyQueue, the consumer receives it and its handleDelivery
    // method is called back.
    _channel.basicConsume(_replyQueue, true, consumer);
    return consumer;
}
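The handleShutdownSignal callback above matters because callers may be blocked waiting for replies that will never come. A minimal sketch of that fan-out, using only the JDK: the class PendingCalls and its methods are hypothetical names, and CompletableFuture stands in for BlockingCell.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the table of outstanding calls kept by RpcClient.
class PendingCalls {
    private final Map<String, CompletableFuture<Object>> pending = new HashMap<>();

    // Register a call and return the cell its caller will block on.
    synchronized CompletableFuture<Object> register(String correlationId) {
        CompletableFuture<Object> cell = new CompletableFuture<>();
        pending.put(correlationId, cell);
        return cell;
    }

    // Same idea as handleShutdownSignal: every waiting caller receives the signal
    // as its "reply". Returns how many callers were released.
    synchronized int failAll(Object shutdownSignal) {
        int released = 0;
        for (CompletableFuture<Object> cell : pending.values()) {
            if (cell.complete(shutdownSignal)) {
                released++;
            }
        }
        return released;
    }
}
```

A caller that later reads its cell gets the signal object instead of a reply body, which is why primitiveCall checks the type of the value it receives.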
Send a message:
byte[] result = rpcClient.primitiveCall(msg.getBytes());
The message is sent with rpcClient's primitiveCall. Here is how it works:
public byte[] primitiveCall(byte[] message) throws IOException, ShutdownSignalException {
    return primitiveCall(null, message);
}
Following the call, the core method is this one:
public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message) throws IOException, ShutdownSignalException {
    // Check whether the consumer is null; if so, throw an exception.
    checkConsumer();
    BlockingCell<Object> k = new BlockingCell<Object>();
    synchronized (_continuationMap) {
        _correlationId++;
        String replyId = "" + _correlationId;
        // If props is not null, set replyId and the replyQueue created earlier on it.
        if (props != null) {
            props.setCorrelationId(replyId);
            props.setReplyTo(_replyQueue);
        }
        else {
            // If props is null, create one that carries both replyId and replyQueue.
            props = new AMQP.BasicProperties(null, null,
                                             null, replyId,
                                             _replyQueue, null, null, null,
                                             null, null);
        }
        _continuationMap.put(replyId, k);
    }
    // Send the message with these props, so that replyQueue and replyId travel with it.
    // The receiver reads replyQueue from props, which tells it where to send the reply.
    // A consumer was already registered on replyQueue when the RpcClient was created,
    // so both sides can exchange messages in an orderly way.
    publish(props, message); // this only sends the message; the reply is received asynchronously by the consumer
    // Wait synchronously for the reply. This is where asynchronous reception becomes a synchronous call.
    Object reply = k.uninterruptibleGet();
    if (reply instanceof ShutdownSignalException) {
        ShutdownSignalException sig = (ShutdownSignalException) reply;
        ShutdownSignalException wrapper =
            new ShutdownSignalException(sig.isHardError(),
                                        sig.isInitiatedByApplication(),
                                        sig.getReason(),
                                        sig.getReference());
        wrapper.initCause(sig);
        throw wrapper;
    } else {
        return (byte[]) reply;
    }
}
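To see the whole round trip without a broker, the flow of primitiveCall can be imitated in memory. This is only a sketch under stated assumptions: LoopbackRpcClient is a hypothetical class, a single-thread executor plays both the broker and the remote server, the "server" simply upper-cases the text, and CompletableFuture replaces BlockingCell.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// In-memory imitation of the call flow; nothing here talks to RabbitMQ.
class LoopbackRpcClient {
    private final Map<String, CompletableFuture<byte[]>> continuations = new ConcurrentHashMap<>();
    private final ExecutorService remote = Executors.newSingleThreadExecutor();
    private int correlationId = 0;

    // Same shape as primitiveCall: register a cell, publish, then block for the reply.
    byte[] primitiveCall(byte[] message) {
        CompletableFuture<byte[]> cell = new CompletableFuture<>();
        String replyId;
        synchronized (this) {
            correlationId++;
            replyId = "" + correlationId;
            continuations.put(replyId, cell);
        }
        publish(replyId, message);   // returns immediately
        return cell.join();          // the calling thread waits here
    }

    // Hypothetical "publish": the remote side upper-cases the text and replies.
    private void publish(String replyId, byte[] message) {
        remote.execute(() -> {
            String text = new String(message, StandardCharsets.UTF_8);
            byte[] reply = text.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8);
            handleDelivery(replyId, reply);
        });
    }

    // Runs on the executor thread, like the consumer callback.
    private void handleDelivery(String replyId, byte[] body) {
        CompletableFuture<byte[]> cell = continuations.remove(replyId);
        if (cell != null) {
            cell.complete(body);
        }
    }

    void close() {
        remote.shutdown();
    }
}
```

Calling primitiveCall with the bytes of "hello world!" returns only after the executor thread has delivered the reply, which mirrors the blocking behaviour described above.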
Summary of the whole process
Creating an RpcClient instance:
1. Define a Map that stores the state of each outstanding message:
private final Map<String, BlockingCell<Object>> _continuationMap = new HashMap<String, BlockingCell<Object>>();
The key is a correlationId, which acts as a counter of the messages sent by the current RpcClient instance. It starts at 0 and is incremented by 1 each time a message is sent.
The value is a com.rabbitmq.utility.BlockingCell object. It is created before the message is sent, associated with the current correlationId, and put into the map:
_continuationMap.put(correlationId, blockingCell);
2. Initialize _correlationId to 0.
3. Create a reply queue: replyQueue = channel.queueDeclare("", false, false, true, true, null).getQueue()
4. Create a consumer that receives reply messages.
5. Register the consumer to receive messages on replyQueue: channel.basicConsume(replyQueue, true, consumer)
RpcClient sends a message:
1. Create a BlockingCell object, blockingCell, and increment the counter: _correlationId++
2. Create a BasicProperties object and set correlationId and replyQueue on it. Both are passed to the receiver when the message is sent.
3. Put blockingCell into _continuationMap with correlationId as the key.
4. Send the message: channel.basicPublish(exchange, routingKey, the BasicProperties object from step 2, message)
5. Get the reply message: Object reply = blockingCell.uninterruptibleGet(). The thread waits here synchronously for the reply.
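The counter increment in the steps above happens inside a synchronized block, so that two threads sharing one client never obtain the same correlationId. A small sketch of why that matters, with the hypothetical helper class CorrelationIds:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper that hands out correlation IDs from a counter.
class CorrelationIds {
    private int correlationId = 0;   // starts at 0, as described above

    // Increment and format under one lock so two threads never share an ID.
    synchronized String next() {
        correlationId++;
        return "" + correlationId;
    }

    // Starts several threads that each take perThread IDs and
    // returns the number of distinct IDs that were handed out.
    static int distinctIds(int threads, int perThread) {
        CorrelationIds ids = new CorrelationIds();
        Set<String> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    seen.add(ids.next());
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return seen.size();
    }
}
```

With the lock in place, distinctIds(4, 1000) hands out 4000 different IDs; without it, concurrent increments could collide and two callers would wait on the same key.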
RpcServer receives messages:
1. Receive a message.
2. Get the BasicProperties object from the request: requestProperties = request.getProperties()
3. Get correlationId and replyQueue from requestProperties.
4. Create a BasicProperties object, replyProperties, for the reply message and set correlationId on it.
5. Send the reply message: channel.basicPublish("", replyQueue, replyProperties, replyMessage)
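The server side of the exchange can be sketched in memory as well. The classes RpcMessage and EchoServer below are hypothetical stand-ins (a map of queues replaces the broker, and the reply body is a simple echo); the point is only that the reply copies the request's correlationId and is routed to the queue named in replyTo.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical, minimal stand-in for a message with the two relevant properties.
class RpcMessage {
    final String correlationId;
    final String replyTo;
    final String body;

    RpcMessage(String correlationId, String replyTo, String body) {
        this.correlationId = correlationId;
        this.replyTo = replyTo;
        this.body = body;
    }
}

class EchoServer {
    // Queue name -> messages waiting on that queue (plays the role of the broker).
    final Map<String, Queue<RpcMessage>> queues = new HashMap<>();

    // Read the request properties, build the reply, publish it to the reply queue.
    void handle(RpcMessage request) {
        String correlationId = request.correlationId;
        String replyQueue = request.replyTo;
        // The reply carries the same correlationId and needs no replyTo of its own.
        RpcMessage reply = new RpcMessage(correlationId, null, "echo: " + request.body);
        // Publishing to the default exchange "" routes by queue name,
        // so the routing key is simply the reply queue's name.
        queues.computeIfAbsent(replyQueue, name -> new ArrayDeque<>()).add(reply);
    }
}
```

Because the correlationId is copied unchanged, the client can later match this reply to the call that is waiting for it.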
RpcClient receives a reply:
1. As soon as a message arrives on replyQueue, the consumer receives it and its handleDelivery method is called back.
2. Read the BasicProperties passed to the callback and get the correlationId.
3. Fetch the BlockingCell object from continuationMap by correlationId: BlockingCell blocker = continuationMap.get(correlationId)
4. Delete the entry from continuationMap: continuationMap.remove(correlationId)
5. Set the reply message on the blocker object: blocker.set(replyMessage)
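Because replies are matched by correlationId, they may arrive in any order and still reach the right caller. A minimal sketch of that dispatch, where ReplyDispatcher is a hypothetical class and CompletableFuture again stands in for BlockingCell. Unlike the listing quoted earlier, this sketch checks for an unknown correlationId instead of dereferencing a missing entry; dropping such replies is an assumption of the sketch, not documented library behaviour.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical reply dispatcher keyed by correlationId.
class ReplyDispatcher {
    private final Map<String, CompletableFuture<String>> continuationMap = new HashMap<>();

    // Called by the sender before it publishes a request.
    CompletableFuture<String> expect(String correlationId) {
        CompletableFuture<String> cell = new CompletableFuture<>();
        synchronized (continuationMap) {
            continuationMap.put(correlationId, cell);
        }
        return cell;
    }

    // The callback: look up the cell, remove it, hand over the reply.
    // Returns false when nobody is waiting for this correlationId.
    boolean handleDelivery(String correlationId, String body) {
        CompletableFuture<String> blocker;
        synchronized (continuationMap) {
            blocker = continuationMap.remove(correlationId);   // fetch and delete in one call
        }
        if (blocker == null) {
            return false;   // assumption: silently drop replies nobody asked for
        }
        return blocker.complete(body);
    }
}
```

If two calls are registered as "1" and "2" and the replies arrive as "2" then "1", each cell still ends up with its own reply.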
Waiting synchronously for the reply message:
1. [RpcClient sends a message] step 4, main thread: the message is sent, and step 5 then goes on to fetch the reply.
2. [RpcClient sends a message] step 5, main thread: blockingCell.uninterruptibleGet(). If set(value) has not yet been called on the blockingCell, the main thread calls wait() and blocks.
3. [RpcClient receives a reply] step 5: blocker.set(replyMessage). The blocker here is the very blockingCell created by the main thread, because it was fetched from continuationMap by correlationId. set(replyMessage) stores the replyMessage in a field for get() to return and then calls notify() to wake the waiting main thread. The callback in this step runs on a different thread from the main thread of the previous step, which is why it can wake the main thread. Once awake, the main thread's get() returns the replyMessage. This is how asynchronous reception is turned into a synchronous wait.
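The hand-off between the two threads can be tried in isolation. The sketch below uses a hypothetical class, HandoffDemo, that keeps only the wait()/notify() core; the thread name "consumer-callback" is made up to represent the thread on which the callback runs.

```java
// Minimal one-shot hand-off between two threads, using wait() and notify().
class HandoffDemo {
    private boolean filled = false;
    private String value;

    synchronized String get() throws InterruptedException {
        while (!filled) {
            wait();            // releases the lock and parks the calling thread
        }
        return value;
    }

    synchronized void set(String newValue) {
        value = newValue;
        filled = true;
        notify();              // wakes the thread parked in get()
    }

    // Returns {name of the waiting thread, name of the setting thread, value received}.
    static String[] run() {
        HandoffDemo cell = new HandoffDemo();
        String[] setter = new String[1];
        Thread callback = new Thread(() -> {
            setter[0] = Thread.currentThread().getName();
            cell.set("reply");
        }, "consumer-callback");
        callback.start();
        try {
            String received = cell.get();   // blocks until the other thread calls set()
            return new String[] {Thread.currentThread().getName(), setter[0], received};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The two thread names returned by run() differ, and the waiting thread still receives the value, which is the same relationship as between the main thread and the consumer callback.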
BlockingCell class
import java.util.concurrent.TimeoutException;

public class BlockingCell<T> {
    private boolean _filled = false;
    private T _value;
    private static final long NANOS_IN_MILLI = 1000 * 1000;
    private static final long INFINITY = -1;
    public BlockingCell() {
    }
    public synchronized T get() throws InterruptedException {
        while (!_filled) { // the value has not been set yet
            wait(); // block the current thread until another thread calls notify() or notifyAll() on this object
        }
        return _value;
    }
    // get with a timeout
    public synchronized T get(long timeout) throws InterruptedException, TimeoutException {
        if (timeout < 0 && timeout != INFINITY)
            throw new AssertionError("Timeout cannot be less than zero");
        if (!_filled && timeout != 0) {
            wait(timeout == INFINITY ? 0 : timeout);
        }
        if (!_filled)
            throw new TimeoutException();
        return _value;
    }
    // wait indefinitely until the value is available, ignoring interrupts
    public synchronized T uninterruptibleGet() {
        while (true) {
            try {
                return get();
            } catch (InterruptedException ex) {
                // ignored: keep waiting
            }
        }
    }
    public synchronized T uninterruptibleGet(int timeout) throws TimeoutException {
        long now = System.nanoTime() / NANOS_IN_MILLI;
        long runTime = now + timeout;
        do {
            try {
                return get(runTime - now);
            } catch (InterruptedException e) {
                // ignored: retry with the remaining time
            }
        } while ((timeout == INFINITY) || ((now = System.nanoTime() / NANOS_IN_MILLI) < runTime));
        throw new TimeoutException();
    }
    public synchronized void set(T newValue) {
        if (_filled) {
            throw new AssertionError("BlockingCell can only be set once");
        }
        _value = newValue;
        _filled = true;
        notify(); // wake up the thread waiting on this object
    }
    // set the value only if it has not been set yet; returns whether it was set
    public synchronized boolean setIfUnset(T newValue) {
        if (_filled) {
            return false;
        }
        set(newValue);
        _filled = true;
        return true;
    }
}
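One detail of the timed get(long timeout) in the listing is that it calls wait(timeout) once, so a wake-up that arrives without a value leads straight to a TimeoutException even if time remains. A possible variant, offered only as a sketch and not as the library's code, re-checks a deadline in a loop. DeadlineCell and its helper method are hypothetical names.

```java
import java.util.concurrent.TimeoutException;

// Sketch of a one-shot cell whose timed get() re-checks a deadline after every wake-up.
class DeadlineCell<T> {
    private boolean filled = false;
    private T value;

    synchronized void set(T newValue) {
        if (filled) {
            throw new IllegalStateException("cell can only be set once");
        }
        value = newValue;
        filled = true;
        notifyAll();
    }

    // Waits at most timeoutMillis for a value; a wake-up without a value just loops again.
    synchronized T get(long timeoutMillis) throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!filled) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new TimeoutException();
            }
            // wait(0) would mean "wait forever", so round up to at least 1 ms.
            wait(Math.max(1L, remainingNanos / 1_000_000L));
        }
        return value;
    }

    // Helper for trying it out: true if get() timed out on a cell that is never set.
    static boolean timesOutWhenEmpty(long timeoutMillis) {
        try {
            new DeadlineCell<String>().get(timeoutMillis);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A cell that has already been set returns its value immediately from get, while an empty cell throws TimeoutException only once the full timeout has elapsed.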
These are the principles behind how RpcClient sends a message and receives the reply synchronously.