Skip to content

Commit

Permalink
reconfigure nbd device on disconnect from the other side #2791
Browse files Browse the repository at this point in the history
  • Loading branch information
tpashkin committed Jan 3, 2025
1 parent 7c94cff commit 78e08eb
Show file tree
Hide file tree
Showing 11 changed files with 279 additions and 64 deletions.
258 changes: 196 additions & 62 deletions cloud/blockstore/libs/endpoint_proxy/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <cloud/blockstore/libs/nbd/client.h>
#include <cloud/blockstore/libs/nbd/client_handler.h>
#include <cloud/blockstore/libs/nbd/device.h>
#include <cloud/blockstore/libs/nbd/error_handler.h>
#include <cloud/blockstore/libs/nbd/netlink_device.h>
#include <cloud/blockstore/libs/nbd/server.h>
#include <cloud/blockstore/libs/nbd/server_handler.h>
Expand All @@ -30,6 +31,7 @@

#include <library/cpp/logger/log.h>

#include <contrib/libs/grpc/include/grpcpp/alarm.h>
#include <contrib/libs/grpc/include/grpcpp/impl/codegen/completion_queue.h>
#include <contrib/libs/grpc/include/grpcpp/impl/codegen/status.h>
#include <contrib/libs/grpc/include/grpcpp/security/server_credentials.h>
Expand All @@ -55,6 +57,17 @@ namespace {

////////////////////////////////////////////////////////////////////////////////

constexpr auto NBD_CONNECTION_TIMEOUT = TDuration::Days(1);
constexpr bool NBD_RECONFIGURE_CONNECTED = true;
constexpr bool NBD_DELETE_DEVICE = false;

const auto MIN_RECONNECT_DELAY =
gpr_time_from_millis(100, gpr_clock_type::GPR_TIMESPAN);
const auto MAX_RECONNECT_DELAY =
gpr_time_from_minutes(10, gpr_clock_type::GPR_TIMESPAN);

////////////////////////////////////////////////////////////////////////////////

TString ReadFile(const TString& name)
{
return TFileInput(name).ReadAll();
Expand Down Expand Up @@ -160,6 +173,37 @@ struct TResizeRequestContext: TRequestContextBase
}
};

struct TRestartAlarmContext: TRequestContextBase
{
gpr_timespec Delay;
TString Socket;
grpc::Alarm Alarm;

TRestartAlarmContext(
TString socket,
grpc::ServerCompletionQueue& cq)
: Delay(MIN_RECONNECT_DELAY)
, Socket(std::move(socket))
{
Alarm.Set(&cq, gpr_now(gpr_clock_type::GPR_CLOCK_MONOTONIC), this);
}

TRestartAlarmContext(
TRestartAlarmContext* other,
grpc::ServerCompletionQueue& cq)
: Delay(other->Delay)
, Socket(other->Socket)
{
auto now = gpr_now(gpr_clock_type::GPR_CLOCK_MONOTONIC);
auto deadline = gpr_time_add(now, Delay);
Alarm.Set(&cq, deadline, this);

// double the delay for the next attempt
Delay = gpr_time_add(other->Delay, other->Delay);
Delay = gpr_time_min(Delay, MAX_RECONNECT_DELAY);
}
};

////////////////////////////////////////////////////////////////////////////////

struct TClientStorage: NStorage::NServer::IClientStorage
Expand Down Expand Up @@ -226,6 +270,24 @@ struct TServer: IEndpointProxyServer
};
THashMap<TString, std::shared_ptr<TEndpoint>> Socket2Endpoint;

struct TErrorHandler: NBD::IErrorHandler
{
TString Socket;
grpc::ServerCompletionQueue& CQ;

TErrorHandler(TString socket, grpc::ServerCompletionQueue& cq)
: Socket(std::move(socket))
, CQ(cq)
{}

void ProcessException(std::exception_ptr e) override
{
Y_UNUSED(e);

new TRestartAlarmContext(Socket, CQ);
}
};

NBD::IClientPtr NbdClient;

TServer(
Expand Down Expand Up @@ -460,6 +522,14 @@ struct TServer: IEndpointProxyServer
ProcessRequest(resizeRequestContext);
continue;
}

auto* restartAlarmContext =
dynamic_cast<TRestartAlarmContext*>(requestContext);
if (restartAlarmContext) {
ProcessAlarm(restartAlarmContext);
delete restartAlarmContext;
continue;
}
}

