Skip to content

Commit

Permalink
[FIXED] Publish async not closing done and stall channels after faile…
Browse files Browse the repository at this point in the history
…d retries (#1719)

This fixes an issue in new JetStream API where if maximum number of retries
in `PublishMsgAsync` are reached and the publish failed, done and stall channels
are not closed.
Additionally, this fixes a potential race issue with modifying user msg when publishing.

Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio committed Dec 13, 2024
1 parent 240fe36 commit 425f31c
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 21 deletions.
64 changes: 43 additions & 21 deletions jetstream/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type (
err error
errCh chan error
doneCh chan *PubAck
reply string
}

jetStreamClient struct {
Expand Down Expand Up @@ -280,17 +281,17 @@ func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFut
}

var id string
var reply string

// register new paf if not retrying
if paf == nil {
var err error
m.Reply, err = js.newAsyncReply()
defer func() { m.Reply = "" }()
reply, err = js.newAsyncReply()
if err != nil {
return nil, fmt.Errorf("nats: error creating async reply handler: %s", err)
}
id = m.Reply[js.replyPrefixLen:]
paf = &pubAckFuture{msg: m, jsClient: js.publisher, maxRetries: o.retryAttempts, retryWait: o.retryWait}
id = reply[js.replyPrefixLen:]
paf = &pubAckFuture{msg: m, jsClient: js.publisher, maxRetries: o.retryAttempts, retryWait: o.retryWait, reply: reply}
numPending, maxPending := js.registerPAF(id, paf)

if maxPending > 0 && numPending > maxPending {
Expand All @@ -303,10 +304,17 @@ func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFut
}
} else {
// when retrying, get the ID from existing reply subject
id = m.Reply[js.replyPrefixLen:]
reply = paf.reply
id = reply[js.replyPrefixLen:]
}

if err := js.conn.PublishMsg(m); err != nil {
pubMsg := &nats.Msg{
Subject: m.Subject,
Reply: reply,
Data: m.Data,
Header: m.Header,
}
if err := js.conn.PublishMsg(pubMsg); err != nil {
js.clearPAF(id)
return nil, err
}
Expand Down Expand Up @@ -370,6 +378,31 @@ func (js *jetStream) handleAsyncReply(m *nats.Msg) {
return
}

closeStc := func() {
// Check on anyone stalled and waiting.
if js.publisher.stallCh != nil && len(js.publisher.acks) < js.publisher.maxpa {
close(js.publisher.stallCh)
js.publisher.stallCh = nil
}
}

closeDchFn := func() func() {
var dch chan struct{}
// Check on anyone one waiting on done status.
if js.publisher.doneCh != nil && len(js.publisher.acks) == 0 {
dch = js.publisher.doneCh
js.publisher.doneCh = nil
}
// Return function to close done channel which
// should be deferred so that error is processed and
// can be checked.
return func() {
if dch != nil {
close(dch)
}
}
}

doErr := func(err error) {
paf.err = err
if paf.errCh != nil {
Expand All @@ -378,7 +411,6 @@ func (js *jetStream) handleAsyncReply(m *nats.Msg) {
cb := js.publisher.asyncPublisherOpts.aecb
js.publisher.Unlock()
if cb != nil {
paf.msg.Reply = ""
cb(js, paf.msg, err)
}
}
Expand All @@ -387,7 +419,6 @@ func (js *jetStream) handleAsyncReply(m *nats.Msg) {
if len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
if paf.retries < paf.maxRetries {
paf.retries++
paf.msg.Reply = m.Subject
time.AfterFunc(paf.retryWait, func() {
js.publisher.Lock()
paf := js.getPAF(id)
Expand All @@ -408,25 +439,16 @@ func (js *jetStream) handleAsyncReply(m *nats.Msg) {
return
}
delete(js.publisher.acks, id)
closeStc()
defer closeDchFn()()
doErr(ErrNoStreamResponse)
return
}

// Remove
delete(js.publisher.acks, id)

// Check on anyone stalled and waiting.
if js.publisher.stallCh != nil && len(js.publisher.acks) < js.publisher.asyncPublisherOpts.maxpa {
close(js.publisher.stallCh)
js.publisher.stallCh = nil
}
// Check on anyone waiting on done status.
if js.publisher.doneCh != nil && len(js.publisher.acks) == 0 {
dch := js.publisher.doneCh
js.publisher.doneCh = nil
// Defer here so error is processed and can be checked.
defer close(dch)
}
closeStc()
defer closeDchFn()()

var pa pubAckResponse
if err := json.Unmarshal(m.Data, &pa); err != nil {
Expand Down
7 changes: 7 additions & 0 deletions jetstream/test/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1453,6 +1453,7 @@ func TestPublishAsyncRetry(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
publishComplete := js.PublishAsyncComplete()
errs := make(chan error, 1)
go func() {
// create stream with delay so that publish will receive no responders
Expand All @@ -1476,6 +1477,12 @@ func TestPublishAsyncRetry(t *testing.T) {
case <-time.After(5 * time.Second):
t.Fatalf("Timeout waiting for ack")
}

select {
case <-publishComplete:
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive completion signal")
}
})
}
}
Expand Down

0 comments on commit 425f31c

Please sign in to comment.