Streaming
Streaming lets a handler yield results one item at a time via IAsyncEnumerable<Result<TItem>>. Use it when the full result set is too large to materialize in memory, or when you want to start sending data to the client before all items are ready.
Define a streaming request
A streaming request implements IStreamRequest<TItem>:
using UnambitiousFx.Synapse.Abstractions;
public record StreamTasksQuery(string? Filter) : IStreamRequest<TaskDto>;
Implement the handler
using System.Runtime.CompilerServices;
using UnambitiousFx.Functional;
using UnambitiousFx.Synapse.Abstractions;
public class StreamTasksQueryHandler : IStreamRequestHandler<StreamTasksQuery, TaskDto>
{
    private readonly ITaskRepository _repository;

    public StreamTasksQueryHandler(ITaskRepository repository) => _repository = repository;

    public async IAsyncEnumerable<Result<TaskDto>> HandleAsync(
        StreamTasksQuery query,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        await foreach (var task in _repository.StreamAsync(query.Filter, ct))
        {
            yield return Result.Success(TaskDto.From(task));
        }
    }
}
Key points:
- Return type is IAsyncEnumerable<Result<TItem>>: each item is independently wrapped in a Result.
- Decorate the CancellationToken parameter with [EnumeratorCancellation] so that a token the consumer attaches to await foreach (for example via WithCancellation) is forwarded to the handler.
- Yield Result.Success(item) for successful items; yield Result.Failure<TItem>(...) to signal a per-item error without aborting the entire stream.
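To make the [EnumeratorCancellation] point concrete, here is a minimal sketch in plain C# with no Synapse types. CountAsync is a hypothetical stand-in for a streaming handler; the consumer never passes a token as an argument, yet the iterator still observes cancellation because WithCancellation supplies the token to the decorated parameter.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();
var received = new List<int>();
var cancelled = false;

try
{
    // No token is passed to CountAsync(); it arrives through WithCancellation.
    await foreach (var n in CountAsync().WithCancellation(cts.Token))
    {
        received.Add(n);
        if (n == 2) cts.Cancel(); // request cancellation after the third item
    }
}
catch (OperationCanceledException)
{
    cancelled = true;
}

Console.WriteLine($"received={received.Count}, cancelled={cancelled}");
// prints "received=3, cancelled=True"

// Hypothetical stand-in for a streaming handler: yields integers until cancelled.
static async IAsyncEnumerable<int> CountAsync(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    for (var i = 0; ; i++)
    {
        ct.ThrowIfCancellationRequested(); // sees the token from WithCancellation
        yield return i;
        await Task.Yield();
    }
}
```

Without the attribute, ct inside CountAsync would stay at its default value and the loop above would never observe the cancellation request.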
Register the handler
services.AddSynapse(cfg =>
{
    cfg.RegisterStreamRequestHandler<StreamTasksQueryHandler, StreamTasksQuery, TaskDto>();
});
Invoke the stream
Use IInvoker.InvokeStreamAsync<TItem>. The item type is inferred from IStreamRequest<TItem>:
// The named argument must match the record's positional parameter, Filter.
await foreach (var result in invoker.InvokeStreamAsync(new StreamTasksQuery(Filter: null), ct))
{
    if (result.TryGet(out var dto, out var error))
        Console.WriteLine(dto.Title);
    else
        Console.WriteLine($"Item error: {error}");
}
Per-item error handling
Because each item is a Result<TItem>, a failure in one item does not cancel the rest of the stream. This is useful when some items may be unavailable or fail to deserialize:
// StreamTasksQuery has no parameterless constructor, so Filter is passed explicitly.
await foreach (var result in invoker.InvokeStreamAsync(new StreamTasksQuery(Filter: null), ct))
{
    result.Match(
        success: dto => ProcessItem(dto),
        failure: error => LogError(error)); // stream continues after this
}
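On the producing side, a handler can get the same effect by converting a per-item exception into a failure result instead of letting it escape the iterator. The sketch below is only an illustration: ItemResult<T> and ParseAllAsync are hypothetical stand-ins used to keep the example self-contained, not the library's Result<T> or a Synapse handler.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var ok = 0;
var failed = 0;

await foreach (var result in ParseAllAsync(new[] { "1", "oops", "3" }))
{
    if (result.IsSuccess) ok++;
    else failed++; // the stream keeps going after a failed item
}

Console.WriteLine($"ok={ok}, failed={failed}"); // prints "ok=2, failed=1"

// Converts each input independently; a bad item becomes a failure result
// rather than an exception that would end the stream.
static async IAsyncEnumerable<ItemResult<int>> ParseAllAsync(IEnumerable<string> inputs)
{
    foreach (var input in inputs)
    {
        ItemResult<int> result;
        try
        {
            result = ItemResult<int>.Ok(int.Parse(input));
        }
        catch (FormatException ex)
        {
            result = ItemResult<int>.Fail(ex.Message);
        }

        // C# does not allow 'yield return' inside a try block that has a catch
        // clause, so the result is built first and yielded afterwards.
        yield return result;
        await Task.Yield();
    }
}

// Hypothetical stand-in for the library's Result<T> (assumption, illustration only).
public sealed record ItemResult<T>(bool IsSuccess, T? Value, string? Error)
{
    public static ItemResult<T> Ok(T value) => new(true, value, null);
    public static ItemResult<T> Fail(string error) => new(false, default, error);
}
```

In a real handler the same shape would presumably yield Result.Success(...) and Result.Failure<TItem>(...) in place of the stand-in factory methods.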
ASP.NET Core — streaming HTTP responses
When using IHttpInvoker (from UnambitiousFx.Synapse.AspNetCore), call InvokeStreamAsync to get back an IAsyncEnumerable<TItem> with successful items already unwrapped and failures silently skipped:
// StreamTasksQuery requires its Filter argument; null means no filtering.
app.MapGet("/tasks/stream", ([FromServices] IHttpInvoker invoker, CancellationToken ct) =>
    invoker.InvokeStreamAsync(new StreamTasksQuery(Filter: null), ct));
Returning IAsyncEnumerable<T> directly from a Minimal API endpoint causes ASP.NET Core to stream the JSON array to the client rather than buffering the entire response.
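A client can consume such a response incrementally as well. The following sketch uses System.Text.Json's JsonSerializer.DeserializeAsyncEnumerable, which reads the elements of a root-level JSON array one at a time. A MemoryStream stands in for the HTTP response body so the example runs offline, and the "title" property name is an assumption about how TaskDto serializes.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Text.Json;

// Stand-in for the response body of GET /tasks/stream (assumed payload shape).
var json = "[{\"title\":\"Write docs\"},{\"title\":\"Ship release\"}]";
using var body = new MemoryStream(Encoding.UTF8.GetBytes(json));

var titles = new List<string>();

// Each array element is surfaced as soon as it has been parsed,
// without first loading the whole array into a list.
await foreach (var item in JsonSerializer.DeserializeAsyncEnumerable<JsonElement>(body))
{
    var title = item.GetProperty("title").GetString();
    if (title is not null) titles.Add(title);
}

Console.WriteLine(string.Join(", ", titles)); // prints "Write docs, Ship release"
```

Against a live endpoint, the stream would typically come from HttpClient.GetStreamAsync, which hands back the body without buffering it first.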
See also
- Commands and Queries: for non-streaming requests.
- ASP.NET Core Integration: IHttpInvoker.InvokeStreamAsync and HTTP streaming.
- Pipeline Behaviors: IStreamRequestPipelineBehavior for stream cross-cutting concerns.