Skip to content

Commit

Permalink
add GetBeforeUntilSlot
Browse files Browse the repository at this point in the history
  • Loading branch information
anjor committed Jan 9, 2025
1 parent 29af2e8 commit 8b3fe05
Showing 1 changed file with 99 additions and 0 deletions.
99 changes: 99 additions & 0 deletions gsfa/gsfa-read-multiepoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/rpcpool/yellowstone-faithful/compactindexsized"
"github.com/rpcpool/yellowstone-faithful/gsfa/linkedlog"
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
"github.com/rpcpool/yellowstone-faithful/slottools"
"k8s.io/klog/v2"
)

Expand Down Expand Up @@ -200,3 +201,101 @@ epochLoop:
}
return transactions, nil
}

func (multi *GsfaReaderMultiepoch) GetBeforeUntilSlot(
ctx context.Context,
pk solana.PublicKey,
limit int,
before uint64, // Before this slot, exclusive (i.e. get slots older than this slot, excluding it).
until uint64, // Until this slot, inclusive (i.e. stop at this slot, including it).
fetcher func(uint64, linkedlog.OffsetAndSizeAndSlot) (*ipldbindcode.Transaction, error),
) (EpochToTransactionObjects, error) {
if limit <= 0 {
return make(EpochToTransactionObjects), nil
}
return multi.iterBeforeUntilSlot(ctx, pk, limit, before, until, fetcher)
}

// GetBeforeUntilSlot gets the signatures for the given public key,
// before the given slot.
func (multi *GsfaReaderMultiepoch) iterBeforeUntilSlot(
ctx context.Context,
pk solana.PublicKey,
limit int,
before uint64, // Before this slot, exclusive (i.e. get slots older than this slot, excluding it).
until uint64, // Until this slot, inclusive (i.e. stop at this slot, including it).
fetcher func(uint64, linkedlog.OffsetAndSizeAndSlot) (*ipldbindcode.Transaction, error),
) (EpochToTransactionObjects, error) {
if limit <= 0 || before < until {
return make(EpochToTransactionObjects), nil
}

transactions := make(EpochToTransactionObjects)
beforeEpoch := slottools.CalcEpochForSlot(before)

epochLoop:
for readerIndex, index := range multi.epochs {
if ctx.Err() != nil {
return nil, ctx.Err()
}
epochNum, ok := index.GetEpoch()
if !ok {
return nil, fmt.Errorf("epoch is not set for the #%d provided gsfa reader", readerIndex)
}
if epochNum > beforeEpoch {
continue epochLoop
}

locsStartedAt := time.Now()
locs, err := index.offsets.Get(pk)
if err != nil {
if compactindexsized.IsNotFound(err) {
continue epochLoop
}
return nil, fmt.Errorf("error while getting initial offset: %w", err)
}
klog.V(5).Infof("locs.OffsetToFirst took %s", time.Since(locsStartedAt))
debugln("locs.OffsetToFirst:", locs)

next := locs // Start from the latest, and go back in time.

for {
if next == nil || next.IsZero() { // no previous.
continue epochLoop
}
if limit > 0 && transactions.Count() >= limit {
break epochLoop
}
startedReadAt := time.Now()
locations, newNext, err := index.ll.ReadWithSize(next.Offset, next.Size)
if err != nil {
return nil, fmt.Errorf("error while reading linked log with next=%v: %w", next, err)
}
klog.V(5).Infof("ReadWithSize took %s to get %d locs", time.Since(startedReadAt), len(locations))
if len(locations) == 0 {
continue epochLoop
}
debugln("sigIndexes:", locations, "newNext:", newNext)
next = &newNext
for locIndex, txLoc := range locations {
tx, err := fetcher(epochNum, txLoc)
if err != nil {
return nil, fmt.Errorf("error while getting signature at index=%v: %w", txLoc, err)
}
sig, err := tx.Signature()
if err != nil {
return nil, fmt.Errorf("error while getting signature: %w", err)
}
klog.V(5).Infoln(locIndex, "sig:", sig, "epoch:", epochNum)
if limit > 0 && transactions.Count() >= limit {
break epochLoop
}
transactions[epochNum] = append(transactions[epochNum], tx)
if tx.Slot < int(until) {
break epochLoop
}
}
}
}
return transactions, nil
}

0 comments on commit 8b3fe05

Please sign in to comment.