Skip to content

Commit

Permalink
Merge pull request #31 from gohornet/faster-spent-import
Browse files Browse the repository at this point in the history
Faster spent import
  • Loading branch information
muXxer authored Dec 16, 2019
2 parents b2ecf47 + 656d9e9 commit fb7d443
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 24 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

All notable changes to this project will be documented in this file.

## [0.2.6] - 16.12.2019

### Changed

- Faster initial spent addresses import

## [0.2.5] - 15.12.2019

### Added
Expand Down
133 changes: 133 additions & 0 deletions packages/database/database.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package database

import (
"context"

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/pb"

"github.com/gohornet/hornet/packages/syncutils"
)

const (
StreamNumGoRoutines = 16
)

var (
ErrKeyNotFound = badger.ErrKeyNotFound

Expand Down Expand Up @@ -240,3 +247,129 @@ func (pdb *prefixDb) ForEachPrefixKeyOnly(keyPrefix KeyPrefix, consumer func(Key
})
return err
}

// StreamForEach iterates over every entry stored under this database's
// prefix using badger's parallel Stream API and hands each one to the
// given consumer. Iteration stops at the first consumer error, which is
// returned to the caller.
func (pdb *prefixDb) StreamForEach(consumer func(Entry) error) error {
	stream := pdb.db.NewStream()
	stream.NumGo = StreamNumGoRoutines
	stream.Prefix = pdb.prefix
	stream.ChooseKey = nil
	stream.KeyToList = nil

	// Send is invoked serially while Stream.Orchestrate is running, so
	// calling the consumer from here needs no extra synchronization.
	stream.Send = func(list *pb.KVList) error {
		for _, item := range list.Kv {
			var userMeta byte
			if m := item.GetUserMeta(); len(m) > 0 {
				userMeta = m[0]
			}
			entry := Entry{
				Key:   pdb.keyWithoutPrefix(item.GetKey()),
				Value: item.GetValue(),
				Meta:  userMeta,
			}
			if err := consumer(entry); err != nil {
				return err
			}
		}
		return nil
	}

	// Run the stream.
	return stream.Orchestrate(context.Background())
}

// StreamForEachKeyOnly iterates over every key stored under this
// database's prefix using badger's parallel Stream API, passing a
// key-only view of each entry to the consumer. Iteration stops at the
// first consumer error, which is returned to the caller.
func (pdb *prefixDb) StreamForEachKeyOnly(consumer func(KeyOnlyEntry) error) error {
	stream := pdb.db.NewStream()
	stream.NumGo = StreamNumGoRoutines
	stream.Prefix = pdb.prefix
	stream.ChooseKey = nil
	stream.KeyToList = nil

	// Send is invoked serially while Stream.Orchestrate is running, so
	// calling the consumer from here needs no extra synchronization.
	stream.Send = func(list *pb.KVList) error {
		for _, item := range list.Kv {
			var userMeta byte
			if m := item.GetUserMeta(); len(m) > 0 {
				userMeta = m[0]
			}
			entry := KeyOnlyEntry{
				Key:  pdb.keyWithoutPrefix(item.GetKey()),
				Meta: userMeta,
			}
			if err := consumer(entry); err != nil {
				return err
			}
		}
		return nil
	}

	// Run the stream.
	return stream.Orchestrate(context.Background())
}

// StreamForEachPrefix iterates over every entry stored under this
// database's prefix combined with the given keyPrefix, using badger's
// parallel Stream API. The keyPrefix is stripped from the keys handed to
// the consumer. Iteration stops at the first consumer error, which is
// returned to the caller.
func (pdb *prefixDb) StreamForEachPrefix(keyPrefix KeyPrefix, consumer func(Entry) error) error {
	stream := pdb.db.NewStream()

	stream.NumGo = StreamNumGoRoutines
	// Build the combined prefix in a fresh slice: appending directly to
	// pdb.prefix could overwrite its backing array if it has spare
	// capacity, corrupting the prefix shared by all other operations.
	streamPrefix := make([]byte, 0, len(pdb.prefix)+len(keyPrefix))
	streamPrefix = append(streamPrefix, pdb.prefix...)
	streamPrefix = append(streamPrefix, keyPrefix...)
	stream.Prefix = streamPrefix
	stream.ChooseKey = nil
	stream.KeyToList = nil

	// Send is called serially, while Stream.Orchestrate is running.
	stream.Send = func(list *pb.KVList) error {
		for _, kv := range list.Kv {
			var meta byte
			tmpMeta := kv.GetUserMeta()
			if len(tmpMeta) > 0 {
				meta = tmpMeta[0]
			}
			err := consumer(Entry{
				Key:   pdb.keyWithoutPrefix(kv.GetKey()).keyWithoutKeyPrefix(keyPrefix),
				Value: kv.GetValue(),
				Meta:  meta,
			})
			if err != nil {
				return err
			}
		}
		return nil
	}

	// Run the stream.
	return stream.Orchestrate(context.Background())
}

