Skip to content

Commit

Permalink
restructure providerconfig
Browse files Browse the repository at this point in the history
  • Loading branch information
daanvinken committed Oct 16, 2024
1 parent 865f5a2 commit 9256325
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 84 deletions.
3 changes: 3 additions & 0 deletions apis/orchestration/v1alpha1/entityworkflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ type EntityWorkflowStatus struct {
// ResourceStatus includes common fields for tracking the status of external resources.
xpv1.ResourceStatus `json:",inline"`

// WorkflowID `json:",inline"`
WorkflowID string `json:"workflowID,omitempty"`

// AtProvider contains the provider-specific observed state, such as workflow IDs and status.
AtProvider EntityWorkflowObservation `json:"atProvider,omitempty"`
}
Expand Down
15 changes: 15 additions & 0 deletions apis/orchestration/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions apis/v1alpha1/providerconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (

// A ProviderConfigSpec defines the desired state of a ProviderConfig.
type ProviderConfigSpec struct {
// Hostname is the temporal hostname (rpc endpoint).
Hostname string `json:"hostname"`
// Hostname is the temporal namespace used for our workflows.
Namespace string `json:"namespace"`
// Credentials required to authenticate to this provider.
Credentials ProviderCredentials `json:"credentials"`
}
Expand Down
2 changes: 2 additions & 0 deletions examples/provider/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ kind: ProviderConfig
metadata:
name: default
spec:
hostname: xxx
namespace: daanvi
credentials:
source: Secret
secretRef:
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ require (
github.com/crossplane/crossplane-tools v0.0.0-20230925130601-628280f8bf79
github.com/daanvinken/tempoplane v0.0.0-20241015192601-7f2b8bb1683a
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/pkg/errors v0.9.1
go.temporal.io/api v1.38.0
go.temporal.io/sdk v1.29.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6
k8s.io/apimachinery v0.29.2
Expand Down Expand Up @@ -43,7 +45,6 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
github.com/imdario/mergo v0.3.16 // indirect
Expand All @@ -70,7 +71,6 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/stretchr/testify v1.9.0 // indirect
go.temporal.io/api v1.38.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ github.com/crossplane/crossplane-runtime v1.16.0 h1:lz+l0wEB3qowdTmN7t0PZkfuNSvf
github.com/crossplane/crossplane-runtime v1.16.0/go.mod h1:Pz2tdGVMF6KDGzHZOkvKro0nKc8EzK0sb/nSA7pH4Dc=
github.com/crossplane/crossplane-tools v0.0.0-20230925130601-628280f8bf79 h1:HigXs5tEQxWz0fcj8hzbU2UAZgEM7wPe0XRFOsrtF8Y=
github.com/crossplane/crossplane-tools v0.0.0-20230925130601-628280f8bf79/go.mod h1:+e4OaFlOcmr0JvINHl/yvEYBrZawzTgj6pQumOH1SS0=
github.com/daanvinken/tempoplane v0.0.0-20241015122140-7ede1cdd040c h1:LEtqb3QP8HrpcWsb6xNaiwqsojnqOX13TKqlDHx9ans=
github.com/daanvinken/tempoplane v0.0.0-20241015122140-7ede1cdd040c/go.mod h1:ewmbXaPj6s3VvtdGCG/xuL+k0l0edywmodS1wQ6vwOc=
github.com/daanvinken/tempoplane v0.0.0-20241015192601-7f2b8bb1683a h1:LA3MHm88hZAFXN5hbo1+kaUJp6smjalWzonNUaLzOOE=
github.com/daanvinken/tempoplane v0.0.0-20241015192601-7f2b8bb1683a/go.mod h1:ewmbXaPj6s3VvtdGCG/xuL+k0l0edywmodS1wQ6vwOc=
github.com/dave/jennifer v1.4.1 h1:XyqG6cn5RQsTj3qlWQTKlRGAyrTcsk1kUmWdZBzRjDw=
Expand Down
118 changes: 38 additions & 80 deletions internal/controller/entityworkflow/entityworkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"context"
"fmt"
"github.com/google/uuid"
"os"

"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -43,19 +41,12 @@ import (
)

const (
errNotEntityWorkflow = "managed resource is not a EntityWorkflow custom resource"
errTrackPCUsage = "cannot track ProviderConfig usage"
errGetPC = "cannot get ProviderConfig"
errGetCreds = "cannot get credentials"

errNewClient = "cannot create new Service"
)

// A NoOpService does nothing.
type NoOpService struct{}

var (
newNoOpService = func(_ []byte) (interface{}, error) { return &NoOpService{}, nil }
errNotEntityWorkflow = "managed resource is not a EntityWorkflow custom resource"
errTrackPCUsage = "cannot track ProviderConfig usage"
errGetPC = "cannot get ProviderConfig"
errGetCreds = "cannot get credentials"
errTemporalConnection = "cannot connect to temporal"
errNewClient = "cannot create new Service"
)

// Setup adds a controller that reconciles EntityWorkflow managed resources.
Expand All @@ -70,9 +61,9 @@ func Setup(mgr ctrl.Manager, o controller.Options) error {
r := managed.NewReconciler(mgr,
resource.ManagedKind(v1alpha1.EntityWorkflowGroupVersionKind),
managed.WithExternalConnecter(&connector{
kube: mgr.GetClient(),
usage: resource.NewProviderConfigUsageTracker(mgr.GetClient(), &apisv1alpha1.ProviderConfigUsage{}),
newServiceFn: newNoOpService}),
kube: mgr.GetClient(),
usage: resource.NewProviderConfigUsageTracker(mgr.GetClient(), &apisv1alpha1.ProviderConfigUsage{}),
}),
managed.WithLogger(o.Logger.WithValues("controller", name)),
managed.WithPollInterval(o.PollInterval),
managed.WithRecorder(event.NewAPIRecorder(mgr.GetEventRecorderFor(name))),
Expand All @@ -89,9 +80,9 @@ func Setup(mgr ctrl.Manager, o controller.Options) error {
// A connector is expected to produce an ExternalClient when its Connect method
// is called.
type connector struct {
kube client.Client
usage resource.Tracker
newServiceFn func(creds []byte) (interface{}, error)
kube client.Client
usage resource.Tracker
temporal client.Client
}

// Connect typically produces an ExternalClient by:
Expand All @@ -114,29 +105,33 @@ func (c *connector) Connect(ctx context.Context, mg resource.Managed) (managed.E
return nil, errors.Wrap(err, errGetPC)
}

cd := pc.Spec.Credentials
data, err := resource.CommonCredentialExtractor(ctx, cd.Source, c.kube, cd.CommonCredentialSelectors)
//cd := pc.Spec.Credentials

tc, err := temporalclient.Dial(temporalclient.Options{
HostPort: pc.Spec.Hostname,
Namespace: pc.Spec.Namespace,
})

if err != nil {
return nil, errors.Wrap(err, errGetCreds)
return &external{}, errors.Wrap(err, errTemporalConnection)
}

svc, err := c.newServiceFn(data)
hreq := temporalclient.CheckHealthRequest{}
_, err = tc.CheckHealth(context.Background(), &hreq)
if err != nil {
return nil, errors.Wrap(err, errNewClient)
return &external{}, errors.Wrap(err, errTemporalConnection)
}

return &external{service: svc}, nil
return &external{temporalClient: tc}, nil
}

// An ExternalClient observes, then either creates, updates, or deletes an
// external resource to ensure it reflects the managed resource's desired state.
type external struct {
// A 'client' used to connect to the external resource API. In practice this
// would be something like an AWS SDK client.
service interface{}
temporalClient temporalclient.Client
}

func ConvertToEntityWorkflowInput(input v1alpha1.EntityInput) entityworkflow.EntityInput {
func ConvertToEntityWorkflowInput(input v1alpha1.EntityInput, workflowID string) entityworkflow.EntityInput {
return entityworkflow.EntityInput{
EntityID: input.EntityID,
Kind: input.Kind,
Expand All @@ -146,7 +141,7 @@ func ConvertToEntityWorkflowInput(input v1alpha1.EntityInput) entityworkflow.Ent
DC: input.DC,
Env: input.Env,
Timestamp: input.Timestamp,
CorrelationID: input.CorrelationID,
CorrelationID: workflowID,
}
}

Expand All @@ -157,23 +152,14 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex
}

// Extract WorkflowID from CorrelationID in EntityInput
workflowID := cr.Spec.ForProvider.EntityInput.CorrelationID

// Create a Temporal client to query workflow status
tc, err := temporalclient.Dial(temporalclient.Options{
HostPort: os.Getenv("TEMPORAL_ADDRESS"),
Namespace: os.Getenv("TEMPORAL_NS"),
})
defer tc.Close()
if err != nil {
return managed.ExternalObservation{}, errors.Wrap(err, "failed to create Temporal client for observation")
}
workflowID := cr.Status.WorkflowID

// Describe the workflow execution using the WorkflowID (CorrelationID)
// TODO probably wanna use runID as identifier on k8s
resp, err := tc.DescribeWorkflowExecution(ctx, workflowID, "")
resp, err := c.temporalClient.DescribeWorkflowExecution(ctx, workflowID, "")

if err != nil {
//TODO improve error check
fmt.Println("Resource doesn't exist, returning")
if string(err.Error()) == "operation GetCurrentExecution encountered not found" {
return managed.ExternalObservation{ResourceExists: false}, nil
}
Expand All @@ -190,6 +176,7 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex
ResourceUpToDate: true,
ConnectionDetails: managed.ConnectionDetails{
"WorkflowID": []byte(workflowID),
"runID": []byte(resp.WorkflowExecutionInfo.FirstRunId),
"Status": []byte("completed"),
},
}, nil
Expand All @@ -206,13 +193,15 @@ func (c *external) Observe(ctx context.Context, mg resource.Managed) (managed.Ex
ResourceUpToDate: false,
ConnectionDetails: managed.ConnectionDetails{
"WorkflowID": []byte(workflowID),
"runID": []byte(resp.WorkflowExecutionInfo.Execution.RunId), //TODO correct RunID?
"Status": []byte(workflowStatus.String()),
},
}, nil
}
}

func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.ExternalCreation, error) {
fmt.Println("creating...")
cr, ok := mg.(*v1alpha1.EntityWorkflow)
if !ok {
return managed.ExternalCreation{}, errors.New(errNotEntityWorkflow)
Expand All @@ -225,41 +214,22 @@ func (c *external) Create(ctx context.Context, mg resource.Managed) (managed.Ext
taskQueue := "TempoPlane-" + entityInput.Kind

//uniqueID := uuid.New().String()
//return fmt.Sprintf("%s-%s-%s-%s", operationType, entityID, requesterID, uniqueID)
//ID := "TempoPlane" + "-" + uniqueID + "-" + entityInput.RequesterID
ID := "TempoPlane" + "-" + entityInput.RequesterID

// Set CorrelationID to workflow ID
// TODO uniqueness
entityInput.CorrelationID = ID
cr.Spec.ForProvider.EntityInput.CorrelationID = ID

//TODO Now we need to update this
//err := c.kubeClient.Update(ctx, cr)
//if err != nil {
// return managed.ExternalCreation{}, err
//}

// Configure workflow execution options
workflowOptions := temporalclient.StartWorkflowOptions{
TaskQueue: taskQueue,
ID: ID,
}

// Create a Temporal client (assumes you have setup to obtain this client externally, adjust as needed)
tc, err := temporalclient.Dial(temporalclient.Options{
HostPort: os.Getenv("TEMPORAL_ADDRESS"),
Namespace: os.Getenv("TEMPORAL_NS"),
})
defer tc.Close()
if err != nil {
return managed.ExternalCreation{}, errors.Wrap(err, "failed to create Temporal client")
}

// Execute the CreateWorkflow
workflowExecution, err := tc.ExecuteWorkflow(ctx, workflowOptions, entityworkflow.CreateWorkflow, ConvertToEntityWorkflowInput(entityInput))
workflowExecution, err := c.temporalClient.ExecuteWorkflow(ctx, workflowOptions, entityworkflow.CreateWorkflow, ConvertToEntityWorkflowInput(entityInput, ID))
if err != nil {
return managed.ExternalCreation{}, errors.Wrap(err, "failed to start CreateWorkflow")
}
fmt.Println("Executing workflow")

// Log the WorkflowID and RunID for reference
fmt.Printf("Started CreateWorkflow with WorkflowID: %s and RunID: %s\n", workflowExecution.GetID(), workflowExecution.GetRunID())
Expand Down Expand Up @@ -315,20 +285,8 @@ func (c *external) Delete(ctx context.Context, mg resource.Managed) error {
ID: ID,
}

//TODO DRY this stuff

// Create a Temporal client
tc, err := temporalclient.Dial(temporalclient.Options{
HostPort: os.Getenv("TEMPORAL_ADDRESS"),
Namespace: os.Getenv("TEMPORAL_NS"),
})
defer tc.Close()
if err != nil {
return errors.Wrap(err, "failed to create Temporal client")
}

// Execute the DeleteWorkflow
workflowExecution, err := tc.ExecuteWorkflow(ctx, workflowOptions, entityworkflow.DeleteWorkflow, ConvertToEntityWorkflowInput(entityInput))
workflowExecution, err := c.temporalClient.ExecuteWorkflow(ctx, workflowOptions, entityworkflow.DeleteWorkflow, ConvertToEntityWorkflowInput(entityInput))

Check failure on line 289 in internal/controller/entityworkflow/entityworkflow.go

View workflow job for this annotation

GitHub Actions / unit-tests

not enough arguments in call to ConvertToEntityWorkflowInput

Check failure on line 289 in internal/controller/entityworkflow/entityworkflow.go

View workflow job for this annotation

GitHub Actions / publish-artifacts

not enough arguments in call to ConvertToEntityWorkflowInput

Check failure on line 289 in internal/controller/entityworkflow/entityworkflow.go

View workflow job for this annotation

GitHub Actions / publish-artifacts

not enough arguments in call to ConvertToEntityWorkflowInput
if err != nil {
return errors.Wrap(err, "failed to start DeleteWorkflow")
}
Expand Down
19 changes: 19 additions & 0 deletions internal/utils/temporal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package utils

import (
"github.com/pkg/errors"
"go.temporal.io/sdk/client"
"os"
)

// NewTemporalClient initializes a Temporal client with configuration from environment variables.
func NewTemporalClient() (client.Client, error) {
tc, err := client.Dial(client.Options{
HostPort: os.Getenv("TEMPORAL_ADDRESS"),
Namespace: os.Getenv("TEMPORAL_NS"),
})
if err != nil {
return nil, errors.Wrap(err, "failed to create Temporal client")
}
return tc, nil
}
Loading

0 comments on commit 9256325

Please sign in to comment.