Plasma GitLab Archive
Projects Blog Knowledge

(* $Id: ftp_client.ml 1275 2009-10-11 00:48:52Z gerd $ *)

open Telnet_client
open Ftp_data_endpoint
open Printf

module Debug = struct
  let enable = ref false
end

let dlog = Netlog.Debug.mk_dlog "Ftp_client" Debug.enable
let dlogr = Netlog.Debug.mk_dlogr "Ftp_client" Debug.enable

let () =
  Netlog.Debug.register_module "Ftp_client" Debug.enable


exception FTP_error of exn
exception FTP_protocol_violation of string

let proto_viol s =
  raise(FTP_protocol_violation s)

let () =
  Netexn.register_printer
    (FTP_error Not_found)
    (fun e ->
       match e with
	 | FTP_error e' ->
	     "Ftp_client.FTP_error(" ^ Netexn.to_string e' ^ ")"
	 | _ -> assert false
    )


type cmd_state =
    [ `Init
    | `Success
    | `Proto_error
    | `Temp_failure
    | `Perm_failure
    | `Rename_seq
    | `Restart_seq
    | `User_pass_seq
    | `User_acct_seq
    | `Pass_acct_seq
    | `Preliminary
    ]

type port =
    [ `Active of string * int * Unix.file_descr
    | `Passive of string * int
    | `Unspecified
    ]

type form_code =
    [ `Non_print | `Telnet | `ASA ]

type representation =
    [ `ASCII of form_code option
    | `EBCDIC of form_code option
    | `Image
    ]

type structure =
    [ `File_structure
    | `Record_structure
    ]

type transmission_mode =
    Ftp_data_endpoint.transmission_mode

type ftp_state =
    { cmd_state : cmd_state;
      ftp_connected : bool;
      ftp_data_conn : bool;
      ftp_user : string option;
      ftp_password : string option;
      ftp_account : string option;
      ftp_logged_in : bool;
      ftp_port : port;
      ftp_repr : representation;
      ftp_structure : structure;
      ftp_trans : transmission_mode;
      ftp_dir : string list;
      ftp_features : (string * string option) list option;
      ftp_options : (string * string option) list;
    }


