2025-04-11 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
This article shows how to use RabbitMQ in .NET. It introduces what RabbitMQ is and its key terms, then walks through wrappers for publishing, subscribing, pulling, and RPC.
What is RabbitMQ?
RabbitMQ is an open source implementation of AMQP (Advanced Message Queuing Protocol), written in Erlang.
It makes asynchronous message processing possible.
RabbitMQ is a message broker: it accepts and forwards messages.
You can think of it as a post office: when you put the mail you want to send into a post box, you can be sure that the letter carrier will eventually deliver it to your recipient. In this analogy, RabbitMQ is the post box, the post office, and the letter carrier.
The main difference between RabbitMQ and a post office is that it does not handle paper; it accepts, stores, and forwards binary blobs of data (messages).
Advantages:
1. Asynchronous message processing.
2. Business decoupling. Placing an order involves deducting inventory, generating the order, sending red packets, and sending text messages. The main order flow only deducts inventory and generates the order; notifications, red packets, and text messages are then completed through the MQ message queue.
3. Peak shaving and flow control. When notifications, messages, and orders arrive in large bursts, the queue buffers them so that they can be worked through at a steady rate, including during quieter periods.
4. Flexible routing. A message is routed through an Exchange before it enters a queue. RabbitMQ provides built-in Exchange types for typical routing needs. For more complex routing you can bind multiple Exchanges together, or implement your own Exchange type through the plug-in mechanism.
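The business decoupling point can be sketched without a broker at all. The following is a minimal illustration, not the author's code: OrderService, NotificationWorker, and the message strings are hypothetical names, and an in-memory Queue<string> stands in for the MQ queue.

```csharp
using System;
using System.Collections.Generic;

public class OrderService
{
    // Stands in for an MQ queue (assumption: a real system would publish to RabbitMQ here).
    public Queue<string> Notifications { get; } = new Queue<string>();
    public List<string> Log { get; } = new List<string>();

    public void PlaceOrder(string orderId)
    {
        // Main flow: these two steps must finish before the order call returns.
        Log.Add("deduct inventory for " + orderId);
        Log.Add("generate order " + orderId);

        // Side effects are only queued; a separate worker performs them later.
        Notifications.Enqueue("red-packet:" + orderId);
        Notifications.Enqueue("sms:" + orderId);
    }
}

public static class NotificationWorker
{
    // Drains the queue and hands each message to the sender; returns how many were handled.
    public static int Drain(Queue<string> queue, Action<string> send)
    {
        var handled = 0;
        while (queue.Count > 0)
        {
            send(queue.Dequeue());
            handled++;
        }
        return handled;
    }
}

public static class Program
{
    public static void Main()
    {
        var service = new OrderService();
        service.PlaceOrder("A1");
        NotificationWorker.Drain(service.Notifications, m => Console.WriteLine("sending " + m));
    }
}
```

The order call returns as soon as the inventory and order steps are done; a slow SMS gateway would delay only the worker, not the customer.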
Port of the RabbitMQ management web UI: 15672
Port that client programs connect to (AMQP): 5672
Key terms of RabbitMQ
1. Binding: links a Queue to an Exchange according to routing rules.
2. Routing key (Routing Key): the key the Exchange uses to decide where to deliver a message.
3. Exchange: sends each message to the appropriate queue according to the routing rules.
4. Message queue (Queue): the storage carrier of messages.
5. Producer: the publisher of the message.
6. Consumer: the recipient of the message.
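To make the terms concrete, here is a toy in-memory model of how an exchange, its bindings, and queues relate. It is a sketch under stated assumptions: MiniBroker, MiniExchangeType, and every member below are hypothetical names invented for illustration, not RabbitMQ.Client APIs, and only the direct and fanout behaviours are modelled.

```csharp
using System;
using System.Collections.Generic;

public enum MiniExchangeType { Direct, Fanout }

// Hypothetical toy broker: exchange -> bindings -> queues, all in memory.
public class MiniBroker
{
    private sealed class Binding
    {
        public string Exchange;
        public string RoutingKey;
        public string Queue;
    }

    private readonly Dictionary<string, MiniExchangeType> _exchanges = new Dictionary<string, MiniExchangeType>();
    private readonly List<Binding> _bindings = new List<Binding>();
    private readonly Dictionary<string, Queue<string>> _queues = new Dictionary<string, Queue<string>>();

    public void DeclareExchange(string name, MiniExchangeType type) => _exchanges[name] = type;

    public void DeclareQueue(string name)
    {
        if (!_queues.ContainsKey(name)) _queues[name] = new Queue<string>();
    }

    public void Bind(string queue, string exchange, string routingKey) =>
        _bindings.Add(new Binding { Exchange = exchange, RoutingKey = routingKey, Queue = queue });

    // Producer side: returns the number of queues the message was routed to.
    public int Publish(string exchange, string routingKey, string body)
    {
        var type = _exchanges[exchange];
        var delivered = 0;
        foreach (var b in _bindings)
        {
            if (b.Exchange != exchange) continue;
            // Fanout ignores the key; direct requires an exact match.
            if (type == MiniExchangeType.Fanout || b.RoutingKey == routingKey)
            {
                _queues[b.Queue].Enqueue(body);
                delivered++;
            }
        }
        return delivered;
    }

    // Consumer side: returns null when the queue is empty.
    public string Pull(string queue) =>
        _queues[queue].Count > 0 ? _queues[queue].Dequeue() : null;
}

public static class Program
{
    public static void Main()
    {
        var broker = new MiniBroker();
        broker.DeclareExchange("orders", MiniExchangeType.Direct);
        broker.DeclareQueue("sms");
        broker.DeclareQueue("coupon");
        broker.Bind("sms", "orders", "order.created");
        broker.Bind("coupon", "orders", "order.paid");

        var routed = broker.Publish("orders", "order.created", "order #1");
        Console.WriteLine("routed to " + routed + " queue(s); sms got: " + broker.Pull("sms"));
    }
}
```

With a direct exchange only the queue bound with the matching key receives the message; switching the exchange type to Fanout would copy it to both queues.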
How RabbitMQ operates
As the figure shows, the Publisher first sends a message to the Exchange, and the Exchange then forwards it to the target Queue. The binding between the Exchange and the Queue has been declared beforehand. Finally, the Consumer consumes the messages in that queue, either by subscribing to it or by actively fetching from it.
Subscribing and actively fetching can be understood as push (passive) and pull (active).
Push: as soon as a message is added to the queue, an idle consumer is notified to consume it. (I don't go looking for you; I wait for you to come to me. This is the observer pattern.)
Pull: the consumer is not notified. Instead it takes the initiative and polls the queue, in a loop or on a schedule, to get messages. (I only come to you when I need something.)
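The difference between push and pull can be shown with a small in-memory queue. This is only a sketch: DemoQueue and its members are hypothetical names, it is not thread-safe, and it supports a single subscriber, whereas a real broker distributes messages among many consumers.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical single-threaded queue that supports both consumption styles.
public class DemoQueue
{
    private readonly Queue<string> _messages = new Queue<string>();
    private Action<string> _subscriber;

    // Push: register a callback. Anything already waiting is handed over first,
    // and every later message is delivered as soon as it is enqueued.
    public void Subscribe(Action<string> handler)
    {
        _subscriber = handler;
        while (_messages.Count > 0) handler(_messages.Dequeue());
    }

    public void Enqueue(string message)
    {
        if (_subscriber != null) _subscriber(message);
        else _messages.Enqueue(message);
    }

    // Pull: the consumer asks when it wants to; false means the queue was empty.
    public bool TryPull(out string message)
    {
        if (_messages.Count > 0)
        {
            message = _messages.Dequeue();
            return true;
        }
        message = null;
        return false;
    }
}

public static class Program
{
    public static void Main()
    {
        var queue = new DemoQueue();

        // Pull style: nothing happens until the consumer asks.
        queue.Enqueue("a");
        if (queue.TryPull(out var pulled)) Console.WriteLine("pulled " + pulled);

        // Push style: the callback runs when the message arrives.
        queue.Subscribe(m => Console.WriteLine("pushed " + m));
        queue.Enqueue("b");
    }
}
```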
Here is an example scenario. Suppose there are two systems, an order system and a shipping system, and the order system issues delivery instructions as messages. To ship promptly, the shipping system subscribes to the queue and processes each instruction as soon as it arrives.
However, the program occasionally hits an exception, such as a network or DB timeout, and puts the message into a failure queue, so a resend mechanism is needed. I don't want to retry in a loop such as while (IsPostSuccess == true), because once an exception occurs it tends to keep occurring for a while, so retrying immediately is pointless.
In this case the message no longer needs to be processed immediately. A JOB runs periodically, or after a delay of (number of failures × minutes), picks up the messages in the failure queue, and resends them.
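The "number of failures × minutes" rule can be written down as a small policy that the resend JOB consults. This is a sketch of one possible reading, not the author's implementation: FailedMessage, RetryPolicy, and their members are hypothetical, and the minimum delay of one minute is my own assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical record of a message sitting in the failure queue.
public class FailedMessage
{
    public string Body { get; set; }
    public int FailureCount { get; set; }
    public DateTime LastFailedAtUtc { get; set; }
}

public static class RetryPolicy
{
    // Delay grows linearly: 1 failure -> 1 minute, 3 failures -> 3 minutes.
    // Assumption: never wait less than one minute.
    public static TimeSpan DelayFor(int failureCount) =>
        TimeSpan.FromMinutes(Math.Max(1, failureCount));

    public static bool IsDue(FailedMessage message, DateTime nowUtc) =>
        nowUtc - message.LastFailedAtUtc >= DelayFor(message.FailureCount);

    // What the periodic JOB would pick up on this run.
    public static List<FailedMessage> SelectDue(IEnumerable<FailedMessage> failed, DateTime nowUtc) =>
        failed.Where(m => IsDue(m, nowUtc)).ToList();
}

public static class Program
{
    public static void Main()
    {
        var now = new DateTime(2025, 1, 1, 12, 0, 0, DateTimeKind.Utc);
        var failed = new List<FailedMessage>
        {
            new FailedMessage { Body = "ship #1", FailureCount = 1, LastFailedAtUtc = now.AddMinutes(-2) },
            new FailedMessage { Body = "ship #2", FailureCount = 5, LastFailedAtUtc = now.AddMinutes(-2) }
        };

        foreach (var m in RetryPolicy.SelectDue(failed, now))
            Console.WriteLine("resend " + m.Body);
    }
}
```

A message that keeps failing waits longer each time, which avoids hammering a dependency that is still down.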
Encapsulating Publish
Steps: initialize the connection -> declare the exchange -> declare the queue -> bind the exchange to the queue -> publish the message. Note that I cache the Model in a ConcurrentDictionary, because declaring and binding are expensive and because publishing to the same queue again does not require reinitialization.
/// <summary>
/// Declares an exchange.
/// Exchange types:
/// 1. Direct exchange: routes on the routing key. A queue is bound to the exchange with a
///    specific routing key, and a message is forwarded only if its key matches exactly.
///    If a queue is bound with the routing key "dog", only messages marked "dog" are
///    forwarded; "dog.puppy" and "dog.guard" are not.
/// 2. Fanout exchange: ignores the routing key. You simply bind queues to the exchange, and
///    every message sent to it is forwarded to all bound queues. This is much like a subnet
///    broadcast, where every host in the subnet gets a copy of the message. The fanout
///    exchange is the fastest at forwarding messages.
/// 3. Topic exchange: matches the routing key against a pattern. The queue is bound with a
///    pattern in which "#" matches zero or more words and "*" matches exactly one word.
///    So "audit.#" matches "audit.irs.corporate", but "audit.*" only matches "audit.irs".
/// </summary>
/// <param name="iModel">Channel</param>
/// <param name="exchange">Exchange name</param>
/// <param name="type">Exchange type</param>
/// <param name="durable">Whether the exchange is persistent</param>
/// <param name="autoDelete">Whether the exchange is deleted automatically</param>
/// <param name="arguments">Additional arguments</param>
private static void ExchangeDeclare(IModel iModel, string exchange, string type = ExchangeType.Direct,
    bool durable = true, bool autoDelete = false, IDictionary<string, object> arguments = null)
{
    exchange = exchange.IsNullOrWhiteSpace() ? "" : exchange.Trim();
    iModel.ExchangeDeclare(exchange, type, durable, autoDelete, arguments);
}

/// <summary>
/// Declares a queue.
/// </summary>
/// <param name="channel">Channel</param>
/// <param name="queue">Queue name</param>
/// <param name="durable">Whether the queue is persistent</param>
/// <param name="exclusive">
/// Exclusive queue. A queue declared as exclusive is visible only to the connection that
/// first declared it and is deleted automatically when that connection closes. Three points
/// to note. First, exclusivity is per connection: different channels of the same connection
/// can all access an exclusive queue created by that connection. Second, "first declared"
/// means that once a connection has declared an exclusive queue, other connections cannot
/// declare an exclusive queue with the same name, unlike a normal queue. Third, even if the
/// queue is durable, it is deleted as soon as the connection closes or the client exits.
/// This kind of queue suits scenarios where a single client both sends and reads messages.
/// </param>
/// <param name="autoDelete">Whether the queue is deleted automatically</param>
/// <param name="arguments">Additional arguments</param>
private static void QueueDeclare(IModel channel, string queue, bool durable = true, bool exclusive = false,
    bool autoDelete = false, IDictionary<string, object> arguments = null)
{
    queue = queue.IsNullOrWhiteSpace() ? "UndefinedQueueName" : queue.Trim();
    channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);
}

/// <summary>
/// Gets the cached Model for a queue, creating and binding it on first use.
/// </summary>
/// <param name="exchange">Exchange name</param>
/// <param name="queue">Queue name</param>
/// <param name="routingKey">Routing key</param>
/// <param name="isProperties">Whether to persist</param>
private static IModel GetModel(string exchange, string queue, string routingKey, bool isProperties = false)
{
    // GetOrAdd stores the returned model under the queue name, so no explicit assignment is needed.
    return ModelDic.GetOrAdd(queue, key =>
    {
        var model = _conn.CreateModel();
        ExchangeDeclare(model, exchange, ExchangeType.Fanout, isProperties);
        QueueDeclare(model, queue, isProperties);
        model.QueueBind(queue, exchange, routingKey);
        return model;
    });
}

/// <summary>
/// Publishes a message.
/// </summary>
/// <param name="exchange">Exchange name</param>
/// <param name="queue">Queue name</param>
/// <param name="routingKey">Routing key</param>
/// <param name="body">Message body</param>
/// <param name="isProperties">Whether to persist</param>
public void Publish(string exchange, string queue, string routingKey, string body, bool isProperties = false)
{
    var channel = GetModel(exchange, queue, routingKey, isProperties);
    try
    {
        channel.BasicPublish(exchange, routingKey, null, body.SerializeUtf8());
    }
    catch (Exception ex)
    {
        throw ex.GetInnestException();
    }
}
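The topic exchange rule described in the comments above ("#" for zero or more words, "*" for exactly one word) can be checked with a small stand-alone matcher. TopicMatcher is a hypothetical helper written for illustration; it mimics the matching rule and is not how the broker implements it.

```csharp
using System;

// Hypothetical helper that applies topic-exchange wildcard rules to dot-separated keys.
public static class TopicMatcher
{
    public static bool IsMatch(string pattern, string routingKey)
    {
        return Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] pattern, int pi, string[] key, int ki)
    {
        // Pattern exhausted: it matches only if the key is exhausted too.
        if (pi == pattern.Length) return ki == key.Length;

        if (pattern[pi] == "#")
        {
            // "#" may absorb zero or more words; try every possible split point.
            for (var next = ki; next <= key.Length; next++)
            {
                if (Match(pattern, pi + 1, key, next)) return true;
            }
            return false;
        }

        // Any other pattern word needs a key word to compare against.
        if (ki == key.Length) return false;

        // "*" accepts exactly one word; a literal must be equal.
        if (pattern[pi] == "*" || pattern[pi] == key[ki])
        {
            return Match(pattern, pi + 1, key, ki + 1);
        }
        return false;
    }
}

public static class Program
{
    public static void Main()
    {
        Console.WriteLine(TopicMatcher.IsMatch("audit.#", "audit.irs.corporate"));
        Console.WriteLine(TopicMatcher.IsMatch("audit.*", "audit.irs"));
        Console.WriteLine(TopicMatcher.IsMatch("audit.*", "audit.irs.corporate"));
    }
}
```

The first two calls report a match and the third does not, which is the "audit" example from the comments.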
Below is a screenshot of the publish speed in a local test:
Throughput is stable at about 42,000 messages/s (4.2W/s, where W means 10,000), and it is slightly faster with ToJson.
Encapsulating Subscribe
When publishing, the exchange and queue are declared and bound; when subscribing, only the queue is declared. As the following code shows, when an exception is caught the message is sent to a custom "dead letter queue" and a separate JOB resends it periodically, so the finally block always acknowledges the message.
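/// <summary>
/// Gets the cached Model for a queue, creating it on first use.
/// </summary>
/// <param name="queue">Queue name</param>
/// <param name="isProperties">Whether to persist</param>
private static IModel GetModel(string queue, bool isProperties = false)
{
    return ModelDic.GetOrAdd(queue, value =>
    {
        var model = _conn.CreateModel();
        QueueDeclare(model, queue, isProperties);
        // Prefetch count: deliver at most one unacknowledged message to this consumer at a time.
        model.BasicQos(0, 1, false);
        return model;
    });
}

/// <summary>
/// Receives messages.
/// </summary>
/// <param name="queue">Queue name</param>
/// <param name="isProperties">Whether to persist</param>
/// <param name="handler">Consumption handler</param>
/// <param name="isDeadLetter">Whether this queue is itself the dead letter queue</param>
public void Subscribe<T>(string queue, bool isProperties, Action<T> handler, bool isDeadLetter) where T : class
{
    // Declare the queue
    var channel = GetModel(queue, isProperties);

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var msgStr = body.DeserializeUtf8();
        var msg = msgStr.FromJson<T>();
        try
        {
            handler(msg);
        }
        catch (Exception ex)
        {
            ex.GetInnestException().WriteToFile("queue receives messages", "RabbitMq");
            // Failed messages go to the dead letter queue, unless we are already consuming it.
            if (!isDeadLetter)
                PublishToDead(queue, msgStr, ex);
        }
        finally
        {
            // Always acknowledge: failures are retried from the dead letter queue instead.
            channel.BasicAck(ea.DeliveryTag, false);
        }
    };
    // autoAck = false: messages are acknowledged manually above.
    channel.BasicConsume(queue, false, consumer);
}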
Below is a screenshot of the speed measured in a local test:
About 1.9K messages/s at its fastest and 1.7K/s at its slowest.
Encapsulating Pull
Here is the code:
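/// <summary>
/// Gets a single message by polling.
/// </summary>
/// <param name="exchange">Exchange name</param>
/// <param name="queue">Queue name</param>
/// <param name="routingKey">Routing key</param>
/// <param name="handler">Consumption handler</param>
private void Poll<T>(string exchange, string queue, string routingKey, Action<T> handler) where T : class
{
    var channel = GetModel(exchange, queue, routingKey);

    // autoAck = false: the message is acknowledged manually below.
    var result = channel.BasicGet(queue, false);
    if (result.IsNull())
        return; // the queue is empty

    var msg = result.Body.DeserializeUtf8().FromJson<T>();
    try
    {
        handler(msg);
    }
    catch (Exception ex)
    {
        ex.GetInnestException().WriteToFile("queue receives messages", "RabbitMq");
    }
    finally
    {
        channel.BasicAck(result.DeliveryTag, false);
    }
}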
At its fastest it reaches 1.8K messages/s, but the stable rate is about 1.5K/s.
Encapsulating RPC (remote call)
First of all, RabbitMQ only offers something that works like RPC; it is not true RPC. Here is why:
1. Traditional RPC hides the call details: you pass parameters and get exceptions thrown just as if you were calling a local method.
2. RabbitMQ's RPC is message-based. After consuming the request, the consumer returns the response through a separate reply queue.
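The message-based request/response idea in point 2 can be sketched in memory before looking at the RabbitMQ version. Everything here is hypothetical (Envelope, InMemoryRpc, and their members are names made up for illustration); the sketch only shows how a correlation id and a reply queue name tie a response back to its request.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message wrapper carrying the two RPC-related properties.
public class Envelope
{
    public string CorrelationId { get; set; }
    public string ReplyTo { get; set; }
    public string Body { get; set; }
}

public class InMemoryRpc
{
    private readonly Dictionary<string, Queue<Envelope>> _queues = new Dictionary<string, Queue<Envelope>>();

    private Queue<Envelope> QueueFor(string name)
    {
        if (!_queues.TryGetValue(name, out var queue))
        {
            queue = new Queue<Envelope>();
            _queues[name] = queue;
        }
        return queue;
    }

    // Client: publish a request tagged with a fresh correlation id and the reply queue name.
    public string SendRequest(string requestQueue, string replyQueue, string body)
    {
        var id = Guid.NewGuid().ToString();
        QueueFor(requestQueue).Enqueue(new Envelope { CorrelationId = id, ReplyTo = replyQueue, Body = body });
        return id;
    }

    // Server: consume one request and publish the result to ReplyTo with the same correlation id.
    public bool ServeOne(string requestQueue, Func<string, string> handler)
    {
        var queue = QueueFor(requestQueue);
        if (queue.Count == 0) return false;

        var request = queue.Dequeue();
        QueueFor(request.ReplyTo).Enqueue(
            new Envelope { CorrelationId = request.CorrelationId, Body = handler(request.Body) });
        return true;
    }

    // Client: read replies until one carries the expected id; unrelated replies are dropped.
    public string ReceiveReply(string replyQueue, string correlationId)
    {
        var queue = QueueFor(replyQueue);
        while (queue.Count > 0)
        {
            var reply = queue.Dequeue();
            if (reply.CorrelationId == correlationId) return reply.Body;
        }
        return null;
    }
}

public static class Program
{
    public static void Main()
    {
        var rpc = new InMemoryRpc();
        var id = rpc.SendRequest("rpc.requests", "rpc.replies", "ping");
        rpc.ServeOne("rpc.requests", body => body.ToUpperInvariant());
        Console.WriteLine(rpc.ReceiveReply("rpc.replies", id));
    }
}
```

Unlike a local method call, the caller has to handle the case where no matching reply ever arrives, which is why the RabbitMQ version needs a timeout.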
/// <summary>
/// RPC client.
/// </summary>
public string RpcClient(string exchange, string queue, string routingKey, string body, bool isProperties = false)
{
    var channel = GetModel(exchange, queue, routingKey, isProperties);

    // QueueingBasicConsumer is deprecated and absent from recent RabbitMQ.Client releases.
    var consumer = new QueueingBasicConsumer(channel);
    channel.BasicConsume(queue, true, consumer);

    try
    {
        var correlationId = Guid.NewGuid().ToString();
        var basicProperties = channel.CreateBasicProperties();
        basicProperties.ReplyTo = queue;
        basicProperties.CorrelationId = correlationId;

        channel.BasicPublish(exchange, routingKey, basicProperties, body.SerializeUtf8());

        var sw = Stopwatch.StartNew();
        while (true)
        {
            // Dequeue() blocks, so the timeout below is only checked after a message arrives.
            var ea = consumer.Queue.Dequeue();
            if (ea.BasicProperties.CorrelationId == correlationId)
            {
                return ea.Body.DeserializeUtf8();
            }

            if (sw.ElapsedMilliseconds > 30000)
                throw new Exception("timeout waiting for response");
        }
    }
    catch (Exception ex)
    {
        throw ex.GetInnestException();
    }
}

/// <summary>
/// RPC server.
/// The generic type arguments below were lost in the source listing and are reconstructed.
/// </summary>
public void RpcService<T>(string exchange, string queue, bool isProperties, Func<T, T> handler, bool isDeadLetter)
{
    // Declare the queue
    var channel = GetModel(queue, isProperties);

    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var body = ea.Body;
        var msgStr = body.DeserializeUtf8();
        var msg = msgStr.FromJson<T>();

        var props = ea.BasicProperties;
        var replyProps = channel.CreateBasicProperties();
        // Echo the correlation id so the client can match the reply to its request.
        replyProps.CorrelationId = props.CorrelationId;

        try
        {
            msg = handler(msg);
        }
        catch (Exception ex)
        {
            ex.GetInnestException().WriteToFile("queue receives messages", "RabbitMq");
        }
        finally
        {
            // Reply to the queue named in ReplyTo, then acknowledge the request.
            channel.BasicPublish(exchange, props.ReplyTo, replyProps, msg.ToJson().SerializeUtf8());
            channel.BasicAck(ea.DeliveryTag, false);
        }
    };
    channel.BasicConsume(queue, false, consumer);
}
It works, but it is not recommended. Consider a dedicated RPC framework instead, such as gRPC or Thrift.
That is all for "How to use RabbitMQ in .NET". Thank you for reading, and I hope it helps.