Plasma GitLab Archive
Projects Blog Knowledge

(* $Id: nethttp_client.ml 2195 2015-01-01 12:23:39Z gerd $
 * ----------------------------------------------------------------------
 *
 *)

(* TODO:
   - Also support automatic compression of uploads. Be prepared to get
     a 415 response, and fall back to [identity] (or whatever is available)
     Plan:
     * message: set_content_encoding
     * message, private-api: allow automatic compression in open_value_rd
     * Uq_io: implement filter_in_buffer
     * postprocess: intercept 415 responses, and choose a different
       encoding
 *)


(* Reference documents:
 * RFC 2068, 2616:      HTTP 1.1
 * RFC 2069, 2617:      Digest Authentication
 *)

module Debug = struct
  let enable = ref false
end

let dlog = Netlog.Debug.mk_dlog "Nethttp_client" Debug.enable
let dlogr = Netlog.Debug.mk_dlogr "Nethttp_client" Debug.enable

let () =
  Netlog.Debug.register_module "Nethttp_client" Debug.enable


open Nethttp_client_conncache
open Printf
open Uq_engines.Operators

exception Bad_message of string;;
exception Http_error of (int * string);;
exception Http_protocol of exn;;
exception Proxy_error of int
exception No_reply;;
exception Too_many_redirections;;
exception Name_resolution_error = Uq_resolver.Host_not_found
exception URL_syntax_error of string
exception Timeout of string
exception Response_too_large

let () =
  Netexn.register_printer
    (Http_protocol Not_found)
    (fun e ->
       match e with
	 | Http_protocol e' ->
	     "Nethttp_client.Http_protocol(" ^ Netexn.to_string e' ^ ")"
	 | _ ->
	     assert false
    );
  Netexn.register_printer
    (Http_error(0,""))
    (fun e ->
       match e with
	 | Http_error(code,text) ->
	     sprintf "Nethttp_client.Http_error(%d,%S)" code text
	 | _ ->
	     assert false
    )
  

let() =
  Netsys_signal.init()


type status =
  [ `Unserved
  | `Http_protocol_error of exn
  | `Successful
  | `Redirection
  | `Client_error
  | `Server_error
  ]

type response_body_storage =
    [ `Memory
    | `File of unit -> string
    | `Body of unit -> Netmime.mime_body
    | `Device of unit -> Uq_io.out_device
    ]

