Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is the function of ExecutionDispatcher in dubbo

2025-01-16 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/02 Report--

This article shows you what the role of ExecutionDispatcher in dubbo is, the content is concise and easy to understand, it can definitely brighten your eyes. I hope you can get something through the detailed introduction of this article.

ExecutionDispatcher

Dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionDispatcher.java

Public class ExecutionDispatcher implements Dispatcher {public static final String NAME = "execution"; @ Override public ChannelHandler dispatch (ChannelHandler handler, URL url) {return new ExecutionChannelHandler (handler, url);}}

ExecutionDispatcher implements the Dispatcher interface, and its dispatch method returns ExecutionChannelHandler

ExecutionChannelHandler

Dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java

Public class ExecutionChannelHandler extends WrappedChannelHandler {public ExecutionChannelHandler (ChannelHandler handler, URL url) {super (handler, url);} @ Override public void received (Channel channel, Object message) throws RemotingException {ExecutorService executor = getExecutorService (); if (message instanceof Request) {try {executor.execute (new ChannelEventRunnable (channel, handler, ChannelState.RECEIVED, message)) } catch (Throwable t) {/ / FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly, / / therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent / / this scenario from happening, but a better solution should be considered later. If (t instanceof RejectedExecutionException) {Request request = (Request) message; if (request.isTwoWay ()) {String msg = "Server side (" + url.getIp () + "," + url.getPort () + ") thread pool is exhausted, detail msg:" + t.getMessage () Response response = new Response (request.getId (), request.getVersion ()); response.setStatus (Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage (msg); channel.send (response); return } throw new ExecutionException (message, channel, getClass () + "error when process received event.", t);} else {handler.received (channel, message);}

ExecutionChannelHandler inherits WrappedChannelHandler, and its received method determines whether message is of type Request. If so, create ChannelEventRunnable and put it into thread pool for execution. If not, execute handler.received directly.

PerformanceServerTest

Dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceServerTest.java

Public class PerformanceServerTest {private static final Logger logger = LoggerFactory.getLogger (PerformanceServerTest.class); private static ExchangeServer server = null; private static void restartServer (int times, int alive, int sleep) throws Exception {if (server! = null & &! server.isClosed ()) {server.close (); Thread.sleep (100);} for (int I = 0; I

< times; i++) { logger.info("restart times:" + i); server = statServer(); if (alive >

0) Thread.sleep (alive); server.close (); if (sleep > 0) Thread.sleep (sleep);} server = statServer ();} private static ExchangeServer statServer () throws Exception {final int port = PerformanceUtils.getIntProperty ("port", 9911); final String transporter = PerformanceUtils.getProperty (Constants.TRANSPORTER_KEY, Constants.DEFAULT_TRANSPORTER) Final String serialization = PerformanceUtils.getProperty (Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION); final String threadpool = PerformanceUtils.getProperty (THREADPOOL_KEY, DEFAULT_THREADPOOL); final int threads = PerformanceUtils.getIntProperty (THREADS_KEY, DEFAULT_THREADS); final int iothreads = PerformanceUtils.getIntProperty (IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS); final int buffer = PerformanceUtils.getIntProperty (BUFFER_KEY, DEFAULT_BUFFER_SIZE) Final String channelHandler = PerformanceUtils.getProperty (Constants.DISPATCHER_KEY, ExecutionDispatcher.NAME) / Start server ExchangeServer server = Exchangers.bind ("exchange://0.0.0.0:" + port + "? transporter=" + transporter + "& serialization=" + serialization + "& threadpool=" + threadpool + "& threads=" + threads + "& iothreads=" + iothreads + "& buffer=" + buffer + "& channel.handler=" + channelHandler, new ExchangeHandlerAdapter () {public String telnet (Channel channel) String message) throws RemotingException {return "echo:" + message + "\ r\ ntelnet >" } public CompletableFuture reply (ExchangeChannel channel, Object request) throws RemotingException {if ("environment" .equals (request)) {return CompletableFuture.completedFuture (PerformanceUtils.getEnvironment ());} if ("scene" .equals (request)) {List scene = new ArrayList () Scene.add ("Transporter:" + transporter); scene.add ("Service Threads:" + threads); return CompletableFuture.completedFuture (scene);} return CompletableFuture.completedFuture (request);}}); return server } private static ExchangeServer statTelnetServer (int port) throws Exception {/ / Start server ExchangeServer telnetserver = Exchangers.bind ("exchange://0.0.0.0:" + port, new ExchangeHandlerAdapter () {public String telnet (Channel channel) String message) throws RemotingException {if (message.equals ("help")) {return "support cmd:\ r\ n\ tstart\ r\ n\ tstop\ r\ n\ tshutdown\ n\ trestart times [alive] [sleep]\ r\ ntelnet >" } else if (message.equals ("stop")) {logger.info ("server closed:" + server); server.close (); return "stop server\ r\ ntelnet >" } else if (message.startsWith ("start")) {try {restartServer (0,0,0);} catch (Exception e) {e.printStackTrace ();} return "start server\ r\ ntelnet >" } else if (message.startsWith ("shutdown")) {System.exit (0); return "start server\ r\ ntelnet >";} else if (message.startsWith ("channels")) {return "server.getExchangeChannels ():" + server.getExchangeChannels (). Size () + "\ r\ ntelnet >" } else if (message.startsWith ("restart")) {/ / r times [sleep] r 10 or r 10 100 String [] args = message.split (""); int times = Integer.parseInt (args [1]); int alive = args.length > 2? Integer.parseInt (args [2]): 0; int sleep = args.length > 3? Integer.parseInt (args [3]): 100; try {restartServer (times, alive, sleep);} catch (Exception e) {e.printStackTrace () } return "restart server,times:" + times + "stop alive time:" + alive + ", sleep time:" + sleep + "usage:r times [alive] [sleep]\ r\ ntelnet >";} else {return "echo:" + message + "\ r\ ntelnet >";}) Return telnetserver;} @ Test public void testServer () throws Exception {/ / Read port from property if (PerformanceUtils.getProperty ("port", null) = = null) {logger.warn ("Please set-Dport=9911"); return;} final int port= PerformanceUtils.getIntProperty ("port", 9911); final boolean telnet = PerformanceUtils.getBooleanProperty ("telnet", true) If (telnet) statTelnetServer (port + 1); server = statServer (); synchronized (PerformanceServerTest.class) {while (true) {try {PerformanceServerTest.class.wait ();} catch (InterruptedException e) {}

The statServer method of PerformanceServerTest uses PerformanceUtils.getProperty (Constants.DISPATCHER_KEY, ExecutionDispatcher.NAME) to get channelHandler, and if it can't find it, it uses ExecutionDispatcher.NAME.

The above content is what is the role of ExecutionDispatcher in dubbo. Have you learned any knowledge or skills? If you want to learn more skills or enrich your knowledge reserve, you are welcome to follow the industry information channel.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report