Skip to content

Commit

Permalink
feat: add delete cloud backup
Browse files Browse the repository at this point in the history
fix: create cloud backup ready status update

Signed-off-by: Shivanjan Chakravorty <[email protected]>
  • Loading branch information
Glitchfix committed Feb 28, 2024
1 parent 8712fa8 commit 3ada73b
Show file tree
Hide file tree
Showing 3 changed files with 294 additions and 62 deletions.
5 changes: 5 additions & 0 deletions api/server/sdk/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import (
"google.golang.org/grpc/status"
)

// IsErrorUnavailable returns if the given error is due to unavailable
func IsErrorUnavailable(err error) bool {
return FromError(err).Code() == codes.Unavailable
}

// IsErrorNotFound returns if the given error is due to not found
func IsErrorNotFound(err error) bool {
return FromError(err).Code() == codes.NotFound
Expand Down
122 changes: 103 additions & 19 deletions csi/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"

"github.com/libopenstorage/openstorage/api"
"github.com/libopenstorage/openstorage/api/server/sdk"
"github.com/libopenstorage/openstorage/pkg/grpcutil"
"github.com/libopenstorage/openstorage/pkg/units"
"github.com/libopenstorage/openstorage/pkg/util"
Expand Down Expand Up @@ -1016,6 +1017,7 @@ func (s *OsdCsiServer) createLocalSnapshot(
},
}, nil
}

func (s *OsdCsiServer) getCloudBackupClient(ctx context.Context) (api.OpenStorageCloudBackupClient, error) {
// Get grpc connection
conn, err := s.getRemoteConn(ctx)
Expand All @@ -1036,18 +1038,45 @@ func (s *OsdCsiServer) createCloudBackup(
return nil, err
}

// In the incoming request the snapshot is denoted by `snapshot-<UID of the volumesnapshot>`
csiSnapshotID := req.GetName()

// Get any labels passed in by the CO
_, locator, _, err := s.specHandler.SpecFromOpts(req.GetParameters())
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "Unable to get parameters: %v", err)
}

credentialID := locator.VolumeLabels[osdSnapshotCredentialIDKey]
backupID := req.GetName()

// Check if the snapshot with this name already exists
backupStatus, err := cloudBackupClient.Status(ctx, &api.SdkCloudBackupStatusRequest{
TaskId: csiSnapshotID,
})
if err == nil {

// Verify the parent is the same
if req.GetSourceVolumeId() != backupStatus.Statuses[csiSnapshotID].GetSrcVolumeId() {
return nil, status.Error(codes.AlreadyExists, "Requested snapshot already exists for another source volume id")
}

isBackupReady := backupStatus.Statuses[csiSnapshotID].Status == api.SdkCloudBackupStatusType_SdkCloudBackupStatusTypeDone

return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SnapshotId: csiSnapshotID,
SourceVolumeId: req.GetSourceVolumeId(),
CreationTime: backupStatus.Statuses[csiSnapshotID].StartTime,
ReadyToUse: isBackupReady,
SizeBytes: int64(backupStatus.Statuses[csiSnapshotID].BytesDone),
},
}, nil
}

// Create snapshot
_, err = cloudBackupClient.Create(ctx, &api.SdkCloudBackupCreateRequest{
VolumeId: req.GetSourceVolumeId(),
TaskId: backupID,
TaskId: csiSnapshotID,
CredentialId: credentialID,
Labels: locator.GetVolumeLabels(),
})
Expand All @@ -1057,31 +1086,24 @@ func (s *OsdCsiServer) createCloudBackup(
}

var isBackupReady bool
var backupStatus *api.SdkCloudBackupStatusResponse

