diff --git a/src/EventStore.Core.XUnit.Tests/Bus/MultiQueuedHandlerTests.cs b/src/EventStore.Core.XUnit.Tests/Bus/MultiQueuedHandlerTests.cs index aec56f140..bf795e7a0 100644 --- a/src/EventStore.Core.XUnit.Tests/Bus/MultiQueuedHandlerTests.cs +++ b/src/EventStore.Core.XUnit.Tests/Bus/MultiQueuedHandlerTests.cs @@ -45,6 +45,53 @@ public void publishes_messages_without_a_synchronization_group_round_robin() Assert.Equal([second], queues[1].Messages); } + [Fact] + public async Task stop_waits_for_all_queues() + { + var queues = CreateQueues(2); + var firstStop = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var secondStop = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + queues[0].StopTask = firstStop.Task; + queues[1].StopTask = secondStop.Task; + var sut = new MultiQueuedHandler(queues.Length, i => queues[i]); + + var stop = sut.Stop(); + Assert.False(stop.IsCompleted); + + firstStop.SetResult(null); + Assert.False(stop.IsCompleted); + + secondStop.SetResult(null); + await stop; + } + + [Fact] + public async Task stop_propagates_queue_stop_failures() + { + var queues = CreateQueues(2); + var failure = new InvalidOperationException("stop failed"); + queues[0].StopTask = Task.FromException(failure); + var sut = new MultiQueuedHandler(queues.Length, i => queues[i]); + + var ex = await Assert.ThrowsAsync(sut.Stop); + + Assert.Same(failure, ex); + } + + [Fact] + public async Task stop_attempts_all_queues_when_one_stop_throws_synchronously() + { + var queues = CreateQueues(2); + var failure = new InvalidOperationException("stop failed"); + queues[0].StopException = failure; + var sut = new MultiQueuedHandler(queues.Length, i => queues[i]); + + var ex = await Assert.ThrowsAsync(sut.Stop); + + Assert.Same(failure, ex); + Assert.Equal(1, queues[1].StopCount); + } + private static RecordingQueuedHandler[] CreateQueues(int count) { var queues = new RecordingQueuedHandler[count]; @@ -67,11 +114,26 @@ private sealed class RecordingQueuedHandler(string name) : IQueuedHandler public List Messages { get; } = []; + public Task StopTask { get; set; } = Task.CompletedTask; + + public Exception StopException { get; set; } + + public int StopCount { get; private set; } + public void Publish(Message message) => Messages.Add(message); public Task Start() => Task.CompletedTask; - public Task Stop() => Task.CompletedTask; + public Task Stop() + { + StopCount++; + if (StopException is not null) + { + throw StopException; + } + + return StopTask; + } public void RequestStop() { diff --git a/src/EventStore.Core/Bus/MultiQueuedHandler.cs b/src/EventStore.Core/Bus/MultiQueuedHandler.cs index 98355dbe0..2343216f6 100644 --- a/src/EventStore.Core/Bus/MultiQueuedHandler.cs +++ b/src/EventStore.Core/Bus/MultiQueuedHandler.cs @@ -48,7 +48,14 @@ public Task Stop() var queues = _queues.Span; for (int i = 0; i < queues.Length; ++i) { - stopTasks[i] = Task.Run(queues[i].Stop); + try + { + stopTasks[i] = queues[i].Stop(); + } + catch (Exception ex) + { + stopTasks[i] = Task.FromException(ex); + } } return Task.WhenAll(stopTasks);