diff --git a/go_test.mod b/go_test.mod index cb9f2866d..422c3e750 100644 --- a/go_test.mod +++ b/go_test.mod @@ -1,14 +1,14 @@ module github.com/nats-io/nats.go -go 1.21 +go 1.22 -toolchain go1.22.5 +toolchain go1.23.4 require ( github.com/golang/protobuf v1.4.2 - github.com/klauspost/compress v1.17.9 + github.com/klauspost/compress v1.17.11 github.com/nats-io/jwt v1.2.2 - github.com/nats-io/nats-server/v2 v2.10.17 + github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250107191420-519943fc978a github.com/nats-io/nkeys v0.4.9 github.com/nats-io/nuid v1.0.1 go.uber.org/goleak v1.3.0 @@ -17,9 +17,12 @@ require ( ) require ( - github.com/minio/highwayhash v1.0.2 // indirect - github.com/nats-io/jwt/v2 v2.5.7 // indirect + github.com/google/go-cmp v0.5.9 // indirect + github.com/google/go-tpm v0.9.3 // indirect + github.com/minio/highwayhash v1.0.3 // indirect + github.com/nats-io/jwt/v2 v2.7.3 // indirect + go.uber.org/automaxprocs v1.6.0 // indirect golang.org/x/crypto v0.31.0 // indirect - golang.org/x/sys v0.28.0 // indirect - golang.org/x/time v0.5.0 // indirect + golang.org/x/sys v0.29.0 // indirect + golang.org/x/time v0.9.0 // indirect ) diff --git a/go_test.sum b/go_test.sum index d0516d836..c04d93952 100644 --- a/go_test.sum +++ b/go_test.sum @@ -9,18 +9,23 @@ github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0 github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= -github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= -github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-tpm v0.9.3 h1:+yx0/anQuGzi+ssRqeD6WpXjW2L/V0dItUayO0i9sRc= +github.com/google/go-tpm v0.9.3/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= +github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= -github.com/nats-io/jwt/v2 v2.5.7 h1:j5lH1fUXCnJnY8SsQeB/a/z9Azgu2bYIDvtPVNdxe2c= -github.com/nats-io/jwt/v2 v2.5.7/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= -github.com/nats-io/nats-server/v2 v2.10.17 h1:PTVObNBD3TZSNUDgzFb1qQsQX4mOgFmOuG9vhT+KBUY= -github.com/nats-io/nats-server/v2 v2.10.17/go.mod h1:5OUyc4zg42s/p2i92zbbqXvUNsbF0ivdTLKshVMn2YQ= +github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE= +github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4= +github.com/nats-io/nats-server/v2 v2.10.24 h1:KcqqQAD0ZZcG4yLxtvSFJY7CYKVYlnlWoAiVZ6i/IY4= +github.com/nats-io/nats-server/v2 v2.10.24/go.mod h1:olvKt8E5ZlnjyqBGbAXtxvSQKsPodISK5Eo/euIta4s= +github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250107191420-519943fc978a h1:Z8XlUCYnAqGgw4ijut3jL1sN9vZGZVI3qxZ7jlX4j/g= +github.com/nats-io/nats-server/v2 v2.11.0-dev.0.20250107191420-519943fc978a/go.mod h1:skFpICXskKQmrPs+EqjiIdeBTq7FBYWfYAxyCdPdP+4= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE= @@ -30,6 +35,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= +go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -37,17 +44,16 @@ golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= -golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= +golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= diff --git a/jetstream/test/jetstream_test.go b/jetstream/test/jetstream_test.go index 3b2530940..a3716da02 100644 --- a/jetstream/test/jetstream_test.go +++ b/jetstream/test/jetstream_test.go @@ -301,8 +301,10 @@ func TestCreateStream(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - if !reflect.DeepEqual(s.CachedInfo().Config.Metadata, test.metadata) { - t.Fatalf("Invalid metadata; want: %v, got: %v", test.metadata, s.CachedInfo().Config.Metadata) + for k, v := range test.metadata { + if s.CachedInfo().Config.Metadata[k] != v { + t.Fatalf("Invalid metadata; want: %v, got: %v", test.metadata, s.CachedInfo().Config.Metadata) + } } }) } @@ -692,8 +694,10 @@ func TestUpdateStream(t *testing.T) { if len(info.Config.Subjects) != 1 || info.Config.Subjects[0] != test.subject { t.Fatalf("Invalid stream subjects after update: %v", info.Config.Subjects) } - if !reflect.DeepEqual(info.Config.Metadata, test.metadata) { - t.Fatalf("Invalid metadata; want: %v, got: %v", test.metadata, info.Config.Metadata) + for k, v := range test.metadata { + if info.Config.Metadata[k] != v { + t.Fatalf("Invalid metadata; want: %v, got: %v", test.metadata, info.Config.Metadata) + } } }) } @@ -1834,15 +1838,14 @@ func TestStreamConfigMatches(t *testing.T) { InactiveThreshold: 10 * time.Second, MaxAckPending: 500, }, - Metadata: map[string]string{ - "foo": "bar", - }, } s, err := js.CreateStream(context.Background(), cfg) if err != nil { t.Fatalf("Unexpected error: %v", err) } + // server will set metadata values, so we need to clear them + s.CachedInfo().Config.Metadata = nil if !reflect.DeepEqual(s.CachedInfo().Config, cfg) { t.Fatalf("StreamConfig doesn't match: %#v", s.CachedInfo().Config) } @@ -1872,6 +1875,8 @@ func TestStreamConfigMatches(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + // server will set metadata values, so we need to clear them + s.CachedInfo().Config.Metadata = nil if !reflect.DeepEqual(s.CachedInfo().Config, cfgMirror) { t.Fatalf("StreamConfig doesn't match: %#v", s.CachedInfo().Config) } @@ -1903,6 +1908,8 @@ func TestStreamConfigMatches(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + // server will set metadata values, so we need to clear them + s.CachedInfo().Config.Metadata = nil if !reflect.DeepEqual(s.CachedInfo().Config, cfgSourcing) { t.Fatalf("StreamConfig doesn't match: %#v", s.CachedInfo().Config) } @@ -1950,15 +1957,14 @@ func TestConsumerConfigMatches(t *testing.T) { Replicas: 1, MemoryStorage: true, FilterSubjects: []string{"foo.1", "foo.2"}, - Metadata: map[string]string{ - "foo": "bar", - }, } c, err := s.CreateConsumer(context.Background(), cfg) if err != nil { t.Fatalf("Unexpected error: %v", err) } + // server will set metadata values, so we need to clear them + c.CachedInfo().Config.Metadata = nil if !reflect.DeepEqual(c.CachedInfo().Config, cfg) { t.Fatalf("ConsumerConfig doesn't match") } diff --git a/jetstream/test/kv_test.go b/jetstream/test/kv_test.go index 97194a104..6d60685cd 100644 --- a/jetstream/test/kv_test.go +++ b/jetstream/test/kv_test.go @@ -1680,6 +1680,8 @@ func TestKeyValueCreate(t *testing.T) { if err != nil { t.Fatalf("Error getting stream: %v", err) } + // server will set metadata values, so we need to clear them + stream.CachedInfo().Config.Metadata = nil if !reflect.DeepEqual(stream.CachedInfo().Config, expectedStreamConfig) { t.Fatalf("Expected stream config to be %+v, got %+v", expectedStreamConfig, stream.CachedInfo().Config) } diff --git a/jetstream/test/object_test.go b/jetstream/test/object_test.go index 8f421c51f..e5dd16f0c 100644 --- a/jetstream/test/object_test.go +++ b/jetstream/test/object_test.go @@ -477,8 +477,10 @@ func TestObjectMetadata(t *testing.T) { expectOk(t, err) status, err := obs.Status(ctx) expectOk(t, err) - if !reflect.DeepEqual(status.Metadata(), bucketMetadata) { - t.Fatalf("invalid bucket metadata: %+v", status.Metadata()) + for k, v := range bucketMetadata { + if status.Metadata()[k] != v { + t.Fatalf("invalid bucket metadata: %+v", status.Metadata()) + } } // Simple with no Meta. diff --git a/jetstream/test/ordered_test.go b/jetstream/test/ordered_test.go index 97125674f..2044c8e1a 100644 --- a/jetstream/test/ordered_test.go +++ b/jetstream/test/ordered_test.go @@ -1,4 +1,4 @@ -// Copyright 2022-2023 The NATS Authors +// Copyright 2022-2024 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -2109,6 +2109,14 @@ func TestOrderedConsumerConfig(t *testing.T) { cfg := c.CachedInfo().Config test.expected.Name = cfg.Name + if test.config.Metadata != nil { + for k, v := range test.config.Metadata { + if cfg.Metadata[k] != v { + t.Fatalf("Expected config %+v, got %+v", test.expected, cfg) + } + } + } + test.expected.Metadata = cfg.Metadata if !reflect.DeepEqual(test.expected, cfg) { t.Fatalf("Expected config %+v, got %+v", test.expected, cfg) } diff --git a/jetstream/test/stream_test.go b/jetstream/test/stream_test.go index b7278b39b..5b05197b5 100644 --- a/jetstream/test/stream_test.go +++ b/jetstream/test/stream_test.go @@ -268,8 +268,10 @@ func TestCreateConsumer(t *testing.T) { if !reflect.DeepEqual(test.consumerConfig.FilterSubjects, ci.CachedInfo().Config.FilterSubjects) { t.Fatalf("Invalid filter subjects; want: %v; got: %v", test.consumerConfig.FilterSubjects, ci.CachedInfo().Config.FilterSubjects) } - if !reflect.DeepEqual(test.consumerConfig.Metadata, ci.CachedInfo().Config.Metadata) { - t.Fatalf("Invalid metadata; want: %v; got: %v", test.consumerConfig.Metadata, ci.CachedInfo().Config.Metadata) + for k, v := range test.consumerConfig.Metadata { + if ci.CachedInfo().Config.Metadata[k] != v { + t.Fatalf("Invalid metadata; want: %v; got: %v", test.consumerConfig.Metadata, ci.CachedInfo().Config.Metadata) + } } }) } diff --git a/js.go b/js.go index 946753887..fe246aaea 100644 --- a/js.go +++ b/js.go @@ -120,6 +120,7 @@ type JetStream interface { // PullSubscribe creates a Subscription that can fetch messages. // See important note in Subscribe(). Additionally, for an ephemeral pull consumer, the "durable" value must be // set to an empty string. + // When using PullSubscribe, the messages are fetched using Fetch() and FetchBatch() methods. PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) } diff --git a/test/js_test.go b/test/js_test.go index 91a259b31..1c3346b93 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -2289,7 +2289,7 @@ func TestJetStreamManagement(t *testing.T) { if si == nil || si.Config.Name != "foo" { t.Fatalf("StreamInfo is not correct %+v", si) } - if !reflect.DeepEqual(si.Config.Metadata, map[string]string{"foo": "bar", "baz": "quux"}) { + if v1, v2 := si.Config.Metadata["foo"], si.Config.Metadata["baz"]; v1 != "bar" || v2 != "quux" { t.Fatalf("Metadata is not correct %+v", si.Config.Metadata) } if si.Config.Compression != nats.S2Compression { @@ -2406,7 +2406,7 @@ func TestJetStreamManagement(t *testing.T) { if ci == nil || ci.Name != "dlc" || ci.Stream != "foo" { t.Fatalf("ConsumerInfo is not correct %+v", ci) } - if !reflect.DeepEqual(ci.Config.Metadata, map[string]string{"foo": "bar", "baz": "quux"}) { + if v1, v2 := ci.Config.Metadata["foo"], ci.Config.Metadata["baz"]; v1 != "bar" || v2 != "quux" { t.Fatalf("Metadata is not correct %+v", ci.Config.Metadata) } }) @@ -2955,15 +2955,14 @@ func TestStreamConfigMatches(t *testing.T) { InactiveThreshold: 10 * time.Second, MaxAckPending: 500, }, - Metadata: map[string]string{ - "foo": "bar", - }, } s, err := js.AddStream(&cfg) if err != nil { t.Fatalf("Unexpected error: %v", err) } + // server will set metadata values, so we need to clear them + s.Config.Metadata = nil if !reflect.DeepEqual(s.Config, cfg) { t.Fatalf("StreamConfig doesn't match: %#v", s.Config) } @@ -2993,6 +2992,8 @@ func TestStreamConfigMatches(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + // server will set metadata values, so we need to clear them + s.Config.Metadata = nil if !reflect.DeepEqual(s.Config, cfgMirror) { t.Fatalf("StreamConfig doesn't match: %#v", s.Config) } @@ -3024,6 +3025,8 @@ func TestStreamConfigMatches(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + // server will set metadata values, so we need to clear them + s.Config.Metadata = nil if !reflect.DeepEqual(s.Config, cfgSourcing) { t.Fatalf("StreamConfig doesn't match: %#v", s.Config) } diff --git a/test/norace_test.go b/test/norace_test.go index cbbfc852f..26955b00a 100644 --- a/test/norace_test.go +++ b/test/norace_test.go @@ -736,7 +736,14 @@ func TestNoRaceJetStreamChanSubscribeStall(t *testing.T) { toSend := 100_000 for i := 0; i < toSend; i++ { // Use plain NATS here for speed. - nc.Publish("STALL", msg) + if _, err := js.PublishAsync("STALL", msg); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Timeout waiting for messages") } nc.Flush()