Skip to content

feat: use consumer sessions table to track leases, don't update each topic individually#4

Open
abelanger5 wants to merge 1 commit into
mainfrom
belanger/reduce-topic-updates
Open

feat: use consumer sessions table to track leases, don't update each topic individually#4
abelanger5 wants to merge 1 commit into
mainfrom
belanger/reduce-topic-updates

Conversation

@abelanger5

Copy link
Copy Markdown
Contributor

Description

At a very large volume of topics (>40k in tested environment), we start to see a lot of overhead from each updating a timestamp on each topic to manage leasing. This consolidates topic leasing by using a single consumer row with an expiring lease. We only update the consumer id and then manage leasing on the consumer row.

This is a breaking change, upgrading from v0.2.1 will result in lease-stealing from old instances.

Type of change

  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Refactor (non-breaking changes to code which doesn't change any behaviour)

Checklist

Changes have been:

  • Tested (unit, integration, or manually with steps specified)
  • Linted and formatted

🤖 AI Disclosure
  • I acknowledge that an LLM was used in the creation of this Pull Request, in accordance with Hatchet's AI_POLICY.md.
  • Details: Claude for implementation and writing tests

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR refactors exclusive-topic leasing to scale better at high topic counts by centralizing liveness/lease renewal into a per-consumer “session” row (consumer_sessions) instead of continuously updating per-topic timestamps. It updates the SQL layer and outbox runtime logic accordingly, replacing the per-topic renewer with a watcher that writes a one-shot grace-period expiry when an AcquireTopic context ends.

Changes:

  • Add consumer_sessions table and a new upsert query; update topic locking query to derive expiry from either per-topic override or consumer session expiry.
  • Replace per-topic lease renewer with a per-topic cancellation watcher + add a background consumer-session heartbeat renewer.
  • Add/adjust E2E and unit tests to validate bounded writes, reacquire race behavior, and legacy per-topic timestamp compatibility.

Reviewed changes

Copilot reviewed 9 out of 11 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
sqlc/schema.sql Align schema with existing migrations and add consumer_sessions table definition for sqlc schema.
sqlc/queries.sql Update GetTopicForUpdate to COALESCE per-topic override vs session expiry; add UpsertConsumerSession.
sqlc/queries.sql.go Regenerate sqlc output for updated queries and new upsert method.
sqlc/models.go Add ConsumerSession model.
sqlc/migrations/20260701000000_v0_3_0.sql Add migration creating consumer_sessions.
outbox.go Switch lease behavior to session-deferred leases; add session renewer and lease watcher usage.
exclusive_lease_watcher.go New per-topic watcher that fires once on ctx end with a detached context.
exclusive_lease_watcher_test.go Unit tests for watcher semantics (single-fire, restart ordering, cleanup).
exclusive_lease_renewer.go Remove old per-topic periodic renewer.
exclusive_lease_renewer_test.go Remove tests for the deleted renewer.
exclusive_consumer_e2e_test.go Update existing exclusive consumer test and add new regression/behavior tests for session leases.
Files not reviewed (2)
  • sqlc/models.go: Generated file
  • sqlc/queries.sql.go: Generated file

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread outbox.go
Comment on lines +420 to 424
// The per-topic timestamp is left NULL, so the lease defers to this
// instance's consumer session for liveness (and any stale override from a
// previous holder is cleared).
if err := o.queries.SetTopicExclusiveConsumer(ctx, wrapped, sqlc.SetTopicExclusiveConsumerParams{
Topic: topic,
Comment on lines +62 to +65
defer w.running.CompareAndDelete(topic, handle)
<-watchCtx.Done()
w.onCancel(context.WithoutCancel(watchCtx), topic)
}()
Comment thread outbox.go
Comment on lines +274 to +278
if err := o.upsertConsumerSession(ctx); err != nil {
o.logger.Error().Err(err).Msg("exclusive consumer: failed to register consumer session")
}

go o.runConsumerSessionRenewer(ctx)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants