feat(cubestore): execute sorted partial aggregate per partition below the merge, propagate LIMIT into it#10995
feat(cubestore): execute sorted partial aggregate per partition below the merge, propagate LIMIT into it#10995waralexrom wants to merge 7 commits into
Conversation
… LIMIT into it Rewrite the worker plan PartialAggregate(Sorted) -> SortPreservingMerge into SortPreservingMerge -> per-partition PartialAggregate(Sorted), so the merge carries reduced partial states instead of all raw rows. Only sorted streaming aggregates are pushed: they hold O(1) accumulators per partition, while a hash aggregate would multiply its O(num_groups) memory by the partition count. Because the merged stream now contains duplicate group keys from different partitions, a plain worker row limit could truncate a group's partial states and silently corrupt its total. add_limit_to_workers turns the worker limit into a per-partition group limit on the aggregate plus a widened row budget (limit * partitions) on the merge (TailLimit for the reverse case). InlineAggregateStream honors the group limit: the emit-early threshold becomes min(batch_size, remaining limit) instead of a hard batch_size (4096), only closed groups are emitted, and input reading stops once the limit is reached, so a downstream LIMIT short-circuits the scan.
…ndow TailLimitStream collected its whole input to take the tail, materializing all worker rows for reverse limits; now it keeps a sliding window of trailing batches covering 'limit' rows, newer rows displace older ones. TailLimitExec returns the last 'limit' rows of each input partition instead of requiring a single one. The reverse worker limit above merged per-partition partial aggregates becomes a per-partition tail below the merge: within a partition the aggregate emits unique group keys, so 'limit' rows there are 'limit' complete groups and the merge carries at most 'limit' rows per partition instead of all groups with cross-partition duplicates. Groups beyond the last 'limit' may arrive with partial totals, but the router orders by the group key and its own limit drops them.
… first aggregate InlineAggregateStream could emit more groups than its limit: emit-early ran once per input batch while a single batch can bring an arbitrary group backlog, and the final emit was unclamped. Now the backlog is drained in emit threshold chunks at the top of the poll loop (also stops reading input as early as possible) and the final emit is clamped by the remaining limit. add_limit_to_workers is rewritten as a limit descent: probe from the worker input through sort preserving merges to the first aggregate and place a per-partition row limit directly above it (LocalLimit forward, TailLimit reverse, GlobalLimit for a single partition), additionally passing the limit into InlineAggregateExec for the early input stop. Within a partition the aggregate emits unique group keys, so the row limit cuts at group boundaries; correctness no longer depends on which pass produced the plan shape, and a merge of per-partition partial aggregates never gets a row limit above it -- the fetch = limit * partitions widening is gone. Plans without aggregation keep the plain top-level limit. TailLimitExec now declares maintains_input_order, matching its passthrough properties.
|
Claude finished @waralexrom's task in 3m 19s —— View job Code Review — verdict: ship after addressing notes · 0 high, 0 medium, 4 lowFull review (click to expand)Checklist
Summary The optimization is well-motivated and the implementation is careful in the places that matter most for correctness:
Inline findings (all low severity, none blocking)
Other observations
|
…ognized shapes The limit descent now looks through ProjectionExec: a row limit is a plain count and commutes with column renames. Without this, a projection between the worker and the merge would route the limit into the generic fallback, whose LimitPushdown descends through projections into merges and could place a row fetch above a duplicate-bearing merge of per-partition partial states. For shapes the descent doesn't recognize, the fallback now refuses to add a worker limit at all when the subtree contains a per-partition partial aggregate: skipping the limit is always correct, the router applies the real one.
Codecov Report✅ All modified and coverable lines are covered by tests.
Additional details and impacted files@@ Coverage Diff @@
## master #10995 +/- ##
===========================================
- Coverage 78.81% 58.50% -20.31%
===========================================
Files 470 216 -254
Lines 93438 17151 -76287
Branches 3466 3494 +28
===========================================
- Hits 73644 10035 -63609
+ Misses 19291 6607 -12684
- Partials 503 509 +6
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
…h size A batch covering the whole window replaces it on arrival, cut to the last 'limit' rows; smaller batches go through the usual suffix eviction. The window now never holds more than 2 * limit rows regardless of input batch sizes (previously a single large batch stayed in the window whole until the end of the input).
…ensive threshold guard
|
Claude finished @waralexrom's task in 1m 26s —— View job Code Review — verdict: ship · 0 high, 0 medium, 0 low (all prior findings addressed)Full review (click to expand)Checklist
Summary All four low-severity findings from the prior review (15:18Z) are addressed across
Nothing else to flag. The plan-shape-based safety (limit never lands above a merge of per-partition partial states), the closed-groups-only emit contract, and the cross-partition-duplicate-key path through the router's Final aggregate continue to read correctly. Test coverage on this round is appropriately targeted at the regressions each fix could re-introduce. Pre-existing nits left in place from prior review (out of scope for this PR):
|
A global (no GROUP BY) aggregate doesn't use the input ordering, but the scan still merges its partitions with a SortPreservingMergeExec when the index has a sort key. For such queries the sort key often comes from the equality filters (it is picked for index selection and partition pruning, not as an ordering requirement), so the filters make the merge keys constant and every merge comparison becomes a full-length tie across all chunks -- pure waste. The new pre-optimize pass replaces such merges under the aggregate with plain partition coalescing, descending through filters, projections and unions. Restricted to hash aggregates without group expressions (one accumulator set per partition even when later optimizations make them run per partition; a grouped hash aggregate would multiply its hash table by the partition count) and without ordering requirements (array_agg(ORDER BY) and other order-sensitive aggregates keep their merge). The deduplicating merge under LastRowByUniqueKey is out of the descent's reach and stays intact.
|
Claude finished @waralexrom's task in 1m 58s —— View job Code Review — verdict: ship · 0 high, 0 medium, 0 lowFull review (click to expand)Checklist
Summary The only new code since
Nothing to flag. Pre-existing nits called out in earlier rounds ( All four prior review threads are resolved upstream. |
Summary
For prefix
GROUP BYqueries the dominant worker cost wasSortPreservingMergeExecmerging all raw rows, because the sorted partial aggregate sat above the merge. This PR pushes the sorted partial aggregate below the merge (so the merge carries reduced partial states instead of raw rows) and propagatesLIMITdown to the aggregate so it emits early and stops scanning (previously the sorted aggregate's emit threshold was a hardbatch_size= 4096, so any result under 4096 groups read its entire input before emitting anything andLIMITcould not short-circuit).Measured on a 10M-row stand: the worker step of a 1000-group prefix
GROUP BYis ~3.7x faster (357ms → 97ms) with no RSS increase;GROUP BY ... ORDER BY <prefix> LIMIT 10goes from ~365ms to ~14ms (×28).Changes
push_sorted_partial_aggregate_below_merge(pre-optimize pass): rewritesPartialAggregate(Sorted) → SortPreservingMerge → source(N partitions)intoSortPreservingMerge → per-partition PartialAggregate(Sorted). Only sorted streaming aggregates are pushed — they hold O(1) accumulators per partition, while a hash aggregate would multiply its O(num_groups) memory by the partition count. Duplicate group keys across partitions are combined by the router's Final aggregate, same as partial states from different workers.InlineAggregateStreamhonors a grouplimitwith a strict contract: emit-early threshold becomesmin(batch_size, remaining), only closed groups are emitted, the backlog from large input batches is drained in chunks, the final emit is clamped, and input reading stops as soon as the limit is reached.add_limit_to_workersis rewritten as a limit descent: the worker limit descends through sort preserving merges and lands directly above the first aggregate as a per-partition row limit (LocalLimitforward /TailLimitreverse /GlobalLimitfor a single partition), additionally passing the limit intoInlineAggregateExecfor the early input stop. A row limit never sits above a merge of per-partition partial aggregates (it would truncate a group's partial states across duplicate keys and silently corrupt totals), and the safety is derived from the plan shape rather than from which pass produced it.TailLimitExec: keeps a sliding window of trailing batches (O(limit) memory) instead of collecting its whole input, returns the lastlimitrows of each input partition, and declaresmaintains_input_order.Testing
GROUP BYwith overlapping keys across union partitions with/withoutLIMIT,DESC,ORDER BY <aggregate>; 7500 groups (above the 4096 batch size) withLIMIT 10/LIMIT 5000/DESC; plan shape assertions for the newMergeSort → LocalLimit → InlinePartialAggregate, limitandMergeSort → TailLimit → InlinePartialAggregateforms.over_10k_joinis flaky on master under parallel load too — verified 4/5 failures on both master and this branch under the same load, unrelated).