From 06e43a3e0a913d3488f233699e51799f48385e1b Mon Sep 17 00:00:00 2001 From: Jacob Weinstock Date: Sun, 2 Jun 2024 21:02:01 -0600 Subject: [PATCH] Update receiver name to match across methods: Signed-off-by: Jacob Weinstock --- internal/server/kubernetes_api.go | 4 +- internal/server/kubernetes_api_workflow.go | 50 +++++++++++----------- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/internal/server/kubernetes_api.go b/internal/server/kubernetes_api.go index c7437ed48..309951fcf 100644 --- a/internal/server/kubernetes_api.go +++ b/internal/server/kubernetes_api.go @@ -126,6 +126,6 @@ func NewKubeBackedServerFromREST(logger logr.Logger, config *rest.Config, namesp } // Register registers the service on the gRPC server. -func (s *KubernetesBackedServer) Register(server *grpc.Server) { - proto.RegisterWorkflowServiceServer(server, s) +func (k *KubernetesBackedServer) Register(server *grpc.Server) { + proto.RegisterWorkflowServiceServer(server, k) } diff --git a/internal/server/kubernetes_api_workflow.go b/internal/server/kubernetes_api_workflow.go index d6d0e5cb4..d40d053c0 100644 --- a/internal/server/kubernetes_api_workflow.go +++ b/internal/server/kubernetes_api_workflow.go @@ -34,9 +34,9 @@ func getWorkflowContext(wf v1alpha1.Workflow) *proto.WorkflowContext { } } -func (s *KubernetesBackedServer) getCurrentAssignedNonTerminalWorkflowsForWorker(ctx context.Context, workerID string) ([]v1alpha1.Workflow, error) { +func (k *KubernetesBackedServer) getCurrentAssignedNonTerminalWorkflowsForWorker(ctx context.Context, workerID string) ([]v1alpha1.Workflow, error) { stored := &v1alpha1.WorkflowList{} - err := s.ClientFunc().List(ctx, stored, &client.MatchingFields{ + err := k.ClientFunc().List(ctx, stored, &client.MatchingFields{ workflowByNonTerminalState: workerID, }) if err != nil { @@ -52,11 +52,11 @@ func (s *KubernetesBackedServer) getCurrentAssignedNonTerminalWorkflowsForWorker return wfs, nil } -func (s *KubernetesBackedServer) getWorkflowByName(ctx context.Context, workflowID, namespace string) (*v1alpha1.Workflow, error) { +func (k *KubernetesBackedServer) getWorkflowByName(ctx context.Context, workflowID, namespace string) (*v1alpha1.Workflow, error) { wflw := &v1alpha1.Workflow{} - err := s.ClientFunc().Get(ctx, types.NamespacedName{Name: workflowID, Namespace: namespace}, wflw) + err := k.ClientFunc().Get(ctx, types.NamespacedName{Name: workflowID, Namespace: namespace}, wflw) if err != nil { - s.logger.Error(err, "get client", "workflow", workflowID) + k.logger.Error(err, "get client", "workflow", workflowID) return nil, err } return wflw, nil @@ -64,31 +64,31 @@ func (s *KubernetesBackedServer) getWorkflowByName(ctx context.Context, workflow // The following APIs are used by the worker. -func (s *KubernetesBackedServer) GetWorkflowContexts(req *proto.WorkflowContextRequest, stream proto.WorkflowService_GetWorkflowContextsServer) error { +func (k *KubernetesBackedServer) GetWorkflowContexts(req *proto.WorkflowContextRequest, stream proto.WorkflowService_GetWorkflowContextsServer) error { if req.GetWorkerId() == "" { return status.Errorf(codes.InvalidArgument, errInvalidWorkflowID) } - wflows, err := s.getCurrentAssignedNonTerminalWorkflowsForWorker(stream.Context(), req.WorkerId) + wflows, err := k.getCurrentAssignedNonTerminalWorkflowsForWorker(stream.Context(), req.WorkerId) if err != nil { return err } ctx := context.TODO() id := req.WorkerId - if s.AutoCapMode != AutoCapModeDisabled && len(wflows) == 0 && (s.AutoCapMode == AutoCapModeDiscovery || s.AutoCapMode == AutoCapModeEnrollment) && !s.hardwareObjectExists(ctx, id) { + if k.AutoCapMode != AutoCapModeDisabled && len(wflows) == 0 && (k.AutoCapMode == AutoCapModeDiscovery || k.AutoCapMode == AutoCapModeEnrollment) && !k.hardwareObjectExists(ctx, id) { // In the future, the worker could be signaled to send hardware device information to be used in creation of the Hardware object. // or the proto.WorkflowContextRequest could be extended to include Hardware information. - if err := s.createHardwareObject(ctx, id); err != nil { - s.logger.Error(err, "failed to create hardware object") + if err := k.createHardwareObject(ctx, id); err != nil { + k.logger.Error(err, "failed to create hardware object") return err } - if s.AutoCapMode == AutoCapModeEnrollment { - if err := s.createWorkflowObject(ctx, id); err != nil { - s.logger.Error(err, "failed to create workflow object") + if k.AutoCapMode == AutoCapModeEnrollment { + if err := k.createWorkflowObject(ctx, id); err != nil { + k.logger.Error(err, "failed to create workflow object") return err } - wflows, err = s.getCurrentAssignedNonTerminalWorkflowsForWorker(stream.Context(), req.WorkerId) + wflows, err = k.getCurrentAssignedNonTerminalWorkflowsForWorker(stream.Context(), req.WorkerId) if err != nil { return err } @@ -103,12 +103,12 @@ func (s *KubernetesBackedServer) GetWorkflowContexts(req *proto.WorkflowContextR return nil } -func (s *KubernetesBackedServer) GetWorkflowActions(ctx context.Context, req *proto.WorkflowActionsRequest) (*proto.WorkflowActionList, error) { +func (k *KubernetesBackedServer) GetWorkflowActions(ctx context.Context, req *proto.WorkflowActionsRequest) (*proto.WorkflowActionList, error) { wfID := req.GetWorkflowId() if wfID == "" { return nil, status.Errorf(codes.InvalidArgument, errInvalidWorkflowID) } - wf, err := s.getWorkflowByName(ctx, wfID, s.namespace) + wf, err := k.getWorkflowByName(ctx, wfID, k.namespace) if err != nil { return nil, err } @@ -116,7 +116,7 @@ func (s *KubernetesBackedServer) GetWorkflowActions(ctx context.Context, req *pr } // Modifies a workflow for a given workflowContext. -func (s *KubernetesBackedServer) modifyWorkflowState(wf *v1alpha1.Workflow, wfContext *proto.WorkflowContext) error { +func (k *KubernetesBackedServer) modifyWorkflowState(wf *v1alpha1.Workflow, wfContext *proto.WorkflowContext) error { if wf == nil { return errors.New("no workflow provided") } @@ -157,19 +157,19 @@ cont: // Workflow is running, so set the start time to now wf.Status.State = v1alpha1.WorkflowState(proto.State_name[int32(wfContext.CurrentActionState)]) wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt = func() *metav1.Time { - t := metav1.NewTime(s.nowFunc()) + t := metav1.NewTime(k.nowFunc()) return &t }() case proto.State_STATE_FAILED, proto.State_STATE_TIMEOUT: // Handle terminal statuses by updating the workflow state and time wf.Status.State = v1alpha1.WorkflowState(proto.State_name[int32(wfContext.CurrentActionState)]) if wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt != nil { - wf.Status.Tasks[taskIndex].Actions[actionIndex].Seconds = int64(s.nowFunc().Sub(wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt.Time).Seconds()) + wf.Status.Tasks[taskIndex].Actions[actionIndex].Seconds = int64(k.nowFunc().Sub(wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt.Time).Seconds()) } case proto.State_STATE_SUCCESS: // Handle a success by marking the task as complete if wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt != nil { - wf.Status.Tasks[taskIndex].Actions[actionIndex].Seconds = int64(s.nowFunc().Sub(wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt.Time).Seconds()) + wf.Status.Tasks[taskIndex].Actions[actionIndex].Seconds = int64(k.nowFunc().Sub(wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt.Time).Seconds()) } // Mark success on last action success if wfContext.CurrentActionIndex+1 == wfContext.TotalNumberOfActions { @@ -204,15 +204,15 @@ func getWorkflowContextForRequest(req *proto.WorkflowActionStatus, wf *v1alpha1. return wfContext } -func (s *KubernetesBackedServer) ReportActionStatus(ctx context.Context, req *proto.WorkflowActionStatus) (*proto.Empty, error) { +func (k *KubernetesBackedServer) ReportActionStatus(ctx context.Context, req *proto.WorkflowActionStatus) (*proto.Empty, error) { err := validateActionStatusRequest(req) if err != nil { return nil, err } wfID := req.GetWorkflowId() - l := s.logger.WithValues("actionName", req.GetActionName(), "status", req.GetActionStatus(), "workflowID", req.GetWorkflowId(), "taskName", req.GetTaskName(), "worker", req.WorkerId) + l := k.logger.WithValues("actionName", req.GetActionName(), "status", req.GetActionStatus(), "workflowID", req.GetWorkflowId(), "taskName", req.GetTaskName(), "worker", req.WorkerId) - wf, err := s.getWorkflowByName(ctx, wfID, s.namespace) + wf, err := k.getWorkflowByName(ctx, wfID, k.namespace) if err != nil { l.Error(err, "get workflow") return nil, status.Errorf(codes.InvalidArgument, errInvalidWorkflowID) @@ -225,13 +225,13 @@ func (s *KubernetesBackedServer) ReportActionStatus(ctx context.Context, req *pr } wfContext := getWorkflowContextForRequest(req, wf) - err = s.modifyWorkflowState(wf, wfContext) + err = k.modifyWorkflowState(wf, wfContext) if err != nil { l.Error(err, "modify workflow state") return nil, status.Errorf(codes.InvalidArgument, errInvalidWorkflowID) } l.Info("updating workflow in Kubernetes") - err = s.ClientFunc().Status().Update(ctx, wf) + err = k.ClientFunc().Status().Update(ctx, wf) if err != nil { l.Error(err, "applying update to workflow") return nil, status.Errorf(codes.InvalidArgument, errInvalidWorkflowID)