diff --git a/relayer/processor/message_injector.go b/relayer/processor/message_injector.go
index 8edc24d3f..cc6aaf5f2 100644
--- a/relayer/processor/message_injector.go
+++ b/relayer/processor/message_injector.go
@@ -5,7 +5,6 @@ import (
 	"errors"
 	"sync"
 
-	chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types"
 	"github.com/cosmos/relayer/v2/relayer/provider"
 	"go.uber.org/zap"
 )
@@ -19,8 +18,6 @@ type MessageInjector struct {
 	memo string
 
-	dst *pathEndRuntime
-
 	cachedUpdateClient provider.RelayerMessage
 	cachedTrackers     []messageToTrack
 	mu                 sync.Mutex
@@ -41,82 +38,69 @@ func NewMessageInjector(
 // trackAndSendMessages will increment attempt counters for each message and send each message.
 // Messages will be batched if the broadcast mode is configured to 'batch' and there was not an error
 // in a previous batch.
-func (mb *MessageInjector) trackAndSendMessages(
+func (mi *MessageInjector) trackAndSendMessages(
 	ctx context.Context,
 	src, dst *pathEndRuntime,
 	msgUpdateClient provider.RelayerMessage,
 	trackers []messageToTrack,
 	needsClientUpdate bool,
 ) error {
-	mb.mu.Lock()
-	defer mb.mu.Unlock()
-
-	mb.cachedUpdateClient = msgUpdateClient
+	mi.mu.Lock()
+	defer mi.mu.Unlock()
 
-	mb.dst = dst
+	mi.cachedUpdateClient = nil
 
 	//broadcastBatch := dst.chainProvider.ProviderConfig().BroadcastMode() == provider.BroadcastModeBatch
 	var batch []messageToTrack
 
-	for _, t := range trackers {
-		retries := dst.trackProcessingMessage(t)
-		if t.assembledMsg() == nil {
-			dst.trackFinishedProcessingMessage(t)
-			continue
-		}
-
-		ordered := false
-		if m, ok := t.(packetMessageToTrack); ok && m.msg.info.ChannelOrder == chantypes.ORDERED.String() {
-			ordered = true
-		}
+	assembledCount := 0
 
-		//if broadcastBatch && (retries == 0 || ordered) {
-		if retries == 0 || ordered {
+	for _, t := range trackers {
+		msg := t.assembledMsg()
+		if msg != nil {
 			batch = append(batch, t)
-			continue
-		}
-		//go mb.sendSingleMessage(ctx, src, dst, t)
-	}
-
-	mb.cachedTrackers = batch
-
-	assembledCount := 0
-	for _, m := range trackers {
-		if m.assembledMsg() != nil {
 			assembledCount++
 		}
 	}
 
-	if assembledCount > 0 {
+	mi.cachedTrackers = batch
+
+	if assembledCount > 0 ||
+		(needsClientUpdate && msgUpdateClient != nil) {
+		mi.cachedUpdateClient = msgUpdateClient
 		return nil
 	}
 
-	if needsClientUpdate && msgUpdateClient != nil {
-		return nil
+	if len(trackers) > 0 {
+		return errors.New("all messages failed to assemble")
 	}
 
-	// only msgUpdateClient, don't need to send
-	return errors.New("all messages failed to assemble")
+	return nil
 }
 
 // InjectMsgs returns relay messages ready to inject into a proposal.
-func (mb *MessageInjector) InjectMsgs(ctx context.Context) []provider.RelayerMessage {
-	mb.mu.Lock()
-	defer mb.mu.Unlock()
+func (mi *MessageInjector) InjectMsgs(ctx context.Context) []provider.RelayerMessage {
+	mi.mu.Lock()
+	defer mi.mu.Unlock()
 
 	var msgs []provider.RelayerMessage
 
-	if mb.cachedUpdateClient != nil {
-		msgs = append(msgs, mb.cachedUpdateClient)
+	if mi.cachedUpdateClient != nil {
+		msgs = append(msgs, mi.cachedUpdateClient)
 	}
 
-	for _, t := range mb.cachedTrackers {
+	for _, t := range mi.cachedTrackers {
 		msg := t.assembledMsg()
 		if msg != nil {
 			msgs = append(msgs, msg)
 		}
-		mb.dst.finishedProcessing <- t
 	}
 
+	if len(msgs) == 0 {
+		return nil
+	}
+
+	mi.log.Info("Injecting messages", zap.Int("count", len(msgs)))
+
 	return msgs
 }
diff --git a/relayer/processor/path_processor.go b/relayer/processor/path_processor.go
index 82c5d3c82..3898b40c0 100644
--- a/relayer/processor/path_processor.go
+++ b/relayer/processor/path_processor.go
@@ -431,6 +431,7 @@ func (pp *PathProcessor) Run(ctx context.Context, cancel func()) {
 
 		// process latest message cache state from both pathEnds
 		if err := pp.processLatestMessages(ctx, cancel); err != nil {
+			pp.log.Error("Failed to process latest messages", zap.Error(err))
 			// in case of IBC message send errors, schedule retry after durationErrorRetry
 			if retryTimer != nil {
 				retryTimer.Stop()
diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go
index 30cd42af3..6ac1b191a 100644
--- a/relayer/processor/path_processor_internal.go
+++ b/relayer/processor/path_processor_internal.go
@@ -1069,14 +1069,18 @@ func (pp *PathProcessor) processLatestMessages(ctx context.Context, cancel func(
 
 	// now assemble and send messages in parallel
 	// if sending messages fails to one pathEnd, we don't need to halt sending to the other pathEnd.
 	var eg errgroup.Group
-	eg.Go(func() error {
-		mp := newMessageProcessor(pp.log, pp.metrics, pp.pathEnd1MessageSender, pp.clientUpdateThresholdTime, pp.isLocalhost)
-		return mp.processMessages(ctx, pathEnd1Messages, pp.pathEnd2, pp.pathEnd1)
-	})
-	eg.Go(func() error {
-		mp := newMessageProcessor(pp.log, pp.metrics, pp.pathEnd2MessageSender, pp.clientUpdateThresholdTime, pp.isLocalhost)
-		return mp.processMessages(ctx, pathEnd2Messages, pp.pathEnd1, pp.pathEnd2)
-	})
+	if _, ok := pp.pathEnd1MessageSender.(MessageSenderNop); !ok {
+		eg.Go(func() error {
+			mp := newMessageProcessor(pp.log, pp.metrics, pp.pathEnd1MessageSender, pp.clientUpdateThresholdTime, pp.isLocalhost)
+			return mp.processMessages(ctx, pathEnd1Messages, pp.pathEnd2, pp.pathEnd1)
+		})
+	}
+	if _, ok := pp.pathEnd2MessageSender.(MessageSenderNop); !ok {
+		eg.Go(func() error {
+			mp := newMessageProcessor(pp.log, pp.metrics, pp.pathEnd2MessageSender, pp.clientUpdateThresholdTime, pp.isLocalhost)
+			return mp.processMessages(ctx, pathEnd2Messages, pp.pathEnd1, pp.pathEnd2)
+		})
+	}
 	return eg.Wait()
 }
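For reviewers unfamiliar with the injection flow: with this change, trackAndSendMessages no longer broadcasts anything. It caches the assembled batch (and the client update, when there are assembled messages or a client update is needed) under the mutex, and InjectMsgs later drains that cache into a block proposal. The sketch below illustrates this cache-and-drain pattern in isolation. It is a minimal illustration, not code from this patch: Msg, injector, and cache are invented stand-ins, and only the InjectMsgs shape mirrors the real method.

package main

import (
	"context"
	"fmt"
	"sync"
)

// Msg is a stand-in for provider.RelayerMessage in this sketch.
type Msg string

// injector mirrors the pattern in the patch: the relayer loop overwrites
// the cache on every pass, and the proposer reads it without consuming it,
// so injection can be retried until the next pass replaces the batch.
type injector struct {
	mu     sync.Mutex
	cached []Msg
}

// cache plays the role of trackAndSendMessages: replace, never append,
// so stale messages from a previous pass cannot leak into a proposal.
func (i *injector) cache(msgs []Msg) {
	i.mu.Lock()
	defer i.mu.Unlock()
	i.cached = msgs
}

// InjectMsgs plays the role of the real method: return nil when there is
// nothing to inject, otherwise the cached batch in order.
func (i *injector) InjectMsgs(ctx context.Context) []Msg {
	i.mu.Lock()
	defer i.mu.Unlock()
	if len(i.cached) == 0 {
		return nil
	}
	// Copy so callers cannot mutate the cache between passes.
	return append([]Msg(nil), i.cached...)
}

func main() {
	inj := &injector{}
	inj.cache([]Msg{"MsgUpdateClient", "MsgRecvPacket"})
	fmt.Println(inj.InjectMsgs(context.Background()))
	// Output: [MsgUpdateClient MsgRecvPacket]
}

The other half of the change pairs with this: processLatestMessages now skips spawning a message processor for any path end whose sender is MessageSenderNop, so a path end configured for proposal injection never starts a broadcast goroutine.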