From 0138b90ad4ffb86f6f1199990bc6fc2d8bd4436e Mon Sep 17 00:00:00 2001 From: Vitalii Gridnev Date: Tue, 30 Jul 2024 13:10:18 +0300 Subject: [PATCH] remove outdated code in rm service (#7207) --- ydb/core/kqp/rm_service/kqp_rm_service.cpp | 148 ++++----------------- ydb/core/kqp/rm_service/kqp_rm_service.h | 5 - ydb/core/kqp/rm_service/kqp_rm_ut.cpp | 47 ++----- ydb/core/kqp/rm_service/ya.make | 1 - 4 files changed, 41 insertions(+), 160 deletions(-) diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.cpp b/ydb/core/kqp/rm_service/kqp_rm_service.cpp index f581fb3b0240..baf31d6bf8e7 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_service.cpp @@ -103,10 +103,6 @@ struct TEvPrivate { struct TEvSchedulePublishResources : public TEventLocal { }; - - struct TEvTakeResourcesSnapshot : public TEventLocal { - std::function&&)> Callback; - }; }; class TKqpResourceManager : public IKqpResourceManager { @@ -117,7 +113,6 @@ class TKqpResourceManager : public IKqpResourceManager { , ExecutionUnitsResource(config.GetComputeActorsCount()) , ExecutionUnitsLimit(config.GetComputeActorsCount()) , ScanQueryMemoryResource(config.GetQueryMemoryLimit()) - , PublishResourcesByExchanger(config.GetEnablePublishResourcesByExchanger()) { SetConfigValues(config); } @@ -132,10 +127,7 @@ class TKqpResourceManager : public IKqpResourceManager { config.GetKqpPatternCacheCompiledCapacityBytes(), config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile()); - if (PublishResourcesByExchanger) { - CreateResourceInfoExchanger(config.GetInfoExchangerSettings()); - return; - } + CreateResourceInfoExchanger(config.GetInfoExchangerSettings()); } const TIntrusivePtr& GetCounters() const override { @@ -144,14 +136,10 @@ class TKqpResourceManager : public IKqpResourceManager { void CreateResourceInfoExchanger( const NKikimrConfig::TTableServiceConfig::TResourceManager::TInfoExchangerSettings& settings) { - PublishResourcesByExchanger = true; - if (!ResourceInfoExchanger) { - ResourceSnapshotState = std::make_shared(); - auto exchanger = CreateKqpResourceInfoExchangerActor( - Counters, ResourceSnapshotState, settings); - ResourceInfoExchanger = ActorSystem->Register(exchanger); - return; - } + ResourceSnapshotState = std::make_shared(); + auto exchanger = CreateKqpResourceInfoExchangerActor( + Counters, ResourceSnapshotState, settings); + ResourceInfoExchanger = ActorSystem->Register(exchanger); } bool AllocateExecutionUnits(ui32 cnt) { @@ -328,16 +316,12 @@ class TKqpResourceManager : public IKqpResourceManager { TVector GetClusterResources() const override { TVector resources; - Y_ABORT_UNLESS(PublishResourcesByExchanger); - - if (PublishResourcesByExchanger) { - std::shared_ptr> infos; - with_lock (ResourceSnapshotState->Lock) { - infos = ResourceSnapshotState->Snapshot; - } - if (infos != nullptr) { - resources = *infos; - } + std::shared_ptr> infos; + with_lock (ResourceSnapshotState->Lock) { + infos = ResourceSnapshotState->Snapshot; + } + if (infos != nullptr) { + resources = *infos; } return resources; @@ -345,22 +329,15 @@ class TKqpResourceManager : public IKqpResourceManager { void RequestClusterResourcesInfo(TOnResourcesSnapshotCallback&& callback) override { LOG_AS_D("Schedule Snapshot request"); - if (PublishResourcesByExchanger) { - std::shared_ptr> infos; - with_lock (ResourceSnapshotState->Lock) { - infos = ResourceSnapshotState->Snapshot; - } - TVector resources; - if (infos != nullptr) { - resources = *infos; - } - callback(std::move(resources)); - return; + std::shared_ptr> infos; + with_lock (ResourceSnapshotState->Lock) { + infos = ResourceSnapshotState->Snapshot; + } + TVector resources; + if (infos != nullptr) { + resources = *infos; } - auto ev = MakeHolder(); - ev->Callback = std::move(callback); - TAutoPtr handle = new IEventHandle(SelfId, SelfId, ev.Release()); - ActorSystem->Send(handle); + callback(std::move(resources)); } TKqpLocalNodeResources GetLocalResources() const override { @@ -470,7 +447,6 @@ class TKqpResourceManager : public IKqpResourceManager { // state for resource info exchanger std::shared_ptr ResourceSnapshotState; - bool PublishResourcesByExchanger; TActorId ResourceInfoExchanger = TActorId(); }; @@ -500,7 +476,6 @@ class TKqpResourceManagerActor : public TActorBootstrapped(config, counters); with_lock (ResourceManagers.Lock) { @@ -575,7 +550,6 @@ class TKqpResourceManagerActor : public TActorBootstrappedGet()->Callback({}); - return; - } - - LOG_D("Create Snapshot actor, board: " << WbState.BoardPath); - - Register( - CreateTakeResourcesSnapshotActor(WbState.BoardPath, std::move(ev->Get()->Callback))); - } - void HandleWork(TEvResourceBroker::TEvConfigResponse::TPtr& ev) { if (!ev->Get()->QueueConfig) { LOG_E(NLocalDb::KqpResourceManagerQueue << " not configured!"); @@ -699,23 +659,6 @@ class TKqpResourceManagerActor : public TActorBootstrappedCreateResourceInfoExchanger(config.GetInfoExchangerSettings()); - PublishResourceUsage("exchanger enabled"); - } else { - if (ResourceManager->ResourceInfoExchanger) { - Send(ResourceManager->ResourceInfoExchanger, new TEvents::TEvPoison); - ResourceManager->ResourceInfoExchanger = TActorId(); - } - ResourceManager->PublishResourcesByExchanger = false; - ResourceManager->ResourceSnapshotState.reset(); - PublishResourceUsage("exchanger disabled"); - } - } - #define FORCE_VALUE(name) if (!config.Has ## name ()) config.Set ## name(config.Get ## name()); FORCE_VALUE(ComputeActorsCount) FORCE_VALUE(ChannelBufferSize) @@ -762,14 +705,7 @@ class TKqpResourceManagerActor : public TActorBootstrapped(); - - if (PublishResourcesByExchanger) { - ResourceManager->RequestClusterResourcesInfo( - [&snapshot](TVector&& resources) { - snapshot = std::move(resources); - }); - } + auto snapshot = ResourceManager->GetClusterResources(); HTML(str) { PRE() { @@ -823,9 +759,6 @@ class TKqpResourceManagerActor : public TActorBootstrappedResourceInfoExchanger = TActorId(); } ResourceManager->ResourceSnapshotState.reset(); - if (WbState.BoardPublisherActorId) { - Send(WbState.BoardPublisherActorId, new TEvents::TEvPoison); - } TActor::PassAway(); } @@ -882,39 +815,14 @@ class TKqpResourceManagerActor : public TActorBootstrappedSetAvailable(ResourceManager->ScanQueryMemoryResource.Available()); } - if (PublishResourcesByExchanger) { - LOG_I("Send to publish resource usage for " - << "reason: " << reason - << ", payload: " << payload.ShortDebugString()); - WbState.LastPublishTime = now; - if (ResourceManager->ResourceInfoExchanger) { - Send(ResourceManager->ResourceInfoExchanger, - new TEvKqpResourceInfoExchanger::TEvPublishResource(std::move(payload))); - } - return; - } - - if (WbState.BoardPublisherActorId) { - LOG_I("Kill previous board publisher for '" << WbState.BoardPath - << "' at " << WbState.BoardPublisherActorId << ", reason: " << reason); - Send(WbState.BoardPublisherActorId, new TEvents::TEvPoison); - } - - WbState.BoardPublisherActorId = TActorId(); - - if (WbState.DomainNotFound) { - LOG_E("Can not find default state storage group for database " << WbState.Tenant); - return; - } - - auto boardPublisher = CreateBoardPublishActor(WbState.BoardPath, payload.SerializeAsString(), SelfId(), - /* ttlMs */ 0, /* reg */ true); - WbState.BoardPublisherActorId = Register(boardPublisher); - + LOG_I("Send to publish resource usage for " + << "reason: " << reason + << ", payload: " << payload.ShortDebugString()); WbState.LastPublishTime = now; - - LOG_I("Publish resource usage for '" << WbState.BoardPath << "' at " << WbState.BoardPublisherActorId - << ", reason: " << reason << ", payload: " << payload.ShortDebugString()); + if (ResourceManager->ResourceInfoExchanger) { + Send(ResourceManager->ResourceInfoExchanger, + new TEvKqpResourceInfoExchanger::TEvPublishResource(std::move(payload))); + } } private: @@ -927,7 +835,6 @@ class TKqpResourceManagerActor : public TActorBootstrapped LastPublishTime; }; TWhiteBoardState WbState; @@ -940,7 +847,6 @@ class TKqpResourceManagerActor : public TActorBootstrapped ResourceManager; std::optional PublishResourcesScheduledAt; - bool PublishResourcesByExchanger; std::optional SelfDataCenterId; }; diff --git a/ydb/core/kqp/rm_service/kqp_rm_service.h b/ydb/core/kqp/rm_service/kqp_rm_service.h index 7c7bd8714676..78b519969f02 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_service.h +++ b/ydb/core/kqp/rm_service/kqp_rm_service.h @@ -229,11 +229,6 @@ class IKqpResourceManager : private TNonCopyable { }; -NActors::IActor* CreateTakeResourcesSnapshotActor( - const TString& boardPath, - std::function&&)>&& callback); - - struct TResourceSnapshotState { std::shared_ptr> Snapshot; TMutex Lock; diff --git a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp index 09d7c0536254..9c8dc4d318ef 100644 --- a/ydb/core/kqp/rm_service/kqp_rm_ut.cpp +++ b/ydb/core/kqp/rm_service/kqp_rm_ut.cpp @@ -94,14 +94,12 @@ TResourceBrokerConfig MakeResourceBrokerTestConfig() { return config; } -NKikimrConfig::TTableServiceConfig::TResourceManager MakeKqpResourceManagerConfig( - bool EnablePublishResourcesByExchanger = false) { +NKikimrConfig::TTableServiceConfig::TResourceManager MakeKqpResourceManagerConfig() { NKikimrConfig::TTableServiceConfig::TResourceManager config; config.SetComputeActorsCount(100); config.SetPublishStatisticsIntervalSec(0); config.SetQueryMemoryLimit(1000); - config.SetEnablePublishResourcesByExchanger(EnablePublishResourcesByExchanger); auto* infoExchangerRetrySettings = config.MutableInfoExchangerSettings(); auto* exchangerSettings = infoExchangerRetrySettings->MutableExchangerSettings(); @@ -276,12 +274,9 @@ class KqpRm : public TTestBase { UNIT_TEST(NotEnoughMemory); UNIT_TEST(NotEnoughExecutionUnits); UNIT_TEST(ResourceBrokerNotEnoughResources); - UNIT_TEST(SingleSnapshotByStateStorage); UNIT_TEST(SingleSnapshotByExchanger); UNIT_TEST(Reduce); - UNIT_TEST(SnapshotSharingByStateStorage); UNIT_TEST(SnapshotSharingByExchanger); - UNIT_TEST(NodesMembershipByStateStorage); UNIT_TEST(NodesMembershipByExchanger); UNIT_TEST(DisonnectNodes); UNIT_TEST_SUITE_END(); @@ -291,15 +286,12 @@ class KqpRm : public TTestBase { void NotEnoughMemory(); void NotEnoughExecutionUnits(); void ResourceBrokerNotEnoughResources(); - void Snapshot(bool byExchanger); - void SingleSnapshotByStateStorage(); + void Snapshot(); void SingleSnapshotByExchanger(); void Reduce(); - void SnapshotSharing(bool byExchanger); - void SnapshotSharingByStateStorage(); + void SnapshotSharing(); void SnapshotSharingByExchanger(); - void NodesMembership(bool byExchanger); - void NodesMembershipByStateStorage(); + void NodesMembership(); void NodesMembershipByExchanger(); void DisonnectNodes(); @@ -450,8 +442,8 @@ void KqpRm::ResourceBrokerNotEnoughResources() { AssertResourceBrokerSensors(0, 1000, 0, 0, 1); } -void KqpRm::Snapshot(bool byExchanger) { - StartRms({MakeKqpResourceManagerConfig(byExchanger), MakeKqpResourceManagerConfig(byExchanger)}); +void KqpRm::Snapshot() { + StartRms({MakeKqpResourceManagerConfig(), MakeKqpResourceManagerConfig()}); NKikimr::TActorSystemStub stub; auto rm = GetKqpResourceManager(ResourceManagers.front().NodeId()); @@ -490,12 +482,8 @@ void KqpRm::Snapshot(bool byExchanger) { CheckSnapshot(0, {{1000, 100}, {1000, 100}}, rm); } -void KqpRm::SingleSnapshotByStateStorage() { - Snapshot(false); -} - void KqpRm::SingleSnapshotByExchanger() { - Snapshot(true); + Snapshot(); } void KqpRm::Reduce() { @@ -537,8 +525,8 @@ void KqpRm::Reduce() { AssertResourceBrokerSensors(0, 30, 0, 0, 1); } -void KqpRm::SnapshotSharing(bool byExchanger) { - StartRms({MakeKqpResourceManagerConfig(byExchanger), MakeKqpResourceManagerConfig(byExchanger)}); +void KqpRm::SnapshotSharing() { + StartRms({MakeKqpResourceManagerConfig(), MakeKqpResourceManagerConfig()}); NKikimr::TActorSystemStub stub; auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId()); @@ -607,16 +595,12 @@ void KqpRm::SnapshotSharing(bool byExchanger) { } } -void KqpRm::SnapshotSharingByStateStorage() { - SnapshotSharing(false); -} - void KqpRm::SnapshotSharingByExchanger() { - SnapshotSharing(true); + SnapshotSharing(); } -void KqpRm::NodesMembership(bool byExchanger) { - StartRms({MakeKqpResourceManagerConfig(byExchanger), MakeKqpResourceManagerConfig(byExchanger)}); +void KqpRm::NodesMembership() { + StartRms({MakeKqpResourceManagerConfig(), MakeKqpResourceManagerConfig()}); NKikimr::TActorSystemStub stub; auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId()); @@ -641,16 +625,13 @@ void KqpRm::NodesMembership(bool byExchanger) { CheckSnapshot(0, {{1000, 100}}, rm_first); } -void KqpRm::NodesMembershipByStateStorage() { - NodesMembership(false); -} void KqpRm::NodesMembershipByExchanger() { - NodesMembership(true); + NodesMembership(); } void KqpRm::DisonnectNodes() { - StartRms({MakeKqpResourceManagerConfig(true), MakeKqpResourceManagerConfig(true)}); + StartRms({MakeKqpResourceManagerConfig(), MakeKqpResourceManagerConfig()}); NKikimr::TActorSystemStub stub; auto rm_first = GetKqpResourceManager(ResourceManagers[0].NodeId()); diff --git a/ydb/core/kqp/rm_service/ya.make b/ydb/core/kqp/rm_service/ya.make index 2c2133cd98e2..b223d09349bb 100644 --- a/ydb/core/kqp/rm_service/ya.make +++ b/ydb/core/kqp/rm_service/ya.make @@ -1,7 +1,6 @@ LIBRARY() SRCS( - kqp_resource_tracker.cpp kqp_resource_estimation.cpp kqp_resource_info_exchanger.cpp kqp_rm_service.cpp