Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 63 additions & 1 deletion src/EventStore.Core.XUnit.Tests/Bus/MultiQueuedHandlerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,53 @@ public void publishes_messages_without_a_synchronization_group_round_robin()
Assert.Equal([second], queues[1].Messages);
}

[Fact]
public async Task stop_waits_for_all_queues()
{
var queues = CreateQueues(2);
var firstStop = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
var secondStop = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
queues[0].StopTask = firstStop.Task;
queues[1].StopTask = secondStop.Task;
var sut = new MultiQueuedHandler(queues.Length, i => queues[i]);

var stop = sut.Stop();
Assert.False(stop.IsCompleted);

firstStop.SetResult(null);
Assert.False(stop.IsCompleted);

secondStop.SetResult(null);
await stop;
}

[Fact]
public async Task stop_propagates_queue_stop_failures()
{
var queues = CreateQueues(2);
var failure = new InvalidOperationException("stop failed");
queues[0].StopTask = Task.FromException(failure);
var sut = new MultiQueuedHandler(queues.Length, i => queues[i]);

var ex = await Assert.ThrowsAsync<InvalidOperationException>(sut.Stop);

Assert.Same(failure, ex);
}

[Fact]
public async Task stop_attempts_all_queues_when_one_stop_throws_synchronously()
{
var queues = CreateQueues(2);
var failure = new InvalidOperationException("stop failed");
queues[0].StopException = failure;
var sut = new MultiQueuedHandler(queues.Length, i => queues[i]);

var ex = await Assert.ThrowsAsync<InvalidOperationException>(sut.Stop);

Assert.Same(failure, ex);
Assert.Equal(1, queues[1].StopCount);
}

private static RecordingQueuedHandler[] CreateQueues(int count)
{
var queues = new RecordingQueuedHandler[count];
Expand All @@ -67,11 +114,26 @@ private sealed class RecordingQueuedHandler(string name) : IQueuedHandler

public List<Message> Messages { get; } = [];

public Task StopTask { get; set; } = Task.CompletedTask;

public Exception StopException { get; set; }

public int StopCount { get; private set; }

public void Publish(Message message) => Messages.Add(message);

public Task Start() => Task.CompletedTask;

public Task Stop() => Task.CompletedTask;
public Task Stop()
{
StopCount++;
if (StopException is not null)
{
throw StopException;
}

return StopTask;
}

public void RequestStop()
{
Expand Down
9 changes: 8 additions & 1 deletion src/EventStore.Core/Bus/MultiQueuedHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,14 @@ public Task Stop()
var queues = _queues.Span;
for (int i = 0; i < queues.Length; ++i)
{
stopTasks[i] = Task.Run(queues[i].Stop);
try
{
stopTasks[i] = queues[i].Stop();
}
catch (Exception ex)
{
stopTasks[i] = Task.FromException(ex);
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

return Task.WhenAll(stopTasks);
Expand Down
Loading