diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index c793ba7f7ee5..7f1070d21b2f 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -96,7 +96,8 @@ struct TEvPrivate { ui64 SendStatisticPeriodSec = 2; ui64 MaxBatchSizeBytes = 10000000; -ui64 MaxHandledEvents = 1000; +ui64 MaxHandledEventsCount = 1000; +ui64 MaxHandledEventsSize = 1000000; TVector GetVector(const google::protobuf::RepeatedPtrField& value) { return {value.begin(), value.end()}; @@ -157,7 +158,8 @@ class TTopicSession : public TActorBootstrapped { void operator()(NYdb::NTopic::TReadSessionEvent::TPartitionSessionStatusEvent&) { } TTopicSession& Self; - const TString& LogPrefix; + const TString& LogPrefix; + ui64& dataReceivedEventSize; }; struct TParserSchema { @@ -531,7 +533,9 @@ void TTopicSession::Handle(TEvRowDispatcher::TEvGetNextBatch::TPtr& ev) { } void TTopicSession::HandleNewEvents() { - for (ui64 i = 0; i < MaxHandledEvents; ++i) { + ui64 handledEventsSize = 0; + + for (ui64 i = 0; i < MaxHandledEventsCount; ++i) { if (!ReadSession) { return; } @@ -543,7 +547,11 @@ void TTopicSession::HandleNewEvents() { if (!event) { break; } - std::visit(TTopicEventProcessor{*this, LogPrefix}, *event); + + std::visit(TTopicEventProcessor{*this, LogPrefix, handledEventsSize}, *event); + if (handledEventsSize >= MaxHandledEventsSize) { + break; + } } } @@ -568,6 +576,7 @@ void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TReadSessionE Self.ClientsStats.Add(dataSize, event.GetMessages().size()); Self.Metrics.SessionDataRate->Add(dataSize); Self.Metrics.AllSessionsDataRate->Add(dataSize); + dataReceivedEventSize += dataSize; Self.SendToParsing(event.GetMessages()); }