Skip to content

Commit

Permalink
Removed potential deadlock in registering a new chain
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph committed Jun 8, 2020
1 parent bc31669 commit 367b5c3
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 64 deletions.
3 changes: 3 additions & 0 deletions snow/networking/router/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ func (h *Handler) Get(validatorID ids.ShortID, requestID uint32, containerID ids

// GetAncestors passes a GetAncestors message received from the network to the consensus engine.
func (h *Handler) GetAncestors(validatorID ids.ShortID, requestID uint32, containerID ids.ID) {
h.metrics.pending.Inc()
h.msgs <- message{
messageType: getAncestorsMsg,
validatorID: validatorID,
Expand All @@ -268,6 +269,7 @@ func (h *Handler) Put(validatorID ids.ShortID, requestID uint32, containerID ids

// MultiPut passes a MultiPut message received from the network to the consensus engine.
func (h *Handler) MultiPut(validatorID ids.ShortID, requestID uint32, containers [][]byte) {
h.metrics.pending.Inc()
h.msgs <- message{
messageType: multiPutMsg,
validatorID: validatorID,
Expand All @@ -288,6 +290,7 @@ func (h *Handler) GetFailed(validatorID ids.ShortID, requestID uint32) {

// GetAncestorsFailed passes a GetAncestorsFailed message to the consensus engine.
func (h *Handler) GetAncestorsFailed(validatorID ids.ShortID, requestID uint32) {
h.metrics.pending.Inc()
h.msgs <- message{
messageType: getAncestorsFailedMsg,
validatorID: validatorID,
Expand Down
154 changes: 90 additions & 64 deletions snow/networking/router/subnet_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,35 +64,40 @@ func (sr *ChainRouter) AddChain(chain *Handler) {
// RemoveChain removes the specified chain so that incoming
// messages can't be routed to it
func (sr *ChainRouter) RemoveChain(chainID ids.ID) {
sr.lock.Lock()
defer sr.lock.Unlock()
sr.lock.RLock()
chain, exists := sr.chains[chainID.Key()]
sr.lock.RUnlock()

if chain, exists := sr.chains[chainID.Key()]; exists {
chain.Shutdown()
close(chain.msgs)
if !exists {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
return
}

ticker := time.NewTicker(sr.closeTimeout)
select {
case _, _ = <-chain.closed:
case <-ticker.C:
chain.Context().Log.Warn("timed out while shutting down")
}
ticker.Stop()
chain.Shutdown()
close(chain.msgs)

delete(sr.chains, chainID.Key())
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
ticker := time.NewTicker(sr.closeTimeout)
select {
case _, _ = <-chain.closed:
case <-ticker.C:
chain.Context().Log.Warn("timed out while shutting down")
}
ticker.Stop()

sr.lock.Lock()
delete(sr.chains, chainID.Key())
sr.lock.Unlock()
}

// GetAcceptedFrontier routes an incoming GetAcceptedFrontier request from the
// validator with ID [validatorID] to the consensus engine working on the
// chain with ID [chainID]
func (sr *ChainRouter) GetAcceptedFrontier(validatorID ids.ShortID, chainID ids.ID, requestID uint32) {
sr.lock.RLock()
defer sr.lock.RUnlock()
chain, exists := sr.chains[chainID.Key()]
sr.lock.RUnlock()

if chain, exists := sr.chains[chainID.Key()]; exists {
if exists {
chain.GetAcceptedFrontier(validatorID, requestID)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
Expand All @@ -104,10 +109,11 @@ func (sr *ChainRouter) GetAcceptedFrontier(validatorID ids.ShortID, chainID ids.
// chain with ID [chainID]
func (sr *ChainRouter) AcceptedFrontier(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
sr.lock.RLock()
defer sr.lock.RUnlock()

sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
chain, exists := sr.chains[chainID.Key()]
sr.lock.RUnlock()

if exists {
chain.AcceptedFrontier(validatorID, requestID, containerIDs)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
Expand All @@ -119,10 +125,11 @@ func (sr *ChainRouter) AcceptedFrontier(validatorID ids.ShortID, chainID ids.ID,
// working on the chain with ID [chainID]
func (sr *ChainRouter) GetAcceptedFrontierFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) {
sr.lock.RLock()
defer sr.lock.RUnlock()

sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
chain, exists := sr.chains[chainID.Key()]
sr.lock.RUnlock()

if exists {
chain.GetAcceptedFrontierFailed(validatorID, requestID)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
Expand All @@ -134,9 +141,10 @@ func (sr *ChainRouter) GetAcceptedFrontierFailed(validatorID ids.ShortID, chainI
// chain with ID [chainID]
func (sr *ChainRouter) GetAccepted(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
sr.lock.RLock()
defer sr.lock.RUnlock()
chain, exists := sr.chains[chainID.Key()]
sr.lock.RUnlock()

if chain, exists := sr.chains[chainID.Key()]; exists {
if exists {
chain.GetAccepted(validatorID, requestID, containerIDs)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
Expand All @@ -148,10 +156,11 @@ func (sr *ChainRouter) GetAccepted(validatorID ids.ShortID, chainID ids.ID, requ
// [chainID]
func (sr *ChainRouter) Accepted(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
sr.lock.RLock()
defer sr.lock.RUnlock()

sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
chain, exists := sr.chains[chainID.Key()]
sr.lock.RUnlock()

if exists {
chain.Accepted(validatorID, requestID, containerIDs)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
Expand All @@ -163,10 +172,11 @@ func (sr *ChainRouter) Accepted(validatorID ids.ShortID, chainID ids.ID, request
// chain with ID [chainID]
func (sr *ChainRouter) GetAcceptedFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) {
sr.lock.RLock()
defer sr.lock.RUnlock()

sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
chain, exists := sr.chains[chainID.Key()]
sr.lock.RUnlock()

if exists {
chain.GetAcceptedFailed(validatorID, requestID)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
Expand All @@ -177,9 +187,10 @@ func (sr *ChainRouter) GetAcceptedFailed(validatorID ids.ShortID, chainID ids.ID
// to the consensus engine working on the chain with ID [chainID]
func (sr *ChainRouter) Get(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
sr.lock.RLock()
defer sr.lock.RUnlock()
chain, exists := sr.chains[chainID.Key()]
sr.lock.RUnlock()

if chain, exists := sr.chains[chainID.Key()]; exists {
if exists {
chain.Get(validatorID, requestID, containerID)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
Expand All @@ -191,9 +202,10 @@ func (sr *ChainRouter) Get(validatorID ids.ShortID, chainID ids.ID, requestID ui
// The maximum number of ancestors to respond with is define in snow/engine/commong/bootstrapper.go
func (sr *ChainRouter) GetAncestors(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
sr.lock.RLock()
defer sr.lock.RUnlock()
chain, exists := sr.chains[chainID.Key()]
sr.lock.RUnlock()

if chain, exists := sr.chains[chainID.Key()]; exists {
if exists {
chain.GetAncestors(validatorID, requestID, containerID)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
Expand All @@ -204,12 +216,13 @@ func (sr *ChainRouter) GetAncestors(validatorID ids.ShortID, chainID ids.ID, req
// to the consensus engine working on the chain with ID [chainID]
func (sr *ChainRouter) Put(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
sr.lock.RLock()
defer sr.lock.RUnlock()
sr.timeouts.Cancel(validatorID, chainID, requestID)
chain, exists := sr.chains[chainID.Key()]
sr.lock.RUnlock()

// This message came in response to a Get message from this node, and when we sent that Get
// message we set a timeout. Since we got a response, cancel the timeout.
sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
if exists {
chain.Put(validatorID, requestID, containerID, container)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
Expand All @@ -220,12 +233,13 @@ func (sr *ChainRouter) Put(validatorID ids.ShortID, chainID ids.ID, requestID ui
// to the consensus engine working on the chain with ID [chainID]
func (sr *ChainRouter) MultiPut(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containers [][]byte) {
sr.lock.RLock()
defer sr.lock.RUnlock()
sr.timeouts.Cancel(validatorID, chainID, requestID)
chain, exists := sr.chains[chainID.Key()]
sr.lock.RUnlock()

// This message came in response to a GetAncestors message from this node, and when we sent that
// message we set a timeout. Since we got a response, cancel the timeout.
sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
if exists {
chain.MultiPut(validatorID, requestID, containers)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
Expand All @@ -236,10 +250,11 @@ func (sr *ChainRouter) MultiPut(validatorID ids.ShortID, chainID ids.ID, request
// to the consensus engine working on the chain with ID [chainID]
func (sr *ChainRouter) GetFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) {
sr.lock.RLock()
defer sr.lock.RUnlock()

sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
chain, exists := sr.chains[chainID.Key()]
sr.lock.RUnlock()

if exists {
chain.GetFailed(validatorID, requestID)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
Expand All @@ -250,10 +265,11 @@ func (sr *ChainRouter) GetFailed(validatorID ids.ShortID, chainID ids.ID, reques
// to the consensus engine working on the chain with ID [chainID]
func (sr *ChainRouter) GetAncestorsFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) {
sr.lock.RLock()
defer sr.lock.RUnlock()

sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
chain, exists := sr.chains[chainID.Key()]
sr.lock.RUnlock()

if exists {
chain.GetAncestorsFailed(validatorID, requestID)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
Expand All @@ -264,9 +280,10 @@ func (sr *ChainRouter) GetAncestorsFailed(validatorID ids.ShortID, chainID ids.I
// to the consensus engine working on the chain with ID [chainID]
func (sr *ChainRouter) PushQuery(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
sr.lock.RLock()
defer sr.lock.RUnlock()
chain, exists := sr.chains[chainID.Key()]
sr.lock.RUnlock()

if chain, exists := sr.chains[chainID.Key()]; exists {
if exists {
chain.PushQuery(validatorID, requestID, containerID, container)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
Expand All @@ -277,9 +294,10 @@ func (sr *ChainRouter) PushQuery(validatorID ids.ShortID, chainID ids.ID, reques
// to the consensus engine working on the chain with ID [chainID]
func (sr *ChainRouter) PullQuery(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
sr.lock.RLock()
defer sr.lock.RUnlock()
chain, exists := sr.chains[chainID.Key()]
sr.lock.RUnlock()

if chain, exists := sr.chains[chainID.Key()]; exists {
if exists {
chain.PullQuery(validatorID, requestID, containerID)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
Expand All @@ -290,11 +308,13 @@ func (sr *ChainRouter) PullQuery(validatorID ids.ShortID, chainID ids.ID, reques
// to the consensus engine working on the chain with ID [chainID]
func (sr *ChainRouter) Chits(validatorID ids.ShortID, chainID ids.ID, requestID uint32, votes ids.Set) {
sr.lock.RLock()
defer sr.lock.RUnlock()
sr.timeouts.Cancel(validatorID, chainID, requestID)
chain, exists := sr.chains[chainID.Key()]
sr.lock.RUnlock()

// Cancel timeout we set when sent the message asking for these Chits
sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {

if exists {
chain.Chits(validatorID, requestID, votes)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
Expand All @@ -305,10 +325,11 @@ func (sr *ChainRouter) Chits(validatorID ids.ShortID, chainID ids.ID, requestID
// to the consensus engine working on the chain with ID [chainID]
func (sr *ChainRouter) QueryFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) {
sr.lock.RLock()
defer sr.lock.RUnlock()

sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
chain, exists := sr.chains[chainID.Key()]
sr.lock.RUnlock()

if exists {
chain.QueryFailed(validatorID, requestID)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
Expand All @@ -318,14 +339,15 @@ func (sr *ChainRouter) QueryFailed(validatorID ids.ShortID, chainID ids.ID, requ
// Shutdown shuts down this router
func (sr *ChainRouter) Shutdown() {
sr.lock.Lock()
for _, chain := range sr.chains {
chain.Shutdown()
close(chain.msgs)
}
prevChains := sr.chains
sr.chains = map[[32]byte]*Handler{}
sr.lock.Unlock()

for _, chain := range prevChains {
chain.Shutdown()
close(chain.msgs)
}

ticker := time.NewTicker(sr.closeTimeout)
timedout := false
for _, chain := range prevChains {
Expand All @@ -344,10 +366,14 @@ func (sr *ChainRouter) Shutdown() {

// Gossip accepted containers
func (sr *ChainRouter) Gossip() {
sr.lock.RLock()
defer sr.lock.RUnlock()

sr.lock.Lock()
chains := []*Handler{}
for _, chain := range sr.chains {
chains = append(chains, chain)
}
sr.lock.Unlock()

for _, chain := range chains {
chain.Gossip()
}
}

0 comments on commit 367b5c3

Please sign in to comment.