Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Creating iterators for sync subscriptions #1728

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion context.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, hdr, dat
inbox := nc.NewInbox()
ch := make(chan *Msg, RequestChanLen)

s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil)
s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, nil, true, nil)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion enc.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (c *EncodedConn) subscribe(subject, queue string, cb Handler) (*Subscriptio
cbValue.Call(oV)
}

return c.Conn.subscribe(subject, queue, natsCB, nil, false, nil)
return c.Conn.subscribe(subject, queue, natsCB, nil, nil, false, nil)
}

// FlushTimeout allows a Flush operation to have an associated timeout.
Expand Down
4 changes: 2 additions & 2 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -1839,7 +1839,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
ocb := cb
cb = func(m *Msg) { ocb(m); m.Ack() }
}
sub, err := nc.subscribe(deliver, queue, cb, ch, isSync, jsi)
sub, err := nc.subscribe(deliver, queue, cb, ch, nil, isSync, jsi)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1910,7 +1910,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
jsi.hbi = info.Config.Heartbeat

// Recreate the subscription here.
sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi)
sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, nil, isSync, jsi)
if err != nil {
return nil, err
}
Expand Down
185 changes: 159 additions & 26 deletions nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ var (
ErrAuthorization = errors.New("nats: authorization violation")
ErrAuthExpired = errors.New("nats: authentication expired")
ErrAuthRevoked = errors.New("nats: authentication revoked")
ErrPermissionViolation = errors.New("nats: permissions violation")
ErrAccountAuthExpired = errors.New("nats: account authentication expired")
ErrNoServers = errors.New("nats: no servers available for connection")
ErrJsonParse = errors.New("nats: connect message, json parse error")
Expand Down Expand Up @@ -510,6 +511,11 @@ type Options struct {

// SkipHostLookup skips the DNS lookup for the server hostname.
SkipHostLookup bool

// PermissionErrOnSubscribe - if set to true, the client will return ErrPermissionViolation
// from SubscribeSync if the server returns a permissions error for a subscription.
// Defaults to false.
PermissionErrOnSubscribe bool
}

const (
Expand Down Expand Up @@ -618,17 +624,19 @@ type Subscription struct {
// For holding information about a JetStream consumer.
jsi *jsSub

delivered uint64
max uint64
conn *Conn
mcb MsgHandler
mch chan *Msg
closed bool
sc bool
connClosed bool
draining bool
status SubStatus
statListeners map[chan SubStatus][]SubStatus
delivered uint64
max uint64
conn *Conn
mcb MsgHandler
mch chan *Msg
errCh chan (error)
closed bool
sc bool
connClosed bool
draining bool
status SubStatus
statListeners map[chan SubStatus][]SubStatus
permissionsErr error

// Type of Subscription
typ SubscriptionType
Expand Down Expand Up @@ -1401,6 +1409,13 @@ func SkipHostLookup() Option {
}
}

func PermissionErrOnSubscribe(enabled bool) Option {
return func(o *Options) error {
o.PermissionErrOnSubscribe = enabled
return nil
}
}

// TLSHandshakeFirst is an Option to perform the TLS handshake first, that is
// before receiving the INFO protocol. This requires the server to also be
// configured with such option, otherwise the connection will fail.
Expand Down Expand Up @@ -3435,6 +3450,9 @@ slowConsumer:
}
}

var permissionsRe = regexp.MustCompile(`Subscription to "(\S+)"`)
var permissionsQueueRe = regexp.MustCompile(`using queue "(\S+)"`)

