From ff8ed4c74c65a73a8d2b462f8e5f79698c02dd7e Mon Sep 17 00:00:00 2001 From: Jack Chen Date: Tue, 7 Jan 2025 18:47:32 +0800 Subject: [PATCH 1/6] subscription: add ModifySubscription functionality closes https://github.com/gopcua/opcua/issues/713 Signed-off-by: Jack Chen --- README.md | 2 +- examples/subscribe/subscribe.go | 8 ++++++++ subscription.go | 35 +++++++++++++++++++++++++++++++++ 3 files changed, 44 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index c2cbfc4c..8aaefc97 100644 --- a/README.md +++ b/README.md @@ -241,7 +241,7 @@ Here is the current set of supported services. For low-level access use the clie | | SetMonitoringMode | Yes | Yes | | | | SetTriggering | | | | | Subscription Service Set | CreateSubscription | Yes | Yes | | -| | ModifySubscription | | | | +| | ModifySubscription | Yes | | | | | SetPublishingMode | | | | | | Publish | Yes | Yes | | | | Republish | | | | diff --git a/examples/subscribe/subscribe.go b/examples/subscribe/subscribe.go index cff306f8..785b1063 100644 --- a/examples/subscribe/subscribe.go +++ b/examples/subscribe/subscribe.go @@ -97,6 +97,14 @@ func main() { log.Fatal(err) } + // Uncomment the following to try modifying the subscription + //var params opcua.SubscriptionParameters + //params.Interval = time.Millisecond * 2000 + //_, err = sub.ModifySubscription(ctx, params) + //if err != nil { + // log.Fatal(err) + //} + // read from subscription's notification channel until ctx is cancelled for { select { diff --git a/subscription.go b/subscription.go index 80f5a6d1..03db1bb4 100644 --- a/subscription.go +++ b/subscription.go @@ -31,6 +31,7 @@ type Subscription struct { RevisedMaxKeepAliveCount uint32 Notifs chan<- *PublishNotificationData params *SubscriptionParameters + paramsMu sync.Mutex items map[uint32]*monitoredItem itemsMu sync.Mutex lastSeq uint32 @@ -111,6 +112,40 @@ func (s *Subscription) delete(ctx context.Context) error { } } +func (s *Subscription) ModifySubscription(ctx context.Context, params SubscriptionParameters) (*ua.ModifySubscriptionResponse, error) { + stats.Subscription().Add("ModifySubscription", 1) + + params.setDefaults() + req := &ua.ModifySubscriptionRequest{ + SubscriptionID: s.SubscriptionID, + RequestedPublishingInterval: float64(params.Interval.Milliseconds()), + RequestedLifetimeCount: params.LifetimeCount, + RequestedMaxKeepAliveCount: params.MaxKeepAliveCount, + MaxNotificationsPerPublish: params.MaxNotificationsPerPublish, + Priority: params.Priority, + } + + var res *ua.ModifySubscriptionResponse + err := s.c.Send(ctx, req, func(v ua.Response) error { + return safeAssign(v, &res) + }) + + if err != nil { + return nil, err + } + + // update subscription parameters + s.paramsMu.Lock() + s.params = ¶ms + s.paramsMu.Unlock() + // update revised subscription parameters + s.RevisedPublishingInterval = time.Duration(res.RevisedPublishingInterval) * time.Millisecond + s.RevisedLifetimeCount = res.RevisedLifetimeCount + s.RevisedMaxKeepAliveCount = res.RevisedMaxKeepAliveCount + + return res, nil +} + func (s *Subscription) Monitor(ctx context.Context, ts ua.TimestampsToReturn, items ...*ua.MonitoredItemCreateRequest) (*ua.CreateMonitoredItemsResponse, error) { stats.Subscription().Add("Monitor", 1) stats.Subscription().Add("MonitoredItems", int64(len(items))) From d567e7f13355e429644548a115863f5a022d4180 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Sat, 11 Jan 2025 19:17:16 +0100 Subject: [PATCH 2/6] client_sub: rename func to use correct lock name --- client_sub.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client_sub.go b/client_sub.go index 88c71f01..91bf01c4 100644 --- a/client_sub.go +++ b/client_sub.go @@ -72,7 +72,7 @@ func (c *Client) Subscribe(ctx context.Context, params *SubscriptionParameters, } c.subs[sub.SubscriptionID] = sub - c.updatePublishTimeout_NeedsSubMuxRLock() + c.updatePublishTimeout_NeedsSubMuxLock() return sub, nil } @@ -230,7 +230,7 @@ func (c *Client) forgetSubscription(ctx context.Context, id uint32) { func (c *Client) forgetSubscription_NeedsSubMuxLock(ctx context.Context, id uint32) { delete(c.subs, id) - c.updatePublishTimeout_NeedsSubMuxRLock() + c.updatePublishTimeout_NeedsSubMuxLock() stats.Subscription().Add("Count", -1) if len(c.subs) == 0 { @@ -240,7 +240,7 @@ func (c *Client) forgetSubscription_NeedsSubMuxLock(ctx context.Context, id uint } } -func (c *Client) updatePublishTimeout_NeedsSubMuxRLock() { +func (c *Client) updatePublishTimeout_NeedsSubMuxLock() { maxTimeout := uasc.MaxTimeout for _, s := range c.subs { if d := s.publishTimeout(); d < maxTimeout { From 86e7bdcaf2f3cdcf2a02678033747822fc355ea9 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Sat, 11 Jan 2025 19:15:44 +0100 Subject: [PATCH 3/6] client_sub: remove unused parameter from func --- client_sub.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client_sub.go b/client_sub.go index 91bf01c4..8cb63561 100644 --- a/client_sub.go +++ b/client_sub.go @@ -470,7 +470,7 @@ func (c *Client) publish(ctx context.Context) error { } // handle the publish response for a specific subscription - c.handleNotification_NeedsSubMuxLock(ctx, sub, res) + c.handleNotification_NeedsSubMuxLock(sub, res) c.subMux.Unlock() c.notifySubscription(ctx, sub, res.NotificationMessage) @@ -513,7 +513,7 @@ func (c *Client) handleAcks_NeedsSubMuxLock(res []ua.StatusCode) { dlog.Printf("notAcked=%v", notAcked) } -func (c *Client) handleNotification_NeedsSubMuxLock(ctx context.Context, sub *Subscription, res *ua.PublishResponse) { +func (c *Client) handleNotification_NeedsSubMuxLock(sub *Subscription, res *ua.PublishResponse) { dlog := debug.NewPrefixLogger("publish: sub %d: ", res.SubscriptionID) // keep-alive message From a0fb6e855c21e94908b443f09d6645a51fc119ee Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Sat, 11 Jan 2025 19:17:48 +0100 Subject: [PATCH 4/6] subscription: remove unused constant --- subscription.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/subscription.go b/subscription.go index 03db1bb4..e8f981d8 100644 --- a/subscription.go +++ b/subscription.go @@ -22,8 +22,6 @@ const ( DefaultSubscriptionPriority = 0 ) -const terminatedSubscriptionID uint32 = 0xC0CAC01B - type Subscription struct { SubscriptionID uint32 RevisedPublishingInterval time.Duration From 2d9bf00b0f64c4330b2cf831444eb98ec5bdf184 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Sat, 11 Jan 2025 19:20:54 +0100 Subject: [PATCH 5/6] subscription: refactor subscription recreation Decouple client and subscription code during subscription recreation. During this operation both the state of the client and the subscription have to be updated while holding a lock in the client. The previous implementation had both classes call each other. The control flow in the new implementation is only from the client to the subscription. --- client_sub.go | 7 +++++-- subscription.go | 42 ++++++++++++++++++++++-------------------- 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/client_sub.go b/client_sub.go index 8cb63561..024500c0 100644 --- a/client_sub.go +++ b/client_sub.go @@ -89,7 +89,7 @@ func (c *Client) SubscriptionIDs() []uint32 { } // recreateSubscriptions creates new subscriptions -// with the same parameters to replace the previous ones +// with the same parameters to replace the previous one func (c *Client) recreateSubscription(ctx context.Context, id uint32) error { c.subMux.Lock() defer c.subMux.Unlock() @@ -98,7 +98,10 @@ func (c *Client) recreateSubscription(ctx context.Context, id uint32) error { if !ok { return ua.StatusBadSubscriptionIDInvalid } - return sub.recreate_NeedsSubMuxLock(ctx) + + sub.recreate_delete(ctx) + c.forgetSubscription_NeedsSubMuxLock(ctx, id) + return sub.recreate_create(ctx) } // transferSubscriptions ask the server to transfer the given subscriptions diff --git a/subscription.go b/subscription.go index e8f981d8..5b5a3d27 100644 --- a/subscription.go +++ b/subscription.go @@ -391,29 +391,31 @@ func (p *SubscriptionParameters) setDefaults() { } } -// recreate_NeedsSubMuxLock creates a new subscription based on the previous subscription -// parameters and monitored items. -func (s *Subscription) recreate_NeedsSubMuxLock(ctx context.Context) error { - dlog := debug.NewPrefixLogger("sub %d: recreate: ", s.SubscriptionID) - - if s.SubscriptionID == terminatedSubscriptionID { - dlog.Printf("subscription is not in a valid state") - return nil +// recreate_delete is called by the client when it is trying to +// recreate an existing subscription. This function deletes the +// existing subscription from the server. +func (s *Subscription) recreate_delete(ctx context.Context) error { + dlog := debug.NewPrefixLogger("sub %d: recreate_delete: ", s.SubscriptionID) + req := &ua.DeleteSubscriptionsRequest{ + SubscriptionIDs: []uint32{s.SubscriptionID}, } + var res *ua.DeleteSubscriptionsResponse + _ = s.c.Send(ctx, req, func(v ua.Response) error { + return safeAssign(v, &res) + }) + dlog.Print("subscription deleted") + return nil +} + +// recreate_create is called by the client when it is trying to +// recreate an existing subscription. This function creates a +// new subscription with the same parameters as the previous one. +func (s *Subscription) recreate_create(ctx context.Context) error { + dlog := debug.NewPrefixLogger("sub %d: recreate_create: ", s.SubscriptionID) + s.paramsMu.Lock() params := s.params - { - req := &ua.DeleteSubscriptionsRequest{ - SubscriptionIDs: []uint32{s.SubscriptionID}, - } - var res *ua.DeleteSubscriptionsResponse - _ = s.c.Send(ctx, req, func(v ua.Response) error { - return safeAssign(v, &res) - }) - dlog.Print("subscription deleted") - } - s.c.forgetSubscription_NeedsSubMuxLock(ctx, s.SubscriptionID) - dlog.Printf("subscription forgotton") + s.paramsMu.Unlock() req := &ua.CreateSubscriptionRequest{ RequestedPublishingInterval: float64(params.Interval / time.Millisecond), From ac1bb480188e116b0ebe61001dccc5878a486f75 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Sat, 11 Jan 2025 19:30:09 +0100 Subject: [PATCH 6/6] examples/subscription: cleanup example --- examples/subscribe/subscribe.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/examples/subscribe/subscribe.go b/examples/subscribe/subscribe.go index 785b1063..112c8f1d 100644 --- a/examples/subscribe/subscribe.go +++ b/examples/subscribe/subscribe.go @@ -98,12 +98,12 @@ func main() { } // Uncomment the following to try modifying the subscription - //var params opcua.SubscriptionParameters - //params.Interval = time.Millisecond * 2000 - //_, err = sub.ModifySubscription(ctx, params) - //if err != nil { - // log.Fatal(err) - //} + // + // var params opcua.SubscriptionParameters + // params.Interval = time.Millisecond * 2000 + // if _, err := sub.ModifySubscription(ctx, params); err != nil { + // log.Fatal(err) + // } // read from subscription's notification channel until ctx is cancelled for {