// diskRegistryCheckpointReplica describes the checkpoint replica (shadow
// disk) attached to a disk in the disk registry backup JSON produced by NBS.
type diskRegistryCheckpointReplica struct {
	CheckpointID string `json:"CheckpointId"`
	SourceDiskID string `json:"SourceDiskId"`
}

// diskRegistryDisk is a single disk entry of the disk registry backup.
type diskRegistryDisk struct {
	DiskID            string                        `json:"DiskId"`
	DeviceUUIDs       []string                      `json:"DeviceUUIDs"`
	CheckpointReplica diskRegistryCheckpointReplica `json:"CheckpointReplica"`
}

// diskRegistryDevice is a single device entry of an agent.
type diskRegistryDevice struct {
	DeviceUUID string `json:"DeviceUUID"`
}

// diskRegistryAgent is a disk agent together with the devices it serves.
type diskRegistryAgent struct {
	Devices []diskRegistryDevice `json:"Devices"`
	AgentID string               `json:"AgentId"`
}

// DiskRegistryBackup is the "Backup" section of the disk registry state
// returned by the backupdiskregistrystate NBS action.
type DiskRegistryBackup struct {
	Disks  []diskRegistryDisk  `json:"Disks"`
	Agents []diskRegistryAgent `json:"Agents"`
}

// diskRegistryState is the top-level envelope of the backup response.
type diskRegistryState struct {
	Backup DiskRegistryBackup `json:"Backup"`
}

// GetDevicesOfDisk returns the device UUIDs of the disk with the given ID,
// or nil when no such disk is present in the backup.
func (b *DiskRegistryBackup) GetDevicesOfDisk(diskID string) []string {
	for i := range b.Disks {
		if b.Disks[i].DiskID == diskID {
			return b.Disks[i].DeviceUUIDs
		}
	}

	return nil
}

// GetDevicesOfShadowDisk returns the device UUIDs of the shadow disk whose
// checkpoint replica points at originalDiskID, or nil when no shadow disk
// exists for that disk.
func (b *DiskRegistryBackup) GetDevicesOfShadowDisk(
	originalDiskID string,
) []string {

	for i := range b.Disks {
		if b.Disks[i].CheckpointReplica.SourceDiskID == originalDiskID {
			return b.Disks[i].DeviceUUIDs
		}
	}

	return nil
}