// StreamForEachPrefixKeyOnly iterates over every key stored under this
// database's prefix combined with the given keyPrefix, using badger's
// parallel Stream API. The keyPrefix is stripped from the keys handed to
// the consumer. Iteration stops at the first consumer error, which is
// returned to the caller.
func (pdb *prefixDb) StreamForEachPrefixKeyOnly(keyPrefix KeyPrefix, consumer func(KeyOnlyEntry) error) error {
	stream := pdb.db.NewStream()

	stream.NumGo = StreamNumGoRoutines
	// Build the combined prefix in a fresh slice: appending directly to
	// pdb.prefix could overwrite its backing array if it has spare
	// capacity, corrupting the prefix shared by all other operations.
	streamPrefix := make([]byte, 0, len(pdb.prefix)+len(keyPrefix))
	streamPrefix = append(streamPrefix, pdb.prefix...)
	streamPrefix = append(streamPrefix, keyPrefix...)
	stream.Prefix = streamPrefix
	stream.ChooseKey = nil
	stream.KeyToList = nil

	// Send is called serially, while Stream.Orchestrate is running.
	stream.Send = func(list *pb.KVList) error {
		for _, kv := range list.Kv {
			var meta byte
			tmpMeta := kv.GetUserMeta()
			if len(tmpMeta) > 0 {
				meta = tmpMeta[0]
			}
			err := consumer(KeyOnlyEntry{
				Key:  pdb.keyWithoutPrefix(kv.GetKey()).keyWithoutKeyPrefix(keyPrefix),
				Meta: meta,
			})
			if err != nil {
				return err
			}
		}
		return nil
	}

	// Run the stream.
	return stream.Orchestrate(context.Background())
}
4 changes: 4 additions & 0 deletions packages/database/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ type Database interface {
ForEach(func(entry Entry) (stop bool)) error
ForEachPrefix(keyPrefix KeyPrefix, do func(entry Entry) (stop bool)) error
ForEachPrefixKeyOnly(keyPrefix KeyPrefix, do func(entry KeyOnlyEntry) (stop bool)) error
StreamForEach(func(entry Entry) error) error
StreamForEachKeyOnly(func(entry KeyOnlyEntry) error) error
StreamForEachPrefix(keyPrefix KeyPrefix, do func(entry Entry) error) error
StreamForEachPrefixKeyOnly(keyPrefix KeyPrefix, do func(entry KeyOnlyEntry) error) error

// Transactions
Apply(set []Entry, delete []Key) error
Expand Down
45 changes: 32 additions & 13 deletions packages/model/tangle/spent_db.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package tangle

import (
"encoding/binary"
"io"

"github.com/gohornet/hornet/packages/database"
"github.com/iotaledger/iota.go/trinary"
"github.com/pkg/errors"
"github.com/gohornet/hornet/packages/database"
)

var (
Expand All @@ -24,7 +27,7 @@ func databaseKeyForAddress(address trinary.Hash) []byte {

func spentDatabaseContainsAddress(address trinary.Hash) (bool, error) {
if contains, err := spentAddressesDatabase.Contains(databaseKeyForAddress(address)); err != nil {
return contains, errors.Wrap(NewDatabaseError(err), "failed to check if the address exists")
return contains, errors.Wrap(NewDatabaseError(err), "failed to check if the address exists in the spent addresses database")
} else {
return contains, nil
}
Expand All @@ -45,25 +48,41 @@ func storeSpentAddressesInDatabase(spent []trinary.Hash) error {

// Now batch insert/delete all entries
if err := spentAddressesDatabase.Apply(entries, []database.Key{}); err != nil {
return errors.Wrap(NewDatabaseError(err), "failed to spent addresses")
return errors.Wrap(NewDatabaseError(err), "failed to mark addresses as spent")
}

return nil
}

// StoreSpentAddressesBytesInDatabase batch-inserts the given raw address
// keys into the spent addresses database. Values are left empty, since
// only key membership matters for spent-address lookups.
func StoreSpentAddressesBytesInDatabase(spentInBytes [][]byte) error {

	// Pre-size the batch: the final length is known up front, so this
	// avoids repeated slice growth during append.
	entries := make([]database.Entry, 0, len(spentInBytes))

	for _, addressInBytes := range spentInBytes {
		entries = append(entries, database.Entry{
			Key:   addressInBytes,
			Value: []byte{},
		})
	}

	// Now batch insert all entries in a single apply.
	if err := spentAddressesDatabase.Apply(entries, []database.Key{}); err != nil {
		return errors.Wrap(NewDatabaseError(err), "failed to mark addresses as spent")
	}

	return nil
}

// ToDo: stream that directly into the file
func ReadSpentAddressesFromDatabase() ([][]byte, error) {
func StreamSpentAddressesToWriter(buf io.Writer) error {

var addresses [][]byte
err := spentAddressesDatabase.ForEach(func(entry database.Entry) (stop bool) {
address := entry.Key
addresses = append(addresses, address)
return false
err := spentAddressesDatabase.StreamForEachKeyOnly(func(entry database.KeyOnlyEntry) error {
return binary.Write(buf, binary.BigEndian, entry.Key)
})

if err != nil {
return nil, errors.Wrap(NewDatabaseError(err), "failed to read spent addresses from database")
} else {
return addresses, nil
return errors.Wrap(NewDatabaseError(err), "failed to stream spent addresses from database")
}
return nil
}
2 changes: 1 addition & 1 deletion plugins/cli/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

var (
// AppVersion version number
AppVersion = "0.2.5"
AppVersion = "0.2.6"

// AppName app code name
AppName = "HORNET"
Expand Down
36 changes: 26 additions & 10 deletions plugins/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/binary"
"fmt"
"io"
"math"
"os"
"strconv"
"strings"
Expand All @@ -21,6 +22,10 @@ import (
tanglePlugin "github.com/gohornet/hornet/plugins/tangle"
)

const (
SpentAddressesImportBatchSize = 1000000
)

type localSnapshot struct {
msHash string
msIndex milestone_index.MilestoneIndex
Expand Down Expand Up @@ -485,21 +490,32 @@ func LoadSnapshotFromFile(filePath string) error {
return fmt.Errorf("ledgerEntries: %s", err)
}

log.Infof("Importing %d spent addresses\n", spentAddrsCount)
for i := 0; i < int(spentAddrsCount); i++ {
spentAddrBuf := make([]byte, 49)
log.Infof("Importing %d spent addresses", spentAddrsCount)

err = binary.Read(gzipReader, binary.BigEndian, spentAddrBuf)
if err != nil {
return fmt.Errorf("spentAddrs: %s", err)
batchAmount := int(math.Ceil(float64(spentAddrsCount) / float64(SpentAddressesImportBatchSize)))
for i := 0; i < batchAmount; i++ {

var batchEntries [][]byte
batchStart := int32(i * SpentAddressesImportBatchSize)
batchEnd := batchStart + SpentAddressesImportBatchSize

if batchEnd > spentAddrsCount {
batchEnd = spentAddrsCount
}

hash, err := trinary.BytesToTrytes(spentAddrBuf)
if err != nil {
return fmt.Errorf("spentAddrs: %s", err)
for j := batchStart; j < batchEnd; j++ {

spentAddrBuf := make([]byte, 49)
err = binary.Read(gzipReader, binary.BigEndian, spentAddrBuf)
if err != nil {
return fmt.Errorf("spentAddrs: %s", err)
}

batchEntries = append(batchEntries, spentAddrBuf)
}

tangle.MarkAddressAsSpent(hash[:81])
tangle.StoreSpentAddressesBytesInDatabase(batchEntries)
log.Infof("Processed %d/%d", batchEnd, spentAddrsCount)
}

log.Info("Finished loading snapshot")
Expand Down

0 comments on commit fb7d443

Please sign in to comment.