Add import log classes and utils #2463

Draft · wants to merge 5 commits into base: feat/data-loader/import-process · Changes from all commits
1 change: 1 addition & 0 deletions data-loader/build.gradle
@@ -17,6 +17,7 @@ subprojects {
implementation("org.apache.commons:commons-lang3:${commonsLangVersion}")
implementation("commons-io:commons-io:${commonsIoVersion}")
implementation("org.slf4j:slf4j-simple:${slf4jVersion}")
implementation("software.amazon.awssdk:s3:2.25.31")

// Mockito
testImplementation "org.mockito:mockito-core:${mockitoVersion}"
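The new software.amazon.awssdk:s3 dependency presumably backs the AWS_S3 option of the LogStorageLocation enum added later in this PR. A minimal sketch of uploading a finished log file with the AWS SDK for Java v2; the bucket and key names are hypothetical:

import java.nio.file.Paths;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

// Upload a local log file to S3 ("import-logs" and the key are placeholders)
try (S3Client s3 = S3Client.create()) {
  PutObjectRequest request =
      PutObjectRequest.builder().bucket("import-logs").key("logs/failure.json").build();
  s3.putObject(request, Paths.get("logs/failure.json"));
}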
New file: AbstractImportLogger.java
@@ -0,0 +1,169 @@
package com.scalar.db.dataloader.core.dataimport.log;

import com.fasterxml.jackson.databind.JsonNode;
import com.scalar.db.dataloader.core.Constants;
import com.scalar.db.dataloader.core.DataLoaderObjectMapper;
import com.scalar.db.dataloader.core.dataimport.ImportEventListener;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriter;
import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory;
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResult;
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus;
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public abstract class AbstractImportLogger implements ImportEventListener {

protected static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper();

protected final ImportLoggerConfig config;
protected final LogWriterFactory logWriterFactory;
protected final List<ImportEventListener> listeners = new ArrayList<>();

public void addListener(ImportEventListener listener) {
listeners.add(listener);
}

public void removeListener(ImportEventListener listener) {
listeners.remove(listener);
}

@Override
public void onDataChunkStarted(ImportDataChunkStatus importDataChunkStatus) {
// Currently we are not logging the start of a data chunk
}

@Override
public void onTransactionBatchStarted(ImportTransactionBatchStatus batchStatus) {
// The start of a transaction batch is currently not logged, but listeners are still notified
notifyTransactionBatchStarted(batchStatus);
}

@Override
public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult) {
// Skip logging success records if the configuration is set to skip
if (shouldSkipLoggingSuccess(batchResult)) {
return;
}

logTransactionBatch(batchResult);
notifyTransactionBatchCompleted(batchResult);
}

@Override
public void onTaskComplete(ImportTaskResult taskResult) {
// TODO: We can remove this event if it's currently not being used in the import manager as well
}

protected abstract void logTransactionBatch(ImportTransactionBatchResult batchResult);

protected boolean shouldSkipLoggingSuccess(ImportTransactionBatchResult batchResult) {
return batchResult.isSuccess() && !config.isLogSuccessRecords();
}

protected JsonNode createFilteredTransactionBatchLogJsonNode(
ImportTransactionBatchResult batchResult) {

// If the batch result does not contain any records, return the batch result as is
if (batchResult.getRecords() == null) {
return OBJECT_MAPPER.valueToTree(batchResult);
}

// Create a new list to store the modified import task results
List<ImportTaskResult> modifiedRecords = new ArrayList<>();

// Loop over the records in the batchResult
for (ImportTaskResult taskResult : batchResult.getRecords()) {
// Create a new ImportTaskResult without adding the raw record yet
List<ImportTargetResult> targetResults =
batchResult.isSuccess()
? taskResult.getTargets()
: updateTargetStatusForAbortedTransactionBatch(taskResult.getTargets());
ImportTaskResult.ImportTaskResultBuilder builder =
ImportTaskResult.builder()
.rowNumber(taskResult.getRowNumber())
.targets(targetResults)
.dataChunkId(taskResult.getDataChunkId());

// Only add the raw record if the configuration is set to log raw source data
if (config.isLogRawSourceRecords()) {
builder.rawRecord(taskResult.getRawRecord());
}
ImportTaskResult modifiedTaskResult = builder.build();

// Add the modified task result to the list
modifiedRecords.add(modifiedTaskResult);
}

// Create a new transaction batch result with the modified import task results
ImportTransactionBatchResult modifiedBatchResult =
ImportTransactionBatchResult.builder()
.dataChunkId(batchResult.getDataChunkId())
.transactionBatchId(batchResult.getTransactionBatchId())
.transactionId(batchResult.getTransactionId())
.records(modifiedRecords)
.errors(batchResult.getErrors())
.success(batchResult.isSuccess())
.build();

// Convert the modified batch result to a JsonNode
return OBJECT_MAPPER.valueToTree(modifiedBatchResult);
}

protected void closeLogWriter(LogWriter logWriter) {
if (logWriter != null) {
try {
logWriter.close();
} catch (IOException e) {
logError("Failed to close a log writer", e);
}
}
}

protected abstract void logError(String errorMessage, Exception e);

protected LogWriter createLogWriter(String logFilePath) throws IOException {
return logWriterFactory.createLogWriter(logFilePath);
}

private void notifyTransactionBatchStarted(ImportTransactionBatchStatus status) {
for (ImportEventListener listener : listeners) {
listener.onTransactionBatchStarted(status);
}
}

private void notifyTransactionBatchCompleted(ImportTransactionBatchResult batchResult) {
for (ImportEventListener listener : listeners) {
listener.onTransactionBatchCompleted(batchResult);
}
}

private List<ImportTargetResult> updateTargetStatusForAbortedTransactionBatch(
List<ImportTargetResult> targetResults) {
for (int i = 0; i < targetResults.size(); i++) {
ImportTargetResult target = targetResults.get(i);
if (target.getStatus().equals(ImportTargetResultStatus.SAVED)) {
ImportTargetResult newTarget =
ImportTargetResult.builder()
.importAction(target.getImportAction())
.status(ImportTargetResultStatus.ABORTED)
.importedRecord(target.getImportedRecord())
.namespace(target.getNamespace())
.tableName(target.getTableName())
.dataMapped(target.isDataMapped())
.errors(Collections.singletonList(Constants.ABORT_TRANSACTION_STATUS))
.build();
targetResults.set(i, newTarget);
}
}
return targetResults;
}
}
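For reference, a concrete logger only needs to supply the two abstract hooks plus the ImportEventListener methods that AbstractImportLogger leaves unimplemented. A minimal console-only sketch (not part of this PR; it assumes ImportEventListener declares no methods beyond those seen in this diff):

public class ConsoleImportLogger extends AbstractImportLogger {

  public ConsoleImportLogger(ImportLoggerConfig config, LogWriterFactory logWriterFactory) {
    super(config, logWriterFactory);
  }

  @Override
  protected void logTransactionBatch(ImportTransactionBatchResult batchResult) {
    // The base class applies the raw-record and aborted-status filtering
    System.out.println(createFilteredTransactionBatchLogJsonNode(batchResult));
  }

  @Override
  protected void logError(String errorMessage, Exception e) {
    System.err.println(errorMessage + ": " + e.getMessage());
  }

  @Override
  public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {}

  @Override
  public void onDataChunkCompleted(ImportDataChunkStatus dataChunkStatus) {}

  @Override
  public void onAllDataChunksCompleted() {}
}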
New file: ImportLoggerConfig.java
@@ -0,0 +1,13 @@
package com.scalar.db.dataloader.core.dataimport.log;

import lombok.Builder;
import lombok.Value;

@Value
@Builder
public class ImportLoggerConfig {
String logDirectoryPath;
boolean logSuccessRecords;
boolean logRawSourceRecords;
boolean prettyPrint;
}
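Lombok's @Value and @Builder generate an immutable class with an all-fields builder, so constructing the config looks like this; note the trailing separator on logDirectoryPath, since the loggers concatenate file names onto it directly:

ImportLoggerConfig config =
    ImportLoggerConfig.builder()
        .logDirectoryPath("logs/") // trailing separator needed: file names are appended directly
        .logSuccessRecords(true)
        .logRawSourceRecords(false)
        .prettyPrint(true)
        .build();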
New file: ImportLoggerException.java
@@ -0,0 +1,12 @@
package com.scalar.db.dataloader.core.dataimport.log;

public class ImportLoggerException extends Exception {

public ImportLoggerException(String message) {
super(message);
}

public ImportLoggerException(String message, Throwable cause) {
super(message, cause);
}
}
New file: LogConstants.java
@@ -0,0 +1,3 @@
package com.scalar.db.dataloader.core.dataimport.log;

public class LogConstants {}
New file: LogStorageLocation.java
@@ -0,0 +1,7 @@
package com.scalar.db.dataloader.core.dataimport.log;

/** The location where the logs are stored. */
public enum LogStorageLocation {
LOCAL_FILE_STORAGE,
AWS_S3
}
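The enum presumably lets callers choose between local and S3-backed log writers. A hedged sketch of branching on it; both factory class names are hypothetical, only the enum constants come from this PR:

static LogWriterFactory newLogWriterFactory(LogStorageLocation location) {
  switch (location) {
    case AWS_S3:
      return new S3LogWriterFactory(); // hypothetical S3-backed factory
    case LOCAL_FILE_STORAGE:
    default:
      return new LocalFileLogWriterFactory(); // hypothetical local-file factory
  }
}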
New file: SingleFileImportLogger.java
@@ -0,0 +1,139 @@
package com.scalar.db.dataloader.core.dataimport.log;

import com.fasterxml.jackson.databind.JsonNode;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriter;
import com.scalar.db.dataloader.core.dataimport.log.writer.LogWriterFactory;
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResult;
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTargetResultStatus;
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SingleFileImportLogger extends AbstractImportLogger {

protected static final String SUMMARY_LOG_FILE_NAME = "summary.log";
protected static final String SUCCESS_LOG_FILE_NAME = "success.json";
protected static final String FAILURE_LOG_FILE_NAME = "failure.json";
private static final Logger LOGGER = LoggerFactory.getLogger(SingleFileImportLogger.class);
private LogWriter summaryLogWriter;
private LogWriter successLogWriter;
private LogWriter failureLogWriter;

public SingleFileImportLogger(ImportLoggerConfig config, LogWriterFactory logWriterFactory)
throws IOException {
super(config, logWriterFactory);
successLogWriter = createLogWriter(config.getLogDirectoryPath() + SUCCESS_LOG_FILE_NAME);
failureLogWriter = createLogWriter(config.getLogDirectoryPath() + FAILURE_LOG_FILE_NAME);
}

@Override
public void onTaskComplete(ImportTaskResult taskResult) {
if (!config.isLogSuccessRecords() && !config.isLogRawSourceRecords()) return;
try {
writeImportTaskResultDetailToLogs(taskResult);
} catch (Exception e) {
logError("Failed to write success/failure logs", e);
}
}

@Override
public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {}

@Override
public void onDataChunkCompleted(ImportDataChunkStatus dataChunkStatus) {
try {
logDataChunkSummary(dataChunkStatus);
} catch (IOException e) {
logError("Failed to log the data chunk summary", e);
}
}

@Override
public void onAllDataChunksCompleted() {
closeAllLogWriters();
}

@Override
protected void logTransactionBatch(ImportTransactionBatchResult batchResult) {
try {
LogWriter logWriter = getLogWriterForTransactionBatch(batchResult);
JsonNode jsonNode = createFilteredTransactionBatchLogJsonNode(batchResult);
writeToLogWriter(logWriter, jsonNode);
} catch (IOException e) {
logError("Failed to write a transaction batch record to the log file", e);
}
}

@Override
protected void logError(String errorMessage, Exception exception) {
LOGGER.error(errorMessage, exception);
}

private void logDataChunkSummary(ImportDataChunkStatus dataChunkStatus) throws IOException {
if (summaryLogWriter == null) {
summaryLogWriter = createLogWriter(config.getLogDirectoryPath() + SUMMARY_LOG_FILE_NAME);
}
writeImportDataChunkSummary(dataChunkStatus, summaryLogWriter);
}

private void writeImportDataChunkSummary(
ImportDataChunkStatus dataChunkStatus, LogWriter logWriter) throws IOException {
JsonNode jsonNode = OBJECT_MAPPER.valueToTree(dataChunkStatus);
writeToLogWriter(logWriter, jsonNode);
}

private LogWriter getLogWriterForTransactionBatch(ImportTransactionBatchResult batchResult)
throws IOException {
String logFileName = batchResult.isSuccess() ? SUCCESS_LOG_FILE_NAME : FAILURE_LOG_FILE_NAME;
LogWriter logWriter = batchResult.isSuccess() ? successLogWriter : failureLogWriter;
if (logWriter == null) {
logWriter = createLogWriter(config.getLogDirectoryPath() + logFileName);
if (batchResult.isSuccess()) {
successLogWriter = logWriter;
} else {
failureLogWriter = logWriter;
}
}
return logWriter;
}

private void writeImportTaskResultDetailToLogs(ImportTaskResult importTaskResult)
throws IOException {
JsonNode jsonNode;
for (ImportTargetResult target : importTaskResult.getTargets()) {
if (config.isLogSuccessRecords()
&& target.getStatus().equals(ImportTargetResultStatus.SAVED)) {
synchronized (successLogWriter) {
jsonNode = OBJECT_MAPPER.valueToTree(target);
successLogWriter.write(jsonNode);
successLogWriter.flush();
}
}
if (config.isLogRawSourceRecords()
&& !target.getStatus().equals(ImportTargetResultStatus.SAVED)) {
synchronized (failureLogWriter) {
jsonNode = OBJECT_MAPPER.valueToTree(target);
failureLogWriter.write(jsonNode);
failureLogWriter.flush();
}
}
}
}

private void writeToLogWriter(LogWriter logWriter, JsonNode jsonNode) throws IOException {
logWriter.write(jsonNode);
logWriter.flush();
}

private void closeAllLogWriters() {
closeLogWriter(summaryLogWriter);
closeLogWriter(successLogWriter);
closeLogWriter(failureLogWriter);
summaryLogWriter = null;
successLogWriter = null;
failureLogWriter = null;
}
}
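Putting it together, a minimal wiring sketch (not part of this PR; LocalFileLogWriterFactory and importManager are placeholders, only the constructor and event methods above come from this diff):

ImportLoggerConfig config =
    ImportLoggerConfig.builder()
        .logDirectoryPath("logs/")
        .logSuccessRecords(true)
        .logRawSourceRecords(true)
        .prettyPrint(false)
        .build();
LogWriterFactory factory = new LocalFileLogWriterFactory(); // hypothetical implementation
SingleFileImportLogger logger = new SingleFileImportLogger(config, factory);
// importManager.addListener(logger); // hypothetical registration with the import process
// Completed batches are appended to success.json or failure.json, each data chunk
// summary to summary.log; onAllDataChunksCompleted() closes all three writers.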