Example Analysis of Spring Cloud Bus message bus

2025-03-01 Update From: SLTechnology News&Howtos


This article analyzes the Spring Cloud Bus message bus. It first demonstrates Bus with two simple examples and then looks at how Bus is implemented.

Bus example demonstration

Before we examine the implementation of Bus, let's look at two simple examples of using Spring Cloud Bus.

1. Adding a configuration item on all nodes

A Bus example is fairly simple, because the AutoConfiguration layer of Bus ships with a default configuration. You only need to add the Spring Cloud Stream and Spring Cloud Bus dependencies for your message middleware, and every application you start will then use the same Topic to send and receive messages.

The Demo for Bus is available on GitHub. It simulates starting 5 nodes. Adding a new configuration item on any one instance adds that item on all nodes.

Call the address exposed by the Controller on any node to read the configuration (the key is hangzhou):

    curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'

Every node returns unknown, because no node has the key hangzhou in its configuration.

Bus provides EnvironmentBusEndpoint internally. This Endpoint adds or updates configuration by sending a message through the message broker.

Call the Endpoint's url, /actuator/bus-env?name=hangzhou&value=alibaba, on any node to add the configuration item (here, on node1):

    curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json'

Then call /bus/env on all nodes again to read the configuration. The full session, before and after the POST:

    $ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
    unknown
    $ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
    unknown
    $ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
    unknown
    $ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
    unknown
    $ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
    unknown
    $ curl -X POST 'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba' -H 'content-type: application/json'
    $ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
    alibaba
    $ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
    alibaba
    $ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
    alibaba
    $ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
    alibaba
    $ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
    alibaba

As you can see, every node now has a configuration item with the key hangzhou and the value alibaba. The item was added through the EnvironmentBusEndpoint provided by Bus.

A diagram drawn by programmer DD, showing Spring Cloud Config working with Bus to refresh the configuration of all nodes, describes the same flow. The example in this article adds a configuration item rather than refreshing, but the process is the same.
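The broadcast behaviour in this example can be sketched without any middleware. The following is a minimal in-memory simulation, not the Spring Cloud Bus API: the Node and Topic classes, the startNodes helper, and the node ids are all hypothetical names introduced for illustration. The only behaviour taken from the article is that every node subscribed to the same Topic applies the change, and that a missing key reads as unknown.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the shared topic; not the Spring Cloud Bus API.
class BusBroadcastSketch {

    /** One simulated application instance with its own configuration map. */
    static class Node {
        final String id;
        final Map<String, String> env = new HashMap<>();

        Node(String id) {
            this.id = id;
        }

        /** Mirrors the demo's GET /bus/env?key=...: "unknown" when the key is absent. */
        String get(String key) {
            String value = env.get(key);
            return value != null ? value : "unknown";
        }
    }

    /** Every subscribed node receives every published change (broadcast). */
    static class Topic {
        final List<Node> subscribers = new ArrayList<>();

        void subscribe(Node node) {
            subscribers.add(node);
        }

        void publish(String name, String value) {
            for (Node node : subscribers) {
                node.env.put(name, value);
            }
        }
    }

    /** Starts five nodes on simulated ports 10001..10005, all on the same topic. */
    static List<Node> startNodes(Topic topic) {
        List<Node> nodes = new ArrayList<>();
        for (int port = 10001; port <= 10005; port++) {
            Node node = new Node("node:" + port);
            topic.subscribe(node);
            nodes.add(node);
        }
        return nodes;
    }

    public static void main(String[] args) {
        Topic topic = new Topic();
        List<Node> nodes = startNodes(topic);
        System.out.println("before: " + nodes.get(0).get("hangzhou"));
        topic.publish("hangzhou", "alibaba"); // plays the role of POST /actuator/bus-env
        for (Node node : nodes) {
            System.out.println(node.id + " -> " + node.get("hangzhou"));
        }
    }
}
```

In the real setup the Topic is a broker destination and each node consumes it through Spring Cloud Stream; the sketch only shows why a single POST on one node is visible on all five.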

2. Modifying the configuration of some nodes

For example, on node1, specify rocketmq-bus-node2 as the destination to modify the configuration. node2 sets spring.cloud.bus.id to rocketmq-bus-node2:10002, so the destination matches it:

    curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json'

Call /bus/env to read the configuration. Because the message is sent from node1, Bus also modifies the configuration of the sender, node1:

    $ curl -X POST 'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hangzhou&value=xihu' -H 'content-type: application/json'
    $ curl -X GET 'http://localhost:10005/bus/env?key=hangzhou'
    alibaba
    $ curl -X GET 'http://localhost:10004/bus/env?key=hangzhou'
    alibaba
    $ curl -X GET 'http://localhost:10003/bus/env?key=hangzhou'
    alibaba
    $ curl -X GET 'http://localhost:10002/bus/env?key=hangzhou'
    xihu
    $ curl -X GET 'http://localhost:10001/bus/env?key=hangzhou'
    xihu

As you can see, only node1 and node2 have modified the configuration, and the configuration of the remaining three nodes has not changed.
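The outcome above can be reproduced with a small sketch of destination matching. This is a deliberately simplified rule and an assumption of this sketch, not the matcher Bus actually uses: a destination matches a Bus id when it is ** or when its colon-separated segments are a prefix of the id's segments. The class name, the cluster helper, and the send method are hypothetical. The sender applies the change as well, as described in the article.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified, hypothetical model of destination matching; not the Bus implementation.
class DestinationMatchSketch {

    /** True when destination is "**" or its ':'-separated segments prefix the Bus id. */
    static boolean matches(String destination, String busId) {
        if ("**".equals(destination)) {
            return true;
        }
        String[] wanted = destination.split(":");
        String[] actual = busId.split(":");
        if (wanted.length > actual.length) {
            return false;
        }
        for (int i = 0; i < wanted.length; i++) {
            if (!wanted[i].equals(actual[i])) {
                return false;
            }
        }
        return true;
    }

    /** Five nodes keyed by Bus id, each already holding hangzhou=alibaba. */
    static Map<String, Map<String, String>> cluster() {
        Map<String, Map<String, String>> nodes = new LinkedHashMap<>();
        for (int n = 1; n <= 5; n++) {
            Map<String, String> env = new LinkedHashMap<>();
            env.put("hangzhou", "alibaba");
            nodes.put("rocketmq-bus-node" + n + ":" + (10000 + n), env);
        }
        return nodes;
    }

    /** Applies the change on the sender and on every node whose Bus id matches. */
    static void send(Map<String, Map<String, String>> nodes, String senderId,
                     String destination, String name, String value) {
        for (Map.Entry<String, Map<String, String>> node : nodes.entrySet()) {
            boolean isSender = node.getKey().equals(senderId);
            if (isSender || matches(destination, node.getKey())) {
                node.getValue().put(name, value);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> nodes = cluster();
        send(nodes, "rocketmq-bus-node1:10001", "rocketmq-bus-node2", "hangzhou", "xihu");
        for (Map.Entry<String, Map<String, String>> node : nodes.entrySet()) {
            System.out.println(node.getKey() + " -> " + node.getValue().get("hangzhou"));
        }
    }
}
```

Because the match depends on the Bus id, a predictable id per instance makes targeted updates like this one much easier to reason about.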

Implementation of Bus

1. Introduction to Bus concepts

1) Events

Bus defines the remote event RemoteApplicationEvent, which extends Spring's ApplicationEvent. It currently has four concrete implementations:

EnvironmentChangeRemoteApplicationEvent: remote environment change event. It carries data of type Map, which the receiver writes into the Environment of the Spring context. The examples in this article use this event together with EnvironmentBusEndpoint and EnvironmentChangeListener.

AckRemoteApplicationEvent: remote acknowledgement event. When Bus successfully receives a remote event, it sends back an AckRemoteApplicationEvent to confirm it.

RefreshRemoteApplicationEvent: remote configuration refresh event. It triggers a dynamic refresh of beans annotated with @RefreshScope and of all classes annotated with @ConfigurationProperties.

UnknownRemoteApplicationEvent: remote unknown event. If an exception occurs while Bus converts a message body into a remote event, the message is uniformly wrapped as this event.

Bus also has one event that is not a RemoteApplicationEvent: SentApplicationEvent, the message-sent event. It works with Trace to record that a remote message was sent.

These events work together with ApplicationListener implementations. For example, EnvironmentChangeListener handles EnvironmentChangeRemoteApplicationEvent to add or modify configuration:

    // Excerpt from Spring Cloud Bus (imports omitted).
    public class EnvironmentChangeListener
            implements ApplicationListener<EnvironmentChangeRemoteApplicationEvent> {

        private static Log log = LogFactory.getLog(EnvironmentChangeListener.class);

        @Autowired
        private EnvironmentManager env;

        @Override
        public void onApplicationEvent(EnvironmentChangeRemoteApplicationEvent event) {
            Map<String, String> values = event.getValues();
            log.info("Received remote environment change request. Keys/values to update "
                    + values);
            // Apply each received key/value pair to the local Environment.
            for (Map.Entry<String, String> entry : values.entrySet()) {
                env.setProperty(entry.getKey(), entry.getValue());
            }
        }
    }

After receiving an EnvironmentChangeRemoteApplicationEvent sent by another node, the listener calls EnvironmentManager#setProperty to apply each configuration item. For every item, this method publishes an EnvironmentChangeEvent, which ConfigurationPropertiesRebinder listens for and handles by performing a rebind, so the configuration is added or updated.
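The chain described here, one change notification per configuration item followed by a rebind, can be illustrated with plain Java. This is a sketch under stated assumptions: SimpleEnvironmentManager is a hypothetical stand-in for EnvironmentManager, the registered callback stands in for ConfigurationPropertiesRebinder, and no Spring class is used.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical stand-ins; the real classes live in Spring Cloud Context.
class EnvChangeSketch {

    /** Stores property overrides and notifies listeners once per changed key. */
    static class SimpleEnvironmentManager {
        private final Map<String, String> overrides = new LinkedHashMap<>();
        private final List<Consumer<Set<String>>> listeners = new ArrayList<>();

        void onChange(Consumer<Set<String>> listener) {
            listeners.add(listener);
        }

        void setProperty(String name, String value) {
            overrides.put(name, value);
            // One notification per item, carrying only the key that changed.
            Set<String> changedKeys = Collections.singleton(name);
            for (Consumer<Set<String>> listener : listeners) {
                listener.accept(changedKeys);
            }
        }

        String getProperty(String name) {
            return overrides.get(name);
        }
    }

    /** Applies received values and returns the rebind notifications they caused. */
    static List<String> apply(SimpleEnvironmentManager env, Map<String, String> values) {
        List<String> events = new ArrayList<>();
        env.onChange(keys -> events.add("rebind " + keys)); // plays the rebinder's role
        for (Map.Entry<String, String> entry : values.entrySet()) {
            env.setProperty(entry.getKey(), entry.getValue());
        }
        return events;
    }

    public static void main(String[] args) {
        SimpleEnvironmentManager env = new SimpleEnvironmentManager();
        Map<String, String> values = new LinkedHashMap<>();
        values.put("hangzhou", "alibaba");
        System.out.println(apply(env, values));
        System.out.println(env.getProperty("hangzhou"));
    }
}
```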

2) Actuator Endpoint

Bus exposes two Endpoints, EnvironmentBusEndpoint and RefreshBusEndpoint, for adding or modifying configuration and for refreshing configuration globally. Their Endpoint ids, which also appear in their URLs, are bus-env and bus-refresh respectively.

3) Configuration

When sending messages, Bus needs information such as the Topic and Group. This information is encapsulated in BusProperties. The default configuration prefix is spring.cloud.bus, for example:

spring.cloud.bus.refresh.enabled turns the global refresh Listener on or off.

spring.cloud.bus.env.enabled turns the Endpoint for adding or modifying configuration on or off.

spring.cloud.bus.ack.enabled turns the sending of AckRemoteApplicationEvent events on or off.

spring.cloud.bus.trace.enabled turns the Listener that records Trace information on or off.

By default, messages are sent to the Topic springCloudBus, which can be changed through configuration. The Group can be set to broadcast mode, or to a UUID combined with the latest offset.

Each Bus application has a Bus id. The default value expression is fairly complex:

    ${vcap.application.name:${spring.application.name:application}}:${vcap.application.instance_index:${spring.application.index:${local.server.port:${server.port:0}}}}:${vcap.application.instance_id:${random.value}}
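To see what this expression evaluates to, it helps to resolve the nested ${name:default} placeholders by hand: each placeholder takes the property's value if it is set, and otherwise falls back to its default, which may itself be another placeholder. The resolver below is a minimal sketch of that rule, written for this article and much simpler than Spring's own placeholder handling. The class and method names are hypothetical, and random.value is supplied as an ordinary property rather than generated.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified nested-placeholder resolver; an illustration, not Spring's implementation.
class BusIdPlaceholderSketch {

    static final String DEFAULT_BUS_ID =
            "${vcap.application.name:${spring.application.name:application}}"
            + ":${vcap.application.instance_index:${spring.application.index:"
            + "${local.server.port:${server.port:0}}}}"
            + ":${vcap.application.instance_id:${random.value}}";

    /** Replaces every ${key:default} in text, resolving defaults recursively. */
    static String resolve(String text, Map<String, String> props) {
        StringBuilder out = new StringBuilder();
        int i = 0;
        while (i < text.length()) {
            if (text.startsWith("${", i)) {
                int end = findClosingBrace(text, i + 2);
                out.append(resolvePlaceholder(text.substring(i + 2, end), props));
                i = end + 1;
            } else {
                out.append(text.charAt(i));
                i++;
            }
        }
        return out.toString();
    }

    /** Finds the '}' that closes the placeholder whose body starts at index from. */
    static int findClosingBrace(String text, int from) {
        int depth = 1;
        for (int i = from; i < text.length(); i++) {
            if (text.startsWith("${", i)) {
                depth++;
                i++; // skip the '{' of the nested placeholder
            } else if (text.charAt(i) == '}') {
                depth--;
                if (depth == 0) {
                    return i;
                }
            }
        }
        throw new IllegalArgumentException("Unbalanced placeholder in: " + text);
    }

    /** Resolves one placeholder body of the form key or key:default. */
    static String resolvePlaceholder(String body, Map<String, String> props) {
        int colon = body.indexOf(':');
        String key = colon < 0 ? body : body.substring(0, colon);
        String value = props.get(key);
        if (value != null) {
            return value;
        }
        if (colon < 0) {
            throw new IllegalArgumentException("No value for " + key);
        }
        return resolve(body.substring(colon + 1), props);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("spring.application.name", "rocketmq-bus-node2");
        props.put("server.port", "10002");
        props.put("random.value", "abc123"); // stand-in for a generated value
        System.out.println(resolve(DEFAULT_BUS_ID, props));
    }
}
```

With only the application name and server port set, the id takes the shape name:port:random-value, which is why the last segment differs on every start unless the id is configured explicitly.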

It is recommended that you configure Bus id manually because the destination in Bus remote events matches according to Bus id:

    spring.cloud.bus.id=${spring.application.name}-${server.port}

2. Bus underlying analysis

The underlying analysis of Bus comes down to four questions:

How messages are sent

How messages are received

How the destination is matched

How the next action is triggered after a remote event is received

The auto-configuration class BusAutoConfiguration is annotated with @EnableBinding(SpringCloudBusClient.class).

The use of @EnableBinding was explained in the article "Introduction to Spring Cloud Stream Architecture and Principles". Its value here is SpringCloudBusClient.class, so the input and output DirectChannels declared in SpringCloudBusClient are created through proxies:

    public interface SpringCloudBusClient {

        String INPUT = "springCloudBusInput";

        String OUTPUT = "springCloudBusOutput";

        @Output(SpringCloudBusClient.OUTPUT)
        MessageChannel springCloudBusOutput();

        @Input(SpringCloudBusClient.INPUT)
        SubscribableChannel springCloudBusInput();
    }

The properties of the two Binding, springCloudBusInput and springCloudBusOutput, can be modified through the configuration file (such as modifying topic):

    spring.cloud.stream.bindings:
      springCloudBusInput:
        destination: my-bus-topic
      springCloudBusOutput:
        destination: my-bus-topic

Sending and receiving messages:

    // BusAutoConfiguration
    @EventListener(classes = RemoteApplicationEvent.class) // 1
    public void acceptLocal(RemoteApplicationEvent event) {
        if (this.serviceMatcher.isFromSelf(event)
                && !(event instanceof AckRemoteApplicationEvent)) { // 2
            this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build()); // 3
        }
    }

    @StreamListener(SpringCloudBusClient.INPUT) // 4
    public void acceptRemote(RemoteApplicationEvent event) {
        if (event instanceof AckRemoteApplicationEvent) {
            if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
                    && this.applicationEventPublisher != null) { // 5
                this.applicationEventPublisher.publishEvent(event);
            }
            // If it's an ACK we are finished processing at this point
            return;
        }
        if (this.serviceMatcher.isForSelf(event)
                && this.applicationEventPublisher != null) { // 6
            if (!this.serviceMatcher.isFromSelf(event)) { // 7
                this.applicationEventPublisher.publishEvent(event);
            }
            if (this.bus.getAck().isEnabled()) { // 8
                AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
                        this.serviceMatcher.getServiceId(),
                        this.bus.getAck().getDestinationService(),
                        event.getDestinationService(), event.getId(), event.getClass());
                this.cloudBusOutboundChannel
                        .send(MessageBuilder.withPayload(ack).build());
                this.applicationEventPublisher.publishEvent(ack);
            }
        }
        if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) { // 9
            // We are set to register sent events so publish it for local consumption,
            // irrespective of the origin
            this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
                    event.getOriginService(), event.getDestinationService(),
                    event.getId(), event.getClass()));
        }
    }

1. Use Spring's event listening mechanism to listen for all local RemoteApplicationEvent remote events. For example, bus-env publishes an EnvironmentChangeRemoteApplicationEvent locally and bus-refresh publishes a RefreshRemoteApplicationEvent locally, and both are picked up here.

2. Check that the locally received event is not an AckRemoteApplicationEvent remote acknowledgement event (otherwise the application would loop forever, receiving and sending messages) and that the event was sent by the application itself (the event sender is the application itself). If both conditions hold, go to step 3.

3. Build a Message with the remote event as its payload, then send it to the broker through the MessageChannel that Spring Cloud Stream created for the Binding named springCloudBusOutput.

4. The @StreamListener annotation consumes the MessageChannel that Spring Cloud Stream created for the Binding named springCloudBusInput. The messages received here are remote messages.

5. If the remote event is an AckRemoteApplicationEvent remote acknowledgement event, the application has the message tracking Trace switch turned on, and the event was not sent by the application itself (so it was sent by another application), publish the AckRemoteApplicationEvent locally. This means the application has seen another application's confirmation that it received a remote event. Processing of an acknowledgement event ends here in every case.

6. If the remote event is addressed to the application itself (the recipient of the event is the application itself), perform steps 7 and 8. Step 9 then runs in either case.

7. If the remote event was not sent by the application itself (the event sender is not the application itself), publish the event locally. If the application sent it, the corresponding listener already handled it locally when it was first published, so it does not need to be published again.

8. If the switch for AckRemoteApplicationEvent remote acknowledgement events is on, build an AckRemoteApplicationEvent and send it both remotely and locally. It is published locally because step 5 does not publish acknowledgements that the application itself sent, so this is where the application confirms its own receipt. It is sent remotely to tell other applications that this application has received the message.

9. If the switch for message recording Trace is on, build a SentApplicationEvent and publish it locally.
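The nine steps above reduce to a handful of boolean checks, which can be sketched as a pure function that returns the actions taken for one event. This is an illustration only: the class, method, and action names are hypothetical, the flags replace the calls to serviceMatcher and BusProperties, and the applicationEventPublisher null checks from the original code are assumed to pass.

```java
import java.util.ArrayList;
import java.util.List;

// Decision logic of acceptLocal/acceptRemote as plain booleans; names are hypothetical.
class AcceptRemoteSketch {

    /** Step 2: only non-ACK events that originated here are forwarded to the broker. */
    static boolean shouldSendToBroker(boolean fromSelf, boolean isAck) {
        return fromSelf && !isAck;
    }

    /** Steps 5 to 9: the actions taken for one incoming remote event, in order. */
    static List<String> onRemote(boolean isAck, boolean fromSelf, boolean forSelf,
                                 boolean ackEnabled, boolean traceEnabled) {
        List<String> actions = new ArrayList<>();
        if (isAck) {
            if (traceEnabled && !fromSelf) {
                actions.add("publish-ack-locally"); // step 5
            }
            return actions; // an ACK is never processed further
        }
        if (forSelf) { // step 6
            if (!fromSelf) {
                actions.add("publish-event-locally"); // step 7
            }
            if (ackEnabled) { // step 8
                actions.add("send-ack-to-broker");
                actions.add("publish-ack-locally");
            }
        }
        if (traceEnabled) {
            actions.add("publish-sent-event-locally"); // step 9
        }
        return actions;
    }

    public static void main(String[] args) {
        // An event from another node, addressed to this node, with ack and trace on.
        System.out.println(onRemote(false, false, true, true, true));
        // The same event when this node was the sender.
        System.out.println(onRemote(false, true, true, true, true));
    }
}
```

Writing the logic this way makes two points easy to check: an acknowledgement never triggers another acknowledgement, and the trace step runs whether or not the event was addressed to the node.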

After bus-env is triggered, the EnvironmentChangeListener on every node picks up the configuration change, and the console prints the following:

    o.s.c.b.event.EnvironmentChangeListener : Received remote environment change request. Keys/values to update {hangzhou=alibaba}

If you listen for the remote acknowledgement event AckRemoteApplicationEvent locally, you receive acknowledgements from all nodes. For example, the console of node5 shows the following AckRemoteApplicationEvent events:

    ServiceId [rocketmq-bus-node5:10005] listeners on {"type": "AckRemoteApplicationEvent", "timestamp": 1554124670484, "originService": "rocketmq-bus-node5:10005", "destinationService": "**", "id": "375f0426-c24e-4904-bce1-5e09371fc9bc", "ackId": "750d033f-356a-4aad-8cf0-3481ace8698c", "ackDestinationService": "**", "event": "org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
    ServiceId [rocketmq-bus-node5:10005] listeners on {"type": "AckRemoteApplicationEvent", "timestamp": 1554124670184, "originService": "rocketmq-bus-node1:10001", "destinationService": "*", "id": "91f06cf1-4bd9-4dd8-9526-9299a35bb7cc", "ackId": "750d033f-356a-4aad-8cf0-3481ace8698c", "ackDestinationService": "**", "event": "org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
    ServiceId [rocketmq-bus-node5:10005] listeners on {"type": "AckRemoteApplicationEvent", "timestamp": 1554124670402, "originService": "rocketmq-bus-node2:10002", "destinationService": "**", "id": "7df3963c-7c3e-4549-9a22-a23fa90a6b85", "ackId": "750d033f-356a-4aad-8cf0-3481ace8698c", "ackDestinationService": "**", "event": "org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
    ServiceId [rocketmq-bus-node5:10005] listeners on {"type": "AckRemoteApplicationEvent", "timestamp": 1554124670406, "originService": "rocketmq-bus-node3:10003", "destinationService": "**", "id": "728b45ee-5e26-46c2-af1a-e8d1571e5d3a", "ackId": "750d033f-356a-4aad-8cf0-3481ace8698c", "ackDestinationService": "*", "event": "org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}
    ServiceId [rocketmq-bus-node5:10005] listeners on {"type": "AckRemoteApplicationEvent", "timestamp": 1554124670427, "originService": "rocketmq-bus-node4:10004", "destinationService": "*", "id": "1812fd6d-6f98-4e5b-a38a-4b11aee08aeb", "ackId": "750d033f-356a-4aad-8cf0-3481ace8698c", "ackDestinationService": "**", "event": "org.springframework.cloud.bus.event.EnvironmentChangeRemoteApplicationEvent"}

Going back to the four questions raised at the beginning of this chapter, here are the answers:

How messages are sent: the BusAutoConfiguration#acceptLocal method sends events to the springCloudBus topic through Spring Cloud Stream.

How messages are received: the BusAutoConfiguration#acceptRemote method receives messages from the springCloudBus topic through Spring Cloud Stream.

How the destination is matched: the BusAutoConfiguration#acceptRemote method matches the destination when it receives a remote event.

How the next action is triggered after a remote event is received: Bus publishes the RemoteApplicationEvent locally through the Spring event mechanism, and the matching listener performs the next action. For example, EnvironmentChangeListener receives EnvironmentChangeRemoteApplicationEvent, and RefreshListener receives RefreshRemoteApplicationEvent.

