Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to configure the backtrace tracing for failed allocation #13376

Merged
merged 3 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
}

TString MemoryConsumptionDetails() const override {
// NOTE: don't forget to disable verbosity in stable branches.
return Tx->ToString(true);
return Tx->ToString();
}

void TerminateHandler(bool success, const NYql::TIssues& issues) {
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
, AllowOlapDataQuery(tableServiceConfig.GetAllowOlapDataQuery())
, BlockTrackingMode(tableServiceConfig.GetBlockTrackingMode())
, WaitCAStatsTimeout(TDuration::MilliSeconds(tableServiceConfig.GetQueryLimits().GetWaitCAStatsTimeoutMs()))
, VerboseMemoryLimitException(tableServiceConfig.GetResourceManager().GetVerboseMemoryLimitException())
{
if (tableServiceConfig.HasArrayBufferMinFillPercentage()) {
ArrayBufferMinFillPercentage = tableServiceConfig.GetArrayBufferMinFillPercentage();
Expand Down Expand Up @@ -2721,6 +2722,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
.CaFactory_ = Request.CaFactory_,
.BlockTrackingMode = BlockTrackingMode,
.ArrayBufferMinFillPercentage = ArrayBufferMinFillPercentage,
.VerboseMemoryLimitException = VerboseMemoryLimitException,
});

auto err = Planner->PlanExecution();
Expand Down Expand Up @@ -3045,6 +3047,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
const NKikimrConfig::TTableServiceConfig::EBlockTrackingMode BlockTrackingMode;
const TDuration WaitCAStatsTimeout;
TMaybe<ui8> ArrayBufferMinFillPercentage;
const bool VerboseMemoryLimitException;
};

} // namespace
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ TKqpPlanner::TKqpPlanner(TKqpPlanner::TArgs&& args)
, CaFactory_(args.CaFactory_)
, BlockTrackingMode(args.BlockTrackingMode)
, ArrayBufferMinFillPercentage(args.ArrayBufferMinFillPercentage)
, VerboseMemoryLimitException(args.VerboseMemoryLimitException)
{
if (GUCSettings) {
SerializedGUCSettings = GUCSettings->SerializeToString();
Expand Down Expand Up @@ -479,7 +480,7 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)

TxInfo = MakeIntrusive<NRm::TTxState>(
TxId, TInstant::Now(), ResourceManager_->GetCounters(),
UserRequestContext->PoolId, memoryPoolPercent, Database);
UserRequestContext->PoolId, memoryPoolPercent, Database, VerboseMemoryLimitException);
}

if (ArrayBufferMinFillPercentage) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class TKqpPlanner {
const std::shared_ptr<NKikimr::NKqp::NComputeActor::IKqpNodeComputeActorFactory>& CaFactory_;
const NKikimrConfig::TTableServiceConfig::EBlockTrackingMode BlockTrackingMode;
const TMaybe<ui8> ArrayBufferMinFillPercentage;
const bool VerboseMemoryLimitException;
};

TKqpPlanner(TKqpPlanner::TArgs&& args);
Expand Down Expand Up @@ -150,6 +151,7 @@ class TKqpPlanner {
TVector<TProgressStat> LastStats;
const NKikimrConfig::TTableServiceConfig::EBlockTrackingMode BlockTrackingMode;
const TMaybe<ui8> ArrayBufferMinFillPercentage;
const bool VerboseMemoryLimitException;

public:
static bool UseMockEmptyPlanner; // for tests: if true then use TKqpMockEmptyPlanner that leads to the error
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,9 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
.GUCSettings = nullptr,
.MayRunTasksLocally = false,
.ResourceManager_ = Request.ResourceManager_,
.CaFactory_ = Request.CaFactory_
.CaFactory_ = Request.CaFactory_,
// TODO: BlockTrackingMode is not set!
.VerboseMemoryLimitException = false,
});

