fix(session-buffer): atomic queue consume — no duplicate rows on partial insert failure#394
Conversation
…ial insert failure The flush read the queue head with LRANGE, inserted it in chunks, and LTRIMmed only after every chunk succeeded. When any chunk after the first successful one threw (e.g. a request timeout under load), the LTRIM never ran — so the next flush re-read and re-inserted chunks that had ALREADY landed in ClickHouse. Duplicate net-0 pairs collapse away in the VersionedCollapsingMergeTree, but a duplicated net=+1 row (a session's final state) double-counts that session in every sum(sign * ...) metric. Replace the LRANGE/LTRIM bracket with LPOP COUNT (atomically claims the head in one round trip), insert chunk-by-chunk as before, and on failure re-queue only the not-yet-confirmed chunks at the head, preserving order. The error still propagates so the base flush records result:'error' and the next flush retries the re-queued rows. Same design as the event-buffer consume path. Includes a regression test: first chunk lands, second throws → only the failed row is re-queued, and the landed row is never re-inserted on the retry flush. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
|
|
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (2)
📝 WalkthroughWalkthroughRedis queue consumption now uses atomic ChangesSession buffer flush recovery
Sequence DiagramsequenceDiagram
participant SessionBuffer
participant Redis
participant ClickHouse
SessionBuffer->>Redis: LPOP(redisKey, batchSize)
Redis-->>SessionBuffer: claimed items
loop each chunk
SessionBuffer->>ClickHouse: insert(chunk)
alt insert succeeds
ClickHouse-->>SessionBuffer: success
else insert fails
ClickHouse-->>SessionBuffer: error
SessionBuffer->>Redis: LPUSH(failed and remaining chunks)
Redis-->>SessionBuffer: re-queued items
Note over SessionBuffer: Re-throw original error
end
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Problem
SessionBuffer.processBuffer()reads the queue head withLRANGE, inserts it in chunks, andLTRIMs only after all chunks succeed. If any chunk after the first successful one throws (e.g. a ClickHouse request timeout under load), theLTRIMnever runs — so the next flush re-reads and re-inserts chunks that already landed.Duplicate net-0 pairs collapse away in the
VersionedCollapsingMergeTree, but a duplicated net=+1 row (a session's resident state) double-counts that session in everysum(sign * …)metric (sessions, pageviews, bounce rate) until its next update — and permanently for sessions whose final state row is the duplicated one.This is the same partial-failure pathology the event buffer's consume path already fixed (#387).
Fix
LPOP COUNTatomically claims the queue head (no LRANGE/LTRIM window).result:'error'.Verification
chunkSize=1, two sessions enqueued, first insert lands, second throws → only the failed row remains queued; after the retry flush, the landed row was inserted exactly once across both flushes (the old design re-inserted it).session-buffer.test.tssuite passes (16 tests) against real Redis 7;tsc --noEmitclean.Note: this composes with the per-version squash netting already on main — netting fixes which rows a batch emits; this PR fixes the batch's delivery being exactly-once from the queue's perspective.
Summary by CodeRabbit
Bug Fixes
Tests