diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartAbortInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartAbortInfo.java index c398aec1c9b8..3913d1d692c1 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartAbortInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartAbortInfo.java @@ -17,6 +17,8 @@ package org.apache.hadoop.ozone.om.helpers; +import java.util.Collections; +import java.util.List; import java.util.Objects; /** @@ -28,13 +30,21 @@ public final class OmMultipartAbortInfo { private final String multipartOpenKey; private final OmMultipartKeyInfo omMultipartKeyInfo; private final BucketLayout bucketLayout; + private final List partsKeyInfoToDelete; + private final List partsTableKeysToDelete; private OmMultipartAbortInfo(String multipartKey, String multipartOpenKey, - OmMultipartKeyInfo omMultipartKeyInfo, BucketLayout bucketLayout) { + OmMultipartKeyInfo omMultipartKeyInfo, BucketLayout bucketLayout, + List partsKeyInfoToDelete, + List partsTableKeysToDelete) { this.multipartKey = multipartKey; this.multipartOpenKey = multipartOpenKey; this.omMultipartKeyInfo = omMultipartKeyInfo; this.bucketLayout = bucketLayout; + this.partsKeyInfoToDelete = partsKeyInfoToDelete == null ? + Collections.emptyList() : partsKeyInfoToDelete; + this.partsTableKeysToDelete = partsTableKeysToDelete == null ? + Collections.emptyList() : partsTableKeysToDelete; } public String getMultipartKey() { @@ -53,6 +63,14 @@ public BucketLayout getBucketLayout() { return bucketLayout; } + public List getPartsKeyInfoToDelete() { + return partsKeyInfoToDelete; + } + + public List getPartsTableKeysToDelete() { + return partsTableKeysToDelete; + } + /** * Builder of OmMultipartAbortInfo. */ @@ -61,6 +79,8 @@ public static class Builder { private String multipartOpenKey; private OmMultipartKeyInfo omMultipartKeyInfo; private BucketLayout bucketLayout; + private List partsKeyInfoToDelete; + private List partsTableKeysToDelete; public Builder setMultipartKey(String mpuKey) { this.multipartKey = mpuKey; @@ -82,9 +102,21 @@ public Builder setBucketLayout(BucketLayout layout) { return this; } + public Builder setPartsKeyInfoToDelete(List keyInfos) { + this.partsKeyInfoToDelete = keyInfos; + return this; + } + + public Builder setPartsTableKeysToDelete( + List partKeys) { + this.partsTableKeysToDelete = partKeys; + return this; + } + public OmMultipartAbortInfo build() { return new OmMultipartAbortInfo(multipartKey, - multipartOpenKey, omMultipartKeyInfo, bucketLayout); + multipartOpenKey, omMultipartKeyInfo, bucketLayout, + partsKeyInfoToDelete, partsTableKeysToDelete); } } @@ -103,13 +135,16 @@ public boolean equals(Object other) { return this.multipartKey.equals(that.multipartKey) && this.multipartOpenKey.equals(that.multipartOpenKey) && this.bucketLayout.equals(that.bucketLayout) && - this.omMultipartKeyInfo.equals(that.omMultipartKeyInfo); + this.omMultipartKeyInfo.equals(that.omMultipartKeyInfo) && + this.partsKeyInfoToDelete.equals(that.partsKeyInfoToDelete) && + this.partsTableKeysToDelete.equals(that.partsTableKeysToDelete); } @Override public int hashCode() { return Objects.hash(multipartKey, multipartOpenKey, - bucketLayout, omMultipartKeyInfo); + bucketLayout, omMultipartKeyInfo, partsKeyInfoToDelete, + partsTableKeysToDelete); } } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java index 88036d3e66a6..1f59e3b7bd9c 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java @@ -41,6 +41,9 @@ * upload part information of the key. */ public final class OmMultipartKeyInfo extends WithObjectID implements CopyObject { + public static final byte LEGACY_SCHEMA_VERSION = 0; + public static final byte SPLIT_PARTS_TABLE_SCHEMA_VERSION = 1; + private static final Codec CODEC = new DelegatedCodec<>( Proto2Codec.get(MultipartKeyInfo.getDefaultInstance()), OmMultipartKeyInfo::getFromProto, @@ -258,7 +261,7 @@ public PartKeyInfoMap getPartKeyInfoMap() { } public void addPartKeyInfo(PartKeyInfo partKeyInfo) { - if (schemaVersion == 1) { + if (schemaVersion == SPLIT_PARTS_TABLE_SCHEMA_VERSION) { throw new IllegalStateException( "PartKeyInfoMap is not supported for schemaVersion 1"); } @@ -314,7 +317,7 @@ public Builder(OmMultipartKeyInfo multipartKeyInfo) { this.acls = AclListBuilder.of(multipartKeyInfo.acls); this.partKeyInfoList = new TreeMap<>(); - if (multipartKeyInfo.getSchemaVersion() == 0) { + if (multipartKeyInfo.getSchemaVersion() == LEGACY_SCHEMA_VERSION) { for (PartKeyInfo partKeyInfo : multipartKeyInfo.partKeyInfoMap) { this.partKeyInfoList.put(partKeyInfo.getPartNumber(), partKeyInfo); } @@ -427,7 +430,8 @@ protected OmMultipartKeyInfo buildObject() { public static Builder builderFromProto( MultipartKeyInfo multipartKeyInfo) { final SortedMap list = new TreeMap<>(); - if (!multipartKeyInfo.hasSchemaVersion() || multipartKeyInfo.getSchemaVersion() == 0) { + if (!multipartKeyInfo.hasSchemaVersion() + || multipartKeyInfo.getSchemaVersion() == LEGACY_SCHEMA_VERSION) { multipartKeyInfo.getPartKeyInfoListList().forEach(partKeyInfo -> list.put(partKeyInfo.getPartNumber(), partKeyInfo)); } @@ -473,7 +477,8 @@ public static OmMultipartKeyInfo getFromProto( * @return MultipartKeyInfo */ public MultipartKeyInfo getProto() { - if (schemaVersion == 1 && partKeyInfoMap != null && partKeyInfoMap.size() > 0) { + if (schemaVersion == SPLIT_PARTS_TABLE_SCHEMA_VERSION + && partKeyInfoMap != null && partKeyInfoMap.size() > 0) { throw new IllegalStateException( "PartKeyInfoMap must be empty for schemaVersion 1"); } @@ -507,7 +512,7 @@ public MultipartKeyInfo getProto() { } builder.addAllAcls(OzoneAclUtil.toProtobuf(acls)); - if (schemaVersion == 0) { + if (schemaVersion == LEGACY_SCHEMA_VERSION) { builder.addAllPartKeyInfoList(partKeyInfoMap); } return builder.build(); diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartPartInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartPartInfo.java index f8bf57de1498..19d8aa968451 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartPartInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartPartInfo.java @@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileEncryptionInfo; +import org.apache.hadoop.hdds.client.ReplicationConfig; import org.apache.hadoop.hdds.utils.db.Codec; import org.apache.hadoop.hdds.utils.db.DelegatedCodec; import org.apache.hadoop.hdds.utils.db.Proto2Codec; @@ -312,6 +313,27 @@ public static OmMultipartPartInfo from( return builder.build(); } + public OmKeyInfo toOmKeyInfo(String volumeName, String bucketName, + String keyName, ReplicationConfig replicationConfig) { + OmKeyInfo.Builder builder = new OmKeyInfo.Builder() + .setVolumeName(volumeName) + .setBucketName(bucketName) + .setKeyName(keyName) + .setReplicationConfig(replicationConfig) + .setOmKeyLocationInfos(keyLocationInfos) + .setDataSize(dataSize) + .setCreationTime(modificationTime) + .setModificationTime(modificationTime) + .setObjectID(objectID) + .setUpdateID(updateID) + .setFileEncryptionInfo(encInfo) + .setFileChecksum(fileChecksum); + if (eTag != null) { + builder.addMetadata(OzoneConsts.ETAG, eTag); + } + return builder.build(); + } + private KeyLocationList getKeyLocationInfosAsProto() { if (keyLocationInfos == null || keyLocationInfos.isEmpty()) { throw new IllegalArgumentException("keyLocationList is required"); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 08b6d6abbf18..073f98ac7229 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -103,6 +103,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.SortedMap; import java.util.Stack; import java.util.TreeMap; import java.util.concurrent.TimeUnit; @@ -156,6 +157,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartUpload; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList; import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts; @@ -169,6 +171,7 @@ import org.apache.hadoop.ozone.om.request.OMClientRequest; import org.apache.hadoop.ozone.om.request.file.OMFileRequest; import org.apache.hadoop.ozone.om.request.key.OMKeyRequest; +import org.apache.hadoop.ozone.om.request.s3.multipart.MultipartPartScanUtil; import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils; import org.apache.hadoop.ozone.om.service.CompactionService; import org.apache.hadoop.ozone.om.service.DirectoryDeletingService; @@ -1138,6 +1141,40 @@ public OmMultipartUploadListParts listParts(String volumeName, throw new OMException("No Such Multipart upload exists for this key.", ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR); } else { + if (multipartKeyInfo.getSchemaVersion() + == OmMultipartKeyInfo.SPLIT_PARTS_TABLE_SCHEMA_VERSION) { + SortedMap parts = + MultipartPartScanUtil.scanParts(metadataManager, uploadID); + List omPartInfoList = new ArrayList<>(); + int count = 0; + for (Map.Entry entry + : parts.entrySet()) { + int partNumber = entry.getKey(); + if (partNumber <= partNumberMarker) { + continue; + } + if (count == maxParts) { + isTruncated = true; + break; + } + OmMultipartPartInfo partInfo = entry.getValue(); + nextPartNumberMarker = partNumber; + omPartInfoList.add(new OmPartInfo(partNumber, + partInfo.getPartName(), partInfo.getModificationTime(), + partInfo.getDataSize(), partInfo.getETag())); + count++; + } + if (!isTruncated) { + nextPartNumberMarker = 0; + } + OmMultipartUploadListParts listParts = + new OmMultipartUploadListParts( + multipartKeyInfo.getReplicationConfig(), + nextPartNumberMarker, isTruncated); + listParts.addPartList(omPartInfoList); + return listParts; + } + Iterator partKeyInfoMapIterator = multipartKeyInfo.getPartKeyInfoMap().iterator(); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/MultipartPartScanUtil.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/MultipartPartScanUtil.java new file mode 100644 index 000000000000..0307bc564baa --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/MultipartPartScanUtil.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.request.s3.multipart; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.hdds.utils.db.cache.CacheKey; +import org.apache.hadoop.hdds.utils.db.cache.CacheValue; +import org.apache.hadoop.ozone.om.OMMetadataManager; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartKey; +import org.apache.hadoop.ozone.om.helpers.QuotaUtil; + +/** + * Cache-aware scanner for multipart parts table rows. + */ +public final class MultipartPartScanUtil { + + private MultipartPartScanUtil() { + } + + public static SortedMap scanParts( + OMMetadataManager omMetadataManager, String uploadId) throws IOException { + SortedMap parts = new TreeMap<>(); + OmMultipartPartKey prefix = OmMultipartPartKey.prefix(uploadId); + + try (TableIterator> + iterator = omMetadataManager.getMultipartPartsTable().iterator(prefix)) { + while (iterator.hasNext()) { + Table.KeyValue kv = + iterator.next(); + if (kv == null) { + continue; + } + OmMultipartPartKey key = kv.getKey(); + if (!uploadId.equals(key.getUploadId())) { + break; + } + if (key.hasPartNumber()) { + parts.put(key.getPartNumber(), kv.getValue()); + } + } + } + + Iterator, + CacheValue>> cacheIterator = + omMetadataManager.getMultipartPartsTable().cacheIterator(); + while (cacheIterator.hasNext()) { + Map.Entry, CacheValue> + cacheEntry = cacheIterator.next(); + OmMultipartPartKey key = cacheEntry.getKey().getCacheKey(); + if (!uploadId.equals(key.getUploadId()) || !key.hasPartNumber()) { + continue; + } + OmMultipartPartInfo value = cacheEntry.getValue().getCacheValue(); + if (value == null) { + parts.remove(key.getPartNumber()); + } else { + parts.put(key.getPartNumber(), value); + } + } + + return parts; + } + + public static List getPartKeys(String uploadId, + SortedMap parts) { + List partKeys = new ArrayList<>(parts.size()); + for (Integer partNumber : parts.keySet()) { + partKeys.add(OmMultipartPartKey.of(uploadId, partNumber)); + } + return partKeys; + } + + public static void addPartCleanupCacheEntries( + OMMetadataManager omMetadataManager, + List partKeys, long transactionLogIndex) { + for (OmMultipartPartKey partKey : partKeys) { + omMetadataManager.getMultipartPartsTable().addCacheEntry( + new CacheKey<>(partKey), CacheValue.get(transactionLogIndex)); + } + } + + public static long getReplicatedSize( + SortedMap parts, + ReplicationConfig replicationConfig) { + long replicatedSize = 0; + for (OmMultipartPartInfo part : parts.values()) { + replicatedSize += QuotaUtil.getReplicatedSize( + part.getDataSize(), replicationConfig); + } + return replicatedSize; + } + + public static List toOmKeyInfoList( + SortedMap parts, String volumeName, + String bucketName, String keyName, ReplicationConfig replicationConfig) { + List keyInfos = new ArrayList<>(parts.size()); + for (OmMultipartPartInfo part : parts.values()) { + keyInfos.add(part.toOmKeyInfo(volumeName, bucketName, keyName, + replicationConfig)); + } + return keyInfos; + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3ExpiredMultipartUploadsAbortRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3ExpiredMultipartUploadsAbortRequest.java index f805d9f07631..a84757ea5287 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3ExpiredMultipartUploadsAbortRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3ExpiredMultipartUploadsAbortRequest.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.SortedMap; import org.apache.hadoop.hdds.utils.db.cache.CacheKey; import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import org.apache.hadoop.ozone.OzoneConsts; @@ -35,9 +36,13 @@ import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartAbortInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartKey; import org.apache.hadoop.ozone.om.helpers.OmMultipartUpload; +import org.apache.hadoop.ozone.om.helpers.QuotaUtil; import org.apache.hadoop.ozone.om.request.key.OMKeyRequest; import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils; import org.apache.hadoop.ozone.om.request.util.OmResponseUtil; @@ -186,6 +191,7 @@ private void processResults(OMMetrics omMetrics, } + @SuppressWarnings("methodlength") private void updateTableCache(OzoneManager ozoneManager, long trxnLogIndex, ExpiredMultipartUploadsBucket mpusPerBucket, Map> abortedMultipartUploads) @@ -273,12 +279,33 @@ private void updateTableCache(OzoneManager ozoneManager, // When abort uploaded key, we need to subtract the PartKey length // from the volume usedBytes. long quotaReleased = 0; - int keyFactor = omMultipartKeyInfo.getReplicationConfig() - .getRequiredNodes(); - for (PartKeyInfo iterPartKeyInfo : omMultipartKeyInfo. - getPartKeyInfoMap()) { - quotaReleased += - iterPartKeyInfo.getPartKeyInfo().getDataSize() * keyFactor; + long numParts; + List partsKeyInfoToDelete = new ArrayList<>(); + List partsTableKeysToDelete = new ArrayList<>(); + if (omMultipartKeyInfo.getSchemaVersion() + == OmMultipartKeyInfo.LEGACY_SCHEMA_VERSION) { + for (PartKeyInfo iterPartKeyInfo : omMultipartKeyInfo. + getPartKeyInfoMap()) { + quotaReleased += QuotaUtil.getReplicatedSize( + iterPartKeyInfo.getPartKeyInfo().getDataSize(), + omMultipartKeyInfo.getReplicationConfig()); + } + numParts = omMultipartKeyInfo.getPartKeyInfoMap().size(); + } else { + SortedMap tableParts = + MultipartPartScanUtil.scanParts(omMetadataManager, + multipartUpload.getUploadId()); + quotaReleased += MultipartPartScanUtil.getReplicatedSize( + tableParts, omMultipartKeyInfo.getReplicationConfig()); + partsKeyInfoToDelete.addAll(MultipartPartScanUtil.toOmKeyInfoList( + tableParts, multipartUpload.getVolumeName(), + multipartUpload.getBucketName(), multipartUpload.getKeyName(), + omMultipartKeyInfo.getReplicationConfig())); + partsTableKeysToDelete.addAll(MultipartPartScanUtil.getPartKeys( + multipartUpload.getUploadId(), tableParts)); + MultipartPartScanUtil.addPartCleanupCacheEntries(omMetadataManager, + partsTableKeysToDelete, trxnLogIndex); + numParts = tableParts.size(); } omBucketInfo.incrUsedBytes(-quotaReleased); @@ -288,6 +315,8 @@ private void updateTableCache(OzoneManager ozoneManager, .setMultipartOpenKey(multipartOpenKey) .setMultipartKeyInfo(omMultipartKeyInfo) .setBucketLayout(omBucketInfo.getBucketLayout()) + .setPartsKeyInfoToDelete(partsKeyInfoToDelete) + .setPartsTableKeysToDelete(partsTableKeysToDelete) .build(); abortedMultipartUploads.computeIfAbsent(omBucketInfo, @@ -315,7 +344,6 @@ private void updateTableCache(OzoneManager ozoneManager, .addCacheEntry(new CacheKey<>(expiredMPUKeyName), CacheValue.get(trxnLogIndex)); - long numParts = omMultipartKeyInfo.getPartKeyInfoMap().size(); ozoneManager.getMetrics().incNumExpiredMPUAborted(); ozoneManager.getMetrics().incNumExpiredMPUPartsAborted(numParts); LOG.debug("Expired MPU {} aborted containing {} parts.", diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java index a9aeff0ac5d1..056a730f9f6d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequest.java @@ -21,7 +21,10 @@ import java.io.IOException; import java.nio.file.InvalidPathException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import java.util.SortedMap; import org.apache.hadoop.hdds.utils.db.cache.CacheKey; import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import org.apache.hadoop.ozone.OzoneConsts; @@ -34,6 +37,8 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartKey; import org.apache.hadoop.ozone.om.helpers.QuotaUtil; import org.apache.hadoop.ozone.om.request.key.OMKeyRequest; import org.apache.hadoop.ozone.om.request.util.OMMultipartUploadUtils; @@ -96,6 +101,7 @@ public OMRequest preExecute(OzoneManager ozoneManager) throws IOException { } @Override + @SuppressWarnings("methodlength") public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, ExecutionContext context) { final long trxnLogIndex = context.getIndex(); @@ -123,6 +129,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut OMClientResponse omClientResponse = null; Result result = null; OmBucketInfo omBucketInfo = null; + List partsKeyInfoToDelete = new ArrayList<>(); + List partsTableKeysToDelete = new ArrayList<>(); try { mergeOmLockDetails( omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK, volumeName, @@ -172,10 +180,26 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut // When abort uploaded key, we need to subtract the PartKey length from // the volume usedBytes. long quotaReleased = 0; - for (PartKeyInfo iterPartKeyInfo : multipartKeyInfo.getPartKeyInfoMap()) { - quotaReleased += QuotaUtil.getReplicatedSize( - iterPartKeyInfo.getPartKeyInfo().getDataSize(), - multipartKeyInfo.getReplicationConfig()); + if (multipartKeyInfo.getSchemaVersion() + == OmMultipartKeyInfo.LEGACY_SCHEMA_VERSION) { + for (PartKeyInfo iterPartKeyInfo : multipartKeyInfo.getPartKeyInfoMap()) { + quotaReleased += QuotaUtil.getReplicatedSize( + iterPartKeyInfo.getPartKeyInfo().getDataSize(), + multipartKeyInfo.getReplicationConfig()); + } + } else { + SortedMap tableParts = + MultipartPartScanUtil.scanParts(omMetadataManager, + multipartKeyInfo.getUploadID()); + quotaReleased += MultipartPartScanUtil.getReplicatedSize( + tableParts, multipartKeyInfo.getReplicationConfig()); + partsKeyInfoToDelete.addAll(MultipartPartScanUtil.toOmKeyInfoList( + tableParts, volumeName, bucketName, keyName, + multipartKeyInfo.getReplicationConfig())); + partsTableKeysToDelete.addAll(MultipartPartScanUtil.getPartKeys( + multipartKeyInfo.getUploadID(), tableParts)); + MultipartPartScanUtil.addPartCleanupCacheEntries(omMetadataManager, + partsTableKeysToDelete, trxnLogIndex); } omBucketInfo.incrUsedBytes(-quotaReleased); @@ -190,7 +214,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut CacheValue.get(trxnLogIndex)); omClientResponse = getOmClientResponse(ozoneManager, multipartKeyInfo, - multipartKey, multipartOpenKey, omResponse, omBucketInfo); + multipartKey, multipartOpenKey, omResponse, omBucketInfo, + partsKeyInfoToDelete, partsTableKeysToDelete); result = Result.SUCCESS; } catch (IOException | InvalidPathException ex) { @@ -239,16 +264,19 @@ protected OMClientResponse getOmClientResponse(Exception exception, exception), getBucketLayout()); } + @SuppressWarnings("checkstyle:ParameterNumber") protected OMClientResponse getOmClientResponse(OzoneManager ozoneManager, OmMultipartKeyInfo multipartKeyInfo, String multipartKey, String multipartOpenKey, OMResponse.Builder omResponse, - OmBucketInfo omBucketInfo) { + OmBucketInfo omBucketInfo, List partsKeyInfoToDelete, + List partsTableKeysToDelete) { OMClientResponse omClientResponse = new S3MultipartUploadAbortResponse( omResponse.setAbortMultiPartUploadResponse( MultipartUploadAbortResponse.newBuilder()).build(), multipartKey, multipartOpenKey, multipartKeyInfo, - omBucketInfo.copyObject(), getBucketLayout()); + omBucketInfo.copyObject(), getBucketLayout(), partsKeyInfoToDelete, + partsTableKeysToDelete); return omClientResponse; } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequestWithFSO.java index 635da1a7c1f9..950da3aeae55 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequestWithFSO.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadAbortRequestWithFSO.java @@ -17,10 +17,13 @@ package org.apache.hadoop.ozone.om.request.s3.multipart; +import java.util.List; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartKey; import org.apache.hadoop.ozone.om.response.OMClientResponse; import org.apache.hadoop.ozone.om.response.s3.multipart.S3MultipartUploadAbortResponseWithFSO; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartUploadAbortResponse; @@ -50,13 +53,15 @@ protected OMClientResponse getOmClientResponse(Exception exception, protected OMClientResponse getOmClientResponse(OzoneManager ozoneManager, OmMultipartKeyInfo multipartKeyInfo, String multipartKey, String multipartOpenKey, OMResponse.Builder omResponse, - OmBucketInfo omBucketInfo) { + OmBucketInfo omBucketInfo, List partsKeyInfoToDelete, + List partsTableKeysToDelete) { OMClientResponse omClientResp = new S3MultipartUploadAbortResponseWithFSO( omResponse.setAbortMultiPartUploadResponse( MultipartUploadAbortResponse.newBuilder()).build(), multipartKey, multipartOpenKey, multipartKeyInfo, - omBucketInfo.copyObject(), getBucketLayout()); + omBucketInfo.copyObject(), getBucketLayout(), partsKeyInfoToDelete, + partsTableKeysToDelete); return omClientResp; } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java index ac123ff680ac..96a0c710db6f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequest.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hdds.utils.db.cache.CacheKey; import org.apache.hadoop.hdds.utils.db.cache.CacheValue; import org.apache.hadoop.ozone.OzoneConsts; @@ -41,6 +42,9 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartKey; +import org.apache.hadoop.ozone.om.helpers.QuotaUtil; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; import org.apache.hadoop.ozone.om.request.key.OMKeyRequest; import org.apache.hadoop.ozone.om.request.util.OmResponseUtil; @@ -124,6 +128,10 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut getOmRequest()); OMClientResponse omClientResponse = null; OzoneManagerProtocolProtos.PartKeyInfo oldPartKeyInfo = null; + OmMultipartPartInfo oldMultipartPartInfo = null; + OmKeyInfo oldPartOmKeyInfo = null; + OmMultipartPartInfo multipartPartInfo = null; + OmMultipartPartKey multipartPartKey = null; String openKey = null; OmKeyInfo omKeyInfo = null; String multipartKey = null; @@ -193,7 +201,18 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR); } - oldPartKeyInfo = multipartKeyInfo.getPartKeyInfo(partNumber); + if (multipartKeyInfo.getSchemaVersion() + == OmMultipartKeyInfo.LEGACY_SCHEMA_VERSION) { + oldPartKeyInfo = multipartKeyInfo.getPartKeyInfo(partNumber); + } else { + multipartPartKey = getMultipartPartKey(uploadID, partNumber); + oldMultipartPartInfo = omMetadataManager.getMultipartPartsTable() + .get(multipartPartKey); + if (oldMultipartPartInfo != null) { + oldPartOmKeyInfo = oldMultipartPartInfo.toOmKeyInfo( + volumeName, bucketName, keyName, multipartKeyInfo.getReplicationConfig()); + } + } // Build this multipart upload part info. OzoneManagerProtocolProtos.PartKeyInfo.Builder partKeyInfo = @@ -203,8 +222,18 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut partKeyInfo.setPartKeyInfo(omKeyInfo.getProtobuf( getOmRequest().getVersion())); - // Add this part information in to multipartKeyInfo. - multipartKeyInfo.addPartKeyInfo(partKeyInfo.build()); + if (multipartKeyInfo.getSchemaVersion() + == OmMultipartKeyInfo.LEGACY_SCHEMA_VERSION) { + // Add this part information in to multipartKeyInfo. + multipartKeyInfo.addPartKeyInfo(partKeyInfo.build()); + } else { + validateSplitPartInfo(omKeyInfo, partNumber); + multipartPartInfo = OmMultipartPartInfo.from( + partName, partNumber, omKeyInfo); + omMetadataManager.getMultipartPartsTable().addCacheEntry( + new CacheKey<>(multipartPartKey), + CacheValue.get(trxnLogIndex, multipartPartInfo)); + } // Set the UpdateID to current transactionLogIndex multipartKeyInfo = multipartKeyInfo.toBuilder() @@ -236,7 +265,9 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut Map keyVersionsToDeleteMap = null; long correctedSpace = omKeyInfo.getReplicatedSize(); - if (null != oldPartKeyInfo) { + if (multipartKeyInfo.getSchemaVersion() + == OmMultipartKeyInfo.LEGACY_SCHEMA_VERSION + && null != oldPartKeyInfo) { OmKeyInfo partKeyToBeDeleted = OmKeyInfo.getFromProtobuf(oldPartKeyInfo.getPartKeyInfo()); correctedSpace -= partKeyToBeDeleted.getReplicatedSize(); @@ -247,6 +278,21 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut String delKeyName = omMetadataManager.getOzoneDeletePathKey( partKeyToBeDeleted.getObjectID(), multipartKey); + if (!oldVerKeyInfo.getOmKeyInfoList().isEmpty()) { + keyVersionsToDeleteMap = new HashMap<>(); + keyVersionsToDeleteMap.put(delKeyName, oldVerKeyInfo); + } + } else if (multipartKeyInfo.getSchemaVersion() + == OmMultipartKeyInfo.SPLIT_PARTS_TABLE_SCHEMA_VERSION + && oldMultipartPartInfo != null && oldPartOmKeyInfo != null) { + correctedSpace -= QuotaUtil.getReplicatedSize( + oldMultipartPartInfo.getDataSize(), + multipartKeyInfo.getReplicationConfig()); + RepeatedOmKeyInfo oldVerKeyInfo = getOldVersionsToCleanUp( + oldPartOmKeyInfo, omBucketInfo.getObjectID(), trxnLogIndex); + String delKeyName = omMetadataManager.getOzoneDeletePathKey( + oldPartOmKeyInfo.getObjectID(), multipartKey); + if (!oldVerKeyInfo.getOmKeyInfoList().isEmpty()) { keyVersionsToDeleteMap = new HashMap<>(); keyVersionsToDeleteMap.put(delKeyName, oldVerKeyInfo); @@ -271,7 +317,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut omResponse.setCommitMultiPartUploadResponse(commitResponseBuilder); omClientResponse = getOmClientResponse(ozoneManager, keyVersionsToDeleteMap, openKey, - omKeyInfo, multipartKey, multipartKeyInfo, omResponse.build(), + omKeyInfo, multipartKey, multipartKeyInfo, multipartPartKey, + multipartPartInfo, omResponse.build(), omBucketInfo.copyObject(), bucketId); result = Result.SUCCESS; @@ -280,7 +327,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut exception = ex; omClientResponse = getOmClientResponse(ozoneManager, null, openKey, - omKeyInfo, multipartKey, multipartKeyInfo, + omKeyInfo, multipartKey, multipartKeyInfo, multipartPartKey, + multipartPartInfo, createErrorOMResponse(omResponse, exception), copyBucketInfo, bucketId); } finally { if (acquiredLock) { @@ -309,11 +357,13 @@ public static String getPartName(String ozoneKey, String uploadID, protected S3MultipartUploadCommitPartResponse getOmClientResponse( OzoneManager ozoneManager, Map keyToDeleteMap, String openKey, OmKeyInfo omKeyInfo, String multipartKey, - OmMultipartKeyInfo multipartKeyInfo, OMResponse build, + OmMultipartKeyInfo multipartKeyInfo, OmMultipartPartKey multipartPartKey, + OmMultipartPartInfo multipartPartInfo, OMResponse build, OmBucketInfo omBucketInfo, long bucketId) { return new S3MultipartUploadCommitPartResponse(build, multipartKey, openKey, - multipartKeyInfo, keyToDeleteMap, omKeyInfo, + multipartKeyInfo, multipartPartKey, multipartPartInfo, + keyToDeleteMap, omKeyInfo, omBucketInfo, bucketId, getBucketLayout()); } @@ -370,6 +420,25 @@ private String getMultipartKey(String volumeName, String bucketName, keyName, uploadID); } + private OmMultipartPartKey getMultipartPartKey(String uploadId, + int partNumber) { + return OmMultipartPartKey.of(uploadId, partNumber); + } + + private void validateSplitPartInfo(OmKeyInfo omKeyInfo, int partNumber) + throws OMException { + if (StringUtils.isBlank(omKeyInfo.getMetadata().get(OzoneConsts.ETAG))) { + throw new OMException("Missing ETag for multipart upload part " + + partNumber, OMException.ResultCodes.INVALID_REQUEST); + } + if (omKeyInfo.getKeyLocationVersions() == null + || omKeyInfo.getKeyLocationVersions().isEmpty() + || omKeyInfo.getLatestVersionLocations().getLocationList().isEmpty()) { + throw new OMException("Missing block locations for multipart upload part " + + partNumber, OMException.ResultCodes.INVALID_REQUEST); + } + } + @RequestFeatureValidator( conditions = ValidationCondition.CLUSTER_NEEDS_FINALIZATION, processingPhase = RequestProcessingPhase.PRE_PROCESS, diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequestWithFSO.java index 6b042e453c90..94f0189ddbd4 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequestWithFSO.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCommitPartRequestWithFSO.java @@ -26,6 +26,8 @@ import org.apache.hadoop.ozone.om.helpers.OmFSOFile; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartKey; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; import org.apache.hadoop.ozone.om.request.file.OMFileRequest; import org.apache.hadoop.ozone.om.response.s3.multipart.S3MultipartUploadCommitPartResponse; @@ -71,11 +73,13 @@ protected S3MultipartUploadCommitPartResponse getOmClientResponse( OzoneManager ozoneManager, Map keyToDeleteMap, String openKey, OmKeyInfo omKeyInfo, String multipartKey, - OmMultipartKeyInfo multipartKeyInfo, + OmMultipartKeyInfo multipartKeyInfo, OmMultipartPartKey multipartPartKey, + OmMultipartPartInfo multipartPartInfo, OzoneManagerProtocolProtos.OMResponse build, OmBucketInfo omBucketInfo, long bucketId) { return new S3MultipartUploadCommitPartResponseWithFSO(build, multipartKey, - openKey, multipartKeyInfo, keyToDeleteMap, omKeyInfo, + openKey, multipartKeyInfo, multipartPartKey, multipartPartInfo, + keyToDeleteMap, omKeyInfo, omBucketInfo, bucketId, getBucketLayout()); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java index 179eb87aabae..e35f2cc88c08 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequest.java @@ -29,6 +29,8 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import org.apache.commons.codec.digest.DigestUtils; @@ -51,6 +53,8 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartKey; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; import org.apache.hadoop.ozone.om.request.file.OMFileRequest; import org.apache.hadoop.ozone.om.request.key.OMKeyRequest; @@ -277,8 +281,20 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut validateIfMatchETag(keyArgs, existingKeyInfo); if (!partsList.isEmpty()) { + SortedMap multipartPartInfoMap = + Collections.emptySortedMap(); + List multipartPartKeysToDelete = + Collections.emptyList(); + if (multipartKeyInfo.getSchemaVersion() + == OmMultipartKeyInfo.SPLIT_PARTS_TABLE_SCHEMA_VERSION) { + multipartPartInfoMap = MultipartPartScanUtil.scanParts( + omMetadataManager, uploadID); + multipartPartKeysToDelete = MultipartPartScanUtil.getPartKeys( + uploadID, multipartPartInfoMap); + } final OmMultipartKeyInfo.PartKeyInfoMap partKeyInfoMap - = multipartKeyInfo.getPartKeyInfoMap(); + = getPartKeyInfoMap(multipartKeyInfo, volumeName, bucketName, + keyName, multipartPartInfoMap); if (partKeyInfoMap.size() == 0) { LOG.error("Complete MultipartUpload failed for key {} , MPU Key has" + " no parts in OM, parts given to upload are {}", ozoneKey, @@ -296,7 +312,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut List partLocationInfos = new ArrayList<>(); long dataSize = getMultipartDataSize(requestedVolume, requestedBucket, keyName, ozoneKey, partKeyInfoMap, partsListSize, - partLocationInfos, partsList, ozoneManager); + partLocationInfos, partsList, ozoneManager, + multipartPartInfoMap); // All parts have same replication information. Here getting from last // part. @@ -347,6 +364,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut updateCache(omMetadataManager, dbBucketKey, omBucketInfo, dbOzoneKey, dbMultipartOpenKey, multipartKey, omKeyInfo, trxnLogIndex); + MultipartPartScanUtil.addPartCleanupCacheEntries(omMetadataManager, + multipartPartKeysToDelete, trxnLogIndex); omResponse.setCompleteMultiPartUploadResponse( MultipartUploadCompleteResponse.newBuilder() @@ -360,7 +379,8 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut omClientResponse = getOmClientResponse(multipartKey, omResponse, dbMultipartOpenKey, omKeyInfo, allKeyInfoToRemove, omBucketInfo, - volumeId, bucketId, missingParentInfos, multipartKeyInfo); + volumeId, bucketId, missingParentInfos, multipartKeyInfo, + multipartPartKeysToDelete); result = Result.SUCCESS; } else { @@ -402,11 +422,12 @@ protected OMClientResponse getOmClientResponse(String multipartKey, OmKeyInfo omKeyInfo, List allKeyInfoToRemove, OmBucketInfo omBucketInfo, long volumeId, long bucketId, List missingParentInfos, - OmMultipartKeyInfo multipartKeyInfo) { + OmMultipartKeyInfo multipartKeyInfo, + List multipartPartKeysToDelete) { return new S3MultipartUploadCompleteResponse(omResponse.build(), multipartKey, dbMultipartOpenKey, omKeyInfo, allKeyInfoToRemove, - getBucketLayout(), omBucketInfo, bucketId); + getBucketLayout(), omBucketInfo, bucketId, multipartPartKeysToDelete); } protected void checkDirectoryAlreadyExists(OzoneManager ozoneManager, @@ -572,6 +593,29 @@ protected void addKeyTableCacheEntry(OMMetadataManager omMetadataManager, CacheValue.get(transactionLogIndex, omKeyInfo)); } + private OmMultipartKeyInfo.PartKeyInfoMap getPartKeyInfoMap( + OmMultipartKeyInfo multipartKeyInfo, String volumeName, String bucketName, + String keyName, SortedMap multipartPartInfoMap) { + if (multipartKeyInfo.getSchemaVersion() + == OmMultipartKeyInfo.LEGACY_SCHEMA_VERSION) { + return multipartKeyInfo.getPartKeyInfoMap(); + } + + TreeMap partKeyInfos = new TreeMap<>(); + for (Map.Entry entry + : multipartPartInfoMap.entrySet()) { + OmMultipartPartInfo partInfo = entry.getValue(); + OmKeyInfo partKeyInfo = partInfo.toOmKeyInfo(volumeName, bucketName, + keyName, multipartKeyInfo.getReplicationConfig()); + partKeyInfos.put(entry.getKey(), PartKeyInfo.newBuilder() + .setPartName(partInfo.getPartName()) + .setPartNumber(partInfo.getPartNumber()) + .setPartKeyInfo(partKeyInfo.getProtobuf(getOmRequest().getVersion())) + .build()); + } + return new OmMultipartKeyInfo.PartKeyInfoMap(partKeyInfos); + } + private int getPartsListSize(String requestedVolume, String requestedBucket, String keyName, String ozoneKey, List partNumbers, @@ -603,7 +647,8 @@ private long getMultipartDataSize(String requestedVolume, OmMultipartKeyInfo.PartKeyInfoMap partKeyInfoMap, int partsListSize, List partLocationInfos, List partsList, - OzoneManager ozoneManager) throws OMException { + OzoneManager ozoneManager, + SortedMap multipartPartInfoMap) throws OMException { long dataSize = 0; int currentPartCount = 0; boolean eTagBasedValidationAvailable = partsList.stream().allMatch(OzoneManagerProtocolProtos.Part::hasETag); @@ -612,8 +657,26 @@ private long getMultipartDataSize(String requestedVolume, currentPartCount++; int partNumber = part.getPartNumber(); PartKeyInfo partKeyInfo = partKeyInfoMap.get(partNumber); - MultipartCommitRequestPart requestPart = eTagBasedValidationAvailable ? - eTagBasedValidator.apply(part, partKeyInfo) : partNameBasedValidator.apply(part, partKeyInfo); + OmMultipartPartInfo multipartPartInfo = + multipartPartInfoMap.get(partNumber); + MultipartCommitRequestPart requestPart; + if (multipartPartInfo != null) { + String requestPartId; + String omPartId; + if (eTagBasedValidationAvailable) { + requestPartId = part.getETag(); + omPartId = multipartPartInfo.getETag(); + } else { + requestPartId = part.getPartName(); + omPartId = multipartPartInfo.getPartName(); + } + requestPart = new MultipartCommitRequestPart( + requestPartId, omPartId, StringUtils.equals(requestPartId, omPartId)); + } else { + requestPart = eTagBasedValidationAvailable ? + eTagBasedValidator.apply(part, partKeyInfo) : + partNameBasedValidator.apply(part, partKeyInfo); + } if (!requestPart.isValid()) { throw new OMException( failureMessage(requestedVolume, requestedBucket, keyName) + diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequestWithFSO.java index a5c8b2703d68..da6c2640c36c 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequestWithFSO.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/multipart/S3MultipartUploadCompleteRequestWithFSO.java @@ -33,6 +33,7 @@ import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartKey; import org.apache.hadoop.ozone.om.request.file.OMFileRequest; import org.apache.hadoop.ozone.om.response.OMClientResponse; import org.apache.hadoop.ozone.om.response.s3.multipart.S3MultipartUploadCompleteResponse; @@ -165,12 +166,13 @@ protected OMClientResponse getOmClientResponse(String multipartKey, String dbMultipartOpenKey, OmKeyInfo omKeyInfo, List allKeyInfoToRemove, OmBucketInfo omBucketInfo, long volumeId, long bucketId, List missingParentInfos, - OmMultipartKeyInfo multipartKeyInfo) { + OmMultipartKeyInfo multipartKeyInfo, + List multipartPartKeysToDelete) { return new S3MultipartUploadCompleteResponseWithFSO(omResponse.build(), multipartKey, dbMultipartOpenKey, omKeyInfo, allKeyInfoToRemove, getBucketLayout(), omBucketInfo, volumeId, bucketId, - missingParentInfos, multipartKeyInfo); + missingParentInfos, multipartKeyInfo, multipartPartKeysToDelete); } @Override diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/AbstractS3MultipartAbortResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/AbstractS3MultipartAbortResponse.java index 25e9b582bdbf..203d53170c5d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/AbstractS3MultipartAbortResponse.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/AbstractS3MultipartAbortResponse.java @@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartAbortInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartKey; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; import org.apache.hadoop.ozone.om.response.key.OmKeyResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; @@ -74,24 +75,34 @@ protected void addAbortToBatch( OmMultipartKeyInfo omMultipartKeyInfo = abortInfo .getOmMultipartKeyInfo(); - // Move all the parts to delete table - for (PartKeyInfo partKeyInfo: omMultipartKeyInfo.getPartKeyInfoMap()) { - OmKeyInfo currentKeyPartInfo = - OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo()); + if (omMultipartKeyInfo.getSchemaVersion() + == OmMultipartKeyInfo.LEGACY_SCHEMA_VERSION) { + // Move all the parts to delete table + for (PartKeyInfo partKeyInfo: omMultipartKeyInfo.getPartKeyInfoMap()) { + OmKeyInfo currentKeyPartInfo = + OmKeyInfo.getFromProtobuf(partKeyInfo.getPartKeyInfo()); - // TODO: Similar to open key deletion response, we can check if the - // MPU part actually contains blocks, and only move the to - // deletedTable if it does. + // TODO: Similar to open key deletion response, we can check if the + // MPU part actually contains blocks, and only move the to + // deletedTable if it does. - RepeatedOmKeyInfo repeatedOmKeyInfo = OmUtils.prepareKeyForDelete(omBucketInfo.getObjectID(), - currentKeyPartInfo, omMultipartKeyInfo.getUpdateID()); + addPartToDeletedTable(omMetadataManager, batchOperation, + omBucketInfo, abortInfo, currentKeyPartInfo, + omMultipartKeyInfo.getUpdateID()); + } + } else { + for (OmKeyInfo currentKeyPartInfo : + abortInfo.getPartsKeyInfoToDelete()) { + addPartToDeletedTable(omMetadataManager, batchOperation, + omBucketInfo, abortInfo, currentKeyPartInfo, + omMultipartKeyInfo.getUpdateID()); + } - // multi-part key format is volumeName/bucketName/keyName/uploadId - String deleteKey = omMetadataManager.getOzoneDeletePathKey( - currentKeyPartInfo.getObjectID(), abortInfo.getMultipartKey()); - - omMetadataManager.getDeletedTable().putWithBatch(batchOperation, - deleteKey, repeatedOmKeyInfo); + for (OmMultipartPartKey partKey : + abortInfo.getPartsTableKeysToDelete()) { + omMetadataManager.getMultipartPartsTable().deleteWithBatch( + batchOperation, partKey); + } } } // update bucket usedBytes. @@ -100,6 +111,18 @@ protected void addAbortToBatch( omBucketInfo.getBucketName()), omBucketInfo); } + private void addPartToDeletedTable(OMMetadataManager omMetadataManager, + BatchOperation batchOperation, OmBucketInfo omBucketInfo, + OmMultipartAbortInfo abortInfo, OmKeyInfo currentKeyPartInfo, + long updateID) throws IOException { + RepeatedOmKeyInfo repeatedOmKeyInfo = OmUtils.prepareKeyForDelete( + omBucketInfo.getObjectID(), currentKeyPartInfo, updateID); + String deleteKey = omMetadataManager.getOzoneDeletePathKey( + currentKeyPartInfo.getObjectID(), abortInfo.getMultipartKey()); + omMetadataManager.getDeletedTable().putWithBatch(batchOperation, + deleteKey, repeatedOmKeyInfo); + } + /** * Adds the operation of aborting a multipart upload to the batch operation. * Both LEGACY/OBS and FSO have similar abort logic. The only difference diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3ExpiredMultipartUploadsAbortResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3ExpiredMultipartUploadsAbortResponse.java index 89919fc44042..dbe35f08cc7a 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3ExpiredMultipartUploadsAbortResponse.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3ExpiredMultipartUploadsAbortResponse.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.BUCKET_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.MULTIPART_INFO_TABLE; +import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.MULTIPART_PARTS_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.OPEN_FILE_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.OPEN_KEY_TABLE; @@ -39,7 +40,7 @@ * Handles response to abort expired MPUs. */ @CleanupTableInfo(cleanupTables = {OPEN_KEY_TABLE, OPEN_FILE_TABLE, - DELETED_TABLE, MULTIPART_INFO_TABLE, BUCKET_TABLE}) + DELETED_TABLE, MULTIPART_INFO_TABLE, MULTIPART_PARTS_TABLE, BUCKET_TABLE}) public class S3ExpiredMultipartUploadsAbortResponse extends AbstractS3MultipartAbortResponse { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadAbortResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadAbortResponse.java index 371d24e31b2b..ccf6f81d0a1d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadAbortResponse.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadAbortResponse.java @@ -20,15 +20,21 @@ import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.BUCKET_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.MULTIPART_INFO_TABLE; +import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.MULTIPART_PARTS_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.OPEN_KEY_TABLE; import jakarta.annotation.Nonnull; import java.io.IOException; +import java.util.Collections; +import java.util.List; import org.apache.hadoop.hdds.utils.db.BatchOperation; import org.apache.hadoop.ozone.om.OMMetadataManager; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartAbortInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartKey; import org.apache.hadoop.ozone.om.response.CleanupTableInfo; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; @@ -36,7 +42,7 @@ * Response for Multipart Abort Request. */ @CleanupTableInfo(cleanupTables = {OPEN_KEY_TABLE, DELETED_TABLE, - MULTIPART_INFO_TABLE, BUCKET_TABLE}) + MULTIPART_INFO_TABLE, MULTIPART_PARTS_TABLE, BUCKET_TABLE}) public class S3MultipartUploadAbortResponse extends AbstractS3MultipartAbortResponse { @@ -44,16 +50,23 @@ public class S3MultipartUploadAbortResponse extends private String multipartOpenKey; private OmMultipartKeyInfo omMultipartKeyInfo; private OmBucketInfo omBucketInfo; + private List partsKeyInfoToDelete; + private List partsTableKeysToDelete; + @SuppressWarnings("checkstyle:ParameterNumber") public S3MultipartUploadAbortResponse(@Nonnull OMResponse omResponse, String multipartKey, String multipartOpenKey, @Nonnull OmMultipartKeyInfo omMultipartKeyInfo, - @Nonnull OmBucketInfo omBucketInfo, @Nonnull BucketLayout bucketLayout) { + @Nonnull OmBucketInfo omBucketInfo, @Nonnull BucketLayout bucketLayout, + List partsKeyInfoToDelete, + List partsTableKeysToDelete) { super(omResponse, bucketLayout); this.multipartKey = multipartKey; this.multipartOpenKey = multipartOpenKey; this.omMultipartKeyInfo = omMultipartKeyInfo; this.omBucketInfo = omBucketInfo; + this.partsKeyInfoToDelete = partsKeyInfoToDelete; + this.partsTableKeysToDelete = partsTableKeysToDelete; } /** @@ -69,8 +82,15 @@ public S3MultipartUploadAbortResponse(@Nonnull OMResponse omResponse, @Override public void addToDBBatch(OMMetadataManager omMetadataManager, BatchOperation batchOperation) throws IOException { - addAbortToBatch(omMetadataManager, batchOperation, - multipartKey, multipartOpenKey, omMultipartKeyInfo, omBucketInfo, - getBucketLayout()); + OmMultipartAbortInfo abortInfo = new OmMultipartAbortInfo.Builder() + .setMultipartKey(multipartKey) + .setMultipartOpenKey(multipartOpenKey) + .setMultipartKeyInfo(omMultipartKeyInfo) + .setBucketLayout(getBucketLayout()) + .setPartsKeyInfoToDelete(partsKeyInfoToDelete) + .setPartsTableKeysToDelete(partsTableKeysToDelete) + .build(); + addAbortToBatch(omMetadataManager, batchOperation, omBucketInfo, + Collections.singletonList(abortInfo)); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadAbortResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadAbortResponseWithFSO.java index 93d3c45289d2..b467c3b4eed3 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadAbortResponseWithFSO.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadAbortResponseWithFSO.java @@ -20,12 +20,16 @@ import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.BUCKET_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.MULTIPART_INFO_TABLE; +import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.MULTIPART_PARTS_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.OPEN_FILE_TABLE; import jakarta.annotation.Nonnull; +import java.util.List; import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartKey; import org.apache.hadoop.ozone.om.response.CleanupTableInfo; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; @@ -33,17 +37,21 @@ * Response for Multipart Abort Request - prefix layout. */ @CleanupTableInfo(cleanupTables = {OPEN_FILE_TABLE, DELETED_TABLE, - MULTIPART_INFO_TABLE, BUCKET_TABLE}) + MULTIPART_INFO_TABLE, MULTIPART_PARTS_TABLE, BUCKET_TABLE}) public class S3MultipartUploadAbortResponseWithFSO extends S3MultipartUploadAbortResponse { + @SuppressWarnings("checkstyle:ParameterNumber") public S3MultipartUploadAbortResponseWithFSO(@Nonnull OMResponse omResponse, String multipartKey, String multipartOpenKey, @Nonnull OmMultipartKeyInfo omMultipartKeyInfo, - @Nonnull OmBucketInfo omBucketInfo, @Nonnull BucketLayout bucketLayout) { + @Nonnull OmBucketInfo omBucketInfo, @Nonnull BucketLayout bucketLayout, + List partsKeyInfoToDelete, + List partsTableKeysToDelete) { super(omResponse, multipartKey, multipartOpenKey, omMultipartKeyInfo, - omBucketInfo, bucketLayout); + omBucketInfo, bucketLayout, partsKeyInfoToDelete, + partsTableKeysToDelete); } /** diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCommitPartResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCommitPartResponse.java index 0351b4f71bd5..2c2bb20244c5 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCommitPartResponse.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCommitPartResponse.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.BUCKET_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.MULTIPART_INFO_TABLE; +import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.MULTIPART_PARTS_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.OPEN_KEY_TABLE; import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.NO_SUCH_MULTIPART_UPLOAD_ERROR; import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK; @@ -36,6 +37,8 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartKey; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; import org.apache.hadoop.ozone.om.response.CleanupTableInfo; import org.apache.hadoop.ozone.om.response.key.OmKeyResponse; @@ -45,12 +48,14 @@ * Response for S3MultipartUploadCommitPart request. */ @CleanupTableInfo(cleanupTables = {OPEN_KEY_TABLE, DELETED_TABLE, - MULTIPART_INFO_TABLE, BUCKET_TABLE}) + MULTIPART_INFO_TABLE, MULTIPART_PARTS_TABLE, BUCKET_TABLE}) public class S3MultipartUploadCommitPartResponse extends OmKeyResponse { private final String multipartKey; + private final OmMultipartPartKey multipartPartKey; private final String openKey; private final OmMultipartKeyInfo omMultipartKeyInfo; + private final OmMultipartPartInfo omMultipartPartInfo; private final Map keyToDeleteMap; private final OmKeyInfo openPartKeyInfoToBeDeleted; private final OmBucketInfo omBucketInfo; @@ -66,6 +71,8 @@ public class S3MultipartUploadCommitPartResponse extends OmKeyResponse { public S3MultipartUploadCommitPartResponse(@Nonnull OMResponse omResponse, String multipartKey, String openKey, @Nullable OmMultipartKeyInfo omMultipartKeyInfo, + @Nullable OmMultipartPartKey multipartPartKey, + @Nullable OmMultipartPartInfo omMultipartPartInfo, @Nullable Map keyToDeleteMap, @Nullable OmKeyInfo openPartKeyInfoToBeDeleted, @Nonnull OmBucketInfo omBucketInfo, @@ -73,8 +80,10 @@ public S3MultipartUploadCommitPartResponse(@Nonnull OMResponse omResponse, @Nonnull BucketLayout bucketLayout) { super(omResponse, bucketLayout); this.multipartKey = multipartKey; + this.multipartPartKey = multipartPartKey; this.openKey = openKey; this.omMultipartKeyInfo = omMultipartKeyInfo; + this.omMultipartPartInfo = omMultipartPartInfo; this.keyToDeleteMap = keyToDeleteMap; this.openPartKeyInfoToBeDeleted = openPartKeyInfoToBeDeleted; this.omBucketInfo = omBucketInfo; @@ -118,9 +127,13 @@ public void addToDBBatch(OMMetadataManager omMetadataManager, omMetadataManager.getMultipartInfoTable().putWithBatch(batchOperation, multipartKey, omMultipartKeyInfo); + if (multipartPartKey != null && omMultipartPartInfo != null) { + omMetadataManager.getMultipartPartsTable().putWithBatch(batchOperation, + multipartPartKey, omMultipartPartInfo); + } - // This information has been added to multipartKeyInfo. So, we can - // safely delete part key info from open key table. + // This information has been added to multipartInfoTable or + // multipartPartsTable. So, we can safely delete the part open key. omMetadataManager.getOpenKeyTable(getBucketLayout()) .deleteWithBatch(batchOperation, openKey); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCommitPartResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCommitPartResponseWithFSO.java index 51722825f938..ca1d399c6ecc 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCommitPartResponseWithFSO.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCommitPartResponseWithFSO.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.BUCKET_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.MULTIPART_INFO_TABLE; +import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.MULTIPART_PARTS_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.OPEN_FILE_TABLE; import jakarta.annotation.Nonnull; @@ -29,6 +30,8 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartKey; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; import org.apache.hadoop.ozone.om.response.CleanupTableInfo; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; @@ -37,7 +40,7 @@ * Response for S3MultipartUploadCommitPartWithFSO request. */ @CleanupTableInfo(cleanupTables = {OPEN_FILE_TABLE, DELETED_TABLE, - MULTIPART_INFO_TABLE, BUCKET_TABLE}) + MULTIPART_INFO_TABLE, MULTIPART_PARTS_TABLE, BUCKET_TABLE}) public class S3MultipartUploadCommitPartResponseWithFSO extends S3MultipartUploadCommitPartResponse { @@ -51,11 +54,14 @@ public class S3MultipartUploadCommitPartResponseWithFSO public S3MultipartUploadCommitPartResponseWithFSO( @Nonnull OMResponse omResponse, String multipartKey, String openKey, @Nullable OmMultipartKeyInfo omMultipartKeyInfo, + @Nullable OmMultipartPartKey multipartPartKey, + @Nullable OmMultipartPartInfo omMultipartPartInfo, @Nullable Map keyToDeleteMap, @Nullable OmKeyInfo openPartKeyInfoToBeDeleted, @Nonnull OmBucketInfo omBucketInfo, long bucketId, @Nonnull BucketLayout bucketLayout) { super(omResponse, multipartKey, openKey, omMultipartKeyInfo, + multipartPartKey, omMultipartPartInfo, keyToDeleteMap, openPartKeyInfoToBeDeleted, omBucketInfo, bucketId, bucketLayout); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java index b46aebf7d34f..a1dfe4ed317d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponse.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DELETED_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.MULTIPART_INFO_TABLE; +import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.MULTIPART_PARTS_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.OPEN_KEY_TABLE; import jakarta.annotation.Nonnull; @@ -32,6 +33,7 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout; import org.apache.hadoop.ozone.om.helpers.OmBucketInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartKey; import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo; import org.apache.hadoop.ozone.om.response.CleanupTableInfo; import org.apache.hadoop.ozone.om.response.key.OmKeyResponse; @@ -46,12 +48,13 @@ * 3) Delete unused parts. */ @CleanupTableInfo(cleanupTables = {OPEN_KEY_TABLE, KEY_TABLE, DELETED_TABLE, - MULTIPART_INFO_TABLE, BUCKET_TABLE}) + MULTIPART_INFO_TABLE, MULTIPART_PARTS_TABLE, BUCKET_TABLE}) public class S3MultipartUploadCompleteResponse extends OmKeyResponse { private String multipartKey; private String multipartOpenKey; private OmKeyInfo omKeyInfo; private List allKeyInfoToRemove; + private List multipartPartKeysToDelete; private OmBucketInfo omBucketInfo; private long bucketId; @@ -64,7 +67,8 @@ public S3MultipartUploadCompleteResponse( @Nonnull List allKeyInfoToRemove, @Nonnull BucketLayout bucketLayout, OmBucketInfo omBucketInfo, - long bucketId) { + long bucketId, + List multipartPartKeysToDelete) { super(omResponse, bucketLayout); this.allKeyInfoToRemove = allKeyInfoToRemove; this.multipartKey = multipartKey; @@ -72,6 +76,7 @@ public S3MultipartUploadCompleteResponse( this.omKeyInfo = omKeyInfo; this.omBucketInfo = omBucketInfo; this.bucketId = bucketId; + this.multipartPartKeysToDelete = multipartPartKeysToDelete; } /** @@ -93,6 +98,12 @@ public void addToDBBatch(OMMetadataManager omMetadataManager, .deleteWithBatch(batchOperation, multipartOpenKey); omMetadataManager.getMultipartInfoTable().deleteWithBatch(batchOperation, multipartKey); + if (multipartPartKeysToDelete != null) { + for (OmMultipartPartKey multipartPartKey : multipartPartKeysToDelete) { + omMetadataManager.getMultipartPartsTable().deleteWithBatch( + batchOperation, multipartPartKey); + } + } // 2. Add key to KeyTable addToKeyTable(omMetadataManager, batchOperation); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponseWithFSO.java index 2147a039a531..1e68e90b99df 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponseWithFSO.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/multipart/S3MultipartUploadCompleteResponseWithFSO.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.DIRECTORY_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.FILE_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.MULTIPART_INFO_TABLE; +import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.MULTIPART_PARTS_TABLE; import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.OPEN_FILE_TABLE; import jakarta.annotation.Nonnull; @@ -33,6 +34,7 @@ import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmMultipartPartKey; import org.apache.hadoop.ozone.om.request.file.OMFileRequest; import org.apache.hadoop.ozone.om.response.CleanupTableInfo; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; @@ -46,7 +48,7 @@ * 3) Delete unused parts. */ @CleanupTableInfo(cleanupTables = {OPEN_FILE_TABLE, FILE_TABLE, DELETED_TABLE, - MULTIPART_INFO_TABLE, DIRECTORY_TABLE}) + MULTIPART_INFO_TABLE, MULTIPART_PARTS_TABLE, DIRECTORY_TABLE}) public class S3MultipartUploadCompleteResponseWithFSO extends S3MultipartUploadCompleteResponse { @@ -68,9 +70,11 @@ public S3MultipartUploadCompleteResponseWithFSO( OmBucketInfo omBucketInfo, @Nonnull long volumeId, @Nonnull long bucketId, List missingParentInfos, - OmMultipartKeyInfo multipartKeyInfo) { + OmMultipartKeyInfo multipartKeyInfo, + List multipartPartKeysToDelete) { super(omResponse, multipartKey, multipartOpenKey, omKeyInfo, - allKeyInfoToRemove, bucketLayout, omBucketInfo, bucketId); + allKeyInfoToRemove, bucketLayout, omBucketInfo, bucketId, + multipartPartKeysToDelete); this.volumeId = volumeId; this.bucketId = bucketId; this.missingParentInfos = missingParentInfos; diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java index ac56273d628c..447fc8182ae8 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartResponse.java @@ -290,7 +290,7 @@ public S3MultipartUploadCommitPartResponse createS3CommitMPUResponseFSO( } return new S3MultipartUploadCommitPartResponseWithFSO(omResponse, - multipartKey, openKey, multipartKeyInfo, keyToDeleteMap, + multipartKey, openKey, multipartKeyInfo, null, null, keyToDeleteMap, openPartKeyInfoToBeDeleted, omBucketInfo, omBucketInfo.getObjectID(), getBucketLayout()); } @@ -327,7 +327,7 @@ public S3MultipartUploadCompleteResponse createS3CompleteMPUResponseFSO( return new S3MultipartUploadCompleteResponseWithFSO(omResponse, multipartKey, multipartOpenKey, omKeyInfo, allKeyInfoToRemove, getBucketLayout(), omBucketInfo, volumeId, bucketId, null, - multipartKeyInfo); + multipartKeyInfo, Collections.emptyList()); } protected S3InitiateMultipartUploadResponse getS3InitiateMultipartUploadResp( @@ -343,7 +343,7 @@ protected S3MultipartUploadAbortResponse getS3MultipartUploadAbortResp( OMResponse omResponse) { return new S3MultipartUploadAbortResponse(omResponse, multipartKey, multipartOpenKey, omMultipartKeyInfo, omBucketInfo, - getBucketLayout()); + getBucketLayout(), Collections.emptyList(), Collections.emptyList()); } public BucketLayout getBucketLayout() { diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadAbortResponseWithFSO.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadAbortResponseWithFSO.java index 0aabd317f7b4..ab6beafae711 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadAbortResponseWithFSO.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/s3/multipart/TestS3MultipartUploadAbortResponseWithFSO.java @@ -98,7 +98,8 @@ protected S3MultipartUploadAbortResponse getS3MultipartUploadAbortResp( OzoneManagerProtocolProtos.OMResponse omResponse) { return new S3MultipartUploadAbortResponseWithFSO(omResponse, multipartKey, multipartOpenKey, omMultipartKeyInfo, omBucketInfo, - getBucketLayout()); + getBucketLayout(), java.util.Collections.emptyList(), + java.util.Collections.emptyList()); } @Override