feat: use consumer sessions table to track leases, don't update each topic individually#4
Open
abelanger5 wants to merge 1 commit into
Open
feat: use consumer sessions table to track leases, don't update each topic individually#4abelanger5 wants to merge 1 commit into
abelanger5 wants to merge 1 commit into
Conversation
…topic individually
There was a problem hiding this comment.
Pull request overview
This PR refactors exclusive-topic leasing to scale better at high topic counts by centralizing liveness/lease renewal into a per-consumer “session” row (consumer_sessions) instead of continuously updating per-topic timestamps. It updates the SQL layer and outbox runtime logic accordingly, replacing the per-topic renewer with a watcher that writes a one-shot grace-period expiry when an AcquireTopic context ends.
Changes:
- Add
consumer_sessionstable and a new upsert query; update topic locking query to derive expiry from either per-topic override or consumer session expiry. - Replace per-topic lease renewer with a per-topic cancellation watcher + add a background consumer-session heartbeat renewer.
- Add/adjust E2E and unit tests to validate bounded writes, reacquire race behavior, and legacy per-topic timestamp compatibility.
Reviewed changes
Copilot reviewed 9 out of 11 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| sqlc/schema.sql | Align schema with existing migrations and add consumer_sessions table definition for sqlc schema. |
| sqlc/queries.sql | Update GetTopicForUpdate to COALESCE per-topic override vs session expiry; add UpsertConsumerSession. |
| sqlc/queries.sql.go | Regenerate sqlc output for updated queries and new upsert method. |
| sqlc/models.go | Add ConsumerSession model. |
| sqlc/migrations/20260701000000_v0_3_0.sql | Add migration creating consumer_sessions. |
| outbox.go | Switch lease behavior to session-deferred leases; add session renewer and lease watcher usage. |
| exclusive_lease_watcher.go | New per-topic watcher that fires once on ctx end with a detached context. |
| exclusive_lease_watcher_test.go | Unit tests for watcher semantics (single-fire, restart ordering, cleanup). |
| exclusive_lease_renewer.go | Remove old per-topic periodic renewer. |
| exclusive_lease_renewer_test.go | Remove tests for the deleted renewer. |
| exclusive_consumer_e2e_test.go | Update existing exclusive consumer test and add new regression/behavior tests for session leases. |
Files not reviewed (2)
- sqlc/models.go: Generated file
- sqlc/queries.sql.go: Generated file
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+420
to
424
| // The per-topic timestamp is left NULL, so the lease defers to this | ||
| // instance's consumer session for liveness (and any stale override from a | ||
| // previous holder is cleared). | ||
| if err := o.queries.SetTopicExclusiveConsumer(ctx, wrapped, sqlc.SetTopicExclusiveConsumerParams{ | ||
| Topic: topic, |
Comment on lines
+62
to
+65
| defer w.running.CompareAndDelete(topic, handle) | ||
| <-watchCtx.Done() | ||
| w.onCancel(context.WithoutCancel(watchCtx), topic) | ||
| }() |
Comment on lines
+274
to
+278
| if err := o.upsertConsumerSession(ctx); err != nil { | ||
| o.logger.Error().Err(err).Msg("exclusive consumer: failed to register consumer session") | ||
| } | ||
|
|
||
| go o.runConsumerSessionRenewer(ctx) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
At a very large volume of topics (>40k in tested environment), we start to see a lot of overhead from each updating a timestamp on each topic to manage leasing. This consolidates topic leasing by using a single consumer row with an expiring lease. We only update the consumer id and then manage leasing on the consumer row.
This is a breaking change, upgrading from
v0.2.1will result in lease-stealing from old instances.Type of change
Checklist
Changes have been:
🤖 AI Disclosure