Skip to content

Commit

Permalink
Fix review issues
Browse files Browse the repository at this point in the history
  • Loading branch information
komarevtsev-d committed Jan 14, 2025
1 parent 2db4468 commit 68ae225
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ struct TTestEnv
THashSet<TString>(), // laggingDeviceIds
TDuration::Zero(), // maxTimedOutDeviceStateDuration
false, // maxTimedOutDeviceStateDurationOverridden
false);
false // useSimpleMigrationBandwidthLimiter
);

auto part = std::make_unique<TNonreplicatedPartitionMigrationActor>(
std::move(config),
Expand Down
9 changes: 6 additions & 3 deletions cloud/blockstore/libs/storage/protos/disk.proto
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ message TDiskConfig
// A log of important events in the life of this disk.
repeated TDiskHistoryItem History = 17;

// A list of devices that are lagging begind on writes.
// A list of devices that are lagging behind on writes.
repeated TLaggingDevice LaggingDevices = 18;
}

Expand Down Expand Up @@ -436,13 +436,16 @@ message TLaggingAgent
// Agent id.
string AgentId = 1;

// Node that agent is running on.
uint32 NodeId = 2;

// Index of the mirror disk replica.
// 0 - main devices
// 1,2 - replica devices
uint32 ReplicaIndex = 2;
uint32 ReplicaIndex = 3;

// A list of devices that belong to the agent.
repeated TLaggingDevice Devices = 3;
repeated TLaggingDevice Devices = 4;
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
59 changes: 45 additions & 14 deletions cloud/blockstore/libs/storage/volume/model/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,16 @@ const NProto::TDeviceConfig* FindDeviceConfig(
return device.GetDeviceUUID() == deviceUUID;
};

const NProto::TDeviceConfig* timeoutedDeviceConfig = nullptr;
timeoutedDeviceConfig = FindIfPtr(meta.GetDevices(), deviceMatcher);
if (timeoutedDeviceConfig) {
return timeoutedDeviceConfig;
const NProto::TDeviceConfig* deviceConfig = nullptr;
deviceConfig = FindIfPtr(meta.GetDevices(), deviceMatcher);
if (deviceConfig) {
return deviceConfig;
}

for (const auto& replica: meta.GetReplicas()) {
timeoutedDeviceConfig = FindIfPtr(replica.GetDevices(), deviceMatcher);
if (timeoutedDeviceConfig) {
return timeoutedDeviceConfig;
deviceConfig = FindIfPtr(replica.GetDevices(), deviceMatcher);
if (deviceConfig) {
return deviceConfig;
}
}

Expand All @@ -106,8 +106,9 @@ const NProto::TDeviceConfig* FindDeviceConfig(
std::optional<ui32> GetAgentDevicesIndexes(
const NProto::TVolumeMeta& meta,
ui32 agentNodeId,
TVector<NProto::TLaggingDevice>& laggingDevices)
TVector<NProto::TLaggingDevice>* laggingDevices)
{
Y_DEBUG_ABORT_UNLESS(laggingDevices);
std::optional<ui32> replicaIndex;
const RepeatedPtrField<NProto::TDeviceConfig>* replicaDevices = nullptr;
const auto deviceMatcher = [agentNodeId](const NProto::TDeviceConfig& device)
Expand Down Expand Up @@ -159,7 +160,7 @@ std::optional<ui32> GetAgentDevicesIndexes(
NProto::TLaggingDevice laggingDevice;
laggingDevice.SetRowIndex(i);
laggingDevice.SetDeviceUUID(device.GetDeviceUUID());
laggingDevices.push_back(std::move(laggingDevice));
laggingDevices->push_back(std::move(laggingDevice));
}
}

Expand All @@ -182,7 +183,7 @@ std::optional<ui32> GetAgentDevicesIndexes(
}
laggingDevice.SetRowIndex(*sourceDeviceIndex);
laggingDevice.SetDeviceUUID(targetDevice.GetDeviceUUID());
laggingDevices.push_back(std::move(laggingDevice));
laggingDevices->push_back(std::move(laggingDevice));
}
}

