diff --git a/go.mod b/go.mod index 98a88db3..c43f7126 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( github.com/Azure/go-amqp v0.17.5 - github.com/go-stomp/stomp/v3 v3.0.0 + github.com/go-stomp/stomp/v3 v3.0.5 github.com/joshdk/go-junit v0.0.0-20210226021600-6145f504ca0d github.com/sirupsen/logrus v1.8.1 github.com/spf13/cobra v1.3.0 diff --git a/go.sum b/go.sum index 4fc08add..92e45878 100644 --- a/go.sum +++ b/go.sum @@ -189,6 +189,8 @@ github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-stomp/stomp/v3 v3.0.0 h1:SnKnOoBkx/2MLxBeANKlyr+O8+Zx10H0ONub5ax6v/Y= github.com/go-stomp/stomp/v3 v3.0.0/go.mod h1:jTrybHBK20jPdM9iyh65m6GusX6aMf7atfEFZ1nIcgc= +github.com/go-stomp/stomp/v3 v3.0.5 h1:yOORvXLqSu0qF4loJjfWrcVE1o0+9cFudclcP0an36Y= +github.com/go-stomp/stomp/v3 v3.0.5/go.mod h1:ztzZej6T2W4Y6FlD+Tb5n7HQP3/O5UNQiuC169pIp10= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= diff --git a/vendor/github.com/go-stomp/stomp/v3/.travis.yml b/vendor/github.com/go-stomp/stomp/v3/.travis.yml deleted file mode 100644 index aa244e44..00000000 --- a/vendor/github.com/go-stomp/stomp/v3/.travis.yml +++ /dev/null @@ -1,5 +0,0 @@ -language: go - -go: - - 1.4 - diff --git a/vendor/github.com/go-stomp/stomp/v3/README.md b/vendor/github.com/go-stomp/stomp/v3/README.md index f937e5ba..ed606e8d 100644 --- a/vendor/github.com/go-stomp/stomp/v3/README.md +++ b/vendor/github.com/go-stomp/stomp/v3/README.md @@ -2,7 +2,7 @@ Go language implementation of a STOMP client library. -[![Build Status](https://travis-ci.org/go-stomp/stomp.svg?branch=master)](https://travis-ci.org/go-stomp/stomp) +![Build Status](https://github.com/go-stomp/stomp/actions/workflows/test.yml/badge.svg?branch=master) [![Go Reference](https://pkg.go.dev/badge/github.com/go-stomp/stomp/v3.svg)](https://pkg.go.dev/github.com/go-stomp/stomp/v3) Features: diff --git a/vendor/github.com/go-stomp/stomp/v3/conn.go b/vendor/github.com/go-stomp/stomp/v3/conn.go index 075ffd92..4f0516ac 100644 --- a/vendor/github.com/go-stomp/stomp/v3/conn.go +++ b/vendor/github.com/go-stomp/stomp/v3/conn.go @@ -3,7 +3,6 @@ package stomp import ( "errors" "io" - "log" "net" "strconv" "sync" @@ -22,6 +21,9 @@ const DefaultMsgSendTimeout = 10 * time.Second // Default receipt timeout in Conn.Send function const DefaultRcvReceiptTimeout = 30 * time.Second +// Reply-To header used for temporary queues/RPC with rabbit. +const ReplyToHeader = "reply-to" + // A Conn is a connection to a STOMP server. Create a Conn using either // the Dial or Connect function. type Conn struct { @@ -39,6 +41,7 @@ type Conn struct { closed bool closeMutex *sync.Mutex options *connOptions + log Logger } type writeRequest struct { @@ -64,7 +67,7 @@ func Dial(network, addr string, opts ...func(*Conn) error) (*Conn, error) { // Add option to set host and make it the first option in list, // so that if host has been explicitly specified it will override. - opts = append([](func(*Conn) error){ConnOpt.Host(host)}, opts...) + opts = append([]func(*Conn) error{ConnOpt.Host(host)}, opts...) return Connect(c, opts...) } @@ -87,6 +90,8 @@ func Connect(conn io.ReadWriteCloser, opts ...func(*Conn) error) (*Conn, error) return nil, err } + c.log = options.Logger + if options.ReadBufferSize > 0 { reader = frame.NewReaderSize(conn, options.ReadBufferSize) } @@ -311,7 +316,7 @@ func processLoop(c *Conn, writer *frame.Writer) { } case frame.ERROR: - log.Println("received ERROR; Closing underlying connection") + c.log.Error("received ERROR; Closing underlying connection") for _, ch := range channels { ch <- f close(ch) @@ -329,7 +334,7 @@ func processLoop(c *Conn, writer *frame.Writer) { if ch, ok := channels[id]; ok { ch <- f } else { - log.Println("ignored MESSAGE for subscription", id) + c.log.Infof("ignored MESSAGE for subscription: %s", id) } } } @@ -352,23 +357,38 @@ func processLoop(c *Conn, writer *frame.Writer) { } } + // default is to always send a frame. + var sendFrame = true + switch req.Frame.Command { case frame.SUBSCRIBE: id, _ := req.Frame.Header.Contains(frame.Id) channels[id] = req.C + + // if using a temp queue, map that destination as a known channel + // however, don't send the frame, it's most likely an invalid destination + // on the broker. + if replyTo, ok := req.Frame.Header.Contains(ReplyToHeader); ok { + channels[replyTo] = req.C + sendFrame = false + } + case frame.UNSUBSCRIBE: id, _ := req.Frame.Header.Contains(frame.Id) // is this trying to be too clever -- add a receipt // header so that when the server responds with a // RECEIPT frame, the corresponding channel will be closed req.Frame.Header.Set(frame.Receipt, id) + } - // frame to send - err := writer.Write(req.Frame) - if err != nil { - sendError(channels, err) - return + // frame to send, if enabled + if sendFrame { + err := writer.Write(req.Frame) + if err != nil { + sendError(channels, err) + return + } } } } @@ -731,7 +751,9 @@ func (c *Conn) createAckNackFrame(msg *Message, ack bool) (*frame.Frame, error) return nil, missingHeader(frame.MessageId) } case V12: + // message frame contains ack header if ack, ok := msg.Header.Contains(frame.Ack); ok { + // ack frame should reference it as id f.Header.Add(frame.Id, ack) } else { return nil, missingHeader(frame.Ack) diff --git a/vendor/github.com/go-stomp/stomp/v3/conn_options.go b/vendor/github.com/go-stomp/stomp/v3/conn_options.go index 8a648643..f0bf4d4d 100644 --- a/vendor/github.com/go-stomp/stomp/v3/conn_options.go +++ b/vendor/github.com/go-stomp/stomp/v3/conn_options.go @@ -6,6 +6,7 @@ import ( "time" "github.com/go-stomp/stomp/v3/frame" + "github.com/go-stomp/stomp/v3/internal/log" ) // ConnOptions is an opaque structure used to collection options @@ -25,6 +26,7 @@ type connOptions struct { ReadChannelCapacity, WriteChannelCapacity int ReadBufferSize, WriteBufferSize int ResponseHeadersCallback func(*frame.Header) + Logger Logger } func newConnOptions(conn *Conn, opts []func(*Conn) error) (*connOptions, error) { @@ -36,6 +38,7 @@ func newConnOptions(conn *Conn, opts []func(*Conn) error) (*connOptions, error) HeartBeatError: DefaultHeartBeatError, MsgSendTimeout: DefaultMsgSendTimeout, RcvReceiptTimeout: DefaultRcvReceiptTimeout, + Logger: log.StdLogger{}, } // This is a slight of hand, attach the options to the Conn long @@ -178,6 +181,9 @@ var ConnOpt struct { // ResponseHeaders lets you provide a callback function to get the headers from the CONNECT response ResponseHeaders func(func(*frame.Header)) func(*Conn) error + + // Logger lets you provide a callback function that sets the logger used by a connection + Logger func(logger Logger) func(*Conn) error } func init() { @@ -294,4 +300,14 @@ func init() { return nil } } + + ConnOpt.Logger = func(log Logger) func(*Conn) error { + return func(c *Conn) error { + if log != nil { + c.options.Logger = log + } + + return nil + } + } } diff --git a/vendor/github.com/go-stomp/stomp/v3/frame/encode.go b/vendor/github.com/go-stomp/stomp/v3/frame/encode.go index 50d796c6..ecd187c0 100644 --- a/vendor/github.com/go-stomp/stomp/v3/frame/encode.go +++ b/vendor/github.com/go-stomp/stomp/v3/frame/encode.go @@ -1,8 +1,8 @@ package frame import ( - "bytes" "strings" + "unsafe" ) var ( @@ -20,17 +20,15 @@ var ( ) ) -// Encodes a header value using STOMP value encoding -func encodeValue(s string) []byte { - var buf bytes.Buffer - buf.Grow(len(s)) - replacerForEncodeValue.WriteString(&buf, s) - return buf.Bytes() +// Reduce one allocation on copying bytes to string +func bytesToString(b []byte) string { + /* #nosec G103 */ + return *(*string)(unsafe.Pointer(&b)) } // Unencodes a header value using STOMP value encoding // TODO: return error if invalid sequences found (eg "\t") func unencodeValue(b []byte) (string, error) { - s := replacerForUnencodeValue.Replace(string(b)) + s := replacerForUnencodeValue.Replace(bytesToString(b)) return s, nil } diff --git a/vendor/github.com/go-stomp/stomp/v3/frame/writer.go b/vendor/github.com/go-stomp/stomp/v3/frame/writer.go index ad76694e..7c6e83e0 100644 --- a/vendor/github.com/go-stomp/stomp/v3/frame/writer.go +++ b/vendor/github.com/go-stomp/stomp/v3/frame/writer.go @@ -53,7 +53,7 @@ func (w *Writer) Write(f *Frame) error { for i := 0; i < f.Header.Len(); i++ { key, value := f.Header.GetAt(i) //println(" ", key, ":", value) - _, err = w.writer.Write(encodeValue(key)) + _, err = replacerForEncodeValue.WriteString(w.writer, key) if err != nil { return err } @@ -61,7 +61,7 @@ func (w *Writer) Write(f *Frame) error { if err != nil { return err } - _, err = w.writer.Write(encodeValue(value)) + _, err = replacerForEncodeValue.WriteString(w.writer, value) if err != nil { return err } diff --git a/vendor/github.com/go-stomp/stomp/v3/internal/log/stdlogger.go b/vendor/github.com/go-stomp/stomp/v3/internal/log/stdlogger.go new file mode 100644 index 00000000..784c6f0b --- /dev/null +++ b/vendor/github.com/go-stomp/stomp/v3/internal/log/stdlogger.go @@ -0,0 +1,51 @@ +package log + +import ( + "fmt" + stdlog "log" +) + +var ( + debugPrefix = "DEBUG: " + infoPrefix = "INFO: " + warnPrefix = "WARN: " + errorPrefix = "ERROR: " +) + +func logf(prefix string, format string, value ...interface{}) { + _ = stdlog.Output(3, fmt.Sprintf(prefix+format+"\n", value...)) +} + +type StdLogger struct{} + +func (s StdLogger) Debugf(format string, value ...interface{}) { + logf(debugPrefix, format, value...) +} + +func (s StdLogger) Debug(message string) { + logf(debugPrefix, "%s", message) +} + +func (s StdLogger) Infof(format string, value ...interface{}) { + logf(infoPrefix, format, value...) +} + +func (s StdLogger) Info(message string) { + logf(infoPrefix, "%s", message) +} + +func (s StdLogger) Warningf(format string, value ...interface{}) { + logf(warnPrefix, format, value...) +} + +func (s StdLogger) Warning(message string) { + logf(warnPrefix, "%s", message) +} + +func (s StdLogger) Errorf(format string, value ...interface{}) { + logf(errorPrefix, format, value...) +} + +func (s StdLogger) Error(message string) { + logf(errorPrefix, "%s", message) +} diff --git a/vendor/github.com/go-stomp/stomp/v3/logger.go b/vendor/github.com/go-stomp/stomp/v3/logger.go new file mode 100644 index 00000000..4651d461 --- /dev/null +++ b/vendor/github.com/go-stomp/stomp/v3/logger.go @@ -0,0 +1,13 @@ +package stomp + +type Logger interface { + Debugf(format string, value ...interface{}) + Infof(format string, value ...interface{}) + Warningf(format string, value ...interface{}) + Errorf(format string, value ...interface{}) + + Debug(message string) + Info(message string) + Warning(message string) + Error(message string) +} diff --git a/vendor/github.com/go-stomp/stomp/v3/subscription.go b/vendor/github.com/go-stomp/stomp/v3/subscription.go index 97d88623..6aeaaf7e 100644 --- a/vendor/github.com/go-stomp/stomp/v3/subscription.go +++ b/vendor/github.com/go-stomp/stomp/v3/subscription.go @@ -2,7 +2,6 @@ package stomp import ( "fmt" - "log" "sync" "sync/atomic" @@ -155,7 +154,7 @@ func (s *Subscription) readLoop(ch chan *frame.Frame) { s.id, s.destination, message) - log.Println(text) + s.conn.log.Info(text) contentType := f.Header.Get(frame.ContentType) msg := &Message{ Err: &Error{ @@ -178,7 +177,7 @@ func (s *Subscription) readLoop(ch chan *frame.Frame) { } return } else { - log.Printf("Subscription %s: %s: unsupported frame type: %+v\n", s.id, s.destination, f) + s.conn.log.Infof("Subscription %s: %s: unsupported frame type: %+v", s.id, s.destination, f) } } } diff --git a/vendor/modules.txt b/vendor/modules.txt index a10f72a4..3d50cc60 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -69,10 +69,11 @@ github.com/go-openapi/jsonreference # github.com/go-openapi/swag v0.19.15 ## explicit; go 1.11 github.com/go-openapi/swag -# github.com/go-stomp/stomp/v3 v3.0.0 +# github.com/go-stomp/stomp/v3 v3.0.5 ## explicit; go 1.15 github.com/go-stomp/stomp/v3 github.com/go-stomp/stomp/v3/frame +github.com/go-stomp/stomp/v3/internal/log # github.com/gogo/protobuf v1.3.2 ## explicit; go 1.15 github.com/gogo/protobuf/proto