Skip to content

Commit

Permalink
feat: reorder the flow of update runs on worker
Browse files Browse the repository at this point in the history
  • Loading branch information
deryrahman committed Nov 8, 2023
1 parent 8c0d5e8 commit f5d0cf3
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 18 deletions.
25 changes: 8 additions & 17 deletions internal/store/postgres/scheduler/replay_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,11 +290,10 @@ func (r ReplayRepository) UpdateReplayStatus(ctx context.Context, id uuid.UUID,
}

func (r ReplayRepository) UpdateReplay(ctx context.Context, id uuid.UUID, replayStatus scheduler.ReplayState, runs []*scheduler.JobRunStatus, message string) error {
if err := r.updateReplayRequest(ctx, id, replayStatus, message); err != nil {
if err := r.updateReplayRuns(ctx, id, runs); err != nil {
return err
}

return r.updateReplayRuns(ctx, id, runs)
return r.updateReplayRequest(ctx, id, replayStatus, message)
}

func (r ReplayRepository) GetReplayJobConfig(ctx context.Context, jobTenant tenant.Tenant, jobName scheduler.JobName, scheduledAt time.Time) (map[string]string, error) {
Expand Down Expand Up @@ -329,21 +328,13 @@ func (r ReplayRepository) updateReplayRequest(ctx context.Context, id uuid.UUID,
}

func (r ReplayRepository) updateReplayRuns(ctx context.Context, id uuid.UUID, runs []*scheduler.JobRunStatus) error {
tx, err := r.db.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return err
}

deleteRuns := `DELETE FROM replay_run WHERE replay_id = $1`
if _, err := tx.Exec(ctx, deleteRuns, id); err != nil {
tx.Rollback(ctx)
return errors.Wrap(scheduler.EntityJobRun, "unable to delete runs of replay", err)
}
if err := r.insertReplayRuns(ctx, tx, id, runs); err != nil {
tx.Rollback(ctx)
return errors.Wrap(scheduler.EntityJobRun, "unable to insert runs of replay", err)
query := `UPDATE replay_run SET status=$1, updated_at=NOW() WHERE replay_id=$2 AND scheduled_at=$3 AND status<>$1`
for _, run := range runs {
_, err := r.db.Exec(ctx, query, run.State, id, run.ScheduledAt)
if err != nil {
return errors.Wrap(scheduler.EntityJobRun, "unable to update replay runs", err)
}
}
tx.Commit(ctx)
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestPostgresSchedulerRepository(t *testing.T) {
})

t.Run("UpdateReplay", func(t *testing.T) {
t.Run("updates replay request and reinsert the runs", func(t *testing.T) {
t.Run("updates replay request and update the runs", func(t *testing.T) {
db := dbSetup()
replayRepo := postgres.NewReplayRepository(db)

Expand Down

0 comments on commit f5d0cf3

Please sign in to comment.