type cmd =
    [ `Connect
    | `Dummy
    | `USER of string
    | `PASS of string
    | `ACCT of string
    | `CWD of string
    | `CDUP
    | `SMNT of string
    | `QUIT
    | `REIN
    | `PORT
    | `PASV
    | `TYPE of representation
    | `STRU of structure
    | `MODE of transmission_mode
    | `RETR of string * (ftp_state -> Ftp_data_endpoint.local_receiver)
    | `STOR of string * (ftp_state -> Ftp_data_endpoint.local_sender)
    | `STOU of (unit -> Ftp_data_endpoint.local_sender)
    | `APPE of string * (ftp_state -> Ftp_data_endpoint.local_sender)
    | `ALLO of int * int option
    | `REST of string
    | `RNFR of string
    | `RNTO of string
    | `DELE of string
    | `RMD of string
    | `MKD of string
    | `PWD
    | `LIST of string option * (ftp_state -> Ftp_data_endpoint.local_receiver)
    | `NLST of string option * (ftp_state -> Ftp_data_endpoint.local_receiver)
    | `SITE of string
    | `SYST
    | `STAT of string option
    | `HELP of string option
    | `NOOP
    | `FEAT
    | `OPTS of string * string option
    | `MDTM of string
    ]

let string_of_cmd =
  function
    | `Connect -> ""
    | `Dummy -> ""
    | `USER s -> "USER " ^ s ^ "\r\n"
    | `PASS s -> "PASS " ^ s ^ "\r\n"
    | `ACCT s -> "ACCT " ^ s ^ "\r\n"
    | `CWD s  -> "CWD " ^ s ^ "\r\n"
    | `CDUP   -> "CDUP\r\n"
    | `SMNT s -> "SMNT " ^ s ^ "\r\n"
    | `QUIT   -> "QUIT\r\n"
    | `REIN   -> "REIN\r\n"
    | `PORT   -> assert false   (* not done here *)
    | `PASV   -> "PASV\r\n"
    | `TYPE t -> "TYPE " ^ 
	         ( match t with
		     | `ASCII None -> "A"
		     | `ASCII (Some `Non_print) -> "A N"
		     | `ASCII (Some `Telnet) -> "A T"
		     | `ASCII (Some `ASA) -> "A C"
		     | `EBCDIC None -> "E"
		     | `EBCDIC (Some `Non_print) -> "E N"
		     | `EBCDIC (Some `Telnet) -> "E T"
		     | `EBCDIC (Some `ASA) -> "E C"
		     | `Image -> "I"
		 ) ^ "\r\n"
    | `STRU `File_structure -> "STRU F\r\n"
    | `STRU `Record_structure -> "STRU R\r\n"
    | `MODE `Stream_mode -> "MODE S\r\n"
    | `MODE `Block_mode -> "MODE B\r\n"
    | `RETR (s,_) -> "RETR " ^ s ^ "\r\n"
    | `STOR (s,_) -> "STOR " ^ s ^ "\r\n"
    | `STOU _     -> "STOU\r\n"
    | `APPE (s,_) -> "APPE " ^ s ^ "\r\n"
    | `ALLO(n,r)  -> "ALLO " ^ string_of_int n ^ 
	             (match r with
			| None -> ""
			| Some m -> " R " ^ string_of_int m) ^ "\r\n"
    | `REST s -> "REST " ^ s ^ "\r\n"
    | `RNFR s -> "RNFR " ^ s ^ "\r\n"
    | `RNTO s -> "RNTO " ^ s ^ "\r\n"
    | `DELE s -> "DELE " ^ s ^ "\r\n"
    | `RMD  s -> "RMD " ^ s ^ "\r\n"
    | `MKD  s -> "MKD " ^ s ^ "\r\n"
    | `PWD    -> "PWD\r\n"
    | `LIST(None,_) -> "LIST\r\n"
    | `LIST(Some s,_) -> "LIST " ^ s ^ "\r\n"
    | `NLST(None,_) -> "NLST\r\n"
    | `NLST(Some s,_) -> "NLST " ^ s ^ "\r\n"
    | `SITE s -> "SITE " ^ s ^ "\r\n"
    | `SYST   -> "SYST\r\n"
    | `STAT None -> "STAT\r\n"
    | `STAT(Some s) -> "STAT " ^ s ^ "\r\n"
    | `HELP None -> "HELP\r\n"
    | `HELP(Some s) -> "HELP " ^ s ^ "\r\n"
    | `NOOP -> "NOOP\r\n"
    | `FEAT -> "FEAT\r\n"
    | `OPTS (cmd,None) -> "OPTS " ^ cmd ^ "\r\n"
    | `OPTS (cmd,Some param) -> "OPTS " ^ cmd ^ " " ^ param ^ "\r\n"
    | `MDTM s -> "MDTM " ^ s ^ "\r\n"


let port_re = Netstring_pcre.regexp ".*[^0-9](\\d+),(\\d+),(\\d+),(\\d+),(\\d+),(\\d+)"

let extract_port s =
  match Netstring_pcre.string_match port_re s 0 with
    | None ->
	proto_viol "Cannot parse specification of passive port"
    | Some m ->
	let h1 = Netstring_pcre.matched_group m 1 s in
	let h2 = Netstring_pcre.matched_group m 2 s in
	let h3 = Netstring_pcre.matched_group m 3 s in
	let h4 = Netstring_pcre.matched_group m 4 s in
	let p1 = Netstring_pcre.matched_group m 5 s in
	let p2 = Netstring_pcre.matched_group m 6 s in
	let p = int_of_string p1 * 256 + int_of_string p2 in
	(h1 ^ "." ^ h2 ^ "." ^ h3 ^ "." ^ h4, p)

let addr_re = Netstring_pcre.regexp "^(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+)$"

let format_port (addr,p) =
  match Netstring_pcre.string_match addr_re addr 0 with
    | None ->
	failwith "Bad IP address"
    | Some m ->
	let h1 = Netstring_pcre.matched_group m 1 addr in
	let h2 = Netstring_pcre.matched_group m 2 addr in
	let h3 = Netstring_pcre.matched_group m 3 addr in
	let h4 = Netstring_pcre.matched_group m 4 addr in
	let p1 = string_of_int(p lsr 8) in
	let p2 = string_of_int(p land 0xff) in
	h1 ^ "," ^ h2 ^ "," ^ h3 ^ "," ^ h4 ^ "," ^ p1 ^ "," ^ p2

let set_ftp_port state value =
  ( match state.ftp_port with
      | `Active(_,_,fd) ->
	  Netlog.Debug.release_fd fd;
	  Unix.close fd
      | _ ->
	  ()
  );
  { state with ftp_port = value }


let feature_line_re = Netstring_pcre.regexp "^ ([\x21-\xff]+)( .*)?$"

let line_re = Netstring_pcre.regexp "\r?\n"

let parse_features s =
  let lines = Netstring_pcre.split line_re s in
  List.flatten
    (List.map
       (fun line ->
	  match Netstring_pcre.string_match feature_line_re line 0 with
	    | None -> []
	    | Some m ->
		let label = Netstring_pcre.matched_group m 1 line in
		let param = 
		  try Some(Netstring_pcre.matched_group m 2 line)
		  with Not_found -> None in
		[ label, param ]
       )
       lines)

type reply = int * string
    (* Reply code plus text *)


let string_of_state =
  function
    | `Init -> "Init"
    | `Success -> "Success"
    | `Proto_error -> "Proto_error"
    | `Temp_failure -> "Temp_failure"
    | `Perm_failure -> "Perm_failure"
    | `Rename_seq -> "Rename_seq"
    | `Restart_seq -> "Restart_seq"
    | `User_pass_seq -> "User_pass_seq"
    | `User_acct_seq -> "User_acct_seq"
    | `Pass_acct_seq -> "Pass_acct_seq"
    | `Preliminary -> "Preliminary"

let init_state s =
  let _s_name =
    match Unix.getsockname s with
      | Unix.ADDR_INET(addr,_) ->
	  Unix.string_of_inet_addr addr
      | _ ->
	  failwith "Not an internet socket"
  in
  { cmd_state = `Init;
    ftp_connected = true;
    ftp_data_conn = false;
    ftp_user = None;
    ftp_password = None;
    ftp_account = None;
    ftp_logged_in = false;
    ftp_port = `Unspecified;
    ftp_repr = `ASCII None;
    ftp_structure = `File_structure;
    ftp_trans = `Stream_mode;
    ftp_dir = [];
    ftp_features = None;
    ftp_options = [];
  }


let start_reply_re = Netstring_pcre.regexp "^[0-9][0-9][0-9]-"
let end_reply_re = Netstring_pcre.regexp "^[0-9][0-9][0-9] "


let is_active state =
  match state.ftp_port with
    | `Active _ -> true
    | _ -> false

let is_passive state =
  match state.ftp_port with
    | `Passive _ -> true
    | _ -> false


class work_engine e =
object
  method is_working =
    match e # state with
      | `Working _ -> true
      | _ -> false
  method abort() =
    match e # state with
      | `Working _ -> e # abort()
      | _ -> ()
end


exception Dummy


class ftp_client_pi
        ?(event_system = Unixqueue.create_unix_event_system())
        ?(onempty = fun _ -> ())
        ?(onclose = fun () -> ())
        ?(onerrorstate = fun _ -> ())
        ?(onusererror = fun _ -> ())
        sock =
  let ctrl_input_buffer = Netbuffer.create 500 in
  let ctrl_input, ctrl_input_shutdown = 
    Netchannels.create_input_netbuffer ctrl_input_buffer in
  let peer_str = 
    try Netsys.string_of_sockaddr (Netsys.getpeername sock)
    with _ -> "(noaddr)" in
object(self)

  val queue = Queue.create()

  val mutable state = `Working 0

  val mutable ftp_state = init_state sock

  val mutable data_engine = None
  val mutable work_engines = ( [] : work_engine list)

  val ctrl = new telnet_session

  val reply_text = Buffer.create 200
  val mutable reply_code = (-1)
  val mutable reply_callback = (fun _ _ -> ())

  val mutable interaction_state = 
    ( `Waiting `Connect 
	: [ `Ready 
          | `Connecting_pasv of cmd * Uq_engines.connect_status Uq_engines.engine
	  | `Listening_actv of cmd * Unixqueue.event Uq_engines.engine
          | `Transfer of cmd
	  | `Transfer_replied of cmd * int * string
	  | `Waiting of cmd
	  ] )
      (* `Ready: another command can be sent now
       * `Connecting_pasv: In passive mode, we are connecting to the
       *    remote port (done by the argument engine). This state is
       *    skipped when we are still connected.
       * `Listening_actv: In active mode, we are listening for the
       *    connect (done by the argument engine). This state is
       *    skipped when we are still connected.
       * `Transfer: a data transfer is in progress
       * `Transfer_replied: while the rest of the transfer is not yet
       *    done, the server already sent a reply
       * `Waiting: it is waited for the reply
       *)



  initializer (
    ctrl # set_connection (Telnet_socket sock);
    ctrl # set_event_system event_system;
    ctrl # set_callback self#receive_ctrl_reply;
    ctrl # set_exception_handler self#catch_telnet_exception;
    ctrl # attach();
  )

  method ftp_state = ftp_state

  method is_empty = Queue.is_empty queue

  method state = (state : unit Uq_engines.engine_state)

  method event_system = event_system

  method abort() =
    ctrl # reset();
    self # close_connection();
    self # set_state `Aborted

  method private catch_telnet_exception e =
    ctrl # reset();
    self # close_connection();
    self # set_state (`Error(Telnet_protocol e))

  method private set_state s =
    state <- s;
    match s with
      | `Error e -> onerrorstate e
      | _ -> ()

  method private set_error e =
    ctrl # reset();
    self # close_connection();
    self # set_state (`Error e)

  method private protect f =
    (* Run [f ()] and catch exceptions *)
    try
      f()
    with
	err ->
	  ctrl # reset();
	  self # close_connection();
	  self # set_state (`Error err)
      

  method private clean_work_engines() =
    work_engines <- List.filter (fun e -> e # is_working) work_engines


  method private receive_ctrl_reply got_synch =
    while not (Queue.is_empty ctrl#input_queue) do
      let tc = Queue.take ctrl#input_queue in
      match tc with
	| Telnet_data data ->
	    Netbuffer.add_string ctrl_input_buffer data;
	    self # protect (self # parse_ctrl_reply)

	| Telnet_nop ->
	    ()

	| Telnet_will _
	| Telnet_wont _
	| Telnet_do _
	| Telnet_dont _ ->
	    ctrl # process_option_command tc

	| Telnet_sb _ 
	| Telnet_se ->
	    ()    (* Ignore subnegotiation *)

	| Telnet_eof ->
	    ctrl_input_shutdown();
	    self # protect (self # parse_ctrl_reply)

	| Telnet_timeout ->
	    ()  (* TODO *)

	| _ ->
	    (* Unexpected telnet command *)
	    self # protect (fun () ->
			      proto_viol "Unexpected command on Telnet level")
    done

  method private parse_ctrl_reply() =
    try
      while true do
	let line = ctrl_input # input_line() in  (* or exception! *)
	dlogr (fun () ->
		 sprintf "ctrl received: %s" line);
	if Netstring_pcre.string_match start_reply_re line 0 <> None then (
	  let code = int_of_string (String.sub line 0 3) in
	  if reply_code <> (-1) && reply_code <> code then 
	    proto_viol "Parse error of control message";
	  reply_code <- code;
	  Buffer.add_string reply_text line;
	  Buffer.add_string reply_text "\n";
	)
	else
	  if Netstring_pcre.string_match end_reply_re line 0 <> None then (
	    let code = int_of_string (String.sub line 0 3) in
	    if reply_code <> (-1) && reply_code <> code then
	      proto_viol "Parse error of control message";
	    Buffer.add_string reply_text line;
	    Buffer.add_string reply_text "\n";
	    let text = Buffer.contents reply_text in
	    reply_code <- (-1);
	    Buffer.clear reply_text;
	    self # interpret_ctrl_reply code text;
	  )
	  else (
	    if reply_code = (-1) then
	      proto_viol "Parse error of control message";
	    Buffer.add_string reply_text line;
	    Buffer.add_string reply_text "\n";
	  )
      done
    with
      | Netchannels.Buffer_underrun ->
	  (* No complete line yet *)
	  ()
      | End_of_file ->
	  onclose();
	  self # set_state (`Done ())

  method private interpret_ctrl_reply code text =
    (* This method is called whenever a reply has been completely received.
     * This may happen in a number of situations:
     * - As greeting message
     * - As regular response to a sent FTP command
     * - When the data transfer is over. Note that the control response
     *   may be received before the end of the transfer is seen by the
     *   client.
     * - Within the data transfer
     * - At any other point in time, but this is regarded as protocol error
     *)
    let reply st cmd_state =
      let st' = { st with cmd_state = cmd_state } in
      ftp_state <- st';
      dlogr (fun () ->
	       sprintf "state: %s" (string_of_state cmd_state));
      try
	reply_callback st' (code,text)
      with
	  err ->
	    onusererror err
    in
    let ready() =
      interaction_state <- `Ready in
    ( match interaction_state with
	| `Ready ->
	    proto_viol "Spontaneous control message"
	| `Waiting `Connect ->
	    (match code with
	       | 120 -> reply ftp_state `Preliminary
	       | 220 -> ready(); reply ftp_state `Success
	       | n when n >= 400 && n <= 499 ->
		   ready(); reply ftp_state `Temp_failure
	       | n when n >= 500 && n <= 599 ->
		   ready(); reply ftp_state `Perm_failure
	       | _   -> proto_viol "Unexpected control message"
	    )
	| `Waiting `Dummy ->
	    ready(); reply ftp_state `Success
	| `Waiting (`USER s) ->
	    ( match code with
		| 230 -> 
		    ready(); 
                    reply { ftp_state with 
			      ftp_user = Some s;
			      ftp_password = None;
			      ftp_account = None;
			      ftp_logged_in = true } `Success
		| 530 -> 
		    ready();
		    reply { ftp_state with
			      ftp_logged_in = false } `Perm_failure
		| 331 -> 
		    ready(); 
		    reply { ftp_state with 
			      ftp_user = Some s;
			      ftp_password = None;
			      ftp_account = None;
			      ftp_logged_in = false } `User_pass_seq
		| 332 -> 
		    ready();
		    reply { ftp_state with 
			      ftp_user = Some s;
			      ftp_password = None;
			      ftp_account = None;
			      ftp_logged_in = false } `User_acct_seq
		| n when n >= 400 && n <= 499 ->
		    ready(); reply ftp_state `Temp_failure
		| n when n >= 500 && n <= 599 ->
		    ready(); reply ftp_state `Perm_failure
		| _   -> 
		    proto_viol "Unexpected control message"
	    )
	| `Waiting (`PASS s) ->
	    ( match code with
		| 202 | 230 -> 
		    ready(); 
		    reply { ftp_state with
			      ftp_password = Some s;
			      ftp_account = None;
			      ftp_logged_in = true } `Success
		| 530 ->
		    ready();
		    reply { ftp_state with
			      ftp_logged_in = false } `Perm_failure
		| 332 ->  
		    ready();
		    reply { ftp_state with 
			      ftp_password = Some s;
			      ftp_account = None;
			      ftp_logged_in = false } `Pass_acct_seq
		| n when n >= 400 && n <= 499 ->
		    ready(); reply ftp_state `Temp_failure
		| n when n >= 500 && n <= 599 ->
		    ready(); reply ftp_state `Perm_failure
		| _   -> 
		    proto_viol "Unexpected control message"
	    )
	| `Waiting (`ACCT s) ->
	    ( match code with
		| 202 | 230 -> 
		    ready(); 
		    reply { ftp_state with
			      ftp_account = Some s;
			      ftp_logged_in = true } `Success
		| 530 -> 
		    ready();
		    reply { ftp_state with
			      ftp_logged_in = false } `Perm_failure
		| n when n >= 400 && n <= 499 ->
		    ready(); reply ftp_state `Temp_failure
		| n when n >= 500 && n <= 599 ->
		    ready(); reply ftp_state `Perm_failure
		| _   -> 
		    proto_viol "Unexpected control message"
	    )
	| `Waiting (`CWD s) ->
	    ( match code with
		| 200 | 250 -> 
		    ready(); 
		    let ftp_state' =
		      { ftp_state with ftp_dir = s :: ftp_state.ftp_dir } in
		    reply ftp_state' `Success
		| n when n >= 400 && n <= 499 ->
		    ready(); reply ftp_state `Temp_failure
		| n when n >= 500 && n <= 599 ->
		    ready(); reply ftp_state `Perm_failure
		| _   -> 
		    proto_viol "Unexpected control message"
	    )
	| `Waiting `CDUP ->
	    ( match code with
		| 200 | 250 -> 
		    ready();
		    let ftp_state' =
		      match ftp_state.ftp_dir with
			| [] -> ftp_state
			| _ :: dir ->
			    { ftp_state with ftp_dir = dir } in
		    reply ftp_state' `Success
		| n when n >= 400 && n <= 499 ->
		    ready(); reply ftp_state `Temp_failure
		| n when n >= 500 && n <= 599 ->
		    ready(); reply ftp_state `Perm_failure
		| _   -> 
		    proto_viol "Unexpected control message"
	    )
	| `Waiting `REIN ->
	    ( match code with
		| 120 -> 
		    reply ftp_state `Preliminary
		| 220 -> 
		    ready(); reply (init_state sock) `Success
		    (* CHECK: Close data connection? *)
		| n when n >= 400 && n <= 499 ->
		    ready(); reply ftp_state `Temp_failure
		| n when n >= 500 && n <= 599 ->
		    ready(); reply ftp_state `Perm_failure
		| _   -> 
		    proto_viol "Unexpected control message"
	    )
	| `Waiting `PASV ->
	    ( match code with
		| 227 ->
		    let (addr,port) = extract_port text in
		    ready();
		    self # close_connection();
		    reply (set_ftp_port ftp_state (`Passive(addr,port))) `Success
		| n when n >= 400 && n <= 499 ->
		    ready(); reply ftp_state `Temp_failure
		| n when n >= 500 && n <= 599 ->
		    ready(); reply ftp_state `Perm_failure
		| _   -> 
		    proto_viol "Unexpected control message"
	    )
	| `Waiting (`TYPE t) ->
	    ( match code with
		| n when n >= 200 && n <= 299 ->
		    ready();
		    reply { ftp_state with ftp_repr = t } `Success
		| n when n >= 400 && n <= 499 ->
		    ready(); reply ftp_state `Temp_failure
		| n when n >= 500 && n <= 599 ->
		    ready(); reply ftp_state `Perm_failure
		| _   -> 
		    proto_viol "Unexpected control message"
	    )
	| `Waiting (`MODE m) ->
	    ( match code with
		| n when n >= 200 && n <= 299 ->
		    ready();
		    reply { ftp_state with ftp_trans = m } `Success
		| n when n >= 400 && n <= 499 ->
		    ready(); reply ftp_state `Temp_failure
		| n when n >= 500 && n <= 599 ->
		    ready(); reply ftp_state `Perm_failure
		| _   -> 
		    proto_viol "Unexpected control message"
	    )
	| `Waiting (`STRU s) ->
	    ( match code with
		| n when n >= 200 && n <= 299 ->
		    ready();
		    reply { ftp_state with ftp_structure = s } `Success
		| n when n >= 400 && n <= 499 ->
		    ready(); reply ftp_state `Temp_failure
		| n when n >= 500 && n <= 599 ->
		    ready(); reply ftp_state `Perm_failure
		| _   -> 
		    proto_viol "Unexpected control message"
	    )
	| `Waiting (`REST _) ->
	    ( match code with
		| 350 ->
		    ready();
		    reply ftp_state `Restart_seq
		| n when n >= 400 && n <= 499 ->
		    ready(); reply ftp_state `Temp_failure
		| n when n >= 500 && n <= 599 ->
		    ready(); reply ftp_state `Perm_failure
		| _   -> 
		    proto_viol "Unexpected control message"
	    )
	| `Waiting (`RNFR _) ->
	    ( match code with
		| 350 ->
		    ready(); reply ftp_state `Rename_seq
		| n when n >= 400 && n <= 499 ->
		    ready(); reply ftp_state `Temp_failure
		| n when n >= 500 && n <= 599 ->
		    ready(); reply ftp_state `Perm_failure
		| _   -> 
		    proto_viol "Unexpected control message"
	    )
	| `Waiting `FEAT ->
	    ( match code with
		| 211 ->
		    let l = parse_features text in
		    ready(); 
		    reply { ftp_state with ftp_features = Some l } `Success
		| n when n >= 400 && n <= 499 ->
		    ready(); reply ftp_state `Temp_failure
		| n when n >= 500 && n <= 599 ->
		    ready(); reply ftp_state `Perm_failure
		| _   -> 
		    proto_viol "Unexpected control message"
	    )
	| `Waiting (`OPTS(cmd,param_opt)) ->
	    ( match code with
		| n when n >= 200 && n <= 299 ->
		    let l =
		      (cmd, param_opt) ::
			(List.filter
			   (fun (cmd',_) -> 
			      String.lowercase cmd <> String.lowercase cmd')
			   ftp_state.ftp_options) in
		    ready(); 
		    reply { ftp_state with ftp_options = l } `Success
		| n when n >= 400 && n <= 499 ->
		    ready(); reply ftp_state `Temp_failure
		| n when n >= 500 && n <= 599 ->
		    ready(); reply ftp_state `Perm_failure
		| _   -> 
		    proto_viol "Unexpected control message"
	    )
	| `Waiting (`SMNT _)
	| `Waiting `QUIT
	| `Waiting `PORT
	| `Waiting (`ALLO _)
	| `Waiting (`RNTO _)
	| `Waiting (`DELE _)
	| `Waiting (`RMD _)
	| `Waiting (`MKD _)
	| `Waiting `PWD 
 	| `Waiting `SYST
 	| `Waiting (`STAT _)
	| `Waiting (`HELP _)
	| `Waiting (`SITE _)
	| `Waiting (`MDTM _)
            (* See http://www.ietf.org/internet-drafts/draft-ietf-ftpext-mlst-16.txt *)
	| `Waiting `NOOP ->
	    ( match code with
		| n when n >= 100 && n <= 199 ->
		    reply ftp_state `Preliminary
		| n when n >= 200 && n <= 299 ->
		    ready(); reply ftp_state `Success
		| n when n >= 400 && n <= 499 ->
		    ready(); reply ftp_state `Temp_failure
		| n when n >= 500 && n <= 599 ->
		    ready(); reply ftp_state `Perm_failure
		| _   -> 
		    proto_viol "Unexpected control message"
	    )
	(* `STOR |`STOU | `APPE left out for now *)

	| `Connecting_pasv(_, conn_engine) ->
	    (* Somewhat unexpected reply! *)
	    ( match code with
		| 125 | 150 ->
		    reply ftp_state `Preliminary
		| n when n >= 400 && n <= 499 ->
		    conn_engine # abort();
		    ready();
		    reply ftp_state `Temp_failure
		| n when n >= 500 && n <= 599 ->
		    conn_engine # abort();
		    ready();
		    reply ftp_state `Perm_failure
		| _ ->
		    conn_engine # abort();
		    proto_viol "Unexpected control message"
	    )
	| `Listening_actv(_, acc_engine) ->
	    (* Somewhat unexpected reply! *)
	    ( match code with
		| 125 | 150 ->
		    reply ftp_state `Preliminary
		| n when n >= 400 && n <= 499 ->
		    acc_engine # abort();
		    ready();
		    reply ftp_state `Temp_failure
		| n when n >= 500 && n <= 599 ->
		    acc_engine # abort();
		    ready();
		    reply ftp_state `Perm_failure
		| _ ->
		    acc_engine # abort();
		    proto_viol "Unexpected control message"
	    )
	| `Transfer cmd ->
	    (* The transfer probably ends in the near future, just record
             * the reply, and wait for the end of the transfer.
             *)
	    ( match code with
		| 125 | 150 ->
		    reply ftp_state `Preliminary
		| n when n >= 400 && n <= 499 ->
		    self # close_connection();
		    ready();
		    reply ftp_state `Temp_failure
		| n when n >= 500 && n <= 599 ->
		    self # close_connection();
		    ready();
		    reply ftp_state `Perm_failure
		| _ ->
		    interaction_state <- `Transfer_replied(cmd,code,text)
	    )
	| `Transfer_replied cmd ->
	    (* Another reply! This is an error. *)
	    proto_viol "Unexpected control message"
	| `Waiting(`RETR(_,f))
	| `Waiting(`LIST(_,f))
	| `Waiting(`NLST(_,f)) ->
	    (* This state is only possible when the transfer has already
             * been completed.
             *)
	    ( match data_engine with
		| None -> ()  (* strange *)
		| Some e -> 
		    if e # descr_state <> `Clean then self # close_connection()
	    );
	    ( match code with
		| 125 | 150 ->
		    reply ftp_state `Preliminary
		| 226 ->
		    self # close_connection();
 		    ready();
		    reply ftp_state `Success
		| 250 ->
 		    ready();
		    reply ftp_state `Success
		| n when n >= 400 && n <= 499 ->
		    self # close_connection();
		    reply ftp_state `Temp_failure
		| n when n >= 500 && n <= 599 ->
		    self # close_connection();
		    reply ftp_state `Perm_failure
		| _ ->
		    proto_viol "Unexpected control message"
	    )
	| _ -> assert false

    );
    self # send_command_when_ready()


  method private send_command_when_ready() =
    if interaction_state = `Ready then (
      try
	assert(reply_code = (-1));
	let (cmd, onreply) = Queue.take queue in  (* or Queue.Empty *)
	interaction_state <- `Waiting cmd;
	( match cmd with
	    | `RETR(_,f)
	    | `LIST(_,f)
	    | `NLST(_,f) ->
		( match ftp_state.ftp_port with
		    | `Passive(_,_) ->
			(* In passive mode, connect now: *)
			self # setup_passive_receiver cmd f
			
		    | `Active(_,_,_) ->
			(* In active mode, accept the connection now *)
			self # setup_active_receiver cmd f

		    | `Unspecified ->
			failwith "FTP client: Usage error, one must send `PORT or `PASV before the transfer"
		)
	    | _ -> ()
	);
	let line = 
	  match cmd with
	    | `PORT ->
		let addr =  (* of control connection *)
		  match Unix.getsockname sock with
		    | Unix.ADDR_INET(addr,_) -> addr
		    | _ -> assert false in
		let addr_str = Unix.string_of_inet_addr addr in
		let dom = Netsys.domain_of_inet_addr addr in
		let server_sock = Unix.socket dom Unix.SOCK_STREAM 0 in
		Unix.bind server_sock (Unix.ADDR_INET(addr,0));
		Unix.listen server_sock 1;
		Netlog.Debug.track_fd
		  ~owner:"Ftp_client"
		  ~descr:("Data server (active mode) for " ^ 
			    peer_str)
		  server_sock;
		let port =
		  match Unix.getsockname server_sock with
		    | Unix.ADDR_INET(_,port) -> port
		    | _ -> assert false in
		dlogr (fun () ->
			 sprintf "created data server (active mode) \
                                  listening for %s:%d"
			   addr_str port);
		ftp_state <- set_ftp_port ftp_state (`Active(addr_str,port,server_sock));
		let port_str = format_port (addr_str,port) in
		"PORT " ^ port_str ^ "\r\n"
	    | _ -> string_of_cmd cmd in
	reply_callback <- onreply;
	if line <> "" then (
	  dlogr (fun () ->
		   sprintf "ctrl sent: %s" line);
	  Queue.push (Telnet_data line) ctrl#output_queue;
	  ctrl # update();
	) else (
	  dlog "ctrl sent: DUMMY";
	  raise Dummy
	)
      with
	| Queue.Empty ->
	    onempty ftp_state
	| Dummy ->
	    self # interpret_ctrl_reply 200 "200 DUMMY"
    )


  method private close_connection() =
    (* Terminates any transfer immediately and closes the connection *)
    ( match data_engine with
	| None -> ()
	| Some e -> 
	    dlogr (fun () -> "aborting data transfer");
	    ( match e # state with
		| `Working _ -> e # abort()
		| _ -> ()
	    );
	    let data_sock = e # descr in
	    Netlog.Debug.release_fd data_sock;
	    Unix.close data_sock;
	    data_engine <- None;
    );
    ftp_state <- set_ftp_port ftp_state `Unspecified;
    List.iter (fun e -> e # abort()) work_engines;
    work_engines <- []


  method private setup_passive_receiver cmd f =
    assert(is_passive ftp_state);
    ( match data_engine with
	| Some e ->
	    (* Continue with the old connection *)
	    if e # descr_state <> `Clean then
	      proto_viol "Data connection not clean";
	    (* Create a new engine taking the connection over *)
	    let data_sock = e # descr in
	    dlogr (fun () ->
		     sprintf "reusing passive-mode data connection");
	    self # setup_receiver 
	      ~is_done:(fun () ->
			  match interaction_state with
			    | `Transfer_replied(_,c,t) ->
				interaction_state <-
				  `Waiting cmd;
				self # interpret_ctrl_reply c t
			    | `Transfer cmd ->
				interaction_state <-
				  `Waiting cmd
			    | _ -> assert false
		       )
	      ~is_error:(fun err ->
			   self # set_error (FTP_error err))
	      f data_sock;
	    interaction_state <- `Transfer cmd
	| None ->
	    (* Indicates that a connection is to be opened *)
	    let addr,port =
	      match ftp_state.ftp_port with
		| `Passive(a,p) -> a,p | _ -> assert false in
	    let conn_engine =
	      Uq_engines.connector
		(`Socket(
		   `Sock_inet(Unix.SOCK_STREAM,
			      Unix.inet_addr_of_string addr,
			      port),
		   Uq_engines.default_connect_options))
		event_system in
	    Uq_engines.when_state
	      ~is_done:(function
			  | `Socket(data_sock,_) ->
			      dlogr (fun () ->
				       sprintf "passive-mode data connection \
                                                to %s:%d established"
					 addr port);
			      Netlog.Debug.track_fd
				~owner:"Ftp_client"
				~descr:("Data connection (passive mode) for "^ 
					  peer_str)
				data_sock;
			      self # setup_receiver 
				~is_done:(fun () ->
					    match interaction_state with
					      | `Transfer_replied(_,c,t) ->
						  interaction_state <-
						    `Waiting cmd;
						  self # interpret_ctrl_reply c t
					      | `Transfer cmd ->
						  interaction_state <-
						    `Waiting cmd
					      | _ -> assert false
					 )
				~is_error:(fun err ->
					     self # set_error (FTP_error err))
				f data_sock;
			      interaction_state <- `Transfer cmd;
			      self # clean_work_engines();
			  | _ -> assert false
		       )
	      ~is_error:(fun err -> 
			   self # clean_work_engines();
			   self # set_error (FTP_error err))
	      conn_engine;
	    work_engines <- new work_engine conn_engine :: work_engines;
	    interaction_state <- `Connecting_pasv(cmd,conn_engine);
    )

  method private setup_active_receiver cmd f =
    assert(is_active ftp_state);
    ( match data_engine with
	| Some e ->
	    (* Continue with the old connection *)
	    if e # descr_state <> `Clean then
	      proto_viol "Data connection not clean";
	    (* Create a new engine taking the connection over *)
	    dlogr (fun () ->
		     sprintf "reusing active-mode data connection");
	    let data_sock = e # descr in
	    self # setup_receiver 
	      ~is_done:(fun () ->
			  match interaction_state with
			    | `Transfer_replied(_,c,t) ->
				interaction_state <-
				  `Waiting cmd;
				self # interpret_ctrl_reply c t
			    | `Transfer cmd ->
				interaction_state <-
				  `Waiting cmd
			    | _ -> assert false
		       )
	      ~is_error:(fun err ->
			   self # set_error (FTP_error err))
	      f data_sock;
	    interaction_state <- `Transfer cmd
	| None ->
	    (* Indicates that a connection is to be opened *)
	    let addr,port,server_sock =
	      match ftp_state.ftp_port with
		| `Active(a,p,fd) -> a,p,fd | _ -> assert false in
	    let acc_engine =
	      new Uq_engines.poll_engine 
		[ (Unixqueue.Wait_in server_sock), (-1.0) ] event_system in
	    Uq_engines.when_state
	      ~is_done:(function
			  | Unixqueue.Input_arrived(_,_) ->
			      let data_sock, _ = Unix.accept server_sock in
			      dlogr (fun () ->
				       sprintf "accepted new active-mode \
                                                data connection");
			      Netlog.Debug.track_fd
				~owner:"Ftp_client"
				~descr:("Data connection (active mode) for " ^ 
					  peer_str)
				data_sock;
			      self # setup_receiver 
				~is_done:(fun () ->
					    match interaction_state with
					      | `Transfer_replied(_,c,t) ->
						  interaction_state <-
						    `Waiting cmd;
						  self # interpret_ctrl_reply c t
					      | `Transfer cmd ->
						  interaction_state <-
						    `Waiting cmd
					      | _ -> assert false
					 )
				~is_error:(fun err ->
					     self # set_error (FTP_error err))
				f data_sock;
			      interaction_state <- `Transfer cmd;
			      self # clean_work_engines()
			  | _ ->
			      assert false
		       )
	      ~is_error:(fun err -> 
			   self # clean_work_engines();
			   self # set_error (FTP_error err))
	      acc_engine;
            let acc_engine = (acc_engine :> Unixqueue.event Uq_engines.engine) in
	    work_engines <- new work_engine acc_engine :: work_engines;
	    interaction_state <- `Listening_actv(cmd,acc_engine);
    )


  method private setup_receiver ~is_done ~is_error f data_sock =
    let local = f ftp_state in
    let e = 
      new ftp_data_receiver
	~esys:event_system
	~mode:ftp_state.ftp_trans
	~local_receiver:local
	~descr:data_sock () in
    data_engine <- Some (e :> ftp_data_engine);
    Uq_engines.when_state 
      ~is_done
      ~is_error
      e

	

  method add_cmd ?(onreply = fun _ _ -> ()) (cmd : cmd) =
    match state with
      | `Working _ ->
	  Queue.push (cmd, onreply) queue;
	  self # protect (self # send_command_when_ready)
      | _ ->
	  failwith "Ftp_client.ftp_client_pi: Connection already terminated"


  method send_abort () = ()

  method run () = Unixqueue.run event_system

end


module Action : sig
  type plan
  type action = plan -> unit

  val ftp_state : plan -> ftp_state

  val empty : action
  val command : cmd -> action
  val dyn_command : (unit -> cmd) -> action
  val seq2 : action -> action -> action
  val full_seq2 : action -> (reply -> action) -> action
  val seq : action list -> action
  val expect : cmd_state -> action -> action
  val seq2_case : action -> (cmd_state * action) list -> action
  val execute : onreply:(ftp_state -> reply -> unit) -> 
                onerror:(ftp_state -> reply -> unit) -> 
                ftp_client_pi -> 
                action ->
                  unit

end = struct
  type plan = 
      { pi : ftp_client_pi;            (* where we are *)
	ftp_state : ftp_state;
	onreply : ftp_state -> reply -> unit; 
                    (* what to do next *)
	onerror : ftp_state -> reply -> unit
		    (* what to do if all else fails *)
      }

  type action =
      plan -> unit

  let ftp_state p = p.ftp_state

  let execute ~onreply ~onerror pi act =
    act
      { pi = pi; 
	ftp_state = pi#ftp_state; 
	onreply = onreply; 
	onerror = onerror
      }

  let empty p =
    p.pi # add_cmd 
      ~onreply:p.onreply
      `Dummy

  let command cmd p =
    p.pi # add_cmd
      ~onreply:(fun ftp_state r ->
		  if ftp_state.cmd_state <> `Preliminary then
		    p.onreply ftp_state r)
      cmd

  let dyn_command f_cmd p =
    command (f_cmd()) p

  let seq2 act1 act2 p =
    act1
      { p with
	  onreply = (fun ftp_state r ->
		       act2
			 { p with
			     ftp_state = ftp_state
			 }
		    )
      }

  let full_seq2 act1 f_act2 p =
    act1
      { p with
	  onreply = (fun ftp_state r ->
		       f_act2
			 r
			 { p with
			     ftp_state = ftp_state
			 }
		    )
      }

  let rec seq act_list p =
    match act_list with
      | [] ->
	  empty p
      | [act] ->
	  act p
      | act :: act_list' ->
	  act
	    { p with
		onreply = (fun ftp_state r ->
			     seq act_list' { p with ftp_state = ftp_state }
			  )
	    }

  let expect s act p =
    act
      { p with
	  onreply = (fun ftp_state r ->
		       if ftp_state.cmd_state = s then
			 p.onreply ftp_state r
		       else
			 p.onerror ftp_state r
		    )
      }

  let seq2_case act cases p =
    act 
      { p with
	  onreply = (fun ftp_state r ->
		       match
			 try
			   Some(List.assoc ftp_state.cmd_state cases)
			 with
			     Not_found -> None
		       with
			 | Some act' ->
			     act' { p with ftp_state = ftp_state }
			 | None ->
			     p.onerror ftp_state r
		    )
      }

end


class type ftp_method =
object
  method connect : (string * int) option
  method execute : Action.action
end


exception FTP_method_temp_failure of int * string
exception FTP_method_perm_failure of int * string
exception FTP_method_unexpected_reply of int * string


class connect_method ~host ?(port = 21) () : ftp_method =
object(self)
  method connect = Some(host,port)
  method execute = Action.empty
end


class login_method ~user ~get_password ~get_account () : ftp_method =
object(self)
  method connect = None
  method execute =
    Action.seq2_case
      (Action.command (`USER user))
      [ `Success, Action.empty;
	`User_pass_seq, ( Action.seq2_case
			    (Action.dyn_command
			       (fun () -> `PASS (get_password())))
			    [ `Success, Action.empty;
			      `Pass_acct_seq, ( Action.expect `Success 
						  (Action.dyn_command 
						     (fun () ->
							`ACCT (get_account())))
					      );
			    ]
			);
	`User_acct_seq, ( Action.expect `Success 
			    (Action.dyn_command 
			       (fun () -> `ACCT (get_account())))
			);
      ]
end


let slash_re = Netstring_pcre.regexp "/+";;


let rec basename l =
  match l with
    | [] -> failwith "Bad filename"
    | [name] -> name
    | _ :: l' -> basename l'


let rec dirname l =
  match l with
    | [] -> []
    | [name] -> []
    | dir :: l' -> dir :: dirname l'


let rec is_prefix l1 l2 =
  match (l1, l2) with
    | (x1::l1'), (x2::l2') ->
	x1 = x2 && is_prefix l1' l2' 
    | [], _ ->
	true
    | (_::_), [] ->
	false


let rec without_prefix l1 l2 =
  match (l1, l2) with
    | (x1::l1'), (x2::l2') ->
	if x1 = x2 then without_prefix l1' l2' else failwith "without_prefix"
    | [], _ ->
	l2
    | (_::_), [] ->
	failwith "without_prefix"


class walk_method (destination : [ `File of string | `Dir of string | `Stay ] )
                  : ftp_method =
object(self)
  method connect = None
    
  method execute =
    let rec walk_to_directory path p =
      let ftp_state = Action.ftp_state p in
      let cur_dir = List.rev ftp_state.ftp_dir in
      if is_prefix cur_dir path then
	match without_prefix cur_dir path with
	  | [] ->
	      Action.empty p
	  | dir :: _  ->
	      Action.seq2
		(Action.expect
		   `Success
		   (Action.command (`CWD dir)))
		(walk_to_directory path)
		p
      else
	Action.seq2
	  (Action.expect
	     `Success
	     (Action.command `CDUP))
	  (walk_to_directory path)
	  p
    in

    match destination with
      | `File name ->
	  let rpath = List.rev (Netstring_pcre.split slash_re name) in
	  ( match rpath with
	      | _ :: dir -> walk_to_directory (List.rev dir)
	      | [] -> failwith "Bad filename"
	  )
      | `Dir name ->
	  let path = Netstring_pcre.split slash_re name in
	  walk_to_directory path
      | `Stay ->
	  Action.empty
end



type filename =
    [ `NVFS of string
    | `Verbatim of string
    ]


let destination_of_file = 
  function
    | `NVFS name -> `File name
    | `Verbatim _ -> `Stay

let destination_of_dir = 
  function
    | `NVFS name -> `Dir name
    | `Verbatim _ -> `Stay

let ftp_filename =
  function
    | `NVFS name -> basename (Netstring_pcre.split slash_re name)
    | `Verbatim name -> name


class file_method ~(perform : string -> ftp_method) 
                  (file : filename) : ftp_method =
  (* Internal class, not exported *)
  let walk = new walk_method (destination_of_file file) in
  let filename = ftp_filename file in
object(self)
  method connect = walk # connect

  method execute =
    Action.seq2
      walk#execute
      ((perform filename) # execute)
end


class dir_method ~(perform : string option -> ftp_method) 
                 (dir : filename) : ftp_method =
  (* Internal class, not exported *)
  let walk = new walk_method (destination_of_dir dir) in
  let filename_opt = 
    match dir with
      | `NVFS name -> None
      | `Verbatim name -> Some name in
object(self)
  method connect = walk # connect

  method execute =
    Action.seq2
      walk#execute
      ((perform filename_opt) # execute)

end


class invoke_method ~command 
                    ~(process_result : ftp_state -> (int * string) -> unit)
                    () : ftp_method =
object(self)
  method connect = None

  method execute =
    Action.full_seq2
      (Action.expect `Success
	 (Action.command command))
      (fun reply p ->
	 let fs = Action.ftp_state p in
	 process_result fs reply)
end    


class set_structure_method structure =
  invoke_method 
    ~command:(`STRU structure) 
    ~process_result:(fun _ _ -> ())
    ()


class set_mode_method mode =
  invoke_method 
    ~command:(`MODE mode) 
    ~process_result:(fun _ _ -> ())
    ()


class mkdir_method file =
  file_method
    ~perform:(fun filename ->
		new invoke_method 
		  ~command:(`MKD filename)
		  ~process_result:(fun _ _ -> ())
                  () )
    file


class rmdir_method file =
  file_method
    ~perform:(fun filename ->
		new invoke_method 
		  ~command:(`RMD filename)
		  ~process_result:(fun _ _ -> ())
                  () )
    file


class delete_method file =
  file_method
    ~perform:(fun filename ->
		new invoke_method 
		  ~command:(`DELE filename)
		  ~process_result:(fun _ _ -> ()) 
                   () )
    file


(* MDTM response is YYYYMMDDHHMMSS[.s+] (GMT) *)

let time_re = Netstring_pcre.regexp ".*[^0-9](\\d{4})(\\d{2})(\\d{2})(\\d{2})(\\d{2})(\\d{2})(?:\\.(\\d+))?"

let extract_time s =
  match Netstring_pcre.string_match time_re s 0 with
  | None ->
      failwith ("Cannot parse time-val: " ^ s)
  | Some m ->
      let fraction =
        try if (Netstring_pcre.matched_group m 7 s).[0] < '5' then 0 else 1
        with Not_found -> 0
      in
      let get_int i = int_of_string (Netstring_pcre.matched_group m i s) in
      Netdate.since_epoch
        { Netdate.year = get_int 1;
          Netdate.month = get_int 2;
          Netdate.day = get_int 3;
          Netdate.hour = get_int 4;
          Netdate.minute = get_int 5;
          Netdate.second = get_int 6 + fraction;
          Netdate.zone = 0;  (* GMT *)
          Netdate.week_day = -1 }


class mdtm_method ~file ~process_result () =
  file_method
    ~perform:(fun filename ->
		new invoke_method 
		  ~command:(`MDTM filename)
		  ~process_result:(fun _ (code,text) -> 
				     let t = extract_time text in
				     process_result t)
		  () )
    file


class rename_method' filename_from filename_to : ftp_method =
object(self)
  method connect = None

  method execute =
    Action.seq2_case
      (Action.command (`RNFR filename_from))
      [ `Rename_seq, (Action.expect `Success 
			(Action.command (`RNTO filename_to))) ]
end


class rename_method ~file_from ~(file_to : filename) () =
  (* Check arguments: *)
  let filename_to =
    match (file_from, file_to) with
      | (`NVFS p1), (`NVFS p2) ->
	  (* p1 and p2 must point to the same directory. Return basename of p2 *)
	  let p1' = Netstring_pcre.split slash_re p1 in
	  let p2' = Netstring_pcre.split slash_re p2 in
	  let d1 = dirname p1' in
	  let d2 = dirname p2' in
	  if d1 <> d2 then invalid_arg "Ftp_client.rename_method";
	  basename p2'
      | (`Verbatim _), (`Verbatim s) -> s
      | _ -> invalid_arg "Ftp_client.rename_method"
  in
  file_method
    ~perform:(fun filename_from ->
		new rename_method' filename_from filename_to
	     )
    file_from



class retrieve_method ~command ~representation () : ftp_method =
object(self)
  method connect = None

  method execute =
    Action.seq
      [Action.expect `Success (Action.command (`TYPE representation));
       Action.seq2_case
	 (Action.command `PASV)
	 [ `Success, Action.empty;
	   `Perm_failure, (Action.expect `Success
			     (Action.command `PORT))
	 ];
       Action.expect `Success (Action.command command)
      ]
end


class get_method ~file ~representation ~store () : ftp_method =
  file_method
    ~perform:(fun filename ->
		new retrieve_method 
		  ~command:(`RETR(filename,store))
		  ~representation ())
    file


class list_method ~dir ~representation ~store () : ftp_method =
  dir_method
    ~perform:(fun filename_opt ->
		new retrieve_method 
		  ~command:(`LIST(filename_opt,store))
		  ~representation ())
    dir


class nlst_method ~dir ~representation ~store () : ftp_method =
  dir_method
    ~perform:(fun filename_opt ->
		new retrieve_method 
		  ~command:(`NLST(filename_opt,store))
		  ~representation ())
    dir


class ftp_client 
        ?(event_system = Unixqueue.create_unix_event_system())
        ?(onempty = fun () -> ())
        () =
object(self)
  val mutable pi_opt = (None : ftp_client_pi option)
  val mutable queue = Queue.create()
  val mutable current = None
  val mutable work_engines = ( [] : work_engine list)

  val mutable state = `Working 0
  val mutable notify_list = []
  val mutable notify_list_new = []

  method state = (state : unit Uq_engines.engine_state)
  method event_system = event_system

  method abort() =
    ( match pi_opt with
	| None -> ()
	| Some pi -> pi # abort()
    );
    List.iter (fun e -> e # abort()) work_engines;
    work_engines <- [];
    self # set_state `Aborted

  method private clean_work_engines() =
    work_engines <- List.filter (fun e -> e # is_working) work_engines

  method request_notification f =
    notify_list_new <- f :: notify_list_new

  method private notify() =
    notify_list <- notify_list @ notify_list_new;
    notify_list_new <- [];
    notify_list <- List.filter (fun f -> f()) notify_list

  method private set_state s =
    if s <> state then (
      state <- s;
      self # notify();
    )

  method private set_error e =
    self # abort();
    self # set_state (`Error e)

  method private still_running =
    match state with
      | `Working _ -> true
      | _ -> false

  method private protect : 'a . ('a -> unit) -> 'a -> unit =
    fun f arg ->
      try
	f arg
      with
	  err ->
	    self # set_error err

  method private next() =
    try
      let (ftpm, onsuccess, onerror) = Queue.take queue in (* or Queue.Empty *)
      current <- Some(ftpm,onsuccess,onerror);
      (* First check whether we have to connect again: *)
      ( match ftpm # connect with
	  | None ->
	      ( match pi_opt with
		  | None -> failwith "Missing connection"
		  | Some pi -> self # exec pi ftpm onsuccess onerror
	      )
	  | Some (host,port) ->
	      (* First stop the current connection *)
	      ( match pi_opt with
		  | None -> ()
		  | Some pi ->
		      pi # add_cmd `QUIT;
		      (* TODO: fallback method if QUIT fails *)
		      self # clean_work_engines();
		      work_engines <- new work_engine pi :: work_engines;
		      pi_opt <- None
	      );
	      let conn_engine =
		Uq_engines.connector
		  (`Socket(
		     `Sock_inet(Unix.SOCK_STREAM,
				Unix.inet_addr_of_string host,
				port),
		     Uq_engines.default_connect_options))
		  event_system in
	      Uq_engines.when_state
		~is_done:(function
			    | `Socket(sock,_) ->
				(* N.B. This socket is fd-tracked by Telnet_client *)
				let pi = 
				  new ftp_client_pi
				    ~event_system
				    ~onerrorstate:self#set_error
				    sock in
				pi_opt <- Some pi;
				self # clean_work_engines();
				self # exec pi ftpm onsuccess onerror
			    | _ -> assert false
			 )
		~is_error:(fun err -> 
			     self # clean_work_engines();
			     self # set_error (FTP_error err))
		~is_aborted:(fun _ -> self # clean_work_engines())
		conn_engine;
	      work_engines <- new work_engine conn_engine :: work_engines
      )
    with
	Queue.Empty ->
	  onempty();
	  ( match pi_opt with
	      | None -> ()
	      | Some pi ->
		  pi # add_cmd `QUIT;
		  (* TODO: fallback method if QUIT fails *)
		  self # clean_work_engines();
		  work_engines <- new work_engine pi :: work_engines;
		  pi_opt <- None;
	  );

	  

  method private exec pi ftpm onsuccess onerror =
    let onreply fs r = 
      self # protect onsuccess ();
      current <- None;
      if self # still_running then self # next()
    in
    let onerror fs (code,text) = 
      let e =
	match fs.cmd_state with
	  | `Temp_failure -> FTP_method_temp_failure (code,text)
	  | `Perm_failure -> FTP_method_perm_failure (code,text)
	  | _             -> FTP_method_unexpected_reply (code,text) in
      self # protect onerror e;
      current <- None;
      if self # still_running then self # next();
    in
    let _ftp_state = pi # ftp_state in
    self # protect (
      fun () ->
	let action = ftpm # execute in
	Action.execute ~onreply ~onerror pi action) ();

(*
  method private pi_got_empty ftp_state =
    current <- None;
    self # next()
 *)
    
  method add ?(onsuccess = fun () -> ()) 
             ?(onerror = fun e -> (raise e : unit)) 
             (ftpm : ftp_method) =
    if not self#still_running then
      failwith "Ftp_client.ftp_client: Engine has already finished";
    Queue.add (ftpm, onsuccess, onerror) queue;
    if current = None then self # next();


  method run () = Unixqueue.run event_system

end


let () =
  Netsys_signal.init()

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml