-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhttpaf_effects.ml
86 lines (81 loc) · 3.38 KB
/
httpaf_effects.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
open Httpaf
(* Switch for the tracing output produced by [dprintf]. While it is [false],
   [dprintf] routes its format through [Printf.ifprintf] and the [`Yield]
   branches below skip the [Aeio.get_tid] lookup. *)
let debug = false
(** [dprintf fmt arg1 ... argN] is the tracing printf used in this file.

    When [debug] is [true] it writes ["[<tid>] "] (with the id returned by
    [Aeio.get_tid ()]) to standard output and then prints the message with
    [Printf.printf]. When [debug] is [false] the format and its arguments are
    handed to [Printf.ifprintf stdout], which accepts them without printing. *)
let dprintf fmt =
  match debug with
  | false -> Printf.ifprintf stdout fmt
  | true ->
    Printf.printf "[%d] " (Aeio.get_tid ());
    Printf.printf fmt
(* Raised inside the writer loop of [create_connection_handler] to stop
   iterating over the remaining iovecs as soon as one write transfers fewer
   bytes than were requested. It is caught in the same loop. *)
exception Partial
(* Size passed to [Lwt_bytes.create] for the per-connection read buffer; also
   the upper bound used when computing how much room is left for a read. *)
let read_buffer_size = 4096
(** [create_connection_handler ?config request_handler] returns a function
    [fun fd _ -> ...] that drives one [Httpaf.Server_connection] over the
    descriptor [fd]. The second argument of the returned function is ignored.

    The returned function creates the connection state, a read buffer of
    [read_buffer_size] bytes, and then starts two tasks with [Aeio.async]:

    - a reader that follows [Server_connection.next_read_operation]:
      reads from [fd] into the buffer, feeds the buffered bytes to the
      connection, and keeps whatever the connection did not consume;
    - a writer that follows [Server_connection.next_write_operation]:
      writes the offered iovecs to [fd] and reports the outcome through
      [Server_connection.report_write_result].

    Each task ends by shutting down its half of [fd] when the connection
    asks for [`Close]. The values returned by [Aeio.async] are discarded.

    Exceptions raised while reading are turned into an end-of-input
    notification to the connection; exceptions raised while writing are
    reported as [`Closed]. Both are traced through [dprintf].

    NOTE(review): [fd] is only shut down here, never closed -- confirm that
    the caller owns closing the descriptor.

    @param config optional configuration forwarded to
           [Server_connection.create].
    @param request_handler invoked by the connection for each request. *)
let create_connection_handler ?config request_handler =
  fun fd _ ->
    let conn =
      Server_connection.create ?config (fun request -> request_handler request)
    in
    let buffer = Lwt_bytes.create read_buffer_size in
    (* Number of bytes at the start of [buffer] that have been read from [fd]
       but not yet consumed by the connection. *)
    let buffer_len = ref 0 in
    (* Shut down one direction of [fd]. A peer that already dropped the
       connection makes shutdown fail with ENOTCONN; that is an ordinary way
       for a connection to end, so it must not escape as an exception from the
       reader/writer task. Any other error still propagates.
       NOTE(review): assumes [Aeio.shutdown] reports failures as
       [Unix.Unix_error], like [Unix.shutdown] -- confirm. *)
    let shutdown_quietly how =
      try Aeio.shutdown fd how
      with Unix.Unix_error (Unix.ENOTCONN, _, _) -> ()
    in
    let rec reader_thread () =
      match Server_connection.next_read_operation conn with
      | `Read ->
        begin
          try
            (* Append to the unconsumed bytes already sitting in [buffer].
               NOTE(review): when the buffer is full the requested length is
               0; presumably the read then returns 0 and is handled as
               end-of-file below -- confirm this is the intended behavior for
               input the parser cannot consume within [read_buffer_size]. *)
            let current_read_len =
              Aeio.Bigstring.read fd buffer !buffer_len
                (read_buffer_size - !buffer_len)
            in
            buffer_len := !buffer_len + current_read_len;
            if current_read_len = 0 then begin
              dprintf "reader_thread.`Read: `Eof\n%!";
              Server_connection.read_eof conn buffer ~off:0 ~len:!buffer_len
              |> ignore
            end else begin
              dprintf "reader_thread.`Read: `Ok %d\n%!" current_read_len;
              let bytes_consumed =
                Server_connection.read conn buffer ~off:0 ~len:!buffer_len
              in
              let remaining = !buffer_len - bytes_consumed in
              (* Move the unconsumed tail to the front of the buffer. When
                 nothing was consumed the data is already in place, so the
                 copy is skipped. *)
              if bytes_consumed > 0 then
                Bigstringaf.blit buffer ~src_off:bytes_consumed buffer
                  ~dst_off:0 ~len:remaining;
              buffer_len := remaining
            end
          with exn ->
            (* Best effort: any failure while reading or parsing is reported
               to the connection as end of input with no pending bytes. The
               exception is traced instead of being dropped silently. *)
            dprintf "reader_thread.`Read: exception %s\n%!"
              (Printexc.to_string exn);
            ignore (Server_connection.read_eof conn buffer ~off:0 ~len:0)
        end;
        reader_thread ()
      | `Yield ->
        dprintf "reader_thread.`Read: `Yield\n%!";
        (* The thread id is only looked up when tracing is enabled. *)
        let tid = if debug then Aeio.get_tid () else 0xC0FFEE in
        (* Block on an IVar that the connection's wake-up callback fills. *)
        let iv = Aeio.IVar.create () in
        Server_connection.yield_reader conn (fun () ->
            dprintf "Server_connection.wakeup reader thread %d\n%!" tid;
            Aeio.IVar.fill iv ());
        Aeio.IVar.read iv;
        reader_thread ()
      | `Close -> shutdown_quietly Unix.SHUTDOWN_RECEIVE
    in
    let rec writer_thread () =
      let success = Server_connection.report_write_result conn in
      match Server_connection.next_write_operation conn with
      | `Write iovecs ->
        (* TODO: Aeio.writev *)
        (* Total number of bytes written for this batch of iovecs. *)
        let written = ref 0 in
        begin
          try
            List.iter
              (fun { Faraday.buffer; off; len } ->
                 let w = Aeio.Bigstring.write fd buffer off len in
                 written := !written + w;
                 (* A short write ends the batch; the bytes written so far
                    are reported in the [Partial] handler. *)
                 if w < len then raise Partial)
              iovecs;
            dprintf "writer_thread.`Write: `Ok %d\n%!" !written;
            success (`Ok !written)
          with
          | Partial ->
            dprintf "writer_thread.`Write: `Ok %d\n%!" !written;
            success (`Ok !written)
          | exn ->
            (* Any other failure is reported to the connection as a closed
               socket; the exception is traced rather than dropped. *)
            dprintf "writer_thread.`Write: exception %s\n%!"
              (Printexc.to_string exn);
            success `Closed
        end;
        writer_thread ()
      | `Yield ->
        dprintf "writer_thread.`Write: `Yield\n%!";
        let tid = if debug then Aeio.get_tid () else 0xC0FFEE in
        let iv = Aeio.IVar.create () in
        Server_connection.yield_writer conn (fun () ->
            dprintf "Server_connection.wakeup writer thread %d\n%!" tid;
            Aeio.IVar.fill iv ());
        Aeio.IVar.read iv;
        writer_thread ()
      | `Close _ -> shutdown_quietly Unix.SHUTDOWN_SEND
    in
    ignore (Aeio.async reader_thread ());
    ignore (Aeio.async writer_thread ())