Skip to content

Commit

Permalink
Enable arena-based allocations for QueryService output rows (ydb-plat…
Browse files Browse the repository at this point in the history
  • Loading branch information
zinal authored Jan 13, 2025
1 parent c0dd9da commit 1402e4e
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 15 deletions.
10 changes: 5 additions & 5 deletions ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,13 +325,13 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
}

void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev, const TActorContext& ctx) {
Ydb::Query::ExecuteQueryResponsePart response;
response.set_status(Ydb::StatusIds::SUCCESS);
response.set_result_set_index(ev->Get()->Record.GetQueryResultIndex());
response.mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet());
Ydb::Query::ExecuteQueryResponsePart *response = ev->Get()->Arena->Allocate<Ydb::Query::ExecuteQueryResponsePart>();
response->set_status(Ydb::StatusIds::SUCCESS);
response->set_result_set_index(ev->Get()->Record.GetQueryResultIndex());
response->mutable_result_set()->Swap(ev->Get()->Record.MutableResultSet());

TString out;
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
Y_PROTOBUF_SUPPRESS_NODISCARD response->SerializeToString(&out);

FlowControl_.PushResponse(out.size());
const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();
Expand Down
11 changes: 9 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_executer.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,15 @@ struct TEvKqpExecuter {
}
};

struct TEvStreamData : public TEventPB<TEvStreamData, NKikimrKqp::TEvExecuterStreamData,
TKqpExecuterEvents::EvStreamData> {};
struct TEvStreamData : public TEventPBWithArena<TEvStreamData, NKikimrKqp::TEvExecuterStreamData, TKqpExecuterEvents::EvStreamData> {
using TBaseEv = TEventPBWithArena<TEvStreamData, NKikimrKqp::TEvExecuterStreamData, TKqpExecuterEvents::EvStreamData>;
using TBaseEv::TEventPBBase;

TEvStreamData() = default;
explicit TEvStreamData(TIntrusivePtr<NActors::TProtoArenaHolder> arena)
: TEventPBBase(std::move(arena))
{}
};

struct TEvStreamDataAck : public TEventPB<TEvStreamDataAck, NKikimrKqp::TEvExecuterStreamDataAck,
TKqpExecuterEvents::EvStreamDataAck>
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,13 +304,14 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
batch.Payload = NYql::MakeChunkedBuffer(std::move(computeData.Payload));

if (!trailingResults) {
TKqpProtoBuilder protoBuilder{*AppData()->FunctionRegistry};
auto resultSet = protoBuilder.BuildYdbResultSet(std::move(batches), txResult.MkqlItemType, txResult.ColumnOrder, txResult.ColumnHints);
auto streamEv = MakeHolder<TEvKqpExecuter::TEvStreamData>();
streamEv->Record.SetSeqNo(computeData.Proto.GetSeqNo());
streamEv->Record.SetQueryResultIndex(*txResult.QueryResultIndex + StatementResultIndex);
streamEv->Record.SetChannelId(channel.Id);
streamEv->Record.MutableResultSet()->Swap(&resultSet);

TKqpProtoBuilder protoBuilder{*AppData()->FunctionRegistry};
protoBuilder.BuildYdbResultSet(*streamEv->Record.MutableResultSet(), std::move(batches),
txResult.MkqlItemType, txResult.ColumnOrder, txResult.ColumnHints);

LOG_D("Send TEvStreamData to " << Target << ", seqNo: " << streamEv->Record.GetSeqNo()
<< ", nRows: " << streamEv->Record.GetResultSet().rows().size());
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/kqp/runtime/kqp_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ TKqpProtoBuilder::~TKqpProtoBuilder() {
}
}

Ydb::ResultSet TKqpProtoBuilder::BuildYdbResultSet(
void TKqpProtoBuilder::BuildYdbResultSet(
Ydb::ResultSet& resultSet,
TVector<NYql::NDq::TDqSerializedBatch>&& data,
NKikimr::NMiniKQL::TType* mkqlSrcRowType,
const TVector<ui32>* columnOrder,
Expand All @@ -55,7 +56,6 @@ Ydb::ResultSet TKqpProtoBuilder::BuildYdbResultSet(
YQL_ENSURE(mkqlSrcRowType->GetKind() == NKikimr::NMiniKQL::TType::EKind::Struct);
const auto* mkqlSrcRowStructType = static_cast<const TStructType*>(mkqlSrcRowType);

Ydb::ResultSet resultSet;
TColumnOrder order = columnHints ? TColumnOrder(*columnHints) : TColumnOrder{};
for (ui32 idx = 0; idx < mkqlSrcRowStructType->GetMembersCount(); ++idx) {
auto* column = resultSet.add_columns();
Expand Down Expand Up @@ -83,8 +83,6 @@ Ydb::ResultSet TKqpProtoBuilder::BuildYdbResultSet(
});
}
}

return resultSet;
}

} // namespace NKqp
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/runtime/kqp_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class TKqpProtoBuilder : private TNonCopyable {

~TKqpProtoBuilder();

Ydb::ResultSet BuildYdbResultSet(TVector<NYql::NDq::TDqSerializedBatch>&& data,
void BuildYdbResultSet(Ydb::ResultSet& resultSet, TVector<NYql::NDq::TDqSerializedBatch>&& data,
NKikimr::NMiniKQL::TType* srcRowType, const TVector<ui32>* columnOrder = nullptr, const TVector<TString>* columnHints = nullptr);

private:
Expand Down

0 comments on commit 1402e4e

Please sign in to comment.