diff --git a/cmd/tink-server/main.go b/cmd/tink-server/main.go
index a9cae0bdc..bc134201a 100644
--- a/cmd/tink-server/main.go
+++ b/cmd/tink-server/main.go
@@ -33,6 +33,9 @@ type Config struct {
 	KubeconfigPath string
 	KubeAPI        string
 	KubeNamespace  string
+
+	AutoEnrollmentTemplate string
+	AutoCapMode            string
 }
 
 const backendKubernetes = "kubernetes"
@@ -48,6 +51,8 @@ func (c *Config) AddFlags(fs *pflag.FlagSet) {
 	fs.StringVar(&c.KubeconfigPath, "kubeconfig", "", "The path to the Kubeconfig. Only takes effect if `--backend=kubernetes`")
 	fs.StringVar(&c.KubeAPI, "kubernetes", "", "The Kubernetes API URL, used for in-cluster client construction. Only takes effect if `--backend=kubernetes`")
 	fs.StringVar(&c.KubeNamespace, "kube-namespace", "", "The Kubernetes namespace to target")
+	fs.StringVar(&c.AutoEnrollmentTemplate, "auto-enrollment-template", "", "The Template to use for auto enrollment Workflows (only used when `--auto-cap-mode=enrollment`). The Template must already exist; it is user defined and there is no default.")
+	fs.Var(newAutoCapModeValue(AutoCapMode(string(server.AutoCapModeDisabled)), (*AutoCapMode)(&c.AutoCapMode)), "auto-cap-mode", "The mode to use for automatic capabilities. Must be one of 'discovery', 'enrollment', or 'disabled'. discovery: creates a Hardware object for each unknown worker; enrollment: creates Hardware and Workflow objects for each unknown worker; disabled: auto capabilities are disabled")
 }
 
 func (c *Config) PopulateFromLegacyEnvVar() {
@@ -62,7 +67,7 @@
 
 func main() {
 	if err := NewRootCommand().Execute(); err != nil {
-		fmt.Fprint(os.Stderr, err.Error())
+		fmt.Fprint(os.Stderr, err.Error(), "\n")
 		os.Exit(1)
 	}
 }
@@ -88,7 +93,7 @@ func NewRootCommand() *cobra.Command {
 		RunE: func(cmd *cobra.Command, args []string) error {
 			// I am not sure if it is right for this to be here,
 			// but as last step I want to keep compatibility with
-			// what we have for a little bit and I thinik that's
+			// what we have for a little bit and I think that's
 			// the most aggressive way we have to guarantee that
 			// the old way works as before.
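The `--auto-cap-mode` flag is registered through a custom `pflag.Value`, so an invalid mode fails at flag-parse time rather than at runtime, and an empty value falls back to disabled. A minimal sketch of that contract as a hypothetical test (not part of this diff; the test name and file are assumptions):

package main

import "testing"

// TestAutoCapModeSet is a hypothetical sketch of the Set contract above: the
// three known modes and the empty string are accepted (empty defaults to
// "disabled"), and anything else is rejected at parse time.
func TestAutoCapModeSet(t *testing.T) {
	var m AutoCapMode
	for _, valid := range []string{"discovery", "enrollment", "disabled", ""} {
		if err := m.Set(valid); err != nil {
			t.Errorf("Set(%q): unexpected error: %v", valid, err)
		}
	}
	if err := m.Set("bogus"); err == nil {
		t.Error(`Set("bogus") should fail`)
	}
}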
 			config.PopulateFromLegacyEnvVar()
@@ -108,6 +113,10 @@ func NewRootCommand() *cobra.Command {
 			errCh := make(chan error, 2)
 			var registrar grpcserver.Registrar
 
+			if server.AutoCapMode(config.AutoCapMode) == server.AutoCapModeEnrollment && config.AutoEnrollmentTemplate == "" {
+				return fmt.Errorf("auto-enrollment-template is required when auto-cap-mode is set to enrollment")
+			}
+
 			switch config.Backend {
 			case backendKubernetes:
 				var err error
@@ -116,6 +125,8 @@
 					config.KubeconfigPath,
 					config.KubeAPI,
 					config.KubeNamespace,
+					server.WithAutoCapMode(server.AutoCapMode(config.AutoCapMode)),
+					server.WithAutoEnrollmentTemplate(config.AutoEnrollmentTemplate),
 				)
 				if err != nil {
 					return err
@@ -206,3 +217,32 @@ func applyViper(v *viper.Viper, cmd *cobra.Command) error {
 
 	return nil
 }
+
+type AutoCapMode server.AutoCapMode
+
+func (a *AutoCapMode) String() string {
+	return string(*a)
+}
+
+func (a *AutoCapMode) Set(value string) error {
+	v := server.AutoCapMode(value)
+	if v == "" {
+		v = server.AutoCapModeDisabled
+	}
+	switch v {
+	case server.AutoCapModeDiscovery, server.AutoCapModeEnrollment, server.AutoCapModeDisabled:
+		*a = AutoCapMode(v)
+		return nil
+	}
+
+	return fmt.Errorf("invalid value %q, must be one of %q, %q, or %q", value, server.AutoCapModeDiscovery, server.AutoCapModeEnrollment, server.AutoCapModeDisabled)
+}
+
+func (a *AutoCapMode) Type() string {
+	return "string"
+}
+
+func newAutoCapModeValue(val AutoCapMode, p *AutoCapMode) *AutoCapMode {
+	*p = val
+	return p
+}
diff --git a/internal/server/auto.go b/internal/server/auto.go
new file mode 100644
index 000000000..ed86813ae
--- /dev/null
+++ b/internal/server/auto.go
@@ -0,0 +1,65 @@
+package server
+
+import (
+	"context"
+	"strings"
+
+	"github.com/tinkerbell/tink/api/v1alpha1"
+	"github.com/tinkerbell/tink/internal/ptr"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+)
+
+type AutoCapMode string
+
+var (
+	AutoCapModeDiscovery  AutoCapMode = "discovery"
+	AutoCapModeEnrollment AutoCapMode = "enrollment"
+	AutoCapModeDisabled   AutoCapMode = "disabled"
+)
+
+func (k *KubernetesBackedServer) hardwareObjectExists(ctx context.Context, workerID string) bool {
+	if err := k.ClientFunc().Get(ctx, types.NamespacedName{Name: strings.ReplaceAll(workerID, ":", "."), Namespace: k.namespace}, &v1alpha1.Hardware{}); err != nil {
+		return false
+	}
+	return true
+}
+
+func (k *KubernetesBackedServer) createHardwareObject(ctx context.Context, workerID string) error {
+	hw := &v1alpha1.Hardware{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      strings.ReplaceAll(workerID, ":", "."),
+			Namespace: k.namespace,
+		},
+		Spec: v1alpha1.HardwareSpec{
+			Interfaces: []v1alpha1.Interface{
+				{
+					DHCP: &v1alpha1.DHCP{
+						MAC: workerID,
+					},
+					Netboot: &v1alpha1.Netboot{
+						AllowPXE: ptr.Bool(true),
+					},
+				},
+			},
+		},
+	}
+	return k.ClientFunc().Create(ctx, hw)
+}
+
+func (k *KubernetesBackedServer) createWorkflowObject(ctx context.Context, workerID string) error {
+	wf := &v1alpha1.Workflow{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      strings.ReplaceAll(workerID, ":", "."),
+			Namespace: k.namespace,
+		},
+		Spec: v1alpha1.WorkflowSpec{
+			HardwareRef: strings.ReplaceAll(workerID, ":", "."),
+			TemplateRef: k.AutoEnrollmentTemplate,
+			HardwareMap: map[string]string{
+				"device_1": workerID,
+			},
+		},
+	}
+	return k.ClientFunc().Create(ctx, wf)
+}
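Worker IDs are MAC addresses, and ':' is not legal in a Kubernetes object name, so every object created here derives its name by swapping ':' for '.'. A small, self-contained illustration (the MAC value is arbitrary):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Same sanitization as auto.go applies: a MAC-style worker ID
	// becomes a valid Kubernetes object name.
	workerID := "52:54:00:aa:bb:cc" // arbitrary example value
	fmt.Println(strings.ReplaceAll(workerID, ":", ".")) // 52.54.00.aa.bb.cc
}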
diff --git a/internal/server/kubernetes_api.go b/internal/server/kubernetes_api.go
index b4bcdccef..96ede233c 100644
--- a/internal/server/kubernetes_api.go
+++ b/internal/server/kubernetes_api.go
@@ -20,12 +20,38 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/cluster"
 )
 
+// Option for setting optional KubernetesBackedServer fields.
+type Option func(*KubernetesBackedServer)
+
+// KubernetesBackedServer is a server that implements a workflow API.
+type KubernetesBackedServer struct {
+	logger                 logr.Logger
+	ClientFunc             func() client.Client
+	namespace              string
+	AutoCapMode            AutoCapMode
+	AutoEnrollmentTemplate string
+
+	nowFunc func() time.Time
+}
+
+func WithAutoCapMode(mode AutoCapMode) Option {
+	return func(k *KubernetesBackedServer) {
+		k.AutoCapMode = mode
+	}
+}
+
+func WithAutoEnrollmentTemplate(name string) Option {
+	return func(k *KubernetesBackedServer) {
+		k.AutoEnrollmentTemplate = name
+	}
+}
+
 // +kubebuilder:rbac:groups=tinkerbell.org,resources=hardware;hardware/status,verbs=get;list;watch
 // +kubebuilder:rbac:groups=tinkerbell.org,resources=templates;templates/status,verbs=get;list;watch
 // +kubebuilder:rbac:groups=tinkerbell.org,resources=workflows;workflows/status,verbs=get;list;watch;update;patch
 
 // NewKubeBackedServer returns a server that implements the Workflow server interface for a given kubeconfig.
-func NewKubeBackedServer(logger logr.Logger, kubeconfig, apiserver, namespace string) (*KubernetesBackedServer, error) {
+func NewKubeBackedServer(logger logr.Logger, kubeconfig, apiserver, namespace string, opts ...Option) (*KubernetesBackedServer, error) {
 	ccfg := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
 		&clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfig},
 		&clientcmd.ConfigOverrides{
@@ -43,12 +69,12 @@ func NewKubeBackedServer(logger logr.Logger, kubeconfig, apiserver, namespace st
 		return nil, err
 	}
 
-	return NewKubeBackedServerFromREST(logger, cfg, namespace)
+	return NewKubeBackedServerFromREST(logger, cfg, namespace, opts...)
 }
 
 // NewKubeBackedServerFromREST returns a server that implements the Workflow
 // server interface with the given Kubernetes rest client and namespace.
-func NewKubeBackedServerFromREST(logger logr.Logger, config *rest.Config, namespace string) (*KubernetesBackedServer, error) {
+func NewKubeBackedServerFromREST(logger logr.Logger, config *rest.Config, namespace string, opts ...Option) (*KubernetesBackedServer, error) {
 	clstr, err := cluster.New(config, func(opts *cluster.Options) {
 		opts.Scheme = controller.DefaultScheme()
 		opts.Logger = zapr.NewLogger(zap.NewNop())
@@ -79,22 +105,20 @@ func NewKubeBackedServerFromREST(logger logr.Logger, config *rest.Config, namesp
 		}
 	}()
 
-	return &KubernetesBackedServer{
+	k := &KubernetesBackedServer{
 		logger:     logger,
 		ClientFunc: clstr.GetClient,
 		nowFunc:    time.Now,
-	}, nil
-}
-
-// KubernetesBackedServer is a server that implements a workflow API.
-type KubernetesBackedServer struct {
-	logger     logr.Logger
-	ClientFunc func() client.Client
+		namespace:  namespace,
+	}
+	for _, opt := range opts {
+		opt(k)
+	}
 
-	nowFunc func() time.Time
+	return k, nil
 }
 
 // Register registers the service on the gRPC server.
-func (s *KubernetesBackedServer) Register(server *grpc.Server) {
-	proto.RegisterWorkflowServiceServer(server, s)
+func (k *KubernetesBackedServer) Register(server *grpc.Server) {
+	proto.RegisterWorkflowServiceServer(server, k)
 }
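The functional options keep both constructors backward compatible: existing call sites compile unchanged, and auto capabilities are opt-in. A hypothetical call site with enrollment enabled (the helper name, kubeconfig path, namespace, and template name are placeholder values, not part of this diff):

package main

import (
	"github.com/go-logr/logr"
	"github.com/tinkerbell/tink/internal/server"
	"google.golang.org/grpc"
)

// newEnrollmentServer is a hypothetical helper showing the new call shape.
func newEnrollmentServer(logger logr.Logger, grpcServer *grpc.Server) error {
	srv, err := server.NewKubeBackedServer(
		logger,
		"/etc/tink/kubeconfig", // placeholder kubeconfig path
		"",                     // API server URL; empty to use the kubeconfig
		"tink-system",          // placeholder namespace
		server.WithAutoCapMode(server.AutoCapModeEnrollment),
		server.WithAutoEnrollmentTemplate("my-enrollment-template"),
	)
	if err != nil {
		return err
	}
	srv.Register(grpcServer)
	return nil
}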
diff --git a/internal/server/kubernetes_api_workflow.go b/internal/server/kubernetes_api_workflow.go
index 5f7e219bc..dafda1121 100644
--- a/internal/server/kubernetes_api_workflow.go
+++ b/internal/server/kubernetes_api_workflow.go
@@ -35,9 +35,9 @@ func getWorkflowContext(wf v1alpha1.Workflow) *proto.WorkflowContext {
 	}
 }
 
-func (s *KubernetesBackedServer) getCurrentAssignedNonTerminalWorkflowsForWorker(ctx context.Context, workerID string) ([]v1alpha1.Workflow, error) {
+func (k *KubernetesBackedServer) getCurrentAssignedNonTerminalWorkflowsForWorker(ctx context.Context, workerID string) ([]v1alpha1.Workflow, error) {
 	stored := &v1alpha1.WorkflowList{}
-	err := s.ClientFunc().List(ctx, stored, &client.MatchingFields{
+	err := k.ClientFunc().List(ctx, stored, &client.MatchingFields{
 		workflowByNonTerminalState: workerID,
 	})
 	if err != nil {
@@ -53,12 +53,12 @@ func (s *KubernetesBackedServer) getCurrentAssignedNonTerminalWorkflowsForWorker
 	return wfs, nil
 }
 
-func (s *KubernetesBackedServer) getWorkflowByName(ctx context.Context, workflowID string) (*v1alpha1.Workflow, error) {
+func (k *KubernetesBackedServer) getWorkflowByName(ctx context.Context, workflowID string) (*v1alpha1.Workflow, error) {
 	workflowNamespace, workflowName, _ := strings.Cut(workflowID, "/")
 	wflw := &v1alpha1.Workflow{}
-	err := s.ClientFunc().Get(ctx, types.NamespacedName{Name: workflowName, Namespace: workflowNamespace}, wflw)
+	err := k.ClientFunc().Get(ctx, types.NamespacedName{Name: workflowName, Namespace: workflowNamespace}, wflw)
 	if err != nil {
-		s.logger.Error(err, "get client", "workflow", workflowID)
+		k.logger.Error(err, "get client", "workflow", workflowID)
 		return nil, err
 	}
 	return wflw, nil
@@ -66,14 +66,37 @@
 
 // The following APIs are used by the worker.
 
-func (s *KubernetesBackedServer) GetWorkflowContexts(req *proto.WorkflowContextRequest, stream proto.WorkflowService_GetWorkflowContextsServer) error {
+func (k *KubernetesBackedServer) GetWorkflowContexts(req *proto.WorkflowContextRequest, stream proto.WorkflowService_GetWorkflowContextsServer) error {
 	if req.GetWorkerId() == "" {
 		return status.Errorf(codes.InvalidArgument, errInvalidWorkflowID)
 	}
-	wflows, err := s.getCurrentAssignedNonTerminalWorkflowsForWorker(stream.Context(), req.WorkerId)
+	wflows, err := k.getCurrentAssignedNonTerminalWorkflowsForWorker(stream.Context(), req.WorkerId)
 	if err != nil {
 		return err
 	}
+
+	ctx := context.TODO()
+	id := req.WorkerId
+	if k.AutoCapMode != AutoCapModeDisabled && len(wflows) == 0 && (k.AutoCapMode == AutoCapModeDiscovery || k.AutoCapMode == AutoCapModeEnrollment) && !k.hardwareObjectExists(ctx, id) {
+		// In the future, the worker could be signaled to send hardware device information to be used in the creation of the Hardware object,
+		// or the proto.WorkflowContextRequest could be extended to include Hardware information.
+		if err := k.createHardwareObject(ctx, id); err != nil {
+			k.logger.Error(err, "failed to create hardware object")
+			return err
+		}
+
+		if k.AutoCapMode == AutoCapModeEnrollment {
+			if err := k.createWorkflowObject(ctx, id); err != nil {
+				k.logger.Error(err, "failed to create workflow object")
+				return err
+			}
+			wflows, err = k.getCurrentAssignedNonTerminalWorkflowsForWorker(stream.Context(), req.WorkerId)
+			if err != nil {
+				return err
+			}
+		}
+	}
+
 	for _, wf := range wflows {
 		if err := stream.Send(getWorkflowContext(wf)); err != nil {
 			return err
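One observation on the gating condition above: the `!= AutoCapModeDisabled` clause is already implied by the discovery/enrollment check that follows it. A hypothetical restatement of the predicate (a sketch, not part of this diff):

package server

// shouldAutoCreate is a hypothetical refactor of the inline condition in
// GetWorkflowContexts: auto capabilities fire only when the mode is
// discovery or enrollment (which already excludes disabled), the worker
// has no non-terminal Workflows assigned, and no Hardware object exists.
func shouldAutoCreate(mode AutoCapMode, assignedWorkflows int, hardwareExists bool) bool {
	enabled := mode == AutoCapModeDiscovery || mode == AutoCapModeEnrollment
	return enabled && assignedWorkflows == 0 && !hardwareExists
}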
@@ -82,12 +105,12 @@
 	return nil
 }
 
-func (s *KubernetesBackedServer) GetWorkflowActions(ctx context.Context, req *proto.WorkflowActionsRequest) (*proto.WorkflowActionList, error) {
+func (k *KubernetesBackedServer) GetWorkflowActions(ctx context.Context, req *proto.WorkflowActionsRequest) (*proto.WorkflowActionList, error) {
 	wfID := req.GetWorkflowId()
 	if wfID == "" {
 		return nil, status.Errorf(codes.InvalidArgument, errInvalidWorkflowID)
 	}
-	wf, err := s.getWorkflowByName(ctx, wfID)
+	wf, err := k.getWorkflowByName(ctx, wfID)
 	if err != nil {
 		return nil, err
 	}
@@ -95,7 +118,7 @@
 }
 
 // Modifies a workflow for a given workflowContext.
-func (s *KubernetesBackedServer) modifyWorkflowState(wf *v1alpha1.Workflow, wfContext *proto.WorkflowContext) error {
+func (k *KubernetesBackedServer) modifyWorkflowState(wf *v1alpha1.Workflow, wfContext *proto.WorkflowContext) error {
 	if wf == nil {
 		return errors.New("no workflow provided")
 	}
@@ -136,19 +159,19 @@ cont:
 		// Workflow is running, so set the start time to now
 		wf.Status.State = v1alpha1.WorkflowState(proto.State_name[int32(wfContext.CurrentActionState)])
 		wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt = func() *metav1.Time {
-			t := metav1.NewTime(s.nowFunc())
+			t := metav1.NewTime(k.nowFunc())
 			return &t
 		}()
 	case proto.State_STATE_FAILED, proto.State_STATE_TIMEOUT:
 		// Handle terminal statuses by updating the workflow state and time
 		wf.Status.State = v1alpha1.WorkflowState(proto.State_name[int32(wfContext.CurrentActionState)])
 		if wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt != nil {
-			wf.Status.Tasks[taskIndex].Actions[actionIndex].Seconds = int64(s.nowFunc().Sub(wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt.Time).Seconds())
+			wf.Status.Tasks[taskIndex].Actions[actionIndex].Seconds = int64(k.nowFunc().Sub(wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt.Time).Seconds())
 		}
 	case proto.State_STATE_SUCCESS:
 		// Handle a success by marking the task as complete
 		if wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt != nil {
-			wf.Status.Tasks[taskIndex].Actions[actionIndex].Seconds = int64(s.nowFunc().Sub(wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt.Time).Seconds())
+			wf.Status.Tasks[taskIndex].Actions[actionIndex].Seconds = int64(k.nowFunc().Sub(wf.Status.Tasks[taskIndex].Actions[actionIndex].StartedAt.Time).Seconds())
 		}
 		// Mark success on last action success
 		if wfContext.CurrentActionIndex+1 == wfContext.TotalNumberOfActions {
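The duration bookkeeping above subtracts `StartedAt` from the injected `nowFunc` instead of calling `time.Now` directly, which keeps the `Seconds` arithmetic deterministic under test. A minimal sketch of that seam (the test name and timestamps are hypothetical, not part of this diff):

package server

import (
	"testing"
	"time"
)

// TestActionDuration is a hypothetical sketch: pinning nowFunc lets the
// Seconds calculation used by modifyWorkflowState be asserted exactly.
func TestActionDuration(t *testing.T) {
	started := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
	k := &KubernetesBackedServer{
		nowFunc: func() time.Time { return started.Add(42 * time.Second) },
	}
	if got := int64(k.nowFunc().Sub(started).Seconds()); got != 42 {
		t.Fatalf("expected 42 seconds, got %d", got)
	}
}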
@@ -183,15 +206,15 @@ func getWorkflowContextForRequest(req *proto.WorkflowActionStatus, wf *v1alpha1.
 	return wfContext
 }
 
-func (s *KubernetesBackedServer) ReportActionStatus(ctx context.Context, req *proto.WorkflowActionStatus) (*proto.Empty, error) {
+func (k *KubernetesBackedServer) ReportActionStatus(ctx context.Context, req *proto.WorkflowActionStatus) (*proto.Empty, error) {
 	err := validateActionStatusRequest(req)
 	if err != nil {
 		return nil, err
 	}
 	wfID := req.GetWorkflowId()
-	l := s.logger.WithValues("actionName", req.GetActionName(), "status", req.GetActionStatus(), "workflowID", req.GetWorkflowId(), "taskName", req.GetTaskName(), "worker", req.WorkerId)
+	l := k.logger.WithValues("actionName", req.GetActionName(), "status", req.GetActionStatus(), "workflowID", req.GetWorkflowId(), "taskName", req.GetTaskName(), "worker", req.WorkerId)
 
-	wf, err := s.getWorkflowByName(ctx, wfID)
+	wf, err := k.getWorkflowByName(ctx, wfID)
 	if err != nil {
 		l.Error(err, "get workflow")
 		return nil, status.Errorf(codes.InvalidArgument, errInvalidWorkflowID)
@@ -204,13 +227,13 @@
 	}
 
 	wfContext := getWorkflowContextForRequest(req, wf)
-	err = s.modifyWorkflowState(wf, wfContext)
+	err = k.modifyWorkflowState(wf, wfContext)
 	if err != nil {
 		l.Error(err, "modify workflow state")
 		return nil, status.Errorf(codes.InvalidArgument, errInvalidWorkflowID)
 	}
 	l.Info("updating workflow in Kubernetes")
-	err = s.ClientFunc().Status().Update(ctx, wf)
+	err = k.ClientFunc().Status().Update(ctx, wf)
 	if err != nil {
 		l.Error(err, "applying update to workflow")
 		return nil, status.Errorf(codes.InvalidArgument, errInvalidWorkflowID)