feat(cubestore): EXPLAIN ANALYZE DETAILED per-query trace #11016

Open
waralexrom wants to merge 21 commits into master from cubestore-datailed-analyze

@waralexrom
Member

Summary

Adds EXPLAIN ANALYZE DETAILED <select> to CubeStore: a per-query trace that decomposes a real query execution across the whole topology (entry → main → workers → select subprocess) into planning, metastore, transport, execution, serialization and memory — so prod bottlenecks (where local ≠ prod) can be found without blind per-query tracing in steady state.

The trace is gated: it only runs for the explicit DETAILED command and is a no-op on the normal query path.

Changes

  • Trace module (trace.rs): OpSample (aggregated by kind/label, with bytes/rows/is_wrapper) + typed per-region structs (RouterTrace/MainTrace/WorkerTrace/SubprocessTrace/QueryTrace); per-query TraceCtx behind a task-local gate; OpGuard captures time only when a detailed query is in flight.
  • Real production path: new NetworkMessage variants RouterSelectDetailed/SelectDetailed (hot Select/SelectResult untouched: flexbuffers tolerates a trailing field only new→old, so changing a hot message would break rolling deploys). The main runs the real router plan; a WorkerTraceCollector rides the DataFusion TaskContext so ClusterSendExec records per-worker traces while rows still flow into the merge (no new plan node).
  • Measurements: metastore calls by method (via a #[cuberpc::service(trace_guard=…)]-generated TracedMetaStore decorator, both DI sites); router planning phases; per-node DataFusion metrics (elapsed_compute/output_rows) on main and workers; per-query MemoryPool peak on main and subprocess; serialized byte volume; executed physical plan text; per-boundary transport (wire+queue) = round-trip − child wall.
  • Output: rendered as a compact nested tree with per-node totals and a bottom category summary (overall + per node); durations human-readable (us/ms/s), bytes (B/KB/MB). Wrapper spans (*.execute, round-trips, choose_index) are excluded from the summary so categories don't double-count.
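
The gating described in the Trace module bullet can be sketched roughly as follows. This is a minimal illustration and not the code in trace.rs: it uses a std `thread_local!` as a stand-in for the task-local gate, and the field layout of `TraceCtx` and `OpGuard`, the closure-based `scoped`, and the `is_active` helper are all assumptions made for the example.

```rust
use std::cell::RefCell;
use std::sync::{Arc, Mutex, Weak};
use std::time::Instant;

/// Hypothetical per-query context: collects (label, elapsed_us) samples.
#[derive(Default)]
pub struct TraceCtx {
    ops: Mutex<Vec<(String, u64)>>,
}

impl TraceCtx {
    pub fn take_ops(&self) -> Vec<(String, u64)> {
        std::mem::take(&mut *self.ops.lock().unwrap())
    }
}

thread_local! {
    // Stand-in for the task-local gate (assumption); None means "normal query path".
    static TRACE: RefCell<Option<Arc<TraceCtx>>> = RefCell::new(None);
}

/// Runs `f` with `ctx` installed as the active trace, then restores the previous one.
/// (Not panic-safe; enough for a sketch.)
pub fn scoped<R>(ctx: Option<Arc<TraceCtx>>, f: impl FnOnce() -> R) -> R {
    let prev = TRACE.with(|t| t.replace(ctx));
    let out = f();
    TRACE.with(|t| *t.borrow_mut() = prev);
    out
}

pub struct OpGuard {
    // Populated only when a trace is active; otherwise the guard does nothing.
    active: Option<(Instant, Weak<TraceCtx>, String)>,
}

impl OpGuard {
    pub fn start(label: &str) -> Self {
        let active = TRACE.with(|t| {
            t.borrow()
                .as_ref()
                .map(|ctx| (Instant::now(), Arc::downgrade(ctx), label.to_string()))
        });
        OpGuard { active }
    }

    pub fn is_active(&self) -> bool {
        self.active.is_some()
    }
}

impl Drop for OpGuard {
    fn drop(&mut self) {
        if let Some((started, ctx, label)) = self.active.take() {
            // If the context is already gone, the sample is dropped silently.
            if let Some(ctx) = ctx.upgrade() {
                let elapsed_us = started.elapsed().as_micros() as u64;
                ctx.ops.lock().unwrap().push((label, elapsed_us));
            }
        }
    }
}
```

Outside `scoped(Some(ctx), ...)` a guard never reads the clock or allocates a label, which is the property the summary relies on when it calls the trace a no-op on the normal path.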

Testing

  • cargo test -p cubestore --lib explain_analyze_detailed — smoke test on a router+worker cluster asserting the trace contains every level + the category summary.
  • Verified end-to-end on a local 2-worker test stand: the trace populates all levels with sensible decomposition (e.g. the metastore hot reads getActivePartitionsAndChunksByIndexIdForSelect / getMultiPartitionSubtree appear under the entry node's planning).

🤖 Generated with Claude Code

@waralexrom waralexrom requested a review from a team as a code owner June 5, 2026 09:57
@github-actions github-actions Bot added the cube store (Issues relating to Cube Store) and rust (Pull requests that update Rust code) labels Jun 5, 2026
waralexrom added 20 commits June 5, 2026 12:00
End-to-end per-query trace plumbing across the network and IPC boundaries:
- trace module: OpKind/OpSample aggregatable measurements + typed exec
  fields, per-query TraceCtx behind a task-local gate (OpGuard captures
  Instant + Weak<TraceCtx> only when a detailed query is in flight).
- IPC: WorkerMessage::Select carries a detailed flag; the subprocess
  Response returns an Option<SubprocessTrace>; stage guards on the single
  select code path are no-ops without an active ctx.
- network: NetworkMessage::AnalyzeDetailed/Result + Cluster::run_analyze_detailed,
  executed for real through the select subprocess.
- command: parse EXPLAIN ANALYZE DETAILED, assemble QueryTrace on the entry
  node and render it to a DataFrame.

Measurement points (metastore, memory, physical plan) land in follow-ups.
- cuberpc generates variant_name() on the *RpcMethodCall enum for cheap
  per-method labels.
- ClusterMetaStoreClient::invoke_method records an OpKind::Metastore sample
  keyed by method name.
- explain_detailed scopes the planning phase (logical_plan) so choose_index
  metastore calls are captured.
…decorator

- cuberpc #[service(trace_guard = path)] generates a Traced<Service> decorator
  that wraps Arc<dyn Service> and holds <path>(method) across each async call,
  keeping cuberpc decoupled from the host crate's tracing module.
- MetaStore is wrapped at both DI sites (remote MetaStoreRpcClient and local
  RocksMetaStore), so metastore calls are traced by method name under an active
  trace ctx on any node, RPC or local. The concrete RocksMetaStore stays
  retrievable by type.
- Drops the now-redundant invoke_method guard (the decorator supersedes it).

variant_name() is generated for the upcoming transport/access split.
Add OpKind::Planning and phase guards inside logical_plan (session_context,
statement_to_plan, optimize, choose_index) and explain_detailed
(plan.serialize, router_physical_plan). Under an active trace ctx the router
planning breaks down into phases, with metastore calls interleaved as separate
Metastore samples; no-ops on the normal query path.
…t path

EXPLAIN ANALYZE DETAILED now executes the full query on a real main worker
(random, like prod) instead of a fan-out, so the final stages run where they
actually happen.

- new message variants RouterSelectDetailed/SelectDetailed (+ *Result); hot
  Select/SelectResult untouched (flexbuffers tolerates a trailing field only
  new->old, so modifying a hot message would break rolling deploys).
- main runs the real router plan via execute_router_plan_detailed; a
  WorkerTraceCollector rides the DataFusion TaskContext so ClusterSendExec
  records per-worker traces while rows still flow into the merge (no new plan
  node; a task-local would not survive DF task spawning).
- workers run their part for real and return rows + WorkerTrace.
- MainTrace + QueryTrace { router, main }; render gains main level.
- main.router_physical_plan guard; obsolete AnalyzeDetailed path removed.

MemoryPool peak and per-node DataFusion metrics on the main are the next step.
Add TrackingMemoryPool (an unbounded MemoryPool that records the peak of operator
reservations) and run the detailed router-plan execution under it via a per-query
RuntimeEnv. The peak is isolated from concurrent queries and lands in
MainTrace.exec_memory_peak_bytes — the finalization memory of the query.

- queryplanner: make_execution_context_with_runtime / minimal_session_state_*_with_runtime
  let a caller inject a RuntimeEnv (default unchanged for existing callers).
- covers operator-reserved memory (sort/aggregate/join), not every allocation.
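
The peak-recording idea behind TrackingMemoryPool can be illustrated with a small std-only sketch. It does not implement DataFusion's MemoryPool trait, and the `PeakTracker` name and its `grow`/`shrink`/`peak_bytes` methods are hypothetical; it only shows how an unbounded tracker can record the high-water mark of reservations.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical unbounded tracker: never rejects a reservation, only records the peak.
#[derive(Default)]
pub struct PeakTracker {
    reserved: AtomicUsize,
    peak: AtomicUsize,
}

impl PeakTracker {
    /// Records a new reservation and raises the peak if the running total exceeds it.
    pub fn grow(&self, bytes: usize) {
        let now = self.reserved.fetch_add(bytes, Ordering::Relaxed) + bytes;
        self.peak.fetch_max(now, Ordering::Relaxed);
    }

    /// Releases a reservation; saturates at zero instead of wrapping.
    pub fn shrink(&self, bytes: usize) {
        let _ = self
            .reserved
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
                Some(v.saturating_sub(bytes))
            });
    }

    /// Highest running total observed so far.
    pub fn peak_bytes(&self) -> usize {
        self.peak.load(Ordering::Relaxed)
    }
}
```

Because one tracker instance would be created per detailed query, its peak is not affected by other queries in the same process, which matches the isolation the commit message describes.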

Subprocess (worker) memory peak and per-node DF metrics are the next steps.
execute_worker_plan takes an optional TrackingMemoryPool; when a detailed query
runs, the subprocess builds its RuntimeEnv with it and reports the peak into
SubprocessTrace.exec_memory_peak_bytes. None on the normal path (default runtime,
no overhead).
After the detailed router-plan execution, walk the physical plan and record each
node's elapsed_compute into MainTrace.ops as OpKind::Execution samples keyed by
node type (aggregated: summed time + node count). Surfaces where time goes in the
final stages (aggregation/sort/merge) beyond the single main.execute bucket.
Adds trace::record_op for pre-measured samples.
execute_worker_plan records per-node elapsed_compute of the worker subplan into
the subprocess trace (OpKind::Execution) when running detailed, mirroring the
main side. Gives per-worker execution breakdown (scan/filter/partial-agg).
SerializedRecordBatchStream::byte_size + OpGuard::set_bytes on result.serialize
and chunks.serialize, so the trace shows transport data volume (bytes) alongside
time for the serialize stages.
OpSample gains a rows field (aggregated like bytes); record_plan_node_metrics
records DataFusion output_rows per node on both main and workers, rendered as a
rows column. Surfaces nodes processing unexpectedly many rows.
The main (router plan) and each worker (subplan) stash their pp_phys_plan text
through the trace ctx into MainTrace/SubprocessTrace.physical_plan, rendered as a
Plan row. Surfaces the plan tree shape for understanding heavy customer queries.
Routed through the ctx, so no execute_worker_plan return-type change.
…trip

Each region records its total wall time (SubprocessTrace/WorkerTrace/MainTrace
.total_us); ClusterSendExec times the main->worker round-trip into
WorkerTrace.net_roundtrip_us. These let transport be derived as round-trip minus
the child's wall (the missing main->worker leg is now covered).
…nalyze

Compute transport (wire + queue) = round-trip - child wall for each boundary and
emit Transport rows: transport.entry_to_main, transport.main_to_worker (per
worker), transport.ipc (per worker). saturating_sub guards measurement skew.
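
As a worked example of the derivation above, a minimal sketch (the `transport_us` helper name is hypothetical; only the formula and the use of saturating_sub come from the commit message):

```rust
/// Transport (wire + queue) for one boundary: the parent's measured round-trip
/// minus the child's own wall time. saturating_sub clamps to zero when clock
/// skew makes the child appear slower than the round-trip that contains it.
pub fn transport_us(round_trip_us: u64, child_wall_us: u64) -> u64 {
    round_trip_us.saturating_sub(child_wall_us)
}
```

For instance, a 1500 us main->worker round-trip around a worker that reports 1200 us of wall time yields 300 us of transport, while a skewed pair such as 1000 us and 1005 us yields 0 rather than an underflow.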
Replace the wide flat table with a single-column nested tree (router -> main ->
worker -> subprocess): per-node measurements as aligned label/value lines, plan
as an indented block. Far more readable for manual inspection.
…durations

Detailed-analyze tree now opens with a per-category summary (sum of elapsed by
kind, sorted desc, wrappers excluded to avoid double counting), tags every line
with its category, and formats durations as us/ms/s and bytes as B/KB/MB.
… per-node breakdown

Each node header now shows its total wall time; the category summary moves below
the tree and gains an overall block plus per-node (router/main/workers)
category breakdowns.
…from summary

OpSample gains is_wrapper, set via OpGuard::start_wrapper at the span that contains
other measured ops (round-trips, *.execute, plan.choose_index). The summary filters
on the flag instead of matching labels by string across files, and choose_index is
now a wrapper so its nested metastore calls are no longer double-counted in Planning.
Categories now form a non-overlapping partition.
Runs the command on a router+worker test cluster and asserts the single trace
cell contains every level plus the category summary (router/main/worker,
Planning/Metastore/Execution/Transport, transport.* and plan:). The test harness
has no select subprocess pool, so the subprocess section is not asserted.
DETAILED is a superset of EXTENDED (per-node DataFusion metrics on main and
workers, plus metastore/transport/planning/memory). Drops the EXTENDED command,
its parser arm/dispatch, and the execute flag threaded through ExplainAnalyze /
run_explain_analyze / pp_worker_plan; plain EXPLAIN ANALYZE returns to non-executing
(plan-only) as before. parse test repurposed to cover DETAILED + plain EXPLAIN.
@waralexrom waralexrom force-pushed the cubestore-datailed-analyze branch from 4297c0e to 4f84a78 June 5, 2026 10:18
@claude
Contributor

claude Bot commented Jun 5, 2026

Claude finished @waralexrom's task in 5m 15s


Code review — EXPLAIN ANALYZE DETAILED

Overall this is a solid piece of plumbing: the gating via task-local TRACE + OpGuard makes the trace cheap on the hot path, the WorkerTraceCollector ride‑along through TaskContext is a clean way to keep ClusterSendExec unchanged, and the proc‑macro‑generated TracedMetaStore decorator gives consistent metastore samples on both the local and remote paths. Fidelity caveats (non‑streaming on the detailed path, TrackingMemoryPool being unbounded) are documented in code where they bite.

Verdict: 0 high, 1 medium, 4 low.

Findings & checklist
  • Read diff and key files
  • Check for stale prior threads (none)
  • Analyze trace.rs, sql/mod.rs, cluster/mod.rs, query_executor.rs, message.rs, parser.rs
  • Post inline comments
  • Final summary

Findings (linked to inline comments):

| Severity | Location | Issue |
|---|---|---|
| Medium | cluster/mod.rs:1544-1565 | In-process worker fallback silently drops the detailed flag: no subprocess · region, no memory pool, no node metrics. Smoke test runs through this path and masks the gap. |
| Low | cluster/message.rs:26-37 | NETWORK_MESSAGE_VERSION not bumped despite new variants. By‑name repr keeps existing variants working; old binaries receiving a new variant fail with a generic deser error instead of the friendlier protocol‑version mismatch. |
| Low | query_executor.rs:280-283 | record_plan_node_metrics walks ClusterSendExec too. Its elapsed_compute includes time waiting on workers, which is folded into main's Execution category while also derived as Transport. Soft double‑count. |
| Low | trace.rs:120-139 | OpSample::push is O(N) scan-and-merge under the mutex. Fine today; flagging because record_plan_node_metrics could land many distinct labels in wide plans. |
| Low | sql/mod.rs:5538-5553 | Smoke test only asserts marker substrings, so an empty workers vec or zero totals would still pass. A couple of structural assertions would catch silently-empty traces. |

Things that look right and were worth checking:

  • Task-local TRACE correctly nested across the worker→subprocess scope (outer worker ctx and inner subprocess ctx are distinct, so worker ops aren't double-recorded by subprocess guards).
  • OpGuard holds a Weak<TraceCtx> so guards that outlive the scope silently drop instead of panicking.
  • TracedMetaStore is wired at both DI sites (remote MetaStoreRpcClient and local RocksMetaStore), so Metastore samples are emitted from whichever path the worker actually uses.
  • The is_wrapper flag correctly excludes span containers (ipc.select, route_select_detailed, subprocess.execute, plan.choose_index, main.execute) from the category summary while still showing them in the tree.
  • Per-query TrackingMemoryPool lives inside the query's own RuntimeEnv, so the peak is isolated from concurrent queries sharing the process.
  • The PR's wire-compatibility note about flexbuffers (re: trailing struct fields, not enum variants) was investigated — adding enum variants is fine because the default serde repr is by name, not by index.
· branch `cubestore-datailed-analyze`

Comment on lines 26 to +37

    /// Detailed-trace mirror of [RouterSelect]: the entry node asks a main worker to
    /// run the full router plan for real and return the assembled `MainTrace`.
    RouterSelectDetailed(SerializedPlan),
    RouterSelectDetailedResult(Result<MainTrace, CubeError>),

    /// Detailed-trace mirror of [Select]: a worker runs its part for real and returns
    /// both the result rows (for the main to merge) and its `WorkerTrace`.
    SelectDetailed(SerializedPlan, WorkerPlanningParams),
    SelectDetailedResult(
        Result<(SchemaRef, Vec<SerializedRecordBatchStream>, WorkerTrace), CubeError>,
    ),
Contributor

NETWORK_MESSAGE_VERSION is still 1 even though NetworkMessage gained four new variants. Serde's default externally‑tagged repr keys by variant name, so existing variants keep working across a rolling deploy — but an old binary that receives one of the new variants will fail deserialization with a generic "unknown variant" error rather than the friendlier protocol-version mismatch on line 137. Since EXPLAIN ANALYZE DETAILED is user-initiated and opt-in this is unlikely to bite in practice, but worth either (a) bumping the version to make the failure mode clear, or (b) leaving a comment here noting that adding variants is wire‑compatible only because the repr is by name.

Member Author

@claude We have never bumped the protocol version before. In this case, we preserve both compatibility on the production path and consistency during a rolling deployment across the cluster (which would actually be disrupted by a version bump). So it seems that bumping the version is not justified here.

Contributor

@claude claude Bot Jun 5, 2026

Claude finished @waralexrom's task in 39s


Re: bumping NETWORK_MESSAGE_VERSION

You're right, and I'll withdraw the suggestion to bump it. Your reasoning is actually stronger than my original note gave it credit for.

The check at message.rs:136 is a strict equality:

let ver = socket.read_u32().await?;
if ver != NETWORK_MESSAGE_VERSION {
    return Err(CubeError::user(format!("Network protocol version mismatch. ...")));
}

So bumping 1 → 2 doesn't just make the new *Detailed variants fail more clearly — it makes every message between a v1 node and a v2 node fail, including the hot Select / RouterSelect / MetaStoreCall traffic. During a rolling deploy the cluster is mixed-version by definition, so a bump would take the whole cluster down for the rollout window. That's the opposite of what the version field should protect.

Staying at 1 is correct here:

  • Existing variants stay wire-compatible because serde's flexbuffers repr is externally-tagged by name, not by index — a mixed-version cluster keeps talking on the production path.
  • New *Detailed variants only ever cross the wire when a user explicitly runs EXPLAIN ANALYZE DETAILED, which is opt-in and not part of steady-state or rolling-deploy traffic. The only degenerate case (an old binary receiving a new variant) yields a generic "unknown variant" deser error instead of the friendlier mismatch message — an acceptable trade for not breaking rolling deploys.

So: no version bump. The only thing I'd still gently suggest — purely optional — is a one-line comment near the version constant or the new variants recording why it's safe to add variants without bumping (the by-name repr + opt-in path), so the next person doesn't re-derive this or "helpfully" bump it. Happy to add that if you want it; otherwise this is resolved on my end.
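
The difference between the two failure modes discussed above can be modelled with a toy sketch. It is not flexbuffers or serde: `Msg`, `check_version` and `decode_by_name` are hypothetical stand-ins for an old binary that knows only the existing variants, with a strict version gate like the one quoted from message.rs.

```rust
/// Version the hypothetical old binary was built with.
pub const NETWORK_MESSAGE_VERSION: u32 = 1;

/// Variants the old binary knows about (toy subset).
#[derive(Debug, PartialEq)]
pub enum Msg {
    Select,
    RouterSelect,
}

/// Strict equality gate: any other version rejects the whole connection,
/// including hot-path messages.
pub fn check_version(ver: u32) -> Result<(), String> {
    if ver != NETWORK_MESSAGE_VERSION {
        return Err("Network protocol version mismatch".to_string());
    }
    Ok(())
}

/// By-name decoding: known names keep working; only an unknown name fails.
pub fn decode_by_name(tag: &str) -> Result<Msg, String> {
    match tag {
        "Select" => Ok(Msg::Select),
        "RouterSelect" => Ok(Msg::RouterSelect),
        other => Err(format!("unknown variant `{}`", other)),
    }
}
```

In this model, a peer that stays on version 1 and sends "Select" is still decoded, and only "SelectDetailed" fails; a peer that announces version 2 is rejected before any message is decoded at all.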
· branch cubestore-datailed-analyze

Comment on lines +1544 to +1565
async fn run_local_select_detailed_worker(
    &self,
    plan_node: SerializedPlan,
    worker_planning_params: WorkerPlanningParams,
) -> Result<(SchemaRef, Vec<SerializedRecordBatchStream>, WorkerTrace), CubeError> {
    let ctx = crate::trace::TraceCtx::new();
    let node_name = self.server_name.clone();
    let started = std::time::Instant::now();
    let (schema, records, subprocess) = crate::trace::scoped(Some(ctx.clone()), async {
        self.run_local_select_worker_impl(plan_node, worker_planning_params, true)
            .await
            .map(|(schema, records, _size, subtrace)| (schema, records, subtrace))
    })
    .await?;
    let worker_trace = WorkerTrace {
        node_name,
        total_us: started.elapsed().as_micros() as u64,
        net_roundtrip_us: None,
        ops: ctx.take_ops(),
        subprocess,
    };
    Ok((schema, records, worker_trace))
Contributor

When the select subprocess pool is disabled (non-Linux / unconfigured), run_local_select_worker_impl silently drops the detailed flag in its fallback branch: it calls execute_worker_plan(..., None) and returns subprocess = None. The user gets a tree missing the subprocess · region with no indication the trace was degraded — distinguishing "no subprocess pool" from "subprocess didn't run" requires reading the source.

Reasonable fixes:

  • Pass a TrackingMemoryPool through the fallback branch too — execute_worker_plan already records per-node metrics + plan text when memory_pool.is_some(), so the subprocess section can be populated even without the pool.
  • Or, when degraded, surface a one-line note in the trace (e.g. a synthetic op or a header) explaining the subprocess pool wasn't available.

The PR's smoke test runs in this exact path and doesn't assert the subprocess · marker, which currently masks the gap.

Comment on lines +280 to +283
}
// Harvest per-node DataFusion metrics of the final stages (router-level nodes
// above ClusterSend), aggregated by node type into the active trace.
record_plan_node_metrics(&physical_plan);
Contributor

record_plan_node_metrics walks the entire main physical plan, including ClusterSendExec itself. ClusterSendExec's elapsed_compute includes the wall time spent waiting on workers (the run_select_detailed future runs inside its execute), so it gets folded into the main's Execution category, while that same wait time is also counted by the Transport derivation (from the per-worker net_roundtrip_us) in the summary.

That's a soft double-count for the overall total. Consider either marking ClusterSendExec so record_plan_node_metrics skips it, or flagging the recorded Execution/ClusterSendExec sample as a wrapper so it's tree-visible but excluded from the summary.
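
The second option (flagging the sample as a wrapper) can be sketched like this. The `OpSample` fields mirror names used in this PR, but the struct layout, the `OpKind` variant set, and the `summary_by_category` function are assumptions made for the example rather than the PR's actual rendering code.

```rust
use std::collections::BTreeMap;

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum OpKind {
    Planning,
    Metastore,
    Execution,
    Transport,
}

pub struct OpSample {
    pub kind: OpKind,
    pub label: String,
    pub elapsed_us: u64,
    /// True for spans that contain other measured ops.
    pub is_wrapper: bool,
}

/// Sums elapsed time per category, skipping wrappers, sorted descending by time.
pub fn summary_by_category(ops: &[OpSample]) -> Vec<(OpKind, u64)> {
    let mut totals: BTreeMap<OpKind, u64> = BTreeMap::new();
    for op in ops.iter().filter(|o| !o.is_wrapper) {
        *totals.entry(op.kind).or_insert(0) += op.elapsed_us;
    }
    let mut out: Vec<(OpKind, u64)> = totals.into_iter().collect();
    out.sort_by(|a, b| b.1.cmp(&a.1));
    out
}
```

With a ClusterSendExec sample recorded as `is_wrapper: true`, its wait time stays visible in the tree but contributes nothing to Execution, so the worker wait is counted once, under Transport.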

Comment on lines +120 to +139
fn push(&self, sample: OpSample) {
    let mut ops = self.ops.lock().unwrap();
    if let Some(existing) = ops
        .iter_mut()
        .find(|o| o.kind == sample.kind && o.label == sample.label)
    {
        existing.elapsed_us += sample.elapsed_us;
        existing.count += sample.count;
        existing.bytes = match (existing.bytes, sample.bytes) {
            (Some(a), Some(b)) => Some(a + b),
            (a, b) => a.or(b),
        };
        existing.rows = match (existing.rows, sample.rows) {
            (Some(a), Some(b)) => Some(a + b),
            (a, b) => a.or(b),
        };
    } else {
        ops.push(sample);
    }
}
Contributor

Linear scan-and-merge inside the mutex. For typical traces (~tens of ops) this is fine, but a query that records many distinct (kind, label) keys (e.g. a wide physical plan with many node types via record_plan_node_metrics) becomes quadratic and holds the mutex across the whole scan. A small HashMap<(OpKind, &str), usize> index built lazily on first push would keep this O(1) per insert and remove the cost concern for larger plans.

Not blocking — flagging in case detailed traces ever land in a path that records hundreds of samples.
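
A possible shape for the indexed variant, as a sketch only. It keys the index by an owned `String` rather than the `&str` mentioned above, to avoid borrowing from the vector it indexes; the `Inner` struct and the simplified `OpSample`/`OpKind` definitions are assumptions for the example.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum OpKind {
    Planning,
    Metastore,
    Execution,
}

#[derive(Clone, Debug)]
pub struct OpSample {
    pub kind: OpKind,
    pub label: String,
    pub elapsed_us: u64,
    pub count: u64,
    pub bytes: Option<u64>,
    pub rows: Option<u64>,
}

#[derive(Default)]
struct Inner {
    ops: Vec<OpSample>,
    // (kind, label) -> position in `ops`; keeps insertion order in the Vec.
    index: HashMap<(OpKind, String), usize>,
}

#[derive(Default)]
pub struct TraceCtx {
    inner: Mutex<Inner>,
}

fn merge_opt(a: Option<u64>, b: Option<u64>) -> Option<u64> {
    match (a, b) {
        (Some(a), Some(b)) => Some(a + b),
        (a, b) => a.or(b),
    }
}

impl TraceCtx {
    /// Merges into an existing sample with the same key, or appends a new one.
    pub fn push(&self, sample: OpSample) {
        let mut guard = self.inner.lock().unwrap();
        let inner = &mut *guard;
        let key = (sample.kind, sample.label.clone());
        match inner.index.get(&key).copied() {
            Some(i) => {
                let existing = &mut inner.ops[i];
                existing.elapsed_us += sample.elapsed_us;
                existing.count += sample.count;
                existing.bytes = merge_opt(existing.bytes, sample.bytes);
                existing.rows = merge_opt(existing.rows, sample.rows);
            }
            None => {
                inner.index.insert(key, inner.ops.len());
                inner.ops.push(sample);
            }
        }
    }

    pub fn take_ops(&self) -> Vec<OpSample> {
        let mut inner = self.inner.lock().unwrap();
        inner.index.clear();
        std::mem::take(&mut inner.ops)
    }
}
```

The trade-off is one label clone per push in exchange for a constant-time lookup on average; for traces with tens of samples the original linear scan is likely just as fast.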

Comment on lines +5538 to +5553
// Smoke check: the whole path produced the levels + the summary.
// (The test harness runs the worker in-process, without the select
// subprocess pool, so no `subprocess ·` section here.)
for marker in [
    "summary by category",
    "router",
    "Metastore",
    "main \u{b7}",
    "worker \u{b7}",
    "Planning",
    "Execution",
    "transport.",
    "plan:",
] {
    assert!(trace.contains(marker), "trace missing '{}':\n{}", marker, trace);
}
Contributor

The smoke test only asserts marker presence — it never checks that the numbers are sensible. A regression that leaves MainTrace.total_us == 0, an empty workers vector, or a malformed transport line would still pass as long as the literal substrings appear.

Consider adding a couple of cheap structural assertions: e.g. parse out at least one total <duration> and assert it's >0, or grep for main · 127.0.0.1:14016 and worker · 127.0.0.1:14016 rather than the generic main \u{b7} / worker \u{b7} substrings. It's still a smoke test, but it would catch silently-empty traces.
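
One way such a structural assertion could look, as a sketch. It assumes the rendered trace contains the word `total` followed by a duration token such as `850us`, `1.25ms` or `2s`; that exact layout, and the helper names `parse_duration_us` and `first_total_us`, are assumptions and would need to be adapted to the real renderer.

```rust
/// Parses a rendered duration such as "850us", "1.25ms" or "2s" into microseconds.
pub fn parse_duration_us(s: &str) -> Option<f64> {
    let s = s.trim();
    // Check the two-letter suffixes before the bare "s".
    let (num, scale) = if let Some(n) = s.strip_suffix("us") {
        (n, 1.0)
    } else if let Some(n) = s.strip_suffix("ms") {
        (n, 1_000.0)
    } else if let Some(n) = s.strip_suffix('s') {
        (n, 1_000_000.0)
    } else {
        return None;
    };
    num.trim().parse::<f64>().ok().map(|v| v * scale)
}

/// Returns the first parseable duration that directly follows the word "total".
pub fn first_total_us(trace: &str) -> Option<f64> {
    let mut words = trace.split_whitespace();
    while let Some(w) = words.next() {
        if w == "total" {
            if let Some(d) = words.next().and_then(parse_duration_us) {
                return Some(d);
            }
        }
    }
    None
}
```

In the smoke test this could be used as `assert!(first_total_us(&trace).map_or(false, |us| us > 0.0))`, which fails on a trace whose totals are all zero or missing even when every marker substring is present.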

@codecov

codecov Bot commented Jun 5, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 58.57%. Comparing base (adc8d26) to head (be0f17f).
⚠️ Report is 5 commits behind head on master.

Additional details and impacted files
@@           Coverage Diff           @@
##           master   #11016   +/-   ##
=======================================
  Coverage   58.57%   58.57%           
=======================================
  Files         216      216           
  Lines       17181    17181           
  Branches     3502     3502           
=======================================
  Hits        10064    10064           
  Misses       6609     6609           
  Partials      508      508           
Flag Coverage Δ
cube-backend 58.57% <ø> (ø)

…dule

Move explain_detailed (orchestration) and render_query_trace (pure QueryTrace ->
DataFrame rendering) out of the oversized sql/mod.rs into sql/explain_detailed.rs.
render stays out of trace.rs to keep that a DataFrame-free serde leaf.
@claude
Contributor

claude Bot commented Jun 5, 2026

Claude finished @waralexrom's task in 1m 12s


Code review — incremental (synchronize)

Only one new commit since the prior review: be0f17f — a pure code-extraction refactor moving explain_detailed orchestration and render_query_trace out of sql/mod.rs into a new sql/explain_detailed.rs. No behavioral change.

Verdict on the new commit: 0 findings. The split is clean — trace.rs stays a DataFrame-free serde leaf, the orchestration is self-contained, and the smoke test (now at sql/mod.rs:5229, same code) is untouched.

Status of prior threads
  • Re-affirmed: 5 prior unresolved threads still apply (code was moved or unchanged).
  • No new inline comments needed — refactor introduces no new issues.
· branch `cubestore-datailed-analyze`