// processTransientError is called when the server signals a non terminal error
// which does not close the connection or trigger a reconnect.
// This will trigger the async error callback if set.
Expand All @@ -3444,6 +3462,27 @@ slowConsumer:
func (nc *Conn) processTransientError(err error) {
nc.mu.Lock()
nc.err = err
if errors.Is(err, ErrPermissionViolation) {
matches := permissionsRe.FindStringSubmatch(err.Error())
if len(matches) >= 2 {
queueMatches := permissionsQueueRe.FindStringSubmatch(err.Error())
var q string
if len(queueMatches) >= 2 {
q = queueMatches[1]
}
subject := matches[1]
for _, sub := range nc.subs {
if sub.Subject == subject && sub.Queue == q && sub.permissionsErr == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sub.Queue would be "" if it's non-queue group subscription, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly, so if there is no match for the regex, q will also be "" and it will fulfill this condition.

sub.mu.Lock()
if sub.errCh != nil {
sub.errCh <- err
}
sub.permissionsErr = err
sub.mu.Unlock()
}
}
}
}
if nc.Opts.AsyncErrorCB != nil {
nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, err) })
}
Expand Down Expand Up @@ -3685,7 +3724,7 @@ func (nc *Conn) processErr(ie string) {
} else if e == MAX_CONNECTIONS_ERR {
close = nc.processOpErr(ErrMaxConnectionsExceeded)
} else if strings.HasPrefix(e, PERMISSIONS_ERR) {
nc.processTransientError(fmt.Errorf("nats: %s", ne))
nc.processTransientError(fmt.Errorf("%w: %s", ErrPermissionViolation, ne))
} else if strings.HasPrefix(e, MAX_SUBSCRIPTIONS_ERR) {
nc.processTransientError(ErrMaxSubscriptionsExceeded)
} else if authErr := checkAuthError(e); authErr != nil {
Expand Down Expand Up @@ -4042,7 +4081,7 @@ func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Ms
// Create the response subscription we will use for all new style responses.
// This will be on an _INBOX with an additional terminal token. The subscription
// will be on a wildcard.
s, err := nc.subscribeLocked(nc.respSub, _EMPTY_, nc.respHandler, nil, false, nil)
s, err := nc.subscribeLocked(nc.respSub, _EMPTY_, nc.respHandler, nil, nil, false, nil)
if err != nil {
nc.mu.Unlock()
return nil, token, err
Expand Down Expand Up @@ -4140,7 +4179,7 @@ func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration)
inbox := nc.NewInbox()
ch := make(chan *Msg, RequestChanLen)

s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil)
s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, nil, true, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -4246,14 +4285,14 @@ func (nc *Conn) respToken(respInbox string) string {
// since it can't match more than one token.
// Messages will be delivered to the associated MsgHandler.
func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error) {
return nc.subscribe(subj, _EMPTY_, cb, nil, false, nil)
return nc.subscribe(subj, _EMPTY_, cb, nil, nil, false, nil)
}

// ChanSubscribe will express interest in the given subject and place
// all messages received on the channel.
// You should not close the channel until sub.Unsubscribe() has been called.
func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error) {
return nc.subscribe(subj, _EMPTY_, nil, ch, false, nil)
return nc.subscribe(subj, _EMPTY_, nil, ch, nil, false, nil)
}

// ChanQueueSubscribe will express interest in the given subject.
Expand All @@ -4263,7 +4302,7 @@ func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error)
// You should not close the channel until sub.Unsubscribe() has been called.
// Note: This is the same than QueueSubscribeSyncWithChan.
func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error) {
return nc.subscribe(subj, group, nil, ch, false, nil)
return nc.subscribe(subj, group, nil, ch, nil, false, nil)
}

// SubscribeSync will express interest on the given subject. Messages will
Expand All @@ -4273,15 +4312,19 @@ func (nc *Conn) SubscribeSync(subj string) (*Subscription, error) {
return nil, ErrInvalidConnection
}
mch := make(chan *Msg, nc.Opts.SubChanLen)
return nc.subscribe(subj, _EMPTY_, nil, mch, true, nil)
var errCh chan error
if nc.Opts.PermissionErrOnSubscribe {
errCh = make(chan error, 100)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what is the proper size for this channel.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, I set it to some value without thinking much about this and kind of forgot to analyze it properly. Since we really only push permission errors on this channel and we do it once per subscription, a buffered channel with size 1 should suffice (so that it's not blocking).

}
return nc.subscribe(subj, _EMPTY_, nil, mch, errCh, true, nil)
}

// QueueSubscribe creates an asynchronous queue subscriber on the given subject.
// All subscribers with the same queue name will form the queue group and
// only one member of the group will be selected to receive any given
// message asynchronously.
func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error) {
return nc.subscribe(subj, queue, cb, nil, false, nil)
return nc.subscribe(subj, queue, cb, nil, nil, false, nil)
}

// QueueSubscribeSync creates a synchronous queue subscriber on the given
Expand All @@ -4290,7 +4333,11 @@ func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription
// given message synchronously using Subscription.NextMsg().
func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) {
mch := make(chan *Msg, nc.Opts.SubChanLen)
return nc.subscribe(subj, queue, nil, mch, true, nil)
var errCh chan error
if nc.Opts.PermissionErrOnSubscribe {
errCh = make(chan error, 100)
}
return nc.subscribe(subj, queue, nil, mch, errCh, true, nil)
}

