From cb9d620f73bc14454e5e24fae2c0303bf46d4e1b Mon Sep 17 00:00:00 2001 From: Muzzammil Shahid Date: Tue, 2 Apr 2024 15:29:23 +0500 Subject: [PATCH 1/3] Add test to close websocket peer --- transport/websocketpeer_test.go | 41 +++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 transport/websocketpeer_test.go diff --git a/transport/websocketpeer_test.go b/transport/websocketpeer_test.go new file mode 100644 index 00000000..af10260b --- /dev/null +++ b/transport/websocketpeer_test.go @@ -0,0 +1,41 @@ +package transport_test + +import ( + "context" + "fmt" + "testing" + + "github.com/gammazero/nexus/v3/router" + "github.com/gammazero/nexus/v3/transport" + "github.com/gammazero/nexus/v3/transport/serialize" + "github.com/gammazero/nexus/v3/wamp" + "github.com/stretchr/testify/require" +) + +func TestCloseWebsocketPeer(t *testing.T) { + routerConfig := &router.Config{ + RealmConfigs: []*router.RealmConfig{ + { + URI: wamp.URI("nexus.test.realm"), + }, + }, + } + r, err := router.NewRouter(routerConfig, nil) + require.NoError(t, err) + defer r.Close() + + const wsAddr = "127.0.0.1:8000" + closer, err := router.NewWebsocketServer(r).ListenAndServe(wsAddr) + require.NoError(t, err) + defer closer.Close() + + client, err := transport.ConnectWebsocketPeer( + context.Background(), fmt.Sprintf("ws://%s/", wsAddr), serialize.JSON, nil, r.Logger(), nil) + require.NoError(t, err) + + // Close the client connection. + client.Close() + + // Try closing the client connection again. It should not cause an error. + client.Close() +} From d126d1ccd61fd9806b9a5f0aa9103c7b6cc364f3 Mon Sep 17 00:00:00 2001 From: Muzzammil Shahid Date: Tue, 2 Apr 2024 16:35:36 +0500 Subject: [PATCH 2/3] Refactor Close method in websocketPeer to handle duplicate close calls --- transport/websocketpeer.go | 6 ++++++ transport/websocketpeer_test.go | 3 ++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/transport/websocketpeer.go b/transport/websocketpeer.go index 1ae839cf..a8d10099 100644 --- a/transport/websocketpeer.go +++ b/transport/websocketpeer.go @@ -234,6 +234,12 @@ func (w *websocketPeer) IsLocal() bool { return false } // // *** Do not call Send after calling Close. *** func (w *websocketPeer) Close() { + select { + case <-w.closed: + return + default: + } + // Tell sendHandler to exit and discard any queued messages. Do not close // wr channel in case there are incoming messages during close. w.cancelSender() diff --git a/transport/websocketpeer_test.go b/transport/websocketpeer_test.go index af10260b..e978b178 100644 --- a/transport/websocketpeer_test.go +++ b/transport/websocketpeer_test.go @@ -5,11 +5,12 @@ import ( "fmt" "testing" + "github.com/stretchr/testify/require" + "github.com/gammazero/nexus/v3/router" "github.com/gammazero/nexus/v3/transport" "github.com/gammazero/nexus/v3/transport/serialize" "github.com/gammazero/nexus/v3/wamp" - "github.com/stretchr/testify/require" ) func TestCloseWebsocketPeer(t *testing.T) { From 5b734c7653cdcf4c276821638ba4c0e82383f4b3 Mon Sep 17 00:00:00 2001 From: Muzzammil Shahid Date: Wed, 3 Apr 2024 16:04:23 +0500 Subject: [PATCH 3/3] mock websocket connection --- transport/websocketpeer_test.go | 58 +++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/transport/websocketpeer_test.go b/transport/websocketpeer_test.go index e978b178..ba37d524 100644 --- a/transport/websocketpeer_test.go +++ b/transport/websocketpeer_test.go @@ -1,42 +1,50 @@ package transport_test import ( - "context" "fmt" + "log" "testing" + "time" - "github.com/stretchr/testify/require" + "github.com/gorilla/websocket" - "github.com/gammazero/nexus/v3/router" "github.com/gammazero/nexus/v3/transport" "github.com/gammazero/nexus/v3/transport/serialize" - "github.com/gammazero/nexus/v3/wamp" ) +type mockWSConnection struct{} + +func (m mockWSConnection) Close() error { return nil } + +func (m mockWSConnection) WriteControl(messageType int, data []byte, deadline time.Time) error { + return nil +} + +func (m mockWSConnection) WriteMessage(messageType int, data []byte) error { + return nil +} + +func (m mockWSConnection) ReadMessage() (messageType int, p []byte, err error) { + err = fmt.Errorf("implement me") + return +} + +func (m mockWSConnection) SetPongHandler(h func(appData string) error) {} + +func (m mockWSConnection) SetPingHandler(h func(appData string) error) {} + +func (m mockWSConnection) Subprotocol() string { return "" } + +func newMockSession() transport.WebsocketConnection { + return &mockWSConnection{} +} + func TestCloseWebsocketPeer(t *testing.T) { - routerConfig := &router.Config{ - RealmConfigs: []*router.RealmConfig{ - { - URI: wamp.URI("nexus.test.realm"), - }, - }, - } - r, err := router.NewRouter(routerConfig, nil) - require.NoError(t, err) - defer r.Close() - - const wsAddr = "127.0.0.1:8000" - closer, err := router.NewWebsocketServer(r).ListenAndServe(wsAddr) - require.NoError(t, err) - defer closer.Close() - - client, err := transport.ConnectWebsocketPeer( - context.Background(), fmt.Sprintf("ws://%s/", wsAddr), serialize.JSON, nil, r.Logger(), nil) - require.NoError(t, err) + peer := transport.NewWebsocketPeer(newMockSession(), &serialize.JSONSerializer{}, websocket.TextMessage, log.Default(), 0, 0) // Close the client connection. - client.Close() + peer.Close() // Try closing the client connection again. It should not cause an error. - client.Close() + peer.Close() }