From c0010bf127365250b5e53e55a97c11c03356547e Mon Sep 17 00:00:00 2001 From: abakum Date: Mon, 6 Nov 2023 02:11:53 +0300 Subject: [PATCH 1/2] when the connection is broken, join() without Close() freezes for tens of seconds --- forward.go | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/forward.go b/forward.go index d976c37..fff265e 100644 --- a/forward.go +++ b/forward.go @@ -53,18 +53,20 @@ func (fwd *forwarder) Wait() error { // compile-time check that we're implementing the proper interface var _ Forwarder = (*forwarder)(nil) -func join(ctx context.Context, left, right io.ReadWriter) { - g := &sync.WaitGroup{} - g.Add(2) - go func() { - _, _ = io.Copy(left, right) - g.Done() - }() - go func() { - _, _ = io.Copy(right, left) - g.Done() - }() +func join(ctx context.Context, left, right net.Conn) { + g, _ := errgroup.WithContext(ctx) // when ctx is canceled (on WithStopHandler or WithDisconnectHandler ) interrupts both io.Copy + g.Go(func() error { + _, err := io.Copy(left, right) + left.Close() // on left disconnection interrupts io.Copy(right, left) + return err + }) + g.Go(func() error { + _, err := io.Copy(right, left) + right.Close() // on right disconnection interrupts io.Copy(left, right) + return err + }) g.Wait() + } func forwardTunnel(ctx context.Context, tun Tunnel, url *url.URL) Forwarder { @@ -85,20 +87,20 @@ func forwardTunnel(ctx context.Context, tun Tunnel, url *url.URL) Forwarder { if err != nil { return err } + logger.Debug("accept connection from", "address", conn.RemoteAddr()) fwdTasks.Add(1) go func() { ngrokConn := conn.(Conn) - defer ngrokConn.Close() backend, err := openBackend(ctx, logger, tun, ngrokConn, url) if err != nil { + defer ngrokConn.Close() logger.Warn("failed to connect to backend url", "error", err) fwdTasks.Done() return } - defer backend.Close() join(ctx, ngrokConn, backend) fwdTasks.Done() }() From c67a6d263f9cc0be4665d7254dcc260fa40218f7 Mon Sep 17 00:00:00 2001 From: Euan Kemp Date: Tue, 28 May 2024 04:03:32 +0000 Subject: [PATCH 2/2] Propagate half-closes correctly in forward Before, the following would not work as you would expect: ```go // # One terminal // $ ncat --recv-only -l 9090 // ngrok-go code fwd, err := sess.ListenAndForward( ctx, "127.0.0.1:9090", config.TCPEndpoint(), ) // fwd.URL() is 0.tcp.jp.ngrok.io:14517 for this example // another terminal // $ ncat --send-only 0.tcp.jp.ngrok.io 14517 < hello-world.txt ``` What we would expect from the above would be for the send side to send "hello world" and exit, and then the recv side to print "hello world" and also exit. This is what happens if you do `ncat --send-only localhost 9090` instead of copying through the ngrok tcp tunnel. Before this change, when copying through ngrok the recv side would not exit because the 'Close' of the connection did not get propagated through the 'join'. I've also added a unit test showing this. Thank you to @abakum for originally noticing this issue and offering a fix over in #137. In the hopes of landing this more quickly, I've written a new version, derived from the internal ngrok agent's join code, which should thus be easier to review etc. To try and give credit correctly, I've maintained the original commits from #137 as well. --- forward.go | 30 ++++++++++++++++-------------- forward_test.go | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 14 deletions(-) create mode 100644 forward_test.go diff --git a/forward.go b/forward.go index fff265e..200fdcd 100644 --- a/forward.go +++ b/forward.go @@ -53,20 +53,22 @@ func (fwd *forwarder) Wait() error { // compile-time check that we're implementing the proper interface var _ Forwarder = (*forwarder)(nil) -func join(ctx context.Context, left, right net.Conn) { - g, _ := errgroup.WithContext(ctx) // when ctx is canceled (on WithStopHandler or WithDisconnectHandler ) interrupts both io.Copy - g.Go(func() error { - _, err := io.Copy(left, right) - left.Close() // on left disconnection interrupts io.Copy(right, left) - return err - }) - g.Go(func() error { - _, err := io.Copy(right, left) - right.Close() // on right disconnection interrupts io.Copy(left, right) - return err - }) +func join(logger log15.Logger, left, right net.Conn) { + g := &sync.WaitGroup{} + g.Add(2) + go func() { + defer g.Done() + defer left.Close() + n, err := io.Copy(left, right) + logger.Debug("left join finished", "err", err, "bytes", n) + }() + go func() { + defer g.Done() + defer right.Close() + n, err := io.Copy(right, left) + logger.Debug("right join finished", "err", err, "bytes", n) + }() g.Wait() - } func forwardTunnel(ctx context.Context, tun Tunnel, url *url.URL) Forwarder { @@ -101,7 +103,7 @@ func forwardTunnel(ctx context.Context, tun Tunnel, url *url.URL) Forwarder { return } - join(ctx, ngrokConn, backend) + join(logger.New("url", url), ngrokConn, backend) fwdTasks.Done() }() } diff --git a/forward_test.go b/forward_test.go new file mode 100644 index 0000000..b4cf893 --- /dev/null +++ b/forward_test.go @@ -0,0 +1,46 @@ +package ngrok + +import ( + "errors" + "io" + "net" + "testing" + + "github.com/inconshreveable/log15/v3" + "github.com/stretchr/testify/require" +) + +func TestHalfCloseJoin(t *testing.T) { + srv, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + waitSrvConn := make(chan net.Conn) + go func() { + srvConn, err := srv.Accept() + if err != nil { + panic(err) + } + waitSrvConn <- srvConn + }() + + browser, ngrokEndpoint := net.Pipe() + agent, userService := net.Pipe() + + waitJoinDone := make(chan struct{}) + go func() { + defer close(waitJoinDone) + join(log15.New(), ngrokEndpoint, agent) + }() + + _, err = browser.Write([]byte("hello world")) + require.NoError(t, err) + var b [len("hello world")]byte + _, err = userService.Read(b[:]) + require.NoError(t, err) + require.Equal(t, []byte("hello world"), b[:]) + browser.Close() + _, err = userService.Read(b[:]) + require.Truef(t, errors.Is(err, io.EOF), "io.EOF expected, got %v", err) + + <-waitJoinDone +}