Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix delete a directory failed because of rename #18687

Open
wants to merge 3 commits into
base: master-2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -727,8 +727,6 @@ private boolean renameDirectoryInternal(String src, String dst, DeleteBuffer del
if (!copyObject(srcKey, stripPrefixIfPresent(convertToFolderName(dst)))) {
return false;
}
deleteBuffer.add(srcKey);

// Rename each child in the src folder to destination/child
// a. Since renames are a copy operation, files are added to a buffer and processed concurrently
// b. Pseudo-directories are metadata only operations are not added to the buffer
Expand All @@ -746,6 +744,9 @@ private boolean renameDirectoryInternal(String src, String dst, DeleteBuffer del
buffer.add(new Pair<>(childSrcPath, childDstPath));
}
}
// The files are arranged in the order of the parent directories
// to avoid the failure of deleting the directories first.
deleteBuffer.add(srcKey);
// Get result of parallel file renames
int filesRenamed = buffer.getResult().size();
if (filesRenamed != buffer.mEntriesAdded) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3157,6 +3157,27 @@ && shouldPersistPath(
}
}
}
// After rename succeeds, avoid this asynchronous persistence task
if (srcInode.isDirectory() && !context.getPersist()) {
LOG.debug("The source directory {} has been renamed, "
+ "cancel the child persistence task.", srcInodePath);
try (LockedInodePathList descendants = mInodeTree.getDescendants(srcInodePath)) {
for (LockedInodePath childPath : descendants) {
Inode childInode = childPath.getInode();
if (childInode.isFile() && !childInode.isPersisted()) {
long fileId = childInode.getId();
LOG.debug("Cancel the child {} persistence task, file id {}", childPath, fileId);
// Remove the file from the set of files to persist.
mPersistRequests.remove(fileId);
// Cancel any ongoing jobs.
PersistJob job = mPersistJobs.get(fileId);
if (job != null) {
job.setCancelState(PersistJob.CancelState.TO_BE_CANCELED);
}
}
}
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import alluxio.client.file.URIStatus;
import alluxio.collections.Pair;
import alluxio.conf.Configuration;
import alluxio.exception.status.CancelledException;
import alluxio.grpc.OpenFilePOptions;
import alluxio.grpc.ReadPType;
import alluxio.job.RunTaskContext;
Expand All @@ -43,6 +44,8 @@

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;
Expand Down Expand Up @@ -126,7 +129,9 @@ public SerializableVoid runTask(PersistConfig config, SerializableVoid args,
if (!uriStatus.isCompleted()) {
throw new IOException("Cannot persist an incomplete Alluxio file: " + uri);
}
long bytesWritten;
long bytesWritten = 0;
List<String> successMkdirsList = new ArrayList<>();
boolean isTaskSuccess = true;
try (Closer closer = Closer.create()) {
OpenFilePOptions options = OpenFilePOptions.newBuilder()
.setReadType(ReadPType.NO_CACHE)
Expand All @@ -149,6 +154,10 @@ public SerializableVoid runTask(PersistConfig config, SerializableVoid args,
curUfsPath = curUfsPath.getParent();
}
while (!ancestorUfsAndAlluxioPaths.empty()) {
if (Thread.currentThread().isInterrupted()) {
LOG.warn("Task received interrupt signal, may be cancelled.");
throw new CancelledException("Interrupted before create directory and copy file");
}
Pair<String, String> ancestorUfsAndAlluxioPath = ancestorUfsAndAlluxioPaths.pop();
String ancestorUfsPath = ancestorUfsAndAlluxioPath.getFirst();
String ancestorAlluxioPath = ancestorUfsAndAlluxioPath.getSecond();
Expand All @@ -164,6 +173,7 @@ public SerializableVoid runTask(PersistConfig config, SerializableVoid args,
List<AclEntry> allAcls = Stream.concat(status.getDefaultAcl().getEntries().stream(),
status.getAcl().getEntries().stream()).collect(Collectors.toList());
ufs.setAclEntries(ancestorUfsPath, allAcls);
successMkdirsList.add(ancestorUfsPath);
} else if (!ufs.isDirectory(ancestorUfsPath)) {
throw new IOException(
"Failed to create " + ufsPath + " with permission " + options
Expand All @@ -180,6 +190,22 @@ public SerializableVoid runTask(PersistConfig config, SerializableVoid args,
ufs.setAclEntries(dstPath.toString(), allAcls);
bytesWritten = IOUtils.copyLarge(in, out, new byte[8 * Constants.MB]);
incrementPersistedMetric(ufsClient.getUfsMountPointUri(), bytesWritten);
} catch (Exception e) {
isTaskSuccess = false;
LOG.warn("Failed run persiste task of {}, case {}", ufsPath, e.getMessage());
throw e;
} finally {
if (!isTaskSuccess) {
LOG.warn("The created directory needs to be rolled back and deleted, directory size {}.",
successMkdirsList.size());
ufs.deleteExistingFile(ufsPath);
Collections.reverse(successMkdirsList);
for (String path : successMkdirsList) {
if (!ufs.deleteDirectory(path)) {
LOG.warn("Failed delete ufs path {}", path);
}
}
}
}
LOG.info("Persisted file {} with size {}", ufsPath, bytesWritten);
}
Expand Down
Loading