Skip to content

fix(session-buffer): atomic queue consume — no duplicate rows on partial insert failure#394

Open
parikshit223933 wants to merge 1 commit into
Openpanel-dev:mainfrom
parikshit223933:upstream-fix/session-buffer-atomic-consume
Open

fix(session-buffer): atomic queue consume — no duplicate rows on partial insert failure#394
parikshit223933 wants to merge 1 commit into
Openpanel-dev:mainfrom
parikshit223933:upstream-fix/session-buffer-atomic-consume

Conversation

@parikshit223933

@parikshit223933 parikshit223933 commented Jun 12, 2026

Copy link
Copy Markdown

Problem

SessionBuffer.processBuffer() reads the queue head with LRANGE, inserts it in chunks, and LTRIMs only after all chunks succeed. If any chunk after the first successful one throws (e.g. a ClickHouse request timeout under load), the LTRIM never runs — so the next flush re-reads and re-inserts chunks that already landed.

Duplicate net-0 pairs collapse away in the VersionedCollapsingMergeTree, but a duplicated net=+1 row (a session's resident state) double-counts that session in every sum(sign * …) metric (sessions, pageviews, bounce rate) until its next update — and permanently for sessions whose final state row is the duplicated one.

This is the same partial-failure pathology the event buffer's consume path already fixed (#387).

Fix

  • LPOP COUNT atomically claims the queue head (no LRANGE/LTRIM window).
  • Insert chunk-by-chunk exactly as before.
  • On failure, re-queue only the not-yet-confirmed chunks (the failed one and everything after it) back at the head, preserving order; already-squashed rows re-squash to themselves on the retry. The error still propagates so the base flush records result:'error'.
  • If the re-queue itself fails, log loudly (rows would otherwise vanish silently).

Verification

  • New regression test: with chunkSize=1, two sessions enqueued, first insert lands, second throws → only the failed row remains queued; after the retry flush, the landed row was inserted exactly once across both flushes (the old design re-inserted it).
  • Full session-buffer.test.ts suite passes (16 tests) against real Redis 7; tsc --noEmit clean.

Note: this composes with the per-version squash netting already on main — netting fixes which rows a batch emits; this PR fixes the batch's delivery being exactly-once from the queue's perspective.

Summary by CodeRabbit

  • Bug Fixes

    • Improved handling of failed database writes to prevent duplicate data entries
    • Enhanced atomic queue processing for more reliable data buffering and error recovery
  • Tests

    • Added test verifying correct behavior when database chunk inserts fail

…ial insert failure

The flush read the queue head with LRANGE, inserted it in chunks, and
LTRIMmed only after every chunk succeeded. When any chunk after the
first successful one threw (e.g. a request timeout under load), the
LTRIM never ran — so the next flush re-read and re-inserted chunks that
had ALREADY landed in ClickHouse. Duplicate net-0 pairs collapse away in
the VersionedCollapsingMergeTree, but a duplicated net=+1 row (a
session's final state) double-counts that session in every
sum(sign * ...) metric.

Replace the LRANGE/LTRIM bracket with LPOP COUNT (atomically claims the
head in one round trip), insert chunk-by-chunk as before, and on failure
re-queue only the not-yet-confirmed chunks at the head, preserving
order. The error still propagates so the base flush records
result:'error' and the next flush retries the re-queued rows.

Same design as the event-buffer consume path. Includes a regression
test: first chunk lands, second throws → only the failed row is
re-queued, and the landed row is never re-inserted on the retry flush.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
@CLAassistant

Copy link
Copy Markdown

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@coderabbitai

coderabbitai Bot commented Jun 12, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: e90a46b5-d81a-4695-a8d5-b9da1dd8e2cd

📥 Commits

Reviewing files that changed from the base of the PR and between acc29e9 and 0de6fa8.

📒 Files selected for processing (2)
  • packages/db/src/buffers/session-buffer.test.ts
  • packages/db/src/buffers/session-buffer.ts

📝 Walkthrough

Walkthrough

Redis queue consumption now uses atomic LPOP claiming instead of LRANGE plus later LTRIM. ClickHouse writes are performed chunk-by-chunk, and if one chunk fails, only that chunk and the remaining chunks are re-queued. A regression test verifies already-inserted chunks are not duplicated on retry.

Changes

Session buffer flush recovery

Layer / File(s) Summary
Atomic claim and failed chunk recovery
packages/db/src/buffers/session-buffer.ts
processBuffer now claims up to batchSize items with redis.lpop(redisKey, batchSize), then inserts ClickHouse chunks sequentially. If a chunk insert fails, it re-queues the failed chunk and later chunks to the Redis head in preserved order and re-throws the original error.
Regression coverage for duplicate avoidance
packages/db/src/buffers/session-buffer.test.ts
A regression test sets SESSION_BUFFER_CHUNK_SIZE=1, ingests two sessions, makes the second insert fail, checks that only the failed row remains buffered, then flushes again and confirms the first device row appears in only one insert across both flushes.

Sequence Diagram

sequenceDiagram
  participant SessionBuffer
  participant Redis
  participant ClickHouse
  SessionBuffer->>Redis: LPOP(redisKey, batchSize)
  Redis-->>SessionBuffer: claimed items
  loop each chunk
    SessionBuffer->>ClickHouse: insert(chunk)
    alt insert succeeds
      ClickHouse-->>SessionBuffer: success
    else insert fails
      ClickHouse-->>SessionBuffer: error
      SessionBuffer->>Redis: LPUSH(failed and remaining chunks)
      Redis-->>SessionBuffer: re-queued items
      Note over SessionBuffer: Re-throw original error
    end
  end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

🐇 I packed each chunk in tidy rows,
And caught the one that would not go.
The first hopped through just once, no more,
The rest returned to Redis' door.
Retry, rebound, and onward flow.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title directly and clearly summarizes the main fix: replacing LRANGE/LTRIM with atomic LPOP to prevent duplicate row insertion when partial ClickHouse inserts fail.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants