Skip to content

Commit

Permalink
functional tests testing queries + nexus tasks with versioning-3 (#7015)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
- various functional tests (sticky, no-sticky) testing Query + Nexus
tasks routing with versioning-3
- refactored `taskPoller` to now consider query and nexus tasks 

## Why?
<!-- Tell your future self why have you made these changes -->
- more testing

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
- no feature related line changes so N/A

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
- none: shall reduce risks if not for anything

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
No
  • Loading branch information
Shivs11 authored Jan 8, 2025
1 parent b3eefb5 commit 7d95a34
Show file tree
Hide file tree
Showing 3 changed files with 550 additions and 4 deletions.
217 changes: 216 additions & 1 deletion common/testing/taskpoller/taskpoller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,17 @@ import (
"testing"
"time"

"github.com/nexus-rpc/sdk-go/nexus"
nexuspb "go.temporal.io/api/nexus/v1"

enumspb "go.temporal.io/api/enums/v1"
failurepb "go.temporal.io/api/failure/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/temporal"
"go.temporal.io/server/common"
"go.temporal.io/server/common/debug"
"go.temporal.io/server/common/payloads"
"go.temporal.io/server/common/rpc"
"go.temporal.io/server/common/testing/testvars"
)
Expand All @@ -55,6 +60,10 @@ type (
*TaskPoller
pollActivityTaskRequest *workflowservice.PollActivityTaskQueueRequest
}
nexusTaskPoller struct {
*TaskPoller
pollNexusTaskRequest *workflowservice.PollNexusTaskQueueRequest
}
options struct {
tv *testvars.TestVars
timeout time.Duration
Expand Down Expand Up @@ -104,6 +113,12 @@ func (p *TaskPoller) PollWorkflowTask(
return &workflowTaskPoller{TaskPoller: p, pollWorkflowTaskRequest: req}
}

func (p *TaskPoller) PollNexusTask(
req *workflowservice.PollNexusTaskQueueRequest,
) *nexusTaskPoller {
return &nexusTaskPoller{TaskPoller: p, pollNexusTaskRequest: req}
}

// PollAndHandleWorkflowTask issues a PollWorkflowTaskQueueRequest to obtain a new workflow task,
// invokes the handler with the task, and completes/fails the task accordingly.
// Any unspecified but required request and response fields are automatically generated using `tv`.
Expand Down Expand Up @@ -135,6 +150,120 @@ func (p *workflowTaskPoller) HandleTask(
return p.pollAndHandleTask(ctx, options, handler)
}

func (p *nexusTaskPoller) pollTask(
ctx context.Context,
opts *options,
) (*workflowservice.PollNexusTaskQueueResponse, error) {
p.t.Helper()

req := common.CloneProto(p.pollNexusTaskRequest)
if req.Namespace == "" {
req.Namespace = p.namespace
}
if req.TaskQueue == nil {
req.TaskQueue = opts.tv.TaskQueue()
}
if req.Identity == "" {
req.Identity = opts.tv.WorkerIdentity()
}
resp, err := p.client.PollNexusTaskQueue(ctx, req)
if err != nil {
return nil, err
}
if resp == nil || resp.TaskToken == nil {
return nil, NoWorkflowTaskAvailable
}

return resp, err
}

func (p *nexusTaskPoller) pollAndHandleTask(
ctx context.Context,
opts *options,
handler func(task *workflowservice.PollNexusTaskQueueResponse) (*workflowservice.RespondNexusTaskCompletedRequest, error),
) (*workflowservice.RespondNexusTaskCompletedResponse, error) {
p.t.Helper()
task, err := p.pollTask(ctx, opts)
if err != nil {
return nil, fmt.Errorf("failed to poll nexus task: %w", err)
}
return p.handleTask(ctx, opts, task, handler)
}
func (p *nexusTaskPoller) handleTask(
ctx context.Context,
opts *options,
task *workflowservice.PollNexusTaskQueueResponse,
handler func(task *workflowservice.PollNexusTaskQueueResponse) (*workflowservice.RespondNexusTaskCompletedRequest, error),
) (*workflowservice.RespondNexusTaskCompletedResponse, error) {
p.t.Helper()
reply, err := handler(task)
if err != nil {
return nil, p.respondNexusTaskFailed(ctx, opts, task.TaskToken)
}

resp, err := p.respondNexusTaskCompleted(ctx, opts, task, reply)
if err != nil {
return nil, err
}

return resp, nil
}

func (p *nexusTaskPoller) respondNexusTaskCompleted(
ctx context.Context,
opts *options,
task *workflowservice.PollNexusTaskQueueResponse,
reply *workflowservice.RespondNexusTaskCompletedRequest,
) (*workflowservice.RespondNexusTaskCompletedResponse, error) {
p.t.Helper()
if reply == nil {
return nil, errors.New("missing RespondWorkflowTaskCompletedRequest return")
}
if reply.Namespace == "" {
reply.Namespace = p.namespace
}
if len(reply.TaskToken) == 0 {
reply.TaskToken = task.TaskToken
}
if reply.Identity == "" {
reply.Identity = opts.tv.WorkerIdentity()
}
reply.Response = &nexuspb.Response{}

return p.client.RespondNexusTaskCompleted(ctx, reply)
}

func (p *nexusTaskPoller) respondNexusTaskFailed(
ctx context.Context,
opts *options,
taskToken []byte,
) error {
p.t.Helper()
_, err := p.client.RespondNexusTaskFailed(
ctx,
&workflowservice.RespondNexusTaskFailedRequest{
Namespace: p.namespace,
TaskToken: taskToken,
Identity: opts.tv.WorkerIdentity(),
Error: &nexuspb.HandlerError{
ErrorType: string(nexus.HandlerErrorTypeInternal),
},
})
return err
}

func (p *nexusTaskPoller) HandleTask(
tv *testvars.TestVars,
handler func(task *workflowservice.PollNexusTaskQueueResponse) (*workflowservice.RespondNexusTaskCompletedRequest, error),
opts ...optionFunc,
) (*workflowservice.RespondNexusTaskCompletedResponse, error) {
p.t.Helper()
options := newOptions(tv, opts)
ctx, cancel := newContext(options)
defer cancel()
return p.pollAndHandleTask(ctx, options, handler)
}

// HandleWorkflowTask invokes the provided handler with the provided task, and completes/fails the task accordingly.
// Any unspecified but required request and response fields are automatically generated using `tv`.
// Returning an error from `handler` fails the task.
Expand Down Expand Up @@ -239,7 +368,7 @@ func (p *workflowTaskPoller) pollTask(
}

events = history.Events
if len(events) == 0 {
if len(events) == 0 && req.TaskQueue.GetKind() != enumspb.TASK_QUEUE_KIND_STICKY {
return nil, errors.New("history events are empty")
}

Expand All @@ -262,6 +391,47 @@ func (p *workflowTaskPoller) pollTask(
return resp, err
}

func (p *workflowTaskPoller) HandleLegacyQuery(
tv *testvars.TestVars,
handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondQueryTaskCompletedRequest, error),
opts ...optionFunc,
) (*workflowservice.RespondQueryTaskCompletedResponse, error) {
p.t.Helper()
options := newOptions(tv, opts)
ctx, cancel := newContext(options)
defer cancel()
return p.pollAndHandleLegacyQuery(ctx, options, handler)
}

func (p *workflowTaskPoller) pollAndHandleLegacyQuery(
ctx context.Context,
opts *options,
handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondQueryTaskCompletedRequest, error),
) (*workflowservice.RespondQueryTaskCompletedResponse, error) {
p.t.Helper()
task, err := p.pollTask(ctx, opts)
if err != nil {
return nil, fmt.Errorf("failed to poll workflow task: %w", err)
}
return p.handleQuery(ctx, task, handler)
}

func (p *workflowTaskPoller) handleQuery(
ctx context.Context,
task *workflowservice.PollWorkflowTaskQueueResponse,
handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondQueryTaskCompletedRequest, error),
) (*workflowservice.RespondQueryTaskCompletedResponse, error) {
p.t.Helper()
// if an error is received here it shall be present in RespondQueryTaskCompletedResponse.ErrorMessage
reply, err := handler(task)
resp, err := p.respondQueryTaskCompleted(ctx, task, reply, err)
if err != nil {
return nil, err
}

return resp, nil
}

