import multiple tables at same time - 1 #2191

Open · wants to merge 13 commits into base: main
133 changes: 5 additions & 128 deletions yb-voyager/cmd/importData.go
@@ -17,7 +17,6 @@ package cmd

 import (
 	"fmt"
-	"io"
 	"os"
 	"os/exec"
 	"path/filepath"
@@ -1010,141 +1009,19 @@ func importFile(state *ImportDataState, task *ImportFileTask, updateProgressFn f
 	if err != nil {
 		utils.ErrExit("preparing for file import: %s", err)
 	}
[Review comment from a Contributor, on lines 1009 to 1011]
Do we need to do this PrepareForFileImport here? We are already doing it in NewFileBatchProducer.
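If the call inside NewFileBatchProducer is the one to keep, a possible follow-up (a sketch, not part of this diff; it assumes the hidden call above matches the one in NewFileBatchProducer) would be to drop it from importFile:

-	err := state.PrepareForFileImport(task.FilePath, task.TableNameTup)
-	if err != nil {
-		utils.ErrExit("preparing for file import: %s", err)
-	}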

log.Infof("Collect all interrupted/remaining splits.")
pendingBatches, lastBatchNumber, lastOffset, fileFullySplit, err := state.Recover(task.FilePath, task.TableNameTup)
if err != nil {
utils.ErrExit("recovering state for table: %q: %s", task.TableNameTup, err)
}
for _, batch := range pendingBatches {
submitBatch(batch, updateProgressFn, importBatchArgsProto)
}
if !fileFullySplit {
splitFilesForTable(state, origDataFile, task.TableNameTup, lastBatchNumber, lastOffset, updateProgressFn, importBatchArgsProto)
}
}

func splitFilesForTable(state *ImportDataState, filePath string, t sqlname.NameTuple,
lastBatchNumber int64, lastOffset int64, updateProgressFn func(int64), importBatchArgsProto *tgtdb.ImportBatchArgs) {
log.Infof("Split data file %q: tableName=%q, largestSplit=%v, largestOffset=%v", filePath, t, lastBatchNumber, lastOffset)
batchNum := lastBatchNumber + 1
numLinesTaken := lastOffset

reader, err := dataStore.Open(filePath)
if err != nil {
utils.ErrExit("preparing reader for split generation on file: %q: %v", filePath, err)
}

dataFile, err := datafile.NewDataFile(filePath, reader, dataFileDescriptor)
fileBatchProducer, err := NewFileBatchProducer(task, state)
if err != nil {
utils.ErrExit("open datafile: %q: %v", filePath, err)
utils.ErrExit("creating file batch producer: %s", err)
}
defer dataFile.Close()

log.Infof("Skipping %d lines from %q", lastOffset, filePath)
err = dataFile.SkipLines(lastOffset)
if err != nil {
utils.ErrExit("skipping line for offset=%d: %v", lastOffset, err)
}

var readLineErr error = nil
var line string
var currentBytesRead int64
var batchWriter *BatchWriter
header := ""
if dataFileDescriptor.HasHeader {
header = dataFile.GetHeader()
}

// Helper function to initialize a new batchWriter
initBatchWriter := func() {
batchWriter = state.NewBatchWriter(filePath, t, batchNum)
err := batchWriter.Init()
for !fileBatchProducer.Done() {
batch, err := fileBatchProducer.NextBatch()
if err != nil {
utils.ErrExit("initializing batch writer for table: %q: %s", t, err)
}
// Write the header if necessary
if header != "" && dataFileDescriptor.FileFormat == datafile.CSV {
err = batchWriter.WriteHeader(header)
if err != nil {
utils.ErrExit("writing header for table: %q: %s", t, err)
}
utils.ErrExit("getting next batch: %s", err)
}
}

// Function to finalize and submit the current batch
finalizeBatch := func(isLastBatch bool, offsetEnd int64, bytesInBatch int64) {
batch, err := batchWriter.Done(isLastBatch, offsetEnd, bytesInBatch)
if err != nil {
utils.ErrExit("finalizing batch %d: %s", batchNum, err)
}
batchWriter = nil
submitBatch(batch, updateProgressFn, importBatchArgsProto)

// Increment batchNum only if this is not the last batch
if !isLastBatch {
batchNum++
}
}

for readLineErr == nil {

if batchWriter == nil {
initBatchWriter() // Create a new batchWriter
}

line, currentBytesRead, readLineErr = dataFile.NextLine()
if readLineErr == nil || (readLineErr == io.EOF && line != "") {
// handling possible case: last dataline(i.e. EOF) but no newline char at the end
numLinesTaken += 1
}
log.Debugf("Batch %d: totalBytesRead %d, currentBytes %d \n", batchNum, dataFile.GetBytesRead(), currentBytesRead)
if currentBytesRead > tdb.MaxBatchSizeInBytes() {
//If a row is itself larger than MaxBatchSizeInBytes erroring out
ybSpecificMsg := ""
if tconf.TargetDBType == YUGABYTEDB {
ybSpecificMsg = ", but should be strictly lower than the the rpc_max_message_size on YugabyteDB (default 267386880 bytes)"
}
utils.ErrExit("record of size %d larger than max batch size: record num=%d for table %q in file %s is larger than the max batch size %d bytes. Max Batch size can be changed using env var MAX_BATCH_SIZE_BYTES%s", currentBytesRead, numLinesTaken, t.ForOutput(), filePath, tdb.MaxBatchSizeInBytes(), ybSpecificMsg)
}
if line != "" {
// can't use importBatchArgsProto.Columns as to use case insenstiive column names
columnNames, _ := TableToColumnNames.Get(t)
line, err = valueConverter.ConvertRow(t, columnNames, line)
if err != nil {
utils.ErrExit("transforming line number=%d for table: %q in file %s: %s", numLinesTaken, t.ForOutput(), filePath, err)
}

// Check if adding this record exceeds the max batch size
if batchWriter.NumRecordsWritten == batchSizeInNumRows ||
dataFile.GetBytesRead() > tdb.MaxBatchSizeInBytes() { // GetBytesRead - returns the total bytes read until now including the currentBytesRead

// Finalize the current batch without adding the record
finalizeBatch(false, numLinesTaken-1, dataFile.GetBytesRead()-currentBytesRead)

//carry forward the bytes to next batch
dataFile.ResetBytesRead(currentBytesRead)

// Start a new batch by calling the initBatchWriter function
initBatchWriter()
}

// Write the record to the new or current batch
err = batchWriter.WriteRecord(line)
if err != nil {
utils.ErrExit("Write to batch %d: %s", batchNum, err)
}
}

// Finalize the batch if it's the last line or the end of the file and reset the bytes read to 0
if readLineErr == io.EOF {
finalizeBatch(true, numLinesTaken, dataFile.GetBytesRead())
dataFile.ResetBytesRead(0)
} else if readLineErr != nil {
utils.ErrExit("read line from data file: %q: %s", filePath, readLineErr)
}
}

log.Infof("splitFilesForTable: done splitting data file %q for table %q", filePath, t)
}

func executePostSnapshotImportSqls() error {
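Reassembled from the additions and the surviving context lines above, the tail of importFile after this change reads:

	fileBatchProducer, err := NewFileBatchProducer(task, state)
	if err != nil {
		utils.ErrExit("creating file batch producer: %s", err)
	}
	for !fileBatchProducer.Done() {
		batch, err := fileBatchProducer.NextBatch()
		if err != nil {
			utils.ErrExit("getting next batch: %s", err)
		}
		submitBatch(batch, updateProgressFn, importBatchArgsProto)
	}
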
233 changes: 233 additions & 0 deletions yb-voyager/cmd/importDataFileBatchProducer.go
@@ -0,0 +1,233 @@
/*
Copyright (c) YugabyteDB, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cmd

import (
"fmt"
"io"

log "github.com/sirupsen/logrus"
"github.com/yugabyte/yb-voyager/yb-voyager/src/datafile"
"github.com/yugabyte/yb-voyager/yb-voyager/src/utils"
)

type FileBatchProducer struct {
task *ImportFileTask
state *ImportDataState

pendingBatches []*Batch
lastBatchNumber int64
lastOffset int64
fileFullySplit bool
completed bool

dataFile datafile.DataFile
header string
numLinesTaken int64
lineFromPreviousBatch string
}

func NewFileBatchProducer(task *ImportFileTask, state *ImportDataState) (*FileBatchProducer, error) {
err := state.PrepareForFileImport(task.FilePath, task.TableNameTup)
if err != nil {
return nil, fmt.Errorf("preparing for file import: %s", err)
}
pendingBatches, lastBatchNumber, lastOffset, fileFullySplit, err := state.Recover(task.FilePath, task.TableNameTup)
if err != nil {
return nil, fmt.Errorf("recovering state for table: %q: %s", task.TableNameTup, err)
}
var completed bool
if len(pendingBatches) == 0 && fileFullySplit {
completed = true
}

return &FileBatchProducer{
task: task,
state: state,
pendingBatches: pendingBatches,
lastBatchNumber: lastBatchNumber,
lastOffset: lastOffset,
fileFullySplit: fileFullySplit,
completed: completed,
[Review comment from a Contributor]
nit: completed: len(pendingBatches) == 0 && fileFullySplit
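One way to apply the nit (the var completed block earlier in the function would then go away):

	completed:      len(pendingBatches) == 0 && fileFullySplit,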

numLinesTaken: lastOffset,
}, nil
}

func (p *FileBatchProducer) Done() bool {
return p.completed
}

func (p *FileBatchProducer) NextBatch() (*Batch, error) {
if p.Done() {
return nil, fmt.Errorf("already completed producing all batches")
}
if len(p.pendingBatches) > 0 {
batch := p.pendingBatches[0]
p.pendingBatches = p.pendingBatches[1:]
// file is fully split and returning the last batch, so mark the producer as completed
if len(p.pendingBatches) == 0 && p.fileFullySplit {
p.completed = true
}
return batch, nil
}

return p.produceNextBatch()
}

func (p *FileBatchProducer) produceNextBatch() (*Batch, error) {
if p.dataFile == nil {
err := p.openDataFile()
if err != nil {
return nil, err
}
}

var readLineErr error
var line string
var currentBytesRead int64
batchNum := p.lastBatchNumber + 1

batchWriter, err := p.newBatchWriter()
if err != nil {
return nil, err
}
if p.lineFromPreviousBatch != "" {
err = batchWriter.WriteRecord(p.lineFromPreviousBatch)
[Review comment from a Contributor]
Add a comment explaining this lineFromPreviousBatch.
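A possible wording, inferred from how produceNextBatch sets the field (suggestion only):

	// lineFromPreviousBatch holds a record that was already read and converted
	// while producing the previous batch, but would have pushed that batch past
	// its size limit; it becomes the first record written to this batch.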

if err != nil {
return nil, fmt.Errorf("Write to batch %d: %s", batchNum, err)
}
p.lineFromPreviousBatch = ""
}

for readLineErr == nil {

line, currentBytesRead, readLineErr = p.dataFile.NextLine()
if readLineErr == nil || (readLineErr == io.EOF && line != "") {
// handling possible case: last dataline(i.e. EOF) but no newline char at the end
p.numLinesTaken += 1
}
log.Debugf("Batch %d: totalBytesRead %d, currentBytes %d \n", batchNum, p.dataFile.GetBytesRead(), currentBytesRead)
if currentBytesRead > tdb.MaxBatchSizeInBytes() {
			// If a row is itself larger than MaxBatchSizeInBytes, error out
ybSpecificMsg := ""
if tconf.TargetDBType == YUGABYTEDB {
ybSpecificMsg = ", but should be strictly lower than the the rpc_max_message_size on YugabyteDB (default 267386880 bytes)"
}
return nil, fmt.Errorf("record of size %d larger than max batch size: record num=%d for table %q in file %s is larger than the max batch size %d bytes. Max Batch size can be changed using env var MAX_BATCH_SIZE_BYTES%s", currentBytesRead, p.numLinesTaken, p.task.TableNameTup.ForOutput(), p.task.FilePath, tdb.MaxBatchSizeInBytes(), ybSpecificMsg)
}
if line != "" {
			// can't use importBatchArgsProto.Columns, so as to use case-insensitive column names
columnNames, _ := TableToColumnNames.Get(p.task.TableNameTup)
line, err = valueConverter.ConvertRow(p.task.TableNameTup, columnNames, line)
if err != nil {
return nil, fmt.Errorf("transforming line number=%d for table: %q in file %s: %s", p.numLinesTaken, p.task.TableNameTup.ForOutput(), p.task.FilePath, err)
}

// Check if adding this record exceeds the max batch size
if batchWriter.NumRecordsWritten == batchSizeInNumRows ||
p.dataFile.GetBytesRead() > tdb.MaxBatchSizeInBytes() { // GetBytesRead - returns the total bytes read until now including the currentBytesRead

// Finalize the current batch without adding the record
batch, err := p.finalizeBatch(batchWriter, false, p.numLinesTaken-1, p.dataFile.GetBytesRead()-currentBytesRead)
if err != nil {
return nil, err
}

				// carry forward the bytes to the next batch
				p.dataFile.ResetBytesRead(currentBytesRead)
				p.lineFromPreviousBatch = line

				// the next call to NextBatch() will open a fresh batch writer
				// and write lineFromPreviousBatch as its first record
return batch, nil
}

// Write the record to the new or current batch
err = batchWriter.WriteRecord(line)
if err != nil {
return nil, fmt.Errorf("Write to batch %d: %s", batchNum, err)
}
}

// Finalize the batch if it's the last line or the end of the file and reset the bytes read to 0
if readLineErr == io.EOF {
batch, err := p.finalizeBatch(batchWriter, true, p.numLinesTaken, p.dataFile.GetBytesRead())
if err != nil {
return nil, err
}

p.completed = true
p.dataFile.ResetBytesRead(0)
return batch, nil
} else if readLineErr != nil {
return nil, fmt.Errorf("read line from data file: %q: %s", p.task.FilePath, readLineErr)
}
}
// ideally should not reach here
return nil, fmt.Errorf("could not produce next batch: err: %w", readLineErr)
}

func (p *FileBatchProducer) openDataFile() error {
reader, err := dataStore.Open(p.task.FilePath)
if err != nil {
return fmt.Errorf("preparing reader for split generation on file: %q: %v", p.task.FilePath, err)
}

dataFile, err := datafile.NewDataFile(p.task.FilePath, reader, dataFileDescriptor)

if err != nil {
return fmt.Errorf("open datafile: %q: %v", p.task.FilePath, err)
}
p.dataFile = dataFile

log.Infof("Skipping %d lines from %q", p.lastOffset, p.task.FilePath)
err = dataFile.SkipLines(p.lastOffset)
if err != nil {
return fmt.Errorf("skipping line for offset=%d: %v", p.lastOffset, err)
}
if dataFileDescriptor.HasHeader {
p.header = dataFile.GetHeader()
}
return nil
}

func (p *FileBatchProducer) newBatchWriter() (*BatchWriter, error) {
batchNum := p.lastBatchNumber + 1
batchWriter := p.state.NewBatchWriter(p.task.FilePath, p.task.TableNameTup, batchNum)
err := batchWriter.Init()
if err != nil {
return nil, fmt.Errorf("initializing batch writer for table: %q: %s", p.task.TableNameTup, err)
}
// Write the header if necessary
if p.header != "" && dataFileDescriptor.FileFormat == datafile.CSV {
err = batchWriter.WriteHeader(p.header)
if err != nil {
utils.ErrExit("writing header for table: %q: %s", p.task.TableNameTup, err)
}
}
return batchWriter, nil
}

func (p *FileBatchProducer) finalizeBatch(batchWriter *BatchWriter, isLastBatch bool, offsetEnd int64, bytesInBatch int64) (*Batch, error) {
batchNum := p.lastBatchNumber + 1
batch, err := batchWriter.Done(isLastBatch, offsetEnd, bytesInBatch)
if err != nil {
utils.ErrExit("finalizing batch %d: %s", batchNum, err)
}
batchWriter = nil
p.lastBatchNumber = batchNum
return batch, nil
}
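Since the PR title is "import multiple tables at same time", a natural consumer of this producer is a driver that interleaves batches from several tables. A minimal sketch, assuming a tasks slice and the submitBatch / updateProgressFn / importBatchArgsProto plumbing from importData.go; the round-robin loop itself is an illustration, not part of this diff:

	producers := make([]*FileBatchProducer, 0, len(tasks))
	for _, task := range tasks {
		p, err := NewFileBatchProducer(task, state)
		if err != nil {
			utils.ErrExit("creating file batch producer: %s", err)
		}
		producers = append(producers, p)
	}
	// Round-robin: take one batch per table per pass, dropping finished producers.
	for len(producers) > 0 {
		remaining := producers[:0]
		for _, p := range producers {
			if p.Done() {
				continue
			}
			batch, err := p.NextBatch()
			if err != nil {
				utils.ErrExit("getting next batch: %s", err)
			}
			submitBatch(batch, updateProgressFn, importBatchArgsProto)
			if !p.Done() {
				remaining = append(remaining, p)
			}
		}
		producers = remaining
	}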