Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.CursorHolder;
import org.apache.druid.segment.QueryableIndexColumnSelectorFactory;
import org.apache.druid.segment.ResidentCursorFactory;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.SimpleSettableOffset;
import org.apache.druid.segment.VirtualColumns;
Expand All @@ -64,7 +65,7 @@
*
* @see RowFrameCursorFactory the row-based version
*/
public class ColumnarFrameCursorFactory implements CursorFactory
public class ColumnarFrameCursorFactory implements ResidentCursorFactory
{
private final Frame frame;
private final RowSignature signature;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.druid.segment.CursorBuildSpec;
import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.CursorHolder;
import org.apache.druid.segment.ResidentCursorFactory;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.SimpleSettableOffset;
import org.apache.druid.segment.column.ColumnCapabilities;
Expand All @@ -49,7 +50,7 @@
*
* @see ColumnarFrameCursorFactory the columnar version
*/
public class RowFrameCursorFactory implements CursorFactory
public class RowFrameCursorFactory implements ResidentCursorFactory
{
private final Frame frame;
private final FrameReader frameReader;
Expand Down

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to make implementations of CursorFactory implement this method, why offer a default at all?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CursorFactory is sort of indirectly a @PublicApi via Segment#as, and it is also a very low-level extension point for custom segment implementations. Adding a new method with no default would mean extension writers need to implement it and recompile for functionality that is, at least currently, off by default. It seemed nicer to provide a default so that they don't need to recompile immediately, making this async stuff more opt-in. I was thinking that if something changes so that this path can be hit by default, then I would remove the default, because at that point it really would need to be implemented.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, thanks for the context!

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.segment;

import org.apache.druid.error.DruidException;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.RowSignature;

Expand All @@ -35,14 +36,22 @@ public interface CursorFactory extends ColumnInspector
/**
* Asynchronous variant of {@link #makeCursorHolder(CursorBuildSpec)} for cursor factories that may need to do I/O
* (e.g. download column data from deep storage) before they can serve a cursor. Callers running on threads that
* must not block should use this.
* must not block use this rather than {@link #makeCursorHolder}.
* <p>
* The default implementation completes synchronously by delegating to {@link #makeCursorHolder(CursorBuildSpec)},
* which keeps every existing implementation async-correct without changes.
* There is intentionally no working default: this method must be explicitly implemented to participate in
* async-aware engines (MSQ). A factory whose source is always fully-resident and never needs to block while waiting
* on some other thread to perform work can implement {@link ResidentCursorFactory} instead of {@link CursorFactory}
* directly, which provides a default implementation of this method that wraps
* {@link #makeCursorHolder(CursorBuildSpec)}.
*/
default AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
{
return AsyncCursorHolder.completed(makeCursorHolder(spec));
throw DruidException.defensive(
"makeCursorHolderAsync is not implemented by [%s]. Override it (or implement ResidentCursorFactory): return "
+ "AsyncCursorHolder.completed(makeCursorHolder(spec)) if the source is always fully resident, or build/await "
+ "the cursor holder asynchronously if it supports load on demand.",
getClass().getName()
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import java.util.LinkedHashSet;
import java.util.List;

public class QueryableIndexCursorFactory implements CursorFactory
public class QueryableIndexCursorFactory implements ResidentCursorFactory
{
private final QueryableIndex index;
private final TimeBoundaryInspector timeBoundaryInspector;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment;

/**
 * Specialization of {@link CursorFactory} for sources that are fully resident (in memory, no on-demand loading), so
 * that {@link #makeCursorHolder} never has to block on I/O. Implementing this interface instead of
 * {@link CursorFactory} directly states that intent and inherits a trivial {@link #makeCursorHolderAsync}: the holder
 * is built synchronously and handed back already completed.
 * <p>
 * A factory that is backed by, or wraps, an on-demand source must not use this interface. It should implement
 * {@link CursorFactory} directly and supply a real {@link CursorFactory#makeCursorHolderAsync} that builds or awaits
 * the holder asynchronously, so the calling thread is never blocked.
 */
public interface ResidentCursorFactory extends CursorFactory
{
  /**
   * Builds the cursor holder on the calling thread via {@link #makeCursorHolder(CursorBuildSpec)} and wraps the
   * result with {@link AsyncCursorHolder#completed}.
   *
   * @param spec the build spec, passed through unchanged to {@link #makeCursorHolder(CursorBuildSpec)}
   * @return the result of {@link AsyncCursorHolder#completed} applied to the synchronously built holder
   */
  @Override
  default AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
  {
    final CursorHolder readyHolder = makeCursorHolder(spec);
    return AsyncCursorHolder.completed(readyHolder);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import javax.annotation.Nullable;
import java.util.List;

public class RowBasedCursorFactory<RowType> implements CursorFactory
public class RowBasedCursorFactory<RowType> implements ResidentCursorFactory
{
private final Sequence<RowType> rowSequence;
private final RowAdapter<RowType> rowAdapter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,33 +76,86 @@ public UnnestCursorFactory(

/**
 * Synchronous path: splits the spec's filters, derives the build spec for the base table, and returns an unnest
 * {@link CursorHolder} layered on top of a lazily-built base holder.
 *
 * @param spec the caller's cursor build spec
 * @return the unnest cursor holder produced by {@code unnestCursorHolder}
 */
@Override
public CursorHolder makeCursorHolder(CursorBuildSpec spec)
{
  final UnnestFilterSplit split = computeFilterSplit(spec);
  final CursorBuildSpec baseSpec = transformCursorBuildSpec(spec, unnestColumn, split.getBaseTableFilter());
  final Closer resources = Closer.create();

  // The base holder is not created here: the memoized supplier builds it on the first asCursor()/getOrdering() call
  // and registers it with the closer at that moment, so it is closed together with the unnest holder.
  final Supplier<CursorHolder> lazyBaseHolder = Suppliers.memoize(
      () -> {
        final CursorHolder baseHolder = baseCursorFactory.makeCursorHolder(baseSpec);
        return resources.register(baseHolder);
      }
  );

  return unnestCursorHolder(spec, split, resources, lazyBaseHolder);
}

/**
 * Asynchronous counterpart of {@link #makeCursorHolder(CursorBuildSpec)}. It computes the same filter split and
 * base-table build spec as the sync path, but obtains the base holder through the base factory's
 * {@code makeCursorHolderAsync} instead of building it lazily.
 * <p>
 * The returned wrapper is completed from a ready-callback registered on the base async holder:
 * <ul>
 * <li>on success, the released base holder is registered with a fresh {@link Closer} and wrapped by
 * {@code unnestCursorHolder} using an already-materialized supplier;</li>
 * <li>if anything in that step throws, the throwable is passed to the wrapper via {@code setException};</li>
 * <li>if {@code set} on the wrapper returns false, the freshly built unnest holder is closed immediately.</li>
 * </ul>
 * NOTE(review): which thread runs the ready-callback depends on {@code AsyncCursorHolder#addReadyCallback}, which is
 * not visible here -- confirm before relying on any threading assumption.
 *
 * @param spec the caller's cursor build spec
 * @return a wrapper {@link AsyncCursorHolder} constructed with {@code baseAsync::close}
 */
@Override
public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
{
final UnnestFilterSplit filterSplit = computeFilterSplit(spec);
final CursorBuildSpec unnestBuildSpec = transformCursorBuildSpec(spec, unnestColumn, filterSplit.getBaseTableFilter());

// Build the base-table holder asynchronously (a partial base segment downloads its required columns here), then
// wrap the ready holder in the unnest holder. Closing the returned holder before it's ready cancels the base load.
final AsyncCursorHolder baseAsync = baseCursorFactory.makeCursorHolderAsync(unnestBuildSpec);
final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(baseAsync::close);
baseAsync.addReadyCallback(() -> {
// assigned only inside the try; the catch returns early, so it is definitely assigned at the set() call below
final CursorHolder unnestHolder;
try {
// release() transfers ownership of the base holder to us (and surfaces a base-load failure as its cause); from
// here the unnest holder owns closing it. Construction below can't throw, so the catch only fires on a base
// failure or a cancel race (baseAsync already closed), neither of which leaves a base holder to leak.
final CursorHolder baseHolder = baseAsync.release();
final Closer closer = Closer.create();
closer.register(baseHolder);
unnestHolder = unnestCursorHolder(spec, filterSplit, closer, Suppliers.ofInstance(baseHolder));
}
catch (Throwable t) {
// forward the failure to the wrapper instead of letting it escape from the callback
asyncHolder.setException(t);
return;
}
if (!asyncHolder.set(unnestHolder)) {
// awaiter closed the wrapper while we were producing the holder; close it so the base holder doesn't leak
unnestHolder.close();
}
});
return asyncHolder;
}

/**
* Split the spec's filters into base-table and post-unnest filters (see
* {@link #computeBaseAndPostUnnestFilters}). Cheap and metadata-only; shared by the sync and async holder paths.
*/
private UnnestFilterSplit computeFilterSplit(CursorBuildSpec spec)
{
final String input = getUnnestInputIfDirectAccess(unnestColumn);
final UnnestFilterSplit filterSplit = computeBaseAndPostUnnestFilters(
return computeBaseAndPostUnnestFilters(
spec.getFilter(),
filter != null ? filter.toFilter() : null,
spec.getVirtualColumns(),
unnestColumn,
input,
input == null ? null : spec.getVirtualColumns().getColumnCapabilitiesWithFallback(baseCursorFactory, input)
);
final CursorBuildSpec unnestBuildSpec = transformCursorBuildSpec(
spec,
unnestColumn,
filterSplit.getBaseTableFilter()
);
}

/**
* Build the unnest {@link CursorHolder} on top of a base-table holder. {@code baseHolderSupplier} provides the base
* holder: the sync path builds it lazily on first use and registers it with {@code closer}, while the async path
* supplies an already-materialized holder pre-registered with {@code closer}.
*/
private CursorHolder unnestCursorHolder(
CursorBuildSpec spec,
UnnestFilterSplit filterSplit,
Closer closer,
Supplier<CursorHolder> baseHolderSupplier
)
{
return new CursorHolder()
{
final Closer closer = Closer.create();
final Supplier<CursorHolder> cursorHolderSupplier = Suppliers.memoize(
() -> closer.register(baseCursorFactory.makeCursorHolder(unnestBuildSpec))
);

@Override
public Cursor asCursor()
{
final Cursor cursor = cursorHolderSupplier.get().asCursor();
final Cursor cursor = baseHolderSupplier.get().asCursor();
if (cursor == null) {
return null;
}
Expand Down Expand Up @@ -135,7 +188,7 @@ public Cursor asCursor()
@Override
public List<OrderBy> getOrdering()
{
return computeOrdering(cursorHolderSupplier.get().getOrdering());
return computeOrdering(baseHolderSupplier.get().getOrdering());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import org.apache.druid.segment.ConcatenatingCursor;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.CursorBuildSpec;
import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.CursorHolder;
import org.apache.druid.segment.EmptyCursorHolder;
import org.apache.druid.segment.ResidentCursorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnType;
Expand All @@ -50,7 +50,7 @@
import java.util.Arrays;
import java.util.List;

public class IncrementalIndexCursorFactory implements CursorFactory
public class IncrementalIndexCursorFactory implements ResidentCursorFactory
{
private static final ColumnCapabilities.CoercionLogic COERCE_LOGIC =
new ColumnCapabilities.CoercionLogic()
Expand Down
Loading
Loading