diff --git a/processing/src/main/java/org/apache/druid/frame/segment/columnar/ColumnarFrameCursorFactory.java b/processing/src/main/java/org/apache/druid/frame/segment/columnar/ColumnarFrameCursorFactory.java
index 6dba57245fcd..fd9ffc15e2ff 100644
--- a/processing/src/main/java/org/apache/druid/frame/segment/columnar/ColumnarFrameCursorFactory.java
+++ b/processing/src/main/java/org/apache/druid/frame/segment/columnar/ColumnarFrameCursorFactory.java
@@ -40,6 +40,7 @@
import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.CursorHolder;
import org.apache.druid.segment.QueryableIndexColumnSelectorFactory;
+import org.apache.druid.segment.ResidentCursorFactory;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.SimpleSettableOffset;
import org.apache.druid.segment.VirtualColumns;
@@ -64,7 +65,7 @@
*
* @see RowFrameCursorFactory the row-based version
*/
-public class ColumnarFrameCursorFactory implements CursorFactory
+public class ColumnarFrameCursorFactory implements ResidentCursorFactory
{
private final Frame frame;
private final RowSignature signature;
diff --git a/processing/src/main/java/org/apache/druid/frame/segment/row/RowFrameCursorFactory.java b/processing/src/main/java/org/apache/druid/frame/segment/row/RowFrameCursorFactory.java
index 93523893c8cd..4f72640e95cd 100644
--- a/processing/src/main/java/org/apache/druid/frame/segment/row/RowFrameCursorFactory.java
+++ b/processing/src/main/java/org/apache/druid/frame/segment/row/RowFrameCursorFactory.java
@@ -33,6 +33,7 @@
import org.apache.druid.segment.CursorBuildSpec;
import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.CursorHolder;
+import org.apache.druid.segment.ResidentCursorFactory;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.SimpleSettableOffset;
import org.apache.druid.segment.column.ColumnCapabilities;
@@ -49,7 +50,7 @@
*
* @see ColumnarFrameCursorFactory the columnar version
*/
-public class RowFrameCursorFactory implements CursorFactory
+public class RowFrameCursorFactory implements ResidentCursorFactory
{
private final Frame frame;
private final FrameReader frameReader;
diff --git a/processing/src/main/java/org/apache/druid/segment/CursorFactory.java b/processing/src/main/java/org/apache/druid/segment/CursorFactory.java
index 4a79490adc3a..486625bb7f3b 100644
--- a/processing/src/main/java/org/apache/druid/segment/CursorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/CursorFactory.java
@@ -19,6 +19,7 @@
package org.apache.druid.segment;
+import org.apache.druid.error.DruidException;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.RowSignature;
@@ -35,14 +36,22 @@ public interface CursorFactory extends ColumnInspector
/**
* Asynchronous variant of {@link #makeCursorHolder(CursorBuildSpec)} for cursor factories that may need to do I/O
* (e.g. download column data from deep storage) before they can serve a cursor. Callers running on threads that
- * must not block should use this.
+ * must not block use this rather than {@link #makeCursorHolder}.
*
- * The default implementation completes synchronously by delegating to {@link #makeCursorHolder(CursorBuildSpec)},
- * which keeps every existing implementation async-correct without changes.
+ * There is intentionally no working default: this method must be explicitly implemented to participate in
+ * async-aware engines (MSQ). A factory whose source is always fully-resident and never needs to block while waiting
+ * on some other thread to perform work can implement {@link ResidentCursorFactory} instead of {@link CursorFactory}
+ * directly, which provides a default implementation of this method that wraps
+ * {@link #makeCursorHolder(CursorBuildSpec)}.
*/
default AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
{
- return AsyncCursorHolder.completed(makeCursorHolder(spec));
+ throw DruidException.defensive(
+ "makeCursorHolderAsync is not implemented by [%s]. Override it (or implement ResidentCursorFactory): return "
+ + "AsyncCursorHolder.completed(makeCursorHolder(spec)) if the source is always fully resident, or build/await "
+ + "the cursor holder asynchronously if it supports load on demand.",
+ getClass().getName()
+ );
}
/**
diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
index 4acd2d4e28a4..2cf20eced767 100644
--- a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
@@ -56,7 +56,7 @@
import java.util.LinkedHashSet;
import java.util.List;
-public class QueryableIndexCursorFactory implements CursorFactory
+public class QueryableIndexCursorFactory implements ResidentCursorFactory
{
private final QueryableIndex index;
private final TimeBoundaryInspector timeBoundaryInspector;
diff --git a/processing/src/main/java/org/apache/druid/segment/ResidentCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/ResidentCursorFactory.java
new file mode 100644
index 000000000000..99cf4461709c
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/ResidentCursorFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+/**
+ * A {@link CursorFactory} whose {@link #makeCursorHolder} never blocks on I/O, i.e. a fully-resident / in-memory
+ * source with no on-demand loading. Implementing this interface, rather than {@link CursorFactory} directly, declares
+ * that intent and supplies the trivial {@link #makeCursorHolderAsync} implementation: the holder is built synchronously
+ * and returned already completed.
+ *
+ * Factories backed by, or wrapping, an on-demand source must implement {@link CursorFactory} directly and provide a
+ * real {@link CursorFactory#makeCursorHolderAsync} that builds/awaits the holder asynchronously so they never block the
+ * calling thread.
+ */
+public interface ResidentCursorFactory extends CursorFactory
+{
+ @Override
+ default AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+ {
+ return AsyncCursorHolder.completed(makeCursorHolder(spec));
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/RowBasedCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/RowBasedCursorFactory.java
index 13f2da53e82c..3594c3a92f8b 100644
--- a/processing/src/main/java/org/apache/druid/segment/RowBasedCursorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/RowBasedCursorFactory.java
@@ -32,7 +32,7 @@
import javax.annotation.Nullable;
import java.util.List;
-public class RowBasedCursorFactory implements CursorFactory
+public class RowBasedCursorFactory implements ResidentCursorFactory
{
private final Sequence rowSequence;
private final RowAdapter rowAdapter;
diff --git a/processing/src/main/java/org/apache/druid/segment/UnnestCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/UnnestCursorFactory.java
index 8359b8664ab0..b39264ac807f 100644
--- a/processing/src/main/java/org/apache/druid/segment/UnnestCursorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/UnnestCursorFactory.java
@@ -76,9 +76,59 @@ public UnnestCursorFactory(
@Override
public CursorHolder makeCursorHolder(CursorBuildSpec spec)
+ {
+ final UnnestFilterSplit filterSplit = computeFilterSplit(spec);
+ final CursorBuildSpec unnestBuildSpec = transformCursorBuildSpec(spec, unnestColumn, filterSplit.getBaseTableFilter());
+
+ final Closer closer = Closer.create();
+ // base holder is built lazily on first asCursor()/getOrdering() and registered for close at that point
+ final Supplier baseHolderSupplier = Suppliers.memoize(
+ () -> closer.register(baseCursorFactory.makeCursorHolder(unnestBuildSpec))
+ );
+ return unnestCursorHolder(spec, filterSplit, closer, baseHolderSupplier);
+ }
+
+ @Override
+ public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+ {
+ final UnnestFilterSplit filterSplit = computeFilterSplit(spec);
+ final CursorBuildSpec unnestBuildSpec = transformCursorBuildSpec(spec, unnestColumn, filterSplit.getBaseTableFilter());
+
+ // Build the base-table holder asynchronously (a partial base segment downloads its required columns here), then
+ // wrap the ready holder in the unnest holder. Closing the returned holder before it's ready cancels the base load.
+ final AsyncCursorHolder baseAsync = baseCursorFactory.makeCursorHolderAsync(unnestBuildSpec);
+ final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(baseAsync::close);
+ baseAsync.addReadyCallback(() -> {
+ final CursorHolder unnestHolder;
+ try {
+ // release() transfers ownership of the base holder to us (and surfaces a base-load failure as its cause); from
+ // here the unnest holder owns closing it. Construction below can't throw, so the catch only fires on a base
+ // failure or a cancel race (baseAsync already closed), neither of which leaves a base holder to leak.
+ final CursorHolder baseHolder = baseAsync.release();
+ final Closer closer = Closer.create();
+ closer.register(baseHolder);
+ unnestHolder = unnestCursorHolder(spec, filterSplit, closer, Suppliers.ofInstance(baseHolder));
+ }
+ catch (Throwable t) {
+ asyncHolder.setException(t);
+ return;
+ }
+ if (!asyncHolder.set(unnestHolder)) {
+ // awaiter closed the wrapper while we were producing the holder; close it so the base holder doesn't leak
+ unnestHolder.close();
+ }
+ });
+ return asyncHolder;
+ }
+
+ /**
+ * Split the spec's filters into base-table and post-unnest filters (see
+ * {@link #computeBaseAndPostUnnestFilters}). Cheap and metadata-only; shared by the sync and async holder paths.
+ */
+ private UnnestFilterSplit computeFilterSplit(CursorBuildSpec spec)
{
final String input = getUnnestInputIfDirectAccess(unnestColumn);
- final UnnestFilterSplit filterSplit = computeBaseAndPostUnnestFilters(
+ return computeBaseAndPostUnnestFilters(
spec.getFilter(),
filter != null ? filter.toFilter() : null,
spec.getVirtualColumns(),
@@ -86,23 +136,26 @@ public CursorHolder makeCursorHolder(CursorBuildSpec spec)
input,
input == null ? null : spec.getVirtualColumns().getColumnCapabilitiesWithFallback(baseCursorFactory, input)
);
- final CursorBuildSpec unnestBuildSpec = transformCursorBuildSpec(
- spec,
- unnestColumn,
- filterSplit.getBaseTableFilter()
- );
+ }
+ /**
+ * Build the unnest {@link CursorHolder} on top of a base-table holder. {@code baseHolderSupplier} provides the base
+ * holder: the sync path builds it lazily on first use and registers it with {@code closer}, while the async path
+ * supplies an already-materialized holder pre-registered with {@code closer}.
+ */
+ private CursorHolder unnestCursorHolder(
+ CursorBuildSpec spec,
+ UnnestFilterSplit filterSplit,
+ Closer closer,
+ Supplier baseHolderSupplier
+ )
+ {
return new CursorHolder()
{
- final Closer closer = Closer.create();
- final Supplier cursorHolderSupplier = Suppliers.memoize(
- () -> closer.register(baseCursorFactory.makeCursorHolder(unnestBuildSpec))
- );
-
@Override
public Cursor asCursor()
{
- final Cursor cursor = cursorHolderSupplier.get().asCursor();
+ final Cursor cursor = baseHolderSupplier.get().asCursor();
if (cursor == null) {
return null;
}
@@ -135,7 +188,7 @@ public Cursor asCursor()
@Override
public List getOrdering()
{
- return computeOrdering(cursorHolderSupplier.get().getOrdering());
+ return computeOrdering(baseHolderSupplier.get().getOrdering());
}
@Override
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java
index 744a856c8454..07cebffa2c96 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java
@@ -30,9 +30,9 @@
import org.apache.druid.segment.ConcatenatingCursor;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.CursorBuildSpec;
-import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.CursorHolder;
import org.apache.druid.segment.EmptyCursorHolder;
+import org.apache.druid.segment.ResidentCursorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnType;
@@ -50,7 +50,7 @@
import java.util.Arrays;
import java.util.List;
-public class IncrementalIndexCursorFactory implements CursorFactory
+public class IncrementalIndexCursorFactory implements ResidentCursorFactory
{
private static final ColumnCapabilities.CoercionLogic COERCE_LOGIC =
new ColumnCapabilities.CoercionLogic()
diff --git a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactory.java
index b5354de02ad1..a7030fae5465 100644
--- a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactory.java
@@ -27,6 +27,7 @@
import org.apache.druid.query.Order;
import org.apache.druid.query.OrderBy;
import org.apache.druid.query.filter.Filter;
+import org.apache.druid.segment.AsyncCursorHolder;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.CursorBuildSpec;
import org.apache.druid.segment.CursorFactory;
@@ -77,12 +78,74 @@ public HashJoinSegmentCursorFactory(
public CursorHolder makeCursorHolder(CursorBuildSpec spec)
{
final Filter combinedFilter = baseFilterAnd(spec.getFilter());
+ final Set physicalColumns = computeBasePhysicalColumns(spec, combinedFilter);
- // for physical column tracking, we start by copying base spec physical columns
+ if (clauses.isEmpty()) {
+ // if there are no clauses, we can just use the base cursor directly if we apply the combined filter
+ return baseCursorFactory.makeCursorHolder(noClausesBaseSpec(spec, combinedFilter, physicalColumns));
+ }
+
+ // else we need to wipe out the grouping, aggregations, and ordering
+ final Closer joinablesCloser = Closer.create();
+ final JoinCursorPlan plan = planJoinCursor(spec, combinedFilter, physicalColumns);
+ final CursorHolder baseCursorHolder = joinablesCloser.register(
+ baseCursorFactory.makeCursorHolder(plan.baseBuildSpec)
+ );
+ return joinCursorHolder(plan, joinablesCloser, baseCursorHolder);
+ }
+
+ @Override
+ public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+ {
+ final Filter combinedFilter = baseFilterAnd(spec.getFilter());
+ final Set physicalColumns = computeBasePhysicalColumns(spec, combinedFilter);
+
+ if (clauses.isEmpty()) {
+ return baseCursorFactory.makeCursorHolderAsync(noClausesBaseSpec(spec, combinedFilter, physicalColumns));
+ }
+
+ final Closer joinablesCloser = Closer.create();
+ // Join filter analysis + base-spec computation are CPU-only; do them synchronously to learn the base spec.
+ final JoinCursorPlan plan = planJoinCursor(spec, combinedFilter, physicalColumns);
+
+ // Build the left/base holder asynchronously (a partial base segment downloads its required columns here); the
+ // join's build-side joinables are already-resident in-memory tables, so the base is the only async piece. Once it's
+ // ready, wrap it with the join cursors. Closing the returned holder before it's ready cancels the base load.
+ final AsyncCursorHolder baseAsync = baseCursorFactory.makeCursorHolderAsync(plan.baseBuildSpec);
+ final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(baseAsync::close);
+ baseAsync.addReadyCallback(() -> {
+ final CursorHolder joinHolder;
+ try {
+ // release() transfers ownership of the base holder to us (and surfaces a base-load failure as its cause); the
+ // join holder now owns closing it via joinablesCloser. The wrap below can't throw, so the catch only fires on
+ // a base failure or a cancel race (baseAsync already closed), in both cases joinablesCloser is still empty.
+ final CursorHolder baseHolder = baseAsync.release();
+ joinablesCloser.register(baseHolder);
+ joinHolder = joinCursorHolder(plan, joinablesCloser, baseHolder);
+ }
+ catch (Throwable t) {
+ asyncHolder.setException(t);
+ return;
+ }
+ if (!asyncHolder.set(joinHolder)) {
+ // awaiter closed the wrapper while we were producing the holder; close it so the base holder doesn't leak
+ joinHolder.close();
+ }
+ });
+ return asyncHolder;
+ }
+
+ /**
+ * Physical columns to pass to the base cursor: a copy of the spec's physical columns plus any columns required by
+ * the combined filter (null when the spec didn't declare physical columns, meaning "all"). Shared by the no-clauses
+ * and join paths.
+ */
+ @Nullable
+ private static Set computeBasePhysicalColumns(CursorBuildSpec spec, @Nullable Filter combinedFilter)
+ {
final Set physicalColumns = spec.getPhysicalColumns() != null
? new HashSet<>(spec.getPhysicalColumns())
: null;
-
if (physicalColumns != null && combinedFilter != null) {
for (String column : combinedFilter.getRequiredColumns()) {
if (!spec.getVirtualColumns().exists(column)) {
@@ -90,129 +153,135 @@ public CursorHolder makeCursorHolder(CursorBuildSpec spec)
}
}
}
+ return physicalColumns;
+ }
- if (clauses.isEmpty()) {
- // if there are no clauses, we can just use the base cursor directly if we apply the combined filter
- final CursorBuildSpec newSpec = CursorBuildSpec.builder(spec)
- .setFilter(combinedFilter)
- .setPhysicalColumns(physicalColumns)
- .build();
- return baseCursorFactory.makeCursorHolder(newSpec);
- }
-
- // else we need to wipe out the grouping, aggregations, and ordering
-
- return new CursorHolder()
- {
- final Closer joinablesCloser = Closer.create();
-
- /**
- * Typically the same as {@link HashJoinSegmentCursorFactory#joinFilterPreAnalysis}, but may differ when
- * an unnest datasource is layered on top of a join datasource.
- */
- final JoinFilterPreAnalysis actualPreAnalysis;
-
- /**
- * Result of {@link JoinFilterAnalyzer#splitFilter} on {@link #actualPreAnalysis} and
- * {@link HashJoinSegmentCursorFactory#baseFilter}.
- */
- final JoinFilterSplit joinFilterSplit;
-
- /**
- * Cursor holder for {@link HashJoinSegmentCursorFactory#baseCursorFactory}.
- */
- final CursorHolder baseCursorHolder;
-
- {
- // Filter pre-analysis key implied by the call to "makeCursorHolder". We need to sanity-check that it matches
- // the actual pre-analysis that was done. Note: we could now infer a rewrite config from the "makeCursorHolder"
- // call (it requires access to the query context which we now have access to since the move away from
- // CursorFactory) but this code hasn't been updated to sanity-check it, so currently we are still skipping it
- // by re-using the one present in the cached key.
- final JoinFilterPreAnalysisKey keyIn =
- new JoinFilterPreAnalysisKey(
- joinFilterPreAnalysis.getKey().getRewriteConfig(),
- clauses,
- spec.getVirtualColumns(),
- combinedFilter
- );
-
- final JoinFilterPreAnalysisKey keyCached = joinFilterPreAnalysis.getKey();
- if (keyIn.equals(keyCached)) {
- // Common case: key used during filter pre-analysis (keyCached) matches key implied by makeCursorHolder call
- // (keyIn).
- actualPreAnalysis = joinFilterPreAnalysis;
- } else {
- // Less common case: key differs. Re-analyze the filter. This case can happen when an unnest datasource is
- // layered on top of a join datasource.
- actualPreAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(keyIn);
- }
+ /**
+ * Base cursor spec for the no-clauses case: the original spec with the combined filter and base physical columns.
+ */
+ private static CursorBuildSpec noClausesBaseSpec(
+ CursorBuildSpec spec,
+ @Nullable Filter combinedFilter,
+ @Nullable Set physicalColumns
+ )
+ {
+ return CursorBuildSpec.builder(spec)
+ .setFilter(combinedFilter)
+ .setPhysicalColumns(physicalColumns)
+ .build();
+ }
- joinFilterSplit = JoinFilterAnalyzer.splitFilter(
- actualPreAnalysis,
- baseFilter
+ /**
+ * Run the (CPU-only) join filter pre-analysis and compute the base-table cursor build spec. Returns the analysis
+ * results needed at cursor-construction time ({@link JoinCursorPlan}). {@code physicalColumns} is mutated in place
+ * (it accumulates base-filter, pre-join virtual-column, and clause-condition columns, minus the join prefixes).
+ */
+ private JoinCursorPlan planJoinCursor(
+ CursorBuildSpec spec,
+ @Nullable Filter combinedFilter,
+ @Nullable Set physicalColumns
+ )
+ {
+ // Filter pre-analysis key implied by the call to "makeCursorHolder". We need to sanity-check that it matches
+ // the actual pre-analysis that was done. Note: we could now infer a rewrite config from the "makeCursorHolder"
+ // call (it requires access to the query context which we now have access to) but this code hasn't been updated to
+ // sanity-check it, so currently we are still skipping it by re-using the one present in the cached key.
+ final JoinFilterPreAnalysisKey keyIn =
+ new JoinFilterPreAnalysisKey(
+ joinFilterPreAnalysis.getKey().getRewriteConfig(),
+ clauses,
+ spec.getVirtualColumns(),
+ combinedFilter
);
- // start with a full scan clipped to interval
- final CursorBuildSpec.CursorBuildSpecBuilder cursorBuildSpecBuilder =
- CursorBuildSpec.builder()
- .setInterval(spec.getInterval())
- .setQueryContext(spec.getQueryContext())
- .setQueryMetrics(spec.getQueryMetrics());
-
- // retain time ordering if preferred
- Order timeOrder = Cursors.getTimeOrdering(spec.getPreferredOrdering());
- if (timeOrder == Order.DESCENDING) {
- cursorBuildSpecBuilder.setPreferredOrdering(Cursors.descendingTimeOrder());
- } else if (timeOrder == Order.ASCENDING) {
- cursorBuildSpecBuilder.setPreferredOrdering(Cursors.ascendingTimeOrder());
- }
+ final JoinFilterPreAnalysisKey keyCached = joinFilterPreAnalysis.getKey();
+ final JoinFilterPreAnalysis actualPreAnalysis;
+ if (keyIn.equals(keyCached)) {
+ // Common case: key used during filter pre-analysis (keyCached) matches key implied by makeCursorHolder call
+ // (keyIn).
+ actualPreAnalysis = joinFilterPreAnalysis;
+ } else {
+ // Less common case: key differs. Re-analyze the filter. This case can happen when an unnest datasource is
+ // layered on top of a join datasource.
+ actualPreAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(keyIn);
+ }
- // add pushdown filters if present
- if (joinFilterSplit.getBaseTableFilter().isPresent()) {
- cursorBuildSpecBuilder.setFilter(joinFilterSplit.getBaseTableFilter().get());
- }
- final VirtualColumns preJoinVirtualColumns = VirtualColumns.fromIterable(
- Iterables.concat(
- Sets.difference(
- ImmutableSet.copyOf(spec.getVirtualColumns().getVirtualColumns()),
- joinFilterPreAnalysis.getPostJoinVirtualColumns()
- ),
- joinFilterSplit.getPushDownVirtualColumns()
- )
- );
- cursorBuildSpecBuilder.setVirtualColumns(preJoinVirtualColumns);
-
- // add all base table physical columns if they were originally set
- if (physicalColumns != null) {
- if (joinFilterSplit.getBaseTableFilter().isPresent()) {
- for (String column : joinFilterSplit.getBaseTableFilter().get().getRequiredColumns()) {
- if (!spec.getVirtualColumns().exists(column) && !preJoinVirtualColumns.exists(column)) {
- physicalColumns.add(column);
- }
- }
- }
- for (VirtualColumn virtualColumn : preJoinVirtualColumns.getVirtualColumns()) {
- for (String column : virtualColumn.requiredColumns()) {
- if (!spec.getVirtualColumns().exists(column) && !preJoinVirtualColumns.exists(column)) {
- physicalColumns.add(column);
- }
- }
- }
- final Set prefixes = new HashSet<>();
- for (JoinableClause clause : clauses) {
- prefixes.add(clause.getPrefix());
- physicalColumns.addAll(clause.getCondition().getRequiredColumns());
+ final JoinFilterSplit joinFilterSplit = JoinFilterAnalyzer.splitFilter(actualPreAnalysis, baseFilter);
+
+ // start with a full scan clipped to interval
+ final CursorBuildSpec.CursorBuildSpecBuilder cursorBuildSpecBuilder =
+ CursorBuildSpec.builder()
+ .setInterval(spec.getInterval())
+ .setQueryContext(spec.getQueryContext())
+ .setQueryMetrics(spec.getQueryMetrics());
+
+ // retain time ordering if preferred
+ Order timeOrder = Cursors.getTimeOrdering(spec.getPreferredOrdering());
+ if (timeOrder == Order.DESCENDING) {
+ cursorBuildSpecBuilder.setPreferredOrdering(Cursors.descendingTimeOrder());
+ } else if (timeOrder == Order.ASCENDING) {
+ cursorBuildSpecBuilder.setPreferredOrdering(Cursors.ascendingTimeOrder());
+ }
+
+ // add pushdown filters if present
+ if (joinFilterSplit.getBaseTableFilter().isPresent()) {
+ cursorBuildSpecBuilder.setFilter(joinFilterSplit.getBaseTableFilter().get());
+ }
+ final VirtualColumns preJoinVirtualColumns = VirtualColumns.fromIterable(
+ Iterables.concat(
+ Sets.difference(
+ ImmutableSet.copyOf(spec.getVirtualColumns().getVirtualColumns()),
+ joinFilterPreAnalysis.getPostJoinVirtualColumns()
+ ),
+ joinFilterSplit.getPushDownVirtualColumns()
+ )
+ );
+ cursorBuildSpecBuilder.setVirtualColumns(preJoinVirtualColumns);
+
+ // add all base table physical columns if they were originally set
+ if (physicalColumns != null) {
+ if (joinFilterSplit.getBaseTableFilter().isPresent()) {
+ for (String column : joinFilterSplit.getBaseTableFilter().get().getRequiredColumns()) {
+ if (!spec.getVirtualColumns().exists(column) && !preJoinVirtualColumns.exists(column)) {
+ physicalColumns.add(column);
}
- for (String prefix : prefixes) {
- physicalColumns.removeIf(x -> JoinPrefixUtils.isPrefixedBy(x, prefix));
+ }
+ }
+ for (VirtualColumn virtualColumn : preJoinVirtualColumns.getVirtualColumns()) {
+ for (String column : virtualColumn.requiredColumns()) {
+ if (!spec.getVirtualColumns().exists(column) && !preJoinVirtualColumns.exists(column)) {
+ physicalColumns.add(column);
}
- cursorBuildSpecBuilder.setPhysicalColumns(physicalColumns);
}
-
- baseCursorHolder = joinablesCloser.register(baseCursorFactory.makeCursorHolder(cursorBuildSpecBuilder.build()));
}
+ final Set prefixes = new HashSet<>();
+ for (JoinableClause clause : clauses) {
+ prefixes.add(clause.getPrefix());
+ physicalColumns.addAll(clause.getCondition().getRequiredColumns());
+ }
+ for (String prefix : prefixes) {
+ physicalColumns.removeIf(x -> JoinPrefixUtils.isPrefixedBy(x, prefix));
+ }
+ cursorBuildSpecBuilder.setPhysicalColumns(physicalColumns);
+ }
+
+ return new JoinCursorPlan(actualPreAnalysis, joinFilterSplit, cursorBuildSpecBuilder.build());
+ }
+ /**
+ * Build the join {@link CursorHolder} on top of a base-table holder. {@code baseCursorHolder} must already be
+ * registered with {@code joinablesCloser} (the sync path registers it when built; the async path registers the
+ * released holder); closing the returned holder closes {@code joinablesCloser} (the base holder + per-cursor join
+ * matchers created in {@link CursorHolder#asCursor}).
+ */
+ private CursorHolder joinCursorHolder(
+ JoinCursorPlan plan,
+ Closer joinablesCloser,
+ CursorHolder baseCursorHolder
+ )
+ {
+ return new CursorHolder()
+ {
@Override
public Cursor asCursor()
{
@@ -230,8 +299,8 @@ public Cursor asCursor()
return PostJoinCursor.wrap(
retVal,
- VirtualColumns.fromIterable(actualPreAnalysis.getPostJoinVirtualColumns()),
- joinFilterSplit.getJoinTableFilter().orElse(null)
+ VirtualColumns.fromIterable(plan.actualPreAnalysis.getPostJoinVirtualColumns()),
+ plan.joinFilterSplit.getJoinTableFilter().orElse(null)
);
}
@@ -320,4 +389,39 @@ private List computeOrdering(final List baseOrdering)
return limit == baseOrdering.size() ? baseOrdering : baseOrdering.subList(0, limit);
}
+
+ /**
+ * Outputs of {@link #planJoinCursor}: the (possibly re-computed) join filter pre-analysis and split needed when the
+ * join cursor is actually constructed, plus the base-table cursor build spec used to open the left-side holder.
+ */
+ private static final class JoinCursorPlan
+ {
+ /**
+ * Typically the same as {@link HashJoinSegmentCursorFactory#joinFilterPreAnalysis}, but may differ when an unnest
+ * datasource is layered on top of a join datasource.
+ */
+ private final JoinFilterPreAnalysis actualPreAnalysis;
+
+ /**
+ * Result of {@link JoinFilterAnalyzer#splitFilter} on {@link #actualPreAnalysis} and
+ * {@link HashJoinSegmentCursorFactory#baseFilter}.
+ */
+ private final JoinFilterSplit joinFilterSplit;
+
+ /**
+ * Build spec for the left-side {@link HashJoinSegmentCursorFactory#baseCursorFactory} holder.
+ */
+ private final CursorBuildSpec baseBuildSpec;
+
+ private JoinCursorPlan(
+ JoinFilterPreAnalysis actualPreAnalysis,
+ JoinFilterSplit joinFilterSplit,
+ CursorBuildSpec baseBuildSpec
+ )
+ {
+ this.actualPreAnalysis = actualPreAnalysis;
+ this.joinFilterSplit = joinFilterSplit;
+ this.baseBuildSpec = baseBuildSpec;
+ }
+ }
}
diff --git a/processing/src/main/java/org/apache/druid/segment/loading/TombstoneSegmentizerFactory.java b/processing/src/main/java/org/apache/druid/segment/loading/TombstoneSegmentizerFactory.java
index 94654f58c851..546bd55f0373 100644
--- a/processing/src/main/java/org/apache/druid/segment/loading/TombstoneSegmentizerFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/loading/TombstoneSegmentizerFactory.java
@@ -27,6 +27,7 @@
import org.apache.druid.segment.CursorHolder;
import org.apache.druid.segment.NoopQueryableIndex;
import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.ResidentCursorFactory;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.column.ColumnCapabilities;
@@ -88,7 +89,7 @@ public Interval getDataInterval()
public T as(@Nonnull Class clazz)
{
if (CursorFactory.class.equals(clazz)) {
- return (T) new CursorFactory()
+ return (T) new ResidentCursorFactory()
{
@Override
public CursorHolder makeCursorHolder(CursorBuildSpec spec)
diff --git a/processing/src/test/java/org/apache/druid/segment/DeferredCursorFactory.java b/processing/src/test/java/org/apache/druid/segment/DeferredCursorFactory.java
new file mode 100644
index 000000000000..e9e9305e6f3d
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/DeferredCursorFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Test {@link CursorFactory} that produces its async cursor holder on demand: {@link #makeCursorHolderAsync} hands back
+ * an unfinished {@link AsyncCursorHolder}, and {@link #complete()} fills it (by building the delegate's holder for the
+ * last requested spec) so a test can drive the await transition deterministically. {@link #canceled} flips when the
+ * returned holder is closed before it is completed. Used to exercise the {@code makeCursorHolderAsync} override of
+ * wrapping cursor factories (e.g. unnest, join) against a base that loads asynchronously.
+ */
+public class DeferredCursorFactory implements CursorFactory
+{
+ private final CursorFactory delegate;
+ public final AtomicBoolean canceled = new AtomicBoolean(false);
+ private AsyncCursorHolder pending;
+ private CursorBuildSpec pendingSpec;
+
+ public DeferredCursorFactory(CursorFactory delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public CursorHolder makeCursorHolder(CursorBuildSpec spec)
+ {
+ return delegate.makeCursorHolder(spec);
+ }
+
+ @Override
+ public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+ {
+ pendingSpec = spec;
+ pending = new AsyncCursorHolder(() -> canceled.set(true));
+ return pending;
+ }
+
+ /**
+ * Completes the most recent {@link #makeCursorHolderAsync} holder by building the delegate's holder for the spec it
+ * was called with.
+ */
+ public void complete()
+ {
+ pending.set(delegate.makeCursorHolder(pendingSpec));
+ }
+
+ @Override
+ public RowSignature getRowSignature()
+ {
+ return delegate.getRowSignature();
+ }
+
+ @Override
+ public ColumnCapabilities getColumnCapabilities(String column)
+ {
+ return delegate.getColumnCapabilities(column);
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java
index 6a4cadcc79e8..54564621b22d 100644
--- a/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java
@@ -256,6 +256,62 @@ public void test_unnest_factory_basic()
}
}
+ @Test
+ public void test_unnest_factory_async_awaits_base_then_produces_unnest_cursor()
+ {
+ // base produces its holder asynchronously (mimics a partial segment downloading its columns). The unnest async
+ // holder must stay not-ready until the base completes, then yield the same unnested rows as the sync path.
+ final DeferredCursorFactory base = new DeferredCursorFactory(INCREMENTAL_INDEX_CURSOR_FACTORY);
+ final UnnestCursorFactory unnest = new UnnestCursorFactory(
+ base,
+ new ExpressionVirtualColumn(OUTPUT_COLUMN_NAME, "\"" + INPUT_COLUMN_NAME + "\"", null, ExprMacroTable.nil()),
+ null
+ );
+
+ final AsyncCursorHolder asyncHolder = unnest.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN);
+ try {
+ Assert.assertFalse("unnest holder must wait for the base to complete", asyncHolder.isReady());
+
+ base.complete();
+ Assert.assertTrue("unnest holder becomes ready once the base completes", asyncHolder.isReady());
+
+ try (final CursorHolder cursorHolder = asyncHolder.release()) {
+ final Cursor cursor = cursorHolder.asCursor();
+ final DimensionSelector dimSelector =
+ cursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME));
+ final List