How to batch an IAsyncEnumerable, enforcing a maximum interval policy between consecutive batches?


I have an asynchronous sequence (stream) of messages that arrive sometimes in bursts and sometimes sporadically, and I would like to process them in batches of 10 messages per batch. I also want to enforce an upper limit on the latency between receiving a message and processing it, so a batch with fewer than 10 messages should also be processed if 5 seconds have passed since receiving the first message of the batch. I found that I can solve the first part of the problem by using the Buffer operator from the System.Interactive.Async package:

IAsyncEnumerable<Message> source = GetStreamOfMessages();
IAsyncEnumerable<IList<Message>> batches = source.Buffer(10);
await foreach (IList<Message> batch in batches)
{
    // Process batch
}

The signature of the Buffer operator:

public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, int count);

Unfortunately the Buffer operator has no overload with a TimeSpan parameter, so I can't solve the second part of the problem so easily. I'll have to somehow implement a batching operator with a timer myself. My question is: how can I implement a variant of the Buffer operator that has the signature below?

public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count);

Regarding the behavior of the timer, it need not be as precise as I described earlier. I am OK if the resulting sequence just emits a buffer every 5 seconds max, even if less than 5 seconds have passed since receiving the first message of the batch. Emitting empty buffers is also OK. I would prefer a simple implementation with non-perfect behavior over a perfect one that is overly complicated.

I am also OK with adding external dependencies to my project if needed, like the System.Interactive.Async or the System.Linq.Async packages.

P.S. this question was inspired by a recent question related to channels and memory leaks.


Clarification: regarding my preference for simplicity over accuracy, I mean that it's OK if a buffer is emitted sooner than the maximum latency policy requires. It's not OK, though, if this policy is violated: under no circumstances should the processing of a message be postponed for more than 5 seconds. Also, emitting buffers more frequently than every 5 seconds is not allowed, unless the buffers are full.


Solution

Here are two approaches to this problem. The first one is flawed, but I am posting it anyway due to its extreme simplicity. A Buffer operator with a TimeSpan parameter already exists in the System.Reactive package, and converters between asynchronous and observable sequences exist in the System.Linq.Async package. So it’s just a matter of chaining together three already available operators:

public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count)
{
    return source.ToObservable().Buffer(timeSpan, count).ToAsyncEnumerable();
}

Unfortunately this neat approach is flawed, because of the side-effects of shifting from the pull model to the push model and back to pull. What happens is that the intermediate observable sequence, when subscribed, starts aggressively pulling the source IAsyncEnumerable, regardless of how the resulting IAsyncEnumerable is pulled. So instead of the consumer of the resulting sequence driving the enumeration, the enumeration happens silently in the background at the maximum speed the source sequence allows, and the produced messages are buffered in an internal queue. So not only can hidden latency be imposed on the processing of the messages, but the memory consumption can also skyrocket out of control.
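The eager pulling is easy to observe with a small demo. The sketch below (hypothetical names; it assumes the System.Reactive and System.Linq.Async packages are installed) adds a logging side-effect to a fast source and pairs it with a slow consumer:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

static async IAsyncEnumerable<int> FastSource()
{
    for (int i = 0; i < 50; i++)
    {
        Console.WriteLine($"Produced #{i}"); // side-effect reveals when pulling happens
        yield return i;
        await Task.Yield();
    }
}

var batches = FastSource()
    .ToObservable()                     // switch to the push model (subscribes eagerly)
    .Buffer(TimeSpan.FromSeconds(5), 10)
    .ToAsyncEnumerable();               // back to the pull model

await foreach (IList<int> batch in batches)
{
    await Task.Delay(1000); // slow consumer
    Console.WriteLine($"Consumed a batch of {batch.Count}");
}
```

Running this, the "Produced" lines race far ahead of the "Consumed" lines, exposing the hidden internal queue between the observable and the consumer.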

The second is a hands-on approach that uses the Task.Delay method as a timer, and the Task.WhenAny method for coordinating the timer and enumeration tasks. The behavior of this approach is similar to the Rx-based approach, except that the enumeration of the source sequence is driven by the consumer of the resulting sequence, as one would expect.

/// <summary>
/// Projects each element of an async-enumerable sequence into a buffer that's
/// sent out when either it's full or a given amount of time has elapsed.
/// </summary>
public static async IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    await using var enumerator = source.GetAsyncEnumerator(cancellationToken);
    var moveTask = enumerator.MoveNextAsync().AsTask();
    var timerCts = new CancellationTokenSource();
    var delayTask = Task.Delay(timeSpan, timerCts.Token);
    var buffer = new List<TSource>(count);
    while (true)
    {
        var completedTask = await Task.WhenAny(moveTask, delayTask);
        if (completedTask == moveTask)
        {
            if (!await moveTask) break; // the source sequence completed
            buffer.Add(enumerator.Current);
            if (buffer.Count == count)
            {
                timerCts.Cancel(); timerCts.Dispose();
                yield return buffer.ToArray();
                buffer.Clear();
                timerCts = new CancellationTokenSource();
                delayTask = Task.Delay(timeSpan, timerCts.Token);
            }
            moveTask = enumerator.MoveNextAsync().AsTask();
        }
        else // completedTask == delayTask (the timer elapsed before the buffer filled)
        {
            yield return buffer.ToArray();
            buffer.Clear();
            delayTask = Task.Delay(timeSpan, timerCts.Token);
        }
    }
    timerCts.Cancel(); timerCts.Dispose();
    if (buffer.Count > 0)
    {
        yield return buffer.ToArray();
    }
}
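For a quick smoke test of the operator, here is a sketch with a hypothetical SlowSource that emits a value roughly every 700 ms, so a full batch of 10 never accumulates within the 5-second window (it assumes the Buffer extension method above is in scope):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static async IAsyncEnumerable<int> SlowSource()
{
    for (int i = 1; i <= 12; i++)
    {
        await Task.Delay(700); // hypothetical production rate
        yield return i;
    }
}

await foreach (IList<int> batch in SlowSource().Buffer(TimeSpan.FromSeconds(5), 10))
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Batch: [{string.Join(", ", batch)}]");
}
```

The expectation is a partial batch roughly every 5 seconds, rather than one full batch of 10 after about 7 seconds, confirming that the maximum latency policy is honored.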