Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add GetBeforeUntilSlot #216

Merged
merged 2 commits into from
Jan 13, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions gsfa/gsfa-read-multiepoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/rpcpool/yellowstone-faithful/compactindexsized"
"github.com/rpcpool/yellowstone-faithful/gsfa/linkedlog"
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
"github.com/rpcpool/yellowstone-faithful/slottools"
"k8s.io/klog/v2"
)

Expand Down Expand Up @@ -200,3 +201,101 @@ epochLoop:
}
return transactions, nil
}

func (multi *GsfaReaderMultiepoch) GetBeforeUntilSlot(
ctx context.Context,
pk solana.PublicKey,
limit int,
before uint64, // Before this slot, exclusive (i.e. get slots older than this slot, excluding it).
until uint64, // Until this slot, inclusive (i.e. stop at this slot, including it).
fetcher func(uint64, linkedlog.OffsetAndSizeAndSlot) (*ipldbindcode.Transaction, error),
) (EpochToTransactionObjects, error) {
if limit <= 0 {
return make(EpochToTransactionObjects), nil
}
return multi.iterBeforeUntilSlot(ctx, pk, limit, before, until, fetcher)
}

// GetBeforeUntilSlot gets the signatures for the given public key,
// before the given slot.
func (multi *GsfaReaderMultiepoch) iterBeforeUntilSlot(
ctx context.Context,
pk solana.PublicKey,
limit int,
before uint64, // Before this slot, exclusive (i.e. get slots older than this slot, excluding it).
until uint64, // Until this slot, inclusive (i.e. stop at this slot, including it).
fetcher func(uint64, linkedlog.OffsetAndSizeAndSlot) (*ipldbindcode.Transaction, error),
) (EpochToTransactionObjects, error) {
if limit <= 0 || before < until {
return make(EpochToTransactionObjects), nil
}

transactions := make(EpochToTransactionObjects)
beforeEpoch := slottools.CalcEpochForSlot(before)

epochLoop:
for readerIndex, index := range multi.epochs {
if ctx.Err() != nil {
return nil, ctx.Err()
}
epochNum, ok := index.GetEpoch()
if !ok {
return nil, fmt.Errorf("epoch is not set for the #%d provided gsfa reader", readerIndex)
}
if epochNum > beforeEpoch {
continue epochLoop
}

locsStartedAt := time.Now()
locs, err := index.offsets.Get(pk)
if err != nil {
if compactindexsized.IsNotFound(err) {
continue epochLoop
}
return nil, fmt.Errorf("error while getting initial offset: %w", err)
}
klog.V(5).Infof("locs.OffsetToFirst took %s", time.Since(locsStartedAt))
debugln("locs.OffsetToFirst:", locs)

next := locs // Start from the latest, and go back in time.

for {
if next == nil || next.IsZero() { // no previous.
continue epochLoop
}
if limit > 0 && transactions.Count() >= limit {
break epochLoop
}
startedReadAt := time.Now()
locations, newNext, err := index.ll.ReadWithSize(next.Offset, next.Size)
if err != nil {
return nil, fmt.Errorf("error while reading linked log with next=%v: %w", next, err)
}
klog.V(5).Infof("ReadWithSize took %s to get %d locs", time.Since(startedReadAt), len(locations))
if len(locations) == 0 {
continue epochLoop
}
debugln("sigIndexes:", locations, "newNext:", newNext)
next = &newNext
for locIndex, txLoc := range locations {
tx, err := fetcher(epochNum, txLoc)
if err != nil {
return nil, fmt.Errorf("error while getting signature at index=%v: %w", txLoc, err)
}
if tx.Slot < int(until) {
break epochLoop
}
sig, err := tx.Signature()
if err != nil {
return nil, fmt.Errorf("error while getting signature: %w", err)
}
klog.V(5).Infoln(locIndex, "sig:", sig, "epoch:", epochNum)
if limit > 0 && transactions.Count() >= limit {
break epochLoop
}
transactions[epochNum] = append(transactions[epochNum], tx)
}
}
}
return transactions, nil
}
Loading