diff --git a/aat/auth_test.go b/aat/auth_test.go index 38944c79..bc666b9e 100644 --- a/aat/auth_test.go +++ b/aat/auth_test.go @@ -45,7 +45,7 @@ func TestJoinRealmWithCRAuth(t *testing.T) { } realmDetails := cli.RealmDetails() - if wamp.OptionString(realmDetails, "authrole") != "user" { + if s, _ := wamp.AsString(realmDetails["authrole"]); s != "user" { t.Fatal("missing or incorrect authrole") } @@ -80,7 +80,7 @@ func TestJoinRealmWithCRCookieAuth(t *testing.T) { details := cli.RealmDetails() // Client should not have be authenticated by cookie first time. - if wamp.OptionFlag(details, "authbycookie") { + if ok, _ := wamp.AsBool(details["authbycookie"]); ok { t.Fatal("authbycookie set incorrectly to true") } @@ -93,11 +93,11 @@ func TestJoinRealmWithCRCookieAuth(t *testing.T) { // If websocket, then should be authenticated by cookie this time. if cfg.WsCfg.Jar != nil { - if !wamp.OptionFlag(details, "authbycookie") { + if ok, _ := wamp.AsBool(details["authbycookie"]); !ok { t.Fatal("should have been authenticated by cookie") } } else { - if wamp.OptionFlag(details, "authbycookie") { + if ok, _ := wamp.AsBool(details["authbycookie"]); ok { t.Fatal("authbycookie set incorrectly to true") } } @@ -159,7 +159,7 @@ func TestAuthz(t *testing.T) { t.Fatal("Call error:", err) } dict, _ := wamp.AsDict(result.Arguments[0]) - if wamp.OptionString(dict, "foobar") != "" { + if _, ok := dict["foobar"]; ok { t.Fatal("Should not have special info in session") } @@ -183,7 +183,7 @@ func TestAuthz(t *testing.T) { t.Fatal("Call error:", err) } dict, _ = wamp.AsDict(result.Arguments[0]) - if wamp.OptionString(dict, "foobar") != "baz" { + if s, _ := wamp.AsString(dict["foobar"]); s != "baz" { t.Fatal("Missing special info in session") } diff --git a/aat/pubsub_test.go b/aat/pubsub_test.go index d0bbe801..e141e865 100644 --- a/aat/pubsub_test.go +++ b/aat/pubsub_test.go @@ -100,7 +100,7 @@ func TestPubSubWildcard(t *testing.T) { errChan <- errors.New("event missing or bad args") return } - origTopic := wamp.OptionURI(details, "topic") + origTopic, _ := wamp.AsURI(details["topic"]) if origTopic != testTopic { errChan <- errors.New("wrong original topic") return diff --git a/aat/registrationmeta_test.go b/aat/registrationmeta_test.go index e038839a..8b57bf63 100644 --- a/aat/registrationmeta_test.go +++ b/aat/registrationmeta_test.go @@ -48,14 +48,13 @@ func TestMetaEventRegOnCreateRegOnRegister(t *testing.T) { errChanC <- errors.New("argument 0 (session) was not wamp.ID") return } - onCreateID = wamp.OptionID(dict, "id") - if wamp.OptionURI(dict, "uri") != wamp.URI("some.proc") { + onCreateID, _ = wamp.AsID(dict["id"]) + if u, _ := wamp.AsURI(dict["uri"]); u != wamp.URI("some.proc") { errChanC <- fmt.Errorf( - "on_create had wrong procedure, got '%v' want 'some.proc'", - wamp.OptionURI(dict, "uri")) + "on_create had wrong procedure, got '%v' want 'some.proc'", u) return } - if wamp.OptionString(dict, "created") == "" { + if s, _ := wamp.AsString(dict["created"]); s == "" { errChanC <- errors.New("on_create missing created time") return } diff --git a/aat/rpc_test.go b/aat/rpc_test.go index ad2c0a23..0a81429f 100644 --- a/aat/rpc_test.go +++ b/aat/rpc_test.go @@ -2,6 +2,7 @@ package aat import ( "context" + "sync" "testing" "time" @@ -42,34 +43,95 @@ func TestRPCRegisterAndCall(t *testing.T) { t.Fatal("Failed to connect client:", err) } - // Test calling the procedure. - callArgs := wamp.List{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} - ctx := context.Background() - result, err := caller.Call(ctx, procName, nil, callArgs, nil, "") + // Connect second caller session. + caller2, err := connectClient() if err != nil { - t.Fatal("Failed to call procedure:", err) + t.Fatal("Failed to connect client:", err) } - sum, ok := wamp.AsInt64(result.Arguments[0]) - if !ok { - t.Fatal("Could not convert result to int64") + + // Connect third caller session. + caller3, err := connectClient() + if err != nil { + t.Fatal("Failed to connect client:", err) } - if sum != 55 { - t.Fatal("Wrong result:", sum) + + // Test calling the procedure. + callArgs := wamp.List{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + var result1, result2, result3 *wamp.Result + var err1, err2, err3 error + var ready, allDone sync.WaitGroup + release := make(chan struct{}) + ready.Add(3) + allDone.Add(3) + go func() { + defer allDone.Done() + ready.Done() + <-release + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + result1, err1 = caller.Call(ctx, procName, nil, callArgs, nil, "") + }() + go func() { + defer allDone.Done() + // Call it with caller2. + ready.Done() + <-release + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + result2, err2 = caller2.Call(ctx, procName, nil, callArgs, nil, "") + }() + go func() { + // Call it with caller3. + defer allDone.Done() + ready.Done() + <-release + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + result3, err3 = caller3.Call(ctx, procName, nil, callArgs, nil, "") + }() + + ready.Wait() + close(release) + allDone.Wait() + + errs := []error{err1, err2, err3} + results := []*wamp.Result{result1, result2, result3} + for i := 0; i < 3; i++ { + if errs[i] != nil { + t.Error("Caller", i, "failed to call procedure:", errs[i]) + } else { + sum, ok := wamp.AsInt64(results[i].Arguments[0]) + if !ok { + t.Error("Could not convert result", i, "to int64") + } else if sum != 55 { + t.Errorf("Wrong result %d: %d", i, sum) + } + } } // Test unregister. if err = callee.Unregister(procName); err != nil { - t.Fatal("Failed to unregister procedure:", err) + t.Error("Failed to unregister procedure:", err) } err = caller.Close() if err != nil { - t.Fatal("Failed to disconnect client:", err) + t.Error("Failed to disconnect client:", err) + } + + err = caller2.Close() + if err != nil { + t.Error("Failed to disconnect client:", err) + } + + err = caller3.Close() + if err != nil { + t.Error("Failed to disconnect client:", err) } err = callee.Close() if err != nil { - t.Fatal("Failed to disconnect client:", err) + t.Error("Failed to disconnect client:", err) } } diff --git a/aat/sessionmeta_test.go b/aat/sessionmeta_test.go index 736c5472..4397e8a4 100644 --- a/aat/sessionmeta_test.go +++ b/aat/sessionmeta_test.go @@ -49,7 +49,7 @@ func TestMetaEventOnJoin(t *testing.T) { errChan <- errors.New("argument was not wamp.Dict") return } - onJoinID = wamp.OptionID(details, "session") + onJoinID, _ = wamp.AsID(details["session"]) errChan <- nil } @@ -436,7 +436,7 @@ func TestMetaProcSessionGet(t *testing.T) { if !ok { t.Fatal("Could not convert result to wamp.Dict") } - resultID := wamp.OptionID(dict, "session") + resultID, _ := wamp.AsID(dict["session"]) if resultID != sess.ID() { t.Fatal("Wrong session ID in result") } diff --git a/aat/subscriptionmeta_test.go b/aat/subscriptionmeta_test.go index f62ab7c3..a373a42b 100644 --- a/aat/subscriptionmeta_test.go +++ b/aat/subscriptionmeta_test.go @@ -46,14 +46,13 @@ func TestMetaEventOnCreateOnSubscribe(t *testing.T) { errChanC <- errors.New("argument 0 (session) was not wamp.ID") return } - onCreateID = wamp.OptionID(dict, "id") - if wamp.OptionURI(dict, "uri") != wamp.URI("some.topic") { + onCreateID, _ = wamp.AsID(dict["id"]) + if u, _ := wamp.AsURI(dict["uri"]); u != wamp.URI("some.topic") { errChanC <- fmt.Errorf( - "on_create had wrong topic, got '%v' want 'some.topic'", - wamp.OptionURI(dict, "uri")) + "on_create had wrong topic, got '%v' want 'some.topic'", u) return } - if wamp.OptionString(dict, "created") == "" { + if s, _ := wamp.AsString(dict["created"]); s == "" { errChanC <- errors.New("on_create missing created time") return } diff --git a/client/client.go b/client/client.go index 148fb927..5b5fc4bf 100644 --- a/client/client.go +++ b/client/client.go @@ -143,7 +143,6 @@ type Client struct { progGate map[context.Context]wamp.ID actionChan chan func() - idGen *wamp.SyncIDGen stopping chan struct{} activeInvHandlers sync.WaitGroup @@ -191,7 +190,6 @@ func NewClient(p wamp.Peer, cfg Config) (*Client, error) { progGate: map[context.Context]wamp.ID{}, actionChan: make(chan func()), - idGen: new(wamp.SyncIDGen), stopping: make(chan struct{}), done: make(chan struct{}), @@ -252,7 +250,7 @@ func (c *Client) Subscribe(topic string, fn EventHandler, options wamp.Dict) err if options == nil { options = wamp.Dict{} } - id := c.idGen.Next() + id := wamp.GlobalID() c.expectReply(id) c.sess.Send(&wamp.Subscribe{ Request: id, @@ -322,7 +320,7 @@ func (c *Client) Unsubscribe(topic string) error { return err } - id := c.idGen.Next() + id := wamp.GlobalID() c.expectReply(id) c.sess.Send(&wamp.Unsubscribe{ Request: id, @@ -382,7 +380,7 @@ func (c *Client) Publish(topic string, options wamp.Dict, args wamp.List, kwargs // Check if the client is asking for a PUBLISHED response. pubAck, _ := options[wamp.OptAcknowledge].(bool) - id := c.idGen.Next() + id := wamp.GlobalID() if pubAck { c.expectReply(id) } @@ -443,7 +441,7 @@ type InvocationHandler func(context.Context, wamp.List, wamp.Dict, wamp.Dict) (r // // NOTE: Use consts defined in wamp/options.go instead of raw strings. func (c *Client) Register(procedure string, fn InvocationHandler, options wamp.Dict) error { - id := c.idGen.Next() + id := wamp.GlobalID() c.expectReply(id) c.sess.Send(&wamp.Register{ Request: id, @@ -517,7 +515,7 @@ func (c *Client) Unregister(procedure string) error { return err } - id := c.idGen.Next() + id := wamp.GlobalID() c.expectReply(id) c.sess.Send(&wamp.Unregister{ Request: id, @@ -657,7 +655,7 @@ func (c *Client) CallProgress(ctx context.Context, procedure string, options wam }() } - id := c.idGen.Next() + id := wamp.GlobalID() c.expectReply(id) c.sess.Send(&wamp.Call{ Request: id, @@ -848,7 +846,7 @@ func handleCRAuth(peer wamp.Peer, challenge *wamp.Challenge, authHandlers map[st // If router sent back ABORT in response to client's authentication attempt // return error. if abort, ok := msg.(*wamp.Abort); ok { - authErr := wamp.OptionString(abort.Details, wamp.OptError) + authErr, _ := wamp.AsString(abort.Details[wamp.OptError]) if authErr == "" { authErr = "authentication failed" } @@ -1002,10 +1000,11 @@ CollectResults: } // If this is a progressive result. if progChan != nil { - result, ok := msg.(*wamp.Result) - if ok && wamp.OptionFlag(result.Details, wamp.OptProgress) { - progChan <- result - goto CollectResults + if result, ok := msg.(*wamp.Result); ok { + if ok, _ = wamp.AsBool(result.Details[wamp.OptProgress]); ok { + progChan <- result + goto CollectResults + } } } c.actionChan <- func() { @@ -1126,7 +1125,7 @@ func (c *Client) runHandleInvocation(msg *wamp.Invocation) { // Create a kill switch so that invocation can be canceled. var cancel context.CancelFunc var ctx context.Context - timeout := wamp.OptionInt64(msg.Details, wamp.OptTimeout) + timeout, _ := wamp.AsInt64(msg.Details[wamp.OptTimeout]) if timeout > 0 { // The caller specified a timeout, in milliseconds. ctx, cancel = context.WithTimeout(context.Background(), @@ -1139,7 +1138,7 @@ func (c *Client) runHandleInvocation(msg *wamp.Invocation) { // If caller is accepting progressive results, create map entry to // allow progress to be sent. - if wamp.OptionFlag(msg.Details, wamp.OptReceiveProgress) { + if ok, _ = wamp.AsBool(msg.Details[wamp.OptReceiveProgress]); ok { c.progGate[ctx] = msg.Request } @@ -1230,7 +1229,8 @@ func (c *Client) runHandleInterrupt(msg *wamp.Interrupt) { // If the interrupt mode is "killnowait", then the router is not // waiting for a response, so do not send one. This is indicated by // deleting the cancel for the invocation early. - if wamp.OptionString(msg.Options, wamp.OptMode) == wamp.CancelModeKillNoWait { + mode, _ := wamp.AsString(msg.Options[wamp.OptMode]) + if mode == wamp.CancelModeKillNoWait { delete(c.invHandlerKill, msg.Request) } cancel() diff --git a/client/client_test.go b/client/client_test.go index 2f997f08..a3f94e71 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -202,7 +202,7 @@ func TestSubscribe(t *testing.T) { errChan <- errors.New("event missing or bad args") return } - origTopic := wamp.OptionURI(details, "topic") + origTopic, _ := wamp.AsURI(details["topic"]) if origTopic != wamp.URI(testTopic) { errChan <- errors.New("wrong original topic") return diff --git a/router/auth/anonymous_test.go b/router/auth/anonymous_test.go index fd38da51..7bcd27f4 100644 --- a/router/auth/anonymous_test.go +++ b/router/auth/anonymous_test.go @@ -24,10 +24,10 @@ func TestAnonAuth(t *testing.T) { t.Fatal("expected WELCOME message, got: ", welcome.MessageType()) } - if wamp.OptionString(welcome.Details, "authmethod") != "anonymous" { + if s, _ := wamp.AsString(welcome.Details["authmethod"]); s != "anonymous" { t.Fatal("invalid authmethod in welcome details") } - if wamp.OptionString(welcome.Details, "authrole") != "anonymous" { + if s, _ := wamp.AsString(welcome.Details["authrole"]); s != "anonymous" { t.Fatal("incorrect authrole in welcome details") } } diff --git a/router/auth/crauth.go b/router/auth/crauth.go index c60e6483..b8f21334 100644 --- a/router/auth/crauth.go +++ b/router/auth/crauth.go @@ -29,7 +29,7 @@ func NewCRAuthenticator(keyStore KeyStore, timeout time.Duration) *CRAuthenticat func (cr *CRAuthenticator) AuthMethod() string { return "wampcra" } func (cr *CRAuthenticator) Authenticate(sid wamp.ID, details wamp.Dict, client wamp.Peer) (*wamp.Welcome, error) { - authid := wamp.OptionString(details, "authid") + authid, _ := wamp.AsString(details["authid"]) if authid == "" { return nil, errors.New("missing authid") } diff --git a/router/auth/crauth_test.go b/router/auth/crauth_test.go index 1f367c62..803210b0 100644 --- a/router/auth/crauth_test.go +++ b/router/auth/crauth_test.go @@ -153,13 +153,13 @@ func TestTicketAuth(t *testing.T) { if welcome.MessageType() != wamp.WELCOME { t.Fatal("expected WELCOME message, got: ", welcome.MessageType()) } - if wamp.OptionString(welcome.Details, "authmethod") != "ticket" { + if s, _ := wamp.AsString(welcome.Details["authmethod"]); s != "ticket" { t.Fatal("invalid authmethod in welcome details") } - if wamp.OptionString(welcome.Details, "authrole") != "user" { + if s, _ := wamp.AsString(welcome.Details["authrole"]); s != "user" { t.Fatal("incorrect authrole in welcome details") } - if wamp.OptionFlag(welcome.Details, "authbycookie") { + if ok, _ := wamp.AsBool(welcome.Details["authbycookie"]); ok { t.Fatal("authbycookie set incorrectly to true") } tks.ticket = "bad" @@ -182,7 +182,7 @@ func TestTicketAuth(t *testing.T) { t.Fatal("challenge failed: ", err.Error()) } // Client should be authenticated by cookie. - if !wamp.OptionFlag(welcome.Details, "authbycookie") { + if ok, _ := wamp.AsBool(welcome.Details["authbycookie"]); !ok { t.Fatal("authbycookie set incorrectly to false") } } @@ -226,10 +226,10 @@ func TestCRAuth(t *testing.T) { if welcome.MessageType() != wamp.WELCOME { t.Fatal("expected WELCOME message, got: ", welcome.MessageType()) } - if wamp.OptionString(welcome.Details, "authmethod") != "wampcra" { + if s, _ := wamp.AsString(welcome.Details["authmethod"]); s != "wampcra" { t.Fatal("invalid authmethod in welcome details") } - if wamp.OptionString(welcome.Details, "authrole") != "user" { + if s, _ := wamp.AsString(welcome.Details["authrole"]); s != "user" { t.Fatal("incorrect authrole in welcome details") } diff --git a/router/auth/ticket.go b/router/auth/ticket.go index 1ae870a2..97827c56 100644 --- a/router/auth/ticket.go +++ b/router/auth/ticket.go @@ -35,7 +35,7 @@ func (t *TicketAuthenticator) Authenticate(sid wamp.ID, details wamp.Dict, clien // The HELLO.Details.authid|string is the authentication ID (e.g. username) // the client wishes to authenticate as. For Ticket-based authentication, // this MUST be provided. - authID := wamp.OptionString(details, "authid") + authID, _ := wamp.AsString(details["authid"]) if authID == "" { return nil, errors.New("missing authid") } diff --git a/router/broker.go b/router/broker.go index 73728e3f..38754de6 100644 --- a/router/broker.go +++ b/router/broker.go @@ -138,7 +138,7 @@ func (b *Broker) Publish(pub *wamp.Session, msg *wamp.Publish) { // do so when the Broker configuration (for the publication topic) is // set up to do so. TODO: Currently no broker config for this. var disclose bool - if wamp.OptionFlag(msg.Options, wamp.OptDiscloseMe) { + if opt, _ := wamp.AsBool(msg.Options[wamp.OptDiscloseMe]); opt { // Broker MAY deny a publisher's request to disclose its identity. if !b.allowDisclose { b.trySend(pub, &wamp.Error{ @@ -183,7 +183,7 @@ func (b *Broker) Subscribe(sub *wamp.Session, msg *wamp.Subscribe) { // loose), and all URI components must be non-empty for normal // subscriptions, may be empty for wildcard subscriptions and must be // non-empty for all but the last component for prefix subscriptions. - match := wamp.OptionString(msg.Options, wamp.OptMatch) + match, _ := wamp.AsString(msg.Options[wamp.OptMatch]) if !msg.Topic.ValidURI(b.strictURI, match) { errMsg := fmt.Sprintf( "subscribe for invalid topic URI %v (URI strict checking %v)", diff --git a/router/dealer.go b/router/dealer.go index 77236f6a..716cc163 100644 --- a/router/dealer.go +++ b/router/dealer.go @@ -169,7 +169,7 @@ func (d *Dealer) Register(callee *wamp.Session, msg *wamp.Register) { // Validate procedure URI. For REGISTER, must be valid URI (either strict // or loose), and all URI components must be non-empty other than for // wildcard or prefix matched procedures. - match := wamp.OptionString(msg.Options, wamp.OptMatch) + match, _ := wamp.AsString(msg.Options[wamp.OptMatch]) if !msg.Procedure.ValidURI(d.strictURI, match) { errMsg := fmt.Sprintf( "register for invalid procedure URI %v (URI strict checking %v)", @@ -187,7 +187,7 @@ func (d *Dealer) Register(callee *wamp.Session, msg *wamp.Register) { // Disallow registration of procedures starting with "wamp.", except for // trusted sessions that are built into router. - authrole := wamp.OptionString(callee.Details, "authrole") + authrole, _ := wamp.AsString(callee.Details["authrole"]) if authrole != "" && authrole != "trusted" { if wampURI { errMsg := fmt.Sprintf("register for restricted procedure URI %v", @@ -204,7 +204,7 @@ func (d *Dealer) Register(callee *wamp.Session, msg *wamp.Register) { // If callee requests disclosure of caller identity, but dealer does not // allow, then send error as registration response. - disclose := wamp.OptionFlag(msg.Options, wamp.OptDiscloseCaller) + disclose, _ := wamp.AsBool(msg.Options[wamp.OptDiscloseCaller]) // allow disclose for trusted clients if !d.allowDisclose && disclose && authrole != "trusted" { d.trySend(callee, &wamp.Error{ @@ -216,7 +216,7 @@ func (d *Dealer) Register(callee *wamp.Session, msg *wamp.Register) { return } - invoke := wamp.OptionString(msg.Options, wamp.OptInvoke) + invoke, _ := wamp.AsString(msg.Options[wamp.OptInvoke]) d.actionChan <- func() { d.register(callee, msg, match, invoke, disclose, wampURI) } @@ -570,7 +570,7 @@ func (d *Dealer) call(caller *wamp.Session, msg *wamp.Call) { // // A timeout allows to automatically cancel a call after a specified time // either at the Callee or at the Dealer. - timeout := wamp.OptionInt64(msg.Options, wamp.OptTimeout) + timeout, _ := wamp.AsInt64(msg.Options[wamp.OptTimeout]) if timeout > 0 { // Check that callee supports call_timeout. if callee.HasFeature(roleCallee, featureCallTimeout) { @@ -592,7 +592,7 @@ func (d *Dealer) call(caller *wamp.Session, msg *wamp.Call) { // A Caller MAY request the disclosure of its identity (its WAMP // session ID) to endpoints of a routed call. This is indicated by the // "disclose_me" flag in the message options. - if wamp.OptionFlag(msg.Options, wamp.OptDiscloseMe) { + if opt, _ := wamp.AsBool(msg.Options[wamp.OptDiscloseMe]); opt { // Dealer MAY deny a Caller's request to disclose its identity. if !d.allowDisclose { d.trySend(caller, &wamp.Error{ @@ -610,7 +610,7 @@ func (d *Dealer) call(caller *wamp.Session, msg *wamp.Call) { // A Caller indicates its willingness to receive progressive results by // setting CALL.Options.receive_progress|bool := true - if wamp.OptionFlag(msg.Options, wamp.OptReceiveProgress) { + if opt, _ := wamp.AsBool(msg.Options[wamp.OptReceiveProgress]); opt { // If the Callee supports progressive calls, the Dealer will // forward the Caller's willingness to receive progressive // results by setting. @@ -681,7 +681,7 @@ func (d *Dealer) cancel(caller *wamp.Session, msg *wamp.Cancel) { invk.canceled = true // Cancel mode should be one of: "skip", "kill", "killnowait" - mode := wamp.OptionString(msg.Options, wamp.OptMode) + mode, _ := wamp.AsString(msg.Options[wamp.OptMode]) if mode == wamp.CancelModeKillNoWait || mode == wamp.CancelModeKill { // Check that callee supports call canceling to see if it is alright to // send INTERRUPT to callee. @@ -744,7 +744,7 @@ func (d *Dealer) yield(callee *wamp.Session, msg *wamp.Yield) { details := wamp.Dict{} - progress := wamp.OptionFlag(msg.Options, wamp.OptProgress) + progress, _ := wamp.AsBool(msg.Options[wamp.OptProgress]) if !progress { delete(d.invocations, msg.Request) // Delete callID -> invocation. @@ -933,7 +933,7 @@ func (d *Dealer) RegLookup(msg *wamp.Invocation) wamp.Message { var match string if len(msg.Arguments) > 1 { opts := msg.Arguments[1].(wamp.Dict) - match = wamp.OptionString(opts, wamp.OptMatch) + match, _ = wamp.AsString(opts[wamp.OptMatch]) } sync := make(chan wamp.ID) d.actionChan <- func() { @@ -988,9 +988,8 @@ func (d *Dealer) RegMatch(msg *wamp.Invocation) wamp.Message { func (d *Dealer) RegGet(msg *wamp.Invocation) wamp.Message { var dict wamp.Dict if len(msg.Arguments) != 0 { - if i64, ok := wamp.AsInt64(msg.Arguments[0]); ok { + if regID, ok := wamp.AsID(msg.Arguments[0]); ok { sync := make(chan struct{}) - regID := wamp.ID(i64) d.actionChan <- func() { if reg, ok := d.registrations[regID]; ok { dict = wamp.Dict{ @@ -1025,9 +1024,8 @@ func (d *Dealer) RegGet(msg *wamp.Invocation) wamp.Message { func (d *Dealer) RegListCallees(msg *wamp.Invocation) wamp.Message { var calleeIDs []wamp.ID if len(msg.Arguments) != 0 { - if i64, ok := wamp.AsInt64(msg.Arguments[0]); ok { + if regID, ok := wamp.AsID(msg.Arguments[0]); ok { sync := make(chan struct{}) - regID := wamp.ID(i64) d.actionChan <- func() { if reg, ok := d.registrations[regID]; ok { calleeIDs = make([]wamp.ID, len(reg.callees)) @@ -1060,10 +1058,9 @@ func (d *Dealer) RegCountCallees(msg *wamp.Invocation) wamp.Message { var count int var ok bool if len(msg.Arguments) != 0 { - var i64 int64 - if i64, ok = wamp.AsInt64(msg.Arguments[0]); ok { + var regID wamp.ID + if regID, ok = wamp.AsID(msg.Arguments[0]); ok { sync := make(chan int) - regID := wamp.ID(i64) d.actionChan <- func() { if reg, found := d.registrations[regID]; found { sync <- len(reg.callees) diff --git a/router/dealer_test.go b/router/dealer_test.go index 31a5a5cf..8105fa0d 100644 --- a/router/dealer_test.go +++ b/router/dealer_test.go @@ -205,7 +205,7 @@ func TestBasicCall(t *testing.T) { if rslt.Request != 125 { t.Fatal("wrong request ID in RESULT") } - if wamp.OptionFlag(rslt.Details, "progress") { + if ok, _ = wamp.AsBool(rslt.Details["progress"]); ok { t.Fatal("progress flag should not be set for response") } @@ -358,7 +358,7 @@ func TestCancelCallModeKill(t *testing.T) { if len(rslt.Details) == 0 { t.Fatal("expected details in message") } - if wamp.OptionString(rslt.Details, "reason") != "callee canceled" { + if s, _ := wamp.AsString(rslt.Details["reason"]); s != "callee canceled" { t.Fatal("Did not get error message from caller") } } @@ -1095,7 +1095,7 @@ func TestCallerIdentification(t *testing.T) { } // Test that invocation contains caller ID. - if wamp.OptionID(inv.Details, "caller") != callerID { + if id, _ := wamp.AsID(inv.Details["caller"]); id != callerID { fmt.Println("===> details:", inv.Details) t.Fatal("Did not get expected caller ID") } diff --git a/router/publishfilter.go b/router/publishfilter.go index 6d9eca08..50d35260 100644 --- a/router/publishfilter.go +++ b/router/publishfilter.go @@ -95,7 +95,7 @@ func (f *publishFilter) publishAllowed(sub *wamp.Session) bool { // Check blacklists to see if session has a value in any blacklist. for attr, vals := range f.blMap { // Get the session attribute value to compare with blacklist. - sessAttr := wamp.OptionString(sub.Details, attr) + sessAttr, _ := wamp.AsString(sub.Details[attr]) if sessAttr == "" { continue } @@ -124,7 +124,7 @@ func (f *publishFilter) publishAllowed(sub *wamp.Session) bool { // Check whitelists to make sure session has value in each whitelist. for attr, vals := range f.wlMap { // Get the session attribute value to compare with whitelist. - sessAttr := wamp.OptionString(sub.Details, attr) + sessAttr, _ := wamp.AsString(sub.Details[attr]) if sessAttr == "" { // Session does not have whitelisted value, so deny. return false diff --git a/router/realm.go b/router/realm.go index 8e1e741e..07630f83 100644 --- a/router/realm.go +++ b/router/realm.go @@ -272,7 +272,7 @@ func (r *realm) createMetaSession() { } // Run the handler for messages from the meta session. - go r.handleInboundMessages(r.metaSess) + go r.handleInboundMessages(r.metaSess, r.metaStop) if r.debug { r.log.Println("Started meta-session", r.metaSess) } @@ -384,7 +384,7 @@ func (r *realm) handleSession(sess *wamp.Session) error { r.log.Println("Started session", sess) } go func() { - shutdown, err := r.handleInboundMessages(sess) + shutdown, err := r.handleInboundMessages(sess, r.clientStop) if err != nil { abortMsg := wamp.Abort{ Reason: wamp.ErrProtocolViolation, @@ -402,14 +402,10 @@ func (r *realm) handleSession(sess *wamp.Session) error { // handleInboundMessages handles the messages sent from a client session to // the router. -func (r *realm) handleInboundMessages(sess *wamp.Session) (bool, error) { +func (r *realm) handleInboundMessages(sess *wamp.Session, stopChan <-chan struct{}) (bool, error) { if r.debug { defer r.log.Println("Ended session", sess) } - stopChan := r.clientStop - if sess == r.metaSess { - stopChan = r.metaStop - } recvChan := sess.Recv() for { var msg wamp.Message @@ -629,6 +625,8 @@ func (r *realm) getAuthenticator(methods []string) (auth auth.Authenticator, aut } func (r *realm) registerMetaProcedure(procedure wamp.URI, f func(*wamp.Invocation) wamp.Message) { + // Register the meta procedure. The "disclose_caller" option must be + // enabled for the testament API and the meta session API. r.metaPeer.Send(&wamp.Register{ Request: r.metaIDGen.Next(), Options: wamp.Dict{ @@ -708,7 +706,7 @@ func (r *realm) sessionCount(msg *wamp.Invocation) wamp.Message { r.actionChan <- func() { var nclients int for _, sess := range r.clients { - authrole := wamp.OptionString(sess.Details, "authrole") + authrole, _ := wamp.AsString(sess.Details["authrole"]) for j := range filter { if filter[j] == authrole { nclients++ @@ -747,7 +745,7 @@ func (r *realm) sessionList(msg *wamp.Invocation) wamp.Message { r.actionChan <- func() { var ids []wamp.ID for sessID, sess := range r.clients { - authrole := wamp.OptionString(sess.Details, "authrole") + authrole, _ := wamp.AsString(sess.Details["authrole"]) for j := range filter { if filter[j] == authrole { ids = append(ids, sessID) @@ -763,32 +761,23 @@ func (r *realm) sessionList(msg *wamp.Invocation) wamp.Message { } func (r *realm) sessionGet(msg *wamp.Invocation) wamp.Message { - makeErr := func() *wamp.Error { - return &wamp.Error{ - Type: wamp.INVOCATION, - Request: msg.Request, - Details: wamp.Dict{}, - Error: wamp.ErrNoSuchSession, - } - } - if len(msg.Arguments) == 0 { - return makeErr() + return makeError(msg.Request, wamp.ErrNoSuchSession) } - sessID, ok := wamp.AsInt64(msg.Arguments[0]) + sessID, ok := wamp.AsID(msg.Arguments[0]) if !ok { - return makeErr() + return makeError(msg.Request, wamp.ErrNoSuchSession) } retChan := make(chan *wamp.Session) r.actionChan <- func() { - sess, _ := r.clients[wamp.ID(sessID)] + sess, _ := r.clients[sessID] retChan <- sess } sess := <-retChan if sess == nil { - return makeErr() + return makeError(msg.Request, wamp.ErrNoSuchSession) } output := r.cleanSessionDetails(sess.Details) @@ -802,28 +791,20 @@ func (r *realm) sessionGet(msg *wamp.Invocation) wamp.Message { } } -// testamentFlush removes all testaments for the invoking client. -// it optionally takes a keyword argument "scope" set to "detached" or "destroyed" +// testamentFlush removes all testaments for the invoking client. It takes an +// optional keyword argument "scope" that has the value "detached" or +// "destroyed" func (r *realm) testamentFlush(msg *wamp.Invocation) wamp.Message { - makeErr := func(uri wamp.URI) *wamp.Error { - return &wamp.Error{ - Type: wamp.INVOCATION, - Request: msg.Request, - Details: wamp.Dict{}, - Error: uri, - } - } - caller, ok := wamp.AsID(msg.Details["caller"]) if !ok { - return makeErr(wamp.ErrInvalidArgument) + return makeError(msg.Request, wamp.ErrInvalidArgument) } scope, ok := wamp.AsString(msg.ArgumentsKw["scope"]) if !ok || scope == "" { scope = "destroyed" } if scope != "destroyed" && scope != "detached" { - return makeErr(wamp.ErrInvalidArgument) + return makeError(msg.Request, wamp.ErrInvalidArgument) } testaments, ok := r.testaments[caller] if ok { @@ -841,32 +822,23 @@ func (r *realm) testamentFlush(msg *wamp.Invocation) wamp.Message { // detached (when session resumption is implemented) or destroyed (when the // transport is lost). func (r *realm) testamentAdd(msg *wamp.Invocation) wamp.Message { - makeErr := func(uri wamp.URI) *wamp.Error { - return &wamp.Error{ - Type: wamp.INVOCATION, - Request: msg.Request, - Details: wamp.Dict{}, - Error: uri, - } - } - caller, ok := wamp.AsID(msg.Details["caller"]) if !ok || len(msg.Arguments) < 3 { - return makeErr(wamp.ErrInvalidArgument) + return makeError(msg.Request, wamp.ErrInvalidArgument) } topic, ok := wamp.AsURI(msg.Arguments[0]) if !ok { fmt.Printf("invalid topic") - return makeErr(wamp.ErrInvalidArgument) + return makeError(msg.Request, wamp.ErrInvalidArgument) } args, ok := wamp.AsList(msg.Arguments[1]) if !ok { fmt.Printf("invalid args") - return makeErr(wamp.ErrInvalidArgument) + return makeError(msg.Request, wamp.ErrInvalidArgument) } kwargs, ok := wamp.AsDict(msg.Arguments[2]) if !ok { - return makeErr(wamp.ErrInvalidArgument) + return makeError(msg.Request, wamp.ErrInvalidArgument) } options, ok := wamp.AsDict(msg.ArgumentsKw["publish_options"]) if !ok { @@ -877,10 +849,10 @@ func (r *realm) testamentAdd(msg *wamp.Invocation) wamp.Message { scope = "destroyed" } if scope != "destroyed" && scope != "detached" { - return makeErr(wamp.ErrInvalidArgument) + return makeError(msg.Request, wamp.ErrInvalidArgument) } - // a map returns the "zero value" if a key doesn't exist, so there are nils for the arrays - // which are equal to empty arrays + // a map returns the "zero value" if a key doesn't exist, so there are nils + // for the arrays which are equal to empty arrays testaments := r.testaments[caller] t := testament{ args: args, @@ -960,3 +932,13 @@ func (r *realm) cleanSessionDetails(details wamp.Dict) wamp.Dict { return clean } + +// makeError returns a wamp.Error message with the given URI. +func makeError(req wamp.ID, uri wamp.URI) *wamp.Error { + return &wamp.Error{ + Type: wamp.INVOCATION, + Request: req, + Details: wamp.Dict{}, + Error: uri, + } +} diff --git a/router/router_test.go b/router/router_test.go index 8005afbf..6caea4fe 100644 --- a/router/router_test.go +++ b/router/router_test.go @@ -173,7 +173,7 @@ func TestProtocolViolation(t *testing.T) { if abort.Reason != wamp.ErrProtocolViolation { t.Fatal("Expected reason to be", wamp.ErrProtocolViolation) } - //errMsg := wamp.OptionString(abort.Details, "error") + //errMsg, _ := wamp.AsString(abort.Details["error"]) //fmt.Println("===> Abort error:", errMsg) } @@ -197,7 +197,7 @@ func TestProtocolViolation(t *testing.T) { if abort.Reason != wamp.ErrProtocolViolation { t.Fatal("Expected reason to be", wamp.ErrProtocolViolation) } - //errMsg := wamp.OptionString(abort.Details, "error") + //errMsg, _ := wamp.AsString(abort.Details["error"]) //fmt.Println("===> Abort error:", errMsg) } } @@ -527,7 +527,7 @@ func TestSessionMetaProcedures(t *testing.T) { if !ok { t.Fatal("expected dict type arg") } - sid := wamp.ID(wamp.OptionInt64(dict, "session")) + sid, _ := wamp.AsID(dict["session"]) if sid != sessID { t.Fatal("wrong session ID") } @@ -753,11 +753,11 @@ func TestRegistrationMetaProcedures(t *testing.T) { if !ok { t.Fatal("expected wamp.Dict") } - regID = wamp.OptionID(dict, "id") + regID, _ = wamp.AsID(dict["id"]) if regID != registrationID { t.Fatal("received wrong registration") } - uri := wamp.OptionURI(dict, "uri") + uri, _ := wamp.AsURI(dict["uri"]) if uri != testProcedure { t.Fatal("registration has wrong uri:", uri) } diff --git a/wamp/convert.go b/wamp/convert.go new file mode 100644 index 00000000..fdcd8cd4 --- /dev/null +++ b/wamp/convert.go @@ -0,0 +1,149 @@ +package wamp + +import "reflect" + +// AsString is an extended type assertion for string. +func AsString(v interface{}) (string, bool) { + switch v := v.(type) { + case string: + return v, true + case []byte: + return string(v), true + case URI: + return string(v), true + } + return "", false +} + +// AsID is an extended type assertion for ID. +func AsID(v interface{}) (ID, bool) { + if i64, ok := AsInt64(v); ok { + return ID(i64), true + } + return ID(0), false +} + +// AsURI is an extended type assertion for URI. +func AsURI(v interface{}) (URI, bool) { + switch v := v.(type) { + case URI: + return v, true + case string: + return URI(v), true + case []byte: + return URI(string(v)), true + } + return URI(""), false +} + +// AsInt64 is an extended type assertion for int64. +func AsInt64(v interface{}) (int64, bool) { + switch v := v.(type) { + case int64: + return v, true + case ID: + return int64(v), true + case uint64: + return int64(v), true + case int: + return int64(v), true + case int32: + return int64(v), true + case uint: + return int64(v), true + case uint32: + return int64(v), true + case float64: + return int64(v), true + case float32: + return int64(v), true + } + return 0, false +} + +// AsFloat64 is an extended type assertion for float64. +func AsFloat64(v interface{}) (float64, bool) { + switch v := v.(type) { + case float64: + return v, true + case float32: + return float64(v), true + case int64: + return float64(v), true + case ID: + return float64(v), true + case uint64: + return float64(v), true + case int: + return float64(v), true + case int32: + return float64(v), true + case uint: + return float64(v), true + case uint32: + return float64(v), true + } + return 0.0, false +} + +// AsBool is an extended type assertion for bool. +func AsBool(v interface{}) (bool, bool) { + b, ok := v.(bool) + return b, ok +} + +// AsDict is an extended type assertion for Dict. +func AsDict(v interface{}) (Dict, bool) { + n := NormalizeDict(v) + return n, n != nil +} + +// AsList is an extended type assertion for List. +func AsList(v interface{}) (List, bool) { + switch v := v.(type) { + case List: + return v, true + case []interface{}: + return List(v), true + } + val := reflect.ValueOf(v) + if val.Kind() != reflect.Slice { + return nil, false + } + list := make(List, val.Len()) + for i := 0; i < val.Len(); i++ { + list[i] = val.Index(i).Interface() + } + return list, true +} + +// OptionString returns named value as string; empty string if missing or not +// string type. +func OptionString(opts Dict, optionName string) string { + opt, _ := AsString(opts[optionName]) + return opt +} + +// OptionURI returns named value as URI; URI("") if missing or not URI type. +func OptionURI(opts Dict, optionName string) URI { + opt, _ := AsURI(opts[optionName]) + return opt +} + +// OptionID returns named value as ID; ID(0) if missing or not ID type. +func OptionID(opts Dict, optionName string) ID { + opt, _ := AsID(opts[optionName]) + return opt +} + +// OptionInt64 returns named value as int64; 0 if missing or not integer type. +func OptionInt64(opts Dict, optionName string) int64 { + opt, _ := AsInt64(opts[optionName]) + return opt +} + +// OptionFlag returns named value as bool; false if missing or not bool type. +func OptionFlag(opts Dict, optionName string) bool { + opt, _ := AsBool(opts[optionName]) + return opt +} diff --git a/wamp/crsign/crsign.go b/wamp/crsign/crsign.go index 67468047..15439dc7 100644 --- a/wamp/crsign/crsign.go +++ b/wamp/crsign/crsign.go @@ -73,12 +73,12 @@ const ( // cli, err = client.ConnectNet(routerAddr, cfg) // func RespondChallenge(pass string, c *wamp.Challenge, h func() hash.Hash) string { - ch := wamp.OptionString(c.Extra, "challenge") + ch, _ := wamp.AsString(c.Extra["challenge"]) // If the client needed to lookup a user's key, this would require decoding // the JSON-encoded challenge string and getting the authid. For this // example assume that client only operates as one user and knows the key // to use. - saltStr := wamp.OptionString(c.Extra, "salt") + saltStr, _ := wamp.AsString(c.Extra["salt"]) // If no salt given, use raw password as key. if saltStr == "" { return SignChallenge(ch, []byte(pass)) @@ -86,8 +86,8 @@ func RespondChallenge(pass string, c *wamp.Challenge, h func() hash.Hash) string // If salting info give, then compute a derived key using PBKDF2. salt := []byte(saltStr) - iters := int(wamp.OptionInt64(c.Extra, "iterations")) - keylen := int(wamp.OptionInt64(c.Extra, "keylen")) + iters, _ := wamp.AsInt64(c.Extra["iterations"]) + keylen, _ := wamp.AsInt64(c.Extra["keylen"]) if iters == 0 { iters = defaultIters @@ -99,7 +99,7 @@ func RespondChallenge(pass string, c *wamp.Challenge, h func() hash.Hash) string h = sha256.New } // Compute derived key. - dk := pbkdf2.Key([]byte(pass), salt, iters, keylen, h) + dk := pbkdf2.Key([]byte(pass), salt, int(iters), int(keylen), h) // Sign challenge using derived key. return SignChallenge(ch, dk) diff --git a/wamp/dict.go b/wamp/dict.go index 9695c522..214a42f1 100644 --- a/wamp/dict.go +++ b/wamp/dict.go @@ -114,159 +114,6 @@ func DictFlag(dict Dict, path []string) (bool, error) { return b, nil } -func AsString(v interface{}) (string, bool) { - switch v := v.(type) { - case string: - return v, true - case []byte: - return string(v), true - case URI: - return string(v), true - } - return "", false -} - -func AsID(v interface{}) (ID, bool) { - if i64, ok := AsInt64(v); ok { - return ID(i64), true - } - return ID(0), false -} - -func AsURI(v interface{}) (URI, bool) { - switch v := v.(type) { - case URI: - return v, true - case string: - return URI(v), true - case []byte: - return URI(string(v)), true - } - return URI(""), false -} - -func AsInt64(v interface{}) (int64, bool) { - switch v := v.(type) { - case int64: - return v, true - case ID: - return int64(v), true - case uint64: - return int64(v), true - case int: - return int64(v), true - case int32: - return int64(v), true - case uint: - return int64(v), true - case uint32: - return int64(v), true - case float64: - return int64(v), true - case float32: - return int64(v), true - } - return 0, false -} - -func AsFloat64(v interface{}) (float64, bool) { - switch v := v.(type) { - case float64: - return v, true - case float32: - return float64(v), true - case int64: - return float64(v), true - case ID: - return float64(v), true - case uint64: - return float64(v), true - case int: - return float64(v), true - case int32: - return float64(v), true - case uint: - return float64(v), true - case uint32: - return float64(v), true - } - return 0.0, false -} -func AsDict(v interface{}) (Dict, bool) { - n := NormalizeDict(v) - return n, n != nil -} - -func AsList(v interface{}) (List, bool) { - switch v := v.(type) { - case List: - return v, true - case []interface{}: - return List(v), true - } - val := reflect.ValueOf(v) - if val.Kind() != reflect.Slice { - return nil, false - } - list := make(List, val.Len()) - for i := 0; i < val.Len(); i++ { - list[i] = val.Index(i).Interface() - } - return list, true -} - -// OptionString returns the string value of the option with the specified name. -// If the option is not present or is not a string type, an empty string is -// returned. -func OptionString(opts Dict, optionName string) string { - var opt string - if _opt, ok := opts[optionName]; ok && _opt != nil { - opt, _ = AsString(_opt) - } - return opt -} - -// OptionURI returns the URI value of the option with the specified name. -// If the option is not present, an empty URI is returned. -func OptionURI(opts Dict, optionName string) URI { - var opt URI - if _opt, ok := opts[optionName]; ok && _opt != nil { - opt, _ = AsURI(_opt) - } - return opt -} - -// OptionID returns the ID value of the option with the specified name. -// If the option is not present, an ID 0 is returned. -func OptionID(opts Dict, optionName string) ID { - var opt ID - if _opt, ok := opts[optionName]; ok && _opt != nil { - opt, _ = AsID(_opt) - } - return opt -} - -// OptionInt64 returns the int64 value of the option with the specified name. -// If the option is not present, a value of 0 is returned. -func OptionInt64(opts Dict, optionName string) int64 { - if opt, ok := opts[optionName]; ok && opt != nil { - if i64, ok := AsInt64(opt); ok { - return i64 - } - } - return 0 -} - -// OptionString returns the boolean value of the option with the specified -// name. If the option is not present, false is returned. -func OptionFlag(opts Dict, optionName string) bool { - var opt bool - if _opt, ok := opts[optionName]; ok && _opt != nil { - opt, _ = _opt.(bool) - } - return opt -} - // SetOption sets a single option name-value pair in message options dict. func SetOption(dict Dict, name string, value interface{}) Dict { if dict == nil { diff --git a/wamp/identifier.go b/wamp/identifier.go index cd01fe3b..36774a3d 100644 --- a/wamp/identifier.go +++ b/wamp/identifier.go @@ -31,8 +31,8 @@ var ( strictURIEmpty = regexp.MustCompile(`^(([0-9a-z_]+\.)|\.)*([0-9a-z_]+)?$`) ) -// ValidURI returs true if the URI complies with formatting rules determined by -// the strict flag and match type. +// ValidURI returns true if the URI complies with formatting rules determined +// by the strict flag and match type. func (u URI) ValidURI(strict bool, match string) bool { if strict { if match == MatchWildcard {