From 0de6fa84e5819631bc836bfb139182f22c8601a2 Mon Sep 17 00:00:00 2001 From: Parikshit Singh Date: Fri, 12 Jun 2026 16:39:17 +0530 Subject: [PATCH] =?UTF-8?q?fix(session-buffer):=20atomic=20queue=20consume?= =?UTF-8?q?=20=E2=80=94=20no=20duplicate=20rows=20on=20partial=20insert=20?= =?UTF-8?q?failure?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The flush read the queue head with LRANGE, inserted it in chunks, and LTRIMmed only after every chunk succeeded. When any chunk after the first successful one threw (e.g. a request timeout under load), the LTRIM never ran — so the next flush re-read and re-inserted chunks that had ALREADY landed in ClickHouse. Duplicate net-0 pairs collapse away in the VersionedCollapsingMergeTree, but a duplicated net=+1 row (a session's final state) double-counts that session in every sum(sign * ...) metric. Replace the LRANGE/LTRIM bracket with LPOP COUNT (atomically claims the head in one round trip), insert chunk-by-chunk as before, and on failure re-queue only the not-yet-confirmed chunks at the head, preserving order. The error still propagates so the base flush records result:'error' and the next flush retries the re-queued rows. Same design as the event-buffer consume path. Includes a regression test: first chunk lands, second throws → only the failed row is re-queued, and the landed row is never re-inserted on the retry flush. Co-Authored-By: Claude Fable 5 --- .../db/src/buffers/session-buffer.test.ts | 47 ++++++++++++++ packages/db/src/buffers/session-buffer.ts | 65 ++++++++++++++----- 2 files changed, 95 insertions(+), 17 deletions(-) diff --git a/packages/db/src/buffers/session-buffer.test.ts b/packages/db/src/buffers/session-buffer.test.ts index e6ba88bca..a811204d4 100644 --- a/packages/db/src/buffers/session-buffer.test.ts +++ b/packages/db/src/buffers/session-buffer.test.ts @@ -374,4 +374,51 @@ describe('SessionBuffer', () => { insertSpy.mockRestore(); }); + + it('does not re-insert already-landed chunks when a later chunk fails', async () => { + process.env.SESSION_BUFFER_CHUNK_SIZE = '1'; + try { + const buffer = new SessionBuffer(); + await buffer.ingest( + makePayload({ deviceId: 'device-A', sessionId: 'session-A' }) + ); + await buffer.ingest( + makePayload({ deviceId: 'device-B', sessionId: 'session-B' }) + ); + expect(await buffer.getBufferSize()).toBe(2); + + // chunkSize=1 → two single-row chunks. The first lands in CH, the + // second throws (e.g. request timeout). + vi.mocked(ch.insert).mockClear(); + vi.mocked(ch.insert) + .mockResolvedValueOnce(undefined as any) + .mockRejectedValueOnce(new Error('ClickHouse unavailable')); + + await expect(buffer.processBuffer()).rejects.toThrow( + 'ClickHouse unavailable' + ); + + // Only the failed row may be re-queued. The old LRANGE→LTRIM design + // kept BOTH rows queued (the LTRIM never ran), so the next flush + // re-inserted the already-landed chunk — duplicate rows in CH that + // double-count the session in every sum(sign * ...) metric. + expect(await buffer.getBufferSize()).toBe(1); + + vi.mocked(ch.insert).mockResolvedValue(undefined as any); + await buffer.processBuffer(); + expect(await buffer.getBufferSize()).toBe(0); + + // device-A's row went to CH exactly once across both flushes. + const callsWithA = vi + .mocked(ch.insert) + .mock.calls.filter((c) => + ((c[0] as any).values as any[]).some( + (row) => row.device_id === 'device-A' + ) + ).length; + expect(callsWithA).toBe(1); + } finally { + delete process.env.SESSION_BUFFER_CHUNK_SIZE; + } + }); }); diff --git a/packages/db/src/buffers/session-buffer.ts b/packages/db/src/buffers/session-buffer.ts index 8ff03ac63..c93eab86b 100644 --- a/packages/db/src/buffers/session-buffer.ts +++ b/packages/db/src/buffers/session-buffer.ts @@ -466,12 +466,19 @@ export class SessionBuffer extends BaseBuffer { } async processBuffer() { - const events = await this.redis.lrange( - this.redisKey, - 0, - this.batchSize - 1 - ); - if (events.length === 0) return; + // LPOP with COUNT atomically claims the queue head. The previous + // LRANGE → insert → LTRIM bracket had a partial-failure pathology: + // when any chunk after the first successful one threw, the LTRIM + // never ran, so the next flush re-read and re-inserted chunks that + // had ALREADY landed in ClickHouse. Duplicate net-0 pairs collapse + // away, but a duplicated net=+1 row (e.g. a session's final state) + // double-counts that session in every sum(sign * ...) metric. + // + // ioredis LPOP with COUNT returns string[] | null. + const events = (await this.redis.lpop(this.redisKey, this.batchSize)) as + | string[] + | null; + if (!events || events.length === 0) return; const parsed: IClickhouseSession[] = []; for (const e of events) { @@ -494,19 +501,43 @@ export class SessionBuffer extends BaseBuffer { ? this.squashSessionsByVersion(parsed) : parsed; - for (const chunk of this.chunks(sessions, this.chunkSize)) { - await ch.insert({ - table: TABLE_NAMES.sessions, - values: chunk, - format: 'JSONEachRow', - }); + const chunks = this.chunks(sessions, this.chunkSize); + let nextChunk = 0; + try { + for (; nextChunk < chunks.length; nextChunk++) { + await ch.insert({ + table: TABLE_NAMES.sessions, + values: chunks[nextChunk]!, + format: 'JSONEachRow', + }); + } + } catch (error) { + // Re-queue everything not confirmed inserted — the failed chunk and + // every chunk after it — at the head, preserving order. Rows already + // squashed re-squash to themselves on the retry flush. The error + // still propagates so the base flush records result:'error'. + const remaining = chunks.slice(nextChunk).flat(); + if (remaining.length > 0) { + try { + await this.redis.lpush( + this.redisKey, + ...remaining + .slice() + .reverse() + .map((row) => JSON.stringify(row)) + ); + } catch (requeueErr) { + // If even the re-queue fails the rows are lost — log loudly so + // it is investigable rather than silent. + this.logger.error( + { err: requeueErr, lostRowCount: remaining.length }, + 'CRITICAL: failed to re-queue session rows to Redis — rows LOST' + ); + } + } + throw error; } - // Trim only after a successful insert — on failure the rows stay queued. - // Don't swallow: let it propagate so the base flush records result:'error' - // (and the next flush retries the still-queued rows). - await this.redis.ltrim(this.redisKey, events.length, -1); - this.logger.debug({ count: events.length }, 'Processed sessions'); }