Skip to content

Commit

Permalink
BUGFIX: reconnect on caddy reload
Browse files Browse the repository at this point in the history
  • Loading branch information
skurfuerst committed Nov 28, 2024
1 parent fa8f602 commit ae63a68
Show file tree
Hide file tree
Showing 11 changed files with 209 additions and 47 deletions.
8 changes: 4 additions & 4 deletions body_jetstream/body_jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestPublishRequestToNatsWithBodyJetstream(t *testing.T) {
}
// Read from X-Large-Body-Id
js, err := nc.JetStream()
integrationtest.FailOnErr("Error getting JetStream Client: %s", err, t)
integrationtest.FailOnErr("Error getting JetStream ClientConn: %s", err, t)
os, err := js.ObjectStore(bucket)
integrationtest.FailOnErr("Error getting ObjectStore "+bucket+": %s", err, t)
resBytes, err := os.GetBytes(id)
Expand Down Expand Up @@ -111,13 +111,13 @@ func TestPublishRequestToNatsWithBodyJetstream(t *testing.T) {
}

// we share the same NATS Server and Caddy Server for all testcases
_, nc := integrationtest.StartTestNats(t)
tn := integrationtest.StartTestNats(t)
caddyTester := integrationtest.NewCaddyTester(t)

for _, testcase := range cases {
t.Run(testcase.description, func(t *testing.T) {

subscription, err := nc.SubscribeSync("greet.>")
subscription, err := tn.ClientConn.SubscribeSync("greet.>")
defer subscription.Unsubscribe()
integrationtest.FailOnErr("error subscribing to greet.>: %w", err, t)

Expand All @@ -137,7 +137,7 @@ func TestPublishRequestToNatsWithBodyJetstream(t *testing.T) {
} else {
t.Logf("Received message: %+v", msg)
}
testcase.assertNatsMessage(msg, nc, t)
testcase.assertNatsMessage(msg, tn.ClientConn, t)
})
}
}
2 changes: 1 addition & 1 deletion dev.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ set -e
function run-tests() {
go mod tidy
# we cannot run in parallel right now, because the tests all boot up caddy servers and NATS servers on fixed ports.
go test -p 1 -v github.com/sandstorm/caddy-nats-bridge/...
go test -count=1 -p 1 -v github.com/sandstorm/caddy-nats-bridge/...
}


Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ toolchain go1.23.3

