Skip to content

Commit

Permalink
JetStream leadership change fix (#180)
Browse files Browse the repository at this point in the history
* JS publish reconnect and leader change fix

* Code docs

* Reverted unrelated changes
  • Loading branch information
mtmk authored Nov 6, 2023
1 parent b576026 commit c3b988d
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/NATS.Client.JetStream/Internal/NatsJSConsume.cs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,15 @@ protected override async ValueTask ReceiveInternalAsync(
else if (headers is { Code: 100, Message: NatsHeaders.Messages.IdleHeartbeat })
{
}
else if (headers.Code == 409 && string.Equals(headers.MessageText, "Leadership Change", StringComparison.OrdinalIgnoreCase))
{
_logger.LogDebug(NatsJSLogEvents.LeadershipChange, "Leadership Change");
lock (_pendingGate)
{
_pendingBytes = 0;
_pendingMsgs = 0;
}
}
else if (headers.HasTerminalJSError())
{
_userMsgs.Writer.TryComplete(new NatsJSProtocolException($"JetStream server error: {headers.Code} {headers.MessageText}"));
Expand Down
1 change: 1 addition & 0 deletions src/NATS.Client.JetStream/NatsJSLogEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ public static class NatsJSLogEvents
public static readonly EventId NewDeliverySubject = new(2012, nameof(NewDeliverySubject));
public static readonly EventId NewConsumerCreated = new(2013, nameof(NewConsumerCreated));
public static readonly EventId Stopping = new(2014, nameof(Stopping));
public static readonly EventId LeadershipChange = new(2015, nameof(LeadershipChange));
}

0 comments on commit c3b988d

Please sign in to comment.