From 99e7632f8d6e928d9d63b4efeb4af3179c9cf445 Mon Sep 17 00:00:00 2001 From: Daniil Komarevtsev Date: Sun, 5 Jan 2025 08:14:07 +0000 Subject: [PATCH] Add lagging agent concept to the volume --- .../libs/storage/api/disk_registry.h | 4 + cloud/blockstore/libs/storage/api/partition.h | 34 +- .../disk_registry/disk_registry_actor.cpp | 12 + .../libs/storage/partition_nonrepl/config.h | 20 +- .../migration_timeout_calculator_ut.cpp | 1 + .../part_mirror_resync_ut.cpp | 9 +- .../part_mirror_state_ut.cpp | 9 +- .../partition_nonrepl/part_mirror_ut.cpp | 9 +- .../part_nonrepl_migration_ut.cpp | 12 +- .../part_nonrepl_rdma_ut.cpp | 11 +- .../partition_nonrepl/part_nonrepl_ut.cpp | 9 +- .../partition_nonrepl/resync_range_ut.cpp | 11 +- .../blockstore/libs/storage/protos/disk.proto | 62 +++ .../libs/storage/protos_ydb/volume.proto | 6 + .../testlib/disk_registry_proxy_mock.h | 98 ++++- .../libs/storage/testlib/test_env_state.h | 4 +- .../volume/actors/shadow_disk_actor.cpp | 3 + .../libs/storage/volume/model/helpers.cpp | 239 ++++++++++ .../libs/storage/volume/model/helpers.h | 36 ++ .../libs/storage/volume/model/ya.make | 1 + .../libs/storage/volume/testlib/test_env.cpp | 104 +++-- .../libs/storage/volume/testlib/test_env.h | 7 +- .../blockstore/libs/storage/volume/ut/ya.make | 1 + .../libs/storage/volume/volume_actor.cpp | 27 +- .../libs/storage/volume/volume_actor.h | 24 + .../volume/volume_actor_allocatedisk.cpp | 15 +- .../volume/volume_actor_lagging_agents.cpp | 416 ++++++++++++++++++ .../volume/volume_actor_reallocatedisk.cpp | 9 + .../storage/volume/volume_actor_startstop.cpp | 2 + .../volume/volume_actor_updateconfig.cpp | 10 +- .../storage/volume/volume_events_private.h | 82 ++++ .../volume/volume_lagging_agent_ut.cpp | 403 +++++++++++++++++ .../libs/storage/volume/volume_state.cpp | 28 ++ .../libs/storage/volume/volume_state.h | 4 + .../libs/storage/volume/volume_tx.h | 49 +++ .../libs/storage/volume/volume_ut_stats.cpp | 2 +- cloud/blockstore/libs/storage/volume/ya.make | 1 + 37 files changed, 1681 insertions(+), 93 deletions(-) create mode 100644 cloud/blockstore/libs/storage/volume/model/helpers.cpp create mode 100644 cloud/blockstore/libs/storage/volume/model/helpers.h create mode 100644 cloud/blockstore/libs/storage/volume/volume_actor_lagging_agents.cpp create mode 100644 cloud/blockstore/libs/storage/volume/volume_lagging_agent_ut.cpp diff --git a/cloud/blockstore/libs/storage/api/disk_registry.h b/cloud/blockstore/libs/storage/api/disk_registry.h index 2ec01040517..7f7513b90d9 100644 --- a/cloud/blockstore/libs/storage/api/disk_registry.h +++ b/cloud/blockstore/libs/storage/api/disk_registry.h @@ -53,6 +53,7 @@ namespace NCloud::NBlockStore::NStorage { xxx(GetCheckpointDataState, __VA_ARGS__) \ xxx(SetCheckpointDataState, __VA_ARGS__) \ xxx(GetAgentNodeId, __VA_ARGS__) \ + xxx(AddLaggingDevices, __VA_ARGS__) \ // BLOCKSTORE_DISK_REGISTRY_REQUESTS_PROTO // requests forwarded from service to disk_registry @@ -211,6 +212,9 @@ struct TEvDiskRegistry EvQueryAgentsInfoRequest = EvBegin + 75, EvQueryAgentsInfoResponse = EvBegin + 76, + EvAddLaggingDevicesRequest = EvBegin + 77, + EvAddLaggingDevicesResponse = EvBegin + 78, + EvEnd }; diff --git a/cloud/blockstore/libs/storage/api/partition.h b/cloud/blockstore/libs/storage/api/partition.h index a53fbae21e3..3082a074eab 100644 --- a/cloud/blockstore/libs/storage/api/partition.h +++ b/cloud/blockstore/libs/storage/api/partition.h @@ -15,7 +15,7 @@ namespace NCloud::NBlockStore::NStorage::NPartition { xxx(Drain, __VA_ARGS__) \ // BLOCKSTORE_PARTITION_REQUESTS -// requests forwarded from service to partion +// requests forwarded from service to partition #define BLOCKSTORE_PARTITION_REQUESTS_FWD_SERVICE(xxx, ...) \ xxx(ReadBlocks, __VA_ARGS__) \ xxx(WriteBlocks, __VA_ARGS__) \ @@ -94,6 +94,25 @@ struct TEvPartition {} }; + // + // AddLaggingAgent + // + + struct TAddLaggingAgentRequest + { + // 0 - for main devices; 1,2 - for mirror replicas + ui32 ReplicaIndex; + TString AgentId; + TAddLaggingAgentRequest(ui32 replicaIndex, TString agentId) + : ReplicaIndex(replicaIndex) + , AgentId(std::move(agentId)) + {} + }; + + struct TAddLaggingAgentResponse + { + }; + // // Events declaration // @@ -115,6 +134,9 @@ struct TEvPartition EvGarbageCollectorCompleted = EvBegin + 8, + EvAddLaggingAgentRequest = EvBegin + 9, + EvAddLaggingAgentResponse = EvBegin + 10, + EvEnd }; @@ -132,6 +154,16 @@ struct TEvPartition TGarbageCollectorCompleted, EvGarbageCollectorCompleted >; + + using TEvAddLaggingAgentRequest = TRequestEvent< + TAddLaggingAgentRequest, + EvAddLaggingAgentRequest + >; + + using TEvAddLaggingAgentResponse = TResponseEvent< + TAddLaggingAgentResponse, + EvAddLaggingAgentResponse + >; }; } // namespace NCloud::NBlockStore::NStorage::NPartition diff --git a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp index 5f251d85cb0..855245960dc 100644 --- a/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp +++ b/cloud/blockstore/libs/storage/disk_registry/disk_registry_actor.cpp @@ -588,6 +588,18 @@ void TDiskRegistryActor::HandleOperationCompleted( Actors.erase(ev->Sender); } +void TDiskRegistryActor::HandleAddLaggingDevices( + const TEvDiskRegistry::TEvAddLaggingDevicesRequest::TPtr& ev, + const TActorContext& ctx) +{ + Y_UNUSED(ev); + Y_UNUSED(ctx); + + BLOCKSTORE_DISK_REGISTRY_COUNTER(AddLaggingDevices); + + // TODO(komarevtsev-d): Implement this. +} + //////////////////////////////////////////////////////////////////////////////// STFUNC(TDiskRegistryActor::StateBoot) diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/config.h b/cloud/blockstore/libs/storage/partition_nonrepl/config.h index 0aee46709a8..815ba48d4e8 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/config.h +++ b/cloud/blockstore/libs/storage/partition_nonrepl/config.h @@ -61,6 +61,8 @@ class TNonreplicatedPartitionConfig const NActors::TActorId ParentActorId; const bool MuteIOErrors; const THashSet FreshDeviceIds; + // List of devices that have outdated data. Can only appear on mirror disks. + const THashSet LaggingDeviceIds; const TDuration MaxTimedOutDeviceStateDuration; const bool MaxTimedOutDeviceStateDurationOverridden; const bool UseSimpleMigrationBandwidthLimiter; @@ -77,6 +79,7 @@ class TNonreplicatedPartitionConfig NActors::TActorId parentActorId, bool muteIOErrors, THashSet freshDeviceIds, + THashSet laggingDeviceIds, TDuration maxTimedOutDeviceStateDuration, bool maxTimedOutDeviceStateDurationOverridden, bool useSimpleMigrationBandwidthLimiter) @@ -88,6 +91,7 @@ class TNonreplicatedPartitionConfig , ParentActorId(std::move(parentActorId)) , MuteIOErrors(muteIOErrors) , FreshDeviceIds(std::move(freshDeviceIds)) + , LaggingDeviceIds(std::move(laggingDeviceIds)) , MaxTimedOutDeviceStateDuration(maxTimedOutDeviceStateDuration) , MaxTimedOutDeviceStateDurationOverridden(maxTimedOutDeviceStateDurationOverridden) , UseSimpleMigrationBandwidthLimiter(useSimpleMigrationBandwidthLimiter) @@ -111,6 +115,13 @@ class TNonreplicatedPartitionConfig } } + THashSet laggingDeviceIds; + for (const auto& device: devices) { + if (LaggingDeviceIds.contains(device.GetDeviceUUID())) { + laggingDeviceIds.insert(device.GetDeviceUUID()); + } + } + return std::make_shared( std::move(devices), IOMode, @@ -120,6 +131,7 @@ class TNonreplicatedPartitionConfig ParentActorId, MuteIOErrors, std::move(freshDeviceIds), + std::move(laggingDeviceIds), MaxTimedOutDeviceStateDuration, MaxTimedOutDeviceStateDurationOverridden, UseSimpleMigrationBandwidthLimiter @@ -176,6 +188,11 @@ class TNonreplicatedPartitionConfig return FreshDeviceIds; } + const auto& GetLaggingDeviceIds() const + { + return LaggingDeviceIds; + } + auto GetMaxTimedOutDeviceStateDuration() const { return MaxTimedOutDeviceStateDuration; @@ -229,7 +246,8 @@ class TNonreplicatedPartitionConfig Y_UNUSED(relativeRange); return !Devices[i].GetDeviceUUID() - || FreshDeviceIds.contains(Devices[i].GetDeviceUUID()); + || FreshDeviceIds.contains(Devices[i].GetDeviceUUID()) + || LaggingDeviceIds.contains(Devices[i].GetDeviceUUID()); }); } diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/migration_timeout_calculator_ut.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/migration_timeout_calculator_ut.cpp index 17d0c7b5aa6..f2cf17fbd42 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/migration_timeout_calculator_ut.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/migration_timeout_calculator_ut.cpp @@ -181,6 +181,7 @@ TNonreplicatedPartitionConfigPtr MakePartitionConfig( NActors::TActorId(), false, // muteIOErrors THashSet(), // freshDeviceIds + THashSet(), // laggingDeviceIds TDuration::Zero(), // maxTimedOutDeviceStateDuration false, // maxTimedOutDeviceStateDurationOverridden useSimpleMigrationBandwidthLimiter); diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_resync_ut.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_resync_ut.cpp index 94426befa8b..0b7d717afb3 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_resync_ut.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_resync_ut.cpp @@ -266,11 +266,12 @@ struct TTestEnv // only SSD/HDD distinction matters NProto::STORAGE_MEDIA_SSD_MIRROR3}, VolumeActorId, - false, // muteIOErrors + false, // muteIOErrors std::move(freshDeviceIds), - TDuration::Zero(), // maxTimedOutDeviceStateDuration - false, // maxTimedOutDeviceStateDurationOverridden - true // useSimpleMigrationBandwidthLimiter + THashSet(), // laggingDeviceIds + TDuration::Zero(), // maxTimedOutDeviceStateDuration + false, // maxTimedOutDeviceStateDurationOverridden + true // useSimpleMigrationBandwidthLimiter ); for (auto& replica: Replicas) { diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_state_ut.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_state_ut.cpp index 574bd18e914..2f9610b70cb 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_state_ut.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_state_ut.cpp @@ -60,11 +60,12 @@ struct TEnv 4_KB, volumeInfo, NActors::TActorId(), - false, // muteIOErrors + false, // muteIOErrors FreshDeviceIds, - TDuration::Zero(), // maxTimedOutDeviceStateDuration - false, // maxTimedOutDeviceStateDurationOverridden - true // useSimpleMigrationBandwidthLimiter + THashSet(), // laggingDeviceIds + TDuration::Zero(), // maxTimedOutDeviceStateDuration + false, // maxTimedOutDeviceStateDurationOverridden + true // useSimpleMigrationBandwidthLimiter ); { diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_ut.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_ut.cpp index 76d6ce716c0..9608c5d3f82 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_ut.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_mirror_ut.cpp @@ -200,11 +200,12 @@ struct TTestEnv // only SSD/HDD distinction matters NProto::STORAGE_MEDIA_SSD_MIRROR3}, VolumeActorId, - false, // muteIOErrors + false, // muteIOErrors std::move(freshDeviceIds), - TDuration::Zero(), // maxTimedOutDeviceStateDuration - false, // maxTimedOutDeviceStateDurationOverridden - true // useSimpleMigrationBandwidthLimiter + THashSet(), // laggingDeviceIds + TDuration::Zero(), // maxTimedOutDeviceStateDuration + false, // maxTimedOutDeviceStateDurationOverridden + true // useSimpleMigrationBandwidthLimiter ); for (auto& replica: replicas) { diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_ut.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_ut.cpp index 6a1d109c8d8..68019d98b31 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_ut.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_migration_ut.cpp @@ -202,12 +202,12 @@ struct TTestEnv // only SSD/HDD distinction matters NProto::STORAGE_MEDIA_SSD_NONREPLICATED}, VolumeActorId, - false, // muteIOErrors - THashSet(), // freshDeviceIds - TDuration::Zero(), // maxTimedOutDeviceStateDuration - false, // maxTimedOutDeviceStateDurationOverridden - false - ); + false, // muteIOErrors + THashSet(), // freshDeviceIds + THashSet(), // laggingDeviceIds + TDuration::Zero(), // maxTimedOutDeviceStateDuration + false, // maxTimedOutDeviceStateDurationOverridden + false); auto part = std::make_unique( std::move(config), diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_ut.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_ut.cpp index 5d37661830c..094c04fe721 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_ut.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_rdma_ut.cpp @@ -129,11 +129,12 @@ struct TTestEnv // only SSD/HDD distinction matters NProto::STORAGE_MEDIA_SSD_NONREPLICATED}, VolumeActorId, - false, // muteIOErrors - THashSet(), // freshDeviceIds - TDuration::Zero(), // maxTimedOutDeviceStateDuration - false, // maxTimedOutDeviceStateDurationOverridden - false // useSimpleMigrationBandwidthLimiter + false, // muteIOErrors + THashSet(), // freshDeviceIds + THashSet(), // laggingDeviceIds + TDuration::Zero(), // maxTimedOutDeviceStateDuration + false, // maxTimedOutDeviceStateDurationOverridden + false // useSimpleMigrationBandwidthLimiter ); auto part = std::make_unique( diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_ut.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_ut.cpp index c4480cc86fe..d7fbfca7aba 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_ut.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/part_nonrepl_ut.cpp @@ -131,10 +131,11 @@ struct TTestEnv TNonreplicatedPartitionConfig::TVolumeInfo{Now(), params.MediaKind}, VolumeActorId, params.MuteIOErrors, - THashSet(), // freshDeviceIds - TDuration::Zero(), // maxTimedOutDeviceStateDuration - false, // maxTimedOutDeviceStateDurationOverridden - false // useSimpleMigrationBandwidthLimiter + THashSet(), // freshDeviceIds + THashSet(), // laggingDeviceIds + TDuration::Zero(), // maxTimedOutDeviceStateDuration + false, // maxTimedOutDeviceStateDurationOverridden + false // useSimpleMigrationBandwidthLimiter ); auto part = std::make_unique( diff --git a/cloud/blockstore/libs/storage/partition_nonrepl/resync_range_ut.cpp b/cloud/blockstore/libs/storage/partition_nonrepl/resync_range_ut.cpp index dd7e415b76e..6991e304e0d 100644 --- a/cloud/blockstore/libs/storage/partition_nonrepl/resync_range_ut.cpp +++ b/cloud/blockstore/libs/storage/partition_nonrepl/resync_range_ut.cpp @@ -134,11 +134,12 @@ struct TTestEnv // only SSD/HDD distinction matters NProto::STORAGE_MEDIA_SSD_NONREPLICATED}, VolumeActorId, - false, // muteIOErrors - THashSet(), // freshDeviceIds - TDuration::Zero(), // maxTimedOutDeviceStateDuration - false, // maxTimedOutDeviceStateDurationOverridden - true // useSimpleMigrationBandwidthLimiter + false, // muteIOErrors + THashSet(), // freshDeviceIds + THashSet(), // laggingDeviceIds + TDuration::Zero(), // maxTimedOutDeviceStateDuration + false, // maxTimedOutDeviceStateDurationOverridden + true // useSimpleMigrationBandwidthLimiter ); auto part = std::make_unique( diff --git a/cloud/blockstore/libs/storage/protos/disk.proto b/cloud/blockstore/libs/storage/protos/disk.proto index 8bdc86bfcd8..ed54222fdbf 100644 --- a/cloud/blockstore/libs/storage/protos/disk.proto +++ b/cloud/blockstore/libs/storage/protos/disk.proto @@ -278,6 +278,9 @@ message TDiskConfig // A log of important events in the life of this disk. repeated TDiskHistoryItem History = 17; + + // A list of devices that are lagging begind on writes. + repeated TLaggingDevice LaggingDevices = 18; } //////////////////////////////////////////////////////////////////////////////// @@ -417,6 +420,41 @@ message TAgentStats //////////////////////////////////////////////////////////////////////////////// +message TLaggingDevice +{ + // UUID of the lagging device. + string DeviceUUID = 1; + + // Index of the lagging device in the replica. + uint32 RowIndex = 2; +} + +//////////////////////////////////////////////////////////////////////////////// + +message TLaggingAgent +{ + // Agent id. + string AgentId = 1; + + // Index of the mirror disk replica. + // 0 - main devices + // 1,2 - replica devices + uint32 ReplicaIndex = 2; + + // A list of devices that belong to the agent. + repeated TLaggingDevice Devices = 3; +} + +//////////////////////////////////////////////////////////////////////////////// + +message TLaggingAgentsInfo +{ + // A list of agents that lagging behind on writes. + repeated TLaggingAgent Agents = 1; +} + +//////////////////////////////////////////////////////////////////////////////// + message TDiskRegistryAgentListRequestParams { repeated string AgentIds = 1; @@ -635,6 +673,9 @@ message TAllocateDiskResponse // New devices used instead of recently replaced ones. repeated string DeviceReplacementUUIDs = 8; + + // Devices that had been lagging. + repeated TLaggingDevice RemovedLaggingDevices = 9; } //////////////////////////////////////////////////////////////////////////////// @@ -1662,6 +1703,27 @@ message TGetAgentNodeIdResponse bool Connected = 4; } +//////////////////////////////////////////////////////////////////////////////// +// Report that some of the devices were lagging. + +message TAddLaggingDevicesRequest +{ + // Optional request headers. + THeaders Headers = 1; + + // Disk identifier to perform operations on. + string DiskId = 2; + + // Devices that has been lagging. + repeated TLaggingDevice LaggingDevices = 3; +} + +message TAddLaggingDevicesResponse +{ + // Optional error, set only if error happened. + NCloud.NProto.TError Error = 1; +} + //////////////////////////////////////////////////////////////////////////////// // Get dependent disks diff --git a/cloud/blockstore/libs/storage/protos_ydb/volume.proto b/cloud/blockstore/libs/storage/protos_ydb/volume.proto index 3d784a3006a..90df08528f3 100644 --- a/cloud/blockstore/libs/storage/protos_ydb/volume.proto +++ b/cloud/blockstore/libs/storage/protos_ydb/volume.proto @@ -67,6 +67,12 @@ message TVolumeMeta // We don't allow clients with old sequential number to mount disk for read/write // in order to prevent data corruption during disk filling. uint64 FillSeqNumber = 16; + + // A list of agents that lagging behind on writes. Used only for mirror + // disks. An agent can exit this state if it starts to respond to requests, + // or if the volume has been restarted/reallocated (i.e. the partition has + // restarted), or if the DR has replaced all lagging devices. + TLaggingAgentsInfo LaggingAgentsInfo = 17; } //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/blockstore/libs/storage/testlib/disk_registry_proxy_mock.h b/cloud/blockstore/libs/storage/testlib/disk_registry_proxy_mock.h index 2e79fe56936..83b35d3cee9 100644 --- a/cloud/blockstore/libs/storage/testlib/disk_registry_proxy_mock.h +++ b/cloud/blockstore/libs/storage/testlib/disk_registry_proxy_mock.h @@ -34,10 +34,12 @@ class TDiskRegistryProxyMock final TDiskRegistryStatePtr State; public: - TDiskRegistryProxyMock(TDiskRegistryStatePtr state) + explicit TDiskRegistryProxyMock(TDiskRegistryStatePtr state) : TActor(&TThis::StateWork) , State(std::move(state)) - {} + { + State->DeviceIsAllocated.resize(State->Devices.size(), false); + } private: STFUNC(StateWork) @@ -136,6 +138,10 @@ class TDiskRegistryProxyMock final TEvDiskRegistry::TEvDeallocateCheckpointRequest, HandleDeallocateCheckpoint); + HFunc( + TEvDiskRegistry::TEvAddLaggingDevicesRequest, + HandleAddLaggingDevices); + HFunc( TEvService::TEvCmsActionRequest, HandleCmsAction); @@ -144,6 +150,7 @@ class TDiskRegistryProxyMock final TEvDiskRegistryProxy::TEvGetDrTabletInfoRequest, HandleGetDrTabletInfo); + IgnoreFunc(NKikimr::TEvLocal::TEvTabletMetrics); default: @@ -184,6 +191,26 @@ class TDiskRegistryProxyMock final } } + const NProto::TDeviceConfig* AllocateNextDevice(i32 prevNodeId) + { + for (int i = 0; i < State->Devices.ysize(); i++) { + if (State->DeviceIsAllocated[i]) { + continue; + } + + if (State->AllocateDiskReplicasOnDifferentNodes && + static_cast(State->Devices[i].GetNodeId()) <= prevNodeId) + { + continue; + } + + State->DeviceIsAllocated[i] = true; + return &State->Devices[i]; + } + + return nullptr; + } + void HandleAllocateDisk( const TEvDiskRegistry::TEvAllocateDiskRequest::TPtr& ev, const NActors::TActorContext& ctx) @@ -217,6 +244,7 @@ class TDiskRegistryProxyMock final disk.PoolName = msg->Record.GetPoolName(); disk.MediaKind = msg->Record.GetStorageMediaKind(); + disk.Replicas.resize(State->ReplicaCount); disk.Migrations.clear(); ui64 bytes = (1 + State->ReplicaCount) * msg->Record.GetBlocksCount() @@ -225,36 +253,39 @@ class TDiskRegistryProxyMock final ui32 i = 0; while (bytes) { ui64 deviceBytes = 0; + i32 prevNodeId = -1; if (i < disk.Devices.size()) { - deviceBytes = Min(bytes, disk.Devices[i].GetBlocksCount() - * disk.Devices[i].GetBlockSize()); + deviceBytes = + Min(bytes, + disk.Devices[i].GetBlocksCount() * + disk.Devices[i].GetBlockSize()); } else { - if (State->NextDeviceIdx >= State->Devices.size()) { + const auto* device = AllocateNextDevice(prevNodeId); + if (!device) { break; } - disk.Devices.push_back( - State->Devices[State->NextDeviceIdx++]); - const auto& device = disk.Devices.back(); - deviceBytes = device.GetBlocksCount() * device.GetBlockSize(); + disk.Devices.push_back(*device); + deviceBytes = device->GetBlocksCount() * device->GetBlockSize(); + prevNodeId = static_cast(device->GetNodeId()); } - disk.Replicas.resize(State->ReplicaCount); - for (auto& replica: disk.Replicas) { if (i < replica.size()) { - deviceBytes += Min(bytes, replica[i].GetBlocksCount() - * replica[i].GetBlockSize()); + deviceBytes += + Min(bytes, + replica[i].GetBlocksCount() * + replica[i].GetBlockSize()); } else { - if (State->NextDeviceIdx >= State->Devices.size()) { + const auto* device = AllocateNextDevice(prevNodeId); + if (!device) { break; } - replica.push_back( - State->Devices[State->NextDeviceIdx++]); - const auto& device = replica.back(); + replica.push_back(*device); deviceBytes += - device.GetBlocksCount() * device.GetBlockSize(); + device->GetBlocksCount() * device->GetBlockSize(); + prevNodeId = static_cast(device->GetNodeId()); } } @@ -281,6 +312,11 @@ class TDiskRegistryProxyMock final *response->Record.AddDeviceReplacementUUIDs() = deviceId; } + for (const auto& laggingDevice: disk.LaggingDevices) { + *response->Record.AddRemovedLaggingDevices() = laggingDevice; + } + disk.LaggingDevices.clear(); + if (bytes) { response->Record.MutableError()->CopyFrom( MakeError(E_BS_OUT_OF_SPACE, "not enough available devices") @@ -964,6 +1000,32 @@ class TDiskRegistryProxyMock final TEvDiskRegistry::TEvDeallocateCheckpointResponse>()); } + void HandleAddLaggingDevices( + const TEvDiskRegistry::TEvAddLaggingDevicesRequest::TPtr& ev, + const NActors::TActorContext& ctx) + { + const auto* msg = ev->Get(); + const auto& diskId = msg->Record.GetDiskId(); + auto* diskState = State->Disks.FindPtr(diskId); + if (!diskState) { + NCloud::Reply( + ctx, + *ev, + std::make_unique( + MakeError(E_NOT_FOUND, "Disk not found"))); + return; + } + + for (const auto& laggingDevice: msg->Record.GetLaggingDevices()) { + diskState->LaggingDevices.push_back(laggingDevice); + } + + NCloud::Reply( + ctx, + *ev, + std::make_unique()); + } + void HandleCmsAction( const TEvService::TEvCmsActionRequest::TPtr& ev, const NActors::TActorContext& ctx) diff --git a/cloud/blockstore/libs/storage/testlib/test_env_state.h b/cloud/blockstore/libs/storage/testlib/test_env_state.h index 52b976859d1..250e453e62a 100644 --- a/cloud/blockstore/libs/storage/testlib/test_env_state.h +++ b/cloud/blockstore/libs/storage/testlib/test_env_state.h @@ -32,6 +32,7 @@ struct TDiskRegistryState: TAtomicRefCount TInstant IOModeTs; TMap Migrations; TVector> Replicas; + TVector LaggingDevices; bool MuteIOErrors = false; TString PoolName; NCloud::NProto::EStorageMediaKind MediaKind = {}; @@ -51,8 +52,8 @@ struct TDiskRegistryState: TAtomicRefCount TSet DisksMarkedForCleanup; TMap PlacementGroups; TVector Devices; + TVector DeviceIsAllocated; TMap MigrationDevices; - ui32 NextDeviceIdx = 0; ui32 CurrentErrorCode = S_OK; EMigrationMode MigrationMode = EMigrationMode::Disabled; ui32 ReplicaCount = 0; @@ -61,6 +62,7 @@ struct TDiskRegistryState: TAtomicRefCount TVector> AgentStates; + bool AllocateDiskReplicasOnDifferentNodes = false; bool WritableState = false; }; diff --git a/cloud/blockstore/libs/storage/volume/actors/shadow_disk_actor.cpp b/cloud/blockstore/libs/storage/volume/actors/shadow_disk_actor.cpp index bd5b001963b..308e9404fc3 100644 --- a/cloud/blockstore/libs/storage/volume/actors/shadow_disk_actor.cpp +++ b/cloud/blockstore/libs/storage/volume/actors/shadow_disk_actor.cpp @@ -619,6 +619,8 @@ bool TShadowDiskActor::OnMessage( TEvService::TEvReadBlocksLocalRequest, HandleReadBlocks); + IgnoreFunc(TEvVolumePrivate::TEvDeviceTimeoutedRequest); + // Write/zero request. case TEvService::TEvWriteBlocksRequest::EventType: { return HandleWriteZeroBlocks( @@ -803,6 +805,7 @@ void TShadowDiskActor::CreateShadowDiskConfig() SelfId(), // need to handle TEvRdmaUnavailable, TEvReacquireDisk true, // muteIOErrors THashSet(), // freshDeviceIds + THashSet(), // laggingDeviceIds TDuration(), // maxTimedOutDeviceStateDuration false, // maxTimedOutDeviceStateDurationOverridden true // useSimpleMigrationBandwidthLimiter diff --git a/cloud/blockstore/libs/storage/volume/model/helpers.cpp b/cloud/blockstore/libs/storage/volume/model/helpers.cpp new file mode 100644 index 00000000000..e457eee6eec --- /dev/null +++ b/cloud/blockstore/libs/storage/volume/model/helpers.cpp @@ -0,0 +1,239 @@ +#include "helpers.h" + +#include +#include + +#include +#include +#include +#include + +namespace NCloud::NBlockStore::NStorage { +namespace { + +using google::protobuf::RepeatedPtrField; + +// Finds index of the replica by device UUID, without searching through +// migrations. +std::pair*> +FindReplicaDevices(const NProto::TVolumeMeta& meta, const TString& deviceUUID) +{ + auto deviceMatcher = [&deviceUUID](const auto& device) + { + return device.GetDeviceUUID() == deviceUUID; + }; + + if (AnyOf(meta.GetDevices(), deviceMatcher)) { + return std::make_pair(0, &meta.GetDevices()); + } + + for (int i = 0; i < meta.GetReplicas().size(); i++) { + const auto& replica = meta.GetReplicas()[i]; + if (AnyOf(replica.GetDevices(), deviceMatcher)) { + return std::make_pair(i + 1, &replica.GetDevices()); + } + } + + return std::make_pair(0, nullptr); +} + +// Finds index of the device by device UUID, without searching through +// migrations. +std::optional FindDeviceRowIndex( + const NProto::TVolumeMeta& meta, + const TString& deviceUUID) +{ + auto deviceMatcher = [&deviceUUID](const auto& device) + { + return device.GetDeviceUUID() == deviceUUID; + }; + + auto it = FindIf(meta.GetDevices(), deviceMatcher); + if (it != meta.GetDevices().end()) { + return std::distance(meta.GetDevices().begin(), it); + } + + for (const auto& replica: meta.GetReplicas()) { + auto it = FindIf(replica.GetDevices(), deviceMatcher); + if (it != replica.GetDevices().end()) { + return std::distance(replica.GetDevices().begin(), it); + } + } + + return std::nullopt; +} + +} // namespace + +bool TLaggingDeviceIndexCmp::operator()( + const NProto::TLaggingDevice& lhs, + const NProto::TLaggingDevice& rhs) const +{ + return lhs.GetRowIndex() < rhs.GetRowIndex(); +} + +const NProto::TDeviceConfig* FindDeviceConfig( + const NProto::TVolumeMeta& meta, + const TString& deviceUUID) +{ + auto deviceMatcher = [&deviceUUID](const auto& device) + { + return device.GetDeviceUUID() == deviceUUID; + }; + + const NProto::TDeviceConfig* timeoutedDeviceConfig = nullptr; + timeoutedDeviceConfig = FindIfPtr(meta.GetDevices(), deviceMatcher); + if (timeoutedDeviceConfig) { + return timeoutedDeviceConfig; + } + + for (const auto& replica: meta.GetReplicas()) { + timeoutedDeviceConfig = FindIfPtr(replica.GetDevices(), deviceMatcher); + if (timeoutedDeviceConfig) { + return timeoutedDeviceConfig; + } + } + + for (const auto& migration: meta.GetMigrations()) { + if (deviceMatcher(migration.GetTargetDevice())) { + return &migration.GetTargetDevice(); + } + } + + return nullptr; +} + +std::optional GetAgentDevicesIndexes( + const NProto::TVolumeMeta& meta, + ui32 agentNodeId, + TVector& laggingDevices) +{ + std::optional replicaIndex; + const RepeatedPtrField* replicaDevices = nullptr; + const auto deviceMatcher = [agentNodeId](const NProto::TDeviceConfig& device) + { + return device.GetNodeId() == agentNodeId; + }; + if (AnyOf( + meta.GetDevices().begin(), + meta.GetDevices().end(), + deviceMatcher)) + { + replicaIndex = 0; + replicaDevices = &meta.GetDevices(); + } + + for (int i = 0; !replicaIndex && i < meta.GetReplicas().size(); i++) { + if (AnyOf( + meta.GetReplicas()[i].GetDevices().begin(), + meta.GetReplicas()[i].GetDevices().end(), + deviceMatcher)) + { + replicaIndex = i + 1; + replicaDevices = &meta.GetReplicas()[i].GetDevices(); + break; + } + } + + if (!replicaIndex) { + for (const auto& migration: meta.GetMigrations()) { + if (deviceMatcher(migration.GetTargetDevice())) { + auto [index, devices] = + FindReplicaDevices(meta, migration.GetSourceDeviceId()); + if (devices) { + replicaIndex = index; + replicaDevices = devices; + } + } + } + } + + // There is no devices from desired agent. + if (!replicaIndex || !replicaDevices) { + return std::nullopt; + } + + for (int i = 0; i < replicaDevices->size(); i++) { + const auto& device = (*replicaDevices)[i]; + if (deviceMatcher(device)) { + NProto::TLaggingDevice laggingDevice; + laggingDevice.SetRowIndex(i); + laggingDevice.SetDeviceUUID(device.GetDeviceUUID()); + laggingDevices.push_back(std::move(laggingDevice)); + } + } + + for (int i = 0; i < meta.GetMigrations().size(); i++) { + const auto& targetDevice = meta.GetMigrations()[i].GetTargetDevice(); + if (deviceMatcher(targetDevice)) { + NProto::TLaggingDevice laggingDevice; + auto sourceDeviceIndex = FindDeviceRowIndex( + meta, + meta.GetMigrations()[i].GetSourceDeviceId()); + if (!sourceDeviceIndex) { + ReportDiskAllocationFailure( + TStringBuilder() + << "Migration source device " + << meta.GetMigrations()[i].GetSourceDeviceId() + << " doesn't belong to the disk " + << meta.GetConfig().GetDiskId() + << ". Target device: " << targetDevice.GetDeviceUUID()); + continue; + } + laggingDevice.SetRowIndex(*sourceDeviceIndex); + laggingDevice.SetDeviceUUID(targetDevice.GetDeviceUUID()); + laggingDevices.push_back(std::move(laggingDevice)); + } + } + + return replicaIndex; +} + +TSet ReplicaIndexesWithFreshDevices( + const NProto::TVolumeMeta& meta, + NProto::TLaggingDevice device) +{ + TSet result; + auto it = Find( + meta.GetFreshDeviceIds(), + meta.GetDevices()[device.GetRowIndex()].GetDeviceUUID()); + if (it != meta.GetFreshDeviceIds().end()) { + result.insert(0); + } + + const auto& replicas = meta.GetReplicas(); + for (int i = 0; i < replicas.size(); i++) { + auto it = Find( + meta.GetFreshDeviceIds(), + replicas[i].GetDevices()[device.GetRowIndex()].GetDeviceUUID()); + if (it != meta.GetFreshDeviceIds().end()) { + result.insert(i + 1); + } + } + return result; +} + +void RemoveLaggingDevicesFromMeta( + NProto::TVolumeMeta& meta, + const TVector laggingDeviceIds) +{ + for (auto& agent: *meta.MutableLaggingAgentsInfo()->MutableAgents()) { + EraseIf( + *agent.MutableDevices(), + [&laggingDeviceIds](const NProto::TLaggingDevice& laggingDevice) + { + auto it = + Find(laggingDeviceIds, laggingDevice.GetDeviceUUID()); + return it != laggingDeviceIds.end(); + }); + } + EraseIf( + *meta.MutableLaggingAgentsInfo()->MutableAgents(), + [](const NProto::TLaggingAgent& laggingAgent) + { return laggingAgent.GetDevices().empty(); }); + if (meta.GetLaggingAgentsInfo().GetAgents().empty()) { + meta.MutableLaggingAgentsInfo()->Clear(); + } +} + +} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/model/helpers.h b/cloud/blockstore/libs/storage/volume/model/helpers.h new file mode 100644 index 00000000000..6c949f53b71 --- /dev/null +++ b/cloud/blockstore/libs/storage/volume/model/helpers.h @@ -0,0 +1,36 @@ +#pragma once + +#include "cloud/blockstore/libs/storage/protos_ydb/volume.pb.h" + +#include + +namespace NCloud::NBlockStore::NStorage { + +struct TLaggingDeviceIndexCmp +{ + bool operator()( + const NProto::TLaggingDevice& lhs, + const NProto::TLaggingDevice& rhs) const; +}; + +[[nodiscard]] const NProto::TDeviceConfig* FindDeviceConfig( + const NProto::TVolumeMeta& meta, const TString& deviceUUID); + +[[nodiscard]] std::optional GetAgentDevicesIndexes( + const NProto::TVolumeMeta& meta, + ui32 agentNodeId, + TVector& laggingDevices); + +[[nodiscard]] TSet ReplicaIndexesWithFreshDevices( + const NProto::TVolumeMeta& meta, + NProto::TLaggingDevice device); + +[[nodiscard]] bool CheckReplicasPlacementAreCorrect( + const NProto::TVolumeMeta& meta, + ui32 agentNodeId); + +void RemoveLaggingDevicesFromMeta( + NProto::TVolumeMeta& meta, + const TVector laggingDeviceIds); + +} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/model/ya.make b/cloud/blockstore/libs/storage/volume/model/ya.make index 05a558420e2..2931369f866 100644 --- a/cloud/blockstore/libs/storage/volume/model/ya.make +++ b/cloud/blockstore/libs/storage/volume/model/ya.make @@ -8,6 +8,7 @@ SRCS( checkpoint.cpp checkpoint_light.cpp client_state.cpp + helpers.cpp merge.cpp meta.cpp requests_inflight.cpp diff --git a/cloud/blockstore/libs/storage/volume/testlib/test_env.cpp b/cloud/blockstore/libs/storage/volume/testlib/test_env.cpp index 87f58a1facc..61fe2a2dfda 100644 --- a/cloud/blockstore/libs/storage/volume/testlib/test_env.cpp +++ b/cloud/blockstore/libs/storage/volume/testlib/test_env.cpp @@ -533,6 +533,18 @@ std::unique_ptr TVolumeClient::CreateGetS return request; } +std::unique_ptr +TVolumeClient::CreateDeviceTimeoutedRequest( + ui32 deviceIndex, + TString deviceUUID) +{ + auto request = + std::make_unique( + deviceIndex, + std::move(deviceUUID)); + return request; +} + std::unique_ptr TVolumeClient::CreateUpdateShadowDiskStateRequest( TString checkpointId, @@ -606,9 +618,10 @@ std::unique_ptr PrepareTestActorRuntime( TDiskRegistryStatePtr diskRegistryState, NProto::TFeaturesConfig featuresConfig, NRdma::IClientPtr rdmaClient, - TDiskAgentStatePtr diskAgentState) + TVector diskAgentStates) { - auto runtime = std::make_unique(1); + const ui32 agentCount = Max(diskAgentStates.size(), 1); + auto runtime = std::make_unique(agentCount); runtime->AppendToLogSettings( TBlockStoreComponents::START, @@ -620,6 +633,13 @@ std::unique_ptr PrepareTestActorRuntime( } // runtime->SetLogPriority(NLog::InvalidComponent, NLog::PRI_DEBUG); + runtime->SetRegistrationObserverFunc( + [] (auto& runtime, const auto& parentId, const auto& actorId) + { + Y_UNUSED(parentId); + runtime.EnableScheduleForActor(actorId); + }); + runtime->AddLocalService( MakeHiveProxyServiceId(), TActorSetupCmd(new TFakeHiveProxy(), TMailboxType::Simple, 0)); @@ -647,37 +667,38 @@ std::unique_ptr PrepareTestActorRuntime( } if (diskRegistryState->Devices.empty()) { - google::protobuf::RepeatedPtrField devices; + TVector devices; - *devices.Add() = MakeDevice("uuid0", "dev0", "transport0"); - *devices.Add() = MakeDevice("uuid1", "dev1", "transport1"); - *devices.Add() = MakeDevice("uuid2", "dev2", "transport2"); + devices.push_back(MakeDevice("uuid0", "dev0", "transport0")); + devices.push_back(MakeDevice("uuid1", "dev1", "transport1")); + devices.push_back(MakeDevice("uuid2", "dev2", "transport2")); auto dev0m = MakeDevice("uuid0_migration", "dev0_migration", "transport0_migration"); auto dev2m = MakeDevice("uuid2_migration", "dev2_migration", "transport2_migration"); - *devices.Add() = dev0m; - *devices.Add() = dev2m; - - diskRegistryState->Devices = TVector( - devices.begin(), - devices.end() - ); + devices.push_back(dev0m); + devices.push_back(dev2m); + diskRegistryState->Devices = std::move(devices); diskRegistryState->MigrationDevices["uuid0"] = dev0m; diskRegistryState->MigrationDevices["uuid2"] = dev2m; } for (auto& d: diskRegistryState->Devices) { - d.SetNodeId(runtime->GetNodeId()); + d.SetNodeId(runtime->GetNodeId(d.GetNodeId())); } - for (auto& [id, d]: diskRegistryState->MigrationDevices) { - d.SetNodeId(runtime->GetNodeId()); + for (auto& [_, d]: diskRegistryState->MigrationDevices) { + d.SetNodeId(runtime->GetNodeId(d.GetNodeId())); } + Sort( + diskRegistryState->Devices, + [](const NProto::TDeviceConfig& lhs, const NProto::TDeviceConfig& rhs) + { return lhs.GetNodeId() < rhs.GetNodeId(); }); + runtime->AddLocalService( MakeDiskRegistryProxyServiceId(), TActorSetupCmd( @@ -688,19 +709,43 @@ std::unique_ptr PrepareTestActorRuntime( ); runtime->EnableScheduleForActor(MakeDiskRegistryProxyServiceId()); - runtime->AddLocalService( - MakeDiskAgentServiceId(runtime->GetNodeId()), - TActorSetupCmd( + SetupTabletServices(*runtime); + + for (ui32 i = 0; i < agentCount; i++) { + struct TByNodeId + { + auto operator()(const NProto::TDeviceConfig& device) const + { + return device.GetNodeId(); + } + }; + const ui32 nodeId = runtime->GetNodeId(i); + auto begin = LowerBoundBy( + diskRegistryState->Devices.begin(), + diskRegistryState->Devices.end(), + nodeId, + TByNodeId()); + auto end = UpperBoundBy( + diskRegistryState->Devices.begin(), + diskRegistryState->Devices.end(), + nodeId, + TByNodeId()); + + auto state = diskAgentStates.size() > i ? std::move(diskAgentStates[i]) + : TDiskAgentStatePtr(); + const auto actorId = runtime->Register( new TDiskAgentMock( { - diskRegistryState->Devices.begin(), - diskRegistryState->Devices.end(), + begin, + end, }, - diskAgentState), - TMailboxType::Simple, - 0)); - - SetupTabletServices(*runtime); + std::move(state)), + i); + runtime->RegisterService( + MakeDiskAgentServiceId(nodeId), + actorId, + i); + } auto config = CreateTestStorageConfig( std::move(storageServiceConfig), @@ -729,13 +774,6 @@ std::unique_ptr PrepareTestActorRuntime( TestTabletId, TTabletTypes::BlockStoreVolume); - runtime->SetRegistrationObserverFunc( - [] (auto& runtime, const auto& parentId, const auto& actorId) - { - Y_UNUSED(parentId); - runtime.EnableScheduleForActor(actorId); - }); - CreateTestBootstrapper(*runtime, info.Get(), createFunc); return runtime; diff --git a/cloud/blockstore/libs/storage/volume/testlib/test_env.h b/cloud/blockstore/libs/storage/volume/testlib/test_env.h index 6aaadd6f87e..81bbfe430a0 100644 --- a/cloud/blockstore/libs/storage/volume/testlib/test_env.h +++ b/cloud/blockstore/libs/storage/volume/testlib/test_env.h @@ -452,6 +452,9 @@ class TVolumeClient std::unique_ptr CreateGetStorageConfigRequest(); + std::unique_ptr + CreateDeviceTimeoutedRequest(ui32 deviceIndex, TString deviceUUID); + std::unique_ptr CreateUpdateShadowDiskStateRequest( TString checkpointId, TEvVolumePrivate::TEvUpdateShadowDiskStateRequest::EReason reason, @@ -526,7 +529,7 @@ inline NProto::TDeviceConfig MakeDevice( const TString& transportId) { NProto::TDeviceConfig device; - device.SetAgentId("Mulder"); + device.SetAgentId("agent-1"); device.SetNodeId(0); device.SetBlocksCount(DefaultDeviceBlockCount); device.SetDeviceUUID(uuid); @@ -543,7 +546,7 @@ std::unique_ptr PrepareTestActorRuntime( TDiskRegistryStatePtr diskRegistryState = {}, NProto::TFeaturesConfig featuresConfig = {}, NRdma::IClientPtr rdmaClient = {}, - TDiskAgentStatePtr diskAgentState = {}); + TVector diskAgentStates = {}); struct TTestRuntimeBuilder { diff --git a/cloud/blockstore/libs/storage/volume/ut/ya.make b/cloud/blockstore/libs/storage/volume/ut/ya.make index ca5b81b744c..3ec4d68707d 100644 --- a/cloud/blockstore/libs/storage/volume/ut/ya.make +++ b/cloud/blockstore/libs/storage/volume/ut/ya.make @@ -4,6 +4,7 @@ INCLUDE(${ARCADIA_ROOT}/cloud/storage/core/tests/recipes/medium.inc) SRCS( volume_database_ut.cpp + volume_lagging_agent_ut.cpp volume_state_ut.cpp volume_ut.cpp volume_ut_checkpoint.cpp diff --git a/cloud/blockstore/libs/storage/volume/volume_actor.cpp b/cloud/blockstore/libs/storage/volume/volume_actor.cpp index ac4c911e4fd..209cdf232b8 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor.cpp @@ -1,7 +1,6 @@ #include "volume_actor.h" #include "volume_database.h" -#include "volume_tx.h" #include #include @@ -1061,6 +1060,25 @@ STFUNC(TVolumeActor::StateWork) HFunc(TEvVolume::TEvUpdateResyncState, HandleUpdateResyncState); HFunc(TEvVolume::TEvResyncFinished, HandleResyncFinished); + HFunc( + TEvPartition::TEvAddLaggingAgentResponse, + HandleAddLaggingAgentResponse); + HFunc( + TEvDiskRegistry::TEvAddLaggingDevicesResponse, + HandleAddLaggingDevicesResponse); + HFunc( + TEvVolumePrivate::TEvReportLaggingDevicesToDR, + HandleReportLaggingDevicesToDR); + HFunc( + TEvVolumePrivate::TEvDeviceTimeoutedRequest, + HandleDeviceTimeouted); + HFunc( + TEvVolumePrivate::TEvUpdateSmartMigrationState, + HandleUpdateSmartMigrationState); + HFunc( + TEvVolumePrivate::TEvSmartMigrationFinished, + HandleSmartMigrationFinished); + HFunc( TEvPartitionCommonPrivate::TEvLongRunningOperation, HandleLongRunningBlobOperation); @@ -1094,10 +1112,15 @@ STFUNC(TVolumeActor::StateZombie) IgnoreFunc(TEvVolumePrivate::TEvUpdateThrottlerState); IgnoreFunc(TEvVolumePrivate::TEvUpdateReadWriteClientInfo); IgnoreFunc(TEvVolumePrivate::TEvRemoveExpiredVolumeParams); + IgnoreFunc(TEvVolumePrivate::TEvReportLaggingDevicesToDR); + IgnoreFunc(TEvVolumePrivate::TEvDeviceTimeoutedRequest); + IgnoreFunc(TEvVolumePrivate::TEvUpdateSmartMigrationState); + IgnoreFunc(TEvVolumePrivate::TEvSmartMigrationFinished); IgnoreFunc(TEvStatsService::TEvVolumePartCounters); IgnoreFunc(TEvPartition::TEvWaitReadyResponse); + IgnoreFunc(TEvPartition::TEvAddLaggingAgentResponse); IgnoreFunc(TEvents::TEvPoisonPill); IgnoreFunc(TEvents::TEvPoisonTaken); @@ -1112,6 +1135,8 @@ STFUNC(TVolumeActor::StateZombie) IgnoreFunc(TEvDiskRegistryProxy::TEvGetDrTabletInfoResponse); + IgnoreFunc(TEvDiskRegistry::TEvAddLaggingDevicesResponse); + default: if (!RejectRequests(ev)) { HandleUnexpectedEvent(ev, TBlockStoreComponents::VOLUME); diff --git a/cloud/blockstore/libs/storage/volume/volume_actor.h b/cloud/blockstore/libs/storage/volume/volume_actor.h index c44e9873e88..ad20997f7bb 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor.h +++ b/cloud/blockstore/libs/storage/volume/volume_actor.h @@ -236,6 +236,7 @@ class TVolumeActor final TMigrations Migrations; TVector Replicas; TVector FreshDeviceIds; + TVector RemovedLaggingDeviceIds; void Clear() { @@ -474,6 +475,8 @@ class TVolumeActor final void SetupDiskRegistryBasedPartitions(const NActors::TActorContext& ctx); + void ReportLaggingDevicesToDR(const NActors::TActorContext& ctx); + void DumpUsageStats( const NActors::TActorContext& ctx, TVolumeActor::EStatus status); @@ -657,6 +660,10 @@ class TVolumeActor final const TEvVolumePrivate::TEvWriteOrZeroCompleted::TPtr& ev, const NActors::TActorContext& ctx); + void HandleReportLaggingDevicesToDR( + const TEvVolumePrivate::TEvReportLaggingDevicesToDR::TPtr& ev, + const NActors::TActorContext& ctx); + template bool ReplyToOriginalRequest( const NActors::TActorContext& ctx, @@ -748,6 +755,10 @@ class TVolumeActor final const TEvDiskRegistry::TEvAllocateDiskResponse::TPtr& ev, const NActors::TActorContext& ctx); + void HandleAddLaggingDevicesResponse( + const TEvDiskRegistry::TEvAddLaggingDevicesResponse::TPtr& ev, + const NActors::TActorContext& ctx); + void ScheduleAllocateDiskIfNeeded(const NActors::TActorContext& ctx); NProto::TAllocateDiskRequest MakeAllocateDiskRequest() const; @@ -968,6 +979,19 @@ class TVolumeActor final const TEvService::TEvReadBlocksLocalResponse::TPtr& ev, const NActors::TActorContext& ctx); + void HandleAddLaggingAgentResponse( + const NPartition::TEvPartition::TEvAddLaggingAgentResponse:: + TPtr& ev, + const NActors::TActorContext& ctx); + + void HandleSmartMigrationFinished( + const TEvVolumePrivate::TEvSmartMigrationFinished::TPtr& ev, + const NActors::TActorContext& ctx); + + void HandleUpdateSmartMigrationState( + const TEvVolumePrivate::TEvUpdateSmartMigrationState::TPtr& ev, + const NActors::TActorContext& ctx); + void CreateCheckpointLightRequest( const NActors::TActorContext& ctx, ui64 requestId, diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_allocatedisk.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_allocatedisk.cpp index b94c09add93..c28366519af 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor_allocatedisk.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor_allocatedisk.cpp @@ -4,7 +4,7 @@ #include #include - +#include #include #include @@ -34,7 +34,6 @@ ui64 GetSize(const TDevices& devs) ui64 GetBlocks(const NKikimrBlockStore::TVolumeConfig& config) { - // XXX Y_ABORT_UNLESS(config.PartitionsSize() == 1); return config.GetPartitions(0).GetBlockCount(); } @@ -157,6 +156,7 @@ NProto::TVolumeMeta CreateNewMeta( newMeta.SetIOMode(args.IOMode); newMeta.SetIOModeTs(args.IOModeTs.MicroSeconds()); newMeta.SetMuteIOErrors(args.MuteIOErrors); + RemoveLaggingDevicesFromMeta(newMeta, args.RemovedLaggingDeviceIds); return newMeta; } @@ -348,12 +348,19 @@ void TVolumeActor::HandleAllocateDiskResponse( auto& migrations = *msg->Record.MutableMigrations(); TVector replicas; TVector freshDeviceIds; + TVector removedLaggingDevices; for (auto& msgReplica: *msg->Record.MutableReplicas()) { replicas.push_back(std::move(*msgReplica.MutableDevices())); } for (auto& freshDeviceId: *msg->Record.MutableDeviceReplacementUUIDs()) { freshDeviceIds.push_back(std::move(freshDeviceId)); } + for (auto& removedLaggingDevice: + *msg->Record.MutableRemovedLaggingDevices()) + { + removedLaggingDevices.push_back( + std::move(*removedLaggingDevice.MutableDeviceUUID())); + } if (!CheckAllocationResult(ctx, devices, replicas)) { return; @@ -364,6 +371,8 @@ void TVolumeActor::HandleAllocateDiskResponse( UnfinishedUpdateVolumeConfig.Migrations = std::move(migrations); UnfinishedUpdateVolumeConfig.Replicas = std::move(replicas); UnfinishedUpdateVolumeConfig.FreshDeviceIds = std::move(freshDeviceIds); + UnfinishedUpdateVolumeConfig.RemovedLaggingDeviceIds = + std::move(removedLaggingDevices); } else { ExecuteTx( ctx, @@ -371,6 +380,7 @@ void TVolumeActor::HandleAllocateDiskResponse( std::move(migrations), std::move(replicas), std::move(freshDeviceIds), + std::move(removedLaggingDevices), msg->Record.GetIOMode(), TInstant::MicroSeconds(msg->Record.GetIOModeTs()), msg->Record.GetMuteIOErrors() @@ -415,6 +425,7 @@ void TVolumeActor::HandleUpdateDevices( std::move(msg->Migrations), std::move(msg->Replicas), std::move(msg->FreshDeviceIds), + std::move(msg->RemovedLaggingDevices), msg->IOMode, msg->IOModeTs, msg->MuteIOErrors); diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_lagging_agents.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_lagging_agents.cpp new file mode 100644 index 00000000000..a4c83792ad6 --- /dev/null +++ b/cloud/blockstore/libs/storage/volume/volume_actor_lagging_agents.cpp @@ -0,0 +1,416 @@ +#include "volume_actor.h" + +#include "volume_tx.h" + +#include +#include +#include + +namespace NCloud::NBlockStore::NStorage { + +using namespace NActors; +using namespace NKikimr; +using namespace NCloud::NBlockStore::NStorage::NPartition; + +LWTRACE_USING(BLOCKSTORE_STORAGE_PROVIDER); + +//////////////////////////////////////////////////////////////////////////////// + +void TVolumeActor::HandleReportLaggingDevicesToDR( + const TEvVolumePrivate::TEvReportLaggingDevicesToDR::TPtr& ev, + const NActors::TActorContext& ctx) +{ + Y_UNUSED(ev); + ReportLaggingDevicesToDR(ctx); +} + +void TVolumeActor::ReportLaggingDevicesToDR(const NActors::TActorContext& ctx) +{ + if (!State || State->GetMeta().GetLaggingAgentsInfo().GetAgents().empty()) { + return; + } + + auto request = + std::make_unique(); + *request->Record.MutableDiskId() = State->GetDiskId(); + for (const auto& laggingAgent: + State->GetMeta().GetLaggingAgentsInfo().GetAgents()) + { + for (const auto& laggingDevice: laggingAgent.GetDevices()) { + *request->Record.AddLaggingDevices() = laggingDevice; + } + } + NCloud::Send( + ctx, + MakeDiskRegistryProxyServiceId(), + std::move(request), + 0 // cookie + ); +} + +void TVolumeActor::HandleAddLaggingDevicesResponse( + const TEvDiskRegistry::TEvAddLaggingDevicesResponse::TPtr& ev, + const NActors::TActorContext& ctx) +{ + Y_DEBUG_ABORT_UNLESS(State); + if (State->GetMeta().GetLaggingAgentsInfo().GetAgents().empty()) { + return; + } + + const auto* msg = ev->Get(); + if (HasError(msg->GetError())) { + LOG_WARN( + ctx, + TBlockStoreComponents::VOLUME, + "[%lu] Couldn't add lagging devices to the DR. Error: %s", + TabletID(), + FormatError(msg->GetError()).c_str()); + + ctx.Schedule( + TDuration::Seconds(1), + new TEvVolumePrivate::TEvReportLaggingDevicesToDR()); + return; + } +} + +void TVolumeActor::HandleDeviceTimeouted( + const TEvVolumePrivate::TEvDeviceTimeoutedRequest::TPtr& ev, + const TActorContext& ctx) +{ + const auto* msg = ev->Get(); + + LOG_INFO( + ctx, + TBlockStoreComponents::VOLUME, + "[%lu] Device \"%s\" timeouted. RowIndex: %u", + TabletID(), + msg->DeviceUUID.c_str(), + msg->RowIndex); + + const auto& meta = State->GetMeta(); + if (!IsReliableDiskRegistryMediaKind( + State->GetConfig().GetStorageMediaKind())) + { + NCloud::Reply( + ctx, + *ev, + std::make_unique( + MakeError( + E_NOT_IMPLEMENTED, + "Only DR mirror disks can have lagging devices"))); + return; + } + + if (UpdateVolumeConfigInProgress) { + NCloud::Reply( + ctx, + *ev, + std::make_unique( + MakeError(E_REJECTED, "Volume config update in progress"))); + return; + } + + const NProto::TDeviceConfig* timeoutedDeviceConfig = + FindDeviceConfig(meta, msg->DeviceUUID); + if (!timeoutedDeviceConfig) { + LOG_WARN( + ctx, + TBlockStoreComponents::VOLUME, + "[%lu] Could not find config with device %s", + TabletID(), + msg->DeviceUUID.c_str()); + + auto response = + std::make_unique( + MakeError( + E_ARGUMENT, + TStringBuilder() << "Could not find config with device " + << msg->DeviceUUID)); + NCloud::Reply(ctx, *ev, std::move(response)); + return; + } + + TVector timeoutedAgentDevices; + const auto replicaIndex = GetAgentDevicesIndexes( + meta, + timeoutedDeviceConfig->GetNodeId(), + timeoutedAgentDevices); + Y_DEBUG_ABORT_UNLESS(!timeoutedAgentDevices.empty()); + Y_DEBUG_ABORT_UNLESS(replicaIndex); + + for (const auto& laggingAgent: meta.GetLaggingAgentsInfo().GetAgents()) { + // Whether the agent is lagging already. + if (laggingAgent.GetAgentId() == timeoutedDeviceConfig->GetAgentId()) { + LOG_WARN( + ctx, + TBlockStoreComponents::VOLUME, + "[%lu] Agent %s is already lagging", + TabletID(), + laggingAgent.GetAgentId().c_str()); + + STORAGE_CHECK_PRECONDITION( + laggingAgent.GetDevices().size() == + timeoutedAgentDevices.ysize()); + NCloud::Send( + ctx, + State->GetDiskRegistryBasedPartitionActor(), + std::make_unique( + *replicaIndex, + timeoutedDeviceConfig->GetAgentId())); + + auto response = + std::make_unique( + MakeError(S_ALREADY, "Device is already lagging")); + NCloud::Reply(ctx, *ev, std::move(response)); + return; + } + + STORAGE_CHECK_PRECONDITION(IsSorted( + timeoutedAgentDevices.begin(), + timeoutedAgentDevices.end(), + TLaggingDeviceIndexCmp())); + STORAGE_CHECK_PRECONDITION(IsSorted( + laggingAgent.GetDevices().begin(), + laggingAgent.GetDevices().end(), + TLaggingDeviceIndexCmp())); + + // Intersect row indexes of known lagging devices and a new one. We only + // allow one lagging device per row. + TVector rowIndexesIntersection; + SetIntersection( + timeoutedAgentDevices.begin(), + timeoutedAgentDevices.end(), + laggingAgent.GetDevices().begin(), + laggingAgent.GetDevices().end(), + std::back_inserter(rowIndexesIntersection), + TLaggingDeviceIndexCmp()); + + if (!rowIndexesIntersection.empty()) { + Y_DEBUG_ABORT_UNLESS( + laggingAgent.GetReplicaIndex() != replicaIndex); + + LOG_WARN( + ctx, + TBlockStoreComponents::VOLUME, + "[%lu] Will not add a lagging agent %s. Agent's " + "devices intersect with already lagging %s", + TabletID(), + timeoutedDeviceConfig->GetAgentId().c_str(), + laggingAgent.GetAgentId().c_str()); + + auto response = + std::make_unique( + MakeError( + E_INVALID_STATE, + TStringBuilder() + << "There are other lagging devices on agent " + << laggingAgent.GetAgentId())); + NCloud::Reply(ctx, *ev, std::move(response)); + return; + } + } + + // Check for fresh devices in the same row. + for (const auto& laggingDevice: timeoutedAgentDevices) { + TSet replicaIndexes = + ReplicaIndexesWithFreshDevices(meta, laggingDevice); + const bool laggingDeviceIsFresh = + replicaIndexes.contains(*replicaIndex); + if (replicaIndexes.size() - laggingDeviceIsFresh > 0) { + LOG_WARN( + ctx, + TBlockStoreComponents::VOLUME, + "[%lu] There are other fresh devices on the same row with " + "device %s", + TabletID(), + laggingDevice.GetDeviceUUID().c_str()); + + auto response = + std::make_unique( + MakeError( + E_INVALID_STATE, + TStringBuilder() << "There are other fresh devices on " + "the same row with device " + << laggingDevice.GetDeviceUUID())); + NCloud::Reply(ctx, *ev, std::move(response)); + return; + } + } + + NProto::TLaggingAgent unavailableAgent; + unavailableAgent.SetAgentId(timeoutedDeviceConfig->GetAgentId()); + unavailableAgent.SetReplicaIndex(*replicaIndex); + unavailableAgent.MutableDevices()->Assign( + timeoutedAgentDevices.begin(), + timeoutedAgentDevices.end()); + ExecuteTx( + ctx, + CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext), + std::move(unavailableAgent)); +} + +void TVolumeActor::HandleAddLaggingAgentResponse( + const TEvPartition::TEvAddLaggingAgentResponse::TPtr& ev, + const NActors::TActorContext& ctx) +{ + const auto* msg = ev->Get(); + if (HasError(msg->GetError())) { + LOG_ERROR( + ctx, + TBlockStoreComponents::VOLUME, + "[%lu] TEvAddLaggingAgentResponse. Error = %s", + TabletID(), + FormatError(msg->GetError()).c_str()); + } +} + +void TVolumeActor::HandleUpdateSmartMigrationState( + const TEvVolumePrivate::TEvUpdateSmartMigrationState::TPtr& ev, + const TActorContext& ctx) +{ + LOG_INFO( + ctx, + TBlockStoreComponents::VOLUME, + "[%lu] UpdateSmartMigrationState %s", + TabletID(), + ev->Get()->AgentId.c_str()); + + // TODO(komarevtsev-d): Show the progress on the mon page. +} + +void TVolumeActor::HandleSmartMigrationFinished( + const TEvVolumePrivate::TEvSmartMigrationFinished::TPtr& ev, + const TActorContext& ctx) +{ + const auto* msg = ev->Get(); + LOG_INFO( + ctx, + TBlockStoreComponents::VOLUME, + "[%lu] SmartMigrationFinished %s", + TabletID(), + msg->AgentId.c_str()); + + ExecuteTx( + ctx, + CreateRequestInfo(ev->Sender, ev->Cookie, msg->CallContext), + msg->AgentId); +} + +//////////////////////////////////////////////////////////////////////////////// + +bool TVolumeActor::PrepareAddLaggingAgent( + const TActorContext& ctx, + ITransactionBase::TTransactionContext& tx, + TTxVolume::TAddLaggingAgent& args) +{ + Y_UNUSED(ctx); + Y_UNUSED(tx); + Y_UNUSED(args); + + return true; +} + +void TVolumeActor::ExecuteAddLaggingAgent( + const TActorContext& ctx, + ITransactionBase::TTransactionContext& tx, + TTxVolume::TAddLaggingAgent& args) +{ + Y_DEBUG_ABORT_UNLESS(!args.Agent.GetDevices().empty()); + LOG_INFO( + ctx, + TBlockStoreComponents::VOLUME, + "[%lu] Add lagging agent: %s, replicaIndex: %u, devices: ( %s )", + TabletID(), + args.Agent.GetAgentId().c_str(), + args.Agent.GetReplicaIndex(), + [laggingDevices = args.Agent.GetDevices()]() + { + TStringBuilder ss; + for (const auto& device: laggingDevices) { + ss << "[" << device.GetDeviceUUID() << "; " + << device.GetRowIndex() << "], "; + } + ss.erase(ss.size() - 2); + return ss; + }() + .c_str()); + + TVolumeDatabase db(tx.DB); + State->AddLaggingAgent(args.Agent); + db.WriteMeta(State->GetMeta()); +} + +void TVolumeActor::CompleteAddLaggingAgent( + const TActorContext& ctx, + TTxVolume::TAddLaggingAgent& args) +{ + const auto& partActorId = State->GetDiskRegistryBasedPartitionActor(); + Y_DEBUG_ABORT_UNLESS(partActorId); + NCloud::Send( + ctx, + partActorId, + std::make_unique( + args.Agent.GetReplicaIndex(), + args.Agent.GetAgentId())); + + auto response = + std::make_unique(); + NCloud::Reply(ctx, *args.RequestInfo, std::move(response)); +} + +//////////////////////////////////////////////////////////////////////////////// + +bool TVolumeActor::PrepareRemoveLaggingAgent( + const TActorContext& ctx, + ITransactionBase::TTransactionContext& tx, + TTxVolume::TRemoveLaggingAgent& args) +{ + Y_UNUSED(ctx); + Y_UNUSED(tx); + Y_UNUSED(args); + + return true; +} + +void TVolumeActor::ExecuteRemoveLaggingAgent( + const TActorContext& ctx, + ITransactionBase::TTransactionContext& tx, + TTxVolume::TRemoveLaggingAgent& args) +{ + auto agentIdPred = [&agentId = args.AgentId](const auto& info) + { + return info.GetAgentId() == agentId; + }; + + const auto& laggingAgents = + State->GetMeta().GetLaggingAgentsInfo().GetAgents(); + Y_DEBUG_ABORT_UNLESS(CountIf(laggingAgents, agentIdPred) <= 1); + const auto* info = FindIfPtr(laggingAgents, agentIdPred); + + if (!info) { + LOG_WARN( + ctx, + TBlockStoreComponents::VOLUME, + "[%lu] Could not find an agent %s in lagging agents list.", + TabletID(), + args.AgentId.c_str()); + return; + } + + args.Agent = *info; + State->RemoveLaggingAgent(args.AgentId); + TVolumeDatabase db(tx.DB); + db.WriteMeta(State->GetMeta()); +} + +void TVolumeActor::CompleteRemoveLaggingAgent( + const TActorContext& ctx, + TTxVolume::TRemoveLaggingAgent& args) +{ + Y_UNUSED(ctx); + Y_UNUSED(args); + + // TODO(komarevtsev-d): Send message to DiskRegistryBasedPartitionActor. +} + +} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_reallocatedisk.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_reallocatedisk.cpp index 696433bf8d7..41703ae960f 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor_reallocatedisk.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor_reallocatedisk.cpp @@ -120,11 +120,20 @@ void TReallocateActor::HandleAllocateDiskResponse( freshDeviceIds.push_back(std::move(freshDeviceId)); } + TVector removedLaggingDevices; + for (auto& removedLaggingDevice: + *msg->Record.MutableRemovedLaggingDevices()) + { + removedLaggingDevices.push_back( + std::move(*removedLaggingDevice.MutableDeviceUUID())); + } + auto request = std::make_unique( std::move(*msg->Record.MutableDevices()), std::move(*msg->Record.MutableMigrations()), std::move(replicas), std::move(freshDeviceIds), + std::move(removedLaggingDevices), msg->Record.GetIOMode(), TInstant::MicroSeconds(msg->Record.GetIOModeTs()), msg->Record.GetMuteIOErrors()); diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_startstop.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_startstop.cpp index 207fe346279..025cb69950f 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor_startstop.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor_startstop.cpp @@ -178,6 +178,7 @@ void TVolumeActor::SetupDiskRegistryBasedPartitions(const TActorContext& ctx) SelfId(), State->GetMeta().GetMuteIOErrors(), State->GetFilteredFreshDevices(), + State->GetLaggingDevices(), maxTimedOutDeviceStateDuration, maxTimedOutDeviceStateDurationOverridden, useSimpleMigrationBandwidthLimiter); @@ -255,6 +256,7 @@ void TVolumeActor::SetupDiskRegistryBasedPartitions(const TActorContext& ctx) State->SetDiskRegistryBasedPartitionActor( WrapNonreplActorIfNeeded(ctx, nonreplicatedActorId, nonreplicatedConfig), nonreplicatedConfig); + ReportLaggingDevicesToDR(ctx); } NActors::TActorId TVolumeActor::WrapNonreplActorIfNeeded( diff --git a/cloud/blockstore/libs/storage/volume/volume_actor_updateconfig.cpp b/cloud/blockstore/libs/storage/volume/volume_actor_updateconfig.cpp index f4af107741b..af3b0e8f5cf 100644 --- a/cloud/blockstore/libs/storage/volume/volume_actor_updateconfig.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_actor_updateconfig.cpp @@ -4,7 +4,7 @@ #include #include - +#include #include #include @@ -243,11 +243,19 @@ void TVolumeActor::FinishUpdateVolumeConfig(const TActorContext& ctx) for (auto& freshDeviceId: UnfinishedUpdateVolumeConfig.FreshDeviceIds) { *newMeta.AddFreshDeviceIds() = std::move(freshDeviceId); } + if (State) { + newMeta.MutableLaggingAgentsInfo()->CopyFrom( + State->GetMeta().GetLaggingAgentsInfo()); + RemoveLaggingDevicesFromMeta( + newMeta, + UnfinishedUpdateVolumeConfig.RemovedLaggingDeviceIds); + } UnfinishedUpdateVolumeConfig.Devices = {}; UnfinishedUpdateVolumeConfig.Migrations = {}; UnfinishedUpdateVolumeConfig.Replicas = {}; UnfinishedUpdateVolumeConfig.FreshDeviceIds = {}; + UnfinishedUpdateVolumeConfig.RemovedLaggingDeviceIds = {}; LOG_DEBUG(ctx, TBlockStoreComponents::VOLUME, "[%lu] Updating volume config to version %u", diff --git a/cloud/blockstore/libs/storage/volume/volume_events_private.h b/cloud/blockstore/libs/storage/volume/volume_events_private.h index 923fdb8ba96..3e3892902fe 100644 --- a/cloud/blockstore/libs/storage/volume/volume_events_private.h +++ b/cloud/blockstore/libs/storage/volume/volume_events_private.h @@ -25,6 +25,7 @@ namespace NCloud::NBlockStore::NStorage { xxx(UpdateCheckpointRequest, __VA_ARGS__) \ xxx(UpdateShadowDiskState, __VA_ARGS__) \ xxx(ReadMetaHistory, __VA_ARGS__) \ + xxx(DeviceTimeouted, __VA_ARGS__) \ // BLOCKSTORE_VOLUME_REQUESTS_PRIVATE //////////////////////////////////////////////////////////////////////////////// @@ -132,6 +133,58 @@ struct TEvVolumePrivate TVector MetaHistory; }; + // + // DeviceTimeouted + // + + struct TDeviceTimeoutedRequest + { + const ui32 RowIndex; + const TString DeviceUUID; + + TDeviceTimeoutedRequest(ui32 rowIndex, TString deviceUUID) + : RowIndex(rowIndex) + , DeviceUUID(std::move(deviceUUID)) + {} + }; + + struct TDeviceTimeoutedResponse + { + }; + + // + // UpdateSmartMigrationState + // + + struct TUpdateSmartMigrationState + { + TString AgentId; + ui64 ProcessedBlockCount; + ui64 BlockCountNeedToBeProcessed; + + TUpdateSmartMigrationState( + TString agentId, + ui64 processedBlockCount, + ui64 blockCountNeedToBeProcessed) + : AgentId(std::move(agentId)) + , ProcessedBlockCount(processedBlockCount) + , BlockCountNeedToBeProcessed(blockCountNeedToBeProcessed) + {} + }; + + // + // SmartMigrationFinished + // + + struct TSmartMigrationFinished + { + const TString AgentId; + + explicit TSmartMigrationFinished(TString agentId) + : AgentId(std::move(agentId)) + {} + }; + // // UpdateDevices // @@ -142,6 +195,7 @@ struct TEvVolumePrivate TMigrations Migrations; TVector Replicas; TVector FreshDeviceIds; + TVector RemovedLaggingDevices; NProto::EVolumeIOMode IOMode; TInstant IOModeTs; bool MuteIOErrors; @@ -151,6 +205,7 @@ struct TEvVolumePrivate TMigrations migrations, TVector replicas, TVector freshDeviceIds, + TVector removedLaggingDevices, NProto::EVolumeIOMode ioMode, TInstant ioModeTs, bool muteIOErrors) @@ -158,6 +213,7 @@ struct TEvVolumePrivate , Migrations(std::move(migrations)) , Replicas(std::move(replicas)) , FreshDeviceIds(std::move(freshDeviceIds)) + , RemovedLaggingDevices(std::move(removedLaggingDevices)) , IOMode(ioMode) , IOModeTs(ioModeTs) , MuteIOErrors(muteIOErrors) @@ -243,6 +299,14 @@ struct TEvVolumePrivate { }; + // + // ReportLaggingDevicesToDR + // + + struct TReportLaggingDevicesToDR + { + }; + // // ShadowDiskAcquired // @@ -330,6 +394,9 @@ struct TEvVolumePrivate EvRemoveExpiredVolumeParams, EvShadowDiskAcquired, EvExternalDrainDone, + EvReportLaggingDevicesToDR, + EvUpdateSmartMigrationState, + EvSmartMigrationFinished, EvEnd }; @@ -378,6 +445,21 @@ struct TEvVolumePrivate EvUpdateReadWriteClientInfo >; + using TEvUpdateSmartMigrationState = TRequestEvent< + TUpdateSmartMigrationState, + EvUpdateSmartMigrationState + >; + + using TEvSmartMigrationFinished = TRequestEvent< + TSmartMigrationFinished, + EvSmartMigrationFinished + >; + + using TEvReportLaggingDevicesToDR = TRequestEvent< + TReportLaggingDevicesToDR, + EvReportLaggingDevicesToDR + >; + using TEvRemoveExpiredVolumeParams = TRequestEvent< TRemoveExpiredVolumeParams, EvRemoveExpiredVolumeParams diff --git a/cloud/blockstore/libs/storage/volume/volume_lagging_agent_ut.cpp b/cloud/blockstore/libs/storage/volume/volume_lagging_agent_ut.cpp new file mode 100644 index 00000000000..9860fe8292b --- /dev/null +++ b/cloud/blockstore/libs/storage/volume/volume_lagging_agent_ut.cpp @@ -0,0 +1,403 @@ +#include +#include +#include +#include +#include + +#include + +namespace NCloud::NBlockStore::NStorage { + +using namespace NActors; +using namespace NKikimr; +using namespace NCloud::NBlockStore::NStorage::NPartition; +using namespace NCloud::NStorage; +using namespace NTestVolume; + +namespace { + +TVector MakeDeviceList(ui32 agentCount, ui32 deviceCount) +{ + TVector result; + for (ui32 i = 1; i <= agentCount; i++) { + for (ui32 j = 0; j < deviceCount; j++) { + auto device = MakeDevice( + Sprintf("uuid%u-%u", i, j), + Sprintf("dev%u", j), + Sprintf("transport%u-%u", i, j)); + device.SetNodeId(i - 1); + device.SetAgentId(Sprintf("agent-%u", i)); + result.push_back(std::move(device)); + } + } + return result; +} + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +Y_UNIT_TEST_SUITE(TLaggingAgentVolumeTest) +{ + Y_UNIT_TEST(ShouldHandleDeviceTimeouted) + { + constexpr ui32 AgentCount = 3; + auto diskRegistryState = MakeIntrusive(); + diskRegistryState->Devices = MakeDeviceList(AgentCount, 3); + diskRegistryState->AllocateDiskReplicasOnDifferentNodes = true; + diskRegistryState->ReplicaCount = 2; + TVector agentStates; + for (ui32 i = 0; i < AgentCount; i++) { + agentStates.push_back(TDiskAgentStatePtr{}); + } + auto runtime = PrepareTestActorRuntime( + {}, + diskRegistryState, + {}, + {}, + std::move(agentStates)); + + // Create mirror-3 volume with a size of 1 device. + TVolumeClient volume(*runtime); + const ui64 blockCount = + DefaultDeviceBlockCount * DefaultDeviceBlockSize / DefaultBlockSize; + volume.UpdateVolumeConfig( + 0, + 0, + 0, + 0, + false, + 1, // version + NCloud::NProto::STORAGE_MEDIA_SSD_MIRROR3, + blockCount); + + volume.WaitReady(); + + auto stat = volume.StatVolume(); + const auto& devices = stat->Record.GetVolume().GetDevices(); + const auto& replicas = stat->Record.GetVolume().GetReplicas(); + UNIT_ASSERT_VALUES_EQUAL(1, devices.size()); + UNIT_ASSERT_VALUES_EQUAL("uuid1-0", devices[0].GetDeviceUUID()); + UNIT_ASSERT_VALUES_EQUAL("agent-1", devices[0].GetAgentId()); + + UNIT_ASSERT_VALUES_EQUAL(2, replicas.size()); + UNIT_ASSERT_VALUES_EQUAL(1, replicas[0].DevicesSize()); + UNIT_ASSERT_VALUES_EQUAL( + "uuid2-0", + replicas[0].GetDevices(0).GetDeviceUUID()); + UNIT_ASSERT_VALUES_EQUAL( + "agent-2", + replicas[0].GetDevices(0).GetAgentId()); + UNIT_ASSERT_VALUES_EQUAL(1, replicas[1].DevicesSize()); + UNIT_ASSERT_VALUES_EQUAL( + "uuid3-0", + replicas[1].GetDevices(0).GetDeviceUUID()); + UNIT_ASSERT_VALUES_EQUAL( + "agent-3", + replicas[1].GetDevices(0).GetAgentId()); + + std::optional + addLaggingAgentRequest; + runtime->SetEventFilter( + [&](TTestActorRuntimeBase&, TAutoPtr& event) + { + switch (event->GetTypeRewrite()) { + case TEvPartition::EvAddLaggingAgentRequest: { + auto* msg = event->Get< + TEvPartition::TEvAddLaggingAgentRequest>(); + UNIT_ASSERT(!addLaggingAgentRequest.has_value()); + addLaggingAgentRequest = *msg; + return true; + } + } + return false; + }); + + // Device in the first replica is timeouted. + volume.DeviceTimeouted(0, "uuid2-0"); + + UNIT_ASSERT(addLaggingAgentRequest.has_value()); + UNIT_ASSERT_VALUES_EQUAL( + replicas[0].GetDevices(0).GetAgentId(), + addLaggingAgentRequest->AgentId); + UNIT_ASSERT_VALUES_EQUAL(1, addLaggingAgentRequest->ReplicaIndex); + + // Can't add more lagging devices in the same row. + volume.SendDeviceTimeoutedRequest(0, "uuid3-0"); + auto response = volume.RecvDeviceTimeoutedResponse(); + UNIT_ASSERT_VALUES_EQUAL( + E_INVALID_STATE, + response->GetError().GetCode()); + + // Agent devices are now up-to-date. + volume.SendToPipe( + std::make_unique( + "agent-2")); + runtime->DispatchEvents({}, TDuration::Seconds(1)); + + // Now the zeroth replica can lag. + addLaggingAgentRequest.reset(); + volume.DeviceTimeouted(0, "uuid1-0"); + UNIT_ASSERT(addLaggingAgentRequest.has_value()); + UNIT_ASSERT_VALUES_EQUAL( + devices[0].GetAgentId(), + addLaggingAgentRequest->AgentId); + UNIT_ASSERT_VALUES_EQUAL(0, addLaggingAgentRequest->ReplicaIndex); + } + + Y_UNIT_TEST(ShouldHandleTabletReboot) + { + constexpr ui32 AgentCount = 6; + constexpr ui32 DevicePerAgentCount = 2; + auto diskRegistryState = MakeIntrusive(); + diskRegistryState->Devices = + MakeDeviceList(AgentCount, DevicePerAgentCount); + diskRegistryState->AllocateDiskReplicasOnDifferentNodes = true; + diskRegistryState->ReplicaCount = 2; + TVector agentStates; + for (ui32 i = 0; i < AgentCount; i++) { + agentStates.push_back(TDiskAgentStatePtr{}); + } + auto runtime = PrepareTestActorRuntime( + {}, + diskRegistryState, + {}, + {}, + std::move(agentStates)); + + TVolumeClient volume(*runtime); + const ui64 blockCount = DefaultDeviceBlockCount * + DefaultDeviceBlockSize / DefaultBlockSize * 3; + volume.UpdateVolumeConfig( + 0, + 0, + 0, + 0, + false, + 1, // version + NCloud::NProto::STORAGE_MEDIA_SSD_MIRROR3, + blockCount); + volume.WaitReady(); + + auto stat = volume.StatVolume(); + const auto& devices = stat->Record.GetVolume().GetDevices(); + UNIT_ASSERT_VALUES_EQUAL(3, devices.size()); + UNIT_ASSERT_VALUES_EQUAL("uuid1-0", devices[0].GetDeviceUUID()); + UNIT_ASSERT_VALUES_EQUAL("agent-1", devices[0].GetAgentId()); + UNIT_ASSERT_VALUES_EQUAL("uuid1-1", devices[1].GetDeviceUUID()); + UNIT_ASSERT_VALUES_EQUAL("agent-1", devices[1].GetAgentId()); + UNIT_ASSERT_VALUES_EQUAL("uuid4-0", devices[2].GetDeviceUUID()); + UNIT_ASSERT_VALUES_EQUAL("agent-4", devices[2].GetAgentId()); + + const auto& replicas = stat->Record.GetVolume().GetReplicas(); + UNIT_ASSERT_VALUES_EQUAL(2, replicas.size()); + const auto& replica1Devices = replicas[0].GetDevices(); + UNIT_ASSERT_VALUES_EQUAL(3, replica1Devices.size()); + UNIT_ASSERT_VALUES_EQUAL("uuid2-0", replica1Devices[0].GetDeviceUUID()); + UNIT_ASSERT_VALUES_EQUAL("agent-2", replica1Devices[0].GetAgentId()); + UNIT_ASSERT_VALUES_EQUAL("uuid2-1", replica1Devices[1].GetDeviceUUID()); + UNIT_ASSERT_VALUES_EQUAL("agent-2", replica1Devices[1].GetAgentId()); + UNIT_ASSERT_VALUES_EQUAL("uuid5-0", replica1Devices[2].GetDeviceUUID()); + UNIT_ASSERT_VALUES_EQUAL("agent-5", replica1Devices[2].GetAgentId()); + + const auto& replica2Devices = replicas[1].GetDevices(); + UNIT_ASSERT_VALUES_EQUAL(3, replica2Devices.size()); + UNIT_ASSERT_VALUES_EQUAL("uuid3-0", replica2Devices[0].GetDeviceUUID()); + UNIT_ASSERT_VALUES_EQUAL("agent-3", replica2Devices[0].GetAgentId()); + UNIT_ASSERT_VALUES_EQUAL("uuid3-1", replica2Devices[1].GetDeviceUUID()); + UNIT_ASSERT_VALUES_EQUAL("agent-3", replica2Devices[1].GetAgentId()); + UNIT_ASSERT_VALUES_EQUAL("uuid6-0", replica2Devices[2].GetDeviceUUID()); + UNIT_ASSERT_VALUES_EQUAL("agent-6", replica2Devices[2].GetAgentId()); + + std::optional + addLaggingAgentRequest; + std::optional + addLaggingDevicesRequest; + runtime->SetEventFilter( + [&](TTestActorRuntimeBase&, TAutoPtr& event) + { + switch (event->GetTypeRewrite()) { + case TEvPartition::EvAddLaggingAgentRequest: { + auto* msg = event->Get< + TEvPartition::TEvAddLaggingAgentRequest>(); + UNIT_ASSERT(!addLaggingAgentRequest.has_value()); + addLaggingAgentRequest = *msg; + return true; + } + case TEvDiskRegistry::EvAddLaggingDevicesRequest: { + auto* msg = event->Get< + TEvDiskRegistry::TEvAddLaggingDevicesRequest>(); + addLaggingDevicesRequest = msg->Record; + break; + } + } + return false; + }); + + // Device in the zeroth replica is timeouted. + volume.DeviceTimeouted(1, "uuid1-1"); + + UNIT_ASSERT(addLaggingAgentRequest.has_value()); + UNIT_ASSERT_VALUES_EQUAL( + devices[1].GetAgentId(), + addLaggingAgentRequest->AgentId); + UNIT_ASSERT_VALUES_EQUAL(0, addLaggingAgentRequest->ReplicaIndex); + + { + addLaggingAgentRequest.reset(); + // The first agent is already lagging. + volume.SendDeviceTimeoutedRequest(0, "uuid1-0"); + auto response = volume.RecvDeviceTimeoutedResponse(); + UNIT_ASSERT_VALUES_EQUAL(S_ALREADY, response->GetError().GetCode()); + UNIT_ASSERT(addLaggingAgentRequest.has_value()); + UNIT_ASSERT_VALUES_EQUAL( + devices[0].GetAgentId(), + addLaggingAgentRequest->AgentId); + } + + { + // 0 and 1st rows already lagging. Can't add more lagging devices on + // these rows. + volume.SendDeviceTimeoutedRequest(0, "uuid2-1"); + auto response = volume.RecvDeviceTimeoutedResponse(); + UNIT_ASSERT_VALUES_EQUAL( + E_INVALID_STATE, + response->GetError().GetCode()); + } + + // Adding the second row to lagging. + addLaggingAgentRequest.reset(); + volume.DeviceTimeouted(2, "uuid6-0"); + UNIT_ASSERT(addLaggingAgentRequest.has_value()); + UNIT_ASSERT_VALUES_EQUAL( + replica2Devices[2].GetAgentId(), + addLaggingAgentRequest->AgentId); + + // Rebooting the volume tablet should report lagging devices to the DR. + UNIT_ASSERT(!addLaggingDevicesRequest.has_value()); + volume.RebootTablet(); + runtime->DispatchEvents({}, TDuration::Seconds(1)); + UNIT_ASSERT(addLaggingDevicesRequest.has_value()); + + UNIT_ASSERT_VALUES_EQUAL("vol0", addLaggingDevicesRequest->GetDiskId()); + UNIT_ASSERT_VALUES_EQUAL( + 3, + addLaggingDevicesRequest->GetLaggingDevices().size()); + UNIT_ASSERT_VALUES_EQUAL( + "DeviceUUID: \"uuid1-0\"\n", + addLaggingDevicesRequest->GetLaggingDevices(0).DebugString()); + UNIT_ASSERT_VALUES_EQUAL( + "DeviceUUID: \"uuid1-1\"\nRowIndex: 1\n", + addLaggingDevicesRequest->GetLaggingDevices(1).DebugString()); + UNIT_ASSERT_VALUES_EQUAL( + "DeviceUUID: \"uuid6-0\"\nRowIndex: 2\n", + addLaggingDevicesRequest->GetLaggingDevices(2).DebugString()); + + // Disk Registry will remove lagging devices on reallocation. + volume.ReallocateDisk(); + auto metaHistoryResponse = volume.ReadMetaHistory(); + UNIT_ASSERT(!metaHistoryResponse->MetaHistory.empty()); + UNIT_ASSERT_VALUES_EQUAL( + 0, + metaHistoryResponse->MetaHistory.back() + .Meta.GetLaggingAgentsInfo() + .AgentsSize()); + } + + Y_UNIT_TEST(ShouldHandleUpdateVolumeConfig) + { + constexpr ui32 AgentCount = 6; + constexpr ui32 DevicePerAgentCount = 2; + auto diskRegistryState = MakeIntrusive(); + diskRegistryState->Devices = + MakeDeviceList(AgentCount, DevicePerAgentCount); + diskRegistryState->AllocateDiskReplicasOnDifferentNodes = true; + diskRegistryState->ReplicaCount = 2; + TVector agentStates; + for (ui32 i = 0; i < AgentCount; i++) { + agentStates.push_back(TDiskAgentStatePtr{}); + } + auto runtime = PrepareTestActorRuntime( + {}, + diskRegistryState, + {}, + {}, + std::move(agentStates)); + + TVolumeClient volume(*runtime); + const ui64 blockCount = DefaultDeviceBlockCount * + DefaultDeviceBlockSize / DefaultBlockSize * 3; + volume.UpdateVolumeConfig( + 0, + 0, + 0, + 0, + false, + 1, // version + NCloud::NProto::STORAGE_MEDIA_SSD_MIRROR3, + blockCount); + volume.WaitReady(); + + auto stat = volume.StatVolume(); + const auto& devices = stat->Record.GetVolume().GetDevices(); + UNIT_ASSERT_VALUES_EQUAL(3, devices.size()); + + std::optional + addLaggingDevicesRequest; + runtime->SetEventFilter( + [&](TTestActorRuntimeBase&, TAutoPtr& event) + { + switch (event->GetTypeRewrite()) { + case TEvPartition::EvAddLaggingAgentRequest: { + return true; + } + case TEvDiskRegistry::EvAddLaggingDevicesRequest: { + auto* msg = event->Get< + TEvDiskRegistry::TEvAddLaggingDevicesRequest>(); + addLaggingDevicesRequest = msg->Record; + break; + } + } + return false; + }); + + // Device in the zeroth replica is timeouted. + volume.DeviceTimeouted(1, "uuid1-1"); + + UNIT_ASSERT(!addLaggingDevicesRequest.has_value()); + // Update volume config. + volume.UpdateVolumeConfig( + 0, + 0, + 0, + 0, + false, + 2, // version + NCloud::NProto::STORAGE_MEDIA_SSD_MIRROR3, + blockCount); + volume.WaitReady(); + UNIT_ASSERT(addLaggingDevicesRequest.has_value()); + + auto metaHistoryResponse = volume.ReadMetaHistory(); + UNIT_ASSERT(!metaHistoryResponse->MetaHistory.empty()); + + // Make sure that lagging devices are still there. + auto historyItem = metaHistoryResponse->MetaHistory.back(); + UNIT_ASSERT_VALUES_EQUAL( + 1, + historyItem.Meta.GetLaggingAgentsInfo().AgentsSize()); + UNIT_ASSERT_VALUES_EQUAL( + "agent-1", + historyItem.Meta.GetLaggingAgentsInfo() + .GetAgents()[0] + .GetAgentId()); + UNIT_ASSERT_VALUES_EQUAL( + 2, + historyItem.Meta.GetLaggingAgentsInfo() + .GetAgents()[0] + .GetDevices() + .size()); + } +} + +} // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/volume_state.cpp b/cloud/blockstore/libs/storage/volume/volume_state.cpp index d264fdadb61..071789e8761 100644 --- a/cloud/blockstore/libs/storage/volume/volume_state.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_state.cpp @@ -195,6 +195,34 @@ TRuntimeVolumeParams& TVolumeState::GetVolumeParams() return VolumeParams; } +void TVolumeState::AddLaggingAgent(NProto::TLaggingAgent agent) +{ + Meta.MutableLaggingAgentsInfo()->MutableAgents()->Add(std::move(agent)); +} + +void TVolumeState::RemoveLaggingAgent(const TString& agentId) +{ + const auto& laggingAgents = Meta.GetLaggingAgentsInfo().GetAgents(); + auto it = FindIf( + laggingAgents, + [&agentId](const auto& agent) + { return agent.GetAgentId() == agentId; }); + if (it != laggingAgents.end()) { + Meta.MutableLaggingAgentsInfo()->MutableAgents()->erase(it); + } +} + +THashSet TVolumeState::GetLaggingDevices() const +{ + THashSet laggingDevices; + for (const auto& agent: Meta.GetLaggingAgentsInfo().GetAgents()) { + for (const auto& device: agent.GetDevices()) { + laggingDevices.insert(device.GetDeviceUUID()); + } + } + return laggingDevices; +} + void TVolumeState::ResetMeta(NProto::TVolumeMeta meta) { Meta = std::move(meta); diff --git a/cloud/blockstore/libs/storage/volume/volume_state.h b/cloud/blockstore/libs/storage/volume/volume_state.h index 6df2dd1dee9..eca87da37ac 100644 --- a/cloud/blockstore/libs/storage/volume/volume_state.h +++ b/cloud/blockstore/libs/storage/volume/volume_state.h @@ -305,6 +305,10 @@ class TVolumeState Meta.SetFillSeqNumber(fillSeqNumber); } + void AddLaggingAgent(NProto::TLaggingAgent agent); + void RemoveLaggingAgent(const TString& agentId); + THashSet GetLaggingDevices() const; + void SetStartPartitionsNeeded(bool startPartitionsNeeded) { StartPartitionsNeeded = startPartitionsNeeded; diff --git a/cloud/blockstore/libs/storage/volume/volume_tx.h b/cloud/blockstore/libs/storage/volume/volume_tx.h index e878d8ffb62..edbc4a7bd80 100644 --- a/cloud/blockstore/libs/storage/volume/volume_tx.h +++ b/cloud/blockstore/libs/storage/volume/volume_tx.h @@ -41,6 +41,8 @@ namespace NCloud::NBlockStore::NStorage { xxx(DeleteVolumeParams, __VA_ARGS__) \ xxx(ChangeStorageConfig, __VA_ARGS__) \ xxx(ReadMetaHistory, __VA_ARGS__) \ + xxx(AddLaggingAgent, __VA_ARGS__) \ + xxx(RemoveLaggingAgent, __VA_ARGS__) \ // BLOCKSTORE_VOLUME_TRANSACTIONS //////////////////////////////////////////////////////////////////////////////// @@ -148,6 +150,7 @@ struct TTxVolume TMigrations Migrations; TVector Replicas; TVector FreshDeviceIds; + TVector RemovedLaggingDeviceIds; NProto::EVolumeIOMode IOMode; TInstant IOModeTs; bool MuteIOErrors; @@ -159,6 +162,7 @@ struct TTxVolume TMigrations migrations, TVector replicas, TVector freshDeviceIds, + TVector removedLaggingDeviceIds, NProto::EVolumeIOMode ioMode, TInstant ioModeTs, bool muteIOErrors) @@ -168,6 +172,7 @@ struct TTxVolume std::move(migrations), std::move(replicas), std::move(freshDeviceIds), + std::move(removedLaggingDeviceIds), ioMode, ioModeTs, muteIOErrors @@ -180,6 +185,7 @@ struct TTxVolume TMigrations migrations, TVector replicas, TVector freshDeviceIds, + TVector removedLaggingDeviceIds, NProto::EVolumeIOMode ioMode, TInstant ioModeTs, bool muteIOErrors) @@ -188,6 +194,7 @@ struct TTxVolume , Migrations(std::move(migrations)) , Replicas(std::move(replicas)) , FreshDeviceIds(std::move(freshDeviceIds)) + , RemovedLaggingDeviceIds(std::move(removedLaggingDeviceIds)) , IOMode(ioMode) , IOModeTs(ioModeTs) , MuteIOErrors(muteIOErrors) @@ -695,6 +702,48 @@ struct TTxVolume ResultStorageConfig.Clear(); } }; + + // + // AddLaggingAgent + // + + struct TAddLaggingAgent + { + const TRequestInfoPtr RequestInfo; + const NProto::TLaggingAgent Agent; + + TAddLaggingAgent( + TRequestInfoPtr requestInfo, + NProto::TLaggingAgent agent) + : RequestInfo(std::move(requestInfo)) + , Agent(std::move(agent)) + {} + + void Clear() + {} + }; + + // + // RemoveLaggingAgent + // + + struct TRemoveLaggingAgent + { + const TRequestInfoPtr RequestInfo; + const TString AgentId; + + NProto::TLaggingAgent Agent; + + TRemoveLaggingAgent(TRequestInfoPtr requestInfo, TString agentId) + : RequestInfo(std::move(requestInfo)) + , AgentId(std::move(agentId)) + {} + + void Clear() + { + Agent.Clear(); + } + }; }; } // namespace NCloud::NBlockStore::NStorage diff --git a/cloud/blockstore/libs/storage/volume/volume_ut_stats.cpp b/cloud/blockstore/libs/storage/volume/volume_ut_stats.cpp index 03575694686..098ee62f34e 100644 --- a/cloud/blockstore/libs/storage/volume/volume_ut_stats.cpp +++ b/cloud/blockstore/libs/storage/volume/volume_ut_stats.cpp @@ -538,7 +538,7 @@ Y_UNIT_TEST_SUITE(TVolumeStatsTest) }; auto runtime = - PrepareTestActorRuntime(config, {}, {}, {}, diskAgentState); + PrepareTestActorRuntime(config, {}, {}, {}, {diskAgentState}); struct TReadAndWriteByteCount { diff --git a/cloud/blockstore/libs/storage/volume/ya.make b/cloud/blockstore/libs/storage/volume/ya.make index 77f72624e71..a8cb7552176 100644 --- a/cloud/blockstore/libs/storage/volume/ya.make +++ b/cloud/blockstore/libs/storage/volume/ya.make @@ -14,6 +14,7 @@ SRCS( volume_actor_forward.cpp volume_actor_forward_trackused.cpp volume_actor_initschema.cpp + volume_actor_lagging_agents.cpp volume_actor_loadstate.cpp volume_actor_migration.cpp volume_actor_monitoring_checkpoint.cpp