diff --git a/cloud/blockstore/libs/endpoint_proxy/server/server.cpp b/cloud/blockstore/libs/endpoint_proxy/server/server.cpp index d3b16ee71aa..e189e57517a 100644 --- a/cloud/blockstore/libs/endpoint_proxy/server/server.cpp +++ b/cloud/blockstore/libs/endpoint_proxy/server/server.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -30,6 +31,7 @@ #include +#include #include #include #include @@ -55,6 +57,17 @@ namespace { //////////////////////////////////////////////////////////////////////////////// +constexpr auto NBD_CONNECTION_TIMEOUT = TDuration::Days(1); +constexpr bool NBD_RECONFIGURE_CONNECTED = true; +constexpr bool NBD_DELETE_DEVICE = false; + +const auto MIN_RECONNECT_DELAY = + gpr_time_from_millis(100, gpr_clock_type::GPR_TIMESPAN); +const auto MAX_RECONNECT_DELAY = + gpr_time_from_minutes(10, gpr_clock_type::GPR_TIMESPAN); + +//////////////////////////////////////////////////////////////////////////////// + TString ReadFile(const TString& name) { return TFileInput(name).ReadAll(); @@ -160,6 +173,37 @@ struct TResizeRequestContext: TRequestContextBase } }; +struct TRestartAlarmContext: TRequestContextBase +{ + gpr_timespec Delay; + TString Socket; + grpc::Alarm Alarm; + + TRestartAlarmContext( + TString socket, + grpc::ServerCompletionQueue& cq) + : Delay(MIN_RECONNECT_DELAY) + , Socket(std::move(socket)) + { + Alarm.Set(&cq, gpr_now(gpr_clock_type::GPR_CLOCK_MONOTONIC), this); + } + + TRestartAlarmContext( + TRestartAlarmContext* other, + grpc::ServerCompletionQueue& cq) + : Delay(other->Delay) + , Socket(other->Socket) + { + auto now = gpr_now(gpr_clock_type::GPR_CLOCK_MONOTONIC); + auto deadline = gpr_time_add(now, Delay); + Alarm.Set(&cq, deadline, this); + + // double the delay for the next attempt + Delay = gpr_time_add(other->Delay, other->Delay); + Delay = gpr_time_min(Delay, MAX_RECONNECT_DELAY); + } +}; + //////////////////////////////////////////////////////////////////////////////// struct TClientStorage: 
NStorage::NServer::IClientStorage @@ -226,6 +270,24 @@ struct TServer: IEndpointProxyServer }; THashMap> Socket2Endpoint; + struct TErrorHandler: NBD::IErrorHandler + { + TString Socket; + grpc::ServerCompletionQueue& CQ; + + TErrorHandler(TString socket, grpc::ServerCompletionQueue& cq) + : Socket(std::move(socket)) + , CQ(cq) + {} + + void ProcessException(std::exception_ptr e) override + { + Y_UNUSED(e); + + new TRestartAlarmContext(Socket, CQ); + } + }; + NBD::IClientPtr NbdClient; TServer( @@ -460,6 +522,14 @@ struct TServer: IEndpointProxyServer ProcessRequest(resizeRequestContext); continue; } + + auto* restartAlarmContext = + dynamic_cast(requestContext); + if (restartAlarmContext) { + ProcessAlarm(restartAlarmContext); + delete restartAlarmContext; + continue; + } } STORAGE_INFO("Exiting loop"); @@ -472,7 +542,7 @@ struct TServer: IEndpointProxyServer template void RequestReceived(const TRequest& request) { - STORAGE_INFO(request.ShortDebugString().Quote() << "- Received"); + STORAGE_INFO(request.ShortDebugString().Quote() << " - Received"); } template @@ -546,12 +616,13 @@ struct TServer: IEndpointProxyServer TEndpoint& ep, NProto::TStartProxyEndpointResponse& response) { + const auto tag = TStringBuilder() + << request.ShortDebugString().Quote() << " - "; + if (!ValidateRequest(request, response)) { return; } - - STORAGE_INFO(request.ShortDebugString().Quote() - << " - Validated request"); + STORAGE_INFO(tag << "Validated request"); TNetworkAddress connectAddress( MakeUnixSocketAddress(request.GetUnixSocketPath())); @@ -560,8 +631,7 @@ struct TServer: IEndpointProxyServer NBD::CreateClientHandler(Logging), CreateBlockStoreStub()); ep.Client->Start(); - STORAGE_INFO(request.ShortDebugString().Quote() - << " - Started NBD client endpoint"); + STORAGE_INFO(tag << "Started NBD client endpoint"); auto retryPolicy = CreateRetryPolicy(ClientConfig); ep.RequestStats = CreateProxyRequestStats(); @@ -575,8 +645,7 @@ struct TServer: IEndpointProxyServer Scheduler, 
ep.RequestStats, volumeStats); - STORAGE_INFO(request.ShortDebugString().Quote() - << " - Started DurableClient"); + STORAGE_INFO(tag << "Started DurableClient"); // these options can actually be obtained from ClientHandler after the // first request is processed (they will be available after connection @@ -588,74 +657,26 @@ struct TServer: IEndpointProxyServer ep.NbdOptions.BlockSize = request.GetBlockSize(); ep.NbdOptions.BlocksCount = request.GetBlocksCount(); - auto handlerFactory = CreateServerHandlerFactory( - CreateDefaultDeviceHandlerFactory(), - Logging, - CreateProxyStorage( - ep.Client, - ep.RequestStats, - ep.NbdOptions.BlockSize), - CreateServerStatsStub(), - ep.NbdOptions); - ep.InternalUnixSocketPath = request.GetUnixSocketPath() + ".p"; ep.ListenAddress = std::make_unique( MakeUnixSocketAddress(ep.InternalUnixSocketPath)); - // TODO fix StartEndpoint signature - it's actually synchronous - auto startResult = NbdServer->StartEndpoint( - *ep.ListenAddress, - std::move(handlerFactory)).GetValueSync(); - - if (HasError(startResult)) { - *response.MutableError() = std::move(startResult); + auto status = StartServerEndpoint(ep, tag); + if (HasError(status)) { + *response.MutableError() = std::move(status); return; } response.SetInternalUnixSocketPath(ep.InternalUnixSocketPath); - STORAGE_INFO(request.ShortDebugString().Quote() - << " - Started NBD server endpoint"); - ep.NbdDevicePath = request.GetNbdDevice(); - if (!ep.NbdDevicePath) { - STORAGE_WARN(request.ShortDebugString().Quote() - << " - NbdDevice missing - no nbd connection with the" - << " kernel will be established"); - return; - } - if (Config.Netlink) { - ep.NbdDevice = NBD::CreateNetlinkDevice( - Logging, - *ep.ListenAddress, - request.GetNbdDevice(), - Config.NbdRequestTimeout, - TDuration::Days(1), // connection timeout - true); // reconfigure device if exists - } else { - // The only case we want kernel to retry requests is when the socket - // is dead due to nbd server restart. 
And since we can't configure - // ioctl device to use a new socket, request timeout effectively - // becomes connection timeout - ep.NbdDevice = NBD::CreateDevice( - Logging, - *ep.ListenAddress, - request.GetNbdDevice(), - TDuration::Days(1)); // timeout - } - - auto future = ep.NbdDevice->Start(); - const auto& status = future.GetValue(); + status = StartDevice(ep, tag); if (HasError(status)) { - STORAGE_ERROR(request.ShortDebugString().Quote() - << " - Unable to start nbd device: " + STORAGE_ERROR(tag << "Unable to start nbd device: " << status.GetMessage()); *response.MutableError() = std::move(status); return; } - - STORAGE_INFO(request.ShortDebugString().Quote() - << " - Started NBD device connection"); } void ProcessRequest(TStartRequestContext* requestContext) @@ -718,7 +739,7 @@ struct TServer: IEndpointProxyServer if (ep.NbdDevice) { ep.NbdDevice->Stop(true); STORAGE_INFO(request.ShortDebugString().Quote() - << " - Stopped NBD device connection"); + << " - Stopped NBD device"); } if (ep.ListenAddress) { @@ -890,6 +911,128 @@ struct TServer: IEndpointProxyServer ResponseSent(request); } + // Builds the NBD handler factory for ep and starts the NBD server endpoint + // on ep.ListenAddress. Returns the start error; empty error on success. + NProto::TError StartServerEndpoint(TEndpoint& ep, const TString& tag) + { + auto handlerFactory = CreateServerHandlerFactory( + CreateDefaultDeviceHandlerFactory(), + Logging, + CreateProxyStorage( + ep.Client, + ep.RequestStats, + ep.NbdOptions.BlockSize), + CreateServerStatsStub(), + std::make_shared<TErrorHandler>(ep.UnixSocketPath, *CQ), + ep.NbdOptions); + + // TODO fix StartEndpoint signature - it's actually synchronous + auto status = NbdServer->StartEndpoint( + *ep.ListenAddress, + std::move(handlerFactory)).GetValueSync(); + if (HasError(status)) { + STORAGE_ERROR(tag << "Unable to start NBD server endpoint: " + << status.GetMessage()); + return status; + } + + STORAGE_INFO(tag << "Started NBD server endpoint"); + return {}; + } + + // Creates the netlink or ioctl NBD device for ep.NbdDevicePath and starts it. + // Returns success without creating a device when the path is empty. + NProto::TError StartDevice(TEndpoint& ep, const TString& tag) + { + if (!ep.NbdDevicePath) { + STORAGE_WARN(tag << "NbdDevice missing - nbd 
connection " + << "with the kernel won't be established"); + return {}; + } + + if (Config.Netlink) { + ep.NbdDevice = NBD::CreateNetlinkDevice( + Logging, + *ep.ListenAddress, + ep.NbdDevicePath, + Config.NbdRequestTimeout, + NBD_CONNECTION_TIMEOUT, + NBD_RECONFIGURE_CONNECTED); + } else { + // The only case we want kernel to retry requests is when the socket + // is dead due to nbd server restart. And since we can't configure + // ioctl device to use a new socket, request timeout effectively + // becomes connection timeout + ep.NbdDevice = NBD::CreateDevice( + Logging, + *ep.ListenAddress, + ep.NbdDevicePath, + NBD_CONNECTION_TIMEOUT); + } + + auto status = ep.NbdDevice->Start().GetValueSync(); + if (HasError(status)) { + STORAGE_ERROR(tag << "Unable to start NBD device: " + << status.GetMessage()); + return status; + } + + STORAGE_INFO(tag << "Started NBD device"); + + return {}; + } + + // Handles a fired restart alarm: restarts the endpoint registered for + // context->Socket and schedules another alarm if the restart fails. + void ProcessAlarm(TRestartAlarmContext* context) + { + const auto tag = TStringBuilder() + << "UnixSocketPath: " << context->Socket.Quote() << " - "; + + // Use find(): operator[] would insert an empty entry for a socket whose + // endpoint is no longer registered. + const auto it = Socket2Endpoint.find(context->Socket); + if (it != Socket2Endpoint.end() && it->second) { + STORAGE_INFO(tag << "Restarting proxy endpoint"); + + if (DoProcessAlarm(*it->second, tag)) { + STORAGE_ERROR(tag + << "Unable to restart proxy endpoint, retry in " + << gpr_time_to_millis(context->Delay) << "ms"); + new TRestartAlarmContext(context, *CQ); + } + } else { + STORAGE_WARN(tag << "Unable to restart proxy endpoint: " + << "original endpoint stopped"); + } + } + + // returns true in case of an error + bool DoProcessAlarm(TEndpoint& ep, const TString& tag) + { + if (ep.NbdDevice) { + ep.NbdDevice->Stop(NBD_DELETE_DEVICE).GetValueSync(); + STORAGE_INFO(tag << "Stopped NBD device"); + } + + if (ep.ListenAddress) { + NbdServer->StopEndpoint(*ep.ListenAddress).GetValueSync(); + STORAGE_INFO(tag << "Stopped NBD server endpoint"); + } + + auto status = StartServerEndpoint(ep, tag); + if (HasError(status)) { + return true; + } + + status = StartDevice(ep, tag); + if 
(HasError(status)) { + return true; + } + + return false; + } + void Start() override { PreStart(); diff --git a/cloud/blockstore/libs/endpoints_nbd/nbd_server.cpp b/cloud/blockstore/libs/endpoints_nbd/nbd_server.cpp index 49c6c736096..8e1be022c7b 100644 --- a/cloud/blockstore/libs/endpoints_nbd/nbd_server.cpp +++ b/cloud/blockstore/libs/endpoints_nbd/nbd_server.cpp @@ -53,6 +53,7 @@ class TNbdEndpointListener final Logging, std::move(session), ServerStats, + CreateErrorHandlerStub(), options); auto address = TNetworkAddress( diff --git a/cloud/blockstore/libs/nbd/error_handler.cpp b/cloud/blockstore/libs/nbd/error_handler.cpp new file mode 100644 index 00000000000..c13ded3b6f2 --- /dev/null +++ b/cloud/blockstore/libs/nbd/error_handler.cpp @@ -0,0 +1,27 @@ +#include "error_handler.h" + +namespace NCloud::NBlockStore::NBD { + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +// Error handler that ignores every reported exception. +struct TErrorHandlerStub + : IErrorHandler +{ + void ProcessException(std::exception_ptr) override + {} +}; + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +// Kept out of the anonymous namespace to match the header declaration. +IErrorHandlerPtr CreateErrorHandlerStub() +{ + return std::make_shared<TErrorHandlerStub>(); +} + +} // namespace NCloud::NBlockStore::NBD diff --git a/cloud/blockstore/libs/nbd/error_handler.h b/cloud/blockstore/libs/nbd/error_handler.h new file mode 100644 index 00000000000..b414e4323d7 --- /dev/null +++ b/cloud/blockstore/libs/nbd/error_handler.h @@ -0,0 +1,20 @@ +#pragma once + +#include "public.h" + +namespace NCloud::NBlockStore::NBD { + +//////////////////////////////////////////////////////////////////////////////// + +struct IErrorHandler +{ + virtual ~IErrorHandler() = default; + + virtual void ProcessException(std::exception_ptr) = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +IErrorHandlerPtr CreateErrorHandlerStub(); + +} // namespace NCloud::NBlockStore::NBD diff --git 
a/cloud/blockstore/libs/nbd/netlink_device.cpp b/cloud/blockstore/libs/nbd/netlink_device.cpp index 22bc23407dc..c88a01afe16 100644 --- a/cloud/blockstore/libs/nbd/netlink_device.cpp +++ b/cloud/blockstore/libs/nbd/netlink_device.cpp @@ -108,9 +108,9 @@ TFuture TNetlinkDevice::Start() ConnectSocket(); Connect(); - } catch (const TServiceError& e) { + } catch (const std::exception& e) { StartResult.SetValue(MakeError( - e.GetCode(), + E_FAIL, TStringBuilder() << "unable to configure " << DeviceName << ": " << e.what())); } diff --git a/cloud/blockstore/libs/nbd/public.h b/cloud/blockstore/libs/nbd/public.h index eb07c9bcca2..190a657a569 100644 --- a/cloud/blockstore/libs/nbd/public.h +++ b/cloud/blockstore/libs/nbd/public.h @@ -35,4 +35,7 @@ using IDeviceFactoryPtr = std::shared_ptr; struct ILimiter; using ILimiterPtr = std::shared_ptr; +struct IErrorHandler; +using IErrorHandlerPtr = std::shared_ptr; + } // namespace NCloud::NBlockStore::NBD diff --git a/cloud/blockstore/libs/nbd/server.cpp b/cloud/blockstore/libs/nbd/server.cpp index 394069eeeeb..c83ad1242b9 100644 --- a/cloud/blockstore/libs/nbd/server.cpp +++ b/cloud/blockstore/libs/nbd/server.cpp @@ -181,6 +181,7 @@ class TConnection final if (!IsShuttingDown() && !c->Cancelled()) { STORAGE_INFO("lost connection with client, failed to receive: " << CurrentExceptionMessage()); + Handler->ProcessException(std::current_exception()); } } @@ -212,6 +213,7 @@ class TConnection final } catch (...) 
{ STORAGE_INFO("lost connection with client, failed to send: " << CurrentExceptionMessage()); + Handler->ProcessException(std::current_exception()); } ReleaseRequest(response->RequestBytes); diff --git a/cloud/blockstore/libs/nbd/server_handler.cpp b/cloud/blockstore/libs/nbd/server_handler.cpp index a87095f6360..42802f368a4 100644 --- a/cloud/blockstore/libs/nbd/server_handler.cpp +++ b/cloud/blockstore/libs/nbd/server_handler.cpp @@ -1,5 +1,6 @@ #include "server_handler.h" +#include "error_handler.h" #include "protocol.h" #include "utils.h" @@ -44,6 +45,7 @@ class TServerHandler final const ILoggingServicePtr Logging; const IServerStatsPtr ServerStats; const IDeviceHandlerPtr DeviceHandler; + const IErrorHandlerPtr ErrorHandler; const TStorageOptions Options; TLog Log; @@ -58,10 +60,12 @@ class TServerHandler final ILoggingServicePtr logging, IServerStatsPtr serverStats, IDeviceHandlerPtr deviceHandler, + IErrorHandlerPtr errorHandler, const TStorageOptions& options) : Logging(std::move(logging)) , ServerStats(std::move(serverStats)) , DeviceHandler(std::move(deviceHandler)) + , ErrorHandler(std::move(errorHandler)) , Options(options) { Log = Logging->CreateLog("BLOCKSTORE_NBD"); @@ -107,6 +111,11 @@ class TServerHandler final size_t CollectRequests( const TIncompleteRequestsCollector& collector) override; + void ProcessException(std::exception_ptr e) override + { + ErrorHandler->ProcessException(e); + } + private: bool NegotiateClient(TRequestReader& in, TRequestWriter& out); @@ -900,6 +909,7 @@ class TServerHandlerFactory final const ILoggingServicePtr Logging; const IServerStatsPtr ServerStats; const IDeviceHandlerPtr DeviceHandler; + const IErrorHandlerPtr ErrorHandler; const TStorageOptions Options; public: @@ -907,10 +917,12 @@ class TServerHandlerFactory final ILoggingServicePtr logging, IServerStatsPtr serverStats, IDeviceHandlerPtr deviceHandler, + IErrorHandlerPtr errorHandler, const TStorageOptions& options) : Logging(std::move(logging)) , 
ServerStats(std::move(serverStats)) , DeviceHandler(std::move(deviceHandler)) + , ErrorHandler(std::move(errorHandler)) , Options(options) {} @@ -920,6 +932,7 @@ class TServerHandlerFactory final Logging, ServerStats, DeviceHandler, + ErrorHandler, Options); } }; @@ -933,6 +946,7 @@ IServerHandlerFactoryPtr CreateServerHandlerFactory( ILoggingServicePtr logging, IStoragePtr storage, IServerStatsPtr serverStats, + IErrorHandlerPtr errorHandler, const TStorageOptions& options) { auto deviceHandler = deviceHandlerFactory->CreateDeviceHandler( @@ -945,6 +959,7 @@ IServerHandlerFactoryPtr CreateServerHandlerFactory( std::move(logging), std::move(serverStats), std::move(deviceHandler), + std::move(errorHandler), options); } diff --git a/cloud/blockstore/libs/nbd/server_handler.h b/cloud/blockstore/libs/nbd/server_handler.h index 6c25378067a..0ab3a219b08 100644 --- a/cloud/blockstore/libs/nbd/server_handler.h +++ b/cloud/blockstore/libs/nbd/server_handler.h @@ -100,6 +100,9 @@ struct IServerHandler IInputStream& in, IOutputStream& out, TCont* cont) = 0; + + virtual void ProcessException( + std::exception_ptr e) = 0; }; //////////////////////////////////////////////////////////////////////////////// @@ -131,6 +134,7 @@ IServerHandlerFactoryPtr CreateServerHandlerFactory( ILoggingServicePtr logging, IStoragePtr storage, IServerStatsPtr serverStats, + IErrorHandlerPtr errorHandler, const TStorageOptions& options); } // namespace NCloud::NBlockStore::NBD diff --git a/cloud/blockstore/libs/nbd/server_handler_ut.cpp b/cloud/blockstore/libs/nbd/server_handler_ut.cpp index a7b7edaca80..c44c0bc26a8 100644 --- a/cloud/blockstore/libs/nbd/server_handler_ut.cpp +++ b/cloud/blockstore/libs/nbd/server_handler_ut.cpp @@ -1,5 +1,6 @@ #include "server_handler.h" +#include "error_handler.h" #include "protocol.h" #include "utils.h" @@ -477,6 +478,7 @@ Y_UNIT_TEST_SUITE(TServerHandlerTest) bootstrap->GetLogging(), bootstrap->GetStorage(), CreateServerStatsStub(), + CreateErrorHandlerStub(), 
options); auto handler = factory->CreateHandler(); @@ -507,6 +509,7 @@ Y_UNIT_TEST_SUITE(TServerHandlerTest) bootstrap->GetLogging(), bootstrap->GetStorage(), CreateServerStatsStub(), + CreateErrorHandlerStub(), options); auto handler = factory->CreateHandler(); @@ -543,6 +546,7 @@ Y_UNIT_TEST_SUITE(TServerHandlerTest) bootstrap->GetLogging(), bootstrap->GetStorage(), CreateServerStatsStub(), + CreateErrorHandlerStub(), options); auto handler = factory->CreateHandler(); @@ -563,6 +567,7 @@ Y_UNIT_TEST_SUITE(TServerHandlerTest) bootstrap->GetLogging(), bootstrap->GetStorage(), CreateServerStatsStub(), + CreateErrorHandlerStub(), options); auto handler = factory->CreateHandler(); @@ -584,6 +589,7 @@ Y_UNIT_TEST_SUITE(TServerHandlerTest) bootstrap->GetLogging(), bootstrap->GetStorage(), CreateServerStatsStub(), + CreateErrorHandlerStub(), options); auto handler = factory->CreateHandler(); @@ -660,6 +666,7 @@ Y_UNIT_TEST_SUITE(TServerHandlerTest) bootstrap->GetLogging(), bootstrap->GetStorage(), serverStats, + CreateErrorHandlerStub(), options); auto handler = factory->CreateHandler(); diff --git a/cloud/blockstore/libs/nbd/server_ut.cpp b/cloud/blockstore/libs/nbd/server_ut.cpp index 515d709f996..b53ae0dceec 100644 --- a/cloud/blockstore/libs/nbd/server_ut.cpp +++ b/cloud/blockstore/libs/nbd/server_ut.cpp @@ -274,6 +274,7 @@ std::unique_ptr CreateBootstrap( logging, std::move(storage), CreateServerStatsStub(), + CreateErrorHandlerStub(), options); auto client = CreateClient( @@ -1033,6 +1034,7 @@ Y_UNIT_TEST_SUITE(TServerTest) bootstrap->GetLogging(), storage, CreateServerStatsStub(), + CreateErrorHandlerStub(), options); auto future = bootstrap->GetServer()->StartEndpoint(