forked from dgraph-io/badger
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathiterator.go
285 lines (253 loc) · 7.43 KB
/
iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
/*
* Copyright 2017 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package badger
import (
"bytes"
"sync"
"github.com/dgraph-io/badger/y"
)
// KVItem is returned during iteration. Both the Key() and Value() outputs are only valid until
// iterator.Next() is called.
type KVItem struct {
	// wg tracks the asynchronous value fetch started by Iterator.fill; Value() waits on it.
	wg sync.WaitGroup
	// key is a copy of the entry's key, written by Iterator.fill into a reused buffer.
	key []byte
	// vptr is a copy of the raw value bytes read from the internal iterator
	// (presumably a pointer into the value log — TODO confirm against KV.fillItem).
	vptr []byte
	// meta holds the entry's internal metadata bits (e.g. BitDelete).
	meta byte
	// userMeta is the user-supplied metadata byte returned by UserMeta().
	userMeta byte
	// val is the fetched value; reset to nil by Iterator.fill and presumably set by KV.fillItem.
	val []byte
	// casCounter is the CAS counter returned by Counter().
	casCounter uint64
	// slice is allocated once per item in Iterator.newItem. NOTE(review): appears to be a
	// reusable scratch buffer for KV.fillItem — confirm.
	slice *y.Slice
	// next links the item into an Iterator's data or waste list.
	next *KVItem
}
// Key returns the key of this item. Items are recycled by the iterator, so copy the returned
// slice if you need it after the iteration loop advances.
func (item *KVItem) Key() []byte {
	k := item.key
	return k
}
// Value returns the value of this item. When the iterator fetches values, the fetch runs in the
// background and this call blocks until it has completed; otherwise the result is nil.
// Parse or copy the result if you need to keep it. DO NOT modify or append to this slice; doing
// so would overwrite internal data.
func (item *KVItem) Value() []byte {
	pending := &item.wg
	pending.Wait()
	return item.val
}
// Counter returns the CAS counter that was stored alongside this item's value.
func (item *KVItem) Counter() uint64 {
	cas := item.casCounter
	return cas
}
// UserMeta returns the user-defined metadata byte that was set along with the value.
func (item *KVItem) UserMeta() byte {
	meta := item.userMeta
	return meta
}
// list is a singly linked FIFO queue of KVItems, linked through KVItem.next.
type list struct {
	// head is the oldest item (the next one pop returns); nil when the list is empty.
	head *KVItem
	// tail is the most recently pushed item; nil when the list is empty.
	tail *KVItem
}
// push appends i at the tail of the list. Any stale next link on i is cleared first so the
// list always terminates at its tail.
func (l *list) push(i *KVItem) {
	i.next = nil
	if l.tail != nil {
		l.tail.next = i
	} else {
		l.head = i
	}
	l.tail = i
}
// pop removes and returns the item at the head of the list, or nil if the list is empty.
// The returned item is detached: its next link is cleared.
func (l *list) pop() *KVItem {
	first := l.head
	switch {
	case first == nil:
		return nil
	case first == l.tail:
		// Removing the only element empties the list.
		l.head, l.tail = nil, nil
	default:
		l.head = first.next
	}
	first.next = nil
	return first
}
// IteratorOptions is used to set options when iterating over Badger key-value stores.
type IteratorOptions struct {
	// PrefetchSize values <= 1 are treated as 2 by the iterator's prefetch step.
	PrefetchSize int  // How many KV pairs to prefetch while iterating.
	FetchValues  bool // Controls whether the values should be fetched from the value log.
	Reverse      bool // Direction of iteration. False is forward, true is backward.
}
// DefaultIteratorOptions contains the default options for iterating over Badger key-value
// stores: prefetch 100 items, fetch values, and iterate forward (Reverse is left at its zero
// value, false).
var DefaultIteratorOptions = IteratorOptions{
	PrefetchSize: 100,
	FetchValues:  true,
}
// Iterator helps iterating over the KV pairs in a lexicographically sorted order.
type Iterator struct {
	// kv is the store being iterated; fill uses it to fetch values.
	kv *KV
	// iitr is the merged internal iterator built by NewIterator from the memtable iterators
	// plus those appended by the store's level controller.
	iitr y.Iterator
	// opt holds the options the iterator was created with.
	opt IteratorOptions
	// item is the current entry; nil when the iterator is not valid.
	item *KVItem
	// data queues prefetched items, in iteration order, that become current on Next.
	data list
	// waste holds recycled items, whose value fetches have completed, for reuse by newItem.
	waste list
}
// newItem hands out a recycled item from the waste list when one is available and otherwise
// allocates a fresh item with its own y.Slice.
func (it *Iterator) newItem() *KVItem {
	if recycled := it.waste.pop(); recycled != nil {
		return recycled
	}
	return &KVItem{slice: new(y.Slice)}
}
// Item returns a pointer to the current KVItem. The item is only valid until it.Next() is
// called, because the iterator recycles it afterwards.
func (it *Iterator) Item() *KVItem {
	current := it.item
	return current
}
// Valid reports whether the iterator is positioned on an item; it returns false once
// iteration is done.
func (it *Iterator) Valid() bool {
	current := it.item
	return current != nil
}
// ValidForPrefix reports whether the iterator is positioned on an item whose key starts with
// prefix. It returns false when iteration is done or the current key lacks the prefix.
func (it *Iterator) ValidForPrefix(prefix []byte) bool {
	if it.item == nil {
		return false
	}
	return bytes.HasPrefix(it.item.key, prefix)
}
// Close closes the iterator. It is important to call this when you're done with iteration.
//
// Before closing the internal iterator, Close waits for the asynchronous value fetches that
// fill started for the current item and for every prefetched item still queued. Otherwise
// those goroutines could keep calling into the KV after the caller believes the iterator is
// gone. Items on the waste list were already waited on before being recycled.
func (it *Iterator) Close() {
	if it.item != nil {
		it.item.wg.Wait()
	}
	for pending := it.data.head; pending != nil; pending = pending.next {
		pending.wg.Wait()
	}
	it.iitr.Close()
}
// Next advances the iterator by one. Always check it.Valid() after a Next() to ensure you have
// access to a valid it.Item(). Calling Next when Valid() is false is a no-op.
func (it *Iterator) Next() {
	if it.item == nil {
		// Exhausted, or never positioned by Rewind/Seek: nothing to recycle or advance.
		return
	}
	// Wait for any in-flight value fetch before recycling, so a later fill of this item cannot
	// race with the old fetch goroutine (and no ref counting is needed).
	it.item.wg.Wait()
	it.waste.push(it.item)

	// The oldest prefetched item becomes current.
	it.item = it.data.pop()

	// prefetch and Next leave the internal iterator on the last entry they consumed. If it is
	// already exhausted there is nothing more to prefetch, so don't advance it further.
	if !it.iitr.Valid() {
		return
	}
	// Advance past the consumed entry, skipping internal badger keys and deleted entries.
	for it.iitr.Next(); it.iitr.Valid(); it.iitr.Next() {
		if bytes.HasPrefix(it.iitr.Key(), badgerPrefix) {
			continue
		}
		if it.iitr.Value().Meta&BitDelete == 0 { // Not deleted.
			break
		}
	}
	if !it.iitr.Valid() {
		return
	}
	// Top the prefetch queue back up with the entry just found.
	item := it.newItem()
	it.fill(item)
	it.data.push(item)
}
// fill copies the entry under the internal iterator into item. It reuses item's key and vptr
// buffers via y.Safecopy and clears any previous value. When FetchValues is set, it launches a
// goroutine that fetches the value through kv.fillItem. item.wg tracks that goroutine so that
// Value, Next, Seek and Rewind can wait for it.
func (it *Iterator) fill(item *KVItem) {
	vs := it.iitr.Value()
	item.meta, item.userMeta, item.casCounter = vs.Meta, vs.UserMeta, vs.CASCounter
	item.key = y.Safecopy(item.key, it.iitr.Key())
	item.vptr = y.Safecopy(item.vptr, vs.Value)
	item.val = nil

	if !it.opt.FetchValues {
		return
	}
	item.wg.Add(1)
	go func(dst *KVItem) {
		it.kv.fillItem(dst)
		dst.wg.Done()
	}(item)
}
// prefetch scans forward from the internal iterator's current position, skipping internal
// badger keys and deleted entries. The first usable entry becomes it.item; further entries are
// queued in it.data, up to PrefetchSize items in total. At least two items are attempted, so
// both it.item and it.data can be populated. The internal iterator is left on the last entry
// consumed (or exhausted); Next relies on that position.
// NOTE(review): any previous it.item is dropped here rather than waited on and recycled.
func (it *Iterator) prefetch() {
	limit := it.opt.PrefetchSize
	if limit <= 1 {
		limit = 2
	}

	src := it.iitr
	it.item = nil
	for fetched := 0; src.Valid(); src.Next() {
		if bytes.HasPrefix(src.Key(), badgerPrefix) || src.Value().Meta&BitDelete > 0 {
			continue
		}
		item := it.newItem()
		it.fill(item)
		if it.item == nil {
			it.item = item
		} else {
			it.data.push(item)
		}
		fetched++
		if fetched == limit {
			break
		}
	}
}
// Seek positions the iterator at the provided key if it is present. If it is absent, the
// iterator moves to the smallest key greater than the provided one when iterating forward.
// When iterating backward the behavior is reversed, moving to the largest key smaller than it.
func (it *Iterator) Seek(key []byte) {
	// Drain the prefetch queue, waiting for each item's fetch before recycling it.
	for {
		pending := it.data.pop()
		if pending == nil {
			break
		}
		pending.wg.Wait()
		it.waste.push(pending)
	}

	it.iitr.Seek(key)
	// Step over internal badger keys.
	for ; it.iitr.Valid(); it.iitr.Next() {
		if !bytes.HasPrefix(it.iitr.Key(), badgerPrefix) {
			break
		}
	}
	it.prefetch()
}
// Rewind moves the iterator back to its first position. That is the smallest key when
// iterating forward and the largest when iterating backward. It does not remember whether the
// cursor was previously positioned with Seek().
func (it *Iterator) Rewind() {
	// Recycle queued items; waiting first means no ref counting is needed.
	for pending := it.data.pop(); pending != nil; pending = it.data.pop() {
		pending.wg.Wait()
		it.waste.push(pending)
	}

	it.iitr.Rewind()
	// Step over internal badger keys.
	for ; it.iitr.Valid(); it.iitr.Next() {
		if !bytes.HasPrefix(it.iitr.Key(), badgerPrefix) {
			break
		}
	}
	it.prefetch()
}
// NewIterator returns a new iterator. Depending on opt.FetchValues, either only keys or full
// key-value pairs are fetched. Keys are returned in lexicographically sorted order, reversed
// when opt.Reverse is set. The iterator must be positioned with Rewind or Seek before use.
//
// Usage:
//
//	opt := badger.DefaultIteratorOptions
//	itr := kv.NewIterator(opt)
//	for itr.Rewind(); itr.Valid(); itr.Next() {
//		item := itr.Item()
//		key := item.Key()
//		val := item.Value() // This could block while the value is fetched from the value log.
//		// For key-only iteration, set opt.FetchValues to false and don't call item.Value().
//
//		// Both key and val become invalid in the next iteration of the loop,
//		// so copy or parse them if you need them outside it.
//	}
//	itr.Close()
func (s *KV) NewIterator(opt IteratorOptions) *Iterator {
	tables, decr := s.getMemTables()
	defer decr()

	var iters []y.Iterator
	for idx := range tables {
		iters = append(iters, tables[idx].NewUniIterator(opt.Reverse))
	}
	// This will increment references.
	iters = s.lc.appendIterators(iters, opt.Reverse)

	return &Iterator{
		kv:   s,
		iitr: y.NewMergeIterator(iters, opt.Reverse),
		opt:  opt,
	}
}