Skip to content

Commit

Permalink
Refactor ListKeysFiltered to return KeyLister and update associat…
Browse files Browse the repository at this point in the history
…ed test (#2)

* refactor(kv): utilize WatchFiltered() for key filtering logic

- Replaced custom filtering logic with the new `WatchFiltered()` method.
- Updated associated tests in `kv_test.go` to validate the new implementation with various filter patterns.

* work on comments
  • Loading branch information
somratdutta authored Dec 18, 2024
1 parent 10a0bf7 commit 4354df8
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 92 deletions.
59 changes: 18 additions & 41 deletions jetstream/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,16 +174,12 @@ type (
// Deprecated: Use ListKeys instead to avoid memory issues.
Keys(ctx context.Context, opts ...WatchOpt) ([]string, error)

// KeysWithFilters returns a filtered list of keys in the bucket.
// It returns a complete slice of matching keys.
KeysWithFilters(ctx context.Context, kv KeyValue, filters ...string) ([]string, error)

// ListKeys will return KeyLister, allowing to retrieve all keys from
// the key value store in a streaming fashion (on a channel).
ListKeys(ctx context.Context, opts ...WatchOpt) (KeyLister, error)

// ListKeysWithFilters returns a KeyLister for filtered keys in the bucket.
ListKeysWithFilters(ctx context.Context, kv KeyValue, filters ...string) (<-chan string, error)
// ListKeysFiltered ListKeysWithFilters returns a KeyLister for filtered keys in the bucket.
ListKeysFiltered(ctx context.Context, filters ...string) (KeyLister, error)

// History will return all historical values for the key (up to
// KeyValueMaxHistory).
Expand Down Expand Up @@ -1278,53 +1274,34 @@ func (kv *kvs) ListKeys(ctx context.Context, opts ...WatchOpt) (KeyLister, error
return kl, nil
}

// KeysWithFilters retrieves keys that match the provided filters using WatchFiltered.
func (kv *kvs) KeysWithFilters(ctx context.Context, _ KeyValue, filters ...string) ([]string, error) {
if len(filters) == 0 {
return nil, errors.New("filters cannot be empty")
}

watcher, err := kv.WatchFiltered(ctx, filters, nil)
if err != nil {
return nil, err
}
defer watcher.Stop()

var keys []string
for entry := range watcher.Updates() {
if entry == nil { // Indicates all initial values are received
break
}
keys = append(keys, entry.Key())
}
return keys, nil
}

// ListKeysWithFilters returns a channel of keys matching the provided filters using WatchFiltered.
func (kv *kvs) ListKeysWithFilters(ctx context.Context, _ KeyValue, filters ...string) (<-chan string, error) {
if len(filters) == 0 {
return nil, errors.New("filters cannot be empty")
}

watcher, err := kv.WatchFiltered(ctx, filters, nil)
func (kv *kvs) ListKeysFiltered(ctx context.Context, filters ...string) (KeyLister, error) {
watcher, err := kv.WatchFiltered(ctx, filters)
if err != nil {
return nil, err
}

keysCh := make(chan string)
// Reuse the existing keyLister implementation
kl := &keyLister{watcher: watcher, keys: make(chan string, 256)}

go func() {
defer close(kl.keys)
defer watcher.Stop()
defer close(keysCh)

for entry := range watcher.Updates() {
if entry == nil { // Indicates all initial values are received
break
for {
select {
case entry := <-watcher.Updates():
if entry == nil { // Indicates all initial values are received
return
}
kl.keys <- entry.Key()
case <-ctx.Done():
return
}
keysCh <- entry.Key()
}
}()

return keysCh, nil
return kl, nil
}

func (kl *keyLister) Keys() <-chan string {
Expand Down
55 changes: 4 additions & 51 deletions jetstream/test/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1037,54 +1037,7 @@ func TestKeyValueListKeys(t *testing.T) {
}
}

func TestKeysWithFilters(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// Create Key-Value store.
kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "KVS", History: 2})
expectOk(t, err)

// Helper function to add key-value pairs.
put := func(data map[string]string) {
for key, value := range data {
t.Helper()
_, err := kv.Put(ctx, key, []byte(value))
expectOk(t, err)
}
}

// Add key-value pairs.
put(map[string]string{
"apple": "fruit",
"banana": "fruit",
"carrot": "vegetable",
})

// Test filtering keys that match "apple".
filters := []string{"apple"}
filteredKeys, err := kv.KeysWithFilters(ctx, kv, filters...)
expectOk(t, err)

// Validate expected keys.
expectedKeys := []string{"apple"}
if len(filteredKeys) != len(expectedKeys) {
t.Fatalf("Expected %d filtered key(s), got %d", len(expectedKeys), len(filteredKeys))
}

for _, key := range expectedKeys {
if !contains(filteredKeys, key) {
t.Fatalf("Expected key %s in filtered keys, but not found", key)
}
}
}

func TestListKeysWithFilters(t *testing.T) {
func TestListKeysFiltered(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

Expand Down Expand Up @@ -1115,12 +1068,12 @@ func TestListKeysWithFilters(t *testing.T) {

// Use filters to list keys matching "apple".
filters := []string{"apple"}
keyLister, err := kv.ListKeysWithFilters(ctx, kv, filters...)
keyLister, err := kv.ListKeysFiltered(ctx, filters...)
expectOk(t, err)

// Collect filtered keys.
// Collect filtered keys from KeyLister
var filteredKeys []string
for key := range keyLister {
for key := range keyLister.Keys() {
filteredKeys = append(filteredKeys, key)
}

Expand Down

0 comments on commit 4354df8

Please sign in to comment.