From e8352c20fbc594e5013f3931a39701127d6a0f78 Mon Sep 17 00:00:00 2001 From: Jesse Peterson Date: Thu, 13 Jun 2024 15:21:20 -0700 Subject: [PATCH] switch to NanoLIB kv storage (#60) --- engine/storage/diskv/diskv.go | 11 ++-- engine/storage/inmem/inmem.go | 11 ++-- engine/storage/kv/event.go | 7 ++- engine/storage/kv/kv.go | 15 ++--- engine/storage/kv/prim.go | 7 ++- engine/storage/kv/worker_prim.go | 15 ++--- subsystem/cmdplan/storage/diskv/diskv.go | 5 +- subsystem/cmdplan/storage/inmem/inmem.go | 5 +- subsystem/cmdplan/storage/kv/kv.go | 3 +- subsystem/filevault/storage/diskv/diskv.go | 5 +- subsystem/filevault/storage/inmem/inmem.go | 5 +- subsystem/filevault/storage/kv/kv.go | 3 +- utils/kv/kv.go | 56 ----------------- utils/kv/kvdiskv/kvdiskv.go | 37 ------------ utils/kv/kvmap/kvmap.go | 70 ---------------------- 15 files changed, 52 insertions(+), 203 deletions(-) delete mode 100644 utils/kv/kv.go delete mode 100644 utils/kv/kvdiskv/kvdiskv.go delete mode 100644 utils/kv/kvmap/kvmap.go diff --git a/engine/storage/diskv/diskv.go b/engine/storage/diskv/diskv.go index 8043525..3db3626 100644 --- a/engine/storage/diskv/diskv.go +++ b/engine/storage/diskv/diskv.go @@ -5,8 +5,9 @@ import ( "path/filepath" "github.com/micromdm/nanocmd/engine/storage/kv" - "github.com/micromdm/nanocmd/utils/kv/kvdiskv" "github.com/micromdm/nanocmd/utils/uuid" + + "github.com/micromdm/nanolib/storage/kv/kvdiskv" "github.com/peterbourgon/diskv/v3" ) @@ -18,23 +19,23 @@ type Diskv struct { func New(path string) *Diskv { flatTransform := func(s string) []string { return []string{} } return &Diskv{KV: kv.New( - kvdiskv.NewBucket(diskv.New(diskv.Options{ + kvdiskv.New(diskv.New(diskv.Options{ BasePath: filepath.Join(path, "engine", "step"), Transform: flatTransform, CacheSizeMax: 1024 * 1024, })), - kvdiskv.NewBucket(diskv.New(diskv.Options{ + kvdiskv.New(diskv.New(diskv.Options{ BasePath: filepath.Join(path, "engine", "idcmd"), Transform: flatTransform, CacheSizeMax: 1024 * 1024, })), - kvdiskv.NewBucket(diskv.New(diskv.Options{ + kvdiskv.New(diskv.New(diskv.Options{ BasePath: filepath.Join(path, "engine", "eventsubs"), Transform: flatTransform, CacheSizeMax: 1024 * 1024, })), uuid.NewUUID(), - kvdiskv.NewBucket(diskv.New(diskv.Options{ + kvdiskv.New(diskv.New(diskv.Options{ BasePath: filepath.Join(path, "engine", "wfstatus"), Transform: flatTransform, CacheSizeMax: 1024 * 1024, diff --git a/engine/storage/inmem/inmem.go b/engine/storage/inmem/inmem.go index b172088..39d800c 100644 --- a/engine/storage/inmem/inmem.go +++ b/engine/storage/inmem/inmem.go @@ -3,8 +3,9 @@ package inmem import ( "github.com/micromdm/nanocmd/engine/storage/kv" - "github.com/micromdm/nanocmd/utils/kv/kvmap" "github.com/micromdm/nanocmd/utils/uuid" + + "github.com/micromdm/nanolib/storage/kv/kvmap" ) // InMem is an in-memory engine storage backend. @@ -14,10 +15,10 @@ type InMem struct { func New() *InMem { return &InMem{KV: kv.New( - kvmap.NewBucket(), - kvmap.NewBucket(), - kvmap.NewBucket(), + kvmap.New(), + kvmap.New(), + kvmap.New(), uuid.NewUUID(), - kvmap.NewBucket(), + kvmap.New(), )} } diff --git a/engine/storage/kv/event.go b/engine/storage/kv/event.go index b151ad8..bc5fcdf 100644 --- a/engine/storage/kv/event.go +++ b/engine/storage/kv/event.go @@ -8,8 +8,9 @@ import ( "strings" "github.com/micromdm/nanocmd/engine/storage" - "github.com/micromdm/nanocmd/utils/kv" "github.com/micromdm/nanocmd/workflow" + + "github.com/micromdm/nanolib/storage/kv" ) const ( @@ -100,12 +101,12 @@ func (s *KV) RetrieveEventSubscriptions(ctx context.Context, names []string) (ma return ret, nil } -func kvFindEventSubNamesByEvent(ctx context.Context, b kv.TraversingBucket, f workflow.EventFlag) ([]string, error) { +func kvFindEventSubNamesByEvent(ctx context.Context, b kv.KeysPrefixTraversingBucket, f workflow.EventFlag) ([]string, error) { var names []string // this.. is not very efficient. perhaps it would be better to // make a specific bucket/index for this. - for k := range b.Keys(nil) { + for k := range b.Keys(ctx, nil) { if !strings.HasSuffix(k, keySfxEventFlag) { continue } diff --git a/engine/storage/kv/kv.go b/engine/storage/kv/kv.go index 76dc6e8..a29f0b1 100644 --- a/engine/storage/kv/kv.go +++ b/engine/storage/kv/kv.go @@ -10,22 +10,23 @@ import ( "time" "github.com/micromdm/nanocmd/engine/storage" - "github.com/micromdm/nanocmd/utils/kv" "github.com/micromdm/nanocmd/utils/uuid" + + "github.com/micromdm/nanolib/storage/kv" ) // KV is a workflow engine storage backend using a key-value interface. type KV struct { mu sync.RWMutex - stepStore kv.TraversingBucket - idCmdStore kv.TraversingBucket - eventStore kv.TraversingBucket + stepStore kv.KeysPrefixTraversingBucket + idCmdStore kv.KeysPrefixTraversingBucket + eventStore kv.KeysPrefixTraversingBucket ider uuid.IDer - statusStore kv.TraversingBucket + statusStore kv.KeysPrefixTraversingBucket } // New creates a new key-value workflow engine storage backend. -func New(stepStore kv.TraversingBucket, idCmdStore kv.TraversingBucket, eventStore kv.TraversingBucket, ider uuid.IDer, statusStore kv.TraversingBucket) *KV { +func New(stepStore kv.KeysPrefixTraversingBucket, idCmdStore kv.KeysPrefixTraversingBucket, eventStore kv.KeysPrefixTraversingBucket, ider uuid.IDer, statusStore kv.KeysPrefixTraversingBucket) *KV { return &KV{ stepStore: stepStore, idCmdStore: idCmdStore, @@ -324,7 +325,7 @@ func (s *KV) RecordWorkflowStarted(ctx context.Context, ids []string, workflowNa // ClearWorkflowStatus removes all workflow start times for id. func (s *KV) ClearWorkflowStatus(ctx context.Context, id string) error { var toDelete []string - for k := range s.statusStore.Keys(nil) { + for k := range s.statusStore.Keys(ctx, nil) { // very inefficient! this could be a large table if strings.HasPrefix(k, id+".") { toDelete = append(toDelete, k) diff --git a/engine/storage/kv/prim.go b/engine/storage/kv/prim.go index 722cb3e..59c1d08 100644 --- a/engine/storage/kv/prim.go +++ b/engine/storage/kv/prim.go @@ -8,7 +8,8 @@ import ( "time" "github.com/micromdm/nanocmd/engine/storage" - "github.com/micromdm/nanocmd/utils/kv" + + "github.com/micromdm/nanolib/storage/kv" ) const ( @@ -257,13 +258,13 @@ func kvDeleteStepNotUntil(ctx context.Context, b kv.Bucket, stepID string) error } // kvFindWorkflowStepsWithIDs finds specific workflow steps (step IDs) for specific enrollment IDs. -func kvFindWorkflowStepsWithIDs(ctx context.Context, b kv.TraversingBucket, name string, ids []string) ([]string, error) { +func kvFindWorkflowStepsWithIDs(ctx context.Context, b kv.KeysPrefixTraversingBucket, name string, ids []string) ([]string, error) { var stepIDs []string // this.. is not very efficient. perhaps it would be better to // make a specific bucket/index for this. start: - for k := range b.Keys(nil) { + for k := range b.Keys(ctx, nil) { if !strings.HasSuffix(k, keySfxStepMeta) { continue } diff --git a/engine/storage/kv/worker_prim.go b/engine/storage/kv/worker_prim.go index a170eaf..a5fd336 100644 --- a/engine/storage/kv/worker_prim.go +++ b/engine/storage/kv/worker_prim.go @@ -8,17 +8,18 @@ import ( "time" "github.com/micromdm/nanocmd/engine/storage" - "github.com/micromdm/nanocmd/utils/kv" + + "github.com/micromdm/nanolib/storage/kv" ) -func kvFindNotUntilStepsWithIDs(ctx context.Context, b kv.TraversingBucket) ([]string, error) { +func kvFindNotUntilStepsWithIDs(ctx context.Context, b kv.KeysPrefixTraversingBucket) ([]string, error) { var stepIDs []string now := time.Now() // this.. is not very efficient. perhaps it would be better to // make a specific bucket/index for this. - for k := range b.Keys(nil) { + for k := range b.Keys(ctx, nil) { if !strings.HasSuffix(k, keySfxStepNotUntil) { continue } @@ -104,14 +105,14 @@ func kvGetIDCmdRaw(ctx context.Context, b kv.Bucket, id, cmdUUID string) (*stora }, nil } -func kvFindTimedOutStepIDs(ctx context.Context, b kv.TraversingBucket) ([]string, error) { +func kvFindTimedOutStepIDs(ctx context.Context, b kv.KeysPrefixTraversingBucket) ([]string, error) { var stepIDs []string now := time.Now() // this.. is not very efficient. perhaps it would be better to // make a specific bucket/index for this. - for k := range b.Keys(nil) { + for k := range b.Keys(ctx, nil) { if !strings.HasSuffix(k, keySfxStepTimeout) { continue } @@ -167,14 +168,14 @@ func kvGetIDCmdStepResult(ctx context.Context, b kv.Bucket, id, cmdUUID string, return result, err } -func kvFindCommandsToRePush(ctx context.Context, b kv.TraversingBucket, ifBefore time.Time, setTo time.Time) ([]string, error) { +func kvFindCommandsToRePush(ctx context.Context, b kv.KeysPrefixTraversingBucket, ifBefore time.Time, setTo time.Time) ([]string, error) { var ids []string resetLastPushes := make(map[string][]byte) // this.. is not very efficient. perhaps it would be better to // make a specific bucket/index for this. - for k := range b.Keys(nil) { + for k := range b.Keys(ctx, nil) { if !strings.HasSuffix(k, keySfxCmdLastPush) { continue } diff --git a/subsystem/cmdplan/storage/diskv/diskv.go b/subsystem/cmdplan/storage/diskv/diskv.go index aecb372..3e9abcd 100644 --- a/subsystem/cmdplan/storage/diskv/diskv.go +++ b/subsystem/cmdplan/storage/diskv/diskv.go @@ -5,7 +5,8 @@ import ( "path/filepath" "github.com/micromdm/nanocmd/subsystem/cmdplan/storage/kv" - "github.com/micromdm/nanocmd/utils/kv/kvdiskv" + + "github.com/micromdm/nanolib/storage/kv/kvdiskv" "github.com/peterbourgon/diskv/v3" ) @@ -18,7 +19,7 @@ type Diskv struct { func New(path string) *Diskv { flatTransform := func(s string) []string { return []string{} } return &Diskv{ - KV: kv.New(kvdiskv.NewBucket(diskv.New(diskv.Options{ + KV: kv.New(kvdiskv.New(diskv.New(diskv.Options{ BasePath: filepath.Join(path, "cmdplan"), Transform: flatTransform, CacheSizeMax: 1024 * 1024, diff --git a/subsystem/cmdplan/storage/inmem/inmem.go b/subsystem/cmdplan/storage/inmem/inmem.go index 7471e7f..d6f193c 100644 --- a/subsystem/cmdplan/storage/inmem/inmem.go +++ b/subsystem/cmdplan/storage/inmem/inmem.go @@ -3,7 +3,8 @@ package inmem import ( "github.com/micromdm/nanocmd/subsystem/cmdplan/storage/kv" - "github.com/micromdm/nanocmd/utils/kv/kvmap" + + "github.com/micromdm/nanolib/storage/kv/kvmap" ) // InMem is a command plan storage backend backed by an in-memory key-valye store. @@ -12,5 +13,5 @@ type InMem struct { } func New() *InMem { - return &InMem{KV: kv.New(kvmap.NewBucket())} + return &InMem{KV: kv.New(kvmap.New())} } diff --git a/subsystem/cmdplan/storage/kv/kv.go b/subsystem/cmdplan/storage/kv/kv.go index 2652c34..13e570f 100644 --- a/subsystem/cmdplan/storage/kv/kv.go +++ b/subsystem/cmdplan/storage/kv/kv.go @@ -7,7 +7,8 @@ import ( "sync" "github.com/micromdm/nanocmd/subsystem/cmdplan/storage" - "github.com/micromdm/nanocmd/utils/kv" + + "github.com/micromdm/nanolib/storage/kv" ) // KV is a cmdplan storage backend using JSON with key-value storage. diff --git a/subsystem/filevault/storage/diskv/diskv.go b/subsystem/filevault/storage/diskv/diskv.go index 7496791..e76aab6 100644 --- a/subsystem/filevault/storage/diskv/diskv.go +++ b/subsystem/filevault/storage/diskv/diskv.go @@ -7,7 +7,8 @@ import ( "github.com/micromdm/nanocmd/subsystem/filevault/storage" "github.com/micromdm/nanocmd/subsystem/filevault/storage/kv" - "github.com/micromdm/nanocmd/utils/kv/kvdiskv" + + "github.com/micromdm/nanolib/storage/kv/kvdiskv" "github.com/peterbourgon/diskv/v3" ) @@ -20,7 +21,7 @@ func New(path string, p storage.PRKStorage) (*Diskv, error) { flatTransform := func(s string) []string { return []string{} } kvStore, err := kv.New( context.Background(), - kvdiskv.NewBucket(diskv.New(diskv.Options{ + kvdiskv.New(diskv.New(diskv.Options{ BasePath: filepath.Join(path, "fvkey"), Transform: flatTransform, CacheSizeMax: 1024 * 1024, diff --git a/subsystem/filevault/storage/inmem/inmem.go b/subsystem/filevault/storage/inmem/inmem.go index 9c3ab41..6059691 100644 --- a/subsystem/filevault/storage/inmem/inmem.go +++ b/subsystem/filevault/storage/inmem/inmem.go @@ -6,7 +6,8 @@ import ( "github.com/micromdm/nanocmd/subsystem/filevault/storage" "github.com/micromdm/nanocmd/subsystem/filevault/storage/kv" - "github.com/micromdm/nanocmd/utils/kv/kvmap" + + "github.com/micromdm/nanolib/storage/kv/kvmap" ) // InMem implements an in-memory FileVault storage backend. @@ -15,6 +16,6 @@ type InMem struct { } func New(p storage.PRKStorage) (*InMem, error) { - kvStore, err := kv.New(context.Background(), kvmap.NewBucket(), p) + kvStore, err := kv.New(context.Background(), kvmap.New(), p) return &InMem{KV: kvStore}, err } diff --git a/subsystem/filevault/storage/kv/kv.go b/subsystem/filevault/storage/kv/kv.go index 654161e..ef0b4b8 100644 --- a/subsystem/filevault/storage/kv/kv.go +++ b/subsystem/filevault/storage/kv/kv.go @@ -9,7 +9,8 @@ import ( "github.com/micromdm/nanocmd/subsystem/filevault/storage" "github.com/micromdm/nanocmd/utils/cryptoutil" - "github.com/micromdm/nanocmd/utils/kv" + + "github.com/micromdm/nanolib/storage/kv" "github.com/smallstep/pkcs7" ) diff --git a/utils/kv/kv.go b/utils/kv/kv.go deleted file mode 100644 index 41055f3..0000000 --- a/utils/kv/kv.go +++ /dev/null @@ -1,56 +0,0 @@ -// Package kv defines an interface for key-value store. -package kv - -import ( - "context" - "fmt" -) - -// Bucket defines basic CRUD operations for key-value pairs in a single "namespace." -type Bucket interface { - Get(ctx context.Context, k string) (v []byte, err error) - Set(ctx context.Context, k string, v []byte) error - Has(ctx context.Context, k string) (found bool, err error) - Delete(ctx context.Context, k string) error -} - -// TraversingBucket allows us to get a list of the keys in the bucket as well. -type TraversingBucket interface { - Bucket - // Keys returns the unordered keys in the bucket - Keys(cancel <-chan struct{}) <-chan string -} - -// SetMap iterates over m to set the keys in b and returns any error. -func SetMap(ctx context.Context, b Bucket, m map[string][]byte) error { - var err error - for k, v := range m { - if err = b.Set(ctx, k, v); err != nil { - return fmt.Errorf("setting %s: %w", k, err) - } - } - return nil -} - -// SetMap iterates over keys to get the values in b and returns any error. -func GetMap(ctx context.Context, b Bucket, keys []string) (map[string][]byte, error) { - var err error - ret := make(map[string][]byte) - for _, k := range keys { - if ret[k], err = b.Get(ctx, k); err != nil { - return ret, fmt.Errorf("getting %s: %w", k, err) - } - } - return ret, nil -} - -// DeleteSlice deletes s keys from b. -func DeleteSlice(ctx context.Context, b Bucket, s []string) error { - var err error - for _, i := range s { - if err = b.Delete(ctx, i); err != nil { - return fmt.Errorf("deleting %s: %w", i, err) - } - } - return nil -} diff --git a/utils/kv/kvdiskv/kvdiskv.go b/utils/kv/kvdiskv/kvdiskv.go deleted file mode 100644 index 8ae1dbd..0000000 --- a/utils/kv/kvdiskv/kvdiskv.go +++ /dev/null @@ -1,37 +0,0 @@ -// Package kvdiskv wraps diskv to a standard interface for a key-value store. -package kvdiskv - -import ( - "context" - - "github.com/peterbourgon/diskv/v3" -) - -// KVDiskv wraps a diskv object to implement an on-disk key-value store. -type KVDiskv struct { - diskv *diskv.Diskv -} - -func NewBucket(dv *diskv.Diskv) *KVDiskv { - return &KVDiskv{diskv: dv} -} - -func (s *KVDiskv) Get(_ context.Context, k string) ([]byte, error) { - return s.diskv.Read(k) -} - -func (s *KVDiskv) Set(_ context.Context, k string, v []byte) error { - return s.diskv.Write(k, v) -} - -func (s *KVDiskv) Has(_ context.Context, k string) (bool, error) { - return s.diskv.Has(k), nil -} - -func (s *KVDiskv) Delete(_ context.Context, k string) error { - return s.diskv.Erase(k) -} - -func (s *KVDiskv) Keys(cancel <-chan struct{}) <-chan string { - return s.diskv.Keys(cancel) -} diff --git a/utils/kv/kvmap/kvmap.go b/utils/kv/kvmap/kvmap.go deleted file mode 100644 index f33590e..0000000 --- a/utils/kv/kvmap/kvmap.go +++ /dev/null @@ -1,70 +0,0 @@ -// Package kvmap implements an in-memory key-value store backed by a Go map. -package kvmap - -import ( - "context" - "fmt" - "sync" -) - -// KVMap is an in-memory key-value store backed by a Go map. -type KVMap struct { - mu sync.RWMutex - m map[string][]byte -} - -func NewBucket() *KVMap { - return &KVMap{m: make(map[string][]byte)} -} - -func (s *KVMap) Get(_ context.Context, k string) ([]byte, error) { - s.mu.RLock() - defer s.mu.RUnlock() - v, ok := s.m[k] - if !ok { - return nil, fmt.Errorf("key not found: %s", k) - } - return v, nil -} - -func (s *KVMap) Set(_ context.Context, k string, v []byte) error { - s.mu.Lock() - defer s.mu.Unlock() - s.m[k] = v - return nil -} - -func (s *KVMap) Has(_ context.Context, k string) (bool, error) { - s.mu.RLock() - defer s.mu.RUnlock() - _, ok := s.m[k] - return ok, nil -} - -func (s *KVMap) Delete(_ context.Context, k string) error { - s.mu.Lock() - defer s.mu.Unlock() - delete(s.m, k) - return nil -} - -// Keys returns the keys in this bucket. -// Note that this function spawns a go routine that keeps a read lock on -// the internal map. This means that if you attempt to write to the map -// while you're, say, iterating over a keys list you will likely deadlock. -func (s *KVMap) Keys(cancel <-chan struct{}) <-chan string { - r := make(chan string) - go func() { - s.mu.RLock() - defer s.mu.RUnlock() - defer close(r) - for k := range s.m { - select { - case <-cancel: - return - case r <- k: - } - } - }() - return r -}