#require "equeue";;
open Unixqueue
type copy_state =
{ copy_ues : Unixqueue.event_system;
copy_group : Unixqueue.group;
copy_infd : Unix.file_descr;
copy_outfd : Unix.file_descr;
copy_size : int;
copy_inbuf : string;
copy_outbuf : string;
mutable copy_outlen : int;
mutable copy_eof : bool;
mutable copy_have_inres : bool;
mutable copy_have_outres : bool;
mutable copy_cleared : bool;
}
let copy_file ues old_name new_name =
(* Adds the necessary handlers and actions to the Unixqueue.event_system
* ues that copy the file 'old_name' to 'new_name'.
*)
let update_resources state ues =
let want_input_resource =
not state.copy_eof && state.copy_outlen < state.copy_size in
let want_output_resource =
state.copy_outlen > 0 in
if want_input_resource && not state.copy_have_inres then
add_resource ues state.copy_group (Wait_in state.copy_infd, -.1.0);
if not want_input_resource && state.copy_have_inres then
remove_resource ues state.copy_group (Wait_in state.copy_infd);
if want_output_resource && not state.copy_have_outres then
add_resource ues state.copy_group (Wait_out state.copy_outfd, -.1.0);
if not want_output_resource && state.copy_have_outres then
remove_resource ues state.copy_group (Wait_out state.copy_outfd);
state.copy_have_inres <- want_input_resource;
state.copy_have_outres <- want_output_resource;
if not want_input_resource && not want_output_resource &&
not state.copy_cleared
then begin
(* Close file descriptors at end: *)
Unix.close state.copy_infd;
Unix.close state.copy_outfd;
(* Remove everything: *)
clear ues state.copy_group;
state.copy_cleared <- true; (* avoid to call 'clear' twice *)
end
in
let handle_input state ues esys e =
(* There is data on the input file descriptor. *)
(* Calculate the available space in the output buffer: *)
let n = state.copy_size - state.copy_outlen in
assert(n > 0);
(* Read the data: *)
let n' = Unix.read state.copy_infd state.copy_inbuf 0 n in
(* End of stream reached? *)
state.copy_eof <- n' = 0;
(* Append the read data to the output buffer: *)
String.blit state.copy_inbuf 0 state.copy_outbuf state.copy_outlen n';
state.copy_outlen <- state.copy_outlen + n';
(* Add or remove resources: *)
update_resources state ues
in
let handle_output state ues esys e =
(* The file descriptor is ready to output data. *)
(* Write as much as possible: *)
let n' = Unix.write state.copy_outfd state.copy_outbuf 0 state.copy_outlen
in
(* Remove the written bytes from the output buffer: *)
String.blit
state.copy_outbuf n' state.copy_outbuf 0 (state.copy_outlen - n');
state.copy_outlen <- state.copy_outlen - n';
(* Add or remove resources: *)
update_resources state ues
in
let handle state ues esys e =
(* Only accept events associated with our own group. *)
match e with
Input_arrived (g,fd) ->
handle_input state ues esys e
| Output_readiness (g,fd) ->
handle_output state ues esys e
| _ ->
raise Equeue.Reject
in
let g = new_group ues in
let infd = Unix.openfile
old_name
[ Unix.O_RDONLY; Unix.O_NONBLOCK ]
0 in
let outfd = Unix.openfile
new_name
[ Unix.O_WRONLY; Unix.O_NONBLOCK; Unix.O_CREAT; Unix.O_TRUNC ]
0o666 in
Unix.clear_nonblock infd;
Unix.clear_nonblock outfd;
let size = 1024 in
let state =
{ copy_ues = ues;
copy_group = g;
copy_infd = infd;
copy_outfd = outfd;
copy_size = size;
copy_inbuf = String.create size;
copy_outbuf = String.create size;
copy_outlen = 0;
copy_eof = false;
copy_have_inres = false;
copy_have_outres = false;
copy_cleared = false;
} in
update_resources state ues;
add_handler ues g (handle state);
;;
let ues = create_unix_event_system() in
copy_file ues "a.old" "a.new";
run ues
;;