Expand All @@ -191,12 +192,12 @@ std::optional<ui32> GetAgentDevicesIndexes(

TSet<ui32> ReplicaIndexesWithFreshDevices(
const NProto::TVolumeMeta& meta,
NProto::TLaggingDevice device)
ui32 rowIndex)
{
TSet<ui32> result;
auto it = Find(
meta.GetFreshDeviceIds(),
meta.GetDevices()[device.GetRowIndex()].GetDeviceUUID());
meta.GetDevices()[rowIndex].GetDeviceUUID());
if (it != meta.GetFreshDeviceIds().end()) {
result.insert(0);
}
Expand All @@ -205,7 +206,7 @@ TSet<ui32> ReplicaIndexesWithFreshDevices(
for (int i = 0; i < replicas.size(); i++) {
auto it = Find(
meta.GetFreshDeviceIds(),
replicas[i].GetDevices()[device.GetRowIndex()].GetDeviceUUID());
replicas[i].GetDevices()[rowIndex].GetDeviceUUID());
if (it != meta.GetFreshDeviceIds().end()) {
result.insert(i + 1);
}
Expand All @@ -215,7 +216,7 @@ TSet<ui32> ReplicaIndexesWithFreshDevices(

void RemoveLaggingDevicesFromMeta(
NProto::TVolumeMeta& meta,
const TVector<TString> laggingDeviceIds)
const TVector<TString>& laggingDeviceIds)
{
for (auto& agent: *meta.MutableLaggingAgentsInfo()->MutableAgents()) {
EraseIf(
Expand All @@ -236,4 +237,34 @@ void RemoveLaggingDevicesFromMeta(
}
}

void UpdateLaggingDevicesAfterMetaUpdate(NProto::TVolumeMeta& meta)
{
for (auto& agent: *meta.MutableLaggingAgentsInfo()->MutableAgents()) {
agent.ClearDevices();

TVector<NProto::TLaggingDevice> updatedLaggingDevices;
auto replicaIndex = GetAgentDevicesIndexes(
meta,
agent.GetNodeId(),
&updatedLaggingDevices);
if (!replicaIndex.has_value()) {
continue;
}

Y_DEBUG_ABORT_UNLESS(*replicaIndex == agent.GetReplicaIndex());
Y_DEBUG_ABORT_UNLESS(!updatedLaggingDevices.empty());
for (auto& laggingDevice: updatedLaggingDevices) {
*agent.AddDevices() = std::move(laggingDevice);
}
}

EraseIf(
*meta.MutableLaggingAgentsInfo()->MutableAgents(),
[](const NProto::TLaggingAgent& laggingAgent)
{ return laggingAgent.GetDevices().empty(); });
if (meta.GetLaggingAgentsInfo().GetAgents().empty()) {
meta.MutableLaggingAgentsInfo()->Clear();
}
}

} // namespace NCloud::NBlockStore::NStorage
12 changes: 5 additions & 7 deletions cloud/blockstore/libs/storage/volume/model/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,16 @@ struct TLaggingDeviceIndexCmp
[[nodiscard]] std::optional<ui32> GetAgentDevicesIndexes(
const NProto::TVolumeMeta& meta,
ui32 agentNodeId,
TVector<NProto::TLaggingDevice>& laggingDevices);
TVector<NProto::TLaggingDevice>* laggingDevices);

[[nodiscard]] TSet<ui32> ReplicaIndexesWithFreshDevices(
const NProto::TVolumeMeta& meta,
NProto::TLaggingDevice device);

[[nodiscard]] bool CheckReplicasPlacementAreCorrect(
const NProto::TVolumeMeta& meta,
ui32 agentNodeId);
ui32 rowIndex);

void RemoveLaggingDevicesFromMeta(
NProto::TVolumeMeta& meta,
const TVector<TString> laggingDeviceIds);
const TVector<TString>& laggingDeviceIds);

