Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions apps/api/e2e/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Session E2E

Two harnesses over a shared foundation (`lib.ts`), both driving the **real stack**
over HTTP and asserting state in **both ClickHouse and Redis**:

- `session-e2e.ts` (`e2e:sessions`) — **correctness**: the full lifecycle for one
session per scenario (open/extend/close via reaper + boundary, replay, identify),
including Redis cleanup.
- `session-stress.ts` (`e2e:sessions:stress`) — **volume + drain**: ramps out many
sessions, then drives reaper + buffer flushes until *everything* has drained
(every `session_end` emitted, Redis cleaned, session buffer empty) and reconciles
the ClickHouse counts. Exits only when nothing is left open.

## What it covers

| Scenario | Asserts |
|----------|---------|
| Single session → reaper close | session blob + wallclock + projects-set in Redis; one `session_start` + N events in CH; after the reaper closes it: one `session_end`, a collapsed `sessions` row, and Redis fully cleaned (blob + wallclock gone, idempotency claim present). |
| Boundary split | a >idle-window gap opens a NEW session id and emits a `session_end` for the first + a `session_start` for the second. |
| Replay | a replay chunk lands in `session_replay_chunks` under the echoed session id. |
| Identify | `session:profile:{pid}:{profileId}` pointer written; events carry the identified `profile_id`. |

## Running

Sessions idle out after 30 min by default. Shrink that and start the stack with
the **same** value the harness uses, then run the harness:

```bash
# 1. Start the stack with a short idle window (Docker must be up: pnpm dock:up)
SESSION_TIMEOUT_MS=4000 pnpm dev

# 2. In another terminal, run the harness with the SAME timeout
SESSION_TIMEOUT_MS=4000 pnpm --filter @openpanel/api e2e:sessions

# …or the stress + drain test (500 sessions by default)
SESSION_TIMEOUT_MS=4000 pnpm --filter @openpanel/api e2e:sessions:stress
```

Stress tunables (env): `E2E_SESSIONS` (500), `E2E_CONCURRENCY` (25),
`E2E_EVENTS_PER_SESSION` (3), `E2E_DRAIN_TIMEOUT_MS` (120000).

It exits non-zero if any check fails and prints a summary. Total run is ~30–60s
with a 4s window (each close waits roughly one idle window).

The harness triggers the reaper on demand via the worker's `/debug/cron`
endpoint, so it never waits for the 5-minute reaper cron.

### Notes
- Uses a dedicated, isolated project (`e2e-sessions`) and a throwaway client
(`ignoreCorsAndSecret`), created/upserted automatically under org `openpanel-dev`.
- Each run uses fresh device IPs, so reruns don't collide with prior state.
- Overridable: `E2E_API_URL` (default `:3333`), `E2E_WORKER_URL` (default `:9999`).
- The harness and the stack **must share the same `SESSION_TIMEOUT_MS`** — the
harness derives its idle waits from it.
231 changes: 231 additions & 0 deletions apps/api/e2e/lib.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
/**
* Shared building blocks for the session E2E + stress harnesses:
* config, an HTTP track client, the reaper trigger, Redis/ClickHouse helpers,
* fixtures, polling, and a tiny check/report framework.
*/

import { ClientType, chQuery, db, getClientByIdCached } from '@openpanel/db';
import { getRedisCache } from '@openpanel/redis';

// ── Config ──────────────────────────────────────────────────────────────────
export const API_URL = process.env.E2E_API_URL || 'http://localhost:3333';
export const WORKER_URL = process.env.E2E_WORKER_URL || 'http://localhost:9999';
export const ORG_ID = 'openpanel-dev';
export const PROJECT_ID = 'e2e-sessions';
export const CLIENT_ID = 'e2e1e2e1-0000-4000-8000-000000000001';
export const UA =
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/148.0.0.0 Safari/537.36';

export const SESSION_TIMEOUT_MS = Number.parseInt(
process.env.SESSION_TIMEOUT_MS || String(1000 * 60 * 30),
10
);
/** How long to wait for a session to fall outside its idle window before closing. */
export const IDLE_WAIT_MS = SESSION_TIMEOUT_MS + 2000;

export const redis = getRedisCache();
export const runId = Date.now();

export { chQuery };

// ── Redis key helpers ───────────────────────────────────────────────────────
export const sessionKey = (deviceId: string) => `session:${PROJECT_ID}:${deviceId}`;
export const wallclockKey = `session:wallclock:${PROJECT_ID}`;
export const profileKey = (profileId: string) =>
`session:profile:${PROJECT_ID}:${profileId}`;
export const claimKey = (deviceId: string, sessionId: string) =>
`session:end:emitted:${PROJECT_ID}:${deviceId}:${sessionId}`;
/** The session-buffer's Redis list (ground-truth pending CH rows). */
export const SESSION_BUFFER_LIST = 'session-buffer';

export async function getBlob(deviceId: string) {
const raw = await redis.get(sessionKey(deviceId));
return raw ? JSON.parse(raw) : null;
}

// ── Timing ──────────────────────────────────────────────────────────────────
export const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms));

/** Poll `fn` until it returns a truthy value or the timeout elapses. */
export async function pollUntil<T>(
fn: () => Promise<T>,
{ timeoutMs = 30_000, intervalMs = 750 } = {}
): Promise<T | null> {
const deadline = Date.now() + timeoutMs;
// biome-ignore lint/nursery/noConstantCondition: poll loop
while (true) {
const value = await fn();
if (value) return value;
if (Date.now() >= deadline) return null;
await sleep(intervalMs);
}
}

