Skip to content

Commit

Permalink
Expose stat and improve Path.load
Browse files Browse the repository at this point in the history
`stat` can be called on files and flows in order to discover what is the backing
descriptor. Some notes:
  - Use Unix.LargeFile.stat instead of Unix.stat
  - Add a portable stat for the user
  - Moved Path.Unix_perm into File.Unix_perm to avoid cyclical ref

A previous attempt of this diff tried to make proper use of Luv.File.Stat but
there were multiple issues: `mode` was not properly exported so I had to add a
discovery.ml; it only works for a subset of file types and we had to manually
convert timestamps to float; all doable but an insufferable amount of code to end
up with less functionality than just calling into Unix.LargeFile.stat.

There are still some kinks:
  - Luv will always follow a link, so we can't really stat on a symbolic link,
    this is because we rely on realpath(3) which resolves the link.
  - We can't open named pipes with luv at the moment, so we also can't stat them (#357).
  - I did test all filemodes, but didn't add all to the test since they are
    impractical (opening /dev/tty for Character_special...)

--

Path.load was doubling the buffer and trying to fit enough until it saw an EOF.
This would cause many unnecessary large allocations and my poor 16GB RAM laptop
couldn't even load a 5GB file without getting OOMed.

This diff makes load create an initial buffer of the size of the file and load
exactly that amount of bytes in it.
  • Loading branch information
haesbaert committed Oct 27, 2022
1 parent 849391e commit f487afd
Show file tree
Hide file tree
Showing 17 changed files with 239 additions and 43 deletions.
53 changes: 53 additions & 0 deletions lib_eio/file.ml
Original file line number Diff line number Diff line change
@@ -1,5 +1,52 @@
(** Traditional Unix permissions. *)
module Unix_perm = struct
type t = int
(** This is the same as {!Unix.file_perm}, but avoids a dependency on [Unix]. *)
end


(** Portable file stats. *)
module Stat = struct

(** Kind of file, as derived from [st_mode]. *)
type kind = [
| `Unknown
| `Fifo
| `Character_special
| `Directory
| `Block_device
| `Regular_file
| `Symbolic_link
| `Socket
]

(** Like stat(2). Each field mirrors the [st_*] member of the same name. *)
type t = {
dev : Int64.t; (** Device number ([st_dev]). *)
ino : Int64.t; (** Inode number ([st_ino]). *)
kind : kind; (** Kind of file. *)
perm : Unix_perm.t; (** Permission bits. *)
nlink : Int64.t; (** Number of links ([st_nlink]). *)
uid : Int64.t; (** Owner's user ID ([st_uid]). *)
gid : Int64.t; (** Owner's group ID ([st_gid]). *)
rdev : Int64.t; (** Device number for special files ([st_rdev]). *)
size : Optint.Int63.t; (** Size of the file ([st_size]). *)
blksize : Int64.t; (** Block size ([st_blksize]); NOTE(review): a backend may only approximate this. *)
blocks : Int64.t; (** Number of blocks ([st_blocks]); NOTE(review): a backend may only approximate this. *)
atime : float; (** Last access time ([st_atime]). *)
mtime : float; (** Last modification time ([st_mtime]). *)
ctime : float; (** Last status change time ([st_ctime]). *)
}
end

(** A file or flow that can describe itself with a {!Stat.t}.
The self-type constraint requires the object to be compatible with {!Generic.t};
[stat] itself is virtual and must be supplied by the implementation. *)
class virtual stat = object (_ : <Generic.t; ..>)
method virtual stat : Stat.t
end

(** A file opened for reading. *)
class virtual ro = object (_ : <Generic.t; Flow.source; ..>)
inherit stat
method probe _ = None
method read_methods = []
method virtual pread : file_offset:Optint.Int63.t -> Cstruct.t list -> int
Expand All @@ -11,6 +58,12 @@ class virtual rw = object (_ : <Generic.t; Flow.source; Flow.sink; ..>)
method virtual pwrite : file_offset:Optint.Int63.t -> Cstruct.t list -> int
end

(** [stat t] returns the {!Stat.t} record associated with [t],
obtained by invoking [t]'s [stat] method. *)
let stat t = t#stat

(** [size t] returns the size of [t], i.e. the [size] field of [stat t]. *)
let size (t : #ro) = (stat t).size

(** [pread t ~file_offset bufs] performs a single read of [t] at [file_offset] into [bufs].
It returns the number of bytes read, which may be less than the space in [bufs],
Expand Down
17 changes: 6 additions & 11 deletions lib_eio/fs.ml
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
(** Defines types used by file-systems. *)

(** Traditional Unix permissions. *)
module Unix_perm = struct
type t = int
(** This is the same as {!Unix.file_perm}, but avoids a dependency on [Unix]. *)
end

type path = string

(* Each exception carries the path concerned and the underlying exception. *)

(** The path already exists. *)
exception Already_exists of path * exn
(** The path does not exist. *)
exception Not_found of path * exn
(** Access to the path was refused. *)
exception Permission_denied of path * exn
(** The file is too large for the requested operation. *)
exception File_too_large of path * exn

(** When to create a new file. *)
type create = [
| `Never (** fail if the named file doesn't exist *)
| `If_missing of Unix_perm.t (** create if file doesn't already exist *)
| `Or_truncate of Unix_perm.t (** any existing file is truncated to zero length *)
| `Exclusive of Unix_perm.t (** always create; fail if the file already exists *)
| `Never (** fail if the named file doesn't exist *)
| `If_missing of File.Unix_perm.t (** create if file doesn't already exist *)
| `Or_truncate of File.Unix_perm.t (** any existing file is truncated to zero length *)
| `Exclusive of File.Unix_perm.t (** always create; fail if the file already exists *)
]
(** If a new file is created, the given permissions are used for it. *)

Expand All @@ -30,7 +25,7 @@ class virtual dir = object (_ : #Generic.t)
append:bool ->
create:create ->
path -> <File.rw; Flow.close>
method virtual mkdir : perm:Unix_perm.t -> path -> unit
method virtual mkdir : perm:File.Unix_perm.t -> path -> unit
method virtual open_dir : sw:Switch.t -> path -> dir_with_close
method virtual read_dir : path -> string list
method virtual unlink : path -> unit
Expand Down
9 changes: 5 additions & 4 deletions lib_eio/mock/eio_mock.mli
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ module Flow : sig
type t = <
Eio.Flow.two_way;
Eio.Flow.close;
Eio.File.stat;
on_read : string Handler.t;
on_copy_bytes : int Handler.t;
set_copy_method : copy_method -> unit;
Expand Down Expand Up @@ -119,8 +120,8 @@ module Net : sig
type t = <
Eio.Net.t;
on_listen : Eio.Net.listening_socket Handler.t;
on_connect : <Eio.Net.stream_socket; Eio.Flow.close> Handler.t;
on_datagram_socket : <Eio.Net.datagram_socket; Eio.Flow.close> Handler.t;
on_connect : <Eio.Net.stream_socket; Eio.Flow.close; Eio.File.stat> Handler.t;
on_datagram_socket : <Eio.Net.datagram_socket; Eio.Flow.close; Eio.File.stat> Handler.t;
on_getaddrinfo : Eio.Net.Sockaddr.t list Handler.t;
on_getnameinfo : (string * string) Handler.t;
>
Expand All @@ -133,13 +134,13 @@ module Net : sig
val make : string -> t
(** [make label] is a new mock network. *)

val on_connect : t -> <Eio.Net.stream_socket; Eio.Flow.close; ..> Handler.actions -> unit
val on_connect : t -> <Eio.Net.stream_socket; Eio.Flow.close; Eio.File.stat; ..> Handler.actions -> unit
(** [on_connect t actions] configures what to do when a client tries to connect somewhere. *)

val on_listen : t -> #Eio.Net.listening_socket Handler.actions -> unit
(** [on_listen t actions] configures what to do when a server starts listening for incoming connections. *)

val on_datagram_socket : t -> <Eio.Net.datagram_socket; Eio.Flow.close; ..> Handler.actions -> unit
val on_datagram_socket : t -> <Eio.Net.datagram_socket; Eio.Flow.close; Eio.File.stat; ..> Handler.actions -> unit
(** [on_datagram_socket t actions] configures how to create datagram sockets. *)

val on_getaddrinfo : t -> Eio.Net.Sockaddr.t list Handler.actions -> unit
Expand Down
18 changes: 18 additions & 0 deletions lib_eio/mock/flow.ml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type copy_method = [
type t = <
Eio.Flow.two_way;
Eio.Flow.close;
Eio.File.stat;
on_read : string Handler.t;
on_copy_bytes : int Handler.t;
set_copy_method : copy_method -> unit;
Expand Down Expand Up @@ -108,6 +109,23 @@ let make ?(pp=pp_default) label =
Queue.take on_close ()
done;
traceln "%s: closed" label

(* Placeholder stat record for the mock flow: the kind is [`Unknown] and every
   numeric field is zero. *)
method stat =
  Eio.File.Stat.{
    kind = `Unknown;
    perm = 0;
    size = Optint.Int63.zero;
    dev = 0L; ino = 0L; rdev = 0L;
    nlink = 0L; uid = 0L; gid = 0L;
    blksize = 0L; blocks = 0L;
    atime = 0.; mtime = 0.; ctime = 0.;
  }
end

let on_read (t:t) = Handler.seq t#on_read
Expand Down
8 changes: 5 additions & 3 deletions lib_eio/mock/net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ open Eio.Std
type t = <
Eio.Net.t;
on_listen : Eio.Net.listening_socket Handler.t;
on_connect : <Eio.Net.stream_socket; Eio.Flow.close> Handler.t;
on_datagram_socket : <Eio.Net.datagram_socket; Eio.Flow.close> Handler.t;
on_connect : <Eio.Net.stream_socket; Eio.Flow.close; Eio.File.stat> Handler.t;
on_datagram_socket : <Eio.Net.datagram_socket; Eio.Flow.close; Eio.File.stat> Handler.t;
on_getaddrinfo : Eio.Net.Sockaddr.t list Handler.t;
on_getnameinfo : (string * string) Handler.t;
>
Expand Down Expand Up @@ -83,10 +83,12 @@ let listening_socket label =
let socket, addr = Handler.run on_accept in
Flow.attach_to_switch socket sw;
traceln "%s: accepted connection from %a" label Eio.Net.Sockaddr.pp addr;
(socket :> <Eio.Net.stream_socket; Eio.Flow.close>), addr
(socket :> <Eio.Net.stream_socket; Eio.Flow.close; Eio.File.stat>), addr

(* Closing the mock listening socket only logs the event with [traceln]. *)
method close =
traceln "%s: closed" label

(* [stat] is not supported on the mock listening socket: no stat record is
   modelled for it, so calling this method raises [Failure] with a message
   that names the unsupported operation. *)
method stat = failwith "Eio_mock.Net.listening_socket: stat is not implemented"
end

let on_accept (l:listening_socket) actions =
Expand Down
2 changes: 2 additions & 0 deletions lib_eio/net.ml
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,11 @@ end

(* Base class for sockets: [probe] answers [None] to every query, and the
virtual [stat] method is inherited from [File.stat]. *)
class virtual socket = object (_ : #Generic.t)
method probe _ = None
inherit File.stat
end

(* A stream socket is a two-way flow that must also provide [stat]
(inherited as a virtual method from [File.stat]). *)
class virtual stream_socket = object
inherit File.stat
inherit Flow.two_way
end

Expand Down
1 change: 1 addition & 0 deletions lib_eio/net.mli
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ end

(** Base class for sockets: a {!Generic.t} that also exposes the [stat]
method of {!File.stat}. *)
class virtual socket : object
inherit Generic.t
inherit File.stat
end

class virtual stream_socket : object
Expand Down
19 changes: 15 additions & 4 deletions lib_eio/path.ml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,21 @@ let with_lines path fn =
let buf = Buf_read.of_flow flow ~max_size:max_int in
fn (Buf_read.lines buf)

(* [load path] opens [path] for reading and returns everything read from it as
a string. The data goes through a [Buf_read] buffer whose maximum size is
[max_int], i.e. effectively unbounded. *)
let load path =
with_open_in path @@ fun flow ->
let buf = Buf_read.of_flow flow ~max_size:max_int in
Buf_read.take_all buf
(* [load (t, path)] returns the contents of the file [path], opened through the
   directory [t], as a string.

   The file's size is taken from [File.size] and one buffer of exactly that
   many bytes is allocated up front, then filled with repeated single reads.
   Reading stops when the buffer is full or the flow reports end-of-file,
   whichever comes first; the string returned holds only the bytes actually
   read. NOTE(review): data beyond the size reported at open time is not read.

   Raises [Fs.File_too_large] if the reported size is larger than an OCaml
   string can hold. *)
let load ((t:#Fs.dir), path) =
  with_open_in (t, path) @@ fun flow ->
  let size = File.size flow in
  let max_len = min Sys.max_string_length Sys.max_array_length in
  (* [compare] only promises the sign of its result, so test [> 0], not [= 1]. *)
  if Optint.Int63.(compare size (of_int max_len)) > 0 then
    raise (Fs.File_too_large
             (path, Invalid_argument "can't represent a string that long"));
  let len = Optint.Int63.to_int size in
  let buf = Cstruct.create len in
  let rec fill got =
    (* Stop as soon as the buffer is full: reading into an empty buffer is not
       a dependable way to detect end-of-file. *)
    if got >= len then got
    else
      match Flow.single_read flow (Cstruct.shift buf got) with
      | n -> fill (got + n)
      | exception End_of_file -> got
  in
  let got = fill 0 in
  Cstruct.to_string ~len:got buf

let save ?append ~create path data =
with_open_out ?append ~create path @@ fun flow ->
Expand Down
2 changes: 1 addition & 1 deletion lib_eio/path.mli
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ val with_open_out :

(** {1 Directories} *)

val mkdir : perm:Unix_perm.t -> _ t -> unit
val mkdir : perm:File.Unix_perm.t -> _ t -> unit
(** [mkdir ~perm t] creates a new directory [t] with permissions [perm]. *)

val open_dir : sw:Switch.t -> _ t -> <dir; Flow.close> t
Expand Down
1 change: 1 addition & 0 deletions lib_eio/unix/eio_unix.ml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(* An object exposing a Unix file descriptor: [unix_fd] takes [`Peek] or
[`Take] and returns the descriptor; the [stat] method comes from
[Eio.File.stat]. *)
type unix_fd = <
unix_fd : [`Peek | `Take] -> Unix.file_descr;
Eio.File.stat
>

type socket = <
Expand Down
3 changes: 2 additions & 1 deletion lib_eio/unix/eio_unix.mli
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ open Eio.Std

(* An object exposing a Unix file descriptor: [unix_fd] takes [`Peek] or
[`Take] and returns the descriptor; the [stat] method comes from
[Eio.File.stat]. *)
type unix_fd = <
unix_fd : [`Peek | `Take] -> Unix.file_descr;
Eio.File.stat
>

type socket = <
Expand Down Expand Up @@ -96,7 +97,7 @@ module Private : sig
| Socketpair : Eio.Switch.t * Unix.socket_domain * Unix.socket_type * int ->
(socket * socket) Effect.t (** See {!socketpair} *)
| Pipe : Eio.Switch.t ->
(<Eio.Flow.source; Eio.Flow.close; unix_fd> * <Eio.Flow.sink; Eio.Flow.close; unix_fd>) Effect.t (** See {!pipe} *)
(<Eio.Flow.source; Eio.Flow.close; unix_fd; Eio.File.stat> * <Eio.Flow.sink; Eio.Flow.close; unix_fd; Eio.File.stat>) Effect.t (** See {!pipe} *)
end

module Ctf = Ctf_unix
Expand Down
53 changes: 45 additions & 8 deletions lib_eio_linux/eio_linux.ml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ let system_thread = Ctf.mint_id ()
(* [wrap_errors path fn] runs [fn ()] and re-raises selected low-level errors
   as the matching [Eio.Fs] exception. The new exception is tagged with
   [path] and keeps the original exception as its second component. *)
let wrap_errors path fn =
  try fn () with
  | Unix.Unix_error(Unix.EEXIST, _, _) as ex -> raise @@ Eio.Fs.Already_exists (path, ex)
  (* EFBIG is the "file too large" errno; E2BIG means "argument list too long". *)
  | Unix.Unix_error(Unix.EFBIG, _, _) as ex -> raise @@ Eio.Fs.File_too_large (path, ex)
  | Unix.Unix_error(Unix.ENOENT, _, _) as ex -> raise @@ Eio.Fs.Not_found (path, ex)
  | Unix.Unix_error(Unix.EXDEV, _, _) as ex -> raise @@ Eio.Fs.Permission_denied (path, ex)
  | Eio.Fs.Permission_denied _ as ex -> raise @@ Eio.Fs.Permission_denied (path, ex)
Expand Down Expand Up @@ -110,6 +111,36 @@ module FD = struct
match t.fd with
| `Open fd -> Fmt.pf f "%d" (Obj.magic fd : int)
| `Closed -> Fmt.string f "(closed)"

(* [fstat t] stats the descriptor held by [t] with [Unix.LargeFile.fstat] and
   converts the answer into the portable [Eio.File.Stat.t] record.

   [blksize] is fixed at 512 and [blocks] is computed as [st_size / 512];
   neither value is taken from the result of the system call. *)
let fstat t =
  (* todo: use uring *)
  let fd = get_exn "fstat" t in
  let st = Unix.LargeFile.fstat fd in
  let i64 = Int64.of_int in
  let file_kind : Eio.File.Stat.kind =
    match st.st_kind with
    | Unix.S_FIFO -> `Fifo
    | Unix.S_CHR -> `Character_special
    | Unix.S_DIR -> `Directory
    | Unix.S_BLK -> `Block_device
    | Unix.S_REG -> `Regular_file
    | Unix.S_LNK -> `Symbolic_link
    | Unix.S_SOCK -> `Socket
  in
  {
    Eio.File.Stat.dev = i64 st.st_dev;
    ino = i64 st.st_ino;
    kind = file_kind;
    perm = st.st_perm;
    nlink = i64 st.st_nlink;
    uid = i64 st.st_uid;
    gid = i64 st.st_gid;
    rdev = i64 st.st_rdev;
    size = Optint.Int63.of_int64 st.st_size;
    blksize = 512L; (* XXX *)
    blocks = Int64.div st.st_size 512L; (* XXX *)
    atime = st.st_atime;
    mtime = st.st_mtime;
    ctime = st.st_ctime;
  }
end

type _ Eio.Generic.ty += FD : FD.t Eio.Generic.ty
Expand Down Expand Up @@ -816,9 +847,7 @@ module Low_level = struct
| Cwd -> openat2 ~sw ?seekable ~access ~flags ~perm ~resolve:Uring.Resolve.beneath path
| Fs -> openat2 ~sw ?seekable ~access ~flags ~perm ~resolve:Uring.Resolve.empty path

(* [fstat fd] calls [Unix.fstat] on the descriptor held by [fd];
[FD.get_exn] is given "fstat" as the operation name. *)
let fstat fd =
(* todo: use uring *)
Unix.fstat (FD.get_exn "fstat" fd)
(* [fstat fd] is [FD.fstat fd]. *)
let fstat fd = FD.fstat fd

external eio_mkdirat : Unix.file_descr -> string -> Unix.file_perm -> unit = "caml_eio_mkdirat"

Expand Down Expand Up @@ -930,7 +959,7 @@ end

external eio_eventfd : int -> Unix.file_descr = "caml_eio_eventfd"

type has_fd = < fd : FD.t >
type has_fd = < fd : FD.t; Eio.File.stat >
type source = < Eio.Flow.source; Eio.Flow.close; has_fd >
type sink = < Eio.Flow.sink ; Eio.Flow.close; has_fd >

Expand Down Expand Up @@ -1008,6 +1037,8 @@ let fallback_copy src dst =
let udp_socket sock = object
inherit Eio.Net.datagram_socket

(* [stat] reports on the socket's descriptor through [FD.fstat]. *)
method stat = FD.fstat sock

(* [close] delegates to [FD.close] on the socket's descriptor. *)
method close = FD.close sock

method send sockaddr buf =
Expand Down Expand Up @@ -1038,6 +1069,8 @@ let flow fd =
| Eio_unix.Private.Unix_file_descr op -> Some (FD.to_unix op fd)
| _ -> None

(* [stat] reports on the flow's descriptor through [FD.fstat]. *)
method stat = FD.fstat fd

method read_into buf =
if Lazy.force is_tty then (
(* Work-around for https://github.com/axboe/liburing/issues/354
Expand Down Expand Up @@ -1082,6 +1115,8 @@ let sink fd = (flow fd :> sink)
let listening_socket fd = object
inherit Eio.Net.listening_socket

(* [stat] reports on the listening socket's descriptor through [FD.fstat]. *)
method stat = FD.fstat fd

(* [probe] answers an [Eio_unix.Private.Unix_file_descr] query by converting
[fd] with [FD.to_unix]; every other query gets [None]. *)
method! probe : type a. a Eio.Generic.ty -> a option = function
| Eio_unix.Private.Unix_file_descr op -> Some (FD.to_unix op fd)
| _ -> None
Expand All @@ -1095,7 +1130,7 @@ let listening_socket fd = object
| Unix.ADDR_UNIX path -> `Unix path
| Unix.ADDR_INET (host, port) -> `Tcp (Eio_unix.Ipaddr.of_unix host, port)
in
let flow = (flow client :> <Eio.Flow.two_way; Eio.Flow.close>) in
let flow = (flow client :> <Eio.Flow.two_way; Eio.Flow.close; Eio.File.stat>) in
flow, client_addr
end

Expand Down Expand Up @@ -1153,7 +1188,7 @@ let net = object
let sock_unix = Unix.socket (socket_domain_of connect_addr) socket_type 0 in
let sock = FD.of_unix ~sw ~seekable:false ~close_unix:true sock_unix in
Low_level.connect sock addr;
(flow sock :> <Eio.Flow.two_way; Eio.Flow.close>)
(flow sock :> <Eio.Flow.two_way; Eio.Flow.close; Eio.File.stat>)

method datagram_socket ~sw saddr =
match saddr with
Expand Down Expand Up @@ -1455,8 +1490,10 @@ let rec run : type a.
(* See issue #319, PR #327 *)
Unix.set_nonblock r;
Unix.set_nonblock w;
let r = (flow (FD.of_unix ~sw ~seekable:false ~close_unix:true r) :> <Eio.Flow.source; Eio.Flow.close; Eio_unix.unix_fd>) in
let w = (flow (FD.of_unix ~sw ~seekable:false ~close_unix:true w) :> <Eio.Flow.sink; Eio.Flow.close; Eio_unix.unix_fd>) in
let r = (flow (FD.of_unix ~sw ~seekable:false ~close_unix:true r)
:> <Eio.Flow.source; Eio.Flow.close; Eio_unix.unix_fd; Eio.File.stat>) in
let w = (flow (FD.of_unix ~sw ~seekable:false ~close_unix:true w)
:> <Eio.Flow.sink; Eio.Flow.close; Eio_unix.unix_fd; Eio.File.stat>) in
continue k (r, w)
)
| Low_level.Alloc -> Some (fun k ->
Expand Down
Loading

0 comments on commit f487afd

Please sign in to comment.