From 2a611f3fcda283635426059d0ba90fb3b865af2d Mon Sep 17 00:00:00 2001 From: muXxer Date: Mon, 16 Dec 2019 00:15:30 +0100 Subject: [PATCH 1/4] Add StreamReader interface to database --- packages/database/database.go | 133 ++++++++++++++++++++++++++++++ packages/database/interfaces.go | 4 + packages/model/tangle/spent_db.go | 20 ++--- 3 files changed, 146 insertions(+), 11 deletions(-) diff --git a/packages/database/database.go b/packages/database/database.go index 60e813187..4d2f2584d 100644 --- a/packages/database/database.go +++ b/packages/database/database.go @@ -1,11 +1,18 @@ package database import ( + "context" + "github.com/dgraph-io/badger/v2" + "github.com/dgraph-io/badger/v2/pb" "github.com/gohornet/hornet/packages/syncutils" ) +const ( + StreamNumGoRoutines = 16 +) + var ( ErrKeyNotFound = badger.ErrKeyNotFound @@ -240,3 +247,129 @@ func (pdb *prefixDb) ForEachPrefixKeyOnly(keyPrefix KeyPrefix, consumer func(Key }) return err } + +func (pdb *prefixDb) StreamForEach(consumer func(Entry) error) error { + stream := pdb.db.NewStream() + + stream.NumGo = StreamNumGoRoutines + stream.Prefix = pdb.prefix + stream.ChooseKey = nil + stream.KeyToList = nil + + // Send is called serially, while Stream.Orchestrate is running. + stream.Send = func(list *pb.KVList) error { + for _, kv := range list.Kv { + var meta byte + tmpMeta := kv.GetUserMeta() + if len(tmpMeta) > 0 { + meta = tmpMeta[0] + } + err := consumer(Entry{ + Key: pdb.keyWithoutPrefix(kv.GetKey()), + Value: kv.GetValue(), + Meta: meta, + }) + if err != nil { + return err + } + } + return nil + } + + // Run the stream + return stream.Orchestrate(context.Background()) +} + +func (pdb *prefixDb) StreamForEachKeyOnly(consumer func(KeyOnlyEntry) error) error { + stream := pdb.db.NewStream() + + stream.NumGo = StreamNumGoRoutines + stream.Prefix = pdb.prefix + stream.ChooseKey = nil + stream.KeyToList = nil + + // Send is called serially, while Stream.Orchestrate is running. + stream.Send = func(list *pb.KVList) error { + for _, kv := range list.Kv { + var meta byte + tmpMeta := kv.GetUserMeta() + if len(tmpMeta) > 0 { + meta = tmpMeta[0] + } + err := consumer(KeyOnlyEntry{ + Key: pdb.keyWithoutPrefix(kv.GetKey()), + Meta: meta, + }) + if err != nil { + return err + } + } + return nil + } + + // Run the stream + return stream.Orchestrate(context.Background()) +} + +func (pdb *prefixDb) StreamForEachPrefix(keyPrefix KeyPrefix, consumer func(Entry) error) error { + stream := pdb.db.NewStream() + + stream.NumGo = StreamNumGoRoutines + stream.Prefix = append(pdb.prefix, keyPrefix...) + stream.ChooseKey = nil + stream.KeyToList = nil + + // Send is called serially, while Stream.Orchestrate is running. + stream.Send = func(list *pb.KVList) error { + for _, kv := range list.Kv { + var meta byte + tmpMeta := kv.GetUserMeta() + if len(tmpMeta) > 0 { + meta = tmpMeta[0] + } + err := consumer(Entry{ + Key: pdb.keyWithoutPrefix(kv.GetKey()).keyWithoutKeyPrefix(keyPrefix), + Value: kv.GetValue(), + Meta: meta, + }) + if err != nil { + return err + } + } + return nil + } + + // Run the stream + return stream.Orchestrate(context.Background()) +} + +func (pdb *prefixDb) StreamForEachPrefixKeyOnly(keyPrefix KeyPrefix, consumer func(KeyOnlyEntry) error) error { + stream := pdb.db.NewStream() + + stream.NumGo = StreamNumGoRoutines + stream.Prefix = append(pdb.prefix, keyPrefix...) + stream.ChooseKey = nil + stream.KeyToList = nil + + // Send is called serially, while Stream.Orchestrate is running. + stream.Send = func(list *pb.KVList) error { + for _, kv := range list.Kv { + var meta byte + tmpMeta := kv.GetUserMeta() + if len(tmpMeta) > 0 { + meta = tmpMeta[0] + } + err := consumer(KeyOnlyEntry{ + Key: pdb.keyWithoutPrefix(kv.GetKey()).keyWithoutKeyPrefix(keyPrefix), + Meta: meta, + }) + if err != nil { + return err + } + } + return nil + } + + // Run the stream + return stream.Orchestrate(context.Background()) +} diff --git a/packages/database/interfaces.go b/packages/database/interfaces.go index 98d66ae78..ab6619bce 100644 --- a/packages/database/interfaces.go +++ b/packages/database/interfaces.go @@ -30,6 +30,10 @@ type Database interface { ForEach(func(entry Entry) (stop bool)) error ForEachPrefix(keyPrefix KeyPrefix, do func(entry Entry) (stop bool)) error ForEachPrefixKeyOnly(keyPrefix KeyPrefix, do func(entry KeyOnlyEntry) (stop bool)) error + StreamForEach(func(entry Entry) error) error + StreamForEachKeyOnly(func(entry KeyOnlyEntry) error) error + StreamForEachPrefix(keyPrefix KeyPrefix, do func(entry Entry) error) error + StreamForEachPrefixKeyOnly(keyPrefix KeyPrefix, do func(entry KeyOnlyEntry) error) error // Transactions Apply(set []Entry, delete []Key) error diff --git a/packages/model/tangle/spent_db.go b/packages/model/tangle/spent_db.go index 11747b232..2bb800ad1 100644 --- a/packages/model/tangle/spent_db.go +++ b/packages/model/tangle/spent_db.go @@ -1,9 +1,12 @@ package tangle import ( + "encoding/binary" + "io" + + "github.com/gohornet/hornet/packages/database" "github.com/iotaledger/iota.go/trinary" "github.com/pkg/errors" - "github.com/gohornet/hornet/packages/database" ) var ( @@ -51,19 +54,14 @@ func storeSpentAddressesInDatabase(spent []trinary.Hash) error { return nil } -// ToDo: stream that directly into the file -func ReadSpentAddressesFromDatabase() ([][]byte, error) { +func StreamSpentAddressesToWriter(buf io.Writer) error { - var addresses [][]byte - err := spentAddressesDatabase.ForEach(func(entry database.Entry) (stop bool) { - address := entry.Key - addresses = append(addresses, address) - return false + err := spentAddressesDatabase.StreamForEachKeyOnly(func(entry database.KeyOnlyEntry) error { + return binary.Write(buf, binary.BigEndian, entry.Key) }) if err != nil { - return nil, errors.Wrap(NewDatabaseError(err), "failed to read spent addresses from database") - } else { - return addresses, nil + return errors.Wrap(NewDatabaseError(err), "failed to stream spent addresses from database") } + return nil } From 6762a83bb7c0bde6bb4f9d7e5f2e7e4fb2825b93 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Mon, 16 Dec 2019 01:40:04 +0100 Subject: [PATCH 2/4] Import spent addresses directly into the database Avoid going to the LRU-cache and eviction. Batch import 1Mio addresses at once. Also skip converting bytes to trytes and to bytes again. --- packages/model/tangle/spent_db.go | 21 ++++++++++++++++++ plugins/snapshot/snapshot.go | 36 ++++++++++++++++++++++--------- 2 files changed, 47 insertions(+), 10 deletions(-) diff --git a/packages/model/tangle/spent_db.go b/packages/model/tangle/spent_db.go index 2bb800ad1..9be5c53d2 100644 --- a/packages/model/tangle/spent_db.go +++ b/packages/model/tangle/spent_db.go @@ -54,6 +54,27 @@ func storeSpentAddressesInDatabase(spent []trinary.Hash) error { return nil } +func StoreSpentAddressesBytesInDatabase(spentInBytes [][]byte) error { + + var entries []database.Entry + + for _, addressInBytes := range spentInBytes { + key := addressInBytes + + entries = append(entries, database.Entry{ + Key: key, + Value: []byte{}, + }) + } + + // Now batch insert/delete all entries + if err := spentAddressesDatabase.Apply(entries, []database.Key{}); err != nil { + return errors.Wrap(NewDatabaseError(err), "failed to spent addresses") + } + + return nil +} + func StreamSpentAddressesToWriter(buf io.Writer) error { err := spentAddressesDatabase.StreamForEachKeyOnly(func(entry database.KeyOnlyEntry) error { diff --git a/plugins/snapshot/snapshot.go b/plugins/snapshot/snapshot.go index 9620cbfeb..edc122c8d 100644 --- a/plugins/snapshot/snapshot.go +++ b/plugins/snapshot/snapshot.go @@ -7,6 +7,7 @@ import ( "encoding/binary" "fmt" "io" + "math" "os" "strconv" "strings" @@ -21,6 +22,10 @@ import ( tanglePlugin "github.com/gohornet/hornet/plugins/tangle" ) +const ( + SpentAddressesImportBatchSize = 1000000 +) + type localSnapshot struct { msHash string msIndex milestone_index.MilestoneIndex @@ -485,21 +490,32 @@ func LoadSnapshotFromFile(filePath string) error { return fmt.Errorf("ledgerEntries: %s", err) } - log.Infof("Importing %d spent addresses\n", spentAddrsCount) - for i := 0; i < int(spentAddrsCount); i++ { - spentAddrBuf := make([]byte, 49) + log.Infof("Importing %d spent addresses", spentAddrsCount) - err = binary.Read(gzipReader, binary.BigEndian, spentAddrBuf) - if err != nil { - return fmt.Errorf("spentAddrs: %s", err) + batchAmount := int(math.Ceil(float64(spentAddrsCount) / float64(SpentAddressesImportBatchSize))) + for i := 0; i < batchAmount; i++ { + + var batchEntries [][]byte + batchStart := int32(i * SpentAddressesImportBatchSize) + batchEnd := batchStart + SpentAddressesImportBatchSize + + if batchEnd > spentAddrsCount { + batchEnd = spentAddrsCount } - hash, err := trinary.BytesToTrytes(spentAddrBuf) - if err != nil { - return fmt.Errorf("spentAddrs: %s", err) + for j := batchStart; j < batchEnd; j++ { + + spentAddrBuf := make([]byte, 49) + err = binary.Read(gzipReader, binary.BigEndian, spentAddrBuf) + if err != nil { + return fmt.Errorf("spentAddrs: %s", err) + } + + batchEntries = append(batchEntries, spentAddrBuf) } - tangle.MarkAddressAsSpent(hash[:81]) + tangle.StoreSpentAddressesBytesInDatabase(batchEntries) + log.Infof("Processed %d/%d", batchEnd, spentAddrsCount) } log.Info("Finished loading snapshot") From b22bf393c45ff0b1bab9ff200d73cf44a18ee922 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Mon, 16 Dec 2019 01:41:09 +0100 Subject: [PATCH 3/4] Prepare 0.2.6 release --- CHANGELOG.md | 6 ++++++ plugins/cli/plugin.go | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4da84d091..2baad3e29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ All notable changes to this project will be documented in this file. +## [0.2.6] - 16.12.2019 + +### Changed + + - Faster initial spent addresses import + ## [0.2.5] - 15.12.2019 ### Added diff --git a/plugins/cli/plugin.go b/plugins/cli/plugin.go index ff80ffa88..69d30d339 100644 --- a/plugins/cli/plugin.go +++ b/plugins/cli/plugin.go @@ -13,7 +13,7 @@ import ( var ( // AppVersion version number - AppVersion = "0.2.5" + AppVersion = "0.2.6" // AppName app code name AppName = "HORNET" From 656d9e9ace1583013fa32519d2c3a56de8902183 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Mon, 16 Dec 2019 01:50:49 +0100 Subject: [PATCH 4/4] Better error messages --- packages/model/tangle/spent_db.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/model/tangle/spent_db.go b/packages/model/tangle/spent_db.go index 9be5c53d2..d15b310f1 100644 --- a/packages/model/tangle/spent_db.go +++ b/packages/model/tangle/spent_db.go @@ -27,7 +27,7 @@ func databaseKeyForAddress(address trinary.Hash) []byte { func spentDatabaseContainsAddress(address trinary.Hash) (bool, error) { if contains, err := spentAddressesDatabase.Contains(databaseKeyForAddress(address)); err != nil { - return contains, errors.Wrap(NewDatabaseError(err), "failed to check if the address exists") + return contains, errors.Wrap(NewDatabaseError(err), "failed to check if the address exists in the spent addresses database") } else { return contains, nil } @@ -48,7 +48,7 @@ func storeSpentAddressesInDatabase(spent []trinary.Hash) error { // Now batch insert/delete all entries if err := spentAddressesDatabase.Apply(entries, []database.Key{}); err != nil { - return errors.Wrap(NewDatabaseError(err), "failed to spent addresses") + return errors.Wrap(NewDatabaseError(err), "failed to mark addresses as spent") } return nil @@ -69,7 +69,7 @@ func StoreSpentAddressesBytesInDatabase(spentInBytes [][]byte) error { // Now batch insert/delete all entries if err := spentAddressesDatabase.Apply(entries, []database.Key{}); err != nil { - return errors.Wrap(NewDatabaseError(err), "failed to spent addresses") + return errors.Wrap(NewDatabaseError(err), "failed to mark addresses as spent") } return nil