diff --git a/cloud/blockstore/tests/csi_driver/e2e_tests_part1/test.py b/cloud/blockstore/tests/csi_driver/e2e_tests_part1/test.py index 06011b4e6b..25abfb3895 100644 --- a/cloud/blockstore/tests/csi_driver/e2e_tests_part1/test.py +++ b/cloud/blockstore/tests/csi_driver/e2e_tests_part1/test.py @@ -152,30 +152,8 @@ def test_publish_volume_twice_on_the_same_node(access_type, vm_mode): csi.cleanup_after_test(env, volume_name, access_type, [pod_id1, pod_id2]) -# test can be removed after migration of all endpoints to the new format @pytest.mark.parametrize('access_type', ["mount", "block"]) -def test_restart_kubelet_with_old_format_endpoint(access_type): - env, run = csi.init() - try: - volume_name = "example-disk" - volume_size = 1024 ** 3 - pod_name1 = "example-pod-1" - pod_id1 = "deadbeef1" - env.csi.create_volume(name=volume_name, size=volume_size) - # skip stage to create endpoint with old format - env.csi.publish_volume(pod_id1, volume_name, pod_name1, access_type) - # run stage/publish again to simulate kubelet restart - env.csi.stage_volume(volume_name, access_type) - env.csi.publish_volume(pod_id1, volume_name, pod_name1, access_type) - except subprocess.CalledProcessError as e: - csi.log_called_process_error(e) - raise - finally: - csi.cleanup_after_test(env, volume_name, access_type, [pod_id1]) - - -@pytest.mark.parametrize('access_type', ["mount", "block"]) -def test_restart_kubelet_with_new_format_endpoint(access_type): +def test_kubelet_restart(access_type): env, run = csi.init() try: volume_name = "example-disk" diff --git a/cloud/blockstore/tools/csi_driver/internal/driver/node.go b/cloud/blockstore/tools/csi_driver/internal/driver/node.go index d6e966e194..3262e4bd10 100644 --- a/cloud/blockstore/tools/csi_driver/internal/driver/node.go +++ b/cloud/blockstore/tools/csi_driver/internal/driver/node.go @@ -598,81 +598,14 @@ func (s *nodeService) nodeStageDiskAsVhostSocket( return s.createDummyImgFile(endpointDir) } -func (s *nodeService) nodePublishDiskAsFilesystemDeprecated( - ctx context.Context, - req *csi.NodePublishVolumeRequest) error { - - diskId := req.VolumeId - resp, err := s.startNbsEndpointForNBD(ctx, s.getPodId(req), diskId, req.VolumeContext) - if err != nil { - return fmt.Errorf("failed to start NBS endpoint: %w", err) - } - - if resp.NbdDeviceFile == "" { - return fmt.Errorf("NbdDeviceFile shouldn't be empty") - } - - logVolume(req.VolumeId, "endpoint started with device: %q", resp.NbdDeviceFile) - - mnt := req.VolumeCapability.GetMount() - - fsType := req.VolumeContext["fsType"] - if mnt != nil && mnt.FsType != "" { - fsType = mnt.FsType - } - if fsType == "" { - fsType = "ext4" - } - - err = s.makeFilesystemIfNeeded(diskId, resp.NbdDeviceFile, fsType) - if err != nil { - return err - } - - mounted, _ := s.mounter.IsMountPoint(req.TargetPath) - if !mounted { - targetPerm := os.FileMode(0775) - if err := os.MkdirAll(req.TargetPath, targetPerm); err != nil { - return fmt.Errorf("failed to create target directory: %w", err) - } - - if err := os.Chmod(req.TargetPath, targetPerm); err != nil { - return fmt.Errorf("failed to chmod target path: %w", err) - } - } - - mountOptions := []string{} - if mnt != nil { - for _, flag := range mnt.MountFlags { - mountOptions = append(mountOptions, flag) - } - } - if req.Readonly { - mountOptions = append(mountOptions, "ro") - } - - err = s.mountIfNeeded( - diskId, - resp.NbdDeviceFile, - req.TargetPath, - fsType, - mountOptions) - if err != nil { - return err - } - - return nil -} - func (s *nodeService) nodePublishDiskAsFilesystem( ctx context.Context, req *csi.NodePublishVolumeRequest) error { - // Fallback to previous implementation for already mounted volumes - // Must be removed after migration of all endpoints to the new format mounted, _ := s.mounter.IsMountPoint(req.StagingTargetPath) if !mounted { - return s.nodePublishDiskAsFilesystemDeprecated(ctx, req) + return s.statusErrorf(codes.FailedPrecondition, + "Staging target path is not mounted: %w", req.VolumeId) } mounted, _ = s.mounter.IsMountPoint(req.TargetPath) @@ -713,19 +646,6 @@ func (s *nodeService) nodePublishDiskAsFilesystem( return nil } -func (s *nodeService) IsMountConflictError(err error) bool { - if err != nil { - var clientErr *nbsclient.ClientError - if errors.As(err, &clientErr) { - if clientErr.Code == nbsclient.E_MOUNT_CONFLICT { - return true - } - } - } - - return false -} - func (s *nodeService) IsGrpcTimeoutError(err error) bool { if err != nil { var clientErr *nbsclient.ClientError @@ -739,31 +659,6 @@ func (s *nodeService) IsGrpcTimeoutError(err error) bool { return false } -func (s *nodeService) hasLocalEndpoint( - ctx context.Context, - diskId string) (bool, error) { - - listEndpointsResp, err := s.nbsClient.ListEndpoints( - ctx, &nbsapi.TListEndpointsRequest{}, - ) - if err != nil { - log.Printf("List endpoints failed %v", err) - return false, err - } - - if len(listEndpointsResp.Endpoints) == 0 { - return false, nil - } - - for _, endpoint := range listEndpointsResp.Endpoints { - if endpoint.DiskId == diskId { - return true, nil - } - } - - return false, nil -} - func (s *nodeService) nodeStageDiskAsFilesystem( ctx context.Context, req *csi.NodeStageVolumeRequest) error { @@ -771,15 +666,6 @@ func (s *nodeService) nodeStageDiskAsFilesystem( diskId := req.VolumeId resp, err := s.startNbsEndpointForNBD(ctx, "", diskId, req.VolumeContext) if err != nil { - if s.IsMountConflictError(err) { - localEndpoint, err := s.hasLocalEndpoint(ctx, diskId) - if err != nil { - return err - } - if localEndpoint { - return nil - } - } return fmt.Errorf("failed to start NBS endpoint: %w", err) } @@ -846,15 +732,6 @@ func (s *nodeService) nodeStageDiskAsBlockDevice( diskId := req.VolumeId resp, err := s.startNbsEndpointForNBD(ctx, "", diskId, req.VolumeContext) if err != nil { - if s.IsMountConflictError(err) { - localEndpoint, err := s.hasLocalEndpoint(ctx, diskId) - if err != nil { - return err - } - if localEndpoint { - return nil - } - } return fmt.Errorf("failed to start NBS endpoint: %w", err) } @@ -868,24 +745,6 @@ func (s *nodeService) nodeStageDiskAsBlockDevice( return s.mountBlockDevice(diskId, resp.NbdDeviceFile, devicePath, false) } -func (s *nodeService) nodePublishDiskAsBlockDeviceDeprecated( - ctx context.Context, - req *csi.NodePublishVolumeRequest) error { - - diskId := req.VolumeId - resp, err := s.startNbsEndpointForNBD(ctx, s.getPodId(req), diskId, req.VolumeContext) - if err != nil { - return fmt.Errorf("failed to start NBS endpoint: %w", err) - } - - if resp.NbdDeviceFile == "" { - return fmt.Errorf("NbdDeviceFile shouldn't be empty") - } - - logVolume(req.VolumeId, "endpoint started with device: %q", resp.NbdDeviceFile) - return s.mountBlockDevice(req.VolumeId, resp.NbdDeviceFile, req.TargetPath, req.Readonly) -} - func (s *nodeService) nodePublishDiskAsBlockDevice( ctx context.Context, req *csi.NodePublishVolumeRequest) error { @@ -894,8 +753,8 @@ func (s *nodeService) nodePublishDiskAsBlockDevice( devicePath := filepath.Join(req.StagingTargetPath, diskId) mounted, _ := s.mounter.IsMountPoint(devicePath) if !mounted { - // Fallback to previous implementation for already staged volumes - return s.nodePublishDiskAsBlockDeviceDeprecated(ctx, req) + return s.statusErrorf(codes.FailedPrecondition, + "Staging target path is not mounted: %w", req.VolumeId) } return s.mountBlockDevice(diskId, devicePath, req.TargetPath, req.Readonly)