[FLINK-38544][checkpoint] Support spilling for checkpointing during recovery#28517
Open
1996fanrui wants to merge 6 commits into
Open
[FLINK-38544][checkpoint] Support spilling for checkpointing during recovery#285171996fanrui wants to merge 6 commits into
1996fanrui wants to merge 6 commits into
Conversation
…m toBeConsumedBuffers
Split the single toBeConsumedBuffers queue into two queues with disjoint
responsibilities:
- recoveredBuffers (new): holds buffers migrated from RecoveredInputChannel
during construction; consumed by getNextRecoveredBuffer() which retains
the priority-event interleaving and last-buffer dynamic next-data-type
detection introduced by FLINK-39018.
- toBeConsumedBuffers (existing): reverted to its pre-FLINK-39018 role of
holding FullyFilledBuffer partial-buffer splits only. The recovery-aware
early branch in getNextBuffer() and the checkpointStarted inflight scan
no longer touch this queue.
Restores the checkState(toBeConsumedBuffers.isEmpty()) guard in
requestSubpartitions() (removed by cebc174). hasPendingPriorityEvent,
notifyPriorityEvent, and the constructor signature are unchanged.
Pure refactor: no public API change, no new tests; verified by the 9 existing
LocalInputChannelTest regression cases.
(cherry picked from commit 292cc4b)
(cherry picked from commit 7fbfc78)
…lling v2 - Adds BufferRequester, RecoverableInputChannel, RecoveryCheckpointTrigger interfaces with their final signatures (including getChannelInfo on RecoverableInputChannel and NO_OP singleton on RecoveryCheckpointTrigger). - Adds RecoveryCheckpointBarrier sentinel + DiskSnapshot data class with final 3-arg constructor signature and Chunk / StartPos / empty() helpers. - ChannelStateWriter gains addInputDataFromSpill and peekWriteResult default methods so all callers can compile against the interface without the dispatcher implementation landing in this phase. - RecoveredInputChannel#releaseAllResources visibility: package-private -> public References to SpillFile in DiskSnapshot's constructor are forward references; SpillFile itself lands in Phase 3. Each phase commit only needs to compile as a whole tree at the final commit, not in isolation. Design: requirements/38544/phase1_interfaces/design.md (cherry picked from commit 98c7b42)
- Local/Remote InputChannel implement RecoverableInputChannel from Phase 1 - recoveredBuffers reshaped to Deque<Buffer>; allRecoveredBuffersDelivered flag - getNextBuffer() unified under a single inRecovery predicate - checkpointStarted split into mutually-exclusive in-recovery / not-in-recovery - stateConsumedFuture triggered by (allRecoveredBuffersDelivered && queue empty) - RecoveredInputChannel.toInputChannel migrates via the new push interface; the initialRecoveredBuffers constructor parameter is gone. - LocalInputChannel.getNextRecoveredBuffer helper deleted Design: requirements/38544/phase2_input_channel/design.md (cherry picked from commit 8290409)
- New SpillFile: append-only segmented disk store with 64 MiB segments, reference counter + cleanedUp guard, and Snapshot view over segments and entries. All public signatures (append, snapshot, readBytesAt, acquire, release, isClosed) land in this commit; later phases only fill in bodies. - New FilteredBufferWriter: prefilter + postfilter buffer accumulator, flushing the post-filter buffer to disk on rotation. - New SpillFileWriter: thin facade exposing SpillFile lifecycle to filter callers. - RecoveredChannelStateHandler.recover filter branch routes output to a SpillFile instead of channel.onRecoveredStateBuffer; the accumulator's prefilter and postfilter buffers are sourced from the source channel's exclusive pool (no heap fallback). - InputChannelRecoveredStateHandler exposes getProducedSpillFile so Phase 4 drain wiring can pick up the frozen file after filter completes; spill-tmp-directories argument is required (no backward-compat shim). Design: requirements/38544/phase3_spill_writer/design.md (cherry picked from commit 2cbbbd6)
… removal - New SpillFileReader implements RecoveryCheckpointTrigger + Closeable. drain(): buffer alloc + disk read outside lock; deliver + offset advance inside lock. snapshotAndInsertBarriers(cpId): atomic startPos snapshot + per-channel barrier insert. Constructor derives the InputChannelInfo map internally; bodies pair acquire/release against SpillFile's ref counter. - New RecoveredChannelBufferRequester delegates to RecoveredInputChannel pool. - RecoveredInputChannel.requestBufferBlocking heap fallback removed (no more MemorySegmentFactory.allocateUnpooledSegment; OOM path eliminated). - channelIOExecutor wired: filter-on submits drain after conversion completes; exceptions bubble via StreamTask.asyncExceptionHandler. Design: requirements/38544/phase4_spill_reader/design.md (cherry picked from commit 1315d38)
- ChannelState dispatcher onCheckpointStartedForAllInputs implements Step 1 (snapshotAndInsertBarriers) -> Step 2 (per-input checkpointStarted) -> Step 3 (addInputDataFromSpill) -> cpId-completion release callback. - Hook AlternatingWaitingForFirstBarrierUnaligned.barrierReceived and AlternatingCollectingBarriers.alignedCheckpointTimeout into the dispatcher. - ChannelStateWriterImpl.addInputDataFromSpill: async demux by Chunk.channelInfo, empty snapshot inline early return, failures propagate via ChannelStateWriteResult. - Stream task pipelines (One/Two/Multiple) wire ChannelState through the InputProcessorUtil + SingleCheckpointBarrierHandler so the dispatcher hook reaches the right barrier-handler instance. - ITCases (relocated under flink-runtime to share the package with SpillFile): rescale + filter + large record OOM regression, UC during recovery. FLINK-38544 spilling v2 feature complete. Design: requirements/38544/phase5_coordination/design.md (cherry picked from commit 7badbd2)
Collaborator
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What is the purpose of the change
(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)
Brief change log