2025-02-25 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)06/03 Report--
This article looks at how to understand multi-path asynchronous termination (cancellation) in Dotnet Core. Many people run into this problem in real projects, so let's walk through a concrete case and see how to deal with it.
I. Start
Suppose we have an API like this:
The client establishes a connection
SendAsync sends a message from the client to the server
TryReceiveAsync waits for a message from the server (it returns True together with the message if the server sent one, and False otherwise)
The server controls when the data flow ends: once the server has sent its last message, the client sends no more messages.
The interface code can be written as follows:
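interface ITransport<TRequest, TResponse> : IAsyncDisposable
{
    // Sends one message from the client to the server.
    ValueTask SendAsync(TRequest request, CancellationToken cancellationToken);

    // Success is false once the server has ended the stream.
    ValueTask<(bool Success, TResponse Message)> TryReceiveAsync(CancellationToken cancellationToken);
}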
Ignoring the connections, the code doesn't look complicated.
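To experiment with this interface without a real connection, an in-memory stand-in can help. The following is only a minimal sketch: EchoTransport is a hypothetical test double that is not part of the original article, it fixes both message types to int, and it assumes that echoing a chosen "last" value is how the server side ends the stream.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// The interface from the article, with the generic parameters spelled out.
public interface ITransport<TRequest, TResponse> : IAsyncDisposable
{
    ValueTask SendAsync(TRequest request, CancellationToken cancellationToken);
    ValueTask<(bool Success, TResponse Message)> TryReceiveAsync(CancellationToken cancellationToken);
}

// Hypothetical test double: every request is echoed back as a response.
// Sending the value passed as "last" makes the simulated server end the stream.
public sealed class EchoTransport : ITransport<int, int>
{
    private readonly Channel<int> _channel = Channel.CreateUnbounded<int>();
    private readonly int _last;

    public EchoTransport(int last) => _last = last;

    public async ValueTask SendAsync(int request, CancellationToken cancellationToken)
    {
        await _channel.Writer.WriteAsync(request, cancellationToken);
        if (request == _last)
        {
            _channel.Writer.TryComplete(); // the "server" ends the data flow
        }
    }

    public async ValueTask<(bool Success, int Message)> TryReceiveAsync(CancellationToken cancellationToken)
    {
        // WaitToReadAsync returns false once the writer has completed
        // and every buffered message has been read.
        while (await _channel.Reader.WaitToReadAsync(cancellationToken))
        {
            if (_channel.Reader.TryRead(out var message))
            {
                return (true, message);
            }
        }
        return (false, 0);
    }

    public ValueTask DisposeAsync()
    {
        _channel.Writer.TryComplete();
        return default;
    }
}
```

With new EchoTransport(last: 2), sending 1 and then 2 lets TryReceiveAsync return both values with Success set to true, and the next call returns Success set to false.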
Below, we create two loops and expose the data through enumerators:
ITransport<TRequest, TResponse> transport;

public async IAsyncEnumerable<TResponse> ReceiveAsync(
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    while (true)
    {
        var (success, message) = await transport.TryReceiveAsync(cancellationToken);
        if (!success) break; // the server has ended the stream
        yield return message;
    }
}

public async ValueTask SendAsync(
    IAsyncEnumerable<TRequest> data, CancellationToken cancellationToken)
{
    await foreach (var message in data.WithCancellation(cancellationToken))
    {
        await transport.SendAsync(message, cancellationToken);
    }
}
This code relies on asynchronous iterators. If they are unfamiliar, see my other article devoted to asynchronous iterators.
II. Handling the cancellation tokens
It looks as if we are done: we loop over receive and send, and pass the external cancellation token to both methods.
Are we really done?
Not yet. The problem is the cancellation token. We have not taken into account that the two streams depend on each other. In particular, we do not want the producer (the code feeding SendAsync) to keep running after the connection has failed for any reason.
In fact, there are more cancellation paths than we might think:
We may have passed an external cancellation token to both methods, and that token may have been triggered
The consumer of ReceiveAsync may have passed a cancellation token to GetAsyncEnumerator via WithCancellation, and that token may have been triggered
Our send/receive code may fail
The consumer of ReceiveAsync may want to stop partway through the data; a simple reason is an error while processing a received message
The producer feeding SendAsync may fail
These are only a few examples; there could be many more.
Essentially, all of these mean the connection is finished, so we need to cover every one of them in a way that lets the send and receive paths tell each other about the problem. In other words, we need our own CancellationTokenSource.
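The reason a CancellationTokenSource of our own helps can be seen in a small experiment. This is a sketch only, and LinkedTokenDemo is a hypothetical name used for illustration: a linked source is cancelled whenever any of its outer tokens is cancelled, while cancelling the linked source itself does not flow back to the outer sources.

```csharp
using System;
using System.Threading;

public static class LinkedTokenDemo
{
    // Returns (Propagated, Isolated):
    //   Propagated: cancelling an outer source cancelled the linked source.
    //   Isolated:   cancelling the linked source left the outer source alone.
    public static (bool Propagated, bool Isolated) Run()
    {
        using var outerA = new CancellationTokenSource();
        using var outerB = new CancellationTokenSource();

        // Cancelling either outer source cancels the linked one.
        using var linked1 = CancellationTokenSource.CreateLinkedTokenSource(outerA.Token, outerB.Token);
        outerB.Cancel();
        bool propagated = linked1.IsCancellationRequested;

        // Cancelling the linked source does not cancel the outer source.
        using var outerC = new CancellationTokenSource();
        using var linked2 = CancellationTokenSource.CreateLinkedTokenSource(outerC.Token);
        linked2.Cancel();
        bool isolated = !outerC.IsCancellationRequested;

        return (propagated, isolated);
    }
}
```

This one-way relationship is what lets a single internal source collect every cancellation path without disturbing the callers' own tokens.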
Clearly, this kind of requirement is best solved inside a library. We can wrap the complexity in a single API that consumers call:
public IAsyncEnumerable<TResponse> Duplex(
    IAsyncEnumerable<TRequest> request,
    CancellationToken cancellationToken = default);
This method:
Accepts a producer
Accepts an external cancellation token
Returns an asynchronous stream of responses
It can be used like this:
await foreach (MyResponse item in client.Duplex(ProducerAsync()))
{
    // ... todo
}

async IAsyncEnumerable<MyRequest> ProducerAsync(
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    for (int i = 0; i < 100; i++)
    {
        yield return new MyRequest(i);
        await Task.Delay(100, cancellationToken);
    }
}

In the code above, ProducerAsync does not do much yet; for now we only pass it in as a placeholder. It is enumerated later, and it is the act of enumerating that actually runs the code.
Back to Duplex. This method needs to consider at least two different kinds of cancellation:
The external token passed in through cancellationToken
The potentially different token that may be passed to GetAsyncEnumerator() when the result is consumed
Why only these two, and not the longer list of cancellation paths above? The point here is how the compiler combines these two tokens. What we need is not a CancellationToken but a CancellationTokenSource.

public IAsyncEnumerable<TResponse> Duplex(
    IAsyncEnumerable<TRequest> request,
    CancellationToken cancellationToken = default)
    => DuplexImpl(transport, request, cancellationToken);

private static async IAsyncEnumerable<TResponse> DuplexImpl(
    ITransport<TRequest, TResponse> transport,
    IAsyncEnumerable<TRequest> request,
    CancellationToken externalToken,
    [EnumeratorCancellation] CancellationToken enumeratorToken = default)
{
    using var allDone = CancellationTokenSource.CreateLinkedTokenSource(externalToken, enumeratorToken);
    // ... todo
}
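Here, DuplexImpl accepts the enumerator's cancellation token but keeps it separate from the external token, so the two are not merged at the compiler level. Inside the method, CreateLinkedTokenSource does the combining that the compiler would otherwise have done.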
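How a token passed to WithCancellation reaches a parameter marked [EnumeratorCancellation] can be checked with a small iterator. This is a minimal sketch; EnumeratorTokenDemo, CountAsync and ConsumeAsync are hypothetical names used only for illustration. The iterator is called without a token, yet it still observes the cancellation requested by the consumer.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class EnumeratorTokenDemo
{
    // An endless counter that stops only when its token is cancelled.
    public static async IAsyncEnumerable<int> CountAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (int i = 0; ; i++)
        {
            cancellationToken.ThrowIfCancellationRequested();
            yield return i;
            await Task.Yield();
        }
    }

    // Returns how many items were seen before cancellation took effect.
    public static async Task<int> ConsumeAsync()
    {
        using var cts = new CancellationTokenSource();
        int seen = 0;
        try
        {
            // No token is passed to CountAsync(); it arrives via WithCancellation.
            await foreach (var i in CountAsync().WithCancellation(cts.Token))
            {
                seen++;
                if (i == 2) cts.Cancel();
            }
        }
        catch (OperationCanceledException)
        {
            // Expected: the iterator noticed the consumer's token.
        }
        return seen;
    }
}
```

ConsumeAsync sees the items 0, 1 and 2 and then stops, because the token supplied through WithCancellation is the one the iterator checks.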
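Now we have a CancellationTokenSource that we can use to stop our loops whenever we need to.

using var allDone = CancellationTokenSource.CreateLinkedTokenSource(externalToken, enumeratorToken);
try
{
    // ... todo
}
finally
{
    allDone.Cancel();
}

This handles the scenario where the consumer does not take all the data and we want allDone to be triggered as we exit DuplexImpl. Iterator blocks are very helpful here and keep the program simple: anything in a using (or finally) block ends up being run from Dispose/DisposeAsync.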
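The claim that cleanup code in an iterator runs when the consumer stops early can be verified with a short sketch. IteratorFinallyDemo and its members are hypothetical names used for illustration; the finally block plays the role of the allDone.Cancel() call in DuplexImpl.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class IteratorFinallyDemo
{
    public static async IAsyncEnumerable<int> NumbersAsync(CancellationTokenSource allDone)
    {
        try
        {
            for (int i = 0; i < 100; i++)
            {
                yield return i;
                await Task.Yield();
            }
        }
        finally
        {
            // Runs from DisposeAsync when the consumer leaves the loop early.
            allDone.Cancel();
        }
    }

    // Returns true if the finally block ran after an early exit.
    public static async Task<bool> RunAsync()
    {
        using var allDone = new CancellationTokenSource();
        await foreach (var i in NumbersAsync(allDone))
        {
            if (i == 3) break; // stop long before the iterator is exhausted
        }
        return allDone.IsCancellationRequested;
    }
}
```

Breaking out of await foreach disposes the enumerator, and disposing the enumerator is what executes the pending finally block.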
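Next comes the producer, that is, SendAsync. Because the operation is duplex, sending does not depend on the incoming messages, so it can be started with Task.Run as an independent code path; if the producer fails, everything is cancelled. In the todo section above, we can add:

var send = Task.Run(async () =>
{
    try
    {
        await foreach (var message in request.WithCancellation(allDone.Token))
        {
            await transport.SendAsync(message, allDone.Token);
        }
    }
    catch
    {
        allDone.Cancel(); // a producer or send failure stops the receive path too
        throw;
    }
}, allDone.Token);

// ... todo: receive

await send;

This starts the producer side, SendAsync, as a parallel operation. Note that we pass the combined token, allDone.Token, to the producer. The await is deferred so that the ProducerAsync method can use the cancellation token, which satisfies the lifetime requirements of the composite duplex operation.
The receive code then becomes:

while (true)
{
    var (success, message) = await transport.TryReceiveAsync(allDone.Token);
    if (!success) break;
    yield return message;
}
allDone.Cancel();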
Finally, here is all of the code put together:

private static async IAsyncEnumerable<TResponse> DuplexImpl(
    ITransport<TRequest, TResponse> transport,
    IAsyncEnumerable<TRequest> request,
    CancellationToken externalToken,
    [EnumeratorCancellation] CancellationToken enumeratorToken = default)
{
    using var allDone = CancellationTokenSource.CreateLinkedTokenSource(externalToken, enumeratorToken);
    try
    {
        // Send path: runs independently of the receive loop.
        var send = Task.Run(async () =>
        {
            try
            {
                await foreach (var message in request.WithCancellation(allDone.Token))
                {
                    await transport.SendAsync(message, allDone.Token);
                }
            }
            catch
            {
                allDone.Cancel();
                throw;
            }
        }, allDone.Token);

        // Receive path: yields responses until the server ends the stream.
        while (true)
        {
            var (success, message) = await transport.TryReceiveAsync(allDone.Token);
            if (!success) break;
            yield return message;
        }

        allDone.Cancel();
        await send;
    }
    finally
    {
        allDone.Cancel();
    }
}

That concludes this look at how to understand multi-path asynchronous termination in Dotnet Core. Thanks for reading.