diff --git a/core/task/handler/runner.go b/core/task/handler/runner.go index ac5fe462..0e374670 100644 --- a/core/task/handler/runner.go +++ b/core/task/handler/runner.go @@ -55,9 +55,9 @@ type Runner struct { interfaces.Logger // log handling - scannerStdout *bufio.Reader // reader for process stdout - scannerStderr *bufio.Reader // reader for process stderr - logBatchSize int // number of log lines to batch before sending + readerStdout *bufio.Reader // reader for process stdout + readerStderr *bufio.Reader // reader for process stderr + logBatchSize int // number of log lines to batch before sending // IPC (Inter-Process Communication) stdinPipe io.WriteCloser // pipe for writing to child process @@ -247,8 +247,8 @@ func (r *Runner) configureCmd() (err error) { } // Create buffered readers - r.scannerStdout = bufio.NewReader(r.stdoutPipe) - r.scannerStderr = bufio.NewReader(stderrPipe) + r.readerStdout = bufio.NewReader(r.stdoutPipe) + r.readerStderr = bufio.NewReader(stderrPipe) // Initialize IPC channel r.ipcChan = make(chan entity.IPCMessage) @@ -849,32 +849,32 @@ func (r *Runner) startIPCReader() { // Start stdout reader go func() { defer r.wg.Done() - r.readOutput(r.scannerStdout, true) // true for stdout + r.readOutput(r.readerStdout, true) // true for stdout }() // Start stderr reader go func() { defer r.wg.Done() - r.readOutput(r.scannerStderr, false) // false for stderr + r.readOutput(r.readerStderr, false) // false for stderr }() } func (r *Runner) readOutput(reader *bufio.Reader, isStdout bool) { + scanner := bufio.NewScanner(reader) for { select { case <-r.ctx.Done(): + // Context cancelled, stop reading return default: - line, err := reader.ReadString('\n') - if err != nil { - if err != io.EOF { - r.Errorf("error reading from %s: %v", - map[bool]string{true: "stdout", false: "stderr"}[isStdout], - err) - } + // Scan the next line + if !scanner.Scan() { return } + // Get the line + line := scanner.Text() + // Trim the line line = strings.TrimRight(line, "\n\r") @@ -904,8 +904,6 @@ func (r *Runner) readOutput(reader *bufio.Reader, isStdout bool) { // handleIPCInsertDataMessage converts the IPC message payload to JSON and sends it to the master node func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) { - r.Debugf("processing IPC data message") - if ipcMsg.Payload == nil { r.Errorf("empty payload in IPC message") return @@ -982,8 +980,6 @@ func (r *Runner) handleIPCInsertDataMessage(ipcMsg entity.IPCMessage) { return } } - - r.Infof("successfully sent %d records to master node", len(records)) } // newTaskRunner creates a new task runner instance with the specified task ID diff --git a/core/task/handler/runner_test.go b/core/task/handler/runner_test.go index a7251c39..c20f7de2 100644 --- a/core/task/handler/runner_test.go +++ b/core/task/handler/runner_test.go @@ -5,15 +5,18 @@ import ( "context" "encoding/json" "fmt" - "github.com/crawlab-team/crawlab/core/entity" - "github.com/crawlab-team/crawlab/core/grpc/client" - "github.com/crawlab-team/crawlab/core/grpc/server" - "github.com/crawlab-team/crawlab/core/utils" "io" + "reflect" "runtime" + "sync" "testing" "time" + "github.com/crawlab-team/crawlab/core/entity" + "github.com/crawlab-team/crawlab/core/grpc/client" + "github.com/crawlab-team/crawlab/core/grpc/server" + "github.com/crawlab-team/crawlab/core/utils" + "github.com/apex/log" "github.com/crawlab-team/crawlab/core/constants" "github.com/crawlab-team/crawlab/core/models/models" @@ -103,12 +106,59 @@ func TestRunner(t *testing.T) { // Create a pipe for testing pr, pw := io.Pipe() - defer pr.Close() - defer pw.Close() + defer func() { + _ = pr.Close() + log.Infof("closed reader pipe") + }() + defer func() { + _ = pw.Close() + log.Infof("closed writer pipe") + }() runner.stdoutPipe = pr - // Start IPC reader - go runner.startIPCReader() + // Initialize context and other required fields + runner.ctx, runner.cancel = context.WithCancel(context.Background()) + runner.wg = sync.WaitGroup{} + runner.done = make(chan struct{}) + runner.ipcChan = make(chan entity.IPCMessage) + + // Create a channel to signal that the reader is ready + readerReady := make(chan struct{}) + + // Start IPC reader with ready signal + go func() { + defer runner.wg.Done() + runner.wg.Add(1) + close(readerReady) // Signal that reader is ready + + // Read directly from the pipe for debugging + scanner := bufio.NewScanner(pr) + for scanner.Scan() { + line := scanner.Text() + log.Infof("Read from pipe: %s", line) + + // Try to parse as IPC message + var ipcMsg entity.IPCMessage + if err := json.Unmarshal([]byte(line), &ipcMsg); err != nil { + log.Errorf("Failed to unmarshal IPC message: %v", err) + continue + } + + if ipcMsg.IPC { + log.Infof("Valid IPC message received: %+v", ipcMsg) + if runner.ipcHandler != nil { + runner.ipcHandler(ipcMsg) + } + } + } + + if err := scanner.Err(); err != nil { + log.Errorf("Scanner error: %v", err) + } + }() + + // Wait for reader to be ready + <-readerReady // Create test message testMsg := entity.IPCMessage{ @@ -117,38 +167,50 @@ func TestRunner(t *testing.T) { IPC: true, } - // Create a channel to signal that the message was handled + // Create channels for synchronization handled := make(chan bool) - runner.SetIPCHandler(func(msg entity.IPCMessage) { - assert.Equal(t, testMsg.Type, msg.Type) - assert.Equal(t, testMsg.Payload, msg.Payload) - handled <- true - }) + messageError := make(chan error, 1) - // Convert message to JSON and write to pipe - go func() { - jsonData, err := json.Marshal(testMsg) - if err != nil { - t.Errorf("failed to marshal test message: %v", err) + // Set up message handler + runner.SetIPCHandler(func(msg entity.IPCMessage) { + log.Infof("Handler received IPC message: %+v", msg) + if msg.Type != testMsg.Type { + messageError <- fmt.Errorf("expected message type %s, got %s", testMsg.Type, msg.Type) return } - - // Write message followed by newline - _, err = fmt.Fprintln(pw, string(jsonData)) - if err != nil { - t.Errorf("failed to write to pipe: %v", err) + if !reflect.DeepEqual(msg.Payload, testMsg.Payload) { + messageError <- fmt.Errorf("expected payload %v, got %v", testMsg.Payload, msg.Payload) return } - }() + handled <- true + }) + + // Convert message to JSON + jsonData, err := json.Marshal(testMsg) + if err != nil { + t.Fatalf("failed to marshal test message: %v", err) + } + + // Write message to pipe + log.Infof("Writing message to pipe: %s", string(jsonData)) + _, err = fmt.Fprintln(pw, string(jsonData)) + if err != nil { + t.Fatalf("failed to write to pipe: %v", err) + } + log.Info("Message written to pipe") + // Wait for message handling with timeout select { case <-handled: - // Message was handled successfully log.Info("IPC message was handled successfully") - case <-time.After(3 * time.Second): + case err := <-messageError: + t.Fatalf("error handling message: %v", err) + case <-time.After(5 * time.Second): t.Fatal("timeout waiting for IPC message to be handled") } + // Clean up + runner.cancel() // Cancel context to stop readers }) t.Run("Cancel", func(t *testing.T) { @@ -189,7 +251,7 @@ func TestRunner(t *testing.T) { // Verify process exists before attempting to cancel if !utils.ProcessIdExists(runner.pid) { - t.Fatalf("Process with PID %d was not started successfully", runner.pid) + require.Fail(t, fmt.Sprintf("Process with PID %d was not started successfully", runner.pid)) } // Test cancel @@ -207,7 +269,7 @@ func TestRunner(t *testing.T) { for { select { case <-ctx.Done(): - t.Fatalf("Process with PID %d was not killed within timeout", runner.pid) + require.Fail(t, fmt.Sprintf("Process with PID %d was not killed within timeout", runner.pid)) case <-ticker.C: exists := utils.ProcessIdExists(runner.pid) if !exists { @@ -301,10 +363,8 @@ func TestRunner(t *testing.T) { // Convert message to JSON and write to pipe go func() { - jsonData, err := json.Marshal(testMsg) - assert.NoError(t, err) - _, err = fmt.Fprintln(pw, string(jsonData)) - assert.NoError(t, err) + jsonData, _ := json.Marshal(testMsg) + _, _ = fmt.Fprintln(pw, string(jsonData)) }() // Wait for processing with timeout