Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions packages/db/src/buffers/session-buffer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -374,4 +374,51 @@ describe('SessionBuffer', () => {

insertSpy.mockRestore();
});

it('does not re-insert already-landed chunks when a later chunk fails', async () => {
process.env.SESSION_BUFFER_CHUNK_SIZE = '1';
try {
const buffer = new SessionBuffer();
await buffer.ingest(
makePayload({ deviceId: 'device-A', sessionId: 'session-A' })
);
await buffer.ingest(
makePayload({ deviceId: 'device-B', sessionId: 'session-B' })
);
expect(await buffer.getBufferSize()).toBe(2);

// chunkSize=1 → two single-row chunks. The first lands in CH, the
// second throws (e.g. request timeout).
vi.mocked(ch.insert).mockClear();
vi.mocked(ch.insert)
.mockResolvedValueOnce(undefined as any)
.mockRejectedValueOnce(new Error('ClickHouse unavailable'));

await expect(buffer.processBuffer()).rejects.toThrow(
'ClickHouse unavailable'
);

// Only the failed row may be re-queued. The old LRANGE→LTRIM design
// kept BOTH rows queued (the LTRIM never ran), so the next flush
// re-inserted the already-landed chunk — duplicate rows in CH that
// double-count the session in every sum(sign * ...) metric.
expect(await buffer.getBufferSize()).toBe(1);

vi.mocked(ch.insert).mockResolvedValue(undefined as any);
await buffer.processBuffer();
expect(await buffer.getBufferSize()).toBe(0);

// device-A's row went to CH exactly once across both flushes.
const callsWithA = vi
.mocked(ch.insert)
.mock.calls.filter((c) =>
((c[0] as any).values as any[]).some(
(row) => row.device_id === 'device-A'
)
).length;
expect(callsWithA).toBe(1);
} finally {
delete process.env.SESSION_BUFFER_CHUNK_SIZE;
}
});
});
65 changes: 48 additions & 17 deletions packages/db/src/buffers/session-buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -466,12 +466,19 @@ export class SessionBuffer extends BaseBuffer {
}

async processBuffer() {
const events = await this.redis.lrange(
this.redisKey,
0,
this.batchSize - 1
);
if (events.length === 0) return;
// LPOP with COUNT atomically claims the queue head. The previous
// LRANGE → insert → LTRIM bracket had a partial-failure pathology:
// when any chunk after the first successful one threw, the LTRIM
// never ran, so the next flush re-read and re-inserted chunks that
// had ALREADY landed in ClickHouse. Duplicate net-0 pairs collapse
// away, but a duplicated net=+1 row (e.g. a session's final state)
// double-counts that session in every sum(sign * ...) metric.
//
// ioredis LPOP with COUNT returns string[] | null.
const events = (await this.redis.lpop(this.redisKey, this.batchSize)) as
| string[]
| null;
if (!events || events.length === 0) return;

const parsed: IClickhouseSession[] = [];
for (const e of events) {
Expand All @@ -494,19 +501,43 @@ export class SessionBuffer extends BaseBuffer {
? this.squashSessionsByVersion(parsed)
: parsed;

for (const chunk of this.chunks(sessions, this.chunkSize)) {
await ch.insert({
table: TABLE_NAMES.sessions,
values: chunk,
format: 'JSONEachRow',
});
const chunks = this.chunks(sessions, this.chunkSize);
let nextChunk = 0;
try {
for (; nextChunk < chunks.length; nextChunk++) {
await ch.insert({
table: TABLE_NAMES.sessions,
values: chunks[nextChunk]!,
format: 'JSONEachRow',
});
}
} catch (error) {
// Re-queue everything not confirmed inserted — the failed chunk and
// every chunk after it — at the head, preserving order. Rows already
// squashed re-squash to themselves on the retry flush. The error
// still propagates so the base flush records result:'error'.
const remaining = chunks.slice(nextChunk).flat();
if (remaining.length > 0) {
try {
await this.redis.lpush(
this.redisKey,
...remaining
.slice()
.reverse()
.map((row) => JSON.stringify(row))
);
} catch (requeueErr) {
// If even the re-queue fails the rows are lost — log loudly so
// it is investigable rather than silent.
this.logger.error(
{ err: requeueErr, lostRowCount: remaining.length },
'CRITICAL: failed to re-queue session rows to Redis — rows LOST'
);
}
}
throw error;
}

// Trim only after a successful insert — on failure the rows stay queued.
// Don't swallow: let it propagate so the base flush records result:'error'
// (and the next flush retries the still-queued rows).
await this.redis.ltrim(this.redisKey, events.length, -1);

this.logger.debug({ count: events.length }, 'Processed sessions');
}

Expand Down