require (
github.com/caddyserver/caddy/v2 v2.8.4
github.com/nats-io/nats.go v1.33.1
github.com/nats-io/nats.go v1.37.0
github.com/nats-io/nuid v1.0.1
go.uber.org/zap v1.27.0
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,8 @@ github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a h1:lem6QCvxR0Y28g
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBriPUtluB4=
github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4=
github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70=
github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
Expand Down
8 changes: 4 additions & 4 deletions integrationtest/caddy_nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ import (
)

// TestNats tries to experiment with NATS request / reply in a testcase.
func TestNats(t *testing.T) {
_, nc := StartTestNats(t)
func TestNatsExample(t *testing.T) {
tn := StartTestNats(t)

sub, _ := nc.SubscribeSync("greet.*")
nc.Publish("greet.joe", []byte("hello"))
sub, _ := tn.ClientConn.SubscribeSync("greet.*")
tn.ClientConn.Publish("greet.joe", []byte("hello"))
msg, err := sub.NextMsg(10 * time.Millisecond)

if err != nil {
Expand Down
38 changes: 27 additions & 11 deletions integrationtest/natshelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,21 @@ import (

const TEST_PORT = 8369

func StartTestNats(t *testing.T) (*server.Server, *nats.Conn) {
natsServer := RunServerOnPort(TEST_PORT)
t.Cleanup(func() {
natsServer.Shutdown()
})
// TestNats bundles the NATS server started for a test together with a client
// connection to it. It is the value returned by StartTestNats.
type TestNats struct {
// Server is the NATS server instance. RestartServer replaces this field with
// a freshly started server.
Server *server.Server
// ClientConn is the client connection opened by StartTestNats against the
// server on TEST_PORT.
ClientConn *nats.Conn
}

// RestartServer shuts down the current NATS server and starts a new one on
// TEST_PORT, storing the new instance in tn.Server. ClientConn is not touched
// by this method. Each step is logged through t.
func (tn *TestNats) RestartServer(t *testing.T) {
t.Logf("Shutting down NATS Server")
tn.Server.Shutdown()
t.Logf("Starting NATS Server")
// Reuse the same fixed port that StartTestNats used for the original server.
tn.Server = runServerOnPort(TEST_PORT)
t.Logf("Started NATS Server")
}

func StartTestNats(t *testing.T) TestNats {
natsServer := runServerOnPort(TEST_PORT)

serverUrl := fmt.Sprintf("nats://127.0.0.1:%d", TEST_PORT)
natsClient, err := nats.Connect(serverUrl)
Expand All @@ -25,19 +35,25 @@ func StartTestNats(t *testing.T) (*server.Server, *nats.Conn) {
natsClient.Drain()
})

return natsServer, natsClient
tn := TestNats{
Server: natsServer,
ClientConn: natsClient,
}

t.Cleanup(func() {
tn.Server.Shutdown()
})

return tn
}

func RunServerOnPort(port int) *server.Server {
func runServerOnPort(port int) *server.Server {
opts := natsserver.DefaultTestOptions
opts.Port = port
opts.JetStream = true
opts.Debug = true
opts.Trace = true
opts.NoLog = false
return RunServerWithOptions(&opts)
}

func RunServerWithOptions(opts *server.Options) *server.Server {
return natsserver.RunServer(opts)
return natsserver.RunServer(&opts)
}
160 changes: 150 additions & 10 deletions logoutput/logoutput_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package logoutput_test

import (
"encoding/json"
"github.com/caddyserver/caddy/v2/caddytest"
"github.com/nats-io/nats.go"
"io"
"strings"
)

import (
Expand Down Expand Up @@ -39,17 +42,122 @@ type logMsgExample struct {

func TestLogRequestToNats(t *testing.T) {
type testCase struct {
description string
sendHttpRequestAndAssertResponse func() error
handleNatsMessage func(msg *nats.Msg, nc *nats.Conn) error
CaddyfileSnippet string
description string
sendHttpRequestAndAssertResponse func(t *testing.T, tn *integrationtest.TestNats) error
handleNatsMessage func(msg *nats.Msg, nc *nats.Conn) error
CaddyfileSnippet string
shouldReloadCaddyBeforeExecutingTest bool
}

// Testcases
cases := []testCase{
{
description: "HTTP request logging to NATS",
sendHttpRequestAndAssertResponse: func() error {
sendHttpRequestAndAssertResponse: func(t *testing.T, tn *integrationtest.TestNats) error {
// 1) send initial HTTP request (will be validated on the NATS handler side)
req, err := http.NewRequest("GET", "http://localhost:8889/test/hi", nil)
if err != nil {
return err
}
_, err = http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("HTTP request failed: %w", err)
}

return nil
},
CaddyfileSnippet: `
log {
output nats my.subject
}
route /test/* {
respond 200
}
`,
handleNatsMessage: func(msg *nats.Msg, nc *nats.Conn) error {
// 2) validate incoming NATS request (converted from HTTP)
if msg.Subject != "my.subject" {
t.Fatalf("Subject not correct, expected 'my.subject', actual: %s", msg.Subject)
}
// {"level":"info","ts":1709835557.872107,"logger":"http.log.access.log0","msg":"handled request","request":{"remote_ip":"127.0.0.1","remote_port":"50174","client_ip":"127.0.0.1","proto":"HTTP/1.1","method":"GET","host":"localhost:8889","uri":"/test/hi","headers":{"User-Agent":["Go-http-client/1.1"],"Accept-Encoding":["gzip"]}},"bytes_read":0,"user_id":"","duration":0.000015458,"size":0,"status":200,"resp_headers":{"Server":["Caddy"],"Content-Type":[]}}
var logMsg logMsgExample
err := json.Unmarshal(msg.Data, &logMsg)
if err != nil {
return err
}

if logMsg.Level != "info" {
t.Fatalf("msg.level not correct, actual: %s", logMsg.Level)
}
if logMsg.Msg != "handled request" {
t.Fatalf("msg.msg not correct, actual: %s", logMsg.Msg)
}
if logMsg.Status != 200 {
t.Fatalf("msg.status not correct, actual: %d", logMsg.Status)
}

return nil
},
},

{
description: "HTTP request logging to NATS should also work after reload of caddy",
sendHttpRequestAndAssertResponse: func(t *testing.T, tn *integrationtest.TestNats) error {
forceReloadCaddy(t)

// 1) send initial HTTP request (will be validated on the NATS handler side)
req, err := http.NewRequest("GET", "http://localhost:8889/test/hi", nil)
if err != nil {
return err
}
_, err = http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("HTTP request failed: %w", err)
}

return nil
},
CaddyfileSnippet: `
log {
output nats my.subject
}
route /test/* {
respond 200
}
`,
handleNatsMessage: func(msg *nats.Msg, nc *nats.Conn) error {
// 2) validate incoming NATS request (converted from HTTP)
if msg.Subject != "my.subject" {
t.Fatalf("Subject not correct, expected 'my.subject', actual: %s", msg.Subject)
}
// {"level":"info","ts":1709835557.872107,"logger":"http.log.access.log0","msg":"handled request","request":{"remote_ip":"127.0.0.1","remote_port":"50174","client_ip":"127.0.0.1","proto":"HTTP/1.1","method":"GET","host":"localhost:8889","uri":"/test/hi","headers":{"User-Agent":["Go-http-client/1.1"],"Accept-Encoding":["gzip"]}},"bytes_read":0,"user_id":"","duration":0.000015458,"size":0,"status":200,"resp_headers":{"Server":["Caddy"],"Content-Type":[]}}
var logMsg logMsgExample
err := json.Unmarshal(msg.Data, &logMsg)
if err != nil {
return err
}

if logMsg.Level != "info" {
t.Fatalf("msg.level not correct, actual: %s", logMsg.Level)
}
if logMsg.Msg != "handled request" {
t.Fatalf("msg.msg not correct, actual: %s", logMsg.Msg)
}
if logMsg.Status != 200 {
t.Fatalf("msg.status not correct, actual: %d", logMsg.Status)
}

return nil
},
},

{
description: "!!! UNSTABLE TEST !!! HTTP request logging to NATS should also work after restart of NATS",
sendHttpRequestAndAssertResponse: func(t *testing.T, tn *integrationtest.TestNats) error {
tn.RestartServer(t)
// not nice ;)
time.Sleep(1 * time.Second)

// 1) send initial HTTP request (will be validated on the NATS handler side)
req, err := http.NewRequest("GET", "http://localhost:8889/test/hi", nil)
if err != nil {
Expand Down Expand Up @@ -98,13 +206,13 @@ func TestLogRequestToNats(t *testing.T) {
}

// we share the same NATS Server and Caddy Server for all testcases
_, nc := integrationtest.StartTestNats(t)
testNats := integrationtest.StartTestNats(t)
caddyTester := integrationtest.NewCaddyTester(t)

for _, testcase := range cases {
t.Run(testcase.description, func(t *testing.T) {

subscription, err := nc.SubscribeSync(">")
subscription, err := testNats.ClientConn.SubscribeSync(">")
defer subscription.Unsubscribe()
integrationtest.FailOnErr("error subscribing to >: %w", err, t)

Expand All @@ -117,17 +225,17 @@ func TestLogRequestToNats(t *testing.T) {
// HTTP Request and assertion Goroutine
httpResultChan := make(chan error)
go func() {
httpResultChan <- testcase.sendHttpRequestAndAssertResponse()
httpResultChan <- testcase.sendHttpRequestAndAssertResponse(t, &testNats)
}()

// handle NATS message and generate response.
msg, err := subscription.NextMsg(10 * time.Millisecond)
msg, err := subscription.NextMsg(10000 * time.Millisecond)
if err != nil {
t.Fatalf("message not received: %v", err)
} else {
t.Logf("Received message: %+v", msg)
}
err = testcase.handleNatsMessage(msg, nc)
err = testcase.handleNatsMessage(msg, testNats.ClientConn)
if err != nil {
t.Fatalf("error with NATS message: %s", err)
}
Expand All @@ -140,3 +248,35 @@ func TestLogRequestToNats(t *testing.T) {
})
}
}

// forceReloadCaddy makes the running Caddy instance reload its current
// configuration.
//
// It reads the active config from the admin API (GET /config/) and posts the
// very same JSON back to POST /load. The "Cache-Control: must-revalidate"
// header is sent along; per the Caddy admin API documentation this forces a
// reload even when the config is unchanged.
//
// Failures are reported with t.Errorf followed by a plain return instead of
// t.FailNow: the test case in this file invokes the helper from a goroutine
// other than the one running the test, where FailNow must not be used.
func forceReloadCaddy(t *testing.T) {
	t.Helper()

	adminBase := fmt.Sprintf("http://localhost:%d", caddytest.Default.AdminPort)
	// One client with a timeout for both admin calls, so a hung admin endpoint
	// cannot block the test forever.
	client := &http.Client{
		Timeout: caddytest.Default.LoadRequestTimeout,
	}

	// 1) fetch the currently active config
	res, err := client.Get(adminBase + "/config/")
	if err != nil {
		t.Errorf("Error reading config: %s", err)
		return
	}
	body, err := io.ReadAll(res.Body)
	// Close errors on a fully read response body carry no useful information.
	_ = res.Body.Close()
	if err != nil {
		t.Errorf("Error reading config body: %s", err)
		return
	}

	// 2) post the unchanged config back to trigger the reload
	req, err := http.NewRequest("POST", adminBase+"/load", strings.NewReader(string(body)))
	if err != nil {
		t.Errorf("failed to create request: %s", err)
		return
	}
	req.Header.Add("Content-Type", "application/json")
	req.Header.Add("Cache-Control", "must-revalidate")

	res, err = client.Do(req)
	if err != nil {
		t.Errorf("unable to contact caddy server: %s", err)
		return
	}
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		// Best effort: the body is only used to enrich the failure message.
		msg, _ := io.ReadAll(res.Body)
		t.Errorf("caddy rejected config reload (status %d): %s", res.StatusCode, msg)
		return
	}
	// Drain the body so the underlying connection can be reused; an error here
	// does not affect the outcome of the reload.
	_, _ = io.Copy(io.Discard, res.Body)
}
18 changes: 12 additions & 6 deletions natsbridge/nats_bridge_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,16 @@ func (app *NatsBridgeApp) Start() error {
opts = append(opts, opt)
}

// we retry forever
opts = append(opts, nats.MaxReconnects(-1))
opts = append(opts, nats.DisconnectErrHandler(func(conn *nats.Conn, err error) {
app.logger.Info("NATS disconnected")
}))
opts = append(opts, nats.RetryOnFailedConnect(true))
opts = append(opts, nats.ReconnectHandler(func(conn *nats.Conn) {
app.logger.Info("NATS reconnected")
}))

server.Conn, err = nats.Connect(server.NatsUrl, opts...)
if err != nil {
return fmt.Errorf("could not connect to %s : %w", server.NatsUrl, err)
Expand All @@ -118,12 +128,8 @@ func (app *NatsBridgeApp) Start() error {
}

func (app *NatsBridgeApp) Stop() error {
defer func() {
for _, server := range app.Servers {
app.logger.Info("closing NATS connection", zap.String("url", server.Conn.ConnectedUrlRedacted()))
server.Conn.Close()
}
}()
// we do NOT close the connections from NATS server, as otherwise we'd disconnect
// on a reload of the Caddy server.

app.logger.Info("stopping all NATS subscriptions")
for _, server := range app.Servers {
Expand Down
Loading

0 comments on commit ae63a68

Please sign in to comment.