HDDS-15412. [DataNode] Disk volume-specific container replication thread pool#10595
Draft
jojochuang wants to merge 6 commits into
Draft
HDDS-15412. [DataNode] Disk volume-specific container replication thread pool#10595jojochuang wants to merge 6 commits into
jojochuang wants to merge 6 commits into
Conversation
Change-Id: Ifa6911e468f073ac2cc848edc3da20dc377c983c
…ction for EC decommission Change-Id: Ie4b8662b10de5d3e73312f6b18ac825853dfd72e
Change-Id: Ic4e49c06f798cd4a15a630e742e17debba9b7494 (cherry picked from commit aca4193)
…gic. Summary of changes: - Resolved compilation failure in hdds-client by adding missing gRPC metrics and increment methods to XceiverClientMetrics. - Implemented a retry mechanism for container replication when volume-level outbound limits are reached. - Added REPLICATION_LIMIT_REACHED error code to DatanodeClientProtocol.proto. - Updated OnDemandContainerReplicationSource to throw REPLICATION_LIMIT_REACHED when the limit is exceeded. - Updated PushReplicator and DownloadAndImportReplicator to catch the limit-reached error and mark the task as QUEUED. - Modified ReplicationSupervisor to re-enqueue QUEUED tasks with a 1-second delay using a new ScheduledExecutorService. - Enhanced SimpleContainerDownloader to detect RESOURCE_EXHAUSTED gRPC status and propagate it as a replication limit error. - Verified fixes with TestXceiverClientMetricsUnit and existing integration tests in TestDecommissionAndMaintenance. Change-Id: I6b27e1a1c422613001531cf26de1adb746d2266e
Change-Id: I84755b5cbe564c9336425ef3966eb6b36d9aa13c
…ead pool. Change-Id: I1221186f92020585ad5107829660a635a4366ffd
Contributor
There was a problem hiding this comment.
Pull request overview
This PR introduces disk volume–aware throttling for DataNode container replication and adds related throttling/metrics improvements across SCM and client layers to better handle overloaded conditions without cross-interference.
Changes:
- DataNode: route replication tasks to per-volume executors, enforce per-volume outbound replication-read limits, and retry when sources respond with RESOURCE_EXHAUSTED.
- SCM: track inflight EC reconstruction commands, add a global reconstruction limit check, and enhance EC decommission handling to switch to reconstruction when a node is highly loaded.
- Client/tests: add gRPC failure counters + an interceptor class, update APIs/signatures, and extend unit/integration tests.
Reviewed changes
Copilot reviewed 29 out of 29 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java | Adjusts helper signatures for downloader exceptions. |
| hadoop-ozone/cli-debug/src/main/java/org/apache/hadoop/ozone/debug/datanode/container/ExportSubcommand.java | Updates OnDemand replication source construction with config. |
| hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java | Adds test for reconstruction-limit behavior. |
| hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java | Adds inflight reconstruction limit test + helper. |
| hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java | Adds test coverage for “switch to reconstruction” path. |
| hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java | Adds reconstruction-limit check to processing loop. |
| hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java | Hooks reconstruction-limit check for under-replication processor. |
| hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java | Adds inflight reconstruction tracking, node-load checks, and new configs. |
| hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java | Implements reconstruction-limit hook (no-op). |
| hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java | Refactors reconstruction scheduling + prefers in-service sources; adds “switch to recon” on decom overload. |
| hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto | Adds REPLICATION_LIMIT_REACHED result enum. |
| hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java | Adds test for per-volume executors + cleanup behavior. |
| hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java | Updates OnDemand replication source construction with config. |
| hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java | Propagates RESOURCE_EXHAUSTED as a retryable storage result. |
| hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java | Makes getTarget() public for volume selection logic. |
| hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java | Introduces per-volume executors, cleanup, and rescheduling for QUEUED tasks. |
| hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java | Adds per-volume outbound limit config and passes config into source. |
| hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java | Treats replication-limit reached as retryable (QUEUED). |
| hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java | Enforces per-volume outbound read throttling. |
| hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java | Maps REPLICATION_LIMIT_REACHED to gRPC RESOURCE_EXHAUSTED. |
| hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java | Persists chosen target volume on the task and retries on limit reached. |
| hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java | Makes chooseNextVolume public for supervisor volume selection. |
| hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java | Updates interface to throw IOException. |
| hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java | Adds per-task volume + queued-time refresh for retries. |
| hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java | Tracks active outbound replications per volume. |
| hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java | Wires volume chooser into supervisor; passes replication config into source. |
| hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientMetricsUnit.java | Adds unit test for new gRPC metrics counters. |
| hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java | Adds counters for gRPC auth/connection failures. |
| hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/GrpcClientMetricsInterceptor.java | New interceptor class to increment the new counters. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| executorToShutdown.shutdownNow(); | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| it.remove(); |
Comment on lines
+575
to
+578
| if (task.getStatus() == Status.QUEUED) { | ||
| task.updateQueuedTime(); | ||
| scheduler.schedule(() -> addTask(task), 1, TimeUnit.SECONDS); | ||
| } |
Comment on lines
+70
to
+82
| HddsVolume volume = (HddsVolume) container.getContainerData().getVolume(); | ||
| if (volume != null) { | ||
| if (volume.getActiveOutboundReplications() >= | ||
| config.getVolumeOutboundLimit()) { | ||
| LOG.info("Volume {} has reached the maximum number of concurrent " + | ||
| "replication reads ({})", volume.getStorageID(), | ||
| config.getVolumeOutboundLimit()); | ||
| throw new StorageContainerException("Volume " + volume.getStorageID() + | ||
| " has reached the maximum number of concurrent replication reads (" | ||
| + config.getVolumeOutboundLimit() + ")", REPLICATION_LIMIT_REACHED); | ||
| } | ||
| volume.incActiveOutboundReplications(); | ||
| } |
Comment on lines
+104
to
+108
| if (resourceExhaustedCount > 0) { | ||
| throw new StorageContainerException("All sources are busy or failed " + | ||
| "for container " + containerId, | ||
| ContainerProtos.Result.REPLICATION_LIMIT_REACHED); | ||
| } |
Comment on lines
+115
to
+119
| if (reconstructionLimitReached(replicationManager)) { | ||
| LOG.info("The maximum number of pending reconstruction commands ({}) " + | ||
| "are scheduled. Ending the iteration.", | ||
| replicationManager.getReconstructionInFlightLimit()); | ||
| break; |
Comment on lines
+1296
to
+1298
| java.lang.reflect.Field field = ReplicationSupervisor.class.getDeclaredField("volumeExecutors"); | ||
| field.setAccessible(true); | ||
| Map<?, ?> volumeExecutors = (Map<?, ?>) field.get(supervisor); |
| ReconstructECContainersCommand cmd2 = new ReconstructECContainersCommand( | ||
| 2L, Collections.emptyList(), | ||
| ImmutableList.of(MockDatanodeDetails.randomDatanodeDetails()), | ||
| integers2ByteString(ImmutableList.of(3)), (ECReplicationConfig) repConfig); |
Comment on lines
+33
to
+39
| public class GrpcClientMetricsInterceptor implements ClientInterceptor { | ||
|
|
||
| private final XceiverClientMetrics metrics; | ||
|
|
||
| public GrpcClientMetricsInterceptor(XceiverClientMetrics metrics) { | ||
| this.metrics = metrics; | ||
| } |
|
|
||
| ContainerInfo container = ReplicationTestUtil.createContainerInfo( | ||
| repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20); | ||
|
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
HDDS-15412. [DataNode] Disk volume-specific container replication thread pool
Please describe your PR in detail:
What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/HDDS-15412
How was this patch tested?
Draft.
https://github.com/jojochuang/ozone/actions/runs/28054326249