Add slot to gsfa index #190

Closed. Wants to merge 1 commit.
Changes from all commits
6 changes: 3 additions & 3 deletions gsfa/gsfa-read-multiepoch.go
@@ -50,7 +50,7 @@ func (gsfa *GsfaReaderMultiepoch) Get(
ctx context.Context,
pk solana.PublicKey,
limit int,
-	fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error),
+	fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktimeSlot) (*ipldbindcode.Transaction, error),
) (EpochToTransactionObjects, error) {
if limit <= 0 {
return nil, nil
@@ -102,7 +102,7 @@ func (multi *GsfaReaderMultiepoch) GetBeforeUntil(
limit int,
before *solana.Signature, // Before this signature, exclusive (i.e. get signatures older than this signature, excluding it).
until *solana.Signature, // Until this signature, inclusive (i.e. stop at this signature, including it).
-	fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error),
+	fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktimeSlot) (*ipldbindcode.Transaction, error),
) (EpochToTransactionObjects, error) {
if limit <= 0 {
return make(EpochToTransactionObjects), nil
@@ -118,7 +118,7 @@ func (multi *GsfaReaderMultiepoch) iterBeforeUntil(
limit int,
before *solana.Signature, // Before this signature, exclusive (i.e. get signatures older than this signature, excluding it).
until *solana.Signature, // Until this signature, inclusive (i.e. stop at this signature, including it).
-	fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error),
+	fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktimeSlot) (*ipldbindcode.Transaction, error),
) (EpochToTransactionObjects, error) {
if limit <= 0 {
return make(EpochToTransactionObjects), nil
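Callers of `Get` and `GetBeforeUntil` now receive the slot-aware entry type in the fetcher callback. A minimal caller-side sketch, assuming a hypothetical `loadTransaction` helper that resolves a transaction by offset and size within an epoch:

```go
// Sketch only; loadTransaction is a hypothetical helper, not part of this PR.
fetcher := func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktimeSlot) (*ipldbindcode.Transaction, error) {
	// The slot now travels with the index entry (oas.Slot), so callers
	// no longer need to decode the transaction just to learn its slot.
	return loadTransaction(epochNum, oas.Offset, oas.Size)
}
txs, err := multiReader.Get(ctx, pk, 100, fetcher)
if err != nil {
	return err
}
_ = txs // EpochToTransactionObjects, keyed by epoch number
```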
14 changes: 7 additions & 7 deletions gsfa/gsfa-read.go
@@ -93,9 +93,9 @@ func (index *GsfaReader) Get(
ctx context.Context,
pk solana.PublicKey,
limit int,
-) ([]linkedlog.OffsetAndSizeAndBlocktime, error) {
+) ([]linkedlog.OffsetAndSizeAndBlocktimeSlot, error) {
if limit <= 0 {
-		return []linkedlog.OffsetAndSizeAndBlocktime{}, nil
+		return []linkedlog.OffsetAndSizeAndBlocktimeSlot{}, nil
}
lastOffset, err := index.offsets.Get(pk)
if err != nil {
@@ -106,7 +106,7 @@ func (index *GsfaReader) Get(
}
debugln("locs.OffsetToFirst:", lastOffset)

-	var allTransactionLocations []linkedlog.OffsetAndSizeAndBlocktime
+	var allTransactionLocations []linkedlog.OffsetAndSizeAndBlocktimeSlot
next := lastOffset // Start from the latest, and go back in time.

for {
@@ -138,10 +138,10 @@ func (index *GsfaReader) GetBeforeUntil(
limit int,
before *solana.Signature, // Before this signature, exclusive (i.e. get signatures older than this signature, excluding it).
until *solana.Signature, // Until this signature, inclusive (i.e. stop at this signature, including it).
-	fetcher func(sigIndex linkedlog.OffsetAndSizeAndBlocktime) (solana.Signature, error),
-) ([]linkedlog.OffsetAndSizeAndBlocktime, error) {
+	fetcher func(sigIndex linkedlog.OffsetAndSizeAndBlocktimeSlot) (solana.Signature, error),
+) ([]linkedlog.OffsetAndSizeAndBlocktimeSlot, error) {
if limit <= 0 {
-		return []linkedlog.OffsetAndSizeAndBlocktime{}, nil
+		return []linkedlog.OffsetAndSizeAndBlocktimeSlot{}, nil
}
locs, err := index.offsets.Get(pk)
if err != nil {
@@ -152,7 +152,7 @@ func (index *GsfaReader) GetBeforeUntil(
}
debugln("locs.OffsetToFirst:", locs)

-	var allTransactionLocations []linkedlog.OffsetAndSizeAndBlocktime
+	var allTransactionLocations []linkedlog.OffsetAndSizeAndBlocktimeSlot
next := locs // Start from the latest, and go back in time.

reachedBefore := false
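The single-epoch reader now returns the slot-aware entries directly. A short consumption sketch, assuming an already-opened `index *GsfaReader`:

```go
locations, err := index.Get(ctx, pk, 50)
if err != nil {
	return err
}
for _, loc := range locations {
	// Each entry carries Offset, Size, Blocktime, and now Slot.
	fmt.Println(loc.Slot, loc.Blocktime)
}
```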
27 changes: 14 additions & 13 deletions gsfa/gsfa-write.go
@@ -27,8 +27,8 @@ type GsfaWriter struct {
offsets *hashmap.Map[solana.PublicKey, [2]uint64]
ll *linkedlog.LinkedLog
man *manifest.Manifest
-	fullBufferWriterChan chan linkedlog.KeyToOffsetAndSizeAndBlocktime
-	accum *hashmap.Map[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktime]
+	fullBufferWriterChan chan linkedlog.KeyToOffsetAndSizeAndBlocktimeSlot
+	accum *hashmap.Map[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktimeSlot]
offsetsWriter *indexes.PubkeyToOffsetAndSize_Writer
ctx context.Context
cancel context.CancelFunc
@@ -61,10 +61,10 @@ func NewGsfaWriter(
}
ctx, cancel := context.WithCancel(context.Background())
index := &GsfaWriter{
-		fullBufferWriterChan: make(chan linkedlog.KeyToOffsetAndSizeAndBlocktime, 50), // TODO: make this configurable
+		fullBufferWriterChan: make(chan linkedlog.KeyToOffsetAndSizeAndBlocktimeSlot, 50), // TODO: make this configurable
popRank: newRollingRankOfTopPerformers(10_000),
offsets: hashmap.New[solana.PublicKey, [2]uint64](int(1_000_000)),
-		accum: hashmap.New[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktime](int(1_000_000)),
+		accum: hashmap.New[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktimeSlot](int(1_000_000)),
ctx: ctx,
cancel: cancel,
fullBufferWriterDone: make(chan struct{}),
@@ -104,7 +104,7 @@ func NewGsfaWriter(
func (a *GsfaWriter) fullBufferWriter() {
numReadFromChan := uint64(0)
howManyBuffersToFlushConcurrently := 256
-	tmpBuf := make(linkedlog.KeyToOffsetAndSizeAndBlocktimeSlice, howManyBuffersToFlushConcurrently)
+	tmpBuf := make(linkedlog.KeyToOffsetAndSizeAndBlocktimeSlotSlice, howManyBuffersToFlushConcurrently)

for {
// fmt.Println("numReadFromChan", numReadFromChan, "len(a.fullBufferWriterChan)", len(a.fullBufferWriterChan), "a.exiting.Load()", a.exiting.Load())
@@ -131,7 +131,7 @@ func (a *GsfaWriter) fullBufferWriter() {
klog.Errorf("Error while flushing transactions for key %s: %v", buf.Key, err)
}
}
-			tmpBuf = make(linkedlog.KeyToOffsetAndSizeAndBlocktimeSlice, howManyBuffersToFlushConcurrently)
+			tmpBuf = make(linkedlog.KeyToOffsetAndSizeAndBlocktimeSlotSlice, howManyBuffersToFlushConcurrently)
}
tmpBuf = append(tmpBuf, buffer)
}
@@ -151,10 +151,11 @@ func (a *GsfaWriter) Push(
a.mu.Lock()
defer a.mu.Unlock()

-	oas := &linkedlog.OffsetAndSizeAndBlocktime{
+	oas := &linkedlog.OffsetAndSizeAndBlocktimeSlot{
Offset: offset,
Size: length,
Blocktime: blocktime,
+		Slot: slot,
}
publicKeys = publicKeys.Dedupe()
publicKeys.Sort()
@@ -177,7 +178,7 @@ func (a *GsfaWriter) Push(
// if this key has less than 100 values and is not in the top list of keys by flush count, then
// it's very likely that this key isn't going to get a lot of values soon
if len(values) < 100 && len(values) > 0 && !a.popRank.has(key) {
-			if err := a.flushKVs(linkedlog.KeyToOffsetAndSizeAndBlocktime{
+			if err := a.flushKVs(linkedlog.KeyToOffsetAndSizeAndBlocktimeSlot{
Key: key,
Values: values,
}); err != nil {
@@ -190,14 +191,14 @@ func (a *GsfaWriter) Push(
for _, publicKey := range publicKeys {
current, ok := a.accum.Get(publicKey)
if !ok {
-			current = make([]*linkedlog.OffsetAndSizeAndBlocktime, 0, itemsPerBatch)
+			current = make([]*linkedlog.OffsetAndSizeAndBlocktimeSlot, 0, itemsPerBatch)
current = append(current, oas)
a.accum.Set(publicKey, current)
} else {
current = append(current, oas)
if len(current) >= itemsPerBatch {
a.popRank.Incr(publicKey, 1)
-				a.fullBufferWriterChan <- linkedlog.KeyToOffsetAndSizeAndBlocktime{
+				a.fullBufferWriterChan <- linkedlog.KeyToOffsetAndSizeAndBlocktimeSlot{
Key: publicKey,
Values: clone(current),
}
@@ -259,13 +260,13 @@ func (a *GsfaWriter) Close() error {
)
}

-func (a *GsfaWriter) flushAccum(m *hashmap.Map[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktime]) error {
+func (a *GsfaWriter) flushAccum(m *hashmap.Map[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktimeSlot]) error {
keys := solana.PublicKeySlice(m.Keys())
keys.Sort()
for ii := range keys {
key := keys[ii]
vals, _ := m.Get(key)
-		if err := a.flushKVs(linkedlog.KeyToOffsetAndSizeAndBlocktime{
+		if err := a.flushKVs(linkedlog.KeyToOffsetAndSizeAndBlocktimeSlot{
Key: key,
Values: vals,
}); err != nil {
@@ -276,7 +277,7 @@ func (a *GsfaWriter) flushAccum(m *hashmap.Map[solana.PublicKey, []*linkedlog.Of
return nil
}

-func (a *GsfaWriter) flushKVs(kvs ...linkedlog.KeyToOffsetAndSizeAndBlocktime) error {
+func (a *GsfaWriter) flushKVs(kvs ...linkedlog.KeyToOffsetAndSizeAndBlocktimeSlot) error {
if len(kvs) == 0 {
return nil
}
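On the write side, each accumulated location now records the slot alongside the blocktime. The full `Push` signature is not visible in this diff; the sketch below assumes it gains a `slot` argument feeding the new `Slot` field:

```go
// Hedged sketch; the parameter order of Push is assumed, not confirmed by this diff.
for _, tx := range blockTransactions { // hypothetical per-block iteration
	if err := writer.Push(tx.Offset, tx.Length, block.Slot, block.Blocktime, tx.AccountKeys); err != nil {
		return err
	}
}
```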
14 changes: 7 additions & 7 deletions gsfa/linkedlog/linked-log.go
@@ -111,7 +111,7 @@ func (s *LinkedLog) write(b []byte) (uint64, uint32, error) {
const mib = 1024 * 1024

// Read reads the block stored at the given offset.
-func (s *LinkedLog) Read(offset uint64) ([]OffsetAndSizeAndBlocktime, indexes.OffsetAndSize, error) {
+func (s *LinkedLog) Read(offset uint64) ([]OffsetAndSizeAndBlocktimeSlot, indexes.OffsetAndSize, error) {
lenBuf := make([]byte, binary.MaxVarintLen64)
_, err := s.file.ReadAt(lenBuf, int64(offset))
if err != nil {
@@ -130,7 +130,7 @@ func sizeOfUvarint(n uint64) int {
return binary.PutUvarint(make([]byte, binary.MaxVarintLen64), n)
}

-func (s *LinkedLog) ReadWithSize(offset uint64, size uint64) ([]OffsetAndSizeAndBlocktime, indexes.OffsetAndSize, error) {
+func (s *LinkedLog) ReadWithSize(offset uint64, size uint64) ([]OffsetAndSizeAndBlocktimeSlot, indexes.OffsetAndSize, error) {
if size > 256*mib {
return nil, indexes.OffsetAndSize{}, fmt.Errorf("compacted indexes length too large: %d", size)
}
@@ -158,7 +158,7 @@ func (s *LinkedLog) ReadWithSize(offset uint64, size uint64) ([]OffsetAndSizeAnd
return sigIndexes, nextOffset, nil
}

-func decompressIndexes(data []byte) ([]OffsetAndSizeAndBlocktime, error) {
+func decompressIndexes(data []byte) ([]OffsetAndSizeAndBlocktimeSlot, error) {
decompressed, err := tooling.DecompressZstd(data)
if err != nil {
return nil, fmt.Errorf("error while decompressing data: %w", err)
@@ -178,15 +178,15 @@ func (s KeyToOffsetAndSizeAndBlocktimeSlice) Has(key solana.PublicKey) bool {
return false
}

-type KeyToOffsetAndSizeAndBlocktime struct {
+type KeyToOffsetAndSizeAndBlocktimeSlot struct {
Key solana.PublicKey
Values []*OffsetAndSizeAndBlocktime
}

func (s *LinkedLog) Put(
callbackBefore func(pk solana.PublicKey) (indexes.OffsetAndSize, error),
callbackAfter func(pk solana.PublicKey, offset uint64, ln uint32) error,
-	values ...KeyToOffsetAndSizeAndBlocktime,
+	values ...KeyToOffsetAndSizeAndBlocktimeSlot,
) (uint64, error) {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -205,7 +205,7 @@ func (s *LinkedLog) Put(
if len(val.Values) == 0 {
continue
}
-		slices.Reverse[[]*OffsetAndSizeAndBlocktime](val.Values) // reverse the slice so that the most recent indexes are first
+		slices.Reverse[[]*OffsetAndSizeAndBlocktimeSlot](val.Values) // reverse the slice so that the most recent indexes are first
err := func() error {
encodedIndexes, err := createIndexesPayload(val.Values)
if err != nil {
@@ -245,7 +245,7 @@ func (s *LinkedLog) Put(
return uint64(previousSize), nil
}

-func createIndexesPayload(indexes []*OffsetAndSizeAndBlocktime) ([]byte, error) {
+func createIndexesPayload(indexes []*OffsetAndSizeAndBlocktimeSlot) ([]byte, error) {
buf := make([]byte, 0, 9*len(indexes))
for _, index := range indexes {
buf = append(buf, index.Bytes()...)
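`LinkedLog.Put` keeps its shape; only the value type changes. A sketch of a single-key write, where the two callbacks chain the new block onto the key's previous head (both helpers are hypothetical stand-ins for the offset-index plumbing):

```go
_, err := ll.Put(
	func(pk solana.PublicKey) (indexes.OffsetAndSize, error) {
		return previousHead(pk) // hypothetical: last offset+size written for pk
	},
	func(pk solana.PublicKey, offset uint64, ln uint32) error {
		return recordNewHead(pk, offset, ln) // hypothetical: persist the new head
	},
	linkedlog.KeyToOffsetAndSizeAndBlocktimeSlot{Key: pk, Values: entries},
)
```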
21 changes: 12 additions & 9 deletions gsfa/linkedlog/offset-size-blocktime.go
@@ -8,32 +8,35 @@
"slices"
)

-func NewOffsetAndSizeAndBlocktime(offset uint64, size uint64, blocktime uint64) *OffsetAndSizeAndBlocktime {
-	return &OffsetAndSizeAndBlocktime{
+func NewOffsetAndSizeAndBlocktimeSlot(offset uint64, size uint64, slot uint64, blocktime uint64) *OffsetAndSizeAndBlocktimeSlot {
+	return &OffsetAndSizeAndBlocktimeSlot{
		Offset: offset,
		Size: size,
		Blocktime: blocktime,
+		Slot: slot,
	}
}

-type OffsetAndSizeAndBlocktime struct {
+type OffsetAndSizeAndBlocktimeSlot struct {
Offset uint64 // uint48, 6 bytes, max 281.5 TB (terabytes)
Size uint64 // uint24, 3 bytes, max 16.7 MB (megabytes)
Blocktime uint64 // uint40, 5 bytes, max 1099511627775 (seconds since epoch)
+	Slot uint64
}

// Bytes returns the offset and size as a byte slice.
-func (oas OffsetAndSizeAndBlocktime) Bytes() []byte {
+func (oas OffsetAndSizeAndBlocktimeSlot) Bytes() []byte {
buf := make([]byte, 0, binary.MaxVarintLen64*3)
buf = binary.AppendUvarint(buf, oas.Offset)
buf = binary.AppendUvarint(buf, oas.Size)
buf = binary.AppendUvarint(buf, oas.Blocktime)
+	buf = binary.AppendUvarint(buf, oas.Slot)
buf = slices.Clip(buf)
return buf
}

// FromBytes parses the offset and size from a byte slice.
-func (oas *OffsetAndSizeAndBlocktime) FromBytes(buf []byte) error {
+func (oas *OffsetAndSizeAndBlocktimeSlot) FromBytes(buf []byte) error {
if len(buf) > binary.MaxVarintLen64*3 {
return errors.New("invalid byte slice length")
}
@@ -55,7 +58,7 @@
return nil
}

-func (oas *OffsetAndSizeAndBlocktime) FromReader(r UvarintReader) error {
+func (oas *OffsetAndSizeAndBlocktimeSlot) FromReader(r UvarintReader) error {
var err error
oas.Offset, err = r.ReadUvarint()
if err != nil {
@@ -92,11 +95,11 @@
return v, nil
}

-func OffsetAndSizeAndBlocktimeSliceFromBytes(buf []byte) ([]OffsetAndSizeAndBlocktime, error) {
+func OffsetAndSizeAndBlocktimeSlotSliceFromBytes(buf []byte) ([]OffsetAndSizeAndBlocktimeSlot, error) {
r := &uvarintReader{buf: buf}
-	oass := make([]OffsetAndSizeAndBlocktime, 0)
+	oass := make([]OffsetAndSizeAndBlocktimeSlot, 0)
for {
-		oas := OffsetAndSizeAndBlocktime{}
+		oas := OffsetAndSizeAndBlocktimeSlot{}
err := oas.FromReader(r)
if err != nil {
if errors.Is(err, io.EOF) {
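As diffed, `Bytes` appends a fourth uvarint for `Slot`, but the visible hunks do not show `FromBytes` or `FromReader` consuming it, and the `binary.MaxVarintLen64*3` capacity hint and length check still assume three fields. A round-trip sketch, assuming the parsers and bounds are extended to four uvarints:

```go
oas := linkedlog.NewOffsetAndSizeAndBlocktimeSlot(1024, 512, 250_000_000, 1_700_000_000)
buf := oas.Bytes() // offset, size, blocktime, slot as consecutive uvarints

decoded := linkedlog.OffsetAndSizeAndBlocktimeSlot{}
if err := decoded.FromBytes(buf); err != nil {
	panic(err)
}
// decoded.Slot == 250000000 only once FromBytes reads the fourth uvarint.
```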
12 changes: 6 additions & 6 deletions gsfa/linkedlog/offset-size-blocktime_test.go
@@ -6,17 +6,17 @@
"testing"
)

-func TestOffsetAndSizeAndBlocktime(t *testing.T) {
+func TestOffsetAndSizeAndBlocktimeSlot(t *testing.T) {
{
-		ca := OffsetAndSizeAndBlocktime{
+		ca := OffsetAndSizeAndBlocktimeSlot{
Offset: 1,
Size: 2,
Blocktime: 3,
}
buf := ca.Bytes()

{
-			ca2 := OffsetAndSizeAndBlocktime{}
+			ca2 := OffsetAndSizeAndBlocktimeSlot{}
err := ca2.FromBytes(buf)
if err != nil {
panic(err)
Expand All @@ -28,15 +28,15 @@
}
{
// now with very high values
-		ca := OffsetAndSizeAndBlocktime{
+		ca := OffsetAndSizeAndBlocktimeSlot{
Offset: 281474976710655,
Size: 16777215,
Blocktime: 1099511627775,
}
buf := ca.Bytes()

{
-			ca2 := OffsetAndSizeAndBlocktime{}
+			ca2 := OffsetAndSizeAndBlocktimeSlot{}
err := ca2.FromBytes(buf)
if err != nil {
panic(err)
Expand All @@ -47,7 +47,7 @@
}
}
{
-		many := []OffsetAndSizeAndBlocktime{
+		many := []OffsetAndSizeAndBlocktimeSlot{
{
Offset: 1,
Size: 2,
2 changes: 1 addition & 1 deletion multiepoch-getSignaturesForAddress.go
@@ -113,7 +113,7 @@ func (multi *MultiEpoch) handleGetSignaturesForAddress(ctx context.Context, conn
limit,
params.Before,
params.Until,
-		func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error) {
+		func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktimeSlot) (*ipldbindcode.Transaction, error) {
epoch, err := multi.GetEpoch(epochNum)
if err != nil {
return nil, fmt.Errorf("failed to get epoch %d: %w", epochNum, err)
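With the slot stored in the index, the handler's fetcher can read it straight off the entry instead of decoding the transaction first. A hedged sketch of how the callback might evolve (the transaction accessor named here is hypothetical):

```go
func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktimeSlot) (*ipldbindcode.Transaction, error) {
	epoch, err := multi.GetEpoch(epochNum)
	if err != nil {
		return nil, fmt.Errorf("failed to get epoch %d: %w", epochNum, err)
	}
	// oas.Slot is available before any decoding, e.g. for the `slot`
	// field of getSignaturesForAddress results.
	return epoch.GetTransactionByOffsetAndSize(ctx, oas.Offset, oas.Size) // hypothetical accessor
}
```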