LOG_D("Execute scan tx, PendingComputeTasks: " << TasksGraph.GetTasks().size());
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/node_service/kqp_node_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
TIntrusivePtr<NRm::TTxState> txInfo = MakeIntrusive<NRm::TTxState>(
txId, TInstant::Now(), ResourceManager_->GetCounters(),
msg.GetSchedulerGroup(), msg.GetMemoryPoolPercent(),
msg.GetDatabase());
msg.GetDatabase(), Config.GetVerboseMemoryLimitException());

const ui32 tasksCount = msg.GetTasks().size();
for (auto& dqTask: *msg.MutableTasks()) {
Expand Down
70 changes: 43 additions & 27 deletions ydb/core/kqp/rm_service/kqp_rm_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,39 +118,48 @@ class TTxState : public TAtomicRefCount<TTxState> {
const TString PoolId;
const double MemoryPoolPercent;
const TString Database;
const bool CollectBacktrace;

private:
std::atomic<ui64> TxScanQueryMemory = 0;
std::atomic<ui64> TxExternalDataQueryMemory = 0;
std::atomic<ui32> TxExecutionUnits = 0;
std::atomic<ui64> TxMaxAllocation = 0;
std::atomic<ui64> TxFailedAllocation = 0;
std::atomic<ui64> TxMaxAllocationSize = 0;

// TODO(ilezhankin): it's better to use std::atomic<std::shared_ptr<>> which is not supported at the moment.
std::atomic<TBackTrace*> TxMaxAllocationBacktrace = nullptr;
std::atomic<TBackTrace*> TxFailedAllocationBacktrace = nullptr;

// NOTE: it's hard to maintain an atomic pointer when tracking the last failed allocation backtrace,
// because while we are printing one, a newer one may emerge and delete the previous one.
mutable std::mutex BacktraceMutex;
std::atomic<ui64> TxFailedAllocationSize = 0; // protected by BacktraceMutex (only if CollectBacktrace == true)
TBackTrace TxFailedAllocationBacktrace; // protected by BacktraceMutex
std::atomic<bool> HasFailedAllocationBacktrace = false;

public:
explicit TTxState(ui64 txId, TInstant now, TIntrusivePtr<TKqpCounters> counters, const TString& poolId, const double memoryPoolPercent,
const TString& database)
const TString& database, bool collectBacktrace)
: TxId(txId)
, CreatedAt(now)
, Counters(std::move(counters))
, PoolId(poolId)
, MemoryPoolPercent(memoryPoolPercent)
, Database(database)
, CollectBacktrace(collectBacktrace)
{}

~TTxState() {
delete TxMaxAllocationBacktrace.load();
delete TxFailedAllocationBacktrace.load();
}

std::pair<TString, TString> MakePoolId() const {
return std::make_pair(Database, PoolId);
}

TString ToString(bool verbose = false) const {
TString ToString() const {
// use unique_lock to safely unlock mutex in case of exceptions
std::unique_lock backtraceLock(BacktraceMutex, std::defer_lock);

auto res = TStringBuilder() << "TxResourcesInfo { "
<< "TxId: " << TxId
<< ", Database: " << Database;
Expand All @@ -160,19 +169,28 @@ class TTxState : public TAtomicRefCount<TTxState> {
<< ", MemoryPoolPercent: " << Sprintf("%.2f", MemoryPoolPercent > 0 ? MemoryPoolPercent : 100);
}

if (CollectBacktrace) {
backtraceLock.lock();
}

res << ", tx initially granted memory: " << HumanReadableSize(TxExternalDataQueryMemory.load(), SF_BYTES)
<< ", tx total memory allocations: " << HumanReadableSize(TxScanQueryMemory.load(), SF_BYTES)
<< ", tx largest successful memory allocation: " << HumanReadableSize(TxMaxAllocation.load(), SF_BYTES)
<< ", tx last failed memory allocation: " << HumanReadableSize(TxFailedAllocation.load(), SF_BYTES)
<< ", tx largest successful memory allocation: " << HumanReadableSize(TxMaxAllocationSize.load(), SF_BYTES)
<< ", tx last failed memory allocation: " << HumanReadableSize(TxFailedAllocationSize.load(), SF_BYTES)
<< ", tx total execution units: " << TxExecutionUnits.load()
<< ", started at: " << CreatedAt
<< " }" << Endl;

if (verbose && TxMaxAllocationBacktrace.load()) {
res << "TxMaxAllocationBacktrace:" << Endl << TxMaxAllocationBacktrace.load()->PrintToString();
if (CollectBacktrace && HasFailedAllocationBacktrace.load()) {
res << "TxFailedAllocationBacktrace:" << Endl << TxFailedAllocationBacktrace.PrintToString();
}

if (CollectBacktrace) {
backtraceLock.unlock();
}
if (verbose && TxFailedAllocationBacktrace.load()) {
res << "TxFailedAllocationBacktrace:" << Endl << TxFailedAllocationBacktrace.load()->PrintToString();

if (CollectBacktrace && TxMaxAllocationBacktrace.load()) {
res << "TxMaxAllocationBacktrace:" << Endl << TxMaxAllocationBacktrace.load()->PrintToString();
}

return res;
Expand All @@ -183,22 +201,19 @@ class TTxState : public TAtomicRefCount<TTxState> {
}

void AckFailedMemoryAlloc(ui64 memory) {
auto* oldBacktrace = TxFailedAllocationBacktrace.load();
ui64 lastAlloc = TxFailedAllocation.load();
bool exchanged = false;
// use unique_lock to safely unlock mutex in case of exceptions
std::unique_lock backtraceLock(BacktraceMutex, std::defer_lock);

while(!exchanged) {
exchanged = TxFailedAllocation.compare_exchange_weak(lastAlloc, memory);
if (CollectBacktrace) {
backtraceLock.lock();
}

if (exchanged) {
auto* newBacktrace = new TBackTrace();
newBacktrace->Capture();
if (TxFailedAllocationBacktrace.compare_exchange_strong(oldBacktrace, newBacktrace)) {
delete oldBacktrace;
} else {
delete newBacktrace;
}
TxFailedAllocationSize = memory;

if (CollectBacktrace) {
TxFailedAllocationBacktrace.Capture();
HasFailedAllocationBacktrace = true;
backtraceLock.unlock();
}
}

Expand Down Expand Up @@ -239,17 +254,18 @@ class TTxState : public TAtomicRefCount<TTxState> {
}

auto* oldBacktrace = TxMaxAllocationBacktrace.load();
ui64 maxAlloc = TxMaxAllocation.load();
ui64 maxAlloc = TxMaxAllocationSize.load();
bool exchanged = false;

while(maxAlloc < resources.Memory && !exchanged) {
exchanged = TxMaxAllocation.compare_exchange_weak(maxAlloc, resources.Memory);
exchanged = TxMaxAllocationSize.compare_exchange_weak(maxAlloc, resources.Memory);
}

if (exchanged) {
auto* newBacktrace = new TBackTrace();
newBacktrace->Capture();
if (TxMaxAllocationBacktrace.compare_exchange_strong(oldBacktrace, newBacktrace)) {
// XXX(ilezhankin): technically it's possible to have a race with `ToString()`, but it's very unlikely.
delete oldBacktrace;
} else {
delete newBacktrace;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/rm_service/kqp_rm_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class KqpRm : public TTestBase {
}

TIntrusivePtr<NRm::TTxState> MakeTx(ui64 txId, std::shared_ptr<NRm::IKqpResourceManager> rm) {
return MakeIntrusive<NRm::TTxState>(txId, TInstant::Now(), rm->GetCounters(), "", (double)100, "");
return MakeIntrusive<NRm::TTxState>(txId, TInstant::Now(), rm->GetCounters(), "", (double)100, "", false);
}

TIntrusivePtr<NRm::TTaskState> MakeTask(ui64 taskId, TIntrusivePtr<NRm::TTxState> tx) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/table_service_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ message TTableServiceConfig {
optional uint64 MaxNonParallelTopStageExecutionLimit = 26 [default = 1];
optional bool PreferLocalDatacenterExecution = 27 [ default = true ];
optional uint64 MaxNonParallelDataQueryTasksLimit = 28 [default = 1000];

optional bool VerboseMemoryLimitException = 29 [default = false];
}

message TSpillingServiceConfig {
Expand Down
Loading