(* $Id$ *)
open Netsys_types
open Uq_engines.Operators
open Printf
(* Alias for [Netsys_types.tbuffer]. The functions in this file match such
   values against the tags `String, `Bytes and `Memory. *)
type string_like = Netsys_types.tbuffer
(* Minimal byte-buffer interface shared by the input and output buffers of
   this file. Two implementations are defined further down:
   [mem_obj_buffer] (on top of Netpagebuffer) and [str_obj_buffer] (on top
   of Netbuffer). *)
class type obj_buffer =
object
method length : int
(* Number of bytes currently stored *)
method blit_out : int -> tbuffer -> int -> int -> unit
(* [blit_out bpos ms pos len]: copies [len] bytes starting at buffer
   position [bpos] into [ms] at position [pos]. Callers in this file
   remove the copied data separately with [delete_hd]. *)
method delete_hd : int -> unit
(* [delete_hd n]: drops the first [n] bytes *)
method index_from : int -> char -> int
(* [index_from pos c]: position of the first [c] at or after [pos].
   The line readers in this file rely on [Not_found] when there is none. *)
method add : tbuffer -> int -> int -> unit
(* [add ms pos len]: appends [len] bytes of [ms] starting at [pos] *)
method advance : int -> unit
(* [advance n]: used by [create_in_buffer] after [n] bytes were written
   into the area returned by [page_for_additions] *)
method page_for_additions : tbuffer * int * int
(* [(ms,pos,len)]: an area into which new data can be written directly *)
method page_for_consumption : tbuffer * int * int
(* [(ms,pos,len)]: an area holding buffered data that can be read directly *)
method clear : unit -> unit
(* Empties the buffer *)
end
(* Input buffer placed in front of a device of type ['in_device].
   [create_in_buffer] builds the implementation used in this file. *)
class type ['in_device] in_buffer_pre =
object
method buffer : obj_buffer
(* The underlying byte store *)
method eof : bool
(* Whether the end of the input has been recorded *)
method set_eof : unit -> unit
(* Records the end of the input *)
method start_fill_e : unit -> bool Uq_engines.engine
(* Starts an engine that reads more data into the buffer. In the
   implementation of [create_in_buffer] the result is [true] when EOF was
   hit and [false] when data was added. *)
method fill_e_opt : bool Uq_engines.engine option
(* The current fill engine, or None *)
method udevice : 'in_device
(* The device the buffer reads from *)
method shutdown_e : unit -> unit Uq_engines.engine
(* Shuts the underlying device down *)
method inactivate : unit -> unit
(* Releases the buffer and the underlying device without an engine *)
method event_system : Unixqueue.event_system
(* The event system of the underlying device *)
end
(* Output buffer placed in front of a device of type ['out_device].
   Implementations in this file: [create_out_buffer] and
   [filter_out_buffer]. *)
class type ['out_device] out_buffer_pre =
object
method buffer : obj_buffer
(* The underlying byte store *)
method eof : bool
(* When true, [buf_output_e] refuses to accept further data *)
method max : int option
(* Upper bound for the number of buffered bytes; [None] means that
   [buf_output_e] always accepts the whole request *)
method start_flush_e : unit -> unit Uq_engines.engine
(* Starts an engine that writes the buffered data to the device *)
method flush_e_opt : unit Uq_engines.engine option
(* The current flush engine, or None *)
method write_eof_e : unit -> bool Uq_engines.engine
(* The buffer must be empty before [write_eof_e] *)
method udevice : 'out_device option
(* The device behind the buffer. [None] forbids bypassing the buffer
   (see [filter_out_buffer]). *)
method shutdown_e : float option -> unit Uq_engines.engine
(* Shuts the device down; the argument is the optional linger value *)
method inactivate : unit -> unit
(* Releases the underlying resources without an engine *)
method event_system : Unixqueue.event_system
(* The event system of the underlying device *)
end
(* Devices data can be read from. *)
type in_device =
[ `Polldescr of Netsys.fd_style * Unix.file_descr * Unixqueue.event_system
(* A file descriptor together with its access style and event system *)
| `Multiplex of Uq_engines.multiplex_controller
(* A multiplex controller; it provides its own event system *)
| `Async_in of Uq_engines.async_in_channel * Unixqueue.event_system
(* An asynchronous input channel; `Memory buffers are rejected for it *)
| `Buffer_in of in_device in_buffer_pre
(* A buffer in front of another input device *)
| `Count_in of (int -> unit) * in_device
(* Wraps a device; the function is called with the byte count of every
   successful read (see [dev_input_e]) *)
]
(* Devices data can be written to; the cases mirror [in_device]. *)
type out_device =
[ `Polldescr of Netsys.fd_style * Unix.file_descr * Unixqueue.event_system
| `Multiplex of Uq_engines.multiplex_controller
| `Async_out of Uq_engines.async_out_channel * Unixqueue.event_system
| `Buffer_out of out_device out_buffer_pre
| `Count_out of (int -> unit) * out_device
(* The function is called with the byte count of every successful write
   (see [dev_output_e]) *)
]
(* Buffers over the device types defined above *)
type in_buffer = in_device in_buffer_pre
type out_buffer = out_device out_buffer_pre
(* The subset of input devices that are buffered; required by the line
   readers [input_line_e] and [input_lines_e] *)
type in_bdevice =
[ `Buffer_in of in_buffer ]
(* Union of all device cases; the internal [*0] helpers work on this type *)
type inout_device = [ in_device | out_device ]
(* Devices that can be both read and written *)
type io_device =
[ `Polldescr of Netsys.fd_style * Unix.file_descr * Unixqueue.event_system
| `Multiplex of Uq_engines.multiplex_controller
]
(* Reported by the line readers when a line exceeds [max_len] *)
exception Line_too_long
(* Determines the event system a device is attached to. Descriptor and
   async-channel devices carry it directly, multiplex controllers and
   buffers are asked via their [event_system] method, and counting
   wrappers are unwrapped recursively. *)
let rec device_esys0 dev =
  match dev with
  | `Polldescr(_,_,esys)
  | `Async_in(_,esys)
  | `Async_out(_,esys) -> esys
  | `Multiplex mplex -> mplex#event_system
  | `Buffer_in b -> b#event_system
  | `Buffer_out b -> b#event_system
  | `Count_in(_,inner) -> device_esys0 (inner :> inout_device)
  | `Count_out(_,inner) -> device_esys0 (inner :> inout_device)

(* Entry point accepting any subtype of [inout_device]. *)
let device_esys dev =
  device_esys0 (dev :> inout_device)
(* [true] for the `String and `Bytes cases of a buffer value, [false] for
   `Memory. *)
let is_bytes ms =
  match ms with
  | `String _ | `Bytes _ -> true
  | `Memory _ -> false
(* Tells whether a device can transfer data from/to `Memory buffers.
   - descriptors: only for the styles `Read_write, `Recv_send and
     `Recv_send_implied
   - multiplex controllers: as reported by [mem_supported]
   - async channels: never
   - buffers: always
   - counting wrappers: same as the wrapped device *)
let rec device_supports_memory0 dev =
  match dev with
  | `Polldescr(style,_,_) ->
      ( match style with
        | `Read_write | `Recv_send _ | `Recv_send_implied -> true
        | _ -> false
      )
  | `Multiplex mplex ->
      mplex # mem_supported
  | `Async_in(_,_) ->
      false
  | `Async_out(_,_) ->
      false
  | `Buffer_in _ ->
      true
  | `Buffer_out _ ->
      true
  | `Count_in(_,inner) ->
      device_supports_memory0 (inner : in_device :> inout_device)
  | `Count_out(_,inner) ->
      device_supports_memory0 (inner : out_device :> inout_device)

(* Entry point accepting any subtype of [inout_device]. *)
let device_supports_memory dev =
  device_supports_memory0 (dev :> inout_device)
(* Reads from the async channel [ch] into [s] at [pos], at most [len]
   bytes. The returned engine finishes with the byte count as soon as the
   channel delivers data (or immediately when [len = 0]). While the
   channel has nothing to offer, a notification is requested and the read
   is retried from the callback. Any exception ends the engine with
   `Error. *)
let ach_input_e ch esys s pos len =
  let (eng, notify) = Uq_engines.signal_engine esys in
  let rec attempt () =
    try
      let got = ch # input s pos len in
      if got <= 0 && len <> 0 then
        (* nothing available yet: come back when the channel says so *)
        ch # request_notification
          (fun () ->
             attempt();
             false
          )
      else
        notify (`Done got)
    with
      | error -> notify (`Error error)
  in
  attempt();
  eng
(* Mutually recursive input primitives.

   [buf_input_e b ms pos len] reads from the input buffer [b]:
   - if data is buffered (or [len = 0]), up to [len] bytes are copied out
     and removed from the buffer;
   - else, if the buffer is at EOF, the engine fails with [End_of_file];
   - else, for requests of at least 4096 bytes that the underlying device
     can serve directly, the buffer is bypassed;
   - otherwise the current (or a new) fill engine is awaited and the read
     is retried.

   [gread_e] is a synchronous helper (not an engine, despite the name).

   [dev_input_e d ms pos len] dispatches a single read on the kind of
   device. *)
let rec buf_input_e b ms pos len =
let bl = b#buffer#length in
if bl > 0 || len = 0 then (
let n = min len bl in
b#buffer#blit_out 0 ms pos n;
b#buffer#delete_hd n;
eps_e (`Done n) b#event_system
)
else if b#eof then
eps_e (`Error End_of_file) b#event_system
else (
(* Optimization: if len is quite large, bypass the buffer *)
let d = b#udevice in
if len >= 4096 && (device_supports_memory d || is_bytes ms) then
dev_input_e d ms pos len
>> (function
| `Error End_of_file ->
(* remember the EOF seen on the bypass path in the buffer object *)
b#set_eof(); `Error End_of_file
| st -> st
)
else
let fe =
match b#fill_e_opt with
| None -> b#start_fill_e ()
| Some fe -> fe in
fe ++ (fun _ -> buf_input_e b ms pos len)
)
(* Returns [(n, eof)]: the number of bytes read and whether that number
   was zero. EAGAIN/EWOULDBLOCK is mapped to [(0, false)]. *)
and gread_e style fd ms pos len =
try
let n = Netsys.gread_tbuf style fd ms pos len in
(n, n=0)
with
| Unix.Unix_error((Unix.EAGAIN|Unix.EWOULDBLOCK),_,_) ->
(0, false)
and dev_input_e (d : in_device) ms pos len =
match d with
| `Polldescr(style, fd, esys) ->
new Uq_engines.input_engine
(fun fd ->
let (n, eof) = gread_e style fd ms pos len in
(* a zero-length result of a non-empty request means EOF *)
if len > 0 && n = 0 && eof then raise End_of_file;
n
)
fd (-1.0) esys
| `Multiplex mplex ->
let (e, signal) = Uq_engines.signal_engine mplex#event_system in
let cancel() =
if mplex#reading then mplex # cancel_reading() in
( match ms with
| `String s
| `Bytes s ->
mplex # start_reading
~when_done:(fun xopt n ->
match xopt with
| None ->
signal (`Done n)
| Some Uq_engines.Cancelled ->
cancel(); signal `Aborted
| Some err -> signal (`Error err)
)
s pos len;
| `Memory m ->
if mplex#mem_supported then
mplex # start_mem_reading
~when_done:(fun xopt n ->
match xopt with
| None ->
signal (`Done n)
| Some Uq_engines.Cancelled ->
cancel(); signal `Aborted
| Some err -> signal (`Error err)
)
m pos len
else
signal
(`Error
(Failure "Uq_io: This mplex does not support `Memory"));
);
(* when the resulting engine is aborted, the pending read is cancelled *)
e >> (function
| `Done n -> `Done n
| `Error e -> `Error e
| `Aborted -> cancel(); `Aborted
)
| `Async_in (ch,esys) ->
( match ms with
| `String s
| `Bytes s ->
ach_input_e ch esys s pos len
| `Memory m ->
eps_e
(`Error
(Failure "Uq_io: async channels do not support `Memory"))
esys
)
| `Buffer_in b ->
buf_input_e b ms pos len
| `Count_in(c,d) ->
(* report the size of every successful read to the counter [c] *)
dev_input_e d ms pos len
>> (function
| `Done n -> c n; `Done n
| st -> st
)
(* [input_e d ms pos len]: a single read of up to [len] bytes from [d]
   into [ms] at [pos]; the engine yields the number of bytes read. *)
let input_e d0 ms pos len =
  dev_input_e (d0 :> in_device) ms pos len
(* [really_input_e d ms pos len]: repeats [input_e] until exactly [len]
   bytes have been stored in [ms] starting at [pos]. An error of one of
   the partial reads ends the whole engine. *)
let rec really_input_e d ms pos len =
  match len with
  | 0 ->
      eps_e (`Done ()) (device_esys d)
  | _ ->
      input_e d ms pos len
      ++ (fun got -> really_input_e d ms (pos+got) (len-got))
(* [input_line_e ?max_len d] reads the next line from the buffered device
   and returns it without the terminating newline character.

   - At EOF, a final line lacking the newline is returned as is; when the
     buffer is empty at EOF the engine ends with [`Error End_of_file].
   - A line is too long when its length including the newline exceeds
     [max_len] (for the final unterminated line: when its length exceeds
     [max_len]), or when the buffer already holds more than [max_len]
     bytes without containing a newline. In all these cases the engine
     ends with [`Error Line_too_long]; the exception is never raised
     synchronously. *)
let input_line_e ?(max_len = Sys.max_string_length) (`Buffer_in b : in_bdevice) =
  (* Returns the first [k1] bytes of the buffer as string and removes the
     first [k2] bytes. Raises Line_too_long if [k2] exceeds the limit. *)
  let consume k1 k2 =
    if k2 > max_len then raise Line_too_long;
    let s = Bytes.create k1 in
    b#buffer#blit_out 0 (`Bytes s) 0 k1;
    b#buffer#delete_hd k2;
    Bytes.unsafe_to_string s in
  let rec look_ahead eof =
    try
      let k = b#buffer#index_from 0 '\n' in
      let s = consume k (k+1) in
      eps_e (`Done s) b#event_system
    with
      | Not_found ->
          if eof then (
            let n = b#buffer#length in
            if n = 0 then
              eps_e (`Error End_of_file) b#event_system
            else if n > max_len then
              (* Checked here explicitly: an exception raised by [consume]
                 inside this handler would not be caught by the
                 Line_too_long case below and would escape to the caller. *)
              eps_e (`Error Line_too_long) b#event_system
            else (
              let s = consume n n in
              eps_e (`Done s) b#event_system
            )
          )
          else (
            assert(not b#eof);
            if b#buffer#length > max_len then
              eps_e (`Error Line_too_long) b#event_system
            else
              (* wait for more data, then search again *)
              let fe =
                match b#fill_e_opt with
                  | None -> b#start_fill_e ()
                  | Some fe -> fe in
              fe ++ look_ahead
          )
      | Line_too_long ->
          eps_e (`Error Line_too_long) b#event_system
  in
  look_ahead b#eof
(* Carries a continuation out of a [try] block. [input_lines_e] raises it
   and runs the continuation from the exception handler. *)
exception Cont of (unit -> string list Uq_engines.engine)
(* [input_lines_e ?max_len d] returns all complete lines that are currently
   in the buffer of [d] (without their newline characters). If the buffer
   does not contain a complete line, it is refilled until one is found.

   - At EOF, a final fragment lacking the newline counts as a line. Data
     ending with a newline does not produce an additional empty line, so
     the result agrees with repeated calls of [input_line_e].
   - With an empty buffer at EOF the engine ends with
     [`Error End_of_file].
   - Lines exceeding [max_len] (newline included) make the engine end with
     [`Error Line_too_long]; the exception is never raised synchronously. *)
let input_lines_e ?(max_len = Sys.max_string_length) (`Buffer_in b) =
  let copy_string i l =
    let s = Bytes.create l in
    b#buffer#blit_out i (`String s) 0 l;
    Bytes.unsafe_to_string s in
  let consume k =
    b#buffer#delete_hd k in
  (* Exceptions raised inside an exception handler are not caught by the
     sibling handlers of the same [try], hence the handlers below use
     this engine-returning function instead of raising. *)
  let too_long () =
    eps_e (`Error Line_too_long) b#event_system in
  (* [i]: buffer position where the search continues; [acc]: the lines
     found so far in reverse order. Nothing is removed from the buffer
     before the final result is known. *)
  let rec look_ahead i acc eof =
    try
      let k = b#buffer#index_from i '\n' in
      if k-i+1 > max_len then raise Line_too_long;
      let s = copy_string i (k-i) in
      raise(Cont(fun () -> look_ahead (k+1) (s::acc) eof))
    with
      | Not_found ->
          if eof then (
            let n = b#buffer#length in
            if n = 0 then (
              assert(acc = []);
              eps_e (`Error End_of_file) b#event_system
            )
            else if i >= n then (
              (* The data ends with a newline: there is no further line
                 (in particular no empty one) after it. *)
              consume n;
              eps_e (`Done (List.rev acc)) b#event_system
            )
            else if n-i > max_len then
              too_long()
            else (
              let s = copy_string i (n-i) in
              consume n;
              eps_e (`Done (List.rev (s :: acc))) b#event_system
            )
          )
          else (
            assert(not b#eof);
            if acc <> [] then (
              (* return what we have; the incomplete rest stays buffered *)
              consume i;
              eps_e (`Done (List.rev acc)) b#event_system
            ) else (
              assert(i = 0);
              if b#buffer#length > max_len then
                too_long()
              else
                let fe =
                  match b#fill_e_opt with
                    | None -> b#start_fill_e ()
                    | Some fe -> fe in
                fe ++ (look_ahead 0 [])
            )
          )
      | Line_too_long ->
          too_long()
      | Cont f -> (* make the recursion tail-recursive *)
          f ()
  in
  look_ahead 0 [] b#eof
(* Writes up to [len] bytes of [s] starting at [pos] to the async channel
   [ch]. The returned engine finishes with the byte count as soon as the
   channel accepts data (or immediately when [len = 0]). While the channel
   accepts nothing, a notification is requested and the write is retried
   from the callback. Any exception ends the engine with `Error. *)
let ach_output_e ch esys s pos len =
  let (eng, notify) = Uq_engines.signal_engine esys in
  let rec attempt () =
    try
      let written = ch # output s pos len in
      if written <= 0 && len <> 0 then
        (* channel is full: come back when it signals free space *)
        ch # request_notification
          (fun () ->
             attempt();
             false
          )
      else
        notify (`Done written)
    with
      | error -> notify (`Error error)
  in
  attempt();
  eng
(* Mutually recursive output primitives.

   [buf_output_e b ms pos len] writes into the output buffer [b]:
   - a buffer whose [eof] flag is set rejects the data with a [Failure];
   - if the buffer is empty, the request has at least 4096 bytes, and the
     buffer exposes a device that can take the data directly, the buffer
     is bypassed;
   - otherwise as many bytes as the [max] limit permits are appended; if
     there is no room at all, the current (or a new) flush engine is
     awaited and the write is retried.

   [dev_output_e d ms pos len] dispatches a single write on the kind of
   device. The engines yield the number of bytes accepted. *)
let rec buf_output_e b ms pos len =
if b # eof then
eps_e
(`Error (Failure "Uq_io: Buffer already closed for new data"))
b#event_system
else (
let bl = b#buffer#length in
(* Optimization: if len is large, try to bypass the buffer *)
match b#udevice with
| Some d when (
bl=0 && len >= 4096 && (device_supports_memory d || is_bytes ms)
) ->
dev_output_e d ms pos len
| _ ->
(* n: number of bytes that still fit under the limit, never negative *)
let n =
match b # max with
| None -> len
| Some m -> max (min len (m - bl)) 0 in
if n > 0 || len = 0 then (
b#buffer#add ms pos n;
eps_e (`Done n) b#event_system
)
else (
let fe =
match b#flush_e_opt with
| None -> b#start_flush_e ()
| Some fe -> fe in
fe ++ (fun _ -> buf_output_e b ms pos len)
)
)
and dev_output_e (d : out_device) ms pos len =
match d with
| `Polldescr(style, fd, esys) ->
new Uq_engines.output_engine
(fun fd ->
Netsys.gwrite_tbuf style fd ms pos len
)
fd (-1.0) esys
| `Multiplex mplex ->
let (e, signal) = Uq_engines.signal_engine mplex#event_system in
let cancel() =
if mplex#writing then mplex # cancel_writing() in
( match ms with
| `String s
| `Bytes s ->
mplex # start_writing
~when_done:(fun xopt n ->
match xopt with
| None -> signal (`Done n)
| Some Uq_engines.Cancelled ->
cancel(); signal `Aborted
| Some err -> signal (`Error err)
)
s pos len;
| `Memory m ->
if mplex#mem_supported then
mplex # start_mem_writing
~when_done:(fun xopt n ->
match xopt with
| None -> signal (`Done n)
| Some Uq_engines.Cancelled ->
cancel(); signal `Aborted
| Some err -> signal (`Error err)
)
m pos len
else
signal
(`Error
(Failure "Uq_io: This mplex does not support `Memory"));
);
(* when the resulting engine is aborted, the pending write is cancelled *)
e >> (function
| `Done n -> `Done n
| `Error e -> `Error e
| `Aborted -> cancel(); `Aborted
)
| `Async_out (ch,esys) ->
( match ms with
| `String s
| `Bytes s ->
ach_output_e ch esys s pos len
| `Memory m ->
eps_e
(`Error
(Failure "Uq_io: async channels do not support `Memory"))
esys
)
| `Buffer_out b ->
buf_output_e b ms pos len
| `Count_out(c,d) ->
(* report the size of every successful write to the counter [c] *)
dev_output_e d ms pos len
>> (function
| `Done n -> c n; `Done n
| st -> st
)
(* [output_e d ms pos len]: a single write of up to [len] bytes of [ms]
   starting at [pos]; the engine yields the number of bytes accepted. *)
let output_e d ms pos len =
  let dev = (d :> out_device) in
  dev_output_e dev ms pos len
(* [really_output_e d ms pos len]: repeats [output_e] until all [len]
   bytes of [ms] starting at [pos] have been accepted by [d]. *)
let rec really_output_e d ms pos len =
  match len with
  | 0 ->
      eps_e (`Done ()) (device_esys d)
  | _ ->
      output_e d ms pos len
      ++ (fun written -> really_output_e d ms (pos+written) (len-written))
(* Writes the complete byte sequence [s] to [d]. *)
let output_bytes_e d s =
  let len = Bytes.length s in
  really_output_e d (`Bytes s) 0 len

(* Writes the complete string [s] to [d]. The string is viewed as bytes
   without copying. *)
let output_string_e d s =
  let view = Bytes.unsafe_of_string s in
  output_bytes_e d view

(* Writes the complete bigarray [m] to [d]. *)
let output_memory_e d m =
  let len = Bigarray.Array1.dim m in
  really_output_e d (`Memory m) 0 len

(* Writes the contents of the netbuffer [b] to [d], reading directly from
   the internal storage of [b]. *)
let output_netbuffer_e d b =
  let s = Netbuffer.unsafe_buffer b in
  let len = Netbuffer.length b in
  really_output_e d (`String s) 0 len
(* [flush_e d]: for a buffered output device, returns the flush engine
   that is already running, or starts a new one. For all other devices
   there is nothing to flush and the engine is immediately done. *)
let flush_e d =
  match (d :> out_device) with
  | `Buffer_out b ->
      ( match b#flush_e_opt with
        | Some running -> running
        | None -> b#start_flush_e ()
      )
  | _ ->
      eps_e (`Done()) (device_esys d)
(* [write_eof0_e d] signals the end of the output stream where the device
   supports it. The engine yields [true] only when an EOF was actually
   sent:
   - descriptors and async channels: nothing is done, result [false]
   - multiplex controllers: an EOF is written if half-open connections
     are supported, otherwise [false]
   - buffers: flushed first, then the request is delegated to the buffer
   - counting wrappers: delegated to the wrapped device *)
let rec write_eof0_e d =
  match d with
  | `Polldescr(_, _, esys) ->
      eps_e (`Done false) esys
  | `Async_out (_, esys) ->
      eps_e (`Done false) esys
  | `Multiplex mplex ->
      let (eng, notify) = Uq_engines.signal_engine mplex#event_system in
      let cancel() =
        if mplex#writing then mplex # cancel_writing() in
      if mplex # supports_half_open_connection then
        mplex # start_writing_eof
          ~when_done:(fun xopt ->
                        match xopt with
                          | None -> notify (`Done true)
                          | Some Uq_engines.Cancelled ->
                              cancel(); notify `Aborted
                          | Some error -> notify (`Error error)
                     )
          ()
      else
        notify (`Done false);
      (* aborting the result also cancels the pending write *)
      eng >> (function
                | `Done flag -> `Done flag
                | `Error err -> `Error err
                | `Aborted -> cancel(); `Aborted
             )
  | `Buffer_out b ->
      flush_e d ++
        (fun () -> b#write_eof_e())
  | `Count_out(_, inner) ->
      write_eof0_e inner

(* Entry point accepting any subtype of [out_device]. *)
let write_eof_e d =
  write_eof0_e (d :> out_device)
(* [shutdown0_e ?linger d] shuts the device down and releases it:
   - descriptors are closed immediately with [Netsys.gclose]
   - multiplex controllers: pending reads and writes are cancelled, then
     the asynchronous shutdown is started ([linger] is passed through);
     on success the controller is inactivated
   - async channels are closed immediately
   - input buffers delegate to their [shutdown_e] method
   - output buffers are flushed first, then shut down with [linger]
   - counting wrappers delegate to the wrapped device

   Aborting the returned engine cancels a multiplex shutdown that is
   still in progress. *)
let rec shutdown0_e ?linger d =
  match d with
    | `Polldescr(style, fd, esys) ->
        Netsys.gclose style fd;
        eps_e (`Done()) esys
    | `Multiplex mplex ->
        if mplex#reading then
          mplex#cancel_reading();
        if mplex#writing then
          mplex#cancel_writing();
        let (e, signal) = Uq_engines.signal_engine mplex#event_system in
        (* Cancel only while a shutdown is really running. This mirrors the
           guards used for reading and writing elsewhere in this file; the
           previous negated test skipped the cancellation exactly when it
           was needed. *)
        let cancel() =
          if mplex#shutting_down then mplex # cancel_shutting_down() in
        mplex # start_shutting_down
          ?linger
          ~when_done:(fun xopt ->
                        match xopt with
                          | None ->
                              mplex#inactivate();
                              signal (`Done())
                          | Some Uq_engines.Cancelled ->
                              cancel(); signal `Aborted
                          | Some error ->
                              signal (`Error error)
                     )
          ();
        e >> (function
                | `Done n -> `Done n
                | `Error e -> `Error e
                | `Aborted -> cancel(); `Aborted
             )
    | `Async_in (ch,esys) ->
        ch # close_in();
        eps_e (`Done()) esys
    | `Async_out (ch,esys) ->
        ch # close_out();
        eps_e (`Done()) esys
    | `Buffer_in b ->
        b # shutdown_e ()
    | `Buffer_out b ->
        flush_e (`Buffer_out b) ++ (fun _ -> b # shutdown_e linger)
    | `Count_in(_,d) ->
        shutdown0_e ?linger (d :> [in_device | out_device])
    | `Count_out(_,d) ->
        shutdown0_e ?linger (d :> [in_device | out_device])

(* Entry point accepting any input or output device. *)
let shutdown_e ?linger d =
  shutdown0_e ?linger (d :> [in_device | out_device])
(* [inactivate0 d] releases the device immediately, without running an
   engine: descriptors and async channels are closed, multiplex
   controllers and buffers are inactivated, and counting wrappers pass the
   request on to the wrapped device. *)
let rec inactivate0 dev =
  match dev with
  | `Polldescr(style, fd, _) -> Netsys.gclose style fd
  | `Multiplex mplex -> mplex#inactivate()
  | `Async_in (ch, _) -> ch # close_in()
  | `Async_out (ch, _) -> ch # close_out()
  | `Buffer_in b -> b # inactivate ()
  | `Buffer_out b -> b # inactivate ()
  | `Count_in(_, inner) -> inactivate0 (inner :> inout_device)
  | `Count_out(_, inner) -> inactivate0 (inner :> inout_device)

(* Entry point accepting any subtype of [inout_device]. *)
let inactivate dev =
  inactivate0 (dev :> inout_device)
(* Runs the event system of [e1] until it returns, with [e1] guarded by a
   timeout of [tmo] (the guard fails with [Uq_engines.Timeout]). Returns
   the result of the engine; an error state is re-raised as exception,
   and an aborted engine is reported as [Failure] with the message
   Aborted. *)
let run_with_timeout tmo e1 =
  let guarded = Uq_engines.timeout_engine tmo Uq_engines.Timeout e1 in
  let esys = e1 # event_system in
  Unixqueue.run esys;
  match guarded#state with
  | `Working _ -> assert false
  | `Aborted -> failwith "Aborted"
  | `Error err -> raise err
  | `Done result -> result
(* Wraps an input device into a synchronous Netchannels input channel.
   Every operation runs the event system until the corresponding engine is
   finished, limited by the timeout [tmo]. A Netchannels-level buffer is
   only requested when the device is not already a `Buffer_in. *)
let in_obj_channel_1 (dev : in_device) tmo =
  let sync e = run_with_timeout tmo e in
  let buffered =
    match dev with
    | `Buffer_in _ -> false
    | _ -> true in
  let raw =
    object
      method input s pos len =
        sync (input_e dev (`String s) pos len)
      method close_in () =
        sync (shutdown_e dev)
    end in
  Netchannels.lift_in ~buffered (`Rec raw)

(* Entry point accepting any subtype of [in_device]. *)
let in_obj_channel dev tmo =
  in_obj_channel_1 (dev :> in_device) tmo
(* Wraps an output device into a synchronous Netchannels output channel.
   Every operation runs the event system until the corresponding engine is
   finished, limited by the timeout [tmo]. Closing first writes EOF (the
   result is ignored) and then shuts the device down. A Netchannels-level
   buffer is only requested when the device is not already a
   `Buffer_out. *)
let out_obj_channel_1 (dev : out_device) tmo =
  let sync e = run_with_timeout tmo e in
  let buffered =
    match dev with
    | `Buffer_out _ -> false
    | _ -> true in
  let raw =
    object
      method output s pos len =
        sync (output_e dev (`String s) pos len)
      method flush () =
        sync (flush_e dev)
      method close_out () =
        ignore (sync (write_eof_e dev));
        sync (shutdown_e dev)
    end in
  Netchannels.lift_out ~buffered (`Rec raw)

(* Entry point accepting any subtype of [out_device]. *)
let out_obj_channel dev tmo =
  out_obj_channel_1 (dev :> out_device) tmo
(* Wraps a bidirectional device into a synchronous object with input and
   output methods. Each operation runs the event system until the engine
   is finished, limited by [tmo]. The byte positions start at
   [start_pos_in] / [start_pos_out] and grow with every transfer. The
   device is shut down once both directions have been closed; closing the
   output side additionally writes EOF. Closing a side twice has no
   effect. *)
let io_obj_channel_1 ?(start_pos_in = 0) ?(start_pos_out = 0)
                     (dev : io_device) tmo =
  let sync e = run_with_timeout tmo e in
  let in_closed = ref false in
  let out_closed = ref false in
  let count_in = ref start_pos_in in
  let count_out = ref start_pos_out in
  object
    method input s pos len =
      let got = sync (input_e dev (`String s) pos len) in
      count_in := !count_in + got;
      got
    method close_in () =
      if not !in_closed then begin
        in_closed := true;
        if !out_closed then sync (shutdown_e dev)
      end
    method output s pos len =
      let written = sync (output_e dev (`String s) pos len) in
      count_out := !count_out + written;
      written
    method flush () =
      sync (flush_e dev)
    method close_out () =
      if not !out_closed then begin
        out_closed := true;
        ignore (sync (write_eof_e dev));
        if !in_closed then sync (shutdown_e dev)
      end
    method pos_in = !count_in
    method pos_out = !count_out
  end

(* Entry point accepting any subtype of [io_device]. *)
let io_obj_channel ?start_pos_in ?start_pos_out dev tmo =
  io_obj_channel_1 ?start_pos_in ?start_pos_out (dev :> io_device) tmo
(* Creates a buffer object on top of a Netpagebuffer. The page size is
   [Netsys_mem.small_block_size] if [small_buffer] is set, and
   [Netsys_mem.default_block_size] otherwise. The pages handed out by
   [page_for_additions] and [page_for_consumption] are `Memory values. *)
let mem_obj_buffer small_buffer =
  let page_size =
    match small_buffer with
    | true -> Netsys_mem.small_block_size
    | false -> Netsys_mem.default_block_size in
  let pages = Netpagebuffer.create page_size in
  object
    method length =
      Netpagebuffer.length pages
    method blit_out bpos ms pos len =
      Netpagebuffer.blit_to_tbuffer pages bpos ms pos len
    method delete_hd n =
      Netpagebuffer.delete_hd pages n
    method index_from pos c =
      Netpagebuffer.index_from pages pos c
    method add ms pos len =
      Netpagebuffer.add_subtstring
        pages (Netstring_tstring.tstring_of_tbuffer ms) pos len
    method advance n =
      Netpagebuffer.advance pages n
    method page_for_additions =
      let (m, off, avail) = Netpagebuffer.page_for_additions pages in
      (`Memory m, off, avail)
    method page_for_consumption =
      let (m, off, avail) = Netpagebuffer.page_for_consumption pages in
      (`Memory m, off, avail)
    method clear () =
      Netpagebuffer.clear pages
  end
(* Creates a buffer object on top of a Netbuffer with an initial size of
   4096 bytes if [small_buffer] is set, and 65536 bytes otherwise. The
   areas handed out by [page_for_additions] and [page_for_consumption]
   are `Bytes values; the consumption area is the internal storage of the
   netbuffer from position 0 up to its length. *)
let str_obj_buffer small_buffer =
  let initial_size =
    match small_buffer with
    | true -> 4096
    | false -> 65536 in
  let nb = Netbuffer.create initial_size in
  object
    method length =
      Netbuffer.length nb
    method blit_out bpos ms pos len =
      Netbuffer.blit_to_tbuffer nb bpos ms pos len
    method delete_hd n =
      Netbuffer.delete nb 0 n
    method index_from pos c =
      Netbuffer.index_from nb pos c
    method add ms pos len =
      Netbuffer.add_subtstring
        nb (Netstring_tstring.tstring_of_tbuffer ms) pos len
    method advance n =
      Netbuffer.advance nb n
    method page_for_additions =
      let (s, off, avail) = Netbuffer.area_for_additions nb in
      (`Bytes s, off, avail)
    method page_for_consumption =
      let s = Netbuffer.unsafe_buffer nb in
      (`Bytes s, 0, Netbuffer.length nb)
    method clear () =
      Netbuffer.clear nb
  end
(* [create_in_buffer ?small_buffer d] creates an input buffer reading from
   [d]. A memory-based store is used if the device supports `Memory
   buffers, and a bytes-based store otherwise.

   The fill engine reads once from the device into the free area of the
   store. It yields [false] after data was added and [true] at EOF; in the
   latter case the EOF flag is set. Other errors and aborts are passed
   through. [start_fill_e] must not be called while [fill_e_opt] still
   returns an engine (checked by an assertion). *)
let create_in_buffer ?(small_buffer=false) d0 =
  let dev = (d0 :> in_device) in
  let esys = device_esys dev in
  let store =
    if device_supports_memory dev
    then mem_obj_buffer small_buffer
    else str_obj_buffer small_buffer in
  let at_eof = ref false in
  let pending = ref None in
  object
    method buffer = store
    method eof = !at_eof
    method set_eof() = at_eof := true
    method start_fill_e () =
      assert(!pending = None);
      if !at_eof then
        eps_e (`Done true) esys
      else (
        let (ms,pos,len) = store # page_for_additions in
        let fill =
          input_e dev ms pos len
          ++ (fun n ->
                assert(n > 0);
                (* make the freshly read bytes part of the buffer *)
                store # advance n;
                pending := None;
                eps_e (`Done false) esys
             )
          >> (function
                | `Done flag -> `Done flag
                | `Error End_of_file ->
                    at_eof := true; `Done true
                | `Error error -> `Error error
                | `Aborted -> `Aborted
             ) in
        pending := Some fill;
        fill
      )
    method fill_e_opt =
      !pending
    method shutdown_e () =
      shutdown_e dev
    method inactivate() =
      store#clear();
      inactivate dev
    method udevice = dev
    method event_system = esys
  end
(* Number of bytes currently stored in the input buffer. *)
let in_buffer_length (b:in_buffer) =
  let store = b#buffer in
  store#length

(* [in_buffer_blit b bpos ms mspos len]: copies [len] buffered bytes
   starting at buffer position [bpos] to [ms] at [mspos]. *)
let in_buffer_blit (b:in_buffer) bpos ms mspos len =
  let store = b#buffer in
  store#blit_out bpos ms mspos len

(* Returns the fill engine that is already running, or starts a new one. *)
let in_buffer_fill_e (b:in_buffer) =
  match b#fill_e_opt with
  | Some running -> running
  | None -> b#start_fill_e ()
(* [create_out_buffer ?small_buffer ~max d] creates an output buffer
   writing to [d]. A memory-based store is used if the device supports
   `Memory buffers, and a bytes-based store otherwise. [max] is the value
   returned by the [max] method.

   A flush writes the number of bytes that were buffered when the flush
   was started; the registration of the flush engine is removed when the
   engine reaches a final state. [write_eof_e] fails with a [Failure]
   unless the buffer is empty. *)
let create_out_buffer ?(small_buffer=false) ~max d0 =
  let dev = (d0 :> out_device) in
  let esys = device_esys dev in
  let store =
    if device_supports_memory dev
    then mem_obj_buffer small_buffer
    else str_obj_buffer small_buffer in
  let closed = ref false in
  let pending = ref None in
  (* Writes [remaining] bytes from the head of the store, page by page *)
  let rec drain remaining =
    if remaining <= 0 then
      eps_e (`Done ()) esys
    else begin
      let (ms,pos,len) = store # page_for_consumption in
      let chunk = min len remaining in
      output_e dev ms pos chunk ++
        (fun k ->
           store # delete_hd k;
           drain (remaining-k)
        )
    end in
  object
    method buffer = store
    method eof = !closed
    method max = max
    method start_flush_e() =
      assert (!pending = None);
      let flush =
        drain (store#length)
        >> (fun st -> pending := None; st) in
      pending := Some flush;
      flush
    method flush_e_opt = !pending
    method write_eof_e () =
      if store#length <> 0 then
        eps_e
          (`Error (Failure "Uq_io: called write_eof_e with non-empty buffer"))
          esys
      else
        write_eof_e dev
    method shutdown_e linger =
      shutdown_e ?linger dev
    method inactivate () =
      store#clear();
      inactivate dev
    method udevice = Some dev
    method event_system = esys
  end
(* [copy_e ?small_buffer ?len ?len64 d_in d_out] copies data from [d_in]
   to [d_out] until EOF is seen on [d_in], or until the byte limit is
   reached. The limit is [len] or [len64]; when both are given the smaller
   one counts. The engine yields the number of bytes copied as int64.

   Both devices must use the same event system, otherwise
   [Invalid_argument] is raised immediately.

   The transfer buffer is a pool-allocated memory block when both devices
   support `Memory, and a bytes value (4096 or 65536 bytes depending on
   [small_buffer]) otherwise. *)
let copy_e ?(small_buffer=false) ?len ?len64 d_in d_out =
let d_in_esys = device_esys d_in in
let d_out_esys = device_esys d_out in
if d_in_esys <> d_out_esys then
invalid_arg "Uq_io.copy_e: devices must use the same event system";
let esys = d_in_esys in
(* ms: transfer buffer; ms_len: its size; free_ms: releases it early *)
let ms, ms_len, free_ms =
if device_supports_memory d_in && device_supports_memory d_out then (
let m, f =
Netsys_mem.pool_alloc_memory2
(if small_buffer then Netsys_mem.small_pool
else Netsys_mem.default_pool) in
(`Memory m, Bigarray.Array1.dim m, f)
)
else (
let s = Bytes.create (if small_buffer then 4096 else 65536) in
(`Bytes s, Bytes.length s, (fun () -> ()))
) in
(* Note that calling free_ms only accelerates that ms is recognized
as free after the copy is done. It is not necessary to call it.
*)
(* push_data p n: writes the n bytes of ms starting at p to d_out *)
let rec push_data p n =
if n = 0 then
eps_e (`Done ()) esys
else
output_e d_out ms p n ++ (fun k -> push_data (p+k) (n-k)) in
(* count: number of bytes read from d_in so far *)
let count = ref 0L in
let eff_len =
match len, len64 with
| None, None -> None
| Some n, None -> Some(Int64.of_int n)
| None, Some n -> Some n
| Some n1, Some n2 -> Some(min (Int64.of_int n1) n2) in
(* pull_data: reads the next chunk (bounded by the buffer size and the
   remaining limit), pushes it to d_out, and repeats *)
let rec pull_data() =
let n =
match eff_len with
| None ->
ms_len
| Some l ->
Int64.to_int( min (Int64.of_int ms_len) (Int64.sub l !count)) in
let ( >> ) = Uq_engines.fmap_engine in
(* For a strange reason we need this - somewhere a generalization is
missing
*)
if n=0 then (
free_ms();
eps_e (`Done !count) esys
)
else
( input_e d_in ms 0 n
>> (function
| `Done n -> `Done(`Good n)
| `Error End_of_file -> `Done `Eof
| `Error error -> free_ms(); `Error error
| `Aborted -> free_ms(); `Aborted
)
: [`Good of int | `Eof] Uq_engines.engine
) ++
(function
| `Good n ->
count := Int64.add !count (Int64.of_int n);
(* NOTE(review): a failure of push_data is passed on without calling
   free_ms, which is allowed according to the note further up *)
push_data 0 n ++ (fun () -> pull_data())
| `Eof ->
free_ms();
eps_e (`Done !count) esys
) in
pull_data()
(* Maps a final engine state so that EOF becomes a regular result:
   [`Done x] turns into [`Done (Some x)], [`Error End_of_file] into
   [`Done None]; other errors and aborts are passed through. *)
let eof_as_none st =
  match st with
  | `Error End_of_file -> `Done None
  | `Error e -> `Error e
  | `Done x -> `Done(Some x)
  | `Aborted -> `Aborted
(* [filter_out_buffer ~max p d] creates an output buffer that passes all
   data through the filter channel [p] before it is written to [d]:

   - Data written to the buffer is collected in a small bytes-based store.
   - A flush feeds the stored data into [p] and copies whatever [p] makes
     available to [d]. A [Netchannels.Buffer_underrun] during copying
     only means that [p] has no more output at the moment. The flush
     repeats until the store is empty.
   - After [write_eof_e] or [shutdown_e] the output side of [p] is closed
     by the flush, the remaining output of [p] is copied to [d], and the
     input side of [p] is closed.

   The buffer cannot be bypassed ([udevice] is [None]), so all data goes
   through [p]. *)
let filter_out_buffer ~max (p : Netchannels.io_obj_channel) d0 : out_buffer =
  let small_buffer = true in
  let d = (d0 :> out_device) in
  let esys =
    device_esys d in
  let buf = str_obj_buffer small_buffer in
  let eof =
    ref false in
  let flush_e_opt =
    ref None in
  let rec do_flush_e() =
    if buf#length > 0 then (
      assert(not !eof);
      (* First copy everything from buf to p: *)
      let (ms,pos,len) = buf # page_for_consumption in
      let s =
        match ms with
          | `String s | `Bytes s -> s
          | `Memory _ -> assert false in
      let n = p # output s pos len in
      buf # delete_hd n;
      (* Copy from p to d: *)
      let p_dev =
        `Async_in(new Uq_transfer.pseudo_async_in_channel p, esys) in
      ( copy_e p_dev d
        >> (function
              | `Done _ -> `Done ()
              | `Error Netchannels.Buffer_underrun -> `Done ()
              | `Error err -> `Error err
              | `Aborted -> `Aborted
           )
      ) ++ do_flush_e
    )
    else
      if !eof then (
        (* Closing the output side lets p emit its remaining data *)
        p # close_out();
        let p_dev =
          `Async_in(new Uq_transfer.pseudo_async_in_channel p, esys) in
        copy_e p_dev d
        >> (fun st ->
              p#close_in();
              match st with
                | `Done _ -> `Done()
                | `Error err -> `Error err
                | `Aborted -> `Aborted
           )
      )
      else (
        eps_e (`Done()) esys
      )
  in
  object(self)
    method buffer = buf
    method eof = !eof
    method max = max
    method start_flush_e() =
      assert (!flush_e_opt = None);
      let e =
        do_flush_e ()
        >> (fun st ->
              flush_e_opt := None;
              st
           ) in
      flush_e_opt := Some e;
      e
    method flush_e_opt =
      match !flush_e_opt with
        | None -> None
        | Some e ->
            (* a registered flush engine is expected to be unfinished *)
            assert(match e#state with
                     | `Done _ -> false
                     | _ -> true
                  );
            Some e
    method write_eof_e () =
      eof := true;
      flush_e (`Buffer_out self)
      ++ (fun () ->
            write_eof_e d
         )
    method shutdown_e linger =
      eof := true;
      flush_e (`Buffer_out self)
      ++ (fun () ->
            shutdown_e ?linger d
         )
    method inactivate () =
      p#close_in();
      inactivate d
    method udevice = None
      (* It is not allowed to bypass this buffer *)
    method event_system = esys
  end