Skip to content

Commit

Permalink
Add api at mutable state to track potential reapply events (#7001)
Browse files Browse the repository at this point in the history
## What changed?
Add an API on mutable state to track events that may need to be reapplied,
and reapply these events when needed.

## Why?
For state-based replication, we need extra fields to track events that
need to be reapplied.

## How did you test it?
Unit tests.

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->

---------

Co-authored-by: Hai Zhao <[email protected]>
  • Loading branch information
xwduan and hai719 authored Jan 11, 2025
1 parent b158f18 commit a1a3b3a
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 34 deletions.
37 changes: 29 additions & 8 deletions service/history/ndc/transaction_manager_new_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,19 +220,40 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsZombie(
currentWorkflow.GetReleaseFn()(nil)
currentWorkflow = nil

targetWorkflowSnapshot, targetWorkflowEventsSeq, err := targetWorkflow.GetMutableState().CloseTransactionAsSnapshot(
ms := targetWorkflow.GetMutableState()

eventReapplyCandidates := ms.GetReapplyCandidateEvents()
targetWorkflowSnapshot, targetWorkflowEventsSeq, err := ms.CloseTransactionAsSnapshot(
targetWorkflowPolicy,
)
if err != nil {
return err
}

if err := targetWorkflow.GetContext().ReapplyEvents(
ctx,
r.shardContext,
targetWorkflowEventsSeq,
); err != nil {
return err
if len(targetWorkflowEventsSeq) != 0 {
if err := targetWorkflow.GetContext().ReapplyEvents(
ctx,
r.shardContext,
targetWorkflowEventsSeq,
); err != nil {
return err
}
} else if len(eventReapplyCandidates) != 0 {
eventsToApply := []*persistence.WorkflowEvents{
{
NamespaceID: ms.GetExecutionInfo().NamespaceId,
WorkflowID: ms.GetExecutionInfo().WorkflowId,
RunID: ms.GetExecutionState().RunId,
Events: eventReapplyCandidates,
},
}
if err := targetWorkflow.GetContext().ReapplyEvents(
ctx,
r.shardContext,
eventsToApply,
); err != nil {
return err
}
}

// target workflow is in zombie state, no need to update current record.
Expand All @@ -245,7 +266,7 @@ func (r *nDCTransactionMgrForNewWorkflowImpl) createAsZombie(
createMode,
prevRunID,
prevLastWriteVersion,
targetWorkflow.GetMutableState(),
ms,
targetWorkflowSnapshot,
targetWorkflowEventsSeq,
)
Expand Down
81 changes: 81 additions & 0 deletions service/history/ndc/transaction_manager_new_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ
targetMutableState.EXPECT().CloseTransactionAsSnapshot(workflow.TransactionPolicyPassive).Return(
targetWorkflowSnapshot, targetWorkflowEventsSeq, nil,
)
targetMutableState.EXPECT().GetReapplyCandidateEvents().Return(nil)

s.mockTransactionMgr.EXPECT().GetCurrentWorkflowRunID(ctx, namespaceID, workflowID).Return(currentRunID, nil)
s.mockTransactionMgr.EXPECT().LoadWorkflow(ctx, namespaceID, workflowID, currentRunID).Return(currentWorkflow, nil)
Expand All @@ -296,6 +297,85 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ
s.True(currentReleaseCalled)
}

// TestDispatchForNewWorkflow_CreateAsZombie_ReapplyCandidates exercises
// dispatchForNewWorkflow when the target workflow does not happen after the
// current workflow, closing the transaction yields no event batches, and the
// mutable state reports reapply candidate events. The candidates are expected
// to reach ReapplyEvents wrapped in a single WorkflowEvents entry that carries
// the target's namespace ID, workflow ID and run ID.
func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZombie_ReapplyCandidates() {
	const (
		workflowID   = "some random workflow ID"
		targetRunID  = "some random run ID"
		currentRunID = "other random runID"
	)
	ctx := context.Background()
	namespaceID := namespace.ID("some random namespace ID")

	// Flipped by the release callbacks so the test can assert both were invoked.
	var targetReleased, currentReleased bool

	// Current workflow: only its release function is stubbed.
	currentWorkflow := NewMockWorkflow(s.controller)
	var releaseCurrent wcache.ReleaseCacheFunc = func(error) { currentReleased = true }
	currentWorkflow.EXPECT().GetReleaseFn().Return(releaseCurrent).AnyTimes()

	// Target workflow and the context / mutable state it hands out.
	targetMutableState := workflow.NewMockMutableState(s.controller)
	targetContext := workflow.NewMockContext(s.controller)
	targetWorkflow := NewMockWorkflow(s.controller)
	var releaseTarget wcache.ReleaseCacheFunc = func(error) { targetReleased = true }
	targetWorkflow.EXPECT().GetReleaseFn().Return(releaseTarget).AnyTimes()
	targetWorkflow.EXPECT().GetMutableState().Return(targetMutableState).AnyTimes()
	targetWorkflow.EXPECT().GetContext().Return(targetContext).AnyTimes()

	// Closing the transaction returns a snapshot and an empty event sequence.
	snapshot := &persistence.WorkflowSnapshot{
		ExecutionInfo: &persistencespb.WorkflowExecutionInfo{
			NamespaceId: namespaceID.String(),
			WorkflowId:  workflowID,
		},
	}
	eventsSeq := []*persistence.WorkflowEvents{}
	targetMutableState.EXPECT().
		CloseTransactionAsSnapshot(workflow.TransactionPolicyPassive).
		Return(snapshot, eventsSeq, nil)

	// Identity accessors read while building the WorkflowEvents to reapply.
	executionInfo := &persistencespb.WorkflowExecutionInfo{
		NamespaceId: namespaceID.String(),
		WorkflowId:  workflowID,
	}
	executionState := &persistencespb.WorkflowExecutionState{
		RunId: targetRunID,
	}
	targetMutableState.EXPECT().GetExecutionInfo().Return(executionInfo).AnyTimes()
	targetMutableState.EXPECT().GetExecutionState().Return(executionState).AnyTimes()

	// A single candidate event with a random event ID.
	candidate := &historypb.HistoryEvent{
		EventId: common.FirstEventID + rand.Int63(),
	}
	candidates := []*historypb.HistoryEvent{candidate}
	targetMutableState.EXPECT().GetReapplyCandidateEvents().Return(candidates)

	expectedReapply := []*persistence.WorkflowEvents{
		{
			NamespaceID: namespaceID.String(),
			WorkflowID:  workflowID,
			RunID:       targetRunID,
			Events:      candidates,
		},
	}

	s.mockTransactionMgr.EXPECT().GetCurrentWorkflowRunID(ctx, namespaceID, workflowID).Return(currentRunID, nil)
	s.mockTransactionMgr.EXPECT().LoadWorkflow(ctx, namespaceID, workflowID, currentRunID).Return(currentWorkflow, nil)

	// The target does not happen after the current run and is suppressed by it.
	targetWorkflow.EXPECT().HappensAfter(currentWorkflow).Return(false, nil)
	targetWorkflow.EXPECT().SuppressBy(currentWorkflow).Return(workflow.TransactionPolicyPassive, nil)

	targetContext.EXPECT().ReapplyEvents(gomock.Any(), s.mockShard, expectedReapply).Return(nil)
	targetContext.EXPECT().CreateWorkflowExecution(
		gomock.Any(),
		s.mockShard,
		persistence.CreateWorkflowModeBypassCurrent,
		"",
		int64(0),
		targetMutableState,
		snapshot,
		eventsSeq,
	).Return(nil)

	s.NoError(s.createMgr.dispatchForNewWorkflow(ctx, targetWorkflow))
	s.True(targetReleased)
	s.True(currentReleased)
}

func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZombie_Dedup() {
ctx := context.Background()

Expand Down Expand Up @@ -340,6 +420,7 @@ func (s *transactionMgrForNewWorkflowSuite) TestDispatchForNewWorkflow_CreateAsZ
targetMutableState.EXPECT().CloseTransactionAsSnapshot(workflow.TransactionPolicyPassive).Return(
targetWorkflowSnapshot, targetWorkflowEventsSeq, nil,
)
targetMutableState.EXPECT().GetReapplyCandidateEvents().Return(nil)

s.mockTransactionMgr.EXPECT().GetCurrentWorkflowRunID(ctx, namespaceID, workflowID).Return(currentRunID, nil)
s.mockTransactionMgr.EXPECT().LoadWorkflow(ctx, namespaceID, workflowID, currentRunID).Return(currentWorkflow, nil)
Expand Down
65 changes: 48 additions & 17 deletions service/history/ndc/workflow_state_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,13 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch(
if err != nil {
return err
}
events, err := r.historySerializer.DeserializeEvents(historyBlob.rawHistory)
if err != nil {
return err
}
for _, event := range events {
localMutableState.AddReapplyCandidateEvent(event)
}
_, err = r.executionMgr.AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{
ShardID: r.shardContext.GetShardID(),
IsNewBranch: isNewBranch,
Expand All @@ -830,6 +837,7 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch(
}
prevTxnID = txnID
isNewBranch = false

localMutableState.GetExecutionInfo().ExecutionStats.HistorySize += int64(len(historyBlob.rawHistory.Data))
}
return nil
Expand All @@ -856,6 +864,9 @@ func (r *WorkflowStateReplicatorImpl) bringLocalEventsUpToSourceCurrentBranch(
if err != nil {
return err
}
for _, event := range events {
localMutableState.AddReapplyCandidateEvent(event)
}
_, err = r.executionMgr.AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{
ShardID: r.shardContext.GetShardID(),
IsNewBranch: isNewBranch,
Expand Down Expand Up @@ -936,6 +947,24 @@ func (r *WorkflowStateReplicatorImpl) applySnapshotWhenWorkflowNotExist(
return err
}

ns, err := r.namespaceRegistry.GetNamespaceByID(namespaceID)
if err != nil {
return err
}

mutableState, err := workflow.NewSanitizedMutableState(
r.shardContext,
r.shardContext.GetEventsCache(),
r.logger,
ns,
sourceMutableState,
common.EmptyEventTaskID, // will be updated below
lastEventItem.GetVersion(),
)
if err != nil {
return err
}

lastFirstTxnID, err := r.backfillHistory(
ctx,
sourceCluster,
Expand All @@ -946,36 +975,23 @@ func (r *WorkflowStateReplicatorImpl) applySnapshotWhenWorkflowNotExist(
// Use the history tree id to be the original run id.
// https://github.com/temporalio/temporal/issues/6501
branchInfo.GetTreeId(),
mutableState,
lastEventItem.GetEventId(),
lastEventItem.GetVersion(),
newHistoryBranchToken,
isStateBased,
)
if err != nil {
return err
}

ns, err := r.namespaceRegistry.GetNamespaceByID(namespaceID)
if err != nil {
return err
}

mutableState, err := workflow.NewSanitizedMutableState(
r.shardContext,
r.shardContext.GetEventsCache(),
r.logger,
ns,
sourceMutableState,
lastFirstTxnID,
lastEventItem.GetVersion(),
)
if err != nil {
return err
}
mutableState.GetExecutionInfo().LastFirstEventTxnId = lastFirstTxnID

err = mutableState.SetCurrentBranchToken(newHistoryBranchToken)
if err != nil {
return err
}

if newRunInfo != nil {
err = r.createNewRunWorkflow(
ctx,
Expand Down Expand Up @@ -1073,9 +1089,11 @@ func (r *WorkflowStateReplicatorImpl) backfillHistory(
workflowID string,
runID string,
originalRunID string,
mutableState *workflow.MutableStateImpl,
lastEventID int64,
lastEventVersion int64,
branchToken []byte,
isStateBased bool,
) (taskID int64, retError error) {

if runID != originalRunID {
Expand Down Expand Up @@ -1153,6 +1171,18 @@ BackfillLoop:
return common.EmptyEventTaskID, err
}

if isStateBased {
// If backfill succeeds but a later event reapply fails, then on the task's next retry
// we still need to reapply the events that have already been stored in the local DB.
events, err := r.historySerializer.DeserializeEvents(historyBlob.rawHistory)
if err != nil {
return common.EmptyEventTaskID, err
}
for _, event := range events {
mutableState.AddReapplyCandidateEvent(event)
}
}

if historyBlob.nodeID <= lastBatchNodeID {
// The history batch already in DB.
if len(sortedAncestors) > sortedAncestorsIdx {
Expand Down Expand Up @@ -1206,6 +1236,7 @@ BackfillLoop:
if err != nil {
return common.EmptyEventTaskID, err
}

_, err = r.executionMgr.AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{
ShardID: r.shardContext.GetShardID(),
IsNewBranch: prevBranchID != branchID,
Expand Down
22 changes: 22 additions & 0 deletions service/history/ndc/workflow_state_replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (
"go.temporal.io/server/service/history/workflow"
wcache "go.temporal.io/server/service/history/workflow/cache"
"go.uber.org/mock/gomock"
"google.golang.org/protobuf/proto"
)

type (
Expand Down Expand Up @@ -958,6 +959,19 @@ func (s *workflowReplicatorSuite) Test_ReplicateVersionedTransition_MutationProv
s.IsType(&serviceerrors.SyncState{}, err)
}

// historyEventMatcher is a test argument matcher for history events. It holds
// the event a mock call argument is compared against; in this file it is
// passed to mock expectations on AddReapplyCandidateEvent.
type historyEventMatcher struct {
	// expected is the event an argument must equal for the matcher to accept it.
	expected *historypb.HistoryEvent
}

// Matches reports whether x is a *historypb.HistoryEvent that proto.Equal
// considers equal to the expected event. Any other argument type is rejected.
func (m *historyEventMatcher) Matches(x interface{}) bool {
	candidate, isEvent := x.(*historypb.HistoryEvent)
	if !isEvent {
		return false
	}
	return proto.Equal(candidate, m.expected)
}

// String returns a textual description of the matcher, built from the
// expected event.
func (m *historyEventMatcher) String() string {
	want := m.expected
	description := fmt.Sprintf("is equal to %v", want)
	return description
}

func (s *workflowReplicatorSuite) Test_bringLocalEventsUpToSourceCurrentBranch_WithGapAndTailEvents() {
namespaceID := uuid.New()
versionHistories := &historyspb.VersionHistories{
Expand Down Expand Up @@ -1038,6 +1052,14 @@ func (s *workflowReplicatorSuite) Test_bringLocalEventsUpToSourceCurrentBranch_W
RunId: s.runID,
}).AnyTimes()
mockMutableState.EXPECT().SetHistoryBuilder(gomock.Any())

allEvents := append(gapEvents, requestedEvents...)
allEvents = append(allEvents, tailEvents...)
for _, event := range allEvents {
mockMutableState.EXPECT().AddReapplyCandidateEvent(&historyEventMatcher{expected: event}).
Times(1)
}

mockWeCtx := workflow.NewMockContext(s.controller)
sourceClusterName := "test-cluster"
mockShard := shard.NewMockContext(s.controller)
Expand Down
Loading

0 comments on commit a1a3b3a

Please sign in to comment.