STORAGE_INFO("Exiting loop");
Expand All @@ -472,7 +542,7 @@ struct TServer: IEndpointProxyServer
template <typename TRequest>
void RequestReceived(const TRequest& request)
{
STORAGE_INFO(request.ShortDebugString().Quote() << "- Received");
STORAGE_INFO(request.ShortDebugString().Quote() << " - Received");
}

template <typename TResponse>
Expand Down Expand Up @@ -546,12 +616,13 @@ struct TServer: IEndpointProxyServer
TEndpoint& ep,
NProto::TStartProxyEndpointResponse& response)
{
const auto tag = TStringBuilder()
<< request.ShortDebugString().Quote() << " - ";

if (!ValidateRequest(request, response)) {
return;
}

STORAGE_INFO(request.ShortDebugString().Quote()
<< " - Validated request");
STORAGE_INFO(tag << "Validated request");

TNetworkAddress connectAddress(
MakeUnixSocketAddress(request.GetUnixSocketPath()));
Expand All @@ -560,8 +631,7 @@ struct TServer: IEndpointProxyServer
NBD::CreateClientHandler(Logging),
CreateBlockStoreStub());
ep.Client->Start();
STORAGE_INFO(request.ShortDebugString().Quote()
<< " - Started NBD client endpoint");
STORAGE_INFO(tag << "Started NBD client endpoint");

auto retryPolicy = CreateRetryPolicy(ClientConfig);
ep.RequestStats = CreateProxyRequestStats();
Expand All @@ -575,8 +645,7 @@ struct TServer: IEndpointProxyServer
Scheduler,
ep.RequestStats,
volumeStats);
STORAGE_INFO(request.ShortDebugString().Quote()
<< " - Started DurableClient");
STORAGE_INFO(tag << "Started DurableClient");

// these options can actually be obtained from ClientHandler after the
// first request is processed (they will be available after connection
Expand All @@ -588,74 +657,26 @@ struct TServer: IEndpointProxyServer
ep.NbdOptions.BlockSize = request.GetBlockSize();
ep.NbdOptions.BlocksCount = request.GetBlocksCount();

auto handlerFactory = CreateServerHandlerFactory(
CreateDefaultDeviceHandlerFactory(),
Logging,
CreateProxyStorage(
ep.Client,
ep.RequestStats,
ep.NbdOptions.BlockSize),
CreateServerStatsStub(),
ep.NbdOptions);

ep.InternalUnixSocketPath = request.GetUnixSocketPath() + ".p";
ep.ListenAddress = std::make_unique<TNetworkAddress>(
MakeUnixSocketAddress(ep.InternalUnixSocketPath));

// TODO fix StartEndpoint signature - it's actually synchronous
auto startResult = NbdServer->StartEndpoint(
*ep.ListenAddress,
std::move(handlerFactory)).GetValueSync();

if (HasError(startResult)) {
*response.MutableError() = std::move(startResult);
auto status = StartServerEndpoint(ep, tag);
if (HasError(status)) {
*response.MutableError() = std::move(status);
return;
}
response.SetInternalUnixSocketPath(ep.InternalUnixSocketPath);

STORAGE_INFO(request.ShortDebugString().Quote()
<< " - Started NBD server endpoint");

ep.NbdDevicePath = request.GetNbdDevice();
if (!ep.NbdDevicePath) {
STORAGE_WARN(request.ShortDebugString().Quote()
<< " - NbdDevice missing - no nbd connection with the"
<< " kernel will be established");
return;
}

if (Config.Netlink) {
ep.NbdDevice = NBD::CreateNetlinkDevice(
Logging,
*ep.ListenAddress,
request.GetNbdDevice(),
Config.NbdRequestTimeout,
TDuration::Days(1), // connection timeout
true); // reconfigure device if exists
} else {
// The only case we want kernel to retry requests is when the socket
// is dead due to nbd server restart. And since we can't configure
// ioctl device to use a new socket, request timeout effectively
// becomes connection timeout
ep.NbdDevice = NBD::CreateDevice(
Logging,
*ep.ListenAddress,
request.GetNbdDevice(),
TDuration::Days(1)); // timeout
}

auto future = ep.NbdDevice->Start();
const auto& status = future.GetValue();
status = StartDevice(ep, tag);
if (HasError(status)) {
STORAGE_ERROR(request.ShortDebugString().Quote()
<< " - Unable to start nbd device: "
STORAGE_ERROR(tag << "Unable to start nbd device: "
<< status.GetMessage());
*response.MutableError() = std::move(status);
return;
}

STORAGE_INFO(request.ShortDebugString().Quote()
<< " - Started NBD device connection");
}

void ProcessRequest(TStartRequestContext* requestContext)
Expand Down Expand Up @@ -718,7 +739,7 @@ struct TServer: IEndpointProxyServer
if (ep.NbdDevice) {
ep.NbdDevice->Stop(true);
STORAGE_INFO(request.ShortDebugString().Quote()
<< " - Stopped NBD device connection");
<< " - Stopped NBD device");
}

