In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-01-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
This article mainly explains "RabbitMQ uses multi-routing, multi-queue to break flow control", interested friends may wish to take a look. The method introduced in this paper is simple, fast and practical. Let's let the editor take you to learn "RabbitMQ uses multi-routing, multi-queue to break traffic control".
Flow control mechanism is the biggest headache when we use RabbitMQ. Once concurrency proliferates, consumers consume queue messages as slowly as dripping water.
Now after we place the order, we need to send a message to the notification center to notify the service provider to receive the order and confirm the provision of service.
Let's first add a method to send messages to the Order interface.
Public interface Order {public void makeOrder (Order order); public OrderSuccessResult getResult (Order order); public void postOrder (Order order);}
The implementation class implements this method
Data@AllArgsConstructor@NoArgsConstructor@ServiceOrderVersion (value = 1) @ RequiredArgsConstructorpublic class ServiceOrder extends AbstractOrder {private Long id; @ NonNull private String code; @ NonNull private Store store; @ NonNull private ProviderService service; @ NonNull private Date serviceDate; @ NonNull private String contact; @ NonNull private String contactTel; private AppUser user; @ NonNull private String content; private int status; private Date createDate Override public void makeOrder (Order order) {ServiceOrderDao serviceOrderDao = SpringBootUtil.getBean (ServiceOrderDao.class); IdService idService = SpringBootUtil.getBean (IdService.class); ((ServiceOrder) order) .setId (idService.genId ()); ((ServiceOrder) order) .setCode (getCodeInfo (idService)); AppUser loginAppUser = AppUserUtil.getLoginAppUser (); AppUser user = new AppUser (); user.setId (loginAppUser.getId ()) User.setUsername (loginAppUser.getUsername ()); (ServiceOrder) order) .setUser (user); ((ServiceOrder) order) .setStatus (1); ((ServiceOrder) order) .setCreateDate (new Date ()); serviceOrderDao.save ((ServiceOrder) order);} @ Override public OrderSuccessResult getResult (Order order) {ServiceOrderSuccessResultFactory orderSuccessResultFactory = SpringBootUtil.getBean (ServiceOrderSuccessResultFactory.class); this.orderSuccessResult = orderSuccessResultFactory.getOrderSuccessResult () Return this.orderSuccessResult.getResult (order);} @ Override public void postOrder (Order order) {MessageSender sender = SpringBootUtil.getBean (MessageSender.class); CompletableFuture.runAsync (()-> sender.send (OwnerCarCenterMq.MQ_EXCHANGE_ORDER, OwnerCarCenterMq.ROUTING_KEY_ORDER, order)) } private String getCodeInfo (IdService idService) {String flow = String.valueOf (idService.genId ()); flow = flow.substring (14 flow. Length ()); String pre = DateUtils.format (new Date (), DateUtils.pattern9); return pre + flow;}
Where we define such a set of queue names, switches, and routes.
Public interface OwnerCarCenterMq {/ * queue name * / String ORDER_QUEUE = "order"; / * Service system exchange name * / String MQ_EXCHANGE_ORDER = "order.topic.exchange"; / * * Service add routing key * / String ROUTING_KEY_ORDER = "post.order";}
In order to avoid flow control, we define 10 queues and bind them all to one switch.
@ Configurationpublic class RabbitmqConfig {@ Bean public List orderQueues () {List queues = new ArrayList (); for (int I = 1polii < 11polii +) {Queue queue = new Queue (OwnerCarCenterMq.ORDER_QUEUE + "_" + I); queues.add (queue);} return queues;} @ Bean public TopicExchange orderExchange () {return new TopicExchange (OwnerCarCenterMq.MQ_EXCHANGE_ORDER) } @ Bean public List bindingOrders () {List bindings = new ArrayList (); for (int I = 1 get (I-1)) {Binding binding = BindingBuilder.bind (orderQueues (). Get (I-1)) .to (orderExchange ()) .with (OwnerCarCenterMq.ROUTING_KEY_ORDER + "_" + I); bindings.add (binding);} return bindings;}}
The message provider is re-encapsulated and a random route is selected for each transmission.
@ Slf4j@Componentpublic class MessageSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {@ Autowired private RabbitTemplate rabbitTemplate; public void send (String exchange,String routingKey,Object content) {log.info ("send content=" + content); this.rabbitTemplate.setMandatory (true); this.rabbitTemplate.setConfirmCallback (this); this.rabbitTemplate.setReturnCallback (this); ThreadLocalRandom random = ThreadLocalRandom.current () This.rabbitTemplate.convertAndSend (exchange,routingKey + "_" + random.nextInt (1jue 11), serialize (content)); * @ param correlationData * @ param ack * @ param cause * / @ Override public void confirm (CorrelationData correlationData, boolean ack, String cause) {if (! ack) {log.info ("send ack fail, cause =" + cause) } else {log.info ("send ack success") Return callback after failure: * * @ param message * @ param replyCode * @ param exchange * @ param routingKey * / @ Override public void returnedMessage (Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info ("send fail return-message =" + new String (message.getBody ()) + ", replyCode:" + replyCode + " ReplyText: "+ replyText +", exchange: "+ exchange +", routingKey: "+ routingKey) Binary serialization of message objects * @ param o * @ return * / private byte [] serialize (Object o) {Kryo kryo = new Kryo (); ByteArrayOutputStream stream = new ByteArrayOutputStream (); Output output = new Output (stream); kryo.writeObject (output, o); output.close (); return stream.toByteArray ();}}
We can see that in ServiceOrder, we send to it asynchronously.
Controller is as follows
@ Slf4j@RestControllerpublic class OrderController {private ThreadLocal orderFactory = new ThreadLocal (); private ThreadLocal orderService = new ThreadLocal (); @ Autowired private OrderBean orderBean; @ Transactional @ SuppressWarnings ("unchecked") @ PostMapping ("/ makeeorder") public Result makeOrder (@ RequestBody String orderStr, @ RequestParam ("type") String type) {log.info (orderStr); Order order = setOrderFactory (orderStr,type); orderService.get (). MakeOrder (order); orderService.get (). PostOrder (order) Return Result.success (orderService.get (). GetResult (order));} / * determine which type of order is to obtain which type of specific order factory * @ param orderStr * @ return * / private Order setOrderFactory (String orderStr,String type) {Class classType = orderBean.getOrderMap () .get (type); Object order = JSONObject.parseObject (orderStr, classType) / / if (orderStr.contains ("service")) {/ / order = JSON.parseObject (orderStr, ServiceOrder.class); / /} else if (orderStr.contains ("product")) {/ / order = JSON.parseObject (orderStr, ProductOrder.class); / /} Class classFactoryType = orderBean.getOrderFactoryMap (). Get (type + "Factory"); this.orderFactory.set (OrderFactory) SpringBootUtil.getBean (classFactoryType)) / if (order instanceof ServiceOrder) {/ / this.orderFactory.set (SpringBootUtil.getBean (ServiceOrderFactory.class)); / /} else if (order instanceof ProductOrder) {/ / this.orderFactory.set (SpringBootUtil.getBean (ProductOrderFactory.class)); / /} orderService.set (orderFactory.get (). GetOrder ()); return (Order) order;}}
Finally, the message is received in our notification center module and the 10 queues are monitored at the same time.
@ Slf4j@Component@RabbitListener (queues = {OwnerCarCenterMq.ORDER_QUEUE + "_" + 1, OwnerCarCenterMq.ORDER_QUEUE + "_" + 2, OwnerCarCenterMq.ORDER_QUEUE + "_" + 3, OwnerCarCenterMq.ORDER_QUEUE + "_" + 4, OwnerCarCenterMq.ORDER_QUEUE + "_" + 5, OwnerCarCenterMq.ORDER_QUEUE + "_" + 6, OwnerCarCenterMq.ORDER_QUEUE + "_" + 7 OwnerCarCenterMq.ORDER_QUEUE + "_" + 8, OwnerCarCenterMq.ORDER_QUEUE + "_" + 9, OwnerCarCenterMq.ORDER_QUEUE + "_" + 10}) public class ServiceOrderConsummer {@ Getter private Queue serviceOrders = new ConcurrentLinkedDeque () @ RabbitHandler public void receiceOrder (byte [] data, Channel channel, Message message) throws IOException {try {/ / tell the server that this message has been consumed by me and can be deleted in the queue Otherwise, the message server thinks that the message will be sent channel.basicAck (message.getMessageProperties (). GetDeliveryTag (), false); ServiceOrder order = unSerialize (data); this.serviceOrders.add (order); log.info (String.valueOf (order));} catch (IOException e) {e.printStackTrace () / / discard this message channel.basicNack (message.getMessageProperties (). GetDeliveryTag (), false,false); log.info ("receiver fail");}} / * deserialization * @ param data * @ return * / private ServiceOrder unSerialize (byte [] data) {Input input = null Try {Kryo kryo = new Kryo (); input = new Input (new ByteArrayInputStream (data)); return kryo.readObject (input,ServiceOrder.class);} finally {input.close ();}
After the project starts, we can see that the situation of rabbitmq is as follows
Now let's do the stress test, start the Jmeter, and we use 1000 threads to do the stress test. Each configuration is as follows
Save the file upload server, because I am the server of Huawei Cloud, so I do the pressure test on the server, not the remote pressure test.
Type in the bin directory of the server's jmeter
. / jmeter-n-t model/rabbit.jmx-l log.jtl
Here-n does not start the graphical interface,-t uses the configuration file we uploaded, and-l logs.
The pressure test results are as follows
Let's take a look at the UI interface of rabbitmq during the pressure test.
Consumption is basically real-time, there is no backlog of flow control.
At this point, I believe you have a deeper understanding of "RabbitMQ uses multi-routing, multi-queue to break flow control". You might as well do it in practice. Here is the website, more related content can enter the relevant channels to inquire, follow us, continue to learn!
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.