type 'message_class how_to_reconnect =
    Send_again
  | Request_fails
  | Inquire of ('message_class -> bool)
  | Send_again_if_idem
;;

type 'message_class how_to_redirect =
    Redirect
  | Do_not_redirect
  | Redirect_inquire of ('message_class -> bool)
  | Redirect_if_idem
;;

type resolver =
    Unixqueue.unix_event_system -> 
    string -> 
    (Unix.inet_addr option -> unit) -> 
      unit
;;

type counters =
    { mutable new_connections : int;
      mutable timed_out_connections : int;
      mutable crashed_connections: int;
      mutable server_eof_connections : int;
      mutable successful_connections : int;
      mutable failed_connections : int;
    }


let better_unix_error f arg =
  try
    f arg
  with
    Unix.Unix_error (e,syscall,param) ->
      let error = Unix.error_message e in
      if param = "" then
	failwith error
      else
	failwith (param ^ ": " ^ error)


let rec syscall f =
  (* Invoke system call, and handle EINTR *)
  try
    f()
  with
      Unix.Unix_error(Unix.EINTR,_,_) ->
	(* "interrupted system call": A signal happened while the system
	 * blocked.
	 * Simply restart the call.
	 *)
	syscall f
;;


let hex_digits = [| '0'; '1'; '2'; '3'; '4'; '5'; '6'; '7';
		    '8'; '9'; 'a'; 'b'; 'c'; 'd'; 'e'; 'f' |];;

let encode_hex s =
  (* encode with lowercase hex digits *)
  let l = String.length s in
  let t = String.make (2*l) ' ' in
  for n = 0 to l - 1 do
    let x = Char.code s.[n] in
    t.[2*n]   <- hex_digits.( x lsr 4 );
    t.[2*n+1] <- hex_digits.( x land 15 );
  done;
  t
;;


type synchronization =
  | Sync
  | Pipeline of int
;;


let max_pipeline = 8 ;;

let pipeline_blacklist =
  (* Stolen from Mozilla *)
  [ Netstring_str.regexp "Microsoft-IIS/";
    Netstring_str.regexp "Netscape-Enterprise/3.";
  ]
;;


type transport_layer_id = int

type http_options = 
    { synchronization : synchronization;
      maximum_connection_failures : int;
      maximum_message_errors : int;
      inhibit_persistency : bool;
      connection_timeout : float;
      number_of_parallel_connections : int;
      maximum_redirections : int;
      handshake_timeout : float;
      resolver : resolver;
      configure_socket : Unix.file_descr -> unit;
      tls : Netsys_crypto_types.tls_config option;
      schemes : (string * Neturl.url_syntax * int option * transport_layer_id)
	         list;
      verbose_status : bool;
      verbose_request_header : bool;
      verbose_response_header : bool;
      verbose_request_contents : bool;
      verbose_response_contents : bool;
      verbose_connection : bool;
      verbose_events : bool;
    }
;;

type header_kind = [ `Base | `Effective ]

(*
let http_re =
  Netstring_str.regexp
    "^https?://\
     \\(\
       \\([^/:@]+\\)\
       \\(:\\([^/:@]+\\)\\)?@\
     \\)?\
     \\([^/:@]+\\)\
     \\(:\\([0-9]+\\)\\)?\
     \\([/?].*\\)?$"
 *)

let parse_url_0 ?base_url schemes url =
  try
    let sch = 
      try Neturl.extract_url_scheme url 
      with err ->
	( match base_url with
	    | None -> raise err
	    | Some b -> Neturl.url_scheme b
	) in
    let (_,syn,p_opt,trans) = 
      List.find (fun (sch',_,_,_) -> sch=sch') schemes in
    let ht = Hashtbl.create 1 in
    Hashtbl.add ht sch syn;
    let url' = Neturl.fixup_url_string ~escape_hash:true url in
    let nu1 = 
      Neturl.parse_url 
	?base_syntax:(match base_url with
			| None -> None
			| Some b -> Some(Neturl.url_syntax_of_url b)
		     )
	~schemes:ht ~accept_8bits:true url' in
    let nu2 = 
      Neturl.ensure_absolute_url ?base:base_url nu1 in
    (Neturl.default_url ?port:p_opt ~path:[""] nu2, trans)
  with
    | Neturl.Malformed_URL -> raise Not_found

let parse_url ?base_url options url =
  parse_url_0 ?base_url !options.schemes url

(*
  match Netstring_str.string_match http_re url 0 with
    | None ->
	raise Not_found
    | Some m ->
	let is_https =
	  String.sub url 0 5 = "https" in
	let user =
	  try Some(Netstring_str.matched_group m 2 url) 
	  with Not_found -> None in
	let password =
	  try Some(Netstring_str.matched_group m 4 url)
	  with Not_found -> None in
	let host =
	  Netstring_str.matched_group m 5 url in
	let port =
	  try int_of_string(Netstring_str.matched_group m 7 url)
	  with Not_found -> (if is_https then 443 else 80) in
	let path =
	  try Netstring_str.matched_group m 8 url with Not_found -> "" in
	(is_https,user,password,host,port,path)
 *)

let new_trans_id = Nethttp.new_trans_id
let http_trans_id = Nethttp.http_trans_id
let https_trans_id = Nethttp.https_trans_id
let spnego_trans_id = Nethttp.spnego_trans_id
let proxy_only_trans_id = Nethttp.proxy_only_trans_id

let https_list =
  [ https_trans_id; spnego_trans_id ]
  (* by default, we recognize these two transports as TLS transports *)

let is_https trans_id =
  List.mem trans_id https_list




(* For some time we had

   Neturl.url_enable_query = Neturl.Url_part_not_recognized

   I don't remember what the reason was. However, URLs of the form
   scheme://host?query cannot be parsed then. (Rev 1625 did that.)
 *)

let default_schemes =
  let http_syn =
    { (Hashtbl.find Neturl.common_url_syntax "http") with
      Neturl.url_enable_query = Neturl.Url_part_allowed
    } in
  let https_syn =
    { (Hashtbl.find Neturl.common_url_syntax "https") with
      Neturl.url_enable_query = Neturl.Url_part_allowed
    } in
  let ipp_syn =
    { (Hashtbl.find Neturl.common_url_syntax "ipp") with
      Neturl.url_enable_query = Neturl.Url_part_allowed
    } in
  [ "http", http_syn, Some 80, http_trans_id;
    "https", https_syn, Some 443, https_trans_id;
    "ipp", ipp_syn, Some 631, http_trans_id;
  ]


let get_default_port options scheme =
  try
    let (_, _, port_opt, _) =
      List.find (fun (n,_,_,_) -> n = scheme) options.schemes in
    port_opt
  with Not_found -> None


let comma_re = Netstring_str.regexp "[ \t\n\r]*,[ \t\n\r]*" ;;

let split_words_by_commas s =
  Netstring_str.split comma_re s ;;

let space_re = Netstring_str.regexp "[ \t\n\r]+" ;;

let split_words s =
  Netstring_str.split space_re s ;;


let sync_resolver esys name reply =
  (* FIXME: Use Uq_resolver also in the async case! *)
  let addr =
    try
      let h = Uq_resolver.get_host_by_name name in
      Some h.Unix.h_addr_list.(0)
    with Uq_resolver.Host_not_found _ -> None in
  reply addr
;;


(**********************************************************************)
(***                          BUFFERS                               ***)
(**********************************************************************)

(* Similar to Buffer, but may be accessed in a more efficient way. *)

module B = Netbuffer;;

type buffer_type = B.t;;


module Q = 
struct
  (* This queue allows one to insert elements. *)

  type 'a t = 'a Queue.t

  exception Empty = Queue.Empty

  let create = Queue.create

  let add = Queue.add

  let push = add 

  let add_at n x q =
    (* Add x behind the n-the last element of q, i.e.
     * add_at 0 x q = Add behind the last element = add x q
     * add_at 1 x q = Add before the last element
     * ...
     * This algorithm is usually called for n ~ length, and is quite
     * efficient in this case.
     *)
    let l = Queue.length q in
    let q' = Queue.create() in
    for k = 1 to l-n do
      Queue.add (Queue.take q) q'
    done;
    Queue.add x q';
    Queue.transfer q q';
    Queue.transfer q' q

  let take = Queue.take
     
  let pop = take

  let peek = Queue.peek

  let top = peek

  let clear = Queue.clear

  let copy = Queue.copy

  let is_empty = Queue.is_empty

  let length = Queue.length

  let iter = Queue.iter

  let transfer = Queue.transfer

  (* fold: omitted *)

end



(**********************************************************************)
(***                  THE HTTP CALL CONTAINER                       ***)
(**********************************************************************)


let dump_header prefix h =
  List.iter
    (fun (n,v) ->
       dlog (prefix ^ n ^ ": " ^ v))
    h



type 'session auth_state =  (* 'session = auth_session, defined below *)
    [ `None
         (* Authentication has not yet been tried *)
    | `In_advance of 'session
         (* This session had been tried before a 401 response was seen *)
    | `In_reply of 'session * (string * string) list
         (* This session was tried after a 401 response was seen *)
    | `In_reply_reroute of 'session * (string * string) list * int
         (* In_reply + the next request must use a different transport *)
    | `Resubmit of int
         (* Resubmit the call on a different transport *)
    | `Auth_error
         (* Failure *)
    | `OK
         (* The auth protocol ran until the end *)
    ]


(* What is reported to the user: *)
type 'a auth_status =
    [ `Continue of 'a
    | `OK
    | `Auth_error
    | `Reroute of int
    | `Continue_reroute of 'a * int
    | `None
    ]

class type http_call =
object
  method is_served : bool
  method status : status
  method auth_status : unit auth_status
  method tls_session_props : Nettls_support.tls_session_props option
  method gssapi_props : Netsys_gssapi.client_props option
  method request_method : string
  method request_uri : string
  method set_request_uri : string -> unit
(* method is_https : bool *)
  method request_header : header_kind -> Netmime.mime_header
  method set_request_header : Netmime.mime_header -> unit
  method set_expect_handshake : unit -> unit
  method set_chunked_request : unit -> unit
  method effective_request_uri : string
  method request_body : Netmime.mime_body
  method set_request_body : Netmime.mime_body -> unit
  method set_request_device : (unit -> Uq_io.in_device) -> unit
  method set_accept_encoding : unit -> unit
  method response_status_code : int
  method response_status_text : string
  method response_status : Nethttp.http_status
  method response_protocol : string
  method response_header : Netmime.mime_header
  method response_body : Netmime.mime_body
  method response_body_storage : response_body_storage
  method set_response_body_storage : response_body_storage -> unit
  method max_response_body_length : int64
  method set_max_response_body_length : int64 -> unit
  method get_reconnect_mode : http_call how_to_reconnect
  method set_reconnect_mode : http_call how_to_reconnect -> unit
  method get_redirect_mode : http_call how_to_redirect
  method set_redirect_mode : http_call how_to_redirect -> unit
  method proxy_enabled : bool
  method set_proxy_enabled : bool -> unit
  method no_proxy : unit -> unit
  method is_proxy_allowed : unit -> bool
  method proxy_use_connect : bool
  method empty_path_replacement : string
  method is_idempotent : bool
  method has_req_body : bool
  method has_resp_body : bool
  method set_transport_layer : transport_layer_id -> unit
  method same_call : unit -> http_call
  method get_req_method : unit -> string
  method get_host : unit -> string
  method get_port : unit -> int
  method get_path : unit -> string
  method get_uri : unit -> string
  method get_req_body : unit -> string
  method get_req_header : unit -> (string * string) list
  method assoc_req_header : string -> string
  method assoc_multi_req_header : string -> string list
  method set_req_header : string -> string -> unit
  method get_resp_header : unit -> (string * string) list
  method assoc_resp_header : string -> string
  method assoc_multi_resp_header : string -> string list
  method get_resp_body : unit -> string
  method dest_status : unit -> (string * int * string)
  method private_api : private_api
end

and private_api =
object
  method parse_request_uri : http_options ref -> unit
  method request_uri_with : ?path:string option -> ?remove_particles:bool ->
                            unit -> Neturl.url

  method transport_layer : http_options ref -> transport_layer_id
  method reroute : transport_layer_id -> unit
  method reroutable : bool

  method get_error_counter : int
  method set_error_counter : int -> unit
  method set_error_exception : exn -> unit
  method error_exception : exn option

  method error_if_unserved : bool -> exn -> unit

  method get_redir_counter : int
  method set_redir_counter : int -> unit

  method continue : bool
    (* The value of the continue flag *)
  method set_continue : bool -> unit
    (* Set the continue flag to [true]. The argument says whether a
       "100" code was received or not. (In the latter case the reaction is
       different.)
     *)
  method wait_continue_e : float -> Unixqueue.event_system -> 
                                                         bool Uq_engines.engine
    (* Waits until [set_continue] is called, or until the time is over.
       The bool result says whether a "100" code was received or not.
     *)

  method auth_state : auth_session auth_state
  method set_auth_state : auth_session auth_state -> unit

  method conn_id : int
  method set_conn_id : int -> unit

  method set_tls_session_props : Nettls_support.tls_session_props -> unit
  method set_gssapi_props : Netsys_gssapi.client_props -> unit

  method retry_anyway : bool
  method set_retry_anyway : bool -> unit
    (* This flag is set when we retry the request after an authentication
       error. It overrides the reconnect mode - even if we do not want to
       retry the request normally, we do it now exceptionally.
       This is especially important for non-idempotent requests.

       The flag remains active until we get to the point that we can
       write the request line.
     *)

  method set_effective_request_uri : string -> unit

  method prepare_transmission : unit -> unit
    (* Initializes the `Effective request header, and creates the response
     * objects. Sets the status to [`Unserved]. The error and redirection
     * counters are not changed.
     *)
  method set_response_status : int -> string -> string -> unit
    (* code, text, proto *)
  method set_response_header : Netmime.mime_header -> unit
    (* Sets the response header *)
  method response_body_open_wr : Unixqueue.event_system -> Uq_io.out_device
    (* Opens the response body for writing *)
  method request_body_open_rd : Unixqueue.event_system -> Uq_io.in_device
    (* Opens the request body for reading *)
  method finish_request_e : Unixqueue.event_system -> unit Uq_engines.engine
    (* Finish the request part of the call *)
  method finish_response_e : Unixqueue.event_system -> unit Uq_engines.engine
    (* The call is finished. The [status] is set to [`Successful], 
     * [`Redirection], [`Client_error], or [`Server_error] depending on
     * the response.
     *)
  method cleanup : unit -> unit
    (* Release resources *)

  method response_code : int
  method response_proto : string
  method response_header : Netmime.mime_header

  method decompression_enabled : bool

  method dump_status : unit -> unit
  method dump_response_header : unit -> unit
  method dump_response_body : unit -> unit
end

and auth_session =
object
  method auth_scheme : string
  method auth_domain : Neturl.url list * int
  method auth_realm : string
  method auth_user : string
  method auth_session_id : string option
  method authenticate : http_call -> bool -> (string * string) list auth_status
  method invalidate : http_call -> unit
end


class type virtual gen_call =
object
  inherit http_call
  method private virtual fixup_request : unit -> unit
  method private virtual def_request_method : string
  method private virtual def_empty_path_replacement : string
  method private virtual def_is_idempotent : bool
  method private virtual def_has_req_body : bool
  method private virtual def_has_resp_body : bool
end


class virtual generic_call : gen_call =
object(self)
  val mutable status = (`Unserved : status)
  val mutable finished = false

  val mutable req_uri = None
      (* The req_uri must always include the port number *)
  val mutable req_uri_raw = ""

  val mutable req_trans = http_trans_id
  val mutable req_trans_set = `None
  val mutable req_secure = false
  val mutable req_host = ""   (* derived from req_uri *)
  val mutable req_port = 80   (* derived from req_uri *)
  val mutable req_path = ""   (* derived from req_uri *)
  val mutable req_base_header = new Netmime.basic_mime_header []
  val mutable req_work_header = new Netmime.basic_mime_header []
  val mutable req_body = new Netmime.memory_mime_body ""
  val mutable req_dev = None

  val mutable eff_req_uri = ""

  val mutable resp_code = 0
  val mutable resp_text = ""
  val mutable resp_proto = ""
  val mutable resp_header = new Netmime.basic_mime_header []
  val mutable resp_header_set = false
  val mutable resp_body = None
  val mutable resp_body_max = Int64.max_int
  val mutable resp_decompress = false

  val mutable resp_body_storage = (`Memory : response_body_storage)
  val mutable reconn_mode = Send_again_if_idem
  val mutable redir_mode = Redirect_if_idem
  val mutable proxy_enabled = true
  val mutable retry_anyway = false

  val mutable private_api = None

  val mutable resp_handle = None
  val mutable req_handle = None

  val mutable continue = false   (* Whether 100-Continue has been seen *)    
  val mutable continue_e = None
  val mutable continue_aborted = false
  val mutable continue_100 = false

  val mutable error_counter = 0
  val mutable redir_counter = 0
  val mutable auth_state = `None

  val mutable tls_session_props = None
  val mutable gssapi_props = None

  val mutable conn_id = 0

  method private resp_body =
    match resp_body with
      | None ->
	  let rbody =
	    match resp_body_storage with
	      | `Memory ->
		  new Netmime.memory_mime_body ""
	      | `File f ->
		  let name = f() in
		  new Netmime.file_mime_body name
	      | `Body f ->
		  f () 
	      | `Device _ ->
		  failwith "Nethttp_client: response is forwarded to device - \
                            no accessible response_body" in
	  resp_body <- Some rbody;
	  rbody
      | Some rbody -> rbody 


  method private def_private_api fixup_request =
    ( object(pself) 
	method private release_resources() =
(* eprintf "release_resources call=%d\n%!" (Oo.id self); *)
	  ( match req_handle with
	      | None -> ()
	      | Some d ->
		  Uq_io.inactivate d; req_handle <- None
	  );
	  ( match resp_handle with
	      | None -> ()
	      | Some d ->
		  Uq_io.inactivate d; resp_handle <- None
	  );
	  ( match continue_e with
	      | None -> ()
	      | Some e -> 
		  continue_e <- None;
		  continue_aborted <- true;
		  e#abort()
		  
	  )

	method get_error_counter = error_counter
	method set_error_counter n = error_counter <- n
	method get_redir_counter = redir_counter
	method set_redir_counter n = redir_counter <- n

	method continue = continue
	method set_continue code_100 = 
	  continue <- true;
	  continue_100 <- code_100;
	  match continue_e with
	    | None -> ()
	    | Some e -> 
		continue_e <- None;
		e#abort()

	method wait_continue_e tmo esys =
	  if continue then
	    eps_e (`Done true) esys
	  else (
	    assert(continue_e = None); (* cannot be called multiple times *)
	    let e = 
	      Uq_engines.delay_engine tmo
		(fun () -> eps_e (`Done ()) esys)
		esys in
	    continue_e <- Some e;
	    ( e
	      >> (fun _ -> 
		    continue_e <- None; 
		    if continue_aborted then `Aborted else  `Done continue_100
		 )
	    )
	  )

	method auth_state = auth_state
	method set_auth_state s =
          auth_state <- s

        method set_tls_session_props p = tls_session_props <- Some p
        method set_gssapi_props p = gssapi_props <- Some p

        method conn_id = conn_id
        method set_conn_id id = conn_id <- id

	method retry_anyway = retry_anyway
	method set_retry_anyway flag = retry_anyway <- flag;

	method set_effective_request_uri s = eff_req_uri <- s

	method parse_request_uri options =
	  if req_uri = None then (
	    try
	      let (nu, trans) = parse_url options req_uri_raw in
	      if (Neturl.url_provides ~user:true nu || 
		    Neturl.url_provides ~password:true nu)
	      then
		failwith "Nethttp_client: URL must not contain user or password";
	      req_uri <- Some nu;
	      req_host <- Neturl.url_host nu;
	      req_port <- Neturl.url_port nu;
	      (* req_path must also contain the parts after the path, i.e.
	         param and query
	       *)
	      req_path <- 
                Neturl.join_path(Neturl.url_path ~encoded:true nu) ^ 
                ( try String.concat "" 
                        (List.map
                          (fun s -> ";" ^ s)
                          (Neturl.url_param ~encoded:true nu))
                  with Not_found -> "") ^
                ( try "?" ^ Neturl.url_query ~encoded:true nu 
                  with Not_found -> "");
	      req_trans <- trans;
	    with
		Not_found ->
		  failwith ("Nethttp_client: bad URL (" ^ req_uri_raw ^ ")")
	  )

	method transport_layer options =
	  pself # parse_request_uri options;
          match req_trans_set with
            | `None ->
	         req_trans
            | `User id ->
                 id
            | `Reroute id ->
                 id

        method reroute id =
          match req_trans_set with
            | `None
            | `Reroute _ ->
                 req_trans_set <- `Reroute id
            | `User _ ->
                 ()

        method reroutable =
          match req_trans_set with
            | `User _ -> false
            | _ -> true


	method request_uri_with ?path ?(remove_particles=false) () =
	  match req_uri with
	    | None ->
		assert false
	    | Some ru ->
		let nu = 
		  Neturl.modify_url
		    ?path:(match path with
			     | None | Some None -> None
			     | Some (Some p) -> Some(Neturl.split_path p)
			  )
		    (Neturl.remove_from_url
		       ~path:(path = Some None)
		       ~query:remove_particles
		       ~param:remove_particles
		       ~fragment:remove_particles
		       ru) in
		nu


	method decompression_enabled = resp_decompress


	method prepare_transmission () =
	  pself # release_resources();
	  status <- `Unserved;
          finished <- false;
          tls_session_props <- None;
	  req_work_header <- new Netmime.basic_mime_header 
	                             req_base_header#fields;
	  resp_code <- 0;
	  resp_text <- "";
	  resp_proto <- "";
	  resp_header <- new Netmime.basic_mime_header [];
	  resp_header_set <- false;
	  resp_body <- None;
	  fixup_request();
	  ( try ignore(req_work_header # field "Date")
	    with Not_found ->
	      Nethttp.Header.set_date req_work_header (Unix.time())
	  );
	  ( try ignore(req_work_header # field "User-agent")
	    with Not_found ->
	      req_work_header # update_field "User-agent" "Netclient"
	  );
	  continue <- false;
	  continue_e <- None;
	  continue_aborted <- false;
	  continue_100 <- false;

	method request_body_open_rd esys =
	  match req_dev with
	    | Some f ->
		let d = f() in
		req_handle <- Some d;
		d
	    | _ ->
		let ch = req_body # open_value_rd() in
		let d = 
		  `Async_in(new Uq_transfer.pseudo_async_in_channel ch,esys) in
		req_handle <- Some d;
		d


	method set_response_status code text proto =
	  resp_code <- code;
	  resp_text <- text;
	  resp_proto <- proto

	method set_response_header h =
	  resp_header <- h;
	  resp_header_set <- true;
	  assert(resp_code <> 0);

	method response_body_open_wr esys =
	  let d0 =
	    match resp_body_storage with
	      | `Device f ->
		  f()
	      | _ ->
		  let rbody = self#resp_body in
		  let ch = rbody # open_value_wr() in
		  `Async_out(new Uq_transfer.pseudo_async_out_channel ch,esys) in
	  (* Check for maximum response length: *)
	  let c = ref 0L in
	  let c_max = self # max_response_body_length in
	  let out_count n =
	    c := Int64.add !c (Int64.of_int n);
	    if !c > c_max then raise Response_too_large in
	  let d1 = `Count_out(out_count, d0) in
	  (* Check for decompression: *)
	  let d2 =
	    if pself#decompression_enabled then
	      let algos = 
		try Nethttp.Header.get_content_encoding resp_header
		with Not_found -> [] in
	      match algos with
		| [ algo ] ->
		    ( try 
			let decoder = 
			  Netcompression.lookup_decoder ~iana_name:algo () in
			resp_header # delete_field "Content-Encoding";
			let b = 
			  Uq_io.filter_out_buffer
			    ~max:(Some 4096) decoder d1 in
			`Buffer_out b
		      with
			| Not_found -> d1
		    )
		| _ -> 
		    d1
	    else
	      d1 in
	  resp_handle <- Some d2;
	  d2

	method response_code = resp_code
	method response_proto = resp_proto
	method response_header = resp_header


	method finish_request_e esys =
(* eprintf "finish_request call=%d\n%!" (Oo.id self); *)
	  match req_handle with
	    | None ->
		eps_e (`Done ()) esys
	    | Some d ->
		req_handle <- None;
		Uq_io.shutdown_e d
		>> (fun st -> Uq_io.inactivate d; st)

	method finish_response_e esys =
(* eprintf "finish_response call=%d\n%!" (Oo.id self); *)
	  assert(resp_code <> 0);
	  finished <- true;
	  status <- (if resp_code >= 200 && resp_code <= 299 then
		       `Successful
		     else if resp_code >= 300 && resp_code <= 399 then
		       `Redirection
		     else if resp_code >= 400 && resp_code <= 499 then
		       `Client_error
		     else
		       `Server_error);
	  (* do this last - it can trigger user callback functions: *)
	  match resp_handle with
	    | None ->
		eps_e (`Done ()) esys
	    | Some d ->
(* prerr_endline "Nethttp_cllent SHUTDOWN"; *)
		resp_handle <- None;
		Uq_io.shutdown_e d
		>> (fun st -> Uq_io.inactivate d; st)

	method error_if_unserved verbose error =
	  match status with
	    | `Unserved ->
		if verbose then
		  dlogr (fun () -> 
			   sprintf "Call %d - error %s"
			     (Oo.id self)
			     (Netexn.to_string error));
		pself # set_error_exception error;
	    | _ ->
		()

	method set_error_exception x =
	  finished <- true;
	  ( match x with
		Http_error(_,_) -> assert false   (* not allowed *)
	      | _ ->
		  status <- (`Http_protocol_error x);
	  );
	  (* do this last - it can trigger user callback functions: *)
	  pself # release_resources();

	method error_exception =
	  match status with
	    | `Http_protocol_error x -> Some x
	    | _ -> None

	method cleanup () =
	  pself # release_resources()

	method dump_status () =
	  dlog (sprintf
		  "Call %d - HTTP response code: %d (%s)"
		  (Oo.id self) resp_code resp_text);
	  dlog (sprintf
		  "Call %d - HTTP response protocol: %s"
		  (Oo.id self) resp_proto);

	method dump_response_header () =
	  dump_header 
	    (sprintf 
	       "Call %d - HTTP response "
	       (Oo.id self))
	    (resp_header # fields)

	method dump_response_body () =
	  dlog (sprintf "Call %d - HTTP response body:\n%s\n"
		  (Oo.id self) 
		  (match resp_body with
		     | Some rbody -> rbody#value
		     | None -> ""
		  ))
      end
    )


  (* Call state *)

  method is_served = finished
  method status = status

  method auth_status =
    match auth_state with
      | `OK -> `OK
      | `None -> `None
      | `Auth_error -> `Auth_error
      | _ -> `Continue ()

  method tls_session_props = tls_session_props
  method gssapi_props = gssapi_props

  (* Accessing the request message (new style) *)

  method request_method = self # def_request_method
  method request_uri = req_uri_raw
  method set_request_uri uri = req_uri_raw <- uri; req_uri <- None

  method set_transport_layer id = 
    req_trans_set <- `User id


  method request_header (k:header_kind) =
    match k with
      | `Base -> req_base_header
      | `Effective -> req_work_header
  method set_request_header h =
    req_base_header <- h
  method set_expect_handshake() =
    req_base_header # update_field "Expect" "100-continue"
  method set_chunked_request() =
    req_base_header # update_field "Transfer-encoding" "chunked"
  method request_body = 
    if req_dev <> None then
      failwith "Nethttp_client: No request_body - using a device instead";
    req_body
  method set_request_body b = req_body <- b; req_dev <- None
  method set_request_device f = req_dev <- Some f

  method effective_request_uri = eff_req_uri

  method set_accept_encoding() =
    let all_algos =
      Netcompression.all_decoders() in
    Nethttp.Header.set_accept_encoding
      req_base_header
      (if all_algos = [] then ["identity",[]] else
	 (List.map (fun token -> (token,[])) all_algos));
    resp_decompress <- true

  (* Accessing the response message (new style) *)

  method private check_response() =
    match status with
      | `Unserved ->
	  if not resp_header_set then
	    failwith "Nethttp_client: HTTP call is unserved, no response yet"
      | `Http_protocol_error e -> 
	  raise (Http_protocol e)
      | _ -> ()

  method response_status_code = self#check_response(); resp_code
  method response_status_text = self#check_response(); resp_text
  method response_status = 
    self#check_response();
    try Nethttp.http_status_of_int resp_code
    with Not_found ->
      Nethttp.http_status_of_int (Nethttp.base_code resp_code)
  method response_protocol = self#check_response(); resp_proto
  method response_header = self#check_response(); resp_header
  method response_body = 
    (* [status] is already set after the header is available. The presence
       of the body is indicated by [finished].
     *)
    self#check_response(); 
    if not finished then 
      failwith "Nethttp_client: HTTP call is unserved, response not yet complete";
    self#resp_body

  (* Options *)

  method response_body_storage = resp_body_storage
  method set_response_body_storage s = resp_body_storage <- s
  method max_response_body_length = resp_body_max
  method set_max_response_body_length n = resp_body_max <- n
  method get_reconnect_mode = reconn_mode
  method set_reconnect_mode m = reconn_mode <- m
  method get_redirect_mode = redir_mode
  method set_redirect_mode m = redir_mode <- m
  method proxy_enabled = proxy_enabled
  method set_proxy_enabled e = proxy_enabled <- e
  method no_proxy() = proxy_enabled <- false
  method is_proxy_allowed() = proxy_enabled
  method proxy_use_connect = req_trans <> http_trans_id

  (* Method characteristics *)

  method empty_path_replacement = self # def_empty_path_replacement
  method is_idempotent = self # def_is_idempotent
  method has_req_body = self # def_has_req_body
  method has_resp_body = self # def_has_resp_body

  (* Repeating calls *)
    
  method same_call() =
    let same =
      {< status = `Unserved;
	 finished = false;
	 resp_header = new Netmime.basic_mime_header [];
	 resp_header_set = false;
	 resp_body = None;
	 eff_req_uri = "";
	 private_api = None;
	 error_counter = 0;
	 redir_counter = 0;
	 continue = false;
	 continue_aborted = false;
	 continue_100 = false;
	 continue_e = None;
	 resp_handle = None;
	 req_handle = None;
	 auth_state = `None;
	 retry_anyway = false
      >} in
    (same : #http_call :> http_call)

  (* Old style access methods *)

  method get_req_method() = self # def_request_method
  method get_host() = req_host
  method get_port() = req_port
  method get_path() = req_path
  method get_uri() = req_uri_raw
  method get_req_body() = req_body # value
  method get_req_header () =
    List.map (fun (n,v) -> (String.lowercase n, v)) req_base_header#fields
  method assoc_req_header n =
    req_base_header # field n
  method assoc_multi_req_header n =
    req_base_header # multiple_field n
  method set_req_header n v =
    req_base_header # update_field n v
  method get_resp_header() =
    self#check_response(); 
    List.map (fun (n,v) -> (String.lowercase n, v)) resp_header#fields
  method assoc_resp_header n =
    self#check_response(); 
    resp_header # field n
  method assoc_multi_resp_header n =
    self#check_response(); 
    resp_header # multiple_field n
  method get_resp_body() =
    self#check_response();
    if resp_code >= 200 && resp_code <= 299 then
      self # resp_body # value
    else
      raise(Http_error(resp_code, self#resp_body#value))
  method dest_status() =
    self#check_response();
    (resp_proto, resp_code, resp_text)

  (* Private *)

  method private_api = 
    match private_api with
      | None ->
	  let api = self # def_private_api self#fixup_request in
	  private_api <- Some api;
	  api
      | Some api ->
	  api

  (* Virtual methods *)

  method virtual private fixup_request : unit -> unit
  method virtual private def_request_method : string
  method virtual private def_empty_path_replacement : string
  method virtual private def_is_idempotent : bool
  method virtual private def_has_req_body : bool
  method virtual private def_has_resp_body : bool
end

(******** SUBCLASSES IMPLEMENTING HTTP METHODS ************************)

class get_call =
object(self)
  inherit generic_call
  method private fixup_request() = ()
  method private def_request_method = "GET"
  method private def_is_idempotent = true
  method private def_has_req_body = false
  method private def_has_resp_body = true
  method private def_empty_path_replacement = "/"
end

class head_call =
object(self)
  inherit generic_call
  method private fixup_request() = ()
  method private def_request_method = "HEAD"
  method private def_is_idempotent = true
  method private def_has_req_body = false
  method private def_has_resp_body = false
  method private def_empty_path_replacement = "/"
end

class trace_call =
object(self)
  inherit generic_call
  method private fixup_request() = ()
  method private def_request_method = "TRACE"
  method private def_is_idempotent = false
  method private def_has_req_body = false
  method private def_has_resp_body = true
  method private def_empty_path_replacement = "/"
end

class options_call =
object(self)
  inherit generic_call
  method private fixup_request() = ()
  method private def_request_method = "OPTIONS"
  method private def_is_idempotent = false
  method private def_has_req_body = true
  method private def_has_resp_body = true
  method private def_empty_path_replacement = "*"
end

class post_call =
object(self)
  inherit generic_call
  method private fixup_request() = ()
  method private def_request_method = "POST"
  method private def_is_idempotent = false
  method private def_has_req_body = true
  method private def_has_resp_body = true
  method private def_empty_path_replacement = "/"
end

class put_call =
object(self)
  inherit generic_call
  method private fixup_request() = ()
  method private def_request_method = "PUT"
  method private def_is_idempotent = true (* sic! *)
  method private def_has_req_body = true
  method private def_has_resp_body = true
  method private def_empty_path_replacement = "/"
end

class delete_call =
object(self)
  inherit generic_call
  method private fixup_request() = ()
  method private def_request_method = "DELETE"
  method private def_is_idempotent = true
  method private def_has_req_body = false
  method private def_has_resp_body = true
  method private def_empty_path_replacement = "/"
end

class connect_call =
object(self)
  inherit generic_call
  method private fixup_request() = ()
  method private def_request_method = "CONNECT"
  method private def_is_idempotent = false
  method private def_has_req_body = false
  method private def_has_resp_body = true
    (* This is broken to some degree. See the comment below on when
       CONNECT has a response body
     *)
  method private def_empty_path_replacement = "/"
end


class get the_query =
  object (self)
    inherit get_call
    initializer
      self # set_request_uri the_query
  end


class trace the_query max_hops =
  object (self)
    inherit trace_call
    initializer
      self # set_request_uri the_query
    method private fixup_request() =
      (self # request_header `Effective) # update_field 
	                                "max-forwards" (string_of_int max_hops)
  end


class options the_query =
  object (self)
    inherit options_call
    initializer
      self # set_request_uri the_query
  end


class head the_query =
  object (self)
    inherit head_call
    initializer
      self # set_request_uri the_query
  end


class post query params =
  object (self)
    inherit post_call
    initializer
      self # set_request_uri query
    method private fixup_request() =
      let rh = self # request_header `Effective in
      let is_chunked =
	try rh # field "Transfer-encoding" <> "identity"
	with Not_found -> false in
      rh # update_field "Content-type" "application/x-www-form-urlencoded";
      let l = List.map (fun (n,v) -> n ^ "=" ^ Netencoding.Url.encode v) params in
      let s = String.concat "&" l in
      if not is_chunked then
	rh # update_field "Content-length" (string_of_int (String.length s));
      self # request_body # set_value s
  end
;;


class post_raw the_query s =
  object (self)
    inherit post_call
    initializer
      self # set_request_uri the_query
    method private fixup_request() =
      let rh = self # request_header `Effective in
      let is_chunked =
	try rh # field "Transfer-encoding" <> "identity"
	with Not_found -> false in
      self # request_body # set_value s;
      if not is_chunked then
	rh # update_field "Content-length" (string_of_int (String.length s));
  end
;;


class put the_query s =
  object (self)
    inherit put_call
    initializer
      self # set_request_uri the_query
    method private fixup_request() =
      let rh = self # request_header `Effective in
      let is_chunked =
	try rh # field "Transfer-encoding" <> "identity"
	with Not_found -> false in
      self # request_body # set_value s;
      if not is_chunked then
	rh # update_field "Content-length" (string_of_int (String.length s));
  end
;;


class delete the_query =
  object (self)
    inherit delete_call
    initializer
      self # set_request_uri the_query
  end
;;


class connect the_query (* host:port only *) =
  object (self)
    inherit connect_call
    initializer
      self # set_request_uri ("http://" ^ the_query)
    method effective_request_uri =
      the_query
  end
;;


(**********************************************************************)
(***                  AUTHENTICATION METHODS                        ***)
(**********************************************************************)

let check_neturl fn_name neturl =
  (* Whether all required parts are provided *)
  if not(Neturl.url_provides
           ~scheme:true ~host:true ~port:true (* ~path:true *) neturl)
  then
    failwith(fn_name ^ ": This URL is incomplete (required: \
                        scheme://host:port/path): " ^ 
               Neturl.string_of_url neturl)


let norm_neturl neturl =
  (* Returns the neturl as normalized string (esp. normalized % sequences) *)
  check_neturl "norm_neturl" neturl;
  try
    let neturl' =
      Neturl.make_url 
        ~encoded:false
        ~scheme:(Neturl.url_scheme neturl)
        ~host:(Neturl.url_host neturl)
        ~port:(Neturl.url_port neturl)
        ~path:(try Neturl.url_path neturl with Not_found -> [ "" ])
        ?query:(try Some(Neturl.url_query neturl) with Not_found -> None)
        ?fragment:(try Some(Neturl.url_fragment neturl) with Not_found -> None)
        (Neturl.url_syntax_of_url neturl) in
    Neturl.string_of_url neturl'
  with
    | error ->
        (* Log error because this might hard to track down otherwise: *)
        dlog (sprintf "norm_neturl: %s" (Netexn.to_string error));
        raise error


let prefixes_of_neturl s_url =
  (* Returns a list of all legal prefixes of the absolute URI s.
   * The prefixes are in Neturl format. Duplicate results may be returned.
   *)
  try
    let rec rev_path_prefixes rev_path =
      match rev_path with
        | [] -> []
        | [ "" ] -> [ rev_path ]
        | [ ""; "" ] -> assert false
        | [ _; "" ] -> rev_path :: rev_path_prefixes [ "" ]
        | "" :: rev_path' ->
	    if rev_path' = [ ""; "" ] then
	      rev_path :: rev_path_prefixes [ "" ]
	    else
	      rev_path :: rev_path_prefixes rev_path'
        | _ :: rev_path' ->
	    rev_path :: (rev_path_prefixes ("" :: rev_path'))
    in
    let path_prefixes path =
      List.map List.rev (rev_path_prefixes (List.rev path)) in
    let s_nofrag_url = Neturl.remove_from_url ~fragment:true s_url in
    let s_noquery_url = Neturl.remove_from_url ~query:true s_nofrag_url in
    let path = try Neturl.url_path s_noquery_url with Not_found -> [ "" ] in
    s_url :: s_nofrag_url ::
      (List.map
         (fun prefix -> Neturl.modify_url ~path:prefix s_noquery_url
         )
         (path_prefixes path))
  with
    | ex ->
        (* Log error because this might hard to track down otherwise: *)
        dlogr (fun () -> sprintf "prefixes_of_neturl: %s" 
                                 (Netexn.to_string ex));
        [ s_url ]



class type key =
object
  method user : string
  method password : string
  method realm : string
  method domain : Neturl.url list
  method credentials : (string * string * (string * string) list) list
end


let key ~user ~password ~realm ~domain =
  List.iter (check_neturl "key") domain;
  let domain_params =
    List.map (fun uri -> ("domain-uri", norm_neturl uri)) domain in
  let creds =
    [ "password", password, ("realm", realm) :: domain_params ] in
  ( object
      method user = user
      method password = password
      method realm = realm
      method domain = domain
      method credentials = creds
    end
  )

let key_creds ~user ~creds options : key =
  let password, params =
    try Netsys_sasl_util.extract_password2 creds
    with Not_found ->
      raise(Invalid_argument "Nethttp_client.key_creds: no password") in
  let realm =
    try List.assoc "realm" params
    with Not_found -> 
      "default realm" in
  let domain_strings =
    List.map snd (List.find_all (fun (n,_) -> n = "domain-uri") params) in
  let domain =
    List.map
      (fun url ->
         fst(parse_url (ref options) url)
      )
      domain_strings in
  List.iter (check_neturl "key_creds") domain;
  ( object
      method user = user
      method password = password
      method realm = realm
      method domain = domain
      method credentials = creds
    end
  )


class type key_handler =
object
  method inquire_key :
           domain:Neturl.url option -> realm:string -> auth:string -> key
  method invalidate_key : key -> unit
end


class key_ring ?uplink ?(no_invalidation=false) () =
object(self)
  val mutable keys = (Hashtbl.create 10 : 
			(string * string, key * bool) Hashtbl.t)
    (* Maps (domain, realm) to (key, from_uplink) *)

  method inquire_key ~domain ~realm ~(auth:string) =
    dlogr
      (fun () ->
         sprintf "key_ring: inquiring key for auth=%s realm=%S domains=%s"
                 auth realm
                 ( match domain with
                     | None -> "-"
                     | Some url -> norm_neturl url
                 )
      );
    let prefixes =
      match domain with
        | Some dom ->
            List.map
              norm_neturl
              (prefixes_of_neturl dom)
        | None ->
            [] in
    dlogr
      (fun () ->
         sprintf "key_ring: checking prefixes for key: %s"
                 (String.concat "," prefixes)
      );
    try
      try
        let prefix =
          List.find
            (fun prefix -> Hashtbl.mem keys (prefix,realm))
            prefixes in
        let key = fst(Hashtbl.find keys (prefix, realm)) in
        dlogr (fun () ->
               sprintf "key_ring: found key for domain prefix=%s"  prefix);
        key
      with
        | Not_found ->
            let key = fst(Hashtbl.find keys ("*", realm)) in
            dlog "key_ring: found key for domain prefix=*";
            key
    with Not_found ->
      match uplink with
	| None -> 
            dlog "key_ring: no key, no uplink";
            raise Not_found
	| Some h ->
            dlog "key_ring: forwarding request to uplink key handler";
	    let key = h # inquire_key ~domain ~realm ~auth in
	    (* or Not_found *)
            List.iter
              (fun dom_uri ->
                 let dom_uri_s = norm_neturl dom_uri in
	         Hashtbl.replace keys (dom_uri_s, key#realm) (key,true)
              )
              key#domain;
	    key

  method invalidate_key (key : key) =
    if not no_invalidation then (
      let domains = key # domain in
      let domain_strings =
        if domains = [] then
          [ "*" ]
        else
          List.map norm_neturl domains in
      let realm = key # realm in
      List.iter
        (fun dom_uri_s ->
           try
             let (_, from_uplink) = Hashtbl.find keys (dom_uri_s, realm) in
             Hashtbl.remove keys (dom_uri_s, realm);
             if from_uplink then
	       ( match uplink with
	           | None -> assert false
	           | Some h -> h # invalidate_key key
	       )
           with
	       Not_found -> ()
        )
        domain_strings
    )

  method clear () =
    Hashtbl.clear keys

  method add_key key =
    let domains = key # domain in
    let realm = key # realm in

    List.iter (check_neturl "add_key") domains;

    let domain_strings =
      if domains = [] then
        [ "*" ]
      else
        List.map norm_neturl domains in

    List.iter
      (fun dom_s ->
         Hashtbl.replace keys (dom_s, realm) (key, false)
      )
      domain_strings

  method keys =
    Hashtbl.fold
      (fun _ (key,_) acc -> key :: acc)
      keys
      []

end


class proxy_key_handler user password : key_handler =
object
  method inquire_key ~domain ~realm ~auth =
    try
      key ~domain:[] ~realm ~user ~password
    with _ -> raise Not_found
  method invalidate_key _ = ()
  
end



class type auth_handler =
object
  method create_session : secure:bool -> http_call -> http_options ref -> auth_session option
  method create_proxy_session : http_call -> http_options ref -> auth_session option
  method identify_session : http_call -> http_options ref -> 
                            (string * string * string * int) option
  method identify_proxy_session : http_call -> http_options ref -> 
                                  (string * string * string * int) option
  method skip_challenge : bool
  method skip_challenge_session : http_call -> http_options ref -> auth_session option
end


exception Not_applicable

let get_domain_uri (call : http_call) =
  call # private_api # request_uri_with
    ~path:(Some "/") ~remove_particles:true ()

let get_uri_with_path  (call : http_call) =
  call # private_api # request_uri_with
    ~remove_particles:true ()



let with_port options url =
  match get_default_port options (Neturl.url_scheme url) with
    | Some port ->
        Neturl.default_url ~port url
    | None ->
        (* CHECK: we really need a port number here, so set it 99999 for
           unknown schemes. This is better than failing.
         *)
        Neturl.default_url ~port:99999 url
    

let decode_param  =
  function
  | (n, `Q _) -> assert false
  | (n, `V v) -> (n,v)


let get_all_challenges call is_proxy =
  (* Returns all challenges in the www-authenticate or proxy-authenticate
     header(s). The mechanism names are converted to lowercase as well as
     the parameter names.
   *)
  let challenges =
    try
      if is_proxy then
	Nethttp.Header.get_proxy_authenticate call#response_header
      else
	Nethttp.Header.get_www_authenticate call#response_header
    with
      | Not_found -> raise Not_applicable
      | Nethttp.Bad_header_field _ -> raise Not_applicable in  
  List.map
    (fun (name,params) ->
     (String.lowercase name, 
      List.map (fun (n,v) -> (String.lowercase n,v)) params)
    )
    challenges


let get_challenges mech_name call is_proxy =
  (* Get only the challenges for mechanism [mech_name] *)
  let challenges_lc = get_all_challenges call is_proxy in
  let mech_name_lc = String.lowercase mech_name in
  let mech_challenges =
    List.filter
      (fun (name, params) ->
         name = mech_name_lc
      )
      challenges_lc in
  mech_challenges


let get_challenges_with_realm mech_name call is_proxy =
  let l1 = get_challenges mech_name call is_proxy in
  List.filter
    (fun (name, params) ->
       List.mem_assoc "realm" params
    )
    l1


let decode_challenges l =
  List.map
    (fun (name, params) ->
       (name, List.map decode_param params)
    )
    l


let get_realms mech_name call is_proxy =
  let mech_challenges = get_challenges_with_realm mech_name call is_proxy in
  let mech_challenges = decode_challenges mech_challenges in
  let mech_params =
    List.map snd mech_challenges in
  List.map
    (fun params ->
       let realm = List.assoc "realm" params in
       (realm, params)
    )
    mech_params


let rec iterate f args =
  match args with
    | arg :: args' ->
        ( try
            f arg
          with
            | Not_applicable ->
                iterate f args'
        )
    | [] ->
        raise Not_applicable


let format_credentials is_proxy (creds:Nethttp.Header.auth_credentials) =
  if snd creds = [] then
    []
  else
    let hdr = new Netmime.basic_mime_header [] in
    if is_proxy then
      Nethttp.Header.set_proxy_authorization hdr creds
    else
      Nethttp.Header.set_authorization hdr creds;
    hdr#fields


let core_basic_auth_session 
        enable_reauth key_handler is_proxy 
        request_domain auth_domain trans_id realm
        : auth_session =
  (* If enable_reauth, we return [auth_domain] when [auth_domain] is called.
     Usually [auth_domain] is set to the current URI after setting the path to
     "/". That means that any further request for the same domain will be
     reauthenticated. See [auth_cache] below, where [auth_domain] is
     interpreted.
   *)
  let key =
    (* Return the selected key, or raise Not_applicable *)
    try
      key_handler # inquire_key ~domain:request_domain ~realm ~auth:"basic"
    with
	Not_found ->  raise Not_applicable in
  let special_code =
    if is_proxy then 407 else 401 in
  let first_attempt =
    ref true in
  ( object(self)
      method auth_scheme = "basic"
      method auth_domain = (if enable_reauth then auth_domain else []), trans_id
      method auth_realm = key # realm
      method auth_user = key # user
      method auth_session_id = None
      method authenticate call reauth_flag =
        let code = call # private_api # response_code in
        if reauth_flag || code = special_code then
          if reauth_flag || !first_attempt then (
            first_attempt := false;
            let basic_cookie = 
              Netencoding.Base64.encode 
	        (key#user ^ ":" ^ key#password) in
            let creds = ("Basic", [ "credentials", `Q basic_cookie]) in
            `Continue (format_credentials is_proxy creds)
          )
          else `Auth_error
        else `OK
      method invalidate call =
        key_handler # invalidate_key key;
    end
  )


let basic_auth_session enable_reauth
                       key_handler call trans_id is_proxy
    : auth_session =
  let request_domain =
    if is_proxy then None else Some(get_uri_with_path call) in
  let auth_domain =
    if is_proxy then [] else [get_domain_uri call] in
  let realms = get_realms "basic" call is_proxy in
  iterate
    (fun (realm,params) ->
       core_basic_auth_session
         enable_reauth 
         key_handler is_proxy request_domain auth_domain trans_id realm
    )
    realms


class basic_auth_handler ?(enable_reauth=false)
                         ?(skip_challenge=false)
                         (key_handler : #key_handler)
                         : auth_handler =
object(self)
  method create_session ~secure call options =
    try
      let trans_id = call # private_api # transport_layer options in
      Some(basic_auth_session enable_reauth key_handler call trans_id false)
    with
	Not_applicable ->
	  None
  method create_proxy_session call options =
    try
      let trans_id = call # private_api # transport_layer options in
      Some(basic_auth_session enable_reauth key_handler call trans_id true)
    with
	Not_applicable ->
	  None
  method identify_session _ _ = None
  method identify_proxy_session _ _ = None
  method skip_challenge = skip_challenge
  method skip_challenge_session call options =
    try
      let trans_id = call # private_api # transport_layer options in
      Some(core_basic_auth_session
             false key_handler false None [] trans_id "anywhere")
    with
	Not_applicable ->
	  None
end



let contains_auth v =
  List.mem "auth" (split_words v)


let normalize_domain options (call : http_call) s =
  try
    let (nu1,_) =
      parse_url
	~base_url:(call#private_api#request_uri_with())
	options s in
    with_port !options
      (Neturl.remove_from_url
	 ~user:true ~user_param:true ~password:true ~fragment:true nu1
      )
  with
    | Neturl.Malformed_URL ->
        dlog (sprintf "digest: cannot parse 'domain' parameter: %s" s);
        raise Not_found

let dbg_kv f kv_l =
  String.concat ","
    (List.map (fun kv -> let (k,v) = f kv in sprintf "%s=%S" k v) kv_l)

let dbg_l f l =
  String.concat "," (List.map (fun v -> sprintf "%S" (f v)) l)


let identity x = x

let get_match_params options call =
  [ "https", string_of_bool (call # tls_session_props <> None), false;
    "trans_id", 
       string_of_int (call # private_api # transport_layer options), false;
    "conn_id",
       string_of_int (call # private_api # conn_id), false;
    "target-host", call#get_host(), false;
    "target-uri", call#effective_request_uri, false;
  ]

let generic_auth_session_for_challenge
      options
      (key_handler : #key_handler) mech call is_proxy initial_challenge
    : auth_session =
  let module M = (val mech : Nethttp.HTTP_MECHANISM) in
  let mname = M.mechanism_name in
  dlogr (fun () -> sprintf "generic_auth(%s): create" mname);
  let cur_trans_id = ref (call # private_api # transport_layer options) in
  let match_params = get_match_params options call in
  let match_result =
    ref (M.client_match ~params:match_params initial_challenge) in
  let realm, id_option =
    match !match_result with
      | `Accept(realm,id_opt) -> (realm,id_opt)
      | `Reroute(realm,_) -> realm, None
      | `Accept_reroute(realm,id_opt,_) -> realm, id_opt
      | `Reject -> 
          dlogr (fun () -> sprintf "generic_auth(%s): no match " mname);
          raise Not_applicable in
  let request_domain =
    if is_proxy then None else Some(get_uri_with_path call) in
  let key =
    try key_handler # inquire_key ~domain:request_domain ~realm
                                  ~auth:M.mechanism_name
    with Not_found ->
      dlogr (fun () -> sprintf "generic_auth(%s): no key " mname);
      raise Not_applicable in
  let creds =
    M.init_credentials key#credentials in
  let session_lz =
    lazy
      (M.create_client_session
         ~user:key#user ~creds 
         ~params:( [ "realm", realm, true ] @ 
                     match_params @
                       match id_option with
                         | None -> []
                         | Some id -> [ "id", id, true ]
                 )
         ()
      ) in
  let first = ref true in
  let cur_auth_domain = ref [] in
  let dbg_state() =
    match !match_result with
      | `Accept _ ->
           let session = Lazy.force session_lz in
           Netsys_sasl_util.string_of_client_state (M.client_state session)
      | `Accept_reroute _ ->
           let session = Lazy.force session_lz in
           Netsys_sasl_util.string_of_client_state (M.client_state session) ^
             "+reroute"
      | `Reroute _ ->
           "reroute"
      | _ ->
           "" in
  ( object
      method auth_scheme = mname
      method auth_domain = (!cur_auth_domain, !cur_trans_id)
      method auth_realm = key#realm
      method auth_user = key#user
      method authenticate auth_call reauth_flag =
        dlogr
          (fun () ->
             sprintf "generic_auth(%s): authenticate method=%s uri=%s first=%B \
                      reauth_flag=%B is_proxy=%B id=%S state=%s"
                     mname auth_call#request_method
                     auth_call#effective_request_uri !first reauth_flag is_proxy
                     (match id_option with
                        | None -> "n/a" | Some id -> id
                     )
                     (dbg_state())
          );

        let challenge =
          if !first then (
            (* First authentication: just assume that the call is the same *)
            assert (not reauth_flag);
            assert(call = auth_call);
            initial_challenge
          )
          else (
            (* Subsequent authentications: figure out the right challenge *)
            (* NB. We assume here that the session ID never changes. CHECK *)
            try
              if not auth_call#is_served then raise Not_found;
              let challenges =
                get_challenges M.mechanism_name auth_call is_proxy in
              let match_params =
                get_match_params options auth_call in
              List.find
                (fun ch ->
                   match M.client_match ~params:match_params ch with
                     | `Accept(r,id_opt)
                     | `Accept_reroute(r, id_opt, _) -> 
                          r = realm && id_opt = id_option
                     | _ -> false
                )
                challenges
            with Not_found | Not_applicable ->
              (* Several options:
                   - reauth
                   - this is a final server message that does not
                     set www-authenticate, but puts something into the other
                     headers. Hence the challenge is empty.
               *)
              dlogr (fun () -> sprintf "generic_auth(%s): no challenge" mname);
              (fst initial_challenge, [])
          ) in
        dlogr
          (fun () ->
             sprintf "generic_auth(%s): challenge name=%s params: %s"
               mname (fst challenge) (dbg_kv decode_param (snd challenge))
          );
        match !match_result with
          | `Accept _
          | `Accept_reroute _ ->
               let session = Lazy.force session_lz in
               first := false;
               ( match M.client_state session with
                   | `OK ->
                        (* re-authentication *)
                        if reauth_flag then (
                          dlogr (fun () -> 
                                   sprintf "generic_auth(%s): restart" mname);
                          cur_auth_domain := [];
                          let r_match_params =
                            get_match_params options auth_call in
                          let r_params = 
                            [ "realm", realm, true ] @
                              r_match_params @
                                match id_option with
                                  | None -> []
                                  | Some id -> [ "id", id, true ] in
                          M.client_restart ~params:r_params session;
                          dlogr (fun () -> 
                                   sprintf "generic_auth(%s): state=%s" 
                                           mname (dbg_state()));
                        )
                   | `Wait ->
                        dlogr (fun () -> 
                               sprintf "generic_auth(%s): process" mname);
                        assert(not reauth_flag);
                        assert(auth_call#is_served);
                        let meth = auth_call # request_method in
                        let uri = auth_call # effective_request_uri in
                        let hdr = auth_call # response_header in
                        M.client_process_challenge
                          session meth uri hdr challenge;
                        dlogr (fun () -> 
                                 sprintf "generic_auth(%s): state=%s" 
                                         mname (dbg_state()));
                   | `Emit | `Stale ->
                        assert(not reauth_flag);
                        ()
                          (* strange, but just let's skip the challenge *)
                   | `Auth_error _ ->
                        ()
               );
               ( match M.client_state session with
                   | `OK ->
                        (* Save the protection space: *)
                        if not is_proxy then (
                          let auth_domain_s = M.client_domain session in
                          let auth_domain =
                            try
                              List.map
                                (normalize_domain options auth_call)
                                auth_domain_s
                            with Not_found -> [] in
                          dlogr
                            (fun () -> sprintf "generic_auth(%s): domains=%s"
                                       mname (dbg_l norm_neturl auth_domain)
                            );
                          cur_auth_domain := auth_domain;
                        );
                        ( try
                            let gssapi_props = M.client_gssapi_props session in
                            call # private_api # set_gssapi_props gssapi_props;
                          with Not_found -> ()
                        );
                        `OK
                   | `Emit | `Stale ->
                        dlogr (fun () ->
                                 sprintf "generic_auth(%s): emit" mname);
                        let meth = auth_call # request_method in
                        let uri = auth_call # effective_request_uri in
                        let hdr = 
                          if reauth_flag then
                            new Netmime.basic_mime_header []
                          else
                            auth_call # response_header in
                        let (creds, new_headers) = 
                          M.client_emit_response session meth uri hdr in
                        dlogr (fun () -> 
                                 sprintf "generic_auth(%s): state=%s" 
                                         mname (dbg_state()));
                        let out =
                          format_credentials is_proxy creds @ new_headers in
                        dlogr (fun () ->
                                 sprintf "generic_auth(%s): headers %s"
                                         mname (dbg_kv identity out)
                              );
                        ( match !match_result with
                            | `Accept _ ->
                                 `Continue out
                            | `Accept_reroute(realm,id_opt,trans_id) ->
                                 match_result := `Accept(realm,id_opt);
                                 cur_trans_id := trans_id;
                                 `Continue_reroute(out, trans_id)
                            | _ ->
                                 assert false
                        )
                   | `Wait ->
                        assert false
                   | `Auth_error msg ->
                        cur_auth_domain := [];
                        `Auth_error
               )
          | `Reroute(_, trans_id) ->
               cur_trans_id := trans_id;
               `Reroute trans_id
          | `Reject ->
               assert false
                       
      method invalidate call =
        key_handler # invalidate_key key;

      method auth_session_id =
        match !match_result with
          | `Accept _
          | `Accept_reroute _ ->
               let session = Lazy.force session_lz in
               M.client_session_id session
          | _ ->
               None
    end
  )


let generic_auth_session options
                         (key_handler : #key_handler) mech call is_proxy 
    : auth_session =
  let module M = (val mech : Nethttp.HTTP_MECHANISM) in
  dlogr (fun () -> 
           sprintf "generic_auth(%s): searching challenge" M.mechanism_name);
  let match_params = get_match_params options call in
  let mech_challenges =
    try 
      get_challenges M.mechanism_name call is_proxy
    with Not_found | Not_applicable -> [] in
  let matching_challenges =
    List.filter
      (fun challenge ->
         M.client_match ~params:match_params challenge <> `Reject
      )
      mech_challenges in
  dlogr (fun () -> 
           sprintf "generic_auth(%s): challenges mech=%d match=%d" 
                   M.mechanism_name (List.length mech_challenges)
                   (List.length matching_challenges)
        );
  iterate
    (generic_auth_session_for_challenge
       options key_handler mech call is_proxy
    )
    matching_challenges
  

class generic_auth_handler (key_handler : #key_handler) mechs : auth_handler =
  let mechs =
    List.filter
      (fun m -> 
         let module M = (val m : Nethttp.HTTP_MECHANISM) in
         M.available()
      )
      mechs in

  let find_mech challenges is_proxy call =
    let mech =
      List.find
        (fun m ->
         let module M = (val m : Nethttp.HTTP_MECHANISM) in
           let mname = String.lowercase M.mechanism_name in
           List.exists
             (fun (ch_mech,_) -> String.lowercase ch_mech = mname)
             challenges
        )
        mechs in
    mech in

  let create_session ~secure is_proxy call options =
    try
      dlogr
        (fun () -> sprintf "generic_auth: create_session is_proxy=%B" is_proxy);
      let all_challenges = get_all_challenges call is_proxy in
      List.iter
        (fun (ch_name, ch_params) ->
           dlogr
             (fun () -> sprintf "generic_auth: challenge %s params: %s"
                           ch_name (dbg_kv decode_param ch_params))
        )
        all_challenges;
      let mech = find_mech all_challenges is_proxy call in
      dlogr
        (fun () -> 
           let module M = (val mech : Nethttp.HTTP_MECHANISM) in
           sprintf "generic_auth(%s): selected this mechanism" M.mechanism_name
        );
      Some (generic_auth_session options key_handler mech call is_proxy)
    with
      | Not_found
      | Not_applicable -> None in

  let identify_session is_proxy (call : http_call) options =
    let trans_id = call # private_api # transport_layer options in
    let match_params = get_match_params options call in
    try
      let all_challenges = get_all_challenges call is_proxy in
      let mech = find_mech all_challenges is_proxy call in
      let module M = (val mech) in
      let challenges = get_challenges M.mechanism_name call is_proxy in
      let challenges_m =
        List.map
          (fun ch ->
             (ch, M.client_match ~params:match_params ch)
          )
          challenges in
      let (challenge, sess_data) =
        List.find
          (fun (ch, m) ->
             match m with
               | `Accept(_, Some _) -> true
               | `Accept_reroute(_, Some _, _) -> true
               | _ -> false
          )
          challenges_m in
      let (realm, session_id) =
        match sess_data with
          | `Accept(r, Some sid) -> (r,sid)
          | `Accept_reroute(r, Some sid, _) -> (r,sid)
          | _ -> raise Not_found in
      Some(M.mechanism_name, realm, session_id, trans_id)
    with
      | Not_found
      | Not_applicable -> None in
  (object(self)
     method create_session ~secure =
       create_session ~secure false
     method create_proxy_session =
       create_session ~secure:false true
     method identify_session =
       identify_session false
     method identify_proxy_session =
       identify_session true
     method skip_challenge = false
     method skip_challenge_session _ _ = assert false
   end
  )


class digest_auth_handler key_handler =
  generic_auth_handler key_handler [(module Netmech_digest_http.Digest)]


class unified_auth_handler ?(insecure=false)
                           (key_handler : #key_handler) : auth_handler =
  let dg = new digest_auth_handler key_handler in
object(self)
  method create_session ~secure call options =
    match dg # create_session ~secure call options with
      | Some s -> Some s
      | None ->
          if secure || insecure then
            let trans_id = call # private_api # transport_layer options in
            try Some(basic_auth_session false key_handler call trans_id false)
            with Not_applicable ->
	      None
          else
            None
  method create_proxy_session call options =
    match dg # create_proxy_session call options with
      | Some s -> Some s
      | None ->
          let trans_id = call # private_api # transport_layer options in
          try Some(basic_auth_session false key_handler call trans_id true)
          with Not_applicable ->
	    None
  method identify_session _ _  = None
  method identify_proxy_session _ _  = None
  method skip_challenge = false
  method skip_challenge_session _ _ = assert false
end

let cache_limit = 1000
  (* Currently, an arbitrary limit. FIXME *)


module AuthSessSet =
  Set.Make(struct type t = auth_session let compare = compare end)

class auth_cache =
object(self)
  val mutable auth_handlers = []
  val mutable sessions_by_id1 = Hashtbl.create 10
  val mutable sessions_by_id2 = Hashtbl.create 10
  val mutable sessions_by_dom1 = Hashtbl.create 10
  val mutable sessions_by_dom2 = Hashtbl.create 10
  val mutable sessions_by_dom1_adds = 0

  (* There are two mechanisms for picking up old sessions:

     - By authentication ID: This is used when the server includes an ID
       in the response to the initial, unauthenticated HTTP request, and the
       ID refers to an old session. So far no auth protocol uses this.
       This scheme is enabled when http_mechanism#client_match returns a
       session ID.
     - By URL prefix: By returning something for http_mechanism#auth_domain
       any following request for the same file tree may pick up the old
       session.
   *)

  method add_auth_handler (h : auth_handler) =
    auth_handlers <- auth_handlers @ [h]

  method private add_session_by_id sess trans_id =
    match sess#auth_session_id with
      | None -> ()
      | Some id ->
          let skey = (sess#auth_scheme, sess#auth_realm, id, trans_id) in
          Hashtbl.replace sessions_by_id1 skey sess;
          if Hashtbl.length sessions_by_id1 >= cache_limit then (
            sessions_by_id2 <- sessions_by_id1;
            sessions_by_id1 <- Hashtbl.create 19
          )

  method private find_session_by_id ((mech, realm, id, trans_id) as skey) =
    try
      Hashtbl.find sessions_by_id1 skey
    with
      | Not_found ->
          Hashtbl.find sessions_by_id2 skey


  method auth_response ~secure (call : http_call) options =
    (* Resume an existing session or create a new session, after a 401 *)
    let trans_id = call # private_api # transport_layer options in
    let create() =
      List.fold_left
        (fun acc_opt handler ->
           match acc_opt with
             | None ->
                 ( match handler # create_session ~secure call options with
                     | Some sess ->
                         self # add_session_by_id sess trans_id;
                         Some sess
                     | None ->
                         None
                 )
             | Some acc ->
                 acc_opt
        )
        None
        auth_handlers in
    let old_session_opt =
      List.fold_left
        (fun acc_opt handler ->
           match acc_opt with
             | None ->
                 handler # identify_session call options
             | Some acc ->
                 acc_opt
        )
        None
        auth_handlers in
    match old_session_opt with
      | Some skey ->
          ( try 
              Some(self # find_session_by_id skey)
            with Not_found ->
              create()
          )
      | None ->
          create()

  method add_successful_session (sess : auth_session) =
    (* Called by [postprocess_complete_message] when authentication was
     * successful. If enabled, [sess] can be used for authentication
     * in advance.
     *)
    let auth_domain, trans_id = sess#auth_domain in
    dlogr
      (fun () -> sprintf "add_successful_session domains=%s trans_id=%d"
                         (String.concat "," (List.map norm_neturl auth_domain))
                         trans_id
      );
    List.iter
      (fun dom_uri ->
         try
	   let dom_uri' = norm_neturl dom_uri in
           let aset =
             try Hashtbl.find sessions_by_dom1 (dom_uri', trans_id)
             with Not_found -> AuthSessSet.empty in
           let aset' =
             AuthSessSet.add sess aset in
	   Hashtbl.replace sessions_by_dom1 (dom_uri', trans_id) aset';
           sessions_by_dom1_adds <- sessions_by_dom1_adds + 1
	 with
	   | Neturl.Malformed_URL -> ()
      )
      auth_domain;
    if sessions_by_dom1_adds > cache_limit then (
      sessions_by_dom2 <- sessions_by_dom1;
      sessions_by_dom1 <- Hashtbl.create 19;
      sessions_by_dom1_adds <- 0
    )

  method remove_session (sess : auth_session) =
    (* Called by [postprocess_complete_message] when authentication 
     * failed, or when a session is in use for reauth
     *)
    let auth_domain, trans_id = sess#auth_domain in
    dlogr
      (fun () -> sprintf "remove_session domains=%s trans_id=%d"
                         (String.concat "," (List.map norm_neturl auth_domain))
                         trans_id
      );
    List.iter
      (fun dom_uri ->
	 try
	   let dom_uri' = norm_neturl dom_uri in
           List.iter
             (fun sessions_by_dom ->
                let aset = 
                  Hashtbl.find sessions_by_dom (dom_uri', trans_id) in
                let aset' = AuthSessSet.remove sess aset in
                if aset' = AuthSessSet.empty then
                  Hashtbl.remove sessions_by_dom (dom_uri', trans_id)
                else
                  Hashtbl.replace sessions_by_dom (dom_uri', trans_id) aset'
             )
             [ sessions_by_dom1; sessions_by_dom2 ]
	 with
	   | Neturl.Malformed_URL -> ()
           | Not_found -> ()
      )
      auth_domain;


  method find_session_for_reauth (call : http_call) options =
    (* Find a session suitable for reauthentication. We haven't seen a 401
       yet and need to prepare [call].
     *)
    let trans_id = call # private_api # transport_layer options in
    let uri = call # private_api # request_uri_with() in
    dlogr (fun () -> sprintf "find_session_for_reauth: uri=%S trans_id=%d" 
                             (norm_neturl uri) trans_id);
    (* We are not only looking for [uri], but also for all prefixes of [uri] *)
    try
      let prefixes = prefixes_of_neturl uri in
      let string_prefixes =
        List.flatten
          (List.map
             (fun p -> try [norm_neturl p] with _ -> [])
             prefixes
          ) in
      dlogr (fun () -> sprintf "find_session_for_reauth: checking prefixes: %s"
                               (String.concat "," string_prefixes));
      try
        let string_prefix =
	  List.find (* or Not_found *)
	    (fun string_prefix ->
	       Hashtbl.mem sessions_by_dom1 (string_prefix, trans_id) ||
	         Hashtbl.mem sessions_by_dom2 (string_prefix, trans_id)
	    )
	    string_prefixes in
        let aset =
          try Hashtbl.find sessions_by_dom1 (string_prefix, trans_id)
          with Not_found ->
            Hashtbl.find sessions_by_dom2 (string_prefix, trans_id) in
        dlogr (fun () -> sprintf "find_session_for_reauth: found prefix: %s"
                                 string_prefix);
        AuthSessSet.min_elt aset
      with
        | Not_found ->
            dlog "find_session_for_reauth: no session found";
            (* Also try skip_challenge authentication *)
            let h =
              List.find (* or Not_found *)
                (fun handler ->
                   handler#skip_challenge
                )
                auth_handlers in
            dlog "find_session_for_reauth: trying skip_challenge";
            ( match
                h # skip_challenge_session call options
              with
                | Some sess -> 
                    dlog "find_session_for_reauth: successful skip_challenge";
                    sess
                | None -> raise Not_found
            )
    with
      | Neturl.Malformed_URL ->
	  raise Not_found
end


(*
let password = "XXX";;
let proxy_password = "XXX";;

#use "topfind";;
#require "netclient,nettls-gnutls";;
open Nethttp_client;;
Debug.enable := true;;
let ring = new key_ring();;
ring # add_key (key ~user:"gerd" ~realm:"Private @ gps.dynxs.de" ~password ~domain:[]);;
let p = new pipeline;;

p # set_proxy "voip.camlcity.org" 3128;;
p # set_proxy_auth "gerd" proxy_password;;

let c = new get "https://gps.dynxs.de/private/";;

let h = new unified_auth_handler ring;;
p # add_auth_handler h;;
p # add c;;
p # run();;

let gh = new generic_auth_handler ring [ (module Netmech_digest_http.Digest) ];;
let gh = new generic_auth_handler ring [ (module Netmech_digest_http.Digest_mutual) ];;

p # add_auth_handler gh;;
p # add c;;
p # run();;
 *)



(**********************************************************************)
(***                 THE CONNECTION CACHE                           ***)
(**********************************************************************)

type connection_cache = Nethttp_client_conncache.connection_cache

let close_connection_cache conn_cache =
  conn_cache # close_all()

let create_restrictive_cache() = new restrictive_cache()

let create_aggressive_cache() = new aggressive_cache()


(**********************************************************************)
(***                       THE I/O BUFFER                           ***)
(**********************************************************************)

(* io_buffer performs the socket I/O.
 *
 * TODO: COMMENT OUT OF DATE
 * The input side is a queue of octets which can be filled by
 * a Unix.read call at its end, and from which octets can be removed
 * at its beginning ("consuming octets").
 * There is also functionality to remove 1XX responses at the beginning
 * of the buffer, and to interpret the beginning of the buffer as HTTP
 * status line.
 * The idea of the buffer is that octets can be added at the end of the
 * buffer while the beginning of the buffer is interpreted as the beginning
 * of the next message. Once enough octets have been added that the message
 * is complete, it is removed (consumed) from the buffer, and the possibly
 * remaining octets are the beginning of the following message.
 *)

exception Garbage_received of string
  (* This exception is raised by [parse_response] when a protocol error
   * occurred before the response status line has been completely received.
   * Such errors are not transferred to the http_call.
   *)


let status_re = 
  Netstring_str.regexp "^\\([^ \t]+\\)\
                        [ \t]+\
                        \\([0-9][0-9][0-9]\\)\
                        \\([ \t]+\\([^\r\n]*\\)\\)?\
                        \r?$" ;;

let chunk_re = 
  Netstring_str.regexp "[ \t]*\
                        \\([0-9a-fA-F]+\\)\
                        [ \t]*\
                        \\(;[^\r\n\000]*\\)?\
                        \r?" ;;

type sockstate =
    Down
  | Up_rw 
  | Up_r
;;


type send_token =
  | Send_header of int * string * string * Netmime.mime_header_ro
      (* call_id, method, url, header *)
  | Send_body of Uq_io.in_device * int64 option
      (* data, length_opt *)
  | Send_body_chunked of Uq_io.in_device
  | Send_eof


let max_line_len = 65536

let input_line_opt_e ?max_len dev =
  Uq_io.input_line_e ?max_len dev >> Uq_io.eof_as_none >>
    (fun st ->
       match st with
	 | `Error Uq_io.Line_too_long ->
	     raise(Garbage_received "Line too long")
	 | _ -> st
    )

let input_opt_e dev s p n =
  Uq_io.input_e dev s p n >> Uq_io.eof_as_none


class type io_buffer =
object
  method socket_state : sockstate
  method socket : Unix.file_descr
  method socket_str : string
  method close : followup:(unit->unit) -> unit -> unit
  method down : unit -> unit
  method status_seen : bool
  method configure_read : fetch_call:(unit -> http_call) -> unit -> unit
  method read_e : Unixqueue.event_system -> http_call option Uq_engines.engine
  method write_activity : bool
  method add : send_token -> unit
  method write_e : Unixqueue.event_system -> unit Uq_engines.engine
end


let io_buffer options fd mplex fd_state : io_buffer =
  let dev = `Multiplex mplex in
  let buf_in_dev = `Buffer_in(Uq_io.create_in_buffer dev) in

  ( object (self)

    (****************************** SOCKET ********************************)
      
      val mutable socket_state = fd_state
      val mutable status_seen = false

      method socket_state = socket_state

      method socket = 
	match socket_state with
	  | Down -> failwith "Socket is down"
	  | _ -> fd
	      
      method socket_str =
	try
	  Int64.to_string (Netsys.int64_of_file_descr self # socket)
	with _ -> "n/a"
	  

      method close ~followup () =
	match socket_state with
	  | Down  -> followup ()
	  | _     -> 
	      let fd_str = self # socket_str in
	      if !options.verbose_connection then 
		dlogr (fun () ->
			 sprintf "FD %s - HTTP connection: Closing socket!"
			   fd_str);
	      mplex # cancel_reading();
	      mplex # cancel_writing();
	      mplex # start_shutting_down
		~when_done:(fun x_opt ->
			      Netlog.Debug.release_fd fd;
			      mplex # inactivate();
			      match x_opt with
				| None -> 
				    followup ()
				| Some err ->
				    if !options.verbose_connection then
				      dlog(sprintf
					     "FD %s - Shutdown error: %s"
					     fd_str (Netexn.to_string err));
				    followup()
			   )
		();
	      socket_state <- Down

      method down() =
	socket_state <- Down  (* our view *)

    (****************************** INPUT ********************************)

    val mutable cfg_fetch_call = (fun () -> raise Not_found)


    method status_seen =
      (* Whether at least one status line (incl. status 1XX) has been seen *)
      status_seen

    method configure_read ~fetch_call () =
      (* fetch_call: This is called when the status line is read to get
	 the corresponding call. This function may raise [Not_found] in
	 which case the status line is an error
       *)
      cfg_fetch_call <- fetch_call


    method read_e esys =
      (* This engine returns the next call read from the input, or
	 returns None for EOF.
       *)
      assert (socket_state = Up_r || socket_state = Up_rw);

      let cur_call = ref None in
      (* remember the call - only for postprocessing on error *)
      
      let rec read_status_line_e call_opt =
	input_line_opt_e ~max_len:max_line_len buf_in_dev
	++ (fun line_opt ->
	      match line_opt with
		| None ->
		    eps_e (`Done None) esys
		| Some line ->
	            if !options.verbose_status then
	              dlogr
	                (fun () ->
	                   sprintf "FD %Ld - Status: %S"
                                   (Netsys.int64_of_file_descr fd)
                                   line
                        );
		    ( match Netstring_str.string_match status_re line 0 with
			| None ->
			    raise (Garbage_received "Bad status line")
			| Some m ->
			    let proto = Netstring_str.matched_group m 1 line in
			    let code_str = Netstring_str.matched_group m 2 line in
			    let code = int_of_string code_str in
			    let text =
			      try Netstring_str.matched_group m 4 line
			      with Not_found -> "" in
			    if code < 100 || code > 599 then 
			      raise (Garbage_received "Bad status code");
			    status_seen <- true (* code >= 200 *);
			    let call_opt = 
			      if call_opt = None then (
				let call_opt' = 
				  try Some(cfg_fetch_call())
				  with Not_found -> None in
				cur_call := call_opt';
				call_opt'
			      )
			      else
				call_opt in
			    ( match call_opt with
				| None ->
				    raise(Garbage_received "Spontaneous data")
				| Some call ->
(* eprintf "code=%d text=%s proto=%s\n%!" code text proto; *)
				    if code >= 200 then
				      call # private_api # set_response_status
					code text proto;
				    let hdr_buf = Buffer.create 500 in
				    read_header_e code call hdr_buf
			    )
		    )
	   )

      and read_header_e code call hdr_buf =
	input_line_opt_e ~max_len:max_line_len buf_in_dev
	++ (fun line_opt ->
	      match line_opt with
		| None ->
		    let msg = "EOF where response header expected" in
		    raise (Garbage_received msg)
		| Some line ->
		    Buffer.add_string hdr_buf line;
		    Buffer.add_string hdr_buf "\n";
		    if line = "" || line = "\r" then (
		      parse_header_e code call (Buffer.contents hdr_buf)
		    )
		    else (
		      if Buffer.length hdr_buf > 100000 then (
			let msg ="Response header too long" in
			raise (Garbage_received msg)
		      );
		      read_header_e code call hdr_buf
		    )
	   )
	
      and parse_header_e code call hdr_str =
	let header_l, real_end_pos =
	  try
	    Netmime_string.scan_header
	      ~downcase:false ~unfold:true ~strip:true hdr_str
	      ~start_pos:0 ~end_pos:(String.length hdr_str)
	  with
	    | Failure _ ->
		let msg = "Bad response header" in
		raise (Garbage_received msg)
	in
	assert(real_end_pos = String.length hdr_str);
	let header = new Netmime.basic_mime_header header_l in
	if !options.verbose_response_header then
	  dump_header "HTTP response " header#fields;
        call # private_api # set_continue (code < 200);
	(* Calling set_continue may trigger that the send loop
	   in [transmitter] resumes its send task. We need to do it
	   whatever code we see here - not only for code=100.
	 *)
	if code < 200 then (
	  if !options.verbose_events then
	    dlogr (fun () ->
		     sprintf "FD %s - HTTP events: Got 100 Continue line"
		       self#socket_str
		  );
	  read_status_line_e None
	)
	else (
	  call # private_api # set_response_header header;
	  read_body_e code header call
	)

      and read_body_e code header call =
	(* First determine whether a body is expected: 
	   - Normally, the call has either always a body (like GET), or
	     it does not (like HEAD). Exceptions are only the codes 204
	     and 304
	   - The CONNECT method is broken in this respect. If a 200
	     response is emitted, the response body is missing. Any
	     error response includes a body, though.
	 *)
	let have_body =
	  call # has_resp_body && code <> 204 && code <> 304 &&
	    (call # request_method <> "CONNECT" || code >= 300) in
	if have_body then (
	  let out_dev = call # private_api # response_body_open_wr esys in
	  (* Check if we have chunked encoding: *)
	  let is_chunked =
	    try header # field "Transfer-encoding" <> "identity"
	    with Not_found -> false in
	  if is_chunked then
	    read_chunked_body_e code header out_dev call
	  else (
	    let length_opt =
	      try 
		let l = Int64.of_string (header # field "Content-Length") in
		if l < 0L then raise (Bad_message "Bad Content-Length field");
		Some l
	      with
		| Failure _ -> raise (Bad_message "Bad Content-Length field")
		| Not_found -> None in
	    read_plain_body_e code header out_dev length_opt call
	  )
	) else (
	  read_end_e call
        )
	
      and read_plain_body_e code header out_dev length_opt call =
	(* Parses a non-chunked HTTP body. If length_opt=None, the message
	 * is terminated by EOF. If length_opt=Some len, the message has
	 * this length.
	 *)
	( Uq_io.copy_e
	    ?len64:length_opt
	    buf_in_dev
	    out_dev
	  >> (function
		| `Done n ->
		    ( match length_opt with
			| None -> ()
			| Some l ->
			    if n <> l then
			      raise(Bad_message "EOF in response message")
		    );
	            if !options.verbose_response_contents then
	              dlogr
	                (fun () ->
	                 sprintf "FD %Ld - HTTP response body %s"
                           (Netsys.int64_of_file_descr fd)
		           (match length_opt with
		              | None -> "(unknown length)"
		              | Some n -> sprintf "(%Ld bytes)" n
		           )
	                );
		    `Done n
		| st -> st
	     )   (* out_dev is closed in read_end_e *)
	)
	++ (fun n -> read_end_e call)

      and read_chunked_body_e code header out_dev call =
	(* Parses a chunked HTTP body *)
	input_line_opt_e ~max_len:max_line_len buf_in_dev
	++ (function
	      | Some line ->
	            if !options.verbose_response_contents then
	              dlogr
	                (fun () ->
	                   sprintf "FD %Ld - HTTP response chunk size: %S"
                                   (Netsys.int64_of_file_descr fd)
                                   line
                        );
		  ( match Netstring_str.string_match chunk_re line 0 with
		      | None ->
			  raise
			    (Bad_message "Cannot parse chunk of response body");
		      | Some m ->
			  let hex_len = Netstring_str.matched_group m 1 line in
			  let len =
			    try Int64.of_string ("0x" ^ hex_len)
			    with Failure _ -> 
			      raise (Bad_message "Chunk too large") in
			  if len = 0L then
			    let trl_buf = Buffer.create 100 in
			    read_trailer_e code header out_dev call trl_buf
			  else
			    read_chunk_data_e code header out_dev len call
		  )
	      | None ->
		  raise (Bad_message "EOF where next response chunk expected")
	   )

      and read_chunk_data_e code header out_dev len call =
	(* Parses the chunk data following the chunk size field *)
	Uq_io.copy_e
	  ?len64:(Some len)
	  buf_in_dev
	  out_dev
	++ (fun n ->
	      if n <> len then
		raise(Bad_message "EOF in response message");
	      read_chunk_end_e code header out_dev call
	   )

      and read_chunk_end_e code header out_dev call =
	input_line_opt_e ~max_len:max_line_len buf_in_dev
	++ (function
	      | Some line ->
	           if !options.verbose_response_contents then
	             dlogr
	               (fun () ->
	                   sprintf "FD %Ld - HTTP response chunk end: %S"
                                   (Netsys.int64_of_file_descr fd)
                                   line
                       );
		  if line = "" || line = "\r" then
		    read_chunked_body_e code header out_dev call
		  else
		    raise (Bad_message "CR/LF after response chunk is missing")
	      | None ->
		  raise (Bad_message "EOF where next response chunk expected")
	   )

      and read_trailer_e code header out_dev call trl_buf =
	input_line_opt_e ~max_len:max_line_len buf_in_dev
	++ (fun line_opt ->
	      match line_opt with
		| None ->
		    let msg = "EOF where response trailer expected" in
		    raise (Bad_message msg)
		| Some line ->
	            if !options.verbose_response_contents then
	              dlogr
	                (fun () ->
	                   sprintf "FD %Ld - HTTP response trailer: %S"
                                   (Netsys.int64_of_file_descr fd)
                                   line
                        );
		    Buffer.add_string trl_buf line;
		    Buffer.add_string trl_buf "\n";
		    if line = "" || line = "\r" then (
		      parse_trailer_e
			code header out_dev call (Buffer.contents trl_buf)
		    )
		    else (
		      if Buffer.length trl_buf > 10000 then (
			let msg ="Response trailer too long" in
			raise (Bad_message msg)
		      );
		      read_trailer_e code header out_dev call trl_buf
		    )
	   )
	
      and parse_trailer_e code header out_dev call trl_str =
	let trailer_l, real_end_pos =
	  try
	    Netmime_string.scan_header
	      ~downcase:false ~unfold:true ~strip:true trl_str
	      ~start_pos:0 ~end_pos:(String.length trl_str)
	  with
	    | Failure _ ->
		let msg = "Bad response trailer" in
		raise (Bad_message msg)
	in
	assert(real_end_pos = String.length trl_str);
	let new_header =
	  new Netmime.basic_mime_header (header#fields @ trailer_l) in
	call # private_api # set_response_header new_header;
	if !options.verbose_response_header then
	  dump_header "HTTP response+trailer " new_header#fields;
	(* out_dev is closed in read_end_e *)
	read_end_e call
	  
      and read_end_e call =
	call # private_api # finish_response_e esys  (* will close out_dev *)
	++ (fun () ->
	      eps_e (`Done (Some call)) esys
	   )
      in

      read_status_line_e None
      >> (fun st ->
	    let propagate_garbage = !cur_call <> None in
	    let maybe_cleanup() =
	      match !cur_call with
		| None -> ()
		| Some call -> 
		    call # private_api # cleanup();
		    cur_call := None in
	    match st with
	      | `Error (Garbage_received msg) when propagate_garbage ->
		  maybe_cleanup();
		  `Error (Bad_message msg)
		    (* If we can associate a Garbage_received error with
		       a certain call change the exception to Bad_message.
		       The background is that Garbage_received is not
		       attributed to a specific message by the higher layers.
		     *)
	      | `Error e ->
		  maybe_cleanup();
		  `Error e
	      | _ ->
		  st
	 )

    (****************************** OUTPUT ********************************)

    val send_queue = Q.create()
    val send_buf = String.create 4096

    val mutable sending = false


    method write_activity =
      sending || not(Q.is_empty send_queue)


    method add x =
      Q.add x send_queue


    method write_e esys =
      (* Writes all contents of send_queue *)
      (* It is the task of the caller to close the request body, by
	 calling finish_request_e at the right moment
       *)
      
      let rec write_next_e() =
	if Q.is_empty send_queue then
	  eps_e (`Done()) esys
	else (
	  let token = Q.take send_queue in
	  ( match token with
	      | Send_header (call_id, meth, url, hdr) ->
		  write_header_e call_id meth url hdr
	      | Send_body (dev,length_opt) ->
		  write_body_e dev length_opt
	      | Send_body_chunked dev ->
		  write_body_chunked_e dev
	      | Send_eof ->
		  write_eof_e ()
	  ) 
	  ++ write_next_e
	)

      and write_header_e call_id meth url hdr =
	let buf = B.create 1000 in
	B.add_string buf meth;
	B.add_string buf " ";
	B.add_string buf url;
	B.add_string buf " HTTP/1.1";
	
	if !options.verbose_status then
	  dlogr
	    (fun () ->
	       sprintf "FD %s - Call %d - HTTP request: %s"
		 self#socket_str call_id (B.contents buf));
	
	B.add_string buf "\r\n";
	let ch = new Netchannels.output_netbuffer buf in
	Netmime_string.write_header ch hdr#fields;
	ch # close_out();
	      
	if !options.verbose_request_header then
	  dump_header "HTTP request " hdr#fields;
	      
	Uq_io.output_netbuffer_e dev buf

      and write_body_e d expected_length_opt =
	if !options.verbose_request_contents then
	  dlogr
	    (fun () ->
	       sprintf "FD %Ld - HTTP request body fragment %s"
		 (Netsys.int64_of_file_descr fd)
		 (match expected_length_opt with
		    | None -> "(unknown length)"
		    | Some n -> sprintf "(%Ld bytes)" n
		 )
	    );

	(* N.B. We do not close [d] here - task of caller *)
	Uq_io.copy_e ?len64:expected_length_opt d dev
	>> (function
	      | `Done n ->
		  ( match expected_length_opt with
		      | None -> ()
		      | Some exp_n ->
			  if n <> exp_n then
			    failwith "Nethttp_client: announced length of \
                                      request body does not match actual length"
		  );
		  `Done ()
	      | `Error e -> `Error e
	      | `Aborted -> `Aborted
	   )
	  
      and write_body_chunked_e d =
	write_body_next_chunk_e d ()

      and write_body_next_chunk_e d () =
	input_opt_e d (`String send_buf) 0 (String.length send_buf)
	++ (function
	      | Some n ->
		  if !options.verbose_request_contents then
		    dlogr
		      (fun () ->
			 sprintf "FD %Ld - HTTP request body chunk (%d bytes)"
			   (Netsys.int64_of_file_descr fd) n
		      );
		  let s = sprintf "%x\r\n" n in
		  Uq_io.output_string_e dev s
		  ++ (fun () -> 
			Uq_io.really_output_e dev (`String send_buf) 0 n)
		  ++ (fun () -> Uq_io.output_string_e dev "\r\n")
		  ++ write_body_next_chunk_e d
	      | None ->
		  if !options.verbose_request_contents then
		    dlogr
		      (fun () ->
			 sprintf "FD %Ld - HTTP request body chunk (last)"
			   (Netsys.int64_of_file_descr fd)
		      );
		  let s = "0\r\n\r\n" in
		  Uq_io.output_string_e dev s
		    (* N.B. We do not close [d] here - task of caller *)
	   )

      and write_eof_e () =
	if !options.verbose_request_contents then
	  dlogr (fun () ->
		   sprintf "FD %Ld - HTTP request EOF" 
		     (Netsys.int64_of_file_descr fd)
		);
	Uq_io.write_eof_e dev
	++ (fun flag ->
	      if not flag then
		failwith "Nethttp_client: \
                          no support for closing the write side only";
	      (* It is better to set the new state after a successful
		 write_eof operation than before. It is possible that the whole
		 chain of write operations is aborted. In this case
		 an EOF also needs to be written. If we abort between write_eof
		 and the following assignment, the EOF will just be
		 written twice (which is unproblematic). Otherwise, the EOF
		 could be forgotten, and the client would hang.
	       *)
	      if socket_state = Up_rw then
		socket_state <- Up_r;
	      eps_e (`Done()) esys
	   )
      in

      (* If the socket is already closed for writing, do nothing. This
	 can happen if the read side decides to close the socket.
       *)
      if socket_state = Up_rw then (
        sending <- true;
	write_next_e()
	>> (fun st -> sending <- false; st)
      )
      else 
	eps_e (`Done ()) esys

    end
  )
;;

(**********************************************************************)
(***                PROTOCOL STATE OF A SINGLE MESSAGE              ***)
(**********************************************************************)

(* The class 'transmitter' is a container for the message and the
 * associated protocol state, and defines the methods to send
 * the request, and to receive the reply.
 * The protocol state stored here mainly controls the send and receive
 * buffers (e.g. how many octets have already been sent? Are the octets
 * received so far already a complete message or not?)
 *)


type message_state =
    Unprocessed     (* Not even a byte sent or received *)
  | Sending_hdr     (* Sending currently the header *)
  | Handshake       (* The header has been sent, now waiting for status 100 *)
  | Sending_body    (* The header has been sent, now sending the body *)
  | Finishing       (* The body is almost sent - about to finish *)
  | Sent_request    (* The body has been sent; the reply is being received *)
  | Complete        (* The whole reply has been received *)
;;

(* The states Unprocessed...Sent_request are only set by the request
   sender. We enter Complete only if we can receive the corresponding
   response.

   In the case that the response arrives while we are still sending,
   the processing of the response needs to be delayed until the sending
   is done. What can happen then:
   - We get the status while waiting for "100". This is handled in send_e,
     and the status is set to Finishing.
   - We get the status line later while still sending. This is detected
     by the reader, and send_interrupt is called.
   - If we get the status line when already in state Finishing, the 
     request is not aborted, and the remaining bytes are sent.

   Errors are not reflected in the state.
 *)


let string_of_state =
  function
    | Unprocessed  -> "Unprocessed"
    | Sending_hdr  -> "Sending_hdr"
    | Handshake    -> "Handshake"
    | Sending_body -> "Sending_body"
    | Finishing    -> "Finishing"
    | Sent_request -> "Sent_request"
    | Complete     -> "Complete"



let test_conn_close hdr =
  let conn_list = 
    try Nethttp.Header.get_connection hdr
    with _ (* incl. syntax error *) -> [] in
  List.mem "close" (List.map String.lowercase conn_list)


let test_conn_keep_alive hdr =
  let conn_list = 
    try Nethttp.Header.get_connection hdr
    with _ (* incl. syntax error *) -> [] in
  List.mem "keep-alive" (List.map String.lowercase conn_list)


let test_proxy_conn_close hdr =
  let conn_list = 
    try 
      List.map String.lowercase
	(hdr # multiple_field "proxy-connection")
    with _ (* incl. syntax error *) -> [] in
  List.mem "close" (List.map String.lowercase conn_list)


let test_proxy_conn_keep_alive hdr =
  let conn_list = 
    try 
      List.map String.lowercase
	(hdr # multiple_field "proxy-connection")
    with _ (* incl. syntax error *) -> [] in
  List.mem "keep-alive" (List.map String.lowercase conn_list)


let test_http_1_1 proto_str =
  try
    let proto = Nethttp.protocol_of_string proto_str in
    match proto with
      | `Http((1,n),_) when n >= 1 ->  
	  (* HTTP/1.1 <= proto < HTTP/2.0 *)
	  true
      | _ ->
	  false
  with _ -> false


let transmitter
  peer_is_proxy
  proxy_auth_state
  default_port
  (m : http_call) 
  (f_done : http_call -> unit)
  options
  =
  ( object (self) 
      val mutable state = Unprocessed
      val indicate_done = f_done
      val msg = m
	
(*
      val mutable auth_headers = []
	(* Additional header for _proxy_ authentication *)
 *)

      val mutable send_e_opt = None
      val mutable send_finish_e_opt = None
      val mutable send_interrupted = false
	
      method state = state
	
      method f_done = indicate_done

      method send_interrupted = send_interrupted

      method init conn_id =
	(* Prepare for (re)transmission:
	 * - Set the `Effective request header
	 * - Reset the status info of the http_call
	 * - Initialize transmission state
	 *)
	if !options.verbose_status then
	  dlogr (fun () -> sprintf "Call %d: initialize transmitter" 
		   (Oo.id msg));
	msg # private_api # prepare_transmission();
        msg # private_api # set_conn_id conn_id;
	(* Set the effective URI. This must happen before authentication. *)
	let eff_uri =
	  if peer_is_proxy then
	    msg # request_uri
	  else
	    let path = msg # get_path() in
	    if path = "" then (
	      msg # empty_path_replacement
	    )
	    else path
	in
	msg # private_api # set_effective_request_uri eff_uri;
	let cah = 
	  match msg # private_api # auth_state with
	    | `None -> []
	    | `In_advance session -> 
		( match session # authenticate msg true with
                    | `Continue headers -> headers
                    | `Continue_reroute(headers,_) -> headers
                    | `OK -> []
                    | `Auth_error | `None | `Reroute _ -> []
                        (* Cannot deal with it here *)
                )
	    | `In_reply(_,headers)
	    | `In_reply_reroute(_,headers,_) ->
                headers
            | `OK ->
                []
            | `Auth_error ->
                (* retry *)
                msg # private_api # set_auth_state `None;
                []
            | `Resubmit _ -> assert false in
	let pah =
	  match !proxy_auth_state with
	    | `None -> []
	    | `In_advance session ->
		( match session # authenticate msg true with
                    | `Continue headers -> headers
                    | `Continue_reroute(headers,_) -> headers
                    | `OK -> []
                    | `Auth_error | `None | `Reroute _ -> []
                         (* Cannot deal with it here *)
                )
	    | `In_reply(_,headers)
	    | `In_reply_reroute(_,headers,_) ->
                headers
            | `OK ->
                []
            | `Auth_error ->
                (* retry *)
                proxy_auth_state := `None;
                []
            | `Resubmit _ -> assert false in
	let rh = msg # request_header `Effective in
	List.iter
	  (fun (n,v) ->
	     rh # update_field n v
	  )
	  (cah @ pah);
	if peer_is_proxy then
	  rh # update_field "Proxy-Connection" "keep-alive";
	state <- Unprocessed;
	if not (msg # has_req_body) then (
	  (* Remove certain headers *)
	  rh # delete_field "Expect";
	);
	send_finish_e_opt <- None

      method cleanup() =
	(* release resources *)
	msg # private_api # cleanup()

(*
      method add_auth_header n v =
	auth_headers <- (n,v) :: auth_headers
 *)	
  
      method error_if_unserved error =
	msg # private_api # error_if_unserved !options.verbose_status error

      method send_e (io : io_buffer) 
                    (handshake_cfg : float option)
                    (close_flag : bool)
		    esys =
	(* handshake_cfg: if [Some t], it is waited after the header for the
	   "100 Continue" handshake. [t] is the timeout. The [Expect]
	   header is already set (by the user).
	   close_flag: if true, the "connection:close" header is set
	 *)

	assert (state = Unprocessed);
	assert (not(io # write_activity));
	assert (send_finish_e_opt = None);

	let rec send_header_e() =
	  let rh = msg # request_header `Effective in
	  let host = msg # get_host() in
	  let port = msg # get_port() in
          let include_port =
            match default_port with
              | None -> true
              | Some p -> p <> port in
	  let host_str = host ^ (if include_port then 
                                   ":" ^ string_of_int port
                                 else "") in
	  rh # update_field "Host" host_str;
	  
	  if close_flag then
	    rh # update_field "Connection" "close";

	  io # add (Send_header(self#call_id, 
				msg#request_method,
				msg#effective_request_uri,
				(rh :> Netmime.mime_header_ro)));
	  state <- (if handshake_cfg <> None then Handshake else Sending_hdr);
	  
	  io # write_e esys
	  ++ (match handshake_cfg with
		| Some t when not msg # private_api # continue ->
		    send_handshake_e t
		| _ -> 
		    send_body_e
	     )

	and send_handshake_e tmo () =
	  if !options.verbose_events then
	    dlogr (fun () ->
		     sprintf "Call %d - HTTP events: Waiting for 100 Continue"
		       (Oo.id msg)
		  );
	  msg # private_api # wait_continue_e tmo esys
	  ++ (fun code_100 ->
		if code_100 then
		  send_body_e()
		else
		  (* We got a non-100 response after waiting for 100.
		     We suppress the request body, and instead close the
		     connection on the sender side.
		   *)
		  send_no_body_e()
	     )
	    
	and send_body_e () : unit Uq_engines.engine =
	  if msg # has_req_body then (	  
	    state <- Sending_body;
	    let rh = msg # request_header `Effective in
	    let is_chunked =
	      try rh # field "Transfer-encoding" <> "identity"
	      with Not_found -> false in
	    let d = msg # private_api # request_body_open_rd esys in
	    let tok, need_eof =
	      if is_chunked then
		Send_body_chunked d, false
	      else (
		let length_opt =
		  try 
		    let l = 
		      try Int64.of_string (rh # field "Content-Length") 
		      with
			| Failure _ -> 
			    failwith "Nethttp_client: Bad Content-Length field \
                                      in request" in
		    if l < 0L then
		      failwith "Nethttp_client: Bad Content-Length field in request";
		    Some l
		  with Not_found -> None in
		Send_body(d, length_opt), length_opt = None
	      ) in
	    io # add tok;
	    if need_eof then
	      io # add Send_eof;
	    io # write_e esys
	    ++ (fun () ->
		  state <- Sent_request;
		  msg # private_api # finish_request_e esys
	       )
	  )
	  else (
	    state <- Sent_request;
	    msg # private_api # finish_request_e esys
	  )

	and send_no_body_e () =
	  (* This is only used if we announced a body in the header, but
	     finally do not send it (because we got an error from the server).

             We need to close the connection in this case. Unconditionally,
             because we got out of sync (the server cannot know whether we've
             seen the error before we start sending the request body, so the
             server cannot know what to do).
	   *)
	  state <- Finishing;
	  io # add Send_eof;
	  io # write_e esys
	  ++ (fun () ->
	      state <- Sent_request;
	      eps_e (`Done ()) esys
	     )
	in

	let (fin_e, signal) = Uq_engines.signal_engine esys in
	send_finish_e_opt <- Some fin_e;

	let e = send_header_e() in
	send_e_opt <- Some e;

	let e' =
	  (* This engine handles send interrupts, and returns [true] if an
	     EOF needs to be written
	   *)
	  e >> (fun st ->
		  send_e_opt <- None;
		  match st with
		    | `Aborted when send_interrupted ->
			if !options.verbose_events then
			  dlogr
			    (fun () ->
			       sprintf "Call %d - HTTP event: Send interrupted" 
				 (Oo.id msg));
			`Done true
		    | `Aborted -> `Aborted
		    | `Error err -> `Error err
		    | `Done () -> `Done false
	       ) in
	let e'' =
	  (* This engine writes the EOF if needed *)
	  e' 
	  ++ (fun flag ->
		if flag then (
		  io # add Send_eof;
		  io # write_e esys
		)
		else eps_e (`Done ()) esys
	     ) in

	(* Finally catch errors and signal termination: *)
	e''
	>> (fun st ->
	      ( match st with
		  | `Error err ->
		      if !options.verbose_events then
			dlogr
			  (fun () ->
			     sprintf "Call %d - HTTP event: Send exception: %s" 
			       (Oo.id msg) (Netexn.to_string err)
			  );
		  | _ -> ()
	      );
	      signal st; st
	   )



      method send_interrupt() =
	(* Stop sending immediately. This method is called by the reader
	   when an error status is received while we are still writing
	   the request. The reaction is to close the write side of the
	   connection.

	   For chunked encoding, we could also finish the current chunk,
	   and send an empty chunk. However, this is complicated to
	   get done here, so we always use the close method.
	 *)
	match send_e_opt with
	  | None -> ()
	  | Some e ->
	      send_interrupted <- true;
	      e # abort()

	  
      method send_finish_e esys =
	(* Wait here until the request is completely sent (synchronization with
	   the sender)
	 *)
	match send_finish_e_opt with
	  | None ->
	      eps_e (`Done ()) esys
	  | Some e ->
	      e
	  
	  
      method receive_complete () =
	assert(state = Handshake || state = Sending_body || state = Finishing ||
	    state = Sent_request);
	(* Handshake: this is possible when we were waiting for a 100 response,
	   but got a non-100 instead. In this case, the request is not sent,
	   hence the state remains at Handshake.

	   Sending_body: this is possible when we get a server status while
	   sending the reqest, and stopping the request because of this.
	 *)
	state <- Complete


      method indicate_pipelining =
	(* Return 'true' iff the reply is HTTP/1.1 compliant and does not
	 * contain the 'connection: close' header.
	 *)
	let req_hdr = msg # request_header `Effective in
	let b0 = 
	  if peer_is_proxy then
	    not(test_proxy_conn_close req_hdr) &&
	      not(test_conn_close req_hdr)
	  else
	    not(test_conn_close req_hdr)  in
	b0 && (
	  let resp_header = msg # private_api # response_header in
	  let proto_str = msg # private_api # response_proto in
	  let b1 = 
	    if peer_is_proxy then
	      test_http_1_1 proto_str && not(test_proxy_conn_close resp_header) 
		&& not(test_conn_close resp_header) 
	    else
	      test_http_1_1 proto_str && not(test_conn_close resp_header) in
	  b1 && (
	    try
	      let server = resp_header # field "Server" in
	      not (List.exists 
		     (fun re -> 
			Netstring_str.string_match re server 0 <> None
		     ) 
		     pipeline_blacklist)
	    with
	      | Not_found -> true  (* Nothing known ... Assume the best! *)
	  )
	)
	  
      method indicate_sequential_persistency =
	(* Returns 'true' if persistency without pipelining
	 * is possible.
	 *)
	let b0 = 
	  not(test_conn_close (msg # request_header `Effective)) in
	b0 && (
	  let resp_header = msg # private_api # response_header in
	  let proto_str = msg # private_api # response_proto in
	  let is_http_11 = test_http_1_1 proto_str in
	  let normal_persistency =
	    not peer_is_proxy && 
	      (not (test_conn_close resp_header)) &&
	      (is_http_11 || test_conn_keep_alive resp_header) in
	  let proxy_persistency =
	    peer_is_proxy && 
	      (not (test_conn_close resp_header)) &&
	      (not (test_proxy_conn_close resp_header)) &&
	      (is_http_11 || test_proxy_conn_keep_alive resp_header) in
	  normal_persistency || proxy_persistency
	)
	  
      method message = msg
	
      method call_id = Oo.id msg
	
    end
  )
;;


let drive_postprocessing_e esys options m f_done =
  Uq_engines.delay_engine 0.0
    (fun () ->
       ( try
	   if !options.verbose_status then
	     dlogr (fun () -> 
		      sprintf "Call %d - postprocessing" (Oo.id m));
	   let () = f_done m in ()
	 with
	   | any ->
	       if !options.verbose_status then
		 dlogr (fun () -> 
			  sprintf "Call %d - Exception in postprocessing: %s"
			    (Oo.id m) (Netexn.to_string any));
	       let g = Unixqueue.new_group esys in
	       Unixqueue.once esys g 0.0 (fun () -> raise any);
       );
       eps_e (`Done ()) esys
    )
    esys


let drive_postprocessing_msg esys options m f_done =
  ignore(drive_postprocessing_e esys options m f_done)


(**********************************************************************)
(**********************************************************************)
(**********************************************************************)
(***                                                                ***)
(***           THE PROTOCOL STATE OF THE CONNECTION                 ***)
(***                                                                ***)
(**********************************************************************)
(**********************************************************************)
(**********************************************************************)

type peer =
    [ `Direct of string * int
    | `Direct_name of string * int
    | `Http_proxy of string * int
    | `Http_proxy_connect of (string * int) * (string * int)
    | `Socks5 of (string * int) * (string * int)
    ]

(* `Direct_name: like `Direct, but the host name must not be rwritten to an
    IP address when the connection is cached
 *)

let first_hop =
  function
    | `Direct (host,port) -> (host,port)
    | `Direct_name (host,port) -> (host,port)
    | `Http_proxy (host,port) -> (host,port)
    | `Http_proxy_connect ((host,port),_) -> (host,port)
    | `Socks5 ((host,port),_) -> (host,port)

let content_hop =
  function
    | `Direct (host,port) -> (host,port)
    | `Direct_name (host,port) -> (host,port)
    | `Http_proxy (host,port) -> (host,port)  (* This case does not work *)
    | `Http_proxy_connect (_,(host,port)) -> (host,port)
    | `Socks5 (_,(host,port)) -> (host,port)


let rewrite_first_hop s =
  function
    | `Direct (host,port) -> `Direct(s,port)
    | `Direct_name (host,port) -> `Direct_name(host,port)  (* no rewriting *)
    | `Http_proxy (host,port) -> `Http_proxy(s,port)
    | `Http_proxy_connect ((host1,port1),(host2,port2)) ->
	`Http_proxy_connect ((s,port1),(host2,port2))
    | `Socks5 ((host1,port1),(host2,port2)) ->
	`Socks5 ((s,port1),(host2,port2))


let string_of_peer peer =
  match peer with
    | `Direct (host,port) -> 
	sprintf "direct connection to %s:%d" host port
    | `Direct_name (host,port) -> 
	sprintf "direct connection to (name) %s:%d" host port
    | `Http_proxy (host,port) ->
	sprintf "proxy connection via %s:%d" host port
    | `Http_proxy_connect ((host1,port1),(host2,port2)) ->
	sprintf "proxy connection via %s:%d to %s:%d" host1 port1 host2 port2
    | `Socks5 ((host1,port1),(host2,port2)) ->
	sprintf "SOCKS connection via %s:%d to %s:%d" host1 port1 host2 port2


let proxy_connect_e esys fd fd_open host_url options msg proxy_auth_handler_opt
                    cur_proxy_session tcp_real_connect_e setup_e =
  (* Send the CONNECT line plus header, and wait for 200.

     The continuation of this engine is either setup_e() if successful,
     or tcp_real_connect_e() if another connection to the proxy needs to
     be opened.
   *)
  let mplex =
    Uq_multiplex.create_multiplex_controller_for_connected_socket
      ~supports_half_open_connection:true
      fd esys in
  (* N.B. No timeout here required because this activity is covered by the
     connect timeout
   *)
  let io = io_buffer options fd mplex Up_rw in
  let hdr = msg # request_header `Effective in
  hdr # update_field "Host" host_url;
  hdr # update_field "Proxy-Connection" "keep-alive";

  let rec request_e () =
    ( match !cur_proxy_session with   (* authentication *)
	| None -> ()
	| Some sess ->
	    ( match sess # authenticate msg false with
                | `Continue pah ->
	            List.iter
	              (fun (n,v) ->
		       hdr # update_field n v
	              )
	              pah
                | _ ->
                    ()
            )
    );
    io#add (Send_header(0, msg#request_method, msg#effective_request_uri, 
			( hdr :> Netmime.mime_header_ro )
		       ));
    io#configure_read ~fetch_call:(fun () -> msg) ();
    io#write_e esys
    ++ (fun () ->
	  io#read_e esys
	  ++ (function
		| None ->
		    failwith "EOF from proxy server"
		| Some m ->
		    assert(m = msg);
		    ( try
			if m#response_status_code = 407 && 
			  !cur_proxy_session = None &&
			  proxy_auth_handler_opt <> None
			then (
			  let ah =
			    match proxy_auth_handler_opt with
			      | None -> assert false
			      | Some ah -> ah in
			  let sess =
			    match ah # create_proxy_session msg options with
			      | None -> raise Not_found
			      | Some sess -> sess in
			  cur_proxy_session := Some sess;
			  (* It is now possible that the proxy closes the
			     connection. If so, we do this too, and reopen
			     another one. Otherwise, just go on.
			   *)
			  let rh = m#response_header in
			  let ps = m#response_protocol in
			  let close_flag =
			    test_conn_close rh || test_proxy_conn_close rh ||
			      (not (test_http_1_1 ps) && 
				 not (test_conn_keep_alive rh) &&
				 not (test_proxy_conn_keep_alive rh)) in
			  if close_flag then (
			    Unix.close fd;
			    fd_open := false;
			    tcp_real_connect_e()
			      (* The value of !cur_proxy_session is kept,
				 so we will use the authentication knowlege
				 we already gathered
			       *)
			  )
			  else
			    request_e ()
			)
			else 
			  raise Not_found
		      with Not_found ->
			match m#status with
			  | `Unserved ->
			      assert false
			  | `Http_protocol_error err ->
			      failwith ("Exception from proxy connection: " ^ 
					  Netexn.to_string err)
			  | `Redirection
			  | `Client_error
			  | `Server_error ->
			      raise(Proxy_error m#response_status_code)
			  | `Successful ->
			      setup_e()
		    )
	   )
       )
  in
  request_e ()
  >> (function
	| `Error (Garbage_received msg) ->
	    `Error (Bad_message msg)
	| st -> st
     )


class type tls_cache =
object
  method set : domain:string -> port:int -> trans:transport_layer_id -> 
               data:string -> unit
  method get : domain:string -> port:int -> trans:transport_layer_id -> string
  method clear : unit -> unit
end


let tcp_connect_e esys tp trans (peer:peer) conn_cache conn_owner tls_cache
                  options proxy_auth_handler_opt =
  (* An engine connecting to peer_host:peer_port. If a connection is still
     available in conn_cache, use this instead.

     Output is:
     - `Done(fd, conn_time): fd is the new connection (established in conn_time
       seconds)
   *)
  let timeout_value = !options.connection_timeout in
  
  let resolve_e, signal_res =
    Uq_engines.signal_engine esys in

  let t0 = Unix.gettimeofday() in

  let (hop1_host,hop1_port) = first_hop peer in

  !options.resolver
    esys
    hop1_host
    (function 
       | None ->
	   if !options.verbose_events then
	     dlog "HTTP events: reset after DNS failure";
	   let err = Name_resolution_error hop1_host in
	   signal_res (`Error err)
	     
       | Some addr ->
	   signal_res (`Done addr)
    );

  let descr = string_of_peer peer in

  let tmo_x =
    Timeout descr in

  let real_host, real_port = content_hop peer in

  resolve_e
  ++ (fun hop1_ip ->
	let hop1_host_ip =
	  Unix.string_of_inet_addr hop1_ip in
	let cache_peer =
	  rewrite_first_hop hop1_host_ip peer in
	try
	  let fd, idata =
            conn_cache # find_inactive_connection cache_peer trans in
	  (* Case: reuse old connection *)
	  conn_cache # set_connection_state fd cache_peer (`Active conn_owner);
	  if !options.verbose_events then
	    dlog (sprintf 
		    "FD %Ld - HTTP events: config input (reused fd) - target %s"
		    (Netsys.int64_of_file_descr fd) descr);
	  let mplex = 
	    tp # continue 
	      fd trans !options.connection_timeout tmo_x 
	      real_host real_port esys
              ( let open Nethttp_client_conncache in
                idata.tls_stashed_endpoint
              ) in
	  eps_e (`Done(fd, cache_peer, mplex, 0.0)) esys
	with
	  | Not_found ->
	      if !options.verbose_connection then
		dlog ("HTTP connection: creating " ^ 
			descr);
	      let proxy_opt = (* SOCKS5 *)
		match peer with
		  | `Socks5 _ ->
		      Some(new Uq_socks5.proxy_client 
			     (`Socket(`Sock_inet(Unix.SOCK_STREAM,
						 hop1_ip,
						 hop1_port),
				      Uq_client.default_connect_options)))
		  | _ -> None in
	      let sockspec =
		match peer with
		  | `Direct _
		  | `Direct_name _
		  | `Http_proxy _ 
		  | `Http_proxy_connect _ ->
		      `Sock_inet(Unix.SOCK_STREAM, hop1_ip, hop1_port)
		  | `Socks5 (_,(host2,port2)) ->
		      `Sock_inet_byname(Unix.SOCK_STREAM, host2, port2) in

	      let cur_proxy_session = ref None in
              let connect_msg = ref None in

	      let rec tcp_real_connect_e () =
		Uq_client.connect_e
		  ?proxy:proxy_opt
		  (`Socket(sockspec, Uq_client.default_connect_options))
		  esys 
		++ (function
		      | `Socket(fd,_) ->
			  let fd_open = ref true in
			  !options.configure_socket fd;
			  let setup_e() =
			    tp # setup_e
			      fd trans !options.connection_timeout tmo_x
			      real_host real_port esys tls_cache
			    >> (function
				  | `Done(mplex) -> 
                                      `Done(fd,mplex)
				  | `Error err -> `Error err
				  | `Aborted -> `Aborted
			       ) in
			  ( match peer with
			      | `Http_proxy_connect(_,(host2,port2)) ->
                                  let host_url = sprintf "%s:%d" host2 port2 in
                                  let conn_msg =
                                    match !connect_msg with
                                      | None ->
                                          let c = new connect host_url in
                                          connect_msg := Some c;
                                          c
                                      | Some c -> c in
				  proxy_connect_e
				    esys fd fd_open host_url options
                                    conn_msg
				    proxy_auth_handler_opt
				    cur_proxy_session
				    tcp_real_connect_e
				    setup_e
			      | _ ->
				  setup_e()
			  )
			  >> (fun st ->
				match st with
				  | `Error _ | `Aborted ->
				      if !fd_open then Unix.close fd;
				      fd_open := false;
				      st
				  | _ -> st
			     )
		      | _ -> assert false
		   ) in

	      let eng = tcp_real_connect_e() in

	      Uq_engines.timeout_engine 
		timeout_value
		(Timeout (sprintf
			    "creating %s" descr))
		eng
	      ++ (fun (fd,mplex) ->
		    let t1 = Unix.gettimeofday() in
		    let d = t1 -. t0 in

		    if !options.verbose_connection then
		      dlog (sprintf 
			      "FD %Ld - HTTP %s: Connected!"
			      (Netsys.int64_of_file_descr fd) descr);

		    Netlog.Debug.track_fd
		      ~owner:"Nethttp_client"
		      ~descr:(sprintf 
				"HTTP %s" descr)
		      fd;
		    (* The release_fd is in Nethttp_client_conncache! *)

		    conn_cache # set_connection_state
		      fd cache_peer (`Active conn_owner);
		    eps_e (`Done(fd, cache_peer, mplex, d)) esys
		 )
	      >> (function
		    | `Error err ->
			if !options.verbose_connection then (
			  dlog(sprintf 
				 "HTTP connection: Cannot create %s: \
                                  Exception %s"
				 descr (Netexn.to_string err))
			);
			`Error err
		    | st -> st
		 )
     )		

(**********************************************************************)

class type transport_channel_type =
object
  method identify_conn_by_name : bool
  method setup_e : Unix.file_descr -> transport_layer_id -> float -> exn ->
                   string -> int -> Unixqueue.event_system ->
                   tls_cache ->
                   Uq_engines.multiplex_controller Uq_engines.engine
  method continue : Unix.file_descr -> transport_layer_id -> float -> exn ->
                   string -> int -> Unixqueue.event_system ->
                   exn option ->
                   Uq_engines.multiplex_controller
  method default_port : int option
end

let simple_transport_channel_type default_port : transport_channel_type =
  ( object(self)
      method identify_conn_by_name = false

      method continue fd trans tmo tmo_x host port esys tls_data =
	Uq_multiplex.create_multiplex_controller_for_connected_socket
	  ~close_inactive_descr:true
	  ~supports_half_open_connection:true
	  ~timeout:( tmo, tmo_x )
	  fd esys
      method setup_e fd trans tmo tmo_x host port esys tls_cache =
	let mplex = self # continue fd trans tmo tmo_x host port esys None in
	eps_e (`Done mplex) esys
      method default_port = default_port
    end
  )

let http_transport_channel_type =
  simple_transport_channel_type (Some 80)

let proxy_transport_channel_type =
  simple_transport_channel_type None


let https_transport_channel_type config : transport_channel_type =
  ( object(self)
      method identify_conn_by_name = true

      method continue fd trans tmo tmo_x host port esys tls_data =
        let mplex1 =
	  Uq_multiplex.create_multiplex_controller_for_connected_socket
	    ~close_inactive_descr:true
	    ~supports_half_open_connection:true
	    ~timeout:( tmo, tmo_x )
	    fd esys in
        let mplex2 =
          match tls_data with
            | None ->
                 Uq_multiplex.tls_multiplex_controller 
                   ~role:`Client ~peer_name:(Some host) config mplex1
            | Some exn ->
                 Uq_multiplex.restore_tls_multiplex_controller
                   exn config mplex1 in
        mplex2
      method setup_e fd trans tmo tmo_x host port esys tls_cache =
        let on_handshake mplex2 =
          match mplex2#tls_session with
            | None -> ()
            | Some(id,data) ->
                 tls_cache # set ~domain:host ~port ~trans ~data in
        let mplex1 =
	  Uq_multiplex.create_multiplex_controller_for_connected_socket
	    ~close_inactive_descr:true
	    ~supports_half_open_connection:true
	    ~timeout:( tmo, tmo_x )
	    fd esys in
        let mplex2 =
          try
            let tls_data = tls_cache # get ~domain:host ~port ~trans in
            Uq_multiplex.tls_multiplex_controller 
              ~resume:tls_data ~on_handshake ~role:`Client 
              ~peer_name:(Some host) config mplex1
          with
            | Not_found ->
                 Uq_multiplex.tls_multiplex_controller 
                   ~role:`Client ~on_handshake ~peer_name:(Some host) 
                   config mplex1 in
	eps_e (`Done mplex2) esys
      method default_port = Some 443
    end
  )


let null_tls_cache() : tls_cache =
  object
    method set ~domain ~port ~trans ~data = ()
    method get ~domain ~port ~trans = raise Not_found
    method clear () = ()
  end


let unlim_tls_cache() : tls_cache =
  let ht = Hashtbl.create 7 in
  object
    method set ~domain ~port ~trans ~data =
      Hashtbl.replace ht (domain,port,trans) data
    method get ~domain ~port ~trans =
      Hashtbl.find ht (domain,port,trans)
    method clear () =
      Hashtbl.clear ht
  end


(**********************************************************************)

let fragile_pipeline 
       esys  trans
       peer cache_peer
       proxy_auth_state proxy_auth_handler_opt
       default_port
       fd mplex connect_time no_pipelining conn_cache
       auth_cache
       counters options =
  (* Implements a pipeline for an existing connection [fd]. This object
     does not implement any way of recovering after errors.
   *)
  (* NB. cache_peer contains IP addresses for the next hop *)

  let connection_e, signal_connection = Uq_engines.signal_engine esys in
    (* Indicates that the connection is closed *)
  let queue_e, signal_queue = Uq_engines.signal_engine esys in
    (* Indicates that all processing of queued messages is done *)
  let finished_e =
    Uq_engines.sync_engine connection_e queue_e 
    >> (function 
	  | `Done _ -> `Done() 
	  | `Error e -> `Error e
	  | `Aborted -> `Aborted
       ) in

  let fd_str =
    Int64.to_string (Netsys.int64_of_file_descr fd) in

  let peer_is_proxy =  (* whether we have a normal HTTP proxy *)
    match cache_peer with
      | `Http_proxy _ -> true
      | _ -> false in

  ( object(self)
      val mutable io = io_buffer options fd mplex Up_rw

      val mutable write_queue = Q.create()
      val mutable read_queue = Q.create()
	(* Invariant: write_queue is a suffix of read_queue *)

      val mutable unserved_queue = Q.create()
        (* whatever counts as unserved (for getting picked up by
           robust_pipeline) *)

      (* The following two variables control whether pipelining is enabled or
       * not. The problem is that it is unclear how old servers react if we
       * send to them several requests at once. The solution is that the first
       * "round" of requests and replies is done in a compatibility mode: After
       * the first request has been sent sending stops, and the client waits
       * until the reply is received. If the reply indicates HTTP/1.1 and does
       * not contain a "connection: close" header, all further requests and 
       * replies will be performed in pipelining mode, i.e. the requests are
       * sent independent of whether the replies of the previous requests have
       * been received or not.
       *
       * sending_first_message: 'true' means that the first request has not yet
       *    been completely sent to the server. 
       * done_first_message: 'true' means that the reply of the first request
       *    has been arrived.
       *)
      val mutable sending_first_message = true
      val mutable done_first_message = false
	
      (* 'inhibit_pipelining_byserver': becomes true if the server is able
       * to keep persistent connections but is not able to use pipelining.
       * (HTTP/1.0 "keep alive" connections)
       *)
      val mutable inhibit_pipelining_byserver = no_pipelining
 
(*
      (* Proxy authorization: If 'proxy_user' is non-empty, the variables
       * 'proxy_user' and 'proxy_password' are interpreted as user and
       * password for proxy authentication. More precisely, once the proxy
       * responds with status code 407, 'proxy_credentials_required' becomes
       * true, and all following requests will include the credentials identifying
       * the user (with the "basic" authentication method).
       * If the proxy responds again with code 407, this reaction will not
       * be handled again but will be visible to the outside.
       *)
      val mutable proxy_credentials_required = false
      val mutable proxy_cookie = ""
 *)

      (* Whether this connection never proves to exchange a message: *)
      val mutable total_failure = false

      (* any kind of socket error, unclean shutdown etc *)
      val mutable problem = false

      (* 'close_connection' indicates that a HTTP/1.0 response was received or 
       * that a response contained a 'connection: close' header.
       *)
      val mutable close_connection = false
      
      (* whether the [after_eof] cleanup has already been done (must only happen
	 once
       *)
      val mutable after_eof = false

      (* The vars govern whether there is an engine writing/reading *)
      val mutable drive_output_active = None
      val mutable drive_input_active = None

      
      method length =
	(* Returns the number of open requests (requests without response) *)
	Q.length read_queue
        + Q.length unserved_queue

      method active =
	(* Whether something is to be done *)
	Q.length read_queue > 0


      method problem = problem


      method iter_unserved_messages f =
	(* Call f with all unserved messages *)
	Queue.iter (fun trans -> f trans) unserved_queue;
	Queue.iter (fun trans -> f trans) read_queue;


      method is_total_failure = total_failure


      method no_pipelining = inhibit_pipelining_byserver


      method finished_e = finished_e
	(* This engine transitions to [`Done()] when the
	   connection is finished, and all messages are processed.

	   At this point, the file descriptor is closed or given back
	   to the cache. The read and write queues are unmodified, and
	   can be used to recover.

	   Errors are reported via the messages.
	 *)


      method add urgent trans =
	(* add: adds to the read_queue/write_queue. This must be possible
	   at any time - even after shutting down the socket and stopping
	   any event processing.
	 *)
	(* urgent: whether to insert the message m into the front-most place
	   (to be used after autorization)
	 *)

	if !options.verbose_connection then
	  dlogr (fun () -> 
		   sprintf "HTTP Connection: adding call %d"
		     (Oo.id trans#message));
	
        (* If close_connection, the connection will be closed asap. *)
        if close_connection then
          Q.add trans unserved_queue
        else (
	  (* Initialize [trans] for transmission: *)
	  trans # init (Oo.id self);

	  let n = Queue.length write_queue in
	  if urgent && n > 0 then (
	    (* Insert [trans] at the ealierst possible place *)
	    Q.add_at (n-1) trans write_queue;
	    Q.add_at (n-1) trans read_queue;
	  )
	  else (
	    Q.add trans write_queue;
	    Q.add trans read_queue;
	  )
        )

      method attach() =
	(* Enables event processing. To be called after [add] *)
	sending_first_message <- true;
	done_first_message <- false;
	close_connection <- false;

(* FIXME: reset other vars? *)

	io # configure_read 
	  ~fetch_call:self#drive_input_fetch ();
	self # drive_input();

	if !options.verbose_events then
	  dlog (sprintf 
		  "FD %Ld - HTTP events: config input"
		  (Netsys.int64_of_file_descr fd));
	self # maintain_polling();

      method reset() =
	(* Compare also with after_eof! *)
	self # abort ~reusable:false ~count:`Crashed;

	Q.iter
	  (fun trans ->
	     trans # cleanup();    (* release resources *)
	     trans # error_if_unserved No_reply;
	     self # drive_postprocessing trans
	  )
	  read_queue;

      signal_queue `Aborted


    (**********************************************************************)
    (* End of interface.                                                  *)	
    (**********************************************************************)

    method private maintain_polling() =

      (* If one of the following conditions is true, we need not to poll
       * the write side of the socket:
       * - The write_queue is empty but the read_queue not
       * - The difference between the read and the write queue is too big
       * - We wait for the reply of the first request send to a server
       * - The write side of the socket is closed
       *)

      if !options.verbose_events then
	dlogr
	  (fun () -> 
	     sprintf "FD %s - HTTP events: maintain_polling"
	     fd_str
	  );

      if io # socket_state <> Down then (
	let actual_max_drift =
	  if inhibit_pipelining_byserver then 0 else
	    match !options.synchronization with
	      | Pipeline drift -> min drift max_pipeline
	      | _              -> 0
		  (* 0: Allow no drift if pipelining is not allowed *)
	in
	
	let have_requests =
	  Q.length read_queue - Q.length write_queue <= actual_max_drift
	  && Q.length write_queue > 0 
          && (Q.peek write_queue) # state = Unprocessed
	  && (done_first_message || sending_first_message) in

	(* Note: sending_first_message is true while the first request is sent.
         * done_first_message becomes true when the response arrived. In the
         * meantime both variables are false, and nothing is sent.
         *)

	let do_release =
	  (io # socket_state = Up_rw &&
	      Q.length read_queue = 0 && Q.length write_queue = 0) in
	(* If the socket is still up, we must send EOF. Normally, this 
         * should have happened already, just to be sure we check this here.
	 *)
	
	let do_close_output = close_connection in
	(* close_connection: this is set in update_characteristics after
	 * receiving the first response 
	 *)

	if !options.verbose_events then
	  dlogr
	    (fun () -> 
	       sprintf "FD %s - HTTP events: maintain_polling \
                        n_read=%d n_write=%d \
                        actual_max_drift=%d have_requests=%B \
                        do_close_output=%B do_release=%B"
		 fd_str
		 (Q.length read_queue) (Q.length write_queue)
		 actual_max_drift have_requests do_close_output do_release
	    );
	
	if drive_output_active = None && io#socket_state = Up_rw then (
	  if !options.verbose_events then
	    dlogr
	      (fun () -> 
		 sprintf "FD %s - HTTP events: config output=%s"
		   fd_str
		   (if do_close_output then "close" else
		      if do_release then "release" else
			if have_requests then "enabled" else
			  "none"
		   )
	      );
	  if do_close_output then
	    self # close_output()
	  else
	    if do_release then
	      self # release_io ()
	    else
	      if have_requests then
		self # drive_output()
	)
      )

    method private abort ~reusable ~count =
      (* This method is called when the connection is in a final (maybe
       *  errorneous) state, and the protocol handler decides to stop
       * processing.
       * 
       * reusable: whether it is possible to reuse this connection later
       *)
      if io # socket_state <> Down then (
	if !options.verbose_connection then 
	  dlog (sprintf 
		  "FD %s - HTTP connection: %s"
		  fd_str
                  (if reusable then "caching for later" else "shutdown")
               );
	let followup() =
	  signal_connection(`Done());
	  if !options.verbose_events then
	    dlogr
	      (fun () ->
		 sprintf "FD %s - HTTP event: Connection processing done" 
		   fd_str) in

	begin match io#socket_state with
	    Down -> 
	      assert false
	  | Up_r -> 
	      io # close
		~followup ()
	  | Up_rw ->
	      if reusable then (
		( try
                    let idata =
                      try
                        let tls_stashed_endpoint =
                          match mplex#tls_session with
                            | None -> None
                            | Some _ -> Some(mplex#tls_stashed_endpoint()) in
                        { Nethttp_client_conncache.conn_trans = trans;
                          tls_stashed_endpoint;
                        }
                      with _ -> raise Not_found in
		    conn_cache # set_connection_state
                                   fd cache_peer (`Inactive idata);
		    io # down();
		  with
		    | Not_found ->
			(* We can do an orderly shutdown: *)
	                 if !options.verbose_connection then 
	                   dlog (sprintf 
                              "FD %s - HTTP connection: shutting down anyway"
		              fd_str);
			 io # close ~followup:(fun () -> ()) ()
		);
		followup()
	      )
	      else
		io # close
		  ~followup ()
	end;
	( match count with
	    | `Timed_out ->
		counters.timed_out_connections <- 
		  counters.timed_out_connections + 1;
	    | `Crashed ->
		counters.crashed_connections <- 
		  counters.crashed_connections + 1;
	    | `Server_eof ->
		counters.server_eof_connections <- 
		  counters.server_eof_connections + 1;
	    | `Successful ->
		counters.successful_connections <- 
		  counters.successful_connections + 1
	    | `Failed ->
		(* By definition of the counter, an abort cannot be failed *)
		assert false;
	);
	if !options.verbose_events then
	  dlog (sprintf "FD %s - HTTP events: reset after shutdown"
		  fd_str);

	self # cancel_output();   (* FIXME: check whether sufficient *)
	self # cancel_input();
      )
 
    (**********************************************************************)
    (***                     THE OUTPUT HANDLER                         ***)
    (**********************************************************************)

    method private drive_output() =
      if !options.verbose_events then
	dlogr (fun() -> 
		 sprintf "FD %s - HTTP events: drive_output" fd_str);

      let can_output =
	drive_output_active = None &&
	io#socket_state = Up_rw &&
	not(Q.is_empty write_queue) in

      assert(can_output);

      if can_output then (
	let trans = Q.peek write_queue in
	assert(trans#state = Unprocessed);

	(* Clear this flag now because we are about to really send a new
	   request
	 *)
	trans # message # private_api # set_retry_anyway false;

	let close_flag =
	  !options.inhibit_persistency in

	let handshake_cfg =
	  try
	    (* Proper parsing not required, because [Expect] is
             * set by the user.
             * [continue]: Already seen status 100
             *)
	    let rh = trans # message # request_header `Effective in
	    if (not (trans # message # private_api # continue) &&
		  String.lowercase(rh # field "expect") = "100-continue")
	    then Some !options.handshake_timeout
	    else None
	  with
	    | Not_found -> None
	in

	let e =
	  trans # send_e io handshake_cfg close_flag esys
	  >> (fun st ->
		drive_output_active <- None;
		if !options.verbose_events then
		  dlogr (fun() -> 
			   sprintf "FD %s - HTTP events: done with output \
                                    state=%s"
			     fd_str (string_of_state trans#state));
		match st with
		  | `Done () ->
		      assert
			(trans#state = Sent_request || trans#state = Complete
			  || trans#state = Handshake
			    || trans#send_interrupted
			);
		      sending_first_message <- false;
		      ignore (Q.take write_queue);
		      self # maintain_polling();
		      st
		  | `Error err ->
		      if !options.verbose_connection then
			dlogr
			  (fun () ->
			     sprintf "FD %s - HTTP connection: Exception %s"
			       fd_str (Netexn.to_string err));
		      self # abort ~reusable:false ~count:`Crashed;
		      self # after_eof (Some err);
		      st
		  | `Aborted -> 
		      if !options.verbose_connection then
			dlogr
			  (fun () ->
			     sprintf "FD %s - HTTP connection: Aborting"
			       fd_str
			  );
		      st
	     ) in

	drive_output_active <- Some e;
      )

    method private close_output() =
      if !options.verbose_events then
	dlogr (fun() -> 
		 sprintf "FD %s - HTTP events: close_output" fd_str);

      let can_close =
	drive_output_active = None &&
	io#socket_state = Up_rw in

      assert(can_close);

      if can_close then (
	assert (not(io # write_activity));      
	io # add Send_eof;
	let e =
	  io # write_e esys
	  >> (fun st ->
		drive_output_active <- None;
		if !options.verbose_events then
		  dlogr (fun() -> 
			   sprintf "FD %s - HTTP events: close_output done"
			     fd_str);
		match st with
		  | `Done () -> 
		      (* The connection closure can be driven by several
			 reasons, so there can still be messages on the
			 queues!
		       *)
		      self # abort ~reusable:false ~count:`Successful;
		      self # after_eof None;
		      st
		  | `Error err ->
		      if !options.verbose_connection then
			dlogr
			  (fun () ->
			     sprintf "FD %s - HTTP connection: Exception %s"
			       fd_str (Netexn.to_string err));
		      self # abort ~reusable:false ~count:`Crashed;
		      self # after_eof (Some err);
		      st
		  | `Aborted -> 
		      if !options.verbose_connection then
			dlogr
			  (fun () ->
			     sprintf "FD %s - HTTP connection: Aborting"
			       fd_str
			  );
		      st
	     )
	in
	drive_output_active <- Some e;
      )


    method private cancel_output() =
      match drive_output_active with
	| None -> ()
	| Some e -> 
	    drive_output_active <- None; e # abort()
      

    method private release_io() =
      (* Give fd/mplex back to the connection cache *)
      assert(io # socket_state = Up_rw);
      self # abort ~reusable:true ~count:`Successful;
      self # after_eof None
	(* FIXME: check whether sufficient *)


    (**********************************************************************)
    (***                     THE INPUT HANDLER                          ***)
    (**********************************************************************)

    method private drive_input() =
      
      let rec read_loop_e() =
	io # read_e esys
	++ (fun call_opt ->
	      match call_opt with
		| None ->
		    if !options.verbose_connection then
		      dlogr
			(fun () ->
			   sprintf "FD %s - HTTP connection: Got EOF!"  fd_str);
		    (* The sender may be still writing, so try to stop it *)
		    if Q.length write_queue > 0 then
		      (Queue.peek write_queue) # send_interrupt();
		    self # abort ~reusable:false ~count:`Server_eof;
		    self # after_eof None;
		    eps_e (`Done()) esys
		| Some call ->
		    if !options.verbose_connection then
		      dlogr
			(fun () ->
			   sprintf "FD %s - HTTP connection: Got Call %d!" 
			     fd_str (Oo.id call));
		    let trans =
		      try Q.peek read_queue
		      with Q.Empty -> assert false in
		    assert(trans#message = call);
		    (* It is possible that the write is still ongoing.
		       By calling send_interrupt we try to stop this,
		       but nevertheless we have to wait until the
		       writer is done! The special state Finishing
		       indicates that it is not worth-while to stop
		       sending because it will end soon anyway.

		       Note that send_finish_e may also report errors
		       of the sending side.
		     *)
		    if trans#state <> Finishing then
		      trans # send_interrupt();
		    trans # send_finish_e esys
		    ++ (fun () ->
			  trans # receive_complete();
			  self # update_characteristics trans;
			  ignore(Q.take read_queue);
			  (* Note that postprocessing may imply a
			     re-initialization of [trans]! So don't access
			     [trans] later than this point.
			   *)
			  self # postprocess_complete_message_e trans
			  ++ (fun () ->
				self # maintain_polling();
				if io#socket_state <> Down then
				  read_loop_e()
				else
				  eps_e (`Done()) esys
			     )
		       )
	   )
      in

      if !options.verbose_events then
	dlogr
	  (fun () ->
	     sprintf "FD %s - HTTP event: Starting read loop" 
	       fd_str);
      let e = 
	read_loop_e() in
      drive_input_active <- Some e;
      ignore(e >> 
	       (fun st -> 
		  drive_input_active <- None;
		  if !options.verbose_events then
		    dlogr
		      (fun () ->
			 sprintf "FD %s - HTTP event: Leaving read loop: %s" 
			   fd_str
			   ( match st with
			       | `Done _ -> "regular"
			       | `Error e -> "exception " ^ Netexn.to_string e
			       | `Aborted -> "aborted"
			   )
		      );
		  ( match st with
		      | `Error err ->
			  self # abort ~reusable:false ~count:`Crashed;
			  self # after_eof (Some err)
		      | _ -> ()
		  );
		  st
	       )
	    )

    method cancel_input() =
      match drive_input_active with
	| None -> ()
	| Some e -> 
	    drive_input_active <- None; e # abort()


    method drive_input_fetch() =
      (* This is called by io when the next status line has been received *)
      if Q.is_empty read_queue then (
	if !options.verbose_connection then (
	  dlogr
	    (fun () ->
	       sprintf 
		 "FD %s - HTTP connection: \
                  Got data of spontaneous response" fd_str);
	);
	raise Not_found
      );
      let trans = Q.peek read_queue in
      match trans # state with
	| Unprocessed ->
	    (* We get response data before we even tried to send
             * the request. We allow this, although this is weird.
             *)
	    if !options.verbose_connection then (
	      dlogr
		(fun () ->
		   sprintf 
		     "FD %s - HTTP connection: \
                     Got response before sending request" fd_str);
	    );
	    trans#message

	| Sending_hdr ->
	    (* We get response data before we finished
             * the request. We allow this for pragmatic reasons.
             *)
	    if !options.verbose_connection then (
	      dlogr
		(fun () ->
		   sprintf
		     "FD %s - HTTP connection: \
                      Got response before finishing request" fd_str)
	    );
	    trans#message

	| Handshake 
	| Sending_body ->
	    (* This is perfectly legal - we get the response after
	       we sent at least the header
	     *)
	    trans#message

	| Sent_request ->
	    (* The normal case *)
	    trans#message

	| _ ->
	    (* Should not happen *)
	    assert false
	    

    method private update_characteristics trans =
      (* After getting a response, check the type, and adapt the
	 transmission style
       *)

      let old_close_connection = close_connection in

      let able_to_pipeline = 
	trans # indicate_pipelining in
      (* able_to_pipeline: is true if we assume that the server
       * is HTTP/1.1-compliant and thus is able to manage pipelined
       * connections.
       * Update first 'close_connection': This variable becomes
       * true if the connection is not assumed to be pipelined
       * which forces that the CLIENT closes the connection 
       * immediately (this is tested in maintain_polling).
       *)
      
      let only_sequential_persistency =
	not able_to_pipeline && 
	  trans # indicate_sequential_persistency in
      (* only_sequential_persistency: is true if the connection is
       * HTTP/1.0, and the server indicated a persistent connection.
       * In this case, pipelining is disabled.
       *)
      
      if only_sequential_persistency then begin
	(* 'close_connection': not set.
	 *)
	if !options.verbose_connection then 
	  dlogr
	    (fun () ->
	       sprintf "FD %s - HTTP connection: \
                               using HTTP/1.0 style persistent connection"
		 fd_str);
	inhibit_pipelining_byserver <- true;
      end
      else
	close_connection  <- close_connection  || not able_to_pipeline;

      if !options.verbose_connection then 
	dlogr
	  (fun () ->
	     sprintf "FD %s - HTTP connection: pipelining=%B persistency=%B close_connection=%B->%B"
	       fd_str able_to_pipeline only_sequential_persistency
	       old_close_connection close_connection
	  );

      (* Remember that the first request/reply round is over: *)
      done_first_message <- true



    method private after_eof err_opt =
      (* Postprocess error information *)
      if not after_eof then (
	(* THINK: We could here process Garbage_received exceptions differently.
	   It is questionable to attribute such exceptions to specific 
	   calls.
	 *)

	(* First check if the connection was a total failure, i.e. if not
	 * even a status line was received. This is just reported to the
	 * robust_pipeline, where the counter for totally failed connections
	 * can be increased.
	 *)
	
	total_failure <- not io#status_seen;
        problem <- err_opt <> None || total_failure;

	(* Assertions about queues: write queue is a suffix of read queue *)
	
	let n_read  = Q.length read_queue in
	let n_write = Q.length write_queue in
	assert (n_read >= n_write);
	
	(* Clean up the transmitters: *)
	
	Q.iter
	  (fun trans ->
	     trans # cleanup();    (* release resources *)
	     match err_opt with
	       | None ->
		   if total_failure then
		     trans # error_if_unserved (Bad_message "Protocol error")
	       | Some err ->
		   let err' =
		     match err with
		       | Garbage_received _ ->
			   Bad_message "Protocol error"
		       | No_reply ->
			   Bad_message "Protocol error"
		       | err -> err in
		   trans # error_if_unserved err'
	  )
	  read_queue;
	
	(* Increase the error counter of the head of read_queue: *)
	if n_read > 0 && err_opt <> None then (
	  let trans = Q.peek read_queue in
	  let m = trans # message in
	  let e = m # private_api # get_error_counter in
	  m # private_api # set_error_counter (e+1);
	);
	
	(* We have reached the logical end: *)
	signal_queue (`Done());
	if !options.verbose_events then
	  dlogr
	    (fun () ->
	       sprintf "FD %s - HTTP event: Queue processing done" 
		 fd_str);

	after_eof <- true
      )


    (**********************************************************************)
    (***                     AUTHENTICATION                             ***)
    (**********************************************************************)

    method private postprocess_complete_message_e trans =
      (* This method is invoked for every complete reply. The following
       * cases are handled at this stage of processing:
       *
       * - Status code 407: The proxy demands authorization. If the request
       *   already contains credentials for the proxy, this status code
       *   isn't handled here. Otherwise, the request is added again onto
       *   the queue, and a flag ('proxy_credentials_required') is set which 
       *   forces that the proxy credentials must be added for every new 
       *   request.
       *   Note: The necessary authentication header fields are added in
       *   the 'add' method.
       *
       * - Status code 401: XXX
       *
       * All other status codes are not handled here. Note that it is not
       * possible to react on the redirection codes because this object
       * represents the connection to exactly one server.
       * As default behaviour, the method 'postprocess' of the 
       * transmitter object is invoked; this method incorporates
       * all the intelligence not coded here.
       *)

      let default_action_e() =
	trans#cleanup();
	drive_postprocessing_e esys options trans#message trans#f_done in

      let msg = trans # message in
      let secure =
        match mplex # tls_session_props with
          | None -> false
          | Some props ->
              msg # private_api # set_tls_session_props props;
              true in

      let code = msg # private_api # response_code in
      let _req_hdr = msg # request_header `Effective in
      let _resp_hdr = msg # private_api # response_header in

      (* We are checking the authentication state no matter what the
         response code is. If we get a 401 or 407, though, special
         actions need to be taken.
       *)
      let proxy_response1 =
        match !proxy_auth_state with
          | `In_advance sess
          | `In_reply (sess,_) ->
              (* Continue an already started auth protocol: *)
              sess # authenticate msg false
                (* CHECK: we proxy-authenticate messages even if the code is
                   not 407, and we cannot be sure whether the msg is about
                   proxy authentication.
                 *)
          | `Auth_error -> `Auth_error
          | `None | `OK ->
              (* Check whether the server needs authentication: *)
              if code = 407 && peer_is_proxy then (
                match proxy_auth_handler_opt with
                  | None -> `Auth_error
                  | Some ah ->
                      ( match ah # create_proxy_session msg options with
		           | None -> `Auth_error
                           | Some sess -> 
                               proxy_auth_state := `In_reply(sess,[]);
                               sess # authenticate msg false
                      )
              ) else
                `OK
          | `Resubmit _
          | `In_reply_reroute _ -> assert false in
      let is_about_proxy =
        (code=407 && peer_is_proxy) ||
          ( match proxy_response1 with `Continue _ -> true | _ -> false) in
      let content_response1 =
        if is_about_proxy then
          None  (* this msg was not about content server auth *)
        else
          match msg # private_api # auth_state with
            | `In_advance sess
            | `In_reply (sess,_) ->
                (* Continue an already started auth protocol: *)
                Some (sess # authenticate msg false)
            | `Auth_error -> Some `Auth_error
            | `None | `OK ->
                (* Check whether the server needs authentication: *)
                if code = 401 then (
                  match auth_cache # auth_response ~secure msg options with
                    | None -> Some `Auth_error
                    | Some sess ->
                        msg # private_api # set_auth_state (`In_reply(sess,[]));
                        Some(sess # authenticate msg false)
                ) else
                  Some `OK 
            | `Resubmit _ 
            | `In_reply_reroute _ -> assert false in
      (* If we get a 401/407 but still think we are ok, change to error: *)
      let proxy_response2 =
        if proxy_response1 = `OK && code=407 && peer_is_proxy then
          `Auth_error
        else
          proxy_response1 in
      let content_response2 =
        if content_response1 = Some `OK && code=401 then
          Some `Auth_error
        else
          content_response1 in
      (* If we can continue an auth session, remember the headers: *)
      ( match !proxy_auth_state, proxy_response2 with
          | (`In_reply(sess,_) | `In_advance sess), `Continue next_headers ->
              proxy_auth_state := `In_reply(sess, next_headers)
          | (`In_reply(sess,_) | `In_advance sess), 
                (`Auth_error | `Reroute _ | `Continue_reroute _) ->
              (* NB. rerouting is invalid for proxy connections *)
              sess # invalidate msg;
              proxy_auth_state := `Auth_error
          | (`In_reply(sess,_) | `In_advance sess), `OK ->
              (* remember the session for later reauthentication: *)
              proxy_auth_state := `In_advance sess
          | _, `OK ->
              proxy_auth_state := `OK
          | _, (`Auth_error | `Reroute _ | `None) ->
              proxy_auth_state := `Auth_error
          | _, (`Continue _ | `Continue_reroute _) ->
              assert false
      );
      ( match msg # private_api # auth_state, content_response2 with
          | (`In_reply(sess,_)|`In_advance sess), Some(`Continue headers) ->
              msg # private_api # set_auth_state (`In_reply(sess, headers))
          | (`In_reply(sess,_)|`In_advance sess), 
                Some(`Continue_reroute(headers, trans_id)) ->
              msg # private_api # set_auth_state
                (if msg # private_api # reroutable then
                   `In_reply_reroute(sess, headers, trans_id)
                 else
                   `Auth_error
                )
          | (`In_reply(sess,_)|`In_advance sess), Some `Auth_error ->
              sess # invalidate msg;
              msg # private_api # set_auth_state  `Auth_error;
              auth_cache # remove_session sess;
          | (`In_reply(sess,_)|`In_advance sess), Some `OK ->
              msg # private_api # set_auth_state `OK;
              auth_cache # add_successful_session sess
          | _, Some `OK ->
              msg # private_api # set_auth_state `OK
          | _, Some (`Auth_error | `None) ->
              msg # private_api # set_auth_state `Auth_error
          | _, Some(`Reroute trans_id) ->
               msg # private_api # set_auth_state
                 (if msg # private_api # reroutable then
                    `Resubmit trans_id
                  else
                    `Auth_error
                 )
          | _, Some (`Continue _ | `Continue_reroute _) ->
              assert false
          | _, None ->
              ()
      );
      (* Decide what to do next: *)
      let continue =
        ( match !proxy_auth_state with `In_reply _ -> true | _ -> false ) ||
          ( match msg # private_api # auth_state with
                `In_reply _ -> true | _ -> false ) in
      if continue then (
        (* NB. In_reply_reroute cannot take this abbreviation *)
        msg # private_api # set_retry_anyway true;
        if close_connection then (
          (* this won't picked up by this connection. The robust_pipeline with
             find it and re-add it. The point is here that we don't reinit
             trans now, because this is later done anyway. Good for reauth
             (e.g. we don't skip an nc with digest auth)
           *)
          trans # cleanup();
          Q.add trans unserved_queue
        )
        else
          (* do an urgent add on this connection: *)
          ignore (self # add true trans);
        eps_e (`Done()) esys
      )
      else
        default_action_e()


    (**********************************************************************)
    (***    POSTPROCESS = INVOKE USER CALLBACK FUNCTION                 ***)
    (**********************************************************************)
	
    method private drive_postprocessing trans =
      trans#cleanup();
      drive_postprocessing_msg esys options trans#message trans#f_done
     
    end
  )

(**********************************************************************)

let robust_pipeline 
      esys tp trans
      peer
      proxy_auth_handler_opt
      default_port
      conn_cache conn_owner 
      auth_cache
      tls_cache
      counters options =
  (* Implements a pipeline that connects to the peer after the first
     message is added, and that is able to reconnect as often as
     necessary
   *)

  let proxy_auth_state =  ref `None in
  let peer_is_proxy =  (* whether we have a normal HTTP proxy *)
    match peer with
      | `Http_proxy _ -> true
      | _ -> false in


  ( object(self)
      val mutable fp_opt = None
	(* The fragile_pipeline, if any *)

      val queue = Q.create()
	(* Queued requests before being connected *)

      (* 'connecting' may be set to [Some e] where [e] is the connecting engine.
       *)
      val mutable connecting = None
	
      (* 'connect_pause': seconds to wait until the next connection is tried.
       * There seems to be a problem with some operating systems (namely
       * Linux 2.2) which do not like immediate reconnects if the previous
       * connection did not end in a sane state.
       * A value of 0.5 seems to be sufficient for reconnects (see below).
       *)			      
      val mutable connect_pause = 0.0
	
      (* no_pipelining: true if the server has somehow indicated that it
	 is better not to rely on pipelining
       *)
      val mutable no_pipelining = false
	
      (* 'connect_time': how many seconds the last 'connect' took
       *)
      val mutable connect_time = 0.0
	
      (* If a connection does not lead to any type of response, the client 
       * simply re-opens a new connection and tries again. The following 
       * variables limit the number of attempts until the client gives up.
       *
       * 'totally_failed_connections': counts the number of failed connections
       *      without any response from the server.
       *)
      val mutable totally_failed_connections = 0
	
    method length =
      match fp_opt with
	| None -> 0
	| Some fp -> fp#length


    method active =
      match fp_opt with
	| None -> false
	| Some fp -> fp#active


    method add urgent (m : http_call) f_done =
      (* NB. m may already carry an auth session here *)
      (* Check whether we can authenticate in advance: *)
      ( try
          if m # private_api # auth_state <> `None then raise Not_found;
	  let sess = auth_cache # find_session_for_reauth m options in
          (* Unfortunately we need to remove sess from the cache while
             the re-authentication is in progress. This can involve state
             changes of sess, and it would be an error if two requests picked
             up the same session for re-authentication
           *)
          auth_cache # remove_session sess;
	  m # private_api # set_auth_state (`In_advance sess)
	with
	    Not_found -> ()
      );
      let trans = 
	transmitter peer_is_proxy proxy_auth_state default_port
                    m f_done options in
      match fp_opt with
	| None ->
	    Q.add (urgent,trans) queue;
	    if connecting = None then
	      self # reconnect()

	| Some fp ->
	    (* Even if fp is already shut down! This only means that fp#finish_e
	       has not yet signalled that we are actually finished. This will
	       happen soon, though, and the added message will be picked up
	       then
	     *)
	    fp # add urgent trans


    method reset () =
      if !options.verbose_connection then
	dlog "HTTP connection: reset";

      ( match connecting with
	  | None -> ()
	  | Some e ->
	      e#abort();
	      connecting <- None
      );

      ( match fp_opt with
	  | None -> ()
	  | Some fp ->
	      fp # reset();
	      fp_opt <- None
		(* note that [fp#reset] causes that finished_e transitions
		   to `Aborted. Hence, conn_is_done is not called 

		   The remaining messages in fp are already postprocessed.
		 *)
      );

      Q.iter
	(fun (_,trans) ->
	   trans# message # private_api # error_if_unserved 
	     !options.verbose_connection No_reply;
	   self # drive_postprocessing trans
	)
	queue;
      Q.clear queue;

      totally_failed_connections <- 0



    method private reconnect() =
      assert(connecting = None);
      assert(fp_opt = None);
      let e =
	Uq_engines.delay_engine
	  connect_pause
	  (fun () ->
	     tcp_connect_e 
	       esys tp trans peer conn_cache conn_owner tls_cache options 
	       proxy_auth_handler_opt
	  )
	  esys in
      connecting <- Some e;
      Uq_engines.when_state
	~is_error:(fun err ->
		     connecting <- None;
		     self#conn_is_error err
		  )
	~is_done:(fun (fd,cache_peer,mplex,t) ->
		    connect_time <- t;
		    connect_pause <- 0.0;
		    connecting <- None;
		    let fp =
		      fragile_pipeline
			esys trans peer cache_peer
			proxy_auth_state proxy_auth_handler_opt
                        default_port
			fd mplex t no_pipelining conn_cache
			auth_cache
			counters options in
		    fp_opt <- Some fp;
		    Q.iter
		      (fun (urgent,trans) -> 
			 fp # add urgent trans
		      )
		      queue;
		    Q.clear queue;
		    fp # attach();
		    Uq_engines.when_state
		      ~is_done:(self#conn_is_done fp)
		      fp#finished_e
		 )
	e

    method private conn_is_error error =
      (* It was not possible to reach the peer *)

      counters.crashed_connections <- 
	counters.crashed_connections + 1;

      let q = Q.create() in
      Q.transfer queue q;

      self # conn_retry true true error q;

    method private conn_is_done fp () =
      (* Check the result of fp, and move the requests back to [queue] that
	 can be tried again
       *)
      fp_opt <- None;
      no_pipelining <- fp#no_pipelining;  (* remember that *)

      let q = Q.create() in
      fp # iter_unserved_messages
	(fun trans ->
	   trans # cleanup();
	   Q.add (false, trans) q
	);

      self # conn_retry fp#is_total_failure fp#problem No_reply q


    method private conn_retry is_total_failure problem subst_error q =
      (* Maybe we have a "total failure" - got no response from server *)
      let too_many_total_failures =
	if is_total_failure then (
	  totally_failed_connections <- totally_failed_connections + 1;
	  if !options.verbose_connection then
	    dlog "HTTP connection: total failure";
	  
	  totally_failed_connections >= !options.maximum_connection_failures 
	)
	else (
	  totally_failed_connections <- 0;
	  false
	) in

      if !options.verbose_connection then
	dlog "HTTP connection: checking remaining pipeline requests";

      if too_many_total_failures then
	counters.failed_connections <- 
	  counters.failed_connections + 1;

      (* Check all requests individually *)
      Q.iter
	(fun (urgent,trans) ->
           let m = trans#message in
	   let e = m # private_api # get_error_counter in
(*dlog (sprintf "e=%d idem=%B" e m#is_idempotent);*)
	   let try_again =
	     not too_many_total_failures &&
	       e <= !options.maximum_message_errors &&
	       (m # private_api # error_exception <> Some Response_too_large) &&
	       (m # private_api # retry_anyway ||
		  ( match m # get_reconnect_mode with
		      | Send_again -> true
		      | Request_fails -> false
		      | Send_again_if_idem -> m # is_idempotent
		      | Inquire f ->
			  (* Ask the function 'f' whether to reconnect or not: *)
			  ( try f m    (* returns true or false *)
			    with
				(* The invocation of 'f' may raise an exception.
				 * It is printed to stderr (there is no other
				 * way to report it).
				 *)
				x ->
				  dlog ("Exception caught in Nethttp_client: " 
					^ (Netexn.to_string x));
				  false
			  )
		  )
	       ) in
	   if try_again then (
	     (* Ok, this request is tried again. *)
	     if !options.verbose_status then
	       dlogr (fun () -> 
			sprintf "Call %d - rescheduling" (Oo.id m));
	     Q.add (urgent,trans) queue;
	   )
	   else (
	     m # private_api # error_if_unserved
	       !options.verbose_connection subst_error;
	     self # drive_postprocessing trans
	   )
	)
	q;

      (* Try again: *)
      if Q.length queue > 0 then (

	if !options.verbose_connection then
	  dlog "HTTP connection: retrying";
	
	connect_pause <- (if problem then 2.0 else 0.1) *. connect_time;
	self#reconnect()      
      )


    (**********************************************************************)
    (***    POSTPROCESS = INVOKE USER CALLBACK FUNCTION                 ***)
    (**********************************************************************)

    (* Same as for fragile_pipeline *)
	
    method private drive_postprocessing trans =
      drive_postprocessing_msg esys options trans#message trans#f_done

    end
  )


(**********************************************************************)


(**********************************************************************)
(**********************************************************************)
(**********************************************************************)
(***                                                                ***)
(***                 THE PIPELINE INTERFACE                         ***)
(***                                                                ***)
(**********************************************************************)
(**********************************************************************)
(**********************************************************************)

(* The following class, 'pipeline' defines the interface for the outside
 * world.
 *)

type proxy_type = [`Http_proxy | `Socks5 ] 

let parse_proxy_setting ~insecure url =
  let syn = Neturl.ip_url_syntax in
  let (nu,trans) = 
    try parse_url_0 [ "http", syn, Some 80, http_trans_id ] url
    with Not_found -> failwith ("Invalid proxy URL: " ^ url) in
  let auth =
    try
      let u = Neturl.url_user nu in       (* may raise Not_found *)
      let p = Neturl.url_password nu in   (* may raise Not_found *)
      Some(u,p,insecure)
    with Not_found -> None in
  (Neturl.url_host nu, Neturl.url_port nu, auth)

let parse_no_proxy =
  split_words_by_commas


class pipeline =
  object (self)
    val mutable esys = Unixqueue.create_unix_event_system()

    val mutable proxy = ""
    val mutable proxy_port = 80
    val mutable proxy_auth = false
    val mutable proxy_user = ""
    val mutable proxy_password = ""
    val mutable proxy_insecure = false
    val mutable proxy_type = `Http_proxy
    val tproxy = Hashtbl.create 5  (* transport-specific proxy *)

    val mutable no_proxy_for = []

    val mutable connections = Hashtbl.create 10

    val mutable open_messages = 0

    val mutable open_connections = 0

    val options =
      let tls =
        match Netsys_crypto.current_tls_opt() with
          | None -> None
          | Some tls_provider ->
               Some(Netsys_tls.create_x509_config
                      ~system_trust:true
                      ~peer_auth:`Required
                      tls_provider) in
      ref
	{ (* Default values: *)
	  synchronization = Pipeline 5;
	  maximum_connection_failures = 2;
	  maximum_message_errors = 2;
	  inhibit_persistency = false;
	  connection_timeout = 300.0;
	  number_of_parallel_connections = 2;
	  maximum_redirections = 10;
	  handshake_timeout = 1.0;
	  resolver = sync_resolver;
	  configure_socket = (fun _ -> ());
          tls;
	  schemes = default_schemes;
	  verbose_status = true;
	  verbose_request_header = false;
	  verbose_response_header = false;
	  verbose_request_contents = false;
	  verbose_response_contents = false;
	  verbose_connection = true;
	  verbose_events = false;
	}

    val auth_cache = new auth_cache

    val mutable conn_cache = create_restrictive_cache()

    val mutable tls_cache = null_tls_cache()

    val counters =
      { new_connections = 0;
	timed_out_connections = 0;
	crashed_connections = 0;
	server_eof_connections = 0;
	successful_connections = 0;
	failed_connections = 0;
      }

    val transports = Hashtbl.create 5
    val mutable transports_upd_https = true

    initializer (
      Hashtbl.add transports http_trans_id http_transport_channel_type;
      Hashtbl.add transports proxy_only_trans_id proxy_transport_channel_type;
      self # update_https_transport()
    )

    method private update_https_transport() =
      if transports_upd_https then
        match (!options).tls with
          | None ->
               List.iter
                 (fun trans -> Hashtbl.remove transports trans)
                 https_list
          | Some config ->
               let https_tct = https_transport_channel_type config in
               List.iter
                 (fun trans -> Hashtbl.replace transports trans https_tct)
                 https_list


    method event_system = esys

    method set_event_system new_esys =
      esys <- new_esys;
      Hashtbl.clear connections;

    method connection_cache = conn_cache

    method set_connection_cache cc = conn_cache <- cc

    method add_auth_handler (h : auth_handler) =
      auth_cache # add_auth_handler h

    method set_proxy the_proxy the_port =
      (* proxy="": disables proxy *)
      proxy       <- the_proxy;
      proxy_port  <- the_port;
      proxy_type  <- `Http_proxy;
      ()

    method set_proxy_auth ~insecure user passwd =
      (* sets 'user' and 'password' if demanded by a proxy *)
      proxy_auth     <- user <> "";
      proxy_user     <- user;
      proxy_password <- passwd;
      proxy_insecure <- insecure


    method avoid_proxy_for l =
      (* l: List of hosts or domains *)
      no_proxy_for <- l


    method set_proxy_from_environment ~insecure () =
      (* Is the environment variable "http_proxy" set? *)
      let http_proxy =
	try Sys.getenv "http_proxy" with Not_found -> "" in
      if http_proxy <> "" then (
	let (host,port,auth) = parse_proxy_setting ~insecure http_proxy in
	self # set_proxy host port;
	match auth with
	  | None -> ()
	  | Some(u,p,iflag) ->
	      self # set_proxy_auth ~insecure:iflag u p
      );

      (* Is the environment variable "no_proxy" set? *)
      let no_proxy =
	try Sys.getenv "no_proxy" with Not_found -> "" in
      let no_proxy_list =
	parse_no_proxy no_proxy in
      self # avoid_proxy_for no_proxy_list;


    method set_transport_proxy_from_environment ~insecure l =
      List.iter
	(fun (var_name, trans_id) ->
	   let var =
	     try Sys.getenv var_name with Not_found -> "" in
	   if var <> "" then (
	     let (host,port,auth) = parse_proxy_setting ~insecure var in
	     self # set_transport_proxy trans_id host port auth `Http_proxy
	   );
	)
	l;

      (* Is the environment variable "no_proxy" set? *)
      let no_proxy =
	try Sys.getenv "no_proxy" with Not_found -> "" in
      let no_proxy_list =
	parse_no_proxy no_proxy in
      self # avoid_proxy_for no_proxy_list
      

    method set_socks5_proxy host port =
      proxy       <- host;
      proxy_port  <- port;
      proxy_type  <- `Socks5


    method configure_transport (trans:int) (tp:transport_channel_type) =
      Hashtbl.replace transports trans tp;
      if is_https trans then
        transports_upd_https <- false  (* never change this magically again *)

    method set_tls_cache c =
      tls_cache <- c

    method set_transport_proxy trans host port auth (pt:proxy_type) =
      Hashtbl.replace tproxy trans (host,port,auth,pt)

    method reset () =
      (* deletes all pending requests; closes connection *)

      (* Reset all connections: *)
      Hashtbl.iter
	(fun _ cl ->
	   List.iter
	     (fun c ->
		c # reset())
	     !cl)
	connections;

(*
   - well, this _should_ do nothing
      List.iter
	(fun fd -> 
	   conn_cache # forget_connection fd;
	   Unix.close fd;
	)
	(conn_cache # find_my_connections (self :> < >));
 *)

      self # reset_counters()


    method private peer_of_call request =
      request # private_api # parse_request_uri options;

      let host = request # get_host() in
      let port = request # get_port() in
      let trans = request # private_api # transport_layer options in

      let peer_of_proxy h p pt =
      	match pt with
	  | `Http_proxy ->
	      if request # proxy_use_connect then
		`Http_proxy_connect((h,p),(host,port))
	      else
		`Http_proxy(h,p)
	  | `Socks5 ->
	      `Socks5((h,p),(host,port)) in

      let proxy_possible =
	request # proxy_enabled &&
	  (not
             (List.exists
		(fun dom ->
		   if dom <> "" &&
                     dom.[0] = '.' &&
		     String.length host > String.length dom
		   then
                     let ld = String.length dom in
                     String.lowercase(String.sub 
					host 
					(String.length host - ld) 
					ld)
                     = String.lowercase dom
		   else
                     dom = host)
		no_proxy_for
	     )) in
      try
	if proxy_possible then ( 
	  try
	    let (h,p,a,pt) = Hashtbl.find tproxy trans in
	    (peer_of_proxy h p pt, a)
	  with
	    | Not_found ->
		if proxy = "" then raise Not_found;
		(peer_of_proxy proxy proxy_port proxy_type, 
		 if proxy_auth then
		   Some(proxy_user, proxy_password, proxy_insecure)
		 else
		   None
		)
	)
	else raise Not_found
      with Not_found ->
        try
          let tct = Hashtbl.find transports trans in
          if not(tct#identify_conn_by_name) then raise Not_found;
          (`Direct_name(host,port), None)
        with Not_found ->
	  (`Direct(host,port), None)


    method proxy_type_of_call request =
      let peer, auth = self # peer_of_call request in
      match peer with
	| `Http_proxy _ -> Some `Http_proxy
	| `Http_proxy_connect _ -> Some `Http_proxy
	| `Socks5 _ -> Some `Socks5
	| `Direct _ -> None
	| `Direct_name _ -> None


    method proxy_type url : proxy_type option =
      let request = new get url in  (* pseudo request *)
      self # proxy_type_of_call request


    method transport_layer (req : http_call) =
      req # private_api # transport_layer options


    method private add_with_callback_2 (request : http_call) f_done =

      let trans = request # private_api # transport_layer options in

      (* find out the effective peer: *)
      let peer, auth = self # peer_of_call request in
      let use_proxy =
	match peer with
	  | `Direct _ -> false
	  | _ -> true in

      (* Find out if there is already a connection to this peer: *)

      let conn = 
	let connlist = 
	  try
	    Hashtbl.find connections (peer, trans) 
	  with
	      Not_found ->
		let new_connlist = ref [] in
		Hashtbl.add
		  connections (peer, trans) new_connlist;
		new_connlist
	in
        let empty_conns = List.exists (fun c -> c#length = 0) !connlist in
	if not empty_conns &&
             List.length !connlist < !options.number_of_parallel_connections 
	then begin
	    let tp =
	      try 
		if trans = proxy_only_trans_id && not use_proxy then
                  raise Not_found;
		Hashtbl.find transports trans
	      with Not_found ->
		failwith "Nethttp_client: No transport for this transport ID" in
	    let proxy_auth_handler_opt =
	      match auth with
		| Some(u,p,insecure) ->
		    let kh = new proxy_key_handler u p in
		    Some(new unified_auth_handler ~insecure kh)
		| None -> 
		    None in
	    let new_conn = robust_pipeline
	                     esys tp trans
	                     peer
	                     proxy_auth_handler_opt
                             tp#default_port
			     conn_cache
			     (self :> < >)
			     auth_cache
                             tls_cache
			     counters
			     options in
	    open_connections <- open_connections + 1;
	    counters.new_connections <- counters.new_connections + 1;
	    connlist := new_conn :: !connlist;
	    new_conn
	  end 
	  else begin
	    (* Find the connection with the lowest number of queue entries: *)
	    List.fold_left
	      (fun best_conn a_conn ->
		 if a_conn # length < best_conn # length then
		   a_conn
		 else
		   best_conn)
	      (List.hd !connlist)
	      (List.tl !connlist)
	  end
      in
      
      (* Add the request to the queue of this connection: *)

      conn # add false request 
	(fun m ->
	   (* Update 'open_connections', 'connections', and 'open_messages' *)
           (* We don't delete proxy connections, because there is no risk
              that this overflows. Keeping the connections is advantageous
              because we also keep the auth session information that way
            *)
	   if not conn#active && not use_proxy then begin
	     (* Check whether the connection is still in the [connections]
              * hash. It is possible that it is already deleted there.
              *)

	     let connlist =
	       try
		 Hashtbl.find connections (peer, trans);
	       with
		   Not_found -> ref []
	     in
	     if List.exists (fun c -> c == conn) !connlist then (
	       connlist := List.filter (fun c -> c != conn) !connlist;
	       if !connlist = [] then
		 Hashtbl.remove connections (peer, trans);
	     )
	   end;
	   self # update_open_messages;
	   (* Do user action: *)
	   f_done m;
	);

      open_messages <- open_messages + 1;

    method private update_open_messages =
      open_messages <- 0;
      open_connections <- 0;
      Hashtbl.iter
	(fun _ cl ->
	   List.iter
	     (fun c ->
		if c # active then (
                  open_connections <- open_connections + 1;
		  open_messages <- open_messages + (c # length)
                )
             )
	     !cl)
	connections;


    method private add_with_callback_1 (request : http_call) f_done =
      (* this only handles auth-rerouting *)
      self # add_with_callback_2
        request
        (fun m ->
           match m # private_api # auth_state with
             | `Resubmit trans_id ->
	          if !options.verbose_status then
                    dlog
                      (sprintf "Call %d - auth-resubmitting to trans ID %d"
                               (Oo.id m) trans_id);
                  m # private_api # reroute trans_id;
                  m # private_api # set_auth_state `None;
	          m # private_api # set_error_counter 0;
		  self # add_with_callback_2 m f_done
             | `In_reply_reroute(sess,headers,trans_id) ->
	          if !options.verbose_status then
                    dlog
                      (sprintf "Call %d - auth-rerouting to trans ID %d"
                               (Oo.id m) trans_id);
                  m # private_api # reroute trans_id;
                  m # private_api # set_auth_state (`In_reply(sess,headers));
	          m # private_api # set_error_counter 0;
		  self # add_with_callback_2 m f_done
             | _ ->
                  m # private_api # set_auth_state `None;
                  f_done m
        )

    method add_with_callback (request : http_call) f_done =
      request # private_api # set_auth_state `None;
      request # private_api # parse_request_uri options;
      self # add_with_callback_1
	request
	(fun m ->
	   try
	     let (_,code,_) = m # dest_status() in
	     match code with
		 (301|302) ->
		   (* Simply repeat the request with a different URI *)
		   let do_redirection =
		     match m # get_redirect_mode with
			 Redirect -> true
		       | Do_not_redirect -> false
		       | Redirect_if_idem -> m # is_idempotent
		       | Redirect_inquire f ->
			   (* Ask the function 'f' whether to redirect: *)
			   begin 
			     try f m    (* returns true or false *)
			     with
			     (* The invocation of 'f' may raise an exception.
			      * It is printed to stderr (there is no other
			      * way to report it).
			      *)
				 x ->
				   dlog (sprintf 
					   "Call %d - \
                                            Exception caught in Nethttp_client %s"
					   (Oo.id m)
					   (Netexn.to_string x));
				   false
			   end
		   in

		   if do_redirection then begin
		     (* Maybe the redirection limit is exceeded: *)
		     let rc = m # private_api # get_redir_counter in
		     if rc >= !options.maximum_redirections
		     then (
		       m # private_api # set_error_exception Too_many_redirections;
		       f_done m
		     )
		     else (
		       let location = m # assoc_resp_header "location" in
		       (* or raise Not_found *)
		       let location' =
			 if location <> "" && location.[0] = '/' then
			   (* Problem: "Location" header must be absolute due
			    * to RFC specs. Now it is relative (with full path).
			    * Workaround: Interpret relative to old server
			    *)
			   ( try
			       let (nu,_) =
				 parse_url 
				   ~base_url:(m # private_api # request_uri_with
					      ())
				   options
				   location in
			       Neturl.string_of_url nu
			     with _ -> location
			   )
			 else
			   location in
		       let ok =
			 try
			   m # set_request_uri location';
			   true
			 with
			   | _ ->
			       (* Bad URL! *)
			       let e = URL_syntax_error location' in
			       m # private_api # set_error_exception e;
			       false
		       in
		       if ok then (
			 m # private_api # set_redir_counter (rc+1);
			 m # private_api # set_error_counter 0;
			 self # add_with_callback m f_done
		       )
		       else f_done m
		     )
		   end
		     else f_done m

	       | _ -> 
		   f_done m
	     with
		 (Http_protocol _ | Not_found) -> 
		   f_done m
	)


    method add request =
      self # add_with_callback request (fun _ -> ())

    method add_e request =
      let (e, signal) = Uq_engines.signal_engine esys in
      self # add_with_callback request (fun c -> signal (`Done c));
      e

    method run () =
	 Unixqueue.run esys

    method get_options = !options

    method set_options p =
      let old_opts = !options in
      options := p;
      (* Otherwise changing the tls config wouldn't have any effect: *)
      let tls_changed =
        match old_opts.tls, p.tls with
          | None, Some _ -> true
          | Some _, None -> true
          | Some t1, Some t2 -> t1 != t2
          | _ -> false in
      if tls_changed then
        self # update_https_transport()

    method number_of_open_messages = open_messages

    method number_of_open_connections = open_connections

    method connections =
      let l = ref [] in
      Hashtbl.iter
	(fun (peer, _) conns ->
	   List.iter
	     (fun conn ->
		let host, port = first_hop peer in
		l := (host, port, conn#length) :: !l
	     )
	     !conns
	)
	connections;
      !l

    method cnt_new_connections = counters.new_connections

    method cnt_timed_out_connections = counters.timed_out_connections

    method cnt_crashed_connections = counters.crashed_connections

    method cnt_server_eof_connections = counters.server_eof_connections

    method cnt_successful_connections = counters.successful_connections

    method cnt_failed_connections = counters.failed_connections

    method reset_counters() =
      counters.new_connections <- 0;
      counters.timed_out_connections <- 0;
      counters.crashed_connections <- 0;
      counters.server_eof_connections <- 0;
      counters.successful_connections <- 0;
      counters.failed_connections <- 0;


  end
;;

(**********************************************************************)
(**********************************************************************)
(**********************************************************************)
(***                                                                ***)
(***                 THE CONVENIENCE MODULE                         ***)
(***                                                                ***)
(**********************************************************************)
(**********************************************************************)
(**********************************************************************)

(* This module is intended for beginners and for simple applications
 * of this HTTP implementation.
 *)

let omtp = !Netsys_oothr.provider
let mutex = omtp # create_mutex()

let serialize f arg =
  mutex # lock();
  try
    let r = f arg in
    mutex # unlock();
    r
  with
      err -> mutex # unlock(); raise err
;;
	

module Convenience =
  struct

    let http_trials = ref 3
    let http_user = ref ""
    let http_password = ref ""

    let this_user = ref ""
    let this_password = ref ""

    let http_insecure = ref false

    let conv_verbose = ref false

    class simple_key_handler : key_handler =
    object
      method inquire_key ~domain ~realm ~auth =
	if !this_user <> "" then
          key ~user:!this_user ~password:!this_password ~realm ~domain:[]
	else
	  if !http_user <> "" then
            key ~user:!http_user ~password:!this_password ~realm ~domain:[]
	  else
	    raise Not_found
      method invalidate_key (_ : key) = ()
    end


    let key_handler = new simple_key_handler

    let get_default_pipe() =

      let p = new pipeline in

      p # set_proxy_from_environment ~insecure:!http_insecure ();

      (* Add authentication methods: *)
      let h = new unified_auth_handler ~insecure:!http_insecure key_handler in
      p # add_auth_handler h;

      (* That's it: *)
      p


    let pipe = lazy (get_default_pipe())
    let pipe_empty = ref true


    let configure ?insecure () =
      ( match insecure with
          | None -> ()
          | Some flag -> http_insecure := flag
      )


    let configure_pipeline f =
      let p = Lazy.force pipe in
      f p

    let request m =
      serialize
	(fun trials ->
	   let p = Lazy.force pipe in
	   m # set_accept_encoding();
	   p # add m;
	   try
	     p # run()
	   with
	     | error -> p # reset(); raise error
	)

    let prepare_url =
      serialize
	(fun url ->
	   let p = Lazy.force pipe in
	   try
	     this_user := "";
	     let (nu, _) = parse_url (ref p#get_options) url in
	     if Neturl.url_provides ~user:true nu then (
	       this_user := Neturl.url_user nu;
	       this_password := "";
	       if Neturl.url_provides ~password:true nu then
		 this_password := Neturl.url_password nu;
	     );
	     Neturl.string_of_url
	       (Neturl.remove_from_url
		  ~user:true ~user_param:true ~password:true
		  nu)
	   with
	     | Not_found | Neturl.Malformed_URL -> 
		 url
	)

    let http_get_message url =
      let m = new get (prepare_url url) in
      request m !http_trials;
      m

    let http_get url = (http_get_message url) # get_resp_body()

    let http_head_message url =
      let m = new head (prepare_url url) in
      request m !http_trials;
      m

    let http_post_message url params =
      let m = new post (prepare_url url) params in
      request m 1;
      m

    let http_post url params = (http_post_message url params) # get_resp_body()

    let http_put_message url content =
      let m = new put (prepare_url url) content in
      request m !http_trials;
      m

    let http_put url content = (http_put_message url content) # get_resp_body()

    let http_delete_message url =
      let m = new delete (prepare_url url) in
      request m 1;
      m

    let http_delete url = (http_delete_message url) # get_resp_body()


    let http_verbose 
         ?(verbose_status=true)
         ?(verbose_request_header=true)
         ?(verbose_response_header=true)
         ?(verbose_request_contents=true)
         ?(verbose_response_contents=true)
         ?(verbose_connection=true)
         ?(verbose_events=true) =
      serialize
	(fun () ->
	   let p = Lazy.force pipe in
	   let opt = p # get_options in
	   p # set_options
	     { opt with verbose_status = verbose_status;
	         verbose_request_header = verbose_request_header;
		 verbose_response_header = verbose_response_header;
		 verbose_request_contents = verbose_request_contents;
		 verbose_response_contents = verbose_response_contents;
		 verbose_connection = verbose_connection;
		 verbose_events = verbose_events
             };
	   conv_verbose := true;
	   Debug.enable := true;
	)
  end

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml