Skip to content

Commit

Permalink
git subrepo pull submodules/lev
Browse files Browse the repository at this point in the history
subrepo:
  subdir:   "submodules/lev"
  merged:   "8c48b28c"
upstream:
  origin:   "https://github.com/rgrinberg/lev"
  branch:   "master"
  commit:   "8c48b28c"
git-subrepo:
  version:  "0.4.1"
  origin:   "???"
  commit:   "???"

ps-id: 594d239b-805a-4aaa-b90f-c064e91af9db
  • Loading branch information
rgrinberg committed Jul 15, 2022
1 parent 834362d commit 1709fc1
Show file tree
Hide file tree
Showing 14 changed files with 316 additions and 148 deletions.
9 changes: 7 additions & 2 deletions ocaml-lsp-server/src/dune.ml
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,14 @@ end = struct
; finished : unit Fiber.Ivar.t
}

let stop t = Lev_fiber_csexp.Session.write t.session None
let stop t =
let+ () = Fiber.return () in
Lev_fiber_csexp.Session.close t.session

let write t sexp = Lev_fiber_csexp.Session.write t.session sexp
let write t sexp =
match sexp with
| None -> stop t
| Some sexp -> Lev_fiber_csexp.Session.write t.session sexp

let read t =
let* read = Lev_fiber_csexp.Session.read t.session in
Expand Down
2 changes: 1 addition & 1 deletion ocaml-lsp-server/src/merlin_config.ml
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ module Dot_protocol_io =
(struct
include Lev_fiber_csexp.Session

let write t x = write t (Some [ x ])
let write t x = write t [ x ]
end)

let should_read_dot_merlin = ref false
Expand Down
6 changes: 3 additions & 3 deletions ocaml-lsp-server/src/ocamlformat_rpc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ module Ocamlformat_rpc = Ocamlformat_rpc_lib.Make (struct

let read = Lev_fiber_csexp.Session.read

let write t s = Lev_fiber_csexp.Session.write t (Some s)
let write t s = Lev_fiber_csexp.Session.write t s
end)

module Process : sig
Expand Down Expand Up @@ -120,8 +120,8 @@ end = struct
Ok process

let run { pid; session; _ } =
let* (_ : Unix.process_status) = Lev_fiber.waitpid ~pid:(Pid.to_int pid) in
Lev_fiber_csexp.Session.write session None
let+ (_ : Unix.process_status) = Lev_fiber.waitpid ~pid:(Pid.to_int pid) in
Lev_fiber_csexp.Session.close session
end

type state =
Expand Down
4 changes: 2 additions & 2 deletions submodules/lev/.gitrepo
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
[subrepo]
remote = https://github.com/rgrinberg/lev
branch = master
commit = 15446b6ed2cb1225afc38a691fe99eaaee1efe97
parent = fbb9745eeb86c8ba5aeafa04917c652c84ff1b67
commit = 8c48b28c1505dd8c53c7a3e2e90c4c5d772e7bce
parent = fba2cb0686043388f66de74c3723523344154918
method = rebase
cmdver = 0.4.1
1 change: 1 addition & 0 deletions submodules/lev/dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
(name lev-fiber)
(depends
(ppx_expect :with-test)
(ocaml (>= 4.14.0))
lev
dyn
fiber
Expand Down
85 changes: 35 additions & 50 deletions submodules/lev/lev-fiber-csexp/src/lev_fiber_csexp.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,14 @@ module Session = struct
let close t =
match t.state with
| Closed -> ()
| Open { in_channel; out_channel; read = _; write = _; socket = _ } ->
(* with a socket, there's only one fd. We make sure to close it only once.
with dune rpc init, we have two separate fd's (stdin/stdout) so we must
close both. *)
| Open { in_channel; out_channel; read = _; write = _; socket } ->
(match socket with
| false -> ()
| true -> (
try
let fd = Io.fd out_channel in
Unix.shutdown (Lev_fiber.Fd.fd_exn fd) Unix.SHUTDOWN_ALL
with Unix.Unix_error (_, _, _) -> ()));
Io.close in_channel;
Io.close out_channel;
t.state <- Closed
Expand Down Expand Up @@ -96,11 +100,7 @@ module Session = struct
Io.Reader.Expert.consume reader ~len:len_read;
atom reader parser (len - len_read)
in
let+ res =
Io.with_read in_channel ~f:(fun reader -> loop reader Stack.Empty)
in
(match res with None -> Io.close in_channel | Some _ -> ());
res
Io.with_read in_channel ~f:(fun reader -> loop reader Stack.Empty)

let read t =
match t.state with
Expand All @@ -109,54 +109,39 @@ module Session = struct
Fiber.Mutex.with_lock mutex ~f:(fun () -> read t)

let write_closed sexps =
match sexps with
| None -> Fiber.return ()
| Some sexps ->
Code_error.raise "attempting to write to a closed channel"
[ ("sexp", Dyn.(list Sexp.to_dyn) sexps) ]
Code_error.raise "attempting to write to a closed channel"
[ ("sexp", Dyn.(list Sexp.to_dyn) sexps) ]

let write t sexps =
match t.state with
| Closed -> write_closed sexps
| Open { out_channel; socket; _ } -> (
match sexps with
| None ->
(match socket with
| false -> ()
| true -> (
try
let fd = Io.fd out_channel in
Unix.shutdown (Lev_fiber.Fd.fd_exn fd) Unix.SHUTDOWN_ALL
with Unix.Unix_error (_, _, _) -> ()));
close t;
Fiber.return ()
| Some sexps ->
Io.with_write out_channel ~f:(fun writer ->
let rec write sexp src_pos =
if src_pos = String.length sexp then Fiber.return ()
| Open { out_channel; _ } ->
Io.with_write out_channel ~f:(fun writer ->
let rec write sexp src_pos =
if src_pos = String.length sexp then Fiber.return ()
else
let* size =
let size = Io.Writer.Expert.available writer in
if size > 0 then Fiber.return size
else
let* size =
let size = Io.Writer.Expert.available writer in
if size > 0 then Fiber.return size
else
let+ () = Io.Writer.flush writer in
Io.Writer.Expert.available writer
in
let dst, { Io.Slice.pos = dst_pos; len } =
Io.Writer.Expert.prepare writer ~len:size
in
let len = min len (String.length sexp - src_pos) in
Bytes.blit_string ~src:sexp ~src_pos ~dst ~dst_pos ~len;
Io.Writer.Expert.commit writer ~len;
write sexp (src_pos + len)
let+ () = Io.Writer.flush writer in
Io.Writer.Expert.available writer
in
let rec loop = function
| [] -> Io.Writer.flush writer
| sexp :: sexps ->
let* () = write (Csexp.to_string sexp) 0 in
loop sexps
let dst, { Io.Slice.pos = dst_pos; len } =
Io.Writer.Expert.prepare writer ~len:size
in
loop sexps))
let len = min len (String.length sexp - src_pos) in
Bytes.blit_string ~src:sexp ~src_pos ~dst ~dst_pos ~len;
Io.Writer.Expert.commit writer ~len;
write sexp (src_pos + len)
in
let rec loop = function
| [] -> Io.Writer.flush writer
| sexp :: sexps ->
let* () = write (Csexp.to_string sexp) 0 in
loop sexps
in
loop sexps)

let write t sexps =
match t.state with
Expand Down
6 changes: 4 additions & 2 deletions submodules/lev/lev-fiber-csexp/src/lev_fiber_csexp.mli
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@

module Session : sig
type t
(** Rpc session backed by two threads. *)
(** Rpc session backed by an input & output stream *)

val create :
socket:bool ->
Lev_fiber.Io.input Lev_fiber.Io.t ->
Lev_fiber.Io.output Lev_fiber.Io.t ->
t

val close : t -> unit

(* [write t x] writes the s-expression when [x] is [Some sexp], and closes the
session if [x = None ] *)
val write : t -> Csexp.t list option -> unit Fiber.t
val write : t -> Csexp.t list -> unit Fiber.t

val read : t -> Csexp.t option Fiber.t
(** If [read] returns [None], the session is closed and all subsequent reads
Expand Down
10 changes: 5 additions & 5 deletions submodules/lev/lev-fiber-csexp/test/lev_fiber_csexp_rpc_tests.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ let%expect_test "serve/connect" =
let* session = Csexp_rpc.connect (socket ()) sockaddr in
let* () =
Csexp_rpc.Session.write session
(Some [ Csexp.List [ Atom "one"; List [ Atom "two"; Atom "three" ] ] ])
[ Csexp.List [ Atom "one"; List [ Atom "two"; Atom "three" ] ] ]
in
let* response = Csexp_rpc.Session.read session in
let+ response = Csexp_rpc.Session.read session in
(match response with
| None -> assert false
| Some s ->
let resp = Csexp.to_string s in
printfn "client: received %S" resp);
Csexp_rpc.Session.write session None
Csexp_rpc.Session.close session
and server () =
let fd = socket () in
let unix_fd = Lev_fiber.Fd.fd_exn fd in
Expand All @@ -50,9 +50,9 @@ let%expect_test "serve/connect" =
| Some req -> printfn "server: request %S" (Csexp.to_string req));
let* () =
Csexp_rpc.Session.write session
(Some [ Atom "response"; List [ Atom "foo" ] ])
[ Atom "response"; List [ Atom "foo" ] ]
in
let* () = Csexp_rpc.Session.write session None in
Csexp_rpc.Session.close session;
Lev_fiber.Socket.Server.close server)
in
Fiber.fork_and_join_unit client server
Expand Down
1 change: 1 addition & 0 deletions submodules/lev/lev-fiber.opam
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ bug-reports: "https://github.com/rgrinberg/lev/issues"
depends: [
"dune" {>= "3.0"}
"ppx_expect" {with-test}
"ocaml" {>= "4.14.0"}
"lev"
"dyn"
"fiber"
Expand Down
Loading

0 comments on commit 1709fc1

Please sign in to comment.