Skip to content

Commit

Permalink
Async Deletion of Previous Metadata and Statistics Files (apache#312)
Browse files Browse the repository at this point in the history
* delete manifest, manifest list, prev files, stats when drop table with purge

* unit test for drop table

* refine warning code

* code format

* refine warning code

* remove unused code

* remove unused import

* code format

* remove additional manifest and manifest list deletion

* add stat deletion test

* code format

* add new AsyncTaskType

* Schedule prev metadata and stat files deletion in separate tasks

* Table content cleanup task handler

* Unit test for table clean up

* code format

* register task handler

* handle table content files in batch

* adjust unit test after batch processing

* add unit test for TableContentCleanupTaskHandler

* code format

* merge cleanup tasks into one

* code format

* refactor manifest cleanup handler based on comments
- 1. renaming
- 2. add log and exception handling
- 3. remove unnecessary log

* refactor table cleanup handler based on comments
- 1. renaming
- 2. extract task entities creation into methods
- 3. remove unnecessary filtering

* add TODO

* renaming

* split the task type in cleanup task handler

* error handling

---------

Co-authored-by: Yufei Gu <[email protected]>
  • Loading branch information
danielhumanmod and flyrain authored Dec 6, 2024
1 parent b5685ef commit edd09f0
Show file tree
Hide file tree
Showing 7 changed files with 824 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@

public enum AsyncTaskType {
ENTITY_CLEANUP_SCHEDULER(1),
FILE_CLEANUP(2);
MANIFEST_FILE_CLEANUP(2),
METADATA_FILE_BATCH_CLEANUP(3);

private final int typeCode;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ private static TaskEntity createTask(String taskName, long id) {
.setName(taskName)
.withData("data")
.setId(id)
.withTaskType(AsyncTaskType.FILE_CLEANUP)
.withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP)
.setCreateTimestamp(Instant.now().toEpochMilli())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@
import org.slf4j.LoggerFactory;

/**
* {@link TaskHandler} responsible for deleting all of the files in a manifest and the manifest
* itself. Since data files may be present in multiple manifests across different snapshots, we
* assume a data file that doesn't exist is missing because it was already deleted by another task.
* {@link TaskHandler} responsible for deleting table files: 1. Manifest files: It contains all the
* files in a manifest and the manifest itself. Since data files may be present in multiple
* manifests across different snapshots, we assume a data file that doesn't exist is missing because
* it was already deleted by another task. 2. Table metadata files: It contains previous metadata
* and statistics files, which are grouped and deleted in batch
*/
// TODO: Rename this class since we are introducing metadata cleanup here
public class ManifestFileCleanupTaskHandler implements TaskHandler {
public static final int MAX_ATTEMPTS = 3;
public static final int FILE_DELETION_RETRY_MILLIS = 100;
Expand All @@ -62,66 +65,119 @@ public ManifestFileCleanupTaskHandler(

@Override
public boolean canHandleTask(TaskEntity task) {
return task.getTaskType() == AsyncTaskType.FILE_CLEANUP;
return task.getTaskType() == AsyncTaskType.MANIFEST_FILE_CLEANUP
|| task.getTaskType() == AsyncTaskType.METADATA_FILE_BATCH_CLEANUP;
}

@Override
public boolean handleTask(TaskEntity task) {
ManifestCleanupTask cleanupTask = task.readData(ManifestCleanupTask.class);
ManifestFile manifestFile = decodeManifestData(cleanupTask.getManifestFileData());
TableIdentifier tableId = cleanupTask.getTableId();
try (FileIO authorizedFileIO = fileIOSupplier.apply(task)) {

// if the file doesn't exist, we assume that another task execution was successful, but failed
// to drop the task entity. Log a warning and return success
if (!TaskUtils.exists(manifestFile.path(), authorizedFileIO)) {
if (task.getTaskType() == AsyncTaskType.MANIFEST_FILE_CLEANUP) {
ManifestFile manifestFile = decodeManifestData(cleanupTask.getManifestFileData());
return cleanUpManifestFile(manifestFile, authorizedFileIO, tableId);
} else if (task.getTaskType() == AsyncTaskType.METADATA_FILE_BATCH_CLEANUP) {
return cleanUpMetadataFiles(cleanupTask.getMetadataFiles(), authorizedFileIO, tableId);
} else {
LOGGER
.atWarn()
.addKeyValue("manifestFile", manifestFile.path())
.addKeyValue("tableId", tableId)
.log("Manifest cleanup task scheduled, but manifest file doesn't exist");
return true;
}

ManifestReader<DataFile> dataFiles = ManifestFiles.read(manifestFile, authorizedFileIO);
List<CompletableFuture<Void>> dataFileDeletes =
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(dataFiles.iterator(), Spliterator.IMMUTABLE),
false)
.map(
file ->
tryDelete(
tableId, authorizedFileIO, manifestFile, file.path().toString(), null, 1))
.toList();
LOGGER.debug(
"Scheduled {} data files to be deleted from manifest {}",
dataFileDeletes.size(),
manifestFile.path());
try {
// wait for all data files to be deleted, then wait for the manifest itself to be deleted
CompletableFuture.allOf(dataFileDeletes.toArray(CompletableFuture[]::new))
.thenCompose(
(v) -> {
LOGGER
.atInfo()
.addKeyValue("manifestFile", manifestFile.path())
.log("All data files in manifest deleted - deleting manifest");
return tryDelete(
tableId, authorizedFileIO, manifestFile, manifestFile.path(), null, 1);
})
.get();
return true;
} catch (InterruptedException e) {
LOGGER.error(
"Interrupted exception deleting data files from manifest {}", manifestFile.path(), e);
throw new RuntimeException(e);
} catch (ExecutionException e) {
LOGGER.error("Unable to delete data files from manifest {}", manifestFile.path(), e);
.log("Unknown task type {}", task.getTaskType());
return false;
}
}
}

/**
 * Deletes every data file listed in a manifest and then the manifest file itself.
 *
 * <p>If the manifest no longer exists, a warning is logged and the task is reported as successful,
 * on the assumption that an earlier execution already did the work but failed to drop the task
 * entity.
 *
 * @param manifestFile the manifest whose data files, and which itself, should be deleted
 * @param fileIO the {@link FileIO} used to check for, read and delete files
 * @param tableId identifier of the owning table, used for logging
 * @return {@code true} if the manifest was already gone or all deletes completed; {@code false} if
 *     waiting on the deletes ended with an {@link ExecutionException}
 * @throws RuntimeException if the calling thread is interrupted while waiting for the deletes; the
 *     thread's interrupt flag is restored before throwing
 */
private boolean cleanUpManifestFile(
    ManifestFile manifestFile, FileIO fileIO, TableIdentifier tableId) {
  // if the file doesn't exist, we assume that another task execution was successful, but
  // failed to drop the task entity. Log a warning and return success
  if (!TaskUtils.exists(manifestFile.path(), fileIO)) {
    LOGGER
        .atWarn()
        .addKeyValue("manifestFile", manifestFile.path())
        .addKeyValue("tableId", tableId)
        .log("Manifest cleanup task scheduled, but manifest file doesn't exist");
    return true;
  }

  List<CompletableFuture<Void>> dataFileDeletes;
  ManifestReader<DataFile> dataFiles = ManifestFiles.read(manifestFile, fileIO);
  try {
    // toList() drains the reader and each path is materialized as a String here, so the reader
    // is no longer needed once the deletes have been scheduled.
    dataFileDeletes =
        StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(dataFiles.iterator(), Spliterator.IMMUTABLE),
                false)
            .map(file -> tryDelete(tableId, fileIO, manifestFile, file.path().toString(), null, 1))
            .toList();
  } finally {
    // Release the underlying manifest stream. A failure to close does not invalidate the entries
    // that were already read, so it is logged rather than failing the task.
    try {
      dataFiles.close();
    } catch (java.io.IOException closeEx) {
      LOGGER.warn("Unable to close reader for manifest {}", manifestFile.path(), closeEx);
    }
  }
  LOGGER.debug(
      "Scheduled {} data files to be deleted from manifest {}",
      dataFileDeletes.size(),
      manifestFile.path());
  try {
    // wait for all data files to be deleted, then wait for the manifest itself to be deleted
    CompletableFuture.allOf(dataFileDeletes.toArray(CompletableFuture[]::new))
        .thenCompose(
            (v) -> {
              LOGGER
                  .atInfo()
                  .addKeyValue("manifestFile", manifestFile.path())
                  .log("All data files in manifest deleted - deleting manifest");
              return tryDelete(tableId, fileIO, manifestFile, manifestFile.path(), null, 1);
            })
        .get();
    return true;
  } catch (InterruptedException e) {
    // Restore the interrupt flag so callers further up the stack can observe the interruption.
    Thread.currentThread().interrupt();
    LOGGER.error(
        "Interrupted exception deleting data files from manifest {}", manifestFile.path(), e);
    throw new RuntimeException(e);
  } catch (ExecutionException e) {
    LOGGER.error("Unable to delete data files from manifest {}", manifestFile.path(), e);
    return false;
  }
}

/**
 * Deletes a batch of table metadata files and waits for all deletions to finish.
 *
 * <p>Files that no longer exist are skipped. If none of the files in the batch exist, a warning is
 * logged and the task is reported as successful; if only some are missing, a warning listing the
 * missing files is logged and the remaining files are deleted.
 *
 * @param metadataFiles paths of the files in this batch
 * @param fileIO the {@link FileIO} used to check for and delete files
 * @param tableId identifier of the owning table, used for logging
 * @return {@code true} if no file in the batch exists or every scheduled delete completed; {@code
 *     false} if waiting on the deletes raised an exception
 */
private boolean cleanUpMetadataFiles(
    List<String> metadataFiles, FileIO fileIO, TableIdentifier tableId) {
  List<String> validFiles =
      metadataFiles.stream().filter(file -> TaskUtils.exists(file, fileIO)).toList();
  if (validFiles.isEmpty()) {
    LOGGER
        .atWarn()
        .addKeyValue("metadataFiles", metadataFiles.toString())
        .addKeyValue("tableId", tableId)
        .log("Table metadata cleanup task scheduled, but the none of the file in batch exists");
    return true;
  }
  if (validFiles.size() < metadataFiles.size()) {
    // Derive the missing files from the existence checks already performed above instead of
    // probing storage a second time; this also keeps the two lists consistent with each other.
    List<String> missingFiles =
        metadataFiles.stream().filter(file -> !validFiles.contains(file)).toList();
    LOGGER
        .atWarn()
        .addKeyValue("metadataFiles", metadataFiles.toString())
        .addKeyValue("missingFiles", missingFiles)
        .addKeyValue("tableId", tableId)
        .log(
            "Table metadata cleanup task scheduled, but {} files in the batch are missing",
            missingFiles.size());
  }

  // Schedule the deletion for each file asynchronously. There is no owning manifest for these
  // files, so null is passed for the manifest argument.
  List<CompletableFuture<Void>> deleteFutures =
      validFiles.stream().map(file -> tryDelete(tableId, fileIO, null, file, null, 1)).toList();

  try {
    // Wait for all delete operations to finish
    CompletableFuture.allOf(deleteFutures.toArray(CompletableFuture[]::new)).join();
  } catch (Exception e) {
    LOGGER.error("Exception detected during metadata file deletion", e);
    return false;
  }

  return true;
}

private static ManifestFile decodeManifestData(String manifestFileData) {
try {
return ManifestFiles.decode(Base64.decodeBase64(manifestFileData));
Expand All @@ -134,16 +190,16 @@ private CompletableFuture<Void> tryDelete(
TableIdentifier tableId,
FileIO fileIO,
ManifestFile manifestFile,
String dataFile,
String file,
Throwable e,
int attempt) {
if (e != null && attempt <= MAX_ATTEMPTS) {
LOGGER
.atWarn()
.addKeyValue("dataFile", dataFile)
.addKeyValue("file", file)
.addKeyValue("attempt", attempt)
.addKeyValue("error", e.getMessage())
.log("Error encountered attempting to delete data file");
.log("Error encountered attempting to delete file");
}
if (attempt > MAX_ATTEMPTS && e != null) {
return CompletableFuture.failedFuture(e);
Expand All @@ -155,27 +211,27 @@ private CompletableFuture<Void> tryDelete(
// file's existence, but then it is deleted before we have a chance to
// send the delete request. In such a case, we <i>should</i> retry
// and find
if (TaskUtils.exists(dataFile, fileIO)) {
fileIO.deleteFile(dataFile);
if (TaskUtils.exists(file, fileIO)) {
fileIO.deleteFile(file);
} else {
LOGGER
.atInfo()
.addKeyValue("dataFile", dataFile)
.addKeyValue("manifestFile", manifestFile.path())
.addKeyValue("file", file)
.addKeyValue("manifestFile", manifestFile != null ? manifestFile.path() : "")
.addKeyValue("tableId", tableId)
.log("Manifest cleanup task scheduled, but data file doesn't exist");
.log("table file cleanup task scheduled, but data file doesn't exist");
}
},
executorService)
.exceptionallyComposeAsync(
newEx -> {
LOGGER
.atWarn()
.addKeyValue("dataFile", dataFile)
.addKeyValue("tableIdentifer", tableId)
.addKeyValue("manifestFile", manifestFile.path())
.addKeyValue("dataFile", file)
.addKeyValue("tableIdentifier", tableId)
.addKeyValue("manifestFile", manifestFile != null ? manifestFile.path() : "")
.log("Exception caught deleting data file from manifest", newEx);
return tryDelete(tableId, fileIO, manifestFile, dataFile, newEx, attempt + 1);
return tryDelete(tableId, fileIO, manifestFile, file, newEx, attempt + 1);
},
CompletableFuture.delayedExecutor(
FILE_DELETION_RETRY_MILLIS, TimeUnit.MILLISECONDS, executorService));
Expand All @@ -185,12 +241,18 @@ private CompletableFuture<Void> tryDelete(
public static final class ManifestCleanupTask {
private TableIdentifier tableId;
private String manifestFileData;
private List<String> metadataFiles;

public ManifestCleanupTask(TableIdentifier tableId, String manifestFileData) {
this.tableId = tableId;
this.manifestFileData = manifestFileData;
}

public ManifestCleanupTask(TableIdentifier tableId, List<String> metadataFiles) {
this.tableId = tableId;
this.metadataFiles = metadataFiles;
}

public ManifestCleanupTask() {}

public TableIdentifier getTableId() {
Expand All @@ -209,17 +271,26 @@ public void setManifestFileData(String manifestFileData) {
this.manifestFileData = manifestFileData;
}

public List<String> getMetadataFiles() {
return metadataFiles;
}

public void setMetadataFiles(List<String> metadataFiles) {
this.metadataFiles = metadataFiles;
}

@Override
public boolean equals(Object object) {
if (this == object) return true;
if (!(object instanceof ManifestCleanupTask that)) return false;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(manifestFileData, that.manifestFileData);
&& Objects.equals(manifestFileData, that.manifestFileData)
&& Objects.equals(metadataFiles, that.metadataFiles);
}

@Override
public int hashCode() {
return Objects.hash(tableId, manifestFileData);
return Objects.hash(tableId, manifestFileData, metadataFiles);
}
}
}
Loading

0 comments on commit edd09f0

Please sign in to comment.