Skip to content

Commit

Permalink
Add lagging agent concept to the volume
Browse files Browse the repository at this point in the history
  • Loading branch information
komarevtsev-d committed Nov 19, 2024
1 parent afefb4a commit 21ff8af
Show file tree
Hide file tree
Showing 16 changed files with 779 additions and 14 deletions.
69 changes: 68 additions & 1 deletion cloud/blockstore/libs/storage/api/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace NCloud::NBlockStore::NStorage::NPartition {
xxx(Drain, __VA_ARGS__) \
// BLOCKSTORE_PARTITION_REQUESTS

// requests forwarded from service to partion
// requests forwarded from service to partition
#define BLOCKSTORE_PARTITION_REQUESTS_FWD_SERVICE(xxx, ...) \
xxx(ReadBlocks, __VA_ARGS__) \
xxx(WriteBlocks, __VA_ARGS__) \
Expand Down Expand Up @@ -82,6 +82,48 @@ struct TEvPartition
{
};

//
// AddLaggingAgent
//

struct TAddLaggingAgentRequest
{
// 0 - for main devices; 1,2 - for mirror replicas
const ui32 ReplicaIndex;
const TString AgentId;
TAddLaggingAgentRequest(
ui32 replicaIndex,
TString agentId)
: ReplicaIndex(replicaIndex)
, AgentId(std::move(agentId))
{}
};

// Payload of the AddLaggingAgent response. Declares no fields of its own.
struct TAddLaggingAgentResponse
{
};

//
// RemoveLaggingAgent
//

struct TRemoveLaggingAgentRequest
{
// 0 - for main devices; 1,2 - for mirror replicas
const ui32 ReplicaIndex;
const TString AgentId;
TRemoveLaggingAgentRequest(
ui32 replicaIndex,
TString agentId)
: ReplicaIndex(replicaIndex)
, AgentId(std::move(agentId))
{}
};

// Payload of the RemoveLaggingAgent response. Declares no fields of its own.
struct TRemoveLaggingAgentResponse
{
};

//
// Garbage collector finish report
//
Expand Down Expand Up @@ -115,6 +157,11 @@ struct TEvPartition

EvGarbageCollectorCompleted = EvBegin + 8,

EvAddLaggingAgentRequest = EvBegin + 9,
EvAddLaggingAgentResponse = EvBegin + 10,
EvRemoveLaggingAgentRequest = EvBegin + 11,
EvRemoveLaggingAgentResponse = EvBegin + 12,

EvEnd
};

Expand All @@ -132,6 +179,26 @@ struct TEvPartition
TGarbageCollectorCompleted,
EvGarbageCollectorCompleted
>;

// Event types for the lagging agent requests. Each alias binds a payload
// struct to its event id.
using TEvAddLaggingAgentRequest =
    TRequestEvent<TAddLaggingAgentRequest, EvAddLaggingAgentRequest>;

using TEvAddLaggingAgentResponse =
    TResponseEvent<TAddLaggingAgentResponse, EvAddLaggingAgentResponse>;

using TEvRemoveLaggingAgentRequest =
    TRequestEvent<TRemoveLaggingAgentRequest, EvRemoveLaggingAgentRequest>;

using TEvRemoveLaggingAgentResponse =
    TResponseEvent<TRemoveLaggingAgentResponse, EvRemoveLaggingAgentResponse>;
};

} // namespace NCloud::NBlockStore::NStorage::NPartition
32 changes: 32 additions & 0 deletions cloud/blockstore/libs/storage/api/volume.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,25 @@ struct TEvVolume
{}
};

//
// DeviceTimeouted
//

struct TDeviceTimeoutedRequest
{
ui32 DeviceIndex;
TString DeviceUUID;

TDeviceTimeoutedRequest(ui32 deviceIndex, TString deviceUUID)
: DeviceIndex(deviceIndex)
, DeviceUUID(std::move(deviceUUID))
{}
};

// Payload of the DeviceTimeouted response. Declares no fields of its own.
struct TDeviceTimeoutedResponse
{
};

//
// Events declaration
//
Expand Down Expand Up @@ -331,6 +350,9 @@ struct TEvVolume
EvGetStorageConfigRequest = EvBegin + 58,
EvGetStorageConfigResponse = EvBegin + 59,

EvDeviceTimeoutedRequest = EvBegin + 60,
EvDeviceTimeoutedResponse = EvBegin + 61,

EvEnd
};

Expand Down Expand Up @@ -403,6 +425,16 @@ struct TEvVolume
TPreparePartitionMigrationResponse,
EvPreparePartitionMigrationResponse
>;

// Event types for the DeviceTimeouted request. Each alias binds a payload
// struct to its event id.
using TEvDeviceTimeoutedRequest =
    TRequestEvent<TDeviceTimeoutedRequest, EvDeviceTimeoutedRequest>;

