From c661bc4693aa91a9321d691518415e19cb47b452 Mon Sep 17 00:00:00 2001 From: Andrei Mihu Date: Fri, 13 Dec 2019 16:53:20 +0000 Subject: [PATCH] Authoritative match dispatcher is no longer usable after match stops. --- CHANGELOG.md | 3 ++ server/match_handler.go | 59 ++++++++++++++++++++++---------- server/match_registry.go | 11 +++--- server/runtime.go | 3 +- server/runtime_go.go | 5 +-- server/runtime_go_match_core.go | 38 +++++++++++++++----- server/runtime_lua.go | 6 ++-- server/runtime_lua_match_core.go | 39 ++++++++++++++++----- tests/util.go | 4 +++ 9 files changed, 120 insertions(+), 48 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 38fb7596e6..7360d78b15 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,9 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr - Update IAP validation example for Android Publisher v3 API. - Relayed multiplayer matches allow echoing messages back to sender if they're in the filter list. - Upgrade Facebook authentication to use version 5.0 of the Facebook Graph API. +- Upgrade devconsole serialize-javascript (2.1.1) dependency. +- Ensure authoritative match dispatcher is no longer usable after match stops. +- Deferred message broadcasts now process just before match ends if match handler functions return an error. ### Fixed - Correctly read pagination cursor in notification listings. diff --git a/server/match_handler.go b/server/match_handler.go index 670d51ec0d..573c055e66 100644 --- a/server/match_handler.go +++ b/server/match_handler.go @@ -105,7 +105,7 @@ type MatchHandler struct { state interface{} } -func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegistry, router MessageRouter, core RuntimeMatchCore, id uuid.UUID, node string, params map[string]interface{}) (*MatchHandler, error) { +func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegistry, router MessageRouter, core RuntimeMatchCore, id uuid.UUID, node string, stopped *atomic.Bool, params map[string]interface{}) (*MatchHandler, error) { presenceList := NewMatchPresenceList() deferredCh := make(chan *DeferredMessage, config.GetMatch().DeferredQueueSize) @@ -155,7 +155,7 @@ func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegis joinAttemptCh: make(chan func(mh *MatchHandler), config.GetMatch().JoinAttemptQueueSize), deferredCh: deferredCh, stopCh: make(chan struct{}), - stopped: atomic.NewBool(false), + stopped: stopped, Rate: int64(rateInt), @@ -203,6 +203,10 @@ func (mh *MatchHandler) Close() { if !mh.stopped.CAS(false, true) { return } + + // Ensure any remaining deferred broadcasts are sent. + mh.processDeferred() + mh.core.Cancel() close(mh.stopCh) mh.ticker.Stop() @@ -256,20 +260,11 @@ func loop(mh *MatchHandler) { return } - // Broadcast any deferred messages before checking for nil state, to make sure any final messages are sent. - deferredCount := len(mh.deferredCh) - if deferredCount != 0 { - deferredMessages := make([]*DeferredMessage, deferredCount) - for i := 0; i < deferredCount; i++ { - msg := <-mh.deferredCh - deferredMessages[i] = msg - } - - mh.router.SendDeferred(mh.logger, deferredMessages) - } - // Check if we need to stop the match. - if state == nil { + if state != nil { + // Broadcast any deferred messages. If match will be stopped broadcasting will be handled as part of the match end cycle. + mh.processDeferred() + } else { mh.Stop() mh.logger.Info("Match loop returned nil or no state, stopping match") return @@ -288,6 +283,19 @@ func loop(mh *MatchHandler) { mh.tick++ } +func (mh *MatchHandler) processDeferred() { + deferredCount := len(mh.deferredCh) + if deferredCount != 0 { + deferredMessages := make([]*DeferredMessage, deferredCount) + for i := 0; i < deferredCount; i++ { + msg := <-mh.deferredCh + deferredMessages[i] = msg + } + + mh.router.SendDeferred(mh.logger, deferredMessages) + } +} + func (mh *MatchHandler) QueueJoinAttempt(ctx context.Context, resultCh chan<- *MatchJoinResult, userID, sessionID uuid.UUID, username, node string, metadata map[string]string) bool { if mh.stopped.Load() { return false @@ -315,7 +323,11 @@ func (mh *MatchHandler) QueueJoinAttempt(ctx context.Context, resultCh chan<- *M resultCh <- &MatchJoinResult{Allow: false} return } - if state == nil { + + if state != nil { + // Broadcast any deferred messages. If match will be stopped broadcasting will be handled as part of the match end cycle. + mh.processDeferred() + } else { mh.Stop() mh.logger.Info("Match join attempt returned nil or no state, stopping match") resultCh <- &MatchJoinResult{Allow: false} @@ -369,7 +381,10 @@ func (mh *MatchHandler) QueueJoin(joins []*MatchPresence, mark bool) bool { mh.logger.Warn("Stopping match after error from match_join execution", zap.Int64("tick", mh.tick), zap.Error(err)) return } - if state == nil { + if state != nil { + // Broadcast any deferred messages. If match will be stopped broadcasting will be handled as part of the match end cycle. + mh.processDeferred() + } else { mh.Stop() mh.logger.Info("Match join returned nil or no state, stopping match") return @@ -404,7 +419,10 @@ func (mh *MatchHandler) QueueLeave(leaves []*MatchPresence) bool { mh.logger.Warn("Stopping match after error from match_leave execution", zap.Int("tick", int(mh.tick)), zap.Error(err)) return } - if state == nil { + if state != nil { + // Broadcast any deferred messages. If match will be stopped broadcasting will be handled as part of the match end cycle. + mh.processDeferred() + } else { mh.Stop() mh.logger.Info("Match leave returned nil or no state, stopping match") return @@ -433,7 +451,10 @@ func (mh *MatchHandler) QueueTerminate(graceSeconds int) bool { mh.logger.Warn("Stopping match after error from match_terminate execution", zap.Int("tick", int(mh.tick)), zap.Error(err)) return } - if state == nil { + if state != nil { + // Broadcast any deferred messages. If match will be stopped broadcasting will be handled as part of the match end cycle. + mh.processDeferred() + } else { mh.Stop() mh.logger.Info("Match terminate returned nil or no state, stopping match") return diff --git a/server/match_registry.go b/server/match_registry.go index 7ee43446b6..f34ca186b3 100644 --- a/server/match_registry.go +++ b/server/match_registry.go @@ -60,7 +60,7 @@ type MatchRegistry interface { // Create and start a new match, given a Lua module name or registered Go match function. CreateMatch(ctx context.Context, logger *zap.Logger, createFn RuntimeMatchCreateFunction, module string, params map[string]interface{}) (string, error) // Register and initialise a match that's ready to run. - NewMatch(logger *zap.Logger, id uuid.UUID, core RuntimeMatchCore, params map[string]interface{}) (*MatchHandler, error) + NewMatch(logger *zap.Logger, id uuid.UUID, core RuntimeMatchCore, stopped *atomic.Bool, params map[string]interface{}) (*MatchHandler, error) // Return a match handler by ID, only from the local node. GetMatch(id uuid.UUID) *MatchHandler // Remove a tracked match and ensure all its presences are cleaned up. @@ -136,8 +136,9 @@ func NewLocalMatchRegistry(logger, startupLogger *zap.Logger, config Config, tra func (r *LocalMatchRegistry) CreateMatch(ctx context.Context, logger *zap.Logger, createFn RuntimeMatchCreateFunction, module string, params map[string]interface{}) (string, error) { id := uuid.Must(uuid.NewV4()) matchLogger := logger.With(zap.String("mid", id.String())) + stopped := atomic.NewBool(false) - core, err := createFn(ctx, matchLogger, id, r.node, module) + core, err := createFn(ctx, matchLogger, id, r.node, stopped, module) if err != nil { return "", err } @@ -146,7 +147,7 @@ func (r *LocalMatchRegistry) CreateMatch(ctx context.Context, logger *zap.Logger } // Start the match. - mh, err := r.NewMatch(matchLogger, id, core, params) + mh, err := r.NewMatch(matchLogger, id, core, stopped, params) if err != nil { return "", fmt.Errorf("error creating match: %v", err.Error()) } @@ -154,13 +155,13 @@ func (r *LocalMatchRegistry) CreateMatch(ctx context.Context, logger *zap.Logger return mh.IDStr, nil } -func (r *LocalMatchRegistry) NewMatch(logger *zap.Logger, id uuid.UUID, core RuntimeMatchCore, params map[string]interface{}) (*MatchHandler, error) { +func (r *LocalMatchRegistry) NewMatch(logger *zap.Logger, id uuid.UUID, core RuntimeMatchCore, stopped *atomic.Bool, params map[string]interface{}) (*MatchHandler, error) { if r.stopped.Load() { // Server is shutting down, reject new matches. return nil, errors.New("shutdown in progress") } - match, err := NewMatchHandler(logger, r.config, r, r.router, core, id, r.node, params) + match, err := NewMatchHandler(logger, r.config, r, r.router, core, id, r.node, stopped, params) if err != nil { return nil, err } diff --git a/server/runtime.go b/server/runtime.go index 028002a81b..63de9496e0 100644 --- a/server/runtime.go +++ b/server/runtime.go @@ -17,6 +17,7 @@ package server import ( "context" "database/sql" + "go.uber.org/atomic" "os" "path/filepath" "strings" @@ -167,7 +168,7 @@ type ( RuntimeMatchmakerMatchedFunction func(ctx context.Context, entries []*MatchmakerEntry) (string, bool, error) - RuntimeMatchCreateFunction func(ctx context.Context, logger *zap.Logger, id uuid.UUID, node string, name string) (RuntimeMatchCore, error) + RuntimeMatchCreateFunction func(ctx context.Context, logger *zap.Logger, id uuid.UUID, node string, stopped *atomic.Bool, name string) (RuntimeMatchCore, error) RuntimeMatchDeferMessageFunction func(msg *DeferredMessage) error RuntimeTournamentEndFunction func(ctx context.Context, tournament *api.Tournament, end, reset int64) error diff --git a/server/runtime_go.go b/server/runtime_go.go index 97703dc085..f156649d1d 100644 --- a/server/runtime_go.go +++ b/server/runtime_go.go @@ -19,6 +19,7 @@ import ( "database/sql" "errors" "github.com/golang/protobuf/jsonpb" + "go.uber.org/atomic" "path/filepath" "plugin" "strings" @@ -1820,7 +1821,7 @@ func NewRuntimeProviderGo(logger, startupLogger *zap.Logger, db *sql.DB, jsonpbM match := make(map[string]func(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule) (runtime.Match, error), 0) matchLock := &sync.RWMutex{} - matchCreateFn := func(ctx context.Context, logger *zap.Logger, id uuid.UUID, node string, name string) (RuntimeMatchCore, error) { + matchCreateFn := func(ctx context.Context, logger *zap.Logger, id uuid.UUID, node string, stopped *atomic.Bool, name string) (RuntimeMatchCore, error) { matchLock.RLock() fn, ok := match[name] matchLock.RUnlock() @@ -1835,7 +1836,7 @@ func NewRuntimeProviderGo(logger, startupLogger *zap.Logger, db *sql.DB, jsonpbM return nil, err } - return NewRuntimeGoMatchCore(logger, matchRegistry, router, id, node, db, env, nk, match) + return NewRuntimeGoMatchCore(logger, matchRegistry, router, id, node, stopped, db, env, nk, match) } nk.SetMatchCreateFn(matchCreateFn) matchNamesListFn := func() []string { diff --git a/server/runtime_go_match_core.go b/server/runtime_go_match_core.go index 3a7a537b5f..8eae726485 100644 --- a/server/runtime_go_match_core.go +++ b/server/runtime_go_match_core.go @@ -27,6 +27,8 @@ import ( "go.uber.org/zap" ) +var ErrMatchStopped = errors.New("match stopped") + type RuntimeGoMatchCore struct { logger *zap.Logger matchRegistry MatchRegistry @@ -37,11 +39,12 @@ type RuntimeGoMatchCore struct { match runtime.Match - id uuid.UUID - node string - idStr string - stream PresenceStream - label *atomic.String + id uuid.UUID + node string + stopped *atomic.Bool + idStr string + stream PresenceStream + label *atomic.String runtimeLogger runtime.Logger db *sql.DB @@ -51,7 +54,7 @@ type RuntimeGoMatchCore struct { ctxCancelFn context.CancelFunc } -func NewRuntimeGoMatchCore(logger *zap.Logger, matchRegistry MatchRegistry, router MessageRouter, id uuid.UUID, node string, db *sql.DB, env map[string]string, nk runtime.NakamaModule, match runtime.Match) (RuntimeMatchCore, error) { +func NewRuntimeGoMatchCore(logger *zap.Logger, matchRegistry MatchRegistry, router MessageRouter, id uuid.UUID, node string, stopped *atomic.Bool, db *sql.DB, env map[string]string, nk runtime.NakamaModule, match runtime.Match) (RuntimeMatchCore, error) { ctx, ctxCancelFn := context.WithCancel(context.Background()) ctx = NewRuntimeGoContext(ctx, env, RuntimeExecutionModeMatch, nil, 0, "", "", nil, "", "", "") ctx = context.WithValue(ctx, runtime.RUNTIME_CTX_MATCH_ID, fmt.Sprintf("%v.%v", id.String(), node)) @@ -67,9 +70,10 @@ func NewRuntimeGoMatchCore(logger *zap.Logger, matchRegistry MatchRegistry, rout match: match, - id: id, - node: node, - idStr: fmt.Sprintf("%v.%v", id.String(), node), + id: id, + node: node, + stopped: stopped, + idStr: fmt.Sprintf("%v.%v", id.String(), node), stream: PresenceStream{ Mode: StreamModeMatchAuthoritative, Subject: id, @@ -169,6 +173,10 @@ func (r *RuntimeGoMatchCore) Cancel() { } func (r *RuntimeGoMatchCore) BroadcastMessage(opCode int64, data []byte, presences []runtime.Presence, sender runtime.Presence, reliable bool) error { + if r.stopped.Load() { + return ErrMatchStopped + } + presenceIDs, msg, err := r.validateBroadcast(opCode, data, presences, sender, reliable) if err != nil { return err @@ -183,6 +191,10 @@ func (r *RuntimeGoMatchCore) BroadcastMessage(opCode int64, data []byte, presenc } func (r *RuntimeGoMatchCore) BroadcastMessageDeferred(opCode int64, data []byte, presences []runtime.Presence, sender runtime.Presence, reliable bool) error { + if r.stopped.Load() { + return ErrMatchStopped + } + presenceIDs, msg, err := r.validateBroadcast(opCode, data, presences, sender, reliable) if err != nil { return err @@ -298,6 +310,10 @@ func (r *RuntimeGoMatchCore) validateBroadcast(opCode int64, data []byte, presen } func (r *RuntimeGoMatchCore) MatchKick(presences []runtime.Presence) error { + if r.stopped.Load() { + return ErrMatchStopped + } + size := len(presences) if size == 0 { return nil @@ -328,6 +344,10 @@ func (r *RuntimeGoMatchCore) MatchKick(presences []runtime.Presence) error { } func (r *RuntimeGoMatchCore) MatchLabelUpdate(label string) error { + if r.stopped.Load() { + return ErrMatchStopped + } + if err := r.matchRegistry.UpdateMatchLabel(r.id, label); err != nil { return fmt.Errorf("error updating match label: %v", err.Error()) } diff --git a/server/runtime_lua.go b/server/runtime_lua.go index 251239e949..ca78c7ccbd 100644 --- a/server/runtime_lua.go +++ b/server/runtime_lua.go @@ -130,15 +130,15 @@ func NewRuntimeProviderLua(logger, startupLogger *zap.Logger, db *sql.DB, jsonpb var tournamentResetFunction RuntimeTournamentResetFunction var leaderboardResetFunction RuntimeLeaderboardResetFunction - allMatchCreateFn := func(ctx context.Context, logger *zap.Logger, id uuid.UUID, node string, name string) (RuntimeMatchCore, error) { - core, err := goMatchCreateFn(ctx, logger, id, node, name) + allMatchCreateFn := func(ctx context.Context, logger *zap.Logger, id uuid.UUID, node string, stopped *atomic.Bool, name string) (RuntimeMatchCore, error) { + core, err := goMatchCreateFn(ctx, logger, id, node, stopped, name) if err != nil { return nil, err } if core != nil { return core, nil } - return NewRuntimeLuaMatchCore(logger, db, jsonpbMarshaler, jsonpbUnmarshaler, config, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, matchRegistry, tracker, streamManager, router, stdLibs, once, localCache, goMatchCreateFn, id, node, name) + return NewRuntimeLuaMatchCore(logger, db, jsonpbMarshaler, jsonpbUnmarshaler, config, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, matchRegistry, tracker, streamManager, router, stdLibs, once, localCache, goMatchCreateFn, id, node, stopped, name) } runtimeProviderLua := &RuntimeProviderLua{ diff --git a/server/runtime_lua_match_core.go b/server/runtime_lua_match_core.go index 2ba8c04651..b7ec501d4a 100644 --- a/server/runtime_lua_match_core.go +++ b/server/runtime_lua_match_core.go @@ -38,11 +38,12 @@ type RuntimeLuaMatchCore struct { deferMessageFn RuntimeMatchDeferMessageFunction presenceList *MatchPresenceList - id uuid.UUID - node string - idStr string - stream PresenceStream - label *atomic.String + id uuid.UUID + node string + stopped *atomic.Bool + idStr string + stream PresenceStream + label *atomic.String vm *lua.LState initFn lua.LValue @@ -57,7 +58,7 @@ type RuntimeLuaMatchCore struct { ctxCancelFn context.CancelFunc } -func NewRuntimeLuaMatchCore(logger *zap.Logger, db *sql.DB, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, rankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry SessionRegistry, matchRegistry MatchRegistry, tracker Tracker, streamManager StreamManager, router MessageRouter, stdLibs map[string]lua.LGFunction, once *sync.Once, localCache *RuntimeLuaLocalCache, goMatchCreateFn RuntimeMatchCreateFunction, id uuid.UUID, node string, name string) (RuntimeMatchCore, error) { +func NewRuntimeLuaMatchCore(logger *zap.Logger, db *sql.DB, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, rankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry SessionRegistry, matchRegistry MatchRegistry, tracker Tracker, streamManager StreamManager, router MessageRouter, stdLibs map[string]lua.LGFunction, once *sync.Once, localCache *RuntimeLuaLocalCache, goMatchCreateFn RuntimeMatchCreateFunction, id uuid.UUID, node string, stopped *atomic.Bool, name string) (RuntimeMatchCore, error) { // Set up the Lua VM that will handle this match. vm := lua.NewState(lua.Options{ CallStackSize: config.GetRuntime().CallStackSize, @@ -73,15 +74,15 @@ func NewRuntimeLuaMatchCore(logger *zap.Logger, db *sql.DB, jsonpbMarshaler *jso vm.Call(1, 0) } - allMatchCreateFn := func(ctx context.Context, logger *zap.Logger, id uuid.UUID, node string, name string) (RuntimeMatchCore, error) { - core, err := goMatchCreateFn(ctx, logger, id, node, name) + allMatchCreateFn := func(ctx context.Context, logger *zap.Logger, id uuid.UUID, node string, stopped *atomic.Bool, name string) (RuntimeMatchCore, error) { + core, err := goMatchCreateFn(ctx, logger, id, node, stopped, name) if err != nil { return nil, err } if core != nil { return core, nil } - return NewRuntimeLuaMatchCore(logger, db, jsonpbMarshaler, jsonpbUnmarshaler, config, socialClient, leaderboardCache, rankCache, leaderboardScheduler, sessionRegistry, matchRegistry, tracker, streamManager, router, stdLibs, once, localCache, goMatchCreateFn, id, node, name) + return NewRuntimeLuaMatchCore(logger, db, jsonpbMarshaler, jsonpbUnmarshaler, config, socialClient, leaderboardCache, rankCache, leaderboardScheduler, sessionRegistry, matchRegistry, tracker, streamManager, router, stdLibs, once, localCache, goMatchCreateFn, id, node, stopped, name) } nakamaModule := NewRuntimeLuaNakamaModule(logger, db, jsonpbMarshaler, jsonpbUnmarshaler, config, socialClient, leaderboardCache, rankCache, leaderboardScheduler, sessionRegistry, matchRegistry, tracker, streamManager, router, once, localCache, allMatchCreateFn, nil, nil) @@ -509,6 +510,11 @@ func (r *RuntimeLuaMatchCore) Cancel() { } func (r *RuntimeLuaMatchCore) broadcastMessage(l *lua.LState) int { + if r.stopped.Load() { + l.RaiseError("match stopped") + return 0 + } + presenceIDs, msg, reliable := r.validateBroadcast(l) if len(presenceIDs) != 0 { r.router.SendToPresenceIDs(r.logger, presenceIDs, msg, reliable) @@ -518,6 +524,11 @@ func (r *RuntimeLuaMatchCore) broadcastMessage(l *lua.LState) int { } func (r *RuntimeLuaMatchCore) broadcastMessageDeferred(l *lua.LState) int { + if r.stopped.Load() { + l.RaiseError("match stopped") + return 0 + } + presenceIDs, msg, reliable := r.validateBroadcast(l) if len(presenceIDs) != 0 { if err := r.deferMessageFn(&DeferredMessage{ @@ -721,6 +732,11 @@ func (r *RuntimeLuaMatchCore) validateBroadcast(l *lua.LState) ([]*PresenceID, * } func (r *RuntimeLuaMatchCore) matchKick(l *lua.LState) int { + if r.stopped.Load() { + l.RaiseError("match stopped") + return 0 + } + input := l.OptTable(1, nil) if input == nil { return 0 @@ -787,6 +803,11 @@ func (r *RuntimeLuaMatchCore) matchKick(l *lua.LState) int { } func (r *RuntimeLuaMatchCore) matchLabelUpdate(l *lua.LState) int { + if r.stopped.Load() { + l.RaiseError("match stopped") + return 0 + } + input := l.OptString(1, "") if err := r.matchRegistry.UpdateMatchLabel(r.id, input); err != nil { diff --git a/tests/util.go b/tests/util.go index 27d9308a80..d87e590626 100644 --- a/tests/util.go +++ b/tests/util.go @@ -50,6 +50,10 @@ var ( } ) +func init() { + _ = server.CheckConfig(logger, config) +} + type DummyMessageRouter struct{} func (d *DummyMessageRouter) SendDeferred(*zap.Logger, []*server.DeferredMessage) {