(* $Id: shell_fs.ml 1661 2011-08-28 22:45:55Z gerd $ *)
open Printf
type command_context =
{ sfs_command : string;
sfs_stdin : Shell.producer;
sfs_stdout : Shell.consumer;
sfs_stderr : Shell.consumer;
mutable sfs_status : Unix.process_status option;
}
class type command_interpreter =
object
method exec : command_context -> unit
method interrupt : unit -> unit
method run : unit -> unit
end
class type shell_stream_fs =
object
inherit Netfs.stream_fs
method last_stderr : string
end
exception Interrupt
let ws_re = Netstring_str.regexp "[ \t\r\n]+"
let cmd_interpreter mk_command =
let esys = Unixqueue.create_unix_event_system() in
let g = Unixqueue.new_group esys in
let cur_eng = ref None in
object(self)
method exec cctx =
if !cur_eng <> None then
failwith "Shell_fs.local_interpreter.exec: already running";
let e =
new Shell_uq.call_engine
~ignore_error_code:true
~stdin:cctx.sfs_stdin
~stdout:cctx.sfs_stdout
~stderr:cctx.sfs_stderr
(mk_command cctx)
esys in
Uq_engines.when_state
~is_done:(fun _ ->
cur_eng := None;
match List.rev(Shell_sys.processes e#job_instance) with
| p :: _ -> cctx.sfs_status <- Some (Shell_sys.status p)
| [] -> assert false
)
~is_error:(fun err ->
cur_eng := None;
match err with
| Shell.Subprocess_error ((_ :: _) as l) ->
let (_,st) = List.hd (List.rev l) in
cctx.sfs_status <- Some st
| _ ->
Unixqueue.once esys g 0.0 (fun () -> raise err)
)
e;
cur_eng := Some e
method interrupt() =
Unixqueue.once esys g 0.0 (fun () -> raise Interrupt)
method run() =
try
Unixqueue.run esys
with Interrupt ->
()
end
let local_interpreter () =
cmd_interpreter
(fun cctx ->
[ Shell.cmd
"/bin/sh"
[ "-c"; cctx.sfs_command ]
]
)
let ssh_interpreter ?(options=[ "-o"; "BatchMode yes"]) ?user ~host () =
let uh =
match user with
| None -> host
| Some u -> u ^ "@" ^ host in
cmd_interpreter
(fun cctx ->
[ Shell.cmd
"ssh"
(options @ [ uh; cctx.sfs_command ])
]
)
let output_stream_adapter ~ci ~close_in ~skip =
let page_size = Netsys_mem.pagesize in
let stdout_buf = Netpagebuffer.create page_size in
let stdout_eof = ref false in
let stdout_drop = ref false in
let to_skip = ref skip in
let consumer fd =
try
let n =
Netpagebuffer.add_inplace stdout_buf (Netsys_mem.mem_read fd) in
if !stdout_drop then
Netpagebuffer.clear stdout_buf;
if !to_skip > 0L then (
let q =
min !to_skip (Int64.of_int (Netpagebuffer.length stdout_buf)) in
Netpagebuffer.delete_hd stdout_buf (Int64.to_int q);
to_skip := Int64.sub !to_skip q;
);
if n = 0 then (
stdout_eof := true;
Unix.close fd;
);
if Netpagebuffer.length stdout_buf > 16 * page_size then
ci#interrupt();
not !stdout_eof
with Unix.Unix_error(Unix.EINTR,_,_) -> true
in
let ch =
( object
method input s pos len =
if Netpagebuffer.length stdout_buf = 0 then (
if !stdout_eof then raise End_of_file;
ci#run();
if Netpagebuffer.length stdout_buf = 0 && !stdout_eof then
raise End_of_file;
);
let n = min len (Netpagebuffer.length stdout_buf) in
Netpagebuffer.blit_to_string stdout_buf 0 s pos n;
Netpagebuffer.delete_hd stdout_buf n;
n
method close_in() =
stdout_drop := true;
Netpagebuffer.clear stdout_buf;
ci#run();
close_in()
end
) in
let ch' =
Netchannels.lift_in ~buffered:true (`Rec ch) in
(Shell.to_function ~consumer (), ch')
let input_stream_adapter ~ci ~close_out =
let page_size = Netsys_mem.pagesize in
let stdin_buf = Netpagebuffer.create page_size in
let stdin_eof = ref false in
let producer fd =
try
let (m,pos,len) = Netpagebuffer.page_for_consumption stdin_buf in
let n = Netsys_mem.mem_write fd m pos len in
Netpagebuffer.delete_hd stdin_buf n;
if Netpagebuffer.length stdin_buf = 0 then (
if !stdin_eof then
Unix.close fd
else
ci#interrupt()
);
not !stdin_eof
with Unix.Unix_error(Unix.EINTR,_,_) -> true
in
let ch =
( object
method output s pos len =
Netpagebuffer.add_sub_string stdin_buf s pos len;
if Netpagebuffer.length stdin_buf > 16 * page_size then
ci#run();
len
method flush() =
ci#run()
method close_out() =
stdin_eof := true;
ci#run();
close_out()
end
) in
let ch' =
Netchannels.lift_out ~buffered:false (`Rec ch) in
(Shell.from_function ~producer (), ch')
let execute ci cctx = ci # exec cctx
let wait ci = ci # run()
let slash_re = Netstring_str.regexp "/+"
let link_re = Netstring_str.regexp ".*? -> \\(.*\\)$"
exception Not_absolute
class shell_fs ?encoding ?(root="/") ?(dd_has_excl=false)
?tmp_directory ?tmp_prefix
(ci : command_interpreter) : shell_stream_fs =
let () =
match encoding with
| None -> ()
| Some e ->
if not (Netconversion.is_ascii_compatible e) then
failwith
"Shell_fs.shell_fs: the encoding is not ASCII-compatible" in
let check_component path c =
let iter f s =
match encoding with
| None ->
String.iter (fun c -> f (Char.code c)) s
| Some e ->
Netconversion.ustring_iter e f s in
try
let first = ref true in
iter
(fun code ->
if code = 0 || code = 47 || (!first && code = 45) then
raise (Unix.Unix_error(Unix.EINVAL,
"Shell_fs: invalid char in path",
path));
first := false
)
c
with Netconversion.Malformed_code ->
raise (Unix.Unix_error(Unix.EINVAL,
"Shell_fs: path does not comply to charset encoding",
path)) in
let check_and_norm_path p =
try
let l = Netstring_str.split_delim slash_re p in
( match l with
| [] -> raise (Unix.Unix_error(Unix.EINVAL,
"Shell_fs: empty path",
p))
| "" :: first :: rest ->
()
| first :: rest ->
raise Not_absolute
);
List.iter (check_component p) l;
let np = String.concat "/" l in
np
with
| Not_absolute ->
raise (Unix.Unix_error(Unix.EINVAL,
"Shell_fs: path not absolute",
p))
in
let get_sh_path p =
let np = check_and_norm_path p in
match root with
| "" ->
(* erase the leading / *)
let s = String.sub np 1 (String.length np-1) in
if s = "" then "." else s
| r ->
r ^ np
in
let stderr_buf = Buffer.create 80 in
let bs = 65536 in
let simple_exec cmd filename =
(* Get stdout as string *)
(* prerr_endline ("cmd: " ^ cmd); *)
let buf = Buffer.create 1000 in
let cctx =
{ sfs_command = cmd;
sfs_stdin = Shell.from_dev_null;
sfs_stdout = Shell.to_buffer buf;
sfs_stderr = Shell.to_buffer stderr_buf;
sfs_status = None
} in
ci#exec cctx;
ci#run();
match cctx.sfs_status with
| None ->
failwith "Shell_fs.simple_exec: no command status"
| Some (Unix.WEXITED n) ->
(n, Buffer.contents buf)
| Some _ ->
raise(Unix.Unix_error(Unix.EPERM,
"Shell_fs.simple_exec",
filename))
in
object(self)
method path_encoding = encoding
method path_exclusions = [0,0; 47,47]
method nominal_dot_dot = false
method last_stderr = Buffer.contents stderr_buf
method read flags filename =
Buffer.clear stderr_buf;
let fn = get_sh_path filename in
let skip_d =
try
List.find
(fun flag ->
match flag with
| `Skip _ -> true
| _ -> false
)
flags
with Not_found -> `Skip 0L in
let skip = match skip_d with `Skip n -> n | _ -> assert false in
let qfn = Filename.quote fn in
let cmd =
sprintf "if test -e %s; then dd if=%s bs=%d skip=%Ld || exit 2; \
else exit 1; fi"
qfn qfn bs (Int64.div skip (Int64.of_int bs)) in
let cctx =
ref
{ sfs_command = cmd;
sfs_stdin = Shell.from_dev_null;
sfs_stdout = Shell.to_dev_null; (* updated later *)
sfs_stderr = Shell.to_buffer stderr_buf;
sfs_status = None
} in
let (stdout_consumer, ch) =
output_stream_adapter
~ci
~close_in:(fun () ->
match !cctx.sfs_status with
| None ->
failwith "Shell_fs.read: no command status"
| Some (Unix.WEXITED 0) ->
()
| Some (Unix.WEXITED 1) ->
raise(Unix.Unix_error(Unix.ENOENT,
"Shell_fs.read",
filename))
| Some _ ->
raise(Unix.Unix_error(Unix.EPERM,
"Shell_fs.read",
filename))
)
~skip:(Int64.rem skip (Int64.of_int bs)) in
cctx := { !cctx with sfs_stdout = stdout_consumer };
ci#exec !cctx;
ch
method read_file flags filename =
let (tmp_name, inch, outch) =
Netchannels.make_temporary_file
?tmp_directory ?tmp_prefix () in
close_in inch;
try
Netchannels.with_out_obj_channel
(new Netchannels.output_channel outch)
(fun obj_outch ->
Netchannels.with_in_obj_channel
(self # read [] filename)
(fun obj_inch ->
obj_outch # output_channel obj_inch
)
);
( object
method filename = tmp_name
method close() =
try Sys.remove tmp_name with _ -> ()
end
)
with
| error ->
( try Sys.remove tmp_name with _ -> ());
raise error
method write flags filename =
Buffer.clear stderr_buf;
let fn = get_sh_path filename in
let create_flag = List.mem `Create flags in
let trunc_flag = List.mem `Truncate flags in
let excl_flag = List.mem `Exclusive flags in
if excl_flag && not dd_has_excl then
raise(Unix.Unix_error(Unix.EINVAL,
"Shell_fs.write: no support for exclusive create",
filename));
let notrunc_opt =
if trunc_flag then "" else "conv=notrunc" in
let qfn = Filename.quote fn in
let cmd =
if create_flag then
sprintf "dd of=%s bs=%d %s" qfn bs notrunc_opt
else
sprintf "if test -e %s; then dd of=%s bs=%d %s || exit 2; \
else exit 1; fi"
qfn qfn bs notrunc_opt in
let cctx =
ref
{ sfs_command = cmd;
sfs_stdin = Shell.from_dev_null; (* updated later *)
sfs_stdout = Shell.to_dev_null;
sfs_stderr = Shell.to_buffer stderr_buf;
sfs_status = None
} in
let stdin_producer, ch =
input_stream_adapter
~ci
~close_out:(fun () ->
match !cctx.sfs_status with
| None ->
failwith "Shell_fs.write: no command status"
| Some (Unix.WEXITED 0) ->
()
| Some (Unix.WEXITED 1) ->
raise(Unix.Unix_error(Unix.ENOENT,
"Shell_fs.write",
filename))
| Some _ ->
raise(Unix.Unix_error(Unix.EPERM,
"Shell_fs.write",
filename))
) in
cctx := { !cctx with sfs_stdin = stdin_producer };
ci#exec !cctx;
ch
method write_file flags filename local =
let flags' =
List.map
(function
| #Netfs.write_common as x -> (x :> Netfs.write_flag)
| _ -> `Dummy
)
flags in
Netchannels.with_in_obj_channel
(new Netchannels.input_channel (open_in_bin local#filename))
(fun obj_inch ->
Netchannels.with_out_obj_channel
(self # write flags' filename)
(fun obj_outch ->
obj_outch # output_channel obj_inch
)
)
method size _ filename =
Buffer.clear stderr_buf;
let fn = get_sh_path filename in
let qfn = Filename.quote fn in
let cmd =
sprintf "if test -e %s; then ls -ndL %s || exit 2; else exit 1"
qfn qfn in
let (n, stdout) = simple_exec cmd filename in
match n with
| 0 ->
let ch = new Netchannels.input_string stdout in
let line = try ch#input_line() with End_of_file -> "" in
let fields = Netstring_str.split ws_re line in
( match fields with
| _ :: _ :: _ :: _ :: size_str :: _ ->
Int64.of_string size_str
| _ ->
failwith "Shell_fs.size: unexpected output format of 'ls'"
)
| 1 ->
raise(Unix.Unix_error(Unix.ENOENT, "Shell_fs.size", filename))
| _ ->
raise(Unix.Unix_error(Unix.EPERM, "Shell_fs.size", filename))
method test flags filename t =
List.hd (self#test_list flags filename [t])
method test_list flags filename tl =
Buffer.clear stderr_buf;
let fn = get_sh_path filename in
let qfn = Filename.quote fn in
let link_flag = List.mem `Link flags in
let cmd =
sprintf "if test -e %s; then echo true; else echo false; fi; \
if test -d %s; then echo true; else echo false; fi; \
if test -f %s; then echo true; else echo false; fi; \
if test -L %s; then echo true; else echo false; fi; \
if test -s %s; then echo true; else echo false; fi; \
if test -r %s; then echo true; else echo false; fi; \
if test -w %s; then echo true; else echo false; fi; \
if test -x %s; then echo true; else echo false; fi "
qfn qfn qfn qfn qfn qfn qfn qfn in
let (_, stdout) = simple_exec cmd filename in
let ch = new Netchannels.input_string stdout in
( try
let test_e = bool_of_string (ch#input_line ()) in
let test_d = bool_of_string (ch#input_line ()) in
let test_f = bool_of_string (ch#input_line ()) in
let test_L = bool_of_string (ch#input_line ()) in
let test_s = bool_of_string (ch#input_line ()) in
let test_r = bool_of_string (ch#input_line ()) in
let test_w = bool_of_string (ch#input_line ()) in
let test_x = bool_of_string (ch#input_line ()) in
List.map
(function
| `N -> test_e || test_L
| `E -> test_e || (link_flag && test_L)
| `D -> test_d && not (link_flag && test_L)
| `F -> test_f && not (link_flag && test_L)
| `H -> test_L
| `R -> test_r || (link_flag && test_L)
| `W -> test_w || (link_flag && test_L)
| `X -> test_x || (link_flag && test_L)
| `S -> test_s && not (link_flag && test_L)
)
tl
with
| _ ->
failwith "Shell_fs.test_list: unexpected script output"
)
method remove flags filename =
Buffer.clear stderr_buf;
let fn = get_sh_path filename in
let qfn = Filename.quote fn in
let rec_flag = List.mem `Recursive flags in
let cmd =
sprintf "if { test -e %s || test -L %s; }; then rm -f %s %s || exit 2; \
else exit 1; fi"
qfn qfn (if rec_flag then "-r" else "") qfn in
let (n, stdout) = simple_exec cmd filename in
match n with
| 0 ->
()
| 1 ->
raise(Unix.Unix_error(Unix.ENOENT, "Shell_fs.remove", filename))
| _ ->
raise(Unix.Unix_error(Unix.EPERM, "Shell_fs.remove", filename))
method rename flags old_name new_name =
Buffer.clear stderr_buf;
let old_fn = get_sh_path old_name in
let old_qfn = Filename.quote old_fn in
let new_fn = get_sh_path new_name in
let new_qfn = Filename.quote new_fn in
(* mv allows it to move into directories. This is not intended by this
operation, so we have to check for it
*)
let cmd =
sprintf "if { test -e %s || test -L %s; }; then \
if test -d %s; then \
exit 2; \
else \
mv -f %s %s; \
fi; \
else \
exit 1; \
fi"
old_qfn old_qfn
new_qfn
old_qfn new_qfn in
let (n, stdout) = simple_exec cmd old_name in
match n with
| 0 ->
()
| 1 ->
raise(Unix.Unix_error(Unix.ENOENT, "Shell_fs.rename", old_name))
| 2 ->
raise(Unix.Unix_error(Unix.EEXIST, "Shell_fs.rename", new_name))
| _ ->
raise(Unix.Unix_error(Unix.EPERM, "Shell_fs.rename", old_name))
method symlink flags old_name new_name =
Buffer.clear stderr_buf;
let old_fn = get_sh_path old_name in
let old_qfn = Filename.quote old_fn in
let new_fn = get_sh_path new_name in
let new_qfn = Filename.quote new_fn in
let cmd =
sprintf "if { test -e %s || test -L %s; }; then exit 1; \
else ln -s %s %s || exit 2; fi"
new_qfn new_qfn
old_qfn new_qfn in
let (n, stdout) = simple_exec cmd new_name in
match n with
| 0 ->
()
| 1 ->
raise(Unix.Unix_error(Unix.EEXIST, "Shell_fs.symlink", new_name))
| _ ->
raise(Unix.Unix_error(Unix.EPERM, "Shell_fs.symlink", new_name))
method readdir flags filename =
Buffer.clear stderr_buf;
let fn = get_sh_path filename in
let qfn = Filename.quote fn in
let cmd =
sprintf "if test -d %s; then ls -a1 %s/. || exit 2; else exit 1; fi"
qfn qfn in
let (n, stdout) = simple_exec cmd filename in
match n with
| 0 ->
let ch = new Netchannels.input_string stdout in
Netchannels.lines_of_in_obj_channel ch
| 1 ->
raise(Unix.Unix_error(Unix.ENOENT, "Shell_fs.readdir", filename))
| _ ->
raise(Unix.Unix_error(Unix.EPERM, "Shell_fs.readdir", filename))
method mkdir flags filename =
Buffer.clear stderr_buf;
let fn = get_sh_path filename in
let qfn = Filename.quote fn in
let path_flag = List.mem `Path flags in
let nonexcl_flag = List.mem `Nonexcl flags in
let cmd =
if path_flag then
sprintf "mkdir -p %s || exit 3" qfn
else
sprintf "if { test -e %s || test -L %s; }; then \
if test -d %s; then exit 1; else exit 2; fi; \
else mkdir %s || exit 3; fi"
qfn qfn
qfn
qfn in
let (n, stdout) = simple_exec cmd filename in
match n with
| 0 ->
()
| 1 ->
if not nonexcl_flag then
raise(Unix.Unix_error(Unix.EEXIST, "Shell_fs.mkdir", filename))
| 2 ->
raise(Unix.Unix_error(Unix.EEXIST, "Shell_fs.mkdir", filename))
| _ ->
raise(Unix.Unix_error(Unix.EPERM, "Shell_fs.mkdir", filename))
method rmdir flags filename =
Buffer.clear stderr_buf;
let fn = get_sh_path filename in
let qfn = Filename.quote fn in
let cmd =
sprintf
"if { test -d %s && test ! -L %s; }; then \
rmdir %s || exit 3; \
else \
if test -e %s; then exit 1; else exit 2; fi; \
fi"
qfn qfn qfn qfn in
let (n, stdout) = simple_exec cmd filename in
match n with
| 0 ->
()
| 1 ->
raise(Unix.Unix_error(Unix.ENOTDIR, "Shell_fs.rmdir", filename))
| 2 ->
raise(Unix.Unix_error(Unix.ENOENT, "Shell_fs.rmdir", filename))
| _ ->
raise(Unix.Unix_error(Unix.EPERM, "Shell_fs.rmdir", filename))
method copy flags old_name new_name =
Buffer.clear stderr_buf;
let old_fn = get_sh_path old_name in
let old_qfn = Filename.quote old_fn in
let new_fn = get_sh_path new_name in
let new_qfn = Filename.quote new_fn in
(* cp allows it to copy into directories. This is not intended by this
operation, so we have to check for it
*)
let cmd =
sprintf "if { test -e %s || test -L %s; }; then \
if test -d %s; then \
exit 2; \
else \
cp -p %s %s; \
fi; \
else \
exit 1; \
fi"
old_qfn old_qfn
new_qfn
old_qfn new_qfn in
let (n, stdout) = simple_exec cmd old_name in
match n with
| 0 ->
()
| 1 ->
raise(Unix.Unix_error(Unix.ENOENT, "Shell_fs.copy", old_name))
| 2 ->
raise(Unix.Unix_error(Unix.EEXIST, "Shell_fs.copy", new_name))
| _ ->
raise(Unix.Unix_error(Unix.EPERM, "Shell_fs.copy", old_name))
method readlink _ filename =
Buffer.clear stderr_buf;
let fn = get_sh_path filename in
let qfn = Filename.quote fn in
let cmd =
sprintf "if test -e %s; then ls -nd %s || exit 2; else exit 1; fi"
qfn qfn in
let (n, stdout) = simple_exec cmd filename in
match n with
| 0 ->
let ch = new Netchannels.input_string stdout in
let line = try ch#input_line() with End_of_file -> "" in
let fields = Netstring_str.split ws_re line in
( match fields with
| perms :: _ ->
if perms <> "" && perms.[0] = 'l' then
(* look for leftmost occurrence of " -> <name>" *)
match Netstring_str.string_match link_re line 0 with
| None ->
raise(Unix.Unix_error(Unix.EINVAL,
"Shell_fs.readlink", filename))
| Some m ->
Netstring_str.matched_group m 1 line
else
raise(Unix.Unix_error(Unix.EINVAL,
"Shell_fs.readlink", filename))
| _ ->
failwith "Shell_fs.size: unexpected output format of 'ls'"
)
| 1 ->
raise(Unix.Unix_error(Unix.ENOENT, "Shell_fs.readlink", filename))
| _ ->
raise(Unix.Unix_error(Unix.EPERM, "Shell_fs.readlink", filename))
method cancel() = ()
end
let shell_fs = new shell_fs