Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add lagging agent concept to the volume #2524

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cloud/blockstore/libs/storage/api/disk_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ namespace NCloud::NBlockStore::NStorage {
xxx(GetCheckpointDataState, __VA_ARGS__) \
xxx(SetCheckpointDataState, __VA_ARGS__) \
xxx(GetAgentNodeId, __VA_ARGS__) \
xxx(AddLaggingDevices, __VA_ARGS__) \
// BLOCKSTORE_DISK_REGISTRY_REQUESTS_PROTO

// requests forwarded from service to disk_registry
Expand Down Expand Up @@ -211,6 +212,9 @@ struct TEvDiskRegistry
EvQueryAgentsInfoRequest = EvBegin + 75,
EvQueryAgentsInfoResponse = EvBegin + 76,

EvAddLaggingDevicesRequest = EvBegin + 77,
EvAddLaggingDevicesResponse = EvBegin + 78,

EvEnd
};

Expand Down
43 changes: 42 additions & 1 deletion cloud/blockstore/libs/storage/api/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace NCloud::NBlockStore::NStorage::NPartition {
xxx(Drain, __VA_ARGS__) \
// BLOCKSTORE_PARTITION_REQUESTS

// requests forwarded from service to partion
// requests forwarded from service to partition
#define BLOCKSTORE_PARTITION_REQUESTS_FWD_SERVICE(xxx, ...) \
xxx(ReadBlocks, __VA_ARGS__) \
xxx(WriteBlocks, __VA_ARGS__) \
Expand Down Expand Up @@ -94,6 +94,34 @@ struct TEvPartition
{}
};

//
// AddLaggingAgent
//

struct TAddLaggingAgentRequest
{
// 0 - for main devices; 1,2 - for mirror replicas
ui32 ReplicaIndex;
TString AgentId;
TAddLaggingAgentRequest(ui32 replicaIndex, TString agentId)
: ReplicaIndex(replicaIndex)
, AgentId(std::move(agentId))
{}
};

//
// RemoveLaggingReplica
//

struct TRemoveLaggingReplicaRequest
{
// 0 - for main devices; 1,2 - for mirror replicas
const ui32 ReplicaIndex;
explicit TRemoveLaggingReplicaRequest(ui32 replicaIndex)
: ReplicaIndex(replicaIndex)
{}
};

//
// Events declaration
//
Expand All @@ -115,6 +143,9 @@ struct TEvPartition

EvGarbageCollectorCompleted = EvBegin + 8,

EvAddLaggingAgentRequest = EvBegin + 9,
EvRemoveLaggingReplicaRequest = EvBegin + 10,

EvEnd
};

Expand All @@ -132,6 +163,16 @@ struct TEvPartition
TGarbageCollectorCompleted,
EvGarbageCollectorCompleted
>;

// Event type that carries a TAddLaggingAgentRequest payload and is tagged
// with the EvAddLaggingAgentRequest event id.
using TEvAddLaggingAgentRequest = TRequestEvent<
TAddLaggingAgentRequest,
EvAddLaggingAgentRequest
>;

// Event type that carries a TRemoveLaggingReplicaRequest payload and is
// tagged with the EvRemoveLaggingReplicaRequest event id.
using TEvRemoveLaggingReplicaRequest = TRequestEvent<
TRemoveLaggingReplicaRequest,
EvRemoveLaggingReplicaRequest
>;
};

} // namespace NCloud::NBlockStore::NStorage::NPartition
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,18 @@ void TDiskRegistryActor::HandleOperationCompleted(
Actors.erase(ev->Sender);
}

// Handler for TEvDiskRegistry::TEvAddLaggingDevicesRequest.
//
// Currently a stub: the event and the actor context are deliberately unused,
// and the only action is invoking BLOCKSTORE_DISK_REGISTRY_COUNTER for
// AddLaggingDevices (presumably a per-request counter - see the macro).
//
// NOTE(review): no response is sent from here, so a sender waiting for
// TEvAddLaggingDevicesResponse will not receive one until the TODO below is
// resolved - confirm that nothing issues this request yet.
void TDiskRegistryActor::HandleAddLaggingDevices(
const TEvDiskRegistry::TEvAddLaggingDevicesRequest::TPtr& ev,
const TActorContext& ctx)
{
Y_UNUSED(ev);
Y_UNUSED(ctx);

BLOCKSTORE_DISK_REGISTRY_COUNTER(AddLaggingDevices);

// TODO(komarevtsev-d): Implement this.
}

////////////////////////////////////////////////////////////////////////////////

