Skip to content

Commit

Permalink
simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
agouin committed May 16, 2024
1 parent b3ce2df commit 0651311
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 52 deletions.
72 changes: 28 additions & 44 deletions relayer/processor/message_injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"sync"

chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types"
"github.com/cosmos/relayer/v2/relayer/provider"
"go.uber.org/zap"
)
Expand All @@ -19,8 +18,6 @@ type MessageInjector struct {

memo string

dst *pathEndRuntime

cachedUpdateClient provider.RelayerMessage
cachedTrackers []messageToTrack
mu sync.Mutex
Expand All @@ -41,82 +38,69 @@ func NewMessageInjector(
// trackAndSendMessages will increment attempt counters for each message and send each message.
// Messages will be batched if the broadcast mode is configured to 'batch' and there was not an error
// in a previous batch.
func (mb *MessageInjector) trackAndSendMessages(
func (mi *MessageInjector) trackAndSendMessages(
ctx context.Context,
src, dst *pathEndRuntime,
msgUpdateClient provider.RelayerMessage,
trackers []messageToTrack,
needsClientUpdate bool,
) error {
mb.mu.Lock()
defer mb.mu.Unlock()

mb.cachedUpdateClient = msgUpdateClient
mi.mu.Lock()
defer mi.mu.Unlock()

mb.dst = dst
mi.cachedUpdateClient = nil

//broadcastBatch := dst.chainProvider.ProviderConfig().BroadcastMode() == provider.BroadcastModeBatch
var batch []messageToTrack

for _, t := range trackers {
retries := dst.trackProcessingMessage(t)
if t.assembledMsg() == nil {
dst.trackFinishedProcessingMessage(t)
continue
}

ordered := false
if m, ok := t.(packetMessageToTrack); ok && m.msg.info.ChannelOrder == chantypes.ORDERED.String() {
ordered = true
}
assembledCount := 0

//if broadcastBatch && (retries == 0 || ordered) {
if retries == 0 || ordered {
for _, t := range trackers {
msg := t.assembledMsg()
if msg != nil {
batch = append(batch, t)
continue
}
//go mb.sendSingleMessage(ctx, src, dst, t)
}

mb.cachedTrackers = batch

assembledCount := 0
for _, m := range trackers {
if m.assembledMsg() != nil {
assembledCount++
}
}

if assembledCount > 0 {
mi.cachedTrackers = batch

if assembledCount > 0 ||
(needsClientUpdate && msgUpdateClient != nil) {
mi.cachedUpdateClient = msgUpdateClient
return nil
}

if needsClientUpdate && msgUpdateClient != nil {
return nil
if len(trackers) > 0 {
return errors.New("all messages failed to assemble")
}

// only msgUpdateClient, don't need to send
return errors.New("all messages failed to assemble")
return nil
}

// InjectMsgs returns relay messages ready to inject into a proposal.
func (mb *MessageInjector) InjectMsgs(ctx context.Context) []provider.RelayerMessage {
mb.mu.Lock()
defer mb.mu.Unlock()
func (mi *MessageInjector) InjectMsgs(ctx context.Context) []provider.RelayerMessage {
mi.mu.Lock()
defer mi.mu.Unlock()

var msgs []provider.RelayerMessage

if mb.cachedUpdateClient != nil {
msgs = append(msgs, mb.cachedUpdateClient)
if mi.cachedUpdateClient != nil {
msgs = append(msgs, mi.cachedUpdateClient)
}

for _, t := range mb.cachedTrackers {
for _, t := range mi.cachedTrackers {
msg := t.assembledMsg()
if msg != nil {
msgs = append(msgs, msg)
}
mb.dst.finishedProcessing <- t
}

if len(msgs) == 0 {
return nil
}

mi.log.Info("Injecting messages", zap.Int("count", len(msgs)))

return msgs
}
1 change: 1 addition & 0 deletions relayer/processor/path_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ func (pp *PathProcessor) Run(ctx context.Context, cancel func()) {

// process latest message cache state from both pathEnds
if err := pp.processLatestMessages(ctx, cancel); err != nil {
pp.log.Error("Failed to process latest messages", zap.Error(err))
// in case of IBC message send errors, schedule retry after durationErrorRetry
if retryTimer != nil {
retryTimer.Stop()
Expand Down
20 changes: 12 additions & 8 deletions relayer/processor/path_processor_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1069,14 +1069,18 @@ func (pp *PathProcessor) processLatestMessages(ctx context.Context, cancel func(
// now assemble and send messages in parallel
// if sending messages fails to one pathEnd, we don't need to halt sending to the other pathEnd.
var eg errgroup.Group
eg.Go(func() error {
mp := newMessageProcessor(pp.log, pp.metrics, pp.pathEnd1MessageSender, pp.clientUpdateThresholdTime, pp.isLocalhost)
return mp.processMessages(ctx, pathEnd1Messages, pp.pathEnd2, pp.pathEnd1)
})
eg.Go(func() error {
mp := newMessageProcessor(pp.log, pp.metrics, pp.pathEnd2MessageSender, pp.clientUpdateThresholdTime, pp.isLocalhost)
return mp.processMessages(ctx, pathEnd2Messages, pp.pathEnd1, pp.pathEnd2)
})
if _, ok := pp.pathEnd1MessageSender.(MessageSenderNop); !ok {
eg.Go(func() error {
mp := newMessageProcessor(pp.log, pp.metrics, pp.pathEnd1MessageSender, pp.clientUpdateThresholdTime, pp.isLocalhost)
return mp.processMessages(ctx, pathEnd1Messages, pp.pathEnd2, pp.pathEnd1)
})
}
if _, ok := pp.pathEnd2MessageSender.(MessageSenderNop); !ok {
eg.Go(func() error {
mp := newMessageProcessor(pp.log, pp.metrics, pp.pathEnd2MessageSender, pp.clientUpdateThresholdTime, pp.isLocalhost)
return mp.processMessages(ctx, pathEnd2Messages, pp.pathEnd1, pp.pathEnd2)
})
}
return eg.Wait()
}

Expand Down

0 comments on commit 0651311

Please sign in to comment.