Skip to content

Commit

Permalink
Authoritative match dispatcher is no longer usable after match stops.
Browse files Browse the repository at this point in the history
  • Loading branch information
zyro committed Dec 13, 2019
1 parent 90e0923 commit c661bc4
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 48 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
- Update IAP validation example for Android Publisher v3 API.
- Relayed multiplayer matches allow echoing messages back to sender if they're in the filter list.
- Upgrade Facebook authentication to use version 5.0 of the Facebook Graph API.
- Upgrade devconsole serialize-javascript (2.1.1) dependency.
- Ensure authoritative match dispatcher is no longer usable after match stops.
- Deferred message broadcasts now process just before match ends if match handler functions return an error.

### Fixed
- Correctly read pagination cursor in notification listings.
Expand Down
59 changes: 40 additions & 19 deletions server/match_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type MatchHandler struct {
state interface{}
}

func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegistry, router MessageRouter, core RuntimeMatchCore, id uuid.UUID, node string, params map[string]interface{}) (*MatchHandler, error) {
func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegistry, router MessageRouter, core RuntimeMatchCore, id uuid.UUID, node string, stopped *atomic.Bool, params map[string]interface{}) (*MatchHandler, error) {
presenceList := NewMatchPresenceList()

deferredCh := make(chan *DeferredMessage, config.GetMatch().DeferredQueueSize)
Expand Down Expand Up @@ -155,7 +155,7 @@ func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegis
joinAttemptCh: make(chan func(mh *MatchHandler), config.GetMatch().JoinAttemptQueueSize),
deferredCh: deferredCh,
stopCh: make(chan struct{}),
stopped: atomic.NewBool(false),
stopped: stopped,

Rate: int64(rateInt),

Expand Down Expand Up @@ -203,6 +203,10 @@ func (mh *MatchHandler) Close() {
if !mh.stopped.CAS(false, true) {
return
}

// Ensure any remaining deferred broadcasts are sent.
mh.processDeferred()

mh.core.Cancel()
close(mh.stopCh)
mh.ticker.Stop()
Expand Down Expand Up @@ -256,20 +260,11 @@ func loop(mh *MatchHandler) {
return
}

// Broadcast any deferred messages before checking for nil state, to make sure any final messages are sent.
deferredCount := len(mh.deferredCh)
if deferredCount != 0 {
deferredMessages := make([]*DeferredMessage, deferredCount)
for i := 0; i < deferredCount; i++ {
msg := <-mh.deferredCh
deferredMessages[i] = msg
}

mh.router.SendDeferred(mh.logger, deferredMessages)
}

// Check if we need to stop the match.
if state == nil {
if state != nil {
// Broadcast any deferred messages. If match will be stopped broadcasting will be handled as part of the match end cycle.
mh.processDeferred()
} else {
mh.Stop()
mh.logger.Info("Match loop returned nil or no state, stopping match")
return
Expand All @@ -288,6 +283,19 @@ func loop(mh *MatchHandler) {
mh.tick++
}

func (mh *MatchHandler) processDeferred() {
deferredCount := len(mh.deferredCh)
if deferredCount != 0 {
deferredMessages := make([]*DeferredMessage, deferredCount)
for i := 0; i < deferredCount; i++ {
msg := <-mh.deferredCh
deferredMessages[i] = msg
}

mh.router.SendDeferred(mh.logger, deferredMessages)
}
}

func (mh *MatchHandler) QueueJoinAttempt(ctx context.Context, resultCh chan<- *MatchJoinResult, userID, sessionID uuid.UUID, username, node string, metadata map[string]string) bool {
if mh.stopped.Load() {
return false
Expand Down Expand Up @@ -315,7 +323,11 @@ func (mh *MatchHandler) QueueJoinAttempt(ctx context.Context, resultCh chan<- *M
resultCh <- &MatchJoinResult{Allow: false}
return
}
if state == nil {

if state != nil {
// Broadcast any deferred messages. If match will be stopped broadcasting will be handled as part of the match end cycle.
mh.processDeferred()
} else {
mh.Stop()
mh.logger.Info("Match join attempt returned nil or no state, stopping match")
resultCh <- &MatchJoinResult{Allow: false}
Expand Down Expand Up @@ -369,7 +381,10 @@ func (mh *MatchHandler) QueueJoin(joins []*MatchPresence, mark bool) bool {
mh.logger.Warn("Stopping match after error from match_join execution", zap.Int64("tick", mh.tick), zap.Error(err))
return
}
if state == nil {
if state != nil {
// Broadcast any deferred messages. If match will be stopped broadcasting will be handled as part of the match end cycle.
mh.processDeferred()
} else {
mh.Stop()
mh.logger.Info("Match join returned nil or no state, stopping match")
return
Expand Down Expand Up @@ -404,7 +419,10 @@ func (mh *MatchHandler) QueueLeave(leaves []*MatchPresence) bool {
mh.logger.Warn("Stopping match after error from match_leave execution", zap.Int("tick", int(mh.tick)), zap.Error(err))
return
}
if state == nil {
if state != nil {
// Broadcast any deferred messages. If match will be stopped broadcasting will be handled as part of the match end cycle.
mh.processDeferred()
} else {
mh.Stop()
mh.logger.Info("Match leave returned nil or no state, stopping match")
return
Expand Down Expand Up @@ -433,7 +451,10 @@ func (mh *MatchHandler) QueueTerminate(graceSeconds int) bool {
mh.logger.Warn("Stopping match after error from match_terminate execution", zap.Int("tick", int(mh.tick)), zap.Error(err))
return
}
if state == nil {
if state != nil {
// Broadcast any deferred messages. If match will be stopped broadcasting will be handled as part of the match end cycle.
mh.processDeferred()
} else {
mh.Stop()
mh.logger.Info("Match terminate returned nil or no state, stopping match")
return
Expand Down
11 changes: 6 additions & 5 deletions server/match_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type MatchRegistry interface {
// Create and start a new match, given a Lua module name or registered Go match function.
CreateMatch(ctx context.Context, logger *zap.Logger, createFn RuntimeMatchCreateFunction, module string, params map[string]interface{}) (string, error)
// Register and initialise a match that's ready to run.
NewMatch(logger *zap.Logger, id uuid.UUID, core RuntimeMatchCore, params map[string]interface{}) (*MatchHandler, error)
NewMatch(logger *zap.Logger, id uuid.UUID, core RuntimeMatchCore, stopped *atomic.Bool, params map[string]interface{}) (*MatchHandler, error)
// Return a match handler by ID, only from the local node.
GetMatch(id uuid.UUID) *MatchHandler
// Remove a tracked match and ensure all its presences are cleaned up.
Expand Down Expand Up @@ -136,8 +136,9 @@ func NewLocalMatchRegistry(logger, startupLogger *zap.Logger, config Config, tra
func (r *LocalMatchRegistry) CreateMatch(ctx context.Context, logger *zap.Logger, createFn RuntimeMatchCreateFunction, module string, params map[string]interface{}) (string, error) {
id := uuid.Must(uuid.NewV4())
matchLogger := logger.With(zap.String("mid", id.String()))
stopped := atomic.NewBool(false)

core, err := createFn(ctx, matchLogger, id, r.node, module)
core, err := createFn(ctx, matchLogger, id, r.node, stopped, module)
if err != nil {
return "", err
}
Expand All @@ -146,21 +147,21 @@ func (r *LocalMatchRegistry) CreateMatch(ctx context.Context, logger *zap.Logger
}

// Start the match.
mh, err := r.NewMatch(matchLogger, id, core, params)
mh, err := r.NewMatch(matchLogger, id, core, stopped, params)
if err != nil {
return "", fmt.Errorf("error creating match: %v", err.Error())
}

return mh.IDStr, nil
}

func (r *LocalMatchRegistry) NewMatch(logger *zap.Logger, id uuid.UUID, core RuntimeMatchCore, params map[string]interface{}) (*MatchHandler, error) {
func (r *LocalMatchRegistry) NewMatch(logger *zap.Logger, id uuid.UUID, core RuntimeMatchCore, stopped *atomic.Bool, params map[string]interface{}) (*MatchHandler, error) {
if r.stopped.Load() {
// Server is shutting down, reject new matches.
return nil, errors.New("shutdown in progress")
}

match, err := NewMatchHandler(logger, r.config, r, r.router, core, id, r.node, params)
match, err := NewMatchHandler(logger, r.config, r, r.router, core, id, r.node, stopped, params)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion server/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package server
import (
"context"
"database/sql"
"go.uber.org/atomic"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -167,7 +168,7 @@ type (

RuntimeMatchmakerMatchedFunction func(ctx context.Context, entries []*MatchmakerEntry) (string, bool, error)

RuntimeMatchCreateFunction func(ctx context.Context, logger *zap.Logger, id uuid.UUID, node string, name string) (RuntimeMatchCore, error)
RuntimeMatchCreateFunction func(ctx context.Context, logger *zap.Logger, id uuid.UUID, node string, stopped *atomic.Bool, name string) (RuntimeMatchCore, error)
RuntimeMatchDeferMessageFunction func(msg *DeferredMessage) error

RuntimeTournamentEndFunction func(ctx context.Context, tournament *api.Tournament, end, reset int64) error
Expand Down
5 changes: 3 additions & 2 deletions server/runtime_go.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"database/sql"
"errors"
"github.com/golang/protobuf/jsonpb"
"go.uber.org/atomic"
"path/filepath"
"plugin"
"strings"
Expand Down Expand Up @@ -1820,7 +1821,7 @@ func NewRuntimeProviderGo(logger, startupLogger *zap.Logger, db *sql.DB, jsonpbM

match := make(map[string]func(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule) (runtime.Match, error), 0)
matchLock := &sync.RWMutex{}
matchCreateFn := func(ctx context.Context, logger *zap.Logger, id uuid.UUID, node string, name string) (RuntimeMatchCore, error) {
matchCreateFn := func(ctx context.Context, logger *zap.Logger, id uuid.UUID, node string, stopped *atomic.Bool, name string) (RuntimeMatchCore, error) {
matchLock.RLock()
fn, ok := match[name]
matchLock.RUnlock()
Expand All @@ -1835,7 +1836,7 @@ func NewRuntimeProviderGo(logger, startupLogger *zap.Logger, db *sql.DB, jsonpbM
return nil, err
}

return NewRuntimeGoMatchCore(logger, matchRegistry, router, id, node, db, env, nk, match)
return NewRuntimeGoMatchCore(logger, matchRegistry, router, id, node, stopped, db, env, nk, match)
}
nk.SetMatchCreateFn(matchCreateFn)
matchNamesListFn := func() []string {
Expand Down
38 changes: 29 additions & 9 deletions server/runtime_go_match_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"go.uber.org/zap"
)

var ErrMatchStopped = errors.New("match stopped")

type RuntimeGoMatchCore struct {
logger *zap.Logger
matchRegistry MatchRegistry
Expand All @@ -37,11 +39,12 @@ type RuntimeGoMatchCore struct {

match runtime.Match

id uuid.UUID
node string
idStr string
stream PresenceStream
label *atomic.String
id uuid.UUID
node string
stopped *atomic.Bool
idStr string
stream PresenceStream
label *atomic.String

runtimeLogger runtime.Logger
db *sql.DB
Expand All @@ -51,7 +54,7 @@ type RuntimeGoMatchCore struct {
ctxCancelFn context.CancelFunc
}

func NewRuntimeGoMatchCore(logger *zap.Logger, matchRegistry MatchRegistry, router MessageRouter, id uuid.UUID, node string, db *sql.DB, env map[string]string, nk runtime.NakamaModule, match runtime.Match) (RuntimeMatchCore, error) {
func NewRuntimeGoMatchCore(logger *zap.Logger, matchRegistry MatchRegistry, router MessageRouter, id uuid.UUID, node string, stopped *atomic.Bool, db *sql.DB, env map[string]string, nk runtime.NakamaModule, match runtime.Match) (RuntimeMatchCore, error) {
ctx, ctxCancelFn := context.WithCancel(context.Background())
ctx = NewRuntimeGoContext(ctx, env, RuntimeExecutionModeMatch, nil, 0, "", "", nil, "", "", "")
ctx = context.WithValue(ctx, runtime.RUNTIME_CTX_MATCH_ID, fmt.Sprintf("%v.%v", id.String(), node))
Expand All @@ -67,9 +70,10 @@ func NewRuntimeGoMatchCore(logger *zap.Logger, matchRegistry MatchRegistry, rout

match: match,

id: id,
node: node,
idStr: fmt.Sprintf("%v.%v", id.String(), node),
id: id,
node: node,
stopped: stopped,
idStr: fmt.Sprintf("%v.%v", id.String(), node),
stream: PresenceStream{
Mode: StreamModeMatchAuthoritative,
Subject: id,
Expand Down Expand Up @@ -169,6 +173,10 @@ func (r *RuntimeGoMatchCore) Cancel() {
}

func (r *RuntimeGoMatchCore) BroadcastMessage(opCode int64, data []byte, presences []runtime.Presence, sender runtime.Presence, reliable bool) error {
if r.stopped.Load() {
return ErrMatchStopped
}

presenceIDs, msg, err := r.validateBroadcast(opCode, data, presences, sender, reliable)
if err != nil {
return err
Expand All @@ -183,6 +191,10 @@ func (r *RuntimeGoMatchCore) BroadcastMessage(opCode int64, data []byte, presenc
}

func (r *RuntimeGoMatchCore) BroadcastMessageDeferred(opCode int64, data []byte, presences []runtime.Presence, sender runtime.Presence, reliable bool) error {
if r.stopped.Load() {
return ErrMatchStopped
}

presenceIDs, msg, err := r.validateBroadcast(opCode, data, presences, sender, reliable)
if err != nil {
return err
Expand Down Expand Up @@ -298,6 +310,10 @@ func (r *RuntimeGoMatchCore) validateBroadcast(opCode int64, data []byte, presen
}

func (r *RuntimeGoMatchCore) MatchKick(presences []runtime.Presence) error {
if r.stopped.Load() {
return ErrMatchStopped
}

size := len(presences)
if size == 0 {
return nil
Expand Down Expand Up @@ -328,6 +344,10 @@ func (r *RuntimeGoMatchCore) MatchKick(presences []runtime.Presence) error {
}

func (r *RuntimeGoMatchCore) MatchLabelUpdate(label string) error {
if r.stopped.Load() {
return ErrMatchStopped
}

if err := r.matchRegistry.UpdateMatchLabel(r.id, label); err != nil {
return fmt.Errorf("error updating match label: %v", err.Error())
}
Expand Down
6 changes: 3 additions & 3 deletions server/runtime_lua.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,15 @@ func NewRuntimeProviderLua(logger, startupLogger *zap.Logger, db *sql.DB, jsonpb
var tournamentResetFunction RuntimeTournamentResetFunction
var leaderboardResetFunction RuntimeLeaderboardResetFunction

allMatchCreateFn := func(ctx context.Context, logger *zap.Logger, id uuid.UUID, node string, name string) (RuntimeMatchCore, error) {
core, err := goMatchCreateFn(ctx, logger, id, node, name)
allMatchCreateFn := func(ctx context.Context, logger *zap.Logger, id uuid.UUID, node string, stopped *atomic.Bool, name string) (RuntimeMatchCore, error) {
core, err := goMatchCreateFn(ctx, logger, id, node, stopped, name)
if err != nil {
return nil, err
}
if core != nil {
return core, nil
}
return NewRuntimeLuaMatchCore(logger, db, jsonpbMarshaler, jsonpbUnmarshaler, config, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, matchRegistry, tracker, streamManager, router, stdLibs, once, localCache, goMatchCreateFn, id, node, name)
return NewRuntimeLuaMatchCore(logger, db, jsonpbMarshaler, jsonpbUnmarshaler, config, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, matchRegistry, tracker, streamManager, router, stdLibs, once, localCache, goMatchCreateFn, id, node, stopped, name)
}

runtimeProviderLua := &RuntimeProviderLua{
Expand Down
Loading

0 comments on commit c661bc4

Please sign in to comment.