Network Security Internet Technology Development Database Servers Mobile Phone Android Software Apple Software Computer Software News IT Information

In addition to Weibo, there is also WeChat

Please pay attention

WeChat public account

Shulou

What is the function of ForkingClusterInvoker in dubbo

2025-01-19 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Internet Technology >

Share

Shulou(Shulou.com)06/02 Report--

In this issue, the editor will bring you about the role of ForkingClusterInvoker in dubbo. The article is rich in content and analyzes and narrates it from a professional point of view. I hope you can get something after reading this article.

ForkingClusterInvoker

Dubbo-2.7.3/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvoker.java

Public class ForkingClusterInvoker extends AbstractClusterInvoker {/ * * Use {@ link NamedInternalThreadFactory} to produce {@ link org.apache.dubbo.common.threadlocal.InternalThread} * which with the use of {@ link org.apache.dubbo.common.threadlocal.InternalThreadLocal} in {@ link RpcContext}. * / private final ExecutorService executor = Executors.newCachedThreadPool (new NamedInternalThreadFactory ("forking-cluster-timer", true)); public ForkingClusterInvoker (Directory directory) {super (directory);} @ Override @ SuppressWarnings ({"unchecked", "rawtypes"}) public Result doInvoke (final Invocation invocation, List invokers, LoadBalance loadbalance) throws RpcException {try {checkInvokers (invokers, invocation); final List selected Final int forks = getUrl (). GetParameter (FORKS_KEY, DEFAULT_FORKS); final int timeout = getUrl (). GetParameter (TIMEOUT_KEY, DEFAULT_TIMEOUT); if (forks = invokers.size ()) {selected = invokers;} else {selected = new ArrayList (); for (int I = 0; I)

< forks; i++) { Invoker invoker = select(loadbalance, invocation, invokers, selected); if (!selected.contains(invoker)) { //Avoid add the same invoker several times. selected.add(invoker); } } } RpcContext.getContext().setInvokers((List) selected); final AtomicInteger count = new AtomicInteger(); final BlockingQueue ref = new LinkedBlockingQueue(); for (final Invoker invoker : selected) { executor.execute(new Runnable() { @Override public void run() { try { Result result = invoker.invoke(invocation); ref.offer(result); } catch (Throwable e) { int value = count.incrementAndGet(); if (value >

= selected.size () {ref.offer (e);}});} try {Object ret = ref.poll (timeout, TimeUnit.MILLISECONDS) If (ret instanceof Throwable) {Throwable e = (Throwable) ret; throw new RpcException (e instanceof RpcException? (RpcException) e) .getCode (): 0, "Failed to forking invoke provider" + selected + ", but no luck to perform the invocation. Last error is:" + e.getMessage (), e.getCause ()! = null? E.getCause (): e);} return (Result) ret;} catch (InterruptedException e) {throw new RpcException ("Failed to forking invoke provider" + selected + ", but no luck to perform the invocation. Last error is:" + e.getMessage (), e);} finally {/ / clear attachments which is binding to current thread. RpcContext.getContext () .clearAttachments ();

ForkingClusterInvoker uses Executors.newCachedThreadPool to create an executor; whose doInvoke gets forks and timeout parameters from url, then selects the invoker of the number of forks from invokers, then puts it to executor request to execute invoker.invoke (invocation), puts Result to LinkedBlockingQueue, and finally uses the specified timeout to poll the first returned result. If there is an exception, RpcException is thrown.

ForkingClusterInvokerTest

Dubbo-2.7.3/dubbo-cluster/src/test/java/org/apache/dubbo/rpc/cluster/support/ForkingClusterInvokerTest.java

Public class ForkingClusterInvokerTest {private List invokers = new ArrayList (); private URL url = URL.valueOf ("test://test:11/test?forks=2"); private Invoker invoker1 = mock (Invoker.class); private Invoker invoker2 = mock (Invoker.class); private Invoker invoker3 = mock (Invoker.class); private RpcInvocation invocation = new RpcInvocation (); private Directory dic; private Result result = new AppResponse () BeforeEach public void setUp () throws Exception {dic = mock (Directory.class); given (dic.getUrl ()) .willReturn (url); given (dic.list (invocation)) .willReturn (invokers); given (dic.getInterface ()) .willReturn (ForkingClusterInvokerTest.class); invocation.setMethodName ("method1"); invokers.add (invoker1); invokers.add (invoker2); invokers.add (invoker3) } private void resetInvokerToException () {given (invoker1.invoke (invocation)) .willThrow (new RuntimeException ()); given (invoker1.getUrl ()) .willReturn (url); given (invoker1.isAvailable ()) .willReturn (true); given (invoker1.getInterface ()) .willReturn (ForkingClusterInvokerTest.class); given (invoker2.invoke (invocation)) .willThrow (new RuntimeException ()); given (invoker2.getUrl ()) .willReturn (url) Given (invoker2.isAvailable ()) .willReturn (true); given (invoker2.getInterface ()) .willReturn (ForkingClusterInvokerTest.class); given (invoker3.invoke (invocation)) .willThrow (new RuntimeException ()); given (invoker3.getUrl ()) .willReturn (url); given (invoker3.isAvailable ()) .willReturn (true); given (invoker3.getInterface ()) .willReturn (ForkingClusterInvokerTest.class) } private void resetInvokerToNoException () {given (invoker1.invoke (invocation)) .willReturn (result); given (invoker1.getUrl ()) .willReturn (url); given (invoker1.isAvailable ()) .willReturn (true); given (invoker1.getInterface ()) .willReturn (ForkingClusterInvokerTest.class); given (invoker2.invoke (invocation) .willReturn (result); given (invoker2.getUrl ()) .willReturn (url)) Given (invoker2.isAvailable ()) .willReturn (true); given (invoker2.getInterface ()) .willReturn (ForkingClusterInvokerTest.class); given (invoker3.invoke (invocation)) .willReturn (result); given (invoker3.getUrl ()) .willReturn (url); given (invoker3.isAvailable ()) .willReturn (true); given (invoker3.getInterface ()) .willReturn (ForkingClusterInvokerTest.class) } @ Test public void testInvokeException () {resetInvokerToException (); ForkingClusterInvoker invoker = new ForkingClusterInvoker (dic); try {invoker.invoke (invocation); Assertions.fail ();} catch (RpcException expected) {Assertions.assertTrue (expected.getMessage (). Contains ("Failed to forking invoke provider")); assertFalse (expected.getCause () instanceof RpcException) } @ Test public void testClearRpcContext () {resetInvokerToException (); ForkingClusterInvoker invoker = new ForkingClusterInvoker (dic); String attachKey = "attach"; String attachValue = "value"; RpcContext.getContext () .setAttachment (attachKey, attachValue); Map attachments = RpcContext.getContext () .getAttachments () Assertions.assertTrue (attachments! = null & & attachments.size () = = 1, "set attachment failed!"); try {invoker.invoke (invocation); Assertions.fail ();} catch (RpcException expected) {Assertions.assertTrue (expected.getMessage (). Contains ("Failed to forking invoke provider"), "Succeeded to forking invoke provider!"); assertFalse (expected.getCause () instanceof RpcException) } Map afterInvoke = RpcContext.getContext (). GetAttachments (); Assertions.assertTrue (afterInvoke! = null & & afterInvoke.size () = = 0, "clear attachment failed!");} @ Test () public void testInvokeNoException () {resetInvokerToNoException (); ForkingClusterInvoker invoker = new ForkingClusterInvoker (dic); Result ret = invoker.invoke (invocation); Assertions.assertSame (result, ret);}}

ForkingClusterInvokerTest validates two scenarios of testInvokeException and testClearRpcContext.

Summary

ForkingClusterInvoker uses Executors.newCachedThreadPool to create an executor; whose doInvoke gets forks and timeout parameters from url, then selects the invoker of the number of forks from invokers, then puts it to executor request to execute invoker.invoke (invocation), puts Result to LinkedBlockingQueue, and finally uses the specified timeout to poll the first returned result. If there is an exception, RpcException is thrown.

The above is what the role of ForkingClusterInvoker in dubbo is shared by the editor. If you happen to have similar doubts, you might as well refer to the above analysis to understand. If you want to know more about it, you are welcome to follow the industry information channel.

Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.

Views: 0

*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.

Share To

Internet Technology

Wechat

© 2024 shulou.com SLNews company. All rights reserved.

12
Report