Skip to content

Commit

Permalink
fix: unnecessary replay runs update (#300)
Browse files Browse the repository at this point in the history
* fix: unnecessary replay runs update

* refactor: remove unused code in replay

* fix: unmatch replay status when fetching different run list

* fix: add cancel check before continue replay execution

* refactor: add check for runs length before updating the final runs

* test: fix invalid test in replay worker
  • Loading branch information
arinda-arif committed Nov 15, 2024
1 parent b4c2e42 commit 559d9ab
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 103 deletions.
104 changes: 61 additions & 43 deletions core/scheduler/service/replay_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,27 +190,12 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI
return err
}

if err := w.replayRepo.UpdateReplayRuns(ctx, replayID, syncedRunStatus); err != nil {
w.logger.Error("[ReplayID: %s] unable to update replay state to failed: %s", replayID, err)
return err
}

runStatusSummary := syncedRunStatus.GetJobRunStatusSummary()
w.logger.Info("[ReplayID: %s] synced %d replay runs with status: %s", replayID, len(syncedRunStatus), runStatusSummary)

// check if replay request is on termination state
if syncedRunStatus.IsAllTerminated() {
return w.finishReplay(ctx, replayWithRun.Replay, syncedRunStatus, runStatusSummary)
return w.finishReplay(ctx, replayWithRun, syncedRunStatus)
}

// pick runs to be triggered
statesForReplay := []scheduler.State{scheduler.StatePending, scheduler.StateMissing}
toBeReplayedRuns := syncedRunStatus.GetSortedRunsByStates(statesForReplay)
if len(toBeReplayedRuns) == 0 {
continue
}