void UpdateLaggingDevicesAfterMetaUpdate(NProto::TVolumeMeta& meta);

} // namespace NCloud::NBlockStore::NStorage
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ NProto::TVolumeMeta CreateNewMeta(
newMeta.SetIOModeTs(args.IOModeTs.MicroSeconds());
newMeta.SetMuteIOErrors(args.MuteIOErrors);
RemoveLaggingDevicesFromMeta(newMeta, args.RemovedLaggingDeviceIds);
UpdateLaggingDevicesAfterMetaUpdate(newMeta);

return newMeta;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,12 @@ void TVolumeActor::HandleDeviceTimeouted(
}

TVector<NProto::TLaggingDevice> timeoutedAgentDevices;
const auto replicaIndex = GetAgentDevicesIndexes(
const auto timeoutedDeviceReplicaIndex = GetAgentDevicesIndexes(
meta,
timeoutedDeviceConfig->GetNodeId(),
timeoutedAgentDevices);
&timeoutedAgentDevices);
Y_DEBUG_ABORT_UNLESS(!timeoutedAgentDevices.empty());
Y_DEBUG_ABORT_UNLESS(replicaIndex);
Y_DEBUG_ABORT_UNLESS(timeoutedDeviceReplicaIndex);

for (const auto& laggingAgent: meta.GetLaggingAgentsInfo().GetAgents()) {
// Whether the agent is lagging already.
Expand All @@ -155,7 +155,7 @@ void TVolumeActor::HandleDeviceTimeouted(
ctx,
State->GetDiskRegistryBasedPartitionActor(),
std::make_unique<TEvPartition::TEvAddLaggingAgentRequest>(
*replicaIndex,
*timeoutedDeviceReplicaIndex,
timeoutedDeviceConfig->GetAgentId()));

auto response =
Expand Down Expand Up @@ -186,9 +186,8 @@ void TVolumeActor::HandleDeviceTimeouted(
TLaggingDeviceIndexCmp());

if (!rowIndexesIntersection.empty()) {
Y_DEBUG_ABORT_UNLESS(
laggingAgent.GetReplicaIndex() != replicaIndex);

// TODO(komarevtsev-d): Allow source and target of the migration to
// lag at the same time.
LOG_WARN(
ctx,
TBlockStoreComponents::VOLUME,
Expand All @@ -213,9 +212,9 @@ void TVolumeActor::HandleDeviceTimeouted(
// Check for fresh devices in the same row.
for (const auto& laggingDevice: timeoutedAgentDevices) {
TSet<ui32> replicaIndexes =
ReplicaIndexesWithFreshDevices(meta, laggingDevice);
ReplicaIndexesWithFreshDevices(meta, laggingDevice.GetRowIndex());
const bool laggingDeviceIsFresh =
replicaIndexes.contains(*replicaIndex);
replicaIndexes.contains(*timeoutedDeviceReplicaIndex);
if (replicaIndexes.size() - laggingDeviceIsFresh > 0) {
LOG_WARN(
ctx,
Expand All @@ -239,7 +238,8 @@ void TVolumeActor::HandleDeviceTimeouted(

NProto::TLaggingAgent unavailableAgent;
unavailableAgent.SetAgentId(timeoutedDeviceConfig->GetAgentId());
unavailableAgent.SetReplicaIndex(*replicaIndex);
unavailableAgent.SetNodeId(timeoutedDeviceConfig->GetNodeId());
unavailableAgent.SetReplicaIndex(*timeoutedDeviceReplicaIndex);
unavailableAgent.MutableDevices()->Assign(
timeoutedAgentDevices.begin(),
timeoutedAgentDevices.end());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ void TVolumeActor::FinishUpdateVolumeConfig(const TActorContext& ctx)
RemoveLaggingDevicesFromMeta(
newMeta,
UnfinishedUpdateVolumeConfig.RemovedLaggingDeviceIds);
UpdateLaggingDevicesAfterMetaUpdate(newMeta);
}

UnfinishedUpdateVolumeConfig.Devices = {};
Expand Down
Loading

0 comments on commit 68ae225

Please sign in to comment.