How To Zip Concurrently Two IAsyncEnumerables?
I have two asynchronous sequences that I want to "zip" in pairs, and for this purpose I used the Zip operator from the System.Linq.Async package. This operator behaves in an undesirable way though, at least for my case. Instead of enumerating the two sequences concurrently, it enumerates them sequentially, so their latencies add up. Each of my sequences emits an element roughly once per second, and I expected the combined sequence to also emit a zipped pair roughly once per second, but in reality I get one pair every 2 seconds. Below is a minimal example that demonstrates this behavior:
static async IAsyncEnumerable<int> First()
{
    for (int i = 1; i <= 5; i++) { await Task.Delay(1000); yield return i; }
}
static async IAsyncEnumerable<int> Second()
{
    for (int i = 1; i <= 5; i++) { await Task.Delay(1000); yield return i; }
}
var stopwatch = Stopwatch.StartNew();
await foreach (var pair in First().Zip(Second()))
    Console.WriteLine(pair);
Console.WriteLine($"Duration: {stopwatch.ElapsedMilliseconds:#,0} msec");
Output:
(1, 1)
(2, 2)
(3, 3)
(4, 4)
(5, 5)
Duration: 10,155 msec
Is there any way that I can Zip these two sequences so that the program completes in 5 seconds instead of 10? I am interested in a custom operator, or in a combination of operators from the official packages, that has the desired behavior.
Answer
Something like this appears to work:
public static async IAsyncEnumerable<(TFirst, TSecond)> Zip<TFirst, TSecond>(IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second)
{
    await using var e1 = first.GetAsyncEnumerator();
    await using var e2 = second.GetAsyncEnumerator();
    while (true)
    {
        // Start both moves before awaiting, so the two sequences advance concurrently.
        // Convert each ValueTask to a Task once; a ValueTask must not be consumed twice.
        var t1 = e1.MoveNextAsync().AsTask();
        var t2 = e2.MoveNextAsync().AsTask();
        await Task.WhenAll(t1, t2);
        if (!t1.Result || !t2.Result)
            yield break;
        yield return (e1.Current, e2.Current);
    }
}
See it on dotnetfiddle.net.
Of course, this misses things like null checks, so it could do with some improvements; that's left as an exercise for the reader.
I'm also not convinced that the Task.WhenAll is any better than bool r1 = await t1; bool r2 = await t2; if (!r1 || !r2) yield break; here.
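As a rough sketch of the improvements mentioned above, the operator could be turned into an extension method with eager null checks and cancellation support. The names ZipConcurrently, AsyncEnumerableSketch and ZipConcurrentlyCore are hypothetical (picked so the method does not clash with the Zip from System.Linq.Async), and the split into a validating wrapper plus an iterator is one possible design, not the only one. This version keeps Task.WhenAll, on the assumption that it is safer to let both pending moves finish before the enumerators are disposed.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncEnumerableSketch
{
    // Hypothetical operator name; not part of System.Linq.Async.
    public static IAsyncEnumerable<(TFirst, TSecond)> ZipConcurrently<TFirst, TSecond>(
        this IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second)
    {
        // Validate here, in a non-iterator method, so the exception is thrown
        // at call time rather than when enumeration starts.
        if (first is null) throw new ArgumentNullException(nameof(first));
        if (second is null) throw new ArgumentNullException(nameof(second));
        return ZipConcurrentlyCore(first, second);
    }

    private static async IAsyncEnumerable<(TFirst, TSecond)> ZipConcurrentlyCore<TFirst, TSecond>(
        IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        await using var e1 = first.GetAsyncEnumerator(cancellationToken);
        await using var e2 = second.GetAsyncEnumerator(cancellationToken);
        while (true)
        {
            // Start both moves before awaiting either one, so they overlap.
            Task<bool> t1 = e1.MoveNextAsync().AsTask();
            Task<bool> t2 = e2.MoveNextAsync().AsTask();
            bool[] moved = await Task.WhenAll(t1, t2);
            // Stop as soon as either sequence is exhausted.
            if (!moved[0] || !moved[1]) yield break;
            yield return (e1.Current, e2.Current);
        }
    }
}
```

With this in scope, the loop from the question would become await foreach (var pair in First().ZipConcurrently(Second())), and a token can be supplied through WithCancellation.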