Targeted Push and Broadcast Push over WebSocket with WebFlux

2025-04-05 Update From: SLTechnology News&Howtos

Shulou (Shulou.com) 06/02

This article shows how to use WebSocket in WebFlux flexibly, covering targeted push (to one specific connection) and broadcast push (to all connections).

Preface

WebFlux supports the WebSocket protocol out of the box. To process WebSocket requests, a handler must implement the WebSocketHandler interface. Each WebSocket connection has an associated WebSocketSession, which holds the HandshakeInfo from the handshake along with other information about the connection. You receive data from the client through the session's receive() method and send data to the client through its send() method.

Example

Here is a simple example of WebSocketHandler:

    @Component
    public class EchoHandler implements WebSocketHandler {

        @Override
        public Mono<Void> handle(WebSocketSession session) {
            // Echo every incoming text message back to the client with a prefix.
            return session.send(
                    session.receive()
                            .map(msg -> session.textMessage("ECHO->" + msg.getPayloadAsText())));
        }
    }

Once you have a handler, WebFlux still needs to know which requests to hand over to it, so you must create a corresponding HandlerMapping.

When processing HTTP requests, we usually use the simplest handler definition in WebFlux: annotating a method with @RequestMapping makes it the handler for requests to a specific path. That annotation only applies to HTTP requests, though. A WebSocket request has to go through a protocol upgrade after it is received, and only then is the handler executed, so we cannot define the mapping with @RequestMapping. We can add the mapping with SimpleUrlHandlerMapping instead.

    @Configuration
    public class WebSocketConfiguration {

        @Bean
        public HandlerMapping webSocketMapping(EchoHandler echoHandler) {
            // Map the URL path to the handler that should serve it.
            final Map<String, WebSocketHandler> map = new HashMap<>(1);
            map.put("/echo", echoHandler);

            final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
            mapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
            mapping.setUrlMap(map);
            return mapping;
        }

        @Bean
        public WebSocketHandlerAdapter handlerAdapter() {
            return new WebSocketHandlerAdapter();
        }
    }

With this mapping, WebSocket requests sent to /echo are handed over to EchoHandler.

We also create a WebSocketHandlerAdapter so that DispatcherHandler can invoke handlers of type WebSocketHandler.

With these three pieces in place (the handler, the mapping and the adapter), a WebSocket request arriving at WebFlux is first processed by DispatcherHandler. It looks up the handler for the request in the registered HandlerMapping beans, sees that the handler implements the WebSocketHandler interface, and invokes it through WebSocketHandlerAdapter.

Open questions

The example above has two shortcomings. First, the handler can only reply in response to a message it has just received; it has no way to send a message to the client on its own afterwards. Second, every time a Handler class is added or removed, the UrlMap of SimpleUrlHandlerMapping in the configuration class has to be edited, which is not very friendly. The following sections address both points:

1. Register Handler with custom annotations

Can we register a WebSocket Handler with a RequestMapping-like annotation, the same way we register handlers for HTTP requests?

There is no official implementation, but we can write a similar annotation ourselves and call it WebSocketMapping:

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    public @interface WebSocketMapping {
        String value() default "";
    }

@Retention(RetentionPolicy.RUNTIME) means the annotation is available at runtime, and @Target(ElementType.TYPE) means it can be applied to classes.

Let's first look at how the annotation is ultimately used. The following EchoHandler registers itself with @WebSocketMapping("/echo"), telling WebFlux that WebSocket requests sent to the /echo path should be handed over to EchoHandler:

    @Component
    @WebSocketMapping("/echo")
    public class EchoHandler implements WebSocketHandler {

        @Override
        public Mono<Void> handle(final WebSocketSession session) {
            return session.send(
                    session.receive()
                            .map(msg -> session.textMessage(
                                    "server returns: Xiaoming,->" + msg.getPayloadAsText())));
        }
    }

Isn't that as convenient as RequestMapping?

So far the annotation does nothing by itself and does not register any handler. Recall how we registered the route earlier: we created a SimpleUrlHandlerMapping, added the mapping rule for EchoHandler by hand, and returned it as the HandlerMapping bean.

Now we create a dedicated HandlerMapping class that processes WebSocketMapping annotations and registers the handlers automatically:

    public class WebSocketMappingHandlerMapping extends SimpleUrlHandlerMapping {

        private final Map<String, WebSocketHandler> handlerMap = new LinkedHashMap<>();

        /**
         * Register WebSocket handlers annotated with @WebSocketMapping.
         *
         * @throws BeansException
         */
        @Override
        public void initApplicationContext() throws BeansException {
            Map<String, Object> beanMap = obtainApplicationContext()
                    .getBeansWithAnnotation(WebSocketMapping.class);
            beanMap.values().forEach(bean -> {
                if (!(bean instanceof WebSocketHandler)) {
                    throw new RuntimeException(String.format(
                            "Controller [%s] doesn't implement WebSocketHandler interface.",
                            bean.getClass().getName()));
                }
                WebSocketMapping annotation = AnnotationUtils.getAnnotation(
                        bean.getClass(), WebSocketMapping.class);
                // Map the path declared in @WebSocketMapping to its handler.
                handlerMap.put(Objects.requireNonNull(annotation).value(), (WebSocketHandler) bean);
            });
            super.setOrder(Ordered.HIGHEST_PRECEDENCE);
            super.setUrlMap(handlerMap);
            super.initApplicationContext();
        }
    }

Our WebSocketMappingHandlerMapping class is really just a SimpleUrlHandlerMapping with some extra initialization.

The initApplicationContext() method comes from Spring's ApplicationObjectSupport class and is the hook for customizing a class's initialization behavior. In WebSocketMappingHandlerMapping, the initialization collects the components that carry the @WebSocketMapping annotation and implement the WebSocketHandler interface, then registers them in the URL map of the parent SimpleUrlHandlerMapping. All subsequent routing is done by the functionality the parent class already implements.
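The collect-and-register step can be tried out without Spring. The following is a minimal sketch using only JDK reflection; DemoMapping, DemoHandler, DemoEchoHandler and DemoRegistry are hypothetical stand-ins for @WebSocketMapping, WebSocketHandler, EchoHandler and the handler mapping, and the list of beans is passed in by hand instead of coming from an application context.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for @WebSocketMapping.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface DemoMapping {
    String value() default "";
}

// Hypothetical stand-in for WebSocketHandler.
interface DemoHandler {
    String handle(String message);
}

@DemoMapping("/echo")
final class DemoEchoHandler implements DemoHandler {
    @Override
    public String handle(String message) {
        return "ECHO->" + message;
    }
}

final class DemoRegistry {
    // Builds a path -> handler map from every bean that carries @DemoMapping.
    static Map<String, DemoHandler> scan(Collection<?> beans) {
        Map<String, DemoHandler> routes = new LinkedHashMap<>();
        for (Object bean : beans) {
            DemoMapping mapping = bean.getClass().getAnnotation(DemoMapping.class);
            if (mapping == null) {
                continue; // not annotated, so not a candidate
            }
            if (!(bean instanceof DemoHandler)) {
                throw new IllegalStateException(String.format(
                        "[%s] doesn't implement DemoHandler.", bean.getClass().getName()));
            }
            routes.put(mapping.value(), (DemoHandler) bean);
        }
        return routes;
    }
}
```

Calling DemoRegistry.scan(List.of(new DemoEchoHandler(), "plain bean")) yields a single route for /echo. The annotation has to be retained at runtime for getAnnotation to see it, which is why RetentionPolicy.RUNTIME matters in the real annotation as well.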

Now we only need to expose a WebSocketMappingHandlerMapping bean, and the @WebSocketMapping annotations are processed automatically:

    @Configuration
    public class WebSocketConfiguration {

        @Bean
        public HandlerMapping webSocketMapping() {
            return new WebSocketMappingHandlerMapping();
        }

        @Bean
        public WebSocketHandlerAdapter handlerAdapter() {
            return new WebSocketHandlerAdapter();
        }
    }

2. Analysis of WebSocket request processing

Let's take a look at exactly how Reactor Netty-based WebFlux handles WebSocket requests.

As mentioned earlier, after a WebSocket request enters WebFlux, the corresponding WebSocketHandler is first looked up in the HandlerMapping, and the actual call is then made by WebSocketHandlerAdapter. We won't elaborate further here; interested readers can look at the source of WebSocketHandler and WebSocketHandlerAdapter.

3. Separate the operation of receiving and sending data

HTTP follows a request-response model and is effectively half-duplex: both the client and the server can send data to each other, but only one side sends at a time, and in a fixed order. The client sends the request first, and then the server returns the response. So the server's logic for HTTP is simple: every time a client request is received, a response is returned.

WebSocket, by contrast, is full-duplex: the client and the server can each send data to the other at any time, so communication is no longer "send a request, return a response". Our EchoHandler example above still works that way, returning one piece of data for each piece it receives. Let's look at how to take full advantage of WebSocket's two-way communication.

Handling a WebSocket connection mainly means operating on two data streams through the session: the stream the client sends to the server, and the stream the server sends to the client:

WebSocketSession method: description

Flux<WebSocketMessage> receive(): receives the data stream from the client; the stream ends when the connection is closed.

Mono<Void> send(Publisher<WebSocketMessage>): sends a data stream to the client; when the stream ends, writing to the client ends and the returned Mono emits a completion signal.

In a WebSocketHandler, the results of processing the two streams should finally be combined into a single signal, returned as a Mono<Void> that indicates when processing is over.

We define the processing logic for the two streams:

For the output stream: the server keeps the stream open and registers a WebSocketSender in senderMap so that data can be pushed to the client later

For the input stream: print to standard output whenever a client message is received

    Mono<Void> input = session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .map(msg -> id + ":" + msg)
            .doOnNext(System.out::println)
            .then();

    Mono<Void> output = session.send(
            Flux.create(sink -> senderMap.put(id, new WebSocketSender(session, sink))));

The two pieces of logic are independent of each other, with no ordering between them. Each returns a Mono<Void> when it finishes, but how do we merge these two results into one signal to return to WebFlux? We can use Reactor's Mono.zip() method:

    @Component
    @WebSocketMapping("/echo")
    public class EchoHandler implements WebSocketHandler {

        @Autowired
        private ConcurrentHashMap<String, WebSocketSender> senderMap;

        @Override
        public Mono<Void> handle(WebSocketSession session) {
            // Stand-in key so this excerpt compiles; the complete handler in
            // section 4 reads the id from the query string instead.
            String id = session.getId();

            Mono<Void> input = session.receive()
                    .map(WebSocketMessage::getPayloadAsText)
                    .map(msg -> id + ":" + msg)
                    .doOnNext(System.out::println)
                    .then();

            Mono<Void> output = session.send(
                    Flux.create(sink -> senderMap.put(id, new WebSocketSender(session, sink))));

            /*
             * Mono.zip() merges multiple Monos into a new Mono.
             * If any Mono produces error or complete, the merged Mono
             * produces error or complete too, and the other Monos are cancelled.
             */
            return Mono.zip(input, output).then();
        }
    }

4. Send data from outside Handler

Sending data from outside means that code elsewhere, outside the WebSocketHandler, needs to send data over the WebSocket connection.

Idea: when defining the session's send() operation, create the Flux programmatically with the Flux.create() method, then expose and save the FluxSink that publishes data into that Flux. Wherever data needs to be sent, call the FluxSink's next(T data) method to publish it to the Flux's subscribers.

The create method is an advanced way of creating a Flux programmatically. It allows multiple items to be emitted per round and allows them to be emitted from multiple threads.

The create method exposes the internal FluxSink, which provides the next, error, and complete methods. Through create, you can bridge other APIs into the reactive stack.
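The "keep the emitter, push from elsewhere" idea can be illustrated with the JDK alone. The sketch below deliberately swaps in java.util.concurrent.SubmissionPublisher from the JDK's Flow API as an analogy; it is not Reactor's Flux.create or FluxSink, and OutboundChannel is a hypothetical name. submit() plays the role of FluxSink.next(), and close() plays the role of FluxSink.complete().

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical helper: holds on to the publishing side of a stream so that
// unrelated code can push items into it later.
final class OutboundChannel implements AutoCloseable {

    private final SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

    // Stands in for the client: records everything that gets delivered.
    private final List<String> delivered = new CopyOnWriteArrayList<>();

    // Subscribes the recorder; the future completes once the publisher is closed
    // and all buffered items have been consumed.
    private final CompletableFuture<Void> done = publisher.consume(delivered::add);

    // Can be called from any code that holds a reference to this channel.
    void send(String data) {
        publisher.submit(data);
    }

    // Ends the stream and waits until every submitted item has been delivered.
    @Override
    public void close() {
        publisher.close();
        done.join();
    }

    List<String> delivered() {
        return delivered;
    }
}
```

After send("a"), send("b") and close(), delivered() contains a and b in that order. The design point carries over to the Reactor version: the subscriber is wired up once, and the only thing that needs to be stored for later use is the object that accepts new items.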

Consider a scenario in which, after the server has established a WebSocket connection with client A, client B is allowed to send data to client A through HTTP.

Leaving aside security, robustness and similar concerns, here is a simple example.

First comes the WebSocketHandler implementation. When the client sends the request to establish the WebSocket connection, it specifies an id for the connection in a query parameter. The server stores the id as the key and the corresponding WebSocketSender as the value in senderMap:

    @Component
    @WebSocketMapping("/echo")
    public class EchoHandler implements WebSocketHandler {

        @Autowired
        private ConcurrentHashMap<String, WebSocketSender> senderMap;

        @Override
        public Mono<Void> handle(WebSocketSession session) {
            // Read the connection id from the handshake URL's query string.
            HandshakeInfo handshakeInfo = session.getHandshakeInfo();
            Map<String, String> queryMap = getQueryMap(handshakeInfo.getUri().getQuery());
            String id = queryMap.getOrDefault("id", "defaultId");

            Mono<Void> input = session.receive()
                    .map(WebSocketMessage::getPayloadAsText)
                    .map(msg -> id + ":" + msg)
                    .doOnNext(System.out::println)
                    .then();

            Mono<Void> output = session.send(
                    Flux.create(sink -> senderMap.put(id, new WebSocketSender(session, sink))));

            /*
             * Mono.zip() merges multiple Monos into a new Mono. If any Mono
             * produces error or complete, the merged Mono produces error or
             * complete too, and the other Monos are cancelled.
             */
            return Mono.zip(input, output).then();
        }

        // Parse the URL query string into a parameter map.
        private Map<String, String> getQueryMap(String queryStr) {
            Map<String, String> queryMap = new HashMap<>();
            if (!StringUtils.isEmpty(queryStr)) {
                String[] queryParam = queryStr.split("&");
                Arrays.stream(queryParam).forEach(s -> {
                    String[] kv = s.split("=", 2);
                    String value = kv.length == 2 ? kv[1] : "";
                    queryMap.put(kv[0], value);
                });
            }
            return queryMap;
        }
    }
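The query-string parsing done by getQueryMap can be tested in isolation. Below is a small JDK-only sketch of the same splitting logic; QueryStrings is a hypothetical helper name, and unlike the handler above it also URL-decodes keys and values, so it is meant for a raw query string (for example from URI.getRawQuery()) rather than an already decoded one.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper mirroring getQueryMap, with URL decoding added.
final class QueryStrings {

    static Map<String, String> parse(String rawQuery) {
        Map<String, String> params = new LinkedHashMap<>();
        if (rawQuery == null || rawQuery.isEmpty()) {
            return params;
        }
        for (String pair : rawQuery.split("&")) {
            if (pair.isEmpty()) {
                continue; // tolerate stray '&' separators
            }
            // Split on the first '=' only, so values may contain '='.
            String[] kv = pair.split("=", 2);
            String key = URLDecoder.decode(kv[0], StandardCharsets.UTF_8);
            String value = kv.length == 2
                    ? URLDecoder.decode(kv[1], StandardCharsets.UTF_8)
                    : ""; // a key without '=' maps to the empty string
            params.put(key, value);
        }
        return params;
    }
}
```

For example, QueryStrings.parse("id=42&name=Xiao%20Ming&flag") gives id mapped to 42, name mapped to "Xiao Ming" and flag mapped to the empty string, while a null or empty query gives an empty map, so getOrDefault("id", "defaultId") falls back to the default.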

Here senderMap is a bean we define ourselves in the configuration class:

    @Configuration
    public class WebSocketConfiguration {

        @Bean
        public HandlerMapping webSocketMapping() {
            return new WebSocketMappingHandlerMapping();
        }

        @Bean
        public ConcurrentHashMap<String, WebSocketSender> senderMap() {
            return new ConcurrentHashMap<>();
        }

        @Bean
        public WebSocketHandlerAdapter handlerAdapter() {
            return new WebSocketHandlerAdapter();
        }
    }

WebSocketSender is a class we create to hold the session of a WebSocket connection together with its FluxSink, so that data can be sent from code outside the WebSocketHandler:

    public class WebSocketSender {

        private final WebSocketSession session;
        private final FluxSink<WebSocketMessage> sink;

        public WebSocketSender(WebSocketSession session, FluxSink<WebSocketMessage> sink) {
            this.session = session;
            this.sink = sink;
        }

        // Publish a text message into the connection's outbound stream.
        public void sendData(String data) {
            sink.next(session.textMessage(data));
        }
    }

Next, let's implement the HTTP controller. When making an HTTP request, the user specifies the id of the target WebSocket connection and the data to send through query parameters. The controller then takes the corresponding WebSocketSender from senderMap and calls its sendData() method to send the data to the client:

    @RestController
    @RequestMapping("/msg")
    public class MsgController {

        @Autowired
        private ConcurrentHashMap<String, WebSocketSender> senderMap;

        @RequestMapping("/send")
        public String sendMessage(@RequestParam String id, @RequestParam String data) {
            WebSocketSender sender = senderMap.get(id);
            if (sender != null) {
                sender.sendData(data);
                return String.format("Message '%s' sent to connection: %s.", data, id);
            } else {
                return String.format("Connection of id '%s' doesn't exist", id);
            }
        }
    }

5. Test

Instead of writing a test page, I tested directly with https://www.websocket.org/echo.html. [Figure: test results]
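To drive the HTTP side of the test from code rather than a browser, the request to /msg/send can be built with the JDK's java.net.http API. This is only a sketch: PushRequests is a hypothetical helper, and the base URL http://localhost:8080 is an assumption about where the application is running.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that builds the request client B would send.
final class PushRequests {

    // Assumption: the application listens on localhost:8080.
    static final String BASE_URL = "http://localhost:8080";

    static HttpRequest sendMessage(String id, String data) {
        // Encode both parameters so spaces and reserved characters survive.
        String query = "id=" + URLEncoder.encode(id, StandardCharsets.UTF_8)
                + "&data=" + URLEncoder.encode(data, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/msg/send?" + query))
                .GET()
                .build();
    }
}
```

The request can then be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) while a WebSocket client is connected with the same id in its query string.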

That completes targeted push. Broadcast push and push to a subset of connections are not written out here: just take the relevant WebSocketSender objects out of the ConcurrentHashMap and send to each of them.
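The three push styles differ only in which entries of the map are visited. Here is a minimal JDK-only sketch, assuming a hypothetical PushService in which a Consumer<String> stands in for WebSocketSender.sendData; it is an illustration of the idea, not code from the project above.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical service; the map plays the role of senderMap.
final class PushService {

    private final Map<String, Consumer<String>> senders = new ConcurrentHashMap<>();

    void register(String id, Consumer<String> sender) {
        senders.put(id, sender);
    }

    // Should be called when a connection closes, so stale senders do not pile up.
    void unregister(String id) {
        senders.remove(id);
    }

    // Targeted push: returns false if no connection with that id is registered.
    boolean pushTo(String id, String data) {
        Consumer<String> sender = senders.get(id);
        if (sender == null) {
            return false;
        }
        sender.accept(data);
        return true;
    }

    // Broadcast push: returns the number of connections the data was sent to.
    int pushToAll(String data) {
        int count = 0;
        for (Consumer<String> sender : senders.values()) {
            sender.accept(data);
            count++;
        }
        return count;
    }

    // Partial push: sends to the listed ids and skips the ones that are unknown.
    int pushToSome(Collection<String> ids, String data) {
        int count = 0;
        for (String id : ids) {
            if (pushTo(id, data)) {
                count++;
            }
        }
        return count;
    }
}
```

In the WebFlux version, register would correspond to the senderMap.put call inside Flux.create. Removing the entry when the connection ends is not shown in the handler above and would need to be added separately.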
