Fix memory leak when shreds from already decoded blocks come in
jackzampolin committed Dec 6, 2024
1 parent e21a0cc commit b092e8e
Showing 5 changed files with 116 additions and 9 deletions.
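At a glance: NewProcessor now takes a cleanup interval, and the processor remembers completed block hashes for roughly that long so late shreds are dropped instead of re-creating (and leaking) their ShredGroup. A minimal caller-side sketch of the new signature, with a hypothetical printCallback type standing in for a real callback:

package main

import (
	"fmt"
	"time"

	"github.com/gordian-engine/gordian/gturbine/gtshred"
)

// printCallback is a hypothetical ProcessorCallback implementation.
type printCallback struct{}

func (printCallback) ProcessBlock(height uint64, blockHash []byte, block []byte) error {
	fmt.Printf("decoded block %x at height %d (%d bytes)\n", blockHash, height, len(block))
	return nil
}

func main() {
	// Completed-block hashes are retained for about one cleanup interval.
	p := gtshred.NewProcessor(printCallback{}, time.Minute)
	_ = p // shreds would be fed in via p.CollectShred(...)
}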
3 changes: 2 additions & 1 deletion gturbine/gtshred/process_shred_test.go
@@ -5,6 +5,7 @@ import (
"crypto/rand"
"crypto/sha256"
"testing"
"time"
)

const (
@@ -84,7 +85,7 @@ func TestProcessorShredding(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
var cb = new(testProcessorCallback)

- p := NewProcessor(cb)
+ p := NewProcessor(cb, time.Minute)

block := makeRandomBlock(tc.blockSize)
group, err := NewShredGroup(block, TestHeight, DefaultDataShreds, DefaultRecoveryShreds, DefaultChunkSize)
52 changes: 45 additions & 7 deletions gturbine/gtshred/processor.go
@@ -3,6 +3,7 @@ package gtshred
import (
"fmt"
"sync"
"time"

"github.com/gordian-engine/gordian/gturbine"
"github.com/gordian-engine/gordian/gturbine/gtencoding"
@@ -16,20 +17,39 @@ const (
)

type Processor struct {
- groups map[string]*ShredGroup
- mu     sync.Mutex
- cb     ProcessorCallback
+ groups          map[string]*ShredGroup
+ mu              sync.Mutex
+ cb              ProcessorCallback
+ completedBlocks map[string]time.Time
+ cleanupInterval time.Duration
}

type ProcessorCallback interface {
ProcessBlock(height uint64, blockHash []byte, block []byte) error
}

- func NewProcessor(cb ProcessorCallback) *Processor {
- 	return &Processor{
- 		cb:     cb,
- 		groups: make(map[string]*ShredGroup),
- 	}
+ func NewProcessor(cb ProcessorCallback, cleanupInterval time.Duration) *Processor {
+ 	p := &Processor{
+ 		cb:              cb,
+ 		groups:          make(map[string]*ShredGroup),
+ 		completedBlocks: make(map[string]time.Time),
+ 		cleanupInterval: cleanupInterval,
+ 	}
+
+ 	// Start the cleanup goroutine; it expires completed-block entries so
+ 	// the map cannot grow without bound.
+ 	go func() {
+ 		ticker := time.NewTicker(cleanupInterval)
+ 		defer ticker.Stop()
+
+ 		for now := range ticker.C {
+ 			p.cleanupStaleGroups(now)
+ 		}
+ 	}()
+
+ 	return p
}

// CollectShred processes an incoming shred
@@ -38,6 +58,11 @@ func (p *Processor) CollectShred(shred *gturbine.Shred) error {
return fmt.Errorf("nil shred")
}

p.mu.Lock()
defer p.mu.Unlock()

+ // Skip shreds from already decoded blocks; the check happens while
+ // holding p.mu so it cannot race with the cleanup goroutine.
+ if _, completed := p.completedBlocks[string(shred.BlockHash)]; completed {
+ 	return nil
+ }

group, ok := p.groups[shred.GroupID]
@@ -74,7 +99,20 @@ func (p *Processor) CollectShred(shred *gturbine.Shred) error {
if err := p.cb.ProcessBlock(shred.Height, shred.BlockHash, block); err != nil {
return fmt.Errorf("failed to process block: %w", err)
}
+ // Remove the group now that its block has been decoded...
delete(p.groups, group.GroupID)

+ // ...then record the block as completed so late shreds are dropped
+ // instead of re-creating a group (the original leak).
+ p.completedBlocks[string(shred.BlockHash)] = time.Now()
}
return nil
}

+ // cleanupStaleGroups removes completed-block entries older than the
+ // cleanup interval. It takes p.mu because CollectShred shares the map.
+ func (p *Processor) cleanupStaleGroups(now time.Time) {
+ 	p.mu.Lock()
+ 	defer p.mu.Unlock()
+
+ 	for hash, completedAt := range p.completedBlocks {
+ 		if now.Sub(completedAt) > p.cleanupInterval {
+ 			delete(p.completedBlocks, hash)
+ 		}
+ 	}
+ }
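One thing the new constructor does not provide is a way to stop the cleanup goroutine, which lives as long as the process. If shutdown ever matters, a context-driven loop is one option; the sketch below is an assumption for illustration, not part of this commit:

// Sketch only: a stoppable cleanup loop. The ctx parameter is a
// hypothetical addition; the committed goroutine runs forever.
func startCleanup(ctx context.Context, p *Processor, interval time.Duration) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return // exit cleanly instead of leaking the goroutine
			case now := <-ticker.C:
				p.cleanupStaleGroups(now)
			}
		}
	}()
}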
52 changes: 52 additions & 0 deletions gturbine/gtshred/processor_test.go
@@ -1,5 +1,10 @@
package gtshred

import (
"testing"
"time"
)

// import (
// "bytes"
// "crypto/rand"
@@ -8,6 +13,53 @@ package gtshred
// "testing"
// )

func TestProcessorMemoryCleanup(t *testing.T) {
// Create processor with short cleanup interval for testing
var cb = new(testProcessorCallback)
cleanupInterval := 100 * time.Millisecond
p := NewProcessor(cb, cleanupInterval)

// Create a test block and shred group
block := []byte("test block data")
group, err := NewShredGroup(block, 1, 2, 1, 100)
if err != nil {
t.Fatal(err)
}

// Process all data shreds from the group to complete the block
for i := 0; i < len(group.DataShreds); i++ {
err := p.CollectShred(group.DataShreds[i])
if err != nil {
t.Fatal(err)
}
}

// Verify the block is marked as completed; take p.mu since the cleanup
// goroutine may touch the map concurrently.
p.mu.Lock()
_, completed := p.completedBlocks[string(group.BlockHash)]
p.mu.Unlock()
if !completed {
t.Error("block should be marked as completed")
}

// Try to process another shred from same block
err = p.CollectShred(group.RecoveryShreds[0])
if err != nil {
t.Fatal(err)
}

// Verify no new group was created for this block
p.mu.Lock()
groupCount := len(p.groups)
p.mu.Unlock()
if groupCount > 1 {
t.Errorf("expected at most 1 group, got %d", groupCount)
}

// Wait for cleanup
time.Sleep(cleanupInterval * 2)

// Verify the completed-block entry was cleaned up
p.mu.Lock()
_, stillThere := p.completedBlocks[string(group.BlockHash)]
p.mu.Unlock()
if stillThere {
t.Error("completed block should have been cleaned up")
}
}
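The fixed sleep above can be flaky on a loaded machine. A hedged alternative is to poll until the entry disappears or a deadline passes; waitForCleanup below is a hypothetical test helper, not part of the commit:

// waitForCleanup polls (under p.mu) until the completed-block entry is
// gone or the deadline expires. Hypothetical test helper.
func waitForCleanup(p *Processor, blockHash []byte, deadline time.Duration) bool {
	stop := time.Now().Add(deadline)
	for time.Now().Before(stop) {
		p.mu.Lock()
		_, exists := p.completedBlocks[string(blockHash)]
		p.mu.Unlock()
		if !exists {
			return true
		}
		time.Sleep(5 * time.Millisecond)
	}
	return false
}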

// func TestProcessor(t *testing.T) {
// t.Run("basic shred and reassemble", func(t *testing.T) {
// // Use 32:32 config
16 changes: 16 additions & 0 deletions gturbine/gtshred/shred_group.go
@@ -224,3 +224,19 @@ func (g *ShredGroup) CollectShred(shred *gturbine.Shred) (bool, error) {

return g.IsFull(), nil
}

// Reset clears the ShredGroup data while maintaining allocated memory
func (g *ShredGroup) Reset() {
g.GroupID = uuid.New().String()
g.BlockHash = g.BlockHash[:0]
g.Height = 0
g.OriginalSize = 0

// Clear but keep underlying arrays
for i := range g.DataShreds {
g.DataShreds[i] = nil
}
for i := range g.RecoveryShreds {
g.RecoveryShreds[i] = nil
}
}
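Nothing in this diff calls Reset yet; it reads as groundwork for reusing groups rather than allocating a fresh one per block. One way that could look (an assumption, not something the commit wires up) is a sync.Pool of groups:

// Sketch only: recycling ShredGroups through Reset. The pool is a
// hypothetical addition; the commit only introduces Reset itself.
var groupPool = sync.Pool{
	New: func() any { return new(ShredGroup) },
}

func getGroup() *ShredGroup {
	return groupPool.Get().(*ShredGroup)
}

func putGroup(g *ShredGroup) {
	g.Reset() // clear identity and shred references, keep allocated slices
	groupPool.Put(g)
}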
2 changes: 1 addition & 1 deletion gturbine/turbine_test.go
@@ -65,7 +65,7 @@ func newTestNode(t *testing.T, basePort int) *testNode {

cb := &testBlockHandler{}

- processor := gtshred.NewProcessor(cb)
+ processor := gtshred.NewProcessor(cb, time.Minute)

shredHandler := &testShredHandler{}
node := &testNode{
