Skip to content

feat: add async CursorFactory API and migrate MSQ frame processors to use it#19397

Merged
clintropolis merged 7 commits into
apache:masterfrom
clintropolis:async-cursor-factory
May 8, 2026
Merged

feat: add async CursorFactory API and migrate MSQ frame processors to use it#19397
clintropolis merged 7 commits into
apache:masterfrom
clintropolis:async-cursor-factory

Conversation

@clintropolis
Copy link
Copy Markdown
Member

@clintropolis clintropolis commented May 1, 2026

Description

This PR adds an async variant of CursorFactory.makeCursorHolder which returns a new AsyncCursorHolder type that provides lifecycle management of the async load process and migrates MSQ frame processors to use it, so that future cursor factories backed by partial / lazy-loaded storage can perform I/O without blocking the processing thread. This PR introduces no partial-loading behavior on its own, instead it just establishes the integration shape for more upcoming partial-segment work that will follow this PR. Every existing cursor factory remains synchronous via the default implementation which just calls AsyncCursorHolder.completed(makeCursorHolder(spec)).

Worth noting, I am planning some changes in a separate PR for V10 segments so that a partial segment can provide a TimeBoundaryInspector for usage by GroupByPreShuffleFrameProcessor without needing to download any column data so we can avoid making an async variant of it.

changes:

  • add CursorFactory.makeCursorHolderAsync(CursorBuildSpec) for cursor factories backed by partial downloads can do I/O without blocking processing threads, with a default implementation returning AsyncCursorHolder.completed(makeCursorHolder(spec)) so existing implementations remain async-correct without changes
  • add GroupingEngine.makeCursorHolderAsync returning AsyncCursorHolder, and extracting shared processWithCursorHolder helper from GroupingEngine.process(), so that a caller which can yield and then resume can wait for the CursorHolder to be ready and later process it
  • migrate ScanQueryFrameProcessor.runWithSegment to call makeCursorHolderAsync and yield via ReturnOrAwait while the load is pending
  • migrate GroupByPreShuffleFrameProcessor.runWithSegment cursor path to call GroupingEngine.makeCursorHolderAsync and yield via ReturnOrAwait while loading

… use it

