forked from KomodoPlatform/komodo-defi-framework
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathlp_network.rs
348 lines (315 loc) · 14.1 KB
/
lp_network.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
/******************************************************************************
* Copyright © 2014-2019 The SuperNET Developers. *
* *
* See the AUTHORS, DEVELOPER-AGREEMENT and LICENSE files at *
* the top-level directory of this distribution for the individual copyright *
* holder information and the developer policies on copyright and licensing. *
* *
* Unless otherwise agreed in a custom licensing agreement, no part of the *
* SuperNET software, including this file may be copied, modified, propagated *
* or distributed except according to the terms contained in the LICENSE file *
* *
* Removal or modification of this copyright notice is prohibited. *
* *
******************************************************************************/
//
// lp_network.rs
// marketmaker
//
use bytes::Bytes;
use bitcrypto::ripemd160;
use common::{lp_queue_command, HyRes, QueuedCommand, COMMAND_QUEUE};
#[cfg(not(feature = "native"))]
use common::helperᶜ;
use common::executor::spawn;
use common::mm_ctx::MmArc;
use crossbeam::channel;
use futures01::{future, Future};
use futures::compat::Future01CompatExt;
use futures::future::FutureExt;
use gstuff::now_ms;
use primitives::hash::H160;
use serde_json::{self as json, Value as Json};
#[cfg(not(feature = "native"))]
use serde_bencode::ser::to_bytes as bencode;
use serde_bencode::de::from_bytes as bdecode;
use std::collections::hash_map::{HashMap, Entry};
use std::io::{BufRead, BufReader, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::time::Duration;
use crate::mm2::lp_native_dex::lp_command_process;
use crate::mm2::lp_ordermatch::lp_post_price_recv;
use crate::mm2::lp_swap::save_stats_swap_status;
use crate::mm2::rpc::lp_signatures::lp_notify_recv;
/// Result of `fn dispatcher`.
///
/// Lets the caller distinguish between messages this module handles directly
/// and messages that must be forwarded to the legacy processing path.
pub enum DispatcherRes {
    /// `fn dispatcher` has found a Rust handler for the RPC "method".
    Match (HyRes),
    /// No handler found by `fn dispatcher`. Returning the `Json` request in order for it to be handled elsewhere
    /// (cf. `lp_command_process` in `lp_command_q_loop`).
    NoMatch (Json)
}
/// The network module dispatcher, handles the messages received from other nodes.
///
/// Routes a P2P message to its Rust handler by the "method" field, or returns
/// the request unchanged (`NoMatch`) so it can be handled elsewhere.
fn dispatcher (req: Json, ctx: MmArc) -> DispatcherRes {
    // AP: the HTTP RPC server dispatcher was previously used for this purpose which IMHO
    // breaks single responsibility principe, makes harder to maintain the codebase and possibly
    // adds security concerns. Also we might end with using different serialization formats (binary)
    // for P2P messages - JSON is excessive for such purpose while it's completely fine to use it for HTTP server.
    // See https://github.com/artemii235/SuperNET/issues/415 for more info
    // So this is a starting point of further refactoring
    let method = if let Json::String (method) = req["method"].clone() {
        method
    } else {
        // Messages without a string "method" are not ours to handle.
        return DispatcherRes::NoMatch (req)
    };
    // Sorted alphanumerically (on the first letter) for readability.
    match method.as_str() {
        "notify" => DispatcherRes::Match (lp_notify_recv (ctx, req)), // Invoked usually from the `lp_command_q_loop`
        "postprice" => DispatcherRes::Match (lp_post_price_recv (&ctx, req)),
        _ => DispatcherRes::NoMatch (req)
    }
}
/// Envelope for a reply to a queued command: the handler's `result`
/// tagged with the `queueid` of the request it answers
/// (cf. `reply_to_peer`, where it is serialized when `cmd.queue_id != 0`).
#[derive(Serialize)]
struct CommandForNn {
    /// The JSON body produced by the RPC handler.
    result: Json,
    // Rust field is `queue_id`; the wire name is "queueid".
    #[serde(rename="queueid")]
    queue_id: u32
}
/// Sends a reply to the `cmd.response_sock` peer.
///
/// When the command carries a non-zero `queue_id` the reply is first wrapped
/// into the `CommandForNn` envelope (`{"result": …, "queueid": …}`) so the
/// peer can match it to the original request.
///
/// NOTE(review): as visible here the assembled `reply` buffer is never written
/// anywhere — it is built, zero-terminated and then dropped when the function
/// returns; presumably the actual send was removed together with the old
/// networking transport — TODO confirm.
fn reply_to_peer (cmd: QueuedCommand, mut reply: Vec<u8>) -> Result<(), String> {
    if cmd.response_sock >= 0 {
        if cmd.queue_id != 0 {
            // Queued requests expect the reply in the `CommandForNn` envelope.
            let result = try_s! (json::from_slice (&reply));
            let nn_command = CommandForNn {
                queue_id: cmd.queue_id,
                result
            };
            reply = try_s! (json::to_vec (&nn_command))
        }
        // See also commits ce09bcd and 62f3cba: looks like we need the wired string to be zero-terminated.
        reply.push (0);
    }
    Ok(())
}
/// Run the RPC handler and send it's reply to a peer.
fn rpc_reply_to_peer (handler: HyRes, cmd: QueuedCommand) {
let f = handler.then (move |r| -> Box<dyn Future<Item=(), Error=()> + Send> {
let res = match r {Ok (r) => r, Err (err) => {
log! ("rpc_reply_to_peer] handler error: " (err));
return Box::new (future::err(()))
}};
let body = res.into_body();
if let Err (err) = reply_to_peer (cmd, body) {
log! ("reply_to_peer error: " (err));
return Box::new (future::err(()))
}
Box::new (future::ok(()))
});
spawn (f.compat().map(|_|()))
}
/// The thread processing the peer-to-peer messaging bus.
///
/// Pulls queued messages from `COMMAND_QUEUE`, skips duplicates seen within
/// the last minute (keyed by RIPEMD-160 of the raw message), rebroadcasts them
/// when this node is a seed, and routes each message to its handler.
pub unsafe fn lp_command_q_loop(ctx: MmArc) {
    // RIPEMD-160 hash of a message -> the time (ms) when it was first processed.
    let mut processed_messages: HashMap<H160, u64> = HashMap::new();
    loop {
        if ctx.is_stopping() { break }
        let cmd = match (*COMMAND_QUEUE).1.recv_timeout(Duration::from_millis(100)) {
            Ok(cmd) => cmd,
            Err(channel::RecvTimeoutError::Timeout) => continue, // And check `is_stopping`.
            Err(channel::RecvTimeoutError::Disconnected) => break
        };
        let now = now_ms();
        // Clean up messages older than 60 seconds.
        // `retain` prunes the map in place instead of draining it and
        // rebuilding a brand-new map on every received message.
        processed_messages.retain(|_, timestamp| *timestamp + 60000 > now);
        let msg_hash = ripemd160(cmd.msg.as_bytes());
        match processed_messages.entry(msg_hash) {
            Entry::Vacant(e) => e.insert(now),
            Entry::Occupied(_) => continue, // skip the messages that we processed previously
        };
        let json: Json = match json::from_str(&cmd.msg) {
            Ok(j) => j,
            Err(e) => {
                log!("Error " (e) " parsing JSON from msg " (cmd.msg));
                continue;
            }
        };
        // `swapstatus` is handled before the rebroadcast below, so the large
        // swap-status payloads are never relayed to other nodes.
        let method = json["method"].as_str();
        if let Some(m) = method {
            if m == "swapstatus" {
                let handler = save_stats_swap_status(&ctx, json["data"].clone());
                rpc_reply_to_peer(handler, cmd);
                continue;
            }
        }
        // rebroadcast the message if we're seednode
        // swapstatus is excluded from rebroadcast as the message is big and other nodes might just not need it
        let i_am_seed = ctx.conf["i_am_seed"].as_bool().unwrap_or(false);
        if i_am_seed {
            ctx.broadcast_p2p_msg(&cmd.msg);
        }
        let json = match dispatcher(json, ctx.clone()) {
            DispatcherRes::Match(handler) => {
                rpc_reply_to_peer(handler, cmd);
                continue
            },
            DispatcherRes::NoMatch(req) => req
        };
        // Invokes `lp_trade_command`.
        lp_command_process(
            ctx.clone(),
            json,
        );
    }
}
/// The loop processing seednode activity as message relayer/rebroadcaster
/// Non-blocking mode should be enabled on listener for this to work
///
/// Each iteration: accepts pending connections, drains complete lines from the
/// clients into the command queue, then relays one queued outgoing message
/// (if any) to every connected client. Clients that error out are dropped.
pub fn seednode_loop(ctx: MmArc, listener: TcpListener) {
    // (buffered reader, peer address, partial-line buffer) per connected client.
    let mut clients = vec![];
    loop {
        if ctx.is_stopping() { break }
        // Accept any pending incoming connection (the listener is non-blocking).
        match listener.accept() {
            Ok((stream, addr)) => {
                ctx.log.log("😀", &[&"incoming_connection", &addr.to_string().as_str()], "New connection...");
                match stream.set_nonblocking(true) {
                    Ok(_) => clients.push((BufReader::new(stream), addr, String::new())),
                    Err(e) => ctx.log.log("😟", &[&"incoming_connection", &addr.to_string().as_str()], &format!("Error {} setting nonblocking mode", e)),
                }
            },
            Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => (),
            Err(e) => panic!("encountered IO error: {}", e),
        }
        // Read the complete lines received from the clients and queue them as commands.
        // The closure returns `true` to keep the connection and `false` to drop it.
        clients = clients.drain_filter(|(client, addr, buf)| {
            match client.read_line(buf) {
                Ok(_) => {
                    if buf.len() > 0 {
                        let msgs: Vec<_> = buf.split('\n').collect();
                        for msg in msgs {
                            if !msg.is_empty() {
                                lp_queue_command(msg.to_string());
                            }
                        }
                        buf.clear();
                    }
                    true
                },
                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => true,
                Err(e) => {
                    ctx.log.log("😟", &[&"incoming_connection", &addr.to_string().as_str()], &format!("Error {} reading from socket, dropping connection", e));
                    false
                },
            }
        }).collect();
        // Relay one queued outgoing message (if any) to every connected client.
        match ctx.seednode_p2p_channel.1.recv_timeout(Duration::from_millis(1)) {
            Ok(mut msg) => {
                // Append the line delimiter ONCE, before the per-client loop.
                // Previously the push lived inside the loop, so every client
                // after the first received the message with extra '\n' bytes.
                msg.push(b'\n');
                clients = clients.drain_filter(|(client, addr, _)| {
                    // `write_all` retries on partial writes; plain `write` could
                    // silently send only a prefix of the message.
                    match client.get_mut().write_all(&msg) {
                        Ok(_) => true,
                        Err(e) => {
                            ctx.log.log("😟", &[&"incoming_connection", &addr.to_string().as_str()], &format!("Error {} writing to socket, dropping connection", e));
                            false
                        }
                    }
                }).collect();
            },
            Err(channel::RecvTimeoutError::Timeout) => (),
            Err(channel::RecvTimeoutError::Disconnected) => panic!("seednode_p2p_channel is disconnected"),
        }
    }
}
/// State of a single client -> seed TCP connection maintained by `client_p2p_loop`.
struct SeedConnection {
    /// Buffered reader over the (non-blocking) TCP stream to the seed.
    stream: BufReader<TcpStream>,
    /// The seed address this connection was established to.
    addr: String,
    /// Accumulates partially received lines between reads.
    buf: String,
}
/// Starts `client_p2p_loop` on a dedicated OS thread (native builds).
///
/// Returns an error if the thread could not be spawned.
#[cfg(feature = "native")]
pub async fn start_client_p2p_loop (ctx: MmArc, addrs: Vec<String>) -> Result<(), String> {
    let builder = thread::Builder::new().name ("client_p2p_loop".into());
    try_s! (builder.spawn (move || client_p2p_loop (ctx, addrs)));
    Ok(())
}
/// Arguments of `start_client_p2p_loop`, bencode-serialized for the
/// helper-based (non-native) invocation.
#[derive(Serialize, Deserialize)]
struct StartClientP2pLoopArgs {
    /// FFI handle of the `MmArc` context (cf. `MmArc::ffi_handle`).
    ctx: u32,
    /// Seed node addresses to connect to.
    addrs: Vec<String>
}
/// Non-native (e.g. WASM) version: serializes the arguments with bencode and
/// delegates the actual loop to the native helper via `helperᶜ`.
#[cfg(not(feature = "native"))]
pub async fn start_client_p2p_loop (ctx: MmArc, addrs: Vec<String>) -> Result<(), String> {
    let args = StartClientP2pLoopArgs {ctx: try_s! (ctx.ffi_handle()), addrs};
    let args = try_s! (bencode (&args));
    try_s! (helperᶜ ("start_client_p2p_loop", args) .await);
    Ok(())
}
/// Helper-side (host) entry point for `start_client_p2p_loop`:
/// decodes the bencoded `StartClientP2pLoopArgs`, restores the context from
/// the FFI handle and starts the loop. Returns an empty payload on success.
pub async fn start_client_p2p_loopʰ (req: Bytes) -> Result<Vec<u8>, String> {
    let args: StartClientP2pLoopArgs = try_s! (bdecode (&req));
    let ctx = try_s! (MmArc::from_ffi_handle (args.ctx));
    try_s! (start_client_p2p_loop (ctx, args.addrs) .await);
    Ok (Vec::new())
}
/// The loop processing client node activity
///
/// Each iteration: (re)connects to seeds that are not currently connected
/// (retrying each address at most once per 30 seconds), drains complete lines
/// from the seeds into the command queue, then forwards one queued outgoing
/// message (if any) to every connected seed. Failed connections are dropped.
fn client_p2p_loop(ctx: MmArc, addrs: Vec<String>) {
    let mut seed_connections: Vec<SeedConnection> = vec![];
    // ip and last connection attempt timestamp
    let mut addrs: Vec<(String, u64)> = addrs.into_iter().map(|addr| (addr, 0)).collect();
    loop {
        if ctx.is_stopping() { break }
        if seed_connections.len() < addrs.len() {
            for (addr, last_attempt) in addrs.iter_mut() {
                // `any` expresses the membership test directly, instead of `find(..).is_none()`.
                let is_connected = seed_connections.iter().any(|conn| &conn.addr == addr);
                if !is_connected && *last_attempt + 30000 < now_ms() {
                    ctx.log.log("…", &[&"seed_connection", &addr.as_str()], "Connecting…");
                    *last_attempt = now_ms();
                    match TcpStream::connect(&*addr) {
                        Ok(stream) => {
                            match stream.set_nonblocking(true) {
                                Ok(_) => {
                                    let conn = SeedConnection {
                                        stream: BufReader::new(stream),
                                        addr: addr.to_string(),
                                        buf: String::new(),
                                    };
                                    ctx.log.log("⚡", &[&"seed_connection", &addr.as_str()], "Connected");
                                    seed_connections.push(conn);
                                },
                                Err(e) => ctx.log.log("😟", &[&"seed_connection", &addr.as_str()], &format!("Error {} setting non-blocking mode", e)),
                            }
                        },
                        Err(e) => ctx.log.log("😟", &[&"seed_connection", &addr.as_str()], &format!("Connection error {}", e)),
                    }
                }
            }
        }
        // Read the complete lines received from the seeds and queue them as commands.
        // The closure returns `true` to keep the connection and `false` to drop it.
        seed_connections = seed_connections.drain_filter(|conn| {
            match conn.stream.read_line(&mut conn.buf) {
                Ok(_) => {
                    if conn.buf.len() > 0 {
                        let msgs: Vec<_> = conn.buf.split('\n').collect();
                        for msg in msgs {
                            if !msg.is_empty() {
                                lp_queue_command(msg.to_string());
                            }
                        }
                        conn.buf.clear();
                    }
                    true
                },
                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => true,
                Err(e) => {
                    // No need to clone `addr` just to borrow it for logging.
                    ctx.log.log("😟", &[&"seed_connection", &conn.addr.as_str()], &format!("Error {} on reading from socket, dropping connection", e));
                    false
                },
            }
        }).collect();
        // Forward one queued outgoing message (if any) to every connected seed.
        match ctx.client_p2p_channel.1.recv_timeout(Duration::from_millis(1)) {
            Ok(mut msg) => {
                // Append the line delimiter ONCE, before the per-connection loop.
                // Previously the push lived inside the loop, so every seed after
                // the first received the message with extra '\n' bytes.
                msg.push(b'\n');
                seed_connections = seed_connections.drain_filter(|conn| {
                    // `write_all` retries on partial writes; plain `write` could
                    // silently send only a prefix of the message.
                    match conn.stream.get_mut().write_all(&msg) {
                        Ok(_) => true,
                        Err(e) => {
                            ctx.log.log("😟", &[&"seed_connection", &conn.addr.as_str()], &format!("Error {} writing to socket, dropping connection", e));
                            false
                        }
                    }
                }).collect();
            },
            Err(channel::RecvTimeoutError::Timeout) => (),
            Err(channel::RecvTimeoutError::Disconnected) => panic!("client_p2p_channel is disconnected"),
        }
    }
}