STFUNC(TDiskRegistryActor::StateBoot)
Expand Down
20 changes: 19 additions & 1 deletion cloud/blockstore/libs/storage/partition_nonrepl/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class TNonreplicatedPartitionConfig
const NActors::TActorId ParentActorId;
const bool MuteIOErrors;
const THashSet<TString> FreshDeviceIds;
// List of devices that have outdated data. Can only appear on mirror disks.
const THashSet<TString> LaggingDeviceIds;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Добавил в partition config список девайсов которые отстают.
Будет использоваться для двух вещей: mirror partition не читает из таких из-за DevicesReadyForReading() и в nonrepl partition тоже будет предохранитель "на всякий случай" от наших сервисных запросов.

const TDuration MaxTimedOutDeviceStateDuration;
const bool MaxTimedOutDeviceStateDurationOverridden;
const bool UseSimpleMigrationBandwidthLimiter;
Expand All @@ -77,6 +79,7 @@ class TNonreplicatedPartitionConfig
NActors::TActorId parentActorId,
bool muteIOErrors,
THashSet<TString> freshDeviceIds,
THashSet<TString> laggingDeviceIds,
TDuration maxTimedOutDeviceStateDuration,
bool maxTimedOutDeviceStateDurationOverridden,
bool useSimpleMigrationBandwidthLimiter)
Expand All @@ -88,6 +91,7 @@ class TNonreplicatedPartitionConfig
, ParentActorId(std::move(parentActorId))
, MuteIOErrors(muteIOErrors)
, FreshDeviceIds(std::move(freshDeviceIds))
, LaggingDeviceIds(std::move(laggingDeviceIds))
, MaxTimedOutDeviceStateDuration(maxTimedOutDeviceStateDuration)
, MaxTimedOutDeviceStateDurationOverridden(maxTimedOutDeviceStateDurationOverridden)
, UseSimpleMigrationBandwidthLimiter(useSimpleMigrationBandwidthLimiter)
Expand All @@ -111,6 +115,13 @@ class TNonreplicatedPartitionConfig
}
}

THashSet<TString> laggingDeviceIds;
for (const auto& device: devices) {
if (LaggingDeviceIds.contains(device.GetDeviceUUID())) {
laggingDeviceIds.insert(device.GetDeviceUUID());
}
}

return std::make_shared<TNonreplicatedPartitionConfig>(
std::move(devices),
IOMode,
Expand All @@ -120,6 +131,7 @@ class TNonreplicatedPartitionConfig
ParentActorId,
MuteIOErrors,
std::move(freshDeviceIds),
std::move(laggingDeviceIds),
MaxTimedOutDeviceStateDuration,
MaxTimedOutDeviceStateDurationOverridden,
UseSimpleMigrationBandwidthLimiter
Expand Down Expand Up @@ -176,6 +188,11 @@ class TNonreplicatedPartitionConfig
return FreshDeviceIds;
}

// Returns a const reference to the LaggingDeviceIds member (no copy is made).
const auto& GetLaggingDeviceIds() const
{
return LaggingDeviceIds;
}

auto GetMaxTimedOutDeviceStateDuration() const
{
return MaxTimedOutDeviceStateDuration;
Expand Down Expand Up @@ -229,7 +246,8 @@ class TNonreplicatedPartitionConfig
Y_UNUSED(relativeRange);

return !Devices[i].GetDeviceUUID()
|| FreshDeviceIds.contains(Devices[i].GetDeviceUUID());
|| FreshDeviceIds.contains(Devices[i].GetDeviceUUID())
|| LaggingDeviceIds.contains(Devices[i].GetDeviceUUID());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ TNonreplicatedPartitionConfigPtr MakePartitionConfig(
NActors::TActorId(),
false, // muteIOErrors
THashSet<TString>(), // freshDeviceIds
THashSet<TString>(), // laggingDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
useSimpleMigrationBandwidthLimiter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,12 @@ struct TTestEnv
// only SSD/HDD distinction matters
NProto::STORAGE_MEDIA_SSD_MIRROR3},
VolumeActorId,
false, // muteIOErrors
false, // muteIOErrors
std::move(freshDeviceIds),
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
true // useSimpleMigrationBandwidthLimiter
THashSet<TString>(), // laggingDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
true // useSimpleMigrationBandwidthLimiter
);

for (auto& replica: Replicas) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ struct TEnv
4_KB,
volumeInfo,
NActors::TActorId(),
false, // muteIOErrors
false, // muteIOErrors
FreshDeviceIds,
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
true // useSimpleMigrationBandwidthLimiter
THashSet<TString>(), // laggingDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
true // useSimpleMigrationBandwidthLimiter
);

{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,12 @@ struct TTestEnv
// only SSD/HDD distinction matters
NProto::STORAGE_MEDIA_SSD_MIRROR3},
VolumeActorId,
false, // muteIOErrors
false, // muteIOErrors
std::move(freshDeviceIds),
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
true // useSimpleMigrationBandwidthLimiter
THashSet<TString>(), // laggingDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
true // useSimpleMigrationBandwidthLimiter
);

