diff --git a/streaming/src/channel/channel.cc b/streaming/src/channel/channel.cc index 672a7697..80e1d5b6 100644 --- a/streaming/src/channel/channel.cc +++ b/streaming/src/channel/channel.cc @@ -80,7 +80,23 @@ StreamingStatus StreamingQueueProducer::RefreshChannelInfo() { } else { queue_info.consumed_bundle_id = consumed_bundle_id; } + } else { + // While we receive INVALID/-1 consumed bundle id, it's assumed that + // upstream crashed or being refresh starting. + if (queue_info.consumed_bundle_id != static_cast(-1)) { + // Previous consumed bundle id is valid stands failover happens at this moment. + STREAMING_LOG(INFO) << "Upstream channel id " << channel_info_.channel_id + << " might fail to fetch data continuous, jump to invliad " + "offset, last consumed bundle id " + << queue_info.consumed_bundle_id << ", consumed_message_id " + << queue_info.consumed_message_id; + queue_info.consumed_bundle_id = consumed_bundle_id; + } else { + STREAMING_LOG(DEBUG) << "Refresh starting or co-failure in same time " + << channel_info_.channel_id; + } } + return StreamingStatus::OK; }