func (p *workflowTaskPoller) pollAndHandleTask(
ctx context.Context,
opts *options,
Expand Down Expand Up @@ -295,6 +465,51 @@ func (p *workflowTaskPoller) handleTask(
return resp, nil
}

func (p *workflowTaskPoller) respondQueryTaskCompleted(
ctx context.Context,
task *workflowservice.PollWorkflowTaskQueueResponse,
reply *workflowservice.RespondQueryTaskCompletedRequest,
err error,
) (*workflowservice.RespondQueryTaskCompletedResponse, error) {
p.t.Helper()
if task == nil {
return nil, errors.New("missing PollWorkflowTaskQueueResponse")
}
if task.Query == nil {
return nil, errors.New("missing Legacy Query in PollWorkflowTaskQueueResponse")
}
if reply == nil {
return nil, errors.New("missing RespondQueryTaskCompletedRequest")
}

// setting the fields for RespondQueryTaskCompletedResponse
if reply.Namespace == "" {
reply.Namespace = p.namespace
}
if reply.TaskToken == nil {
reply.TaskToken = task.TaskToken
}

if err != nil {
reply.ErrorMessage = err.Error()
reply.Failure = &failurepb.Failure{
Message: err.Error(),
}
reply.CompletedType = enumspb.QUERY_RESULT_TYPE_FAILED
} else {
reply.CompletedType = enumspb.QUERY_RESULT_TYPE_ANSWERED
if reply.QueryResult == nil {
reply.QueryResult = payloads.EncodeString("query-result")
}
}

resp, err := p.client.RespondQueryTaskCompleted(ctx, reply)
if err != nil {
return nil, fmt.Errorf("failed to respond with respondQueryTaskCompleted: %w", err)
}
return resp, nil
}

func (p *workflowTaskPoller) respondTaskCompleted(
ctx context.Context,
opts *options,
Expand Down
1 change: 1 addition & 0 deletions service/history/api/get_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,5 +361,6 @@ func MutableStateToGetResponse(
InheritedBuildId: mutableState.GetInheritedBuildId(),
MostRecentWorkerVersionStamp: mostRecentWorkerVersionStamp,
TransitionHistory: mutableState.GetExecutionInfo().TransitionHistory,
VersioningInfo: mutableState.GetExecutionInfo().VersioningInfo,
}, nil
}
Loading

0 comments on commit 7d95a34

Please sign in to comment.