Skip to content

Commit

Permalink
[CSI] remove deprecated stage/publish functions
Browse files Browse the repository at this point in the history
  • Loading branch information
antonmyagkov committed Jan 14, 2025
1 parent c7fd32f commit 47b8069
Showing 1 changed file with 4 additions and 145 deletions.
149 changes: 4 additions & 145 deletions cloud/blockstore/tools/csi_driver/internal/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,81 +598,14 @@ func (s *nodeService) nodeStageDiskAsVhostSocket(
return s.createDummyImgFile(endpointDir)
}

func (s *nodeService) nodePublishDiskAsFilesystemDeprecated(
ctx context.Context,
req *csi.NodePublishVolumeRequest) error {

diskId := req.VolumeId
resp, err := s.startNbsEndpointForNBD(ctx, s.getPodId(req), diskId, req.VolumeContext)
if err != nil {
return fmt.Errorf("failed to start NBS endpoint: %w", err)
}

if resp.NbdDeviceFile == "" {
return fmt.Errorf("NbdDeviceFile shouldn't be empty")
}

logVolume(req.VolumeId, "endpoint started with device: %q", resp.NbdDeviceFile)

mnt := req.VolumeCapability.GetMount()

fsType := req.VolumeContext["fsType"]
if mnt != nil && mnt.FsType != "" {
fsType = mnt.FsType
}
if fsType == "" {
fsType = "ext4"
}

err = s.makeFilesystemIfNeeded(diskId, resp.NbdDeviceFile, fsType)
if err != nil {
return err
}

mounted, _ := s.mounter.IsMountPoint(req.TargetPath)
if !mounted {
targetPerm := os.FileMode(0775)
if err := os.MkdirAll(req.TargetPath, targetPerm); err != nil {
return fmt.Errorf("failed to create target directory: %w", err)
}

if err := os.Chmod(req.TargetPath, targetPerm); err != nil {
return fmt.Errorf("failed to chmod target path: %w", err)
}
}

mountOptions := []string{}
if mnt != nil {
for _, flag := range mnt.MountFlags {
mountOptions = append(mountOptions, flag)
}
}
if req.Readonly {
mountOptions = append(mountOptions, "ro")
}

err = s.mountIfNeeded(
diskId,
resp.NbdDeviceFile,
req.TargetPath,
fsType,
mountOptions)
if err != nil {
return err
}

return nil
}

func (s *nodeService) nodePublishDiskAsFilesystem(
ctx context.Context,
req *csi.NodePublishVolumeRequest) error {

// Fallback to previous implementation for already mounted volumes
// Must be removed after migration of all endpoints to the new format
mounted, _ := s.mounter.IsMountPoint(req.StagingTargetPath)
if !mounted {
return s.nodePublishDiskAsFilesystemDeprecated(ctx, req)
return s.statusErrorf(codes.FailedPrecondition,
"Staging target path is not mounted: %w", req.VolumeId)
}

mounted, _ = s.mounter.IsMountPoint(req.TargetPath)
Expand Down Expand Up @@ -713,19 +646,6 @@ func (s *nodeService) nodePublishDiskAsFilesystem(
return nil
}

func (s *nodeService) IsMountConflictError(err error) bool {
if err != nil {
var clientErr *nbsclient.ClientError
if errors.As(err, &clientErr) {
if clientErr.Code == nbsclient.E_MOUNT_CONFLICT {
return true
}
}
}

return false
}

func (s *nodeService) IsGrpcTimeoutError(err error) bool {
if err != nil {
var clientErr *nbsclient.ClientError
Expand All @@ -739,47 +659,13 @@ func (s *nodeService) IsGrpcTimeoutError(err error) bool {
return false
}

func (s *nodeService) hasLocalEndpoint(
ctx context.Context,
diskId string) (bool, error) {

listEndpointsResp, err := s.nbsClient.ListEndpoints(
ctx, &nbsapi.TListEndpointsRequest{},
)
if err != nil {
log.Printf("List endpoints failed %v", err)
return false, err
}

if len(listEndpointsResp.Endpoints) == 0 {
return false, nil
}

for _, endpoint := range listEndpointsResp.Endpoints {
if endpoint.DiskId == diskId {
return true, nil
}
}

return false, nil
}

func (s *nodeService) nodeStageDiskAsFilesystem(
ctx context.Context,
req *csi.NodeStageVolumeRequest) error {

diskId := req.VolumeId
resp, err := s.startNbsEndpointForNBD(ctx, "", diskId, req.VolumeContext)
if err != nil {
if s.IsMountConflictError(err) {
localEndpoint, err := s.hasLocalEndpoint(ctx, diskId)
if err != nil {
return err
}
if localEndpoint {
return nil
}
}
return fmt.Errorf("failed to start NBS endpoint: %w", err)
}

Expand Down Expand Up @@ -846,15 +732,6 @@ func (s *nodeService) nodeStageDiskAsBlockDevice(
diskId := req.VolumeId
resp, err := s.startNbsEndpointForNBD(ctx, "", diskId, req.VolumeContext)
if err != nil {
if s.IsMountConflictError(err) {
localEndpoint, err := s.hasLocalEndpoint(ctx, diskId)
if err != nil {
return err
}
if localEndpoint {
return nil
}
}
return fmt.Errorf("failed to start NBS endpoint: %w", err)
}

Expand All @@ -868,24 +745,6 @@ func (s *nodeService) nodeStageDiskAsBlockDevice(
return s.mountBlockDevice(diskId, resp.NbdDeviceFile, devicePath, false)
}

func (s *nodeService) nodePublishDiskAsBlockDeviceDeprecated(
ctx context.Context,
req *csi.NodePublishVolumeRequest) error {

diskId := req.VolumeId
resp, err := s.startNbsEndpointForNBD(ctx, s.getPodId(req), diskId, req.VolumeContext)
if err != nil {
return fmt.Errorf("failed to start NBS endpoint: %w", err)
}

if resp.NbdDeviceFile == "" {
return fmt.Errorf("NbdDeviceFile shouldn't be empty")
}

logVolume(req.VolumeId, "endpoint started with device: %q", resp.NbdDeviceFile)
return s.mountBlockDevice(req.VolumeId, resp.NbdDeviceFile, req.TargetPath, req.Readonly)
}

func (s *nodeService) nodePublishDiskAsBlockDevice(
ctx context.Context,
req *csi.NodePublishVolumeRequest) error {
Expand All @@ -894,8 +753,8 @@ func (s *nodeService) nodePublishDiskAsBlockDevice(
devicePath := filepath.Join(req.StagingTargetPath, diskId)
mounted, _ := s.mounter.IsMountPoint(devicePath)
if !mounted {
// Fallback to previous implementation for already staged volumes
return s.nodePublishDiskAsBlockDeviceDeprecated(ctx, req)
return s.statusErrorf(codes.FailedPrecondition,
"Staging target path is not mounted: %w", req.VolumeId)
}

return s.mountBlockDevice(diskId, devicePath, req.TargetPath, req.Readonly)
Expand Down

0 comments on commit 47b8069

Please sign in to comment.