Skip to content

Commit

Permalink
Make DQ and KQP use both WideStream and WideFlow for WideFromBlocks (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
igormunkin authored Jan 14, 2025
1 parent 3d0c4ef commit 90166bc
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 56 deletions.
97 changes: 77 additions & 20 deletions ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,24 @@ bool IsCompatibleWithBlocks(TPositionHandle pos, const TStructExprType& type, TE
return resolveStatus == IArrowResolver::OK;
}

// XXX: Unfortunately, KQP optimizer pipeline is not quite
// predictable, so there is no guarantees whether the pair
// (FromFlow(ToFlow(...)) are already squashed or not.
// The helper below checks both cases to find WideFromBlocks
// input node:
// * (WideFromBlocks(...))
// * (FromFlow(ToFlow(WideFromBlocks(...))))
// FIXME: To suppress compiler warnings when the constexpr value
// NYql::NBlockStreamIO::WideFromBlocks is false, Y_DECLARE_UNUSED
// quantifier is used.
Y_DECLARE_UNUSED
static inline bool IsNodeWideFromBlocks(const TMaybeNode<TExprBase> body) {
return body.Maybe<TCoWideFromBlocks>() ||
body.Maybe<TCoFromFlow>() &&
body.Cast<TCoFromFlow>().Input().Maybe<TCoToFlow>() &&
body.Cast<TCoFromFlow>().Input().Cast<TCoToFlow>().Input().Maybe<TCoWideFromBlocks>();
}

// TODO: composite copy-paste from https://github.com/ydb-platform/ydb/blob/122f053354c5df4fc559bf06fe0302f92d813032/ydb/library/yql/dq/opt/dq_opt_build.cpp#L388
bool CanPropagateWideBlockThroughChannel(
const TDqOutput& output,
Expand All @@ -293,11 +311,20 @@ bool CanPropagateWideBlockThroughChannel(
return false;
}

// Ensure that stage has blocks on top level (i.e. FromFlow(WideFromBlocks(...)))
if (!program.Lambda().Body().Maybe<TCoFromFlow>() ||
!program.Lambda().Body().Cast<TCoFromFlow>().Input().Maybe<TCoWideFromBlocks>())
{
return false;
if constexpr (!NYql::NBlockStreamIO::WideFromBlocks) {
// Ensure that stage has blocks on top level (i.e. (FromFlow(WideFromBlocks(...)))).
if (!program.Lambda().Body().Maybe<TCoFromFlow>() ||
!program.Lambda().Body().Cast<TCoFromFlow>().Input().Maybe<TCoWideFromBlocks>())
{
return false;
}
} else {
// Ensure that stage has blocks on top level (i.e. either
// (WideFromBlocks(...)) or (FromFlow(ToFlow(WideFromBlocks(...))))).
// See the rationale for alternatives nearby IsNodeWideFromBlocks.
if (!IsNodeWideFromBlocks(program.Lambda().Body())) {
return false;
}
}

auto typeAnnotation = program.Lambda().Ref().GetTypeAnn();
Expand Down Expand Up @@ -371,31 +398,61 @@ TMaybeNode<TKqpPhysicalTx> PeepholeOptimize(const TKqpPhysicalTx& tx, TExprConte
if (auto connection = stage.Inputs().Item(i).Maybe<TDqConnection>(); connection &&
CanPropagateWideBlockThroughChannel(connection.Cast().Output(), programs, TDqStageSettings::Parse(stage), ctx, typesCtx))
{
TExprNode::TPtr newArgNode = ctx.Builder(oldArg.Pos())
.Callable("FromFlow")
.Callable(0, "WideFromBlocks")
.Callable(0, "ToFlow")
.Add(0, newArg.Ptr())
TExprNode::TPtr newArgNode;
if constexpr (!NYql::NBlockStreamIO::WideFromBlocks) {
newArgNode = ctx.Builder(oldArg.Pos())
.Callable("FromFlow")
.Callable(0, "WideFromBlocks")
.Callable(0, "ToFlow")
.Add(0, newArg.Ptr())
.Seal()
.Seal()
.Seal()
.Seal()
.Build();
.Build();
} else {
newArgNode = ctx.Builder(oldArg.Pos())
.Callable("WideFromBlocks")
.Add(0, newArg.Ptr())
.Seal()
.Build();
}

argsMap.emplace(oldArg.Raw(), newArgNode);

auto stageUid = connection.Cast().Output().Stage().Ref().UniqueId();

// Update input program with: FromFlow(WideFromBlocks($1)) → FromFlow($1)
if (const auto& inputProgram = programs.at(stageUid); inputProgram.Lambda().Body().Maybe<TCoFromFlow>() &&
inputProgram.Lambda().Body().Cast<TCoFromFlow>().Input().Maybe<TCoWideFromBlocks>())
{
auto newBody = Build<TCoFromFlow>(ctx, inputProgram.Lambda().Body().Cast<TCoFromFlow>().Pos())
.Input(inputProgram.Lambda().Body().Cast<TCoFromFlow>().Input().Cast<TCoWideFromBlocks>().Input())
.Done();
const auto& inputProgram = programs.at(stageUid);
TMaybeNode<TExprBase> newBody;
if constexpr (!NYql::NBlockStreamIO::WideFromBlocks) {
// Update input program with: (FromFlow(WideFromBlocks($1))) -> (FromFlow($1))
if (inputProgram.Lambda().Body().Maybe<TCoFromFlow>() &&
inputProgram.Lambda().Body().Cast<TCoFromFlow>().Input().Maybe<TCoWideFromBlocks>())
{
newBody = Build<TCoFromFlow>(ctx, inputProgram.Lambda().Body().Cast<TCoFromFlow>().Pos())
.Input(inputProgram.Lambda().Body().Cast<TCoFromFlow>().Input().Cast<TCoWideFromBlocks>().Input())
.Done();
}
} else {
// Update input program with one of the following:
// * (WideFromBlocks($1)) -> ($1)
// * (FromFlow(ToFlow(WideFromBlocks($1)))) -> ($1)
const auto& body = inputProgram.Lambda().Body();
if (IsNodeWideFromBlocks(body)) {
if (body.Maybe<TCoWideFromBlocks>()) {
newBody = body.Cast<TCoWideFromBlocks>().Input();
} else {
newBody = body.Cast<TCoFromFlow>().Input()
.Cast<TCoToFlow>().Input()
.Cast<TCoWideFromBlocks>().Input();
}
}
}

if (newBody) {
auto newInputProgram = Build<TKqpProgram>(ctx, inputProgram.Pos())
.Lambda()
.Args(inputProgram.Lambda().Args())
.Body(newBody)
.Body(newBody.Cast())
.Build()
.ArgsType(inputProgram.ArgsType())
.Done();
Expand Down
37 changes: 27 additions & 10 deletions ydb/core/kqp/opt/peephole/kqp_opt_peephole_wide_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,33 @@ TExprBase KqpBuildWideReadTable(const TExprBase& node, TExprContext& ctx, TTypeA
auto read = maybeRead.Cast();

if (typesCtx.IsBlockEngineEnabled()) {
wideRead = Build<TCoWideFromBlocks>(ctx, node.Pos())
.Input<TKqpBlockReadOlapTableRanges>()
.Table(read.Table())
.Ranges(read.Ranges())
.Columns(read.Columns())
.Settings(read.Settings())
.ExplainPrompt(read.ExplainPrompt())
.Process(read.Process())
.Build()
.Done();
if constexpr (!NYql::NBlockStreamIO::WideFromBlocks) {
wideRead = Build<TCoWideFromBlocks>(ctx, node.Pos())
.Input<TKqpBlockReadOlapTableRanges>()
.Table(read.Table())
.Ranges(read.Ranges())
.Columns(read.Columns())
.Settings(read.Settings())
.ExplainPrompt(read.ExplainPrompt())
.Process(read.Process())
.Build()
.Done();
} else {
wideRead = Build<TCoToFlow>(ctx, node.Pos())
.Input<TCoWideFromBlocks>()
.Input<TCoFromFlow>()
.Input<TKqpBlockReadOlapTableRanges>()
.Table(read.Table())
.Ranges(read.Ranges())
.Columns(read.Columns())
.Settings(read.Settings())
.ExplainPrompt(read.ExplainPrompt())
.Process(read.Process())
.Build()
.Build()
.Build()
.Done();
}
} else {
wideRead = Build<TKqpWideReadOlapTableRanges>(ctx, node.Pos())
.Table(read.Table())
Expand Down
18 changes: 15 additions & 3 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2536,16 +2536,28 @@ Y_UNIT_TEST_SUITE(KqpOlap) {

switch (blockChannelsMode) {
case NKikimrConfig::TTableServiceConfig_EBlockChannelsMode_BLOCK_CHANNELS_SCALAR:
UNIT_ASSERT_C(plan.QueryStats->Getquery_ast().Contains("return (FromFlow (NarrowMap (WideFromBlocks"), plan.QueryStats->Getquery_ast());
if constexpr (!NYql::NBlockStreamIO::WideFromBlocks) {
UNIT_ASSERT_C(plan.QueryStats->Getquery_ast().Contains("return (FromFlow (NarrowMap (WideFromBlocks"), plan.QueryStats->Getquery_ast());
} else {
UNIT_ASSERT_C(plan.QueryStats->Getquery_ast().Contains("return (FromFlow (NarrowMap (ToFlow (WideFromBlocks"), plan.QueryStats->Getquery_ast());
}
break;
case NKikimrConfig::TTableServiceConfig_EBlockChannelsMode_BLOCK_CHANNELS_AUTO:
UNIT_ASSERT_C(plan.QueryStats->Getquery_ast().Contains("(FromFlow (WideFromBlocks"), plan.QueryStats->Getquery_ast());
if constexpr (!NYql::NBlockStreamIO::WideFromBlocks) {
UNIT_ASSERT_C(plan.QueryStats->Getquery_ast().Contains("(FromFlow (WideFromBlocks"), plan.QueryStats->Getquery_ast());
} else {
UNIT_ASSERT_C(plan.QueryStats->Getquery_ast().Contains("(WideFromBlocks"), plan.QueryStats->Getquery_ast());
}
UNIT_ASSERT_C(!plan.QueryStats->Getquery_ast().Contains("WideToBlocks"), plan.QueryStats->Getquery_ast());
UNIT_ASSERT_EQUAL_C(plan.QueryStats->Getquery_ast().find("WideFromBlocks"), plan.QueryStats->Getquery_ast().rfind("WideFromBlocks"), plan.QueryStats->Getquery_ast());
break;
case NKikimrConfig::TTableServiceConfig_EBlockChannelsMode_BLOCK_CHANNELS_FORCE:
UNIT_ASSERT_C(plan.QueryStats->Getquery_ast().Contains("(FromFlow (WideSortBlocks"), plan.QueryStats->Getquery_ast());
UNIT_ASSERT_C(plan.QueryStats->Getquery_ast().Contains("(FromFlow (NarrowMap (WideFromBlocks"), plan.QueryStats->Getquery_ast());
if constexpr (!NYql::NBlockStreamIO::WideFromBlocks) {
UNIT_ASSERT_C(plan.QueryStats->Getquery_ast().Contains("(FromFlow (NarrowMap (WideFromBlocks"), plan.QueryStats->Getquery_ast());
} else {
UNIT_ASSERT_C(plan.QueryStats->Getquery_ast().Contains("(FromFlow (NarrowMap (ToFlow (WideFromBlocks"), plan.QueryStats->Getquery_ast());
}
break;
}
}
Expand Down
37 changes: 26 additions & 11 deletions ydb/library/yql/dq/opt/dq_opt_build.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -768,10 +768,16 @@ bool CanRebuildForWideBlockChannelOutput(bool forceBlocks, const TDqPhyStage& st

if (!forceBlocks) {
// ensure that stage has blocks on top level (i.e. FromFlow(WideFromBlocks(...)))
if (!stage.Program().Body().Maybe<TCoFromFlow>() ||
!stage.Program().Body().Cast<TCoFromFlow>().Input().Maybe<TCoWideFromBlocks>())
{
return false;
if constexpr (!NYql::NBlockStreamIO::WideFromBlocks) {
if (!stage.Program().Body().Maybe<TCoFromFlow>() ||
!stage.Program().Body().Cast<TCoFromFlow>().Input().Maybe<TCoWideFromBlocks>())
{
return false;
}
} else {
if (!stage.Program().Body().Maybe<TCoWideFromBlocks>()) {
return false;
}
}
}

Expand Down Expand Up @@ -826,15 +832,24 @@ TDqPhyStage RebuildStageInputsAsWideBlock(bool forceBlocks, const TDqPhyStage& s
if (maybeConn && IsSupportedForWideBlocks(maybeConn.Cast()) && CanRebuildForWideBlockChannelOutput(forceBlocks, maybeConn.Cast().Output(), ctx, typesCtx)) {
++blockInputs;
// input will actually be wide block stream - convert it to wide stream first
TExprNode::TPtr newArgNode = ctx.Builder(arg.Pos())
.Callable("FromFlow")
.Callable(0, "WideFromBlocks")
.Callable(0, "ToFlow")
.Add(0, newArg.Ptr())
TExprNode::TPtr newArgNode;
if constexpr (!NYql::NBlockStreamIO::WideFromBlocks) {
newArgNode = ctx.Builder(arg.Pos())
.Callable("FromFlow")
.Callable(0, "WideFromBlocks")
.Callable(0, "ToFlow")
.Add(0, newArg.Ptr())
.Seal()
.Seal()
.Seal()
.Seal()
.Build();
.Build();
} else {
newArgNode = ctx.Builder(arg.Pos())
.Callable("WideFromBlocks")
.Add(0, newArg.Ptr())
.Seal()
.Build();
}
argsMap.emplace(arg.Raw(), newArgNode);

const TDqConnection& conn = maybeConn.Cast();
Expand Down
22 changes: 17 additions & 5 deletions ydb/library/yql/dq/opt/dq_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2946,11 +2946,23 @@ NNodes::TExprBase DqBuildStageWithSourceWrap(NNodes::TExprBase node, TExprContex
.Build();

if (supportsBlocks) {
wideWrap = ctx.Builder(node.Pos())
.Callable("WideFromBlocks")
.Add(0, wideWrap)
.Seal()
.Build();
if constexpr (!NYql::NBlockStreamIO::WideFromBlocks) {
wideWrap = ctx.Builder(node.Pos())
.Callable("WideFromBlocks")
.Add(0, wideWrap)
.Seal()
.Build();
} else {
wideWrap = ctx.Builder(node.Pos())
.Callable("ToFlow")
.Callable(0, "WideFromBlocks")
.Callable(0, "FromFlow")
.Add(0, wideWrap)
.Seal()
.Seal()
.Seal()
.Build();
}
}

auto narrow = ctx.Builder(node.Pos())
Expand Down
27 changes: 20 additions & 7 deletions ydb/library/yql/providers/dq/opt/dqs_opt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,29 @@ namespace NYql::NDqs {
}

YQL_CLOG(INFO, ProviderDq) << "DqsRewritePhyBlockReadOnDqIntegration";
return Build<TCoWideFromBlocks>(ctx, node->Pos())
.Input(
Build<TCoToFlow>(ctx, node->Pos())
if constexpr (!NYql::NBlockStreamIO::WideFromBlocks) {
return Build<TCoWideFromBlocks>(ctx, node->Pos())
.Input(Build<TCoToFlow>(ctx, node->Pos())
.Input(Build<TDqReadBlockWideWrap>(ctx, node->Pos())
.Input(readWideWrap.Input())
.Flags(readWideWrap.Flags())
.Token(readWideWrap.Token())
.Done().Ptr())
.Input(readWideWrap.Input())
.Flags(readWideWrap.Flags())
.Token(readWideWrap.Token())
.Done().Ptr())
.Done())
.Done().Ptr();
}

YQL_ENSURE(NYql::NBlockStreamIO::WideFromBlocks);

return Build<TCoToFlow>(ctx, node->Pos())
.Input(Build<TCoWideFromBlocks>(ctx, node->Pos())
.Input(Build<TDqReadBlockWideWrap>(ctx, node->Pos())
.Input(readWideWrap.Input())
.Flags(readWideWrap.Flags())
.Token(readWideWrap.Token())
.Done().Ptr())
.Done())
.Done().Ptr();
}, ctx, optSettings);
});
}
Expand Down

0 comments on commit 90166bc

Please sign in to comment.