Skip to content

Commit

Permalink
[CSI] trigger StopEndpoint if StartEndpoint has failed with GRPC Time…
Browse files Browse the repository at this point in the history
…out error (#2802)
  • Loading branch information
antonmyagkov authored Jan 13, 2025
1 parent 0e6e9e7 commit 48a64e3
Show file tree
Hide file tree
Showing 2 changed files with 214 additions and 1 deletion.
45 changes: 44 additions & 1 deletion cloud/blockstore/tools/csi_driver/internal/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,11 @@ func (s *nodeService) nodePublishDiskAsVhostSocket(
})

if err != nil {
if s.IsGrpcTimeoutError(err) {
s.nbsClient.StopEndpoint(ctx, &nbsapi.TStopEndpointRequest{
UnixSocketPath: filepath.Join(endpointDir, nbsSocketName),
})
}
return fmt.Errorf("failed to start NBS endpoint: %w", err)
}

Expand Down Expand Up @@ -582,6 +587,11 @@ func (s *nodeService) nodeStageDiskAsVhostSocket(
})

if err != nil {
if s.IsGrpcTimeoutError(err) {
s.nbsClient.StopEndpoint(ctx, &nbsapi.TStopEndpointRequest{
UnixSocketPath: filepath.Join(endpointDir, nbsSocketName),
})
}
return fmt.Errorf("failed to start NBS endpoint: %w", err)
}

Expand Down Expand Up @@ -716,6 +726,19 @@ func (s *nodeService) IsMountConflictError(err error) bool {
return false
}

func (s *nodeService) IsGrpcTimeoutError(err error) bool {
if err != nil {
var clientErr *nbsclient.ClientError
if errors.As(err, &clientErr) {
if clientErr.Code == nbsclient.E_GRPC_DEADLINE_EXCEEDED {
return true
}
}
}

return false
}

func (s *nodeService) hasLocalEndpoint(
ctx context.Context,
diskId string) (bool, error) {
Expand Down Expand Up @@ -900,7 +923,7 @@ func (s *nodeService) startNbsEndpointForNBD(
}

hostType := nbsapi.EHostType_HOST_TYPE_DEFAULT
return s.nbsClient.StartEndpoint(ctx, &nbsapi.TStartEndpointRequest{
resp, err := s.nbsClient.StartEndpoint(ctx, &nbsapi.TStartEndpointRequest{
UnixSocketPath: filepath.Join(endpointDir, nbsSocketName),
DiskId: diskId,
InstanceId: nbsInstanceId,
Expand All @@ -918,6 +941,14 @@ func (s *nodeService) startNbsEndpointForNBD(
HostType: &hostType,
},
})

if s.IsGrpcTimeoutError(err) {
s.nbsClient.StopEndpoint(ctx, &nbsapi.TStopEndpointRequest{
UnixSocketPath: filepath.Join(endpointDir, nbsSocketName),
})
}

return resp, err
}

func (s *nodeService) getNfsClient(fileSystemId string) nfsclient.EndpointClientIface {
Expand Down Expand Up @@ -956,6 +987,12 @@ func (s *nodeService) nodePublishFileStoreAsVhostSocket(
},
})
if err != nil {
if s.IsGrpcTimeoutError(err) {
s.nbsClient.StopEndpoint(ctx, &nbsapi.TStopEndpointRequest{
UnixSocketPath: filepath.Join(endpointDir, nbsSocketName),
})
}

return fmt.Errorf("failed to start NFS endpoint: %w", err)
}

Expand Down Expand Up @@ -994,6 +1031,12 @@ func (s *nodeService) nodeStageFileStoreAsVhostSocket(
},
})
if err != nil {
if s.IsGrpcTimeoutError(err) {
s.nbsClient.StopEndpoint(ctx, &nbsapi.TStopEndpointRequest{
UnixSocketPath: filepath.Join(endpointDir, nbsSocketName),
})
}

return fmt.Errorf("failed to start NFS endpoint: %w", err)
}

Expand Down
170 changes: 170 additions & 0 deletions cloud/blockstore/tools/csi_driver/internal/driver/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package driver

import (
"context"
"fmt"
"io/fs"
"os"
"os/exec"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
nbs "github.com/ydb-platform/nbs/cloud/blockstore/public/api/protos"
nbsclient "github.com/ydb-platform/nbs/cloud/blockstore/public/sdk/go/client"
"github.com/ydb-platform/nbs/cloud/blockstore/tools/csi_driver/internal/driver/mocks"
csimounter "github.com/ydb-platform/nbs/cloud/blockstore/tools/csi_driver/internal/mounter"
nfs "github.com/ydb-platform/nbs/cloud/filestore/public/api/protos"
Expand Down Expand Up @@ -937,3 +939,171 @@ func TestPublishDeviceWithReadWriteManyModeIsNotSupportedWithNBS(t *testing.T) {
})
require.Error(t, err)
}

