Handle ResourceExhausted from RecordTaskStarted in task poll (#7093)
## What changed?
Return ResourceExhausted directly to the client if we get it from history on
RecordTaskStarted.

## Why?
If history is overloaded and we retry immediately, we can end up in a
loop where we keep matching tasks and sending them to history,
increasing load on both history and persistence as we cycle tasks to the
end of the queue. Returning the error instead effectively inserts the
client into this loop, and the client's back-off slows the loop down.

## How did you test it?
existing tests

## Potential risks

## Is hotfix candidate?
yes
dnr authored Jan 16, 2025
1 parent 613b8b6 commit 4f2b2db
Showing 2 changed files with 13 additions and 3 deletions.
service/matching/backlog_manager.go (3 additions, 3 deletions)

@@ -227,8 +227,6 @@ func (c *backlogManagerImpl) completeTask(task *persistencespb.AllocatedTaskInfo
     // We handle this by writing the task back to persistence with a higher taskID.
     // This will allow subsequent tasks to make progress, and hopefully by the time this task is picked-up
     // again the underlying reason for failing to start will be resolved.
-    // Note that RecordTaskStarted only fails after retrying for a long time, so a single task will not be
-    // re-written to persistence frequently.
     err = executeWithRetry(context.Background(), func(_ context.Context) error {
         _, err := c.taskWriter.appendTask(task.Data)
         return err
@@ -237,7 +235,9 @@
     if err != nil {
         // OK, we also failed to write to persistence.
         // This should only happen in very extreme cases where persistence is completely down.
-        // We still can't lose the old task, so we just unload the entire task queue
+        // We still can't lose the old task, so we just unload the entire task queue.
+        // We haven't advanced the ack level past this task, so when the task queue reloads,
+        // it will see this task again.
         c.logger.Error("Persistent store operation failure",
             tag.StoreOperationStopTaskQueue,
             tag.Error(err),
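
As a hedged illustration of the fallback chain those comments describe, the sketch below uses `appendTask` and `unloadQueue` as hypothetical stand-ins for the real taskWriter and backlog-manager methods; it is not the actual completeTask implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// completeFailedTask: rewrite the failed task at the end of the backlog with a
// higher taskID; if even that write fails, unload the task queue. Because the
// ack level was never advanced past this task, the reloaded queue will read it
// from persistence again.
func completeFailedTask(appendTask func() error, unloadQueue func()) error {
	if err := appendTask(); err != nil {
		// Persistence is also failing; give up on this queue instance and
		// rely on the reload to pick the task up again.
		unloadQueue()
		return fmt.Errorf("unloaded task queue after append failure: %w", err)
	}
	return nil
}

func main() {
	err := completeFailedTask(
		func() error { return errors.New("persistence unavailable") },
		func() { fmt.Println("task queue unloaded") },
	)
	fmt.Println(err)
}
```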
service/matching/matching_engine.go (10 additions, 0 deletions)

@@ -700,6 +700,11 @@ pollLoop:
             tag.Error(err),
         )
         task.finish(nil, false)
+    case *serviceerror.ResourceExhausted:
+        // If history returns one ResourceExhausted, it's likely to return more if we retry
+        // immediately. Instead, return the error to the client which will back off.
+        task.finish(err, false)
+        return nil, err
     default:
         task.finish(err, false)
         if err.Error() == common.ErrNamespaceHandover.Error() {
@@ -898,6 +903,11 @@ pollLoop:
             tag.Deployment(worker_versioning.DeploymentFromCapabilities(requestClone.WorkerVersionCapabilities)),
         )
         task.finish(nil, false)
+    case *serviceerror.ResourceExhausted:
+        // If history returns one ResourceExhausted, it's likely to return more if we retry
+        // immediately. Instead, return the error to the client which will back off.
+        task.finish(err, false)
+        return nil, err
     default:
         task.finish(err, false)
         if err.Error() == common.ErrNamespaceHandover.Error() {
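
On the client side of this RPC, the commit message relies on pollers backing off when they receive ResourceExhausted. The sketch below is a generic illustration of that behavior, not code from any Temporal SDK; the back-off parameters are invented for the example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// resourceExhausted again stands in for the API error type a real client sees.
type resourceExhausted struct{}

func (*resourceExhausted) Error() string { return "resource exhausted" }

// pollWithBackoff slows down when the server reports resource exhaustion
// instead of immediately polling again and keeping matching (and, through it,
// history) under pressure.
func pollWithBackoff(poll func() error, attempts int) {
	delay := 100 * time.Millisecond
	for i := 0; i < attempts; i++ {
		err := poll()
		var re *resourceExhausted
		if errors.As(err, &re) {
			fmt.Println("backing off for", delay)
			time.Sleep(delay)
			delay *= 2
			if delay > 5*time.Second {
				delay = 5 * time.Second
			}
			continue
		}
		delay = 100 * time.Millisecond // reset after any non-throttled poll
	}
}

func main() {
	pollWithBackoff(func() error { return &resourceExhausted{} }, 3)
}
```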
