In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-28 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >
Share
Shulou(Shulou.com)06/02 Report--
This article shows you what the role of ConnectionOrderedDispatcher in dubbo is, the content is concise and easy to understand, it can definitely brighten your eyes. I hope you can get something through the detailed introduction of this article.
Dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedDispatcher.java
Public class ConnectionOrderedDispatcher implements Dispatcher {public static final String NAME = "connection"; @ Override public ChannelHandler dispatch (ChannelHandler handler, URL url) {return new ConnectionOrderedChannelHandler (handler, url);}}
ConnectionOrderedDispatcher implements the Dispatcher interface, and its dispatch method returns ConnectionOrderedChannelHandler
ConnectionOrderedChannelHandler
Dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/connection/ConnectionOrderedChannelHandler.java
Public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {protected final ThreadPoolExecutor connectionExecutor; private final int queuewarninglimit; public ConnectionOrderedChannelHandler (ChannelHandler handler, URL url) {super (handler, url); String threadName = url.getParameter (THREAD_NAME_KEY, DEFAULT_THREAD_NAME) ConnectionExecutor = new ThreadPoolExecutor (1,1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue (url.getPositiveParameter (CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)), new NamedThreadFactory (threadName, true), new AbortPolicyWithReport (threadName, url)); / / FIXME There's no place to release connectionExecutor! Queuewarninglimit = url.getParameter (CONNECT_QUEUE_WARNING_SIZE, DEFAULT_CONNECT_QUEUE_WARNING_SIZE);} @ Override public void connected (Channel channel) throws RemotingException {try {checkQueueLength (); connectionExecutor.execute (new ChannelEventRunnable (channel, handler, ChannelState.CONNECTED));} catch (Throwable t) {throw new ExecutionException ("connect event", channel, getClass () + "error when process connected event.", t) } @ Override public void disconnected (Channel channel) throws RemotingException {try {checkQueueLength (); connectionExecutor.execute (new ChannelEventRunnable (channel, handler, ChannelState.DISCONNECTED));} catch (Throwable t) {throw new ExecutionException ("disconnected event", channel, getClass () + "error when process disconnected event.", t) } @ Override public void received (Channel channel, Object message) throws RemotingException {ExecutorService executor = getExecutorService (); try {executor.execute (new ChannelEventRunnable (channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {/ / fix, reject exception can not be sent to consumer because thread pool is full, resulting in consumers waiting till timeout. If (message instanceof Request & & t instanceof RejectedExecutionException) {Request request = (Request) message; if (request.isTwoWay) {String msg = "Server side (" + url.getIp () + "," + url.getPort () + ") threadpool is exhausted, detail msg:" + t.getMessage (); Response response = new Response (request.getId (), request.getVersion ()) Response.setStatus (Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage (msg); channel.send (response); return;}} throw new ExecutionException (message, channel, getClass () + "error when process received event.", t) } @ Override public void caught (Channel channel, Throwable exception) throws RemotingException {ExecutorService executor = getExecutorService (); try {executor.execute (new ChannelEventRunnable (channel, handler, ChannelState.CAUGHT, exception));} catch (Throwable t) {throw new ExecutionException ("caught event", channel, getClass () + "error when process caught event.", t) }} private void checkQueueLength () {if (connectionExecutor.getQueue (). Size () > queuewarninglimit) {logger.warn (new IllegalThreadStateException ("connectionordered channel handler `queue size:" + connectionExecutor.getQueue (). Size () + "exceed the warninglimit number:" + queuewarninglimit));}
ConnectionOrderedChannelHandler inherits WrappedChannelHandler, and its constructor creates a connectionExecutor whose corePoolSize and maximumPoolSize are both 1 and LinkedBlockingQueue.
Its connected and disconnected methods both use connectionExecutor to execute the newly created ChannelEventRunnable;. Both methods first execute checkQueueLength to determine whether the queue size is larger than queuewarninglimit, and if so, print the warn log.
Its received and caught obtain the thread pool through the getExecutorService of the parent class, and then execute the created ChannelEventRunnable;received method. When an exception is caught, RejectedExecutionException and message are Request, and request is twoWay, and SERVER_THREADPOOL_EXHAUSTED_ERROR is returned.
ConnectChannelHandlerTest
Dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/handler/ConnectChannelHandlerTest.java
Public class ConnectChannelHandlerTest extends WrappedChannelHandlerTest {@ BeforeEach public void setUp () throws Exception {handler = new ConnectionOrderedChannelHandler (new BizChannelHander (true), url);} @ Test public void test_Connect_Blocked () throws RemotingException {handler = new ConnectionOrderedChannelHandler (new BizChannelHander (false), url); ThreadPoolExecutor executor = (ThreadPoolExecutor) getField (handler, "connectionExecutor", 1); Assertions.assertEquals (1, executor.getMaximumPoolSize ()); int runs = 20 Int taskCount = runs * 2; for (int I = 0; I
< runs; i++) { handler.connected(new MockedChannel()); handler.disconnected(new MockedChannel()); Assertions.assertTrue(executor.getActiveCount() { handler = new ConnectionOrderedChannelHandler(new BizChannelHander(false), url); ThreadPoolExecutor executor = (ThreadPoolExecutor) getField(handler, "connectionExecutor", 1); executor.shutdown(); handler.connected(new MockedChannel()); }); } @Test public void test_Disconnect_Execute_Error() throws RemotingException { Assertions.assertThrows(ExecutionException.class, () ->{handler = new ConnectionOrderedChannelHandler (new BizChannelHander (false), url); ThreadPoolExecutor executor = (ThreadPoolExecutor) getField (handler, "connectionExecutor", 1); executor.shutdown (); handler.disconnected (new MockedChannel ());}) } / / throw ChannelEventRunnable.runtimeExeception (int logger) not in execute exception @ Test// (expected = RemotingException.class) public void test_MessageReceived_Biz_Error () throws RemotingException {handler.received (new MockedChannel (), ");} / / throw ChannelEventRunnable.runtimeExeception (int logger) not in execute exception @ Test public void test_Caught_Biz_Error () throws RemotingException {handler.caught (new MockedChannel (), new BizException ()) @ Test public void test_Received_InvokeInExecuter () throws RemotingException {Assertions.assertThrows (ExecutionException.class, ()-> {handler = new ConnectionOrderedChannelHandler (new BizChannelHander (false), url); ThreadPoolExecutor executor = (ThreadPoolExecutor) getField (handler, "SHARED_EXECUTOR", 1); executor.shutdown (); executor = (ThreadPoolExecutor) getField (handler, "executor", 1); executor.shutdown () Handler.received (new MockedChannel (), ");});} / * Events do not pass through the thread pool and execute directly on the IO * / @ SuppressWarnings (" deprecation ") @ Disabled (" Heartbeat is processed in HeartbeatHandler not WrappedChannelHandler. ") @ Test public void test_Received_Event_invoke_direct () throws RemotingException {handler = new ConnectionOrderedChannelHandler (new BizChannelHander (false), url) ThreadPoolExecutor executor = (ThreadPoolExecutor) getField (handler, "SHARED_EXECUTOR", 1); executor.shutdown (); executor = (ThreadPoolExecutor) getField (handler, "executor", 1); executor.shutdown (); Request req = new Request (); req.setHeartbeat (true); final AtomicInteger count = new AtomicInteger (0) Handler.received (new MockedChannel () {@ Override public void send (Object message) throws RemotingException {Assertions.assertTrue (Response) message). IsHeartbeat (), "response.heartbeat"); count.incrementAndGet ();}}, req); Assertions.assertEquals (1, count.get (), "channel.send must be invoke");}}
ConnectChannelHandlerTest created ConnectionOrderedChannelHandler in setup, and then test_Connect_Blocked, test_Connect_Biz_Error, test_Disconnect_Biz_Error, test_Connect_Execute_Error, test_Disconnect_Execute_Error, test_MessageReceived_Biz_Error, test_Caught_Biz_Error, test_Received_InvokeInExecuter, test_Received_Event_invoke_direct
Summary
ConnectionOrderedDispatcher implements the Dispatcher interface, and its dispatch method returns that ConnectionOrderedChannelHandler;ConnectionOrderedChannelHandler inherits WrappedChannelHandler, and its constructor creates a connectionExecutor whose corePoolSize and maximumPoolSize are both 1 and LinkedBlockingQueue.
Both the connected and disconnected methods of ConnectionOrderedChannelHandler use connectionExecutor to execute the newly created ChannelEventRunnable;. Both methods first execute checkQueueLength to determine whether the queue size is larger than queuewarninglimit, and if so, print the warn log.
The received and caught of ConnectionOrderedChannelHandler obtain the thread pool through the getExecutorService of the parent class, and then execute the created ChannelEventRunnable;received method. When an exception is caught, RejectedExecutionException and message are Request, and request is twoWay, and SERVER_THREADPOOL_EXHAUSTED_ERROR is returned.
The above content is what is the role of ConnectionOrderedDispatcher in dubbo. Have you learned any knowledge or skills? If you want to learn more skills or enrich your knowledge reserve, you are welcome to follow the industry information channel.
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
Adb shell# cd / data# su# touch rt5025.log#dmesg > rt5025.log
© 2024 shulou.com SLNews company. All rights reserved.