diff --git a/badger/cmd/stream.go b/badger/cmd/stream.go index 5db01666c..7ec427c57 100644 --- a/badger/cmd/stream.go +++ b/badger/cmd/stream.go @@ -117,6 +117,7 @@ func stream(cmd *cobra.Command, args []string) error { WithValueDir(so.outDir). WithNumVersionsToKeep(so.numVersions). WithCompression(options.CompressionType(so.compressionType)). + WithEncryptionKey(encKey). WithReadOnly(false) err = inDB.StreamDB(outOpt) diff --git a/db.go b/db.go index ac947ff69..af8ad8b77 100644 --- a/db.go +++ b/db.go @@ -2016,6 +2016,7 @@ func (db *DB) StreamDB(outOptions Options) error { // Stream contents of DB to the output DB. stream := db.NewStreamAt(math.MaxUint64) stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir) + stream.FullCopy = true stream.Send = func(buf *z.Buffer) error { return writer.Write(buf) diff --git a/iterator.go b/iterator.go index 409e8618a..6dea075a6 100644 --- a/iterator.go +++ b/iterator.go @@ -368,17 +368,17 @@ func (opt *IteratorOptions) pickTable(t table.TableInterface) bool { // that the tables are sorted in the right order. func (opt *IteratorOptions) pickTables(all []*table.Table) []*table.Table { filterTables := func(tables []*table.Table) []*table.Table { - if opt.SinceTs > 0 { - tmp := tables[:0] - for _, t := range tables { - if t.MaxVersion() < opt.SinceTs { - continue - } - tmp = append(tmp, t) + if opt.SinceTs == 0 { + return tables + } + out := tables[:0] + for _, t := range tables { + if t.MaxVersion() < opt.SinceTs { + continue } - tables = tmp + out = append(out, t) } - return tables + return out } if len(opt.Prefix) == 0 { @@ -492,7 +492,7 @@ func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator { for i := 0; i < len(tables); i++ { iters = append(iters, tables[i].sl.NewUniIterator(opt.Reverse)) } - iters = txn.db.lc.appendIterators(iters, &opt) // This will increment references. + iters = append(iters, txn.db.lc.iterators(&opt)...) // This will increment references. res := &Iterator{ txn: txn, iitr: table.NewMergeIterator(iters, opt.Reverse), diff --git a/key_registry.go b/key_registry.go index 306e9c129..e0ccbd01e 100644 --- a/key_registry.go +++ b/key_registry.go @@ -30,6 +30,7 @@ import ( "github.com/dgraph-io/badger/v3/pb" "github.com/dgraph-io/badger/v3/y" + "github.com/pkg/errors" ) const ( @@ -264,7 +265,7 @@ func WriteKeyRegistry(reg *KeyRegistry, opt KeyRegistryOptions) error { // Write all the datakeys to the buf. for _, k := range reg.dataKeys { // Writing the datakey to the given buffer. - if err := storeDataKey(buf, opt.EncryptionKey, k); err != nil { + if err := storeDataKey(buf, opt.EncryptionKey, *k); err != nil { return y.Wrapf(err, "Error while storing datakey in WriteKeyRegistry") } } @@ -338,8 +339,7 @@ func (kr *KeyRegistry) LatestDataKey() (*pb.DataKey, error) { defer kr.Unlock() // Key might have generated by another go routine. So, // checking once again. - key, valid = validKey() - if valid { + if key, valid := validKey(); valid { return key, nil } k := make([]byte, len(kr.opt.EncryptionKey)) @@ -347,35 +347,50 @@ func (kr *KeyRegistry) LatestDataKey() (*pb.DataKey, error) { if err != nil { return nil, err } - _, err = rand.Read(k) - if err != nil { + + if _, err = rand.Read(k); err != nil { return nil, err } // Otherwise Increment the KeyID and generate new datakey. kr.nextKeyID++ - dk := &pb.DataKey{ + dk := pb.DataKey{ KeyId: kr.nextKeyID, Data: k, CreatedAt: time.Now().Unix(), Iv: iv, } + kr.lastCreated = dk.CreatedAt + kr.dataKeys[kr.nextKeyID] = &dk // Don't store the datakey on file if badger is running in InMemory mode. - if !kr.opt.InMemory { - // Store the datekey. - buf := &bytes.Buffer{} - if err = storeDataKey(buf, kr.opt.EncryptionKey, dk); err != nil { - return nil, err - } - // Persist the datakey to the disk - if _, err = kr.fp.Write(buf.Bytes()); err != nil { - return nil, err - } + if kr.opt.InMemory { + return &dk, nil + } - // storeDatakey encrypts the datakey So, placing un-encrypted key in the memory. - dk.Data = k - kr.lastCreated = dk.CreatedAt - kr.dataKeys[kr.nextKeyID] = dk - return dk, nil + // Store the datekey. + if err = storeDataKey(kr.fp, kr.opt.EncryptionKey, dk); err != nil { + return nil, err + } + return &dk, nil +} + +func (kr *KeyRegistry) AddKey(dk pb.DataKey) (uint64, error) { + // If we don't have a encryption key, we cannot store the datakey. + if len(kr.opt.EncryptionKey) == 0 { + return 0, errors.New("No encryption key found. Cannot add data key") + } + + if _, ok := kr.dataKeys[dk.KeyId]; !ok { + // If KeyId does not exists already, then use the next available KeyId to store data key. + kr.nextKeyID++ + dk.KeyId = kr.nextKeyID + } + kr.dataKeys[dk.KeyId] = &dk + + if kr.opt.InMemory { + return dk.KeyId, nil + } + // Store the datakey. + return dk.KeyId, storeDataKey(kr.fp, kr.opt.EncryptionKey, dk) } // Close closes the key registry. @@ -387,7 +402,8 @@ func (kr *KeyRegistry) Close() error { } // storeDataKey stores datakey in an encrypted format in the given buffer. If storage key preset. -func storeDataKey(buf *bytes.Buffer, storageKey []byte, k *pb.DataKey) error { +// DO NOT use a pointer for key. storeDataKey modifies the kv.Data field. +func storeDataKey(w io.Writer, storageKey []byte, key pb.DataKey) error { // xor will encrypt the IV and xor with the given data. // It'll used for both encryption and decryption. xor := func() error { @@ -395,30 +411,21 @@ func storeDataKey(buf *bytes.Buffer, storageKey []byte, k *pb.DataKey) error { return nil } var err error - k.Data, err = y.XORBlockAllocate(k.Data, storageKey, k.Iv) + key.Data, err = y.XORBlockAllocate(key.Data, storageKey, key.Iv) return err } // In memory datakey will be plain text so encrypting before storing to the disk. - var err error - if err = xor(); err != nil { + if err := xor(); err != nil { return y.Wrapf(err, "Error while encrypting datakey in storeDataKey") } - var data []byte - if data, err = k.Marshal(); err != nil { - err = y.Wrapf(err, "Error while marshaling datakey in storeDataKey") - var err2 error - // decrypting the datakey back. - if err2 = xor(); err2 != nil { - return y.Wrapf(err, - y.Wrapf(err2, "Error while decrypting datakey in storeDataKey").Error()) - } - return err + data, err := key.Marshal() + if err != nil { + return y.Wrapf(err, "Error while marshaling datakey in storeDataKey") } var lenCrcBuf [8]byte binary.BigEndian.PutUint32(lenCrcBuf[0:4], uint32(len(data))) binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(data, y.CastagnoliCrcTable)) - y.Check2(buf.Write(lenCrcBuf[:])) - y.Check2(buf.Write(data)) - // Decrypting the datakey back since we're using the pointer. - return xor() + y.Check2(w.Write(lenCrcBuf[:])) + y.Check2(w.Write(data)) + return nil } diff --git a/level_handler.go b/level_handler.go index 669c097f4..9ff8f55e9 100644 --- a/level_handler.go +++ b/level_handler.go @@ -304,9 +304,9 @@ func (s *levelHandler) get(key []byte) (y.ValueStruct, error) { return maxVs, decr() } -// appendIterators appends iterators to an array of iterators, for merging. +// iterators returns an array of iterators, for merging. // Note: This obtains references for the table handlers. Remember to close these iterators. -func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions) []y.Iterator { +func (s *levelHandler) iterators(opt *IteratorOptions) []y.Iterator { s.RLock() defer s.RUnlock() @@ -324,14 +324,41 @@ func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions) out = append(out, t) } } - return appendIteratorsReversed(iters, out, topt) + return iteratorsReversed(out, topt) } tables := opt.pickTables(s.tables) if len(tables) == 0 { - return iters + return nil } - return append(iters, table.NewConcatIterator(tables, topt)) + return []y.Iterator{table.NewConcatIterator(tables, topt)} +} + +func (s *levelHandler) getTables(opt *IteratorOptions) []*table.Table { + if opt.Reverse { + panic("Invalid option for getTables") + } + + // Typically this would only be called for the last level. + s.RLock() + defer s.RUnlock() + + if s.level == 0 { + var out []*table.Table + for _, t := range s.tables { + if opt.pickTable(t) { + t.IncrRef() + out = append(out, t) + } + } + return out + } + + tables := opt.pickTables(s.tables) + for _, t := range tables { + t.IncrRef() + } + return tables } type levelHandlerRLocked struct{} diff --git a/levels.go b/levels.go index 1c625d1b3..9b214fdc4 100644 --- a/levels.go +++ b/levels.go @@ -32,6 +32,7 @@ import ( otrace "go.opencensus.io/trace" + "github.com/dgraph-io/badger/v3/options" "github.com/dgraph-io/badger/v3/pb" "github.com/dgraph-io/badger/v3/table" "github.com/dgraph-io/badger/v3/y" @@ -895,7 +896,7 @@ func (s *levelsController) compactBuildTables( var iters []y.Iterator switch { case lev == 0: - iters = appendIteratorsReversed(iters, topTables, table.NOCACHE) + iters = append(iters, iteratorsReversed(topTables, table.NOCACHE)...) case len(topTables) > 0: y.AssertTrue(len(topTables) == 1) iters = []y.Iterator{topTables[0].NewIterator(table.NOCACHE)} @@ -1615,7 +1616,8 @@ func (s *levelsController) get(key []byte, maxVs y.ValueStruct, startLevel int) return maxVs, nil } -func appendIteratorsReversed(out []y.Iterator, th []*table.Table, opt int) []y.Iterator { +func iteratorsReversed(th []*table.Table, opt int) []y.Iterator { + out := make([]y.Iterator, 0, len(th)) for i := len(th) - 1; i >= 0; i-- { // This will increment the reference of the table handler. out = append(out, th[i].NewIterator(opt)) @@ -1623,16 +1625,25 @@ func appendIteratorsReversed(out []y.Iterator, th []*table.Table, opt int) []y.I return out } -// appendIterators appends iterators to an array of iterators, for merging. +// getTables return tables from all levels. It would call IncrRef on all returned tables. +func (s *levelsController) getTables(opt *IteratorOptions) [][]*table.Table { + res := make([][]*table.Table, 0, len(s.levels)) + for _, level := range s.levels { + res = append(res, level.getTables(opt)) + } + return res +} + +// iterators returns an array of iterators, for merging. // Note: This obtains references for the table handlers. Remember to close these iterators. -func (s *levelsController) appendIterators( - iters []y.Iterator, opt *IteratorOptions) []y.Iterator { +func (s *levelsController) iterators(opt *IteratorOptions) []y.Iterator { // Just like with get, it's important we iterate the levels from 0 on upward, to avoid missing // data when there's a compaction. + itrs := make([]y.Iterator, 0, len(s.levels)) for _, level := range s.levels { - iters = level.appendIterators(iters, opt) + itrs = append(itrs, level.iterators(opt)...) } - return iters + return itrs } // TableInfo represents the information about a table. @@ -1759,3 +1770,46 @@ func (s *levelsController) keySplits(numPerTable int, prefix []byte) []string { sort.Strings(splits) return splits } + +// AddTable builds the table from the KV.value options passed through the KV.Key. +func (lc *levelsController) AddTable( + kv *pb.KV, lev int, dk *pb.DataKey, change *pb.ManifestChange) error { + // TODO: Encryption / Decryption might be required for the table, if the sender and receiver + // don't have same encryption mode. See if inplace encryption/decryption can be done. + // Tables are sent in the sorted order, so no need to sort them here. + encrypted := len(lc.kv.opt.EncryptionKey) > 0 + y.AssertTrue((dk != nil && encrypted) || (dk == nil && !encrypted)) + // The keyId is zero if there is no encryption. + opts := buildTableOptions(lc.kv) + opts.Compression = options.CompressionType(change.Compression) + opts.DataKey = dk + + fileID := lc.reserveFileID() + fname := table.NewFilename(fileID, lc.kv.opt.Dir) + + // kv.Value is owned by the z.buffer. Ensure that we copy this buffer. + var tbl *table.Table + var err error + if lc.kv.opt.InMemory { + if tbl, err = table.OpenInMemoryTable(y.Copy(kv.Value), fileID, &opts); err != nil { + return errors.Wrap(err, "while creating in-memory table from buffer") + } + } else { + if tbl, err = table.CreateTableFromBuffer(fname, kv.Value, opts); err != nil { + return errors.Wrap(err, "while creating table from buffer") + } + } + + lc.levels[lev].addTable(tbl) + // Release the ref held by OpenTable. addTable would add a reference. + _ = tbl.DecrRef() + + change.Id = fileID + change.Level = uint32(lev) + if dk != nil { + change.KeyId = dk.KeyId + } + // We use the same data KeyId. So, change.KeyId remains the same. + y.AssertTrue(change.Op == pb.ManifestChange_CREATE) + return lc.kv.manifest.addChanges([]*pb.ManifestChange{change}) +} diff --git a/levels_test.go b/levels_test.go index 56dafa8c8..6e427f2c7 100644 --- a/levels_test.go +++ b/levels_test.go @@ -18,6 +18,7 @@ package badger import ( "fmt" + "io/ioutil" "math" "math/rand" "os" @@ -39,7 +40,15 @@ func createAndOpen(db *DB, td []keyValVersion, level int) { BloomFalsePositive: db.opt.BloomFalsePositive, ChkMode: options.NoVerification, } - b := table.NewTableBuilder(opts) + createAndOpenWithOptions(db, td, level, &opts) +} + +func createAndOpenWithOptions(db *DB, td []keyValVersion, level int, opts *table.Options) { + if opts == nil { + bopts := buildTableOptions(db) + opts = &bopts + } + b := table.NewTableBuilder(*opts) defer b.Close() // Add all keys and versions to the table. @@ -48,13 +57,21 @@ func createAndOpen(db *DB, td []keyValVersion, level int) { val := y.ValueStruct{Value: []byte(item.val), Meta: item.meta} b.Add(key, val, 0) } - fname := table.NewFilename(db.lc.reserveFileID(), db.opt.Dir) - tab, err := table.CreateTable(fname, b) + fileID := db.lc.reserveFileID() + var tab *table.Table + var err error + if db.opt.InMemory { + data := b.Finish() + tab, err = table.OpenInMemoryTable(data, fileID, opts) + } else { + fname := table.NewFilename(fileID, db.opt.Dir) + tab, err = table.CreateTable(fname, b) + } if err != nil { panic(err) } if err := db.manifest.addChanges([]*pb.ManifestChange{ - newCreateChange(tab.ID(), level, 0, tab.CompressionType()), + newCreateChange(tab.ID(), level, tab.KeyID(), tab.CompressionType()), }); err != nil { panic(err) } @@ -1221,3 +1238,85 @@ func TestStaleDataCleanup(t *testing.T) { }) } + +func TestStreamWithFullCopy(t *testing.T) { + dbopts := DefaultOptions("") + dbopts.managedTxns = true + dbopts.MaxLevels = 7 + dbopts.NumVersionsToKeep = math.MaxInt32 + + encKey := make([]byte, 24) + _, err := rand.Read(encKey) + require.NoError(t, err) + + test := func(db *DB, outOpts Options) { + l4 := []keyValVersion{{"a", "1", 3, bitDelete}, {"d", "4", 3, 0}} + l5 := []keyValVersion{{"b", "2", 2, 0}} + l6 := []keyValVersion{{"a", "1", 2, 0}, {"c", "3", 1, 0}} + createAndOpenWithOptions(db, l4, 4, nil) + createAndOpenWithOptions(db, l5, 5, nil) + createAndOpenWithOptions(db, l6, 6, nil) + + if !outOpts.InMemory { + dir, err := ioutil.TempDir("", "badger-test") + require.NoError(t, err) + defer removeDir(dir) + outOpts.Dir = dir + outOpts.ValueDir = dir + } + + require.NoError(t, db.StreamDB(outOpts)) + out, err := Open(outOpts) + require.NoError(t, err) + defer func() { + require.NoError(t, out.Close()) + }() + err = out.View(func(txn *Txn) error { + // Key "a" should not be there because we deleted it at higher version. + _, err := txn.Get([]byte("a")) + require.Error(t, err) + require.Equal(t, err, ErrKeyNotFound) + _, err = txn.Get([]byte("b")) + require.NoError(t, err) + _, err = txn.Get([]byte("c")) + require.NoError(t, err) + _, err = txn.Get([]byte("d")) + require.NoError(t, err) + return nil + }) + require.NoError(t, err) + } + t.Run("without encryption", func(t *testing.T) { + opts := dbopts + runBadgerTest(t, &opts, func(t *testing.T, db *DB) { + test(db, opts) + }) + }) + t.Run("with encryption", func(t *testing.T) { + opts := dbopts + opts.IndexCacheSize = 1 << 20 + opts.BlockCacheSize = 1 << 20 + // Set it to zero so that we have more than one data keys. + opts.EncryptionKey = encKey + opts.EncryptionKeyRotationDuration = 0 + runBadgerTest(t, &opts, func(t *testing.T, db *DB) { + test(db, opts) + require.Greater(t, len(db.registry.dataKeys), 1) + }) + }) + t.Run("stream from in-memory to persistent", func(t *testing.T) { + opts := dbopts + opts.IndexCacheSize = 1 << 20 + opts.BlockCacheSize = 1 << 20 + opts.InMemory = true + // Set it to zero so that we have more than one data keys. + opts.EncryptionKey = encKey + opts.EncryptionKeyRotationDuration = 0 + runBadgerTest(t, &opts, func(t *testing.T, db *DB) { + outOpts := opts + outOpts.InMemory = false + test(db, outOpts) + require.Greater(t, len(db.registry.dataKeys), 1) + }) + }) +} diff --git a/manifest.go b/manifest.go index 2d58f0730..06ad4e97f 100644 --- a/manifest.go +++ b/manifest.go @@ -127,7 +127,7 @@ func (m *Manifest) clone() Manifest { func openOrCreateManifestFile(opt Options) ( ret *manifestFile, result Manifest, err error) { if opt.InMemory { - return &manifestFile{inMemory: true}, Manifest{}, nil + return &manifestFile{inMemory: true, manifest: createManifest()}, Manifest{}, nil } return helpOpenOrCreateManifestFile(opt.Dir, opt.ReadOnly, opt.ExternalMagicVersion, manifestDeletionsRewriteThreshold) @@ -205,21 +205,21 @@ func (mf *manifestFile) close() error { // this depends on the filesystem -- some might append garbage data if a system crash happens at // the wrong time.) func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error { - if mf.inMemory { - return nil - } changes := pb.ManifestChangeSet{Changes: changesParam} buf, err := proto.Marshal(&changes) if err != nil { return err } - // Maybe we could use O_APPEND instead (on certain file systems) mf.appendLock.Lock() defer mf.appendLock.Unlock() if err := applyChangeSet(&mf.manifest, &changes); err != nil { return err } + if mf.inMemory { + mf.appendLock.Unlock() + return nil + } // Rewrite manifest if it'd shrink by 1/10 and it's big enough to care if mf.manifest.Deletions > mf.deletionsRewriteThreshold && mf.manifest.Deletions > manifestDeletionsRatio*(mf.manifest.Creations-mf.manifest.Deletions) { diff --git a/pb/badgerpb3.pb.go b/pb/badgerpb3.pb.go index 927878ca0..34cc4b5f6 100644 --- a/pb/badgerpb3.pb.go +++ b/pb/badgerpb3.pb.go @@ -44,6 +44,34 @@ func (EncryptionAlgo) EnumDescriptor() ([]byte, []int) { return fileDescriptor_6d729c99bbc38987, []int{0} } +type KV_Kind int32 + +const ( + KV_KEY KV_Kind = 0 + KV_DATA_KEY KV_Kind = 1 + KV_FILE KV_Kind = 2 +) + +var KV_Kind_name = map[int32]string{ + 0: "KEY", + 1: "DATA_KEY", + 2: "FILE", +} + +var KV_Kind_value = map[string]int32{ + "KEY": 0, + "DATA_KEY": 1, + "FILE": 2, +} + +func (x KV_Kind) String() string { + return proto.EnumName(KV_Kind_name, int32(x)) +} + +func (KV_Kind) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_6d729c99bbc38987, []int{0, 0} +} + type ManifestChange_Operation int32 const ( @@ -104,7 +132,8 @@ type KV struct { // Stream id is used to identify which stream the KV came from. StreamId uint32 `protobuf:"varint,10,opt,name=stream_id,json=streamId,proto3" json:"stream_id,omitempty"` // Stream done is used to indicate end of stream. - StreamDone bool `protobuf:"varint,11,opt,name=stream_done,json=streamDone,proto3" json:"stream_done,omitempty"` + StreamDone bool `protobuf:"varint,11,opt,name=stream_done,json=streamDone,proto3" json:"stream_done,omitempty"` + Kind KV_Kind `protobuf:"varint,12,opt,name=kind,proto3,enum=badgerpb3.KV_Kind" json:"kind,omitempty"` } func (m *KV) Reset() { *m = KV{} } @@ -196,6 +225,13 @@ func (m *KV) GetStreamDone() bool { return false } +func (m *KV) GetKind() KV_Kind { + if m != nil { + return m.Kind + } + return KV_KEY +} + type KVList struct { Kv []*KV `protobuf:"bytes,1,rep,name=kv,proto3" json:"kv,omitempty"` // alloc_ref used internally for memory management. @@ -552,6 +588,7 @@ func (m *Match) GetIgnoreBytes() string { func init() { proto.RegisterEnum("badgerpb3.EncryptionAlgo", EncryptionAlgo_name, EncryptionAlgo_value) + proto.RegisterEnum("badgerpb3.KV_Kind", KV_Kind_name, KV_Kind_value) proto.RegisterEnum("badgerpb3.ManifestChange_Operation", ManifestChange_Operation_name, ManifestChange_Operation_value) proto.RegisterEnum("badgerpb3.Checksum_Algorithm", Checksum_Algorithm_name, Checksum_Algorithm_value) proto.RegisterType((*KV)(nil), "badgerpb3.KV") @@ -566,48 +603,52 @@ func init() { func init() { proto.RegisterFile("badgerpb3.proto", fileDescriptor_6d729c99bbc38987) } var fileDescriptor_6d729c99bbc38987 = []byte{ - // 651 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x54, 0x4d, 0x6b, 0xdb, 0x40, - 0x10, 0xf5, 0xca, 0xf2, 0xd7, 0x38, 0x71, 0xdc, 0xa5, 0x2d, 0x0a, 0x25, 0xae, 0xa3, 0x50, 0x30, - 0x85, 0xda, 0x34, 0x2e, 0xbd, 0xf4, 0xe4, 0x2f, 0x88, 0x71, 0x42, 0x60, 0x1b, 0x42, 0xe8, 0xc5, - 0xac, 0xa5, 0xb1, 0x2d, 0x6c, 0x4b, 0x62, 0xb5, 0x16, 0xf1, 0x8f, 0x28, 0xf4, 0x67, 0xf5, 0x98, - 0x43, 0x0f, 0x3d, 0x96, 0xe4, 0x8f, 0x94, 0x5d, 0x29, 0xae, 0x7d, 0xe8, 0x6d, 0xe6, 0xcd, 0x68, - 0xde, 0xe8, 0xbd, 0x91, 0xe0, 0x68, 0xc2, 0xdd, 0x19, 0x8a, 0x70, 0xd2, 0x6e, 0x86, 0x22, 0x90, - 0x01, 0x2d, 0x6d, 0x01, 0xfb, 0x17, 0x01, 0x63, 0x74, 0x4b, 0xab, 0x90, 0x5d, 0xe0, 0xc6, 0x22, - 0x75, 0xd2, 0x38, 0x60, 0x2a, 0xa4, 0x2f, 0x21, 0x17, 0xf3, 0xe5, 0x1a, 0x2d, 0x43, 0x63, 0x49, - 0x42, 0xdf, 0x40, 0x69, 0x1d, 0xa1, 0x18, 0xaf, 0x50, 0x72, 0x2b, 0xab, 0x2b, 0x45, 0x05, 0x5c, - 0xa1, 0xe4, 0xd4, 0x82, 0x42, 0x8c, 0x22, 0xf2, 0x02, 0xdf, 0x32, 0xeb, 0xa4, 0x61, 0xb2, 0xe7, - 0x94, 0x9e, 0x00, 0xe0, 0x7d, 0xe8, 0x09, 0x8c, 0xc6, 0x5c, 0x5a, 0x39, 0x5d, 0x2c, 0xa5, 0x48, - 0x47, 0x52, 0x0a, 0xa6, 0x1e, 0x98, 0xd7, 0x03, 0x75, 0xac, 0x98, 0x22, 0x29, 0x90, 0xaf, 0xc6, - 0x9e, 0x6b, 0x41, 0x9d, 0x34, 0x0e, 0x59, 0x31, 0x01, 0x86, 0x2e, 0x7d, 0x0b, 0xe5, 0xb4, 0xe8, - 0x06, 0x3e, 0x5a, 0xe5, 0x3a, 0x69, 0x14, 0x19, 0x24, 0x50, 0x3f, 0xf0, 0xd1, 0xee, 0x43, 0x7e, - 0x74, 0x7b, 0xe9, 0x45, 0x92, 0x9e, 0x80, 0xb1, 0x88, 0x2d, 0x52, 0xcf, 0x36, 0xca, 0xe7, 0x87, - 0xcd, 0x7f, 0x4a, 0x8c, 0x6e, 0x99, 0xb1, 0x88, 0x15, 0x0d, 0x5f, 0x2e, 0x03, 0x67, 0x2c, 0x70, - 0xaa, 0x69, 0x4c, 0x56, 0xd4, 0x00, 0xc3, 0xa9, 0x7d, 0x01, 0x2f, 0xae, 0xb8, 0xef, 0x4d, 0x31, - 0x92, 0xbd, 0x39, 0xf7, 0x67, 0xf8, 0x15, 0x25, 0x6d, 0x43, 0xc1, 0xd1, 0x49, 0x94, 0x4e, 0x3d, - 0xde, 0x99, 0xba, 0xdf, 0xce, 0x9e, 0x3b, 0xed, 0xef, 0x06, 0x54, 0xf6, 0x6b, 0xb4, 0x02, 0xc6, - 0xd0, 0xd5, 0x8a, 0x9b, 0xcc, 0x18, 0xba, 0xb4, 0x0d, 0xc6, 0x75, 0xa8, 0xd5, 0xae, 0x9c, 0x9f, - 0xfd, 0x77, 0x64, 0xf3, 0x3a, 0x44, 0xc1, 0xa5, 0x17, 0xf8, 0xcc, 0xb8, 0x0e, 0x95, 0x4b, 0x97, - 0x18, 0xe3, 0x52, 0x7b, 0x71, 0xc8, 0x92, 0x84, 0xbe, 0x82, 0xfc, 0x02, 0x37, 0x4a, 0xb8, 0xc4, - 0x87, 0xdc, 0x02, 0x37, 0x43, 0x97, 0x76, 0xe1, 0x08, 0x7d, 0x47, 0x6c, 0x42, 0xf5, 0xf8, 0x98, - 0x2f, 0x67, 0x81, 0xb6, 0xa2, 0xb2, 0xf7, 0x06, 0x83, 0x6d, 0x47, 0x67, 0x39, 0x0b, 0x58, 0x05, - 0xf7, 0x72, 0x5a, 0x87, 0xb2, 0x13, 0xac, 0x42, 0x81, 0x91, 0xf6, 0x39, 0xaf, 0x69, 0x77, 0x21, - 0xfb, 0x0c, 0x4a, 0xdb, 0x1d, 0x29, 0x40, 0xbe, 0xc7, 0x06, 0x9d, 0x9b, 0x41, 0x35, 0xa3, 0xe2, - 0xfe, 0xe0, 0x72, 0x70, 0x33, 0xa8, 0x12, 0x3b, 0x86, 0x62, 0x6f, 0x8e, 0xce, 0x22, 0x5a, 0xaf, - 0xe8, 0x47, 0x30, 0xf5, 0x2e, 0x44, 0xef, 0x72, 0xb2, 0xb3, 0xcb, 0x73, 0x4b, 0x53, 0x51, 0x0b, - 0x4f, 0xce, 0x57, 0x4c, 0xb7, 0xaa, 0x73, 0x8d, 0xd6, 0x2b, 0x2d, 0x96, 0xc9, 0x54, 0x68, 0xbf, - 0x83, 0xd2, 0xb6, 0x29, 0x61, 0xed, 0xb5, 0xcf, 0x7b, 0xd5, 0x0c, 0x3d, 0x80, 0xe2, 0xdd, 0xdd, - 0x05, 0x8f, 0xe6, 0x9f, 0x3f, 0x55, 0x89, 0xed, 0x40, 0xa1, 0xcf, 0x25, 0x1f, 0xe1, 0x66, 0x47, - 0x24, 0xb2, 0x2b, 0x12, 0x05, 0xd3, 0xe5, 0x92, 0xa7, 0x67, 0xaf, 0x63, 0x65, 0x95, 0x17, 0xa7, - 0xe7, 0x6e, 0x78, 0xb1, 0x3a, 0x67, 0x47, 0x20, 0x97, 0xe8, 0xaa, 0x73, 0x56, 0x1a, 0x67, 0x59, - 0x29, 0x45, 0x3a, 0xd2, 0xee, 0x42, 0xee, 0x8a, 0x4b, 0x67, 0x4e, 0x5f, 0x43, 0x3e, 0x14, 0x38, - 0xf5, 0xee, 0xd3, 0x0f, 0x2b, 0xcd, 0xe8, 0x29, 0x1c, 0x78, 0x33, 0x3f, 0x10, 0x38, 0x9e, 0x6c, - 0x24, 0x46, 0x9a, 0xab, 0xc4, 0xca, 0x09, 0xd6, 0x55, 0xd0, 0xfb, 0x63, 0xa8, 0xec, 0x3b, 0x41, - 0x0b, 0x90, 0xe5, 0x18, 0x55, 0x33, 0xdd, 0x2f, 0x3f, 0x1f, 0x6b, 0xe4, 0xe1, 0xb1, 0x46, 0xfe, - 0x3c, 0xd6, 0xc8, 0x8f, 0xa7, 0x5a, 0xe6, 0xe1, 0xa9, 0x96, 0xf9, 0xfd, 0x54, 0xcb, 0x7c, 0x3b, - 0x9d, 0x79, 0x72, 0xbe, 0x9e, 0x34, 0x9d, 0x60, 0xd5, 0x72, 0x67, 0x82, 0x87, 0xf3, 0x0f, 0x5e, - 0xd0, 0x4a, 0xf4, 0x6c, 0xc5, 0xed, 0x56, 0x38, 0x99, 0xe4, 0xf5, 0x1f, 0xa0, 0xfd, 0x37, 0x00, - 0x00, 0xff, 0xff, 0xa2, 0x8d, 0xa8, 0xf5, 0x14, 0x04, 0x00, 0x00, + // 705 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x54, 0xdd, 0x6e, 0xda, 0x58, + 0x10, 0xc6, 0xc6, 0xfc, 0x0d, 0x84, 0xb0, 0x47, 0xbb, 0x2b, 0x47, 0xab, 0xb0, 0xc4, 0xd1, 0xee, + 0xa2, 0x95, 0x16, 0xb4, 0x50, 0xf5, 0xa6, 0x57, 0xfc, 0xb8, 0x0a, 0x82, 0x28, 0xd2, 0x69, 0x14, + 0xa5, 0xbd, 0x41, 0x07, 0x7b, 0x00, 0x0b, 0xb0, 0xad, 0xe3, 0x83, 0x15, 0x1e, 0xa2, 0x52, 0x5f, + 0xa2, 0xef, 0xd2, 0xcb, 0x5c, 0xf6, 0xb2, 0x4a, 0x5e, 0xa4, 0x3a, 0xc7, 0x0e, 0x85, 0x8b, 0xde, + 0xcd, 0xf7, 0xcd, 0x78, 0x66, 0x3c, 0xdf, 0xcc, 0x81, 0xd3, 0x19, 0x73, 0x17, 0xc8, 0xc3, 0x59, + 0xb7, 0x15, 0xf2, 0x40, 0x04, 0xa4, 0xb4, 0x27, 0xac, 0xcf, 0x3a, 0xe8, 0xe3, 0x3b, 0x52, 0x83, + 0xec, 0x0a, 0x77, 0xa6, 0xd6, 0xd0, 0x9a, 0x15, 0x2a, 0x4d, 0xf2, 0x2b, 0xe4, 0x62, 0xb6, 0xde, + 0xa2, 0xa9, 0x2b, 0x2e, 0x01, 0xe4, 0x0f, 0x28, 0x6d, 0x23, 0xe4, 0xd3, 0x0d, 0x0a, 0x66, 0x66, + 0x95, 0xa7, 0x28, 0x89, 0x6b, 0x14, 0x8c, 0x98, 0x50, 0x88, 0x91, 0x47, 0x5e, 0xe0, 0x9b, 0x46, + 0x43, 0x6b, 0x1a, 0xf4, 0x05, 0x92, 0x73, 0x00, 0x7c, 0x08, 0x3d, 0x8e, 0xd1, 0x94, 0x09, 0x33, + 0xa7, 0x9c, 0xa5, 0x94, 0xe9, 0x09, 0x42, 0xc0, 0x50, 0x09, 0xf3, 0x2a, 0xa1, 0xb2, 0x65, 0xa5, + 0x48, 0x70, 0x64, 0x9b, 0xa9, 0xe7, 0x9a, 0xd0, 0xd0, 0x9a, 0x27, 0xb4, 0x98, 0x10, 0x23, 0x97, + 0xfc, 0x09, 0xe5, 0xd4, 0xe9, 0x06, 0x3e, 0x9a, 0xe5, 0x86, 0xd6, 0x2c, 0x52, 0x48, 0xa8, 0x61, + 0xe0, 0x23, 0xf9, 0x1b, 0x8c, 0x95, 0xe7, 0xbb, 0x66, 0xa5, 0xa1, 0x35, 0xab, 0x1d, 0xd2, 0xfa, + 0x31, 0x81, 0xf1, 0x5d, 0x6b, 0xec, 0xf9, 0x2e, 0x55, 0x7e, 0xeb, 0x1f, 0x30, 0x24, 0x22, 0x05, + 0xc8, 0x8e, 0xed, 0xf7, 0xb5, 0x0c, 0xa9, 0x40, 0x71, 0xd8, 0xbb, 0xed, 0x4d, 0x25, 0xd2, 0x48, + 0x11, 0x8c, 0xb7, 0xa3, 0x89, 0x5d, 0xd3, 0xad, 0x21, 0xe4, 0xc7, 0x77, 0x13, 0x2f, 0x12, 0xe4, + 0x1c, 0xf4, 0x55, 0x6c, 0x6a, 0x8d, 0x6c, 0xb3, 0xdc, 0x39, 0x39, 0x4a, 0x4c, 0xf5, 0x55, 0x2c, + 0xfb, 0x66, 0xeb, 0x75, 0xe0, 0x4c, 0x39, 0xce, 0x55, 0xdf, 0x06, 0x2d, 0x2a, 0x82, 0xe2, 0xdc, + 0xba, 0x82, 0x5f, 0xae, 0x99, 0xef, 0xcd, 0x31, 0x12, 0x83, 0x25, 0xf3, 0x17, 0xf8, 0x0e, 0x05, + 0xe9, 0x42, 0xc1, 0x51, 0x20, 0x4a, 0xb3, 0x9e, 0x1d, 0x64, 0x3d, 0x0e, 0xa7, 0x2f, 0x91, 0xd6, + 0x47, 0x1d, 0xaa, 0xc7, 0x3e, 0x52, 0x05, 0x7d, 0xe4, 0x2a, 0x09, 0x0d, 0xaa, 0x8f, 0x5c, 0xd2, + 0x05, 0xfd, 0x26, 0x54, 0xf2, 0x55, 0x3b, 0x97, 0x3f, 0x4d, 0xd9, 0xba, 0x09, 0x91, 0x33, 0xe1, + 0x05, 0x3e, 0xd5, 0x6f, 0x42, 0x29, 0xfb, 0x04, 0x63, 0x5c, 0x2b, 0x71, 0x4f, 0x68, 0x02, 0xc8, + 0x6f, 0x90, 0x5f, 0xe1, 0x4e, 0x2a, 0x91, 0x08, 0x9b, 0x5b, 0xe1, 0x6e, 0xe4, 0x92, 0x3e, 0x9c, + 0xa2, 0xef, 0xf0, 0x5d, 0x28, 0x3f, 0x9f, 0xb2, 0xf5, 0x22, 0x50, 0xda, 0x56, 0x8f, 0xfe, 0xc0, + 0xde, 0x47, 0xf4, 0xd6, 0x8b, 0x80, 0x56, 0xf1, 0x08, 0x93, 0x06, 0x94, 0x9d, 0x60, 0x13, 0x72, + 0x8c, 0xd4, 0xe2, 0xe4, 0x55, 0xd9, 0x43, 0xca, 0xba, 0x84, 0xd2, 0xbe, 0x47, 0x02, 0x90, 0x1f, + 0x50, 0xbb, 0x77, 0x6b, 0xd7, 0x32, 0xd2, 0x1e, 0xda, 0x13, 0xfb, 0xd6, 0xae, 0x69, 0x56, 0x0c, + 0xc5, 0xc1, 0x12, 0x9d, 0x55, 0xb4, 0xdd, 0x90, 0xff, 0xc1, 0x50, 0xbd, 0x68, 0xaa, 0x97, 0xf3, + 0x83, 0x5e, 0x5e, 0x42, 0x5a, 0xb2, 0x34, 0xf7, 0xc4, 0x72, 0x43, 0x55, 0xa8, 0xdc, 0xff, 0x68, + 0xbb, 0x51, 0xc3, 0x32, 0xa8, 0x34, 0xad, 0xbf, 0xa0, 0xb4, 0x0f, 0x4a, 0xaa, 0x0e, 0xba, 0x9d, + 0x41, 0xb2, 0x21, 0xf7, 0xf7, 0x57, 0x2c, 0x5a, 0xbe, 0x7e, 0x55, 0xd3, 0x2c, 0x07, 0x0a, 0x43, + 0x26, 0xd8, 0x18, 0x77, 0x07, 0x43, 0xd2, 0x0e, 0x87, 0x44, 0xc0, 0x70, 0x99, 0x60, 0xe9, 0x1d, + 0x29, 0x5b, 0x4a, 0xe5, 0xc5, 0xe9, 0xfd, 0xe8, 0x5e, 0x2c, 0xef, 0xc3, 0xe1, 0xc8, 0x04, 0xba, + 0xf2, 0x3e, 0xe4, 0x8c, 0xb3, 0xb4, 0x94, 0x32, 0x3d, 0x61, 0xf5, 0x21, 0x77, 0xcd, 0x84, 0xb3, + 0x24, 0xbf, 0x43, 0x3e, 0xe4, 0x38, 0xf7, 0x1e, 0xd2, 0x4b, 0x4d, 0x11, 0xb9, 0x80, 0x8a, 0xb7, + 0xf0, 0x03, 0x8e, 0xd3, 0xd9, 0x4e, 0x60, 0xa4, 0x6a, 0x95, 0x68, 0x39, 0xe1, 0xfa, 0x92, 0xfa, + 0xf7, 0x0c, 0xaa, 0xc7, 0x4a, 0xc8, 0x9d, 0x67, 0x18, 0xd5, 0x32, 0xfd, 0x37, 0x5f, 0x9e, 0xea, + 0xda, 0xe3, 0x53, 0x5d, 0xfb, 0xf6, 0x54, 0xd7, 0x3e, 0x3d, 0xd7, 0x33, 0x8f, 0xcf, 0xf5, 0xcc, + 0xd7, 0xe7, 0x7a, 0xe6, 0xc3, 0xc5, 0xc2, 0x13, 0xcb, 0xed, 0xac, 0xe5, 0x04, 0x9b, 0xb6, 0xbb, + 0xe0, 0x2c, 0x5c, 0xfe, 0xe7, 0x05, 0xed, 0x64, 0x9e, 0xed, 0xb8, 0xdb, 0x0e, 0x67, 0xb3, 0xbc, + 0x7a, 0x52, 0xba, 0xdf, 0x03, 0x00, 0x00, 0xff, 0xff, 0x43, 0xc8, 0xf6, 0xac, 0x65, 0x04, 0x00, + 0x00, } func (m *KV) Marshal() (dAtA []byte, err error) { @@ -630,6 +671,11 @@ func (m *KV) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Kind != 0 { + i = encodeVarintBadgerpb3(dAtA, i, uint64(m.Kind)) + i-- + dAtA[i] = 0x60 + } if m.StreamDone { i-- if m.StreamDone { @@ -980,6 +1026,9 @@ func (m *KV) Size() (n int) { if m.StreamDone { n += 2 } + if m.Kind != 0 { + n += 1 + sovBadgerpb3(uint64(m.Kind)) + } return n } @@ -1346,6 +1395,25 @@ func (m *KV) Unmarshal(dAtA []byte) error { } } m.StreamDone = bool(v != 0) + case 12: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Kind", wireType) + } + m.Kind = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBadgerpb3 + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Kind |= KV_Kind(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipBadgerpb3(dAtA[iNdEx:]) diff --git a/pb/badgerpb3.proto b/pb/badgerpb3.proto index d0df21fb3..859ecdb37 100644 --- a/pb/badgerpb3.proto +++ b/pb/badgerpb3.proto @@ -33,6 +33,13 @@ message KV { uint32 stream_id = 10; // Stream done is used to indicate end of stream. bool stream_done = 11; + + enum Kind { + KEY = 0; + DATA_KEY = 1; + FILE = 2; + } + Kind kind = 12; } message KVList { @@ -84,3 +91,4 @@ message Match { bytes prefix = 1; string ignore_bytes = 2; // Comma separated with dash to represent ranges "1, 2-3, 4-7, 9" } + diff --git a/stream.go b/stream.go index 0614f19aa..62fc694ee 100644 --- a/stream.go +++ b/stream.go @@ -25,9 +25,11 @@ import ( "time" "github.com/dgraph-io/badger/v3/pb" + "github.com/dgraph-io/badger/v3/table" "github.com/dgraph-io/badger/v3/y" "github.com/dgraph-io/ristretto/z" humanize "github.com/dustin/go-humanize" + "github.com/pkg/errors" ) const batchSize = 16 << 20 // 16 MB @@ -82,7 +84,9 @@ type Stream struct { Send func(buf *z.Buffer) error // Read data above the sinceTs. All keys with version =< sinceTs will be ignored. - SinceTs uint64 + SinceTs uint64 + // FullCopy should be set to true only when encryption mode is same for sender and receiver. + FullCopy bool readTs uint64 db *DB rangeCh chan keyRange @@ -107,9 +111,6 @@ func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) { list := &pb.KVList{} for ; itr.Valid(); itr.Next() { item := itr.Item() - if item.IsDeletedOrExpired() { - break - } if !bytes.Equal(key, item.Key()) { // Break out on the first encounter with another key. break @@ -127,6 +128,7 @@ func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) { } kv.Version = item.Version() kv.ExpiresAt = item.ExpiresAt() + kv.Meta = []byte{item.meta} kv.UserMeta = a.Copy([]byte{item.UserMeta()}) list.Kv = append(list.Kv, kv) @@ -137,6 +139,12 @@ func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) { if item.DiscardEarlierVersions() { break } + if item.IsDeletedOrExpired() { + // We do a FullCopy in stream. It might happen that tables from L6 contain K(version=1), + // while the table at L4 that was not copied contains K(version=2) with delete mark. + // Hence, we need to send the deleted or expired item too. + break + } } return list, nil } @@ -163,18 +171,10 @@ func (st *Stream) produceRanges(ctx context.Context) { } // produceKVs picks up ranges from rangeCh, generates KV lists and sends them to kvChan. -func (st *Stream) produceKVs(ctx context.Context, threadId int) error { +func (st *Stream) produceKVs(ctx context.Context, itr *Iterator) error { atomic.AddInt32(&st.numProducers, 1) defer atomic.AddInt32(&st.numProducers, -1) - var txn *Txn - if st.readTs > 0 { - txn = st.db.NewTransactionAt(st.readTs, false) - } else { - txn = st.db.NewTransaction(false) - } - defer txn.Discard() - // produceKVs is running iterate serially. So, we can define the outList here. outList := z.NewBuffer(2*batchSize, "Stream.ProduceKVs") defer func() { @@ -184,15 +184,6 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error { }() iterate := func(kr keyRange) error { - iterOpts := DefaultIteratorOptions - iterOpts.AllVersions = true - iterOpts.Prefix = st.Prefix - iterOpts.PrefetchValues = false - iterOpts.SinceTs = st.SinceTs - itr := txn.NewIterator(iterOpts) - itr.ThreadId = threadId - defer itr.Close() - itr.Alloc = z.NewAllocator(1<<20, "Stream.Iterate") defer itr.Alloc.Release() @@ -376,6 +367,77 @@ outer: return nil } +func (st *Stream) copyTablesOver(ctx context.Context, tableMatrix [][]*table.Table) error { + // TODO: See if making this concurrent would be helpful. Most likely it won't. + // But, if it does work, then most like <3 goroutines might be sufficient. + infof := st.db.opt.Infof + // Make a copy of the manifest so that we don't have race condition. + manifest := st.db.manifest.manifest.clone() + dataKeys := make(map[uint64]struct{}) + // Iterate in reverse order so that the receiver gets the bottommost level first. + for i := len(tableMatrix) - 1; i >= 0; i-- { + level := i + tables := tableMatrix[i] + for _, t := range tables { + // This table can be picked for copying directly. + out := z.NewBuffer(int(t.Size())+1024, "Stream.Table") + if dk := t.DataKey(); dk != nil { + y.AssertTrue(dk.KeyId != 0) + // If we have a legit data key, send it over so the table can be decrypted. The same + // data key could have been used to encrypt many tables. Avoid sending it + // repeatedly. + if _, sent := dataKeys[dk.KeyId]; !sent { + infof("Sending data key with ID: %d\n", dk.KeyId) + val, err := dk.Marshal() + y.Check(err) + + // This would go to key registry in destination. + kv := &pb.KV{ + Value: val, + Kind: pb.KV_DATA_KEY, + } + KVToBuffer(kv, out) + dataKeys[dk.KeyId] = struct{}{} + } + } + + infof("Sending table ID: %d at level: %d. Size: %s\n", + t.ID(), level, humanize.IBytes(uint64(t.Size()))) + tableManifest := manifest.Tables[t.ID()] + change := pb.ManifestChange{ + Op: pb.ManifestChange_CREATE, + Level: uint32(level), + KeyId: tableManifest.KeyID, + // Hard coding it, since we're supporting only AES for now. + EncryptionAlgo: pb.EncryptionAlgo_aes, + Compression: uint32(tableManifest.Compression), + } + + buf, err := change.Marshal() + y.Check(err) + + // We send the table along with level to the destination, so they'd know where to + // place the tables. We'd send all the tables first, before we start streaming. So, the + // destination DB would write streamed keys one level above. + kv := &pb.KV{ + // Key can be used for MANIFEST. + Key: buf, + Value: t.Data, + Kind: pb.KV_FILE, + } + KVToBuffer(kv, out) + + select { + case st.kvChan <- out: + case <-ctx.Done(): + out.Release() + return ctx.Err() + } + } + } + return nil +} + // Orchestrate runs Stream. It picks up ranges from the SSTables, then runs NumGo number of // goroutines to iterate over these ranges and batch up KVs in lists. It concurrently runs a single // goroutine to pick these lists, batch them up further and send to Output.Send. Orchestrate also @@ -383,6 +445,11 @@ outer: // are serial. In case any of these steps encounter an error, Orchestrate would stop execution and // return that error. Orchestrate can be called multiple times, but in serial order. func (st *Stream) Orchestrate(ctx context.Context) error { + if st.FullCopy { + if !st.db.opt.managedTxns || st.SinceTs != 0 || st.ChooseKey != nil && st.KeyToList != nil { + panic("Got invalid stream options when doing full copy") + } + } ctx, cancel := context.WithCancel(ctx) defer cancel() st.rangeCh = make(chan keyRange, 3) // Contains keys for posting lists. @@ -396,7 +463,127 @@ func (st *Stream) Orchestrate(ctx context.Context) error { st.KeyToList = st.ToList } + // Pick up key-values from kvChan and send to stream. + kvErr := make(chan error, 1) + go func() { + // Picks up KV lists from kvChan, and sends them to Output. + err := st.streamKVs(ctx) + if err != nil { + cancel() // Stop all the go routines. + } + kvErr <- err + }() + + // Pick all relevant tables from levels. We'd use this to copy them over, + // or generate iterators from them. + memTables, decr := st.db.getMemTables() + defer decr() + + opts := DefaultIteratorOptions + opts.Prefix = st.Prefix + opts.SinceTs = st.SinceTs + tableMatrix := st.db.lc.getTables(&opts) + defer func() { + for _, tables := range tableMatrix { + for _, t := range tables { + t.DecrRef() + } + } + }() + y.AssertTrue(len(tableMatrix) == st.db.opt.MaxLevels) + + infof := st.db.opt.Infof + copyTables := func() error { + // Figure out which tables we can copy. Only choose from the last 2 levels. + // Say last level has data of size 100. Given a 10x level multiplier and + // assuming the tree is balanced, second last level would have 10, and the + // third last level would have 1. The third last level would only have 1% + // of the data of the last level. It's OK for us to stop there and just + // stream it, instead of trying to copy over those tables too. When we + // copy over tables to Level i, we can't stream any data to level i, i+1, + // and so on. The stream has to create tables at level i-1, so there can be + // overlap between the tables at i-1 and i. + + // Let's pick the tables which can be fully copied over from last level. + threshold := len(tableMatrix) - 2 + toCopy := make([][]*table.Table, len(tableMatrix)) + var numCopy, numStream int + for lev, tables := range tableMatrix { + // We stream only the data in the two bottommost levels. + if lev < threshold { + numStream += len(tables) + continue + } + var rem []*table.Table + cp := tables[:0] + for _, t := range tables { + // We can only copy over those tables that satisfy following conditions: + // - All the keys have version less than st.readTs + // - st.Prefix fully covers the table + if t.MaxVersion() > st.readTs || !t.CoveredByPrefix(st.Prefix) { + rem = append(rem, t) + continue + } + cp = append(cp, t) + } + toCopy[lev] = cp // Pick tables to copy. + tableMatrix[lev] = rem // Keep remaining for streaming. + numCopy += len(cp) + numStream += len(rem) + } + infof("Num tables to copy: %d. Num to stream: %d\n", numCopy, numStream) + + return st.copyTablesOver(ctx, toCopy) + } + + if st.FullCopy { + // As of now, we don't handle the non-zero SinceTs. + if err := copyTables(); err != nil { + return errors.Wrap(err, "while copying tables") + } + } + + var txn *Txn + if st.readTs > 0 { + txn = st.db.NewTransactionAt(st.readTs, false) + } else { + txn = st.db.NewTransaction(false) + } + defer txn.Discard() + + newIterator := func(threadId int) *Iterator { + var itrs []y.Iterator + for _, mt := range memTables { + itrs = append(itrs, mt.sl.NewUniIterator(false)) + } + if tables := tableMatrix[0]; len(tables) > 0 { + itrs = append(itrs, iteratorsReversed(tables, 0)...) + } + for _, tables := range tableMatrix[1:] { + if len(tables) == 0 { + continue + } + itrs = append(itrs, table.NewConcatIterator(tables, 0)) + } + + opt := DefaultIteratorOptions + opt.AllVersions = true + opt.Prefix = st.Prefix + opt.PrefetchValues = false + opt.SinceTs = st.SinceTs + + res := &Iterator{ + txn: txn, + iitr: table.NewMergeIterator(itrs, false), + opt: opt, + readTs: txn.readTs, + ThreadId: threadId, + } + return res + } + // Picks up ranges from Badger, and sends them to rangeCh. + // Just for simplicity, we'd consider all the tables for range production. go st.produceRanges(ctx) errCh := make(chan error, st.NumGo) // Stores error by consumeKeys. @@ -407,7 +594,9 @@ func (st *Stream) Orchestrate(ctx context.Context) error { go func(threadId int) { defer wg.Done() // Picks up ranges from rangeCh, generates KV lists, and sends them to kvChan. - if err := st.produceKVs(ctx, threadId); err != nil { + itr := newIterator(threadId) + defer itr.Close() + if err := st.produceKVs(ctx, itr); err != nil { select { case errCh <- err: default: @@ -416,16 +605,6 @@ func (st *Stream) Orchestrate(ctx context.Context) error { }(i) } - // Pick up key-values from kvChan and send to stream. - kvErr := make(chan error, 1) - go func() { - // Picks up KV lists from kvChan, and sends them to Output. - err := st.streamKVs(ctx) - if err != nil { - cancel() // Stop all the go routines. - } - kvErr <- err - }() wg.Wait() // Wait for produceKVs to be over. close(st.kvChan) // Now we can close kvChan. defer func() { diff --git a/stream_writer.go b/stream_writer.go index 4613718cc..83a652bfb 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -26,6 +26,7 @@ import ( "github.com/dgraph-io/badger/v3/y" "github.com/dgraph-io/ristretto/z" humanize "github.com/dustin/go-humanize" + "github.com/golang/protobuf/proto" "github.com/pkg/errors" ) @@ -41,12 +42,18 @@ import ( // StreamWriter should not be called on in-use DB instances. It is designed only to bootstrap new // DBs. type StreamWriter struct { - writeLock sync.Mutex - db *DB - done func() - throttle *y.Throttle - maxVersion uint64 - writers map[uint32]*sortedWriter + writeLock sync.Mutex + db *DB + done func() + throttle *y.Throttle + maxVersion uint64 + writers map[uint32]*sortedWriter + prevLevel int + senderPrevLevel int + keyId map[uint64]*pb.DataKey // map to store stores reader's keyId to data key. + // Writer might receive tables first, and then receive keys. If true, that means we have + // started processing keys. + processingKeys bool } // NewStreamWriter creates a StreamWriter. Right after creating StreamWriter, Prepare must be @@ -60,6 +67,7 @@ func (db *DB) NewStreamWriter() *StreamWriter { // concurrent streams being processed. throttle: y.NewThrottle(16), writers: make(map[uint32]*sortedWriter), + keyId: make(map[uint64]*pb.DataKey), } } @@ -108,7 +116,64 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error { if _, ok := closedStreams[kv.StreamId]; ok { panic(fmt.Sprintf("write performed on closed stream: %d", kv.StreamId)) } + switch kv.Kind { + case pb.KV_DATA_KEY: + y.AssertTrue(len(sw.db.opt.EncryptionKey) > 0) + var dk pb.DataKey + if err := proto.Unmarshal(kv.Value, &dk); err != nil { + return errors.Wrapf(err, "unmarshal failed %s", kv.Value) + } + readerId := dk.KeyId + if _, ok := sw.keyId[readerId]; !ok { + // Insert the data key to the key registry if not already inserted. + id, err := sw.db.registry.AddKey(dk) + if err != nil { + return errors.Wrap(err, "failed to write data key") + } + dk.KeyId = id + sw.keyId[readerId] = &dk + } + return nil + case pb.KV_FILE: + // All tables should be recieved before any of the keys. + if sw.processingKeys { + return errors.New("Received pb.KV_FILE after pb.KV_KEY") + } + var change pb.ManifestChange + if err := proto.Unmarshal(kv.Key, &change); err != nil { + return errors.Wrap(err, "unable to unmarshal manifest change") + } + level := int(change.Level) + if sw.senderPrevLevel == 0 { + // We received the first file, set the sender's and receiver's max levels. + sw.senderPrevLevel = level + sw.prevLevel = len(sw.db.lc.levels) - 1 + } + // This is based on the assumption that the tables from the last + // level will be sent first and then the second last level tables. + // As long as the kv.Version (which stores the level) is same as + // the prevLevel, we know we're processing a last level table. + // The last level for this DB can be 8 while the DB that's sending + // this could have the last level at 7. + if sw.senderPrevLevel != level { + // If the previous level and the current level is different, we + // must be processing a table from the next last level. + sw.senderPrevLevel = level + sw.prevLevel-- + } + dk := sw.keyId[change.KeyId] + return sw.db.lc.AddTable(&kv, sw.prevLevel, dk, &change) + case pb.KV_KEY: + // Pass. The following code will handle the keys. + } + + sw.processingKeys = true + if sw.prevLevel == 0 { + // If prevLevel is 0, that means that we have not written anything yet. Equivalently, + // we were virtually writing to the maxLevel+1. + sw.prevLevel = len(sw.db.lc.levels) + } var meta, userMeta byte if len(kv.Meta) > 0 { meta = kv.Meta[0] @@ -284,6 +349,7 @@ type sortedWriter struct { builder *table.Builder lastKey []byte + level int streamID uint32 reqCh chan *request // Have separate closer for each writer, as it can be closed at any time. @@ -303,6 +369,7 @@ func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) { builder: table.NewTableBuilder(bopts), reqCh: make(chan *request, 3), closer: z.NewCloser(1), + level: sw.prevLevel - 1, // Write at the level just above the one we were writing to. } go w.handleRequests() @@ -434,7 +501,7 @@ func (w *sortedWriter) createTable(builder *table.Builder) error { } lc := w.db.lc - lhandler := lc.levels[len(lc.levels)-1] + lhandler := lc.levels[w.level] // Now that table can be opened successfully, let's add this to the MANIFEST. change := &pb.ManifestChange{ Id: tbl.ID(), diff --git a/table/table.go b/table/table.go index 1378f9a36..45ec1940a 100644 --- a/table/table.go +++ b/table/table.go @@ -146,7 +146,8 @@ func (t *Table) KeyCount() uint32 { return t.cheapIndex().KeyCount } // OnDiskSize returns the total size of key-values stored in this table (including the // disk space occupied on the value log). -func (t *Table) OnDiskSize() uint32 { return t.cheapIndex().OnDiskSize } +func (t *Table) OnDiskSize() uint32 { return t.cheapIndex().OnDiskSize } +func (t *Table) DataKey() *pb.DataKey { return t.opt.DataKey } // CompressionType returns the compression algorithm used for block compression. func (t *Table) CompressionType() options.CompressionType { @@ -256,7 +257,21 @@ func (b block) verifyCheckSum() error { func CreateTable(fname string, builder *Builder) (*Table, error) { bd := builder.Done() - mf, err := z.OpenMmapFile(fname, os.O_CREATE|os.O_RDWR|os.O_EXCL, bd.Size) + mf, err := newFile(fname, bd.Size) + if err != nil { + return nil, err + } + + written := bd.Copy(mf.Data) + y.AssertTrue(written == len(mf.Data)) + if err := z.Msync(mf.Data); err != nil { + return nil, y.Wrapf(err, "while calling msync on %s", fname) + } + return OpenTable(mf, *builder.opts) +} + +func newFile(fname string, sz int) (*z.MmapFile, error) { + mf, err := z.OpenMmapFile(fname, os.O_CREATE|os.O_RDWR|os.O_EXCL, sz) if err == z.NewFile { // Expected. } else if err != nil { @@ -264,13 +279,22 @@ func CreateTable(fname string, builder *Builder) (*Table, error) { } else { return nil, errors.Errorf("file already exists: %s", fname) } + return mf, nil +} - written := bd.Copy(mf.Data) +func CreateTableFromBuffer(fname string, buf []byte, opts Options) (*Table, error) { + mf, err := newFile(fname, len(buf)) + if err != nil { + return nil, err + } + + // We cannot use the buf directly here because it is not mmapped. + written := copy(mf.Data, buf) y.AssertTrue(written == len(mf.Data)) if err := z.Msync(mf.Data); err != nil { return nil, y.Wrapf(err, "while calling msync on %s", fname) } - return OpenTable(mf, *builder.opts) + return OpenTable(mf, opts) } // OpenTable assumes file has only one table and opens it. Takes ownership of fd upon function @@ -693,6 +717,12 @@ func (t *Table) DoesNotHave(hash uint32) bool { return !mayContain } +// CoveredByPrefix returns true if all the keys in the table are prefixed by the given prefix. +func (t *Table) CoveredByPrefix(prefix []byte) bool { + return bytes.HasPrefix(y.ParseKey(t.Biggest()), prefix) && + bytes.HasPrefix(y.ParseKey(t.Smallest()), prefix) +} + // readTableIndex reads table index from the sst and returns its pb format. func (t *Table) readTableIndex() (*fb.TableIndex, error) { data := t.readNoFail(t.indexStart, t.indexLen) @@ -809,7 +839,7 @@ func (t *Table) decompress(b *block) error { if sz, err := snappy.DecodedLen(b.data); err == nil { dst = z.Calloc(sz, "Table.Decompress") } else { - dst = z.Calloc(len(b.data) * 4, "Table.Decompress") // Take a guess. + dst = z.Calloc(len(b.data)*4, "Table.Decompress") // Take a guess. } b.data, err = snappy.Decode(dst, b.data) if err != nil { diff --git a/value.go b/value.go index 6e8f9178e..5063711fa 100644 --- a/value.go +++ b/value.go @@ -788,7 +788,9 @@ func estimateRequestSize(req *request) uint64 { // write is thread-unsafe by design and should not be called concurrently. func (vlog *valueLog) write(reqs []*request) error { - if vlog.db.opt.InMemory { + if vlog.db.opt.InMemory || vlog.db.opt.managedTxns { + // Don't do value log writes in managed mode. + // TODO: In the managed mode, don't create a value log. return nil } // Validate writes before writing to vlog. Because, we don't want to partially write and return