forked from zeromq/gyre
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpeer.go
184 lines (155 loc) · 4.2 KB
/
peer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package gyre
import (
"fmt"
"sync"
"time"
zmq "github.com/pebbe/zmq4"
"github.com/zeromq/gyre/zre/msg"
)
var (
optMx sync.Mutex
peerEvasive = 3 * time.Second // peerEvasive seconds' silence is evasive
peerExpired = 5 * time.Second // peerExpired seconds' silence is expired
loopInterval = 1 * time.Second
)
type peer struct {
mailbox *zmq.Socket // Socket through to peer
identity string
endpoint string // Endpoint connected to
name string // Peer's public name
evasiveAt time.Time // Peer is being evasive
expiredAt time.Time // Peer has expired by now
connected bool // Peer will send messages
ready bool // Peer has said Hello to us
status byte // Our status counter
sentSequence uint16 // Outgoing message sequence
wantSequence uint16 // Incoming message sequence
headers map[string]string // Peer headers
}
// newPeer creates a new peer
func newPeer(identity string) (p *peer) {
p = &peer{
identity: identity,
name: fmt.Sprintf("%.6s", identity),
headers: make(map[string]string),
}
p.refresh()
return
}
// destroy disconnects peer mailbox. No more messages will be sent to peer until connected again
func (p *peer) destroy() {
p.disconnect()
for k := range p.headers {
delete(p.headers, k)
}
}
// connect configures mailbox and connects to peer's router endpoint
func (p *peer) connect(from []byte, endpoint string) (err error) {
// Create new outgoing socket (drop any messages in transit)
p.mailbox, err = zmq.NewSocket(zmq.DEALER)
if err != nil {
return err
}
err = p.mailbox.SetIpv6(true)
if err != nil {
return err
}
// Set our own identity on the socket so that receiving node
// knows who each message came from. Note that we cannot use
// the UUID directly as the identity since it may contain a
// zero byte at the start, which libzmq does not like for
// historical and arguably bogus reasons that it nonetheless
// enforces.
routingID := append([]byte{1}, from...)
p.mailbox.SetIdentity(string(routingID))
// Set a high-water mark that allows for reasonable activity
optMx.Lock()
p.mailbox.SetSndhwm(int(peerExpired * time.Microsecond))
optMx.Unlock()
// Send messages immediately or return EAGAIN
p.mailbox.SetSndtimeo(0)
// Connect through to peer node
err = p.mailbox.Connect(endpoint)
if err != nil {
return err
}
p.endpoint = endpoint
p.connected = true
p.ready = false
return nil
}
// disconnects peer mailbox. No more messages will be sent to peer until connected again
func (p *peer) disconnect() {
if p.connected {
if p.mailbox != nil {
p.mailbox.Disconnect(p.endpoint)
p.mailbox.Close()
p.mailbox = nil
}
p.endpoint = ""
p.connected = false
p.ready = false
}
}
// send sends message to peer
func (p *peer) send(t msg.Transit) (err error) {
if p.connected {
p.sentSequence++
t.SetSequence(p.sentSequence)
err = t.Send(p.mailbox)
if err != nil {
p.disconnect()
}
}
return
}
// refresh refreshes activity at peer
func (p *peer) refresh() {
optMx.Lock()
defer optMx.Unlock()
p.evasiveAt = time.Now().Add(peerEvasive)
p.expiredAt = time.Now().Add(peerExpired)
}
// checkMessage checks peer message sequence
func (p *peer) checkMessage(t msg.Transit) bool {
p.wantSequence++
valid := p.wantSequence == t.Sequence()
if !valid {
p.wantSequence--
}
return valid
}
// setName sets name.
func (p *peer) setName(name string) {
p.name = name
}
// Returns a header in headers map
func (p *peer) Header(key string) (value string, ok bool) {
value, ok = p.headers[key]
return
}
func (p *peer) Headers() map[string]string {
return p.headers
}
// Returns identity (uuid) of the peer
func (p *peer) Identity() string {
return p.identity
}
// SetExpired sets expired.
func SetExpired(expired time.Duration) {
optMx.Lock()
defer optMx.Unlock()
peerExpired = expired
}
// SetEvasive sets evasive.
func SetEvasive(evasive time.Duration) {
optMx.Lock()
defer optMx.Unlock()
peerEvasive = evasive
}
// SetLoopInterval sets interval of checking health of other peers
func SetLoopInterval(interval time.Duration) {
optMx.Lock()
defer optMx.Unlock()
loopInterval = interval
}