using TEvDeviceTimeoutedResponse =
    TResponseEvent<TDeviceTimeoutedResponse, EvDeviceTimeoutedResponse>;
};

} // namespace NCloud::NBlockStore::NStorage
60 changes: 60 additions & 0 deletions cloud/blockstore/libs/storage/protos/disk.proto
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,45 @@ message TAgentStats

////////////////////////////////////////////////////////////////////////////////

// Identifies a single lagging device by its UUID and by its position within
// the replica.
message TLaggingDevice
{
// UUID of the lagging device.
string DeviceUUID = 1;

// Index of the lagging device in the replica.
uint32 DeviceIndex = 2;
}

////////////////////////////////////////////////////////////////////////////////

// Describes a lagging agent: its id, the replica it is attributed to and the
// devices it owns.
message TLaggingAgent
{
// Agent id.
string AgentId = 1;

// Index of the mirror disk replica.
// 0 - main devices
// 1,2 - replica devices
uint32 ReplicaIndex = 2;

// A list of devices that belong to the agent.
repeated TLaggingDevice Devices = 3;
}

////////////////////////////////////////////////////////////////////////////////

// Aggregated information about the lagging agents of a volume.
message TLaggingAgentsInfo
{
// A list of agents that are lagging behind on writes.
repeated TLaggingAgent Agents = 1;

// Indicates that the volume should notify the DR about its fresh devices
// list.
bool NeedToSyncFreshDevicesWithDR = 2;
}

////////////////////////////////////////////////////////////////////////////////

message TDiskRegistryAgentListRequestParams
{
repeated string AgentIds = 1;
Expand Down Expand Up @@ -1613,6 +1652,27 @@ message TGetAgentNodeIdResponse
bool Connected = 4;
}

////////////////////////////////////////////////////////////////////////////////
// Mark lagging devices as fresh.

// Request to update the list of fresh devices of a disk.
message TUpdateVolumeFreshDeviceListRequest
{
// Optional request headers.
THeaders Headers = 1;

// Disk identifier to perform operations on.
string DiskId = 2;

// UUIDs of the devices that have become fresh.
repeated string VolumeFreshDeviceUUIDs = 3;
}

// Response to TUpdateVolumeFreshDeviceListRequest.
message TUpdateVolumeFreshDeviceListResponse
{
// Optional error, set only if an error happened.
NCloud.NProto.TError Error = 1;
}

////////////////////////////////////////////////////////////////////////////////
// Get dependent disks

Expand Down
6 changes: 6 additions & 0 deletions cloud/blockstore/libs/storage/protos_ydb/volume.proto
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ message TVolumeMeta
// We don't allow clients with old sequential number to mount disk for read/write
// in order to prevent data corruption during disk filling.
uint64 FillSeqNumber = 16;

// A list of agents that are lagging behind on writes. Used only for mirror
// disks. An agent can exit this state if it starts to respond to requests,
// or if the volume has been restarted/reallocated (i.e. the partition has
// restarted), or if the DR has replaced all lagging devices.
TLaggingAgentsInfo LaggingAgentsInfo = 17;
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
99 changes: 99 additions & 0 deletions cloud/blockstore/libs/storage/volume/model/helpers.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#include "helpers.h"

#include <cloud/storage/core/libs/common/media.h>

#include <util/generic/algorithm.h>
#include <util/generic/vector.h>

