Skip to content

Commit

Permalink
Merge pull request hyperledger#123 from hyperledger/block-listener
Browse files Browse the repository at this point in the history
FFTM new listener type for block notifications and receipt decoding functions
  • Loading branch information
peterbroadhurst authored Jun 14, 2024
2 parents 32d8a0c + 29b880d commit 1ac00b5
Show file tree
Hide file tree
Showing 49 changed files with 2,379 additions and 273 deletions.
3 changes: 2 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ linters-settings:
values:
regexp:
COMPANY: .*
YEAR_FUZZY: '\d\d\d\d(,\d\d\d\d)?'
template: |-
Copyright © {{ YEAR }} {{ COMPANY }}
Copyright © {{ YEAR_FUZZY }} {{ COMPANY }}
SPDX-License-Identifier: Apache-2.0
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
DROP INDEX IF EXISTS transactions_status;
BEGIN;
DROP INDEX IF EXISTS transactions_status;
COMMIT;
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
CREATE INDEX transactions_status ON transactions(status);
BEGIN;
CREATE INDEX transactions_status ON transactions(status);
COMMIT;
5 changes: 5 additions & 0 deletions db/migrations/postgres/000010_add_listeners_type.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
BEGIN;

ALTER TABLE listeners DROP COLUMN "type";

COMMIT;
7 changes: 7 additions & 0 deletions db/migrations/postgres/000010_add_listeners_type.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
BEGIN;

ALTER TABLE listeners ADD COLUMN "type" VARCHAR(64);
UPDATE listeners SET "type" = 'events';
ALTER TABLE listeners ALTER COLUMN "type" SET NOT NULL;

COMMIT;
41 changes: 38 additions & 3 deletions internal/confirmations/confirmations.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type Manager interface {
Stop()
NewBlockHashes() chan<- *ffcapi.BlockHashEvent
CheckInFlight(listenerID *fftypes.UUID) bool
StartConfirmedBlockListener(ctx context.Context, id *fftypes.UUID, fromBlock string, checkpoint *ffcapi.BlockListenerCheckpoint, eventStream chan<- *ffcapi.ListenerEvent) error
StopConfirmedBlockListener(ctx context.Context, id *fftypes.UUID) error
}

type NotificationType string
Expand Down Expand Up @@ -99,6 +101,8 @@ type blockConfirmationManager struct {
pendingMux sync.Mutex
receiptChecker *receiptChecker
retry *retry.Retry
cblLock sync.Mutex
cbls map[fftypes.UUID]*confirmedBlockListener
fetchReceiptUponEntry bool
done chan struct{}
}
Expand All @@ -108,6 +112,7 @@ func NewBlockConfirmationManager(baseContext context.Context, connector ffcapi.A
bcm := &blockConfirmationManager{
baseContext: baseContext,
connector: connector,
cbls: make(map[fftypes.UUID]*confirmedBlockListener),
blockListenerStale: true,
requiredConfirmations: config.GetInt(tmconfig.ConfirmationsRequired),
staleReceiptTimeout: config.GetDuration(tmconfig.ConfirmationsStaleReceiptTimeout),
Expand Down Expand Up @@ -233,6 +238,9 @@ func (bcm *blockConfirmationManager) Stop() {
// Reset context ready for restart
bcm.ctx, bcm.cancelFunc = context.WithCancel(bcm.baseContext)
}
for _, cbl := range bcm.copyCBLsList() {
_ = bcm.StopConfirmedBlockListener(bcm.ctx, cbl.id)
}
}

func (bcm *blockConfirmationManager) NewBlockHashes() chan<- *ffcapi.BlockHashEvent {
Expand Down Expand Up @@ -301,9 +309,10 @@ func (bcm *blockConfirmationManager) getBlockByHash(blockHash string) (*apitypes
return blockInfo, nil
}

func (bcm *blockConfirmationManager) getBlockByNumber(blockNumber uint64, expectedParentHash string) (*apitypes.BlockInfo, error) {
func (bcm *blockConfirmationManager) getBlockByNumber(blockNumber uint64, allowCache bool, expectedParentHash string) (*apitypes.BlockInfo, error) {
res, reason, err := bcm.connector.BlockInfoByNumber(bcm.ctx, &ffcapi.BlockInfoByNumberRequest{
BlockNumber: fftypes.NewFFBigInt(int64(blockNumber)),
AllowCache: allowCache,
ExpectedParentHash: expectedParentHash,
})
if err != nil {
Expand All @@ -326,6 +335,27 @@ func transformBlockInfo(res *ffcapi.BlockInfo) *apitypes.BlockInfo {
}
}

func (bcm *blockConfirmationManager) copyCBLsList() []*confirmedBlockListener {
bcm.cblLock.Lock()
defer bcm.cblLock.Unlock()
cbls := make([]*confirmedBlockListener, 0, len(bcm.cbls))
for _, cbl := range bcm.cbls {
cbls = append(cbls, cbl)
}
return cbls
}

func (bcm *blockConfirmationManager) propagateBlockHashToCBLs(bhe *ffcapi.BlockHashEvent) {
bcm.cblLock.Lock()
defer bcm.cblLock.Unlock()
for _, cbl := range bcm.cbls {
select {
case cbl.newBlockHashes <- bhe:
case <-cbl.processorDone:
}
}
}

func (bcm *blockConfirmationManager) confirmationsListener() {
defer close(bcm.done)
notifications := make([]*Notification, 0)
Expand All @@ -340,6 +370,11 @@ func (bcm *blockConfirmationManager) confirmationsListener() {
}
blockHashes = append(blockHashes, bhe.BlockHashes...)

// Need to also pass this event to any confirmed block listeners
// (they promise to always be efficient in handling these, having a go-routine
// dedicated to spinning fast just processing those separate to dispatching them)
bcm.propagateBlockHashToCBLs(bhe)

if bhe.Created != nil {
for i := 0; i < len(bhe.BlockHashes); i++ {
bcm.metricsEmitter.RecordBlockHashQueueingMetrics(bcm.ctx, time.Since(*bhe.Created.Time()).Seconds())
Expand Down Expand Up @@ -371,7 +406,7 @@ func (bcm *blockConfirmationManager) confirmationsListener() {

if bcm.blockListenerStale {
if err := bcm.walkChain(blocks); err != nil {
log.L(bcm.ctx).Errorf("Failed to create walk chain after restoring blockListener: %s", err)
log.L(bcm.ctx).Errorf("Failed to walk chain after restoring blockListener: %s", err)
continue
}
bcm.blockListenerStale = false
Expand Down Expand Up @@ -704,7 +739,7 @@ func (bs *blockState) getByNumber(blockNumber uint64, expectedParentHash string)
if block != nil {
return block, nil
}
block, err := bs.bcm.getBlockByNumber(blockNumber, expectedParentHash)
block, err := bs.bcm.getBlockByNumber(blockNumber, true, expectedParentHash)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 1ac00b5

Please sign in to comment.