-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreservoir.go
155 lines (126 loc) · 2.75 KB
/
reservoir.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package sampling
import (
"container/heap"
"math"
"math/rand"
"time"
)
// init seeds the package-global math/rand source from the current wall-clock
// time so that separate runs of the process draw different samples.
//
// NOTE(review): since Go 1.20 the global math/rand source is seeded randomly
// at startup and rand.Seed is deprecated; this call only matters on older
// toolchains.
func init() {
	now := time.Now()
	rand.Seed(now.UnixNano())
}
// SimpleReservoir samples k items uniformly at random from an Iterator in a
// single streaming pass (Algorithm R).
//
// The package offers three reservoirs: SimpleReservoir (streams the iterator,
// one random draw per item), Reservoir (buffers every item in memory and then
// skips ahead between replacements), and WeightedReservoir (weighted
// sampling). Each assumes the iterator is finite; the uniform reservoirs also
// assume it yields at least as many items as the requested number of samples.
type SimpleReservoir struct {
	// k is the number of samples to draw.
	k int
	// i supplies the items to sample from; it is consumed by Sample.
	i Iterator
}
// NewSimpleReservoir returns a SimpleReservoir configured to draw samples
// items from itr. The iterator is only stored here; it is read when Sample is
// called.
func NewSimpleReservoir(samples int, itr Iterator) SimpleReservoir {
	sr := SimpleReservoir{i: itr}
	sr.k = samples
	return sr
}
// Sample draws sr.k items uniformly at random from sr's iterator in a single
// pass (Algorithm R). The first k items fill the reservoir. After that, the
// item at 1-based stream position n overwrites a randomly chosen slot with
// probability k/n. The iterator is always consumed to exhaustion.
//
// Sample panics if the iterator yields fewer than sr.k items.
// NOTE(review): a negative k makes rand.Intn panic as soon as the iterator
// yields an item.
func (sr SimpleReservoir) Sample() []interface{} {
	var reservoir []interface{}

	// Seed phase: running out of input before the reservoir is full is a
	// usage error.
	for len(reservoir) < sr.k {
		next, ok := sr.i.Next()
		if !ok {
			panic("iterator must have more items than desired samples")
		}
		reservoir = append(reservoir, next)
	}

	// Replacement phase: seen counts items consumed so far, including the
	// current one.
	seen := sr.k
	for next, ok := sr.i.Next(); ok; next, ok = sr.i.Next() {
		seen++
		if slot := rand.Intn(seen); slot < sr.k {
			reservoir[slot] = next
		}
	}
	return reservoir
}
// Reservoir samples k items uniformly at random from a fully buffered slice
// of items. Its Sample method skips over runs of items between replacements
// (Algorithm L) rather than drawing a random number for every item.
type Reservoir struct {
	// k is the number of samples to draw.
	k int
	// assume that super set of items can fit in memory
	// (i holds every item drained from the source iterator by NewReservoir.)
	i []interface{}
}
// NewReservoir eagerly drains itr into memory and returns a Reservoir that
// will draw samples items from the buffered values. itr is exhausted when
// this function returns. An empty iterator leaves the buffer nil.
func NewReservoir(samples int, itr Iterator) Reservoir {
	var buffered []interface{}
	for item, ok := itr.Next(); ok; item, ok = itr.Next() {
		buffered = append(buffered, item)
	}
	return Reservoir{i: buffered, k: samples}
}
// Sample returns r.k items drawn uniformly at random, without replacement,
// from the buffered items, using Li's Algorithm L. The first k items seed the
// reservoir. The loop then jumps ahead a random distance between
// replacements instead of drawing a random number for every item.
//
// The result is a newly allocated slice; r.i is never modified. A
// non-positive k yields nil. Sample panics if k exceeds the number of
// buffered items, mirroring the panic in SimpleReservoir.Sample.
func (r Reservoir) Sample() []interface{} {
	if r.k <= 0 {
		return nil
	}
	n := len(r.i)
	if r.k > n {
		panic("iterator must have more items than desired samples")
	}

	// Copy, rather than alias, the first k items so the replacements below
	// never write through to r.i.
	samples := make([]interface{}, r.k)
	copy(samples, r.i[:r.k])

	// uniform draws from the open interval (0, 1). rand.Float64 may return
	// exactly 0, whose logarithm (-Inf) would poison w or the skip length.
	uniform := func() float64 {
		for {
			if u := rand.Float64(); u > 0 {
				return u
			}
		}
	}

	k := float64(r.k)
	w := math.Exp(math.Log(uniform()) / k)
	// last is the 0-based index of the most recently considered item. The
	// first k items (indices 0..k-1) already sit in the reservoir, so the
	// cursor starts at k-1 and the first jump lands at index k or later.
	// (Starting at 0 would re-select seeded items and yield duplicates.)
	last := float64(r.k - 1)
	for {
		// Log1p(-w) stays accurate and non-zero for tiny w, where
		// Log(1-w) would round to 0 and divide by zero.
		last += math.Floor(math.Log(uniform())/math.Log1p(-w)) + 1
		if last >= float64(n) {
			break
		}
		samples[rand.Intn(r.k)] = r.i[int(last)]
		w *= math.Exp(math.Log(uniform()) / k)
	}
	return samples
}
// WeightedReservoir draws up to k items from an Iterator of WeightedItem
// values, favouring items with a larger Weight. Sample gives each item the
// key u^(1/Weight) for a uniform random u and keeps the k largest keys.
type WeightedReservoir struct {
	// k is the maximum number of samples to keep.
	k int
	// i must yield WeightedItem values; any other type makes Sample panic.
	i Iterator
}
// NewWeightedReservoir returns a WeightedReservoir that keeps up to samples
// items from itr, which must yield WeightedItem values. itr is only stored
// here; it is read when Sample is called.
func NewWeightedReservoir(samples int, itr Iterator) WeightedReservoir {
	var wr WeightedReservoir
	wr.k, wr.i = samples, itr
	return wr
}
// Sample consumes wr's iterator and returns the Obj of up to wr.k items,
// chosen with a probability that grows with each item's Weight. Every item
// gets the key u^(1/Weight), with u uniform in [0, 1). The wr.k largest keys
// are retained in a heap whose root is the weakest current survivor (assuming
// MinHeap orders Nodes by Key). A stream shorter than wr.k yields every item.
// Results come out in heap-pop order, presumably ascending key.
//
// Sample panics if the iterator yields anything other than a WeightedItem.
// NOTE(review): a zero Weight gives key 0 (never beats a positive-weight
// item), while a negative Weight gives a key >= 1 and is always preferred.
// Confirm that callers never pass negative weights.
func (wr WeightedReservoir) Sample() []interface{} {
	pq := &MinHeap{}
	heap.Init(pq)

	for item, more := wr.i.Next(); more; item, more = wr.i.Next() {
		weighted, isWeighted := item.(WeightedItem)
		if !isWeighted {
			panic("weighted reservoir can only accept items of type WeightedItem")
		}
		key := math.Pow(rand.Float64(), 1/weighted.Weight)
		switch {
		case pq.Len() < wr.k:
			// Still filling: admit unconditionally.
			heap.Push(pq, &Node{Key: key, Obj: weighted.Obj})
		case key > pq.Peek().(*Node).Key:
			// Full: evict the smallest key only if this item beats it.
			heap.Pop(pq)
			heap.Push(pq, &Node{Key: key, Obj: weighted.Obj})
		}
	}

	var samples []interface{}
	for pq.Len() > 0 {
		samples = append(samples, heap.Pop(pq).(*Node).Obj)
	}
	return samples
}