Skip to content

Commit

Permalink
[FIXED] Invalid fetch sequence in ordered consumer Fetch and Next aft…
Browse files Browse the repository at this point in the history
…er timeout (#1705)

Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio committed Dec 13, 2024
1 parent 425f31c commit 25692e5
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 6 deletions.
8 changes: 6 additions & 2 deletions jetstream/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,9 @@ func (c *orderedConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, erro
c.currentConsumer.Unlock()
return nil, ErrOrderedConsumerConcurrentRequests
}
c.cursor.streamSeq = c.runningFetch.sseq
if c.runningFetch.sseq != 0 {
c.cursor.streamSeq = c.runningFetch.sseq
}
}
c.currentConsumer.Unlock()
c.consumerType = consumerTypeFetch
Expand Down Expand Up @@ -438,7 +440,9 @@ func (c *orderedConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBat
if !c.runningFetch.done {
return nil, ErrOrderedConsumerConcurrentRequests
}
c.cursor.streamSeq = c.runningFetch.sseq
if c.runningFetch.sseq != 0 {
c.cursor.streamSeq = c.runningFetch.sseq
}
}
c.consumerType = consumerTypeFetch
sub := orderedSubscription{
Expand Down
70 changes: 66 additions & 4 deletions jetstream/test/ordered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1571,21 +1571,19 @@ func TestOrderedConsumerNext(t *testing.T) {
}

publishTestMsgs(t, js)
msg, err := c.Next()
_, err = c.Next()
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
msg.Ack()

name := c.CachedInfo().Name
if err := s.DeleteConsumer(ctx, name); err != nil {
t.Fatal(err)
}
msg, err = c.Next()
_, err = c.Next()
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
msg.Ack()
})

t.Run("consumer used as consume", func(t *testing.T) {
Expand Down Expand Up @@ -1623,6 +1621,70 @@ func TestOrderedConsumerNext(t *testing.T) {
t.Fatalf("Expected error: %s; got: %s", jetstream.ErrOrderConsumerUsedAsConsume, err)
}
})

t.Run("preserve sequence after fetch error", func(t *testing.T) {
srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)
nc, err := nats.Connect(srv.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
c, err := s.OrderedConsumer(ctx, jetstream.OrderedConsumerConfig{})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if _, err := js.Publish(ctx, "FOO.A", []byte("msg")); err != nil {
t.Fatalf("Unexpected error during publish: %s", err)
}
msg, err := c.Next()
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
meta, err := msg.Metadata()
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
if meta.Sequence.Stream != 1 {
t.Fatalf("Expected sequence: %d; got: %d", 1, meta.Sequence.Stream)
}

// get next message, it should time out (no more messages on stream)
_, err = c.Next(jetstream.FetchMaxWait(100 * time.Millisecond))
if !errors.Is(err, nats.ErrTimeout) {
t.Fatalf("Expected error: %s; got: %s", nats.ErrTimeout, err)
}

if _, err := js.Publish(ctx, "FOO.A", []byte("msg")); err != nil {
t.Fatalf("Unexpected error during publish: %s", err)
}

// get next message, it should have stream sequence 2
msg, err = c.Next()
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
meta, err = msg.Metadata()
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
if meta.Sequence.Stream != 2 {
t.Fatalf("Expected sequence: %d; got: %d", 2, meta.Sequence.Stream)
}
})
}

func TestOrderedConsumerFetchNoWait(t *testing.T) {
Expand Down

0 comments on commit 25692e5

Please sign in to comment.