Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Disk Manager] retry with new checkpoint id when create snapshot if shadow disk failed during filling #2691

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
62 changes: 62 additions & 0 deletions cloud/disk_manager/internal/pkg/clients/nbs/disk_registry_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package nbs
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

предлагаю упростить и если уж тащить внутренности dr в дм, то только в тестового клиента (и я бы даже делал это без тестов)


// diskRegistryCheckpointReplica is the "CheckpointReplica" object nested in a
// disk entry of the decoded disk registry backup.
type diskRegistryCheckpointReplica struct {
	// CheckpointID is decoded from the "CheckpointId" JSON key.
	CheckpointID string `json:"CheckpointId"`

	// SourceDiskID is decoded from the "SourceDiskId" JSON key.
	SourceDiskID string `json:"SourceDiskId"`
}

// diskRegistryDisk is one element of the "Disks" array of the decoded disk
// registry backup.
type diskRegistryDisk struct {
	// DiskID is decoded from the "DiskId" JSON key.
	DiskID string `json:"DiskId"`

	// DeviceUUIDs is decoded from the "DeviceUUIDs" JSON key.
	DeviceUUIDs []string `json:"DeviceUUIDs"`

	// CheckpointReplica is decoded from the "CheckpointReplica" JSON key and
	// keeps its zero value when the key is absent.
	CheckpointReplica diskRegistryCheckpointReplica `json:"CheckpointReplica"`
}

// diskRegistryDevice is one element of an agent's "Devices" array in the
// decoded disk registry backup.
type diskRegistryDevice struct {
	// DeviceUUID is decoded from the "DeviceUUID" JSON key.
	DeviceUUID string `json:"DeviceUUID"`
}

// diskRegistryAgent is one element of the "Agents" array of the decoded disk
// registry backup.
type diskRegistryAgent struct {
	// Devices is decoded from the "Devices" JSON key.
	Devices []diskRegistryDevice `json:"Devices"`

	// AgentID is decoded from the "AgentId" JSON key.
	AgentID string `json:"AgentId"`
}

// DiskRegistryBackup holds the disks and agents decoded from the "Backup"
// object of a disk registry state dump.
type DiskRegistryBackup struct {
	// Disks is decoded from the "Disks" JSON key.
	Disks []diskRegistryDisk `json:"Disks"`

	// Agents is decoded from the "Agents" JSON key.
	Agents []diskRegistryAgent `json:"Agents"`
}

// diskRegistryState is the top-level JSON object whose "Backup" key carries
// the DiskRegistryBackup payload.
type diskRegistryState struct {
	// Backup is decoded from the "Backup" JSON key.
	Backup DiskRegistryBackup `json:"Backup"`
}

// GetDevicesOfDisk returns the DeviceUUIDs of the first entry in b.Disks whose
// DiskID equals diskID, or nil when no entry matches.
func (b *DiskRegistryBackup) GetDevicesOfDisk(diskID string) []string {
	for i := range b.Disks {
		candidate := &b.Disks[i]
		if candidate.DiskID != diskID {
			continue
		}
		return candidate.DeviceUUIDs
	}
	return nil
}

// GetDevicesOfShadowDisk returns the DeviceUUIDs of the first entry in b.Disks
// whose CheckpointReplica.SourceDiskID equals originalDiskID, or nil when no
// entry matches.
//
// An empty originalDiskID always yields nil: disks decoded without a
// CheckpointReplica object keep the zero-value (empty) SourceDiskID, so an
// empty argument would otherwise match the first such ordinary disk and
// return its devices as if they belonged to a shadow disk.
func (b *DiskRegistryBackup) GetDevicesOfShadowDisk(
	originalDiskID string,
) []string {

	if originalDiskID == "" {
		return nil
	}

	for _, disk := range b.Disks {
		if disk.CheckpointReplica.SourceDiskID == originalDiskID {
			return disk.DeviceUUIDs
		}
	}
	return nil
}