// ── Report framework ──────────────────────────────────────────────────────
type Result = { scenario: string; name: string; ok: boolean; detail?: string };
const results: Result[] = [];
let currentScenario = 'setup';

export function scenario(name: string) {
currentScenario = name;
console.log(`\n▶ ${name}`);
}
export function check(name: string, ok: boolean, detail?: string) {
results.push({ scenario: currentScenario, name, ok, detail });
console.log(` ${ok ? '✓' : '✗'} ${name}${detail && !ok ? ` — ${detail}` : ''}`);
}
/** Print the summary and return the number of failed checks. */
export function summarize(): number {
const failed = results.filter((r) => !r.ok);
console.log(`\n${'─'.repeat(60)}`);
console.log(`${results.length - failed.length}/${results.length} checks passed`);
if (failed.length) {
console.log('\nFailures:');
for (const f of failed) {
console.log(` ✗ [${f.scenario}] ${f.name}${f.detail ? ` — ${f.detail}` : ''}`);
}
}
return failed.length;
}

// ── HTTP ────────────────────────────────────────────────────────────────────
export type TrackResponse = { deviceId: string; sessionId: string };

export async function track(body: unknown, ip: string): Promise<TrackResponse> {
const res = await fetch(`${API_URL}/track`, {
method: 'POST',
headers: {
'content-type': 'application/json',
'openpanel-client-id': CLIENT_ID,
'user-agent': UA,
'x-client-ip': ip,
},
body: JSON.stringify(body),
});
if (!res.ok) {
throw new Error(`POST /track ${res.status}: ${await res.text()}`);
}
return (await res.json()) as TrackResponse;
}

export const screenView = (
ip: string,
path: string,
extra: Record<string, unknown> = {}
) =>
track(
{
type: 'track',
payload: {
name: 'screen_view',
properties: { __path: `https://e2e.test${path}`, __ip: ip, ...extra },
},
},
ip
);

/** Run a worker cron on demand via the local /debug/cron endpoint. */
export async function triggerCron(type: string) {
const res = await fetch(`${WORKER_URL}/debug/cron/${type}`, {
method: 'POST',
headers: { accept: 'application/json' },
});
if (!res.ok) {
throw new Error(`trigger cron ${type} ${res.status}: ${await res.text()}`);
}
}

export const triggerReaper = () => triggerCron('sessionReaper');

// ── ClickHouse counting (scoped to a set of session ids for run isolation) ──
const quoteList = (ids: string[]) =>
ids.map((id) => `'${id.replace(/'/g, "")}'`).join(',');

export async function countByName(
sessionIds: string[],
name: string
): Promise<number> {
if (sessionIds.length === 0) return 0;
const rows = await chQuery<{ c: string }>(
`SELECT count() AS c FROM events WHERE project_id = '${PROJECT_ID}' AND name = '${name}' AND session_id IN (${quoteList(sessionIds)})`
);
return Number(rows[0]?.c ?? 0);
}

// ── Fixtures ────────────────────────────────────────────────────────────────
export async function ensureFixtures() {
scenario('setup: project + client');
try {
await db.organization.upsert({
where: { id: ORG_ID },
create: { id: ORG_ID, name: 'OpenPanel Dev' },
update: {},
});
await db.project.upsert({
where: { id: PROJECT_ID },
create: { id: PROJECT_ID, name: 'E2E Sessions', organizationId: ORG_ID },
update: {},
});
await db.client.upsert({
where: { id: CLIENT_ID },
create: {
id: CLIENT_ID,
name: 'e2e',
organizationId: ORG_ID,
projectId: PROJECT_ID,
type: ClientType.write,
ignoreCorsAndSecret: true,
secret: null,
},
update: { ignoreCorsAndSecret: true, projectId: PROJECT_ID },
});
await getClientByIdCached.clear(CLIENT_ID);
check('fixtures ready', true);
} catch (error) {
check('fixtures ready', false, (error as Error).message);
throw error;
}
}

export async function preflight() {
scenario('preflight');
const api = await fetch(`${API_URL}/`)
.then((r) => r.ok || r.status === 404)
.catch(() => false);
check(`api reachable at ${API_URL}`, !!api);
const worker = await fetch(`${WORKER_URL}/debug/cron`)
.then((r) => r.ok)
.catch(() => false);
check(`worker debug reachable at ${WORKER_URL}`, !!worker);
if (!api || !worker) {
throw new Error('Stack not reachable — start it with `pnpm dev` first.');
}
if (SESSION_TIMEOUT_MS > 60_000) {
console.warn(
`\n⚠ SESSION_TIMEOUT_MS=${SESSION_TIMEOUT_MS}ms — this run will be slow.\n` +
' Re-run the stack AND the harness with e.g. SESSION_TIMEOUT_MS=4000.'
);
}
}

/** Run `fn` over `items` with at most `concurrency` in flight. */
export async function runPool<T>(
items: T[],
concurrency: number,
fn: (item: T, index: number) => Promise<void>
): Promise<void> {
let next = 0;
const workers = Array.from({ length: Math.min(concurrency, items.length) }, async () => {
while (true) {
const i = next++;
if (i >= items.length) return;
await fn(items[i]!, i);
}
});
await Promise.all(workers);
}

export async function shutdown(failed: number): Promise<never> {
await redis.quit().catch(() => {});
process.exit(failed ? 1 : 0);
}
Loading
Loading