Skip to content

Commit

Permalink
chore: add replayed header concept to round store
Browse files Browse the repository at this point in the history
The replayed headers act a little like plain proposed headers, but they
are different enough that it seems valid to add them to the round store
as their own methods.

Tests are still passing with gcosmos using tmmemstore. I think there are
more edge cases to shake out with tmsqlite.
  • Loading branch information
mark-rushakoff committed Oct 9, 2024
1 parent 6f49e35 commit 7d280b7
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 24 deletions.
2 changes: 1 addition & 1 deletion tm/tmengine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ func TestEngine_plumbing_ReplayedHeaders(t *testing.T) {
require.Equal(t, []tmconsensus.ProposedHeader{
{
Header: ph1.Header,
Round: 1,
Round: 0, // Replayed headers always appear as round zero.
// PubKey and Signature missing from round store during replay.
},
}, phs)
Expand Down
6 changes: 2 additions & 4 deletions tm/tmengine/internal/tmmirror/internal/tmi/kernel.go
Original file line number Diff line number Diff line change
Expand Up @@ -1693,8 +1693,6 @@ func (k *Kernel) handleReplayedHeader(
tempProofs[hash] = haveProof

// Now merge the incoming proof with the local copy.
// TODO: this should not be hardcoded to pcp,
// but it's okay for now while we limit this to only the header's block hash.
mergeRes := haveProof.MergeSparse(gcrypto.SparseSignatureProof{
PubKeyHash: string(header.ValidatorSet.PubKeyHash),
Signatures: sparseSigs,
Expand Down Expand Up @@ -1736,10 +1734,10 @@ func (k *Kernel) handleReplayedHeader(
// That is fine, as noted in the documentation for the RoundStore.
}

if err := k.rStore.SaveRoundProposedHeader(ctx, fakePH); err != nil {
if err := k.rStore.SaveRoundReplayedHeader(ctx, header); err != nil {
return tmelink.ReplayedHeaderInternalError{
Err: fmt.Errorf(
"failed to replay saved header to round store: %w",
"failed to save replayed header to round store: %w",
err,
),
}
Expand Down
12 changes: 9 additions & 3 deletions tm/tmstore/roundstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ type RoundStore interface {
// as a candidate proposed header in the given height and round.
SaveRoundProposedHeader(ctx context.Context, ph tmconsensus.ProposedHeader) error

// SaveRoundReplayedHeader saves the header as one
// that is about to be committed in the given height,
// due to mirror catchup.
//
// In the normal mirror flow, the replayed header is saved,
// and then OverwriteRoundPrecommitProofs is called.
SaveRoundReplayedHeader(ctx context.Context, h tmconsensus.Header) error

// The overwrite proofs methods overwrite existing entries
// for the corresponding proof at the given height and round.
// TODO: these methods should both accept sparse proofs,
Expand All @@ -36,14 +44,12 @@ type RoundStore interface {
// and may differ from one call to another.
//
// Note that in the event of replayed blocks during a mirror catchup,
// there may be ProposedHeader values without a PubKey or Signature field.
// there may be ProposedHeader values without its PubKey, Signature, or Annotations fields set.
//
// If there are no proposed blocks or votes at the given height and round,
// [tmconsensus.RoundUnknownError] is returned.
// If at least one proposed block, prevote, or precommit exists at the height and round,
// a nil error is returned.
// TODO: this should return sparse proofs,
// so that the store can remain agnostic of signature proof schemes.
LoadRoundState(ctx context.Context, height uint64, round uint32) (
phs []tmconsensus.ProposedHeader,
prevotes, precommits tmconsensus.SparseSignatureCollection,
Expand Down
42 changes: 42 additions & 0 deletions tm/tmstore/tmmemstore/roundstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ type RoundStore struct {

// Height -> Round -> collection.
prevotes, precommits map[uint64]map[uint32]tmconsensus.SparseSignatureCollection

// Height -> collection.
// There really should never be more than one replayed header in a height, though.
replayedHeaders map[uint64][]tmconsensus.Header
}

func NewRoundStore() *RoundStore {
Expand All @@ -29,6 +33,8 @@ func NewRoundStore() *RoundStore {

prevotes: make(map[uint64]map[uint32]tmconsensus.SparseSignatureCollection),
precommits: make(map[uint64]map[uint32]tmconsensus.SparseSignatureCollection),

replayedHeaders: make(map[uint64][]tmconsensus.Header),
}
}

Expand Down Expand Up @@ -66,6 +72,26 @@ func (s *RoundStore) SaveRoundProposedHeader(ctx context.Context, ph tmconsensus
return nil
}

func (s *RoundStore) SaveRoundReplayedHeader(ctx context.Context, h tmconsensus.Header) error {
s.mu.Lock()
defer s.mu.Unlock()

// Check if an existing proposed header has the same hash.
if byRound, ok := s.phs[h.Height]; ok {
for _, byHash := range byRound {
if _, ok := byHash[string(h.Hash)]; ok {
return tmstore.OverwriteError{
Field: "hash",
Value: fmt.Sprintf("%x", h.Hash),
}
}
}
}

s.replayedHeaders[h.Height] = append(s.replayedHeaders[h.Height], h)
return nil
}

func (s *RoundStore) OverwriteRoundPrevoteProofs(
ctx context.Context,
height uint64,
Expand Down Expand Up @@ -125,8 +151,24 @@ func (s *RoundStore) LoadRoundState(ctx context.Context, height uint64, round ui
prevotes = prevoteHeightMap[round]
}

replayedHeaders := s.replayedHeaders[height]
if precommitHeightMap, ok := s.precommits[height]; ok {
precommits = precommitHeightMap[round]

for hash := range precommits.BlockSignatures {
if hash == "" {
continue
}

// For each non-nil precommit hash,
// check if it is for a replayed header;
// if so, add the replayed header to the proposed header list.
for _, rh := range replayedHeaders {
if hash == string(rh.Hash) {
phs = append(phs, tmconsensus.ProposedHeader{Header: rh})
}
}
}
}

if phs == nil && prevotes.BlockSignatures == nil && precommits.BlockSignatures == nil {
Expand Down
65 changes: 64 additions & 1 deletion tm/tmstore/tmstoretest/roundstorecompliance.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestRoundStoreCompliance(t *testing.T, f RoundStoreFactory) {
require.Equal(t, want, phs)
})

t.Run("overwriting an existing proposed block", func(t *testing.T) {
t.Run("overwriting an existing proposed block is disallowed", func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -137,6 +137,69 @@ func TestRoundStoreCompliance(t *testing.T, f RoundStoreFactory) {
})
})

t.Run("replayed headers", func(t *testing.T) {
t.Run("returned in LoadRoundState", func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

s, err := f(t.Cleanup)
require.NoError(t, err)

fx := tmconsensustest.NewStandardFixture(2)

ph1 := fx.NextProposedHeader([]byte("val0"), 0)
require.Empty(t, ph1.Header.PrevCommitProof.Proofs)
ph1.Header.PrevCommitProof.Proofs = nil

require.NoError(t, s.SaveRoundReplayedHeader(ctx, ph1.Header))

// In normal flow when the mirror is handling a replayed header,
// it saves the replayed header and then writes the precommit proof.
// If that gets interrupted after saving the header but before writing the precommit proof,
// there isn't really a way for us to know it's part of this particular round,
// so we don't assert about the header until the precommit proofs are saved.
voteMap := map[string][]int{
string(ph1.Header.Hash): {0, 1},
}

precommits := fx.SparsePrecommitSignatureCollection(ctx, 1, 0, voteMap)
require.NoError(t, s.OverwriteRoundPrecommitProofs(ctx, 1, 0, precommits))

phs, _, _, err := s.LoadRoundState(ctx, 1, 0)
require.NoError(t, err)
require.Equal(t, []tmconsensus.ProposedHeader{
{
Header: ph1.Header,
},
}, phs)
})

t.Run("returns tmstore.OverwriteError when proposed header already exists", func(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

s, err := f(t.Cleanup)
require.NoError(t, err)

fx := tmconsensustest.NewStandardFixture(2)

ph1 := fx.NextProposedHeader([]byte("val0"), 0)
fx.SignProposal(ctx, &ph1, 0)
require.NoError(t, s.SaveRoundProposedHeader(ctx, ph1))

err = s.SaveRoundReplayedHeader(ctx, ph1.Header)
require.Error(t, err)
require.ErrorIs(t, err, tmstore.OverwriteError{
Field: "hash",
Value: fmt.Sprintf("%x", ph1.Header.Hash),
})
})
})

for _, tc := range []struct {
typ string

Expand Down
2 changes: 1 addition & 1 deletion tmsqlite/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ CREATE TABLE finalizations(
`
CREATE TABLE headers(
id INTEGER PRIMARY KEY NOT NULL,
hash BLOB NOT NULL,
hash BLOB NOT NULL UNIQUE,
prev_block_hash BLOB,
height INTEGER NOT NULL CHECK (height >= 0),
prev_commit_proof_id INTEGER,
Expand Down
94 changes: 80 additions & 14 deletions tmsqlite/tmstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,27 @@ $committed)`,
sql.Named("committed", committed),
)
if err != nil {
if isUniqueConstraintError(err) {
// Special case for compliance.
// There is only one unique constraint on the headers table.
overwriteErr := tmstore.OverwriteError{
Field: "hash",
Value: fmt.Sprintf("%x", h.Hash),
}

// But, we still need to get the underlying ID.
var id int64
if err := tx.QueryRowContext(
ctx,
`SELECT id FROM headers WHERE hash = ?`,
h.Hash,
).Scan(&id); err != nil {
return -1, fmt.Errorf("failed to fall back to ID retrieval for header: %w", err)
}

return id, overwriteErr
}

return -1, fmt.Errorf("failed to store header: %w", err)
}

Expand Down Expand Up @@ -1486,7 +1507,8 @@ func (s *TMStore) SaveProposedHeaderAction(ctx context.Context, ph tmconsensus.P
defer tx.Rollback()

headerID, err := s.createHeaderInTx(ctx, tx, ph.Header, false)
if err != nil {
if err != nil && !errors.As(err, new(tmstore.OverwriteError)) {
// We ignore the attempted overwrite and proceed with the proposed header insertion.
return fmt.Errorf("failed to create header: %w", err)
}

Expand Down Expand Up @@ -1860,7 +1882,8 @@ func (s *TMStore) SaveRoundProposedHeader(ctx context.Context, ph tmconsensus.Pr
defer tx.Rollback()

headerID, err := s.createHeaderInTx(ctx, tx, ph.Header, false)
if err != nil {
if err != nil && !errors.As(err, new(tmstore.OverwriteError)) {
// We ignore the attempted overwrite and proceed with the proposed header insertion.
return fmt.Errorf("failed to create header: %w", err)
}

Expand Down Expand Up @@ -1904,6 +1927,26 @@ proposer_pub_key_id
return nil
}

func (s *TMStore) SaveRoundReplayedHeader(ctx context.Context, h tmconsensus.Header) error {
defer trace.StartRegion(ctx, "SaveRoundReplayedHeader").End()

tx, err := s.rw.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to begin transaction: %w", err)
}
defer tx.Rollback()

if _, err := s.createHeaderInTx(ctx, tx, h, false); err != nil {
return fmt.Errorf("failed to save replayed header: %w", err)
}

if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}

return nil
}

func (s *TMStore) OverwriteRoundPrevoteProofs(
ctx context.Context,
height uint64,
Expand Down Expand Up @@ -2095,9 +2138,9 @@ vpks.hash_id, vpks.hash, vpks.idx, vpks.key, vpks.n_keys
FROM validator_pub_keys_for_hash AS vpks
JOIN headers ON
(headers.validators_pub_key_hash_id = vpks.hash_id OR headers.next_validators_pub_key_hash_id = vpks.hash_id)
JOIN proposed_headers ON
LEFT JOIN proposed_headers ON
proposed_headers.header_id = headers.id
WHERE proposed_headers.height = ? AND proposed_headers.round = ?`,
WHERE headers.height = ?1 OR (proposed_headers.height = ?1 AND proposed_headers.round = ?2)`,
height, round,
)
if err != nil {
Expand Down Expand Up @@ -2157,9 +2200,9 @@ WHERE proposed_headers.height = ? AND proposed_headers.round = ?`,
FROM validator_powers_for_hash AS vps
JOIN headers ON
(headers.validators_power_hash_id = vps.hash_id OR headers.next_validators_power_hash_id = vps.hash_id)
JOIN proposed_headers ON
LEFT JOIN proposed_headers ON
proposed_headers.header_id = headers.id
WHERE proposed_headers.height = ? AND proposed_headers.round = ?`,
WHERE headers.height = ?1 OR (proposed_headers.height = ?1 AND proposed_headers.round = ?2)`,
height, round,
)
if err != nil {
Expand Down Expand Up @@ -2207,7 +2250,7 @@ WHERE proposed_headers.height = ? AND proposed_headers.round = ?`,
FROM proof_signatures AS ps
JOIN headers AS hs ON hs.prev_commit_proof_id = ps.commit_proof_id
JOIN proposed_headers AS phs ON phs.header_id = hs.id
WHERE phs.height = ? AND phs.round = ?
WHERE hs.height = ?1 OR (phs.height = ?1 AND phs.round = ?2)
ORDER BY ps.key_id`,
height, round,
)
Expand Down Expand Up @@ -2278,7 +2321,28 @@ JOIN headers AS hs ON hs.id = phs.header_id
JOIN validator_pub_keys AS keys ON keys.id = phs.proposer_pub_key_id
LEFT JOIN commit_proofs AS pcp ON pcp.id = hs.prev_commit_proof_id
LEFT JOIN validator_pub_key_hashes AS pcphash ON pcphash.id = pcp.validators_pub_key_hash_id
WHERE phs.height = ? AND phs.round = ?`,
WHERE phs.height = ?1 AND phs.round = ?2
UNION ALL
SELECT
NULL,
NULL, NULL,
NULL,
hs.height, hs.hash, hs.prev_block_hash,
hs.prev_commit_proof_id,
pcp.round,
pcphash.hash,
hs.validators_pub_key_hash_id, hs.next_validators_pub_key_hash_id,
hs.validators_power_hash_id, hs.next_validators_power_hash_id,
hs.data_id,
hs.prev_app_state_hash,
hs.user_annotations, hs.driver_annotations
FROM headers AS hs
LEFT JOIN commit_proofs AS pcp ON pcp.id = hs.prev_commit_proof_id
LEFT JOIN validator_pub_key_hashes AS pcphash ON pcphash.id = pcp.validators_pub_key_hash_id
WHERE hs.height = ?1 AND hs.id NOT IN (
SELECT header_id FROM proposed_headers WHERE height = ?1 AND round = ?2
)
`,
height, round,
)
if err != nil {
Expand Down Expand Up @@ -2316,13 +2380,15 @@ WHERE phs.height = ? AND phs.round = ?`,
"failed to scan proposed headers row: %w", err,
)
}
key, err := s.reg.Unmarshal(encKey)
if err != nil {
return nil, prevotes, precommits, fmt.Errorf(
"failed to unmarshal key from proposed headers: %w", err,
)
if len(encKey) > 0 {
key, err := s.reg.Unmarshal(encKey)
if err != nil {
return nil, prevotes, precommits, fmt.Errorf(
"failed to unmarshal key from proposed headers: %w", err,
)
}
ph.ProposerPubKey = key
}
ph.ProposerPubKey = key

keys := pubKeySets[vkID]
pows := powerSets[vpID]
Expand Down

0 comments on commit 7d280b7

Please sign in to comment.