Skip to content

Commit

Permalink
remove outdated code in rm service (#7207)
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit authored Jul 30, 2024
1 parent 8beeb7f commit 0138b90
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 160 deletions.
148 changes: 27 additions & 121 deletions ydb/core/kqp/rm_service/kqp_rm_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,6 @@ struct TEvPrivate {

struct TEvSchedulePublishResources : public TEventLocal<TEvSchedulePublishResources, EEv::EvSchedulePublishResources> {
};

struct TEvTakeResourcesSnapshot : public TEventLocal<TEvTakeResourcesSnapshot, EEv::EvTakeResourcesSnapshot> {
std::function<void(TVector<NKikimrKqp::TKqpNodeResources>&&)> Callback;
};
};

class TKqpResourceManager : public IKqpResourceManager {
Expand All @@ -117,7 +113,6 @@ class TKqpResourceManager : public IKqpResourceManager {
, ExecutionUnitsResource(config.GetComputeActorsCount())
, ExecutionUnitsLimit(config.GetComputeActorsCount())
, ScanQueryMemoryResource(config.GetQueryMemoryLimit())
, PublishResourcesByExchanger(config.GetEnablePublishResourcesByExchanger())
{
SetConfigValues(config);
}
Expand All @@ -132,10 +127,7 @@ class TKqpResourceManager : public IKqpResourceManager {
config.GetKqpPatternCacheCompiledCapacityBytes(),
config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile());

if (PublishResourcesByExchanger) {
CreateResourceInfoExchanger(config.GetInfoExchangerSettings());
return;
}
CreateResourceInfoExchanger(config.GetInfoExchangerSettings());
}

const TIntrusivePtr<TKqpCounters>& GetCounters() const override {
Expand All @@ -144,14 +136,10 @@ class TKqpResourceManager : public IKqpResourceManager {

void CreateResourceInfoExchanger(
const NKikimrConfig::TTableServiceConfig::TResourceManager::TInfoExchangerSettings& settings) {
PublishResourcesByExchanger = true;
if (!ResourceInfoExchanger) {
ResourceSnapshotState = std::make_shared<TResourceSnapshotState>();
auto exchanger = CreateKqpResourceInfoExchangerActor(
Counters, ResourceSnapshotState, settings);
ResourceInfoExchanger = ActorSystem->Register(exchanger);
return;
}
ResourceSnapshotState = std::make_shared<TResourceSnapshotState>();
auto exchanger = CreateKqpResourceInfoExchangerActor(
Counters, ResourceSnapshotState, settings);
ResourceInfoExchanger = ActorSystem->Register(exchanger);
}

bool AllocateExecutionUnits(ui32 cnt) {
Expand Down Expand Up @@ -328,39 +316,28 @@ class TKqpResourceManager : public IKqpResourceManager {

TVector<NKikimrKqp::TKqpNodeResources> GetClusterResources() const override {
TVector<NKikimrKqp::TKqpNodeResources> resources;
Y_ABORT_UNLESS(PublishResourcesByExchanger);

if (PublishResourcesByExchanger) {
std::shared_ptr<TVector<NKikimrKqp::TKqpNodeResources>> infos;
with_lock (ResourceSnapshotState->Lock) {
infos = ResourceSnapshotState->Snapshot;
}
if (infos != nullptr) {
resources = *infos;
}
std::shared_ptr<TVector<NKikimrKqp::TKqpNodeResources>> infos;
with_lock (ResourceSnapshotState->Lock) {
infos = ResourceSnapshotState->Snapshot;
}
if (infos != nullptr) {
resources = *infos;
}

return resources;
}

void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) override {
LOG_AS_D("Schedule Snapshot request");
if (PublishResourcesByExchanger) {
std::shared_ptr<TVector<NKikimrKqp::TKqpNodeResources>> infos;
with_lock (ResourceSnapshotState->Lock) {
infos = ResourceSnapshotState->Snapshot;
}
TVector<NKikimrKqp::TKqpNodeResources> resources;
if (infos != nullptr) {
resources = *infos;
}
callback(std::move(resources));
return;
std::shared_ptr<TVector<NKikimrKqp::TKqpNodeResources>> infos;
with_lock (ResourceSnapshotState->Lock) {
infos = ResourceSnapshotState->Snapshot;
}
TVector<NKikimrKqp::TKqpNodeResources> resources;
if (infos != nullptr) {
resources = *infos;
}
auto ev = MakeHolder<TEvPrivate::TEvTakeResourcesSnapshot>();
ev->Callback = std::move(callback);
TAutoPtr<IEventHandle> handle = new IEventHandle(SelfId, SelfId, ev.Release());
ActorSystem->Send(handle);
callback(std::move(resources));
}

TKqpLocalNodeResources GetLocalResources() const override {
Expand Down Expand Up @@ -470,7 +447,6 @@ class TKqpResourceManager : public IKqpResourceManager {

// state for resource info exchanger
std::shared_ptr<TResourceSnapshotState> ResourceSnapshotState;
bool PublishResourcesByExchanger;
TActorId ResourceInfoExchanger = TActorId();
};

Expand Down Expand Up @@ -500,7 +476,6 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
: Config(config)
, ResourceBrokerId(resourceBrokerId ? resourceBrokerId : MakeResourceBrokerID())
, KqpProxySharedResources(std::move(kqpProxySharedResources))
, PublishResourcesByExchanger(config.GetEnablePublishResourcesByExchanger())
{
ResourceManager = std::make_shared<TKqpResourceManager>(config, counters);
with_lock (ResourceManagers.Lock) {
Expand Down Expand Up @@ -575,7 +550,6 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
hFunc(TEvInterconnect::TEvNodeInfo, Handle);
hFunc(TEvPrivate::TEvPublishResources, HandleWork);
hFunc(TEvPrivate::TEvSchedulePublishResources, HandleWork);
hFunc(TEvPrivate::TEvTakeResourcesSnapshot, HandleWork);
hFunc(NNodeWhiteboard::TEvWhiteboard::TEvSystemStateResponse, Handle);
hFunc(TEvKqp::TEvKqpProxyPublishRequest, HandleWork);
hFunc(TEvResourceBroker::TEvConfigResponse, HandleWork);
Expand Down Expand Up @@ -611,20 +585,6 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
PublishResourceUsage("kqp_proxy");
}

void HandleWork(TEvPrivate::TEvTakeResourcesSnapshot::TPtr& ev) {
if (WbState.DomainNotFound) {
LOG_E("Can not take resources snapshot, ssGroupId not set. Tenant: " << WbState.Tenant
<< ", Board: " << WbState.BoardPath);
ev->Get()->Callback({});
return;
}

LOG_D("Create Snapshot actor, board: " << WbState.BoardPath);

Register(
CreateTakeResourcesSnapshotActor(WbState.BoardPath, std::move(ev->Get()->Callback)));
}

void HandleWork(TEvResourceBroker::TEvConfigResponse::TPtr& ev) {
if (!ev->Get()->QueueConfig) {
LOG_E(NLocalDb::KqpResourceManagerQueue << " not configured!");
Expand Down Expand Up @@ -699,23 +659,6 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
config.GetKqpPatternCacheCompiledCapacityBytes(),
config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile());

bool enablePublishResourcesByExchanger = config.GetEnablePublishResourcesByExchanger();
if (enablePublishResourcesByExchanger != PublishResourcesByExchanger) {
PublishResourcesByExchanger = enablePublishResourcesByExchanger;
if (enablePublishResourcesByExchanger) {
ResourceManager->CreateResourceInfoExchanger(config.GetInfoExchangerSettings());
PublishResourceUsage("exchanger enabled");
} else {
if (ResourceManager->ResourceInfoExchanger) {
Send(ResourceManager->ResourceInfoExchanger, new TEvents::TEvPoison);
ResourceManager->ResourceInfoExchanger = TActorId();
}
ResourceManager->PublishResourcesByExchanger = false;
ResourceManager->ResourceSnapshotState.reset();
PublishResourceUsage("exchanger disabled");
}
}

#define FORCE_VALUE(name) if (!config.Has ## name ()) config.Set ## name(config.Get ## name());
FORCE_VALUE(ComputeActorsCount)
FORCE_VALUE(ChannelBufferSize)
Expand Down Expand Up @@ -762,14 +705,7 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
TStringStream str;
str.Reserve(8 * 1024);

auto snapshot = TVector<NKikimrKqp::TKqpNodeResources>();

if (PublishResourcesByExchanger) {
ResourceManager->RequestClusterResourcesInfo(
[&snapshot](TVector<NKikimrKqp::TKqpNodeResources>&& resources) {
snapshot = std::move(resources);
});
}
auto snapshot = ResourceManager->GetClusterResources();

HTML(str) {
PRE() {
Expand Down Expand Up @@ -823,9 +759,6 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
ResourceManager->ResourceInfoExchanger = TActorId();
}
ResourceManager->ResourceSnapshotState.reset();
if (WbState.BoardPublisherActorId) {
Send(WbState.BoardPublisherActorId, new TEvents::TEvPoison);
}
TActor::PassAway();
}

Expand Down Expand Up @@ -882,39 +815,14 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
pool->SetAvailable(ResourceManager->ScanQueryMemoryResource.Available());
}

if (PublishResourcesByExchanger) {
LOG_I("Send to publish resource usage for "
<< "reason: " << reason
<< ", payload: " << payload.ShortDebugString());
WbState.LastPublishTime = now;
if (ResourceManager->ResourceInfoExchanger) {
Send(ResourceManager->ResourceInfoExchanger,
new TEvKqpResourceInfoExchanger::TEvPublishResource(std::move(payload)));
}
return;
}

if (WbState.BoardPublisherActorId) {
LOG_I("Kill previous board publisher for '" << WbState.BoardPath
<< "' at " << WbState.BoardPublisherActorId << ", reason: " << reason);
Send(WbState.BoardPublisherActorId, new TEvents::TEvPoison);
}

WbState.BoardPublisherActorId = TActorId();

if (WbState.DomainNotFound) {
LOG_E("Can not find default state storage group for database " << WbState.Tenant);
return;
}

auto boardPublisher = CreateBoardPublishActor(WbState.BoardPath, payload.SerializeAsString(), SelfId(),
/* ttlMs */ 0, /* reg */ true);
WbState.BoardPublisherActorId = Register(boardPublisher);

LOG_I("Send to publish resource usage for "
<< "reason: " << reason
<< ", payload: " << payload.ShortDebugString());
WbState.LastPublishTime = now;

LOG_I("Publish resource usage for '" << WbState.BoardPath << "' at " << WbState.BoardPublisherActorId
<< ", reason: " << reason << ", payload: " << payload.ShortDebugString());
if (ResourceManager->ResourceInfoExchanger) {
Send(ResourceManager->ResourceInfoExchanger,
new TEvKqpResourceInfoExchanger::TEvPublishResource(std::move(payload)));
}
}

private:
Expand All @@ -927,7 +835,6 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
TString Tenant;
TString BoardPath;
bool DomainNotFound = false;
TActorId BoardPublisherActorId;
std::optional<TInstant> LastPublishTime;
};
TWhiteBoardState WbState;
Expand All @@ -940,7 +847,6 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
std::shared_ptr<TKqpResourceManager> ResourceManager;

std::optional<TInstant> PublishResourcesScheduledAt;
bool PublishResourcesByExchanger;
std::optional<TString> SelfDataCenterId;
};

Expand Down
5 changes: 0 additions & 5 deletions ydb/core/kqp/rm_service/kqp_rm_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,11 +229,6 @@ class IKqpResourceManager : private TNonCopyable {
};


NActors::IActor* CreateTakeResourcesSnapshotActor(
const TString& boardPath,
std::function<void(TVector<NKikimrKqp::TKqpNodeResources>&&)>&& callback);


struct TResourceSnapshotState {
std::shared_ptr<TVector<NKikimrKqp::TKqpNodeResources>> Snapshot;
TMutex Lock;
Expand Down
47 changes: 14 additions & 33 deletions ydb/core/kqp/rm_service/kqp_rm_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,12 @@ TResourceBrokerConfig MakeResourceBrokerTestConfig() {
return config;
}

NKikimrConfig::TTableServiceConfig::TResourceManager MakeKqpResourceManagerConfig(
bool EnablePublishResourcesByExchanger = false) {
NKikimrConfig::TTableServiceConfig::TResourceManager MakeKqpResourceManagerConfig() {
NKikimrConfig::TTableServiceConfig::TResourceManager config;

config.SetComputeActorsCount(100);
config.SetPublishStatisticsIntervalSec(0);
config.SetQueryMemoryLimit(1000);
config.SetEnablePublishResourcesByExchanger(EnablePublishResourcesByExchanger);

auto* infoExchangerRetrySettings = config.MutableInfoExchangerSettings();
auto* exchangerSettings = infoExchangerRetrySettings->MutableExchangerSettings();
Expand Down Expand Up @@ -276,12 +274,9 @@ class KqpRm : public TTestBase {
UNIT_TEST(NotEnoughMemory);
UNIT_TEST(NotEnoughExecutionUnits);
UNIT_TEST(ResourceBrokerNotEnoughResources);
UNIT_TEST(SingleSnapshotByStateStorage);
UNIT_TEST(SingleSnapshotByExchanger);
UNIT_TEST(Reduce);
UNIT_TEST(SnapshotSharingByStateStorage);
UNIT_TEST(SnapshotSharingByExchanger);
UNIT_TEST(NodesMembershipByStateStorage);
UNIT_TEST(NodesMembershipByExchanger);
UNIT_TEST(DisonnectNodes);
UNIT_TEST_SUITE_END();
Expand All @@ -291,15 +286,12 @@ class KqpRm : public TTestBase {
void NotEnoughMemory();
void NotEnoughExecutionUnits();
void ResourceBrokerNotEnoughResources();
void Snapshot(bool byExchanger);
void SingleSnapshotByStateStorage();
void Snapshot();
void SingleSnapshotByExchanger();
void Reduce();
void SnapshotSharing(bool byExchanger);
void SnapshotSharingByStateStorage();
void SnapshotSharing();
void SnapshotSharingByExchanger();
void NodesMembership(bool byExchanger);
void NodesMembershipByStateStorage();
void NodesMembership();
void NodesMembershipByExchanger();
void DisonnectNodes();

Expand Down Expand Up @@ -450,8 +442,8 @@ void KqpRm::ResourceBrokerNotEnoughResources() {
AssertResourceBrokerSensors(0, 1000, 0, 0, 1);
}

void KqpRm::Snapshot(bool byExchanger) {
StartRms({MakeKqpResourceManagerConfig(byExchanger), MakeKqpResourceManagerConfig(byExchanger)});
void KqpRm::Snapshot() {
StartRms({MakeKqpResourceManagerConfig(), MakeKqpResourceManagerConfig()});
NKikimr::TActorSystemStub stub;

auto rm = GetKqpResourceManager(ResourceManagers.front().NodeId());
Expand Down Expand Up @@ -490,12 +482,8 @@ void KqpRm::Snapshot(bool byExchanger) {
CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm);
}

void KqpRm::SingleSnapshotByStateStorage() {
Snapshot(false);
}

void KqpRm::SingleSnapshotByExchanger() {
Snapshot(true);
Snapshot();
}

void KqpRm::Reduce() {
Expand Down Expand Up @@ -537,8 +525,8 @@ void KqpRm::Reduce() {
AssertResourceBrokerSensors(0, 30, 0, 0, 1);
}

void KqpRm::SnapshotSharing(bool byExchanger) {
StartRms({MakeKqpResourceManagerConfig(byExchanger), MakeKqpResourceManagerConfig(byExchanger)});
void KqpRm::SnapshotSharing() {
StartRms({MakeKqpResourceManagerConfig(), MakeKqpResourceManagerConfig()});
NKikimr::TActorSystemStub stub;

auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId());
Expand Down Expand Up @@ -607,16 +595,12 @@ void KqpRm::SnapshotSharing(bool byExchanger) {
}
}

void KqpRm::SnapshotSharingByStateStorage() {
SnapshotSharing(false);
}

void KqpRm::SnapshotSharingByExchanger() {
SnapshotSharing(true);
SnapshotSharing();
}

void KqpRm::NodesMembership(bool byExchanger) {
StartRms({MakeKqpResourceManagerConfig(byExchanger), MakeKqpResourceManagerConfig(byExchanger)});
void KqpRm::NodesMembership() {
StartRms({MakeKqpResourceManagerConfig(), MakeKqpResourceManagerConfig()});
NKikimr::TActorSystemStub stub;

auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId());
Expand All @@ -641,16 +625,13 @@ void KqpRm::NodesMembership(bool byExchanger) {
CheckSnapshot(0, {{1000, 100}}, rm_first);
}

void KqpRm::NodesMembershipByStateStorage() {
NodesMembership(false);
}

void KqpRm::NodesMembershipByExchanger() {
NodesMembership(true);
NodesMembership();
}

void KqpRm::DisonnectNodes() {
StartRms({MakeKqpResourceManagerConfig(true), MakeKqpResourceManagerConfig(true)});
StartRms({MakeKqpResourceManagerConfig(), MakeKqpResourceManagerConfig()});
NKikimr::TActorSystemStub stub;

auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId());
Expand Down
Loading

0 comments on commit 0138b90

Please sign in to comment.