feat(cubestore): trim worker partial hash aggregate to top-k groups #11024
waralexrom wants to merge 1 commit into
Add TopKHashAggregateExec: a worker-side partial hash aggregate that trims its output to the query LIMIT before shipping partial states to the router, for `GROUP BY <non-index-prefix> ORDER BY <subset of group-by> LIMIT k`. It reuses DataFusion's GroupValues/GroupsAccumulator building blocks and owns the consume/emit loop (no spill, no early-emit, full table emitted once), so the only fork change is making `new_group_values` public. The trim keeps the k smallest groups by a TOTAL order T = ORDER BY ++ remaining group-by columns when local groups exceed `factor * k` (configurable via ConfigObj / CUBESTORE_PARTIAL_HASH_AGGREGATE_TOPK_FACTOR, default 2). Correctness uses T in two places that must agree: the worker cut and the router's global Sort+Limit (the rewriter appends T's tail to the router sort, which is safe since ORDER BY is a prefix of T). The rewriter only fires on the exact Sort -> Final agg -> ClusterSend/Worker -> Partial hash agg chain and bails on HAVING filters, nested aggregates or computed projections. Points cubestore at the cube-js/arrow-datafusion `cubestore-hash-aggregate-limit` branch for the `pub new_group_values` change.
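The trim rule described above can be sketched as follows. This is a minimal illustration, not the PR's implementation: `Group`, `OrderCol`, `total_order` and `trim_groups` are hypothetical names, keys are simplified to `i64` values, and a full sort stands in for whatever selection strategy the real operator uses.

```rust
use std::cmp::Ordering;

/// One local group: key values in GROUP BY order plus an opaque partial state.
/// (Hypothetical type for illustration only.)
#[derive(Debug, Clone, PartialEq)]
pub struct Group {
    pub key: Vec<i64>,
    pub partial_sum: i64,
}

/// One component of the total order T: a group-by column index and a direction.
#[derive(Debug, Clone, Copy)]
pub struct OrderCol {
    pub idx: usize,
    pub descending: bool,
}

/// Builds T = ORDER BY ++ remaining group-by columns.
/// The appended tail is ascending, matching the default sort options.
pub fn total_order(order_by: &[OrderCol], num_group_cols: usize) -> Vec<OrderCol> {
    let mut t = order_by.to_vec();
    for idx in 0..num_group_cols {
        if !order_by.iter().any(|c| c.idx == idx) {
            t.push(OrderCol { idx, descending: false });
        }
    }
    t
}

/// Compares two groups column by column according to T.
fn compare(a: &Group, b: &Group, t: &[OrderCol]) -> Ordering {
    for c in t {
        let ord = a.key[c.idx].cmp(&b.key[c.idx]);
        let ord = if c.descending { ord.reverse() } else { ord };
        if ord != Ordering::Equal {
            return ord;
        }
    }
    Ordering::Equal
}

/// Keeps the k smallest groups by T, but only when the local group count
/// exceeds factor * k; otherwise the groups are returned untouched.
pub fn trim_groups(mut groups: Vec<Group>, t: &[OrderCol], k: usize, factor: usize) -> Vec<Group> {
    if groups.len() <= factor.saturating_mul(k) {
        return groups;
    }
    groups.sort_by(|a, b| compare(a, b, t));
    groups.truncate(k);
    groups
}
```

Because T covers every group-by column, no two distinct groups compare equal, so the cut is unambiguous and a router sorting by the same T selects from the same prefix.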
Claude finished @waralexrom's task in 2m 18s.

Code Review: PR #11024
Verdict: looks solid; 0 high, 2 medium, 3 low.

Summary
The core correctness story (total order […]
The worker still builds the full group table: the win is network bytes, not worker memory. The PR description acknowledges this; just naming it here so the perf framing is clear.

Findings (inline)
Medium
Low

Tests
Plan test covers: trim attaches,
            continue;
        }
        let name = &group_names[idx];
        let options = SortOptions::default();
Tie-break sort options are always SortOptions::default() (asc, nulls_first=true) for both the worker trim and the router-sort tail. This is correctness-safe (the appended columns are tie-breakers, the user's ORDER BY prefix is preserved), but it means the previously-unspecified ordering of tied rows now becomes deterministic in a particular direction that doesn't follow the leading ORDER BY direction (e.g. a DESC query produces asc tie-breaks). Worth a one-line comment near here making this contract explicit so the next reader doesn't worry the tail should mirror the leading direction.
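To make the contract concrete, here is a small sketch of what a DESC leading key with an ascending tie-break looks like. The `(day, city)` row shape and both function names are assumptions for illustration; NULL handling is not modeled.

```rust
use std::cmp::Ordering;

/// Compares (day, city) rows for `ORDER BY day DESC`, with `city` appended
/// as an ascending tie-breaker. The tail deliberately does not mirror the
/// DESC direction of the leading key.
pub fn cmp_day_desc_city_asc(a: &(i32, &str), b: &(i32, &str)) -> Ordering {
    b.0.cmp(&a.0).then_with(|| a.1.cmp(b.1))
}

/// Sorts rows with the comparator above.
pub fn sorted(mut rows: Vec<(i32, &str)>) -> Vec<(i32, &str)> {
    rows.sort_by(|a, b| cmp_day_desc_city_asc(a, b));
    rows
}
```

Rows tied on `day` come out in ascending `city` order even though `day` itself is descending, which is the behavior the suggested comment would document.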
    fn descend_to_worker_partial(mut node: Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> {
        let mut crossed_boundary = false;
        loop {
            if let Some(agg) = node.as_any().downcast_ref::<AggregateExec>() {
                return (crossed_boundary && *agg.mode() == AggregateMode::Partial)
                    .then_some(node.clone());
            }
            if node.as_any().is::<ClusterSendExec>() || node.as_any().is::<WorkerExec>() {
                crossed_boundary = true;
                node = child(&node)?;
            } else if is_row_passthrough(&node) {
                node = child(&node)?;
            } else {
                return None;
            }
        }
crossed_boundary is a bool but the descent doesn't reject crossing a second boundary. Once crossed_boundary is true, the loop happily walks through another ClusterSendExec/WorkerExec (either via this branch — which re-sets the same flag — or via is_row_passthrough, which also treats them as passthrough). In current Cubestore plans nested cluster boundaries shouldn't appear, but if they ever did (e.g. via a future plan shape), this rewriter would still fire on the inner Partial and the "one cut equals global top-k by T" invariant would no longer hold. Either reject the second boundary explicitly (if crossed_boundary { return None; }) or remove ClusterSendExec/WorkerExec from is_row_passthrough so the only path across them is this controlled branch.
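A sketch of the first option (rejecting a second boundary), using a hypothetical `Node` enum as a stand-in for the real `ExecutionPlan` nodes. Here `Boundary` plays the role of `ClusterSendExec`/`WorkerExec` and is assumed not to be covered by the passthrough case.

```rust
/// Hypothetical stand-in for the physical plan nodes the rewriter inspects.
#[derive(Debug, Clone, PartialEq)]
pub enum Node {
    /// Worker-side partial aggregate (leaf of the descent).
    PartialAgg,
    /// ClusterSendExec / WorkerExec in the real plan.
    Boundary(Box<Node>),
    /// Any node that forwards rows unchanged.
    Passthrough(Box<Node>),
    /// Anything else; the descent gives up here.
    Scan,
}

/// Walks down to the worker-side partial aggregate and returns it only when
/// exactly one cluster boundary was crossed on the way.
pub fn descend_to_worker_partial(mut node: &Node) -> Option<&Node> {
    let mut crossed_boundary = false;
    loop {
        match node {
            Node::PartialAgg => return crossed_boundary.then_some(node),
            Node::Boundary(child) => {
                if crossed_boundary {
                    // A second boundary would break the single-cut invariant.
                    return None;
                }
                crossed_boundary = true;
                node = &**child;
            }
            Node::Passthrough(child) => node = &**child,
            Node::Scan => return None,
        }
    }
}
```

With this shape, a nested `Boundary(Boundary(PartialAgg))` plan yields `None` rather than silently matching the inner partial aggregate.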
        }
        let name = &group_names[idx];
        let options = SortOptions::default();
        let sort_col_idx = sort_input_schema.index_of(name).ok()?;
sort_input_schema.index_of(name) resolves by name on the Final aggregate's output schema. Group columns come first in DataFusion's aggregate output so this is correct in practice, but it's fragile if an aggregate output is ever aliased to the same name as a group-by column (e.g. SUM(day) AS day GROUP BY day — unusual but legal in some plans) — index_of returns the first match, which is what we want here, but the invariant is implicit. A comment asserting "group cols are at the prefix of Final's output schema" would be cheap insurance.
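Beyond a comment, the invariant could also be checked explicitly. The sketch below models the schema as a plain slice of field names; `index_of` here is a hypothetical first-match lookup written for illustration, not the Arrow method, and `group_col_index` is an assumed helper name.

```rust
/// Hypothetical first-match lookup over field names, mirroring the
/// "returns the first match" behavior described in the comment above.
pub fn index_of(fields: &[&str], name: &str) -> Option<usize> {
    fields.iter().position(|f| *f == name)
}

/// Resolves a group column by name and makes the implicit invariant explicit:
/// the match must fall inside the group-column prefix of the Final output.
pub fn group_col_index(fields: &[&str], num_group_cols: usize, name: &str) -> Option<usize> {
    let idx = index_of(fields, name)?;
    (idx < num_group_cols).then_some(idx)
}
```

For an output schema like `["day", "city", "day"]` with two group columns, `"day"` resolves to index 0; if the only match sat outside the prefix, the lookup would return `None` and the rewriter could bail.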
    fn aggregate_expressions(
        aggr_expr: &[Arc<AggregateFunctionExpr>],
        _col_idx_base: usize,
    ) -> DFResult<Vec<Vec<Arc<dyn PhysicalExpr>>>> {
_col_idx_base is unused. Either remove the parameter or use it (the DataFusion original threads num_group_exprs into argument resolution for AggregateMode::Partial to offset column indices, and dropping it without comment is easy to misread as a port bug). Minor.
            Ok(Box::pin(stream))
        }

        fn metrics(&self) -> Option<MetricsSet> {
No metrics are exposed (metrics() returns None). Production diagnosis of "did the trim actually fire on this worker and by how much?" will be a lot easier with even minimal MetricsSet counters (pre_trim_groups, emitted_groups, trim_triggered). The current implementation builds the full table either way, so users can't tell from query metrics whether factor is well-tuned or whether the rewriter even matched.
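As a rough shape for such counters, the sketch below uses plain `std` atomics rather than DataFusion's `MetricsSet` API. `TrimMetrics`, `record_emit` and `snapshot` are hypothetical names; wiring the values into `metrics()` is left out.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical counters for one worker-side trim operator.
#[derive(Debug, Default)]
pub struct TrimMetrics {
    /// Groups built locally before any trim.
    pub pre_trim_groups: AtomicUsize,
    /// Groups actually shipped to the router.
    pub emitted_groups: AtomicUsize,
    /// Number of emits in which the trim dropped at least one group.
    pub trim_triggered: AtomicUsize,
}

impl TrimMetrics {
    /// Records one emit: how many groups were built and how many were shipped.
    pub fn record_emit(&self, built: usize, emitted: usize) {
        self.pre_trim_groups.fetch_add(built, Ordering::Relaxed);
        self.emitted_groups.fetch_add(emitted, Ordering::Relaxed);
        if emitted < built {
            self.trim_triggered.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Returns (pre_trim_groups, emitted_groups, trim_triggered).
    pub fn snapshot(&self) -> (usize, usize, usize) {
        (
            self.pre_trim_groups.load(Ordering::Relaxed),
            self.emitted_groups.load(Ordering::Relaxed),
            self.trim_triggered.load(Ordering::Relaxed),
        )
    }
}
```

Comparing `pre_trim_groups` with `emitted_groups` per worker would show whether `factor` is well-tuned, and a `trim_triggered` of zero would indicate the trim never fired.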
Summary
Lets the worker-side partial hash aggregate trim its output to the query `LIMIT` before shipping partial states to the router, so far fewer rows cross the network for `GROUP BY <non-index-prefix> ORDER BY <subset of group-by> LIMIT k` queries. The router stays unchanged in shape (Final + Sort + Limit).

Changes
- `TopKHashAggregateExec` (+ stream) in `queryplanner/topk_aggregate/`: a worker-side partial hash aggregate that builds the full group table and, at emit, keeps only the `k` smallest groups by a total order `T = ORDER BY ++ remaining group-by columns` when local groups exceed `factor * k`. It reuses DataFusion's `GroupValues`/`GroupsAccumulator` and owns the consume/emit loop (no spill / no early-emit), so the only fork change is making `new_group_values` public.
- `optimizations/topk_aggregate_rewriter.rs`: recognizes the exact `Sort(/Limit) → Final agg → ClusterSend/Worker → Partial hash agg` chain (bails on HAVING filter / nested aggregate / computed projection), replaces the partial with `TopKHashAggregateExec`, and also appends `T`'s tie-break tail to the router's global `Sort` so the router selects by the same total order (required for correctness; safe because `ORDER BY` is a prefix of `T`).
- `partial_hash_aggregate_topk_factor` (default 2, env `CUBESTORE_PARTIAL_HASH_AGGREGATE_TOPK_FACTOR`), threaded `ConfigObj → QueryExecutorImpl → PreOptimizeRule`.
- `cubestore-hash-aggregate-limit` fork branch (one-line `pub new_group_values` change).

Note
Depends on the cube-js/arrow-datafusion PR that makes `new_group_values` public (this branch's `Cargo.toml` points at the fork branch `cubestore-hash-aggregate-limit`). Retarget to the merged ref before merging.

Testing
- `cargo test -p cubestore-sql-tests --test in-process`: 171 passed, 0 failed.
- `planning_topk_hash_aggregate` (plan): trim attaches; `k = limit + offset`; no trim when ordering by an aggregate / without a limit; subset `ORDER BY` totalizes the worker cut and extends the router sort with the tie-break column.
- `topk_hash_aggregate_trim` (results): ASC/DESC full-key + proper-subset `ORDER BY` (asserted order-independently).
- `cluster` (multi-worker) and `migration` suites in CI.