// QueueSubscribeSyncWithChan will express interest in the given subject.
Expand All @@ -4300,7 +4347,7 @@ func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) {
// You should not close the channel until sub.Unsubscribe() has been called.
// Note: This is the same than ChanQueueSubscribe.
func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error) {
return nc.subscribe(subj, queue, nil, ch, false, nil)
return nc.subscribe(subj, queue, nil, ch, nil, false, nil)
}

// badSubject will do quick test on whether a subject is acceptable.
Expand All @@ -4324,16 +4371,16 @@ func badQueue(qname string) bool {
}

// subscribe is the internal subscribe function that indicates interest in a subject.
func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, js *jsSub) (*Subscription, error) {
func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, errCh chan (error), isSync bool, js *jsSub) (*Subscription, error) {
if nc == nil {
return nil, ErrInvalidConnection
}
nc.mu.Lock()
defer nc.mu.Unlock()
return nc.subscribeLocked(subj, queue, cb, ch, isSync, js)
return nc.subscribeLocked(subj, queue, cb, ch, errCh, isSync, js)
}

func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, js *jsSub) (*Subscription, error) {
func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg, errCh chan (error), isSync bool, js *jsSub) (*Subscription, error) {
if nc == nil {
return nil, ErrInvalidConnection
}
Expand Down Expand Up @@ -4384,6 +4431,7 @@ func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg,
} else { // Sync Subscription
sub.typ = SyncSubscription
sub.mch = ch
sub.errCh = errCh
}

nc.subsMu.Lock()
Expand Down Expand Up @@ -4828,16 +4876,92 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) {
t := globalTimerPool.Get(timeout)
defer globalTimerPool.Put(t)

if s.errCh != nil {
select {
case msg, ok = <-mch:
if !ok {
return nil, s.getNextMsgErr()
}
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
}
case err := <-s.errCh:
return nil, err
case <-t.C:
return nil, ErrTimeout
}
} else {
select {
case msg, ok = <-mch:
if !ok {
return nil, s.getNextMsgErr()
}
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
}
case <-t.C:
return nil, ErrTimeout
}
}

return msg, nil
}

// nextMsgNoTimeout works similarly to Subscription.NextMsg() but will not
// time out. It is only used internally for non-timeout subscription iterator.
func (s *Subscription) nextMsgNoTimeout() (*Msg, error) {
if s == nil {
return nil, ErrBadSubscription
}

s.mu.Lock()
err := s.validateNextMsgState(false)
if err != nil {
s.mu.Unlock()
return nil, err
}

// snapshot
mch := s.mch
s.mu.Unlock()

var ok bool
var msg *Msg

// If something is available right away, let's optimize that case.
select {
case msg, ok = <-mch:
if !ok {
return nil, s.getNextMsgErr()
}
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
} else {
return msg, nil
}
default:
}

if s.errCh != nil {
select {
case msg, ok = <-mch:
if !ok {
return nil, s.getNextMsgErr()
}
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
}
case err := <-s.errCh:
return nil, err
}
} else {
msg, ok = <-mch
if !ok {
return nil, s.getNextMsgErr()
}
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
}
case <-t.C:
return nil, ErrTimeout
}

return msg, nil
Expand All @@ -4860,6 +4984,12 @@ func (s *Subscription) validateNextMsgState(pullSubInternal bool) error {
if s.mcb != nil {
return ErrSyncSubRequired
}
// if this subscription previously had a permissions error
// and no reconnect has been attempted, return the permissions error
// since the subscription does not exist on the server
if s.conn.Opts.PermissionErrOnSubscribe && s.permissionsErr != nil {
Jarema marked this conversation as resolved.
Show resolved Hide resolved
return s.permissionsErr
}
if s.sc {
s.changeSubStatus(SubscriptionActive)
s.sc = false
Expand Down Expand Up @@ -5235,6 +5365,9 @@ func (nc *Conn) resendSubscriptions() {
for _, s := range subs {
adjustedMax := uint64(0)
s.mu.Lock()
// when resending subscriptions, the permissions error should be cleared
// since the user may have fixed the permissions issue
s.permissionsErr = nil
if s.max > 0 {
if s.delivered < s.max {
adjustedMax = s.max - s.delivered
Expand Down
Loading