From afb90d0e584c1543901643739df93af1b7c1894e Mon Sep 17 00:00:00 2001
From: Andrew Gouin
Date: Tue, 14 May 2024 13:59:14 -0600
Subject: [PATCH] MessageSender refactor

---
 relayer/chains/cosmos/tx.go                  |  47 +++
 relayer/processor/message_broadcaster.go     | 327 +++++++++++++++++++
 relayer/processor/message_injector.go        | 122 +++++++
 relayer/processor/message_processor.go       | 326 +-----------------
 relayer/processor/message_sender.go          |  29 ++
 relayer/processor/path_processor.go          |   3 +
 relayer/processor/path_processor_internal.go |  20 +-
 7 files changed, 562 insertions(+), 312 deletions(-)
 create mode 100644 relayer/processor/message_broadcaster.go
 create mode 100644 relayer/processor/message_injector.go
 create mode 100644 relayer/processor/message_sender.go

diff --git a/relayer/chains/cosmos/tx.go b/relayer/chains/cosmos/tx.go
index eb8d03023..4c4b8066b 100644
--- a/relayer/chains/cosmos/tx.go
+++ b/relayer/chains/cosmos/tx.go
@@ -218,6 +218,53 @@ func (cc *CosmosProvider) SendMessagesToMempool(
 	return nil
 }
 
+// BuildTx simulates the given msgs to estimate gas, then builds and signs a transaction
+// containing those msgs and the given memo, without broadcasting it.
+// The signer's account sequence is guarded while the transaction is built and is advanced
+// optimistically on success. The raw signed transaction bytes are returned to the caller.
+func (cc *CosmosProvider) BuildTx(
+	ctx context.Context,
+	msgs []provider.RelayerMessage,
+	memo string,
+) ([]byte, error) {
+	txSignerKey, feegranterKeyOrAddr, err := cc.buildSignerConfig(msgs)
+	if err != nil {
+		return nil, err
+	}
+
+	sequenceGuard := ensureSequenceGuard(cc, txSignerKey)
+	sequenceGuard.Mu.Lock()
+	defer sequenceGuard.Mu.Unlock()
+
+	dynamicFee := cc.DynamicFee(ctx)
+
+	txBytes, sequence, _, err := cc.buildMessages(
+		ctx,
+		msgs,
+		memo,
+		0,
+		txSignerKey,
+		feegranterKeyOrAddr,
+		sequenceGuard,
+		dynamicFee,
+	)
+
+	if err != nil {
+		// Account sequence mismatch errors can happen on the simulated transaction also.
+		if strings.Contains(err.Error(), legacyerrors.ErrWrongSequence.Error()) {
+			cc.handleAccountSequenceMismatchError(sequenceGuard, err)
+		}
+
+		return nil, err
+	}
+
+	// TODO: the tx has not been broadcast yet, so we do not know whether it will succeed;
+	// optimistically advance the sequence so the next build uses the following value.
+	cc.updateNextAccountSequence(sequenceGuard, sequence+1)
+
+	return txBytes, nil
+}
+
 func (cc *CosmosProvider) SubmitTxAwaitResponse(ctx context.Context, msgs []sdk.Msg, memo string, gas uint64, signingKeyName string) (*txtypes.GetTxResponse, error) {
 	resp, err := cc.SendMsgsWith(ctx, msgs, memo, gas, signingKeyName, "")
 	if err != nil {
diff --git a/relayer/processor/message_broadcaster.go b/relayer/processor/message_broadcaster.go
new file mode 100644
index 000000000..0484bbb16
--- /dev/null
+++ b/relayer/processor/message_broadcaster.go
@@ -0,0 +1,327 @@
+package processor
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types"
+	"github.com/cosmos/relayer/v2/relayer/provider"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+)
+
+var _ MessageSender = (*MessageBroadcaster)(nil)
+
+// MessageBroadcaster is used for broadcasting IBC messages to an RPC endpoint.
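+// It implements MessageSender by submitting assembled messages, together with any
+// MsgUpdateClient, to the destination chain's mempool via SendMessagesToMempool.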
+type MessageBroadcaster struct { + log *zap.Logger + metrics *PrometheusMetrics + + memo string +} + +func NewMessageBroadcaster( + log *zap.Logger, + metrics *PrometheusMetrics, + memo string, +) *MessageBroadcaster { + return &MessageBroadcaster{ + log: log, + metrics: metrics, + memo: memo, + } +} + +// trackAndSendMessages will increment attempt counters for each message and send each message. +// Messages will be batched if the broadcast mode is configured to 'batch' and there was not an error +// in a previous batch. +func (mb *MessageBroadcaster) trackAndSendMessages( + ctx context.Context, + src, dst *pathEndRuntime, + msgUpdateClient provider.RelayerMessage, + trackers []messageToTrack, + needsClientUpdate bool, +) error { + broadcastBatch := dst.chainProvider.ProviderConfig().BroadcastMode() == provider.BroadcastModeBatch + var batch []messageToTrack + + for _, t := range trackers { + + retries := dst.trackProcessingMessage(t) + if t.assembledMsg() == nil { + dst.trackFinishedProcessingMessage(t) + continue + } + + ordered := false + if m, ok := t.(packetMessageToTrack); ok && m.msg.info.ChannelOrder == chantypes.ORDERED.String() { + ordered = true + } + + if broadcastBatch && (retries == 0 || ordered) { + batch = append(batch, t) + continue + } + go mb.sendSingleMessage(ctx, msgUpdateClient, src, dst, t) + } + + if len(batch) > 0 { + go mb.sendBatchMessages(ctx, msgUpdateClient, src, dst, batch) + } + + assembledCount := 0 + for _, m := range trackers { + if m.assembledMsg() != nil { + assembledCount++ + } + } + + if assembledCount > 0 { + return nil + } + + if needsClientUpdate && msgUpdateClient != nil { + go mb.sendClientUpdate(ctx, msgUpdateClient, src, dst) + return nil + } + + // only msgUpdateClient, don't need to send + return errors.New("all messages failed to assemble") +} + +// sendClientUpdate will send an isolated client update message. +func (mb *MessageBroadcaster) sendClientUpdate( + ctx context.Context, + msgUpdateClient provider.RelayerMessage, + src, dst *pathEndRuntime, +) { + broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) + defer cancel() + + dst.log.Debug("Will relay client update") + + dst.lastClientUpdateHeightMu.Lock() + dst.lastClientUpdateHeight = dst.latestBlock.Height + dst.lastClientUpdateHeightMu.Unlock() + + msgs := []provider.RelayerMessage{msgUpdateClient} + + if err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, mb.memo, ctx, nil); err != nil { + mb.log.Error("Error sending client update message", + zap.String("path_name", src.info.PathName), + zap.String("src_chain_id", src.info.ChainID), + zap.String("dst_chain_id", dst.info.ChainID), + zap.String("src_client_id", src.info.ClientID), + zap.String("dst_client_id", dst.info.ClientID), + zap.Error(err), + ) + + mb.metricParseTxFailureCatagory(err, src) + return + } + dst.log.Debug("Client update broadcast completed") +} + +// sendBatchMessages will send a batch of messages, +// then increment metrics counters for successful packet messages. 
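+// If a MsgUpdateClient is provided, it is prepended to the batch so the client is updated
+// in the same transaction as the batched messages.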
+func (mb *MessageBroadcaster) sendBatchMessages( + ctx context.Context, + msgUpdateClient provider.RelayerMessage, + src, dst *pathEndRuntime, + batch []messageToTrack, +) { + broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) + defer cancel() + + var ( + msgs []provider.RelayerMessage + fields []zapcore.Field + ) + + if msgUpdateClient == nil { + msgs = make([]provider.RelayerMessage, len(batch)) + for i, t := range batch { + msgs[i] = t.assembledMsg() + fields = append(fields, zap.Object(fmt.Sprintf("msg_%d", i), t)) + } + } else { + // messages are batch with appended MsgUpdateClient + msgs = make([]provider.RelayerMessage, 1+len(batch)) + msgs[0] = msgUpdateClient + + for i, t := range batch { + msgs[i+1] = t.assembledMsg() + fields = append(fields, zap.Object(fmt.Sprintf("msg_%d", i), t)) + } + } + + dst.log.Debug("Will relay messages", fields...) + + callback := func(_ *provider.RelayerTxResponse, err error) { + for _, t := range batch { + dst.finishedProcessing <- t + } + // only increment metrics counts for successful packets + if err != nil || mb.metrics == nil { + return + } + for _, tracker := range batch { + t, ok := tracker.(packetMessageToTrack) + if !ok { + continue + } + var channel, port string + if t.msg.eventType == chantypes.EventTypeRecvPacket { + channel = t.msg.info.DestChannel + port = t.msg.info.DestPort + } else { + channel = t.msg.info.SourceChannel + port = t.msg.info.SourcePort + } + mb.metrics.IncPacketsRelayed(dst.info.PathName, dst.info.ChainID, channel, port, t.msg.eventType) + } + } + callbacks := []func(rtr *provider.RelayerTxResponse, err error){callback} + + //During testing, this adds a callback so our test case can inspect the TX results + if PathProcMessageCollector != nil { + testCallback := func(rtr *provider.RelayerTxResponse, err error) { + msgResult := &PathProcessorMessageResp{ + DestinationChain: dst.chainProvider, + Response: rtr, + SuccessfulTx: err == nil, + Error: err, + } + PathProcMessageCollector <- msgResult + } + callbacks = append(callbacks, testCallback) + } + + if err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, mb.memo, ctx, callbacks); err != nil { + for _, t := range batch { + dst.finishedProcessing <- t + } + errFields := []zapcore.Field{ + zap.String("path_name", src.info.PathName), + zap.String("src_chain_id", src.info.ChainID), + zap.String("dst_chain_id", dst.info.ChainID), + zap.String("src_client_id", src.info.ClientID), + zap.String("dst_client_id", dst.info.ClientID), + zap.Error(err), + } + + mb.metricParseTxFailureCatagory(err, src) + + if errors.Is(err, chantypes.ErrRedundantTx) { + mb.log.Debug("Redundant message(s)", errFields...) + return + } + mb.log.Error("Error sending messages", errFields...) + return + } + dst.log.Debug("Message broadcast completed", fields...) +} + +// sendSingleMessage will send an isolated message. 
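+// If a MsgUpdateClient is provided, it is included in the same transaction ahead of the
+// tracked message.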
+func (mb *MessageBroadcaster) sendSingleMessage( + ctx context.Context, + msgUpdateClient provider.RelayerMessage, + src, dst *pathEndRuntime, + tracker messageToTrack, +) { + var msgs []provider.RelayerMessage + + if msgUpdateClient == nil { + msgs = []provider.RelayerMessage{tracker.assembledMsg()} + } else { + msgs = []provider.RelayerMessage{msgUpdateClient, tracker.assembledMsg()} + } + + broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) + defer cancel() + + msgType := tracker.msgType() + + dst.log.Debug(fmt.Sprintf("Will broadcast %s message", msgType), zap.Object("msg", tracker)) + + // Set callback for packet messages so that we increment prometheus metrics on successful relays. + callbacks := []func(rtr *provider.RelayerTxResponse, err error){} + + callback := func(_ *provider.RelayerTxResponse, err error) { + dst.finishedProcessing <- tracker + + t, ok := tracker.(packetMessageToTrack) + if !ok { + return + } + // only increment metrics counts for successful packets + if err != nil || mb.metrics == nil { + return + } + var channel, port string + if t.msg.eventType == chantypes.EventTypeRecvPacket { + channel = t.msg.info.DestChannel + port = t.msg.info.DestPort + } else { + channel = t.msg.info.SourceChannel + port = t.msg.info.SourcePort + } + mb.metrics.IncPacketsRelayed(dst.info.PathName, dst.info.ChainID, channel, port, t.msg.eventType) + } + + callbacks = append(callbacks, callback) + + //During testing, this adds a callback so our test case can inspect the TX results + if PathProcMessageCollector != nil { + testCallback := func(rtr *provider.RelayerTxResponse, err error) { + msgResult := &PathProcessorMessageResp{ + DestinationChain: dst.chainProvider, + Response: rtr, + SuccessfulTx: err == nil, + Error: err, + } + PathProcMessageCollector <- msgResult + } + callbacks = append(callbacks, testCallback) + } + + err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, mb.memo, ctx, callbacks) + if err != nil { + dst.finishedProcessing <- tracker + errFields := []zapcore.Field{ + zap.String("path_name", src.info.PathName), + zap.String("src_chain_id", src.info.ChainID), + zap.String("dst_chain_id", dst.info.ChainID), + zap.String("src_client_id", src.info.ClientID), + zap.String("dst_client_id", dst.info.ClientID), + } + + mb.metricParseTxFailureCatagory(err, src) + + errFields = append(errFields, zap.Object("msg", tracker)) + errFields = append(errFields, zap.Error(err)) + if errors.Is(err, chantypes.ErrRedundantTx) { + mb.log.Debug(fmt.Sprintf("Redundant %s message", msgType), errFields...) + return + } + mb.log.Error(fmt.Sprintf("Error broadcasting %s message", msgType), errFields...) 
+		return
+	}
+
+	dst.log.Debug(fmt.Sprintf("Successfully broadcasted %s message", msgType), zap.Object("msg", tracker))
+}
+
+func (mb *MessageBroadcaster) metricParseTxFailureCatagory(err error, src *pathEndRuntime) {
+	if mb.metrics == nil {
+		return
+	}
+
+	for _, promError := range promErrorCatagories {
+		if errors.Is(err, promError) {
+			mb.metrics.IncTxFailure(src.info.PathName, src.info.ChainID, promError.Error())
+			return
+		}
+	}
+	mb.metrics.IncTxFailure(src.info.PathName, src.info.ChainID, "Tx Failure")
+}
diff --git a/relayer/processor/message_injector.go b/relayer/processor/message_injector.go
new file mode 100644
index 000000000..8edc24d3f
--- /dev/null
+++ b/relayer/processor/message_injector.go
@@ -0,0 +1,122 @@
+package processor
+
+import (
+	"context"
+	"errors"
+	"sync"
+
+	chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types"
+	"github.com/cosmos/relayer/v2/relayer/provider"
+	"go.uber.org/zap"
+)
+
+var _ MessageSender = (*MessageInjector)(nil)
+
+// MessageInjector caches assembled IBC messages so they can be injected into a block
+// proposal rather than broadcast to an RPC endpoint.
+type MessageInjector struct {
+	log     *zap.Logger
+	metrics *PrometheusMetrics
+
+	memo string
+
+	dst *pathEndRuntime
+
+	cachedUpdateClient provider.RelayerMessage
+	cachedTrackers     []messageToTrack
+	mu                 sync.Mutex
+}
+
+func NewMessageInjector(
+	log *zap.Logger,
+	metrics *PrometheusMetrics,
+	memo string,
+) *MessageInjector {
+	return &MessageInjector{
+		log:     log,
+		metrics: metrics,
+		memo:    memo,
+	}
+}
+
+// trackAndSendMessages will increment attempt counters for each message and cache the
+// assembled messages, along with any MsgUpdateClient, for later retrieval via InjectMsgs
+// rather than broadcasting them.
+func (mb *MessageInjector) trackAndSendMessages(
+	ctx context.Context,
+	src, dst *pathEndRuntime,
+	msgUpdateClient provider.RelayerMessage,
+	trackers []messageToTrack,
+	needsClientUpdate bool,
+) error {
+	mb.mu.Lock()
+	defer mb.mu.Unlock()
+
+	mb.cachedUpdateClient = msgUpdateClient
+
+	mb.dst = dst
+
+	//broadcastBatch := dst.chainProvider.ProviderConfig().BroadcastMode() == provider.BroadcastModeBatch
+	var batch []messageToTrack
+
+	for _, t := range trackers {
+		retries := dst.trackProcessingMessage(t)
+		if t.assembledMsg() == nil {
+			dst.trackFinishedProcessingMessage(t)
+			continue
+		}
+
+		ordered := false
+		if m, ok := t.(packetMessageToTrack); ok && m.msg.info.ChannelOrder == chantypes.ORDERED.String() {
+			ordered = true
+		}
+
+		//if broadcastBatch && (retries == 0 || ordered) {
+		if retries == 0 || ordered {
+			batch = append(batch, t)
+			continue
+		}
+		//go mb.sendSingleMessage(ctx, src, dst, t)
+	}
+
+	mb.cachedTrackers = batch
+
+	assembledCount := 0
+	for _, m := range trackers {
+		if m.assembledMsg() != nil {
+			assembledCount++
+		}
+	}
+
+	if assembledCount > 0 {
+		return nil
+	}
+
+	if needsClientUpdate && msgUpdateClient != nil {
+		return nil
+	}
+
+	// only msgUpdateClient, don't need to send
+	return errors.New("all messages failed to assemble")
+}
+
+// InjectMsgs returns relay messages ready to inject into a proposal.
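+// It returns the cached MsgUpdateClient (if any) followed by the cached assembled messages,
+// and signals that each cached tracker has finished processing.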
+func (mb *MessageInjector) InjectMsgs(ctx context.Context) []provider.RelayerMessage { + mb.mu.Lock() + defer mb.mu.Unlock() + + var msgs []provider.RelayerMessage + + if mb.cachedUpdateClient != nil { + msgs = append(msgs, mb.cachedUpdateClient) + } + + for _, t := range mb.cachedTrackers { + msg := t.assembledMsg() + if msg != nil { + msgs = append(msgs, msg) + } + mb.dst.finishedProcessing <- t + } + + return msgs +} diff --git a/relayer/processor/message_processor.go b/relayer/processor/message_processor.go index 602c588c8..9d6899b91 100644 --- a/relayer/processor/message_processor.go +++ b/relayer/processor/message_processor.go @@ -3,7 +3,6 @@ package processor import ( "bytes" "context" - "errors" "fmt" "sync" "time" @@ -13,7 +12,6 @@ import ( ibcexported "github.com/cosmos/ibc-go/v8/modules/core/exported" "github.com/cosmos/relayer/v2/relayer/provider" "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) // messageProcessor is used for concurrent IBC message assembly and sending @@ -21,9 +19,10 @@ type messageProcessor struct { log *zap.Logger metrics *PrometheusMetrics + messageSender MessageSender + memo string - msgUpdateClient provider.RelayerMessage clientUpdateThresholdTime time.Duration pktMsgs []packetMessageToTrack @@ -77,14 +76,14 @@ func (mp *messageProcessor) trackers() (trackers []messageToTrack) { func newMessageProcessor( log *zap.Logger, metrics *PrometheusMetrics, - memo string, + messageSender MessageSender, clientUpdateThresholdTime time.Duration, isLocalhost bool, ) *messageProcessor { return &messageProcessor{ log: log, metrics: metrics, - memo: memo, + messageSender: messageSender, clientUpdateThresholdTime: clientUpdateThresholdTime, isLocalhost: isLocalhost, } @@ -99,6 +98,8 @@ func (mp *messageProcessor) processMessages( ) error { var needsClientUpdate bool + var msgUpdateClient provider.RelayerMessage + // Localhost IBC does not permit client updates if !isLocalhostClient(src.clientState.ClientID, dst.clientState.ClientID) { var err error @@ -107,14 +108,15 @@ func (mp *messageProcessor) processMessages( return err } - if err := mp.assembleMsgUpdateClient(ctx, src, dst); err != nil { + msgUpdateClient, err = mp.assembleMsgUpdateClient(ctx, src, dst) + if err != nil { return err } } mp.assembleMessages(ctx, messages, src, dst) - return mp.trackAndSendMessages(ctx, src, dst, needsClientUpdate) + return mp.messageSender.trackAndSendMessages(ctx, src, dst, msgUpdateClient, mp.trackers(), needsClientUpdate) } func isLocalhostClient(srcClientID, dstClientID string) bool { @@ -215,19 +217,6 @@ func (mp *messageProcessor) assembleMessages(ctx context.Context, messages pathE wg.Wait() } -// assembledCount will return the number of assembled messages. -// This must be called after assembleMessages has completed. -func (mp *messageProcessor) assembledCount() int { - count := 0 - for _, m := range mp.trackers() { - if m.assembledMsg() != nil { - count++ - } - } - - return count -} - // assembleMessage will assemble a specific message based on it's type. func (mp *messageProcessor) assembleMessage( ctx context.Context, @@ -251,7 +240,7 @@ func (mp *messageProcessor) assembleMessage( // assembleMsgUpdateClient uses the ChainProvider from both pathEnds to assemble the client update header // from the source and then assemble the update client message in the correct format for the destination. 
-func (mp *messageProcessor) assembleMsgUpdateClient(ctx context.Context, src, dst *pathEndRuntime) error { +func (mp *messageProcessor) assembleMsgUpdateClient(ctx context.Context, src, dst *pathEndRuntime) (provider.RelayerMessage, error) { clientID := dst.info.ClientID clientConsensusHeight := dst.clientState.ConsensusHeight trustedConsensusHeight := dst.clientTrustedState.ClientState.ConsensusHeight @@ -267,13 +256,13 @@ func (mp *messageProcessor) assembleMsgUpdateClient(ctx context.Context, src, ds if !trustedConsensusHeight.EQ(clientConsensusHeight) { deltaConsensusHeight := int64(clientConsensusHeight.RevisionHeight) - int64(trustedConsensusHeight.RevisionHeight) if trustedConsensusHeight.RevisionHeight != 0 && deltaConsensusHeight <= clientConsensusHeightUpdateThresholdBlocks { - return fmt.Errorf("observed client trusted height: %d does not equal latest client state height: %d", + return nil, fmt.Errorf("observed client trusted height: %d does not equal latest client state height: %d", trustedConsensusHeight.RevisionHeight, clientConsensusHeight.RevisionHeight) } header, err := src.chainProvider.QueryIBCHeader(ctx, int64(clientConsensusHeight.RevisionHeight+1)) if err != nil { - return fmt.Errorf("error getting IBC header at height: %d for chain_id: %s, %w", + return nil, fmt.Errorf("error getting IBC header at height: %d for chain_id: %s, %w", clientConsensusHeight.RevisionHeight+1, src.info.ChainID, err) } @@ -297,7 +286,7 @@ func (mp *messageProcessor) assembleMsgUpdateClient(ctx context.Context, src, ds if src.latestHeader.Height() == trustedConsensusHeight.RevisionHeight && !bytes.Equal(src.latestHeader.NextValidatorsHash(), trustedNextValidatorsHash) { - return fmt.Errorf("latest header height is equal to the client trusted height: %d, "+ + return nil, fmt.Errorf("latest header height is equal to the client trusted height: %d, "+ "need to wait for next block's header before we can assemble and send a new MsgUpdateClient", trustedConsensusHeight.RevisionHeight) } @@ -308,97 +297,15 @@ func (mp *messageProcessor) assembleMsgUpdateClient(ctx context.Context, src, ds dst.clientTrustedState.IBCHeader, ) if err != nil { - return fmt.Errorf("error assembling new client header: %w", err) + return nil, fmt.Errorf("error assembling new client header: %w", err) } msgUpdateClient, err := dst.chainProvider.MsgUpdateClient(clientID, msgUpdateClientHeader) if err != nil { - return fmt.Errorf("error assembling MsgUpdateClient: %w", err) - } - - mp.msgUpdateClient = msgUpdateClient - - return nil -} - -// trackAndSendMessages will increment attempt counters for each message and send each message. -// Messages will be batched if the broadcast mode is configured to 'batch' and there was not an error -// in a previous batch. 
-func (mp *messageProcessor) trackAndSendMessages( - ctx context.Context, - src, dst *pathEndRuntime, - needsClientUpdate bool, -) error { - broadcastBatch := dst.chainProvider.ProviderConfig().BroadcastMode() == provider.BroadcastModeBatch - var batch []messageToTrack - - for _, t := range mp.trackers() { - - retries := dst.trackProcessingMessage(t) - if t.assembledMsg() == nil { - dst.trackFinishedProcessingMessage(t) - continue - } - - ordered := false - if m, ok := t.(packetMessageToTrack); ok && m.msg.info.ChannelOrder == chantypes.ORDERED.String() { - ordered = true - } - - if broadcastBatch && (retries == 0 || ordered) { - batch = append(batch, t) - continue - } - go mp.sendSingleMessage(ctx, src, dst, t) - } - - if len(batch) > 0 { - go mp.sendBatchMessages(ctx, src, dst, batch) - } - - if mp.assembledCount() > 0 { - return nil - } - - if needsClientUpdate && mp.msgUpdateClient != nil { - go mp.sendClientUpdate(ctx, src, dst) - return nil + return nil, fmt.Errorf("error assembling MsgUpdateClient: %w", err) } - // only msgUpdateClient, don't need to send - return errors.New("all messages failed to assemble") -} - -// sendClientUpdate will send an isolated client update message. -func (mp *messageProcessor) sendClientUpdate( - ctx context.Context, - src, dst *pathEndRuntime, -) { - broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) - defer cancel() - - dst.log.Debug("Will relay client update") - - dst.lastClientUpdateHeightMu.Lock() - dst.lastClientUpdateHeight = dst.latestBlock.Height - dst.lastClientUpdateHeightMu.Unlock() - - msgs := []provider.RelayerMessage{mp.msgUpdateClient} - - if err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, mp.memo, ctx, nil); err != nil { - mp.log.Error("Error sending client update message", - zap.String("path_name", src.info.PathName), - zap.String("src_chain_id", src.info.ChainID), - zap.String("dst_chain_id", dst.info.ChainID), - zap.String("src_client_id", src.info.ClientID), - zap.String("dst_client_id", dst.info.ClientID), - zap.Error(err), - ) - - mp.metricParseTxFailureCatagory(err, src) - return - } - dst.log.Debug("Client update broadcast completed") + return msgUpdateClient, nil } type PathProcessorMessageResp struct { @@ -409,204 +316,3 @@ type PathProcessorMessageResp struct { } var PathProcMessageCollector chan *PathProcessorMessageResp - -// sendBatchMessages will send a batch of messages, -// then increment metrics counters for successful packet messages. -func (mp *messageProcessor) sendBatchMessages( - ctx context.Context, - src, dst *pathEndRuntime, - batch []messageToTrack, -) { - broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) - defer cancel() - - var ( - msgs []provider.RelayerMessage - fields []zapcore.Field - ) - - if mp.isLocalhost { - msgs = make([]provider.RelayerMessage, len(batch)) - for i, t := range batch { - msgs[i] = t.assembledMsg() - fields = append(fields, zap.Object(fmt.Sprintf("msg_%d", i), t)) - } - } else { - // messages are batch with appended MsgUpdateClient - msgs = make([]provider.RelayerMessage, 1+len(batch)) - msgs[0] = mp.msgUpdateClient - - for i, t := range batch { - msgs[i+1] = t.assembledMsg() - fields = append(fields, zap.Object(fmt.Sprintf("msg_%d", i), t)) - } - } - - dst.log.Debug("Will relay messages", fields...) 
- - callback := func(_ *provider.RelayerTxResponse, err error) { - for _, t := range batch { - dst.finishedProcessing <- t - } - // only increment metrics counts for successful packets - if err != nil || mp.metrics == nil { - return - } - for _, tracker := range batch { - t, ok := tracker.(packetMessageToTrack) - if !ok { - continue - } - var channel, port string - if t.msg.eventType == chantypes.EventTypeRecvPacket { - channel = t.msg.info.DestChannel - port = t.msg.info.DestPort - } else { - channel = t.msg.info.SourceChannel - port = t.msg.info.SourcePort - } - mp.metrics.IncPacketsRelayed(dst.info.PathName, dst.info.ChainID, channel, port, t.msg.eventType) - } - } - callbacks := []func(rtr *provider.RelayerTxResponse, err error){callback} - - //During testing, this adds a callback so our test case can inspect the TX results - if PathProcMessageCollector != nil { - testCallback := func(rtr *provider.RelayerTxResponse, err error) { - msgResult := &PathProcessorMessageResp{ - DestinationChain: dst.chainProvider, - Response: rtr, - SuccessfulTx: err == nil, - Error: err, - } - PathProcMessageCollector <- msgResult - } - callbacks = append(callbacks, testCallback) - } - - if err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, mp.memo, ctx, callbacks); err != nil { - for _, t := range batch { - dst.finishedProcessing <- t - } - errFields := []zapcore.Field{ - zap.String("path_name", src.info.PathName), - zap.String("src_chain_id", src.info.ChainID), - zap.String("dst_chain_id", dst.info.ChainID), - zap.String("src_client_id", src.info.ClientID), - zap.String("dst_client_id", dst.info.ClientID), - zap.Error(err), - } - - mp.metricParseTxFailureCatagory(err, src) - - if errors.Is(err, chantypes.ErrRedundantTx) { - mp.log.Debug("Redundant message(s)", errFields...) - return - } - mp.log.Error("Error sending messages", errFields...) - return - } - dst.log.Debug("Message broadcast completed", fields...) -} - -// sendSingleMessage will send an isolated message. -func (mp *messageProcessor) sendSingleMessage( - ctx context.Context, - src, dst *pathEndRuntime, - tracker messageToTrack, -) { - var msgs []provider.RelayerMessage - - if mp.isLocalhost { - msgs = []provider.RelayerMessage{tracker.assembledMsg()} - } else { - msgs = []provider.RelayerMessage{mp.msgUpdateClient, tracker.assembledMsg()} - } - - broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) - defer cancel() - - msgType := tracker.msgType() - - dst.log.Debug(fmt.Sprintf("Will broadcast %s message", msgType), zap.Object("msg", tracker)) - - // Set callback for packet messages so that we increment prometheus metrics on successful relays. 
- callbacks := []func(rtr *provider.RelayerTxResponse, err error){} - - callback := func(_ *provider.RelayerTxResponse, err error) { - dst.finishedProcessing <- tracker - - t, ok := tracker.(packetMessageToTrack) - if !ok { - return - } - // only increment metrics counts for successful packets - if err != nil || mp.metrics == nil { - return - } - var channel, port string - if t.msg.eventType == chantypes.EventTypeRecvPacket { - channel = t.msg.info.DestChannel - port = t.msg.info.DestPort - } else { - channel = t.msg.info.SourceChannel - port = t.msg.info.SourcePort - } - mp.metrics.IncPacketsRelayed(dst.info.PathName, dst.info.ChainID, channel, port, t.msg.eventType) - } - - callbacks = append(callbacks, callback) - - //During testing, this adds a callback so our test case can inspect the TX results - if PathProcMessageCollector != nil { - testCallback := func(rtr *provider.RelayerTxResponse, err error) { - msgResult := &PathProcessorMessageResp{ - DestinationChain: dst.chainProvider, - Response: rtr, - SuccessfulTx: err == nil, - Error: err, - } - PathProcMessageCollector <- msgResult - } - callbacks = append(callbacks, testCallback) - } - - err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, mp.memo, ctx, callbacks) - if err != nil { - dst.finishedProcessing <- tracker - errFields := []zapcore.Field{ - zap.String("path_name", src.info.PathName), - zap.String("src_chain_id", src.info.ChainID), - zap.String("dst_chain_id", dst.info.ChainID), - zap.String("src_client_id", src.info.ClientID), - zap.String("dst_client_id", dst.info.ClientID), - } - - mp.metricParseTxFailureCatagory(err, src) - - errFields = append(errFields, zap.Object("msg", tracker)) - errFields = append(errFields, zap.Error(err)) - if errors.Is(err, chantypes.ErrRedundantTx) { - mp.log.Debug(fmt.Sprintf("Redundant %s message", msgType), errFields...) - return - } - mp.log.Error(fmt.Sprintf("Error broadcasting %s message", msgType), errFields...) 
- return - } - - dst.log.Debug(fmt.Sprintf("Successfully broadcasted %s message", msgType), zap.Object("msg", tracker)) -} - -func (mp *messageProcessor) metricParseTxFailureCatagory(err error, src *pathEndRuntime) { - if mp.metrics == nil { - return - } - - for _, promError := range promErrorCatagories { - if errors.Is(err, promError) { - mp.metrics.IncTxFailure(src.info.PathName, src.info.ChainID, promError.Error()) - return - } - } - mp.metrics.IncTxFailure(src.info.PathName, src.info.ChainID, "Tx Failure") -} diff --git a/relayer/processor/message_sender.go b/relayer/processor/message_sender.go new file mode 100644 index 000000000..39d7785db --- /dev/null +++ b/relayer/processor/message_sender.go @@ -0,0 +1,29 @@ +package processor + +import ( + "context" + + "github.com/cosmos/relayer/v2/relayer/provider" +) + +type MessageSender interface { + trackAndSendMessages( + ctx context.Context, + src, dst *pathEndRuntime, + msgUpdateClient provider.RelayerMessage, + trackers []messageToTrack, + needsClientUpdate bool, + ) error +} + +type MessageSenderNop struct{} + +func (MessageSenderNop) trackAndSendMessages( + ctx context.Context, + src, dst *pathEndRuntime, + msgUpdateClient provider.RelayerMessage, + trackers []messageToTrack, + needsClientUpdate bool, +) error { + return nil +} diff --git a/relayer/processor/path_processor.go b/relayer/processor/path_processor.go index 3cc2668da..82c5d3c82 100644 --- a/relayer/processor/path_processor.go +++ b/relayer/processor/path_processor.go @@ -59,6 +59,9 @@ type PathProcessor struct { pathEnd1 *pathEndRuntime pathEnd2 *pathEndRuntime + pathEnd1MessageSender MessageSender + pathEnd2MessageSender MessageSender + memo string clientUpdateThresholdTime time.Duration diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index b78c579b1..30cd42af3 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -1064,20 +1064,36 @@ func (pp *PathProcessor) processLatestMessages(ctx context.Context, cancel func( clientICQMessages: pathEnd2ClientICQMessages, } + pp.ensureMessageSenders() + // now assemble and send messages in parallel // if sending messages fails to one pathEnd, we don't need to halt sending to the other pathEnd. 
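+	// Each path end relays through its own MessageSender: a MessageBroadcaster by default,
+	// or whatever sender was supplied via SetMessageSenders (e.g. a MessageInjector).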
var eg errgroup.Group eg.Go(func() error { - mp := newMessageProcessor(pp.log, pp.metrics, pp.memo, pp.clientUpdateThresholdTime, pp.isLocalhost) + mp := newMessageProcessor(pp.log, pp.metrics, pp.pathEnd1MessageSender, pp.clientUpdateThresholdTime, pp.isLocalhost) return mp.processMessages(ctx, pathEnd1Messages, pp.pathEnd2, pp.pathEnd1) }) eg.Go(func() error { - mp := newMessageProcessor(pp.log, pp.metrics, pp.memo, pp.clientUpdateThresholdTime, pp.isLocalhost) + mp := newMessageProcessor(pp.log, pp.metrics, pp.pathEnd2MessageSender, pp.clientUpdateThresholdTime, pp.isLocalhost) return mp.processMessages(ctx, pathEnd2Messages, pp.pathEnd1, pp.pathEnd2) }) return eg.Wait() } +func (pp *PathProcessor) SetMessageSenders(pathEnd1MessageSender, pathEnd2MessageSender MessageSender) { + pp.pathEnd1MessageSender = pathEnd1MessageSender + pp.pathEnd2MessageSender = pathEnd2MessageSender +} + +func (pp *PathProcessor) ensureMessageSenders() { + if pp.pathEnd1MessageSender == nil { + pp.pathEnd1MessageSender = NewMessageBroadcaster(pp.log, pp.metrics, pp.memo) + } + if pp.pathEnd2MessageSender == nil { + pp.pathEnd2MessageSender = NewMessageBroadcaster(pp.log, pp.metrics, pp.memo) + } +} + func (pp *PathProcessor) channelMessagesToSend(pathEnd1ChannelHandshakeRes, pathEnd2ChannelHandshakeRes, pathEnd1ChannelCloseRes, pathEnd2ChannelCloseRes pathEndChannelHandshakeResponse) ([]channelIBCMessage, []channelIBCMessage) { pathEnd1ChannelOpenSrcLen := len(pathEnd1ChannelHandshakeRes.SrcMessages) pathEnd1ChannelOpenDstLen := len(pathEnd1ChannelHandshakeRes.DstMessages)