Skip to content

Commit

Permalink
YQ-3889 Add MaxHandledEventsSize (ydb-platform#11791)
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds authored Nov 20, 2024
1 parent 872bb4d commit 753ecb8
Showing 1 changed file with 13 additions and 4 deletions.
17 changes: 13 additions & 4 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ struct TEvPrivate {

ui64 SendStatisticPeriodSec = 2;
ui64 MaxBatchSizeBytes = 10000000;
ui64 MaxHandledEvents = 1000;
ui64 MaxHandledEventsCount = 1000;
ui64 MaxHandledEventsSize = 1000000;

TVector<TString> GetVector(const google::protobuf::RepeatedPtrField<TString>& value) {
return {value.begin(), value.end()};
Expand Down Expand Up @@ -157,7 +158,8 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
void operator()(NYdb::NTopic::TReadSessionEvent::TPartitionSessionStatusEvent&) { }

TTopicSession& Self;
const TString& LogPrefix;
const TString& LogPrefix;
ui64& dataReceivedEventSize;
};

struct TParserSchema {
Expand Down Expand Up @@ -531,7 +533,9 @@ void TTopicSession::Handle(TEvRowDispatcher::TEvGetNextBatch::TPtr& ev) {
}

void TTopicSession::HandleNewEvents() {
for (ui64 i = 0; i < MaxHandledEvents; ++i) {
ui64 handledEventsSize = 0;

for (ui64 i = 0; i < MaxHandledEventsCount; ++i) {
if (!ReadSession) {
return;
}
Expand All @@ -543,7 +547,11 @@ void TTopicSession::HandleNewEvents() {
if (!event) {
break;
}
std::visit(TTopicEventProcessor{*this, LogPrefix}, *event);

std::visit(TTopicEventProcessor{*this, LogPrefix, handledEventsSize}, *event);
if (handledEventsSize >= MaxHandledEventsSize) {
break;
}
}
}

Expand All @@ -568,6 +576,7 @@ void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TReadSessionE
Self.ClientsStats.Add(dataSize, event.GetMessages().size());
Self.Metrics.SessionDataRate->Add(dataSize);
Self.Metrics.AllSessionsDataRate->Add(dataSize);
dataReceivedEventSize += dataSize;
Self.SendToParsing(event.GetMessages());
}

Expand Down

0 comments on commit 753ecb8

Please sign in to comment.