for (auto& replica: replicas) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,12 @@ struct TTestEnv
// only SSD/HDD distinction matters
NProto::STORAGE_MEDIA_SSD_NONREPLICATED},
VolumeActorId,
false, // muteIOErrors
THashSet<TString>(), // freshDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
false
false, // muteIOErrors
THashSet<TString>(), // freshDeviceIds
THashSet<TString>(), // laggingDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
false // useSimpleMigrationBandwidthLimiter
);

auto part = std::make_unique<TNonreplicatedPartitionMigrationActor>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,12 @@ struct TTestEnv
// only SSD/HDD distinction matters
NProto::STORAGE_MEDIA_SSD_NONREPLICATED},
VolumeActorId,
false, // muteIOErrors
THashSet<TString>(), // freshDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
false // useSimpleMigrationBandwidthLimiter
false, // muteIOErrors
THashSet<TString>(), // freshDeviceIds
THashSet<TString>(), // laggingDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
false // useSimpleMigrationBandwidthLimiter
);

auto part = std::make_unique<TNonreplicatedPartitionRdmaActor>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,11 @@ struct TTestEnv
TNonreplicatedPartitionConfig::TVolumeInfo{Now(), params.MediaKind},
VolumeActorId,
params.MuteIOErrors,
THashSet<TString>(), // freshDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
false // useSimpleMigrationBandwidthLimiter
THashSet<TString>(), // freshDeviceIds
THashSet<TString>(), // laggingDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
false // useSimpleMigrationBandwidthLimiter
);

auto part = std::make_unique<TNonreplicatedPartitionActor>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,12 @@ struct TTestEnv
// only SSD/HDD distinction matters
NProto::STORAGE_MEDIA_SSD_NONREPLICATED},
VolumeActorId,
false, // muteIOErrors
THashSet<TString>(), // freshDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
true // useSimpleMigrationBandwidthLimiter
false, // muteIOErrors
THashSet<TString>(), // freshDeviceIds
THashSet<TString>(), // laggingDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
true // useSimpleMigrationBandwidthLimiter
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clang-format

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Он так и сделал

);

auto part = std::make_unique<TNonreplicatedPartitionActor>(
Expand Down
65 changes: 65 additions & 0 deletions cloud/blockstore/libs/storage/protos/disk.proto
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ message TDiskConfig

// A log of important events in the life of this disk.
repeated TDiskHistoryItem History = 17;

// A list of devices that are lagging behind on writes.
repeated TLaggingDevice LaggingDevices = 18;
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -417,6 +420,44 @@ message TAgentStats

////////////////////////////////////////////////////////////////////////////////

message TLaggingDevice
{
// UUID of the lagging device.
string DeviceUUID = 1;

// Index of the lagging device in the replica.
uint32 RowIndex = 2;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

может удобнее тут иметь индекс реплики ?
uint32 ReplicaIndex = 3;

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Я чет не придумал где оно сильно пригодиться может

}

////////////////////////////////////////////////////////////////////////////////

// Describes a lagging agent together with its devices.
message TLaggingAgent
{
// Agent id.
string AgentId = 1;

// Id of the node that the agent is running on.
uint32 NodeId = 2;

// Index of the mirror disk replica.
// 0 - main devices
// 1,2 - replica devices
uint32 ReplicaIndex = 3;

// A list of devices that belong to the agent.
repeated TLaggingDevice Devices = 4;
}

////////////////////////////////////////////////////////////////////////////////

// Container for the list of lagging agents.
message TLaggingAgentsInfo
{
// A list of agents that are lagging behind on writes.
repeated TLaggingAgent Agents = 1;
}

////////////////////////////////////////////////////////////////////////////////

message TDiskRegistryAgentListRequestParams
{
repeated string AgentIds = 1;
Expand Down Expand Up @@ -635,6 +676,9 @@ message TAllocateDiskResponse

// New devices used instead of recently replaced ones.
repeated string DeviceReplacementUUIDs = 8;

// Devices that had been lagging.
repeated TLaggingDevice RemovedLaggingDevices = 9;
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1662,6 +1706,27 @@ message TGetAgentNodeIdResponse
bool Connected = 4;
}

////////////////////////////////////////////////////////////////////////////////
// Report that some of the devices were lagging.

// Request that reports lagging devices of a disk.
message TAddLaggingDevicesRequest
{
// Optional request headers.
THeaders Headers = 1;

// Disk identifier to perform operations on.
string DiskId = 2;

// Devices that have been lagging.
repeated TLaggingDevice LaggingDevices = 3;
}

// Response to TAddLaggingDevicesRequest; carries only the error status.
message TAddLaggingDevicesResponse
{
// Optional error, set only if error happened.
NCloud.NProto.TError Error = 1;
}

////////////////////////////////////////////////////////////////////////////////
// Get dependent disks

Expand Down
Loading
Loading