(* $Id: ftp_client.ml 1275 2009-10-11 00:48:52Z gerd $ *)
open Telnet_client
open Ftp_data_endpoint
open Printf
module Debug = struct
let enable = ref false
end
let dlog = Netlog.Debug.mk_dlog "Ftp_client" Debug.enable
let dlogr = Netlog.Debug.mk_dlogr "Ftp_client" Debug.enable
let () =
Netlog.Debug.register_module "Ftp_client" Debug.enable
exception FTP_error of exn
exception FTP_protocol_violation of string
let proto_viol s =
raise(FTP_protocol_violation s)
let () =
Netexn.register_printer
(FTP_error Not_found)
(fun e ->
match e with
| FTP_error e' ->
"Ftp_client.FTP_error(" ^ Netexn.to_string e' ^ ")"
| _ -> assert false
)
type cmd_state =
[ `Init
| `Success
| `Proto_error
| `Temp_failure
| `Perm_failure
| `Rename_seq
| `Restart_seq
| `User_pass_seq
| `User_acct_seq
| `Pass_acct_seq
| `Preliminary
]
type port =
[ `Active of string * int * Unix.file_descr
| `Passive of string * int
| `Unspecified
]
type form_code =
[ `Non_print | `Telnet | `ASA ]
type representation =
[ `ASCII of form_code option
| `EBCDIC of form_code option
| `Image
]
type structure =
[ `File_structure
| `Record_structure
]
type transmission_mode =
Ftp_data_endpoint.transmission_mode
type ftp_state =
{ cmd_state : cmd_state;
ftp_connected : bool;
ftp_data_conn : bool;
ftp_user : string option;
ftp_password : string option;
ftp_account : string option;
ftp_logged_in : bool;
ftp_port : port;
ftp_repr : representation;
ftp_structure : structure;
ftp_trans : transmission_mode;
ftp_dir : string list;
ftp_features : (string * string option) list option;
ftp_options : (string * string option) list;
}
type cmd =
[ `Connect
| `Dummy
| `USER of string
| `PASS of string
| `ACCT of string
| `CWD of string
| `CDUP
| `SMNT of string
| `QUIT
| `REIN
| `PORT
| `PASV
| `TYPE of representation
| `STRU of structure
| `MODE of transmission_mode
| `RETR of string * (ftp_state -> Ftp_data_endpoint.local_receiver)
| `STOR of string * (ftp_state -> Ftp_data_endpoint.local_sender)
| `STOU of (unit -> Ftp_data_endpoint.local_sender)
| `APPE of string * (ftp_state -> Ftp_data_endpoint.local_sender)
| `ALLO of int * int option
| `REST of string
| `RNFR of string
| `RNTO of string
| `DELE of string
| `RMD of string
| `MKD of string
| `PWD
| `LIST of string option * (ftp_state -> Ftp_data_endpoint.local_receiver)
| `NLST of string option * (ftp_state -> Ftp_data_endpoint.local_receiver)
| `SITE of string
| `SYST
| `STAT of string option
| `HELP of string option
| `NOOP
| `FEAT
| `OPTS of string * string option
| `MDTM of string
]
let string_of_cmd =
function
| `Connect -> ""
| `Dummy -> ""
| `USER s -> "USER " ^ s ^ "\r\n"
| `PASS s -> "PASS " ^ s ^ "\r\n"
| `ACCT s -> "ACCT " ^ s ^ "\r\n"
| `CWD s -> "CWD " ^ s ^ "\r\n"
| `CDUP -> "CDUP\r\n"
| `SMNT s -> "SMNT " ^ s ^ "\r\n"
| `QUIT -> "QUIT\r\n"
| `REIN -> "REIN\r\n"
| `PORT -> assert false (* not done here *)
| `PASV -> "PASV\r\n"
| `TYPE t -> "TYPE " ^
( match t with
| `ASCII None -> "A"
| `ASCII (Some `Non_print) -> "A N"
| `ASCII (Some `Telnet) -> "A T"
| `ASCII (Some `ASA) -> "A C"
| `EBCDIC None -> "E"
| `EBCDIC (Some `Non_print) -> "E N"
| `EBCDIC (Some `Telnet) -> "E T"
| `EBCDIC (Some `ASA) -> "E C"
| `Image -> "I"
) ^ "\r\n"
| `STRU `File_structure -> "STRU F\r\n"
| `STRU `Record_structure -> "STRU R\r\n"
| `MODE `Stream_mode -> "MODE S\r\n"
| `MODE `Block_mode -> "MODE B\r\n"
| `RETR (s,_) -> "RETR " ^ s ^ "\r\n"
| `STOR (s,_) -> "STOR " ^ s ^ "\r\n"
| `STOU _ -> "STOU\r\n"
| `APPE (s,_) -> "APPE " ^ s ^ "\r\n"
| `ALLO(n,r) -> "ALLO " ^ string_of_int n ^
(match r with
| None -> ""
| Some m -> " R " ^ string_of_int m) ^ "\r\n"
| `REST s -> "REST " ^ s ^ "\r\n"
| `RNFR s -> "RNFR " ^ s ^ "\r\n"
| `RNTO s -> "RNTO " ^ s ^ "\r\n"
| `DELE s -> "DELE " ^ s ^ "\r\n"
| `RMD s -> "RMD " ^ s ^ "\r\n"
| `MKD s -> "MKD " ^ s ^ "\r\n"
| `PWD -> "PWD\r\n"
| `LIST(None,_) -> "LIST\r\n"
| `LIST(Some s,_) -> "LIST " ^ s ^ "\r\n"
| `NLST(None,_) -> "NLST\r\n"
| `NLST(Some s,_) -> "NLST " ^ s ^ "\r\n"
| `SITE s -> "SITE " ^ s ^ "\r\n"
| `SYST -> "SYST\r\n"
| `STAT None -> "STAT\r\n"
| `STAT(Some s) -> "STAT " ^ s ^ "\r\n"
| `HELP None -> "HELP\r\n"
| `HELP(Some s) -> "HELP " ^ s ^ "\r\n"
| `NOOP -> "NOOP\r\n"
| `FEAT -> "FEAT\r\n"
| `OPTS (cmd,None) -> "OPTS " ^ cmd ^ "\r\n"
| `OPTS (cmd,Some param) -> "OPTS " ^ cmd ^ " " ^ param ^ "\r\n"
| `MDTM s -> "MDTM " ^ s ^ "\r\n"
let port_re = Netstring_pcre.regexp ".*[^0-9](\\d+),(\\d+),(\\d+),(\\d+),(\\d+),(\\d+)"
let extract_port s =
match Netstring_pcre.string_match port_re s 0 with
| None ->
proto_viol "Cannot parse specification of passive port"
| Some m ->
let h1 = Netstring_pcre.matched_group m 1 s in
let h2 = Netstring_pcre.matched_group m 2 s in
let h3 = Netstring_pcre.matched_group m 3 s in
let h4 = Netstring_pcre.matched_group m 4 s in
let p1 = Netstring_pcre.matched_group m 5 s in
let p2 = Netstring_pcre.matched_group m 6 s in
let p = int_of_string p1 * 256 + int_of_string p2 in
(h1 ^ "." ^ h2 ^ "." ^ h3 ^ "." ^ h4, p)
let addr_re = Netstring_pcre.regexp "^(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+)$"
let format_port (addr,p) =
match Netstring_pcre.string_match addr_re addr 0 with
| None ->
failwith "Bad IP address"
| Some m ->
let h1 = Netstring_pcre.matched_group m 1 addr in
let h2 = Netstring_pcre.matched_group m 2 addr in
let h3 = Netstring_pcre.matched_group m 3 addr in
let h4 = Netstring_pcre.matched_group m 4 addr in
let p1 = string_of_int(p lsr 8) in
let p2 = string_of_int(p land 0xff) in
h1 ^ "," ^ h2 ^ "," ^ h3 ^ "," ^ h4 ^ "," ^ p1 ^ "," ^ p2
let set_ftp_port state value =
( match state.ftp_port with
| `Active(_,_,fd) ->
Netlog.Debug.release_fd fd;
Unix.close fd
| _ ->
()
);
{ state with ftp_port = value }
let feature_line_re = Netstring_pcre.regexp "^ ([\x21-\xff]+)( .*)?$"
let line_re = Netstring_pcre.regexp "\r?\n"
let parse_features s =
let lines = Netstring_pcre.split line_re s in
List.flatten
(List.map
(fun line ->
match Netstring_pcre.string_match feature_line_re line 0 with
| None -> []
| Some m ->
let label = Netstring_pcre.matched_group m 1 line in
let param =
try Some(Netstring_pcre.matched_group m 2 line)
with Not_found -> None in
[ label, param ]
)
lines)
type reply = int * string
(* Reply code plus text *)
let string_of_state =
function
| `Init -> "Init"
| `Success -> "Success"
| `Proto_error -> "Proto_error"
| `Temp_failure -> "Temp_failure"
| `Perm_failure -> "Perm_failure"
| `Rename_seq -> "Rename_seq"
| `Restart_seq -> "Restart_seq"
| `User_pass_seq -> "User_pass_seq"
| `User_acct_seq -> "User_acct_seq"
| `Pass_acct_seq -> "Pass_acct_seq"
| `Preliminary -> "Preliminary"
let init_state s =
let _s_name =
match Unix.getsockname s with
| Unix.ADDR_INET(addr,_) ->
Unix.string_of_inet_addr addr
| _ ->
failwith "Not an internet socket"
in
{ cmd_state = `Init;
ftp_connected = true;
ftp_data_conn = false;
ftp_user = None;
ftp_password = None;
ftp_account = None;
ftp_logged_in = false;
ftp_port = `Unspecified;
ftp_repr = `ASCII None;
ftp_structure = `File_structure;
ftp_trans = `Stream_mode;
ftp_dir = [];
ftp_features = None;
ftp_options = [];
}
let start_reply_re = Netstring_pcre.regexp "^[0-9][0-9][0-9]-"
let end_reply_re = Netstring_pcre.regexp "^[0-9][0-9][0-9] "
let is_active state =
match state.ftp_port with
| `Active _ -> true
| _ -> false
let is_passive state =
match state.ftp_port with
| `Passive _ -> true
| _ -> false
class work_engine e =
object
method is_working =
match e # state with
| `Working _ -> true
| _ -> false
method abort() =
match e # state with
| `Working _ -> e # abort()
| _ -> ()
end
exception Dummy
class ftp_client_pi
?(event_system = Unixqueue.create_unix_event_system())
?(onempty = fun _ -> ())
?(onclose = fun () -> ())
?(onerrorstate = fun _ -> ())
?(onusererror = fun _ -> ())
sock =
let ctrl_input_buffer = Netbuffer.create 500 in
let ctrl_input, ctrl_input_shutdown =
Netchannels.create_input_netbuffer ctrl_input_buffer in
let peer_str =
try Netsys.string_of_sockaddr (Netsys.getpeername sock)
with _ -> "(noaddr)" in
object(self)
val queue = Queue.create()
val mutable state = `Working 0
val mutable ftp_state = init_state sock
val mutable data_engine = None
val mutable work_engines = ( [] : work_engine list)
val ctrl = new telnet_session
val reply_text = Buffer.create 200
val mutable reply_code = (-1)
val mutable reply_callback = (fun _ _ -> ())
val mutable interaction_state =
( `Waiting `Connect
: [ `Ready
| `Connecting_pasv of cmd * Uq_engines.connect_status Uq_engines.engine
| `Listening_actv of cmd * Unixqueue.event Uq_engines.engine
| `Transfer of cmd
| `Transfer_replied of cmd * int * string
| `Waiting of cmd
] )
(* `Ready: another command can be sent now
* `Connecting_pasv: In passive mode, we are connecting to the
* remote port (done by the argument engine). This state is
* skipped when we are still connected.
* `Listening_actv: In active mode, we are listening for the
* connect (done by the argument engine). This state is
* skipped when we are still connected.
* `Transfer: a data transfer is in progress
* `Transfer_replied: while the rest of the transfer is not yet
* done, the server already sent a reply
* `Waiting: it is waited for the reply
*)
initializer (
ctrl # set_connection (Telnet_socket sock);
ctrl # set_event_system event_system;
ctrl # set_callback self#receive_ctrl_reply;
ctrl # set_exception_handler self#catch_telnet_exception;
ctrl # attach();
)
method ftp_state = ftp_state
method is_empty = Queue.is_empty queue
method state = (state : unit Uq_engines.engine_state)
method event_system = event_system
method abort() =
ctrl # reset();
self # close_connection();
self # set_state `Aborted
method private catch_telnet_exception e =
ctrl # reset();
self # close_connection();
self # set_state (`Error(Telnet_protocol e))
method private set_state s =
state <- s;
match s with
| `Error e -> onerrorstate e
| _ -> ()
method private set_error e =
ctrl # reset();
self # close_connection();
self # set_state (`Error e)
method private protect f =
(* Run [f ()] and catch exceptions *)
try
f()
with
err ->
ctrl # reset();
self # close_connection();
self # set_state (`Error err)
method private clean_work_engines() =
work_engines <- List.filter (fun e -> e # is_working) work_engines
method private receive_ctrl_reply got_synch =
while not (Queue.is_empty ctrl#input_queue) do
let tc = Queue.take ctrl#input_queue in
match tc with
| Telnet_data data ->
Netbuffer.add_string ctrl_input_buffer data;
self # protect (self # parse_ctrl_reply)
| Telnet_nop ->
()
| Telnet_will _
| Telnet_wont _
| Telnet_do _
| Telnet_dont _ ->
ctrl # process_option_command tc
| Telnet_sb _
| Telnet_se ->
() (* Ignore subnegotiation *)
| Telnet_eof ->
ctrl_input_shutdown();
self # protect (self # parse_ctrl_reply)
| Telnet_timeout ->
() (* TODO *)
| _ ->
(* Unexpected telnet command *)
self # protect (fun () ->
proto_viol "Unexpected command on Telnet level")
done
method private parse_ctrl_reply() =
try
while true do
let line = ctrl_input # input_line() in (* or exception! *)
dlogr (fun () ->
sprintf "ctrl received: %s" line);
if Netstring_pcre.string_match start_reply_re line 0 <> None then (
let code = int_of_string (String.sub line 0 3) in
if reply_code <> (-1) && reply_code <> code then
proto_viol "Parse error of control message";
reply_code <- code;
Buffer.add_string reply_text line;
Buffer.add_string reply_text "\n";
)
else
if Netstring_pcre.string_match end_reply_re line 0 <> None then (
let code = int_of_string (String.sub line 0 3) in
if reply_code <> (-1) && reply_code <> code then
proto_viol "Parse error of control message";
Buffer.add_string reply_text line;
Buffer.add_string reply_text "\n";
let text = Buffer.contents reply_text in
reply_code <- (-1);
Buffer.clear reply_text;
self # interpret_ctrl_reply code text;
)
else (
if reply_code = (-1) then
proto_viol "Parse error of control message";
Buffer.add_string reply_text line;
Buffer.add_string reply_text "\n";
)
done
with
| Netchannels.Buffer_underrun ->
(* No complete line yet *)
()
| End_of_file ->
onclose();
self # set_state (`Done ())
method private interpret_ctrl_reply code text =
(* This method is called whenever a reply has been completely received.
* This may happen in a number of situations:
* - As greeting message
* - As regular response to a sent FTP command
* - When the data transfer is over. Note that the control response
* may be received before the end of the transfer is seen by the
* client.
* - Within the data transfer
* - At any other point in time, but this is regarded as protocol error
*)
let reply st cmd_state =
let st' = { st with cmd_state = cmd_state } in
ftp_state <- st';
dlogr (fun () ->
sprintf "state: %s" (string_of_state cmd_state));
try
reply_callback st' (code,text)
with
err ->
onusererror err
in
let ready() =
interaction_state <- `Ready in
( match interaction_state with
| `Ready ->
proto_viol "Spontaneous control message"
| `Waiting `Connect ->
(match code with
| 120 -> reply ftp_state `Preliminary
| 220 -> ready(); reply ftp_state `Success
| n when n >= 400 && n <= 499 ->
ready(); reply ftp_state `Temp_failure
| n when n >= 500 && n <= 599 ->
ready(); reply ftp_state `Perm_failure
| _ -> proto_viol "Unexpected control message"
)
| `Waiting `Dummy ->
ready(); reply ftp_state `Success
| `Waiting (`USER s) ->
( match code with
| 230 ->
ready();
reply { ftp_state with
ftp_user = Some s;
ftp_password = None;
ftp_account = None;
ftp_logged_in = true } `Success
| 530 ->
ready();
reply { ftp_state with
ftp_logged_in = false } `Perm_failure
| 331 ->
ready();
reply { ftp_state with
ftp_user = Some s;
ftp_password = None;
ftp_account = None;
ftp_logged_in = false } `User_pass_seq
| 332 ->
ready();
reply { ftp_state with
ftp_user = Some s;
ftp_password = None;
ftp_account = None;
ftp_logged_in = false } `User_acct_seq
| n when n >= 400 && n <= 499 ->
ready(); reply ftp_state `Temp_failure
| n when n >= 500 && n <= 599 ->
ready(); reply ftp_state `Perm_failure
| _ ->
proto_viol "Unexpected control message"
)
| `Waiting (`PASS s) ->
( match code with
| 202 | 230 ->
ready();
reply { ftp_state with
ftp_password = Some s;
ftp_account = None;
ftp_logged_in = true } `Success
| 530 ->
ready();
reply { ftp_state with
ftp_logged_in = false } `Perm_failure
| 332 ->
ready();
reply { ftp_state with
ftp_password = Some s;
ftp_account = None;
ftp_logged_in = false } `Pass_acct_seq
| n when n >= 400 && n <= 499 ->
ready(); reply ftp_state `Temp_failure
| n when n >= 500 && n <= 599 ->
ready(); reply ftp_state `Perm_failure
| _ ->
proto_viol "Unexpected control message"
)
| `Waiting (`ACCT s) ->
( match code with
| 202 | 230 ->
ready();
reply { ftp_state with
ftp_account = Some s;
ftp_logged_in = true } `Success
| 530 ->
ready();
reply { ftp_state with
ftp_logged_in = false } `Perm_failure
| n when n >= 400 && n <= 499 ->
ready(); reply ftp_state `Temp_failure
| n when n >= 500 && n <= 599 ->
ready(); reply ftp_state `Perm_failure
| _ ->
proto_viol "Unexpected control message"
)
| `Waiting (`CWD s) ->
( match code with
| 200 | 250 ->
ready();
let ftp_state' =
{ ftp_state with ftp_dir = s :: ftp_state.ftp_dir } in
reply ftp_state' `Success
| n when n >= 400 && n <= 499 ->
ready(); reply ftp_state `Temp_failure
| n when n >= 500 && n <= 599 ->
ready(); reply ftp_state `Perm_failure
| _ ->
proto_viol "Unexpected control message"
)
| `Waiting `CDUP ->
( match code with
| 200 | 250 ->
ready();
let ftp_state' =
match ftp_state.ftp_dir with
| [] -> ftp_state
| _ :: dir ->
{ ftp_state with ftp_dir = dir } in
reply ftp_state' `Success
| n when n >= 400 && n <= 499 ->
ready(); reply ftp_state `Temp_failure
| n when n >= 500 && n <= 599 ->
ready(); reply ftp_state `Perm_failure
| _ ->
proto_viol "Unexpected control message"
)
| `Waiting `REIN ->
( match code with
| 120 ->
reply ftp_state `Preliminary
| 220 ->
ready(); reply (init_state sock) `Success
(* CHECK: Close data connection? *)
| n when n >= 400 && n <= 499 ->
ready(); reply ftp_state `Temp_failure
| n when n >= 500 && n <= 599 ->
ready(); reply ftp_state `Perm_failure
| _ ->
proto_viol "Unexpected control message"
)
| `Waiting `PASV ->
( match code with
| 227 ->
let (addr,port) = extract_port text in
ready();
self # close_connection();
reply (set_ftp_port ftp_state (`Passive(addr,port))) `Success
| n when n >= 400 && n <= 499 ->
ready(); reply ftp_state `Temp_failure
| n when n >= 500 && n <= 599 ->
ready(); reply ftp_state `Perm_failure
| _ ->
proto_viol "Unexpected control message"
)
| `Waiting (`TYPE t) ->
( match code with
| n when n >= 200 && n <= 299 ->
ready();
reply { ftp_state with ftp_repr = t } `Success
| n when n >= 400 && n <= 499 ->
ready(); reply ftp_state `Temp_failure
| n when n >= 500 && n <= 599 ->
ready(); reply ftp_state `Perm_failure
| _ ->
proto_viol "Unexpected control message"
)
| `Waiting (`MODE m) ->
( match code with
| n when n >= 200 && n <= 299 ->
ready();
reply { ftp_state with ftp_trans = m } `Success
| n when n >= 400 && n <= 499 ->
ready(); reply ftp_state `Temp_failure
| n when n >= 500 && n <= 599 ->
ready(); reply ftp_state `Perm_failure
| _ ->
proto_viol "Unexpected control message"
)
| `Waiting (`STRU s) ->
( match code with
| n when n >= 200 && n <= 299 ->
ready();
reply { ftp_state with ftp_structure = s } `Success
| n when n >= 400 && n <= 499 ->
ready(); reply ftp_state `Temp_failure
| n when n >= 500 && n <= 599 ->
ready(); reply ftp_state `Perm_failure
| _ ->
proto_viol "Unexpected control message"
)
| `Waiting (`REST _) ->
( match code with
| 350 ->
ready();
reply ftp_state `Restart_seq
| n when n >= 400 && n <= 499 ->
ready(); reply ftp_state `Temp_failure
| n when n >= 500 && n <= 599 ->
ready(); reply ftp_state `Perm_failure
| _ ->
proto_viol "Unexpected control message"
)
| `Waiting (`RNFR _) ->
( match code with
| 350 ->
ready(); reply ftp_state `Rename_seq
| n when n >= 400 && n <= 499 ->
ready(); reply ftp_state `Temp_failure
| n when n >= 500 && n <= 599 ->
ready(); reply ftp_state `Perm_failure
| _ ->
proto_viol "Unexpected control message"
)
| `Waiting `FEAT ->
( match code with
| 211 ->
let l = parse_features text in
ready();
reply { ftp_state with ftp_features = Some l } `Success
| n when n >= 400 && n <= 499 ->
ready(); reply ftp_state `Temp_failure
| n when n >= 500 && n <= 599 ->
ready(); reply ftp_state `Perm_failure
| _ ->
proto_viol "Unexpected control message"
)
| `Waiting (`OPTS(cmd,param_opt)) ->
( match code with
| n when n >= 200 && n <= 299 ->
let l =
(cmd, param_opt) ::
(List.filter
(fun (cmd',_) ->
String.lowercase cmd <> String.lowercase cmd')
ftp_state.ftp_options) in
ready();
reply { ftp_state with ftp_options = l } `Success
| n when n >= 400 && n <= 499 ->
ready(); reply ftp_state `Temp_failure
| n when n >= 500 && n <= 599 ->
ready(); reply ftp_state `Perm_failure
| _ ->
proto_viol "Unexpected control message"
)
| `Waiting (`SMNT _)
| `Waiting `QUIT
| `Waiting `PORT
| `Waiting (`ALLO _)
| `Waiting (`RNTO _)
| `Waiting (`DELE _)
| `Waiting (`RMD _)
| `Waiting (`MKD _)
| `Waiting `PWD
| `Waiting `SYST
| `Waiting (`STAT _)
| `Waiting (`HELP _)
| `Waiting (`SITE _)
| `Waiting (`MDTM _)
(* See http://www.ietf.org/internet-drafts/draft-ietf-ftpext-mlst-16.txt *)
| `Waiting `NOOP ->
( match code with
| n when n >= 100 && n <= 199 ->
reply ftp_state `Preliminary
| n when n >= 200 && n <= 299 ->
ready(); reply ftp_state `Success
| n when n >= 400 && n <= 499 ->
ready(); reply ftp_state `Temp_failure
| n when n >= 500 && n <= 599 ->
ready(); reply ftp_state `Perm_failure
| _ ->
proto_viol "Unexpected control message"
)
(* `STOR |`STOU | `APPE left out for now *)
| `Connecting_pasv(_, conn_engine) ->
(* Somewhat unexpected reply! *)
( match code with
| 125 | 150 ->
reply ftp_state `Preliminary
| n when n >= 400 && n <= 499 ->
conn_engine # abort();
ready();
reply ftp_state `Temp_failure
| n when n >= 500 && n <= 599 ->
conn_engine # abort();
ready();
reply ftp_state `Perm_failure
| _ ->
conn_engine # abort();
proto_viol "Unexpected control message"
)
| `Listening_actv(_, acc_engine) ->
(* Somewhat unexpected reply! *)
( match code with
| 125 | 150 ->
reply ftp_state `Preliminary
| n when n >= 400 && n <= 499 ->
acc_engine # abort();
ready();
reply ftp_state `Temp_failure
| n when n >= 500 && n <= 599 ->
acc_engine # abort();
ready();
reply ftp_state `Perm_failure
| _ ->
acc_engine # abort();
proto_viol "Unexpected control message"
)
| `Transfer cmd ->
(* The transfer probably ends in the near future, just record
* the reply, and wait for the end of the transfer.
*)
( match code with
| 125 | 150 ->
reply ftp_state `Preliminary
| n when n >= 400 && n <= 499 ->
self # close_connection();
ready();
reply ftp_state `Temp_failure
| n when n >= 500 && n <= 599 ->
self # close_connection();
ready();
reply ftp_state `Perm_failure
| _ ->
interaction_state <- `Transfer_replied(cmd,code,text)
)
| `Transfer_replied cmd ->
(* Another reply! This is an error. *)
proto_viol "Unexpected control message"
| `Waiting(`RETR(_,f))
| `Waiting(`LIST(_,f))
| `Waiting(`NLST(_,f)) ->
(* This state is only possible when the transfer has already
* been completed.
*)
( match data_engine with
| None -> () (* strange *)
| Some e ->
if e # descr_state <> `Clean then self # close_connection()
);
( match code with
| 125 | 150 ->
reply ftp_state `Preliminary
| 226 ->
self # close_connection();
ready();
reply ftp_state `Success
| 250 ->
ready();
reply ftp_state `Success
| n when n >= 400 && n <= 499 ->
self # close_connection();
reply ftp_state `Temp_failure
| n when n >= 500 && n <= 599 ->
self # close_connection();
reply ftp_state `Perm_failure
| _ ->
proto_viol "Unexpected control message"
)
| _ -> assert false
);
self # send_command_when_ready()
method private send_command_when_ready() =
if interaction_state = `Ready then (
try
assert(reply_code = (-1));
let (cmd, onreply) = Queue.take queue in (* or Queue.Empty *)
interaction_state <- `Waiting cmd;
( match cmd with
| `RETR(_,f)
| `LIST(_,f)
| `NLST(_,f) ->
( match ftp_state.ftp_port with
| `Passive(_,_) ->
(* In passive mode, connect now: *)
self # setup_passive_receiver cmd f
| `Active(_,_,_) ->
(* In active mode, accept the connection now *)
self # setup_active_receiver cmd f
| `Unspecified ->
failwith "FTP client: Usage error, one must send `PORT or `PASV before the transfer"
)
| _ -> ()
);
let line =
match cmd with
| `PORT ->
let addr = (* of control connection *)
match Unix.getsockname sock with
| Unix.ADDR_INET(addr,_) -> addr
| _ -> assert false in
let addr_str = Unix.string_of_inet_addr addr in
let dom = Netsys.domain_of_inet_addr addr in
let server_sock = Unix.socket dom Unix.SOCK_STREAM 0 in
Unix.bind server_sock (Unix.ADDR_INET(addr,0));
Unix.listen server_sock 1;
Netlog.Debug.track_fd
~owner:"Ftp_client"
~descr:("Data server (active mode) for " ^
peer_str)
server_sock;
let port =
match Unix.getsockname server_sock with
| Unix.ADDR_INET(_,port) -> port
| _ -> assert false in
dlogr (fun () ->
sprintf "created data server (active mode) \
listening for %s:%d"
addr_str port);
ftp_state <- set_ftp_port ftp_state (`Active(addr_str,port,server_sock));
let port_str = format_port (addr_str,port) in
"PORT " ^ port_str ^ "\r\n"
| _ -> string_of_cmd cmd in
reply_callback <- onreply;
if line <> "" then (
dlogr (fun () ->
sprintf "ctrl sent: %s" line);
Queue.push (Telnet_data line) ctrl#output_queue;
ctrl # update();
) else (
dlog "ctrl sent: DUMMY";
raise Dummy
)
with
| Queue.Empty ->
onempty ftp_state
| Dummy ->
self # interpret_ctrl_reply 200 "200 DUMMY"
)
method private close_connection() =
(* Terminates any transfer immediately and closes the connection *)
( match data_engine with
| None -> ()
| Some e ->
dlogr (fun () -> "aborting data transfer");
( match e # state with
| `Working _ -> e # abort()
| _ -> ()
);
let data_sock = e # descr in
Netlog.Debug.release_fd data_sock;
Unix.close data_sock;
data_engine <- None;
);
ftp_state <- set_ftp_port ftp_state `Unspecified;
List.iter (fun e -> e # abort()) work_engines;
work_engines <- []
method private setup_passive_receiver cmd f =
assert(is_passive ftp_state);
( match data_engine with
| Some e ->
(* Continue with the old connection *)
if e # descr_state <> `Clean then
proto_viol "Data connection not clean";
(* Create a new engine taking the connection over *)
let data_sock = e # descr in
dlogr (fun () ->
sprintf "reusing passive-mode data connection");
self # setup_receiver
~is_done:(fun () ->
match interaction_state with
| `Transfer_replied(_,c,t) ->
interaction_state <-
`Waiting cmd;
self # interpret_ctrl_reply c t
| `Transfer cmd ->
interaction_state <-
`Waiting cmd
| _ -> assert false
)
~is_error:(fun err ->
self # set_error (FTP_error err))
f data_sock;
interaction_state <- `Transfer cmd
| None ->
(* Indicates that a connection is to be opened *)
let addr,port =
match ftp_state.ftp_port with
| `Passive(a,p) -> a,p | _ -> assert false in
let conn_engine =
Uq_engines.connector
(`Socket(
`Sock_inet(Unix.SOCK_STREAM,
Unix.inet_addr_of_string addr,
port),
Uq_engines.default_connect_options))
event_system in
Uq_engines.when_state
~is_done:(function
| `Socket(data_sock,_) ->
dlogr (fun () ->
sprintf "passive-mode data connection \
to %s:%d established"
addr port);
Netlog.Debug.track_fd
~owner:"Ftp_client"
~descr:("Data connection (passive mode) for "^
peer_str)
data_sock;
self # setup_receiver
~is_done:(fun () ->
match interaction_state with
| `Transfer_replied(_,c,t) ->
interaction_state <-
`Waiting cmd;
self # interpret_ctrl_reply c t
| `Transfer cmd ->
interaction_state <-
`Waiting cmd
| _ -> assert false
)
~is_error:(fun err ->
self # set_error (FTP_error err))
f data_sock;
interaction_state <- `Transfer cmd;
self # clean_work_engines();
| _ -> assert false
)
~is_error:(fun err ->
self # clean_work_engines();
self # set_error (FTP_error err))
conn_engine;
work_engines <- new work_engine conn_engine :: work_engines;
interaction_state <- `Connecting_pasv(cmd,conn_engine);
)
method private setup_active_receiver cmd f =
assert(is_active ftp_state);
( match data_engine with
| Some e ->
(* Continue with the old connection *)
if e # descr_state <> `Clean then
proto_viol "Data connection not clean";
(* Create a new engine taking the connection over *)
dlogr (fun () ->
sprintf "reusing active-mode data connection");
let data_sock = e # descr in
self # setup_receiver
~is_done:(fun () ->
match interaction_state with
| `Transfer_replied(_,c,t) ->
interaction_state <-
`Waiting cmd;
self # interpret_ctrl_reply c t
| `Transfer cmd ->
interaction_state <-
`Waiting cmd
| _ -> assert false
)
~is_error:(fun err ->
self # set_error (FTP_error err))
f data_sock;
interaction_state <- `Transfer cmd
| None ->
(* Indicates that a connection is to be opened *)
let addr,port,server_sock =
match ftp_state.ftp_port with
| `Active(a,p,fd) -> a,p,fd | _ -> assert false in
let acc_engine =
new Uq_engines.poll_engine
[ (Unixqueue.Wait_in server_sock), (-1.0) ] event_system in
Uq_engines.when_state
~is_done:(function
| Unixqueue.Input_arrived(_,_) ->
let data_sock, _ = Unix.accept server_sock in
dlogr (fun () ->
sprintf "accepted new active-mode \
data connection");
Netlog.Debug.track_fd
~owner:"Ftp_client"
~descr:("Data connection (active mode) for " ^
peer_str)
data_sock;
self # setup_receiver
~is_done:(fun () ->
match interaction_state with
| `Transfer_replied(_,c,t) ->
interaction_state <-
`Waiting cmd;
self # interpret_ctrl_reply c t
| `Transfer cmd ->
interaction_state <-
`Waiting cmd
| _ -> assert false
)
~is_error:(fun err ->
self # set_error (FTP_error err))
f data_sock;
interaction_state <- `Transfer cmd;
self # clean_work_engines()
| _ ->
assert false
)
~is_error:(fun err ->
self # clean_work_engines();
self # set_error (FTP_error err))
acc_engine;
let acc_engine = (acc_engine :> Unixqueue.event Uq_engines.engine) in
work_engines <- new work_engine acc_engine :: work_engines;
interaction_state <- `Listening_actv(cmd,acc_engine);
)
method private setup_receiver ~is_done ~is_error f data_sock =
let local = f ftp_state in
let e =
new ftp_data_receiver
~esys:event_system
~mode:ftp_state.ftp_trans
~local_receiver:local
~descr:data_sock () in
data_engine <- Some (e :> ftp_data_engine);
Uq_engines.when_state
~is_done
~is_error
e
method add_cmd ?(onreply = fun _ _ -> ()) (cmd : cmd) =
match state with
| `Working _ ->
Queue.push (cmd, onreply) queue;
self # protect (self # send_command_when_ready)
| _ ->
failwith "Ftp_client.ftp_client_pi: Connection already terminated"
method send_abort () = ()
method run () = Unixqueue.run event_system
end
module Action : sig
type plan
type action = plan -> unit
val ftp_state : plan -> ftp_state
val empty : action
val command : cmd -> action
val dyn_command : (unit -> cmd) -> action
val seq2 : action -> action -> action
val full_seq2 : action -> (reply -> action) -> action
val seq : action list -> action
val expect : cmd_state -> action -> action
val seq2_case : action -> (cmd_state * action) list -> action
val execute : onreply:(ftp_state -> reply -> unit) ->
onerror:(ftp_state -> reply -> unit) ->
ftp_client_pi ->
action ->
unit
end = struct
type plan =
{ pi : ftp_client_pi; (* where we are *)
ftp_state : ftp_state;
onreply : ftp_state -> reply -> unit;
(* what to do next *)
onerror : ftp_state -> reply -> unit
(* what to do if all else fails *)
}
type action =
plan -> unit
let ftp_state p = p.ftp_state
let execute ~onreply ~onerror pi act =
act
{ pi = pi;
ftp_state = pi#ftp_state;
onreply = onreply;
onerror = onerror
}
let empty p =
p.pi # add_cmd
~onreply:p.onreply
`Dummy
let command cmd p =
p.pi # add_cmd
~onreply:(fun ftp_state r ->
if ftp_state.cmd_state <> `Preliminary then
p.onreply ftp_state r)
cmd
let dyn_command f_cmd p =
command (f_cmd()) p
let seq2 act1 act2 p =
act1
{ p with
onreply = (fun ftp_state r ->
act2
{ p with
ftp_state = ftp_state
}
)
}
let full_seq2 act1 f_act2 p =
act1
{ p with
onreply = (fun ftp_state r ->
f_act2
r
{ p with
ftp_state = ftp_state
}
)
}
let rec seq act_list p =
match act_list with
| [] ->
empty p
| [act] ->
act p
| act :: act_list' ->
act
{ p with
onreply = (fun ftp_state r ->
seq act_list' { p with ftp_state = ftp_state }
)
}
let expect s act p =
act
{ p with
onreply = (fun ftp_state r ->
if ftp_state.cmd_state = s then
p.onreply ftp_state r
else
p.onerror ftp_state r
)
}
let seq2_case act cases p =
act
{ p with
onreply = (fun ftp_state r ->
match
try
Some(List.assoc ftp_state.cmd_state cases)
with
Not_found -> None
with
| Some act' ->
act' { p with ftp_state = ftp_state }
| None ->
p.onerror ftp_state r
)
}
end
class type ftp_method =
object
method connect : (string * int) option
method execute : Action.action
end
exception FTP_method_temp_failure of int * string
exception FTP_method_perm_failure of int * string
exception FTP_method_unexpected_reply of int * string
class connect_method ~host ?(port = 21) () : ftp_method =
object(self)
method connect = Some(host,port)
method execute = Action.empty
end
class login_method ~user ~get_password ~get_account () : ftp_method =
object(self)
method connect = None
method execute =
Action.seq2_case
(Action.command (`USER user))
[ `Success, Action.empty;
`User_pass_seq, ( Action.seq2_case
(Action.dyn_command
(fun () -> `PASS (get_password())))
[ `Success, Action.empty;
`Pass_acct_seq, ( Action.expect `Success
(Action.dyn_command
(fun () ->
`ACCT (get_account())))
);
]
);
`User_acct_seq, ( Action.expect `Success
(Action.dyn_command
(fun () -> `ACCT (get_account())))
);
]
end
let slash_re = Netstring_pcre.regexp "/+";;
let rec basename l =
match l with
| [] -> failwith "Bad filename"
| [name] -> name
| _ :: l' -> basename l'
let rec dirname l =
match l with
| [] -> []
| [name] -> []
| dir :: l' -> dir :: dirname l'
let rec is_prefix l1 l2 =
match (l1, l2) with
| (x1::l1'), (x2::l2') ->
x1 = x2 && is_prefix l1' l2'
| [], _ ->
true
| (_::_), [] ->
false
let rec without_prefix l1 l2 =
match (l1, l2) with
| (x1::l1'), (x2::l2') ->
if x1 = x2 then without_prefix l1' l2' else failwith "without_prefix"
| [], _ ->
l2
| (_::_), [] ->
failwith "without_prefix"
class walk_method (destination : [ `File of string | `Dir of string | `Stay ] )
: ftp_method =
object(self)
method connect = None
method execute =
let rec walk_to_directory path p =
let ftp_state = Action.ftp_state p in
let cur_dir = List.rev ftp_state.ftp_dir in
if is_prefix cur_dir path then
match without_prefix cur_dir path with
| [] ->
Action.empty p
| dir :: _ ->
Action.seq2
(Action.expect
`Success
(Action.command (`CWD dir)))
(walk_to_directory path)
p
else
Action.seq2
(Action.expect
`Success
(Action.command `CDUP))
(walk_to_directory path)
p
in
match destination with
| `File name ->
let rpath = List.rev (Netstring_pcre.split slash_re name) in
( match rpath with
| _ :: dir -> walk_to_directory (List.rev dir)
| [] -> failwith "Bad filename"
)
| `Dir name ->
let path = Netstring_pcre.split slash_re name in
walk_to_directory path
| `Stay ->
Action.empty
end
type filename =
[ `NVFS of string
| `Verbatim of string
]
let destination_of_file =
function
| `NVFS name -> `File name
| `Verbatim _ -> `Stay
let destination_of_dir =
function
| `NVFS name -> `Dir name
| `Verbatim _ -> `Stay
let ftp_filename =
function
| `NVFS name -> basename (Netstring_pcre.split slash_re name)
| `Verbatim name -> name
class file_method ~(perform : string -> ftp_method)
(file : filename) : ftp_method =
(* Internal class, not exported *)
let walk = new walk_method (destination_of_file file) in
let filename = ftp_filename file in
object(self)
method connect = walk # connect
method execute =
Action.seq2
walk#execute
((perform filename) # execute)
end
class dir_method ~(perform : string option -> ftp_method)
(dir : filename) : ftp_method =
(* Internal class, not exported *)
let walk = new walk_method (destination_of_dir dir) in
let filename_opt =
match dir with
| `NVFS name -> None
| `Verbatim name -> Some name in
object(self)
method connect = walk # connect
method execute =
Action.seq2
walk#execute
((perform filename_opt) # execute)
end
class invoke_method ~command
~(process_result : ftp_state -> (int * string) -> unit)
() : ftp_method =
object(self)
method connect = None
method execute =
Action.full_seq2
(Action.expect `Success
(Action.command command))
(fun reply p ->
let fs = Action.ftp_state p in
process_result fs reply)
end
class set_structure_method structure =
invoke_method
~command:(`STRU structure)
~process_result:(fun _ _ -> ())
()
class set_mode_method mode =
invoke_method
~command:(`MODE mode)
~process_result:(fun _ _ -> ())
()
class mkdir_method file =
file_method
~perform:(fun filename ->
new invoke_method
~command:(`MKD filename)
~process_result:(fun _ _ -> ())
() )
file
class rmdir_method file =
file_method
~perform:(fun filename ->
new invoke_method
~command:(`RMD filename)
~process_result:(fun _ _ -> ())
() )
file
class delete_method file =
file_method
~perform:(fun filename ->
new invoke_method
~command:(`DELE filename)
~process_result:(fun _ _ -> ())
() )
file
(* MDTM response is YYYYMMDDHHMMSS[.s+] (GMT) *)
let time_re = Netstring_pcre.regexp ".*[^0-9](\\d{4})(\\d{2})(\\d{2})(\\d{2})(\\d{2})(\\d{2})(?:\\.(\\d+))?"
let extract_time s =
match Netstring_pcre.string_match time_re s 0 with
| None ->
failwith ("Cannot parse time-val: " ^ s)
| Some m ->
let fraction =
try if (Netstring_pcre.matched_group m 7 s).[0] < '5' then 0 else 1
with Not_found -> 0
in
let get_int i = int_of_string (Netstring_pcre.matched_group m i s) in
Netdate.since_epoch
{ Netdate.year = get_int 1;
Netdate.month = get_int 2;
Netdate.day = get_int 3;
Netdate.hour = get_int 4;
Netdate.minute = get_int 5;
Netdate.second = get_int 6 + fraction;
Netdate.zone = 0; (* GMT *)
Netdate.week_day = -1 }
class mdtm_method ~file ~process_result () =
file_method
~perform:(fun filename ->
new invoke_method
~command:(`MDTM filename)
~process_result:(fun _ (code,text) ->
let t = extract_time text in
process_result t)
() )
file
class rename_method' filename_from filename_to : ftp_method =
object(self)
method connect = None
method execute =
Action.seq2_case
(Action.command (`RNFR filename_from))
[ `Rename_seq, (Action.expect `Success
(Action.command (`RNTO filename_to))) ]
end
class rename_method ~file_from ~(file_to : filename) () =
(* Check arguments: *)
let filename_to =
match (file_from, file_to) with
| (`NVFS p1), (`NVFS p2) ->
(* p1 and p2 must point to the same directory. Return basename of p2 *)
let p1' = Netstring_pcre.split slash_re p1 in
let p2' = Netstring_pcre.split slash_re p2 in
let d1 = dirname p1' in
let d2 = dirname p2' in
if d1 <> d2 then invalid_arg "Ftp_client.rename_method";
basename p2'
| (`Verbatim _), (`Verbatim s) -> s
| _ -> invalid_arg "Ftp_client.rename_method"
in
file_method
~perform:(fun filename_from ->
new rename_method' filename_from filename_to
)
file_from
class retrieve_method ~command ~representation () : ftp_method =
object(self)
method connect = None
method execute =
Action.seq
[Action.expect `Success (Action.command (`TYPE representation));
Action.seq2_case
(Action.command `PASV)
[ `Success, Action.empty;
`Perm_failure, (Action.expect `Success
(Action.command `PORT))
];
Action.expect `Success (Action.command command)
]
end
class get_method ~file ~representation ~store () : ftp_method =
file_method
~perform:(fun filename ->
new retrieve_method
~command:(`RETR(filename,store))
~representation ())
file
class list_method ~dir ~representation ~store () : ftp_method =
dir_method
~perform:(fun filename_opt ->
new retrieve_method
~command:(`LIST(filename_opt,store))
~representation ())
dir
class nlst_method ~dir ~representation ~store () : ftp_method =
dir_method
~perform:(fun filename_opt ->
new retrieve_method
~command:(`NLST(filename_opt,store))
~representation ())
dir
class ftp_client
?(event_system = Unixqueue.create_unix_event_system())
?(onempty = fun () -> ())
() =
object(self)
val mutable pi_opt = (None : ftp_client_pi option)
val mutable queue = Queue.create()
val mutable current = None
val mutable work_engines = ( [] : work_engine list)
val mutable state = `Working 0
val mutable notify_list = []
val mutable notify_list_new = []
method state = (state : unit Uq_engines.engine_state)
method event_system = event_system
method abort() =
( match pi_opt with
| None -> ()
| Some pi -> pi # abort()
);
List.iter (fun e -> e # abort()) work_engines;
work_engines <- [];
self # set_state `Aborted
method private clean_work_engines() =
work_engines <- List.filter (fun e -> e # is_working) work_engines
method request_notification f =
notify_list_new <- f :: notify_list_new
method private notify() =
notify_list <- notify_list @ notify_list_new;
notify_list_new <- [];
notify_list <- List.filter (fun f -> f()) notify_list
method private set_state s =
if s <> state then (
state <- s;
self # notify();
)
method private set_error e =
self # abort();
self # set_state (`Error e)
method private still_running =
match state with
| `Working _ -> true
| _ -> false
method private protect : 'a . ('a -> unit) -> 'a -> unit =
fun f arg ->
try
f arg
with
err ->
self # set_error err
method private next() =
try
let (ftpm, onsuccess, onerror) = Queue.take queue in (* or Queue.Empty *)
current <- Some(ftpm,onsuccess,onerror);
(* First check whether we have to connect again: *)
( match ftpm # connect with
| None ->
( match pi_opt with
| None -> failwith "Missing connection"
| Some pi -> self # exec pi ftpm onsuccess onerror
)
| Some (host,port) ->
(* First stop the current connection *)
( match pi_opt with
| None -> ()
| Some pi ->
pi # add_cmd `QUIT;
(* TODO: fallback method if QUIT fails *)
self # clean_work_engines();
work_engines <- new work_engine pi :: work_engines;
pi_opt <- None
);
let conn_engine =
Uq_engines.connector
(`Socket(
`Sock_inet(Unix.SOCK_STREAM,
Unix.inet_addr_of_string host,
port),
Uq_engines.default_connect_options))
event_system in
Uq_engines.when_state
~is_done:(function
| `Socket(sock,_) ->
(* N.B. This socket is fd-tracked by Telnet_client *)
let pi =
new ftp_client_pi
~event_system
~onerrorstate:self#set_error
sock in
pi_opt <- Some pi;
self # clean_work_engines();
self # exec pi ftpm onsuccess onerror
| _ -> assert false
)
~is_error:(fun err ->
self # clean_work_engines();
self # set_error (FTP_error err))
~is_aborted:(fun _ -> self # clean_work_engines())
conn_engine;
work_engines <- new work_engine conn_engine :: work_engines
)
with
Queue.Empty ->
onempty();
( match pi_opt with
| None -> ()
| Some pi ->
pi # add_cmd `QUIT;
(* TODO: fallback method if QUIT fails *)
self # clean_work_engines();
work_engines <- new work_engine pi :: work_engines;
pi_opt <- None;
);
method private exec pi ftpm onsuccess onerror =
let onreply fs r =
self # protect onsuccess ();
current <- None;
if self # still_running then self # next()
in
let onerror fs (code,text) =
let e =
match fs.cmd_state with
| `Temp_failure -> FTP_method_temp_failure (code,text)
| `Perm_failure -> FTP_method_perm_failure (code,text)
| _ -> FTP_method_unexpected_reply (code,text) in
self # protect onerror e;
current <- None;
if self # still_running then self # next();
in
let _ftp_state = pi # ftp_state in
self # protect (
fun () ->
let action = ftpm # execute in
Action.execute ~onreply ~onerror pi action) ();
(*
method private pi_got_empty ftp_state =
current <- None;
self # next()
*)
method add ?(onsuccess = fun () -> ())
?(onerror = fun e -> (raise e : unit))
(ftpm : ftp_method) =
if not self#still_running then
failwith "Ftp_client.ftp_client: Engine has already finished";
Queue.add (ftpm, onsuccess, onerror) queue;
if current = None then self # next();
method run () = Unixqueue.run event_system
end
let () =
Netsys_signal.init()