Skip to content

Commit

Permalink
fix: blocked job refresh issue (#318)
Browse files Browse the repository at this point in the history
* fix: avoid refresh to be blocked due to 1 job resolution issue

* test: fix failing test in deployment service
  • Loading branch information
arinda-arif authored Jan 3, 2025
1 parent 2adb974 commit ecebfeb
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 8 deletions.
29 changes: 21 additions & 8 deletions core/scheduler/service/deployment_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,14 @@ func (s *JobRunService) UploadJobs(ctx context.Context, tnnt tenant.Tenant, toUp
me := errors.NewMultiError("errorInUploadJobs")

if len(toUpdate) > 0 {
if err = s.resolveAndDeployJobs(ctx, tnnt, toUpdate); err == nil {
deployedJobs, err := s.resolveAndDeployJobs(ctx, tnnt, toUpdate)
if len(deployedJobs) > 0 && err == nil {
s.l.Info("[success] namespace: %s, project: %s, deployed %d jobs", tnnt.NamespaceName().String(),
tnnt.ProjectName().String(), len(toUpdate))
tnnt.ProjectName().String(), len(deployedJobs))
}
if len(deployedJobs) > 0 && err != nil {
s.l.Error("[failure] namespace: %s, project: %s, deployed %d jobs with failure: %s", tnnt.NamespaceName().String(),
tnnt.ProjectName().String(), len(deployedJobs), err.Error())
}
me.Append(err)
}
Expand All @@ -99,16 +104,24 @@ func (s *JobRunService) UploadJobs(ctx context.Context, tnnt tenant.Tenant, toUp
return me.ToErr()
}

func (s *JobRunService) resolveAndDeployJobs(ctx context.Context, tnnt tenant.Tenant, toUpdate []string) error {
func (s *JobRunService) resolveAndDeployJobs(ctx context.Context, tnnt tenant.Tenant, toUpdate []string) ([]*scheduler.JobWithDetails, error) {
me := errors.NewMultiError("errorInResolveAndDeployJobs")
allJobsWithDetails, err := s.jobRepo.GetJobs(ctx, tnnt.ProjectName(), toUpdate)
if err != nil || allJobsWithDetails == nil {
return err
if err != nil {
if allJobsWithDetails == nil {
return nil, err
}
me.Append(err)
}

if err := s.priorityResolver.Resolve(ctx, allJobsWithDetails); err != nil {
if err = s.priorityResolver.Resolve(ctx, allJobsWithDetails); err != nil {
s.l.Error("error priority resolving jobs: %s", err)
return err
me.Append(err)
return nil, me.ToErr()
}

return s.scheduler.DeployJobs(ctx, tnnt, allJobsWithDetails)
if err = s.scheduler.DeployJobs(ctx, tnnt, allJobsWithDetails); err != nil {
me.Append(err)
}
return allJobsWithDetails, me.ToErr()
}
24 changes: 24 additions & 0 deletions core/scheduler/service/deployment_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,30 @@ func TestDeploymentService(t *testing.T) {
err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete)
assert.Nil(t, err)
})
t.Run("should upload requested jobs even though there is invalid upstream in 1 job", func(t *testing.T) {
jobNamesToUpload := []string{"job1", "job3"}
var jobNamesToDelete []string
jobsToUpload := []*scheduler.JobWithDetails{jobsWithDetails[0], jobsWithDetails[2]}

jobRepo := new(JobRepository)
upstreamErr := "unresolved upstream for job1"
jobRepo.On("GetJobs", mock.Anything, proj1Name, jobNamesToUpload).Return(jobsToUpload, errors.New(upstreamErr))
defer jobRepo.AssertExpectations(t)

priorityResolver := new(mockPriorityResolver)
priorityResolver.On("Resolve", mock.Anything, jobsToUpload).Return(nil)
defer priorityResolver.AssertExpectations(t)

mScheduler := new(mockScheduler)
mScheduler.On("DeployJobs", mock.Anything, tnnt1, jobsToUpload).Return(nil)
defer mScheduler.AssertExpectations(t)

runService := service.NewJobRunService(logger, jobRepo, nil, nil, nil,
mScheduler, priorityResolver, nil, nil, nil)

err := runService.UploadJobs(ctx, tnnt1, jobNamesToUpload, jobNamesToDelete)
assert.ErrorContains(t, err, upstreamErr)
})
})
}

Expand Down

0 comments on commit ecebfeb

Please sign in to comment.