From 4f2b2db8c94eb28fced970ef4386e7dd64a28b07 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Thu, 16 Jan 2025 01:21:41 +0000 Subject: [PATCH] Handle ResourceExhausted from RecordTaskStarted in task poll (#7093) ## What changed? Return ResourceExhausted directly to client if we get it from history on RecordTaskStarted. ## Why? If history is overloaded and we retry immediately, we can end up in a loop where we keep matching tasks and sending them to history, increasing load on history and also persistence as we cycle tasks to the end of the queue. Returning the error effectively inserts the client in this loop, slowing it down. ## How did you test it? existing tests ## Potential risks ## Is hotfix candidate? yes --- service/matching/backlog_manager.go | 6 +++--- service/matching/matching_engine.go | 10 ++++++++++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/service/matching/backlog_manager.go b/service/matching/backlog_manager.go index 2e79662e4d4..ea6477acee0 100644 --- a/service/matching/backlog_manager.go +++ b/service/matching/backlog_manager.go @@ -227,8 +227,6 @@ func (c *backlogManagerImpl) completeTask(task *persistencespb.AllocatedTaskInfo // We handle this by writing the task back to persistence with a higher taskID. // This will allow subsequent tasks to make progress, and hopefully by the time this task is picked-up // again the underlying reason for failing to start will be resolved. - // Note that RecordTaskStarted only fails after retrying for a long time, so a single task will not be - // re-written to persistence frequently. err = executeWithRetry(context.Background(), func(_ context.Context) error { _, err := c.taskWriter.appendTask(task.Data) return err @@ -237,7 +235,9 @@ func (c *backlogManagerImpl) completeTask(task *persistencespb.AllocatedTaskInfo if err != nil { // OK, we also failed to write to persistence. // This should only happen in very extreme cases where persistence is completely down. - // We still can't lose the old task, so we just unload the entire task queue + // We still can't lose the old task, so we just unload the entire task queue. + // We haven't advanced the ack level past this task, so when the task queue reloads, + // it will see this task again. c.logger.Error("Persistent store operation failure", tag.StoreOperationStopTaskQueue, tag.Error(err), diff --git a/service/matching/matching_engine.go b/service/matching/matching_engine.go index 605fbc88360..c975703a9be 100644 --- a/service/matching/matching_engine.go +++ b/service/matching/matching_engine.go @@ -700,6 +700,11 @@ pollLoop: tag.Error(err), ) task.finish(nil, false) + case *serviceerror.ResourceExhausted: + // If history returns one ResourceExhausted, it's likely to return more if we retry + // immediately. Instead, return the error to the client which will back off. + task.finish(err, false) + return nil, err default: task.finish(err, false) if err.Error() == common.ErrNamespaceHandover.Error() { @@ -898,6 +903,11 @@ pollLoop: tag.Deployment(worker_versioning.DeploymentFromCapabilities(requestClone.WorkerVersionCapabilities)), ) task.finish(nil, false) + case *serviceerror.ResourceExhausted: + // If history returns one ResourceExhausted, it's likely to return more if we retry + // immediately. Instead, return the error to the client which will back off. + task.finish(err, false) + return nil, err default: task.finish(err, false) if err.Error() == common.ErrNamespaceHandover.Error() {