diff --git a/core/common/src/main/java/alluxio/underfs/ObjectUnderFileSystem.java b/core/common/src/main/java/alluxio/underfs/ObjectUnderFileSystem.java index 9da66147a418..67841627e176 100755 --- a/core/common/src/main/java/alluxio/underfs/ObjectUnderFileSystem.java +++ b/core/common/src/main/java/alluxio/underfs/ObjectUnderFileSystem.java @@ -727,8 +727,6 @@ private boolean renameDirectoryInternal(String src, String dst, DeleteBuffer del if (!copyObject(srcKey, stripPrefixIfPresent(convertToFolderName(dst)))) { return false; } - deleteBuffer.add(srcKey); - // Rename each child in the src folder to destination/child // a. Since renames are a copy operation, files are added to a buffer and processed concurrently // b. Pseudo-directories are metadata only operations are not added to the buffer @@ -746,6 +744,9 @@ private boolean renameDirectoryInternal(String src, String dst, DeleteBuffer del buffer.add(new Pair<>(childSrcPath, childDstPath)); } } + // The files are arranged in the order of the parent directories + // to avoid the failure of deleting the directories first. + deleteBuffer.add(srcKey); // Get result of parallel file renames int filesRenamed = buffer.getResult().size(); if (filesRenamed != buffer.mEntriesAdded) { diff --git a/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java b/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java index eaa58ce570f0..67a54f3be3f3 100644 --- a/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java +++ b/core/server/master/src/main/java/alluxio/master/file/DefaultFileSystemMaster.java @@ -3157,6 +3157,27 @@ && shouldPersistPath( } } } + // After rename succeeds, avoid this asynchronous persistence task + if (srcInode.isDirectory() && !context.getPersist()) { + LOG.debug("The source directory {} has been renamed, " + + "cancel the child persistence task.", srcInodePath); + try (LockedInodePathList descendants = mInodeTree.getDescendants(srcInodePath)) { + for (LockedInodePath childPath : descendants) { + Inode childInode = childPath.getInode(); + if (childInode.isFile() && !childInode.isPersisted()) { + long fileId = childInode.getId(); + LOG.debug("Cancel the child {} persistence task, file id {}", childPath, fileId); + // Remove the file from the set of files to persist. + mPersistRequests.remove(fileId); + // Cancel any ongoing jobs. + PersistJob job = mPersistJobs.get(fileId); + if (job != null) { + job.setCancelState(PersistJob.CancelState.TO_BE_CANCELED); + } + } + } + } + } } /** diff --git a/job/server/src/main/java/alluxio/job/plan/persist/PersistDefinition.java b/job/server/src/main/java/alluxio/job/plan/persist/PersistDefinition.java index 694aa4fdc80e..ea6f3301b44e 100644 --- a/job/server/src/main/java/alluxio/job/plan/persist/PersistDefinition.java +++ b/job/server/src/main/java/alluxio/job/plan/persist/PersistDefinition.java @@ -18,6 +18,7 @@ import alluxio.client.file.URIStatus; import alluxio.collections.Pair; import alluxio.conf.Configuration; +import alluxio.exception.status.CancelledException; import alluxio.grpc.OpenFilePOptions; import alluxio.grpc.ReadPType; import alluxio.job.RunTaskContext; @@ -43,6 +44,8 @@ import java.io.IOException; import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Random; import java.util.Set; @@ -126,7 +129,9 @@ public SerializableVoid runTask(PersistConfig config, SerializableVoid args, if (!uriStatus.isCompleted()) { throw new IOException("Cannot persist an incomplete Alluxio file: " + uri); } - long bytesWritten; + long bytesWritten = 0; + List successMkdirsList = new ArrayList<>(); + boolean isTaskSuccess = true; try (Closer closer = Closer.create()) { OpenFilePOptions options = OpenFilePOptions.newBuilder() .setReadType(ReadPType.NO_CACHE) @@ -149,6 +154,10 @@ public SerializableVoid runTask(PersistConfig config, SerializableVoid args, curUfsPath = curUfsPath.getParent(); } while (!ancestorUfsAndAlluxioPaths.empty()) { + if (Thread.currentThread().isInterrupted()) { + LOG.warn("Task received interrupt signal, may be cancelled."); + throw new CancelledException("Interrupted before create directory and copy file"); + } Pair ancestorUfsAndAlluxioPath = ancestorUfsAndAlluxioPaths.pop(); String ancestorUfsPath = ancestorUfsAndAlluxioPath.getFirst(); String ancestorAlluxioPath = ancestorUfsAndAlluxioPath.getSecond(); @@ -164,6 +173,7 @@ public SerializableVoid runTask(PersistConfig config, SerializableVoid args, List allAcls = Stream.concat(status.getDefaultAcl().getEntries().stream(), status.getAcl().getEntries().stream()).collect(Collectors.toList()); ufs.setAclEntries(ancestorUfsPath, allAcls); + successMkdirsList.add(ancestorUfsPath); } else if (!ufs.isDirectory(ancestorUfsPath)) { throw new IOException( "Failed to create " + ufsPath + " with permission " + options @@ -180,6 +190,22 @@ public SerializableVoid runTask(PersistConfig config, SerializableVoid args, ufs.setAclEntries(dstPath.toString(), allAcls); bytesWritten = IOUtils.copyLarge(in, out, new byte[8 * Constants.MB]); incrementPersistedMetric(ufsClient.getUfsMountPointUri(), bytesWritten); + } catch (Exception e) { + isTaskSuccess = false; + LOG.warn("Failed run persiste task of {}, case {}", ufsPath, e.getMessage()); + throw e; + } finally { + if (!isTaskSuccess) { + LOG.warn("The created directory needs to be rolled back and deleted, directory size {}.", + successMkdirsList.size()); + ufs.deleteExistingFile(ufsPath); + Collections.reverse(successMkdirsList); + for (String path : successMkdirsList) { + if (!ufs.deleteDirectory(path)) { + LOG.warn("Failed delete ufs path {}", path); + } + } + } } LOG.info("Persisted file {} with size {}", ufsPath, bytesWritten); }