From 943bd22b6d946ddc219ffe7bb7014e3ab52dfb82 Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Mon, 20 Jan 2025 11:13:32 +0530 Subject: [PATCH] cleanup --- yb-voyager/cmd/importData.go | 136 ------------------ yb-voyager/cmd/importDataFileBatchProducer.go | 11 +- 2 files changed, 3 insertions(+), 144 deletions(-) diff --git a/yb-voyager/cmd/importData.go b/yb-voyager/cmd/importData.go index 1d44f4d0f..09a48e01d 100644 --- a/yb-voyager/cmd/importData.go +++ b/yb-voyager/cmd/importData.go @@ -17,7 +17,6 @@ package cmd import ( "fmt" - "io" "os" "os/exec" "path/filepath" @@ -1023,141 +1022,6 @@ func importFile(state *ImportDataState, task *ImportFileTask, updateProgressFn f } submitBatch(batch, updateProgressFn, importBatchArgsProto) } - // log.Infof("Collect all interrupted/remaining splits.") - // pendingBatches, lastBatchNumber, lastOffset, fileFullySplit, err := state.Recover(task.FilePath, task.TableNameTup) - // if err != nil { - // utils.ErrExit("recovering state for table: %q: %s", task.TableNameTup, err) - // } - // for _, batch := range pendingBatches { - // submitBatch(batch, updateProgressFn, importBatchArgsProto) - // } - // if !fileFullySplit { - // splitFilesForTable(state, origDataFile, task.TableNameTup, lastBatchNumber, lastOffset, updateProgressFn, importBatchArgsProto) - // } -} - -func splitFilesForTable(state *ImportDataState, filePath string, t sqlname.NameTuple, - lastBatchNumber int64, lastOffset int64, updateProgressFn func(int64), importBatchArgsProto *tgtdb.ImportBatchArgs) { - log.Infof("Split data file %q: tableName=%q, largestSplit=%v, largestOffset=%v", filePath, t, lastBatchNumber, lastOffset) - batchNum := lastBatchNumber + 1 - numLinesTaken := lastOffset - - reader, err := dataStore.Open(filePath) - if err != nil { - utils.ErrExit("preparing reader for split generation on file: %q: %v", filePath, err) - } - - dataFile, err := datafile.NewDataFile(filePath, reader, dataFileDescriptor) - if err != nil { - utils.ErrExit("open datafile: %q: %v", filePath, err) - } - defer dataFile.Close() - - log.Infof("Skipping %d lines from %q", lastOffset, filePath) - err = dataFile.SkipLines(lastOffset) - if err != nil { - utils.ErrExit("skipping line for offset=%d: %v", lastOffset, err) - } - - var readLineErr error = nil - var line string - var currentBytesRead int64 - var batchWriter *BatchWriter - header := "" - if dataFileDescriptor.HasHeader { - header = dataFile.GetHeader() - } - - // Helper function to initialize a new batchWriter - initBatchWriter := func() { - batchWriter = state.NewBatchWriter(filePath, t, batchNum) - err := batchWriter.Init() - if err != nil { - utils.ErrExit("initializing batch writer for table: %q: %s", t, err) - } - // Write the header if necessary - if header != "" && dataFileDescriptor.FileFormat == datafile.CSV { - err = batchWriter.WriteHeader(header) - if err != nil { - utils.ErrExit("writing header for table: %q: %s", t, err) - } - } - } - - // Function to finalize and submit the current batch - finalizeBatch := func(isLastBatch bool, offsetEnd int64, bytesInBatch int64) { - batch, err := batchWriter.Done(isLastBatch, offsetEnd, bytesInBatch) - if err != nil { - utils.ErrExit("finalizing batch %d: %s", batchNum, err) - } - batchWriter = nil - submitBatch(batch, updateProgressFn, importBatchArgsProto) - - // Increment batchNum only if this is not the last batch - if !isLastBatch { - batchNum++ - } - } - - for readLineErr == nil { - - if batchWriter == nil { - initBatchWriter() // Create a new batchWriter - } - - line, currentBytesRead, readLineErr = dataFile.NextLine() - if readLineErr == nil || (readLineErr == io.EOF && line != "") { - // handling possible case: last dataline(i.e. EOF) but no newline char at the end - numLinesTaken += 1 - } - log.Debugf("Batch %d: totalBytesRead %d, currentBytes %d \n", batchNum, dataFile.GetBytesRead(), currentBytesRead) - if currentBytesRead > tdb.MaxBatchSizeInBytes() { - //If a row is itself larger than MaxBatchSizeInBytes erroring out - ybSpecificMsg := "" - if tconf.TargetDBType == YUGABYTEDB { - ybSpecificMsg = ", but should be strictly lower than the the rpc_max_message_size on YugabyteDB (default 267386880 bytes)" - } - utils.ErrExit("record of size %d larger than max batch size: record num=%d for table %q in file %s is larger than the max batch size %d bytes. Max Batch size can be changed using env var MAX_BATCH_SIZE_BYTES%s", currentBytesRead, numLinesTaken, t.ForOutput(), filePath, tdb.MaxBatchSizeInBytes(), ybSpecificMsg) - } - if line != "" { - // can't use importBatchArgsProto.Columns as to use case insenstiive column names - columnNames, _ := TableToColumnNames.Get(t) - line, err = valueConverter.ConvertRow(t, columnNames, line) - if err != nil { - utils.ErrExit("transforming line number=%d for table: %q in file %s: %s", numLinesTaken, t.ForOutput(), filePath, err) - } - - // Check if adding this record exceeds the max batch size - if batchWriter.NumRecordsWritten == batchSizeInNumRows || - dataFile.GetBytesRead() > tdb.MaxBatchSizeInBytes() { // GetBytesRead - returns the total bytes read until now including the currentBytesRead - - // Finalize the current batch without adding the record - finalizeBatch(false, numLinesTaken-1, dataFile.GetBytesRead()-currentBytesRead) - - //carry forward the bytes to next batch - dataFile.ResetBytesRead(currentBytesRead) - - // Start a new batch by calling the initBatchWriter function - initBatchWriter() - } - - // Write the record to the new or current batch - err = batchWriter.WriteRecord(line) - if err != nil { - utils.ErrExit("Write to batch %d: %s", batchNum, err) - } - } - - // Finalize the batch if it's the last line or the end of the file and reset the bytes read to 0 - if readLineErr == io.EOF { - finalizeBatch(true, numLinesTaken, dataFile.GetBytesRead()) - dataFile.ResetBytesRead(0) - } else if readLineErr != nil { - utils.ErrExit("read line from data file: %q: %s", filePath, readLineErr) - } - } - - log.Infof("splitFilesForTable: done splitting data file %q for table %q", filePath, t) } func executePostSnapshotImportSqls() error { diff --git a/yb-voyager/cmd/importDataFileBatchProducer.go b/yb-voyager/cmd/importDataFileBatchProducer.go index e9872341a..4bccbd53f 100644 --- a/yb-voyager/cmd/importDataFileBatchProducer.go +++ b/yb-voyager/cmd/importDataFileBatchProducer.go @@ -72,7 +72,7 @@ func (p *FileBatchProducer) Done() bool { func (p *FileBatchProducer) NextBatch() (*Batch, error) { if p.Done() { - return nil, fmt.Errorf("already done") + return nil, fmt.Errorf("already completed producing all batches") } if len(p.pendingBatches) > 0 { batch := p.pendingBatches[0] @@ -176,7 +176,8 @@ func (p *FileBatchProducer) produceNextBatch() (*Batch, error) { return nil, fmt.Errorf("read line from data file: %q: %s", p.task.FilePath, readLineErr) } } - return nil, fmt.Errorf("unexpected") + // ideally should not reach here + return nil, fmt.Errorf("could not produce next batch: err: %w", readLineErr) } func (p *FileBatchProducer) openDataFile() error { @@ -229,10 +230,4 @@ func (p *FileBatchProducer) finalizeBatch(batchWriter *BatchWriter, isLastBatch batchWriter = nil p.lastBatchNumber = batchNum return batch, nil - // submitBatch(batch, updateProgressFn, importBatchArgsProto) - - // Increment batchNum only if this is not the last batch - // if !isLastBatch { - // batchNum++ - // } }