Skip to content

Commit

Permalink
fix fresh start flow control (#58)
Browse files Browse the repository at this point in the history
Co-authored-by: ashione <[email protected]>
  • Loading branch information
ashione and ashione authored Oct 31, 2022
1 parent c86b94a commit 15522e5
Showing 1 changed file with 16 additions and 0 deletions.
16 changes: 16 additions & 0 deletions streaming/src/channel/channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,23 @@ StreamingStatus StreamingQueueProducer::RefreshChannelInfo() {
} else {
queue_info.consumed_bundle_id = consumed_bundle_id;
}
} else {
// While we receive INVALID/-1 consumed bundle id, it's assumed that
// upstream crashed or being refresh starting.
if (queue_info.consumed_bundle_id != static_cast<uint64_t>(-1)) {
// Previous consumed bundle id is valid stands failover happens at this moment.
STREAMING_LOG(INFO) << "Upstream channel id " << channel_info_.channel_id
<< " might fail to fetch data continuous, jump to invliad "
"offset, last consumed bundle id "
<< queue_info.consumed_bundle_id << ", consumed_message_id "
<< queue_info.consumed_message_id;
queue_info.consumed_bundle_id = consumed_bundle_id;
} else {
STREAMING_LOG(DEBUG) << "Refresh starting or co-failure in same time "
<< channel_info_.channel_id;
}
}

return StreamingStatus::OK;
}

Expand Down

0 comments on commit 15522e5

Please sign in to comment.