-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsource.go
61 lines (54 loc) · 1.15 KB
/
source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package bakelite
import (
"fmt"
)
type recordSource struct {
db *DB
table string // only for error messages
source <-chan []any
rowID int
peek *tableLeafCell
}
func newRecordSource(db *DB, table string, source <-chan []any) *recordSource {
return &recordSource{
db: db,
table: table,
source: source,
rowID: 1,
}
}
// Get next tableLeafCell from source. It'll keep returning the same cell until
// Pop() is called. This is because creating a leaf cell might store overflow
// in the DB as a side effect.
// If source is empty we'll return nil
func (r *recordSource) Peek() *tableLeafCell {
if r.peek == nil {
row, ok := <-r.source
if !ok {
return nil
}
rec, err := makeRecord(row)
if err != nil {
r.db.err = fmt.Errorf("table %q: %w", r.table, err)
return nil
}
cell := r.db.makeLeafCell(r.rowID, rec)
r.peek = &cell
r.rowID++
}
return r.peek
}
func (r *recordSource) Pop() {
r.peek = nil
}
// helper to go from slices to a channel
func stream(rows [][]any) <-chan []any {
source := make(chan []any)
go func() {
defer close(source)
for _, row := range rows {
source <- row
}
}()
return source
}