// Check if snapshot has been created but is in error state
backupStatus, errFindFailed := cloudBackupClient.Status(ctx, &api.SdkCloudBackupStatusRequest{
backupStatus, err = cloudBackupClient.Status(ctx, &api.SdkCloudBackupStatusRequest{
VolumeId: req.GetSourceVolumeId(),
TaskId: backupID,
TaskId: csiSnapshotID,
})
if errFindFailed != nil {
return nil, status.Errorf(codes.Aborted, "Failed to create cloud snapshot: %v", err)
if err != nil {
return nil, status.Errorf(codes.Aborted, "Failed to get cloud snapshot status: %v", err)
}
isBackupReady = backupStatus.Statuses[backupID].Status == api.SdkCloudBackupStatusType_SdkCloudBackupStatusTypeDone

snapSize, errSizeFailed := cloudBackupClient.Size(ctx, &api.SdkCloudBackupSizeRequest{
BackupId: backupID,
})
if errSizeFailed != nil {
return nil, status.Errorf(codes.Aborted, "Failed to get cloud snapshot size: %v", err)
}
isBackupReady = backupStatus.Statuses[csiSnapshotID].Status == api.SdkCloudBackupStatusType_SdkCloudBackupStatusTypeDone

return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SizeBytes: int64(snapSize.GetTotalDownloadBytes()),
SnapshotId: backupID,
SnapshotId: csiSnapshotID,
SizeBytes: int64(backupStatus.Statuses[csiSnapshotID].BytesDone),
SourceVolumeId: req.GetSourceVolumeId(),
CreationTime: backupStatus.Statuses[backupID].StartTime,
CreationTime: backupStatus.Statuses[csiSnapshotID].StartTime,
ReadyToUse: isBackupReady,
},
}, nil
Expand All @@ -1091,12 +1113,48 @@ func (s *OsdCsiServer) createCloudBackup(
func (s *OsdCsiServer) DeleteSnapshot(
ctx context.Context,
req *csi.DeleteSnapshotRequest,
) (*csi.DeleteSnapshotResponse, error) {
) (resp *csi.DeleteSnapshotResponse, err error) {
cloudBackupClient, err := s.getCloudBackupClient(ctx)
cloudBackupDriverUnavailable := sdk.IsErrorUnavailable(err)
if err != nil && !cloudBackupDriverUnavailable {
return nil, err
}

if len(req.GetSnapshotId()) == 0 {
csiSnapshotID := req.GetSnapshotId()
if len(csiSnapshotID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Snapshot id must be provided")
}

// Check if snapshot has been created but is in error state
backupStatus, err := cloudBackupClient.Status(ctx, &api.SdkCloudBackupStatusRequest{
TaskId: csiSnapshotID,
})
if sdk.IsErrorNotFound(err) || cloudBackupDriverUnavailable {
resp, err = s.deleteLocalSnapshot(ctx, req)
return
}
if err != nil {
return nil, status.Errorf(codes.Aborted, "Failed to get cloud snapshot status: %v", err)
}

clogger.WithContext(ctx).Errorf("DeleteSnapshots backupStatus: %+v, csiSnapshotID: %s", backupStatus, csiSnapshotID)

req.Secrets = map[string]string{
osdSnapshotCredentialIDKey: backupStatus.Statuses[csiSnapshotID].CredentialId,
}

req.SnapshotId = backupStatus.Statuses[csiSnapshotID].BackupId

resp, err = s.deleteCloudBackup(ctx, req)
return

}

func (s *OsdCsiServer) deleteLocalSnapshot(
ctx context.Context,
req *csi.DeleteSnapshotRequest,
) (*csi.DeleteSnapshotResponse, error) {

// Get grpc connection
conn, err := s.getConn()
if err != nil {
Expand Down Expand Up @@ -1125,6 +1183,32 @@ func (s *OsdCsiServer) DeleteSnapshot(
return &csi.DeleteSnapshotResponse{}, nil
}

func (s *OsdCsiServer) deleteCloudBackup(
ctx context.Context,
req *csi.DeleteSnapshotRequest,
) (*csi.DeleteSnapshotResponse, error) {
cloudBackupClient, err := s.getCloudBackupClient(ctx)
if err != nil {
return nil, err
}

credentialID := req.GetSecrets()[osdSnapshotCredentialIDKey]

backupID := req.GetSnapshotId()

// Delete snapshot
_, err = cloudBackupClient.Delete(ctx, &api.SdkCloudBackupDeleteRequest{
BackupId: backupID,
CredentialId: credentialID,
})
// NOTE: Currently, the Delete API call has no implementation that returns
// a not found gRPC error with the status code.
if err != nil && !sdk.IsErrorNotFound(err) {
return nil, status.Errorf(codes.Aborted, "failed to delete cloud snapshot: %v", err)
}
return &csi.DeleteSnapshotResponse{}, nil
}

// ListSnapshots lists all snapshots in a cluster.
// This is mainly implemented for Nomad, as Kubernetes will not call
// list snapshots for drivers which have synchronous snapshot creation.
Expand Down
Loading

0 comments on commit 3ada73b

Please sign in to comment.