Skip to content

Commit

Permalink
[ADDED] WatchFiltered method on KV (#1739)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio committed Dec 13, 2024
1 parent 25692e5 commit aebea8a
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 22 deletions.
47 changes: 37 additions & 10 deletions jetstream/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ type (
// with the same options as Watch.
WatchAll(ctx context.Context, opts ...WatchOpt) (KeyWatcher, error)

// WatchFiltered will watch for any updates to keys that match the keys
// argument. It can be configured with the same options as Watch.
WatchFiltered(ctx context.Context, keys []string, opts ...WatchOpt) (KeyWatcher, error)

// Keys will return all keys.
// Deprecated: Use ListKeys instead to avoid memory issues.
Keys(ctx context.Context, opts ...WatchOpt) ([]string, error)
Expand Down Expand Up @@ -1069,11 +1073,11 @@ func (w *watcher) Stop() error {
return w.sub.Unsubscribe()
}

// Watch for any updates to keys that match the keys argument which could include wildcards.
// Watch will send a nil entry when it has received all initial values.
func (kv *kvs) Watch(ctx context.Context, keys string, opts ...WatchOpt) (KeyWatcher, error) {
if !searchKeyValid(keys) {
return nil, fmt.Errorf("%w: %s", ErrInvalidKey, "keys cannot be empty and must be a valid NATS subject")
func (kv *kvs) WatchFiltered(ctx context.Context, keys []string, opts ...WatchOpt) (KeyWatcher, error) {
for _, key := range keys {
if !searchKeyValid(key) {
return nil, fmt.Errorf("%w: %s", ErrInvalidKey, "key cannot be empty and must be a valid NATS subject")
}
}
var o watchOpts
for _, opt := range opts {
Expand All @@ -1085,10 +1089,20 @@ func (kv *kvs) Watch(ctx context.Context, keys string, opts ...WatchOpt) (KeyWat
}

// Could be a pattern so don't check for validity as we normally do.
var b strings.Builder
b.WriteString(kv.pre)
b.WriteString(keys)
keys = b.String()
for i, key := range keys {
var b strings.Builder
b.WriteString(kv.pre)
b.WriteString(key)
keys[i] = b.String()
}

// if no keys are provided, watch all keys
if len(keys) == 0 {
var b strings.Builder
b.WriteString(kv.pre)
b.WriteString(AllKeys)
keys = []string{b.String()}
}

// We will block below on placing items on the chan. That is by design.
w := &watcher{updates: make(chan KeyValueEntry, 256)}
Expand Down Expand Up @@ -1161,7 +1175,14 @@ func (kv *kvs) Watch(ctx context.Context, keys string, opts ...WatchOpt) (KeyWat
// update() callback.
w.mu.Lock()
defer w.mu.Unlock()
sub, err := kv.pushJS.Subscribe(keys, update, subOpts...)
var sub *nats.Subscription
var err error
if len(keys) == 1 {
sub, err = kv.pushJS.Subscribe(keys[0], update, subOpts...)
} else {
subOpts = append(subOpts, nats.ConsumerFilterSubjects(keys...))
sub, err = kv.pushJS.Subscribe("", update, subOpts...)
}
if err != nil {
return nil, err
}
Expand All @@ -1185,6 +1206,12 @@ func (kv *kvs) Watch(ctx context.Context, keys string, opts ...WatchOpt) (KeyWat
return w, nil
}

// Watch for any updates to keys that match the keys argument which could include wildcards.
// Watch will send a nil entry when it has received all initial values.
func (kv *kvs) Watch(ctx context.Context, keys string, opts ...WatchOpt) (KeyWatcher, error) {
return kv.WatchFiltered(ctx, []string{keys}, opts...)
}

// WatchAll will invoke the callback for all updates.
func (kv *kvs) WatchAll(ctx context.Context, opts ...WatchOpt) (KeyWatcher, error) {
return kv.Watch(ctx, AllKeys, opts...)
Expand Down
74 changes: 73 additions & 1 deletion jetstream/test/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,22 @@ func TestKeyValueWatch(t *testing.T) {
}
}
}
expectPurgeF := func(t *testing.T, watcher jetstream.KeyWatcher) func(key string, revision uint64) {
return func(key string, revision uint64) {
t.Helper()
select {
case v := <-watcher.Updates():
if v.Operation() != jetstream.KeyValuePurge {
t.Fatalf("Expected a delete operation but got %+v", v)
}
if v.Revision() != revision {
t.Fatalf("Did not get expected revision: %d vs %d", revision, v.Revision())
}
case <-time.After(time.Second):
t.Fatalf("Did not receive an update like expected")
}
}
}
expectInitDoneF := func(t *testing.T, watcher jetstream.KeyWatcher) func() {
return func() {
t.Helper()
Expand Down Expand Up @@ -315,13 +331,27 @@ func TestKeyValueWatch(t *testing.T) {

watcher, err = kv.Watch(ctx, "t.*")
expectOk(t, err)
defer watcher.Stop()

expectInitDone = expectInitDoneF(t, watcher)
expectUpdate = expectUpdateF(t, watcher)
expectUpdate("t.name", "ik", 8)
expectUpdate("t.age", "44", 10)
expectInitDone()
watcher.Stop()

// test watcher with multiple filters
watcher, err = kv.WatchFiltered(ctx, []string{"t.name", "name"})
expectOk(t, err)
expectInitDone = expectInitDoneF(t, watcher)
expectUpdate = expectUpdateF(t, watcher)
expectPurge := expectPurgeF(t, watcher)
expectUpdate("name", "ik", 3)
expectUpdate("t.name", "ik", 8)
expectInitDone()
err = kv.Purge(ctx, "name")
expectOk(t, err)
expectPurge("name", 11)
defer watcher.Stop()
})

t.Run("watcher with history included", func(t *testing.T) {
Expand Down Expand Up @@ -542,6 +572,48 @@ func TestKeyValueWatch(t *testing.T) {
_, err = kv.Watch(ctx, "foo", jetstream.IncludeHistory(), jetstream.UpdatesOnly())
expectErr(t, err, jetstream.ErrInvalidOption)
})

t.Run("filtered watch with no filters", func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

kv, err := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{Bucket: "WATCH"})
expectOk(t, err)

// this should behave like WatchAll
watcher, err := kv.WatchFiltered(ctx, []string{})
expectOk(t, err)
defer watcher.Stop()

expectInitDone := expectInitDoneF(t, watcher)
expectUpdate := expectUpdateF(t, watcher)
expectDelete := expectDeleteF(t, watcher)
// Make sure we already got an initial value marker.
expectInitDone()

_, err = kv.Create(ctx, "name", []byte("derek"))
expectOk(t, err)
expectUpdate("name", "derek", 1)
_, err = kv.Put(ctx, "name", []byte("rip"))
expectOk(t, err)
expectUpdate("name", "rip", 2)
_, err = kv.Put(ctx, "name", []byte("ik"))
expectOk(t, err)
expectUpdate("name", "ik", 3)
_, err = kv.Put(ctx, "age", []byte("22"))
expectOk(t, err)
expectUpdate("age", "22", 4)
_, err = kv.Put(ctx, "age", []byte("33"))
expectOk(t, err)
expectUpdate("age", "33", 5)
expectOk(t, kv.Delete(ctx, "age"))
expectDelete("age", 6)
})
}

func TestKeyValueWatchContext(t *testing.T) {
Expand Down
46 changes: 36 additions & 10 deletions kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ type KeyValue interface {
Watch(keys string, opts ...WatchOpt) (KeyWatcher, error)
// WatchAll will invoke the callback for all updates.
WatchAll(opts ...WatchOpt) (KeyWatcher, error)
// WatchFiltered will watch for any updates to keys that match the keys
// argument. It can be configured with the same options as Watch.
WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error)
// Keys will return all keys.
// Deprecated: Use ListKeys instead to avoid memory issues.
Keys(opts ...WatchOpt) ([]string, error)
Expand Down Expand Up @@ -964,11 +967,11 @@ func (kv *kvs) WatchAll(opts ...WatchOpt) (KeyWatcher, error) {
return kv.Watch(AllKeys, opts...)
}

// Watch will fire the callback when a key that matches the keys pattern is updated.
// keys needs to be a valid NATS subject.
func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
if !searchKeyValid(keys) {
return nil, fmt.Errorf("%w: %s", ErrInvalidKey, "keys cannot be empty and must be a valid NATS subject")
func (kv *kvs) WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error) {
for _, key := range keys {
if !searchKeyValid(key) {
return nil, fmt.Errorf("%w: %s", ErrInvalidKey, "key cannot be empty and must be a valid NATS subject")
}
}
var o watchOpts
for _, opt := range opts {
Expand All @@ -980,10 +983,20 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
}

// Could be a pattern so don't check for validity as we normally do.
var b strings.Builder
b.WriteString(kv.pre)
b.WriteString(keys)
keys = b.String()
for i, key := range keys {
var b strings.Builder
b.WriteString(kv.pre)
b.WriteString(key)
keys[i] = b.String()
}

// if no keys are provided, watch all keys
if len(keys) == 0 {
var b strings.Builder
b.WriteString(kv.pre)
b.WriteString(AllKeys)
keys = []string{b.String()}
}

// We will block below on placing items on the chan. That is by design.
w := &watcher{updates: make(chan KeyValueEntry, 256), ctx: o.ctx}
Expand Down Expand Up @@ -1056,7 +1069,14 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
// update() callback.
w.mu.Lock()
defer w.mu.Unlock()
sub, err := kv.js.Subscribe(keys, update, subOpts...)
var sub *Subscription
var err error
if len(keys) == 1 {
sub, err = kv.js.Subscribe(keys[0], update, subOpts...)
} else {
subOpts = append(subOpts, ConsumerFilterSubjects(keys...))
sub, err = kv.js.Subscribe("", update, subOpts...)
}
if err != nil {
return nil, err
}
Expand All @@ -1083,6 +1103,12 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
return w, nil
}

// Watch will fire the callback when a key that matches the keys pattern is updated.
// keys needs to be a valid NATS subject.
func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
return kv.WatchFiltered([]string{keys}, opts...)
}

// Bucket returns the current bucket name (JetStream stream).
func (kv *kvs) Bucket() string {
return kv.name
Expand Down
72 changes: 71 additions & 1 deletion test/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,22 @@ func TestKeyValueWatch(t *testing.T) {
}
}
}
expectPurgeF := func(t *testing.T, watcher nats.KeyWatcher) func(key string, revision uint64) {
return func(key string, revision uint64) {
t.Helper()
select {
case v := <-watcher.Updates():
if v.Operation() != nats.KeyValuePurge {
t.Fatalf("Expected a delete operation but got %+v", v)
}
if v.Revision() != revision {
t.Fatalf("Did not get expected revision: %d vs %d", revision, v.Revision())
}
case <-time.After(time.Second):
t.Fatalf("Did not receive an update like expected")
}
}
}
expectInitDoneF := func(t *testing.T, watcher nats.KeyWatcher) func() {
return func() {
t.Helper()
Expand Down Expand Up @@ -237,13 +253,27 @@ func TestKeyValueWatch(t *testing.T) {

watcher, err = kv.Watch("t.*")
expectOk(t, err)
defer watcher.Stop()

expectInitDone = expectInitDoneF(t, watcher)
expectUpdate = expectUpdateF(t, watcher)
expectUpdate("t.name", "ik", 8)
expectUpdate("t.age", "44", 10)
expectInitDone()
watcher.Stop()

// test watcher with multiple filters
watcher, err = kv.WatchFiltered([]string{"t.name", "name"})
expectOk(t, err)
expectInitDone = expectInitDoneF(t, watcher)
expectUpdate = expectUpdateF(t, watcher)
expectPurge := expectPurgeF(t, watcher)
expectUpdate("name", "ik", 3)
expectUpdate("t.name", "ik", 8)
expectInitDone()
err = kv.Purge("name")
expectOk(t, err)
expectPurge("name", 11)
defer watcher.Stop()
})

t.Run("watcher with history included", func(t *testing.T) {
Expand Down Expand Up @@ -384,6 +414,46 @@ func TestKeyValueWatch(t *testing.T) {
_, err = kv.Watch("foo.")
expectErr(t, err, nats.ErrInvalidKey)
})

t.Run("filtered watch with no filters", func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "WATCH"})
expectOk(t, err)

// this should behave like WatchAll
watcher, err := kv.WatchFiltered([]string{})
expectOk(t, err)
defer watcher.Stop()

expectInitDone := expectInitDoneF(t, watcher)
expectUpdate := expectUpdateF(t, watcher)
expectDelete := expectDeleteF(t, watcher)
// Make sure we already got an initial value marker.
expectInitDone()

_, err = kv.Create("name", []byte("derek"))
expectOk(t, err)
expectUpdate("name", "derek", 1)
_, err = kv.Put("name", []byte("rip"))
expectOk(t, err)
expectUpdate("name", "rip", 2)
_, err = kv.Put("name", []byte("ik"))
expectOk(t, err)
expectUpdate("name", "ik", 3)
_, err = kv.Put("age", []byte("22"))
expectOk(t, err)
expectUpdate("age", "22", 4)
_, err = kv.Put("age", []byte("33"))
expectOk(t, err)
expectUpdate("age", "33", 5)
expectOk(t, kv.Delete("age"))
expectDelete("age", 6)
})
}

func TestKeyValueWatchContext(t *testing.T) {
Expand Down

0 comments on commit aebea8a

Please sign in to comment.