From 2f57f055a8f56365ff34acb38637d8e6950dbd8e Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Thu, 14 May 2026 11:48:09 -0400 Subject: [PATCH 1/2] chore(bus): avoid stop thread hop Signed-off-by: Yordis Prieto --- .../Bus/MultiQueuedHandlerTests.cs | 37 ++++++++++++++++++- src/EventStore.Core/Bus/MultiQueuedHandler.cs | 2 +- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/src/EventStore.Core.XUnit.Tests/Bus/MultiQueuedHandlerTests.cs b/src/EventStore.Core.XUnit.Tests/Bus/MultiQueuedHandlerTests.cs index aec56f140..81d96af95 100644 --- a/src/EventStore.Core.XUnit.Tests/Bus/MultiQueuedHandlerTests.cs +++ b/src/EventStore.Core.XUnit.Tests/Bus/MultiQueuedHandlerTests.cs @@ -45,6 +45,39 @@ public void publishes_messages_without_a_synchronization_group_round_robin() Assert.Equal([second], queues[1].Messages); } + [Fact] + public async Task stop_waits_for_all_queues() + { + var queues = CreateQueues(2); + var firstStop = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var secondStop = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + queues[0].StopTask = firstStop.Task; + queues[1].StopTask = secondStop.Task; + var sut = new MultiQueuedHandler(queues.Length, i => queues[i]); + + var stop = sut.Stop(); + Assert.False(stop.IsCompleted); + + firstStop.SetResult(null); + Assert.False(stop.IsCompleted); + + secondStop.SetResult(null); + await stop; + } + + [Fact] + public async Task stop_propagates_queue_stop_failures() + { + var queues = CreateQueues(2); + var failure = new InvalidOperationException("stop failed"); + queues[0].StopTask = Task.FromException(failure); + var sut = new MultiQueuedHandler(queues.Length, i => queues[i]); + + var ex = await Assert.ThrowsAsync(sut.Stop); + + Assert.Same(failure, ex); + } + private static RecordingQueuedHandler[] CreateQueues(int count) { var queues = new RecordingQueuedHandler[count]; @@ -67,11 +100,13 @@ private sealed class RecordingQueuedHandler(string name) : IQueuedHandler public List Messages { get; } = []; + public Task StopTask { get; set; } = Task.CompletedTask; + public void Publish(Message message) => Messages.Add(message); public Task Start() => Task.CompletedTask; - public Task Stop() => Task.CompletedTask; + public Task Stop() => StopTask; public void RequestStop() { diff --git a/src/EventStore.Core/Bus/MultiQueuedHandler.cs b/src/EventStore.Core/Bus/MultiQueuedHandler.cs index 98355dbe0..b7edd1055 100644 --- a/src/EventStore.Core/Bus/MultiQueuedHandler.cs +++ b/src/EventStore.Core/Bus/MultiQueuedHandler.cs @@ -48,7 +48,7 @@ public Task Stop() var queues = _queues.Span; for (int i = 0; i < queues.Length; ++i) { - stopTasks[i] = Task.Run(queues[i].Stop); + stopTasks[i] = queues[i].Stop(); } return Task.WhenAll(stopTasks); From 28f7589597d34118a77f40383587df12b60491fd Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Thu, 14 May 2026 11:53:00 -0400 Subject: [PATCH 2/2] fix(bus): keep stopping queues after synchronous failure Signed-off-by: Yordis Prieto --- .../Bus/MultiQueuedHandlerTests.cs | 29 ++++++++++++++++++- src/EventStore.Core/Bus/MultiQueuedHandler.cs | 9 +++++- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/src/EventStore.Core.XUnit.Tests/Bus/MultiQueuedHandlerTests.cs b/src/EventStore.Core.XUnit.Tests/Bus/MultiQueuedHandlerTests.cs index 81d96af95..bf795e7a0 100644 --- a/src/EventStore.Core.XUnit.Tests/Bus/MultiQueuedHandlerTests.cs +++ b/src/EventStore.Core.XUnit.Tests/Bus/MultiQueuedHandlerTests.cs @@ -78,6 +78,20 @@ public async Task stop_propagates_queue_stop_failures() Assert.Same(failure, ex); } + [Fact] + public async Task stop_attempts_all_queues_when_one_stop_throws_synchronously() + { + var queues = CreateQueues(2); + var failure = new InvalidOperationException("stop failed"); + queues[0].StopException = failure; + var sut = new MultiQueuedHandler(queues.Length, i => queues[i]); + + var ex = await Assert.ThrowsAsync(sut.Stop); + + Assert.Same(failure, ex); + Assert.Equal(1, queues[1].StopCount); + } + private static RecordingQueuedHandler[] CreateQueues(int count) { var queues = new RecordingQueuedHandler[count]; @@ -102,11 +116,24 @@ private sealed class RecordingQueuedHandler(string name) : IQueuedHandler public Task StopTask { get; set; } = Task.CompletedTask; + public Exception StopException { get; set; } + + public int StopCount { get; private set; } + public void Publish(Message message) => Messages.Add(message); public Task Start() => Task.CompletedTask; - public Task Stop() => StopTask; + public Task Stop() + { + StopCount++; + if (StopException is not null) + { + throw StopException; + } + + return StopTask; + } public void RequestStop() { diff --git a/src/EventStore.Core/Bus/MultiQueuedHandler.cs b/src/EventStore.Core/Bus/MultiQueuedHandler.cs index b7edd1055..2343216f6 100644 --- a/src/EventStore.Core/Bus/MultiQueuedHandler.cs +++ b/src/EventStore.Core/Bus/MultiQueuedHandler.cs @@ -48,7 +48,14 @@ public Task Stop() var queues = _queues.Span; for (int i = 0; i < queues.Length; ++i) { - stopTasks[i] = queues[i].Stop(); + try + { + stopTasks[i] = queues[i].Stop(); + } + catch (Exception ex) + { + stopTasks[i] = Task.FromException(ex); + } } return Task.WhenAll(stopTasks);