// GetAgentIDByDeviceUUId returns the AgentID of the first agent in b.Agents
// that lists a device with the given deviceUUID, or "" when no agent lists it.
func (b *DiskRegistryBackup) GetAgentIDByDeviceUUId(deviceUUID string) string {
	for i := range b.Agents {
		owner := &b.Agents[i]
		for j := range owner.Devices {
			if owner.Devices[j].DeviceUUID == deviceUUID {
				return owner.AgentID
			}
		}
	}
	return ""
}
11 changes: 11 additions & 0 deletions cloud/disk_manager/internal/pkg/clients/nbs/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,17 @@ type Client interface {

// Used in tests.
List(ctx context.Context) ([]string, error)

// Used in tests.
BackupDiskRegistryState(ctx context.Context) (*DiskRegistryBackup, error)

// Used in tests.
DisableDevices(
ctx context.Context,
agentID string,
deviceUUIDs []string,
message string,
) error
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
19 changes: 19 additions & 0 deletions cloud/disk_manager/internal/pkg/clients/nbs/mocks/client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,25 @@ func (c *ClientMock) FinishFillDisk(
return args.Error(0)
}

// BackupDiskRegistryState records the call on the mock and returns the values
// configured for it: the first as *nbs.DiskRegistryBackup, the second as error.
func (c *ClientMock) BackupDiskRegistryState(
	ctx context.Context,
) (*nbs.DiskRegistryBackup, error) {

	args := c.Called(ctx)

	// Comma-ok assertion: an expectation set up as Return(nil, err) stores an
	// untyped nil, on which an unchecked type assertion would panic. In that
	// case backup stays nil and only the configured error is reported.
	backup, _ := args.Get(0).(*nbs.DiskRegistryBackup)
	return backup, args.Error(1)
}

// DisableDevices records the call with all of its arguments on the mock and
// returns the error configured as the first return value.
func (c *ClientMock) DisableDevices(
	ctx context.Context,
	agentID string,
	deviceUUIDs []string,
	message string,
) error {

	return c.Called(ctx, agentID, deviceUUIDs, message).Error(0)
}

////////////////////////////////////////////////////////////////////////////////

func NewClientMock() *ClientMock {
Expand Down
51 changes: 51 additions & 0 deletions cloud/disk_manager/internal/pkg/clients/nbs/testing_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nbs
import (
"bytes"
"context"
"encoding/json"
"fmt"
"hash/crc32"
"math/rand"
Expand Down Expand Up @@ -513,6 +514,56 @@ func (c *client) Write(
return nil
}

// BackupDiskRegistryState executes the "backupdiskregistrystate" action with an
// empty JSON object as input, decodes the JSON output and returns its "Backup"
// part. An action failure is returned through wrapError; a decoding failure is
// returned as is.
func (c *client) BackupDiskRegistryState(
	ctx context.Context,
) (*DiskRegistryBackup, error) {

	response, err := c.nbs.ExecuteAction(
		ctx,
		"backupdiskregistrystate",
		[]byte("{}"),
	)
	if err != nil {
		return nil, wrapError(err)
	}

	state := new(diskRegistryState)
	if err := json.Unmarshal(response, state); err != nil {
		return nil, err
	}

	return &state.Backup, nil
}

func (c *client) DisableDevices(
ctx context.Context,
agentID string,
deviceUUIDs []string,
message string,
) error {

if len(deviceUUIDs) == 0 {
return fmt.Errorf("list of devices to disable should contain at least one device")
}

deviceUUIDsField, err := json.Marshal(deviceUUIDs)
if err != nil {
return nil
}

input := fmt.Sprintf(
"{\"DisableAgent\":{\"AgentId\":\"%v\",\"DeviceUUIDs\":%v},\"Message\":\"%v\"}",
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Хотя ручка и называется "DisableAgent", она не будет ломать весь агент, если ей передать непустой спасок девайсов. Она сломает только девайсы из этого списка.

Сломает -- значит, девайсы начнут отдавать ошибку в ответ на все запросы чтения и записи.

agentID,
string(deviceUUIDsField),
message,
)

_, err = c.nbs.ExecuteAction(
ctx,
"diskregistrychangestate",
[]byte(input),
)

return wrapError(err)
}

////////////////////////////////////////////////////////////////////////////////

type checkpoint struct {
Expand Down
139 changes: 139 additions & 0 deletions cloud/disk_manager/internal/pkg/clients/nbs/tests/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1530,3 +1530,142 @@ func TestReadFromProxyOverlayDiskWithMultipartitionBaseDisk(t *testing.T) {
err = client.ValidateCrc32(ctx, proxyOverlayDiskID, diskContentInfo)
require.NoError(t, err)
}

// TestDiskRegistryState creates a nonreplicated SSD disk, takes a disk registry
// backup and checks that both the devices of the disk and the agents owning
// those devices can be found in it, while lookups by unknown IDs find nothing.
func TestDiskRegistryState(t *testing.T) {
	ctx := newContext()
	client := newClient(t, ctx)

	diskID := t.Name()

	err := client.Create(ctx, nbs.CreateDiskParams{
		ID:          diskID,
		BlocksCount: 2 * 262144,
		BlockSize:   4096,
		Kind:        types.DiskKind_DISK_KIND_SSD_NONREPLICATED,
	})
	require.NoError(t, err)

	backup, err := client.BackupDiskRegistryState(ctx)
	require.NoError(t, err)

	// require.Len keeps expected/actual in the right roles, so a failure
	// message reports the real device count as the actual value.
	deviceUUIDs := backup.GetDevicesOfDisk(diskID)
	require.Len(t, deviceUUIDs, 2)

	agentID := backup.GetAgentIDByDeviceUUId(deviceUUIDs[0])
	require.NotEmpty(t, agentID)
	agentID = backup.GetAgentIDByDeviceUUId(deviceUUIDs[1])
	require.NotEmpty(t, agentID)

	deviceUUIDs = backup.GetDevicesOfDisk("nonExistingDiskID")
	require.Nil(t, deviceUUIDs)
	agentID = backup.GetAgentIDByDeviceUUId("nonExistingDeviceID")
	require.Empty(t, agentID)

	err = client.Delete(ctx, diskID)
	require.NoError(t, err)
}

// TestDiskRegistryDisableDevices creates a single-device nonreplicated SSD
// disk, disables its device through DisableDevices and checks that both writes
// and reads through a mounted session fail afterwards.
func TestDiskRegistryDisableDevices(t *testing.T) {
	ctx := newContext()
	client := newClient(t, ctx)

	diskID := t.Name()

	err := client.Create(ctx, nbs.CreateDiskParams{
		ID:          diskID,
		BlocksCount: 262144,
		BlockSize:   4096,
		Kind:        types.DiskKind_DISK_KIND_SSD_NONREPLICATED,
	})
	require.NoError(t, err)

	writeBlocks(t, ctx, client, diskID, 0 /* startIndex */, 1 /* blockCount */)

	backup, err := client.BackupDiskRegistryState(ctx)
	require.NoError(t, err)
	// require.Len keeps expected/actual in the right roles, so a failure
	// message reports the real device count as the actual value.
	deviceUUIDs := backup.GetDevicesOfDisk(diskID)
	require.Len(t, deviceUUIDs, 1)
	agentID := backup.GetAgentIDByDeviceUUId(deviceUUIDs[0])
	require.NotEmpty(t, agentID)

	err = client.DisableDevices(ctx, agentID, deviceUUIDs, t.Name())
	require.NoError(t, err)

	session, err := client.MountRW(
		ctx,
		diskID,
		0,   // fillGeneration
		0,   // fillSeqNumber
		nil, // encryption
	)
	require.NoError(t, err)
	require.NotNil(t, session)
	defer session.Close(ctx)

	data := make([]byte, 4096)
	rand.Read(data)

	// Device is disabled, all read and write requests should return error.
	err = session.Write(ctx, 0, data)
	require.Error(t, err)
	zero := false
	err = session.Read(ctx, 0, 1, "", data, &zero)
	require.Error(t, err)

	err = client.Delete(ctx, diskID)
	require.NoError(t, err)
}

// TestDiskRegistryFindDevicesOfShadowDisk checks that GetDevicesOfShadowDisk
// finds nothing before a checkpoint exists and finds exactly one device once a
// checkpoint has been created, polling the disk registry backup once per second
// for at most ten attempts.
func TestDiskRegistryFindDevicesOfShadowDisk(t *testing.T) {
	ctx := newContext()
	client := newClient(t, ctx)

	diskID := t.Name()

	err := client.Create(ctx, nbs.CreateDiskParams{
		ID:          diskID,
		BlocksCount: 262144,
		BlockSize:   4096,
		Kind:        types.DiskKind_DISK_KIND_SSD_NONREPLICATED,
	})
	require.NoError(t, err)

	backup, err := client.BackupDiskRegistryState(ctx)
	require.NoError(t, err)
	deviceUUIDs := backup.GetDevicesOfShadowDisk(diskID)
	// Shadow disk should not exist because checkpoint is not created yet.
	require.Nil(t, deviceUUIDs)

	checkpointID := "checkpointID"
	err = client.CreateCheckpoint(ctx, nbs.CheckpointParams{
		DiskID:         diskID,
		CheckpointID:   checkpointID,
		CheckpointType: nbs.CheckpointTypeNormal,
	})
	require.NoError(t, err)

	const maxAttempts = 10

	shadowDiskFound := false
	for attempt := 0; attempt < maxAttempts && !shadowDiskFound; attempt++ {
		// Waiting for shadow disk to be created.
		time.Sleep(time.Second)

		backup, err := client.BackupDiskRegistryState(ctx)
		require.NoError(t, err)
		shadowDeviceUUIDs := backup.GetDevicesOfShadowDisk(diskID)

		if len(shadowDeviceUUIDs) > 0 {
			// require.Len keeps expected/actual in the right roles, so a
			// failure message reports the real device count as actual.
			require.Len(t, shadowDeviceUUIDs, 1)
			shadowDiskFound = true
		}
	}
	if !shadowDiskFound {
		require.Fail(t, "Shadow disk has not appeared in disk registry state")
	}

	err = client.DeleteCheckpoint(ctx, diskID, checkpointID)
	require.NoError(t, err)
	err = client.Delete(ctx, diskID)
	require.NoError(t, err)
}
1 change: 1 addition & 0 deletions cloud/disk_manager/internal/pkg/clients/nbs/tests/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ GO_TEST_FOR(cloud/disk_manager/internal/pkg/clients/nbs)

SET_APPEND(RECIPE_ARGS --nbs-only)
SET_APPEND(RECIPE_ARGS --multiple-nbs)
SET_APPEND(RECIPE_ARGS --disk-agent-count 3)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

нигде не нашел использования этого флага в этом ПРе - он уже был раньше?

INCLUDE(${ARCADIA_ROOT}/cloud/disk_manager/test/recipe/recipe.inc)

GO_XTEST_SRCS(
Expand Down
1 change: 1 addition & 0 deletions cloud/disk_manager/internal/pkg/clients/nbs/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ GO_LIBRARY()

SRCS(
client.go
disk_registry_state.go
factory.go
interface.go
metrics.go
Expand Down
Loading
Loading