Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
makalaaneesh committed Jan 20, 2025
1 parent cbc1ede commit 943bd22
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 144 deletions.
136 changes: 0 additions & 136 deletions yb-voyager/cmd/importData.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package cmd

import (
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
Expand Down Expand Up @@ -1023,141 +1022,6 @@ func importFile(state *ImportDataState, task *ImportFileTask, updateProgressFn f
}
submitBatch(batch, updateProgressFn, importBatchArgsProto)
}
// log.Infof("Collect all interrupted/remaining splits.")
// pendingBatches, lastBatchNumber, lastOffset, fileFullySplit, err := state.Recover(task.FilePath, task.TableNameTup)
// if err != nil {
// utils.ErrExit("recovering state for table: %q: %s", task.TableNameTup, err)
// }
// for _, batch := range pendingBatches {
// submitBatch(batch, updateProgressFn, importBatchArgsProto)
// }
// if !fileFullySplit {
// splitFilesForTable(state, origDataFile, task.TableNameTup, lastBatchNumber, lastOffset, updateProgressFn, importBatchArgsProto)
// }
}

// splitFilesForTable reads the data file at filePath and splits it into import
// batches for table t, submitting each finished batch via submitBatch. It
// supports resuming an interrupted run: splitting restarts at batch
// lastBatchNumber+1 after skipping the first lastOffset data lines. A batch is
// finalized when it reaches batchSizeInNumRows records or would exceed
// tdb.MaxBatchSizeInBytes(); a single record larger than that limit is a fatal
// error. All unrecoverable errors terminate the process via utils.ErrExit.
//
// Parameters:
//   state                - import-state tracker used to create batch writers.
//   filePath             - path of the data file to split.
//   t                    - name tuple of the table the file belongs to.
//   lastBatchNumber      - number of the last batch fully created before interruption.
//   lastOffset           - count of data lines already consumed; skipped on resume.
//   updateProgressFn     - progress callback invoked per submitted batch.
//   importBatchArgsProto - prototype args cloned for each batch import.
func splitFilesForTable(state *ImportDataState, filePath string, t sqlname.NameTuple,
	lastBatchNumber int64, lastOffset int64, updateProgressFn func(int64), importBatchArgsProto *tgtdb.ImportBatchArgs) {
	log.Infof("Split data file %q: tableName=%q, largestSplit=%v, largestOffset=%v", filePath, t, lastBatchNumber, lastOffset)
	batchNum := lastBatchNumber + 1
	numLinesTaken := lastOffset

	reader, err := dataStore.Open(filePath)
	if err != nil {
		utils.ErrExit("preparing reader for split generation on file: %q: %v", filePath, err)
	}

	dataFile, err := datafile.NewDataFile(filePath, reader, dataFileDescriptor)
	if err != nil {
		utils.ErrExit("open datafile: %q: %v", filePath, err)
	}
	defer dataFile.Close()

	// Resume support: skip lines already consumed by a previous run.
	log.Infof("Skipping %d lines from %q", lastOffset, filePath)
	err = dataFile.SkipLines(lastOffset)
	if err != nil {
		utils.ErrExit("skipping line for offset=%d: %v", lastOffset, err)
	}

	var readLineErr error = nil
	var line string
	var currentBytesRead int64
	var batchWriter *BatchWriter
	header := ""
	if dataFileDescriptor.HasHeader {
		header = dataFile.GetHeader()
	}

	// Helper function to initialize a new batchWriter, re-emitting the CSV
	// header (if any) at the top of every batch file.
	initBatchWriter := func() {
		batchWriter = state.NewBatchWriter(filePath, t, batchNum)
		err := batchWriter.Init()
		if err != nil {
			utils.ErrExit("initializing batch writer for table: %q: %s", t, err)
		}
		// Write the header if necessary
		if header != "" && dataFileDescriptor.FileFormat == datafile.CSV {
			err = batchWriter.WriteHeader(header)
			if err != nil {
				utils.ErrExit("writing header for table: %q: %s", t, err)
			}
		}
	}

	// Function to finalize and submit the current batch
	finalizeBatch := func(isLastBatch bool, offsetEnd int64, bytesInBatch int64) {
		batch, err := batchWriter.Done(isLastBatch, offsetEnd, bytesInBatch)
		if err != nil {
			utils.ErrExit("finalizing batch %d: %s", batchNum, err)
		}
		batchWriter = nil
		submitBatch(batch, updateProgressFn, importBatchArgsProto)

		// Increment batchNum only if this is not the last batch
		if !isLastBatch {
			batchNum++
		}
	}

	for readLineErr == nil {

		if batchWriter == nil {
			initBatchWriter() // Create a new batchWriter
		}

		line, currentBytesRead, readLineErr = dataFile.NextLine()
		if readLineErr == nil || (readLineErr == io.EOF && line != "") {
			// handling possible case: last dataline(i.e. EOF) but no newline char at the end
			numLinesTaken += 1
		}
		log.Debugf("Batch %d: totalBytesRead %d, currentBytes %d \n", batchNum, dataFile.GetBytesRead(), currentBytesRead)
		if currentBytesRead > tdb.MaxBatchSizeInBytes() {
			// If a row is itself larger than MaxBatchSizeInBytes erroring out
			ybSpecificMsg := ""
			if tconf.TargetDBType == YUGABYTEDB {
				ybSpecificMsg = ", but should be strictly lower than the rpc_max_message_size on YugabyteDB (default 267386880 bytes)"
			}
			utils.ErrExit("record of size %d larger than max batch size: record num=%d for table %q in file %s is larger than the max batch size %d bytes. Max Batch size can be changed using env var MAX_BATCH_SIZE_BYTES%s", currentBytesRead, numLinesTaken, t.ForOutput(), filePath, tdb.MaxBatchSizeInBytes(), ybSpecificMsg)
		}
		if line != "" {
			// can't use importBatchArgsProto.Columns as to use case insensitive column names
			columnNames, _ := TableToColumnNames.Get(t)
			line, err = valueConverter.ConvertRow(t, columnNames, line)
			if err != nil {
				utils.ErrExit("transforming line number=%d for table: %q in file %s: %s", numLinesTaken, t.ForOutput(), filePath, err)
			}

			// Check if adding this record exceeds the max batch size
			if batchWriter.NumRecordsWritten == batchSizeInNumRows ||
				dataFile.GetBytesRead() > tdb.MaxBatchSizeInBytes() { // GetBytesRead - returns the total bytes read until now including the currentBytesRead

				// Finalize the current batch without adding the record
				finalizeBatch(false, numLinesTaken-1, dataFile.GetBytesRead()-currentBytesRead)

				// carry forward the bytes to next batch
				dataFile.ResetBytesRead(currentBytesRead)

				// Start a new batch by calling the initBatchWriter function
				initBatchWriter()
			}

			// Write the record to the new or current batch
			err = batchWriter.WriteRecord(line)
			if err != nil {
				utils.ErrExit("Write to batch %d: %s", batchNum, err)
			}
		}

		// Finalize the batch if it's the last line or the end of the file and reset the bytes read to 0
		if readLineErr == io.EOF {
			finalizeBatch(true, numLinesTaken, dataFile.GetBytesRead())
			dataFile.ResetBytesRead(0)
		} else if readLineErr != nil {
			utils.ErrExit("read line from data file: %q: %s", filePath, readLineErr)
		}
	}

	log.Infof("splitFilesForTable: done splitting data file %q for table %q", filePath, t)
}