// execute replay run on scheduler
// TODO: this is a temporary solution. replay worker should have been killed when cancelled
canceled, err := w.isReplayCanceled(ctx, replayID)
if err != nil {
return err
Expand All @@ -220,53 +205,86 @@ func (w *ReplayWorker) startExecutionLoop(ctx context.Context, replayID uuid.UUI
return nil
}

var updatedRuns []*scheduler.JobRunStatus
if replayWithRun.Replay.Config().Parallel {
if err := w.replayRunOnScheduler(ctx, jobCron, replayWithRun.Replay, toBeReplayedRuns...); err != nil {
return err
}
updatedRuns = scheduler.JobRunStatusList(toBeReplayedRuns).OverrideWithStatus(scheduler.StateInProgress)
} else { // sequential should work when there's no in_progress state on existing runs
inProgressRuns := syncedRunStatus.GetSortedRunsByStates([]scheduler.State{scheduler.StateInProgress})
if len(inProgressRuns) > 0 {
w.logger.Info("[ReplayID: %s] %d run is in progress, skip sequential iteration", replayID, len(inProgressRuns))
continue
}
if err := w.replayRunOnScheduler(ctx, jobCron, replayWithRun.Replay, toBeReplayedRuns[0]); err != nil {
return err
}
updatedRuns = scheduler.JobRunStatusList(toBeReplayedRuns[:1]).OverrideWithStatus(scheduler.StateInProgress)
updatedRuns, err := w.continueExecution(ctx, syncedRunStatus, replayWithRun, jobCron)
if err != nil {
return err
}

// update runs status
if err := w.replayRepo.UpdateReplayRuns(ctx, replayID, updatedRuns); err != nil {
w.logger.Error("[ReplayID: %s] unable to update replay runs: %s", replayID, err)
return err
// store updated runs
runsToStore := updatedRuns.GetOnlyDifferedRuns(replayWithRun.Runs)
if len(runsToStore) > 0 {
if err := w.replayRepo.UpdateReplayRuns(ctx, replayWithRun.Replay.ID(), runsToStore); err != nil {
w.logger.Error("[ReplayID: %s] unable to update replay runs: %s", replayWithRun.Replay.ID(), err)
return err
}
}

replayWorkerLoopDuration.WithLabelValues(replayID.String()).Set(time.Since(loopStartTime).Seconds())
}
}

func (w *ReplayWorker) finishReplay(ctx context.Context, replay *scheduler.Replay, syncedRunStatus scheduler.JobRunStatusList, runStatusSummary string) error {
replayID := replay.ID()
func (w *ReplayWorker) continueExecution(ctx context.Context, runs scheduler.JobRunStatusList,
replayWithRun *scheduler.ReplayWithRun, jobCron *cron.ScheduleSpec,
) (scheduler.JobRunStatusList, error) {
// pick runs to be triggered
statesForReplay := []scheduler.State{scheduler.StatePending, scheduler.StateMissing}
toBeReplayedRuns := runs.GetSortedRunsByStates(statesForReplay)
if len(toBeReplayedRuns) == 0 {
return runs, nil
}

var updatedRuns []*scheduler.JobRunStatus
if replayWithRun.Replay.Config().Parallel {
if err := w.replayRunOnScheduler(ctx, jobCron, replayWithRun.Replay, toBeReplayedRuns...); err != nil {
return nil, err
}
updatedRuns = scheduler.JobRunStatusList(toBeReplayedRuns).OverrideWithStatus(scheduler.StateInProgress)
} else {
// sequential should work when there's no in_progress state on existing runs
inProgressRuns := runs.GetSortedRunsByStates([]scheduler.State{scheduler.StateInProgress})
if len(inProgressRuns) > 0 {
w.logger.Info("[ReplayID: %s] %d run is in progress, skip sequential iteration", replayWithRun.Replay.ID(), len(inProgressRuns))
return runs, nil
}
if err := w.replayRunOnScheduler(ctx, jobCron, replayWithRun.Replay, toBeReplayedRuns[0]); err != nil {
return nil, err
}
updatedRuns = scheduler.JobRunStatusList(toBeReplayedRuns[:1]).OverrideWithStatus(scheduler.StateInProgress)
}
updatedRunsMap := scheduler.JobRunStatusList(updatedRuns).ToRunStatusMap()
return runs.MergeWithUpdatedRuns(updatedRunsMap), nil
}

func (w *ReplayWorker) finishReplay(ctx context.Context, replayWithRun *scheduler.ReplayWithRun, syncedRunStatus scheduler.JobRunStatusList) error {
runsToUpdate := syncedRunStatus.GetOnlyDifferedRuns(replayWithRun.Runs)
if len(runsToUpdate) > 0 {
if err := w.replayRepo.UpdateReplayRuns(ctx, replayWithRun.Replay.ID(), runsToUpdate); err != nil {
w.logger.Error("[ReplayID: %s] unable to update replay state to failed: %s", replayWithRun.Replay.ID(), err)
return err
}
}

runStatusSummary := syncedRunStatus.GetJobRunStatusSummary()
w.logger.Info("[ReplayID: %s] synced %d replay runs with status: %s", replayWithRun.Replay.ID(), len(syncedRunStatus), runStatusSummary)

replayID := replayWithRun.Replay.ID()
replayState := scheduler.ReplayStateSuccess
if syncedRunStatus.IsAnyFailure() {
replayState = scheduler.ReplayStateFailed
}

w.sendReplayEvent(ctx, scheduler.ReplayNotificationAttrs{
JobName: replay.JobName().String(),
JobName: replayWithRun.Replay.JobName().String(),
ReplayID: replayID.String(),
Tenant: replay.Tenant(),
JobURN: replay.JobName().GetJobURN(replay.Tenant()),
Tenant: replayWithRun.Replay.Tenant(),
JobURN: replayWithRun.Replay.JobName().GetJobURN(replayWithRun.Replay.Tenant()),
State: replayState,
})

msg := fmt.Sprintf("replay is finished with run status: %s", runStatusSummary)
w.logger.Info("[ReplayID: %s] replay finished with status %s", replayID, replayState)

if err := w.replayRepo.UpdateReplay(ctx, replayID, replayState, syncedRunStatus, msg); err != nil {
if err := w.replayRepo.UpdateReplayStatus(ctx, replayID, replayState, msg); err != nil {
w.logger.Error("[ReplayID: %s] unable to update replay state to failed: %s", replayID, err)
return err
}
Expand Down
Loading

0 comments on commit 559d9ab

Please sign in to comment.