// GetAgentIDByDeviceUUId returns the ID of the agent that serves the given
// device, or the empty string when the device is not found.
func (b *DiskRegistryBackup) GetAgentIDByDeviceUUId(deviceUUID string) string {
	for i := range b.Agents {
		agent := &b.Agents[i]
		for j := range agent.Devices {
			if agent.Devices[j].DeviceUUID == deviceUUID {
				return agent.AgentID
			}
		}
	}

	return ""
}
b/cloud/disk_manager/internal/pkg/clients/nbs/interface.go @@ -364,6 +364,17 @@ type Client interface { // Used in tests. List(ctx context.Context) ([]string, error) + + // Used in tests. + BackupDiskRegistryState(ctx context.Context) (*DiskRegistryBackup, error) + + // Used in tests. + DisableDevices( + ctx context.Context, + agentID string, + deviceUUIDs []string, + message string, + ) error } //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/disk_manager/internal/pkg/clients/nbs/mocks/client_mock.go b/cloud/disk_manager/internal/pkg/clients/nbs/mocks/client_mock.go index 86096a2ce88..cb4e0926348 100644 --- a/cloud/disk_manager/internal/pkg/clients/nbs/mocks/client_mock.go +++ b/cloud/disk_manager/internal/pkg/clients/nbs/mocks/client_mock.go @@ -486,6 +486,25 @@ func (c *ClientMock) FinishFillDisk( return args.Error(0) } +func (c *ClientMock) BackupDiskRegistryState( + ctx context.Context, +) (*nbs.DiskRegistryBackup, error) { + + args := c.Called(ctx) + return args.Get(0).(*nbs.DiskRegistryBackup), args.Error(1) +} + +func (c *ClientMock) DisableDevices( + ctx context.Context, + agentID string, + deviceUUIDs []string, + message string, +) error { + + args := c.Called(ctx, agentID, deviceUUIDs, message) + return args.Error(0) +} + //////////////////////////////////////////////////////////////////////////////// func NewClientMock() *ClientMock { diff --git a/cloud/disk_manager/internal/pkg/clients/nbs/testing_client.go b/cloud/disk_manager/internal/pkg/clients/nbs/testing_client.go index 0854e6ef462..64833cac9b6 100644 --- a/cloud/disk_manager/internal/pkg/clients/nbs/testing_client.go +++ b/cloud/disk_manager/internal/pkg/clients/nbs/testing_client.go @@ -3,6 +3,7 @@ package nbs import ( "bytes" "context" + "encoding/json" "fmt" "hash/crc32" "math/rand" @@ -513,6 +514,56 @@ func (c *client) Write( return nil } +func (c *client) BackupDiskRegistryState( + ctx context.Context, +) (*DiskRegistryBackup, error) { + + 
output, err := c.nbs.ExecuteAction(ctx, "backupdiskregistrystate", []byte("{}")) + if err != nil { + return nil, wrapError(err) + } + + var state diskRegistryState + err = json.Unmarshal(output, &state) + if err != nil { + return nil, err + } + + return &state.Backup, nil +} + +func (c *client) DisableDevices( + ctx context.Context, + agentID string, + deviceUUIDs []string, + message string, +) error { + + if len(deviceUUIDs) == 0 { + return fmt.Errorf("list of devices to disable should contain at least one device") + } + + deviceUUIDsField, err := json.Marshal(deviceUUIDs) + if err != nil { + return nil + } + + input := fmt.Sprintf( + "{\"DisableAgent\":{\"AgentId\":\"%v\",\"DeviceUUIDs\":%v},\"Message\":\"%v\"}", + agentID, + string(deviceUUIDsField), + message, + ) + + _, err = c.nbs.ExecuteAction( + ctx, + "diskregistrychangestate", + []byte(input), + ) + + return wrapError(err) +} + //////////////////////////////////////////////////////////////////////////////// type checkpoint struct { diff --git a/cloud/disk_manager/internal/pkg/clients/nbs/tests/client_test.go b/cloud/disk_manager/internal/pkg/clients/nbs/tests/client_test.go index 2d990768693..bdd277b5f82 100644 --- a/cloud/disk_manager/internal/pkg/clients/nbs/tests/client_test.go +++ b/cloud/disk_manager/internal/pkg/clients/nbs/tests/client_test.go @@ -1530,3 +1530,142 @@ func TestReadFromProxyOverlayDiskWithMultipartitionBaseDisk(t *testing.T) { err = client.ValidateCrc32(ctx, proxyOverlayDiskID, diskContentInfo) require.NoError(t, err) } + +func TestDiskRegistryState(t *testing.T) { + ctx := newContext() + client := newClient(t, ctx) + + diskID := t.Name() + + err := client.Create(ctx, nbs.CreateDiskParams{ + ID: diskID, + BlocksCount: 2 * 262144, + BlockSize: 4096, + Kind: types.DiskKind_DISK_KIND_SSD_NONREPLICATED, + }) + require.NoError(t, err) + + backup, err := client.BackupDiskRegistryState(ctx) + require.NoError(t, err) + + deviceUUIDs := backup.GetDevicesOfDisk(diskID) + require.Equal(t, 
len(deviceUUIDs), 2) + + agentID := backup.GetAgentIDByDeviceUUId(deviceUUIDs[0]) + require.NotEmpty(t, agentID) + agentID = backup.GetAgentIDByDeviceUUId(deviceUUIDs[1]) + require.NotEmpty(t, agentID) + + deviceUUIDs = backup.GetDevicesOfDisk("nonExistingDiskID") + require.Nil(t, deviceUUIDs) + agentID = backup.GetAgentIDByDeviceUUId("nonExistingDeviceID") + require.Empty(t, agentID) + + err = client.Delete(ctx, diskID) + require.NoError(t, err) +} + +func TestDiskRegistryDisableDevices(t *testing.T) { + ctx := newContext() + client := newClient(t, ctx) + + diskID := t.Name() + + err := client.Create(ctx, nbs.CreateDiskParams{ + ID: diskID, + BlocksCount: 262144, + BlockSize: 4096, + Kind: types.DiskKind_DISK_KIND_SSD_NONREPLICATED, + }) + require.NoError(t, err) + + writeBlocks(t, ctx, client, diskID, 0 /* startIndex */, 1 /* blockCount */) + + backup, err := client.BackupDiskRegistryState(ctx) + require.NoError(t, err) + deviceUUIDs := backup.GetDevicesOfDisk(diskID) + require.Equal(t, len(deviceUUIDs), 1) + agentID := backup.GetAgentIDByDeviceUUId(deviceUUIDs[0]) + require.NotEmpty(t, agentID) + + err = client.DisableDevices(ctx, agentID, deviceUUIDs, t.Name()) + require.NoError(t, err) + + session, err := client.MountRW( + ctx, + diskID, + 0, // fillGeneration + 0, // fillSeqNumber + nil, // encryption + ) + require.NoError(t, err) + require.NotNil(t, session) + defer session.Close(ctx) + + data := make([]byte, 4096) + rand.Read(data) + + // Device is disabled, all read and write requests should return error. 
+ err = session.Write(ctx, 0, data) + require.Error(t, err) + zero := false + err = session.Read(ctx, 0, 1, "", data, &zero) + require.Error(t, err) + + err = client.Delete(ctx, diskID) + require.NoError(t, err) +} + +func TestDiskRegistryFindDevicesOfShadowDisk(t *testing.T) { + ctx := newContext() + client := newClient(t, ctx) + + diskID := t.Name() + + err := client.Create(ctx, nbs.CreateDiskParams{ + ID: diskID, + BlocksCount: 262144, + BlockSize: 4096, + Kind: types.DiskKind_DISK_KIND_SSD_NONREPLICATED, + }) + require.NoError(t, err) + + backup, err := client.BackupDiskRegistryState(ctx) + require.NoError(t, err) + deviceUUIDs := backup.GetDevicesOfShadowDisk(diskID) + // Shadow disk should not exist because checkpoint is not created yet. + require.Nil(t, deviceUUIDs) + + checkpointID := "checkpointID" + err = client.CreateCheckpoint(ctx, nbs.CheckpointParams{ + DiskID: diskID, + CheckpointID: checkpointID, + CheckpointType: nbs.CheckpointTypeNormal, + }) + require.NoError(t, err) + + retries := 0 + for { + // Waiting for shadow disk to be created. 
+ time.Sleep(time.Second) + + backup, err := client.BackupDiskRegistryState(ctx) + require.NoError(t, err) + deviceUUIDs := backup.GetDevicesOfShadowDisk(diskID) + + if len(deviceUUIDs) > 0 { + require.Equal(t, len(deviceUUIDs), 1) + break + } + + retries++ + if retries == 10 { + require.Fail(t, "Shadow disk has not appeared in disk registry state") + } + } + + err = client.DeleteCheckpoint(ctx, diskID, checkpointID) + require.NoError(t, err) + err = client.Delete(ctx, diskID) + require.NoError(t, err) +} diff --git a/cloud/disk_manager/internal/pkg/clients/nbs/tests/ya.make b/cloud/disk_manager/internal/pkg/clients/nbs/tests/ya.make index c4a9d828360..3128edf7ec5 100644 --- a/cloud/disk_manager/internal/pkg/clients/nbs/tests/ya.make +++ b/cloud/disk_manager/internal/pkg/clients/nbs/tests/ya.make @@ -2,6 +2,7 @@ GO_TEST_FOR(cloud/disk_manager/internal/pkg/clients/nbs) SET_APPEND(RECIPE_ARGS --nbs-only) SET_APPEND(RECIPE_ARGS --multiple-nbs) +SET_APPEND(RECIPE_ARGS --disk-agent-count 3) INCLUDE(${ARCADIA_ROOT}/cloud/disk_manager/test/recipe/recipe.inc) GO_XTEST_SRCS( diff --git a/cloud/disk_manager/internal/pkg/clients/nbs/ya.make b/cloud/disk_manager/internal/pkg/clients/nbs/ya.make index 231940b4121..c65c9d32275 100644 --- a/cloud/disk_manager/internal/pkg/clients/nbs/ya.make +++ b/cloud/disk_manager/internal/pkg/clients/nbs/ya.make @@ -2,6 +2,7 @@ GO_LIBRARY() SRCS( client.go + disk_registry_state.go factory.go interface.go metrics.go diff --git a/cloud/disk_manager/internal/pkg/facade/snapshot_service_test/snapshot_service_test.go b/cloud/disk_manager/internal/pkg/facade/snapshot_service_test/snapshot_service_test.go index b0e913a2dd3..4ca53b21d52 100644 --- a/cloud/disk_manager/internal/pkg/facade/snapshot_service_test/snapshot_service_test.go +++ b/cloud/disk_manager/internal/pkg/facade/snapshot_service_test/snapshot_service_test.go @@ -766,3 +766,121 @@ func TestSnapshotServiceDeleteSnapshotWhenCreationIsInFlight(t *testing.T) { testcommon.CheckConsistency(t, 
ctx) } + +//////////////////////////////////////////////////////////////////////////////// + +func testCreateSnapshotFromDiskWithFailedShadowDisk( + t *testing.T, + diskKind disk_manager.DiskKind, + diskBlockSize uint32, + diskSize uint64, + waitDurationBeforeDisableDevice time.Duration, +) { + + ctx := testcommon.NewContext() + + client, err := testcommon.NewClient(ctx) + require.NoError(t, err) + defer client.Close() + + diskID := t.Name() + + reqCtx := testcommon.GetRequestContext(t, ctx) + operation, err := client.CreateDisk(reqCtx, &disk_manager.CreateDiskRequest{ + Src: &disk_manager.CreateDiskRequest_SrcEmpty{ + SrcEmpty: &empty.Empty{}, + }, + Size: int64(diskSize), + Kind: diskKind, + DiskId: &disk_manager.DiskId{ + ZoneId: "zone-a", + DiskId: diskID, + }, + BlockSize: int64(diskBlockSize), + }) + require.NoError(t, err) + require.NotEmpty(t, operation) + err = internal_client.WaitOperation(ctx, client, operation.Id) + require.NoError(t, err) + + nbsClient := testcommon.NewNbsClient(t, ctx, "zone-a") + _, err = nbsClient.FillDisk(ctx, diskID, diskSize) + require.NoError(t, err) + + snapshotID := t.Name() + + reqCtx = testcommon.GetRequestContext(t, ctx) + operation, err = client.CreateSnapshot(reqCtx, &disk_manager.CreateSnapshotRequest{ + Src: &disk_manager.DiskId{ + ZoneId: "zone-a", + DiskId: diskID, + }, + SnapshotId: snapshotID, + FolderId: "folder", + }) + require.NoError(t, err) + require.NotEmpty(t, operation) + + time.Sleep(waitDurationBeforeDisableDevice) + + diskRegistryBackup, err := nbsClient.BackupDiskRegistryState(ctx) + require.NoError(t, err) + deviceUUIDs := diskRegistryBackup.GetDevicesOfShadowDisk(diskID) + if len(deviceUUIDs) == 0 { + // OK: shadow disk is not created yet or it was already deleted. + return + } + require.Equal(t, 1, len(deviceUUIDs)) + agentID := diskRegistryBackup.GetAgentIDByDeviceUUId(deviceUUIDs[0]) + require.NotEmpty(t, agentID) + // Disable device to enforce checkpoint status ERROR. 
+ err = nbsClient.DisableDevices(ctx, agentID, deviceUUIDs, t.Name()) + require.NoError(t, err) + + response := disk_manager.CreateSnapshotResponse{} + err = internal_client.WaitResponse(ctx, client, operation.Id, &response) + if err != nil { + // OK: dataplane task failed with 'Device disabled' error, but shadow + // disk was filled successfully. + // TODO: improve this test after https://github.com/ydb-platform/nbs/issues/1950#issuecomment-2541530203 + require.Contains(t, err.Error(), "Device disabled") + return + } + require.Equal(t, int64(diskSize), response.Size) + + meta := disk_manager.CreateSnapshotMetadata{} + err = internal_client.GetOperationMetadata(ctx, client, operation.Id, &meta) + if err != nil { + // OK: dataplane task failed with 'Device disabled' error, but shadow + // disk was filled successfully. + // TODO: improve this test after https://github.com/ydb-platform/nbs/issues/1950#issuecomment-2541530203 + require.Contains(t, err.Error(), "Device disabled") + return + } + require.Equal(t, float64(1), meta.Progress) + + testcommon.RequireCheckpointsAreEmpty(t, ctx, diskID) + testcommon.CheckConsistency(t, ctx) +} + +func TestSnapshotServiceCreateSnapshotFromDiskWithFailedShadowDiskShort(t *testing.T) { + testCreateSnapshotFromDiskWithFailedShadowDisk( + t, + disk_manager.DiskKind_DISK_KIND_SSD_NONREPLICATED, + 4096, // diskBlockSize + 262144*4096, // diskSize + // Need to add some variance for better testing. + common.RandomDuration(0*time.Second, 3*time.Second), // waitDurationBeforeDisableDevice + ) +} + +func TestSnapshotServiceCreateSnapshotFromDiskWithFailedShadowDiskLong(t *testing.T) { + testCreateSnapshotFromDiskWithFailedShadowDisk( + t, + disk_manager.DiskKind_DISK_KIND_SSD_NONREPLICATED, + 4096, // diskBlockSize + 262144*4096, // diskSize + // Need to add some variance for better testing. 
+ common.RandomDuration(3*time.Second, 40*time.Second), // waitDurationBeforeDisableDevice + ) +} diff --git a/cloud/disk_manager/internal/pkg/resources/mocks/storage_mock.go b/cloud/disk_manager/internal/pkg/resources/mocks/storage_mock.go index da0b07e1791..6022f9852e6 100644 --- a/cloud/disk_manager/internal/pkg/resources/mocks/storage_mock.go +++ b/cloud/disk_manager/internal/pkg/resources/mocks/storage_mock.go @@ -170,12 +170,20 @@ func (s *StorageMock) CreateSnapshot( func (s *StorageMock) SnapshotCreated( ctx context.Context, snapshotID string, + checkpointID string, createdAt time.Time, snapshotSize uint64, snapshotStorageSize uint64, ) error { - args := s.Called(ctx, snapshotID, createdAt, snapshotSize, snapshotStorageSize) + args := s.Called( + ctx, + snapshotID, + checkpointID, + createdAt, + snapshotSize, + snapshotStorageSize, + ) return args.Error(0) } diff --git a/cloud/disk_manager/internal/pkg/resources/snapshots.go b/cloud/disk_manager/internal/pkg/resources/snapshots.go index 5547342b70e..28c0b0f6ad2 100644 --- a/cloud/disk_manager/internal/pkg/resources/snapshots.go +++ b/cloud/disk_manager/internal/pkg/resources/snapshots.go @@ -445,6 +445,7 @@ func (s *storageYDB) snapshotCreated( ctx context.Context, session *persistence.Session, snapshotID string, + checkpointID string, createdAt time.Time, snapshotSize uint64, snapshotStorageSize uint64, @@ -505,6 +506,7 @@ func (s *storageYDB) snapshotCreated( } state.status = snapshotStatusReady + state.checkpointID = checkpointID state.createdAt = createdAt state.size = snapshotSize state.storageSize = snapshotStorageSize @@ -829,6 +831,7 @@ func (s *storageYDB) CreateSnapshot( func (s *storageYDB) SnapshotCreated( ctx context.Context, snapshotID string, + checkpointID string, createdAt time.Time, snapshotSize uint64, snapshotStorageSize uint64, @@ -841,6 +844,7 @@ func (s *storageYDB) SnapshotCreated( ctx, session, snapshotID, + checkpointID, createdAt, snapshotSize, snapshotStorageSize, diff --git 
a/cloud/disk_manager/internal/pkg/resources/snapshots_test.go b/cloud/disk_manager/internal/pkg/resources/snapshots_test.go index 25247c7a626..f242cbeb610 100644 --- a/cloud/disk_manager/internal/pkg/resources/snapshots_test.go +++ b/cloud/disk_manager/internal/pkg/resources/snapshots_test.go @@ -68,11 +68,11 @@ func TestSnapshotsCreateSnapshot(t *testing.T) { require.NoError(t, err) require.Equal(t, snapshot.ID, created.ID) - err = storage.SnapshotCreated(ctx, snapshot.ID, time.Now(), 0, 0) + err = storage.SnapshotCreated(ctx, snapshot.ID, "checkpoint", time.Now(), 0, 0) require.NoError(t, err) // Check idempotency. - err = storage.SnapshotCreated(ctx, snapshot.ID, time.Now(), 0, 0) + err = storage.SnapshotCreated(ctx, snapshot.ID, "checkpoint", time.Now(), 0, 0) require.NoError(t, err) // Check idempotency. @@ -124,7 +124,7 @@ func TestSnapshotsDeleteSnapshot(t *testing.T) { require.NoError(t, err) requireSnapshotsAreEqual(t, expected, *actual) - err = storage.SnapshotCreated(ctx, snapshot.ID, time.Now(), 0, 0) + err = storage.SnapshotCreated(ctx, snapshot.ID, "checkpoint", time.Now(), 0, 0) require.Error(t, err) require.True(t, errors.Is(err, errors.NewEmptyNonRetriableError())) @@ -149,7 +149,7 @@ func TestSnapshotsDeleteSnapshot(t *testing.T) { require.Error(t, err) require.True(t, errors.Is(err, errors.NewEmptyNonRetriableError())) - err = storage.SnapshotCreated(ctx, snapshot.ID, time.Now(), 0, 0) + err = storage.SnapshotCreated(ctx, snapshot.ID, "checkpoint", time.Now(), 0, 0) require.Error(t, err) require.True(t, errors.Is(err, errors.NewEmptyNonRetriableError())) } @@ -173,7 +173,7 @@ func TestSnapshotsDeleteNonexistentSnapshot(t *testing.T) { err = storage.SnapshotDeleted(ctx, snapshot.ID, time.Now()) require.NoError(t, err) - err = storage.SnapshotCreated(ctx, snapshot.ID, time.Now(), 0, 0) + err = storage.SnapshotCreated(ctx, snapshot.ID, "checkpoint", time.Now(), 0, 0) require.Error(t, err) require.True(t, errors.Is(err, 
errors.NewEmptyNonRetriableError())) @@ -350,7 +350,14 @@ func TestSnapshotsGetSnapshot(t *testing.T) { require.NotNil(t, s) requireSnapshotsAreEqual(t, snapshot, *s) - err = storage.SnapshotCreated(ctx, snapshotID, time.Now(), snapshotSize, snapshotStorageSize) + err = storage.SnapshotCreated( + ctx, + snapshotID, + "checkpoint", + time.Now(), + snapshotSize, + snapshotStorageSize, + ) require.NoError(t, err) snapshot.Size = snapshotSize @@ -362,3 +369,89 @@ func TestSnapshotsGetSnapshot(t *testing.T) { require.NotNil(t, s) requireSnapshotsAreEqual(t, snapshot, *s) } + +func TestSnapshotsCreatedWithAnotherCheckpoint(t *testing.T) { + ctx, cancel := context.WithCancel(newContext()) + defer cancel() + + db, err := newYDB(ctx) + require.NoError(t, err) + defer db.Close(ctx) + + storage := newStorage(t, ctx, db) + + testSnapshotsCreatedWithAnotherCheckpoint( + t, + ctx, + storage, + "snapshot-A", + "", + "checkpoint-2", + ) + testSnapshotsCreatedWithAnotherCheckpoint( + t, + ctx, + storage, + "snapshot-B", + "checkpoint-1", + "checkpoint-2", + ) +} + +func testSnapshotsCreatedWithAnotherCheckpoint( + t *testing.T, + ctx context.Context, + storage Storage, + snapshotID string, + checkpointID1 string, + checkpointID2 string, +) { + + snapshot := SnapshotMeta{ + ID: snapshotID, + FolderID: "folder", + Disk: &types.Disk{ + ZoneId: "zone", + DiskId: "disk", + }, + CheckpointID: checkpointID1, + CreateRequest: &wrappers.UInt64Value{ + Value: 1, + }, + CreateTaskID: "create", + CreatingAt: time.Now(), + CreatedBy: "user", + } + + created, err := storage.CreateSnapshot(ctx, snapshot) + require.NoError(t, err) + require.Equal(t, snapshot.ID, created.ID) + require.Equal(t, checkpointID1, created.CheckpointID) + + // Check idempotency. 
+ created, err = storage.CreateSnapshot(ctx, snapshot) + require.NoError(t, err) + require.Equal(t, snapshot.ID, created.ID) + require.Equal(t, checkpointID1, created.CheckpointID) + + err = storage.SnapshotCreated(ctx, snapshot.ID, checkpointID2, time.Now(), 0, 0) + require.NoError(t, err) + s, err := storage.GetSnapshotMeta(ctx, snapshotID) + require.NoError(t, err) + require.NotNil(t, s) + require.Equal(t, checkpointID2, s.CheckpointID) + + // Check idempotency. + err = storage.SnapshotCreated(ctx, snapshot.ID, checkpointID2, time.Now(), 0, 0) + require.NoError(t, err) + s, err = storage.GetSnapshotMeta(ctx, snapshotID) + require.NoError(t, err) + require.NotNil(t, s) + require.Equal(t, checkpointID2, s.CheckpointID) + + // Check idempotency. + created, err = storage.CreateSnapshot(ctx, snapshot) + require.NoError(t, err) + require.Equal(t, snapshot.ID, created.ID) + require.Equal(t, checkpointID2, created.CheckpointID) +} diff --git a/cloud/disk_manager/internal/pkg/resources/storage.go b/cloud/disk_manager/internal/pkg/resources/storage.go index 69114cac1ff..94693d0c6a1 100644 --- a/cloud/disk_manager/internal/pkg/resources/storage.go +++ b/cloud/disk_manager/internal/pkg/resources/storage.go @@ -168,6 +168,7 @@ type Storage interface { SnapshotCreated( ctx context.Context, snapshotID string, + checkpointID string, createdAt time.Time, snapshotSize uint64, snapshotStorageSize uint64, diff --git a/cloud/disk_manager/internal/pkg/services/snapshots/create_snapshot_from_disk_task.go b/cloud/disk_manager/internal/pkg/services/snapshots/create_snapshot_from_disk_task.go index 57ea4321a02..c52604b2760 100644 --- a/cloud/disk_manager/internal/pkg/services/snapshots/create_snapshot_from_disk_task.go +++ b/cloud/disk_manager/internal/pkg/services/snapshots/create_snapshot_from_disk_task.go @@ -2,6 +2,7 @@ package snapshots import ( "context" + "fmt" "time" "github.com/golang/protobuf/proto" @@ -48,7 +49,6 @@ func (t *createSnapshotFromDiskTask) run( ctx 
context.Context, execCtx tasks.ExecutionContext, nbsClient nbs.Client, - checkpointID string, ) error { disk := t.request.SrcDisk @@ -63,7 +63,6 @@ func (t *createSnapshotFromDiskTask) run( ID: t.request.DstSnapshotId, FolderID: t.request.FolderId, Disk: disk, - CheckpointID: checkpointID, CreateRequest: t.request, CreateTaskID: selfTaskID, CreatingAt: time.Now(), @@ -80,20 +79,28 @@ func (t *createSnapshotFromDiskTask) run( return nil } - err = nbsClient.CreateCheckpoint( - ctx, - nbs.CheckpointParams{ - DiskID: disk.DiskId, - CheckpointID: checkpointID, - }, - ) - if err != nil { - return err - } + if t.state.FinalCheckpointID == "" { + err = t.updateCheckpoint(ctx, nbsClient) + if err != nil { + return err + } - err = t.ensureCheckpointReady(ctx, nbsClient, disk.DiskId, checkpointID) - if err != nil { - return err + err = t.ensureCheckpointReady( + ctx, + execCtx, + nbsClient, + disk.DiskId, + t.getCurrentCheckpointID(), + ) + if err != nil { + return err + } + + t.state.FinalCheckpointID = t.getCurrentCheckpointID() + err = execCtx.SaveState(ctx) + if err != nil { + return err + } } taskID, err := t.scheduler.ScheduleZonalTask( @@ -103,7 +110,7 @@ func (t *createSnapshotFromDiskTask) run( disk.ZoneId, &dataplane_protos.CreateSnapshotFromDiskRequest{ SrcDisk: disk, - SrcDiskCheckpointId: checkpointID, + SrcDiskCheckpointId: t.state.FinalCheckpointID, DstSnapshotId: t.request.DstSnapshotId, UseS3: t.request.UseS3, UseProxyOverlayDisk: t.request.UseProxyOverlayDisk, @@ -145,6 +152,7 @@ func (t *createSnapshotFromDiskTask) run( return t.storage.SnapshotCreated( ctx, t.request.DstSnapshotId, + t.state.FinalCheckpointID, time.Now(), uint64(t.state.SnapshotSize), uint64(t.state.SnapshotStorageSize), @@ -157,20 +165,22 @@ func (t *createSnapshotFromDiskTask) Run( ) error { disk := t.request.SrcDisk - // NOTE: we use snapshot id as checkpoint id. 
- checkpointID := t.request.DstSnapshotId nbsClient, err := t.nbsFactory.GetClient(ctx, disk.ZoneId) if err != nil { return err } - err = t.run(ctx, execCtx, nbsClient, checkpointID) + err = t.run(ctx, execCtx, nbsClient) if err != nil { return err } - err = nbsClient.DeleteCheckpointData(ctx, disk.DiskId, checkpointID) + err = nbsClient.DeleteCheckpointData( + ctx, + t.request.GetSrcDisk().DiskId, + t.state.FinalCheckpointID, + ) if err != nil { return err } @@ -190,11 +200,16 @@ func (t *createSnapshotFromDiskTask) Cancel( return err } - // NOTE: we use snapshot id as checkpoint id. - checkpointID := t.request.DstSnapshotId + err = nbsClient.DeleteCheckpoint( + ctx, + t.request.GetSrcDisk().DiskId, + t.getCurrentCheckpointID(), + ) + if err != nil { + return err + } - // NBS-1873: should always delete checkpoint. - err = nbsClient.DeleteCheckpoint(ctx, disk.DiskId, checkpointID) + err = t.deletePreviousCheckpoint(ctx, nbsClient) if err != nil { return err } @@ -278,6 +293,7 @@ func (t *createSnapshotFromDiskTask) GetResponse() proto.Message { func (t *createSnapshotFromDiskTask) ensureCheckpointReady( ctx context.Context, + execCtx tasks.ExecutionContext, nbsClient nbs.Client, diskID string, checkpointID string, @@ -290,7 +306,8 @@ func (t *createSnapshotFromDiskTask) ensureCheckpointReady( logging.Info( ctx, - "Current CheckpointStatus: %v", + "Current CheckpointStatus for checkpoint %v: %v", + checkpointID, status, ) @@ -299,7 +316,11 @@ func (t *createSnapshotFromDiskTask) ensureCheckpointReady( return errors.NewInterruptExecutionError() case nbs.CheckpointStatusError: - _ = nbsClient.DeleteCheckpoint(ctx, diskID, checkpointID) + t.state.FailedCheckpointsCount++ + err = execCtx.SaveState(ctx) + if err != nil { + return err + } return errors.NewRetriableErrorf("Filling the NRD disk replica ended with an error.") case nbs.CheckpointStatusReady: @@ -308,3 +329,55 @@ func (t *createSnapshotFromDiskTask) ensureCheckpointReady( return nil } + 
////////////////////////////////////////////////////////////////////////////////

// updateCheckpoint deletes the checkpoint left over from the previous failed
// attempt (if any) and creates a fresh checkpoint for the current attempt.
func (t *createSnapshotFromDiskTask) updateCheckpoint(
	ctx context.Context,
	nbsClient nbs.Client,
) error {

	err := t.deletePreviousCheckpoint(ctx, nbsClient)
	if err != nil {
		return err
	}

	return nbsClient.CreateCheckpoint(
		ctx,
		nbs.CheckpointParams{
			DiskID:       t.request.GetSrcDisk().DiskId,
			CheckpointID: t.getCurrentCheckpointID(),
		},
	)
}

// deletePreviousCheckpoint removes the checkpoint created by the previous
// (failed) attempt. FailedCheckpointsCount == 0 means this is the first
// attempt, so there is no earlier checkpoint to delete.
func (t *createSnapshotFromDiskTask) deletePreviousCheckpoint(
	ctx context.Context,
	nbsClient nbs.Client,
) error {
	if t.state.FailedCheckpointsCount == 0 {
		// No previous checkpoint, nothing to do.
		return nil
	}

	// The previous attempt's checkpoint has index FailedCheckpointsCount - 1.
	checkpointID := t.makeCheckpointID(
		int(t.state.FailedCheckpointsCount) - 1,
	)

	return nbsClient.DeleteCheckpoint(
		ctx,
		t.request.GetSrcDisk().DiskId,
		checkpointID,
	)
}

// getCurrentCheckpointID returns the checkpoint ID for the current attempt,
// derived from the number of previously failed checkpoints.
func (t *createSnapshotFromDiskTask) getCurrentCheckpointID() string {
	return t.makeCheckpointID(int(t.state.FailedCheckpointsCount))
}

// makeCheckpointID builds the checkpoint ID for the attempt with the given
// index: the snapshot ID itself for the first attempt (index 0, keeping
// backward compatibility with the old "checkpoint id == snapshot id" scheme),
// and "<snapshotID>_<index>" for retries.
func (t *createSnapshotFromDiskTask) makeCheckpointID(index int) string {
	if index == 0 {
		return t.request.DstSnapshotId
	}
	return fmt.Sprintf("%v_%v", t.request.DstSnapshotId, index)
}
performance_config "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/performance/config" + "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/resources" + resources_storage_mocks "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/resources/mocks" + "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/services/snapshots/protos" + "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/types" + "github.com/ydb-platform/nbs/cloud/tasks/errors" + "github.com/ydb-platform/nbs/cloud/tasks/logging" + tasks_mocks "github.com/ydb-platform/nbs/cloud/tasks/mocks" +) + +//////////////////////////////////////////////////////////////////////////////// + +func TestCreateSnapshotFromDiskUpdatesCheckpointsCorrectly(t *testing.T) { + zoneID := "zone-a" + diskID := t.Name() + + ctx := logging.SetLogger( + context.Background(), + logging.NewStderrLogger(logging.DebugLevel), + ) + scheduler := tasks_mocks.NewSchedulerMock() + storage := resources_storage_mocks.NewStorageMock() + nbsFactory := nbs_mocks.NewFactoryMock() + nbsClient := nbs_mocks.NewClientMock() + execCtx := tasks_mocks.NewExecutionContextMock() + + request := &protos.CreateSnapshotFromDiskRequest{ + SrcDisk: &types.Disk{ + ZoneId: zoneID, + DiskId: diskID, + }, + DstSnapshotId: "snapshotID", + FolderId: "folder", + UseS3: true, + UseProxyOverlayDisk: true, + } + + task := &createSnapshotFromDiskTask{ + performanceConfig: &performance_config.PerformanceConfig{}, + scheduler: scheduler, + storage: storage, + nbsFactory: nbsFactory, + request: request, + state: &protos.CreateSnapshotFromDiskTaskState{}, + } + + require.Equal(t, int32(0), task.state.FailedCheckpointsCount) + require.Empty(t, task.state.FinalCheckpointID) + + // Create first checkpoint and get checkpoint status 'Error'. 
+ execCtx.On("GetTaskID").Return("create_snapshot_from_disk_task") + execCtx.On("SaveState", ctx).Return(nil) + nbsFactory.On("GetClient", ctx, zoneID).Return(nbsClient, nil) + nbsClient.On("Describe", ctx, diskID).Return(nbs.DiskParams{}, nil) + storage.On("CreateSnapshot", ctx, mock.Anything).Return( + resources.SnapshotMeta{Ready: false}, nil, + ) + + nbsClient.On("CreateCheckpoint", ctx, nbs.CheckpointParams{ + DiskID: request.SrcDisk.DiskId, + CheckpointID: "snapshotID", + }).Return(nil) + nbsClient.On( + "GetCheckpointStatus", + ctx, + diskID, + "snapshotID", // checkpointID + ).Return(nbs.CheckpointStatusError, nil) + + err := task.Run(ctx, execCtx) + require.Error(t, err) + require.True(t, errors.CanRetry(err)) + require.Equal(t, int32(1), task.state.FailedCheckpointsCount) + require.Empty(t, task.state.FinalCheckpointID) + mock.AssertExpectationsForObjects(t, scheduler, storage, nbsFactory, nbsClient, execCtx) + + // Create second checkpoint and get checkpoint status 'Ready'. + // Schedule dataplane task and start waiting for it. + nbsClient.On("CreateCheckpoint", ctx, nbs.CheckpointParams{ + DiskID: request.SrcDisk.DiskId, + CheckpointID: "snapshotID", + }).Unset() + nbsClient.On( + "DeleteCheckpoint", ctx, diskID, "snapshotID", + ).Return(nil).Once() // Should not update checkpoints after checkpoint got ready. 
+ nbsClient.On("CreateCheckpoint", ctx, nbs.CheckpointParams{ + DiskID: request.SrcDisk.DiskId, + CheckpointID: "snapshotID_1", + }).Return(nil).Once() + nbsClient.On( + "GetCheckpointStatus", + ctx, + diskID, + "snapshotID_1", // checkpointID + ).Return(nbs.CheckpointStatusReady, nil).Once() + + scheduler.On( + "ScheduleZonalTask", + mock.Anything, // ctx + "dataplane.CreateSnapshotFromDisk", + mock.Anything, // description + zoneID, + &dataplane_protos.CreateSnapshotFromDiskRequest{ + SrcDisk: request.SrcDisk, + SrcDiskCheckpointId: "snapshotID_1", + DstSnapshotId: request.DstSnapshotId, + UseS3: request.UseS3, + UseProxyOverlayDisk: request.UseProxyOverlayDisk, + }, + ).Return("dataplane_task_id", nil) + scheduler.On("WaitTask", ctx, execCtx, "dataplane_task_id").Return( + mock.Anything, errors.NewInterruptExecutionError(), + ) + + err = task.Run(ctx, execCtx) + require.Error(t, err) + require.True(t, errors.Is(err, errors.NewInterruptExecutionError())) + require.Equal(t, int32(1), task.state.FailedCheckpointsCount) + require.Equal(t, task.state.FinalCheckpointID, "snapshotID_1") + mock.AssertExpectationsForObjects(t, scheduler, storage, nbsFactory, nbsClient, execCtx) + + // Finish waiting for the dataplane task, finish creating snapshot. 
+ scheduler.On( + "WaitTask", ctx, execCtx, "dataplane_task_id", + ).Unset().On( + "WaitTask", ctx, execCtx, "dataplane_task_id", + ).Return( + &dataplane_protos.CreateSnapshotFromDiskResponse{}, nil, + ) + execCtx.On("SetEstimate", mock.Anything) + storage.On( + "SnapshotCreated", + ctx, + request.DstSnapshotId, + "snapshotID_1", // checkpointID + mock.Anything, // createdAt + mock.Anything, // snapshotSize + mock.Anything, // snapshotStorageSize + ).Return(nil) + nbsClient.On( + "DeleteCheckpointData", ctx, diskID, "snapshotID_1", + ).Return(nil) + + err = task.Run(ctx, execCtx) + require.NoError(t, err) + require.Equal(t, int32(1), task.state.FailedCheckpointsCount) + require.Equal(t, task.state.FinalCheckpointID, "snapshotID_1") + mock.AssertExpectationsForObjects(t, scheduler, storage, nbsFactory, nbsClient, execCtx) + + // Cancel the task. + // Use fresh nbs client in order not to interfere with previous calls. + nbsClientForCancel := nbs_mocks.NewClientMock() + nbsFactory.On( + "GetClient", ctx, zoneID, + ).Unset().On( + "GetClient", ctx, zoneID, + ).Return(nbsClientForCancel, nil) + + nbsClientForCancel.On( + "DeleteCheckpoint", ctx, diskID, "snapshotID", + ).Return(nil) + nbsClientForCancel.On( + "DeleteCheckpoint", ctx, diskID, "snapshotID_1", + ).Return(nil) + + var m *resources.SnapshotMeta + storage.On( + "DeleteSnapshot", + ctx, + request.DstSnapshotId, + "create_snapshot_from_disk_task", + mock.Anything, //deletingAt + ).Return(m, nil) + + err = task.Cancel(ctx, execCtx) + require.NoError(t, err) + require.Equal(t, int32(1), task.state.FailedCheckpointsCount) + require.Equal(t, task.state.FinalCheckpointID, "snapshotID_1") + mock.AssertExpectationsForObjects( + t, + scheduler, + storage, + nbsFactory, + nbsClient, + nbsClientForCancel, + execCtx, + ) +} diff --git a/cloud/disk_manager/internal/pkg/services/snapshots/protos/create_snapshot_from_disk_task.proto 
b/cloud/disk_manager/internal/pkg/services/snapshots/protos/create_snapshot_from_disk_task.proto index f1f27c8b0f0..bc88d8a0bc2 100644 --- a/cloud/disk_manager/internal/pkg/services/snapshots/protos/create_snapshot_from_disk_task.proto +++ b/cloud/disk_manager/internal/pkg/services/snapshots/protos/create_snapshot_from_disk_task.proto @@ -24,4 +24,8 @@ message CreateSnapshotFromDiskTaskState { int64 SnapshotSize = 4; int64 SnapshotStorageSize = 5; string DataplaneTaskID = 6; + + // Needed for shadow disk based checkpoints. + int32 FailedCheckpointsCount = 7; + string FinalCheckpointID = 8; } diff --git a/cloud/disk_manager/internal/pkg/services/snapshots/tests/ya.make b/cloud/disk_manager/internal/pkg/services/snapshots/tests/ya.make new file mode 100644 index 00000000000..25fe5adb29f --- /dev/null +++ b/cloud/disk_manager/internal/pkg/services/snapshots/tests/ya.make @@ -0,0 +1,3 @@ +GO_TEST_FOR(cloud/disk_manager/internal/pkg/services/snapshots) + +END() diff --git a/cloud/disk_manager/internal/pkg/services/snapshots/ya.make b/cloud/disk_manager/internal/pkg/services/snapshots/ya.make index 231e344ac99..5e8338b154e 100644 --- a/cloud/disk_manager/internal/pkg/services/snapshots/ya.make +++ b/cloud/disk_manager/internal/pkg/services/snapshots/ya.make @@ -9,6 +9,10 @@ SRCS( service.go ) +GO_TEST_SRCS( + create_snapshot_from_disk_task_test.go +) + END() RECURSE( @@ -18,4 +22,5 @@ RECURSE( RECURSE_FOR_TESTS( mocks + tests ) diff --git a/cloud/disk_manager/test/recipe/nbs_launcher.py b/cloud/disk_manager/test/recipe/nbs_launcher.py index 204210b0ead..a097723e7b8 100644 --- a/cloud/disk_manager/test/recipe/nbs_launcher.py +++ b/cloud/disk_manager/test/recipe/nbs_launcher.py @@ -75,6 +75,7 @@ def __init__( storage_config_patch.DisableLocalService = False storage_config_patch.InactiveClientsTimeout = 60000 # 1 min storage_config_patch.AgentRequestTimeout = 5000 # 5 sec + storage_config_patch.UseShadowDisksForNonreplDiskCheckpoints = True if 
destruction_allowed_only_for_disks_with_id_prefixes: storage_config_patch.DestructionAllowedOnlyForDisksWithIdPrefixes.extend(destruction_allowed_only_for_disks_with_id_prefixes)