Add DTOs and other classes for task #2446

Draft · wants to merge 3 commits into base: feat/data-loader/control-file
14 changes: 14 additions & 0 deletions core/src/main/java/com/scalar/db/common/error/CoreError.java
@@ -735,6 +735,20 @@ public enum CoreError implements ScalarDbError {
"Multiple data mappings found for column '%s' in table '%s'",
"",
""),
DATA_LOADER_MISSING_CLUSTERING_KEY_COLUMN(
Category.USER_ERROR,
"0166",
"Missing required field or column mapping for clustering key %s",
"",
""),
DATA_LOADER_MISSING_PARTITION_KEY_COLUMN(
Category.USER_ERROR,
"0167",
"Missing required field or column mapping for partition key %s",
"",
""),
DATA_LOADER_MISSING_COLUMN(
Category.USER_ERROR, "0168", "Missing field or column mapping for %s", "", ""),
//
// Errors for the concurrency error category
//
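For reference (not part of the diff): the new codes are consumed through ScalarDbError.buildMessage, as the validator later in this PR does. A quick sketch, with an illustrative column name:

    // Illustrative only: formatting one of the new data-loader errors.
    String msg = CoreError.DATA_LOADER_MISSING_PARTITION_KEY_COLUMN.buildMessage("customer_id");
    // msg contains "Missing required field or column mapping for partition key customer_id",
    // prefixed with the "0167" error code per the ScalarDbError contract.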
com/scalar/db/dataloader/core/dataimport/ImportOptions.java
@@ -0,0 +1,38 @@
package com.scalar.db.dataloader.core.dataimport;

import com.scalar.db.dataloader.core.FileFormat;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFile;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileValidationLevel;
import com.scalar.db.dataloader.core.dataimport.log.LogMode;
import lombok.Builder;
import lombok.Data;

/** Options for importing data into one or more ScalarDB tables. */
@Builder
@Data
public class ImportOptions {

@Builder.Default private final ImportMode importMode = ImportMode.UPSERT;
@Builder.Default private final boolean requireAllColumns = false;
@Builder.Default private final FileFormat fileFormat = FileFormat.JSON;
@Builder.Default private final boolean prettyPrint = false;
@Builder.Default private final boolean ignoreNullValues = false;
@Builder.Default private final LogMode logMode = LogMode.SPLIT_BY_DATA_CHUNK;

@Builder.Default
private final ControlFileValidationLevel controlFileValidationLevel =
ControlFileValidationLevel.MAPPED;

@Builder.Default private final char delimiter = ',';

@Builder.Default private final boolean logSuccessRecords = false;
@Builder.Default private final boolean logRawRecord = false;

private final int dataChunkSize;
private final int transactionBatchSize;
private final ControlFile controlFile;
private final String namespace;
private final String tableName;
private final int maxThreads;
private final String customHeaderRow;
}
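Since the class uses Lombok's @Builder with @Builder.Default values, a caller only sets what it needs. A minimal usage sketch (all values here are illustrative):

    import com.scalar.db.dataloader.core.FileFormat;
    import com.scalar.db.dataloader.core.dataimport.ImportOptions;

    ImportOptions options =
        ImportOptions.builder()
            .namespace("ns")
            .tableName("customers")
            .fileFormat(FileFormat.JSON)
            .dataChunkSize(500)
            .transactionBatchSize(50)
            .maxThreads(8)
            .build();
    // Fields left unset keep their defaults, e.g. importMode == ImportMode.UPSERT.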
com/scalar/db/dataloader/core/dataimport/log/LogMode.java
@@ -0,0 +1,6 @@
package com.scalar.db.dataloader.core.dataimport.log;

/** The mode used for writing import log files. */
public enum LogMode {
SINGLE_FILE,
SPLIT_BY_DATA_CHUNK
}
com/scalar/db/dataloader/core/dataimport/task/ImportTaskConstants.java
@@ -0,0 +1,17 @@
package com.scalar.db.dataloader.core.dataimport.task;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;

/** Error message constants shared by the import tasks. */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ImportTaskConstants {
public static final String ERROR_COULD_NOT_FIND_PARTITION_KEY =
"could not find the partition key";
public static final String ERROR_UPSERT_INSERT_MISSING_COLUMNS =
"the source record needs to contain all fields if the UPSERT turns into an INSERT";
public static final String ERROR_DATA_ALREADY_EXISTS = "record already exists";
public static final String ERROR_DATA_NOT_FOUND = "record was not found";
public static final String ERROR_COULD_NOT_FIND_CLUSTERING_KEY =
"could not find the clustering key";
public static final String ERROR_TABLE_METADATA_MISSING = "No table metadata found";
}
com/scalar/db/dataloader/core/dataimport/task/mapping/ImportDataMapping.java
@@ -0,0 +1,28 @@
package com.scalar.db.dataloader.core.dataimport.task.mapping;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTableFieldMapping;

public class ImportDataMapping {

  /**
   * Updates the source data, replacing each source field name with the target column name
   * according to the control file table mappings.
   *
   * @param source source data
   * @param controlFileTable control file table used to map the source data
   */
public static void apply(ObjectNode source, ControlFileTable controlFileTable) {
    // Move the source field value to the target column if the target is not already set
for (ControlFileTableFieldMapping mapping : controlFileTable.getMappings()) {
String sourceField = mapping.getSourceField();
String targetColumn = mapping.getTargetColumn();

if (source.has(sourceField) && !source.has(targetColumn)) {
source.set(targetColumn, source.get(sourceField));
source.remove(sourceField);
}
}
}
}
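In effect, apply renames mapped fields in place. A small sketch using the control-file classes from this PR (namespace, table, and field names are illustrative; ImportDataMappingTest below exercises the same behavior):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;

    ObjectNode source = new ObjectMapper().createObjectNode();
    source.put("source_id", "111");

    ControlFileTable table = new ControlFileTable("ns", "tbl");
    table.getMappings().add(new ControlFileTableFieldMapping("source_id", "target_id"));

    ImportDataMapping.apply(source, table);
    // source is now {"target_id":"111"}; the original "source_id" field was removed.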
com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidationResult.java
@@ -0,0 +1,48 @@
package com.scalar.db.dataloader.core.dataimport.task.validation;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.concurrent.Immutable;

/** The validation result for a data source record */
@Immutable
public final class ImportSourceRecordValidationResult {

private final List<String> errorMessages;
private final Set<String> columnsWithErrors;

  /** Creates an empty validation result. */
public ImportSourceRecordValidationResult() {
this.errorMessages = new ArrayList<>();
this.columnsWithErrors = new HashSet<>();
}

/**
   * Adds a validation error message for a column and marks the column as containing an error.
*
* @param columnName column name
* @param errorMessage error message
*/
public void addErrorMessage(String columnName, String errorMessage) {
this.columnsWithErrors.add(columnName);
this.errorMessages.add(errorMessage);
}

/** @return Immutable list of validation error messages */
public List<String> getErrorMessages() {
return Collections.unmodifiableList(this.errorMessages);
}

/** @return Immutable set of columns that had errors */
public Set<String> getColumnsWithErrors() {
return Collections.unmodifiableSet(this.columnsWithErrors);
}

  /** @return true if no validation errors were recorded */
public boolean isValid() {
return this.errorMessages.isEmpty();
}
}
com/scalar/db/dataloader/core/dataimport/task/validation/ImportSourceRecordValidator.java
@@ -0,0 +1,112 @@
package com.scalar.db.dataloader.core.dataimport.task.validation;

import com.fasterxml.jackson.databind.JsonNode;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.dataloader.core.DatabaseKeyType;
import com.scalar.db.dataloader.core.util.TableMetadataUtil;
import java.util.Set;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

/** Validates source records against a table's partition keys, clustering keys, and column names. */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ImportSourceRecordValidator {

  /**
   * Validates a source record, collecting every validation error instead of returning at the
   * first one. This avoids trial-and-error imports where a new error only surfaces after the
   * previous one is fixed.
   *
   * @param partitionKeyNames Set of partition key column names in the table
   * @param clusteringKeyNames Set of clustering key column names in the table
   * @param columnNames Set of all column names in the table
   * @param sourceRecord source data
   * @param allColumnsRequired If true, treat missing columns as an error
   * @return Source record validation result
   */
public static ImportSourceRecordValidationResult validateSourceRecord(
Set<String> partitionKeyNames,
Set<String> clusteringKeyNames,
Set<String> columnNames,
JsonNode sourceRecord,
boolean allColumnsRequired) {
ImportSourceRecordValidationResult validationResult = new ImportSourceRecordValidationResult();

// check if partition keys are found
checkMissingKeys(DatabaseKeyType.PARTITION, partitionKeyNames, sourceRecord, validationResult);

// check if clustering keys are found
checkMissingKeys(
DatabaseKeyType.CLUSTERING, clusteringKeyNames, sourceRecord, validationResult);

// Check if the record is missing any columns
if (allColumnsRequired) {
checkMissingColumns(
sourceRecord, columnNames, validationResult, validationResult.getColumnsWithErrors());
}

return validationResult;
}

  /**
   * Checks that the required key columns are present in the source record.
   *
   * @param keyType Type of key to validate
   * @param keyColumnNames Set of required key column names
   * @param sourceRecord source data
   * @param validationResult Source record validation result
   */
public static void checkMissingKeys(
DatabaseKeyType keyType,
Set<String> keyColumnNames,
JsonNode sourceRecord,
ImportSourceRecordValidationResult validationResult) {
for (String columnName : keyColumnNames) {
if (!sourceRecord.has(columnName)) {
        String errorMessage =
            keyType == DatabaseKeyType.PARTITION
                ? CoreError.DATA_LOADER_MISSING_PARTITION_KEY_COLUMN.buildMessage(columnName)
                : CoreError.DATA_LOADER_MISSING_CLUSTERING_KEY_COLUMN.buildMessage(columnName);
        validationResult.addErrorMessage(columnName, errorMessage);
}
}
}

  /**
   * Makes sure the JSON object is not missing any columns. Errors are added to the validation
   * result.
   *
   * @param sourceRecord Source JSON object
   * @param columnNames Set of column names for a table
   * @param validationResult Source record validation result
   * @param ignoreColumns Columns that can be ignored in the check
   */
public static void checkMissingColumns(
JsonNode sourceRecord,
Set<String> columnNames,
ImportSourceRecordValidationResult validationResult,
Set<String> ignoreColumns) {
Set<String> metadataColumns = TableMetadataUtil.getMetadataColumns();
for (String columnName : columnNames) {
      // Report the column if it is not ignored, is not a metadata column, and is missing
if ((ignoreColumns == null || !ignoreColumns.contains(columnName))
&& !TableMetadataUtil.isMetadataColumn(columnName, metadataColumns, columnNames)
&& !sourceRecord.has(columnName)) {
validationResult.addErrorMessage(
columnName, CoreError.DATA_LOADER_MISSING_COLUMN.buildMessage(columnName));
}
}
}

  /**
   * Makes sure the JSON object is not missing any columns. Errors are added to the validation
   * result. Overload of the above without ignored columns.
   *
   * @param sourceRecord Source JSON object
   * @param columnNames Set of column names for a table
   * @param validationResult Source record validation result
   */
public static void checkMissingColumns(
JsonNode sourceRecord,
Set<String> columnNames,
ImportSourceRecordValidationResult validationResult) {
ImportSourceRecordValidator.checkMissingColumns(
sourceRecord, columnNames, validationResult, null);
}
}
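Putting the two validation classes together, a minimal sketch of a caller (the table layout and record are illustrative; whether a column such as "email" counts as a metadata column depends on TableMetadataUtil):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    public class ValidationSketch {
      public static void main(String[] args) throws Exception {
        JsonNode record = new ObjectMapper().readTree("{\"id\":1,\"name\":\"a\"}");
        Set<String> columns = new HashSet<>(Arrays.asList("id", "name", "email"));
        ImportSourceRecordValidationResult result =
            ImportSourceRecordValidator.validateSourceRecord(
                Collections.singleton("id"), // partition key columns
                Collections.emptySet(),      // clustering key columns
                columns,                     // all table columns
                record,
                true);                       // allColumnsRequired
        // "email" is absent from the record, so (assuming it is not a metadata column)
        // the result is invalid and carries one DATA_LOADER_MISSING_COLUMN message.
        System.out.println(result.isValid());
        result.getErrorMessages().forEach(System.out::println);
      }
    }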
com/scalar/db/dataloader/core/dataimport/task/mapping/ImportDataMappingTest.java
@@ -0,0 +1,45 @@
package com.scalar.db.dataloader.core.dataimport.task.mapping;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTable;
import com.scalar.db.dataloader.core.dataimport.controlfile.ControlFileTableFieldMapping;
import java.util.ArrayList;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class ImportDataMappingTest {

  ControlFileTable controlFileTable;

@BeforeEach
void setup() {
    controlFileTable = new ControlFileTable("namespace", "table");
ControlFileTableFieldMapping m1 = new ControlFileTableFieldMapping("source_id", "target_id");
ControlFileTableFieldMapping m2 =
new ControlFileTableFieldMapping("source_name", "target_name");
ControlFileTableFieldMapping m3 =
new ControlFileTableFieldMapping("source_email", "target_email");
ArrayList<ControlFileTableFieldMapping> mappingArrayList = new ArrayList<>();
mappingArrayList.add(m1);
mappingArrayList.add(m2);
mappingArrayList.add(m3);
    controlFileTable.getMappings().addAll(mappingArrayList);
}

@Test
void apply_withValidData_shouldUpdateSourceData() throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
ObjectNode source = objectMapper.createObjectNode();
source.put("source_id", "111");
source.put("source_name", "abc");
source.put("source_email", "[email protected]");
    ImportDataMapping.apply(source, controlFileTable);
// Assert changes
Assertions.assertEquals("111", source.get("target_id").asText());
Assertions.assertEquals("abc", source.get("target_name").asText());
Assertions.assertEquals("[email protected]", source.get("target_email").asText());
}
}