Skip to content

feat(cubestore): execute sorted partial aggregate per partition below the merge, propagate LIMIT into it#10995

Open
waralexrom wants to merge 7 commits into
masterfrom
cubestore-inline-aggregate-optimization
Open

feat(cubestore): execute sorted partial aggregate per partition below the merge, propagate LIMIT into it#10995
waralexrom wants to merge 7 commits into
masterfrom
cubestore-inline-aggregate-optimization

Conversation

@waralexrom
Copy link
Copy Markdown
Member

Summary

For prefix GROUP BY queries the dominant worker cost was SortPreservingMergeExec merging all raw rows, because the sorted partial aggregate sat above the merge. This PR pushes the sorted partial aggregate below the merge (so the merge carries reduced partial states instead of raw rows) and propagates LIMIT down to the aggregate so it emits early and stops scanning (previously the sorted aggregate's emit threshold was a hard batch_size = 4096, so any result under 4096 groups read its entire input before emitting anything and LIMIT could not short-circuit).

Measured on a 10M-row stand: the worker step of a 1000-group prefix GROUP BY is ~3.7x faster (357ms → 97ms) with no RSS increase; GROUP BY ... ORDER BY <prefix> LIMIT 10 goes from ~365ms to ~14ms (×28).

Changes

  • push_sorted_partial_aggregate_below_merge (pre-optimize pass): rewrites PartialAggregate(Sorted) → SortPreservingMerge → source(N partitions) into SortPreservingMerge → per-partition PartialAggregate(Sorted). Only sorted streaming aggregates are pushed — they hold O(1) accumulators per partition, while a hash aggregate would multiply its O(num_groups) memory by the partition count. Duplicate group keys across partitions are combined by the router's Final aggregate, same as partial states from different workers.
  • InlineAggregateStream honors a group limit with a strict contract: emit-early threshold becomes min(batch_size, remaining), only closed groups are emitted, the backlog from large input batches is drained in chunks, the final emit is clamped, and input reading stops as soon as the limit is reached.
  • add_limit_to_workers is rewritten as a limit descent: the worker limit descends through sort preserving merges and lands directly above the first aggregate as a per-partition row limit (LocalLimit forward / TailLimit reverse / GlobalLimit for a single partition), additionally passing the limit into InlineAggregateExec for the early input stop. A row limit never sits above a merge of per-partition partial aggregates (it would truncate a group's partial states across duplicate keys and silently corrupt totals), and the safety is derived from the plan shape rather than from which pass produced it.
  • TailLimitExec: keeps a sliding window of trailing batches (O(limit) memory) instead of collecting its whole input, returns the last limit rows of each input partition, and declares maintains_input_order.

Testing

  • Unit tests (TDD for the result-integrity parts): stream limit contract (closed groups only at batch boundaries, exact limit under backlog, early input stop), the push rewrite (sorted pushed, hash/non-partial/fetch/single-partition not pushed, result equality with cross-partition duplicate keys), the limit descent (merged inline and raw aggregates, reverse tails, single partition), TailLimit sliding window and multi-partition tails.
  • SQL integration tests: prefix GROUP BY with overlapping keys across union partitions with/without LIMIT, DESC, ORDER BY <aggregate>; 7500 groups (above the 4096 batch size) with LIMIT 10/LIMIT 5000/DESC; plan shape assertions for the new MergeSort → LocalLimit → InlinePartialAggregate, limit and MergeSort → TailLimit → InlinePartialAggregate forms.
  • Full suites green: cubestore lib, in-process, cluster, migration, multi-process (over_10k_join is flaky on master under parallel load too — verified 4/5 failures on both master and this branch under the same load, unrelated).

… LIMIT into it

Rewrite the worker plan PartialAggregate(Sorted) -> SortPreservingMerge into
SortPreservingMerge -> per-partition PartialAggregate(Sorted), so the merge
carries reduced partial states instead of all raw rows. Only sorted streaming
aggregates are pushed: they hold O(1) accumulators per partition, while a hash
aggregate would multiply its O(num_groups) memory by the partition count.

Because the merged stream now contains duplicate group keys from different
partitions, a plain worker row limit could truncate a group's partial states
and silently corrupt its total. add_limit_to_workers turns the worker limit
into a per-partition group limit on the aggregate plus a widened row budget
(limit * partitions) on the merge (TailLimit for the reverse case).

InlineAggregateStream honors the group limit: the emit-early threshold becomes
min(batch_size, remaining limit) instead of a hard batch_size (4096), only
closed groups are emitted, and input reading stops once the limit is reached,
so a downstream LIMIT short-circuits the scan.
…ndow

TailLimitStream collected its whole input to take the tail, materializing all
worker rows for reverse limits; now it keeps a sliding window of trailing
batches covering 'limit' rows, newer rows displace older ones.

TailLimitExec returns the last 'limit' rows of each input partition instead of
requiring a single one. The reverse worker limit above merged per-partition
partial aggregates becomes a per-partition tail below the merge: within a
partition the aggregate emits unique group keys, so 'limit' rows there are
'limit' complete groups and the merge carries at most 'limit' rows per
partition instead of all groups with cross-partition duplicates. Groups beyond
the last 'limit' may arrive with partial totals, but the router orders by the
group key and its own limit drops them.
… first aggregate

InlineAggregateStream could emit more groups than its limit: emit-early ran
once per input batch while a single batch can bring an arbitrary group
backlog, and the final emit was unclamped. Now the backlog is drained in emit
threshold chunks at the top of the poll loop (also stops reading input as
early as possible) and the final emit is clamped by the remaining limit.

add_limit_to_workers is rewritten as a limit descent: probe from the worker
input through sort preserving merges to the first aggregate and place a
per-partition row limit directly above it (LocalLimit forward, TailLimit
reverse, GlobalLimit for a single partition), additionally passing the limit
into InlineAggregateExec for the early input stop. Within a partition the
aggregate emits unique group keys, so the row limit cuts at group boundaries;
correctness no longer depends on which pass produced the plan shape, and a
merge of per-partition partial aggregates never gets a row limit above it --
the fetch = limit * partitions widening is gone. Plans without aggregation
keep the plain top-level limit.

TailLimitExec now declares maintains_input_order, matching its passthrough
properties.
@waralexrom waralexrom requested a review from a team as a code owner June 2, 2026 15:15
@github-actions github-actions Bot added cube store Issues relating to Cube Store rust Pull requests that update Rust code labels Jun 2, 2026
@claude
Copy link
Copy Markdown
Contributor

claude Bot commented Jun 2, 2026

Claude finished @waralexrom's task in 3m 19s —— View job


Code Review — verdict: ship after addressing notes · 0 high, 0 medium, 4 low

Full review (click to expand)

Checklist

  • Read PR diff
  • Review inline_aggregate_stream.rs limit contract
  • Review distributed_partial_aggregate.rs push & limit descent
  • Review tail_limit.rs sliding window
  • Review tests
  • Post findings

Summary

The optimization is well-motivated and the implementation is careful in the places that matter most for correctness:

  • Pushing the sorted partial aggregate below the merge is gated tightly (Partial, Sorted, single group expr, no fetch on merge, partition_count > 1, re-check Sorted on the rewritten agg). The cross-partition duplicate-key path is handled by the router's Final aggregate, same as cross-worker partial states — that's the right invariant and it's tested directly in pushes_sorted_partial_aggregate_below_merge.
  • The limit-descent rule (first_aggregate_below_mergesLocalLimit/TailLimit/GlobalLimit above the first aggregate, never above a merge of per-partition partial states) prevents the row-limit-truncates-partial-states corruption described in the PR body. Plan-shape based, not pass-provenance based — good.
  • InlineAggregateStream limit contract: only closed groups emitted, backlog drained in chunks, exact clamp on the final emit, input reading stops once the limit is reached. All four invariants have dedicated tests (limit_emits_only_closed_groups, limit_holds_when_one_batch_overshoots_it, limit_results_match_no_limit_prefix, limit_stops_reading_input via CountingExec).
  • TailLimitExec correctly switches from "collect everything" to a sliding-tail window keyed by row count; multi-partition behavior is tested.

Inline findings (all low severity, none blocking)

  1. distributed_partial_aggregate.rs:305first_aggregate_below_merges only walks SPM nodes; any ProjectionExec/CoalescePartitionsExec between worker and merge silently disables early-stop. Worth documenting or extending.
  2. inline_aggregate/mod.rs:121with_limit does not refresh cache/statistics; statistics().num_rows won't reflect the bound. Fine for the current pipeline, but worth a comment so it's not "fixed" later in a way that breaks something else.
  3. tail_limit.rs:135 — sliding-window pop condition is correct but doesn't slice a large first batch eagerly, so the O(limit) memory claim is only true when batches are reasonably small. Bounded in practice by batch_size.
  4. inline_aggregate_stream.rs:329 — defensive threshold == 0 short-circuit duplicates the guard already provided by limit_reached(). Keep it, but a one-line note avoids a future "simplification".

Other observations

  • Nit: TailLimitExec::name(&self) -> &str is inconsistent with the &'static str used elsewhere in this codebase (InlineAggregateExec, etc.) — pre-existing, ignore unless touching the file.
  • push_aggregate_to_workers still carries the pre-existing // TODO upgrade DF: WorkerExec limit_and_reverse must be wrong here — out of scope for this PR; flagging only so it's not lost.
  • Test coverage is solid across both unit and SQL-integration layers; plan-shape assertions on the new MergeSort → LocalLimit → InlinePartialAggregate, limit form are exactly the right thing to lock in.
· [Branch](https://github.com/cube-js/cube/tree/cubestore-inline-aggregate-optimization)

Comment thread rust/cubestore/cubestore/src/queryplanner/inline_aggregate/mod.rs
Comment thread rust/cubestore/cubestore/src/queryplanner/tail_limit.rs
…ognized shapes

The limit descent now looks through ProjectionExec: a row limit is a plain
count and commutes with column renames. Without this, a projection between
the worker and the merge would route the limit into the generic fallback,
whose LimitPushdown descends through projections into merges and could place
a row fetch above a duplicate-bearing merge of per-partition partial states.

For shapes the descent doesn't recognize, the fallback now refuses to add a
worker limit at all when the subtree contains a per-partition partial
aggregate: skipping the limit is always correct, the router applies the real
one.
@codecov
Copy link
Copy Markdown

codecov Bot commented Jun 2, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 58.50%. Comparing base (35bed42) to head (21fb7d4).
⚠️ Report is 33 commits behind head on master.

❗ There is a different number of reports uploaded between BASE (35bed42) and HEAD (21fb7d4). Click for more details.

HEAD has 1 upload less than BASE
Flag BASE (35bed42) HEAD (21fb7d4)
cubesql 1 0
Additional details and impacted files
@@             Coverage Diff             @@
##           master   #10995       +/-   ##
===========================================
- Coverage   78.81%   58.50%   -20.31%     
===========================================
  Files         470      216      -254     
  Lines       93438    17151    -76287     
  Branches     3466     3494       +28     
===========================================
- Hits        73644    10035    -63609     
+ Misses      19291     6607    -12684     
- Partials      503      509        +6     
Flag Coverage Δ
cube-backend 58.50% <ø> (+0.07%) ⬆️
cubesql ?

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

…h size

A batch covering the whole window replaces it on arrival, cut to the last
'limit' rows; smaller batches go through the usual suffix eviction. The
window now never holds more than 2 * limit rows regardless of input batch
sizes (previously a single large batch stayed in the window whole until the
end of the input).
@claude
Copy link
Copy Markdown
Contributor

claude Bot commented Jun 2, 2026

Claude finished @waralexrom's task in 1m 26s —— View job


Code Review — verdict: ship · 0 high, 0 medium, 0 low (all prior findings addressed)

Full review (click to expand)

Checklist

  • Read incremental diff since prior review
  • Verify projection-descent fix in distributed_partial_aggregate.rs
  • Verify tail-window bounding in tail_limit.rs
  • Verify stream-limit doc + emit_remaining refactor in inline_aggregate_stream.rs
  • Verify with_limit statistics comment in inline_aggregate/mod.rs
  • Resolve stale review threads (all four)

Summary

All four low-severity findings from the prior review (15:18Z) are addressed across 7535ad3, 2b78a19, fe1ebe8, and 8962d63. Each fix went a bit further than a doc-comment patch, which is good:

  1. Projection descent (distributed_partial_aggregate.rs): first_aggregate_below_merges and limit_above_first_aggregate now look through ProjectionExec as well as SortPreservingMergeExec, with a comment that the row limit is a plain count and commutes with column renames. Additionally a contains_multi_partition_partial_aggregate tripwire skips the worker-limit fallback entirely for unrecognized shapes that contain a per-partition partial aggregate — this is the right safety: skipping the worker limit is always correct, the router applies the real one. Tests worker_limit_descends_through_projection and worker_limit_skipped_for_unrecognized_shape_with_per_partition_aggregate lock both paths.

  2. TailLimit window bounding (tail_limit.rs): collect_tail_window now (a) replaces the window when an input batch alone covers limit, slicing it to the last limit rows via skip_first_rows, and (b) the docstring states the new bound window ≤ 2 * limit. maintains_input_order is also now declared. New tests batches_larger_than_limit and window_stays_bounded verify both the bound and correctness against a 1000-row first batch.

  3. emit_remaining refactor (inline_aggregate_stream.rs): the end-of-input emit is now clamped by the limit and split between EmitTo::First(n) / EmitTo::All, and the drain-before-poll ordering at the top of the loop ensures the backlog is emitted before any further input is read. The new test limit_drains_backlog_without_reading_input proves a single oversize input batch yields exactly limit groups with zero additional polls. The defensive threshold == 0 short-circuit is kept with a comment explaining the upstream limit_reached() guard.

  4. with_limit statistics doc (inline_aggregate/mod.rs): a doc-comment now states the intentional non-narrowing of plan properties/statistics. Matches the pipeline reality (no stats-based optimizer runs after add_limit_to_workers).

Nothing else to flag. The plan-shape-based safety (limit never lands above a merge of per-partition partial states), the closed-groups-only emit contract, and the cross-partition-duplicate-key path through the router's Final aggregate continue to read correctly. Test coverage on this round is appropriately targeted at the regressions each fix could re-introduce.

Pre-existing nits left in place from prior review (out of scope for this PR):

  • TailLimitExec::name(&self) -> &str inconsistent with the codebase's &'static str convention.
  • push_aggregate_to_workers TODO about WorkerExec limit_and_reverse.
· [Branch](https://github.com/cube-js/cube/tree/cubestore-inline-aggregate-optimization)

A global (no GROUP BY) aggregate doesn't use the input ordering, but the scan
still merges its partitions with a SortPreservingMergeExec when the index has
a sort key. For such queries the sort key often comes from the equality
filters (it is picked for index selection and partition pruning, not as an
ordering requirement), so the filters make the merge keys constant and every
merge comparison becomes a full-length tie across all chunks -- pure waste.

The new pre-optimize pass replaces such merges under the aggregate with plain
partition coalescing, descending through filters, projections and unions.
Restricted to hash aggregates without group expressions (one accumulator set
per partition even when later optimizations make them run per partition; a
grouped hash aggregate would multiply its hash table by the partition count)
and without ordering requirements (array_agg(ORDER BY) and other
order-sensitive aggregates keep their merge). The deduplicating merge under
LastRowByUniqueKey is out of the descent's reach and stays intact.
@claude
Copy link
Copy Markdown
Contributor

claude Bot commented Jun 3, 2026

Claude finished @waralexrom's task in 1m 58s —— View job


Code Review — verdict: ship · 0 high, 0 medium, 0 low

Full review (click to expand)

Checklist

  • Read incremental diff since prior review (8962d63..HEAD)
  • Review drop_sort_merge_under_global_aggregate / replace_merges_with_coalesce in distributed_partial_aggregate.rs
  • Review pipeline wiring in optimizations/mod.rs
  • Review SQL tests in cubestore-sql-tests/src/tests.rs
  • Confirm prior review threads remain valid

Summary

The only new code since 8962d63 is commit 21fb7d4 — a new pre-optimize pass that swaps SortPreservingMergeExec for CoalescePartitionsExec under a global aggregate that doesn't need its input ordered. Implementation reads correctly and the gating is tight:

  • Trigger conditions are conservative and the right set: input_order_mode == Linear and empty group_expr and required_input_ordering()[0].is_none(). The third check is the one that catches order-sensitive accumulators (array_agg(ORDER BY), first_value/last_value) which can sit under an empty-GROUP BY aggregate while still needing sorted input — the keeps_sort_merge_under_order_sensitive_aggregate unit test locks this.
  • The grouped-hash guard (commit message: "hash table per partition" risk) is exactly the same invariant used elsewhere in the file for partial-aggregate push-down. Test keeps_sort_merge_under_grouped_hash_aggregate covers it.
  • replace_merges_with_coalesce descends through FilterExec / ProjectionExec / UnionExec only — all three are partition- and row-preserving and don't impose ordering requirements of their own, so replacing an SPM in their subtree with a CoalescePartitionsExec keeps the row multiset reaching the aggregate identical. Any other node terminates the descent, which is why LastRowByUniqueKey stays untouched — the dedicated global_aggregate_unique_key_keeps_merge SQL test pins this.
  • merge.fetch().is_some() short-circuit keeps a limit-bearing merge intact, since CoalescePartitionsExec has no fetch slot.
  • Recursion into merge.input() before wrapping in CoalescePartitionsExec handles nested SPMs correctly (e.g. union-of-merges).
  • The worker-plan shape change in sql/mod.rs is consistent with the new pass — the old MergeSort → Scan becomes Scan directly under the per-partition aggregate, and a single CoalescePartitions lives above the partial aggregate. The SQL test global_aggregate_no_chunk_merge asserts exactly this shape.

Nothing to flag. Pre-existing nits called out in earlier rounds (TailLimitExec::name return type, push_aggregate_to_workers TODO) remain out of scope.

All four prior review threads are resolved upstream.

· [Branch](https://github.com/cube-js/cube/tree/cubestore-inline-aggregate-optimization)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

cube store Issues relating to Cube Store rust Pull requests that update Rust code

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant