-
Notifications
You must be signed in to change notification settings - Fork 183
/
Copy pathstream.go
153 lines (133 loc) · 3.67 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package sse
import (
"net/url"
"sync"
"sync/atomic"
)
// Stream ...
type Stream struct {
ID string
event chan *Event
quit chan struct{}
quitOnce sync.Once
register chan *Subscriber
deregister chan *Subscriber
subscribers []*Subscriber
Eventlog EventLog
subscriberCount int32
// Enables replaying of eventlog to newly added subscribers
AutoReplay bool
isAutoStream bool
// Specifies the function to run when client subscribe or un-subscribe
OnSubscribe func(streamID string, sub *Subscriber)
OnUnsubscribe func(streamID string, sub *Subscriber)
}
// newStream returns a new stream
func newStream(id string, buffSize int, replay, isAutoStream bool, onSubscribe, onUnsubscribe func(string, *Subscriber)) *Stream {
return &Stream{
ID: id,
AutoReplay: replay,
subscribers: make([]*Subscriber, 0),
isAutoStream: isAutoStream,
register: make(chan *Subscriber),
deregister: make(chan *Subscriber),
event: make(chan *Event, buffSize),
quit: make(chan struct{}),
Eventlog: make(EventLog, 0),
OnSubscribe: onSubscribe,
OnUnsubscribe: onUnsubscribe,
}
}
func (str *Stream) run() {
go func(str *Stream) {
for {
select {
// Add new subscriber
case subscriber := <-str.register:
str.subscribers = append(str.subscribers, subscriber)
if str.AutoReplay {
str.Eventlog.Replay(subscriber)
}
// Remove closed subscriber
case subscriber := <-str.deregister:
i := str.getSubIndex(subscriber)
if i != -1 {
str.removeSubscriber(i)
}
if str.OnUnsubscribe != nil {
go str.OnUnsubscribe(str.ID, subscriber)
}
// Publish event to subscribers
case event := <-str.event:
if str.AutoReplay {
str.Eventlog.Add(event)
}
for i := range str.subscribers {
str.subscribers[i].connection <- event
}
// Shutdown if the server closes
case <-str.quit:
// remove connections
str.removeAllSubscribers()
return
}
}
}(str)
}
func (str *Stream) close() {
str.quitOnce.Do(func() {
close(str.quit)
})
}
func (str *Stream) getSubIndex(sub *Subscriber) int {
for i := range str.subscribers {
if str.subscribers[i] == sub {
return i
}
}
return -1
}
// addSubscriber will create a new subscriber on a stream
func (str *Stream) addSubscriber(eventid int, url *url.URL) *Subscriber {
atomic.AddInt32(&str.subscriberCount, 1)
sub := &Subscriber{
eventid: eventid,
quit: str.deregister,
connection: make(chan *Event, 64),
URL: url,
}
if str.isAutoStream {
sub.removed = make(chan struct{}, 1)
}
str.register <- sub
if str.OnSubscribe != nil {
go str.OnSubscribe(str.ID, sub)
}
return sub
}
func (str *Stream) removeSubscriber(i int) {
atomic.AddInt32(&str.subscriberCount, -1)
close(str.subscribers[i].connection)
if str.subscribers[i].removed != nil {
str.subscribers[i].removed <- struct{}{}
close(str.subscribers[i].removed)
}
str.subscribers = append(str.subscribers[:i], str.subscribers[i+1:]...)
}
func (str *Stream) removeAllSubscribers() {
for i := 0; i < len(str.subscribers); i++ {
close(str.subscribers[i].connection)
if str.subscribers[i].removed != nil {
str.subscribers[i].removed <- struct{}{}
close(str.subscribers[i].removed)
}
}
atomic.StoreInt32(&str.subscriberCount, 0)
str.subscribers = str.subscribers[:0]
}
func (str *Stream) getSubscriberCount() int {
return int(atomic.LoadInt32(&str.subscriberCount))
}