Ad

How To Zip Concurrently Two IAsyncEnumerables?

I have two asynchronous sequences that I want to "zip" in pairs, and for this purpose I used the Zip operator from the System.Linq.Async package. This operator behaves in an undesirable way though, at least for my case. Instead of enumerating the two sequences concurrently, it enumerates them sequentially, with the result being that the latency is added up. Each of my sequences emits an element every one second on average, and I expected that the combined sequence would also emit zipped pairs every one second, but in reality I get one pair every 2 seconds. Below is a minimal example that demonstrates this behavior:

static async IAsyncEnumerable<int> First()
{
    for (int i = 1; i <= 5; i++) { await Task.Delay(1000); yield return i; }
}

static async IAsyncEnumerable<int> Second()
{
    for (int i = 1; i <= 5; i++) { await Task.Delay(1000); yield return i; }
}

var stopwatch = Stopwatch.StartNew();
await foreach (var pair in First().Zip(Second()))
    Console.WriteLine(pair);
Console.WriteLine($"Duration: {stopwatch.ElapsedMilliseconds:#,0} msec");

Output:

(1, 1)
(2, 2)
(3, 3)
(4, 4)
(5, 5)
Duration: 10,155 msec

Try it on Fiddle.

Is there any way that I can Zip these two sequences in a way that the program completes in 5 seconds instead of 10? I am interested about a custom operator, or about a combination of operators from the official packages, that has the desirable behavior.

Ad

Answer

Something like this appears to work:

public static async IAsyncEnumerable<(TFirst, TSecond)> Zip<TFirst, TSecond>(IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second)
{
    await using var e1 = first.GetAsyncEnumerator();    
    await using var e2 = second.GetAsyncEnumerator();
    
    while (true)
    {
        var t1 = e1.MoveNextAsync();
        var t2 = e2.MoveNextAsync();
        await Task.WhenAll(new[] { t1.AsTask(), t2.AsTask() });
        
        if (!t1.Result || !t2.Result)
            yield break;
        
        yield return (e1.Current, e2.Current);
    }
}

See it on dotnetfiddle.net.

Of course, this misses things like null checks, so could do with some improvements: that's left as an excercise for the reader.

I'm also not convinced that the Task.WhenAll is any better than bool r1 = await t1; bool r2 = await t2; if (!r1 || !r2) yield break; here.

Ad
source: stackoverflow.com
Ad