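(* In-memory index of the block-header chain: headers live in a hash table keyed by
   their hash, the best tip is tracked by cumulative difficulty, and the table is
   periodically persisted to [blockchain_file]. *)
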
open Core.Std
open Async.Std

module Hardcoded = struct
  (* When answering an open Getheaders query, a headers message should contain 2000
     headers, except for the last message which contains fewer. *)
  let max_headers = 2_000

  let get_more_headers_after = sec 600.

  (* If we haven't heard from the sync node within this span while syncing, try another
     sync node. *)
  let sync_node_timeout = sec 60.
end

module Status = struct
  type t =
    | Not_connected
    | Syncing of Address.t
    | At_tip
end

module Header_node = struct
  type t =
    { header : Header.t option
    ; depth : int
    ; difficulty_sum : float
    ; hash : Hash.t
    } with fields

  let genesis_hash = Header.hash Header.genesis

  let () =
    assert (Hash.(=) genesis_hash
      (Hash.of_hex "000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f"))

  let genesis =
    { header = None
    ; depth = 0
    ; difficulty_sum = 0.
    ; hash = genesis_hash
    }
end

type t =
  { mutable status : Status.t
  ; headers : Header_node.t Hash.Table.t
  ; mutable current_tip : Header_node.t
  ; mutable has_changed_since_last_write : bool
  ; mutable last_batch_processed : Time.t
  ; blockchain_file : string
  ; stop : unit Ivar.t
  ; network : Network.t
  ; mutable checked_len : int
  } with fields

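(* Insert [header] into the table if its previous block is known, linking it to its
   parent and moving [current_tip] whenever the cumulative difficulty increases.
   Returns the (possibly pre-existing) node, or an error if the parent is unknown. *)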
let process_header t (header : Header.t) ~mark_as_changed =
  let hash = Header.hash header in
  (* TODO: check [hash] vs [Header.hash header] *)
  match Hashtbl.find t.headers hash with
  (* XCR aalekseyev: If we return Ok here, this will let [process_headers] work even if
     we already know some of the blocks the peer is trying to send (e.g. when we are on
     an orphaned block).
     lmazare: indeed. *)
  | Some header_node -> Ok header_node
  | None ->
    match Hashtbl.find t.headers header.previous_block_header_hash with
    | Some previous_header ->
      let header_node =
        { Header_node.header = Some header
        ; depth = 1 + previous_header.depth
        ; difficulty_sum = Hash.difficulty hash +. previous_header.difficulty_sum
        ; hash
        }
      in
      Hashtbl.add_exn t.headers ~key:hash ~data:header_node;
      if mark_as_changed then
        t.has_changed_since_last_write <- true;
      if t.current_tip.difficulty_sum < header_node.difficulty_sum then
        t.current_tip <- header_node;
      Ok header_node
    | None ->
      Log.Global.error
        "Cannot find hash for previous block!\n block: %s\n prev: %s\n tip: %s\n full header: %s"
        (Hash.to_hex hash)
        (Hash.to_hex header.previous_block_header_hash)
        (Hash.to_hex t.current_tip.hash)
        (Sexp.to_string (Header.sexp_of_t header));
      Error "cannot find hash for previous block"

let write_blockchain_file t =
  let tmp_file = sprintf "%s.tmp" t.blockchain_file in
  (* Store the headers sorted by depth to make it easy to process them when reading the
     file. *)
  let headers =
    Hashtbl.to_alist t.headers
    |> List.sort ~cmp:(fun (_, hn1) (_, hn2) -> Int.compare hn1.Header_node.depth hn2.depth)
    |> List.map ~f:(fun (_, hn) -> hn.Header_node.header)
  in
  Writer.with_file tmp_file ~f:(fun writer ->
    Deferred.List.iter headers ~f:(function
      | None -> Deferred.unit
      | Some header ->
        Writer.write_bin_prot writer Header.bin_writer_t header;
        Writer.flushed writer)
  )
  >>= fun () ->
  (* The rename operation is atomic, this avoids corrupting the file if the process dies
     while writing it. *)
  Unix.rename ~src:tmp_file ~dst:t.blockchain_file

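(* Callback for incoming headers messages.  Headers are only accepted from the node we
   are currently syncing with; each one is fed to [process_header].  If the batch is
   full ([Hardcoded.max_headers]) another getheaders request is sent starting from the
   last accepted header; otherwise we switch to [At_tip] once every header in the batch
   was new to us. *)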
let process_headers t ~node ~headers =
  let address = Node.address node in
  match t.status with
  (* If we're currently syncing with [node], append the headers. *)
  | Syncing sync_address when Address.(=) sync_address address ->
    t.last_batch_processed <- Time.now ();
    let headers_len = List.length headers in
    let at_tip = headers_len < Hardcoded.max_headers in
    let headers_len_pre = Hashtbl.length t.headers in
    let last_header_node =
      List.fold headers ~init:None ~f:(fun acc (header : Header.t) ->
        match process_header t header ~mark_as_changed:true with
        | Ok header_node -> Some header_node
        | Error _ -> acc)
    in
    if at_tip then begin
      if Hashtbl.length t.headers = headers_len + headers_len_pre then
        t.status <- At_tip;
    end;
    Option.iter last_header_node ~f:(fun header_node ->
      Log.Global.debug "New blockchain length: %d, difficulty: %f."
        (Header_node.depth header_node)
        (Header_node.difficulty_sum header_node));
    if at_tip then ()
    else
      Option.iter last_header_node ~f:(fun header_node ->
        let from_the_highest_of = [ Header_node.hash header_node ] in
        Node.send node (Message.getheaders ~from_the_highest_of ~stop_hash:None)
      )
  | Syncing _ | Not_connected | At_tip ->
    (* Discard this message. *)
    ()

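(* A new sync should be started when we are not connected to a sync node, or when we
   are at the tip and haven't processed a batch for [Hardcoded.get_more_headers_after]. *)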
let need_syncing t ~now:now_ =
  match t.status with
  | Not_connected -> true
  | Syncing _ -> false
  | At_tip ->
    Time.(add t.last_batch_processed Hardcoded.get_more_headers_after <= now_)

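(* If the current sync node has been silent for [Hardcoded.sync_node_timeout], give up
   on it and go back to [Not_connected] so that [refresh] can pick another peer. *)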
let sync_timeout t ~now:now_ =
  let timeout =
    match t.status with
    | Not_connected | At_tip -> false
    | Syncing _ ->
      Time.(add t.last_batch_processed Hardcoded.sync_node_timeout <= now_)
  in
  if timeout then
    (* Maybe remove/blacklist the host? *)
    t.status <- Not_connected

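(* Start syncing headers from [node].  The getheaders message carries a list of locator
   hashes built by walking back from the current tip, with the gap between selected
   hashes doubling as we go, so that a peer on a different fork can still find a recent
   common ancestor to start from. *)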
let start_syncing t node =
  t.status <- Syncing (Node.address node);
  let rec loop acc header_hash step n =
    (* TODO: we should store the current blockchain in an array for a faster lookup. *)
    let acc, n, step =
      if n = step then
        header_hash :: acc, 1, 2*step
      else
        acc, n+1, step
    in
    match Hashtbl.find t.headers header_hash with
    | None ->
      (* aalekseyev: seems like this should be unreachable. *)
      List.rev acc
    | Some header_node ->
      match header_node.Header_node.header with
      | None -> List.rev acc
      | Some header ->
        loop acc (Header.previous_block_header_hash header) step n
  in
  let getheaders =
    Message.getheaders
      ~from_the_highest_of:(loop [] t.current_tip.hash 1 1)
      ~stop_hash:None
  in
  Node.send node getheaders

let close t =
  Ivar.fill_if_empty t.stop ()

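(* Called once per second: drop a sync node that timed out, then, once at least five
   peers are connected and a sync is needed, pick a random connected node and start
   syncing from it. *)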
let refresh t =
  let now = Time.now () in
  sync_timeout t ~now;
  let connected_nodes = Network.connected_nodes t.network in
  let connected_node_count = List.length connected_nodes in
  if 5 <= connected_node_count && need_syncing t ~now then begin
    List.nth_exn connected_nodes (Random.int connected_node_count)
    |> fun node -> start_syncing t node
  end

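(* Build the blockchain state: seed the table with the genesis node, register the
   [process_headers] callback with the network, reload any headers previously saved in
   [blockchain_file], then schedule the periodic file write (every 30s, only when
   something changed) and the periodic [refresh] (every second) until [close] is
   called. *)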
let create ~blockchain_file ~network =
  let stop = Ivar.create () in
  let headers = Hash.Table.of_alist_exn [ Header_node.genesis_hash, Header_node.genesis ] in
  let t =
    { status = Not_connected
    ; headers
    ; current_tip = Header_node.genesis
    ; blockchain_file
    ; has_changed_since_last_write = false
    ; last_batch_processed = Time.epoch
    ; stop
    ; network
    ; checked_len = 0
    }
  in
  Network.set_callbacks network ~process_headers:(process_headers t);
  Sys.file_exists blockchain_file
  >>= fun file_exists ->
  begin
    match file_exists with
    | `Yes ->
      Reader.with_file blockchain_file ~f:(fun reader ->
        let rec loop () =
          Reader.read_bin_prot reader Header.bin_reader_t
          >>= function
          | `Eof ->
            Deferred.unit
          | `Ok header ->
            process_header t header ~mark_as_changed:false
            |> ignore;
            loop ()
        in
        loop ()
      )
    | `No | `Unknown -> Deferred.unit
  end
  >>| fun () ->
  Log.Global.info "Read %d headers from %s."
    (Hashtbl.length t.headers) blockchain_file;
  let stop = Ivar.read stop in
  Clock.every' ~stop (sec 30.) (fun () ->
    Log.Global.debug "Current blockchain table size: %d, tip: %d %s"
      (Hashtbl.length t.headers)
      (Header_node.depth t.current_tip)
      (if t.has_changed_since_last_write then "writing blockchain file..." else "");
    if t.has_changed_since_last_write then begin
      write_blockchain_file t
      >>| fun () -> t.has_changed_since_last_write <- false
    end else Deferred.unit
  );
  Clock.every ~stop (sec 1.) (fun () -> refresh t);
  t

let blockchain_length t = Hashtbl.length t.headers
let tip_depth t = t.current_tip.depth
let tip_difficulty_sum t = t.current_tip.difficulty_sum
let tip_hash t = t.current_tip.hash