Add import processes #2462

Draft
wants to merge 6 commits into feat/data-loader/import-task
7 changes: 7 additions & 0 deletions core/src/main/java/com/scalar/db/common/error/CoreError.java
@@ -749,6 +749,13 @@ public enum CoreError implements ScalarDbError {
""),
DATA_LOADER_MISSING_COLUMN(
Category.USER_ERROR, "0168", "Missing field or column mapping for %s", "", ""),
DATA_LOADER_MISSING_SOURCE_FIELD(
Category.USER_ERROR,
"0169",
"The data mapping source field '%s' for table '%s' is missing in the json data record",
"",
""),

//
// Errors for the concurrency error category
//
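As a hedged illustration of how the new error code might be raised when a mapped source field is absent from a record, assuming the buildMessage helper that the surrounding error codes rely on; the helper name, parameters, and exception type below are placeholders, not part of this diff:

// Hypothetical helper, not part of this diff: sourceField and table come from the
// column mapping being applied, and sourceRecord is the parsed JSON record
// (com.fasterxml.jackson.databind.JsonNode).
static void validateSourceField(JsonNode sourceRecord, String sourceField, String table) {
  if (!sourceRecord.has(sourceField)) {
    throw new IllegalArgumentException(
        CoreError.DATA_LOADER_MISSING_SOURCE_FIELD.buildMessage(sourceField, table));
  }
}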
com/scalar/db/dataloader/core/DataLoaderObjectMapper.java
@@ -0,0 +1,14 @@
package com.scalar.db.dataloader.core;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

public class DataLoaderObjectMapper extends ObjectMapper {

public DataLoaderObjectMapper() {
super();
this.setSerializationInclusion(JsonInclude.Include.NON_NULL);
this.registerModule(new JavaTimeModule());
}
}
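A short usage sketch (with a hypothetical Record class, not part of this PR) showing the effect of the two customizations: null fields are omitted and java.time types are handled via JavaTimeModule:

import com.scalar.db.dataloader.core.DataLoaderObjectMapper;
import java.time.LocalDate;

public class DataLoaderObjectMapperExample {

  // Hypothetical DTO used only for this example.
  static class Record {
    public String name = "alice";
    public String nickname = null; // dropped from the output because of NON_NULL inclusion
    public LocalDate createdAt = LocalDate.of(2024, 1, 1);
  }

  public static void main(String[] args) throws Exception {
    DataLoaderObjectMapper mapper = new DataLoaderObjectMapper();
    // With Jackson's default java.time handling this prints something like
    // {"name":"alice","createdAt":[2024,1,1]}; "nickname" is omitted entirely.
    System.out.println(mapper.writeValueAsString(new Record()));
  }
}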
com/scalar/db/dataloader/core/dataimport/ImportEventListener.java
@@ -0,0 +1,23 @@
package com.scalar.db.dataloader.core.dataimport;

import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus;

public interface ImportEventListener {

void onDataChunkStarted(ImportDataChunkStatus status);

void addOrUpdateDataChunkStatus(ImportDataChunkStatus status);

void onDataChunkCompleted(ImportDataChunkStatus status);

void onAllDataChunksCompleted();

void onTransactionBatchStarted(ImportTransactionBatchStatus batchStatus);

void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult);

void onTaskComplete(ImportTaskResult taskResult);
}
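For illustration, a minimal sketch of an implementation (a hypothetical LoggingImportEventListener, not part of this PR) that simply prints each event as it arrives; only getDataChunkId(), which ImportManager also uses, is assumed on the status object:

import com.scalar.db.dataloader.core.dataimport.ImportEventListener;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus;

public class LoggingImportEventListener implements ImportEventListener {

  @Override
  public void onDataChunkStarted(ImportDataChunkStatus status) {
    System.out.println("Data chunk started: " + status.getDataChunkId());
  }

  @Override
  public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {
    // No-op; a real listener could persist intermediate statuses here.
  }

  @Override
  public void onDataChunkCompleted(ImportDataChunkStatus status) {
    System.out.println("Data chunk completed: " + status.getDataChunkId());
  }

  @Override
  public void onAllDataChunksCompleted() {
    System.out.println("All data chunks completed");
  }

  @Override
  public void onTransactionBatchStarted(ImportTransactionBatchStatus batchStatus) {
    System.out.println("Transaction batch started");
  }

  @Override
  public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult) {
    System.out.println("Transaction batch completed");
  }

  @Override
  public void onTaskComplete(ImportTaskResult taskResult) {
    System.out.println("Task completed");
  }
}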
com/scalar/db/dataloader/core/dataimport/ImportManager.java
@@ -0,0 +1,139 @@
package com.scalar.db.dataloader.core.dataimport;

import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.ScalarDBMode;
import com.scalar.db.dataloader.core.dataimport.dao.ScalarDBDao;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessor;
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorFactory;
import com.scalar.db.dataloader.core.dataimport.processor.ImportProcessorParams;
import com.scalar.db.dataloader.core.dataimport.processor.TableColumnDataTypes;
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus;
import java.io.BufferedReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.NonNull;

@AllArgsConstructor
public class ImportManager implements ImportEventListener {

@NonNull private final Map<String, TableMetadata> tableMetadata;
@NonNull private final BufferedReader importFileReader;
@NonNull private final ImportOptions importOptions;
private final ImportProcessorFactory importProcessorFactory;
private final List<ImportEventListener> listeners = new ArrayList<>();
private final ScalarDBMode scalarDBMode;
private final DistributedStorage distributedStorage;
private final DistributedTransactionManager distributedTransactionManager;
private final List<ImportDataChunkStatus> importDataChunkStatusList = new ArrayList<>();

/**
* Starts the import process
*
* @return list of import data chunk status objects
*/
public List<ImportDataChunkStatus> startImport() {
ImportProcessorParams params =
ImportProcessorParams.builder()
.scalarDBMode(scalarDBMode)
.importOptions(importOptions)
.tableMetadataByTableName(tableMetadata)
.dao(new ScalarDBDao())
.distributedTransactionManager(distributedTransactionManager)
.distributedStorage(distributedStorage)
.tableColumnDataTypes(getTableColumnDataTypes())
.build();
ImportProcessor processor = importProcessorFactory.createImportProcessor(params);
processor.addListener(this);
// If the data chunk size is 0, then process the entire file in a single data chunk
int dataChunkSize =
importOptions.getDataChunkSize() == 0
? Integer.MAX_VALUE
: importOptions.getDataChunkSize();
return processor.process(
dataChunkSize, importOptions.getTransactionBatchSize(), importFileReader);
}

public void addListener(ImportEventListener listener) {
listeners.add(listener);
}

public void removeListener(ImportEventListener listener) {
listeners.remove(listener);
}

@Override
public void onDataChunkStarted(ImportDataChunkStatus status) {
for (ImportEventListener listener : listeners) {
listener.onDataChunkStarted(status);
}
}

@Override
public void addOrUpdateDataChunkStatus(ImportDataChunkStatus status) {
synchronized (importDataChunkStatusList) {
for (int i = 0; i < importDataChunkStatusList.size(); i++) {
if (importDataChunkStatusList.get(i).getDataChunkId() == status.getDataChunkId()) {
// Object found, replace it with the new one
importDataChunkStatusList.set(i, status);
return;
}
}
// If object is not found, add it to the list
importDataChunkStatusList.add(status);
}
}

@Override
public void onDataChunkCompleted(ImportDataChunkStatus status) {
for (ImportEventListener listener : listeners) {
listener.onDataChunkCompleted(status);
}
}

@Override
public void onTransactionBatchStarted(ImportTransactionBatchStatus status) {
for (ImportEventListener listener : listeners) {
listener.onTransactionBatchStarted(status);
}
}

@Override
public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult) {
for (ImportEventListener listener : listeners) {
listener.onTransactionBatchCompleted(batchResult);
}
}

@Override
public void onTaskComplete(ImportTaskResult taskResult) {
for (ImportEventListener listener : listeners) {
listener.onTaskComplete(taskResult);
}
}

@Override
public void onAllDataChunksCompleted() {
for (ImportEventListener listener : listeners) {
listener.onAllDataChunksCompleted();
}
}

public List<ImportDataChunkStatus> getImportDataChunkStatusList() {
return importDataChunkStatusList;
}

public TableColumnDataTypes getTableColumnDataTypes() {
TableColumnDataTypes tableColumnDataTypes = new TableColumnDataTypes();
tableMetadata.forEach(
(name, metadata) ->
metadata
.getColumnDataTypes()
.forEach((k, v) -> tableColumnDataTypes.addColumnDataType(name, k, v)));
return tableColumnDataTypes;
}
}
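A hedged sketch of how a caller might wire the manager together. It assumes Lombok's @AllArgsConstructor skips the two initialized final list fields, so the generated constructor takes the remaining fields in declaration order; LoggingImportEventListener is the hypothetical listener sketched earlier, ScalarDBMode.TRANSACTION is an assumed enum constant, and "records.csv" is a placeholder path:

import com.scalar.db.api.DistributedTransactionManager;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.dataloader.core.ScalarDBMode;
import com.scalar.db.dataloader.core.dataimport.ImportManager;
import com.scalar.db.dataloader.core.dataimport.ImportOptions;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import com.scalar.db.dataloader.core.dataimport.processor.DefaultImportProcessorFactory;
import java.io.BufferedReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;

public class ImportManagerExample {

  public List<ImportDataChunkStatus> runImport(
      Map<String, TableMetadata> tableMetadata,
      ImportOptions importOptions,
      DistributedTransactionManager transactionManager) throws Exception {
    try (BufferedReader reader = Files.newBufferedReader(Paths.get("records.csv"))) {
      // Constructor generated by Lombok's @AllArgsConstructor; the two initialized
      // list fields are assumed to be skipped, so parameters follow field order.
      ImportManager importManager =
          new ImportManager(
              tableMetadata,
              reader,
              importOptions,
              new DefaultImportProcessorFactory(),
              ScalarDBMode.TRANSACTION,
              null, // DistributedStorage is assumed unnecessary in transaction mode
              transactionManager);
      importManager.addListener(new LoggingImportEventListener());
      return importManager.startImport();
    }
  }
}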
com/scalar/db/dataloader/core/dataimport/processor/CsvImportProcessor.java
@@ -0,0 +1,141 @@
package com.scalar.db.dataloader.core.dataimport.processor;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.scalar.db.dataloader.core.DataLoaderObjectMapper;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunk;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportRow;
import java.io.BufferedReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;

public class CsvImportProcessor extends ImportProcessor {
private static final DataLoaderObjectMapper OBJECT_MAPPER = new DataLoaderObjectMapper();
private static final AtomicInteger dataChunkIdCounter = new AtomicInteger(0);

public CsvImportProcessor(ImportProcessorParams params) {
super(params);
}

/**
* Process the data from the import file
*
* @param dataChunkSize size of data chunk
* @param transactionBatchSize size of transaction batch
* @param reader reader which reads the source file
* @return the list of import data chunk statuses for the processed chunks
*/
@Override
public List<ImportDataChunkStatus> process(
int dataChunkSize, int transactionBatchSize, BufferedReader reader) {
int numCores = Runtime.getRuntime().availableProcessors();
ExecutorService dataChunkExecutor = Executors.newFixedThreadPool(numCores);
// Create a queue to hold data batches
Queue<ImportDataChunk> dataChunkQueue = new LinkedList<>();
Thread readerThread =
new Thread(
() -> {
try {
String header = params.getImportOptions().getCustomHeaderRow();
String delimiter = Character.toString(params.getImportOptions().getDelimiter());
if (delimiter.trim().isEmpty()) {
delimiter = ",";
}
// String.split() interprets its argument as a regex, so quote the delimiter to
// treat characters such as '|' literally
String splitPattern = Pattern.quote(delimiter);
if (header == null) {
header = reader.readLine();
}
String[] headerArray = header.split(splitPattern);
String line;
int rowNumber = 1;
List<ImportRow> currentDataChunk = new ArrayList<>();
while ((line = reader.readLine()) != null) {
String[] dataArray = line.split(splitPattern);
if (headerArray.length != dataArray.length) {
// TODO: replace with a dedicated data loader exception once available
throw new RuntimeException(
"Row " + rowNumber + " has " + dataArray.length + " values, but the header has " + headerArray.length + " columns");
}
JsonNode jsonNode = combineHeaderAndData(headerArray, dataArray);
if (jsonNode == null || jsonNode.isEmpty()) {
continue;
}

ImportRow importRow = new ImportRow(rowNumber, jsonNode);
currentDataChunk.add(importRow);
// If the data chunk is full, add it to the queue
if (currentDataChunk.size() == dataChunkSize) {
int dataChunkId = dataChunkIdCounter.getAndIncrement();
ImportDataChunk importDataChunk =
ImportDataChunk.builder()
.dataChunkId(dataChunkId)
.sourceData(currentDataChunk)
.build();
dataChunkQueue.offer(importDataChunk);
currentDataChunk = new ArrayList<>();
}
rowNumber++;
}

// Add the last data chunk to the queue
if (!currentDataChunk.isEmpty()) {
int dataChunkId = dataChunkIdCounter.getAndIncrement();
ImportDataChunk importDataChunk =
ImportDataChunk.builder()
.dataChunkId(dataChunkId)
.sourceData(currentDataChunk)
.build();
dataChunkQueue.offer(importDataChunk);
}

} catch (IOException e) {
// Preserve the cause so the reader failure is not silently lost
throw new RuntimeException("Failed to read the import file", e);
}
});

readerThread.start();
try {
// Wait for readerThread to finish
readerThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// Process data chunks in parallel
List<Future<?>> dataChunkFutures = new ArrayList<>();
while (!dataChunkQueue.isEmpty()) {
ImportDataChunk dataChunk = dataChunkQueue.poll();
Future<?> dataChunkFuture =
dataChunkExecutor.submit(
() -> processDataChunk(dataChunk, transactionBatchSize, numCores));
dataChunkFutures.add(dataChunkFuture);
}

List<ImportDataChunkStatus> importDataChunkStatusList = new ArrayList<>();
// Wait for all data chunk threads to complete
for (Future<?> dataChunkFuture : dataChunkFutures) {
try {
importDataChunkStatusList.add((ImportDataChunkStatus) dataChunkFuture.get());
} catch (Exception e) {
// TODO: handle the exception
e.printStackTrace();
}
}
dataChunkExecutor.shutdown();
notifyAllDataChunksCompleted();
return importDataChunkStatusList;
}

private JsonNode combineHeaderAndData(String[] header, String[] data) {
ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
for (int i = 0; i < header.length; i++) {
objectNode.put(header[i], data[i]);
}
return objectNode;
}
}
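To illustrate what a parsed CSV row becomes, a small sketch (with hypothetical header and values) that mirrors the private combineHeaderAndData logic:

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.scalar.db.dataloader.core.DataLoaderObjectMapper;

public class CsvRowExample {

  public static void main(String[] args) {
    // Given the header "id,name,age" and the row "1,Alice,30" with the default ',' delimiter
    String[] header = {"id", "name", "age"};
    String[] data = {"1", "Alice", "30"};
    ObjectNode row = new DataLoaderObjectMapper().createObjectNode();
    for (int i = 0; i < header.length; i++) {
      row.put(header[i], data[i]);
    }
    // Every value is kept as a string at this stage:
    // {"id":"1","name":"Alice","age":"30"}
    System.out.println(row);
  }
}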
com/scalar/db/dataloader/core/dataimport/processor/DefaultImportProcessorFactory.java
@@ -0,0 +1,29 @@
package com.scalar.db.dataloader.core.dataimport.processor;

public class DefaultImportProcessorFactory implements ImportProcessorFactory {

/**
* Creates an import processor based on the file format specified in the import params
*
* @param params import processor params object
* @return the generated import processor
*/
@Override
public ImportProcessor createImportProcessor(ImportProcessorParams params) {
ImportProcessor importProcessor;
switch (params.getImportOptions().getFileFormat()) {
case JSONL:
importProcessor = new JsonLinesImportProcessor(params);
break;
case JSON:
importProcessor = new JsonImportProcessor(params);
break;
case CSV:
importProcessor = new CsvImportProcessor(params);
break;
default:
importProcessor = null;
}
return importProcessor;
}
}
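For context, a sketch of a call site that mirrors how ImportManager.startImport() in this PR uses the factory; the helper name is hypothetical, and params, listener, sizes, and reader are assumed to be prepared by the caller:

// Hypothetical helper mirroring ImportManager.startImport() above; not part of this PR.
static List<ImportDataChunkStatus> runWithFactory(
    ImportProcessorParams params,
    ImportEventListener listener,
    int dataChunkSize,
    int transactionBatchSize,
    BufferedReader reader) {
  ImportProcessorFactory factory = new DefaultImportProcessorFactory();
  ImportProcessor processor = factory.createImportProcessor(params);
  processor.addListener(listener);
  return processor.process(dataChunkSize, transactionBatchSize, reader);
}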