diff --git a/tm/tmengine/engine_test.go b/tm/tmengine/engine_test.go index cee9ead..d92538f 100644 --- a/tm/tmengine/engine_test.go +++ b/tm/tmengine/engine_test.go @@ -834,7 +834,7 @@ func TestEngine_plumbing_ReplayedHeaders(t *testing.T) { require.Equal(t, []tmconsensus.ProposedHeader{ { Header: ph1.Header, - Round: 1, + Round: 0, // Replayed headers always appear as round zero. // PubKey and Signature missing from round store during replay. }, }, phs) diff --git a/tm/tmengine/internal/tmmirror/internal/tmi/kernel.go b/tm/tmengine/internal/tmmirror/internal/tmi/kernel.go index 6e99e31..cecd222 100644 --- a/tm/tmengine/internal/tmmirror/internal/tmi/kernel.go +++ b/tm/tmengine/internal/tmmirror/internal/tmi/kernel.go @@ -1693,8 +1693,6 @@ func (k *Kernel) handleReplayedHeader( tempProofs[hash] = haveProof // Now merge the incoming proof with the local copy. - // TODO: this should not be hardcoded to pcp, - // but it's okay for now while we limit this to only the header's block hash. mergeRes := haveProof.MergeSparse(gcrypto.SparseSignatureProof{ PubKeyHash: string(header.ValidatorSet.PubKeyHash), Signatures: sparseSigs, @@ -1736,10 +1734,10 @@ func (k *Kernel) handleReplayedHeader( // That is fine, as noted in the documentation for the RoundStore. } - if err := k.rStore.SaveRoundProposedHeader(ctx, fakePH); err != nil { + if err := k.rStore.SaveRoundReplayedHeader(ctx, header); err != nil { return tmelink.ReplayedHeaderInternalError{ Err: fmt.Errorf( - "failed to replay saved header to round store: %w", + "failed to save replayed header to round store: %w", err, ), } diff --git a/tm/tmstore/roundstore.go b/tm/tmstore/roundstore.go index 3b34698..17608d8 100644 --- a/tm/tmstore/roundstore.go +++ b/tm/tmstore/roundstore.go @@ -13,6 +13,14 @@ type RoundStore interface { // as a candidate proposed header in the given height and round. SaveRoundProposedHeader(ctx context.Context, ph tmconsensus.ProposedHeader) error + // SaveRoundReplayedHeader saves the header as one + // that is about to be committed in the given height, + // due to mirror catchup. + // + // In the normal mirror flow, the replayed header is saved, + // and then OverwriteRoundPrecommitProofs is called. + SaveRoundReplayedHeader(ctx context.Context, h tmconsensus.Header) error + // The overwrite proofs methods overwrite existing entries // for the corresponding proof at the given height and round. // TODO: these methods should both accept sparse proofs, @@ -36,14 +44,12 @@ type RoundStore interface { // and may differ from one call to another. // // Note that in the event of replayed blocks during a mirror catchup, - // there may be ProposedHeader values without a PubKey or Signature field. + // there may be ProposedHeader values without its PubKey, Signature, or Annotations fields set. // // If there are no proposed blocks or votes at the given height and round, // [tmconsensus.RoundUnknownError] is returned. // If at least one proposed block, prevote, or precommit exists at the height and round, // a nil error is returned. - // TODO: this should return sparse proofs, - // so that the store can remain agnostic of signature proof schemes. LoadRoundState(ctx context.Context, height uint64, round uint32) ( phs []tmconsensus.ProposedHeader, prevotes, precommits tmconsensus.SparseSignatureCollection, diff --git a/tm/tmstore/tmmemstore/roundstore.go b/tm/tmstore/tmmemstore/roundstore.go index b2c3c53..00ef802 100644 --- a/tm/tmstore/tmmemstore/roundstore.go +++ b/tm/tmstore/tmmemstore/roundstore.go @@ -21,6 +21,10 @@ type RoundStore struct { // Height -> Round -> collection. prevotes, precommits map[uint64]map[uint32]tmconsensus.SparseSignatureCollection + + // Height -> collection. + // There really should never be more than one replayed header in a height, though. + replayedHeaders map[uint64][]tmconsensus.Header } func NewRoundStore() *RoundStore { @@ -29,6 +33,8 @@ func NewRoundStore() *RoundStore { prevotes: make(map[uint64]map[uint32]tmconsensus.SparseSignatureCollection), precommits: make(map[uint64]map[uint32]tmconsensus.SparseSignatureCollection), + + replayedHeaders: make(map[uint64][]tmconsensus.Header), } } @@ -66,6 +72,26 @@ func (s *RoundStore) SaveRoundProposedHeader(ctx context.Context, ph tmconsensus return nil } +func (s *RoundStore) SaveRoundReplayedHeader(ctx context.Context, h tmconsensus.Header) error { + s.mu.Lock() + defer s.mu.Unlock() + + // Check if an existing proposed header has the same hash. + if byRound, ok := s.phs[h.Height]; ok { + for _, byHash := range byRound { + if _, ok := byHash[string(h.Hash)]; ok { + return tmstore.OverwriteError{ + Field: "hash", + Value: fmt.Sprintf("%x", h.Hash), + } + } + } + } + + s.replayedHeaders[h.Height] = append(s.replayedHeaders[h.Height], h) + return nil +} + func (s *RoundStore) OverwriteRoundPrevoteProofs( ctx context.Context, height uint64, @@ -125,8 +151,24 @@ func (s *RoundStore) LoadRoundState(ctx context.Context, height uint64, round ui prevotes = prevoteHeightMap[round] } + replayedHeaders := s.replayedHeaders[height] if precommitHeightMap, ok := s.precommits[height]; ok { precommits = precommitHeightMap[round] + + for hash := range precommits.BlockSignatures { + if hash == "" { + continue + } + + // For each non-nil precommit hash, + // check if it is for a replayed header; + // if so, add the replayed header to the proposed header list. + for _, rh := range replayedHeaders { + if hash == string(rh.Hash) { + phs = append(phs, tmconsensus.ProposedHeader{Header: rh}) + } + } + } } if phs == nil && prevotes.BlockSignatures == nil && precommits.BlockSignatures == nil { diff --git a/tm/tmstore/tmstoretest/roundstorecompliance.go b/tm/tmstore/tmstoretest/roundstorecompliance.go index 3fec4c9..f6d9e0e 100644 --- a/tm/tmstore/tmstoretest/roundstorecompliance.go +++ b/tm/tmstore/tmstoretest/roundstorecompliance.go @@ -107,7 +107,7 @@ func TestRoundStoreCompliance(t *testing.T, f RoundStoreFactory) { require.Equal(t, want, phs) }) - t.Run("overwriting an existing proposed block", func(t *testing.T) { + t.Run("overwriting an existing proposed block is disallowed", func(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) @@ -137,6 +137,69 @@ func TestRoundStoreCompliance(t *testing.T, f RoundStoreFactory) { }) }) + t.Run("replayed headers", func(t *testing.T) { + t.Run("returned in LoadRoundState", func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s, err := f(t.Cleanup) + require.NoError(t, err) + + fx := tmconsensustest.NewStandardFixture(2) + + ph1 := fx.NextProposedHeader([]byte("val0"), 0) + require.Empty(t, ph1.Header.PrevCommitProof.Proofs) + ph1.Header.PrevCommitProof.Proofs = nil + + require.NoError(t, s.SaveRoundReplayedHeader(ctx, ph1.Header)) + + // In normal flow when the mirror is handling a replayed header, + // it saves the replayed header and then writes the precommit proof. + // If that gets interrupted after saving the header but before writing the precommit proof, + // there isn't really a way for us to know it's part of this particular round, + // so we don't assert about the header until the precommit proofs are saved. + voteMap := map[string][]int{ + string(ph1.Header.Hash): {0, 1}, + } + + precommits := fx.SparsePrecommitSignatureCollection(ctx, 1, 0, voteMap) + require.NoError(t, s.OverwriteRoundPrecommitProofs(ctx, 1, 0, precommits)) + + phs, _, _, err := s.LoadRoundState(ctx, 1, 0) + require.NoError(t, err) + require.Equal(t, []tmconsensus.ProposedHeader{ + { + Header: ph1.Header, + }, + }, phs) + }) + + t.Run("returns tmstore.OverwriteError when proposed header already exists", func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s, err := f(t.Cleanup) + require.NoError(t, err) + + fx := tmconsensustest.NewStandardFixture(2) + + ph1 := fx.NextProposedHeader([]byte("val0"), 0) + fx.SignProposal(ctx, &ph1, 0) + require.NoError(t, s.SaveRoundProposedHeader(ctx, ph1)) + + err = s.SaveRoundReplayedHeader(ctx, ph1.Header) + require.Error(t, err) + require.ErrorIs(t, err, tmstore.OverwriteError{ + Field: "hash", + Value: fmt.Sprintf("%x", ph1.Header.Hash), + }) + }) + }) + for _, tc := range []struct { typ string diff --git a/tmsqlite/migrate.go b/tmsqlite/migrate.go index 4d9715f..78a6f85 100644 --- a/tmsqlite/migrate.go +++ b/tmsqlite/migrate.go @@ -190,7 +190,7 @@ CREATE TABLE finalizations( ` CREATE TABLE headers( id INTEGER PRIMARY KEY NOT NULL, - hash BLOB NOT NULL, + hash BLOB NOT NULL UNIQUE, prev_block_hash BLOB, height INTEGER NOT NULL CHECK (height >= 0), prev_commit_proof_id INTEGER, diff --git a/tmsqlite/tmstore.go b/tmsqlite/tmstore.go index 31c6a89..e77505f 100644 --- a/tmsqlite/tmstore.go +++ b/tmsqlite/tmstore.go @@ -990,6 +990,27 @@ $committed)`, sql.Named("committed", committed), ) if err != nil { + if isUniqueConstraintError(err) { + // Special case for compliance. + // There is only one unique constraint on the headers table. + overwriteErr := tmstore.OverwriteError{ + Field: "hash", + Value: fmt.Sprintf("%x", h.Hash), + } + + // But, we still need to get the underlying ID. + var id int64 + if err := tx.QueryRowContext( + ctx, + `SELECT id FROM headers WHERE hash = ?`, + h.Hash, + ).Scan(&id); err != nil { + return -1, fmt.Errorf("failed to fall back to ID retrieval for header: %w", err) + } + + return id, overwriteErr + } + return -1, fmt.Errorf("failed to store header: %w", err) } @@ -1486,7 +1507,8 @@ func (s *TMStore) SaveProposedHeaderAction(ctx context.Context, ph tmconsensus.P defer tx.Rollback() headerID, err := s.createHeaderInTx(ctx, tx, ph.Header, false) - if err != nil { + if err != nil && !errors.As(err, new(tmstore.OverwriteError)) { + // We ignore the attempted overwrite and proceed with the proposed header insertion. return fmt.Errorf("failed to create header: %w", err) } @@ -1860,7 +1882,8 @@ func (s *TMStore) SaveRoundProposedHeader(ctx context.Context, ph tmconsensus.Pr defer tx.Rollback() headerID, err := s.createHeaderInTx(ctx, tx, ph.Header, false) - if err != nil { + if err != nil && !errors.As(err, new(tmstore.OverwriteError)) { + // We ignore the attempted overwrite and proceed with the proposed header insertion. return fmt.Errorf("failed to create header: %w", err) } @@ -1904,6 +1927,26 @@ proposer_pub_key_id return nil } +func (s *TMStore) SaveRoundReplayedHeader(ctx context.Context, h tmconsensus.Header) error { + defer trace.StartRegion(ctx, "SaveRoundReplayedHeader").End() + + tx, err := s.rw.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("failed to begin transaction: %w", err) + } + defer tx.Rollback() + + if _, err := s.createHeaderInTx(ctx, tx, h, false); err != nil { + return fmt.Errorf("failed to save replayed header: %w", err) + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + + return nil +} + func (s *TMStore) OverwriteRoundPrevoteProofs( ctx context.Context, height uint64, @@ -2095,9 +2138,9 @@ vpks.hash_id, vpks.hash, vpks.idx, vpks.key, vpks.n_keys FROM validator_pub_keys_for_hash AS vpks JOIN headers ON (headers.validators_pub_key_hash_id = vpks.hash_id OR headers.next_validators_pub_key_hash_id = vpks.hash_id) -JOIN proposed_headers ON +LEFT JOIN proposed_headers ON proposed_headers.header_id = headers.id -WHERE proposed_headers.height = ? AND proposed_headers.round = ?`, +WHERE headers.height = ?1 OR (proposed_headers.height = ?1 AND proposed_headers.round = ?2)`, height, round, ) if err != nil { @@ -2157,9 +2200,9 @@ WHERE proposed_headers.height = ? AND proposed_headers.round = ?`, FROM validator_powers_for_hash AS vps JOIN headers ON (headers.validators_power_hash_id = vps.hash_id OR headers.next_validators_power_hash_id = vps.hash_id) -JOIN proposed_headers ON +LEFT JOIN proposed_headers ON proposed_headers.header_id = headers.id -WHERE proposed_headers.height = ? AND proposed_headers.round = ?`, +WHERE headers.height = ?1 OR (proposed_headers.height = ?1 AND proposed_headers.round = ?2)`, height, round, ) if err != nil { @@ -2207,7 +2250,7 @@ WHERE proposed_headers.height = ? AND proposed_headers.round = ?`, FROM proof_signatures AS ps JOIN headers AS hs ON hs.prev_commit_proof_id = ps.commit_proof_id JOIN proposed_headers AS phs ON phs.header_id = hs.id -WHERE phs.height = ? AND phs.round = ? +WHERE hs.height = ?1 OR (phs.height = ?1 AND phs.round = ?2) ORDER BY ps.key_id`, height, round, ) @@ -2278,7 +2321,28 @@ JOIN headers AS hs ON hs.id = phs.header_id JOIN validator_pub_keys AS keys ON keys.id = phs.proposer_pub_key_id LEFT JOIN commit_proofs AS pcp ON pcp.id = hs.prev_commit_proof_id LEFT JOIN validator_pub_key_hashes AS pcphash ON pcphash.id = pcp.validators_pub_key_hash_id -WHERE phs.height = ? AND phs.round = ?`, +WHERE phs.height = ?1 AND phs.round = ?2 +UNION ALL +SELECT +NULL, +NULL, NULL, +NULL, +hs.height, hs.hash, hs.prev_block_hash, +hs.prev_commit_proof_id, +pcp.round, +pcphash.hash, +hs.validators_pub_key_hash_id, hs.next_validators_pub_key_hash_id, +hs.validators_power_hash_id, hs.next_validators_power_hash_id, +hs.data_id, +hs.prev_app_state_hash, +hs.user_annotations, hs.driver_annotations +FROM headers AS hs +LEFT JOIN commit_proofs AS pcp ON pcp.id = hs.prev_commit_proof_id +LEFT JOIN validator_pub_key_hashes AS pcphash ON pcphash.id = pcp.validators_pub_key_hash_id +WHERE hs.height = ?1 AND hs.id NOT IN ( + SELECT header_id FROM proposed_headers WHERE height = ?1 AND round = ?2 +) +`, height, round, ) if err != nil { @@ -2316,13 +2380,15 @@ WHERE phs.height = ? AND phs.round = ?`, "failed to scan proposed headers row: %w", err, ) } - key, err := s.reg.Unmarshal(encKey) - if err != nil { - return nil, prevotes, precommits, fmt.Errorf( - "failed to unmarshal key from proposed headers: %w", err, - ) + if len(encKey) > 0 { + key, err := s.reg.Unmarshal(encKey) + if err != nil { + return nil, prevotes, precommits, fmt.Errorf( + "failed to unmarshal key from proposed headers: %w", err, + ) + } + ph.ProposerPubKey = key } - ph.ProposerPubKey = key keys := pubKeySets[vkID] pows := powerSets[vpID]