Skip to content

Commit

Permalink
Add lagging agent concept to the volume
Browse files Browse the repository at this point in the history
  • Loading branch information
komarevtsev-d committed Jan 5, 2025
1 parent 2398e8e commit cfdcc31
Show file tree
Hide file tree
Showing 38 changed files with 1,732 additions and 112 deletions.
4 changes: 4 additions & 0 deletions cloud/blockstore/libs/storage/api/disk_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ namespace NCloud::NBlockStore::NStorage {
xxx(GetCheckpointDataState, __VA_ARGS__) \
xxx(SetCheckpointDataState, __VA_ARGS__) \
xxx(GetAgentNodeId, __VA_ARGS__) \
xxx(AddLaggingDevices, __VA_ARGS__) \
// BLOCKSTORE_DISK_REGISTRY_REQUESTS_PROTO

// requests forwarded from service to disk_registry
Expand Down Expand Up @@ -211,6 +212,9 @@ struct TEvDiskRegistry
EvQueryAgentsInfoRequest = EvBegin + 75,
EvQueryAgentsInfoResponse = EvBegin + 76,

EvAddLaggingDevicesRequest = EvBegin + 77,
EvAddLaggingDevicesResponse = EvBegin + 78,

EvEnd
};

Expand Down
43 changes: 42 additions & 1 deletion cloud/blockstore/libs/storage/api/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace NCloud::NBlockStore::NStorage::NPartition {
xxx(Drain, __VA_ARGS__) \
// BLOCKSTORE_PARTITION_REQUESTS

// requests forwarded from service to partion
// requests forwarded from service to partition
#define BLOCKSTORE_PARTITION_REQUESTS_FWD_SERVICE(xxx, ...) \
xxx(ReadBlocks, __VA_ARGS__) \
xxx(WriteBlocks, __VA_ARGS__) \
Expand Down Expand Up @@ -94,6 +94,34 @@ struct TEvPartition
{}
};

//
// AddLaggingAgent
//

struct TAddLaggingAgentRequest
{
// 0 - for main devices; 1,2 - for mirror replicas
ui32 ReplicaIndex;
TString AgentId;
TAddLaggingAgentRequest(ui32 replicaIndex, TString agentId)
: ReplicaIndex(replicaIndex)
, AgentId(std::move(agentId))
{}
};

//
// RemoveLaggingReplica
//

struct TRemoveLaggingReplicaRequest
{
// 0 - for main devices; 1,2 - for mirror replicas
const ui32 ReplicaIndex;
explicit TRemoveLaggingReplicaRequest(ui32 replicaIndex)
: ReplicaIndex(replicaIndex)
{}
};

//
// Events declaration
//
Expand All @@ -115,6 +143,9 @@ struct TEvPartition

EvGarbageCollectorCompleted = EvBegin + 8,

EvAddLaggingAgentRequest = EvBegin + 9,
EvRemoveLaggingReplicaRequest = EvBegin + 10,

EvEnd
};

Expand All @@ -132,6 +163,16 @@ struct TEvPartition
TGarbageCollectorCompleted,
EvGarbageCollectorCompleted
>;

// Request event that wraps TAddLaggingAgentRequest under the
// EvAddLaggingAgentRequest event id.
using TEvAddLaggingAgentRequest = TRequestEvent<
TAddLaggingAgentRequest,
EvAddLaggingAgentRequest
>;

// Request event that wraps TRemoveLaggingReplicaRequest under the
// EvRemoveLaggingReplicaRequest event id.
using TEvRemoveLaggingReplicaRequest = TRequestEvent<
TRemoveLaggingReplicaRequest,
EvRemoveLaggingReplicaRequest
>;
};

} // namespace NCloud::NBlockStore::NStorage::NPartition
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,18 @@ void TDiskRegistryActor::HandleOperationCompleted(
Actors.erase(ev->Sender);
}

// Handler for TEvAddLaggingDevicesRequest. Currently a stub: the request
// payload and the actor context are ignored, and the only visible action is
// the BLOCKSTORE_DISK_REGISTRY_COUNTER invocation for AddLaggingDevices.
// NOTE(review): no response is sent from this handler in the visible code, so
// presumably the requester gets no reply until the TODO below is resolved;
// confirm against callers.
void TDiskRegistryActor::HandleAddLaggingDevices(
const TEvDiskRegistry::TEvAddLaggingDevicesRequest::TPtr& ev,
const TActorContext& ctx)
{
// Both parameters are intentionally unused until the handler is implemented.
Y_UNUSED(ev);
Y_UNUSED(ctx);

BLOCKSTORE_DISK_REGISTRY_COUNTER(AddLaggingDevices);

// TODO(komarevtsev-d): Implement this.
}

////////////////////////////////////////////////////////////////////////////////

