Skip to content

Commit

Permalink
Handle system time move backward case in timer task processing (#7030)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
- Handle the case where system time moves backward during timer task
processing by using max(now(), task visibility timestamp)

## Why?
<!-- Tell your future self why have you made these changes -->
- Monotonic time is used in a time.Time comparison only when both operands
have a monotonic clock reading. In our case, the timestamps stored in or
derived from mutable state don't have a monotonic reading, so the comparison
logic uses wall clock time to decide whether a timer task should be processed.
If the wall clock moves backward (after we have verified that now() > task
visibility timestamp when submitting the task to the task scheduler),
the timer task will be dropped, causing the workflow to get stuck.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
- Unit tests

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
- In the worst case, we will execute a timer task's logic earlier than
expected.

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
- Could be. But the bug should be very rare, as we use millisecond
precision when doing time comparison in timer task processing.
  • Loading branch information
yycptt authored Jan 16, 2025
1 parent 7001356 commit a5b3c01
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 146 deletions.
15 changes: 14 additions & 1 deletion service/history/queues/queue_scheduled.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/timer"
"go.temporal.io/server/common/util"
hshard "go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
)
Expand Down Expand Up @@ -302,11 +303,23 @@ func (p *scheduledQueue) lookAheadTask() {

// IsTimeExpired checks if the testing time is equal to or before the
// reference time. The precision of the comparison is millisecond: both
// sides are truncated to persistence.ScheduledTaskMinPrecision before they
// are compared.
//
// The function takes the task as input and uses the task's fire time
// (scheduled time, task.GetKey().FireTime) as the minimal reference time, so
// that a reference time that is behind the fire time (clock skew, or the
// wall clock moving backward) does not make the task look unexpired.
//
// This check is only meaningful for tasks with CategoryTypeScheduled. It
// always returns false for tasks with CategoryTypeImmediate, as they can be
// executed at any time.
func IsTimeExpired(
	task tasks.Task,
	referenceTime time.Time,
	testingTime time.Time,
) bool {
	if task.GetCategory().Type() == tasks.CategoryTypeImmediate {
		return false
	}

	// NOTE: The persistence layer may lose precision when persisting the task,
	// which essentially moves the task fire time backward. Because the result
	// of the max is truncated here, that loss does not need to be accounted
	// for separately, and referenceTime does not need to be truncated on its
	// own beforehand.
	referenceTime = util.MaxTime(referenceTime, task.GetKey().FireTime).Truncate(persistence.ScheduledTaskMinPrecision)
	testingTime = testingTime.Truncate(persistence.ScheduledTaskMinPrecision)
	return !testingTime.After(referenceTime)
}
18 changes: 9 additions & 9 deletions service/history/timer_queue_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,8 @@ func (t *timerQueueActiveTaskExecutor) executeUserTimerTimeoutTask(
}

timerSequence := t.getTimerSequence(mutableState)
referenceTime := t.shardContext.GetTimeSource().Now()
referenceTime := t.Now()
timerFired := false

Loop:
for _, timerSequenceID := range timerSequence.LoadAndSortUserTimers() {
timerInfo, ok := mutableState.GetUserTimerInfoByEventID(timerSequenceID.EventID)
Expand All @@ -183,7 +182,7 @@ Loop:
return serviceerror.NewInternal(errString)
}

if !queues.IsTimeExpired(referenceTime, timerSequenceID.Timestamp) {
if !queues.IsTimeExpired(task, referenceTime, timerSequenceID.Timestamp) {
// Timer sequence IDs are sorted; once we encounter a timer whose
// sequence ID has not expired, all subsequent timers will not have
// expired.
Expand Down Expand Up @@ -231,7 +230,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityTimeoutTask(
}

timerSequence := t.getTimerSequence(mutableState)
referenceTime := t.shardContext.GetTimeSource().Now()
referenceTime := t.Now()
updateMutableState := false
scheduleWorkflowTask := false

Expand All @@ -242,7 +241,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityTimeoutTask(
// created.
isHeartBeatTask := task.TimeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT
ai, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(task.EventID)
if isHeartBeatTask && ok && queues.IsTimeExpired(task.GetVisibilityTime(), heartbeatTimeoutVis) {
if isHeartBeatTask && ok && queues.IsTimeExpired(task, task.GetVisibilityTime(), heartbeatTimeoutVis) {
if err := mutableState.UpdateActivityTaskStatusWithTimerHeartbeat(
ai.ScheduledEventId, ai.TimerTaskStatus&^workflow.TimerTaskStatusCreatedHeartbeat, nil); err != nil {
return err
Expand All @@ -252,7 +251,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityTimeoutTask(

Loop:
for _, timerSequenceID := range timerSequence.LoadAndSortActivityTimers() {
if !queues.IsTimeExpired(referenceTime, timerSequenceID.Timestamp) {
if !queues.IsTimeExpired(task, referenceTime, timerSequenceID.Timestamp) {
// timer sequence IDs are sorted, once there is one timer
// sequence ID not expired, all after that wil not expired
break Loop
Expand Down Expand Up @@ -618,7 +617,7 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowRunTimeoutTask(
return err
}

if !t.isValidWorkflowRunTimeoutTask(mutableState) {
if !t.isValidWorkflowRunTimeoutTask(mutableState, task) {
return nil
}

Expand All @@ -628,7 +627,7 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowRunTimeoutTask(
initiator := enumspb.CONTINUE_AS_NEW_INITIATOR_UNSPECIFIED

wfExpTime := mutableState.GetExecutionInfo().WorkflowExecutionExpirationTime
if wfExpTime == nil || wfExpTime.AsTime().IsZero() || wfExpTime.AsTime().After(t.shardContext.GetTimeSource().Now()) {
if wfExpTime == nil || wfExpTime.AsTime().IsZero() || wfExpTime.AsTime().After(t.Now()) {
backoffInterval, retryState = mutableState.GetRetryBackoffDuration(timeoutFailure)
if backoffInterval != backoff.NoBackoff {
// We have a retry policy and we should retry.
Expand Down Expand Up @@ -678,7 +677,7 @@ func (t *timerQueueActiveTaskExecutor) executeWorkflowRunTimeoutTask(
mutableState.GetNamespaceEntry(),
mutableState.GetWorkflowKey().WorkflowID,
newRunID,
t.shardContext.GetTimeSource().Now(),
t.Now(),
mutableState,
)
if err != nil {
Expand Down Expand Up @@ -802,6 +801,7 @@ func (t *timerQueueActiveTaskExecutor) executeStateMachineTimerTask(
ctx,
wfCtx,
ms,
task,
func(node *hsm.Node, task hsm.Task) error {
return t.shardContext.StateMachineRegistry().ExecuteTimerTask(t, node, task)
},
Expand Down
Loading

0 comments on commit a5b3c01

Please sign in to comment.