diff --git a/gsfa/gsfa-read-multiepoch.go b/gsfa/gsfa-read-multiepoch.go index 6646136a..c1218068 100644 --- a/gsfa/gsfa-read-multiepoch.go +++ b/gsfa/gsfa-read-multiepoch.go @@ -10,6 +10,7 @@ import ( "github.com/rpcpool/yellowstone-faithful/compactindexsized" "github.com/rpcpool/yellowstone-faithful/gsfa/linkedlog" "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" + "github.com/rpcpool/yellowstone-faithful/slottools" "k8s.io/klog/v2" ) @@ -200,3 +201,101 @@ epochLoop: } return transactions, nil } + +func (multi *GsfaReaderMultiepoch) GetBeforeUntilSlot( + ctx context.Context, + pk solana.PublicKey, + limit int, + before uint64, // Before this slot, exclusive (i.e. get slots older than this slot, excluding it). + until uint64, // Until this slot, inclusive (i.e. stop at this slot, including it). + fetcher func(uint64, linkedlog.OffsetAndSizeAndSlot) (*ipldbindcode.Transaction, error), +) (EpochToTransactionObjects, error) { + if limit <= 0 { + return make(EpochToTransactionObjects), nil + } + return multi.iterBeforeUntilSlot(ctx, pk, limit, before, until, fetcher) +} + +// GetBeforeUntilSlot gets the signatures for the given public key, +// before the given slot. +func (multi *GsfaReaderMultiepoch) iterBeforeUntilSlot( + ctx context.Context, + pk solana.PublicKey, + limit int, + before uint64, // Before this slot, exclusive (i.e. get slots older than this slot, excluding it). + until uint64, // Until this slot, inclusive (i.e. stop at this slot, including it). + fetcher func(uint64, linkedlog.OffsetAndSizeAndSlot) (*ipldbindcode.Transaction, error), +) (EpochToTransactionObjects, error) { + if limit <= 0 || before < until { + return make(EpochToTransactionObjects), nil + } + + transactions := make(EpochToTransactionObjects) + beforeEpoch := slottools.CalcEpochForSlot(before) + +epochLoop: + for readerIndex, index := range multi.epochs { + if ctx.Err() != nil { + return nil, ctx.Err() + } + epochNum, ok := index.GetEpoch() + if !ok { + return nil, fmt.Errorf("epoch is not set for the #%d provided gsfa reader", readerIndex) + } + if epochNum > beforeEpoch { + continue epochLoop + } + + locsStartedAt := time.Now() + locs, err := index.offsets.Get(pk) + if err != nil { + if compactindexsized.IsNotFound(err) { + continue epochLoop + } + return nil, fmt.Errorf("error while getting initial offset: %w", err) + } + klog.V(5).Infof("locs.OffsetToFirst took %s", time.Since(locsStartedAt)) + debugln("locs.OffsetToFirst:", locs) + + next := locs // Start from the latest, and go back in time. + + for { + if next == nil || next.IsZero() { // no previous. + continue epochLoop + } + if limit > 0 && transactions.Count() >= limit { + break epochLoop + } + startedReadAt := time.Now() + locations, newNext, err := index.ll.ReadWithSize(next.Offset, next.Size) + if err != nil { + return nil, fmt.Errorf("error while reading linked log with next=%v: %w", next, err) + } + klog.V(5).Infof("ReadWithSize took %s to get %d locs", time.Since(startedReadAt), len(locations)) + if len(locations) == 0 { + continue epochLoop + } + debugln("sigIndexes:", locations, "newNext:", newNext) + next = &newNext + for locIndex, txLoc := range locations { + tx, err := fetcher(epochNum, txLoc) + if err != nil { + return nil, fmt.Errorf("error while getting signature at index=%v: %w", txLoc, err) + } + if tx.Slot < int(until) { + break epochLoop + } + sig, err := tx.Signature() + if err != nil { + return nil, fmt.Errorf("error while getting signature: %w", err) + } + klog.V(5).Infoln(locIndex, "sig:", sig, "epoch:", epochNum) + if limit > 0 && transactions.Count() >= limit { + break epochLoop + } + transactions[epochNum] = append(transactions[epochNum], tx) + } + } + } + return transactions, nil +}