Skip to content

Commit

Permalink
subscription: refactor subscription recreation
Browse files Browse the repository at this point in the history
Decouple client and subscription code during subscription recreation.
During this operation both the state of the client and the subscription
have to be updated while holding a lock in the client.

The previous implementation had both classes call each other. The
control flow in the new implementation is only from the client to
the subscription.
  • Loading branch information
magiconair committed Jan 11, 2025
1 parent a0fb6e8 commit 2d9bf00
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 22 deletions.
7 changes: 5 additions & 2 deletions client_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (c *Client) SubscriptionIDs() []uint32 {
}

// recreateSubscriptions creates new subscriptions
// with the same parameters to replace the previous ones
// with the same parameters to replace the previous one
func (c *Client) recreateSubscription(ctx context.Context, id uint32) error {
c.subMux.Lock()
defer c.subMux.Unlock()
Expand All @@ -98,7 +98,10 @@ func (c *Client) recreateSubscription(ctx context.Context, id uint32) error {
if !ok {
return ua.StatusBadSubscriptionIDInvalid
}
return sub.recreate_NeedsSubMuxLock(ctx)

sub.recreate_delete(ctx)
c.forgetSubscription_NeedsSubMuxLock(ctx, id)
return sub.recreate_create(ctx)
}

// transferSubscriptions ask the server to transfer the given subscriptions
Expand Down
42 changes: 22 additions & 20 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,29 +391,31 @@ func (p *SubscriptionParameters) setDefaults() {
}
}

// recreate_NeedsSubMuxLock creates a new subscription based on the previous subscription
// parameters and monitored items.
func (s *Subscription) recreate_NeedsSubMuxLock(ctx context.Context) error {
dlog := debug.NewPrefixLogger("sub %d: recreate: ", s.SubscriptionID)

if s.SubscriptionID == terminatedSubscriptionID {
dlog.Printf("subscription is not in a valid state")
return nil
// recreate_delete is called by the client when it is trying to
// recreate an existing subscription. This function deletes the
// existing subscription from the server.
func (s *Subscription) recreate_delete(ctx context.Context) error {
dlog := debug.NewPrefixLogger("sub %d: recreate_delete: ", s.SubscriptionID)
req := &ua.DeleteSubscriptionsRequest{
SubscriptionIDs: []uint32{s.SubscriptionID},
}
var res *ua.DeleteSubscriptionsResponse
_ = s.c.Send(ctx, req, func(v ua.Response) error {
return safeAssign(v, &res)
})
dlog.Print("subscription deleted")
return nil
}

// recreate_create is called by the client when it is trying to
// recreate an existing subscription. This function creates a
// new subscription with the same parameters as the previous one.
func (s *Subscription) recreate_create(ctx context.Context) error {
dlog := debug.NewPrefixLogger("sub %d: recreate_create: ", s.SubscriptionID)

s.paramsMu.Lock()
params := s.params
{
req := &ua.DeleteSubscriptionsRequest{
SubscriptionIDs: []uint32{s.SubscriptionID},
}
var res *ua.DeleteSubscriptionsResponse
_ = s.c.Send(ctx, req, func(v ua.Response) error {
return safeAssign(v, &res)
})
dlog.Print("subscription deleted")
}
s.c.forgetSubscription_NeedsSubMuxLock(ctx, s.SubscriptionID)
dlog.Printf("subscription forgotton")
s.paramsMu.Unlock()

req := &ua.CreateSubscriptionRequest{
RequestedPublishingInterval: float64(params.Interval / time.Millisecond),
Expand Down

0 comments on commit 2d9bf00

Please sign in to comment.