namespace NCloud::NBlockStore::NStorage {

// Orders lagging devices by ascending device index; the UUID does not take
// part in the comparison.
bool TLaggingDeviceIndexCmp::operator()(
    const NProto::TLaggingDevice& lhs,
    const NProto::TLaggingDevice& rhs) const
{
    const auto leftIndex = lhs.GetDeviceIndex();
    const auto rightIndex = rhs.GetDeviceIndex();
    return leftIndex < rightIndex;
}

// Clears the lagging agents recorded in |meta| and returns the UUIDs of the
// devices they referred to.
//
// For every lagging device the UUID is taken from the current device list of
// the corresponding replica (replica index 0 - main devices, i - the (i-1)-th
// entry of the replicas list) at the recorded device index. Afterwards the
// agents list is emptied and NeedToSyncFreshDevicesWithDR is set.
//
// Returns an empty vector and leaves |meta| untouched when the storage media
// kind is not a reliable disk registry kind or when no lagging agents are
// recorded.
TVector<TString> ClearLaggingDevices(NProto::TVolumeMeta& meta)
{
    TVector<TString> result;
    if (!IsReliableDiskRegistryMediaKind(
            meta.GetConfig().GetStorageMediaKind()) ||
        meta.GetLaggingAgentsInfo().GetAgents().empty())
    {
        return result;
    }

    // The explicit reference return type is essential: with a deduced return
    // type the lambda returns by value, which deep-copies the whole device
    // list of the replica for every lagging agent.
    auto getReplicaDevices = [&meta](i32 replicaIndex)
        -> const google::protobuf::RepeatedPtrField<NProto::TDeviceConfig>&
    {
        if (replicaIndex == 0) {
            return meta.GetDevices();
        }
        Y_DEBUG_ABORT_UNLESS(meta.GetReplicas().size() > replicaIndex - 1);
        return meta.GetReplicas()[replicaIndex - 1].GetDevices();
    };

    for (const auto& agent: meta.GetLaggingAgentsInfo().GetAgents()) {
        const auto& devices = getReplicaDevices(agent.GetReplicaIndex());

        for (const auto& laggingDevice: agent.GetDevices()) {
            const i32 index = laggingDevice.GetDeviceIndex();
            Y_DEBUG_ABORT_UNLESS(devices.size() > index);
            result.push_back(devices[index].GetDeviceUUID());
        }
    }

    // The UUIDs were collected above; only now is it safe to drop the agents.
    meta.MutableLaggingAgentsInfo()->MutableAgents()->Clear();
    meta.MutableLaggingAgentsInfo()->SetNeedToSyncFreshDevicesWithDR(true);
    return result;
}

// Finds the first device set that contains a device of |agentId|: the main
// devices are checked first, then the replicas in order. The index and UUID
// of every device of that agent within the found set are appended to
// |laggingDevices|.
//
// Returns the replica index of the found set (0 - main devices, i + 1 - the
// i-th replica), or an empty optional when no set contains a device of the
// agent; in that case |laggingDevices| is not modified.
std::optional<ui32> GetAgentDevicesIndexes(
    const NProto::TVolumeMeta& meta,
    const TString& agentId,
    TVector<NProto::TLaggingDevice>& laggingDevices)
{
    using TDeviceList =
        google::protobuf::RepeatedPtrField<NProto::TDeviceConfig>;

    const auto ownedByAgent = [&agentId](const NProto::TDeviceConfig& config)
    {
        return config.GetAgentId() == agentId;
    };
    const auto hasAgentDevices = [&ownedByAgent](const TDeviceList& list)
    {
        return AnyOf(list.begin(), list.end(), ownedByAgent);
    };

    std::optional<ui32> foundReplica;
    const TDeviceList* foundDevices = nullptr;

    if (hasAgentDevices(meta.GetDevices())) {
        foundReplica = 0;
        foundDevices = &meta.GetDevices();
    } else {
        const auto& replicas = meta.GetReplicas();
        for (int r = 0; r < replicas.size(); ++r) {
            const auto& candidate = replicas[r].GetDevices();
            if (hasAgentDevices(candidate)) {
                // Replica numbering is shifted by one: 0 is reserved for the
                // main devices.
                foundReplica = r + 1;
                foundDevices = &candidate;
                break;
            }
        }
    }

    // There are no devices from the desired agent.
    if (!foundReplica) {
        return foundReplica;
    }

    int position = 0;
    for (const auto& config: *foundDevices) {
        if (ownedByAgent(config)) {
            NProto::TLaggingDevice entry;
            entry.SetDeviceIndex(position);
            entry.SetDeviceUUID(config.GetDeviceUUID());
            laggingDevices.push_back(std::move(entry));
        }
        ++position;
    }

    return foundReplica;
}

} // namespace NCloud::NBlockStore::NStorage
24 changes: 24 additions & 0 deletions cloud/blockstore/libs/storage/volume/model/helpers.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#pragma once

#include "cloud/blockstore/libs/storage/protos_ydb/volume.pb.h"

#include <cloud/blockstore/libs/storage/protos/disk.pb.h>

namespace NCloud::NBlockStore::NStorage {

// Comparator for NProto::TLaggingDevice: returns true when the device index
// of |lhs| is less than the device index of |rhs|.
struct TLaggingDeviceIndexCmp
{
bool operator()(
const NProto::TLaggingDevice& lhs,
const NProto::TLaggingDevice& rhs) const;
};

// Collects the UUIDs of the devices referenced by the lagging agents recorded
// in |meta|, then clears the lagging agents list and sets the
// NeedToSyncFreshDevicesWithDR flag. Returns an empty vector and leaves |meta|
// untouched when the storage media kind is not a reliable disk registry kind
// or when no lagging agents are recorded.
[[nodiscard]] TVector<TString> ClearLaggingDevices(
NProto::TVolumeMeta& meta);

// Finds the first device set (main devices first, then replicas in order)
// that contains a device of |agentId| and appends the index and UUID of every
// such device in that set to |laggingDevices|. Returns the replica index of
// the set (0 - main devices, i + 1 - the i-th replica), or an empty optional
// when the agent owns no devices; |laggingDevices| is left unchanged then.
[[nodiscard]] std::optional<ui32> GetAgentDevicesIndexes(
const NProto::TVolumeMeta& meta,
const TString& agentId,
TVector<NProto::TLaggingDevice>& laggingDevices);

} // namespace NCloud::NBlockStore::NStorage
1 change: 1 addition & 0 deletions cloud/blockstore/libs/storage/volume/model/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ SRCS(
checkpoint.cpp
checkpoint_light.cpp
client_state.cpp
helpers.cpp
merge.cpp
meta.cpp
requests_inflight.cpp
Expand Down
Loading

0 comments on commit 21ff8af

Please sign in to comment.