func TestGrpcTimeoutForIKubevirt(t *testing.T) {
tempDir := t.TempDir()

nbsClient := mocks.NewNbsClientMock()
nfsClient := mocks.NewNfsEndpointClientMock()
nfsLocalClient := mocks.NewNfsEndpointClientMock()
mounter := csimounter.NewMock()

ctx := context.Background()
nodeId := "testNodeId"
clientId := "testClientId"
instanceId := "testInstanceId"
actualClientId := "testClientId-" + instanceId
diskId := "test-disk-id-42"
deviceName := diskId
volumeId := diskId + "#" + instanceId
backend := "nbs"

stagingTargetPath := filepath.Join(tempDir, "testStagingTargetPath")
socketsDir := filepath.Join(tempDir, "sockets")
sourcePath := filepath.Join(socketsDir, instanceId, diskId)
targetFsPathPattern := filepath.Join(tempDir, "pods/([a-z0-9-]+)/volumes/([a-z0-9-]+)/mount")
nbsSocketPath := filepath.Join(sourcePath, "nbs.sock")

nodeService := newNodeService(
nodeId,
clientId,
true,
socketsDir,
targetFsPathPattern,
"",
make(LocalFilestoreOverrideMap),
nbsClient,
nfsClient,
nfsLocalClient,
mounter,
)

accessMode := csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER

volumeCapability := csi.VolumeCapability{
AccessType: &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{},
},
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: accessMode,
},
}

volumeContext := map[string]string{
backendVolumeContextKey: backend,
instanceIdKey: instanceId,
}

hostType := nbs.EHostType_HOST_TYPE_DEFAULT
grpcError := nbsclient.ClientError{Code: nbsclient.E_GRPC_DEADLINE_EXCEEDED}
startEndpointError := fmt.Errorf("%w", grpcError)
nbsClient.On("StartEndpoint", ctx, &nbs.TStartEndpointRequest{
UnixSocketPath: nbsSocketPath,
DiskId: diskId,
InstanceId: instanceId,
ClientId: actualClientId,
DeviceName: deviceName,
IpcType: nbs.EClientIpcType_IPC_VHOST,
VhostQueuesCount: 8,
VolumeAccessMode: nbs.EVolumeAccessMode_VOLUME_ACCESS_READ_WRITE,
VolumeMountMode: nbs.EVolumeMountMode_VOLUME_MOUNT_LOCAL,
Persistent: true,
NbdDevice: &nbs.TStartEndpointRequest_UseFreeNbdDeviceFile{
false,
},
ClientProfile: &nbs.TClientProfile{
HostType: &hostType,
},
}).Once().Return(&nbs.TStartEndpointResponse{}, startEndpointError)

nbsClient.On("StopEndpoint", ctx, &nbs.TStopEndpointRequest{
UnixSocketPath: nbsSocketPath,
}).Once().Return(&nbs.TStopEndpointResponse{}, nil)

_, err := nodeService.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{
VolumeId: volumeId,
StagingTargetPath: stagingTargetPath,
VolumeCapability: &volumeCapability,
VolumeContext: volumeContext,
})
require.Error(t, err)
}

func TestGrpcTimeoutForInfrakuber(t *testing.T) {
tempDir := t.TempDir()

nbsClient := mocks.NewNbsClientMock()
mounter := csimounter.NewMock()

ipcType := nbs.EClientIpcType_IPC_NBD
nbdDeviceFile := filepath.Join(tempDir, "dev", "nbd3")
err := os.MkdirAll(nbdDeviceFile, fs.FileMode(0755))
require.NoError(t, err)

ctx := context.Background()
nodeId := "testNodeId"
clientId := "testClientId"
diskId := "test-disk-id-42"
actualClientId := "testClientId-testNodeId"
targetFsPathPattern := filepath.Join(tempDir, "pods/([a-z0-9-]+)/volumes/([a-z0-9-]+)/mount")
stagingTargetPath := "testStagingTargetPath"
socketsDir := filepath.Join(tempDir, "sockets")
socketPath := filepath.Join(socketsDir, diskId, "nbs.sock")

nodeService := newNodeService(
nodeId,
clientId,
false,
socketsDir,
targetFsPathPattern,
"",
make(LocalFilestoreOverrideMap),
nbsClient,
nil,
nil,
mounter,
)

volumeCapability := csi.VolumeCapability{
AccessType: &csi.VolumeCapability_Mount{},
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
},
}

volumeContext := map[string]string{}

hostType := nbs.EHostType_HOST_TYPE_DEFAULT
grpcError := nbsclient.ClientError{Code: nbsclient.E_GRPC_DEADLINE_EXCEEDED}
startEndpointError := fmt.Errorf("%w", grpcError)
nbsClient.On("StartEndpoint", ctx, &nbs.TStartEndpointRequest{
UnixSocketPath: socketPath,
DiskId: diskId,
InstanceId: nodeId,
ClientId: actualClientId,
DeviceName: diskId,
IpcType: ipcType,
VhostQueuesCount: 8,
VolumeAccessMode: nbs.EVolumeAccessMode_VOLUME_ACCESS_READ_WRITE,
VolumeMountMode: nbs.EVolumeMountMode_VOLUME_MOUNT_LOCAL,
Persistent: true,
NbdDevice: &nbs.TStartEndpointRequest_UseFreeNbdDeviceFile{
true,
},
ClientProfile: &nbs.TClientProfile{
HostType: &hostType,
},
}).Return(&nbs.TStartEndpointResponse{}, startEndpointError)

nbsClient.On("StopEndpoint", ctx, &nbs.TStopEndpointRequest{
UnixSocketPath: socketPath,
}).Return(&nbs.TStopEndpointResponse{}, nil)

_, err = nodeService.NodeStageVolume(ctx, &csi.NodeStageVolumeRequest{
VolumeId: diskId,
StagingTargetPath: stagingTargetPath,
VolumeCapability: &volumeCapability,
VolumeContext: volumeContext,
})
require.Error(t, err)
}

0 comments on commit 48a64e3

Please sign in to comment.