func executePostSnapshotImportSqls() error {
Expand Down
11 changes: 3 additions & 8 deletions yb-voyager/cmd/importDataFileBatchProducer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (p *FileBatchProducer) Done() bool {

func (p *FileBatchProducer) NextBatch() (*Batch, error) {
if p.Done() {
return nil, fmt.Errorf("already done")
return nil, fmt.Errorf("already completed producing all batches")
}
if len(p.pendingBatches) > 0 {
batch := p.pendingBatches[0]
Expand Down Expand Up @@ -176,7 +176,8 @@ func (p *FileBatchProducer) produceNextBatch() (*Batch, error) {
return nil, fmt.Errorf("read line from data file: %q: %s", p.task.FilePath, readLineErr)
}
}
return nil, fmt.Errorf("unexpected")
// ideally should not reach here
return nil, fmt.Errorf("could not produce next batch: err: %w", readLineErr)
}

func (p *FileBatchProducer) openDataFile() error {
Expand Down Expand Up @@ -229,10 +230,4 @@ func (p *FileBatchProducer) finalizeBatch(batchWriter *BatchWriter, isLastBatch
batchWriter = nil
p.lastBatchNumber = batchNum
return batch, nil
// submitBatch(batch, updateProgressFn, importBatchArgsProto)

// Increment batchNum only if this is not the last batch
// if !isLastBatch {
// batchNum++
// }
}

0 comments on commit 943bd22

Please sign in to comment.