diff --git a/src/EventStore.ClusterNode/metricsconfig.json b/src/EventStore.ClusterNode/metricsconfig.json index b8389696e..b1bc02497 100644 --- a/src/EventStore.ClusterNode/metricsconfig.json +++ b/src/EventStore.ClusterNode/metricsconfig.json @@ -167,6 +167,10 @@ "Regex": "Timer", "Label": "Timer" }, + { + "Regex": "ThreadPoolBacklog", + "Label": "ThreadPoolBacklog" + }, { "Regex": "StorageReaderQueue #.*", "Label": "Readers" diff --git a/src/EventStore.Core.XUnit.Tests/Metrics/QueueStatsCollectorTests.cs b/src/EventStore.Core.XUnit.Tests/Metrics/QueueStatsCollectorTests.cs new file mode 100644 index 000000000..41c356802 --- /dev/null +++ b/src/EventStore.Core.XUnit.Tests/Metrics/QueueStatsCollectorTests.cs @@ -0,0 +1,57 @@ +using EventStore.Core.Bus; +using EventStore.Core.Metrics; +using Xunit; + +namespace EventStore.Core.XUnit.Tests.Metrics; + +public class QueueStatsCollectorTests +{ + [Fact] + public void statistics_include_the_current_queue_length_in_the_peak() + { + var collector = new QueueStatsCollector("queue"); + collector.Start(); + try + { + var stats = collector.GetStatistics(currentQueueLength: 5); + + Assert.Equal(5, stats.Length); + Assert.Equal(5, stats.LengthCurrentTryPeak); + Assert.Equal(5, stats.LengthLifetimePeak); + } + finally + { + collector.Stop(); + } + } + + [Fact] + public void reported_queue_length_updates_the_peak() + { + var collector = new QueueStatsCollector("queue"); + collector.Start(); + try + { + collector.ReportQueueLength(7); + var stats = collector.GetStatistics(currentQueueLength: 3); + + Assert.Equal(3, stats.Length); + Assert.Equal(7, stats.LengthCurrentTryPeak); + Assert.Equal(7, stats.LengthLifetimePeak); + } + finally + { + collector.Stop(); + } + } + + [Fact] + public void thread_pool_backlog_monitor_dispose_is_idempotent() + { + var monitor = new ThreadPoolBacklogMonitor(new QueueStatsManager(), new QueueTrackers()); + + monitor.Start(); + monitor.Dispose(); + monitor.Dispose(); + } +} diff --git a/src/EventStore.Core/Bus/QueueStatsCollector.cs b/src/EventStore.Core/Bus/QueueStatsCollector.cs index 467194c70..cd197a32d 100644 --- a/src/EventStore.Core/Bus/QueueStatsCollector.cs +++ b/src/EventStore.Core/Bus/QueueStatsCollector.cs @@ -89,12 +89,17 @@ public void ProcessingStarted(int queueLength) public void ProcessingStarted(Type msgType, int queueLength) { - _lifetimeQueueLengthPeak = _lifetimeQueueLengthPeak > queueLength ? _lifetimeQueueLengthPeak : queueLength; - _currentQueueLengthPeak = _currentQueueLengthPeak > queueLength ? _currentQueueLengthPeak : queueLength; + ReportQueueLength(queueLength); _inProgressMsgType = msgType; } + public void ReportQueueLength(int queueLength) + { + SetMax(ref _lifetimeQueueLengthPeak, queueLength); + SetMax(ref _currentQueueLengthPeak, queueLength); + } + public void ProcessingEnded(int itemsProcessed) { Interlocked.Add(ref _totalItems, itemsProcessed); @@ -159,6 +164,8 @@ public QueueStats GetStatistics(int currentQueueLength) { lock (_statisticsLock) { + ReportQueueLength(currentQueueLength); + var totalTime = _totalTimeWatch.Elapsed; var totalIdleTime = _totalIdleWatch.Elapsed; var totalBusyTime = _totalBusyWatch.Elapsed; @@ -174,6 +181,11 @@ public QueueStats GetStatistics(int currentQueueLength) var idleTimePercent = Math.Min(100.0, lastRunMs.Ticks != 0 ? 100.0 * (totalIdleTime - _lastTotalIdleTime).Ticks / lastRunMs.Ticks : 100); + var shouldRefresh = totalTime - _lastTotalTime >= MinRefreshPeriod; + var currentQueueLengthPeak = shouldRefresh + ? Interlocked.Exchange(ref _currentQueueLengthPeak, 0) + : Volatile.Read(ref _currentQueueLengthPeak); + var stats = new QueueStats( Name, GroupName, @@ -184,25 +196,40 @@ public QueueStats GetStatistics(int currentQueueLength) _busyWatch.IsRunning ? _busyWatch.Elapsed : (TimeSpan?)null, _idleWatch.IsRunning ? _idleWatch.Elapsed : (TimeSpan?)null, totalItems, - _currentQueueLengthPeak, - _lifetimeQueueLengthPeak, + currentQueueLengthPeak, + Volatile.Read(ref _lifetimeQueueLengthPeak), _lastProcessedMsgType, _inProgressMsgType); - if (totalTime - _lastTotalTime >= MinRefreshPeriod) + if (shouldRefresh) { _lastTotalTime = totalTime; _lastTotalIdleTime = totalIdleTime; _lastTotalBusyTime = totalBusyTime; _lastTotalItems = totalItems; - - _currentQueueLengthPeak = 0; } return stats; } } + private static void SetMax(ref int target, int value) + { + while (true) + { + var current = Volatile.Read(ref target); + if (value <= current) + { + return; + } + + if (Interlocked.CompareExchange(ref target, value, current) == current) + { + return; + } + } + } + [Conditional("DEBUG")] public void Enqueued() { diff --git a/src/EventStore.Core/ClusterVNode.cs b/src/EventStore.Core/ClusterVNode.cs index d7bfea6b4..a85980d0e 100644 --- a/src/EventStore.Core/ClusterVNode.cs +++ b/src/EventStore.Core/ClusterVNode.cs @@ -31,6 +31,7 @@ using EventStore.Core.LogAbstraction; using EventStore.Core.Messages; using EventStore.Core.Messaging; +using EventStore.Core.Metrics; using EventStore.Core.Services; using EventStore.Core.Services.Archive; using EventStore.Core.Services.Archive.Naming; @@ -205,6 +206,7 @@ internal MultiQueuedHandler WorkersHandler private readonly Func _start; private readonly INodeHttpClientFactory _nodeHttpClientFactory; private readonly EventStoreClusterClientCache _eventStoreClusterClientCache; + private ThreadPoolBacklogMonitor _threadPoolBacklogMonitor; private int _stopCalled; private int _systemInitPublished; @@ -634,6 +636,9 @@ void StartSubsystems() monitoringInnerBus.Subscribe(monitoring); monitoringInnerBus.Subscribe(monitoring); + _threadPoolBacklogMonitor = new ThreadPoolBacklogMonitor(_queueStatsManager, trackers.QueueTrackers); + _threadPoolBacklogMonitor.Start(); + var indexPath = options.Database.Index ?? Path.Combine(Db.Config.Path, ESConsts.DefaultIndexDirectoryName); var pTableMaxReaderCount = GetPTableMaxReaderCount(readerThreadsCount); @@ -2007,6 +2012,8 @@ public async ValueTask HandleAsync(SystemMessage.BecomeShuttingDown message, Can _reloadConfigSignalRegistration?.Dispose(); _reloadConfigSignalRegistration = null; + _threadPoolBacklogMonitor?.Dispose(); + _threadPoolBacklogMonitor = null; foreach (var subsystem in _subsystems ?? []) { diff --git a/src/EventStore.Core/Metrics/ThreadPoolBacklogMonitor.cs b/src/EventStore.Core/Metrics/ThreadPoolBacklogMonitor.cs new file mode 100644 index 000000000..2ddb2d0a9 --- /dev/null +++ b/src/EventStore.Core/Metrics/ThreadPoolBacklogMonitor.cs @@ -0,0 +1,125 @@ +using System; +using System.Threading; +using EventStore.Core.Bus; +using EventStore.Core.Services.Monitoring.Stats; +using EventStore.Core.Time; + +namespace EventStore.Core.Metrics; + +public sealed class ThreadPoolBacklogMonitor : IMonitoredQueue, IThreadPoolWorkItem, IDisposable +{ + private static readonly TimeSpan SampleInterval = TimeSpan.FromSeconds(1); + private const string QueueName = "ThreadPoolBacklog"; + + private readonly QueueStatsCollector _queueStats; + private readonly QueueTracker _tracker; + private readonly Timer _timer; + private readonly object _timerGate = new(); + private Instant _enqueuedAt; + private bool _timerDisposed; + private int _stopped; + + public ThreadPoolBacklogMonitor(QueueStatsManager queueStatsManager, QueueTrackers trackers) + { + _queueStats = queueStatsManager.CreateQueueStatsCollector(QueueName); + _tracker = trackers.GetTrackerForQueue(QueueName); + _timer = new Timer(_ => QueueWorkItem()); + } + + public string Name => _queueStats.Name; + + public void Start() + { + _queueStats.Start(); + QueueMonitor.Default.Register(this); + QueueWorkItem(); + } + + public void Stop() + { + if (Interlocked.Exchange(ref _stopped, 1) != 0) + { + return; + } + + lock (_timerGate) + { + if (!_timerDisposed) + { + _timer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); + } + } + + QueueMonitor.Default.Unregister(this); + _queueStats.Stop(); + } + + public void Dispose() + { + Stop(); + lock (_timerGate) + { + if (_timerDisposed) + { + return; + } + + _timerDisposed = true; + _timer.Dispose(); + } + } + + public void Execute() + { + if (Volatile.Read(ref _stopped) != 0) + { + return; + } + + _queueStats.EnterBusy(); + try + { + var length = GetPendingWorkItemCount(); + _queueStats.ProcessingStarted(length); + _tracker.RecordQueueLength(length); + _tracker.RecordMessageDequeued(_enqueuedAt); + _queueStats.ProcessingEnded(1); + } + finally + { + _queueStats.EnterIdle(); + } + + lock (_timerGate) + { + if (Volatile.Read(ref _stopped) == 0 && !_timerDisposed) + { + _timer.Change(SampleInterval, Timeout.InfiniteTimeSpan); + } + } + } + + public QueueStats GetStatistics() => + _queueStats.GetStatistics(GetPendingWorkItemCount()); + + private void QueueWorkItem() + { + if (Volatile.Read(ref _stopped) != 0) + { + return; + } + + _enqueuedAt = _tracker.Now; + ThreadPool.UnsafeQueueUserWorkItem(this, preferLocal: false); + } + + private static int GetPendingWorkItemCount() + { + var count = ThreadPool.PendingWorkItemCount; + return count > int.MaxValue ? int.MaxValue : (int)count; + } + + private sealed class ThreadPoolBacklogSample + { + } +}