STFUNC(TDiskRegistryActor::StateBoot)
Expand Down
20 changes: 19 additions & 1 deletion cloud/blockstore/libs/storage/partition_nonrepl/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class TNonreplicatedPartitionConfig
const NActors::TActorId ParentActorId;
const bool MuteIOErrors;
const THashSet<TString> FreshDeviceIds;
// List of devices that have outdated data. Can only appear on mirror disks.
const THashSet<TString> LaggingDeviceIds;
const TDuration MaxTimedOutDeviceStateDuration;
const bool MaxTimedOutDeviceStateDurationOverridden;
const bool UseSimpleMigrationBandwidthLimiter;
Expand All @@ -77,6 +79,7 @@ class TNonreplicatedPartitionConfig
NActors::TActorId parentActorId,
bool muteIOErrors,
THashSet<TString> freshDeviceIds,
THashSet<TString> laggingDeviceIds,
TDuration maxTimedOutDeviceStateDuration,
bool maxTimedOutDeviceStateDurationOverridden,
bool useSimpleMigrationBandwidthLimiter)
Expand All @@ -88,6 +91,7 @@ class TNonreplicatedPartitionConfig
, ParentActorId(std::move(parentActorId))
, MuteIOErrors(muteIOErrors)
, FreshDeviceIds(std::move(freshDeviceIds))
, LaggingDeviceIds(std::move(laggingDeviceIds))
, MaxTimedOutDeviceStateDuration(maxTimedOutDeviceStateDuration)
, MaxTimedOutDeviceStateDurationOverridden(maxTimedOutDeviceStateDurationOverridden)
, UseSimpleMigrationBandwidthLimiter(useSimpleMigrationBandwidthLimiter)
Expand All @@ -111,6 +115,13 @@ class TNonreplicatedPartitionConfig
}
}

THashSet<TString> laggingDeviceIds;
for (const auto& device: devices) {
if (LaggingDeviceIds.contains(device.GetDeviceUUID())) {
laggingDeviceIds.insert(device.GetDeviceUUID());
}
}

return std::make_shared<TNonreplicatedPartitionConfig>(
std::move(devices),
IOMode,
Expand All @@ -120,6 +131,7 @@ class TNonreplicatedPartitionConfig
ParentActorId,
MuteIOErrors,
std::move(freshDeviceIds),
std::move(laggingDeviceIds),
MaxTimedOutDeviceStateDuration,
MaxTimedOutDeviceStateDurationOverridden,
UseSimpleMigrationBandwidthLimiter
Expand Down Expand Up @@ -176,6 +188,11 @@ class TNonreplicatedPartitionConfig
return FreshDeviceIds;
}

const auto& GetLaggingDeviceIds() const
{
return LaggingDeviceIds;
}

auto GetMaxTimedOutDeviceStateDuration() const
{
return MaxTimedOutDeviceStateDuration;
Expand Down Expand Up @@ -229,7 +246,8 @@ class TNonreplicatedPartitionConfig
Y_UNUSED(relativeRange);

return !Devices[i].GetDeviceUUID()
|| FreshDeviceIds.contains(Devices[i].GetDeviceUUID());
|| FreshDeviceIds.contains(Devices[i].GetDeviceUUID())
|| LaggingDeviceIds.contains(Devices[i].GetDeviceUUID());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ TNonreplicatedPartitionConfigPtr MakePartitionConfig(
NActors::TActorId(),
false, // muteIOErrors
THashSet<TString>(), // freshDeviceIds
THashSet<TString>(), // laggingDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
useSimpleMigrationBandwidthLimiter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,12 @@ struct TTestEnv
// only SSD/HDD distinction matters
NProto::STORAGE_MEDIA_SSD_MIRROR3},
VolumeActorId,
false, // muteIOErrors
false, // muteIOErrors
std::move(freshDeviceIds),
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
true // useSimpleMigrationBandwidthLimiter
THashSet<TString>(), // laggingDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
true // useSimpleMigrationBandwidthLimiter
);

for (auto& replica: Replicas) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ struct TEnv
4_KB,
volumeInfo,
NActors::TActorId(),
false, // muteIOErrors
false, // muteIOErrors
FreshDeviceIds,
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
true // useSimpleMigrationBandwidthLimiter
THashSet<TString>(), // laggingDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
true // useSimpleMigrationBandwidthLimiter
);

{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,12 @@ struct TTestEnv
// only SSD/HDD distinction matters
NProto::STORAGE_MEDIA_SSD_MIRROR3},
VolumeActorId,
false, // muteIOErrors
false, // muteIOErrors
std::move(freshDeviceIds),
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
true // useSimpleMigrationBandwidthLimiter
THashSet<TString>(), // laggingDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
true // useSimpleMigrationBandwidthLimiter
);

