Skip to main content

Streaming

Vali-Mediator supports streaming responses via IAsyncEnumerable<T>, enabling real-time data delivery without loading all results into memory.

Defining a Stream Request

public record GetProductsStreamRequest(string Category) : IStreamRequest<ProductDto>;

Implementing the Handler

public class GetProductsStreamHandler
: IStreamRequestHandler<GetProductsStreamRequest, ProductDto>
{
private readonly IProductRepository _repository;

public GetProductsStreamHandler(IProductRepository repository)
{
_repository = repository;
}

public async IAsyncEnumerable<ProductDto> Handle(
GetProductsStreamRequest request,
[EnumeratorCancellation] CancellationToken ct)
{
await foreach (var product in _repository.StreamByCategory(request.Category, ct))
{
yield return new ProductDto(product.Id, product.Name, product.Price);
}
}
}

Consuming the Stream

Use CreateStream on the mediator:

IAsyncEnumerable<ProductDto> stream = _mediator.CreateStream(
new GetProductsStreamRequest("Electronics"));

await foreach (var product in stream.WithCancellation(cancellationToken))
{
Console.WriteLine($"{product.Name}: {product.Price:C}");
}

Minimal API Example

app.MapGet("/products/stream", async (
string category,
IValiMediator mediator,
CancellationToken ct) =>
{
async IAsyncEnumerable<ProductDto> GetProducts()
{
await foreach (var p in mediator.CreateStream(
new GetProductsStreamRequest(category)).WithCancellation(ct))
{
yield return p;
}
}

return Results.Ok(GetProducts());
});
warning

Streaming bypasses pipeline behaviors entirely. Pre/post processors and behaviors are not executed for stream requests. If you need cross-cutting concerns, implement them directly in the handler.

Use Cases

  • Large datasets — stream thousands of records without memory pressure
  • Real-time data — live feed of sensor readings, events, or messages
  • Server-Sent Events — pair with ASP.NET Core SSE endpoints
  • Progress reporting — yield progress updates as a long task completes