# Log branchToken on potential data loss error (#7027)
## What changed?
- Log the branchToken on potential data loss errors. The logged branchToken
is base64-encoded and can be decoded with the command below. The
treeID is usually the workflow runID. The branchToken is also
constructed by the persistence layer, so it may carry additional
information about the workflow.
```
./tdbg decode proto --type temporal.server.api.persistence.v1.HistoryBranch --hex-data "$(echo "CiQwMTkzZTY1OC1lZjc0LTc1ZDAtYTU4NS1hZWI0MzZlYThmNDESJDAxOTNlNjU4LWVmNzQtNzYyNi05ZTlmLWI1ODAwMGI2MjNkMQ==" | base64 --decode | hexdump -ve '/1 "%02x"')"
{
 "treeId": "0193e658-ef74-75d0-a585-aeb436ea8f41",
 "branchId": "0193e658-ef74-7626-9e9f-b58000b623d1"
}
```
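
If `tdbg` isn't at hand, the same decoding can be done directly in Go. This is a minimal sketch, assuming the token is a plain proto-encoded `HistoryBranch` (as the `tdbg` invocation above implies) and that the server's proto package is importable as `go.temporal.io/server/api/persistence/v1`; the token constant is the example value from above.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"log"

	persistencespb "go.temporal.io/server/api/persistence/v1"
	"google.golang.org/protobuf/proto"
)

func main() {
	// Example branchToken copied from the logs (base64, as logged).
	const token = "CiQwMTkzZTY1OC1lZjc0LTc1ZDAtYTU4NS1hZWI0MzZlYThmNDESJDAxOTNlNjU4LWVmNzQtNzYyNi05ZTlmLWI1ODAwMGI2MjNkMQ=="

	raw, err := base64.StdEncoding.DecodeString(token)
	if err != nil {
		log.Fatal(err)
	}

	var branch persistencespb.HistoryBranch
	if err := proto.Unmarshal(raw, &branch); err != nil {
		log.Fatal(err)
	}
	// Prints the same treeId/branchId pair as the tdbg output above.
	fmt.Println("treeId:", branch.GetTreeId())
	fmt.Println("branchId:", branch.GetBranchId())
}
```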

## Why?
- We recently introduced retries for data loss errors in the persistence
client. That means application logic may never see those errors and
therefore won't emit any logs containing workflow info (see the sketch
below). Inside the persistence layer there is no workflow key info
(e.g. workflowID, runID), so the best we can do is log the branchToken,
which at least hints at which workflow is hitting the error.
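
To make that concrete, here is a hypothetical, much-simplified stand-in for the retrying persistence client — `withRetry`, its attempt count, and the usage in `main` are illustrative only, not the actual client code; only `serviceerror.NewDataLoss` is the real API also used in the diff below:

```go
package main

import (
	"errors"
	"fmt"

	"go.temporal.io/api/serviceerror"
)

// withRetry is a hypothetical, simplified stand-in for the persistence
// client's retry behavior: DataLoss errors are retried, so when a later
// attempt succeeds the caller never observes the error at all.
func withRetry(op func() error) error {
	var lastErr error
	for attempt := 0; attempt < 3; attempt++ {
		if lastErr = op(); lastErr == nil {
			return nil
		}
		var dataLoss *serviceerror.DataLoss
		if !errors.As(lastErr, &dataLoss) {
			return lastErr // only DataLoss is retried in this sketch
		}
	}
	return lastErr
}

func main() {
	calls := 0
	err := withRetry(func() error {
		calls++
		if calls == 1 {
			// Only this layer can log details about the corruption it saw.
			return serviceerror.NewDataLoss("corrupted history event batch, empty events")
		}
		return nil
	})
	fmt.Println(err, "after", calls, "calls") // <nil> after 2 calls
}
```

If a later attempt succeeds, the caller gets `nil` and the DataLoss error is invisible outside the persistence layer — hence logging the branchToken at the point of detection.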

## How did you test it?
- Tested locally.

## Potential risks

## Documentation

## Is hotfix candidate?
yycptt authored Jan 6, 2025
1 parent 8437baa commit 61fd926
Showing 1 changed file with 45 additions and 28 deletions.

common/persistence/history_manager.go

```diff
@@ -45,6 +45,7 @@ const (

 	// TrimHistoryBranch will only dump metadata, relatively cheap
 	trimHistoryBranchPageSize = 1000
+	dataLossMsg               = "Potential data loss"
 	errNonContiguousEventID   = "corrupted history event batch, eventID is not contiguous"
 	errWrongVersion           = "corrupted history event batch, wrong version and IDs"
 	errEmptyEvents            = "corrupted history event batch, empty events"
```
```diff
@@ -930,36 +931,43 @@ func (m *executionManagerImpl) readHistoryBranch(
 	historyEvents := make([]*historypb.HistoryEvent, 0, request.PageSize)
 	historyEventBatches := make([]*historypb.History, 0, request.PageSize)

+	var firstEvent, lastEvent *historypb.HistoryEvent
+	var eventCount int
+
+	dataLossTags := func(cause string) []tag.Tag {
+		return []tag.Tag{
+			tag.Cause(cause),
+			tag.WorkflowBranchToken(request.BranchToken),
+			tag.WorkflowFirstEventID(firstEvent.GetEventId()),
+			tag.FirstEventVersion(firstEvent.GetVersion()),
+			tag.WorkflowNextEventID(lastEvent.GetEventId()),
+			tag.LastEventVersion(lastEvent.GetVersion()),
+			tag.Counter(eventCount),
+			tag.TokenLastEventID(token.LastEventID),
+		}
+	}
+
 	for _, batch := range dataBlobs {
 		events, err := m.serializer.DeserializeEvents(batch)
 		if err != nil {
 			return nil, nil, nil, nil, dataSize, err
 		}
 		if len(events) == 0 {
-			m.logger.Error(errEmptyEvents)
+			m.logger.Error(dataLossMsg, dataLossTags(errEmptyEvents)...)
 			return nil, nil, nil, nil, dataSize, serviceerror.NewDataLoss(errEmptyEvents)
 		}

-		firstEvent := events[0]           // first
-		eventCount := len(events)         // length
-		lastEvent := events[eventCount-1] // last
+		firstEvent = events[0]
+		eventCount = len(events)
+		lastEvent = events[eventCount-1]

 		if firstEvent.GetVersion() != lastEvent.GetVersion() || firstEvent.GetEventId()+int64(eventCount-1) != lastEvent.GetEventId() {
 			// in a single batch, version should be the same, and ID should be contiguous
-			m.logger.Error("Potential data loss",
-				tag.Cause(errWrongVersion),
-				tag.FirstEventVersion(firstEvent.GetVersion()), tag.WorkflowFirstEventID(firstEvent.GetEventId()),
-				tag.LastEventVersion(lastEvent.GetVersion()), tag.WorkflowNextEventID(lastEvent.GetEventId()),
-				tag.Counter(eventCount))
+			m.logger.Error(dataLossMsg, dataLossTags(errWrongVersion)...)
 			return historyEvents, historyEventBatches, transactionIDs, nil, dataSize, serviceerror.NewDataLoss(errWrongVersion)
 		}
 		if firstEvent.GetEventId() != token.LastEventID+1 {
-			m.logger.Error("Potential data loss",
-				tag.Cause(errNonContiguousEventID),
-				tag.WorkflowFirstEventID(firstEvent.GetEventId()),
-				tag.WorkflowNextEventID(lastEvent.GetEventId()),
-				tag.TokenLastEventID(token.LastEventID),
-				tag.Counter(eventCount))
+			m.logger.Error(dataLossMsg, dataLossTags(errNonContiguousEventID)...)
 			return historyEvents, historyEventBatches, transactionIDs, nil, dataSize, serviceerror.NewDataLoss(errNonContiguousEventID)
 		}
```
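
One detail worth noting in the hunk above: `firstEvent`, `lastEvent`, and `eventCount` are hoisted out of the loop so the `dataLossTags` closure captures them by reference and always reports the batch currently being validated (the generated proto getters are nil-safe, so calling it for `errEmptyEvents` before any batch has been assigned doesn't panic). A standalone sketch of that capture-by-reference pattern, using hypothetical names rather than Temporal code:

```go
package main

import "fmt"

func main() {
	// Hoisted state, captured by reference by the tags closure below.
	var first, last, count int

	tags := func(cause string) []any {
		return []any{"cause", cause, "first", first, "last", last, "count", count}
	}

	for _, batch := range [][]int{{1, 2, 3}, {5, 6, 8}} {
		first, count = batch[0], len(batch)
		last = batch[count-1]
		// Same contiguity rule as the real check: IDs in a batch must be sequential.
		if first+count-1 != last {
			args := append([]any{"Potential data loss"}, tags("non-contiguous")...)
			fmt.Println(args...)
		}
	}
}
```

The same shape keeps each `Error` call in the real code down to one line while still attaching the full event-range context.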

```diff
@@ -990,34 +998,43 @@ func (m *executionManagerImpl) readHistoryBranchReverse(

 	historyEvents := make([]*historypb.HistoryEvent, 0, request.PageSize)

+	var firstEvent, lastEvent *historypb.HistoryEvent
+	var eventCount int
+
+	datalossTags := func(cause string) []tag.Tag {
+		return []tag.Tag{
+			tag.Cause(cause),
+			tag.WorkflowBranchToken(request.BranchToken),
+			tag.WorkflowFirstEventID(firstEvent.GetEventId()),
+			tag.FirstEventVersion(firstEvent.GetVersion()),
+			tag.WorkflowNextEventID(lastEvent.GetEventId()),
+			tag.LastEventVersion(lastEvent.GetVersion()),
+			tag.Counter(eventCount),
+			tag.TokenLastEventID(token.LastEventID),
+		}
+	}
+
 	for _, batch := range dataBlobs {
 		events, err := m.serializer.DeserializeEvents(batch)
 		if err != nil {
 			return nil, nil, nil, dataSize, err
 		}
 		if len(events) == 0 {
-			m.logger.Error(errEmptyEvents)
+			m.logger.Error(dataLossMsg, datalossTags(errEmptyEvents)...)
 			return nil, nil, nil, dataSize, serviceerror.NewDataLoss(errEmptyEvents)
 		}

-		firstEvent := events[0]           // first
-		eventCount := len(events)         // length
-		lastEvent := events[eventCount-1] // last
+		firstEvent = events[0]
+		eventCount = len(events)
+		lastEvent = events[eventCount-1]

 		if firstEvent.GetVersion() != lastEvent.GetVersion() || firstEvent.GetEventId()+int64(eventCount-1) != lastEvent.GetEventId() {
 			// in a single batch, version should be the same, and ID should be contiguous
-			m.logger.Error(errWrongVersion,
-				tag.FirstEventVersion(firstEvent.GetVersion()), tag.WorkflowFirstEventID(firstEvent.GetEventId()),
-				tag.LastEventVersion(lastEvent.GetVersion()), tag.WorkflowNextEventID(lastEvent.GetEventId()),
-				tag.Counter(eventCount))
+			m.logger.Error(dataLossMsg, datalossTags(errWrongVersion)...)
 			return historyEvents, transactionIDs, nil, dataSize, serviceerror.NewDataLoss(errWrongVersion)
 		}
 		if (token.LastEventID != common.EmptyEventID) && (lastEvent.GetEventId() != token.LastEventID-1) {
-			m.logger.Error(errNonContiguousEventID,
-				tag.WorkflowFirstEventID(firstEvent.GetEventId()),
-				tag.WorkflowNextEventID(lastEvent.GetEventId()),
-				tag.TokenLastEventID(token.LastEventID),
-				tag.Counter(eventCount))
+			m.logger.Error(dataLossMsg, datalossTags(errNonContiguousEventID)...)
 			return historyEvents, transactionIDs, nil, dataSize, serviceerror.NewDataLoss(errNonContiguousEventID)
 		}
```
