From c0200a7bfa21838e034113acc4604a1b5d1c46fa Mon Sep 17 00:00:00 2001 From: Anto Subash Date: Mon, 11 May 2026 20:41:22 +0200 Subject: [PATCH] feat(broadcasting): SignalR hub + @simplemodule/echo client (closes #169) Adds real-time push from server to browser. Modules raise events that implement IBroadcastEvent (or call IBroadcaster directly); a Scrutor decorator over Wolverine's IMessageBus mirrors those events to clients through the BroadcastHub at /hub/broadcast. - Framework abstractions in SimpleModule.Core/Broadcasting: IBroadcaster, IBroadcastEvent, [BroadcastEvent], IBroadcastChannelAuthorizer, IBroadcastContext, BroadcastChannels, wire envelopes. - SimpleModule.Hosting: BroadcastHub, SignalR-backed Broadcaster, PresenceTracker, BroadcastAuthorizerChain with default user/tenant guards, BroadcastingMessageBus decorator. - @simplemodule/echo: SignalR-backed client with EchoProvider, useEvent, usePresence; ref-counted subscriptions, auto-reconnect, presence snapshots. - Demo: /broadcasting page in the Dashboard module with live tick counter and a fire button hitting an authenticated IBroadcaster route. - Tests: 22 unit + integration tests covering attribute discovery, presence tracking concurrency, authorizer chain, and the bus-decorator forwarding path. --- docs/broadcasting.md | 170 +++++++++ .../Authorization/WellKnownClaims.cs | 3 + .../Broadcasting/BroadcastChannels.cs | 27 ++ .../Broadcasting/BroadcastEnvelope.cs | 39 +++ .../Broadcasting/BroadcastEventAttribute.cs | 13 + .../IBroadcastChannelAuthorizer.cs | 31 ++ .../Broadcasting/IBroadcastContext.cs | 33 ++ .../Broadcasting/IBroadcastEvent.cs | 20 ++ .../Broadcasting/IBroadcaster.cs | 55 +++ .../SimpleModule.Core/Broadcasting/README.md | 16 + .../Broadcasting/BroadcastAuthorizerChain.cs | 128 +++++++ .../Broadcasting/BroadcastEventBridge.cs | 156 +++++++++ .../Broadcasting/BroadcastHub.cs | 188 ++++++++++ .../Broadcasting/Broadcaster.cs | 80 +++++ .../Broadcasting/BroadcastingExtensions.cs | 66 ++++ .../Broadcasting/PresenceTracker.cs | 112 ++++++ .../SimpleModule.Hosting.csproj | 1 + .../SimpleModuleHostExtensions.cs | 12 +- .../DashboardConstants.cs | 6 + .../SimpleModule.Dashboard/DashboardModule.cs | 11 + .../Pages/Broadcasting.tsx | 118 +++++++ .../Pages/BroadcastingEndpoint.cs | 72 ++++ .../src/SimpleModule.Dashboard/Pages/index.ts | 1 + .../src/SimpleModule.Dashboard/package.json | 3 + package-lock.json | 207 +++++++++++ package.json | 1 + packages/SimpleModule.Echo/README.md | 27 ++ packages/SimpleModule.Echo/package.json | 15 + .../SimpleModule.Echo/src/echo-context.tsx | 56 +++ packages/SimpleModule.Echo/src/echo.ts | 330 ++++++++++++++++++ packages/SimpleModule.Echo/src/index.ts | 13 + packages/SimpleModule.Echo/src/types.ts | 46 +++ packages/SimpleModule.Echo/src/use-channel.ts | 62 ++++ .../BroadcastAuthorizerChainTests.cs | 122 +++++++ .../Broadcasting/BroadcastChannelsTests.cs | 37 ++ .../BroadcastEventAttributeTests.cs | 35 ++ .../BroadcasterIntegrationTests.cs | 147 ++++++++ .../Broadcasting/PresenceTrackerTests.cs | 75 ++++ 38 files changed, 2532 insertions(+), 2 deletions(-) create mode 100644 docs/broadcasting.md create mode 100644 framework/SimpleModule.Core/Broadcasting/BroadcastChannels.cs create mode 100644 framework/SimpleModule.Core/Broadcasting/BroadcastEnvelope.cs create mode 100644 framework/SimpleModule.Core/Broadcasting/BroadcastEventAttribute.cs create mode 100644 framework/SimpleModule.Core/Broadcasting/IBroadcastChannelAuthorizer.cs create mode 100644 framework/SimpleModule.Core/Broadcasting/IBroadcastContext.cs create mode 100644 framework/SimpleModule.Core/Broadcasting/IBroadcastEvent.cs create mode 100644 framework/SimpleModule.Core/Broadcasting/IBroadcaster.cs create mode 100644 framework/SimpleModule.Core/Broadcasting/README.md create mode 100644 framework/SimpleModule.Hosting/Broadcasting/BroadcastAuthorizerChain.cs create mode 100644 framework/SimpleModule.Hosting/Broadcasting/BroadcastEventBridge.cs create mode 100644 framework/SimpleModule.Hosting/Broadcasting/BroadcastHub.cs create mode 100644 framework/SimpleModule.Hosting/Broadcasting/Broadcaster.cs create mode 100644 framework/SimpleModule.Hosting/Broadcasting/BroadcastingExtensions.cs create mode 100644 framework/SimpleModule.Hosting/Broadcasting/PresenceTracker.cs create mode 100644 modules/Dashboard/src/SimpleModule.Dashboard/Pages/Broadcasting.tsx create mode 100644 modules/Dashboard/src/SimpleModule.Dashboard/Pages/BroadcastingEndpoint.cs create mode 100644 packages/SimpleModule.Echo/README.md create mode 100644 packages/SimpleModule.Echo/package.json create mode 100644 packages/SimpleModule.Echo/src/echo-context.tsx create mode 100644 packages/SimpleModule.Echo/src/echo.ts create mode 100644 packages/SimpleModule.Echo/src/index.ts create mode 100644 packages/SimpleModule.Echo/src/types.ts create mode 100644 packages/SimpleModule.Echo/src/use-channel.ts create mode 100644 tests/SimpleModule.Core.Tests/Broadcasting/BroadcastAuthorizerChainTests.cs create mode 100644 tests/SimpleModule.Core.Tests/Broadcasting/BroadcastChannelsTests.cs create mode 100644 tests/SimpleModule.Core.Tests/Broadcasting/BroadcastEventAttributeTests.cs create mode 100644 tests/SimpleModule.Core.Tests/Broadcasting/BroadcasterIntegrationTests.cs create mode 100644 tests/SimpleModule.Core.Tests/Broadcasting/PresenceTrackerTests.cs diff --git a/docs/broadcasting.md b/docs/broadcasting.md new file mode 100644 index 00000000..b6a7d59a --- /dev/null +++ b/docs/broadcasting.md @@ -0,0 +1,170 @@ +# Broadcasting + +Real-time push from server to browser, layered on SignalR. + +Modules emit domain events through the same `IMessageBus` they already +use; events that implement `IBroadcastEvent` are forwarded by a +framework-supplied Wolverine handler to whoever is subscribed to the +channel the event names. Public-channel subscriptions go through an +authorizer chain, and presence channels track members across multiple +connections per user. + +## Hub endpoint + +`/hub/broadcast` — SignalR hub mapped by the framework. Authentication is +mandatory (the `FallbackPolicy` is `RequireAuthenticatedUser`). The hub +exposes two client-callable methods: + +- `Subscribe(channel)` → `SubscribeResult { authorized, reason, members }` +- `Unsubscribe(channel)` → `void` + +And two server-to-client invocations: + +- `broadcast` — `BroadcastEnvelope { channel, event, payload }` +- `presence` — `PresenceChange { channel, kind, member, members }` + +## Server: emitting events + +Mark an event with `[BroadcastEvent("wire.name")]` and implement +`IBroadcastEvent`. The wire name is what the browser subscribes to via +`useEvent(channel, 'wire.name', ...)`. + +```csharp +using SimpleModule.Core.Broadcasting; +using SimpleModule.Core.Events; + +[BroadcastEvent("notifications.created")] +public sealed record NotificationCreated(Guid UserId, string Title) + : DomainEvent, IBroadcastEvent +{ + public string Channel(IBroadcastContext ctx) => + BroadcastChannels.ForUser(UserId.ToString()); +} +``` + +Publish the event through Wolverine the same way you publish anything else: + +```csharp +await bus.PublishAsync(new NotificationCreated(userId, "Welcome!")); +``` + +The framework decorates Wolverine's `IMessageBus` with +`BroadcastingMessageBus` (Scrutor decorator, same pattern as the audit +log). Any message that is an `IBroadcastEvent` is forwarded to +`IBroadcaster` after the inner bus accepts it — no per-event handler, no +opt-in registration. Forwarding errors are logged but never propagated: +the primary business operation must not fail because SignalR fan-out +failed. + +### Calling the broadcaster directly + +When you want a fire-and-forget push that isn't a domain event, inject +`IBroadcaster`: + +```csharp +public sealed class ChatService(IBroadcaster broadcaster) +{ + public Task TypingAsync(string roomId, string userId, CancellationToken ct) => + broadcaster.ToChannelAsync( + $"presence-rooms.{roomId}", + "chat.typing", + new { userId }, + ct + ); +} +``` + +`IBroadcaster` also exposes `ToUserAsync` / `ToTenantAsync` for the +implicit per-user and per-tenant fan-out groups the hub maintains for +every authenticated connection. + +## Channel naming + +Channel names are arbitrary strings, but the framework reserves two prefixes +(borrowed from Pusher / Laravel Echo): + +- `private-` — requires authentication; the authorizer chain decides + whether the connected principal may subscribe. +- `presence-` — same as private, plus the server tracks members and + pushes join/leave deltas. + +Helpers in `BroadcastChannels`: + +- `BroadcastChannels.ForUser(userId)` → `private-users.{userId}` +- `BroadcastChannels.ForTenant(tenantId)` → `private-tenants.{tenantId}` + +## Authorizers + +Implement `IBroadcastChannelAuthorizer` and register it: + +```csharp +public sealed class OrdersChannelAuthorizer(IOrdersContracts orders) + : IBroadcastChannelAuthorizer +{ + public string ChannelPrefix => "private-tenants."; + + public async Task AuthorizeAsync( + string channel, + IBroadcastContext ctx, + CancellationToken ct + ) + { + // private-tenants.{tid}.orders.{orderId} + var parts = channel.Substring(ChannelPrefix.Length).Split('.'); + var tenantId = parts[0]; + var orderId = parts[^1]; + return await orders.UserCanSeeOrderAsync(ctx.User!, tenantId, orderId, ct); + } +} + +// In ConfigureServices: +services.AddBroadcastAuthorizer(); +``` + +The chain matches by longest prefix, so your specific rule overrides the +default tenant guard the framework ships. + +## Browser: @simplemodule/echo + +```tsx +import { EchoProvider, useEvent, usePresence } from '@simplemodule/echo'; + +// Mount once near the root, inside Inertia's app component. + + +; + +function NotificationBell({ userId }: { userId: string }) { + const [count, setCount] = useState(0); + useEvent( + `private-users.${userId}`, + 'notifications.created', + () => setCount((n) => n + 1) + ); + return {count}; +} + +function RoomRoster({ roomId }: { roomId: string }) { + const members = usePresence(`presence-rooms.${roomId}`); + return ( +
    + {members.map((m) => ( +
  • {m.info?.name ?? m.userId}
  • + ))} +
+ ); +} +``` + +The Echo client multiplexes every subscription onto a single WebSocket and +re-subscribes automatically after reconnects. Subscriptions are +reference-counted: mounting `useEvent` ten times against the same channel +costs one network call. + +## Scaling notes + +The framework's `PresenceTracker` stores membership in memory, which is +fine for a single instance. For horizontal scale-out, run SignalR with a +Redis backplane (`AddStackExchangeRedis(...)`) and replace the +`PresenceTracker` with a Redis-backed implementation — the +`IBroadcaster` / `IBroadcastChannelAuthorizer` contracts stay the same. diff --git a/framework/SimpleModule.Core/Authorization/WellKnownClaims.cs b/framework/SimpleModule.Core/Authorization/WellKnownClaims.cs index 570963c8..a2146952 100644 --- a/framework/SimpleModule.Core/Authorization/WellKnownClaims.cs +++ b/framework/SimpleModule.Core/Authorization/WellKnownClaims.cs @@ -3,4 +3,7 @@ namespace SimpleModule.Core.Authorization; public static class WellKnownClaims { public const string Permission = "permission"; + + /// Tenant identifier carried on the principal in multi-tenant deployments. + public const string TenantId = "tenantid"; } diff --git a/framework/SimpleModule.Core/Broadcasting/BroadcastChannels.cs b/framework/SimpleModule.Core/Broadcasting/BroadcastChannels.cs new file mode 100644 index 00000000..8fef3883 --- /dev/null +++ b/framework/SimpleModule.Core/Broadcasting/BroadcastChannels.cs @@ -0,0 +1,27 @@ +namespace SimpleModule.Core.Broadcasting; + +/// +/// Canonical channel-name helpers. Keeping channel naming in one place stops +/// modules and the client SDK from drifting on how a user- or tenant-scoped +/// channel is spelled. +/// +public static class BroadcastChannels +{ + /// Prefix marking a channel that requires authentication (no metadata beyond auth). + public const string PrivatePrefix = "private-"; + + /// Prefix marking a presence channel — server tracks members and pushes join/leave. + public const string PresencePrefix = "presence-"; + + public static string ForUser(string userId) => $"{PrivatePrefix}users.{userId}"; + + public static string ForTenant(string tenantId) => $"{PrivatePrefix}tenants.{tenantId}"; + + /// True if requires the subscriber to be authenticated. + public static bool IsPrivate(string channel) => + channel.StartsWith(PrivatePrefix, StringComparison.Ordinal) + || channel.StartsWith(PresencePrefix, StringComparison.Ordinal); + + public static bool IsPresence(string channel) => + channel.StartsWith(PresencePrefix, StringComparison.Ordinal); +} diff --git a/framework/SimpleModule.Core/Broadcasting/BroadcastEnvelope.cs b/framework/SimpleModule.Core/Broadcasting/BroadcastEnvelope.cs new file mode 100644 index 00000000..18656a6a --- /dev/null +++ b/framework/SimpleModule.Core/Broadcasting/BroadcastEnvelope.cs @@ -0,0 +1,39 @@ +namespace SimpleModule.Core.Broadcasting; + +/// +/// Wire format the hub sends to clients. The client SDK dispatches by +/// so a single channel can carry many event types. +/// +/// Channel the payload was published to. +/// Event name (matches when forwarded from ). +/// JSON-serializable payload — usually the event record itself. +public sealed record BroadcastEnvelope(string Channel, string Event, object Payload); + +/// +/// Presence membership delta pushed to subscribers of a presence channel +/// whenever members join or leave. The framework synthesizes these from +/// connection lifecycle events; user code does not raise them directly. +/// +public sealed record PresenceChange( + string Channel, + PresenceChangeKind Kind, + PresenceMember Member, + IReadOnlyList Members +); + +public enum PresenceChangeKind +{ + Joined, + Left, +} + +/// +/// Snapshot of a presence-channel member. is the stable +/// identity (multiple connections from the same user collapse to one +/// member); carries any extra metadata the authorizer +/// chose to attach (display name, avatar, role). +/// +public sealed record PresenceMember( + string UserId, + IReadOnlyDictionary? Info = null +); diff --git a/framework/SimpleModule.Core/Broadcasting/BroadcastEventAttribute.cs b/framework/SimpleModule.Core/Broadcasting/BroadcastEventAttribute.cs new file mode 100644 index 00000000..2ee972a0 --- /dev/null +++ b/framework/SimpleModule.Core/Broadcasting/BroadcastEventAttribute.cs @@ -0,0 +1,13 @@ +namespace SimpleModule.Core.Broadcasting; + +/// +/// Marks an record with the wire-format event +/// name that browser clients subscribe to. The same event type may be reused +/// across modules; the name is what client code listens for via +/// useEvent('orders.created', ...). +/// +[AttributeUsage(AttributeTargets.Class, Inherited = false)] +public sealed class BroadcastEventAttribute(string name) : Attribute +{ + public string Name { get; } = name; +} diff --git a/framework/SimpleModule.Core/Broadcasting/IBroadcastChannelAuthorizer.cs b/framework/SimpleModule.Core/Broadcasting/IBroadcastChannelAuthorizer.cs new file mode 100644 index 00000000..ea250513 --- /dev/null +++ b/framework/SimpleModule.Core/Broadcasting/IBroadcastChannelAuthorizer.cs @@ -0,0 +1,31 @@ +namespace SimpleModule.Core.Broadcasting; + +/// +/// Decides whether the current principal is allowed to subscribe to a given +/// channel. Authorizers are matched by channel name prefix +/// (); the longest matching prefix wins so +/// modules can declare specific guards (tenants.{tid}.orders) while +/// the framework owns broader ones (tenants.). +/// +public interface IBroadcastChannelAuthorizer +{ + /// + /// Channel name prefix this authorizer claims. Use a literal channel name + /// for an exact match, or a trailing-dot prefix (e.g., tenants.) + /// to match all descendants. "" matches everything (used by the + /// default deny-public-channels authorizer). + /// + string ChannelPrefix { get; } + + /// + /// Returns true if the principal in may + /// subscribe to . The authorizer is also + /// expected to filter channels that don't belong to the current tenant + /// even when the prefix happens to match. + /// + Task AuthorizeAsync( + string channel, + IBroadcastContext context, + CancellationToken cancellationToken = default + ); +} diff --git a/framework/SimpleModule.Core/Broadcasting/IBroadcastContext.cs b/framework/SimpleModule.Core/Broadcasting/IBroadcastContext.cs new file mode 100644 index 00000000..608aa75b --- /dev/null +++ b/framework/SimpleModule.Core/Broadcasting/IBroadcastContext.cs @@ -0,0 +1,33 @@ +using System.Security.Claims; + +namespace SimpleModule.Core.Broadcasting; + +/// +/// Context exposed to and channel +/// authorizers so the same event record can route to a tenant-scoped or +/// user-scoped channel without hard-coding identifiers into the event itself. +/// +public interface IBroadcastContext +{ + /// + /// Connected user, if the broadcast was initiated in an authenticated + /// scope. Server-published events outside a request typically have a + /// null principal — the caller is responsible for setting tenant/user + /// ids on the event itself in that case. + /// + ClaimsPrincipal? User { get; } + + /// + /// Active tenant id (e.g., from the request's claims or ambient tenant + /// resolution). Null in single-tenant deployments. + /// + string? TenantId { get; } +} + +public sealed class BroadcastContext(ClaimsPrincipal? user, string? tenantId) : IBroadcastContext +{ + public ClaimsPrincipal? User { get; } = user; + public string? TenantId { get; } = tenantId; + + public static BroadcastContext Empty { get; } = new(null, null); +} diff --git a/framework/SimpleModule.Core/Broadcasting/IBroadcastEvent.cs b/framework/SimpleModule.Core/Broadcasting/IBroadcastEvent.cs new file mode 100644 index 00000000..21bd9368 --- /dev/null +++ b/framework/SimpleModule.Core/Broadcasting/IBroadcastEvent.cs @@ -0,0 +1,20 @@ +using SimpleModule.Core.Events; + +namespace SimpleModule.Core.Broadcasting; + +/// +/// Marker contract for domain events that should be forwarded to connected +/// browsers via SignalR. An participates in the +/// same Wolverine pipeline as any other , but a generated +/// bridge handler also relays it through to whoever +/// is subscribed to the channel returned by . +/// +public interface IBroadcastEvent : IEvent +{ + /// + /// Channel name this event is broadcast on. Channels are stable, hierarchical + /// strings (e.g., tenants.123.orders). Authorizers run against the + /// channel name before a client is allowed to subscribe. + /// + string Channel(IBroadcastContext context); +} diff --git a/framework/SimpleModule.Core/Broadcasting/IBroadcaster.cs b/framework/SimpleModule.Core/Broadcasting/IBroadcaster.cs new file mode 100644 index 00000000..1dd401fb --- /dev/null +++ b/framework/SimpleModule.Core/Broadcasting/IBroadcaster.cs @@ -0,0 +1,55 @@ +namespace SimpleModule.Core.Broadcasting; + +/// +/// Push real-time payloads to connected browser clients. Implementations are +/// expected to fan out via SignalR (or any equivalent transport); event names +/// are arbitrary strings agreed between server and client. +/// +public interface IBroadcaster +{ + /// + /// Send to everyone subscribed to + /// . Channels do not have to be pre-declared — + /// they are created on first subscription. + /// + Task ToChannelAsync( + string channel, + string @event, + object payload, + CancellationToken cancellationToken = default + ); + + /// + /// Send to every connection whose principal has + /// the given . A single user may have many + /// connections (multiple browser tabs); all of them receive the event. + /// + Task ToUserAsync( + string userId, + string @event, + object payload, + CancellationToken cancellationToken = default + ); + + /// + /// Send to every connection whose principal + /// belongs to . Maps to the implicit + /// tenants.{tenantId} channel managed by the framework. + /// + Task ToTenantAsync( + string tenantId, + string @event, + object payload, + CancellationToken cancellationToken = default + ); + + /// + /// Forward an to its declared channel, + /// using the event name from if + /// present, otherwise the CLR type name. + /// + Task PublishAsync( + IBroadcastEvent broadcastEvent, + CancellationToken cancellationToken = default + ); +} diff --git a/framework/SimpleModule.Core/Broadcasting/README.md b/framework/SimpleModule.Core/Broadcasting/README.md new file mode 100644 index 00000000..65e503ea --- /dev/null +++ b/framework/SimpleModule.Core/Broadcasting/README.md @@ -0,0 +1,16 @@ +# Broadcasting + +Server-to-browser push via SignalR. Modules raise events marked with +`[BroadcastEvent]` (or call `IBroadcaster` directly); the framework +forwards them to subscribed clients. See [`docs/broadcasting.md`](../../../docs/broadcasting.md) +for the full developer guide. + +- `IBroadcastEvent` / `BroadcastEventAttribute` — opt-in marker +- `IBroadcaster` — fan-out service (channel / user / tenant) +- `IBroadcastChannelAuthorizer` — per-prefix subscription guards +- `BroadcastChannels` — naming helpers (`private-users.{id}`, `presence-...`) +- `BroadcastContext` — `ClaimsPrincipal` + tenant id passed to authorizers +- `BroadcastEnvelope`, `PresenceChange`, `PresenceMember` — wire payloads + +The SignalR hub, broadcaster implementation, authorizer chain, and +Wolverine bridge live in `SimpleModule.Hosting/Broadcasting/`. diff --git a/framework/SimpleModule.Hosting/Broadcasting/BroadcastAuthorizerChain.cs b/framework/SimpleModule.Hosting/Broadcasting/BroadcastAuthorizerChain.cs new file mode 100644 index 00000000..e04ed261 --- /dev/null +++ b/framework/SimpleModule.Hosting/Broadcasting/BroadcastAuthorizerChain.cs @@ -0,0 +1,128 @@ +using SimpleModule.Core.Authorization; +using SimpleModule.Core.Broadcasting; +using SimpleModule.Core.Extensions; + +namespace SimpleModule.Hosting.Broadcasting; + +/// +/// Runs the registered with the +/// longest matching +/// for the requested channel. Longest-prefix-wins lets modules ship narrow +/// rules (private-tenants.) without colliding with the framework's +/// catch-all deny on bare public channels. +/// +public sealed class BroadcastAuthorizerChain(IEnumerable authorizers) +{ + private readonly IBroadcastChannelAuthorizer[] _ordered = authorizers + .OrderByDescending(a => a.ChannelPrefix.Length) + .ToArray(); + + public async Task AuthorizeAsync( + string channel, + IBroadcastContext context, + CancellationToken cancellationToken + ) + { + foreach (var authorizer in _ordered) + { + if ( + authorizer.ChannelPrefix.Length == 0 + || channel.StartsWith(authorizer.ChannelPrefix, StringComparison.Ordinal) + ) + { + return await authorizer.AuthorizeAsync(channel, context, cancellationToken); + } + } + + return false; + } +} + +/// +/// Default catch-all authorizer. Allows public channels (no prefix) and +/// allows private/presence channels only for authenticated principals — the +/// owning module is expected to ship a tighter rule (e.g., only the channel's +/// tenant or user) on top of this. +/// +public sealed class DefaultBroadcastAuthorizer : IBroadcastChannelAuthorizer +{ + public string ChannelPrefix => string.Empty; + + public Task AuthorizeAsync( + string channel, + IBroadcastContext context, + CancellationToken cancellationToken + ) + { + if (BroadcastChannels.IsPrivate(channel)) + { + var authenticated = context.User?.Identity?.IsAuthenticated == true; + return Task.FromResult(authenticated); + } + + // Public channels are off by default; user code opts in by registering + // an authorizer for the specific prefix that should be open. + return Task.FromResult(false); + } +} + +/// +/// Restricts private-users.{userId} channels to the matching principal. +/// Registered by default so modules don't have to re-implement the "only the +/// user themselves" check for personal channels. +/// +public sealed class UserChannelAuthorizer : IBroadcastChannelAuthorizer +{ + private const string Prefix = BroadcastChannels.PrivatePrefix + "users."; + + public string ChannelPrefix => Prefix; + + public Task AuthorizeAsync( + string channel, + IBroadcastContext context, + CancellationToken cancellationToken + ) + { + if (context.User?.Identity?.IsAuthenticated != true) + { + return Task.FromResult(false); + } + + var requested = channel.Substring(Prefix.Length); + var current = context.User.GetUserId(); + + return Task.FromResult(string.Equals(requested, current, StringComparison.Ordinal)); + } +} + +/// +/// Restricts private-tenants.{tenantId} channels (and descendants) to +/// principals whose tenantid claim matches. +/// +public sealed class TenantChannelAuthorizer : IBroadcastChannelAuthorizer +{ + private const string Prefix = BroadcastChannels.PrivatePrefix + "tenants."; + + public string ChannelPrefix => Prefix; + + public Task AuthorizeAsync( + string channel, + IBroadcastContext context, + CancellationToken cancellationToken + ) + { + if (context.User?.Identity?.IsAuthenticated != true) + { + return Task.FromResult(false); + } + + var trail = channel.Substring(Prefix.Length); + var requested = trail.Split('.', 2)[0]; + var current = context.TenantId ?? context.User.FindFirst(WellKnownClaims.TenantId)?.Value; + + return Task.FromResult( + !string.IsNullOrEmpty(current) + && string.Equals(requested, current, StringComparison.Ordinal) + ); + } +} diff --git a/framework/SimpleModule.Hosting/Broadcasting/BroadcastEventBridge.cs b/framework/SimpleModule.Hosting/Broadcasting/BroadcastEventBridge.cs new file mode 100644 index 00000000..37979ae7 --- /dev/null +++ b/framework/SimpleModule.Hosting/Broadcasting/BroadcastEventBridge.cs @@ -0,0 +1,156 @@ +using Microsoft.Extensions.Logging; +using SimpleModule.Core.Broadcasting; +using Wolverine; + +namespace SimpleModule.Hosting.Broadcasting; + +/// +/// Decorator over Wolverine's that mirrors every +/// publication out to connected browsers via +/// . Mirrors the pattern AuditingMessageBus +/// uses for audit capture so framework consumers see a single, consistent +/// extension point. Forwarding failures are logged but never propagated — +/// SignalR fan-out must not break the primary business operation that raised +/// the event. +/// +public sealed class BroadcastingMessageBus( + IMessageBus inner, + IBroadcaster broadcaster, + ILogger? logger = null +) : IMessageBus +{ + public string? TenantId + { + get => inner.TenantId; + set => inner.TenantId = value; + } + + public async ValueTask PublishAsync(T message, DeliveryOptions? options = null) + { + await inner.PublishAsync(message, options); + if (message is IBroadcastEvent broadcast) + { + await ForwardAsync(broadcast); + } + } + + public async ValueTask SendAsync(T message, DeliveryOptions? options = null) + { + await inner.SendAsync(message, options); + if (message is IBroadcastEvent broadcast) + { + await ForwardAsync(broadcast); + } + } + + public async Task InvokeAsync( + object message, + CancellationToken cancellation = default, + TimeSpan? timeout = default + ) + { + await inner.InvokeAsync(message, cancellation, timeout); + if (message is IBroadcastEvent broadcast) + { + await ForwardAsync(broadcast, cancellation); + } + } + + public async Task InvokeAsync( + object message, + DeliveryOptions options, + CancellationToken cancellation = default, + TimeSpan? timeout = default + ) + { + await inner.InvokeAsync(message, options, cancellation, timeout); + if (message is IBroadcastEvent broadcast) + { + await ForwardAsync(broadcast, cancellation); + } + } + + public async Task InvokeAsync( + object message, + CancellationToken cancellation = default, + TimeSpan? timeout = default + ) + { + var result = await inner.InvokeAsync(message, cancellation, timeout); + if (message is IBroadcastEvent broadcast) + { + await ForwardAsync(broadcast, cancellation); + } + return result; + } + + public async Task InvokeAsync( + object message, + DeliveryOptions options, + CancellationToken cancellation = default, + TimeSpan? timeout = default + ) + { + var result = await inner.InvokeAsync(message, options, cancellation, timeout); + if (message is IBroadcastEvent broadcast) + { + await ForwardAsync(broadcast, cancellation); + } + return result; + } + + public Task InvokeForTenantAsync( + string tenantId, + object message, + CancellationToken cancellation = default, + TimeSpan? timeout = default + ) => inner.InvokeForTenantAsync(tenantId, message, cancellation, timeout); + + public Task InvokeForTenantAsync( + string tenantId, + object message, + CancellationToken cancellation = default, + TimeSpan? timeout = default + ) => inner.InvokeForTenantAsync(tenantId, message, cancellation, timeout); + + public IDestinationEndpoint EndpointFor(string endpointName) => inner.EndpointFor(endpointName); + + public IDestinationEndpoint EndpointFor(Uri uri) => inner.EndpointFor(uri); + + public IReadOnlyList PreviewSubscriptions(object message) => + inner.PreviewSubscriptions(message); + + public IReadOnlyList PreviewSubscriptions(object message, DeliveryOptions options) => + inner.PreviewSubscriptions(message, options); + + public ValueTask BroadcastToTopicAsync( + string topicName, + object message, + DeliveryOptions? options = null + ) => inner.BroadcastToTopicAsync(topicName, message, options); + + private async Task ForwardAsync( + IBroadcastEvent broadcast, + CancellationToken cancellationToken = default + ) + { + try + { + await broadcaster.PublishAsync(broadcast, cancellationToken); + } + catch (OperationCanceledException) + { + throw; + } +#pragma warning disable CA1031 + catch (Exception ex) + { + logger?.LogError( + ex, + "Failed to forward broadcast event {EventType} to clients", + broadcast.GetType().FullName + ); + } +#pragma warning restore CA1031 + } +} diff --git a/framework/SimpleModule.Hosting/Broadcasting/BroadcastHub.cs b/framework/SimpleModule.Hosting/Broadcasting/BroadcastHub.cs new file mode 100644 index 00000000..94b7dd93 --- /dev/null +++ b/framework/SimpleModule.Hosting/Broadcasting/BroadcastHub.cs @@ -0,0 +1,188 @@ +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Logging; +using SimpleModule.Core.Authorization; +using SimpleModule.Core.Broadcasting; + +namespace SimpleModule.Hosting.Broadcasting; + +/// +/// SignalR hub clients connect to in order to subscribe/unsubscribe from +/// channels and receive broadcast envelopes. Authentication is required +/// for any private or presence channel; public channel subscriptions go +/// through the authorizer chain too (default policy rejects public +/// channels — see ). +/// +[Authorize] +public sealed partial class BroadcastHub( + BroadcastAuthorizerChain authorizers, + PresenceTracker presence, + ILogger logger +) : Hub +{ + public const string Endpoint = "/hub/broadcast"; + + public override async Task OnConnectedAsync() + { + LogConnected(logger, Context.ConnectionId, Context.UserIdentifier); + + var userId = Context.UserIdentifier; + if (!string.IsNullOrEmpty(userId)) + { + await Groups.AddToGroupAsync(Context.ConnectionId, GroupForUser(userId)); + var tenantId = Context.User?.FindFirst(WellKnownClaims.TenantId)?.Value; + if (!string.IsNullOrEmpty(tenantId)) + { + await Groups.AddToGroupAsync(Context.ConnectionId, GroupForTenant(tenantId)); + } + } + + await base.OnConnectedAsync(); + } + + public override async Task OnDisconnectedAsync(Exception? exception) + { + var channels = presence.RemoveConnection(Context.ConnectionId); + foreach (var (channel, member) in channels) + { + var snapshot = presence.Members(channel); + await Clients + .Group(GroupForChannel(channel)) + .SendAsync( + BroadcastClientMethods.PresenceChanged, + new PresenceChange(channel, PresenceChangeKind.Left, member, snapshot) + ); + } + + await base.OnDisconnectedAsync(exception); + } + + /// + /// Subscribe the current connection to . + /// Returns the presence roster (empty for non-presence channels) so the + /// client can render an initial member list without a follow-up roundtrip. + /// + public async Task Subscribe(string channel) + { + if (string.IsNullOrWhiteSpace(channel)) + { + return SubscribeResult.Failed("channel name required"); + } + + var ctx = new BroadcastContext( + Context.User, + Context.User?.FindFirst(WellKnownClaims.TenantId)?.Value + ); + + var ok = await authorizers.AuthorizeAsync(channel, ctx, Context.ConnectionAborted); + if (!ok) + { + LogSubscriptionDenied(logger, channel, Context.UserIdentifier); + return SubscribeResult.Failed("not authorized"); + } + + await Groups.AddToGroupAsync(Context.ConnectionId, GroupForChannel(channel)); + + if (!BroadcastChannels.IsPresence(channel)) + { + return SubscribeResult.Succeeded(Array.Empty()); + } + + var userId = + Context.UserIdentifier + ?? throw new HubException("presence channels require an authenticated user"); + + var info = new Dictionary(StringComparer.Ordinal); + var name = Context.User?.Identity?.Name; + if (!string.IsNullOrEmpty(name)) + { + info["name"] = name; + } + + var member = new PresenceMember(userId, info); + var changed = presence.Add(channel, Context.ConnectionId, member); + var members = presence.Members(channel); + + if (changed) + { + await Clients + .Group(GroupForChannel(channel)) + .SendAsync( + BroadcastClientMethods.PresenceChanged, + new PresenceChange(channel, PresenceChangeKind.Joined, member, members) + ); + } + + return SubscribeResult.Succeeded(members); + } + + /// Unsubscribe the current connection from . + public async Task Unsubscribe(string channel) + { + await Groups.RemoveFromGroupAsync(Context.ConnectionId, GroupForChannel(channel)); + + if (!BroadcastChannels.IsPresence(channel)) + { + return; + } + + if (presence.Remove(channel, Context.ConnectionId, out var member)) + { + var members = presence.Members(channel); + await Clients + .Group(GroupForChannel(channel)) + .SendAsync( + BroadcastClientMethods.PresenceChanged, + new PresenceChange(channel, PresenceChangeKind.Left, member!, members) + ); + } + } + + internal static string GroupForChannel(string channel) => $"ch:{channel}"; + + internal static string GroupForUser(string userId) => $"u:{userId}"; + + internal static string GroupForTenant(string tenantId) => $"t:{tenantId}"; + + [LoggerMessage( + EventId = 1, + Level = LogLevel.Debug, + Message = "Broadcast hub connection {ConnectionId} ({UserId})" + )] + private static partial void LogConnected(ILogger logger, string connectionId, string? userId); + + [LoggerMessage( + EventId = 2, + Level = LogLevel.Warning, + Message = "Broadcast subscription denied: channel={Channel} user={UserId}" + )] + private static partial void LogSubscriptionDenied( + ILogger logger, + string channel, + string? userId + ); +} + +/// +/// SignalR method names the hub invokes on clients. Keeping these as +/// constants prevents typos between server and TypeScript SDK. +/// +public static class BroadcastClientMethods +{ + public const string EventReceived = "broadcast"; + public const string PresenceChanged = "presence"; +} + +/// Result returned from . +public sealed record SubscribeResult( + bool Authorized, + string? Reason, + IReadOnlyList Members +) +{ + public static SubscribeResult Succeeded(IReadOnlyList members) => + new(true, null, members); + + public static SubscribeResult Failed(string reason) => + new(false, reason, Array.Empty()); +} diff --git a/framework/SimpleModule.Hosting/Broadcasting/Broadcaster.cs b/framework/SimpleModule.Hosting/Broadcasting/Broadcaster.cs new file mode 100644 index 00000000..c30a978b --- /dev/null +++ b/framework/SimpleModule.Hosting/Broadcasting/Broadcaster.cs @@ -0,0 +1,80 @@ +using System.Collections.Concurrent; +using Microsoft.AspNetCore.SignalR; +using SimpleModule.Core.Broadcasting; + +namespace SimpleModule.Hosting.Broadcasting; + +/// +/// SignalR-backed . Channels map 1:1 to SignalR +/// groups (ch:{name}); per-user and per-tenant fan-out reuse the same +/// group convention so modules don't have to manage subscriptions for their +/// own private channels. +/// +public sealed class Broadcaster(IHubContext hub) : IBroadcaster +{ + private static readonly ConcurrentDictionary _eventNameCache = new(); + + public Task ToChannelAsync( + string channel, + string @event, + object payload, + CancellationToken cancellationToken = default + ) => + hub + .Clients.Group(BroadcastHub.GroupForChannel(channel)) + .SendAsync( + BroadcastClientMethods.EventReceived, + new BroadcastEnvelope(channel, @event, payload), + cancellationToken + ); + + public Task ToUserAsync( + string userId, + string @event, + object payload, + CancellationToken cancellationToken = default + ) => + hub + .Clients.Group(BroadcastHub.GroupForUser(userId)) + .SendAsync( + BroadcastClientMethods.EventReceived, + new BroadcastEnvelope(BroadcastChannels.ForUser(userId), @event, payload), + cancellationToken + ); + + public Task ToTenantAsync( + string tenantId, + string @event, + object payload, + CancellationToken cancellationToken = default + ) => + hub + .Clients.Group(BroadcastHub.GroupForTenant(tenantId)) + .SendAsync( + BroadcastClientMethods.EventReceived, + new BroadcastEnvelope(BroadcastChannels.ForTenant(tenantId), @event, payload), + cancellationToken + ); + + public Task PublishAsync( + IBroadcastEvent broadcastEvent, + CancellationToken cancellationToken = default + ) + { + ArgumentNullException.ThrowIfNull(broadcastEvent); + var channel = broadcastEvent.Channel(BroadcastContext.Empty); + var name = EventNameFor(broadcastEvent.GetType()); + return ToChannelAsync(channel, name, broadcastEvent, cancellationToken); + } + + internal static string EventNameFor(Type eventType) => + _eventNameCache.GetOrAdd( + eventType, + static t => + { + var attr = (BroadcastEventAttribute?) + Attribute.GetCustomAttribute(t, typeof(BroadcastEventAttribute)); + return attr?.Name ?? t.Name; + } + ); +} diff --git a/framework/SimpleModule.Hosting/Broadcasting/BroadcastingExtensions.cs b/framework/SimpleModule.Hosting/Broadcasting/BroadcastingExtensions.cs new file mode 100644 index 00000000..14c7a82f --- /dev/null +++ b/framework/SimpleModule.Hosting/Broadcasting/BroadcastingExtensions.cs @@ -0,0 +1,66 @@ +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Routing; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Scrutor; +using SimpleModule.Core.Broadcasting; +using Wolverine; + +namespace SimpleModule.Hosting.Broadcasting; + +public static class BroadcastingExtensions +{ + /// + /// Registers SignalR + the broadcasting services. Called automatically + /// by AddSimpleModuleInfrastructure; modules can layer additional + /// authorizers on top via . + /// + public static IServiceCollection AddSimpleModuleBroadcasting(this IServiceCollection services) + { + services.AddSignalR(); + services.TryAddSingleton(); + services.TryAddSingleton(); + services.TryAddSingleton(); + + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + + // Decorate Wolverine's IMessageBus so any IBroadcastEvent published + // through the bus is mirrored to SignalR clients. Wolverine owns the + // base IMessageBus registration; Scrutor lets us wrap it without + // taking over the registration ourselves. Matches the AuditingMessageBus + // pattern so framework consumers see a single, consistent extension point. + services.Decorate(); + + return services; + } + + /// + /// Adds an additional . The + /// longest matching + /// wins, so module-specific rules naturally override broader framework + /// defaults. + /// + public static IServiceCollection AddBroadcastAuthorizer( + this IServiceCollection services + ) + where TAuthorizer : class, IBroadcastChannelAuthorizer + { + services.AddSingleton(); + return services; + } + + /// + /// Maps the broadcast hub onto . + /// Called by UseSimpleModuleInfrastructure; the route inherits + /// the framework's authentication / fallback authorization policy. + /// + public static IEndpointRouteBuilder MapSimpleModuleBroadcasting( + this IEndpointRouteBuilder endpoints + ) + { + endpoints.MapHub(BroadcastHub.Endpoint); + return endpoints; + } +} diff --git a/framework/SimpleModule.Hosting/Broadcasting/PresenceTracker.cs b/framework/SimpleModule.Hosting/Broadcasting/PresenceTracker.cs new file mode 100644 index 00000000..c24257cf --- /dev/null +++ b/framework/SimpleModule.Hosting/Broadcasting/PresenceTracker.cs @@ -0,0 +1,112 @@ +using SimpleModule.Core.Broadcasting; + +namespace SimpleModule.Hosting.Broadcasting; + +/// +/// Tracks which connections are subscribed to which presence channels and +/// collapses multiple connections from the same user into a single +/// . Membership lives in memory — appropriate +/// for the single-instance default; horizontal scale-out requires a backplane. +/// +public sealed class PresenceTracker +{ + // Single mutex guards the whole map. Join/leave is not a hot path, and a + // global lock makes the read-modify-write decisions ("was this the user's + // first/last connection?") trivially atomic; a per-channel ConcurrentDictionary + // would have races between the membership probe and the mutation. + private readonly object _gate = new(); + private readonly Dictionary> _channels = new( + StringComparer.Ordinal + ); + + /// + /// Registers on for + /// the given connection. Returns true if this is the user's first + /// connection in the channel (so callers know whether to fire a "joined" + /// notification). + /// + public bool Add(string channel, string connectionId, PresenceMember member) + { + lock (_gate) + { + if (!_channels.TryGetValue(channel, out var bucket)) + { + bucket = new Dictionary(StringComparer.Ordinal); + _channels[channel] = bucket; + } + + var alreadyMember = bucket.Values.Any(m => m.UserId == member.UserId); + bucket[connectionId] = member; + return !alreadyMember; + } + } + + /// + /// Removes the connection's membership from . + /// Returns true with the departing member if the user had no other + /// connections in this channel — that's when a "left" event should fire. + /// + public bool Remove(string channel, string connectionId, out PresenceMember? member) + { + member = null; + lock (_gate) + { + if (!_channels.TryGetValue(channel, out var bucket)) + { + return false; + } + + if (!bucket.Remove(connectionId, out var removed)) + { + return false; + } + + member = removed; + return !bucket.Values.Any(m => m.UserId == removed.UserId); + } + } + + /// + /// Drops every membership held by across + /// all channels (used on connection close). Returns the list of channels + /// the user actually left (i.e. last connection in that channel) so the + /// hub can announce departures. + /// + public IReadOnlyList<(string Channel, PresenceMember Member)> RemoveConnection( + string connectionId + ) + { + lock (_gate) + { + var departures = new List<(string, PresenceMember)>(); + foreach (var (channel, bucket) in _channels) + { + if ( + bucket.Remove(connectionId, out var removed) + && !bucket.Values.Any(m => m.UserId == removed.UserId) + ) + { + departures.Add((channel, removed)); + } + } + return departures; + } + } + + /// Current member list (one entry per user, not per connection). + public IReadOnlyList Members(string channel) + { + lock (_gate) + { + if (!_channels.TryGetValue(channel, out var bucket)) + { + return Array.Empty(); + } + + return bucket + .Values.GroupBy(m => m.UserId, StringComparer.Ordinal) + .Select(g => g.First()) + .ToList(); + } + } +} diff --git a/framework/SimpleModule.Hosting/SimpleModule.Hosting.csproj b/framework/SimpleModule.Hosting/SimpleModule.Hosting.csproj index aacd592f..34fcf613 100644 --- a/framework/SimpleModule.Hosting/SimpleModule.Hosting.csproj +++ b/framework/SimpleModule.Hosting/SimpleModule.Hosting.csproj @@ -6,6 +6,7 @@ + diff --git a/framework/SimpleModule.Hosting/SimpleModuleHostExtensions.cs b/framework/SimpleModule.Hosting/SimpleModuleHostExtensions.cs index 90888af0..00a4782e 100644 --- a/framework/SimpleModule.Hosting/SimpleModuleHostExtensions.cs +++ b/framework/SimpleModule.Hosting/SimpleModuleHostExtensions.cs @@ -1,3 +1,4 @@ +using JasperFx.Resources; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Diagnostics.HealthChecks; @@ -20,12 +21,12 @@ using SimpleModule.Database; using SimpleModule.Database.Health; using SimpleModule.Database.Interceptors; -using JasperFx.Resources; using SimpleModule.DevTools; -using Wolverine; +using SimpleModule.Hosting.Broadcasting; using SimpleModule.Hosting.Inertia; using SimpleModule.Hosting.Middleware; using SimpleModule.Hosting.RateLimiting; +using Wolverine; using ZiggyCreatures.Caching.Fusion; namespace SimpleModule.Hosting; @@ -91,6 +92,8 @@ public static WebApplicationBuilder AddSimpleModuleInfrastructure( ) ); + builder.Services.AddSimpleModuleBroadcasting(); + builder.Host.UseResourceSetupOnStartup(); // Lazy lets services break factory-lambda cycles // (e.g. SettingsService ↔ AuditingMessageBus via ISettingsContracts). @@ -326,6 +329,11 @@ public static async Task UseSimpleModuleInfrastructure(this WebApplication app) .AllowAnonymous(); } + // Broadcast hub — authenticated by default (the [Authorize] attribute on + // BroadcastHub kicks the FallbackPolicy back in for the WebSocket / SSE + // upgrade itself). + app.MapSimpleModuleBroadcasting(); + app.MapGet("/error/{statusCode:int}", (int statusCode) => RenderErrorPage(statusCode)) .AllowAnonymous() .ExcludeFromDescription(); diff --git a/modules/Dashboard/src/SimpleModule.Dashboard.Contracts/DashboardConstants.cs b/modules/Dashboard/src/SimpleModule.Dashboard.Contracts/DashboardConstants.cs index db092c5c..2992c3d5 100644 --- a/modules/Dashboard/src/SimpleModule.Dashboard.Contracts/DashboardConstants.cs +++ b/modules/Dashboard/src/SimpleModule.Dashboard.Contracts/DashboardConstants.cs @@ -11,6 +11,12 @@ public static class Routes public static class Views { public const string Home = "/"; + public const string Broadcasting = "/broadcasting"; + } + + public static class Api + { + public const string FireBroadcastTick = "/api/dashboard/broadcasting/tick"; } } } diff --git a/modules/Dashboard/src/SimpleModule.Dashboard/DashboardModule.cs b/modules/Dashboard/src/SimpleModule.Dashboard/DashboardModule.cs index b4c2fff1..4c1d10a3 100644 --- a/modules/Dashboard/src/SimpleModule.Dashboard/DashboardModule.cs +++ b/modules/Dashboard/src/SimpleModule.Dashboard/DashboardModule.cs @@ -31,5 +31,16 @@ public void ConfigureMenu(IMenuBuilder menus) Section = MenuSection.AppSidebar, } ); + menus.Add( + new MenuItem + { + Label = "Broadcasting", + Url = DashboardConstants.Routes.Views.Broadcasting, + Icon = + """""", + Order = 20, + Section = MenuSection.AppSidebar, + } + ); } } diff --git a/modules/Dashboard/src/SimpleModule.Dashboard/Pages/Broadcasting.tsx b/modules/Dashboard/src/SimpleModule.Dashboard/Pages/Broadcasting.tsx new file mode 100644 index 00000000..fce7567a --- /dev/null +++ b/modules/Dashboard/src/SimpleModule.Dashboard/Pages/Broadcasting.tsx @@ -0,0 +1,118 @@ +import { EchoProvider, type EchoStatus, useEchoStatus, useEvent } from '@simplemodule/echo'; +import { + Alert, + AlertDescription, + AlertTitle, + Button, + Card, + CardContent, + PageShell, +} from '@simplemodule/ui'; +import { useState } from 'react'; + +interface Props { + channel: string | null; + userId: string | null; + fireUrl: string; +} + +export default function Broadcasting({ channel, userId, fireUrl }: Props) { + return ( + + {channel ? ( + + + + ) : ( + + Sign in required + + Broadcasting channels are scoped to the authenticated user. + + + )} + + ); +} + +function DemoBody({ + channel, + userId, + fireUrl, +}: { + channel: string; + userId: string; + fireUrl: string; +}) { + const status = useEchoStatus(); + const [count, setCount] = useState(0); + const [lastAt, setLastAt] = useState(null); + const [firing, setFiring] = useState(false); + + useEvent<{ at: string }>(channel, 'demo.tick', (payload) => { + setCount((c) => c + 1); + setLastAt(payload.at); + }); + + return ( + + +
+
+
+
Connection
+ +
+
+
Channel
+ {channel} +
+
+ +
+
Ticks received
+
{count}
+ {lastAt &&
last: {lastAt}
} +
+ + +
+
+
+ ); +} + +const STATUS_LABEL: Record = { + disconnected: 'disconnected', + connecting: 'connecting…', + connected: 'connected', + reconnecting: 'reconnecting…', +}; + +const STATUS_COLOR: Record = { + disconnected: 'text-danger', + connecting: 'text-warning', + connected: 'text-success', + reconnecting: 'text-warning', +}; + +function ConnectionBadge({ status }: { status: EchoStatus }) { + return ( +
{STATUS_LABEL[status]}
+ ); +} diff --git a/modules/Dashboard/src/SimpleModule.Dashboard/Pages/BroadcastingEndpoint.cs b/modules/Dashboard/src/SimpleModule.Dashboard/Pages/BroadcastingEndpoint.cs new file mode 100644 index 00000000..f11b6d7e --- /dev/null +++ b/modules/Dashboard/src/SimpleModule.Dashboard/Pages/BroadcastingEndpoint.cs @@ -0,0 +1,72 @@ +using System.Security.Claims; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Routing; +using SimpleModule.Core; +using SimpleModule.Core.Broadcasting; +using SimpleModule.Core.Extensions; +using SimpleModule.Core.Inertia; +using SimpleModule.Dashboard.Contracts; + +namespace SimpleModule.Dashboard.Pages; + +/// +/// Live demo of the broadcasting framework. Renders a page that subscribes +/// to the current user's private channel and counts ticks fired from the +/// companion POST endpoint. Visible to developers as a smoke test that the +/// hub + Echo client are wired end-to-end. +/// +public class BroadcastingEndpoint : IViewEndpoint +{ + public void Map(IEndpointRouteBuilder app) + { + app.MapGet( + DashboardConstants.Routes.Views.Broadcasting, + (ClaimsPrincipal principal) => + { + var userId = principal.GetUserId(); + var channel = userId is null ? null : BroadcastChannels.ForUser(userId); + return Inertia.Render( + "Dashboard/Broadcasting", + new + { + channel, + userId, + fireUrl = DashboardConstants.Routes.Api.FireBroadcastTick, + } + ); + } + ) + .ExcludeFromDescription(); + + app.MapPost( + DashboardConstants.Routes.Api.FireBroadcastTick, + async ( + ClaimsPrincipal principal, + IBroadcaster broadcaster, + CancellationToken cancellationToken + ) => + { + var userId = principal.GetUserId(); + if (userId is null) + { + return Results.Unauthorized(); + } + + await broadcaster.ToUserAsync( + userId, + "demo.tick", + new { at = DateTimeOffset.UtcNow }, + cancellationToken + ); + return Results.NoContent(); + } + ) + // The demo page POSTs without a CSRF header. Disabling the + // antiforgery requirement is acceptable because the route fires + // only at the authenticated principal's own broadcast channel — + // it's a no-side-effect smoke test, not a state mutation. + .DisableAntiforgery() + .ExcludeFromDescription(); + } +} diff --git a/modules/Dashboard/src/SimpleModule.Dashboard/Pages/index.ts b/modules/Dashboard/src/SimpleModule.Dashboard/Pages/index.ts index a7422748..bd8a1ed2 100644 --- a/modules/Dashboard/src/SimpleModule.Dashboard/Pages/index.ts +++ b/modules/Dashboard/src/SimpleModule.Dashboard/Pages/index.ts @@ -1,3 +1,4 @@ export const pages: Record = { 'Dashboard/Home': () => import('./Home'), + 'Dashboard/Broadcasting': () => import('./Broadcasting'), }; diff --git a/modules/Dashboard/src/SimpleModule.Dashboard/package.json b/modules/Dashboard/src/SimpleModule.Dashboard/package.json index 5bfac018..e687bf19 100644 --- a/modules/Dashboard/src/SimpleModule.Dashboard/package.json +++ b/modules/Dashboard/src/SimpleModule.Dashboard/package.json @@ -10,5 +10,8 @@ "peerDependencies": { "react": "^19.0.0", "react-dom": "^19.0.0" + }, + "dependencies": { + "@simplemodule/echo": "*" } } diff --git a/package-lock.json b/package-lock.json index e804dff7..7f88e90e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,6 +10,7 @@ "workspaces": [ "modules/*/src/*", "packages/SimpleModule.Client", + "packages/SimpleModule.Echo", "packages/SimpleModule.Theme.Default", "packages/SimpleModule.TsConfig", "packages/SimpleModule.UI", @@ -82,6 +83,9 @@ "modules/Dashboard/src/SimpleModule.Dashboard": { "name": "@simplemodule/dashboard", "version": "0.0.0", + "dependencies": { + "@simplemodule/echo": "*" + }, "peerDependencies": { "react": "^19.0.0", "react-dom": "^19.0.0" @@ -1384,6 +1388,19 @@ "@jridgewell/sourcemap-codec": "^1.4.14" } }, + "node_modules/@microsoft/signalr": { + "version": "9.0.6", + "resolved": "https://registry.npmjs.org/@microsoft/signalr/-/signalr-9.0.6.tgz", + "integrity": "sha512-DrhgzFWI9JE4RPTsHYRxh4yr+OhnwKz8bnJe7eIi7mLLjqhJpEb62CiUy/YbFvLqLzcGzlzz1QWgVAW0zyipMQ==", + "license": "MIT", + "dependencies": { + "abort-controller": "^3.0.0", + "eventsource": "^2.0.2", + "fetch-cookie": "^2.0.3", + "node-fetch": "^2.6.7", + "ws": "^7.5.10" + } + }, "node_modules/@napi-rs/wasm-runtime": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/@napi-rs/wasm-runtime/-/wasm-runtime-1.1.2.tgz", @@ -3987,6 +4004,10 @@ "resolved": "tests/e2e", "link": true }, + "node_modules/@simplemodule/echo": { + "resolved": "packages/SimpleModule.Echo", + "link": true + }, "node_modules/@simplemodule/email": { "resolved": "modules/Email/src/SimpleModule.Email", "link": true @@ -4823,6 +4844,18 @@ "url": "https://github.com/sponsors/antfu" } }, + "node_modules/abort-controller": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/abort-controller/-/abort-controller-3.0.0.tgz", + "integrity": "sha512-h8lQ8tacZYnR3vNQTgibj+tODHI5/+l06Au2Pcriv/Gmet0eaj4TwWH41sO9wnHDiQsEj19q0drzdWdeAHtweg==", + "license": "MIT", + "dependencies": { + "event-target-shim": "^5.0.0" + }, + "engines": { + "node": ">=6.5" + } + }, "node_modules/algoliasearch": { "version": "5.50.0", "resolved": "https://registry.npmjs.org/algoliasearch/-/algoliasearch-5.50.0.tgz", @@ -5495,12 +5528,30 @@ "dev": true, "license": "MIT" }, + "node_modules/event-target-shim": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/event-target-shim/-/event-target-shim-5.0.1.tgz", + "integrity": "sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ==", + "license": "MIT", + "engines": { + "node": ">=6" + } + }, "node_modules/eventemitter3": { "version": "5.0.4", "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-5.0.4.tgz", "integrity": "sha512-mlsTRyGaPBjPedk6Bvw+aqbsXDtoAyAzm5MO7JgU+yVRyMQ5O8bD4Kcci7BS85f93veegeCPkL8R4GLClnjLFw==", "license": "MIT" }, + "node_modules/eventsource": { + "version": "2.0.2", + "resolved": "https://registry.npmjs.org/eventsource/-/eventsource-2.0.2.tgz", + "integrity": "sha512-IzUmBGPR3+oUG9dUeXynyNmf91/3zUSJg1lCktzKw47OXuhco54U3r9B7O4XX+Rb1Itm9OZ2b0RkTs10bICOxA==", + "license": "MIT", + "engines": { + "node": ">=12.0.0" + } + }, "node_modules/fdir": { "version": "6.5.0", "resolved": "https://registry.npmjs.org/fdir/-/fdir-6.5.0.tgz", @@ -5518,6 +5569,16 @@ } } }, + "node_modules/fetch-cookie": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/fetch-cookie/-/fetch-cookie-2.2.0.tgz", + "integrity": "sha512-h9AgfjURuCgA2+2ISl8GbavpUdR+WGAM2McW/ovn4tVccegp8ZqCKWSBR8uRdM8dDNlx5WdKRWxBYUwteLDCNQ==", + "license": "Unlicense", + "dependencies": { + "set-cookie-parser": "^2.4.8", + "tough-cookie": "^4.0.0" + } + }, "node_modules/find-up": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/find-up/-/find-up-4.1.0.tgz", @@ -6484,6 +6545,26 @@ "dev": true, "license": "MIT" }, + "node_modules/node-fetch": { + "version": "2.7.0", + "resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-2.7.0.tgz", + "integrity": "sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A==", + "license": "MIT", + "dependencies": { + "whatwg-url": "^5.0.0" + }, + "engines": { + "node": "4.x || >=6.0.0" + }, + "peerDependencies": { + "encoding": "^0.1.0" + }, + "peerDependenciesMeta": { + "encoding": { + "optional": true + } + } + }, "node_modules/onetime": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/onetime/-/onetime-7.0.0.tgz", @@ -6698,6 +6779,27 @@ "url": "https://github.com/sponsors/wooorm" } }, + "node_modules/psl": { + "version": "1.15.0", + "resolved": "https://registry.npmjs.org/psl/-/psl-1.15.0.tgz", + "integrity": "sha512-JZd3gMVBAVQkSs6HdNZo9Sdo0LNcQeMNP3CozBJb3JYC/QUYZTnKxP+f8oWRX4rHP5EurWxqAHTSwUCjlNKa1w==", + "license": "MIT", + "dependencies": { + "punycode": "^2.3.1" + }, + "funding": { + "url": "https://github.com/sponsors/lupomontero" + } + }, + "node_modules/punycode": { + "version": "2.3.1", + "resolved": "https://registry.npmjs.org/punycode/-/punycode-2.3.1.tgz", + "integrity": "sha512-vYt7UD1U9Wg6138shLtLOvdAu+8DsC/ilFtEVHcH+wydcSpNE20AfSOduf6MkRFahL5FY7X1oU7nKVZFtfq8Fg==", + "license": "MIT", + "engines": { + "node": ">=6" + } + }, "node_modules/qrcode": { "version": "1.5.4", "resolved": "https://registry.npmjs.org/qrcode/-/qrcode-1.5.4.tgz", @@ -6715,6 +6817,12 @@ "node": ">=10.13.0" } }, + "node_modules/querystringify": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz", + "integrity": "sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==", + "license": "MIT" + }, "node_modules/react": { "version": "19.2.4", "resolved": "https://registry.npmjs.org/react/-/react-19.2.4.tgz", @@ -6944,6 +7052,12 @@ "integrity": "sha512-NKN5kMDylKuldxYLSUfrbo5Tuzh4hd+2E8NPPX02mZtn1VuREQToYe/ZdlJy+J3uCpfaiGF05e7B8W0iXbQHmg==", "license": "ISC" }, + "node_modules/requires-port": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz", + "integrity": "sha512-KigOCHcocU3XODJxsu8i/j8T9tzT4adHiecwORRQ0ZZFcp7ahwXuRU1m+yuO90C5ZUyGeGfocHDI14M3L3yDAQ==", + "license": "MIT" + }, "node_modules/reselect": { "version": "5.1.1", "resolved": "https://registry.npmjs.org/reselect/-/reselect-5.1.1.tgz", @@ -7078,6 +7192,12 @@ "integrity": "sha512-KiKBS8AnWGEyLzofFfmvKwpdPzqiy16LvQfK3yv/fVH7Bj13/wl3JSR1J+rfgRE9q7xUJK4qvgS8raSOeLUehw==", "license": "ISC" }, + "node_modules/set-cookie-parser": { + "version": "2.7.2", + "resolved": "https://registry.npmjs.org/set-cookie-parser/-/set-cookie-parser-2.7.2.tgz", + "integrity": "sha512-oeM1lpU/UvhTxw+g3cIfxXHyJRc/uidd3yK1P242gzHds0udQBYzs3y8j4gCCW+ZJ7ad0yctld8RYO+bdurlvw==", + "license": "MIT" + }, "node_modules/shebang-command": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/shebang-command/-/shebang-command-2.0.0.tgz", @@ -7340,6 +7460,27 @@ "url": "https://github.com/sponsors/SuperchupuDev" } }, + "node_modules/tough-cookie": { + "version": "4.1.4", + "resolved": "https://registry.npmjs.org/tough-cookie/-/tough-cookie-4.1.4.tgz", + "integrity": "sha512-Loo5UUvLD9ScZ6jh8beX1T6sO1w2/MpCRpEP7V280GKMVUQ0Jzar2U3UJPsrdbziLEMMhu3Ujnq//rhiFuIeag==", + "license": "BSD-3-Clause", + "dependencies": { + "psl": "^1.1.33", + "punycode": "^2.1.1", + "universalify": "^0.2.0", + "url-parse": "^1.5.3" + }, + "engines": { + "node": ">=6" + } + }, + "node_modules/tr46": { + "version": "0.0.3", + "resolved": "https://registry.npmjs.org/tr46/-/tr46-0.0.3.tgz", + "integrity": "sha512-N3WMsuqV66lT30CrXNbEjx4GEwlow3v6rr4mCcv6prnfwhS01rkgyFdjPNBYd9br7LpXV1+Emh01fHnq2Gdgrw==", + "license": "MIT" + }, "node_modules/trim-lines": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/trim-lines/-/trim-lines-3.0.1.tgz", @@ -7451,6 +7592,25 @@ "url": "https://opencollective.com/unified" } }, + "node_modules/universalify": { + "version": "0.2.0", + "resolved": "https://registry.npmjs.org/universalify/-/universalify-0.2.0.tgz", + "integrity": "sha512-CJ1QgKmNg3CwvAv/kOFmtnEN05f0D/cn9QntgNOQlQF9dgvVTHj3t+8JPdjqawCHk7V/KA+fbUqzZ9XWhcqPUg==", + "license": "MIT", + "engines": { + "node": ">= 4.0.0" + } + }, + "node_modules/url-parse": { + "version": "1.5.10", + "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.10.tgz", + "integrity": "sha512-WypcfiRhfeUP9vvF0j6rw0J3hrWrw6iZv3+22h6iRMJ/8z1Tj6XfLP4DsUix5MhMPnXpiHDoKyoZ/bdCkwBCiQ==", + "license": "MIT", + "dependencies": { + "querystringify": "^2.1.1", + "requires-port": "^1.0.0" + } + }, "node_modules/use-callback-ref": { "version": "1.3.3", "resolved": "https://registry.npmjs.org/use-callback-ref/-/use-callback-ref-1.3.3.tgz", @@ -8200,10 +8360,26 @@ } } }, + "node_modules/webidl-conversions": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/webidl-conversions/-/webidl-conversions-3.0.1.tgz", + "integrity": "sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ==", + "license": "BSD-2-Clause" + }, "node_modules/website": { "resolved": "website", "link": true }, + "node_modules/whatwg-url": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/whatwg-url/-/whatwg-url-5.0.0.tgz", + "integrity": "sha512-saE57nupxk6v3HY35+jzBwYa0rKSy0XR8JSxZPwgLr7ys0IBzhGviA1/TUGJLmSVqs8pb9AnvICXEuOHLprYTw==", + "license": "MIT", + "dependencies": { + "tr46": "~0.0.3", + "webidl-conversions": "^3.0.0" + } + }, "node_modules/which": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/which/-/which-2.0.2.tgz", @@ -8240,6 +8416,27 @@ "node": ">=8" } }, + "node_modules/ws": { + "version": "7.5.10", + "resolved": "https://registry.npmjs.org/ws/-/ws-7.5.10.tgz", + "integrity": "sha512-+dbF1tHwZpXcbOJdVOkzLDxZP1ailvSxM6ZweXTegylPny803bFhA+vqBYw4s31NSAk4S2Qz+AKXK9a4wkdjcQ==", + "license": "MIT", + "engines": { + "node": ">=8.3.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": "^5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/y18n": { "version": "4.0.3", "resolved": "https://registry.npmjs.org/y18n/-/y18n-4.0.3.tgz", @@ -8320,6 +8517,16 @@ "vite": "^8.0.0" } }, + "packages/SimpleModule.Echo": { + "name": "@simplemodule/echo", + "version": "0.0.0", + "dependencies": { + "@microsoft/signalr": "^9.0.0" + }, + "peerDependencies": { + "react": "^19.0.0" + } + }, "packages/SimpleModule.Theme.Default": { "name": "@simplemodule/theme-default", "version": "0.0.0" diff --git a/package.json b/package.json index a64d1616..46c16d5c 100644 --- a/package.json +++ b/package.json @@ -5,6 +5,7 @@ "workspaces": [ "modules/*/src/*", "packages/SimpleModule.Client", + "packages/SimpleModule.Echo", "packages/SimpleModule.Theme.Default", "packages/SimpleModule.TsConfig", "packages/SimpleModule.UI", diff --git a/packages/SimpleModule.Echo/README.md b/packages/SimpleModule.Echo/README.md new file mode 100644 index 00000000..5ae8d322 --- /dev/null +++ b/packages/SimpleModule.Echo/README.md @@ -0,0 +1,27 @@ +# @simplemodule/echo + +Real-time client for SimpleModule's broadcasting hub (`/hub/broadcast`). +Wraps `@microsoft/signalr` and exposes React hooks for subscribing to +channels, listening to broadcast events, and observing presence +membership. + +```tsx +import { EchoProvider, useEvent, usePresence } from '@simplemodule/echo'; + + + +; + +function NotificationBell() { + const [count, setCount] = useState(0); + useEvent('private-users.123', 'notifications.created', () => + setCount((c) => c + 1) + ); + return {count}; +} +``` + +The hub URL defaults to `/hub/broadcast` (same-origin); pass `url` to +`EchoProvider` for cross-origin deployments. Authentication uses the +ambient browser session cookie — no token plumbing required when the +React app is served by the SimpleModule host. diff --git a/packages/SimpleModule.Echo/package.json b/packages/SimpleModule.Echo/package.json new file mode 100644 index 00000000..665f96a8 --- /dev/null +++ b/packages/SimpleModule.Echo/package.json @@ -0,0 +1,15 @@ +{ + "name": "@simplemodule/echo", + "version": "0.0.0", + "type": "module", + "main": "src/index.ts", + "exports": { + ".": "./src/index.ts" + }, + "peerDependencies": { + "react": "^19.0.0" + }, + "dependencies": { + "@microsoft/signalr": "^9.0.0" + } +} diff --git a/packages/SimpleModule.Echo/src/echo-context.tsx b/packages/SimpleModule.Echo/src/echo-context.tsx new file mode 100644 index 00000000..c8ee385e --- /dev/null +++ b/packages/SimpleModule.Echo/src/echo-context.tsx @@ -0,0 +1,56 @@ +import { createContext, type ReactNode, useContext, useEffect, useMemo, useState } from 'react'; +import { Echo, type EchoOptions } from './echo'; +import type { EchoStatus } from './types'; + +const EchoContext = createContext(null); + +interface EchoProviderProps extends EchoOptions { + children: ReactNode; + /** + * Provide an existing Echo instance (useful for testing). When omitted, + * the provider lazily creates one from the rest of the options. + */ + echo?: Echo; + /** Connect immediately on mount. Defaults to true. */ + autoStart?: boolean; +} + +/** + * Top-level provider that owns one Echo instance for the React tree. Mounting + * many providers is fine but wasteful — each one opens its own WebSocket. + */ +export function EchoProvider({ children, echo, autoStart = true, ...options }: EchoProviderProps) { + // Options are captured at mount; re-creating Echo on every options identity + // change would tear down the WebSocket on each render. Callers who need to + // swap config should pass a stable `echo` prop instead. + // biome-ignore lint/correctness/useExhaustiveDependencies: intentional one-shot init + const instance = useMemo(() => echo ?? new Echo(options), [echo]); + + useEffect(() => { + if (!autoStart) return; + void instance.start().catch(() => { + // Surface as status; an unhandled rejection here would log to the + // console twice in dev. Consumers observe failures via useEchoStatus. + }); + return () => { + void instance.stop(); + }; + }, [instance, autoStart]); + + return {children}; +} + +export function useEcho(): Echo { + const ctx = useContext(EchoContext); + if (!ctx) { + throw new Error('useEcho() requires in the tree'); + } + return ctx; +} + +export function useEchoStatus(): EchoStatus { + const echo = useEcho(); + const [status, setStatus] = useState(echo.state); + useEffect(() => echo.onStatus(setStatus), [echo]); + return status; +} diff --git a/packages/SimpleModule.Echo/src/echo.ts b/packages/SimpleModule.Echo/src/echo.ts new file mode 100644 index 00000000..68ab2838 --- /dev/null +++ b/packages/SimpleModule.Echo/src/echo.ts @@ -0,0 +1,330 @@ +import { + type HubConnection, + HubConnectionBuilder, + HubConnectionState, + LogLevel, +} from '@microsoft/signalr'; +import type { + BroadcastEnvelope, + EchoStatus, + EventHandler, + PresenceChange, + PresenceHandler, + PresenceMember, + SubscribeResult, +} from './types'; + +// Hub method names — must match SimpleModule.Hosting.Broadcasting.BroadcastClientMethods +// (server) and the [Hub] method signatures on BroadcastHub. +const CLIENT_METHOD_BROADCAST = 'broadcast'; +const CLIENT_METHOD_PRESENCE = 'presence'; +const HUB_METHOD_SUBSCRIBE = 'Subscribe'; +const HUB_METHOD_UNSUBSCRIBE = 'Unsubscribe'; + +export interface EchoOptions { + /** Hub URL. Defaults to `/hub/broadcast` (same-origin). */ + url?: string; + /** Reconnect backoff (ms) tried in order, then repeated. Defaults to 0, 2s, 10s, 30s. */ + reconnectDelays?: number[]; + /** Console log level passed through to the SignalR client. */ + logLevel?: LogLevel; +} + +/** + * Single-connection, channel-multiplexed broadcasting client. Channels are + * reference-counted: subscribing N times costs one network roundtrip on the + * first call and one unsubscribe on the last `off`. All event/presence + * dispatch happens in user code — Echo itself only owns the connection and + * the listener map. + */ +export class Echo { + private connection: HubConnection; + private url: string; + private status: EchoStatus = 'disconnected'; + private statusListeners = new Set<(status: EchoStatus) => void>(); + // channel -> event -> listeners + private listeners = new Map>>(); + private presenceListeners = new Map>(); + private presenceState = new Map(); + private refCounts = new Map(); + // Promise-per-channel for any subscribe currently in flight. Concurrent + // callers await the same promise instead of returning a stale cached + // roster while the first subscribe is still negotiating with the hub. + private inFlight = new Map>(); + private startPromise: Promise | null = null; + + constructor(options: EchoOptions = {}) { + this.url = options.url ?? '/hub/broadcast'; + const reconnect = options.reconnectDelays ?? [0, 2000, 10_000, 30_000]; + + this.connection = new HubConnectionBuilder() + .withUrl(this.url, { withCredentials: true }) + .withAutomaticReconnect(reconnect) + .configureLogging(options.logLevel ?? LogLevel.Warning) + .build(); + + this.connection.on(CLIENT_METHOD_BROADCAST, (envelope: BroadcastEnvelope) => { + this.dispatchEvent(envelope); + }); + + this.connection.on(CLIENT_METHOD_PRESENCE, (change: PresenceChange) => { + this.dispatchPresence(change); + }); + + this.connection.onreconnecting(() => this.setStatus('reconnecting')); + this.connection.onreconnected(() => { + this.setStatus('connected'); + void this.resubscribeAll(); + }); + this.connection.onclose(() => this.setStatus('disconnected')); + } + + get state(): EchoStatus { + return this.status; + } + + onStatus(listener: (status: EchoStatus) => void): () => void { + this.statusListeners.add(listener); + listener(this.status); + return () => this.statusListeners.delete(listener); + } + + /** + * Connects (idempotent — concurrent callers share one start promise). It is + * safe to call from React effects; if the hub is already started this + * resolves immediately. + */ + async start(): Promise { + if (this.connection.state === HubConnectionState.Connected) { + return; + } + if (this.startPromise) { + return this.startPromise; + } + this.setStatus('connecting'); + this.startPromise = this.connection.start().finally(() => { + this.startPromise = null; + }); + try { + await this.startPromise; + this.setStatus('connected'); + } catch (e) { + this.setStatus('disconnected'); + throw e; + } + } + + async stop(): Promise { + await this.connection.stop(); + } + + /** + * Subscribes to on the hub. Multiple calls collapse into one + * server-side subscription; pair every call with `unsubscribe` (or the + * disposer returned by `on`). Concurrent calls share the in-flight + * subscribe promise so every caller sees the same authoritative roster. + */ + async subscribe(channel: string): Promise { + const inFlight = this.inFlight.get(channel); + if (inFlight) { + this.refCounts.set(channel, (this.refCounts.get(channel) ?? 0) + 1); + return inFlight; + } + + const current = this.refCounts.get(channel) ?? 0; + if (current > 0) { + this.refCounts.set(channel, current + 1); + return { + authorized: true, + reason: null, + members: this.presenceState.get(channel) ?? [], + }; + } + + this.refCounts.set(channel, 1); + const promise = (async () => { + try { + await this.start(); + return await this.invokeSubscribe(channel); + } catch (e) { + // Roll back so a later subscribe retries the hub call cleanly. + const c = this.refCounts.get(channel) ?? 0; + if (c <= 1) { + this.refCounts.delete(channel); + } else { + this.refCounts.set(channel, c - 1); + } + throw e; + } finally { + this.inFlight.delete(channel); + } + })(); + + this.inFlight.set(channel, promise); + const result = await promise; + if (!result.authorized) { + // Hub denied — undo the +1 we optimistically added so subsequent + // unsubscribes don't underflow the count and a re-subscribe can retry. + const c = this.refCounts.get(channel) ?? 0; + if (c <= 1) { + this.refCounts.delete(channel); + } else { + this.refCounts.set(channel, c - 1); + } + } + return result; + } + + async unsubscribe(channel: string): Promise { + const current = this.refCounts.get(channel) ?? 0; + if (current <= 1) { + this.refCounts.delete(channel); + this.presenceState.delete(channel); + if (this.connection.state === HubConnectionState.Connected) { + await this.connection.invoke(HUB_METHOD_UNSUBSCRIBE, channel); + } + } else { + this.refCounts.set(channel, current - 1); + } + } + + /** + * Register a handler for `event` on `channel`. Subscribes on first + * listener, unsubscribes on last. Returns a disposer. + */ + on(channel: string, event: string, handler: EventHandler): () => void { + let byEvent = this.listeners.get(channel); + if (!byEvent) { + byEvent = new Map(); + this.listeners.set(channel, byEvent); + } + let set = byEvent.get(event); + if (!set) { + set = new Set(); + byEvent.set(event, set); + } + set.add(handler as EventHandler); + + void this.subscribe(channel); + + return () => { + // biome-ignore lint/style/noNonNullAssertion: set/byEvent captured above and only nullable per typing + set?.delete(handler as EventHandler); + if (set?.size === 0) { + byEvent?.delete(event); + } + if (byEvent?.size === 0) { + this.listeners.delete(channel); + } + void this.unsubscribe(channel); + }; + } + + /** + * Register a presence handler for `channel`. The handler receives a `null` + * change argument on the initial snapshot so callers can distinguish it + * from real join/leave deltas. Pairs with `on` semantics. + */ + onPresence(channel: string, handler: PresenceHandler): () => void { + let set = this.presenceListeners.get(channel); + if (!set) { + set = new Set(); + this.presenceListeners.set(channel, set); + } + set.add(handler); + + void this.subscribe(channel).then((result) => { + if (result.authorized) { + this.presenceState.set(channel, result.members); + handler(null, result.members, channel); + } + }); + + return () => { + set?.delete(handler); + if (set?.size === 0) { + this.presenceListeners.delete(channel); + } + void this.unsubscribe(channel); + }; + } + + private async resubscribeAll(): Promise { + // Sequential so we don't flood the hub on reconnect; presence channels + // re-emit their roster snapshot through dispatchPresence so listeners + // see the post-reconnect membership without an extra delta event. + for (const channel of [...this.refCounts.keys()]) { + try { + const result = await this.invokeSubscribe(channel); + if (result.authorized) { + this.presenceState.set(channel, result.members); + this.broadcastPresenceSnapshot(channel, result.members); + } + } catch (e) { + console.error('echo re-subscribe failed', channel, e); + } + } + } + + private async invokeSubscribe(channel: string): Promise { + if (this.connection.state !== HubConnectionState.Connected) { + return { authorized: false, reason: 'not connected', members: [] }; + } + const result = await this.connection.invoke(HUB_METHOD_SUBSCRIBE, channel); + if (result.authorized) { + this.presenceState.set(channel, result.members); + } + return result; + } + + private dispatchEvent(envelope: BroadcastEnvelope): void { + const byEvent = this.listeners.get(envelope.channel); + if (!byEvent) return; + const set = byEvent.get(envelope.event); + if (!set) return; + for (const listener of set) { + try { + listener(envelope.payload, envelope); + } catch (e) { + console.error('echo listener failed', e); + } + } + } + + private dispatchPresence(change: PresenceChange): void { + this.presenceState.set(change.channel, change.members); + const set = this.presenceListeners.get(change.channel); + if (!set) return; + for (const listener of set) { + try { + listener(change, change.members, change.channel); + } catch (e) { + console.error('echo presence listener failed', e); + } + } + } + + private broadcastPresenceSnapshot(channel: string, members: PresenceMember[]): void { + const set = this.presenceListeners.get(channel); + if (!set) return; + for (const listener of set) { + try { + listener(null, members, channel); + } catch (e) { + console.error('echo presence snapshot listener failed', e); + } + } + } + + private setStatus(status: EchoStatus): void { + if (this.status === status) return; + this.status = status; + for (const listener of this.statusListeners) { + try { + listener(status); + } catch (e) { + console.error('echo status listener failed', e); + } + } + } +} diff --git a/packages/SimpleModule.Echo/src/index.ts b/packages/SimpleModule.Echo/src/index.ts new file mode 100644 index 00000000..8db9623b --- /dev/null +++ b/packages/SimpleModule.Echo/src/index.ts @@ -0,0 +1,13 @@ +export { Echo, type EchoOptions } from './echo'; +export { EchoProvider, useEcho, useEchoStatus } from './echo-context'; +export type { + BroadcastEnvelope, + EchoStatus, + EventHandler, + PresenceChange, + PresenceChangeKind, + PresenceHandler, + PresenceMember, + SubscribeResult, +} from './types'; +export { useEvent, usePresence } from './use-channel'; diff --git a/packages/SimpleModule.Echo/src/types.ts b/packages/SimpleModule.Echo/src/types.ts new file mode 100644 index 00000000..90bd1aa2 --- /dev/null +++ b/packages/SimpleModule.Echo/src/types.ts @@ -0,0 +1,46 @@ +/** + * Wire format the broadcast hub sends to clients. The shape mirrors the + * server's `BroadcastEnvelope` record — keep these in lockstep when either + * side changes. + */ +export interface BroadcastEnvelope { + channel: string; + event: string; + payload: TPayload; +} + +export interface PresenceMember { + userId: string; + info?: Record; +} + +export type PresenceChangeKind = 'Joined' | 'Left'; + +export interface PresenceChange { + channel: string; + kind: PresenceChangeKind; + member: PresenceMember; + members: PresenceMember[]; +} + +export interface SubscribeResult { + authorized: boolean; + reason: string | null; + members: PresenceMember[]; +} + +export type EventHandler = (payload: T, envelope: BroadcastEnvelope) => void; + +/** + * Presence notifications. `change` is non-null for real join/leave deltas + * and `null` for snapshot calls (initial subscription, post-reconnect + * re-seed) so consumers can update their roster without confusing a fresh + * snapshot for a delta. + */ +export type PresenceHandler = ( + change: PresenceChange | null, + members: PresenceMember[], + channel: string, +) => void; + +export type EchoStatus = 'disconnected' | 'connecting' | 'connected' | 'reconnecting'; diff --git a/packages/SimpleModule.Echo/src/use-channel.ts b/packages/SimpleModule.Echo/src/use-channel.ts new file mode 100644 index 00000000..7c3d8289 --- /dev/null +++ b/packages/SimpleModule.Echo/src/use-channel.ts @@ -0,0 +1,62 @@ +import { useEffect, useRef, useState } from 'react'; +import { useEcho } from './echo-context'; +import type { EventHandler, PresenceChange, PresenceMember } from './types'; + +/** + * Subscribe to a single broadcast event on `channel`. The handler is held in + * a ref so the consumer can use freshly-captured state without forcing a + * re-subscribe every render. Pass `undefined` to `channel` to disable + * subscription (useful while loading the channel identifier). + */ +export function useEvent( + channel: string | undefined | null, + event: string, + handler: EventHandler, +): void { + const handlerRef = useRef(handler); + handlerRef.current = handler; + + const echo = useEcho(); + + useEffect(() => { + if (!channel) return; + const dispose = echo.on(channel, event, (payload, envelope) => + handlerRef.current(payload, envelope), + ); + return dispose; + }, [echo, channel, event]); +} + +/** + * Subscribe to a presence channel and re-render the consumer whenever the + * roster changes. Optional `handler` is called for every update — `change` + * is `null` for the initial / post-reconnect snapshot so callers can tell a + * fresh roster apart from a join/leave delta. + */ +export function usePresence( + channel: string | undefined | null, + handler?: (members: PresenceMember[], change: PresenceChange | null) => void, +): PresenceMember[] { + const handlerRef = useRef(handler); + handlerRef.current = handler; + + const echo = useEcho(); + const [members, setMembers] = useState([]); + + useEffect(() => { + if (!channel) { + setMembers([]); + return; + } + const dispose = echo.onPresence(channel, (change, nextMembers) => { + setMembers(nextMembers); + handlerRef.current?.(nextMembers, change); + }); + return () => { + dispose(); + setMembers([]); + }; + }, [echo, channel]); + + return members; +} diff --git a/tests/SimpleModule.Core.Tests/Broadcasting/BroadcastAuthorizerChainTests.cs b/tests/SimpleModule.Core.Tests/Broadcasting/BroadcastAuthorizerChainTests.cs new file mode 100644 index 00000000..749cbc7a --- /dev/null +++ b/tests/SimpleModule.Core.Tests/Broadcasting/BroadcastAuthorizerChainTests.cs @@ -0,0 +1,122 @@ +using System.Security.Claims; +using FluentAssertions; +using SimpleModule.Core.Broadcasting; +using SimpleModule.Hosting.Broadcasting; + +namespace SimpleModule.Core.Tests.Broadcasting; + +public class BroadcastAuthorizerChainTests +{ + private static ClaimsPrincipal User(string userId, string? tenantId = null) + { + var claims = new List { new("sub", userId) }; + if (tenantId is not null) + { + claims.Add(new("tenantid", tenantId)); + } + return new ClaimsPrincipal(new ClaimsIdentity(claims, "test")); + } + + private static BroadcastContext Context(ClaimsPrincipal? principal, string? tenantId = null) => + new(principal, tenantId); + + [Fact] + public async Task Authenticated_User_May_Subscribe_To_Their_Own_Channel() + { + var chain = new BroadcastAuthorizerChain([ + new DefaultBroadcastAuthorizer(), + new UserChannelAuthorizer(), + new TenantChannelAuthorizer(), + ]); + + var allowed = await chain.AuthorizeAsync( + "private-users.abc", + Context(User("abc")), + TestContext.Current.CancellationToken + ); + + allowed.Should().BeTrue(); + } + + [Fact] + public async Task User_Cannot_Subscribe_To_Other_Users_Private_Channel() + { + var chain = new BroadcastAuthorizerChain([ + new DefaultBroadcastAuthorizer(), + new UserChannelAuthorizer(), + ]); + + var allowed = await chain.AuthorizeAsync( + "private-users.victim", + Context(User("attacker")), + TestContext.Current.CancellationToken + ); + + allowed.Should().BeFalse(); + } + + [Fact] + public async Task Tenant_Channel_Requires_Matching_Tenant_Claim() + { + var chain = new BroadcastAuthorizerChain([ + new DefaultBroadcastAuthorizer(), + new TenantChannelAuthorizer(), + ]); + + var ownTenant = await chain.AuthorizeAsync( + "private-tenants.t1.orders", + Context(User("u1", tenantId: "t1"), tenantId: "t1"), + TestContext.Current.CancellationToken + ); + var otherTenant = await chain.AuthorizeAsync( + "private-tenants.t1.orders", + Context(User("u1", tenantId: "t2"), tenantId: "t2"), + TestContext.Current.CancellationToken + ); + + ownTenant.Should().BeTrue(); + otherTenant.Should().BeFalse(); + } + + [Fact] + public async Task Anonymous_Connection_Cannot_Subscribe_To_Private_Channels() + { + var chain = new BroadcastAuthorizerChain([new DefaultBroadcastAuthorizer()]); + + var allowed = await chain.AuthorizeAsync( + "private-users.abc", + Context(null), + TestContext.Current.CancellationToken + ); + + allowed.Should().BeFalse(); + } + + [Fact] + public async Task Custom_Authorizer_Wins_Over_Framework_Default_By_Prefix_Length() + { + var chain = new BroadcastAuthorizerChain([ + new DefaultBroadcastAuthorizer(), + new AllowAllOrders(), + ]); + + var allowed = await chain.AuthorizeAsync( + "private-tenants.t1.orders", + Context(User("u1")), + TestContext.Current.CancellationToken + ); + + allowed.Should().BeTrue(); + } + + private sealed class AllowAllOrders : IBroadcastChannelAuthorizer + { + public string ChannelPrefix => "private-tenants.t1.orders"; + + public Task AuthorizeAsync( + string channel, + IBroadcastContext context, + CancellationToken cancellationToken + ) => Task.FromResult(true); + } +} diff --git a/tests/SimpleModule.Core.Tests/Broadcasting/BroadcastChannelsTests.cs b/tests/SimpleModule.Core.Tests/Broadcasting/BroadcastChannelsTests.cs new file mode 100644 index 00000000..301ce576 --- /dev/null +++ b/tests/SimpleModule.Core.Tests/Broadcasting/BroadcastChannelsTests.cs @@ -0,0 +1,37 @@ +using FluentAssertions; +using SimpleModule.Core.Broadcasting; + +namespace SimpleModule.Core.Tests.Broadcasting; + +public class BroadcastChannelsTests +{ + [Fact] + public void ForUser_Produces_Private_User_Channel() + { + BroadcastChannels.ForUser("abc").Should().Be("private-users.abc"); + } + + [Fact] + public void ForTenant_Produces_Private_Tenant_Channel() + { + BroadcastChannels.ForTenant("t1").Should().Be("private-tenants.t1"); + } + + [Theory] + [InlineData("private-foo", true)] + [InlineData("presence-room", true)] + [InlineData("public-thing", false)] + [InlineData("orders", false)] + public void IsPrivate_Matches_Reserved_Prefixes(string channel, bool expected) + { + BroadcastChannels.IsPrivate(channel).Should().Be(expected); + } + + [Theory] + [InlineData("presence-room", true)] + [InlineData("private-x", false)] + public void IsPresence_Detects_Presence_Channels(string channel, bool expected) + { + BroadcastChannels.IsPresence(channel).Should().Be(expected); + } +} diff --git a/tests/SimpleModule.Core.Tests/Broadcasting/BroadcastEventAttributeTests.cs b/tests/SimpleModule.Core.Tests/Broadcasting/BroadcastEventAttributeTests.cs new file mode 100644 index 00000000..3c8f3d2d --- /dev/null +++ b/tests/SimpleModule.Core.Tests/Broadcasting/BroadcastEventAttributeTests.cs @@ -0,0 +1,35 @@ +using System.Reflection; +using FluentAssertions; +using SimpleModule.Core.Broadcasting; + +namespace SimpleModule.Core.Tests.Broadcasting; + +public class BroadcastEventAttributeTests +{ + [BroadcastEvent("orders.created")] + private sealed record OrderCreated(Guid Id) : IBroadcastEvent + { + public Guid EventId { get; } = Guid.CreateVersion7(); + public DateTimeOffset OccurredAt { get; } = DateTimeOffset.UtcNow; + + public string Channel(IBroadcastContext context) => $"orders.{Id}"; + } + + [Fact] + public void Attribute_Carries_Wire_Name() + { + var attr = typeof(OrderCreated).GetCustomAttribute(); + + attr.Should().NotBeNull(); + attr!.Name.Should().Be("orders.created"); + } + + [Fact] + public void Channel_Uses_Event_State() + { + var id = Guid.NewGuid(); + var evt = new OrderCreated(id); + + evt.Channel(BroadcastContext.Empty).Should().Be($"orders.{id}"); + } +} diff --git a/tests/SimpleModule.Core.Tests/Broadcasting/BroadcasterIntegrationTests.cs b/tests/SimpleModule.Core.Tests/Broadcasting/BroadcasterIntegrationTests.cs new file mode 100644 index 00000000..8e9a9579 --- /dev/null +++ b/tests/SimpleModule.Core.Tests/Broadcasting/BroadcasterIntegrationTests.cs @@ -0,0 +1,147 @@ +using FluentAssertions; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using SimpleModule.Core.Broadcasting; +using SimpleModule.Hosting.Broadcasting; +using SimpleModule.Tests.Shared.Fixtures; +using Wolverine; + +namespace SimpleModule.Core.Tests.Broadcasting; + +/// +/// Verifies the broadcasting pieces are wired into the real test host: +/// the SignalR hub is registered, resolves, and +/// the Wolverine bridge handles concrete +/// subclasses via polymorphic dispatch. The handler dispatch is the load- +/// bearing assertion — without it, events would publish but never reach +/// browsers. +/// +[Collection(TestCollections.Integration)] +public sealed class BroadcasterIntegrationTests(SimpleModuleWebApplicationFactory factory) +{ + [Fact] + public void Broadcaster_Is_Registered_In_DI() + { + // Boot the factory so all module services are wired. + using var _ = factory.CreateClient(); + + var broadcaster = factory.Services.GetRequiredService(); + broadcaster.Should().BeOfType(); + + // Hub must also resolve — proves AddSignalR ran. + factory.Services.GetRequiredService>().Should().NotBeNull(); + } + + [BroadcastEvent("test.broadcasting")] + private sealed record TestBroadcastEvent(Guid Id) : IBroadcastEvent + { + public Guid EventId { get; } = Guid.CreateVersion7(); + public DateTimeOffset OccurredAt { get; } = DateTimeOffset.UtcNow; + + public string Channel(IBroadcastContext context) => $"private-tests.{Id}"; + } + + [Fact] + public async Task PublishingBus_Forwards_BroadcastEvents_To_The_Broadcaster() + { + // BroadcastingMessageBus decorates IMessageBus, so any IBroadcastEvent + // published through the bus must reach the IBroadcaster — that's the + // contract modules rely on without writing per-event handlers. A + // per-test factory swaps in a recording broadcaster so the assertion + // doesn't depend on the shared singleton from the collection fixture; + // disposal is wrapped because Wolverine's shutdown of the inner host + // race-cancels the durability-table SQL command on macOS occasionally. + var recording = new RecordingBroadcaster(); + var local = factory.WithWebHostBuilder(builder => + builder.ConfigureServices(services => + { + services.RemoveAll(); + services.AddSingleton(recording); + }) + ); + try + { + using var _ = local.CreateClient(); + + using var scope = local.Services.CreateScope(); + var bus = scope.ServiceProvider.GetRequiredService(); + + var evt = new TestBroadcastEvent(Guid.NewGuid()); + await bus.PublishAsync(evt); + + recording.Published.Should().ContainSingle().Which.Should().Be(evt); + } + finally + { + try + { + await local.DisposeAsync(); + } +#pragma warning disable CA1031 + catch + { + // Wolverine's StopAsync occasionally raises TaskCanceledException + // when the durability tables are torn down concurrently. The + // shared fixture swallows the same noise — see + // SimpleModuleWebApplicationFactory.Dispose. + } +#pragma warning restore CA1031 + } + } +} + +/// +/// Captures every the bus forwards, so the +/// integration test can assert on it without standing up a real SignalR +/// connection. +/// +internal sealed class RecordingBroadcaster : IBroadcaster +{ + private readonly List _events = new(); + + public IReadOnlyList Published + { + get + { + lock (_events) + { + return _events.ToList(); + } + } + } + + public Task PublishAsync( + IBroadcastEvent broadcastEvent, + CancellationToken cancellationToken = default + ) + { + lock (_events) + { + _events.Add(broadcastEvent); + } + return Task.CompletedTask; + } + + public Task ToChannelAsync( + string channel, + string @event, + object payload, + CancellationToken cancellationToken = default + ) => Task.CompletedTask; + + public Task ToUserAsync( + string userId, + string @event, + object payload, + CancellationToken cancellationToken = default + ) => Task.CompletedTask; + + public Task ToTenantAsync( + string tenantId, + string @event, + object payload, + CancellationToken cancellationToken = default + ) => Task.CompletedTask; +} diff --git a/tests/SimpleModule.Core.Tests/Broadcasting/PresenceTrackerTests.cs b/tests/SimpleModule.Core.Tests/Broadcasting/PresenceTrackerTests.cs new file mode 100644 index 00000000..bfa8a5df --- /dev/null +++ b/tests/SimpleModule.Core.Tests/Broadcasting/PresenceTrackerTests.cs @@ -0,0 +1,75 @@ +using FluentAssertions; +using SimpleModule.Core.Broadcasting; +using SimpleModule.Hosting.Broadcasting; + +namespace SimpleModule.Core.Tests.Broadcasting; + +public class PresenceTrackerTests +{ + [Fact] + public void Add_First_Connection_Returns_Joined_True() + { + var tracker = new PresenceTracker(); + var member = new PresenceMember("u1"); + + var joined = tracker.Add("presence-room", "c1", member); + + joined.Should().BeTrue(); + tracker.Members("presence-room").Should().ContainSingle().Which.UserId.Should().Be("u1"); + } + + [Fact] + public void Add_Second_Connection_For_Same_User_Does_Not_Refire_Join() + { + var tracker = new PresenceTracker(); + tracker.Add("presence-room", "c1", new PresenceMember("u1")); + + var joined = tracker.Add("presence-room", "c2", new PresenceMember("u1")); + + joined.Should().BeFalse(); + tracker.Members("presence-room").Should().ContainSingle(); + } + + [Fact] + public void Remove_Last_Connection_For_User_Reports_Departure() + { + var tracker = new PresenceTracker(); + tracker.Add("presence-room", "c1", new PresenceMember("u1")); + + var left = tracker.Remove("presence-room", "c1", out var member); + + left.Should().BeTrue(); + member!.UserId.Should().Be("u1"); + tracker.Members("presence-room").Should().BeEmpty(); + } + + [Fact] + public void Remove_With_Other_Connections_For_Same_User_Does_Not_Report_Departure() + { + var tracker = new PresenceTracker(); + tracker.Add("presence-room", "c1", new PresenceMember("u1")); + tracker.Add("presence-room", "c2", new PresenceMember("u1")); + + var left = tracker.Remove("presence-room", "c1", out _); + + left.Should().BeFalse(); + tracker.Members("presence-room").Should().ContainSingle(); + } + + [Fact] + public void RemoveConnection_Returns_Channels_With_Departed_Last_User() + { + var tracker = new PresenceTracker(); + tracker.Add("presence-a", "c1", new PresenceMember("u1")); + tracker.Add("presence-a", "c2", new PresenceMember("u2")); + tracker.Add("presence-b", "c1", new PresenceMember("u1")); + + var departures = tracker.RemoveConnection("c1"); + + departures + .Should() + .HaveCount(2) + .And.Contain(d => d.Channel == "presence-a" && d.Member.UserId == "u1") + .And.Contain(d => d.Channel == "presence-b" && d.Member.UserId == "u1"); + } +}