From 9221068079472f160cbe94fc29067495ec2e4829 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 18 Jun 2026 17:22:08 -0700 Subject: [PATCH] feat: unnest and join makeCursorHolderAsync --- .../columnar/ColumnarFrameCursorFactory.java | 3 +- .../segment/row/RowFrameCursorFactory.java | 3 +- .../apache/druid/segment/CursorFactory.java | 17 +- .../segment/QueryableIndexCursorFactory.java | 2 +- .../druid/segment/ResidentCursorFactory.java | 39 ++ .../druid/segment/RowBasedCursorFactory.java | 2 +- .../druid/segment/UnnestCursorFactory.java | 79 +++- .../IncrementalIndexCursorFactory.java | 4 +- .../join/HashJoinSegmentCursorFactory.java | 338 ++++++++++++------ .../loading/TombstoneSegmentizerFactory.java | 3 +- .../druid/segment/DeferredCursorFactory.java | 80 +++++ .../segment/UnnestCursorFactoryTest.java | 56 +++ .../HashJoinSegmentCursorFactoryTest.java | 75 ++++ 13 files changed, 560 insertions(+), 141 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/segment/ResidentCursorFactory.java create mode 100644 processing/src/test/java/org/apache/druid/segment/DeferredCursorFactory.java diff --git a/processing/src/main/java/org/apache/druid/frame/segment/columnar/ColumnarFrameCursorFactory.java b/processing/src/main/java/org/apache/druid/frame/segment/columnar/ColumnarFrameCursorFactory.java index 6dba57245fcd..fd9ffc15e2ff 100644 --- a/processing/src/main/java/org/apache/druid/frame/segment/columnar/ColumnarFrameCursorFactory.java +++ b/processing/src/main/java/org/apache/druid/frame/segment/columnar/ColumnarFrameCursorFactory.java @@ -40,6 +40,7 @@ import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.CursorHolder; import org.apache.druid.segment.QueryableIndexColumnSelectorFactory; +import org.apache.druid.segment.ResidentCursorFactory; import org.apache.druid.segment.SimpleAscendingOffset; import org.apache.druid.segment.SimpleSettableOffset; import org.apache.druid.segment.VirtualColumns; @@ -64,7 +65,7 @@ * * @see RowFrameCursorFactory the row-based version */ -public class ColumnarFrameCursorFactory implements CursorFactory +public class ColumnarFrameCursorFactory implements ResidentCursorFactory { private final Frame frame; private final RowSignature signature; diff --git a/processing/src/main/java/org/apache/druid/frame/segment/row/RowFrameCursorFactory.java b/processing/src/main/java/org/apache/druid/frame/segment/row/RowFrameCursorFactory.java index 93523893c8cd..4f72640e95cd 100644 --- a/processing/src/main/java/org/apache/druid/frame/segment/row/RowFrameCursorFactory.java +++ b/processing/src/main/java/org/apache/druid/frame/segment/row/RowFrameCursorFactory.java @@ -33,6 +33,7 @@ import org.apache.druid.segment.CursorBuildSpec; import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.CursorHolder; +import org.apache.druid.segment.ResidentCursorFactory; import org.apache.druid.segment.SimpleAscendingOffset; import org.apache.druid.segment.SimpleSettableOffset; import org.apache.druid.segment.column.ColumnCapabilities; @@ -49,7 +50,7 @@ * * @see ColumnarFrameCursorFactory the columnar version */ -public class RowFrameCursorFactory implements CursorFactory +public class RowFrameCursorFactory implements ResidentCursorFactory { private final Frame frame; private final FrameReader frameReader; diff --git a/processing/src/main/java/org/apache/druid/segment/CursorFactory.java b/processing/src/main/java/org/apache/druid/segment/CursorFactory.java index 4a79490adc3a..486625bb7f3b 100644 --- a/processing/src/main/java/org/apache/druid/segment/CursorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/CursorFactory.java @@ -19,6 +19,7 @@ package org.apache.druid.segment; +import org.apache.druid.error.DruidException; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.RowSignature; @@ -35,14 +36,22 @@ public interface CursorFactory extends ColumnInspector /** * Asynchronous variant of {@link #makeCursorHolder(CursorBuildSpec)} for cursor factories that may need to do I/O * (e.g. download column data from deep storage) before they can serve a cursor. Callers running on threads that - * must not block should use this. + * must not block use this rather than {@link #makeCursorHolder}. *

- * The default implementation completes synchronously by delegating to {@link #makeCursorHolder(CursorBuildSpec)}, - * which keeps every existing implementation async-correct without changes. + * There is intentionally no working default: this method must be explicitly implemented to participate in + * async-aware engines (MSQ). A factory whose source is always fully-resident and never needs to block while waiting + * on some other thread to perform work can implement {@link ResidentCursorFactory} instead of {@link CursorFactory} + * directly, which provides a default implementation of this method that wraps + * {@link #makeCursorHolder(CursorBuildSpec)}. */ default AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec) { - return AsyncCursorHolder.completed(makeCursorHolder(spec)); + throw DruidException.defensive( + "makeCursorHolderAsync is not implemented by [%s]. Override it (or implement ResidentCursorFactory): return " + + "AsyncCursorHolder.completed(makeCursorHolder(spec)) if the source is always fully resident, or build/await " + + "the cursor holder asynchronously if it supports load on demand.", + getClass().getName() + ); } /** diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java index 1537454e9fc9..dbb789395c2d 100644 --- a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java @@ -56,7 +56,7 @@ import java.util.LinkedHashSet; import java.util.List; -public class QueryableIndexCursorFactory implements CursorFactory +public class QueryableIndexCursorFactory implements ResidentCursorFactory { private final QueryableIndex index; private final TimeBoundaryInspector timeBoundaryInspector; diff --git a/processing/src/main/java/org/apache/druid/segment/ResidentCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/ResidentCursorFactory.java new file mode 100644 index 000000000000..99cf4461709c --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/ResidentCursorFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +/** + * A {@link CursorFactory} whose {@link #makeCursorHolder} never blocks on I/O, i.e. a fully-resident / in-memory + * source with no on-demand loading. Implementing this interface, rather than {@link CursorFactory} directly, declares + * that intent and supplies the trivial {@link #makeCursorHolderAsync} implementation: the holder is built synchronously + * and returned already completed. + *

+ * Factories backed by, or wrapping, an on-demand source must implement {@link CursorFactory} directly and provide a + * real {@link CursorFactory#makeCursorHolderAsync} that builds/awaits the holder asynchronously so they never block the + * calling thread. + */ +public interface ResidentCursorFactory extends CursorFactory +{ + @Override + default AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec) + { + return AsyncCursorHolder.completed(makeCursorHolder(spec)); + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/RowBasedCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/RowBasedCursorFactory.java index 13f2da53e82c..3594c3a92f8b 100644 --- a/processing/src/main/java/org/apache/druid/segment/RowBasedCursorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/RowBasedCursorFactory.java @@ -32,7 +32,7 @@ import javax.annotation.Nullable; import java.util.List; -public class RowBasedCursorFactory implements CursorFactory +public class RowBasedCursorFactory implements ResidentCursorFactory { private final Sequence rowSequence; private final RowAdapter rowAdapter; diff --git a/processing/src/main/java/org/apache/druid/segment/UnnestCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/UnnestCursorFactory.java index 8359b8664ab0..b39264ac807f 100644 --- a/processing/src/main/java/org/apache/druid/segment/UnnestCursorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/UnnestCursorFactory.java @@ -76,9 +76,59 @@ public UnnestCursorFactory( @Override public CursorHolder makeCursorHolder(CursorBuildSpec spec) + { + final UnnestFilterSplit filterSplit = computeFilterSplit(spec); + final CursorBuildSpec unnestBuildSpec = transformCursorBuildSpec(spec, unnestColumn, filterSplit.getBaseTableFilter()); + + final Closer closer = Closer.create(); + // base holder is built lazily on first asCursor()/getOrdering() and registered for close at that point + final Supplier baseHolderSupplier = Suppliers.memoize( + () -> closer.register(baseCursorFactory.makeCursorHolder(unnestBuildSpec)) + ); + return unnestCursorHolder(spec, filterSplit, closer, baseHolderSupplier); + } + + @Override + public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec) + { + final UnnestFilterSplit filterSplit = computeFilterSplit(spec); + final CursorBuildSpec unnestBuildSpec = transformCursorBuildSpec(spec, unnestColumn, filterSplit.getBaseTableFilter()); + + // Build the base-table holder asynchronously (a partial base segment downloads its required columns here), then + // wrap the ready holder in the unnest holder. Closing the returned holder before it's ready cancels the base load. + final AsyncCursorHolder baseAsync = baseCursorFactory.makeCursorHolderAsync(unnestBuildSpec); + final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(baseAsync::close); + baseAsync.addReadyCallback(() -> { + final CursorHolder unnestHolder; + try { + // release() transfers ownership of the base holder to us (and surfaces a base-load failure as its cause); from + // here the unnest holder owns closing it. Construction below can't throw, so the catch only fires on a base + // failure or a cancel race (baseAsync already closed), neither of which leaves a base holder to leak. + final CursorHolder baseHolder = baseAsync.release(); + final Closer closer = Closer.create(); + closer.register(baseHolder); + unnestHolder = unnestCursorHolder(spec, filterSplit, closer, Suppliers.ofInstance(baseHolder)); + } + catch (Throwable t) { + asyncHolder.setException(t); + return; + } + if (!asyncHolder.set(unnestHolder)) { + // awaiter closed the wrapper while we were producing the holder; close it so the base holder doesn't leak + unnestHolder.close(); + } + }); + return asyncHolder; + } + + /** + * Split the spec's filters into base-table and post-unnest filters (see + * {@link #computeBaseAndPostUnnestFilters}). Cheap and metadata-only; shared by the sync and async holder paths. + */ + private UnnestFilterSplit computeFilterSplit(CursorBuildSpec spec) { final String input = getUnnestInputIfDirectAccess(unnestColumn); - final UnnestFilterSplit filterSplit = computeBaseAndPostUnnestFilters( + return computeBaseAndPostUnnestFilters( spec.getFilter(), filter != null ? filter.toFilter() : null, spec.getVirtualColumns(), @@ -86,23 +136,26 @@ public CursorHolder makeCursorHolder(CursorBuildSpec spec) input, input == null ? null : spec.getVirtualColumns().getColumnCapabilitiesWithFallback(baseCursorFactory, input) ); - final CursorBuildSpec unnestBuildSpec = transformCursorBuildSpec( - spec, - unnestColumn, - filterSplit.getBaseTableFilter() - ); + } + /** + * Build the unnest {@link CursorHolder} on top of a base-table holder. {@code baseHolderSupplier} provides the base + * holder: the sync path builds it lazily on first use and registers it with {@code closer}, while the async path + * supplies an already-materialized holder pre-registered with {@code closer}. + */ + private CursorHolder unnestCursorHolder( + CursorBuildSpec spec, + UnnestFilterSplit filterSplit, + Closer closer, + Supplier baseHolderSupplier + ) + { return new CursorHolder() { - final Closer closer = Closer.create(); - final Supplier cursorHolderSupplier = Suppliers.memoize( - () -> closer.register(baseCursorFactory.makeCursorHolder(unnestBuildSpec)) - ); - @Override public Cursor asCursor() { - final Cursor cursor = cursorHolderSupplier.get().asCursor(); + final Cursor cursor = baseHolderSupplier.get().asCursor(); if (cursor == null) { return null; } @@ -135,7 +188,7 @@ public Cursor asCursor() @Override public List getOrdering() { - return computeOrdering(cursorHolderSupplier.get().getOrdering()); + return computeOrdering(baseHolderSupplier.get().getOrdering()); } @Override diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java index 744a856c8454..07cebffa2c96 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java @@ -30,9 +30,9 @@ import org.apache.druid.segment.ConcatenatingCursor; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.CursorBuildSpec; -import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.CursorHolder; import org.apache.druid.segment.EmptyCursorHolder; +import org.apache.druid.segment.ResidentCursorFactory; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.column.ColumnCapabilitiesImpl; import org.apache.druid.segment.column.ColumnType; @@ -50,7 +50,7 @@ import java.util.Arrays; import java.util.List; -public class IncrementalIndexCursorFactory implements CursorFactory +public class IncrementalIndexCursorFactory implements ResidentCursorFactory { private static final ColumnCapabilities.CoercionLogic COERCE_LOGIC = new ColumnCapabilities.CoercionLogic() diff --git a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactory.java index b5354de02ad1..a7030fae5465 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactory.java @@ -27,6 +27,7 @@ import org.apache.druid.query.Order; import org.apache.druid.query.OrderBy; import org.apache.druid.query.filter.Filter; +import org.apache.druid.segment.AsyncCursorHolder; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.CursorBuildSpec; import org.apache.druid.segment.CursorFactory; @@ -77,12 +78,74 @@ public HashJoinSegmentCursorFactory( public CursorHolder makeCursorHolder(CursorBuildSpec spec) { final Filter combinedFilter = baseFilterAnd(spec.getFilter()); + final Set physicalColumns = computeBasePhysicalColumns(spec, combinedFilter); - // for physical column tracking, we start by copying base spec physical columns + if (clauses.isEmpty()) { + // if there are no clauses, we can just use the base cursor directly if we apply the combined filter + return baseCursorFactory.makeCursorHolder(noClausesBaseSpec(spec, combinedFilter, physicalColumns)); + } + + // else we need to wipe out the grouping, aggregations, and ordering + final Closer joinablesCloser = Closer.create(); + final JoinCursorPlan plan = planJoinCursor(spec, combinedFilter, physicalColumns); + final CursorHolder baseCursorHolder = joinablesCloser.register( + baseCursorFactory.makeCursorHolder(plan.baseBuildSpec) + ); + return joinCursorHolder(plan, joinablesCloser, baseCursorHolder); + } + + @Override + public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec) + { + final Filter combinedFilter = baseFilterAnd(spec.getFilter()); + final Set physicalColumns = computeBasePhysicalColumns(spec, combinedFilter); + + if (clauses.isEmpty()) { + return baseCursorFactory.makeCursorHolderAsync(noClausesBaseSpec(spec, combinedFilter, physicalColumns)); + } + + final Closer joinablesCloser = Closer.create(); + // Join filter analysis + base-spec computation are CPU-only; do them synchronously to learn the base spec. + final JoinCursorPlan plan = planJoinCursor(spec, combinedFilter, physicalColumns); + + // Build the left/base holder asynchronously (a partial base segment downloads its required columns here); the + // join's build-side joinables are already-resident in-memory tables, so the base is the only async piece. Once it's + // ready, wrap it with the join cursors. Closing the returned holder before it's ready cancels the base load. + final AsyncCursorHolder baseAsync = baseCursorFactory.makeCursorHolderAsync(plan.baseBuildSpec); + final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(baseAsync::close); + baseAsync.addReadyCallback(() -> { + final CursorHolder joinHolder; + try { + // release() transfers ownership of the base holder to us (and surfaces a base-load failure as its cause); the + // join holder now owns closing it via joinablesCloser. The wrap below can't throw, so the catch only fires on + // a base failure or a cancel race (baseAsync already closed), in both cases joinablesCloser is still empty. + final CursorHolder baseHolder = baseAsync.release(); + joinablesCloser.register(baseHolder); + joinHolder = joinCursorHolder(plan, joinablesCloser, baseHolder); + } + catch (Throwable t) { + asyncHolder.setException(t); + return; + } + if (!asyncHolder.set(joinHolder)) { + // awaiter closed the wrapper while we were producing the holder; close it so the base holder doesn't leak + joinHolder.close(); + } + }); + return asyncHolder; + } + + /** + * Physical columns to pass to the base cursor: a copy of the spec's physical columns plus any columns required by + * the combined filter (null when the spec didn't declare physical columns, meaning "all"). Shared by the no-clauses + * and join paths. + */ + @Nullable + private static Set computeBasePhysicalColumns(CursorBuildSpec spec, @Nullable Filter combinedFilter) + { final Set physicalColumns = spec.getPhysicalColumns() != null ? new HashSet<>(spec.getPhysicalColumns()) : null; - if (physicalColumns != null && combinedFilter != null) { for (String column : combinedFilter.getRequiredColumns()) { if (!spec.getVirtualColumns().exists(column)) { @@ -90,129 +153,135 @@ public CursorHolder makeCursorHolder(CursorBuildSpec spec) } } } + return physicalColumns; + } - if (clauses.isEmpty()) { - // if there are no clauses, we can just use the base cursor directly if we apply the combined filter - final CursorBuildSpec newSpec = CursorBuildSpec.builder(spec) - .setFilter(combinedFilter) - .setPhysicalColumns(physicalColumns) - .build(); - return baseCursorFactory.makeCursorHolder(newSpec); - } - - // else we need to wipe out the grouping, aggregations, and ordering - - return new CursorHolder() - { - final Closer joinablesCloser = Closer.create(); - - /** - * Typically the same as {@link HashJoinSegmentCursorFactory#joinFilterPreAnalysis}, but may differ when - * an unnest datasource is layered on top of a join datasource. - */ - final JoinFilterPreAnalysis actualPreAnalysis; - - /** - * Result of {@link JoinFilterAnalyzer#splitFilter} on {@link #actualPreAnalysis} and - * {@link HashJoinSegmentCursorFactory#baseFilter}. - */ - final JoinFilterSplit joinFilterSplit; - - /** - * Cursor holder for {@link HashJoinSegmentCursorFactory#baseCursorFactory}. - */ - final CursorHolder baseCursorHolder; - - { - // Filter pre-analysis key implied by the call to "makeCursorHolder". We need to sanity-check that it matches - // the actual pre-analysis that was done. Note: we could now infer a rewrite config from the "makeCursorHolder" - // call (it requires access to the query context which we now have access to since the move away from - // CursorFactory) but this code hasn't been updated to sanity-check it, so currently we are still skipping it - // by re-using the one present in the cached key. - final JoinFilterPreAnalysisKey keyIn = - new JoinFilterPreAnalysisKey( - joinFilterPreAnalysis.getKey().getRewriteConfig(), - clauses, - spec.getVirtualColumns(), - combinedFilter - ); - - final JoinFilterPreAnalysisKey keyCached = joinFilterPreAnalysis.getKey(); - if (keyIn.equals(keyCached)) { - // Common case: key used during filter pre-analysis (keyCached) matches key implied by makeCursorHolder call - // (keyIn). - actualPreAnalysis = joinFilterPreAnalysis; - } else { - // Less common case: key differs. Re-analyze the filter. This case can happen when an unnest datasource is - // layered on top of a join datasource. - actualPreAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(keyIn); - } + /** + * Base cursor spec for the no-clauses case: the original spec with the combined filter and base physical columns. + */ + private static CursorBuildSpec noClausesBaseSpec( + CursorBuildSpec spec, + @Nullable Filter combinedFilter, + @Nullable Set physicalColumns + ) + { + return CursorBuildSpec.builder(spec) + .setFilter(combinedFilter) + .setPhysicalColumns(physicalColumns) + .build(); + } - joinFilterSplit = JoinFilterAnalyzer.splitFilter( - actualPreAnalysis, - baseFilter + /** + * Run the (CPU-only) join filter pre-analysis and compute the base-table cursor build spec. Returns the analysis + * results needed at cursor-construction time ({@link JoinCursorPlan}). {@code physicalColumns} is mutated in place + * (it accumulates base-filter, pre-join virtual-column, and clause-condition columns, minus the join prefixes). + */ + private JoinCursorPlan planJoinCursor( + CursorBuildSpec spec, + @Nullable Filter combinedFilter, + @Nullable Set physicalColumns + ) + { + // Filter pre-analysis key implied by the call to "makeCursorHolder". We need to sanity-check that it matches + // the actual pre-analysis that was done. Note: we could now infer a rewrite config from the "makeCursorHolder" + // call (it requires access to the query context which we now have access to) but this code hasn't been updated to + // sanity-check it, so currently we are still skipping it by re-using the one present in the cached key. + final JoinFilterPreAnalysisKey keyIn = + new JoinFilterPreAnalysisKey( + joinFilterPreAnalysis.getKey().getRewriteConfig(), + clauses, + spec.getVirtualColumns(), + combinedFilter ); - // start with a full scan clipped to interval - final CursorBuildSpec.CursorBuildSpecBuilder cursorBuildSpecBuilder = - CursorBuildSpec.builder() - .setInterval(spec.getInterval()) - .setQueryContext(spec.getQueryContext()) - .setQueryMetrics(spec.getQueryMetrics()); - - // retain time ordering if preferred - Order timeOrder = Cursors.getTimeOrdering(spec.getPreferredOrdering()); - if (timeOrder == Order.DESCENDING) { - cursorBuildSpecBuilder.setPreferredOrdering(Cursors.descendingTimeOrder()); - } else if (timeOrder == Order.ASCENDING) { - cursorBuildSpecBuilder.setPreferredOrdering(Cursors.ascendingTimeOrder()); - } + final JoinFilterPreAnalysisKey keyCached = joinFilterPreAnalysis.getKey(); + final JoinFilterPreAnalysis actualPreAnalysis; + if (keyIn.equals(keyCached)) { + // Common case: key used during filter pre-analysis (keyCached) matches key implied by makeCursorHolder call + // (keyIn). + actualPreAnalysis = joinFilterPreAnalysis; + } else { + // Less common case: key differs. Re-analyze the filter. This case can happen when an unnest datasource is + // layered on top of a join datasource. + actualPreAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(keyIn); + } - // add pushdown filters if present - if (joinFilterSplit.getBaseTableFilter().isPresent()) { - cursorBuildSpecBuilder.setFilter(joinFilterSplit.getBaseTableFilter().get()); - } - final VirtualColumns preJoinVirtualColumns = VirtualColumns.fromIterable( - Iterables.concat( - Sets.difference( - ImmutableSet.copyOf(spec.getVirtualColumns().getVirtualColumns()), - joinFilterPreAnalysis.getPostJoinVirtualColumns() - ), - joinFilterSplit.getPushDownVirtualColumns() - ) - ); - cursorBuildSpecBuilder.setVirtualColumns(preJoinVirtualColumns); - - // add all base table physical columns if they were originally set - if (physicalColumns != null) { - if (joinFilterSplit.getBaseTableFilter().isPresent()) { - for (String column : joinFilterSplit.getBaseTableFilter().get().getRequiredColumns()) { - if (!spec.getVirtualColumns().exists(column) && !preJoinVirtualColumns.exists(column)) { - physicalColumns.add(column); - } - } - } - for (VirtualColumn virtualColumn : preJoinVirtualColumns.getVirtualColumns()) { - for (String column : virtualColumn.requiredColumns()) { - if (!spec.getVirtualColumns().exists(column) && !preJoinVirtualColumns.exists(column)) { - physicalColumns.add(column); - } - } - } - final Set prefixes = new HashSet<>(); - for (JoinableClause clause : clauses) { - prefixes.add(clause.getPrefix()); - physicalColumns.addAll(clause.getCondition().getRequiredColumns()); + final JoinFilterSplit joinFilterSplit = JoinFilterAnalyzer.splitFilter(actualPreAnalysis, baseFilter); + + // start with a full scan clipped to interval + final CursorBuildSpec.CursorBuildSpecBuilder cursorBuildSpecBuilder = + CursorBuildSpec.builder() + .setInterval(spec.getInterval()) + .setQueryContext(spec.getQueryContext()) + .setQueryMetrics(spec.getQueryMetrics()); + + // retain time ordering if preferred + Order timeOrder = Cursors.getTimeOrdering(spec.getPreferredOrdering()); + if (timeOrder == Order.DESCENDING) { + cursorBuildSpecBuilder.setPreferredOrdering(Cursors.descendingTimeOrder()); + } else if (timeOrder == Order.ASCENDING) { + cursorBuildSpecBuilder.setPreferredOrdering(Cursors.ascendingTimeOrder()); + } + + // add pushdown filters if present + if (joinFilterSplit.getBaseTableFilter().isPresent()) { + cursorBuildSpecBuilder.setFilter(joinFilterSplit.getBaseTableFilter().get()); + } + final VirtualColumns preJoinVirtualColumns = VirtualColumns.fromIterable( + Iterables.concat( + Sets.difference( + ImmutableSet.copyOf(spec.getVirtualColumns().getVirtualColumns()), + joinFilterPreAnalysis.getPostJoinVirtualColumns() + ), + joinFilterSplit.getPushDownVirtualColumns() + ) + ); + cursorBuildSpecBuilder.setVirtualColumns(preJoinVirtualColumns); + + // add all base table physical columns if they were originally set + if (physicalColumns != null) { + if (joinFilterSplit.getBaseTableFilter().isPresent()) { + for (String column : joinFilterSplit.getBaseTableFilter().get().getRequiredColumns()) { + if (!spec.getVirtualColumns().exists(column) && !preJoinVirtualColumns.exists(column)) { + physicalColumns.add(column); } - for (String prefix : prefixes) { - physicalColumns.removeIf(x -> JoinPrefixUtils.isPrefixedBy(x, prefix)); + } + } + for (VirtualColumn virtualColumn : preJoinVirtualColumns.getVirtualColumns()) { + for (String column : virtualColumn.requiredColumns()) { + if (!spec.getVirtualColumns().exists(column) && !preJoinVirtualColumns.exists(column)) { + physicalColumns.add(column); } - cursorBuildSpecBuilder.setPhysicalColumns(physicalColumns); } - - baseCursorHolder = joinablesCloser.register(baseCursorFactory.makeCursorHolder(cursorBuildSpecBuilder.build())); } + final Set prefixes = new HashSet<>(); + for (JoinableClause clause : clauses) { + prefixes.add(clause.getPrefix()); + physicalColumns.addAll(clause.getCondition().getRequiredColumns()); + } + for (String prefix : prefixes) { + physicalColumns.removeIf(x -> JoinPrefixUtils.isPrefixedBy(x, prefix)); + } + cursorBuildSpecBuilder.setPhysicalColumns(physicalColumns); + } + + return new JoinCursorPlan(actualPreAnalysis, joinFilterSplit, cursorBuildSpecBuilder.build()); + } + /** + * Build the join {@link CursorHolder} on top of a base-table holder. {@code baseCursorHolder} must already be + * registered with {@code joinablesCloser} (the sync path registers it when built; the async path registers the + * released holder); closing the returned holder closes {@code joinablesCloser} (the base holder + per-cursor join + * matchers created in {@link CursorHolder#asCursor}). + */ + private CursorHolder joinCursorHolder( + JoinCursorPlan plan, + Closer joinablesCloser, + CursorHolder baseCursorHolder + ) + { + return new CursorHolder() + { @Override public Cursor asCursor() { @@ -230,8 +299,8 @@ public Cursor asCursor() return PostJoinCursor.wrap( retVal, - VirtualColumns.fromIterable(actualPreAnalysis.getPostJoinVirtualColumns()), - joinFilterSplit.getJoinTableFilter().orElse(null) + VirtualColumns.fromIterable(plan.actualPreAnalysis.getPostJoinVirtualColumns()), + plan.joinFilterSplit.getJoinTableFilter().orElse(null) ); } @@ -320,4 +389,39 @@ private List computeOrdering(final List baseOrdering) return limit == baseOrdering.size() ? baseOrdering : baseOrdering.subList(0, limit); } + + /** + * Outputs of {@link #planJoinCursor}: the (possibly re-computed) join filter pre-analysis and split needed when the + * join cursor is actually constructed, plus the base-table cursor build spec used to open the left-side holder. + */ + private static final class JoinCursorPlan + { + /** + * Typically the same as {@link HashJoinSegmentCursorFactory#joinFilterPreAnalysis}, but may differ when an unnest + * datasource is layered on top of a join datasource. + */ + private final JoinFilterPreAnalysis actualPreAnalysis; + + /** + * Result of {@link JoinFilterAnalyzer#splitFilter} on {@link #actualPreAnalysis} and + * {@link HashJoinSegmentCursorFactory#baseFilter}. + */ + private final JoinFilterSplit joinFilterSplit; + + /** + * Build spec for the left-side {@link HashJoinSegmentCursorFactory#baseCursorFactory} holder. + */ + private final CursorBuildSpec baseBuildSpec; + + private JoinCursorPlan( + JoinFilterPreAnalysis actualPreAnalysis, + JoinFilterSplit joinFilterSplit, + CursorBuildSpec baseBuildSpec + ) + { + this.actualPreAnalysis = actualPreAnalysis; + this.joinFilterSplit = joinFilterSplit; + this.baseBuildSpec = baseBuildSpec; + } + } } diff --git a/processing/src/main/java/org/apache/druid/segment/loading/TombstoneSegmentizerFactory.java b/processing/src/main/java/org/apache/druid/segment/loading/TombstoneSegmentizerFactory.java index 94654f58c851..546bd55f0373 100644 --- a/processing/src/main/java/org/apache/druid/segment/loading/TombstoneSegmentizerFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/loading/TombstoneSegmentizerFactory.java @@ -27,6 +27,7 @@ import org.apache.druid.segment.CursorHolder; import org.apache.druid.segment.NoopQueryableIndex; import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.ResidentCursorFactory; import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.segment.column.ColumnCapabilities; @@ -88,7 +89,7 @@ public Interval getDataInterval() public T as(@Nonnull Class clazz) { if (CursorFactory.class.equals(clazz)) { - return (T) new CursorFactory() + return (T) new ResidentCursorFactory() { @Override public CursorHolder makeCursorHolder(CursorBuildSpec spec) diff --git a/processing/src/test/java/org/apache/druid/segment/DeferredCursorFactory.java b/processing/src/test/java/org/apache/druid/segment/DeferredCursorFactory.java new file mode 100644 index 000000000000..e9e9305e6f3d --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/DeferredCursorFactory.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.RowSignature; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Test {@link CursorFactory} that produces its async cursor holder on demand: {@link #makeCursorHolderAsync} hands back + * an unfinished {@link AsyncCursorHolder}, and {@link #complete()} fills it (by building the delegate's holder for the + * last requested spec) so a test can drive the await transition deterministically. {@link #canceled} flips when the + * returned holder is closed before it is completed. Used to exercise the {@code makeCursorHolderAsync} override of + * wrapping cursor factories (e.g. unnest, join) against a base that loads asynchronously. + */ +public class DeferredCursorFactory implements CursorFactory +{ + private final CursorFactory delegate; + public final AtomicBoolean canceled = new AtomicBoolean(false); + private AsyncCursorHolder pending; + private CursorBuildSpec pendingSpec; + + public DeferredCursorFactory(CursorFactory delegate) + { + this.delegate = delegate; + } + + @Override + public CursorHolder makeCursorHolder(CursorBuildSpec spec) + { + return delegate.makeCursorHolder(spec); + } + + @Override + public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec) + { + pendingSpec = spec; + pending = new AsyncCursorHolder(() -> canceled.set(true)); + return pending; + } + + /** + * Completes the most recent {@link #makeCursorHolderAsync} holder by building the delegate's holder for the spec it + * was called with. + */ + public void complete() + { + pending.set(delegate.makeCursorHolder(pendingSpec)); + } + + @Override + public RowSignature getRowSignature() + { + return delegate.getRowSignature(); + } + + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + return delegate.getColumnCapabilities(column); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java index 6a4cadcc79e8..54564621b22d 100644 --- a/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java +++ b/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java @@ -256,6 +256,62 @@ public void test_unnest_factory_basic() } } + @Test + public void test_unnest_factory_async_awaits_base_then_produces_unnest_cursor() + { + // base produces its holder asynchronously (mimics a partial segment downloading its columns). The unnest async + // holder must stay not-ready until the base completes, then yield the same unnested rows as the sync path. + final DeferredCursorFactory base = new DeferredCursorFactory(INCREMENTAL_INDEX_CURSOR_FACTORY); + final UnnestCursorFactory unnest = new UnnestCursorFactory( + base, + new ExpressionVirtualColumn(OUTPUT_COLUMN_NAME, "\"" + INPUT_COLUMN_NAME + "\"", null, ExprMacroTable.nil()), + null + ); + + final AsyncCursorHolder asyncHolder = unnest.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN); + try { + Assert.assertFalse("unnest holder must wait for the base to complete", asyncHolder.isReady()); + + base.complete(); + Assert.assertTrue("unnest holder becomes ready once the base completes", asyncHolder.isReady()); + + try (final CursorHolder cursorHolder = asyncHolder.release()) { + final Cursor cursor = cursorHolder.asCursor(); + final DimensionSelector dimSelector = + cursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME)); + final List rows = new ArrayList<>(); + while (!cursor.isDone()) { + rows.add(dimSelector.getObject()); + cursor.advance(); + } + Assert.assertEquals( + Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "10", "11", "12", "13", "14", "15", "8", "9"), + rows + ); + } + } + finally { + // no-op after release(); cancels the base load if an assertion above bailed before release() + asyncHolder.close(); + } + } + + @Test + public void test_unnest_factory_async_close_before_ready_cancels_base() + { + final DeferredCursorFactory base = new DeferredCursorFactory(INCREMENTAL_INDEX_CURSOR_FACTORY); + final UnnestCursorFactory unnest = new UnnestCursorFactory( + base, + new ExpressionVirtualColumn(OUTPUT_COLUMN_NAME, "\"" + INPUT_COLUMN_NAME + "\"", null, ExprMacroTable.nil()), + null + ); + + final AsyncCursorHolder asyncHolder = unnest.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN); + Assert.assertFalse(asyncHolder.isReady()); + asyncHolder.close(); + Assert.assertTrue("closing the unnest holder before it's ready cancels the base load", base.canceled.get()); + } + @Test public void test_unnest_factory_basic_array_column() { diff --git a/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactoryTest.java index e1b4ffdf5410..2c803dc40e7e 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactoryTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactoryTest.java @@ -30,8 +30,12 @@ import org.apache.druid.query.filter.InDimFilter; import org.apache.druid.query.filter.OrDimFilter; import org.apache.druid.query.filter.SelectorDimFilter; +import org.apache.druid.segment.AsyncCursorHolder; +import org.apache.druid.segment.Cursor; import org.apache.druid.segment.CursorBuildSpec; import org.apache.druid.segment.CursorFactory; +import org.apache.druid.segment.CursorHolder; +import org.apache.druid.segment.DeferredCursorFactory; import org.apache.druid.segment.ReferenceCountedSegmentProvider; import org.apache.druid.segment.TopNOptimizationInspector; import org.apache.druid.segment.VirtualColumns; @@ -215,6 +219,77 @@ public void test_makeCursor_factToCountryLeft() ); } + @Test + public void test_makeCursorAsync_factToCountryLeft_awaitsBaseThenJoins() + { + final List joinableClauses = ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT)); + final JoinFilterPreAnalysis joinFilterPreAnalysis = + makeDefaultConfigPreAnalysis(null, joinableClauses, VirtualColumns.EMPTY); + + // joined row count from the synchronous path, to compare the async result against without re-listing every row + final int expectedRows; + try (CursorHolder syncHolder = new HashJoinSegmentCursorFactory( + factSegment.as(CursorFactory.class), + null, + joinableClauses, + joinFilterPreAnalysis + ).makeCursorHolder(CursorBuildSpec.FULL_SCAN)) { + expectedRows = countRows(syncHolder); + } + Assert.assertTrue(expectedRows > 0); + + // async path: the left/base holder loads on demand (mimics a partial base segment); build-side joinable is resident + final DeferredCursorFactory base = new DeferredCursorFactory(factSegment.as(CursorFactory.class)); + final AsyncCursorHolder asyncHolder = new HashJoinSegmentCursorFactory( + base, + null, + joinableClauses, + joinFilterPreAnalysis + ).makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN); + try { + Assert.assertFalse("join holder must wait for the base to complete", asyncHolder.isReady()); + base.complete(); + Assert.assertTrue("join holder becomes ready once the base completes", asyncHolder.isReady()); + try (CursorHolder holder = asyncHolder.release()) { + Assert.assertEquals(expectedRows, countRows(holder)); + } + } + finally { + asyncHolder.close(); + } + } + + @Test + public void test_makeCursorAsync_factToCountry_closeBeforeReadyCancelsBase() + { + final List joinableClauses = ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT)); + final JoinFilterPreAnalysis joinFilterPreAnalysis = + makeDefaultConfigPreAnalysis(null, joinableClauses, VirtualColumns.EMPTY); + + final DeferredCursorFactory base = new DeferredCursorFactory(factSegment.as(CursorFactory.class)); + final AsyncCursorHolder asyncHolder = new HashJoinSegmentCursorFactory( + base, + null, + joinableClauses, + joinFilterPreAnalysis + ).makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN); + + Assert.assertFalse(asyncHolder.isReady()); + asyncHolder.close(); + Assert.assertTrue("closing the join holder before it's ready cancels the base load", base.canceled.get()); + } + + private static int countRows(CursorHolder holder) + { + final Cursor cursor = holder.asCursor(); + int count = 0; + while (!cursor.isDone()) { + cursor.advance(); + count++; + } + return count; + } + @Test public void test_makeCursor_factToCountryLeftUsingLookup() {