if (ep.ListenAddress) {
Expand Down Expand Up @@ -890,6 +911,119 @@ struct TServer: IEndpointProxyServer
ResponseSent(request);
}

NProto::TError StartServerEndpoint(TEndpoint& ep, const TString& tag)
{
auto handlerFactory = CreateServerHandlerFactory(
CreateDefaultDeviceHandlerFactory(),
Logging,
CreateProxyStorage(
ep.Client,
ep.RequestStats,
ep.NbdOptions.BlockSize),
CreateServerStatsStub(),
std::make_shared<TErrorHandler>(ep.UnixSocketPath, *CQ),
ep.NbdOptions);

// TODO fix StartEndpoint signature - it's actually synchronous
auto status = NbdServer->StartEndpoint(
*ep.ListenAddress,
std::move(handlerFactory)).GetValueSync();
if (HasError(status)) {
STORAGE_INFO(tag << "Unable to start NBD server endpoint: "
<< status.GetMessage());
return status;
}

STORAGE_INFO(tag << "Started NBD server endpoint");
return {};
}

NProto::TError StartDevice(TEndpoint& ep, const TString& tag)
{
if (!ep.NbdDevicePath) {
STORAGE_WARN(tag << "NbdDevice missing - nbd connection "
<< "with the kernel won't be established");
return {};
}

if (Config.Netlink) {
ep.NbdDevice = NBD::CreateNetlinkDevice(
Logging,
*ep.ListenAddress,
ep.NbdDevicePath,
Config.NbdRequestTimeout,
NBD_CONNECTION_TIMEOUT,
NBD_RECONFIGURE_CONNECTED);
} else {
// The only case we want kernel to retry requests is when the socket
// is dead due to nbd server restart. And since we can't configure
// ioctl device to use a new socket, request timeout effectively
// becomes connection timeout
ep.NbdDevice = NBD::CreateDevice(
Logging,
*ep.ListenAddress,
ep.NbdDevicePath,
NBD_CONNECTION_TIMEOUT);
}

auto status = ep.NbdDevice->Start().GetValueSync();
if (HasError(status)) {
STORAGE_ERROR(tag << "Unable to start NBD device: "
<< status.GetMessage());
return status;
}

STORAGE_INFO(tag << "Started NBD device");

return {};
}

void ProcessAlarm(TRestartAlarmContext* context)
{
const auto tag = TStringBuilder()
<< "UnixSocketPath: " << context->Socket.Quote() << " - ";

if (auto& ep = Socket2Endpoint[context->Socket]; ep) {
STORAGE_INFO(tag << "Restarting proxy endpoint");

if (DoProcessAlarm(*ep, tag)) {
STORAGE_ERROR(tag
<< "Unable to restart proxy endpoint, retry in "
<< gpr_time_to_millis(context->Delay) << "ms");
new TRestartAlarmContext(context, *CQ);
}
} else {
STORAGE_WARN(tag << "Unable to restart proxy endpoint: "
<< "original endpoint stopped");
}
}

// returns true in case of an error
bool DoProcessAlarm(TEndpoint& ep, const TString& tag)
{
if (ep.NbdDevice) {
ep.NbdDevice->Stop(NBD_DELETE_DEVICE).GetValueSync();
STORAGE_INFO(tag << "Stopped NBD device");
}

if (ep.ListenAddress) {
NbdServer->StopEndpoint(*ep.ListenAddress).GetValueSync();
STORAGE_INFO(tag << "Stopped NBD server endpoint");
}

auto status = StartServerEndpoint(ep, tag);
if (HasError(status)) {
return true;
}

status = StartDevice(ep, tag);
if (HasError(status)) {
return true;
}

return false;
}

void Start() override
{
PreStart();
Expand Down
1 change: 1 addition & 0 deletions cloud/blockstore/libs/endpoints_nbd/nbd_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class TNbdEndpointListener final
Logging,
std::move(session),
ServerStats,
CreateErrorHandlerStub(),
options);

auto address = TNetworkAddress(
Expand Down
Loading

0 comments on commit 78e08eb

Please sign in to comment.