diff --git a/cloud/blockstore/libs/discovery/test_server.cpp b/cloud/blockstore/libs/discovery/test_server.cpp index 0a636d5ccea..689e561b071 100644 --- a/cloud/blockstore/libs/discovery/test_server.cpp +++ b/cloud/blockstore/libs/discovery/test_server.cpp @@ -17,6 +17,8 @@ #include #include +#include + namespace NCloud::NBlockStore::NDiscovery { namespace { @@ -79,6 +81,7 @@ struct TFakeBlockStoreServer::TImpl ui32 LastByteCount = 0; bool DropPingRequests = false; TVector> DroppedRequests; + TAtomic ShouldStop = 0; TImpl( ui16 port, @@ -102,12 +105,19 @@ struct TFakeBlockStoreServer::TImpl } if (Server) { - Server->Shutdown(); - CQ->Shutdown(); - + // Need to stop the loop explicitly to avoid possible data race + // with grpc server shutdown. + // See https://github.com/ydb-platform/nbs/issues/2809 + AtomicSet(ShouldStop, 1); if (Thread) { Thread->Join(); } + + Server->Shutdown(); + CQ->Shutdown(); + // Need to drain completion queue, otherwise it fails on assert when destroying: + // https://github.com/ydb-platform/nbs/blob/fbf6b9fa568b7b3861fbccffcf3177ee445f498a/contrib/libs/grpc/src/core/lib/surface/completion_queue.cc#L258 + DrainCompletionQueue(); } } @@ -153,7 +163,15 @@ struct TFakeBlockStoreServer::TImpl void* tag; bool ok; - while (CQ->Next(&tag, &ok)) { + while (!AtomicGet(ShouldStop)) { + const auto deadline = + std::chrono::system_clock::now() + + std::chrono::milliseconds(100); + auto status = CQ->AsyncNext(&tag, &ok, deadline); + if (status != grpc::CompletionQueue::GOT_EVENT) { + continue; + } + auto* requestContext = static_cast(tag); if (requestContext->Done || !ok) { @@ -172,6 +190,21 @@ struct TFakeBlockStoreServer::TImpl } } + void DrainCompletionQueue() + { + void* tag; + bool ok; + while (CQ->Next(&tag, &ok)) { + auto* requestContext = static_cast(tag); + + if (requestContext->Done || !ok) { + delete requestContext; + } else { + FinishRequest(requestContext); + } + } + } + void FinishRequest(TRequestContext* requestContext) { requestContext->Done = true;