for (auto& replica: replicas) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,12 @@ struct TTestEnv
// only SSD/HDD distinction matters
NProto::STORAGE_MEDIA_SSD_NONREPLICATED},
VolumeActorId,
false, // muteIOErrors
THashSet<TString>(), // freshDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
false
);
false, // muteIOErrors
THashSet<TString>(), // freshDeviceIds
THashSet<TString>(), // laggingDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
false);

auto part = std::make_unique<TNonreplicatedPartitionMigrationActor>(
std::move(config),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,12 @@ struct TTestEnv
// only SSD/HDD distinction matters
NProto::STORAGE_MEDIA_SSD_NONREPLICATED},
VolumeActorId,
false, // muteIOErrors
THashSet<TString>(), // freshDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
false // useSimpleMigrationBandwidthLimiter
false, // muteIOErrors
THashSet<TString>(), // freshDeviceIds
THashSet<TString>(), // laggingDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
false // useSimpleMigrationBandwidthLimiter
);

auto part = std::make_unique<TNonreplicatedPartitionRdmaActor>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,11 @@ struct TTestEnv
TNonreplicatedPartitionConfig::TVolumeInfo{Now(), params.MediaKind},
VolumeActorId,
params.MuteIOErrors,
THashSet<TString>(), // freshDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
false // useSimpleMigrationBandwidthLimiter
THashSet<TString>(), // freshDeviceIds
THashSet<TString>(), // laggingDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
false // useSimpleMigrationBandwidthLimiter
);

auto part = std::make_unique<TNonreplicatedPartitionActor>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,12 @@ struct TTestEnv
// only SSD/HDD distinction matters
NProto::STORAGE_MEDIA_SSD_NONREPLICATED},
VolumeActorId,
false, // muteIOErrors
THashSet<TString>(), // freshDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
true // useSimpleMigrationBandwidthLimiter
false, // muteIOErrors
THashSet<TString>(), // freshDeviceIds
THashSet<TString>(), // laggingDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
true // useSimpleMigrationBandwidthLimiter
);

auto part = std::make_unique<TNonreplicatedPartitionActor>(
Expand Down
62 changes: 62 additions & 0 deletions cloud/blockstore/libs/storage/protos/disk.proto
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ message TDiskConfig

// A log of important events in the life of this disk.
repeated TDiskHistoryItem History = 17;

// A list of devices that are lagging behind on writes.
repeated TLaggingDevice LaggingDevices = 18;
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -417,6 +420,41 @@ message TAgentStats

////////////////////////////////////////////////////////////////////////////////

// Identifies a single lagging device by its UUID and by its position within
// a replica.
message TLaggingDevice
{
// UUID of the lagging device.
string DeviceUUID = 1;

// Index of the lagging device in the replica.
uint32 RowIndex = 2;
}

////////////////////////////////////////////////////////////////////////////////

// Describes a lagging agent: its id, the mirror disk replica it is associated
// with, and the devices that belong to it.
message TLaggingAgent
{
// Agent id.
string AgentId = 1;

// Index of the mirror disk replica.
// 0 - main devices
// 1,2 - replica devices
uint32 ReplicaIndex = 2;

// A list of devices that belong to the agent.
repeated TLaggingDevice Devices = 3;
}

////////////////////////////////////////////////////////////////////////////////

// Container for the list of lagging agents.
message TLaggingAgentsInfo
{
// A list of agents that are lagging behind on writes.
repeated TLaggingAgent Agents = 1;
}

////////////////////////////////////////////////////////////////////////////////

message TDiskRegistryAgentListRequestParams
{
repeated string AgentIds = 1;
Expand Down Expand Up @@ -635,6 +673,9 @@ message TAllocateDiskResponse

// New devices used instead of recently replaced ones.
repeated string DeviceReplacementUUIDs = 8;

// Devices that had been lagging.
repeated TLaggingDevice RemovedLaggingDevices = 9;
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1662,6 +1703,27 @@ message TGetAgentNodeIdResponse
bool Connected = 4;
}

////////////////////////////////////////////////////////////////////////////////
// Report that some of the devices were lagging.

// Request that reports lagging devices of the given disk.
message TAddLaggingDevicesRequest
{
// Optional request headers.
THeaders Headers = 1;

// Disk identifier to perform operations on.
string DiskId = 2;

// Devices that have been lagging.
repeated TLaggingDevice LaggingDevices = 3;
}

// Response to TAddLaggingDevicesRequest; carries only an optional error.
message TAddLaggingDevicesResponse
{
// Optional error, set only if error happened.
NCloud.NProto.TError Error = 1;
}

////////////////////////////////////////////////////////////////////////////////
// Get dependent disks

Expand Down
Loading

0 comments on commit cfdcc31

Please sign in to comment.