changes:
* add `CursorFactory.makeCursorHolderAsync(CursorBuildSpec)` for cursor factories backed by partial downloads can do I/O without blocking processing threads, with a default implementation returning `Futures.immediateFuture(makeCursorHolder(spec))` so existing implementations remain async-correct without changes
* add `GroupingEngine.processAsync` returning `ListenableFuture<Sequence<ResultRow>>` that uses `makeCursorHolderAsync`, extracting shared `processWithCursorHolder` helper from `GroupingEngine.process()`
* migrate `ScanQueryFrameProcessor.runWithSegment` to call `makeCursorHolderAsync` and yield via `ReturnOrAwait.awaitAllFutures` while the future is  pending
* migrate `GroupByPreShuffleFrameProcessor.runWithSegment` cursor path to call `GroupingEngine.processAsync` and yield via `ReturnOrAwait.awaitAllFutures`
@github-actions github-actions Bot added Area - Batch Ingestion Area - Segment Format and Ser/De Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels May 1, 2026
* Asynchronous variant of {@link #process} that obtains the {@link CursorHolder} from
* {@link CursorFactory#makeCursorHolderAsync} so callers running on threads that must not block on I/O
* (e.g. MSQ frame processors) can yield via {@link org.apache.druid.frame.processor.ReturnOrAwait#awaitAllFutures}
* until the cursor holder is ready.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This javadoc feels a bit too specific to me. It's enough to say that this is an asynchronous variant of process that uses CursorFactory#makeCursorHolderAsync to avoid blocking on acquisition of CursorHolder.

final CursorBuildSpec buildSpec = makeCursorBuildSpec(query, groupByQueryMetrics);
return FutureUtils.transform(
cursorFactory.makeCursorHolderAsync(buildSpec),
cursorHolder -> processWithCursorHolder(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this end up doing the main processing in whatever thread happened to resolve the makeCursorHolderAsync future? (Because FutureUtils.transform uses a direct executor.) Possibly that'd be in a virtual storage loader thread.

I think we'll need to either adjust this method to accept a ListenableExecutorService that will be used to run processWithCursorHolder, or break it up so callers first call groupingEngine.makeCursorHolderAsync and then call groupingEngine.processCursorHolder.

bufferHolder = bufferPool.take();
}
catch (Throwable e) {
CloseableUtils.closeAndWrapExceptions(cursorHolder);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw CloseableUtils.closeAndWrapInCatch(e, cursorHolder) will properly retain exceptions from closing cursorHolder as suppressed exceptions on e.

Although, there's probably some way of structuring this code to use the Closer to handle this better. Like, create the Closer first, register cursorHolder right after it's created, start the main try, then register bufferHolder after it's acquired. If the buffer fails to be acquired then the catch will close the Closer and release the cursorHolder.

);
}

cursorHolderFuture = cursorFactory.makeCursorHolderAsync(
Copy link
Copy Markdown
Contributor

@gianm gianm May 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handling futures that return closeable things is tricky. Maybe we can improve it by changing the return of makeCursorHolderAsync from ListenableFuture<CursorHolder> to AsyncCursorHolder that is closeable and has methods get() (blocks if not ready), close() (closes the resource no matter where it is in its lifecycle), and addReadyCallback(Runnable) (used by nonblocking callers to learn when get is ready).

The problem with the future approach is that once this call site gets the cursorHolderFuture, it's responsible for monitoring the future and closing cursorHolder if the future resolves successfully. This has to be done even if the processor is canceled before it has a chance to run through normally. It requires extra carefulness and is easy to mess up.

One way it can be handled is by attaching a callback in cleanup that closes the holder in onSuccess, like:

if (cursorHolderFuture != null) {
  Futures.addCallback(
    cursorHolderFuture,
    new FutureCallback<>() {
      void onSuccess(CursorHolder holder) { holder.close(); }
      void onFailure(Throwable t) { /* nothing */ }
    }
  );
}

But even with this structure, it's important watch out for pitfalls. A big one is that you can never cancel a future that returns a closeable thing. Cancellation of the future can cause the object to be orphaned and eventually GCed without being closed (if the object is created before cancellation has a chance to interrupt whatever was creating it).

If we can avoid these problems by returning something directly closeable (like this AsyncCursorHolder idea) rather than future-of-closeable, then the caller code becomes simpler.

Copy link
Copy Markdown
Member

@FrankChen021 FrankChen021 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Severity Findings
P0 0
P1 0
P2 1
P3 0
Total 1

This is an automated review by Codex GPT-5

CursorBuildSpec buildSpec
)
{
final GroupByQueryConfig querySpecificConfig = configSupplier.get().withOverrides(query);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Close cursorHolder if query config override throws

processWithCursorHolder now receives an already-created CursorHolder, but computes querySpecificConfig before entering any cleanup-protected block. If withOverrides throws, for example due to an invalid groupBy query context value, the CursorHolder returned by makeCursorHolderAsync/makeCursorHolder is never closed. Move this config resolution before acquiring the holder, or wrap it so cursorHolder is closed on every pre-sequence failure.

Copy link
Copy Markdown
Member

@FrankChen021 FrankChen021 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have reviewed the code for correctness, edge cases, concurrency, and integration risks; no issues found.


This is an automated review by Codex GPT-5

* regardless of where the underlying load is in its lifecycle.
* <p>
* The hazard this exists to avoid: returning a {@code ListenableFuture<CursorHolder>} (or similar future-of-Closeable)
* makes correct cleanup error-prone, where cancelling the future or letting a caller fail before consuming the future
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

canceling (spelling)

* can orphan the produced holder, leaking the underlying resources. By exposing a Closeable that internally tracks the
* load and disposes whatever has materialized, callers don't have to write that bookkeeping themselves.
* <p>
* Typical usage from a non-blocking caller (e.g. an MSQ frame processor):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason to include (e.g. an MSQ frame processor) here. I would also edit the code sample to be less MSQ specific.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The example code got a bit weird trying to get rid of ReturnOrAwait, so instead of tried to frame it that the example is using 'ReturnOrAwait' to show how you're supposed to use this thing, since the important part is the yield-then-resume when ready pattern, which i think makes it feel less like this is a thing for MSQ than the last iteration did?

Comment thread processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java Outdated
}

private final ListenableFuture<CursorHolder> future;
private boolean closed = false;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use @GuardedBy on closed and disposed, since synchronization is crucial given how they're used.

* <li>Already closed: no-op.</li>
* </ul>
* Note that closing does NOT cancel an in-flight load — cancelling a future-of-Closeable is the exact lifecycle
* hazard this class exists to prevent. The load completes normally and the resulting holder is closed promptly.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really we should structure this to be capable of canceling the in-flight load. I think to make it work we'll want to not base this class on futures at all. Something like:

  • Accept a Runnable canceler in the constructor
  • Replace the future field with Either<Throwable, T>
  • Expose set and setException methods that return boolean (true for set accepted, false for set not accepted)
  • In close, call canceler if the set methods haven't been called yet
  • Whatever provides the actual CursorHolder should close it if set returns false

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is nicer, though I ended up not using Either for the field since it didn't save me much and was kind of awkward since internally we are always dealing with one or the other (i did use it to make it easier to share a set method internally though)

Comment thread processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java Outdated
Comment thread processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java Outdated
Comment thread processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java Outdated
cb.run();
}
catch (Throwable ignored) {
// Best-effort; one bad callback shouldn't break others.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.warn something, otherwise a callback failure will be completely silent, and this could make debugging something difficult down the road. Especially since the likely outcome of an readiness callback failing would be that things just get stalled out.

}
if (error != null) {
// pass through as is
if (error instanceof RuntimeException runtime) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or Error?

* <p>
* Callbacks registered via {@link #addReadyCallback} fire outside the lock to avoid re-entrancy deadlocks.
*/
public boolean set(CursorHolder holder)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method should reject null holder with a defensive exception.

* itself. Throws {@link DruidException} if the load was already completed (from prior calls to this method or
* {@link #setException}).
* <p>
* Callbacks registered via {@link #addReadyCallback} fire outside the lock to avoid re-entrancy deadlocks.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this comment mean? Seems like there's only one lock (this) and it should be fine to re-enter it.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, i guess bad wording; like its more like avoiding problems like if the callback needs to hold some other lock we don't have any weird ordering issues like if that lock is some lock shared between callback thread and producing thread, will try to clarify

if (result == null) {
throw DruidException.defensive("AsyncCursorHolder is not ready yet");
}
disposed = true;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set result to null here to allow GC? The AsyncCursorHolder can live longer than the reference from release() if the AsyncCursorHolder is added to a high-level Closer or withBaggage or something like that.

final Runnable cancelerToRun;
synchronized (this) {
if (closed) {
return;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of idempotent close, how about throwing a defensive error here? Double-close can be indicative of resource management problems, and an exception would help us identify those potential issues.

If you do this then the javadoc should be updated too.

return;
}
if (result != null) {
// Result is here and no one has released it; we close it.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this logic would end up being simpler if you had nulled out result in release. It would then just be: always close result if it is nonnull, because if it's nonnull, we own it.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the nulling out of result in release(), however i still have the disposed flag because otherwise calling release would allow something to be able to call set/setException again which seems unchill, so I've kept this short-circuit on disposed here as is and haven't really changed this part.

We do essentially always call close on the result if we get to this part where we check not null, stuff maybe looks a bit more funny because we actually call close and the canceler outside of the synchronized block?

@clintropolis clintropolis merged commit 8ce2f36 into apache:master May 8, 2026
64 of 65 checks passed
@clintropolis clintropolis deleted the async-cursor-factory branch May 8, 2026 10:39
@github-actions github-actions Bot added this to the 38.0.0 milestone May 8, 2026
gianm added a commit that referenced this pull request May 11, 2026
Repairs a collision between #19423 and #19397. With cursor-building
and processing decoupled, both halves of the operation need the
optimized aggregators.
317brian pushed a commit to 317brian/druid that referenced this pull request May 11, 2026
… use it (apache#19397)

changes:
* add `AsyncCursorHolder` to manage async loading lifecycle for a cursor holder until ownership of the `CursorHolder` it produces can be transferred to the consumer (see javadoc for details)
* add `CursorFactory.makeCursorHolderAsync(CursorBuildSpec)` for cursor factories backed by partial downloads can do I/O without blocking processing threads, with a default implementation returning `AsyncCursorHolder.completed(makeCursorHolder(spec))` so existing implementations remain async-correct without changes
* add `GroupingEngine.makeCursorHolderAsync` returning `AsyncCursorHolder`, and extracting shared `processWithCursorHolder` helper from `GroupingEngine.process()`, so that a caller which can yield and then resume can wait for the `CursorHolder` to be ready and later process it
* migrate `ScanQueryFrameProcessor.runWithSegment` to call `makeCursorHolderAsync` and yield via `ReturnOrAwait` while the load is pending
* migrate `GroupByPreShuffleFrameProcessor.runWithSegment` cursor path to call `GroupingEngine.makeCursorHolderAsync` and yield via `ReturnOrAwait` while loading
317brian pushed a commit to 317brian/druid that referenced this pull request May 11, 2026
…9445)

Repairs a collision between apache#19423 and apache#19397. With cursor-building
and processing decoupled, both halves of the operation need the
optimized aggregators.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 Area - Segment Format and Ser/De

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants