Skip to content

Commit

Permalink
Use promises.New from k6
Browse files Browse the repository at this point in the history
This "automatically" fixes grafana/k6#4017 when this is build with a k6
version after that PR is merged.

Also reduces copy-pasted code.
  • Loading branch information
mstoykov committed Oct 26, 2024
1 parent a296c20 commit cdd0784
Showing 1 changed file with 41 additions and 63 deletions.
104 changes: 41 additions & 63 deletions redis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/redis/go-redis/v9"
"go.k6.io/k6/js/common"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/js/promises"
"go.k6.io/k6/lib"
)

Expand All @@ -28,7 +29,7 @@ type Client struct {
//
// The value for `expiration` is interpreted as seconds.
func (c *Client) Set(key string, value interface{}, expiration int) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand Down Expand Up @@ -59,7 +60,7 @@ func (c *Client) Set(key string, value interface{}, expiration int) *sobek.Promi
//
// If the key does not exist, the promise is rejected with an error.
func (c *Client) Get(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -83,7 +84,7 @@ func (c *Client) Get(key string) *sobek.Promise {
//
// If the provided value is not a supported type, the promise is rejected with an error.
func (c *Client) GetSet(key string, value interface{}) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -110,7 +111,7 @@ func (c *Client) GetSet(key string, value interface{}) *sobek.Promise {

// Del removes the specified keys. A key is ignored if it does not exist
func (c *Client) Del(keys ...string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -134,7 +135,7 @@ func (c *Client) Del(keys ...string) *sobek.Promise {
//
// If the key does not exist, the promise is rejected with an error.
func (c *Client) GetDel(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -158,7 +159,7 @@ func (c *Client) GetDel(key string) *sobek.Promise {
// Note that if the same existing key is mentioned in the argument
// multiple times, it will be counted multiple times.
func (c *Client) Exists(keys ...string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -183,7 +184,7 @@ func (c *Client) Exists(keys ...string) *sobek.Promise {
// error is returned if the key contains a value of the wrong type, or
// contains a string that cannot be represented as an integer.
func (c *Client) Incr(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -208,7 +209,7 @@ func (c *Client) Incr(key string) *sobek.Promise {
// error is returned if the key contains a value of the wrong type, or
// contains a string that cannot be represented as an integer.
func (c *Client) IncrBy(key string, increment int64) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -233,7 +234,7 @@ func (c *Client) IncrBy(key string, increment int64) *sobek.Promise {
// error is returned if the key contains a value of the wrong type, or
// contains a string that cannot be represented as an integer.
func (c *Client) Decr(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -258,7 +259,7 @@ func (c *Client) Decr(key string) *sobek.Promise {
// error is returned if the key contains a value of the wrong type, or
// contains a string that cannot be represented as an integer.
func (c *Client) DecrBy(key string, decrement int64) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -282,7 +283,7 @@ func (c *Client) DecrBy(key string, decrement int64) *sobek.Promise {
//
// If the database is empty, the promise is rejected with an error.
func (c *Client) RandomKey() *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -304,7 +305,7 @@ func (c *Client) RandomKey() *sobek.Promise {

// Mget returns the values associated with the specified keys.
func (c *Client) Mget(keys ...string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -329,7 +330,7 @@ func (c *Client) Mget(keys ...string) *sobek.Promise {
// Note that calling Expire with a non-positive timeout will result in
// the key being deleted rather than expired.
func (c *Client) Expire(key string, seconds int) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -353,7 +354,7 @@ func (c *Client) Expire(key string, seconds int) *sobek.Promise {
//
//nolint:revive,stylecheck
func (c *Client) Ttl(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -375,7 +376,7 @@ func (c *Client) Ttl(key string) *sobek.Promise {

// Persist removes the existing timeout on key.
func (c *Client) Persist(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -400,7 +401,7 @@ func (c *Client) Persist(key string) *sobek.Promise {
// performing the push operations. When `key` holds a value that is not
// a list, and error is returned.
func (c *Client) Lpush(key string, values ...interface{}) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand Down Expand Up @@ -429,7 +430,7 @@ func (c *Client) Lpush(key string, values ...interface{}) *sobek.Promise {
// at `key`. If `key` does not exist, it is created as empty list before
// performing the push operations.
func (c *Client) Rpush(key string, values ...interface{}) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand Down Expand Up @@ -459,7 +460,7 @@ func (c *Client) Rpush(key string, values ...interface{}) *sobek.Promise {
// If the list does not exist, this command rejects the promise with an error.
func (c *Client) Lpop(key string) *sobek.Promise {
// TODO: redis supports indicating the amount of values to pop
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -484,7 +485,7 @@ func (c *Client) Lpop(key string) *sobek.Promise {
// If the list does not exist, this command rejects the promise with an error.
func (c *Client) Rpop(key string) *sobek.Promise {
// TODO: redis supports indicating the amount of values to pop
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -509,7 +510,7 @@ func (c *Client) Rpop(key string) *sobek.Promise {
// negative numbers, where they indicate offsets starting at the end of
// the list.
func (c *Client) Lrange(key string, start, stop int64) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -535,7 +536,7 @@ func (c *Client) Lrange(key string, start, stop int64) *sobek.Promise {
//
// If the list does not exist, this command rejects the promise with an error.
func (c *Client) Lindex(key string, index int64) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -559,7 +560,7 @@ func (c *Client) Lindex(key string, index int64) *sobek.Promise {
//
// If the list does not exist, this command rejects the promise with an error.
func (c *Client) Lset(key string, index int64, element string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -586,7 +587,7 @@ func (c *Client) Lset(key string, index int64, element string) *sobek.Promise {
//
// If the list does not exist, this command rejects the promise with an error.
func (c *Client) Lrem(key string, count int64, value string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -611,7 +612,7 @@ func (c *Client) Lrem(key string, count int64, value string) *sobek.Promise {
//
// If the list does not exist, this command rejects the promise with an error.
func (c *Client) Llen(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -637,7 +638,7 @@ func (c *Client) Llen(key string) *sobek.Promise {
//
// If the hash does not exist, this command rejects the promise with an error.
func (c *Client) Hset(key string, field string, value interface{}) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand Down Expand Up @@ -667,7 +668,7 @@ func (c *Client) Hset(key string, field string, value interface{}) *sobek.Promis
// holding a hash is created. If `field` already exists, this operation
// has no effect.
func (c *Client) Hsetnx(key, field, value string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -691,7 +692,7 @@ func (c *Client) Hsetnx(key, field, value string) *sobek.Promise {
//
// If the hash does not exist, this command rejects the promise with an error.
func (c *Client) Hget(key, field string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -713,7 +714,7 @@ func (c *Client) Hget(key, field string) *sobek.Promise {

// Hdel deletes the specified fields from the hash stored at `key`.
func (c *Client) Hdel(key string, fields ...string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -737,7 +738,7 @@ func (c *Client) Hdel(key string, fields ...string) *sobek.Promise {
//
// If the hash does not exist, this command rejects the promise with an error.
func (c *Client) Hgetall(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -761,7 +762,7 @@ func (c *Client) Hgetall(key string) *sobek.Promise {
//
// If the hash does not exist, this command rejects the promise with an error.
func (c *Client) Hkeys(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -785,7 +786,7 @@ func (c *Client) Hkeys(key string) *sobek.Promise {
//
// If the hash does not exist, this command rejects the promise with an error.
func (c *Client) Hvals(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -809,7 +810,7 @@ func (c *Client) Hvals(key string) *sobek.Promise {
//
// If the hash does not exist, this command rejects the promise with an error.
func (c *Client) Hlen(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -834,7 +835,7 @@ func (c *Client) Hlen(key string) *sobek.Promise {
// If `field` does not exist the value is set to 0 before the operation is
// set to 0 before the operation is performed.
func (c *Client) Hincrby(key, field string, increment int64) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -858,7 +859,7 @@ func (c *Client) Hincrby(key, field string, increment int64) *sobek.Promise {
// Specified members that are already a member of this set are ignored.
// If key does not exist, a new set is created before adding the specified members.
func (c *Client) Sadd(key string, members ...interface{}) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand Down Expand Up @@ -887,7 +888,7 @@ func (c *Client) Sadd(key string, members ...interface{}) *sobek.Promise {
// Specified members that are not a member of this set are ignored.
// If key does not exist, it is treated as an empty set and this command returns 0.
func (c *Client) Srem(key string, members ...interface{}) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -914,7 +915,7 @@ func (c *Client) Srem(key string, members ...interface{}) *sobek.Promise {

// Sismember returns if member is a member of the set stored at key.
func (c *Client) Sismember(key string, member interface{}) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -941,7 +942,7 @@ func (c *Client) Sismember(key string, member interface{}) *sobek.Promise {

// Smembers returns all members of the set stored at key.
func (c *Client) Smembers(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -965,7 +966,7 @@ func (c *Client) Smembers(key string) *sobek.Promise {
//
// If the set does not exist, the promise is rejected with an error.
func (c *Client) Srandmember(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -989,7 +990,7 @@ func (c *Client) Srandmember(key string) *sobek.Promise {
//
// If the set does not exist, the promise is rejected with an error.
func (c *Client) Spop(key string) *sobek.Promise {
promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -1015,7 +1016,7 @@ func (c *Client) SendCommand(command string, args ...interface{}) *sobek.Promise
doArgs = append(doArgs, command)
doArgs = append(doArgs, args...)

promise, resolve, reject := c.makeHandledPromise()
promise, resolve, reject := promises.New(c.vu)

if err := c.connect(); err != nil {
reject(err)
Expand All @@ -1040,29 +1041,6 @@ func (c *Client) SendCommand(command string, args ...interface{}) *sobek.Promise
return promise
}

// makeHandledPromise will create a promise and return its resolve and reject methods,
// wrapped in such a way that it will block the eventloop from exiting before they are
// called even if the promise isn't resolved by the time the current script ends executing.
func (c *Client) makeHandledPromise() (*sobek.Promise, func(interface{}), func(interface{})) {
runtime := c.vu.Runtime()
callback := c.vu.RegisterCallback()
p, resolve, reject := runtime.NewPromise()

return p, func(i interface{}) {
// more stuff
callback(func() error {
resolve(i)
return nil
})
}, func(i interface{}) {
// more stuff
callback(func() error {
reject(i)
return nil
})
}
}

// connect establishes the client's connection to the target
// redis instance(s).
func (c *Client) connect() error {
Expand Down

0 comments on commit cdd0784

Please sign in to comment.