(* Plasma GitLab Archive
   Projects Blog Knowledge *)

(* $Id: http_client.ml 1619 2011-06-12 13:53:26Z gerd $
 * ----------------------------------------------------------------------
 *
 *)

(* - CHECK: fd tracking - OK
   - half-open connections and https - OK
   - Timeout for https - OK
   - suite - OK
   - CONNECT - OK
     * configure localhost as proxy - OK
     * basic auth for CONNECT -OK
     * Proxy: digest auth, also for CONNECT - OK
   - CONNECT + auth in test suite - OK
   - SOCKS - OK
   - Always include "Proxy-Connection: keep-alive" headers - OK
   - check: upper/lowercase for "close" tokens etc. - OK
   - Look for all "80" defaults - OK
   - Netfs - OK
   - HTTPS in Convenience - OK
   - GZIP response - OK
   - Http_util stuff - OK
   - Proxy: allow foreign schemes, e.g. ftp: URLs
 *)

(* TODO:
   - Also support automatic compression of uploads. Be prepared to get
     a 415 response, and fall back to [identity] (or whatever is available)
     Plan:
     * message: set_content_encoding
     * message, private-api: allow automatic compression in open_value_rd
     * Uq_io: implement filter_in_buffer
     * postprocess: intercept 415 responses, and choose a different
       encoding
 *)


(* Reference documents:
 * RFC 2068, 2616:      HTTP 1.1
 * RFC 2069, 2617:      Digest Authentication
 *)

(* Debug switch of this module. [enable] is the flag that is handed to
   the Netlog debug helpers below. *)
module Debug = struct
  let enable = ref false
end

(* Debug loggers created for the name "Http_client" from [Debug.enable].
   In this file [dlog] is called with a ready-made string, [dlogr] with
   a closure that produces the string. *)
let dlog = Netlog.Debug.mk_dlog "Http_client" Debug.enable
let dlogr = Netlog.Debug.mk_dlogr "Http_client" Debug.enable

(* Register the flag with Netlog under the module name "Http_client". *)
let () =
  Netlog.Debug.register_module "Http_client" Debug.enable


open Http_client_conncache
open Printf
open Uq_engines.Operators

(* Exceptions of the HTTP client. Only some of them are raised in the
   part of the file that contains these declarations; the notes below
   are limited to those. *)
exception Bad_message of string;;
exception Http_error of (int * string);;
  (* Raised by [get_resp_body] when the response code is outside
     200..299; carries the response code and the response body. *)
exception Http_protocol of exn;;
  (* Wraps the exception stored in a [`Http_protocol_error] status when
     the response of such a call is accessed. *)
exception Proxy_error of int
exception No_reply;;
exception Too_many_redirections;;
exception Name_resolution_error = Uq_resolver.Host_not_found
  (* An alias: the same exception as [Uq_resolver.Host_not_found]. *)
exception URL_syntax_error of string
exception Timeout of string
exception Response_too_large
  (* Raised while a response body is written, as soon as more bytes
     than [max_response_body_length] have been counted. *)

(* Install a printer for [Http_protocol] that also renders the wrapped
   exception. The value [Http_protocol Not_found] passed first is
   presumably only a sample identifying the exception to Netexn -
   confirm against the Netexn interface. *)
let () =
  let print_http_protocol = function
    | Http_protocol inner ->
        String.concat ""
          [ "Http_client.Http_protocol("; Netexn.to_string inner; ")" ]
    | _ ->
        assert false
  in
  Netexn.register_printer (Http_protocol Not_found) print_http_protocol

(* Initialize Netsys_signal when this module is loaded. *)
let () =
  Netsys_signal.init ()


(* Outcome of a call. [`Unserved] is the initial state.
   [`Http_protocol_error] carries the exception recorded for a failed
   call. The remaining tags are assigned from the response code when
   the response is finished: 200-299, 300-399, 400-499, and
   [`Server_error] for every other code. *)
type status =
  [ `Unserved
  | `Http_protocol_error of exn
  | `Successful
  | `Redirection
  | `Client_error
  | `Server_error
  ]

(* Where the response body is put:
   - [`Memory]: an in-memory MIME body
   - [`File f]: a file MIME body; [f()] returns the file name
   - [`Body f]: the MIME body returned by [f()]
   - [`Device f]: the output device returned by [f()]; in this case
     there is no accessible response body object *)
type response_body_storage =
    [ `Memory
    | `File of unit -> string
    | `Body of unit -> Netmime.mime_body
    | `Device of unit -> Uq_io.out_device
    ]

(* Reconnect policy of a call. [Inquire] carries a predicate over the
   message object. NOTE(review): the code interpreting these modes is
   outside the part of the file visible here; presumably it decides
   whether a request is sent again after a connection problem -
   confirm. *)
type 'message_class how_to_reconnect =
    Send_again
  | Request_fails
  | Inquire of ('message_class -> bool)
  | Send_again_if_idem
;;

(* Redirect policy of a call. [Redirect_inquire] carries a predicate
   over the message object. NOTE(review): interpreted outside the part
   of the file visible here. *)
type 'message_class how_to_redirect =
    Redirect
  | Do_not_redirect
  | Redirect_inquire of ('message_class -> bool)
  | Redirect_if_idem
;;

(* A name resolver: it gets the event system, the host name, and a
   callback. The callback is invoked with [Some addr] on success and
   with [None] if the name cannot be resolved (see [sync_resolver]). *)
type resolver =
    Unixqueue.unix_event_system -> 
    string -> 
    (Unix.inet_addr option -> unit) -> 
      unit
;;

(* Mutable counters of connection events. They are updated outside the
   part of the file visible here. *)
type counters =
    { mutable new_connections : int;
      mutable timed_out_connections : int;
      mutable crashed_connections: int;
      mutable server_eof_connections : int;
      mutable successful_connections : int;
      mutable failed_connections : int;
    }


(* [better_unix_error f arg] evaluates [f arg]. A [Unix.Unix_error] is
   turned into [Failure]: the text is the system error message,
   prefixed by the parameter string of the error and a colon when that
   string is not empty. Other exceptions pass through unchanged. *)
let better_unix_error f arg =
  try
    f arg
  with
    | Unix.Unix_error (code, _, "") ->
        failwith (Unix.error_message code)
    | Unix.Unix_error (code, _, param) ->
        failwith (param ^ ": " ^ Unix.error_message code)


(* [syscall f] runs [f()] and returns its result. If the call fails
   with [Unix.EINTR] (a signal happened while the system call was
   blocked), the call is simply started again. This repeats until [f]
   returns or raises any other exception. *)
let rec syscall f =
  let outcome =
    try Some (f ())
    with Unix.Unix_error (Unix.EINTR, _, _) -> None
  in
  match outcome with
    | Some result -> result
    | None -> syscall f
;;


(* The sixteen lowercase hexadecimal digits, indexed by their value. *)
let hex_digits = [| '0'; '1'; '2'; '3'; '4'; '5'; '6'; '7';
		    '8'; '9'; 'a'; 'b'; 'c'; 'd'; 'e'; 'f' |];;

(* [encode_hex s] returns the bytes of [s] written as lowercase hex
   digits, two digits per input byte (high nibble first). The result
   has twice the length of [s]; the empty string maps to itself.

   The result is accumulated in a [Buffer] instead of assigning to the
   characters of a string, because strings cannot be modified in place
   in current OCaml versions. *)
let encode_hex s =
  let buf = Buffer.create (2 * String.length s) in
  String.iter
    (fun c ->
       let x = Char.code c in
       Buffer.add_char buf hex_digits.( x lsr 4 );
       Buffer.add_char buf hex_digits.( x land 15 ))
    s;
  Buffer.contents buf
;;


(* Synchronization mode of the client, either [Sync] or [Pipeline]
   with an integer argument. NOTE(review): presumably the argument is
   the pipeline depth; the code interpreting it is outside the part of
   the file visible here - confirm. *)
type synchronization =
  | Sync
  | Pipeline of int
;;


(* NOTE(review): presumably the upper bound for the pipeline depth;
   not used in the part of the file visible here. *)
let max_pipeline = 8 ;;

(* Regular expressions for server software that is blacklisted with
   respect to pipelining.
   NOTE(review): the dot at the end of the second pattern is not
   escaped; in the usual regexp syntax it matches any character and
   not only a literal dot - confirm whether this is intended. *)
let pipeline_blacklist =
  (* Stolen from Mozilla *)
  [ Netstring_str.regexp "Microsoft-IIS/";
    Netstring_str.regexp "Netscape-Enterprise/3.";
  ]
;;


(* Identifies a channel binding. New identifiers are produced by
   [new_cb_id]. *)
type channel_binding_id = int

(* The configuration record of the client. Most fields are interpreted
   by code outside the part of the file visible here, so only the
   fields with visible uses are described. *)
type http_options = 
    { synchronization : synchronization;
      maximum_connection_failures : int;
      maximum_message_errors : int;
      inhibit_persistency : bool;
      connection_timeout : float;
      number_of_parallel_connections : int;
      maximum_redirections : int;
      handshake_timeout : float;
      resolver : resolver;
      configure_socket : Unix.file_descr -> unit;
      (* The known URL schemes, as read by [parse_url]: scheme name,
         URL syntax, optional default port, and channel binding *)
      schemes : (string * Neturl.url_syntax * int option * channel_binding_id)
	         list;
      verbose_status : bool;
      verbose_request_header : bool;
      verbose_response_header : bool;
      verbose_request_contents : bool;
      verbose_response_contents : bool;
      verbose_connection : bool;
      verbose_events : bool;
    }
;;

(* Selects one of the two request headers of a call: [`Base] is the
   header set by the user, [`Effective] is the working copy that is
   created from the base header when a transmission is prepared. *)
type header_kind = [ `Base | `Effective ]

(*
let http_re =
  Netstring_str.regexp
    "^https?://\
     \\(\
       \\([^/:@]+\\)\
       \\(:\\([^/:@]+\\)\\)?@\
     \\)?\
     \\([^/:@]+\\)\
     \\(:\\([0-9]+\\)\\)?\
     \\([/?].*\\)?$"
 *)

(* [parse_url ?base_url options url] parses the string [url] and
   returns the pair [(u, cb)]: [u] is the absolute URL with the default
   port of the scheme filled in, and [cb] is the channel binding that
   is configured for the scheme.

   The scheme is taken from [url]; if it cannot be extracted, the
   scheme of [base_url] is used (without [base_url] the extraction
   error is raised again). The scheme is then looked up in the
   [schemes] list of [!options], and only this scheme is accepted when
   parsing.

   Raises [Not_found] if the scheme is not configured, or if Neturl
   reports a malformed URL. *)
let parse_url ?base_url options url =
  try
    let scheme =
      try Neturl.extract_url_scheme url
      with err ->
        ( match base_url with
            | Some base -> Neturl.url_scheme base
            | None -> raise err
        ) in
    let has_scheme (name,_,_,_) = (scheme = name) in
    let (_, syntax, default_port, cb) =
      List.find has_scheme !options.schemes in
    (* The parser gets a table with exactly this one scheme *)
    let scheme_table = Hashtbl.create 1 in
    Hashtbl.add scheme_table scheme syntax;
    let fixed = Neturl.fixup_url_string url in
    let base_syntax =
      match base_url with
        | Some base -> Some (Neturl.url_syntax_of_url base)
        | None -> None in
    let parsed =
      Neturl.parse_url
        ?base_syntax ~schemes:scheme_table ~accept_8bits:true fixed in
    let absolute =
      Neturl.ensure_absolute_url ?base:base_url parsed in
    (Neturl.default_url ?port:default_port absolute, cb)
  with
    | Neturl.Malformed_URL -> raise Not_found

(*
  match Netstring_str.string_match http_re url 0 with
    | None ->
	raise Not_found
    | Some m ->
	let is_https =
	  String.sub url 0 5 = "https" in
	let user =
	  try Some(Netstring_str.matched_group m 2 url) 
	  with Not_found -> None in
	let password =
	  try Some(Netstring_str.matched_group m 4 url)
	  with Not_found -> None in
	let host =
	  Netstring_str.matched_group m 5 url in
	let port =
	  try int_of_string(Netstring_str.matched_group m 7 url)
	  with Not_found -> (if is_https then 443 else 80) in
	let path =
	  try Netstring_str.matched_group m 8 url with Not_found -> "" in
	(is_https,user,password,host,port,path)
 *)


(* Separator of comma-separated lists: a comma together with any
   blanks, tabs, and line ends around it. *)
let comma_re = Netstring_str.regexp "[ \t\n\r]*,[ \t\n\r]*" ;;

(* Splits the string at the separators matched by [comma_re]. *)
let split_words_by_commas text =
  Netstring_str.split comma_re text ;;

(* Separator of words: a non-empty run of blanks, tabs, and line
   ends. *)
let space_re = Netstring_str.regexp "[ \t\n\r]+" ;;

(* Splits the string at the separators matched by [space_re]. *)
let split_words text =
  Netstring_str.split space_re text ;;


(* [sync_resolver esys name reply] looks up the host [name] with
   [Uq_resolver.get_host_by_name] and then calls [reply] exactly once:
   with [Some addr], where [addr] is the first address of the host
   entry, or with [None] if the host is not found or the host entry
   has no address at all.

   The lookup is done synchronously, before the function returns; the
   event system [esys] is not used. *)
let sync_resolver esys name reply =
  (* FIXME: Use Uq_resolver also in the async case! *)
  let addr =
    try
      let h = Uq_resolver.get_host_by_name name in
      let addrs = h.Unix.h_addr_list in
      (* An empty address list must not end in an index error: it is
       * reported like an unknown host.
       *)
      if Array.length addrs > 0 then Some addrs.(0) else None
    with Uq_resolver.Host_not_found _ -> None in
  reply addr
;;


(**********************************************************************)
(***                          BUFFERS                               ***)
(**********************************************************************)

(* Similar to Buffer, but may be accessed in a more efficient way. *)

(* Short name for Netbuffer, the buffer implementation used here. *)
module B = Netbuffer;;

(* The buffer type of this file is the Netbuffer type. *)
type buffer_type = B.t;;


module Q = 
struct
  (* This queue allows one to insert elements. Apart from [add_at],
   * all operations are the operations of the standard [Queue] module
   * under the same names.
   *)

  type 'a t = 'a Queue.t

  exception Empty = Queue.Empty

  let create = Queue.create

  let add = Queue.add

  let push = add 

  let add_at n x q =
    (* Add x behind the n-th last element of q, i.e.
     * add_at 0 x q = Add behind the last element = add x q
     * add_at 1 x q = Add before the last element
     * ...
     * If n is larger than the length of q, x becomes the first element.
     * This algorithm is usually called for n ~ length, and is quite
     * efficient in this case.
     *
     * Raises Invalid_argument for negative n. The check is done before
     * q is touched; without it the loop below would take more elements
     * than q has, and the elements already taken would be lost.
     *)
    if n < 0 then invalid_arg "Http_client.Q.add_at";
    let l = Queue.length q in
    let q' = Queue.create() in
    (* Move the first l-n elements to q', so x can be appended to them *)
    for _k = 1 to l-n do
      Queue.add (Queue.take q) q'
    done;
    Queue.add x q';
    (* Append the last n elements, and move everything back to q *)
    Queue.transfer q q';
    Queue.transfer q' q

  let take = Queue.take
     
  let pop = take

  let peek = Queue.peek

  let top = peek

  let clear = Queue.clear

  let copy = Queue.copy

  let is_empty = Queue.is_empty

  let length = Queue.length

  let iter = Queue.iter

  let transfer = Queue.transfer

  (* fold: omitted *)

end



(**********************************************************************)
(***                  THE HTTP CALL CONTAINER                       ***)
(**********************************************************************)


(* [dump_header prefix h] writes one debug log line per element of the
   list [h] of (name, value) pairs. Each line consists of [prefix], the
   name, a colon followed by a space, and the value. *)
let dump_header prefix h =
  let log_field (name, value) =
    dlog (prefix ^ name ^ ": " ^ value) in
  List.iter log_field h



(* The authentication state of a call. It starts as [`None] in
   [generic_call], and is reset to [`None] by [same_call]. *)
type 'session auth_state =  (* 'session = auth_session, defined below *)
    [ `None
         (* Authentication has not yet been tried *)
    | `In_advance of 'session
         (* This session had been tried before a 401 response was seen *)
    | `In_reply of 'session
         (* This session was tried after a 401 response was seen *)
    ]

(* The public interface of an HTTP call object, together with the
   mutually recursive class types [private_api] and [auth_session].

   The notes on behavior below describe the implementation in
   [generic_call]: the accessors of the response fail if the call is
   unserved and no response header has been set yet, and they raise
   [Http_protocol] if the status is [`Http_protocol_error]. *)
class type http_call =
object
  (* Call state *)
  method is_served : bool
  method status : status

  (* The request message. [request_uri] is the URI string as set by
     the user. [request_header `Base] is the header set by the user,
     [request_header `Effective] the working copy. *)
  method request_method : string
  method request_uri : string
  method set_request_uri : string -> unit
(* method is_https : bool *)
  method request_header : header_kind -> Netmime.mime_header
  method set_request_header : Netmime.mime_header -> unit
  method set_expect_handshake : unit -> unit
  method set_chunked_request : unit -> unit
  method effective_request_uri : string
  method request_body : Netmime.mime_body
  method set_request_body : Netmime.mime_body -> unit
  method set_request_device : (unit -> Uq_io.in_device) -> unit
  method set_accept_encoding : unit -> unit

  (* The response message *)
  method response_status_code : int
  method response_status_text : string
  method response_status : Nethttp.http_status
  method response_protocol : string
  method response_header : Netmime.mime_header
  method response_body : Netmime.mime_body

  (* Options of the call *)
  method response_body_storage : response_body_storage
  method set_response_body_storage : response_body_storage -> unit
  method max_response_body_length : int64
  method set_max_response_body_length : int64 -> unit
  method get_reconnect_mode : http_call how_to_reconnect
  method set_reconnect_mode : http_call how_to_reconnect -> unit
  method get_redirect_mode : http_call how_to_redirect
  method set_redirect_mode : http_call how_to_redirect -> unit
  method proxy_enabled : bool
  method set_proxy_enabled : bool -> unit
  method no_proxy : unit -> unit
  method is_proxy_allowed : unit -> bool
  method proxy_use_connect : bool

  (* Characteristics of the HTTP method *)
  method empty_path_replacement : string
  method is_idempotent : bool
  method has_req_body : bool
  method has_resp_body : bool

  method channel_binding : channel_binding_id
  method set_channel_binding : channel_binding_id -> unit

  (* Returns a copy of the call object with the call state reset *)
  method same_call : unit -> http_call

  (* Old style access methods. [get_resp_body] raises [Http_error] if
     the response code is outside 200..299. *)
  method get_req_method : unit -> string
  method get_host : unit -> string
  method get_port : unit -> int
  method get_path : unit -> string
  method get_uri : unit -> string
  method get_req_body : unit -> string
  method get_req_header : unit -> (string * string) list
  method assoc_req_header : string -> string
  method assoc_multi_req_header : string -> string list
  method set_req_header : string -> string -> unit
  method get_resp_header : unit -> (string * string) list
  method assoc_resp_header : string -> string
  method assoc_multi_resp_header : string -> string list
  method get_resp_body : unit -> string
  method dest_status : unit -> (string * int * string)

  (* Access to the internal interface of the call *)
  method private_api : private_api
end

(* The internal interface of a call object. In [generic_call] the
   object is created on the first access of [private_api] and shares
   the instance variables of the call. *)
and private_api =
object
  method parse_request_uri : http_options ref -> unit
    (* Parses the request URI string unless this has already been done,
       and sets the host, port, path, and channel binding of the call.
       Fails if the URI is bad or contains user or password.
     *)
  method request_uri_with : ?path:string option -> ?remove_particles:bool ->
                            unit -> Neturl.url
    (* Returns the parsed request URI in a modified version.
       [~path:(Some p)] replaces the path by [p], [~path:None] removes
       the path. [~remove_particles:true] removes query, param, and
       fragment. [parse_request_uri] must have been called before.
     *)

  method get_error_counter : int
  method set_error_counter : int -> unit
  method set_error_exception : exn -> unit
    (* Marks the call as finished with status [`Http_protocol_error],
       and releases the resources. [Http_error] is not allowed here.
     *)
  method error_exception : exn option
    (* The exception of a [`Http_protocol_error] status, if any *)

  method error_if_unserved : bool -> exn -> unit
    (* Calls [set_error_exception] if the status is still [`Unserved].
       The bool enables a debug log message.
     *)

  method get_redir_counter : int
  method set_redir_counter : int -> unit

  method continue : bool
    (* The value of the continue flag *)
  method set_continue : bool -> unit
    (* Set the continue flag to [true]. The argument says whether a
       "100" code was received or not. (In the latter case the reaction is
       different.)
     *)
  method wait_continue_e : float -> Unixqueue.event_system -> 
                                                         bool Uq_engines.engine
    (* Waits until [set_continue] is called, or until the time is over.
       The bool result says whether a "100" code was received or not.
     *)

  method auth_state : auth_session auth_state
  method set_auth_state : auth_session auth_state -> unit

  method set_effective_request_uri : string -> unit

  method prepare_transmission : unit -> unit
    (* Initializes the `Effective request header, and creates the response
     * objects. Sets the status to [`Unserved]. The error and redirection
     * counters are not changed.
     *)
  method set_response_status : int -> string -> string -> unit
    (* code, text, proto *)
  method set_response_header : Netmime.mime_header -> unit
    (* Sets the response header *)
  method response_body_open_wr : Unixqueue.event_system -> Uq_io.out_device
    (* Opens the response body for writing *)
  method request_body_open_rd : Unixqueue.event_system -> Uq_io.in_device
    (* Opens the request body for reading *)
  method finish_request_e : Unixqueue.event_system -> unit Uq_engines.engine
    (* Finish the request part of the call *)
  method finish_response_e : Unixqueue.event_system -> unit Uq_engines.engine
    (* The call is finished. The [status] is set to [`Successful], 
     * [`Redirection], [`Client_error], or [`Server_error] depending on
     * the response.
     *)
  method cleanup : unit -> unit
    (* Release resources *)

  (* Unchecked access to the response data (no status check) *)
  method response_code : int
  method response_proto : string
  method response_header : Netmime.mime_header

  method decompression_enabled : bool
    (* Whether [set_accept_encoding] has been called for the call *)

  (* Write the response data to the debug log *)
  method dump_status : unit -> unit
  method dump_response_header : unit -> unit
  method dump_response_body : unit -> unit
end

(* An authentication session. NOTE(review): no implementation is
   contained in the part of the file visible here; the method names
   suggest that [authenticate] returns header fields to add to the
   call, and that [invalidate] reports whether another attempt makes
   sense - confirm against the implementing classes. *)
and auth_session =
object
  method auth_scheme : string
  method auth_domain : Neturl.url list
  method auth_realm : string
  method auth_user : string
  method auth_in_advance : bool
  method authenticate : http_call -> (string * string) list
  method invalidate : http_call -> bool
end


(* The type of [generic_call]: an [http_call] with additional private
   virtual methods. The subclasses for the HTTP methods define them:
   - [fixup_request] is called when a transmission is prepared, after
     the effective request header has been created
   - [def_request_method] is the name of the HTTP method
   - the other [def_*] methods provide the values of the public
     methods [empty_path_replacement], [is_idempotent], [has_req_body],
     and [has_resp_body] *)
class type virtual gen_call =
object
  inherit http_call
  method private virtual fixup_request : unit -> unit
  method private virtual def_request_method : string
  method private virtual def_empty_path_replacement : string
  method private virtual def_is_idempotent : bool
  method private virtual def_has_req_body : bool
  method private virtual def_has_resp_body : bool
end


(* [new_cb_id()] returns a new channel binding identifier. The number
   is the object ID of a freshly created empty object. *)
let new_cb_id () =
  let fresh = object end in
  Oo.id fresh

(* The predefined channel bindings *)
let http_cb_id = new_cb_id ()
let https_cb_id = new_cb_id ()
let proxy_only_cb_id = new_cb_id ()


(* The common implementation of all HTTP calls. The object stores the
   request, the response, and the state of the call in mutable
   instance variables. The HTTP method itself is described by the
   virtual private methods (see the bottom of the class), which are
   defined in the subclasses.

   The internal interface of the call is a second object created by
   [def_private_api]. Its methods operate directly on the instance
   variables of the call. *)
class virtual generic_call : gen_call =
object(self)
  (* State of the call. [finished] is set when the response is finished
     or when an error is recorded. *)
  val mutable status = (`Unserved : status)
  val mutable finished = false

  (* The request. [req_uri] is the parsed form of [req_uri_raw]; it is
     [None] until [parse_request_uri] has been called. *)
  val mutable req_uri = None
      (* The req_uri must always include the port number *)
  val mutable req_uri_raw = ""

  val mutable req_cb = http_cb_id
  val mutable req_secure = false
  val mutable req_host = ""   (* derived from req_uri *)
  val mutable req_port = 80   (* derived from req_uri *)
  val mutable req_path = ""   (* derived from req_uri *)
  val mutable req_base_header = new Netmime.basic_mime_header []
  val mutable req_work_header = new Netmime.basic_mime_header []
  val mutable req_body = new Netmime.memory_mime_body ""
  val mutable req_dev = None

  val mutable eff_req_uri = ""

  (* The response. [resp_code = 0] means that no status line has been
     set yet. [resp_body] is created on demand by the private method of
     the same name. *)
  val mutable resp_code = 0
  val mutable resp_text = ""
  val mutable resp_proto = ""
  val mutable resp_header = new Netmime.basic_mime_header []
  val mutable resp_header_set = false
  val mutable resp_body = None
  val mutable resp_body_max = Int64.max_int
  val mutable resp_decompress = false

  (* Options *)
  val mutable resp_body_storage = (`Memory : response_body_storage)
  val mutable reconn_mode = Send_again_if_idem
  val mutable redir_mode = Redirect_if_idem
  val mutable proxy_enabled = true

  (* The private API object, created on the first access *)
  val mutable private_api = None

  (* The devices that are currently open for the response and request
     bodies *)
  val mutable resp_handle = None
  val mutable req_handle = None

  (* State of the 100-Continue handshake. [continue_e] is the engine of
     a pending [wait_continue_e], if any. *)
  val mutable continue = false   (* Whether 100-Continue has been seen *)    
  val mutable continue_e = None
  val mutable continue_aborted = false
  val mutable continue_100 = false

  val mutable error_counter = 0
  val mutable redir_counter = 0
  val mutable auth_state = `None


  (* Returns the response body object. It is created according to
     [resp_body_storage] on the first call, and remembered. Fails for
     [`Device] storage. *)
  method private resp_body =
    match resp_body with
      | None ->
	  let rbody =
	    match resp_body_storage with
	      | `Memory ->
		  new Netmime.memory_mime_body ""
	      | `File f ->
		  let name = f() in
		  new Netmime.file_mime_body name
	      | `Body f ->
		  f () 
	      | `Device _ ->
		  failwith "Http_client: response is forwarded to device - \
                            no accessible response_body" in
	  resp_body <- Some rbody;
	  rbody
      | Some rbody -> rbody 


  (* Creates the private API object. [fixup_request] is the function
     that is called by [prepare_transmission] after the effective
     request header has been set up. *)
  method private def_private_api fixup_request =
    ( object(pself) 
	(* Inactivates the open request and response devices, and
	   aborts a pending wait for 100-Continue. The flag
	   [continue_aborted] tells the aborted wait that the result is
	   [`Aborted]. *)
	method private release_resources() =
(* eprintf "release_resources call=%d\n%!" (Oo.id self); *)
	  ( match req_handle with
	      | None -> ()
	      | Some d ->
		  Uq_io.inactivate d; req_handle <- None
	  );
	  ( match resp_handle with
	      | None -> ()
	      | Some d ->
		  Uq_io.inactivate d; resp_handle <- None
	  );
	  ( match continue_e with
	      | None -> ()
	      | Some e -> 
		  continue_e <- None;
		  continue_aborted <- true;
		  e#abort()
		  
	  )

	method get_error_counter = error_counter
	method set_error_counter n = error_counter <- n
	method get_redir_counter = redir_counter
	method set_redir_counter n = redir_counter <- n

	method continue = continue
	(* Sets the continue flag, and records in [continue_100] whether
	   a 100 code was received. A pending wait engine is aborted
	   without setting [continue_aborted]; this is apparently meant
	   to end the wait with [`Done continue_100] (see
	   [wait_continue_e]). *)
	method set_continue code_100 = 
	  continue <- true;
	  continue_100 <- code_100;
	  match continue_e with
	    | None -> ()
	    | Some e -> 
		continue_e <- None;
		e#abort()

	(* If the continue flag is already set, the result is immediately
	   [`Done true]. Otherwise a delay engine for [tmo] is started and
	   remembered in [continue_e]. When this engine terminates in
	   whatever way, the result is [`Aborted] if [continue_aborted]
	   is set, and [`Done continue_100] otherwise. *)
	method wait_continue_e tmo esys =
	  if continue then
	    eps_e (`Done true) esys
	  else (
	    assert(continue_e = None); (* cannot be called multiple times *)
	    let e = 
	      Uq_engines.delay_engine tmo
		(fun () -> eps_e (`Done ()) esys)
		esys in
	    continue_e <- Some e;
	    ( e
	      >> (fun _ -> 
		    continue_e <- None; 
		    if continue_aborted then `Aborted else  `Done continue_100
		 )
	    )
	  )

	method auth_state = auth_state
	method set_auth_state s = auth_state <- s

	method set_effective_request_uri s = eff_req_uri <- s

	(* Parses [req_uri_raw] unless a parsed URI already exists, and
	   sets [req_uri], [req_host], [req_port], [req_path] (the path
	   plus the query string, if any), and [req_cb]. Fails with
	   [Failure] if the URL contains user or password, or if
	   [Not_found] is raised while parsing (bad URL). *)
	method parse_request_uri options =
	  if req_uri = None then (
	    try
	      let (nu, cb) = parse_url options req_uri_raw in
	      if (Neturl.url_provides ~user:true nu || 
		    Neturl.url_provides ~password:true nu)
	      then
		failwith "Http_client: URL must not contain user or password";
	      req_uri <- Some nu;
	      req_host <- Neturl.url_host nu;
	      req_port <- Neturl.url_port nu;
	      req_path <- Neturl.join_path(Neturl.url_path nu) ^ 
                          (try "?" ^ Neturl.url_query nu with Not_found -> "");
	      req_cb <- cb;
	    with
		Not_found ->
		  failwith "Http_client: bad URL"
	  )

	(* Returns a modified version of the parsed request URI:
	   [~path:(Some p)] replaces the path by [p], [~path:None]
	   removes the path, and [~remove_particles:true] removes query,
	   param, and fragment. It is an assertion failure to call this
	   before [parse_request_uri]. *)
	method request_uri_with ?path ?(remove_particles=false) () =
	  match req_uri with
	    | None ->
		assert false
	    | Some ru ->
		let nu = 
		  Neturl.modify_url
		    ?path:(match path with
			     | None | Some None -> None
			     | Some (Some p) -> Some(Neturl.split_path p)
			  )
		    (Neturl.remove_from_url
		       ~path:(path = Some None)
		       ~query:remove_particles
		       ~param:remove_particles
		       ~fragment:remove_particles
		       ru) in
		nu


	method decompression_enabled = resp_decompress


	(* Resets the call for a (new) transmission: resources of an
	   earlier attempt are released, the response data are cleared,
	   and the effective request header is created as a copy of the
	   fields of the base header. After [fixup_request] has been
	   called, the header fields Date and User-agent are added if
	   they are still missing. Finally, the 100-Continue state is
	   reset. *)
	method prepare_transmission () =
	  pself # release_resources();
	  status <- `Unserved;
	  req_work_header <- new Netmime.basic_mime_header 
	                             req_base_header#fields;
	  resp_code <- 0;
	  resp_text <- "";
	  resp_proto <- "";
	  resp_header <- new Netmime.basic_mime_header [];
	  resp_header_set <- false;
	  resp_body <- None;
	  fixup_request();
	  ( try ignore(req_work_header # field "Date")
	    with Not_found ->
	      Nethttp.Header.set_date req_work_header (Unix.time())
	  );
	  ( try ignore(req_work_header # field "User-agent")
	    with Not_found ->
	      req_work_header # update_field "User-agent" "Netclient"
	  );
	  continue <- false;
	  continue_e <- None;
	  continue_aborted <- false;
	  continue_100 <- false

	(* Opens the request body for reading: either the device returned
	   by the user function [req_dev], or a device reading from
	   [req_body]. The device is remembered in [req_handle]. *)
	method request_body_open_rd esys =
	  match req_dev with
	    | Some f ->
		let d = f() in
		req_handle <- Some d;
		d
	    | _ ->
		let ch = req_body # open_value_rd() in
		let d = 
		  `Async_in(new Uq_engines.pseudo_async_in_channel ch,esys) in
		req_handle <- Some d;
		d


	method set_response_status code text proto =
	  resp_code <- code;
	  resp_text <- text;
	  resp_proto <- proto

	(* The response status must have been set before the header *)
	method set_response_header h =
	  resp_header <- h;
	  resp_header_set <- true;
	  assert(resp_code <> 0);

	(* Opens the response body for writing. The result is a stack of
	   devices: [d0] is the destination (the user device, or the
	   response body object), [d1] counts the bytes and raises
	   [Response_too_large] if the maximum length is exceeded, and
	   [d2] is a decompressing buffer on top of [d1] if decompression
	   is enabled, the response has exactly one content encoding,
	   and a decoder is found for it. In this case the
	   Content-Encoding field is deleted from the response header.
	   The device is remembered in [resp_handle]. *)
	method response_body_open_wr esys =
	  let d0 =
	    match resp_body_storage with
	      | `Device f ->
		  f()
	      | _ ->
		  let rbody = self#resp_body in
		  let ch = rbody # open_value_wr() in
		  `Async_out(new Uq_engines.pseudo_async_out_channel ch,esys) in
	  (* Check for maximum response length: *)
	  let c = ref 0L in
	  let c_max = self # max_response_body_length in
	  let out_count n =
	    c := Int64.add !c (Int64.of_int n);
	    if !c > c_max then raise Response_too_large in
	  let d1 = `Count_out(out_count, d0) in
	  (* Check for decompression: *)
	  let d2 =
	    if pself#decompression_enabled then
	      let algos = 
		try Nethttp.Header.get_content_encoding resp_header
		with Not_found -> [] in
	      match algos with
		| [ algo ] ->
		    ( try 
			let decoder = 
			  Netcompression.lookup_decoder ~iana_name:algo () in
			resp_header # delete_field "Content-Encoding";
			let b = 
			  Uq_io.filter_out_buffer
			    ~max:(Some 4096) decoder d1 in
			`Buffer_out b
		      with
			| Not_found -> d1
		    )
		| _ -> 
		    d1
	    else
	      d1 in
	  resp_handle <- Some d2;
	  d2

	method response_code = resp_code
	method response_proto = resp_proto
	method response_header = resp_header


	(* Shuts the request device down, if there is one, and
	   inactivates it when the shutdown has terminated *)
	method finish_request_e esys =
(* eprintf "finish_request call=%d\n%!" (Oo.id self); *)
	  match req_handle with
	    | None ->
		eps_e (`Done ()) esys
	    | Some d ->
		req_handle <- None;
		Uq_io.shutdown_e d
		>> (fun st -> Uq_io.inactivate d; st)

	(* Marks the call as finished, and derives [status] from the
	   response code. Then the response device is shut down, if there
	   is one.
	   NOTE(review): all codes outside 200..499 are mapped to
	   [`Server_error], i.e. also codes below 200 - confirm that
	   this is intended. *)
	method finish_response_e esys =
(* eprintf "finish_response call=%d\n%!" (Oo.id self); *)
	  assert(resp_code <> 0);
	  finished <- true;
	  status <- (if resp_code >= 200 && resp_code <= 299 then
		       `Successful
		     else if resp_code >= 300 && resp_code <= 399 then
		       `Redirection
		     else if resp_code >= 400 && resp_code <= 499 then
		       `Client_error
		     else
		       `Server_error);
	  (* do this last - it can trigger user callback functions: *)
	  match resp_handle with
	    | None ->
		eps_e (`Done ()) esys
	    | Some d ->
(* prerr_endline "Http_cllent SHUTDOWN"; *)
		resp_handle <- None;
		Uq_io.shutdown_e d
		>> (fun st -> Uq_io.inactivate d; st)

	(* Records [error] only if the call is still unserved. With
	   [verbose], the error is also written to the debug log. *)
	method error_if_unserved verbose error =
	  match status with
	    | `Unserved ->
		if verbose then
		  dlogr (fun () -> 
			   sprintf "Call %d - error %s"
			     (Oo.id self)
			     (Netexn.to_string error));
		pself # set_error_exception error;
	    | _ ->
		()

	(* Marks the call as finished with status [`Http_protocol_error],
	   and releases the resources *)
	method set_error_exception x =
	  finished <- true;
	  ( match x with
		Http_error(_,_) -> assert false   (* not allowed *)
	      | _ ->
		  status <- (`Http_protocol_error x);
	  );
	  (* do this last - it can trigger user callback functions: *)
	  pself # release_resources();

	method error_exception =
	  match status with
	    | `Http_protocol_error x -> Some x
	    | _ -> None

	method cleanup () =
	  pself # release_resources()

	(* The dump methods write to the debug log. The calls are
	   identified by the object ID of the call object. *)
	method dump_status () =
	  dlog (sprintf
		  "Call %d - HTTP response code: %d (%s)"
		  (Oo.id self) resp_code resp_text);
	  dlog (sprintf
		  "Call %d - HTTP response protocol: %s"
		  (Oo.id self) resp_proto);

	method dump_response_header () =
	  dump_header 
	    (sprintf 
	       "Call %d - HTTP response "
	       (Oo.id self))
	    (resp_header # fields)

	(* An empty body is logged if no body object exists *)
	method dump_response_body () =
	  dlog (sprintf "Call %d - HTTP response body:\n%s\n"
		  (Oo.id self) 
		  (match resp_body with
		     | Some rbody -> rbody#value
		     | None -> ""
		  ))
      end
    )


  (* Call state *)

  method is_served = finished
  method status = status

  (* Accessing the request message (new style) *)

  method request_method = self # def_request_method
  method request_uri = req_uri_raw
  (* Setting a new URI string discards the parsed URI, so the string is
     parsed again by the next [parse_request_uri] *)
  method set_request_uri uri = req_uri_raw <- uri; req_uri <- None

  method channel_binding = req_cb
  method set_channel_binding cb = req_cb <- cb


  method request_header (k:header_kind) =
    match k with
      | `Base -> req_base_header
      | `Effective -> req_work_header
  method set_request_header h =
    req_base_header <- h
  method set_expect_handshake() =
    req_base_header # update_field "Expect" "100-continue"
  method set_chunked_request() =
    req_base_header # update_field "Transfer-encoding" "chunked"
  (* Fails if a request device has been set *)
  method request_body = 
    if req_dev <> None then
      failwith "Http_client: No request_body - using a device instead";
    req_body
  method set_request_body b = req_body <- b; req_dev <- None
  method set_request_device f = req_dev <- Some f

  method effective_request_uri = eff_req_uri

  (* Sets the Accept-Encoding field of the base header to all decoders
     known to Netcompression (or to identity if there is none), and
     enables the decompression of the response *)
  method set_accept_encoding() =
    let all_algos =
      Netcompression.all_decoders() in
    Nethttp.Header.set_accept_encoding
      req_base_header
      (if all_algos = [] then ["identity",[]] else
	 (List.map (fun token -> (token,[])) all_algos));
    resp_decompress <- true

  (* Accessing the response message (new style) *)

  (* Ensures that response data can be accessed: fails if the call is
     unserved and no response header has been set, and raises
     [Http_protocol] if an error has been recorded *)
  method private check_response() =
    match status with
      | `Unserved ->
	  if not resp_header_set then
	    failwith "Http_client: HTTP call is unserved, no response yet"
      | `Http_protocol_error e -> 
	  raise (Http_protocol e)
      | _ -> ()

  method response_status_code = self#check_response(); resp_code
  method response_status_text = self#check_response(); resp_text
  method response_status = 
    self#check_response(); Nethttp.http_status_of_int resp_code
  method response_protocol = self#check_response(); resp_proto
  method response_header = self#check_response(); resp_header
  method response_body = 
    (* [status] is already set after the header is available. The presence
       of the body is indicated by [finished].
     *)
    self#check_response(); 
    if not finished then 
      failwith "Http_client: HTTP call is unserved, response not yet complete";
    self#resp_body

  (* Options *)

  method response_body_storage = resp_body_storage
  method set_response_body_storage s = resp_body_storage <- s
  method max_response_body_length = resp_body_max
  method set_max_response_body_length n = resp_body_max <- n
  method get_reconnect_mode = reconn_mode
  method set_reconnect_mode m = reconn_mode <- m
  method get_redirect_mode = redir_mode
  method set_redirect_mode m = redir_mode <- m
  method proxy_enabled = proxy_enabled
  method set_proxy_enabled e = proxy_enabled <- e
  method no_proxy() = proxy_enabled <- false
  method is_proxy_allowed() = proxy_enabled
  (* True if and only if the channel binding is [https_cb_id] *)
  method proxy_use_connect = (req_cb = https_cb_id)

  (* Method characteristics *)

  method empty_path_replacement = self # def_empty_path_replacement
  method is_idempotent = self # def_is_idempotent
  method has_req_body = self # def_has_req_body
  method has_resp_body = self # def_has_resp_body

  (* Repeating calls *)
    
  (* Returns a copy of this object in which the listed instance
     variables are reset. All other instance variables keep the values
     of this object, e.g. the request header and body objects are the
     same objects in both calls. *)
  method same_call() =
    let same =
      {< status = `Unserved;
	 finished = false;
	 resp_header = new Netmime.basic_mime_header [];
	 resp_header_set = false;
	 resp_body = None;
	 eff_req_uri = "";
	 private_api = None;
	 error_counter = 0;
	 redir_counter = 0;
	 continue = false;
	 continue_aborted = false;
	 continue_100 = false;
	 continue_e = None;
	 resp_handle = None;
	 req_handle = None;
	 auth_state = `None
      >} in
    (same : #http_call :> http_call)

  (* Old style access methods *)

  method get_req_method() = self # def_request_method
  method get_host() = req_host
  method get_port() = req_port
  method get_path() = req_path
  method get_uri() = req_uri_raw
  method get_req_body() = req_body # value
  (* The header lists are returned with lowercase field names *)
  method get_req_header () =
    List.map (fun (n,v) -> (String.lowercase n, v)) req_base_header#fields
  method assoc_req_header n =
    req_base_header # field n
  method assoc_multi_req_header n =
    req_base_header # multiple_field n
  method set_req_header n v =
    req_base_header # update_field n v
  method get_resp_header() =
    self#check_response(); 
    List.map (fun (n,v) -> (String.lowercase n, v)) resp_header#fields
  method assoc_resp_header n =
    self#check_response(); 
    resp_header # field n
  method assoc_multi_resp_header n =
    self#check_response(); 
    resp_header # multiple_field n
  (* Returns the body for the codes 200..299. For all other codes
     [Http_error] is raised with the code and the body. *)
  method get_resp_body() =
    self#check_response();
    if resp_code >= 200 && resp_code <= 299 then
      self # resp_body # value
    else
      raise(Http_error(resp_code, self#resp_body#value))
  method dest_status() =
    self#check_response();
    (resp_proto, resp_code, resp_text)

  (* Private *)

  (* The private API object is created on the first access, and then
     remembered *)
  method private_api = 
    match private_api with
      | None ->
	  let api = self # def_private_api self#fixup_request in
	  private_api <- Some api;
	  api
      | Some api ->
	  api

  (* Virtual methods *)

  method virtual private fixup_request : unit -> unit
  method virtual private def_request_method : string
  method virtual private def_empty_path_replacement : string
  method virtual private def_is_idempotent : bool
  method virtual private def_has_req_body : bool
  method virtual private def_has_resp_body : bool
end

(******** SUBCLASSES IMPLEMENTING HTTP METHODS ************************)

(* The GET method: idempotent, no request body, with response body.
   An empty path is replaced by the root path. *)
class get_call =
object
  inherit generic_call
  method private def_request_method = "GET"
  method private def_empty_path_replacement = "/"
  method private def_is_idempotent = true
  method private def_has_req_body = false
  method private def_has_resp_body = true
  method private fixup_request () = ()
end

(* The HEAD method: idempotent, neither request nor response body.
   An empty path is replaced by the root path. *)
class head_call =
object
  inherit generic_call
  method private def_request_method = "HEAD"
  method private def_empty_path_replacement = "/"
  method private def_is_idempotent = true
  method private def_has_req_body = false
  method private def_has_resp_body = false
  method private fixup_request () = ()
end

(* The TRACE method: no request body, with response body. An empty
   path is replaced by the root path.
   NOTE(review): the method is declared as not idempotent here,
   although RFC 2616, section 9.1.2, calls TRACE inherently
   idempotent - confirm whether this is intended. *)
class trace_call =
object
  inherit generic_call
  method private def_request_method = "TRACE"
  method private def_empty_path_replacement = "/"
  method private def_is_idempotent = false
  method private def_has_req_body = false
  method private def_has_resp_body = true
  method private fixup_request () = ()
end

(* The OPTIONS method: with request body and response body. An empty
   path is replaced by an asterisk.
   NOTE(review): the method is declared as not idempotent here,
   although RFC 2616, section 9.1.2, calls OPTIONS inherently
   idempotent - confirm whether this is intended. *)
class options_call =
object
  inherit generic_call
  method private def_request_method = "OPTIONS"
  method private def_empty_path_replacement = "*"
  method private def_is_idempotent = false
  method private def_has_req_body = true
  method private def_has_resp_body = true
  method private fixup_request () = ()
end

(* The POST method: not idempotent, with request body and response
   body. An empty path is replaced by the root path. *)
class post_call =
object
  inherit generic_call
  method private def_request_method = "POST"
  method private def_empty_path_replacement = "/"
  method private def_is_idempotent = false
  method private def_has_req_body = true
  method private def_has_resp_body = true
  method private fixup_request () = ()
end

(* The PUT method: with request body and response body. An empty path
   is replaced by the root path. *)
class put_call =
object
  inherit generic_call
  method private def_request_method = "PUT"
  method private def_empty_path_replacement = "/"
  method private def_is_idempotent = true (* sic! PUT is idempotent *)
  method private def_has_req_body = true
  method private def_has_resp_body = true
  method private fixup_request () = ()
end

(* Call class for the HTTP DELETE method: no request body, a response
 * body is expected, and the call is declared idempotent.
 *)
class delete_call =
object
  inherit generic_call

  (* Request line defaults *)
  method private def_request_method = "DELETE"
  method private def_empty_path_replacement = "/"

  (* Message body expectations *)
  method private def_has_req_body = false
  method private def_has_resp_body = true

  method private def_is_idempotent = true

  (* DELETE needs no header adjustments before sending *)
  method private fixup_request() = ()
end

(* Call class for the HTTP CONNECT method (used towards proxies): no
 * request body, declared non-idempotent.
 *)
class connect_call =
object
  inherit generic_call

  (* Request line defaults *)
  method private def_request_method = "CONNECT"
  method private def_empty_path_replacement = "/"

  (* Message body expectations *)
  method private def_has_req_body = false
  method private def_has_resp_body = true
    (* This is broken to some degree. See the comment below on when
       CONNECT has a response body
     *)

  method private def_is_idempotent = false

  (* CONNECT needs no header adjustments before sending *)
  method private fixup_request() = ()
end


(* A GET call whose request URI is preset to [the_query] when the object
 * is created.
 *)
class get the_query =
  object (self)
    inherit get_call

    initializer self#set_request_uri the_query
  end


(* A TRACE call for [the_query]. Before the request is sent, the
 * "max-forwards" field of the effective request header is set to
 * [max_hops].
 *)
class trace the_query max_hops =
  object (self)
    inherit trace_call

    method private fixup_request() =
      let effective_header = self#request_header `Effective in
      let hop_limit = string_of_int max_hops in
      effective_header#update_field "max-forwards" hop_limit

    initializer self#set_request_uri the_query
  end


(* An OPTIONS call whose request URI is preset to [the_query] when the
 * object is created.
 *)
class options the_query =
  object (self)
    inherit options_call

    initializer self#set_request_uri the_query
  end


(* A HEAD call whose request URI is preset to [the_query] when the object
 * is created.
 *)
class head the_query =
  object (self)
    inherit head_call

    initializer self#set_request_uri the_query
  end


(* A POST call for [query] that uploads [params], a list of (name, value)
 * pairs, as an "application/x-www-form-urlencoded" body.
 *
 * The body is built in [fixup_request]: every pair becomes
 * "name=value", the pairs are joined with "&". Both the name and the
 * value are URL-encoded, so that names containing "&", "=", spaces or
 * other reserved characters cannot corrupt the body.
 * "Content-length" is only set when the request is not sent with a
 * non-identity transfer encoding.
 *)
class post query params =
  object (self)
    inherit post_call
    initializer
      self # set_request_uri query
    method private fixup_request() =
      let rh = self # request_header `Effective in
      (* Any transfer encoding other than "identity" counts as chunked *)
      let is_chunked =
        try rh # field "Transfer-encoding" <> "identity"
        with Not_found -> false in
      rh # update_field "Content-type" "application/x-www-form-urlencoded";
      let encode_pair (n,v) =
        Netencoding.Url.encode n ^ "=" ^ Netencoding.Url.encode v in
      let s = String.concat "&" (List.map encode_pair params) in
      if not is_chunked then
        rh # update_field "Content-length" (string_of_int (String.length s));
      self # request_body # set_value s
  end
;;


(* A POST call for [the_query] that uploads the string [s] unchanged as
 * request body. "Content-length" is set to the length of [s] unless the
 * effective header requests a transfer encoding other than "identity".
 *)
class post_raw the_query s =
  object (self)
    inherit post_call

    method private fixup_request() =
      let hdr = self#request_header `Effective in
      (* True when no transfer encoding is set, or it is "identity" *)
      let plain_transfer =
        try hdr#field "Transfer-encoding" = "identity"
        with Not_found -> true in
      self#request_body#set_value s;
      if plain_transfer then
        hdr#update_field "Content-length" (string_of_int (String.length s))

    initializer self#set_request_uri the_query
  end
;;


(* A PUT call for [the_query] that uploads the string [s] as request
 * body. "Content-length" is set to the length of [s] unless the
 * effective header requests a transfer encoding other than "identity".
 *)
class put the_query s =
  object (self)
    inherit put_call

    method private fixup_request() =
      let hdr = self#request_header `Effective in
      (* True when no transfer encoding is set, or it is "identity" *)
      let plain_transfer =
        try hdr#field "Transfer-encoding" = "identity"
        with Not_found -> true in
      self#request_body#set_value s;
      if plain_transfer then
        hdr#update_field "Content-length" (string_of_int (String.length s))

    initializer self#set_request_uri the_query
  end
;;


(* A DELETE call whose request URI is preset to [the_query] when the
 * object is created.
 *)
class delete the_query =
  object (self)
    inherit delete_call

    initializer self#set_request_uri the_query
  end
;;


(* A CONNECT call. [the_query] has the form "host:port" only. Internally
 * the request URI is set to "http://" followed by [the_query], but the
 * URI reported by [effective_request_uri] is the bare [the_query].
 *)
class connect the_query (* host:port only *) =
  let internal_uri = "http://" ^ the_query in
  object (self)
    inherit connect_call

    method effective_request_uri = the_query

    initializer self#set_request_uri internal_uri
  end
;;


(**********************************************************************)
(***                  AUTHENTICATION METHODS                        ***)
(**********************************************************************)

(* A key is one set of credentials: the user name and password, together
 * with the realm and the list of domain strings the credentials are
 * meant for. The auth sessions compare [realm] and [domain] with the
 * values derived from the server challenge before using a key.
 *)
class type key =
object
  method user : string
  method password : string
  method realm : string
  method domain : string list
end


(* Builds an immutable key object from the four labelled components.
 * Each method simply returns the argument of the same name.
 *)
let key ~user ~password ~realm ~domain =
  object
    method domain = domain
    method realm = realm
    method password = password
    method user = user
  end


(* A key handler is a source of keys.
 *
 * - [inquire_key] is asked for a key matching the given domain strings,
 *   one of the given realms, and the authentication scheme [auth]
 *   (the sessions in this file pass "basic" or "digest"). The
 *   implementations in this file raise [Not_found] if there is no key.
 * - [invalidate_key] is called by the auth sessions when a key turned
 *   out not to work.
 *)
class type key_handler =
object
  method inquire_key :
            domain:string list -> realms:string list -> auth:string -> key
  method invalidate_key : key -> unit
end


(* A key ring is an in-memory cache of keys, indexed by (domain, realm).
 * If the optional [uplink] handler is given, lookups that miss the cache
 * are forwarded to it, and the answer is remembered together with the
 * information that it came from the uplink.
 *)
class key_ring ?uplink () =
object(self)
  val mutable keys = (Hashtbl.create 10 :
                        (string list * string, key * bool) Hashtbl.t)
    (* Maps (domain, realm) to (key, from_uplink) *)

  (* Returns the cached key of the first realm in [realms] that has an
   * entry for [domain]. Without a cached entry the uplink is asked (and
   * its answer cached); without uplink [Not_found] is raised. The uplink
   * may raise [Not_found], too.
   *)
  method inquire_key ~domain ~realms ~(auth:string) =
    let rec first_cached = function
      | [] -> None
      | realm :: remaining ->
          ( try Some (fst (Hashtbl.find keys (domain, realm)))
            with Not_found -> first_cached remaining
          )
    in
    match first_cached realms with
      | Some key -> key
      | None ->
          ( match uplink with
              | Some h ->
                  let key = h # inquire_key ~domain ~realms ~auth in
                  (* or Not_found *)
                  Hashtbl.replace keys (key#domain, key#realm) (key,true);
                  key
              | None -> raise Not_found
          )

  (* Drops the entry stored under the key's (domain, realm). If that
   * entry came from the uplink, the uplink is told to invalidate the
   * key as well. Unknown keys are silently ignored.
   *)
  method invalidate_key (key : key) =
    let domain = key # domain in
    let realm = key # realm in
    let slot = (domain, realm) in
    try
      let (_, from_uplink) = Hashtbl.find keys slot in
      Hashtbl.remove keys slot;
      if from_uplink then
        ( match uplink with
            | Some h -> h # invalidate_key key
            | None -> assert false
        )
    with
        Not_found -> ()

  (* Forgets all keys *)
  method clear () =
    Hashtbl.clear keys

  (* Stores [key] under its own (domain, realm), marked as not coming
   * from the uplink. An existing entry is replaced.
   *)
  method add_key key =
    let slot = (key # domain, key # realm) in
    Hashtbl.replace keys slot (key, false)

  (* All keys currently in the ring, in unspecified order *)
  method keys =
    Hashtbl.fold
      (fun _ (key,_) collected -> key :: collected)
      keys
      []

end


(* A key handler that answers every inquiry with the fixed [user] and
 * [password]. The returned key takes over the requested domain and the
 * first of the requested realms.
 *
 * [inquire_key] raises [Not_found] when [realms] is empty. (This is
 * expressed as a pattern match instead of catching every exception
 * around [List.hd].)
 * [invalidate_key] does nothing, as the credentials are fixed.
 *)
class proxy_key_handler user password : key_handler =
object
  method inquire_key ~domain ~realms ~auth =
    match realms with
      | realm :: _ -> key ~domain ~realm ~user ~password
      | [] -> raise Not_found
  method invalidate_key _ = ()
  
end



(* An auth handler tries to set up an authentication session for a call
 * whose response demanded authentication.
 *
 * - [create_session] handles challenges of the origin server,
 * - [create_proxy_session] handles challenges of a proxy.
 *
 * Both return [None] when the handler cannot deal with the challenge
 * (the implementations in this file do so when the session constructor
 * raises [Not_applicable]).
 *)
class type auth_handler =
object
  method create_session : http_call -> http_options ref -> auth_session option
  method create_proxy_session : http_call -> http_options ref -> auth_session option
end


(* Raised by the constructors of the auth session classes when the
 * response contains no usable challenge, or when no matching key is
 * available. The auth handlers catch it and return [None].
 *)
exception Not_applicable

(* The request URI of [call] with the path replaced by "/" and
 * [~remove_particles:true] applied. It is used as default protection
 * domain of auth sessions.
 *)
let get_domain_uri (call : http_call) =
  let api = call#private_api in
  api#request_uri_with ~path:(Some "/") ~remove_particles:true ()


(* A session for "Basic" authentication.
 *
 * The constructor inspects the response of [init_call] (the
 * proxy-authenticate header if [for_proxy], else www-authenticate),
 * collects the realms of all "Basic" challenges, and asks [key_handler]
 * for a key. It raises [Not_applicable] if the header is missing or
 * malformed, if there is no "Basic" challenge with a realm, if no key
 * is found, or if the key does not fit the realms and the domain.
 * For proxies the domain is the empty list.
 *)
class basic_auth_session enable_auth_in_advance 
                         key_handler init_call for_proxy
                         : auth_session =
  let domain_uri =
    match for_proxy with
      | true -> []
      | false -> [ get_domain_uri init_call ] in
  let domain = List.map Neturl.string_of_url domain_uri in
  (* All challenges found in the relevant response header *)
  let challenges =
    try
      if for_proxy then
        Nethttp.Header.get_proxy_authenticate init_call#response_header
      else
        Nethttp.Header.get_www_authenticate init_call#response_header
    with
      | Not_found | Nethttp.Bad_header_field _ -> raise Not_applicable in
  let named expected actual = String.lowercase actual = expected in
  (* The realms of all "Basic" challenges, in header order. A challenge
     without realm parameter contributes nothing.
   *)
  let basic_realms =
    List.fold_right
      (fun (scheme, params) acc ->
         if named "basic" scheme then
           ( try
               snd (List.find (fun (pname,_) -> named "realm" pname) params)
               :: acc
             with Not_found -> acc
           )
         else
           acc
      )
      challenges
      [] in
  let () =
    if basic_realms = [] then raise Not_applicable in
  (* The selected key *)
  let key =
    try
      key_handler # inquire_key ~domain ~realms:basic_realms ~auth:"basic"
    with
        Not_found -> raise Not_applicable
  in
  (* The key handler is free to return anything, so check the key: *)
  let () =
    if not (List.mem key#realm basic_realms) then raise Not_applicable;
    if key#domain <> domain then raise Not_applicable
  in
object
  method auth_scheme = "basic"
  method auth_domain = domain_uri
  method auth_realm = key # realm
  method auth_user = key # user
  method auth_in_advance = enable_auth_in_advance

  (* Returns the header field carrying the credentials: the base64
     encoding of "user:password", prefixed with "Basic ".
   *)
  method authenticate call =
    let field_name =
      if for_proxy then "Proxy-Authorization" else "Authorization" in
    let basic_cookie =
      Netencoding.Base64.encode (key#user ^ ":" ^ key#password) in
    [ field_name, "Basic " ^ basic_cookie ]

  (* Reports the key as invalid to the key handler. The result [false]
     is returned unconditionally.
   *)
  method invalidate call =
    key_handler # invalidate_key key;
    false
end


(* Auth handler for "Basic" authentication. Keys are obtained from
 * [key_handler]. [enable_auth_in_advance] is passed on to the created
 * sessions. Both methods return [None] if no session can be created.
 *)
class basic_auth_handler ?(enable_auth_in_advance=false) 
                         (key_handler : #key_handler)
                         : auth_handler =
  (* Try to create a session for the origin server or the proxy *)
  let open_session for_proxy call =
    try
      Some(new basic_auth_session
             enable_auth_in_advance key_handler call for_proxy)
    with
        Not_applicable -> None
  in
object
  method create_session call options =
    open_session false call
  method create_proxy_session call options =
    open_session true call
end



(* Whether the word "auth" occurs among the words [v] is split into by
 * [split_words]. Used to check the "qop" parameter of Digest challenges.
 *)
let contains_auth v =
  List.exists (fun word -> word = "auth") (split_words v)


(* A session for "Digest" authentication (RFC 2617, with fallback to the
 * RFC 2069 mode when the server sends no "qop").
 *
 * The constructor inspects the response of [init_call] (the
 * proxy-authenticate header if [for_proxy], else www-authenticate),
 * takes the first acceptable "Digest" challenge, and asks [key_handler]
 * for a key. It raises [Not_applicable] if the header is missing or
 * malformed, if no challenge is acceptable, if no key is found, or if
 * the key does not fit realm and domain.
 *
 * Fixes relative to the previous revision:
 * - the "nc" value in the credentials is now formatted in hexadecimal,
 *   i.e. exactly as it enters the digest computation (it used to be
 *   decimal, so both values disagreed from the 10th request on);
 * - the nonce count starts at 1 for the first request, as demanded by
 *   RFC 2617, section 3.2.2;
 * - the "opaque" value of the challenge is returned to the server
 *   (it was never captured, so it was never sent);
 * - [invalidate] looks for the "stale" flag in the proxy-authenticate
 *   header when the session is a proxy session.
 *)
class digest_auth_session enable_auth_in_advance options
                          key_handler (init_call : http_call) for_proxy
                          : auth_session =
  let normalize_domain s =
    (* Resolves the (possibly relative) domain URI [s] against the
       request URI, and strips user, password, and fragment. Raises
       [Not_found] for malformed URIs.
     *)
    try
      let (nu1,_) =
        parse_url
          ~base_url:(init_call#private_api#request_uri_with())
          options s in
      Neturl.remove_from_url
        ~user:true ~user_param:true ~password:true ~fragment:true nu1
    with
      | Neturl.Malformed_URL -> raise Not_found

(*
    if s <> "" && s.[0] = '/' then
      init_call#private_api#request_uri_with
        ~path:None ~remove_particles:true ()
      ^ s
    else
      ( try
          let (is_https,_,_,host,port,path) = parse_http_url s in
          let scheme = if is_https then "https" else "http" in
          scheme ^ "://" ^ host ^ ":" ^ string_of_int port ^ path
        with
          | Not_found -> s
      )
 *)
  in
  let get_challenges header =
    (* The challenges of the header that is relevant for this kind of
       session. May raise [Not_found] or [Nethttp.Bad_header_field].
     *)
    if for_proxy then
      Nethttp.Header.get_proxy_authenticate header
    else
      Nethttp.Header.get_www_authenticate header
  in
  let digest_request =
    (* Return the "Digest" params in www-authenticate, or raise Not_applicable *)
    let auth_list = 
      try
        get_challenges init_call#response_header
      with
        | Not_found -> raise Not_applicable
        | Nethttp.Bad_header_field _ -> raise Not_applicable in
    let digest_auth_list =
      List.filter 
        (fun (scheme,params) -> 
           String.lowercase scheme = "digest"
            && List.mem_assoc "realm" params
            && (try List.mem (List.assoc "algorithm" params) ["MD5";"MD5-sess"]
                with Not_found -> true)
            && (try contains_auth (List.assoc "qop" params)
                with Not_found -> true)
            && List.mem_assoc "nonce" params
        ) 
        auth_list in
    match  digest_auth_list with
      | [] ->
          raise Not_applicable
      | (_,params) :: _ ->
          (* Restriction: only the first request can be processed *)
          params
  in
  let domain_url =
    if for_proxy then [] else
      try 
        List.map
          normalize_domain
          (split_words (List.assoc "domain" digest_request))
      with
          Not_found -> [ get_domain_uri init_call ] in
  let domain =
    List.map Neturl.string_of_url domain_url in
  let realm =
    try List.assoc "realm" digest_request
    with Not_found -> assert false in
  let key =
    (* Return the selected key, or raise Not_applicable *)
    try
      key_handler # inquire_key ~domain ~realms:[realm] ~auth:"digest"
    with
        Not_found -> raise Not_applicable
  in
  (* Check the key: *)
  let () =
    if key#realm <> realm then raise Not_applicable;
    if key#domain <> domain then raise Not_applicable;
  in
  let algorithm =
    try List.assoc "algorithm" digest_request
    with Not_found -> "MD5" in
  let qop =
    if List.mem_assoc "qop" digest_request then "auth" else "" in
    (* "" = RFC 2069 mode *)
  let nonce =
    try List.assoc "nonce" digest_request
    with Not_found -> assert false in
  let opaque0 =
    (* The opaque value of the challenge must be passed back unchanged *)
    try Some (List.assoc "opaque" digest_request)
    with Not_found -> None in
  let cnonce_init0 = 
    (* Best effort: fall back to the clock if no random data is available *)
    try 
      let s = String.make 8 'X' in
      let () = Netsys_rng.fill_random s in
      s
    with _ -> string_of_float (Unix.gettimeofday()) in
object(self)
  val mutable cnonce_init = cnonce_init0
  val mutable cnonce_incr = 0
  val mutable nc = 1
    (* The nonce count of the next request. The first request sent for
       a nonce has the count 1.
     *)
  val mutable opaque = opaque0
  val mutable a1 = None

  method private first_cnonce =
    Digest.to_hex
      (Digest.string (cnonce_init ^ ":0"))

  method private next_cnonce() =
    (* The first call returns the same value as [first_cnonce] *)
    let cnonce =
      Digest.to_hex
        (Digest.string (cnonce_init ^ ":" ^ string_of_int cnonce_incr)) in
    cnonce_incr <- cnonce_incr + 1;
    cnonce

  method private next_nc() =
    let r = nc in
    nc <- nc + 1;
    r

  method private fn_h data =
    encode_hex (Digest.string data)

  method private fn_kd secret data =
    encode_hex (Digest.string (secret ^ ":" ^ data))

  method private a1 =
    (* The A1 string. It is computed once and then cached. *)
    match a1 with
      | Some v -> v
      | None ->
          let v =
            match algorithm with
              | "MD5" ->
                  key#user ^ ":" ^ realm ^ ":" ^ key#password
              | "MD5-sess" ->
                  (self # fn_h
                     (key#user ^ ":" ^ realm ^ ":" ^ key#password)) ^ 
                  ":" ^ nonce ^ ":" ^ self#first_cnonce
              | _ ->
                  assert false
          in
          a1 <- Some v;
          v

  method private a2 call =
    let meth = call # request_method in
    let uri = call # effective_request_uri in
    meth ^ ":" ^ uri

  method authenticate call =
    (* Returns the header field with the Digest credentials for [call].
       Every invocation consumes a cnonce and a nonce count.
     *)
    let cnonce = self#next_cnonce() in
    (* The nonce count is formatted once, so that the digest and the
       "nc" parameter cannot disagree (8 hex digits)
     *)
    let nc_value = Printf.sprintf "%08x" (self # next_nc()) in
    let digest =
      match qop with
        | "auth" ->
            self#fn_kd
              (self#fn_h self#a1)
              (nonce ^ ":" ^ nc_value ^ ":" ^ cnonce ^ ":" ^ 
                 "auth:" ^ (self#fn_h (self#a2 call)))
        | "" ->
            self#fn_kd
              (self#fn_h self#a1)
              (nonce ^ ":" ^ (self#fn_h (self#a2 call))) 
        | _ ->
            assert false  (* such digests are not accepted *)
    in
    let creds =
      Printf.sprintf
        "Digest username=\"%s\",realm=\"%s\",nonce=\"%s\",uri=\"%s\",response=\"%s\",algorithm=%s,cnonce=\"%s\",%s%snc=%s"
        key#user
        realm
        nonce
        call#effective_request_uri
        digest
        algorithm
        cnonce
        (match opaque with
           | None -> ""
           | Some s -> "opaque=\"" ^ s ^ "\",")
        (match qop with
           | "" -> ""
           | "auth" -> "qop=auth,"
           | _ -> assert false)
        nc_value in
    let field_name =
      if for_proxy then "Proxy-Authorization" else "Authorization" in
    [ field_name, creds ]

  method auth_scheme = "digest"
  method auth_domain = domain_url
  method auth_realm = key # realm
  method auth_user = key # user
  method auth_in_advance = enable_auth_in_advance

  method invalidate call =
    (* Returns [true] if the response of [call] only says that our nonce
       is stale. Otherwise the key is reported as invalid to the key
       handler, and [false] is returned.
     *)
    (* Check if the [stale] flag is set for our nonce: *)
    let is_stale =
      try
        let auth_list = get_challenges call#response_header in
        List.exists
          (fun (scheme,params) -> 
             String.lowercase scheme = "digest"
              && (try List.assoc "realm" params = realm
                  with Not_found -> false) 
              && (try List.assoc "nonce" params = nonce
                  with Not_found -> false)
              && (try String.lowercase (List.assoc "stale" params) = "true"
                  with Not_found -> false)
          ) 
          auth_list
      with
        | Not_found -> false  (* No authenticate header *)
        | Nethttp.Bad_header_field _ -> false in
    is_stale || (
      key_handler # invalidate_key key;
      false
    )
end


(* Auth handler for "Digest" authentication. Keys are obtained from
 * [key_handler]. [enable_auth_in_advance] is passed on to the created
 * sessions. Both methods return [None] if no session can be created.
 *)
class digest_auth_handler ?(enable_auth_in_advance=false) 
                         (key_handler : #key_handler)
                         : auth_handler =
  (* Try to create a session for the origin server or the proxy *)
  let open_session for_proxy call options =
    try
      Some(new digest_auth_session
             enable_auth_in_advance options key_handler call for_proxy)
    with
        Not_applicable -> None
  in
object
  method create_session call options =
    open_session false call options
  method create_proxy_session call options =
    open_session true call options
end


(* Auth handler that prefers "Digest" and falls back to "Basic" when the
 * Digest session is not applicable. Authentication in advance is
 * disabled for both kinds of sessions. [None] is returned if neither
 * scheme is applicable.
 *)
class unified_auth_handler (key_handler : #key_handler) : auth_handler =
  let digest_then_basic for_proxy call options =
    try
      Some(new digest_auth_session false options key_handler call for_proxy)
    with Not_applicable ->
      ( try
          Some(new basic_auth_session false key_handler call for_proxy)
        with Not_applicable ->
          None
      )
  in
object
  method create_session call options =
    digest_then_basic false call options
  method create_proxy_session call options =
    digest_then_basic true call options
end



let norm_neturl neturl =
  (* Returns the neturl as normalized string (esp. normalized % sequences).
   * The URL is rebuilt from its decoded components: scheme, host, port,
   * path, and, if present, query and fragment. Other components are
   * dropped.
   *
   * The URL must provide a port (checked by assertion). The port is part
   * of the result: the strings returned here are the keys of the session
   * table of [auth_cache], and without the port the sessions of servers
   * running on different ports of the same host would share a key.
   *)
  assert(Neturl.url_provides ~port:true neturl);
  let neturl' =
    Neturl.make_url 
      ~encoded:false
      ~scheme:(Neturl.url_scheme neturl)
      ~host:(Neturl.url_host neturl)
      ~port:(Neturl.url_port neturl)
      ~path:(try Neturl.url_path neturl with Not_found -> [])
      ?query:(try Some(Neturl.url_query neturl) with Not_found -> None)
      ?fragment:(try Some(Neturl.url_fragment neturl) with Not_found -> None)
      (Neturl.url_syntax_of_url neturl) in
  Neturl.string_of_url neturl'


let prefixes_of_neturl s_url =
  (* Returns a list of all legal prefixes of the absolute URI s.
   * The prefixes are in Neturl format.
   *
   * The result starts with [s_url] itself, followed by [s_url] without
   * fragment, followed by one URL (without query and fragment) for every
   * prefix of the path, from the longest to the shortest.
   *
   * Raises whatever [Neturl.url_path] raises if the URL has no path.
   *)
  let rec rev_path_prefixes rev_path =
    (* Works on the reversed list of path components, so that the last
     * component is the head of the list. The order of the cases matters.
     * Presumably Neturl represents "/" as [""] and a trailing slash as a
     * trailing "" component - TODO confirm against the Neturl docs.
     *)
    match rev_path with
      | [] -> []
      | [ "" ] -> [ rev_path; [] ]
          (* Emit the path itself and finally the empty path *)
      | [ ""; "" ] -> assert false
          (* This path is considered impossible *)
      | [ _; "" ] -> rev_path :: rev_path_prefixes [ "" ]
          (* One component after the leading "": continue with [""] *)
      | "" :: rev_path' ->
	  (* The last component is empty: continue without it, but never
	   * continue with [""; ""] (see above)
	   *)
	  if rev_path' = [ ""; "" ] then
	    rev_path :: rev_path_prefixes [ "" ]
	  else
	    rev_path :: rev_path_prefixes rev_path'
      | _ :: rev_path' ->
	  (* Replace the last component by "" *)
	  rev_path :: (rev_path_prefixes ("" :: rev_path'))
  in
  let path_prefixes path =
    (* Same for paths in normal order *)
    List.map List.rev (rev_path_prefixes (List.rev path)) in
  let s_nofrag_url = Neturl.remove_from_url ~fragment:true s_url in
  let s_noquery_url = Neturl.remove_from_url ~query:true s_nofrag_url in
  let path = Neturl.url_path s_noquery_url in
  s_url :: s_nofrag_url ::
    (List.map
       (fun prefix -> Neturl.modify_url ~path:prefix s_noquery_url
       )
       (path_prefixes path))



(* The auth cache knows the registered auth handlers, and remembers the
 * sessions that may be used for authentication in advance.
 *)
class auth_cache =
object
  val mutable auth_handlers = []
  val sessions = Hashtbl.create 10
    (* Only sessions that can be used for authentication in advance. 
     * The hash table maps domain URIs to sessions.
     *)

  (* Appends [h] to the list of handlers; handlers are tried in the
   * order in which they were added.
   *)
  method add_auth_handler (h : auth_handler) =
    auth_handlers <- auth_handlers @ [h]

  (* Create a new session after a 401 reply. The result is the session
   * of the first handler that is able to create one, or [None]. The
   * handlers after the successful one are not consulted.
   *)
  method create_session (call : http_call) options =
    List.fold_left
      (fun found h ->
         match found with
           | Some _ -> found
           | None -> h # create_session call options
      )
      None
      auth_handlers

  (* Called by [postprocess_complete_message] when authentication was
   * successful. If enabled, [sess] can be used for authentication
   * in advance: it is entered for all its domain URIs. Malformed
   * domain URIs are skipped.
   *)
  method tell_successful_session (sess : auth_session) =
    let remember dom_uri =
      try Hashtbl.replace sessions (norm_neturl dom_uri) sess
      with Neturl.Malformed_URL -> ()
    in
    if sess # auth_in_advance then
      List.iter remember sess#auth_domain

  (* Called by [postprocess_complete_message] when authentication 
   * failed: the entries for all domain URIs of [sess] are removed.
   * Malformed domain URIs are skipped.
   *)
  method tell_failed_session (sess : auth_session) =
    let forget dom_uri =
      try Hashtbl.remove sessions (norm_neturl dom_uri)
      with Neturl.Malformed_URL -> ()
    in
    List.iter forget sess#auth_domain

  (* Find a session suitable for authentication in advance. We are not
   * only looking for the request URI, but also for all prefixes of it;
   * the first prefix with an entry wins. Raises [Not_found] if there
   * is no such session, or if a URI is malformed.
   *)
  method find_session_in_advance (call : http_call) =
    let uri = call # private_api # request_uri_with() in
    let rec first_hit candidates =
      match candidates with
        | [] -> raise Not_found
        | prefix :: remaining ->
            let dom_key = norm_neturl prefix in
            if Hashtbl.mem sessions dom_key then
              Hashtbl.find sessions dom_key
            else
              first_hit remaining
    in
    try
      first_hit (prefixes_of_neturl uri)
    with
      | Neturl.Malformed_URL ->
          raise Not_found
end


(* Backwards compatibility: *)

(* A simple key handler that knows at most one (user, password) pair per
 * realm, independent of the domain.
 *)
class key_backing_store =
object
  val db = (Hashtbl.create 10 : (string, (string*string)) Hashtbl.t)

  (* Defines (or redefines) the credentials of [realm] *)
  method set_realm realm user password =
    Hashtbl.replace db realm (user,password)

  (* Returns a key for the first of [realms] that has credentials. The
   * key takes over the requested [domain]. Raises [Not_found] if none
   * of the realms is known.
   *)
  method inquire_key ~domain ~realms ~(auth:string) =
    let rec lookup candidates =
      match candidates with
        | [] -> raise Not_found
        | r :: remaining ->
            if Hashtbl.mem db r then (r, Hashtbl.find db r)
            else lookup remaining
    in
    let (realm, (user, password)) = lookup realms in
    object
      method user = user
      method password = password
      method realm = realm
      method domain = (domain : string list)
    end

  (* Keys are never dropped from this store *)
  method invalidate_key (_ : key) = ()
end


(* Backwards-compatible wrapper around an auth handler. Every object has
 * its own backing store of credentials, which is the uplink of its own
 * key ring; the auth handler is created by applying [mk_auth_handler]
 * to this key ring.
 *)
class auth_method name (mk_auth_handler : key_ring -> auth_handler) =
  let store = new key_backing_store in
  let ring = new key_ring ~uplink:store () in
  let handler = mk_auth_handler ring in
object
  (* The name passed to the constructor *)
  method name = (name : string)

  (* The wrapped auth handler *)
  method as_auth_handler = handler

  (* Enters the credentials for [realm] into the backing store *)
  method set_realm realm user password =
    store # set_realm realm user password
end


(* The auth method named "basic": a [basic_auth_handler] with
 * authentication in advance enabled.
 *)
class basic_auth_method =
  auth_method "basic"
    (fun ring -> new basic_auth_handler ~enable_auth_in_advance:true ring)

(* The auth method named "digest": a [digest_auth_handler] with
 * authentication in advance enabled.
 *)
class digest_auth_method =
  auth_method "digest"
    (fun ring -> new digest_auth_handler ~enable_auth_in_advance:true ring)


(**********************************************************************)
(***                 THE CONNECTION CACHE                           ***)
(**********************************************************************)

(* Re-export of the connection cache type of [Http_client_conncache] *)
type connection_cache = Http_client_conncache.connection_cache

(* Invokes the [close_all] method of the cache *)
let close_connection_cache conn_cache = conn_cache#close_all ()

(* A new, empty cache object of class [restrictive_cache] *)
let create_restrictive_cache () = new restrictive_cache ()

(* A new, empty cache object of class [aggressive_cache] *)
let create_aggressive_cache () = new aggressive_cache ()


(**********************************************************************)
(***                       THE I/O BUFFER                           ***)
(**********************************************************************)

(* io_buffer performs the socket I/O.
 *
 * TODO: COMMENT OUT OF DATE
 * The input side is a queue of octets which can be filled by
 * a Unix.read call at its end, and from which octets can be removed
 * at its beginning ("consuming octets").
 * There is also functionality to remove 1XX responses at the beginning
 * of the buffer, and to interpret the beginning of the buffer as HTTP
 * status line.
 * The idea of the buffer is that octets can be added at the end of the
 * buffer while the beginning of the buffer is interpreted as the beginning
 * of the next message. Once enough octets have been added that the message
 * is complete, it is removed (consumed) from the buffer, and the possibly
 * remaining octets are the beginning of the following message.
 *)

exception Garbage_received of string
  (* This exception is raised while a response is read when a protocol
   * error occurs in the status line or in the response header (e.g.
   * overlong line, bad status line or code, unexpected data, EOF or
   * syntax error in the header). The argument is the error message.
   * Such errors are not transferred to the http_call.
   *)


(* Matches a response status line. The groups are:
 * 1: the protocol token (everything up to the first space or tab),
 * 2: the three-digit status code,
 * 4: the optional text following the code (group 3 is this text
 *    together with its leading whitespace).
 * A CR at the end of the line is tolerated.
 *)
let status_re = 
  Netstring_str.regexp "^\\([^ \t]+\\)\
                        [ \t]+\
                        \\([0-9][0-9][0-9]\\)\
                        \\([ \t]+\\([^\r\n]*\\)\\)?\
                        \r?$" ;;

(* Matches the size line of a chunk of a chunked body. The groups are:
 * 1: the chunk length as hexadecimal number,
 * 2: an optional extension, starting with a semicolon.
 * Whitespace around the length and a CR at the end are tolerated.
 *)
let chunk_re = 
  Netstring_str.regexp "[ \t]*\
                        \\([0-9a-fA-F]+\\)\
                        [ \t]*\
                        \\(;[^\r\n\000]*\\)?\
                        \r?" ;;

(* The state of the socket as seen by the I/O buffer. [Down] is set
 * when the socket is closed or declared as down; reading requires
 * [Up_r] or [Up_rw].
 * NOTE(review): presumably [Up_rw] means "readable and writable" and
 * [Up_r] "readable only" - confirm against the transmitter code.
 *)
type sockstate =
    Down
  | Up_rw 
  | Up_r
;;


(* The units that can be handed to the [add] method of the I/O buffer
 * for sending.
 *)
type send_token =
  | Send_header of int * string * string * Netmime.mime_header_ro
      (* call_id, method, url, header *)
  | Send_body of Uq_io.in_device * int64 option
      (* data, length_opt *)
  | Send_body_chunked of Uq_io.in_device
      (* data; presumably sent with chunked encoding - confirm *)
  | Send_eof


(* The maximum length passed as [~max_len] when the reader inputs a
 * single line (status line, header line, chunk size line, trailer
 * line).
 *)
let max_line_len = 65536

(* Engine reading one line from [dev]. EOF is reported as [None] (via
 * [Uq_io.eof_as_none]). The error [Uq_io.Line_too_long] is turned into
 * [Garbage_received]; all other final states are passed through.
 *)
let input_line_opt_e ?max_len dev =
  let reject_long_line = function
    | `Error Uq_io.Line_too_long ->
        raise(Garbage_received "Line too long")
    | other_state -> other_state
  in
  Uq_io.input_line_e ?max_len dev
  >> Uq_io.eof_as_none
  >> reject_long_line

(* Engine variant of [Uq_io.input_e] for the same arguments, where EOF
 * is reported as [None] (via [Uq_io.eof_as_none]).
 *)
let input_opt_e dev s p n =
  let read_e = Uq_io.input_e dev s p n in
  read_e >> Uq_io.eof_as_none


(* The interface of the object doing the socket I/O for one connection *)
class type io_buffer =
object
  (* The current state of the socket (our view) *)
  method socket_state : sockstate
  (* The file descriptor; fails if the socket is down *)
  method socket : Unix.file_descr
  (* The descriptor as string for log messages, or "n/a" *)
  method socket_str : string
  (* Shuts the socket down unless it is already down, and calls
     [followup] afterwards
   *)
  method close : followup:(unit->unit) -> unit -> unit
  (* Only records that the socket is down; nothing is closed *)
  method down : unit -> unit
  (* Whether at least one status line (incl. status 1XX) has been seen *)
  method status_seen : bool
  (* Sets the function that is called when a status line has been read
     to get the corresponding call. It may raise [Not_found].
   *)
  method configure_read : fetch_call:(unit -> http_call) -> unit -> unit
  (* This engine returns the next call read from the input, or returns
     None for EOF
   *)
  method read_e : Unixqueue.event_system -> http_call option Uq_engines.engine
  (* NOTE(review): the following three methods form the output side;
     presumably [add] queues a token and [write_e] sends the queued
     tokens - confirm against the implementation.
   *)
  method write_activity : bool
  method add : send_token -> unit
  method write_e : Unixqueue.event_system -> unit Uq_engines.engine
end


let io_buffer options fd mplex fd_state : io_buffer =
  let dev = `Multiplex mplex in
  let buf_in_dev = `Buffer_in(Uq_io.create_in_buffer dev) in

  ( object (self)

    (****************************** SOCKET ********************************)
      
      val mutable socket_state = fd_state
      val mutable status_seen = false

      method socket_state = socket_state

      method socket = 
	match socket_state with
	  | Down -> failwith "Socket is down"
	  | _ -> fd
	      
      method socket_str =
	try
	  Int64.to_string (Netsys.int64_of_file_descr self # socket)
	with _ -> "n/a"
	  

      method close ~followup () =
	match socket_state with
	  | Down  -> followup ()
	  | _     -> 
	      let fd_str = self # socket_str in
	      if !options.verbose_connection then 
		dlogr (fun () ->
			 sprintf "FD %s - HTTP connection: Closing socket!"
			   fd_str);
	      mplex # cancel_reading();
	      mplex # cancel_writing();
	      mplex # start_shutting_down
		~when_done:(fun x_opt ->
			      Netlog.Debug.release_fd fd;
			      mplex # inactivate();
			      match x_opt with
				| None -> 
				    followup ()
				| Some err ->
				    if !options.verbose_connection then
				      dlog(sprintf
					     "FD %s - Shutdown error: %s"
					     fd_str (Netexn.to_string err));
				    followup()
			   )
		();
	      socket_state <- Down

      method down() =
	socket_state <- Down  (* our view *)

    (****************************** INPUT ********************************)

    val mutable cfg_fetch_call = (fun () -> raise Not_found)


    method status_seen =
      (* Whether at least one status line (incl. status 1XX) has been seen *)
      status_seen

    method configure_read ~fetch_call () =
      (* fetch_call: This is called when the status line is read to get
	 the corresponding call. This function may raise [Not_found] in
	 which case the status line is an error
       *)
      cfg_fetch_call <- fetch_call


    method read_e esys =
      (* This engine returns the next call read from the input, or
	 returns None for EOF.
       *)
      assert (socket_state = Up_r || socket_state = Up_rw);

      let cur_call = ref None in
      (* remember the call - only for postprocessing on error *)
      
      let rec read_status_line_e call_opt =
	input_line_opt_e ~max_len:max_line_len buf_in_dev
	++ (fun line_opt ->
	      match line_opt with
		| None ->
		    eps_e (`Done None) esys
		| Some line ->
		    ( match Netstring_str.string_match status_re line 0 with
			| None ->
			    raise (Garbage_received "Bad status line")
			| Some m ->
			    let proto = Netstring_str.matched_group m 1 line in
			    let code_str = Netstring_str.matched_group m 2 line in
			    let code = int_of_string code_str in
			    let text =
			      try Netstring_str.matched_group m 4 line
			      with Not_found -> "" in
			    if code < 100 || code > 599 then 
			      raise (Garbage_received "Bad status code");
			    status_seen <- true (* code >= 200 *);
			    let call_opt = 
			      if call_opt = None then (
				let call_opt' = 
				  try Some(cfg_fetch_call())
				  with Not_found -> None in
				cur_call := call_opt';
				call_opt'
			      )
			      else
				call_opt in
			    ( match call_opt with
				| None ->
				    raise(Garbage_received "Spontaneous data")
				| Some call ->
(* eprintf "code=%d text=%s proto=%s\n%!" code text proto; *)
				    if code >= 200 then
				      call # private_api # set_response_status
					code text proto;
				    let hdr_buf = Buffer.create 500 in
				    read_header_e code call hdr_buf
			    )
		    )
	   )

      and read_header_e code call hdr_buf =
	input_line_opt_e ~max_len:max_line_len buf_in_dev
	++ (fun line_opt ->
	      match line_opt with
		| None ->
		    let msg = "EOF where response header expected" in
		    raise (Garbage_received msg)
		| Some line ->
		    Buffer.add_string hdr_buf line;
		    Buffer.add_string hdr_buf "\n";
		    if line = "" || line = "\r" then (
		      parse_header_e code call (Buffer.contents hdr_buf)
		    )
		    else (
		      if Buffer.length hdr_buf > 100000 then (
			let msg ="Response header too long" in
			raise (Garbage_received msg)
		      );
		      read_header_e code call hdr_buf
		    )
	   )
	
      and parse_header_e code call hdr_str =
	let header_l, real_end_pos =
	  try
	    Mimestring.scan_header
	      ~downcase:false ~unfold:true ~strip:true hdr_str
	      ~start_pos:0 ~end_pos:(String.length hdr_str)
	  with
	    | Failure _ ->
		let msg = "Bad response header" in
		raise (Garbage_received msg)
	in
	assert(real_end_pos = String.length hdr_str);
	let header = new Netmime.basic_mime_header header_l in
	call # private_api # set_continue (code < 200);
	(* Calling set_continue may trigger that the send loop
	   in [transmitter] resumes its send task. We need to do it
	   whatever code we see here - not only for code=100.
	 *)
	if code < 200 then (
	  if !options.verbose_events then
	    dlogr (fun () ->
		     sprintf "FD %s - HTTP events: Got 100 Continue line"
		       self#socket_str
		  );
	  read_status_line_e None
	)
	else (
	  call # private_api # set_response_header header;
	  read_body_e code header call
	)

      and read_body_e code header call =
	(* First determine whether a body is expected: 
	   - Normally, the call has either always a body (like GET), or
	     it does not (like HEAD). Exceptions are only the codes 204
	     and 304
	   - The CONNECT method is broken in this respect. If a 200
	     response is emitted, the response body is missing. Any
	     error response includes a body, though.
	 *)
	let have_body =
	  call # has_resp_body && code <> 204 && code <> 304 &&
	    (call # request_method <> "CONNECT" || code >= 300) in
	if have_body then (
	  let out_dev = call # private_api # response_body_open_wr esys in
	  (* Check if we have chunked encoding: *)
	  let is_chunked =
	    try header # field "Transfer-encoding" <> "identity"
	    with Not_found -> false in
	  if is_chunked then
	    read_chunked_body_e code header out_dev call
	  else (
	    let length_opt =
	      try 
		let l = Int64.of_string (header # field "Content-Length") in
		if l < 0L then raise (Bad_message "Bad Content-Length field");
		Some l
	      with
		| Failure _ -> raise (Bad_message "Bad Content-Length field")
		| Not_found -> None in
	    read_plain_body_e code header out_dev length_opt call
	  )
	) else
	  read_end_e call
	
      and read_plain_body_e code header out_dev length_opt call =
	(* Parses a non-chunked HTTP body. If length_opt=None, the message
	 * is terminated by EOF. If length_opt=Some len, the message has
	 * this length.
	 *)
	( Uq_io.copy_e
	    ?len64:length_opt
	    buf_in_dev
	    out_dev
	  >> (function
		| `Done n ->
		    ( match length_opt with
			| None -> ()
			| Some l ->
			    if n <> l then
			      raise(Bad_message "EOF in response message")
		    );
		    `Done n
		| st -> st
	     )   (* out_dev is closed in read_end_e *)
	)
	++ (fun n -> read_end_e call)

      and read_chunked_body_e code header out_dev call =
	(* Parses a chunked HTTP body *)
	input_line_opt_e ~max_len:max_line_len buf_in_dev
	++ (function
	      | Some line ->
		  ( match Netstring_str.string_match chunk_re line 0 with
		      | None ->
			  raise
			    (Bad_message "Cannot parse chunk of response body");
		      | Some m ->
			  let hex_len = Netstring_str.matched_group m 1 line in
			  let len =
			    try Int64.of_string ("0x" ^ hex_len)
			    with Failure _ -> 
			      raise (Bad_message "Chunk too large") in
			  if len = 0L then
			    let trl_buf = Buffer.create 100 in
			    read_trailer_e code header out_dev call trl_buf
			  else
			    read_chunk_data_e code header out_dev len call
		  )
	      | None ->
		  raise (Bad_message "EOF where next response chunk expected")
	   )

      and read_chunk_data_e code header out_dev len call =
        (* Copies the [len] data bytes of the current chunk from
         * [buf_in_dev] to [out_dev], and goes on with the line end that
         * follows the chunk data. Fewer than [len] copied bytes are
         * reported as [Bad_message].
         *)
        let chunk_e = Uq_io.copy_e ~len64:len buf_in_dev out_dev in
        chunk_e
        ++ (fun copied ->
              if copied = len then
                read_chunk_end_e code header out_dev call
              else
                raise (Bad_message "EOF in response message")
           )

      and read_chunk_end_e code header out_dev call =
        (* Expects the empty line that terminates the data of a chunk
         * (the line may still carry a CR), and then continues with the
         * next chunk size line.
         *)
        input_line_opt_e ~max_len:max_line_len buf_in_dev
        ++ (fun line_opt ->
              match line_opt with
                | Some ("" | "\r") ->
                    read_chunked_body_e code header out_dev call
                | Some _ ->
                    raise (Bad_message "CR/LF after response chunk is missing")
                | None ->
                    raise (Bad_message "EOF where next response chunk expected")
           )

      and read_trailer_e code header out_dev call trl_buf =
        (* Collects the lines of the trailer that follows the last chunk in
         * [trl_buf] (each line terminated by LF). The empty line ends the
         * trailer, which is then passed to [parse_trailer_e]. A trailer
         * that grows beyond 10000 bytes is rejected.
         *)
        let handle_line line =
          Buffer.add_string trl_buf line;
          Buffer.add_char trl_buf '\n';
          match line with
            | "" | "\r" ->
                parse_trailer_e
                  code header out_dev call (Buffer.contents trl_buf)
            | _ when Buffer.length trl_buf > 10000 ->
                raise (Bad_message "Response trailer too long")
            | _ ->
                read_trailer_e code header out_dev call trl_buf in
        input_line_opt_e ~max_len:max_line_len buf_in_dev
        ++ (function
              | Some line ->
                  handle_line line
              | None ->
                  raise (Bad_message "EOF where response trailer expected")
           )
	
      and parse_trailer_e code header out_dev call trl_str =
        (* Parses the collected trailer text [trl_str] as a header block,
         * appends the trailer fields to the fields of [header], and stores
         * the result as the response header of [call]. A trailer that
         * cannot be scanned is reported as [Bad_message].
         * [out_dev] is not closed here; this is left to [read_end_e].
         *)
        let trl_len = String.length trl_str in
        let (trailer_fields, scan_end) =
          try
            Mimestring.scan_header
              ~downcase:false ~unfold:true ~strip:true trl_str
              ~start_pos:0 ~end_pos:trl_len
          with
            | Failure _ ->
                raise (Bad_message "Bad response trailer")
        in
        assert(scan_end = trl_len);
        let merged_header =
          new Netmime.basic_mime_header (header#fields @ trailer_fields) in
        call # private_api # set_response_header merged_header;
        read_end_e call
	  
      and read_end_e call =
        (* Finishes the response of [call] (this closes the output device
         * of the body), and returns [call] as the result of reading.
         *)
        let finish_e = call # private_api # finish_response_e esys in
        finish_e ++ (fun () -> eps_e (`Done (Some call)) esys)
      in

      read_status_line_e None
      >> (fun st ->
	    let propagate_garbage = !cur_call <> None in
	    let maybe_cleanup() =
	      match !cur_call with
		| None -> ()
		| Some call -> 
		    call # private_api # cleanup();
		    cur_call := None in
	    match st with
	      | `Error (Garbage_received msg) when propagate_garbage ->
		  maybe_cleanup();
		  `Error (Bad_message msg)
		    (* If we can associate a Garbage_received error with
		       a certain call change the exception to Bad_message.
		       The background is that Garbage_received is not
		       attributed to a specific message by the higher layers.
		     *)
	      | `Error e ->
		  maybe_cleanup();
		  `Error e
	      | _ ->
		  st
	 )

    (****************************** OUTPUT ********************************)

    (* Queue of the tokens (Send_header, Send_body, ...) that have been
       submitted with [add] and are not yet taken by [write_e] *)
    val send_queue = Q.create()
    (* Staging buffer used by [write_e] for the chunks of a chunked
       request body *)
    val send_buf = String.create 4096

    (* [true] while an engine started by [write_e] is running *)
    val mutable sending = false


    method write_activity =
      (* Whether output is in progress or still pending in the queue *)
      if sending then true else not (Q.is_empty send_queue)


    method add token =
      (* Appends [token] to the queue of things to send. Nothing is written
         before [write_e] is run.
       *)
      Q.add token send_queue


    method write_e esys =
      (* Writes all contents of send_queue *)
      (* It is the task of the caller to close the request body, by
	 calling finish_request_e at the right moment
       *)
      (* The resulting engine is `Done() when the queue is empty. While it
	 is running, [sending] is true. If the socket is no longer in state
	 Up_rw, nothing is written at all, and the queue is left untouched.
       *)
      
      (* Takes the tokens one after the other from the queue, and writes
	 them until the queue is empty
       *)
      let rec write_next_e() =
	if Q.is_empty send_queue then
	  eps_e (`Done()) esys
	else (
	  let token = Q.take send_queue in
	  ( match token with
	      | Send_header (call_id, meth, url, hdr) ->
		  write_header_e call_id meth url hdr
	      | Send_body (dev,length_opt) ->
		  write_body_e dev length_opt
	      | Send_body_chunked dev ->
		  write_body_chunked_e dev
	      | Send_eof ->
		  write_eof_e ()
	  ) 
	  ++ write_next_e
	)

      (* Formats the request line (always with protocol HTTP/1.1) and the
	 header fields into a buffer, and outputs the buffer
       *)
      and write_header_e call_id meth url hdr =
	let buf = B.create 1000 in
	B.add_string buf meth;
	B.add_string buf " ";
	B.add_string buf url;
	B.add_string buf " HTTP/1.1";
	
	(* At this point buf only contains the request line w/o line end *)
	if !options.verbose_status then
	  dlogr
	    (fun () ->
	       sprintf "FD %s - Call %d - HTTP request: %s"
		 self#socket_str call_id (B.contents buf));
	
	B.add_string buf "\r\n";
	let ch = new Netchannels.output_netbuffer buf in
	Mimestring.write_header ch hdr#fields;
	ch # close_out();
	      
	if !options.verbose_request_header then
	  dump_header "HTTP request " hdr#fields;
	      
	Uq_io.output_netbuffer_e dev buf

      (* Copies the request body from [d] to the connection. If a length
	 is expected, at most this number of bytes is copied, and it is
	 an error (Failure) when the number of copied bytes differs.
       *)
      and write_body_e d expected_length_opt =
	if !options.verbose_request_contents then
	  dlogr
	    (fun () ->
	       sprintf "FD %Ld - HTTP request body fragment %s"
		 (Netsys.int64_of_file_descr fd)
		 (match expected_length_opt with
		    | None -> "(unknown length)"
		    | Some n -> sprintf "(%Ld bytes)" n
		 )
	    );

	(* N.B. We do not close [d] here - task of caller *)
	Uq_io.copy_e ?len64:expected_length_opt d dev
	>> (function
	      | `Done n ->
		  ( match expected_length_opt with
		      | None -> ()
		      | Some exp_n ->
			  if n <> exp_n then
			    failwith "Http_client: announced length of \
                                      request body does not match actual length"
		  );
		  `Done ()
	      | `Error e -> `Error e
	      | `Aborted -> `Aborted
	   )
	  
      (* Writes the request body from [d] in chunked encoding *)
      and write_body_chunked_e d =
	write_body_next_chunk_e d ()

      (* Reads up to one [send_buf] of data from [d], and writes it as one
	 chunk (size in hex, CRLF, data, CRLF). When [input_opt_e] returns
	 None (presumably at the end of [d] - input_opt_e is defined
	 outside this method), the terminating zero-sized chunk
	 with an empty trailer is written instead.
       *)
      and write_body_next_chunk_e d () =
	input_opt_e d (`String send_buf) 0 (String.length send_buf)
	++ (function
	      | Some n ->
		  if !options.verbose_request_contents then
		    dlogr
		      (fun () ->
			 sprintf "FD %Ld - HTTP request body chunk (%d bytes)"
			   (Netsys.int64_of_file_descr fd) n
		      );
		  (* NOTE(review): n=0 would be written as the terminating
		     chunk. This assumes that input_opt_e never returns
		     Some 0 for a non-empty buffer - confirm.
		   *)
		  let s = sprintf "%x\r\n" n in
		  Uq_io.output_string_e dev s
		  ++ (fun () -> 
			Uq_io.really_output_e dev (`String send_buf) 0 n)
		  ++ (fun () -> Uq_io.output_string_e dev "\r\n")
		  ++ write_body_next_chunk_e d
	      | None ->
		  if !options.verbose_request_contents then
		    dlogr
		      (fun () ->
			 sprintf "FD %Ld - HTTP request body chunk (last)"
			   (Netsys.int64_of_file_descr fd)
		      );
		  let s = "0\r\n\r\n" in
		  Uq_io.output_string_e dev s
		    (* N.B. We do not close [d] here - task of caller *)
	   )

      (* Closes the write side of the connection, and records this in
	 [socket_state]. Fails if the device cannot close the write side
	 only.
       *)
      and write_eof_e () =
	if !options.verbose_request_contents then
	  dlogr (fun () ->
		   sprintf "FD %Ld - HTTP request EOF" 
		     (Netsys.int64_of_file_descr fd)
		);
	Uq_io.write_eof_e dev
	++ (fun flag ->
	      if not flag then
		failwith "Http_client: \
                          no support for closing the write side only";
	      (* It is better to set the new state after a successful
		 write_eof operation than before. It is possible that the whole
		 chain of write operations is aborted. In this case
		 an EOF also needs to be written. If we abort between write_eof
		 and the following assignment, the EOF will just be
		 written twice (which is unproblematic). Otherwise, the EOF
		 could be forgotten, and the client would hang.
	       *)
	      if socket_state = Up_rw then
		socket_state <- Up_r;
	      eps_e (`Done()) esys
	   )
      in

      (* If the socket is already closed for writing, do nothing. This
	 can happen if the read side decides to close the socket.
       *)
      if socket_state = Up_rw then (
        sending <- true;
	write_next_e()
	(* Reset the flag in any final state (done, error, aborted) *)
	>> (fun st -> sending <- false; st)
      )
      else 
	eps_e (`Done ()) esys

    end
  )
;;

(**********************************************************************)
(***                PROTOCOL STATE OF A SINGLE MESSAGE              ***)
(**********************************************************************)

(* The class 'transmitter' is a container for the message and the
 * associated protocol state, and defines the methods to send
 * the request, and to receive the reply.
 * The protocol state stored here mainly controls the send and receive
 * buffers (e.g. how many octets have already been sent? Are the octets
 * received so far already a complete message or not?)
 *)


(* The progress of a single message, as tracked by the transmitter.
 * The constructors are listed in the order in which a message normally
 * passes through them.
 *)
type message_state =
    Unprocessed     (* Not even a byte sent or received *)
  | Sending_hdr     (* Sending currently the header *)
  | Handshake       (* The header has been sent, now waiting for status 100 *)
  | Sending_body    (* The header has been sent, now sending the body *)
  | Finishing       (* The body is almost sent - about to finish *)
  | Sent_request    (* The body has been sent; the reply is being received *)
  | Complete        (* The whole reply has been received *)
;;

(* The states Unprocessed...Sent_request are only set by the request
   sender. We enter Complete only if we can receive the corresponding
   response.

   In the case that the response arrives while we are still sending,
   the processing of the response needs to be delayed until the sending
   is done. What can happen then:
   - We get the status while waiting for "100". This is handled in send_e,
     and the status is set to Finishing.
   - We get the status line later while still sending. This is detected
     by the reader, and send_interrupt is called.
   - If we get the status line when already in state Finishing, the 
     request is not aborted, and the remaining bytes are sent.

   Errors are not reflected in the state.
 *)


(* Returns the name of the constructor of a [message_state] (for debug
 * messages)
 *)
let string_of_state st =
  match st with
    | Complete     -> "Complete"
    | Sent_request -> "Sent_request"
    | Finishing    -> "Finishing"
    | Sending_body -> "Sending_body"
    | Handshake    -> "Handshake"
    | Sending_hdr  -> "Sending_hdr"
    | Unprocessed  -> "Unprocessed"



(* Whether the Connection header of [hdr] contains the token "close"
 * (case-insensitive). A header that cannot be read counts as empty.
 *)
let test_conn_close hdr =
  let tokens =
    try Nethttp.Header.get_connection hdr
    with _ (* incl. syntax error *) -> [] in
  List.exists (fun tok -> String.lowercase tok = "close") tokens


(* Whether the Connection header of [hdr] contains the token "keep-alive"
 * (case-insensitive). A header that cannot be read counts as empty.
 *)
let test_conn_keep_alive hdr =
  let tokens =
    try Nethttp.Header.get_connection hdr
    with _ (* incl. syntax error *) -> [] in
  List.exists (fun tok -> String.lowercase tok = "keep-alive") tokens


(* Whether one of the "proxy-connection" fields of [hdr] has the value
 * "close" (case-insensitive). If the fields cannot be retrieved, the
 * result is [false].
 *)
let test_proxy_conn_close hdr =
  let conn_list =
    try
      (* Lowercase once here; no second pass is needed below *)
      List.map String.lowercase
        (hdr # multiple_field "proxy-connection")
    with _ (* incl. syntax error *) -> [] in
  List.mem "close" conn_list


(* Whether one of the "proxy-connection" fields of [hdr] has the value
 * "keep-alive" (case-insensitive). If the fields cannot be retrieved, the
 * result is [false].
 *)
let test_proxy_conn_keep_alive hdr =
  let conn_list =
    try
      (* Lowercase once here; no second pass is needed below *)
      List.map String.lowercase
        (hdr # multiple_field "proxy-connection")
    with _ (* incl. syntax error *) -> [] in
  List.mem "keep-alive" conn_list


(* Whether [proto_str] denotes an HTTP version v with
 * HTTP/1.1 <= v < HTTP/2.0. Strings that cannot be parsed as protocol
 * yield [false].
 *)
let test_http_1_1 proto_str =
  try
    ( match Nethttp.protocol_of_string proto_str with
        | `Http((1,minor),_) -> minor >= 1
        | _ -> false
    )
  with _ -> false


let transmitter
  peer_is_proxy
  proxy_auth_state
  (m : http_call) 
  (f_done : http_call -> unit)
  options
  =
  ( object (self) 
      (* Protocol state of the message *)
      val mutable state = Unprocessed
      (* The callback passed as [f_done]; only returned by method [f_done] *)
      val indicate_done = f_done
      (* The message (call) this transmitter is responsible for *)
      val msg = m
	
(*
      val mutable auth_headers = []
	(* Additional header for _proxy_ authentication *)
 *)

      (* The engine sending the request while it is running (set and reset
	 in send_e), so that send_interrupt can abort it *)
      val mutable send_e_opt = None
      (* The engine returned by send_finish_e: it is signalled when the
	 whole send activity of send_e has reached a final state *)
      val mutable send_finish_e_opt = None
      (* Set by send_interrupt. This flag allows send_e to tell an
	 interrupt from any other abort of the sending engine *)
      val mutable send_interrupted = false
	
      (* The current protocol state of the message *)
      method state = state
	
      (* The [f_done] callback that was passed when the transmitter was
	 created *)
      method f_done = indicate_done

      method init() =
        (* Prepare for (re)transmission:
         * - Set the `Effective request header
         * - Reset the status info of the http_call
         * - Initialize transmission state
         *)
        if !options.verbose_status then
          dlogr (fun () -> sprintf "Call %d: initialize transmitter"
                   (Oo.id msg));
        msg # private_api # prepare_transmission();
        (* Set the effective URI. This must happen before authentication. *)
        let eff_uri =
          if peer_is_proxy then
            (* A proxy gets the request URI as it is *)
            msg # request_uri
          else
            let path = msg # get_path() in
            if path = "" then (
              msg # empty_path_replacement
            )
            else path
        in
        msg # private_api # set_effective_request_uri eff_uri;
        (* Header fields for authentication at the content server: *)
        let cah =
          match msg # private_api # auth_state with
            | `None -> []
            | `In_advance session
            | `In_reply session ->
                session # authenticate msg in
        (* Header fields for authentication at the proxy: *)
        let pah =
          match !proxy_auth_state with
            | `None -> []
            | `In_advance session
            | `In_reply session ->
                session # authenticate msg in
        let rh = msg # request_header `Effective in
        List.iter
          (fun (n,v) ->
             rh # update_field n v
          )
          (cah @ pah);
        if peer_is_proxy then
          rh # update_field "Proxy-Connection" "keep-alive";
        state <- Unprocessed;
        if not (msg # has_req_body) then (
          (* Remove certain headers *)
          rh # delete_field "Expect";
        );
        send_finish_e_opt <- None;
        (* The interrupt flag belongs to a single transmission. If it were
           kept from an earlier, interrupted attempt, send_e would take
           any later abort of the new sending engine for an interrupt.
         *)
        send_interrupted <- false

      method cleanup() =
        (* Releases the resources held by the message by delegating to
           the cleanup method of its private API
         *)
        let api = msg # private_api in
        api # cleanup()

(*
      method add_auth_header n v =
	auth_headers <- (n,v) :: auth_headers
 *)	
  
      method error_if_unserved error =
        (* Passes [error] on to the error_if_unserved method of the
           message, together with the verbose_status option
         *)
        let verbose = !options.verbose_status in
        msg # private_api # error_if_unserved verbose error

      method send_e (io : io_buffer) 
                    (handshake_cfg : float option)
                    (close_flag : bool)
		    esys =
	(* Sends the request over [io], and returns the engine doing this.

	   handshake_cfg: if [Some t], it is waited after the header for the
	   "100 Continue" handshake. [t] is the timeout. The [Expect]
	   header is already set (by the user).
	   close_flag: if true, the "connection:close" header is set

	   Preconditions (checked by assertions): the state is Unprocessed
	   (as set by init), [io] has no pending write activity, and there
	   is no previous unfinished send.
	 *)

	assert (state = Unprocessed);
	assert (not(io # write_activity));
	assert (send_finish_e_opt = None);

	(* Completes the effective header (Host, Connection), queues and
	   writes it, and continues with the handshake or the body
	 *)
	let rec send_header_e() =
	  let rh = msg # request_header `Effective in
	  let host = msg # get_host() in
	  let port = msg # get_port() in
	  (* The port is only omitted from the Host header if it is 80 *)
	  let host_str = host ^ (if port = 80 then "" 
				 else ":" ^ string_of_int port) in
	  rh # update_field "Host" host_str;
	  
	  if close_flag then
	    rh # update_field "Connection" "close";

	  io # add (Send_header(self#call_id, 
				msg#request_method,
				msg#effective_request_uri,
				(rh :> Netmime.mime_header_ro)));
	  state <- (if handshake_cfg <> None then Handshake else Sending_hdr);
	  
	  io # write_e esys
	  ++ (match handshake_cfg with
		| Some t when not msg # private_api # continue ->
		    send_handshake_e t
		| _ -> 
		    send_body_e
	     )

	(* Waits until the message reports the outcome of the handshake,
	   or until the timeout [tmo] is over
	 *)
	and send_handshake_e tmo () =
	  if !options.verbose_events then
	    dlogr (fun () ->
		     sprintf "Call %d - HTTP events: Waiting for 100 Continue"
		       (Oo.id msg)
		  );
	  msg # private_api # wait_continue_e tmo esys
	  ++ (fun code_100 ->
		if code_100 then
		  send_body_e()
		else
		  (* We got a non-100 response after waiting for 100.
		     We suppress the request body, and instead close the
		     connection on the sender side.
		   *)
		  send_no_body_e()
	     )
	    
	(* Sends the request body (if any), and finishes the request. The
	   body is sent chunked if there is a Transfer-encoding header
	   other than "identity", and otherwise with the length taken from
	   Content-Length. Without Content-Length the end of the body is
	   indicated by an EOF.
	 *)
	and send_body_e () : unit Uq_engines.engine =
	  if msg # has_req_body then (	  
	    state <- Sending_body;
	    let rh = msg # request_header `Effective in
	    let is_chunked =
	      try rh # field "Transfer-encoding" <> "identity"
	      with Not_found -> false in
	    let d = msg # private_api # request_body_open_rd esys in
	    let tok, need_eof =
	      if is_chunked then
		Send_body_chunked d, false
	      else (
		let length_opt =
		  try 
		    let l = 
		      try Int64.of_string (rh # field "Content-Length") 
		      with
			| Failure _ -> 
			    failwith "Http_client: Bad Content-Length field \
                                      in request" in
		    if l < 0L then
		      failwith "Http_client: Bad Content-Length field in request";
		    Some l
		  with Not_found -> None in
		Send_body(d, length_opt), length_opt = None
	      ) in
	    io # add tok;
	    if need_eof then
	      io # add Send_eof;
	    io # write_e esys
	    ++ (fun () ->
		  state <- Sent_request;
		  msg # private_api # finish_request_e esys
	       )
	  )
	  else (
	    state <- Sent_request;
	    msg # private_api # finish_request_e esys
	  )

	and send_no_body_e () =
	  (* This is only used if we announced a body in the header, but
	     finally do not send it (because we got an error from the server).

	     Normally we just close the connection for writing.
	     If we use the chunked encoding, a better way to indicate
	     the end of the body is to send an empty body 
	   *)
	  state <- Finishing;
	  let rh = msg # request_header `Effective in
	  let is_chunked =
	    try rh # field "Transfer-encoding" <> "identity"
	    with Not_found -> false in
	  if is_chunked then (
	    let ch = new Netchannels.input_string "" in
	    let d = `Async_in(new Uq_engines.pseudo_async_in_channel ch,esys) in
	    io # add (Send_body_chunked d);
	  )
	  else
	    io # add Send_eof;
	  io # write_e esys
	  ++ (fun () ->
		state <- Sent_request;
		eps_e (`Done ()) esys
	     )
	in

	(* fin_e is what send_finish_e returns. It gets the final state of
	   the whole send activity via [signal] (see the end of the method)
	 *)
	let (fin_e, signal) = Uq_engines.signal_engine esys in
	send_finish_e_opt <- Some fin_e;

	(* Remember the engine so that send_interrupt can abort it *)
	let e = send_header_e() in
	send_e_opt <- Some e;

	let e' =
	  (* This engine handles send interrupts, and returns [true] if an
	     EOF needs to be written
	   *)
	  e >> (fun st ->
		  send_e_opt <- None;
		  match st with
		    | `Aborted when send_interrupted ->
			if !options.verbose_events then
			  dlogr
			    (fun () ->
			       sprintf "Call %d - HTTP event: Send interrupted" 
				 (Oo.id msg));
			`Done true
		    | `Aborted -> `Aborted
		    | `Error err -> `Error err
		    | `Done () -> `Done false
	       ) in
	let e'' =
	  (* This engine writes the EOF if needed *)
	  e' 
	  ++ (fun flag ->
		if flag then (
		  io # add Send_eof;
		  io # write_e esys
		)
		else eps_e (`Done ()) esys
	     ) in

	(* Finally catch errors and signal termination: *)
	e''
	>> (fun st ->
	      ( match st with
		  | `Error err ->
		      if !options.verbose_events then
			dlogr
			  (fun () ->
			     sprintf "Call %d - HTTP event: Send exception: %s" 
			       (Oo.id msg) (Netexn.to_string err)
			  );
		  | _ -> ()
	      );
	      signal st; st
	   )



      method send_interrupt() =
        (* Stops sending immediately. The reader calls this method when it
           gets an error status while the request is still being written.
           The sending engine is aborted, and the interrupt is recorded;
           the reaction is to close the write side of the connection.

           With chunked encoding it would also be possible to finish the
           current chunk and to send an empty chunk. This is complicated
           to do here, so the close method is always used.

           Nothing happens if no sending engine is running.
         *)
        match send_e_opt with
          | Some running_e ->
              send_interrupted <- true;
              running_e # abort()
          | None ->
              ()

	  
      method send_finish_e esys =
        (* Returns an engine for synchronizing with the sender: it reaches
           its final state when the request is completely sent. If no send
           has been started, the engine is immediately done.
         *)
        match send_finish_e_opt with
          | Some fin_e -> fin_e
          | None -> eps_e (`Done ()) esys
	  
	  
      method receive_complete () =
        (* Marks the message as completely received. This is only allowed
           in the states Handshake, Sending_body, Finishing, and
           Sent_request:

           Handshake: this is possible when we were waiting for a 100
           response, but got a non-100 instead. In this case, the request
           is not sent, hence the state remains at Handshake.

           Sending_body: this is possible when we get a server status while
           sending the request, and stopping the request because of this.
         *)
        let may_complete =
          List.mem state [ Handshake; Sending_body; Finishing; Sent_request ] in
        assert may_complete;
        state <- Complete


      method indicate_pipelining =
        (* Returns 'true' iff neither the request nor the reply ask for
         * closing the connection, the reply is HTTP/1.1 compliant, and
         * the server is not matched by [pipeline_blacklist]. For a proxy
         * peer the "proxy-connection" header is checked in addition to
         * the "connection" header.
         * The reply is only looked at if the request permits pipelining.
         *)
        let request_permits =
          let req_hdr = msg # request_header `Effective in
          (not peer_is_proxy || not (test_proxy_conn_close req_hdr))
          && not (test_conn_close req_hdr) in
        request_permits && (
          let resp_hdr = msg # private_api # response_header in
          let proto_str = msg # private_api # response_proto in
          let response_permits =
            test_http_1_1 proto_str
            && (not peer_is_proxy || not (test_proxy_conn_close resp_hdr))
            && not (test_conn_close resp_hdr) in
          response_permits && (
            try
              let server = resp_hdr # field "Server" in
              let blacklisted re =
                Netstring_str.string_match re server 0 <> None in
              not (List.exists blacklisted pipeline_blacklist)
            with
              | Not_found -> true  (* Nothing known ... Assume the best! *)
          )
        )
	  
      method indicate_sequential_persistency =
        (* Returns 'true' if the connection can be kept open for further
         * requests (persistency without pipelining): neither request nor
         * reply contain "connection: close", and the reply is either
         * HTTP/1.1 or explicitly asks for keep-alive. For a proxy peer,
         * the "proxy-connection" header of the reply is decisive for
         * keep-alive, and must not contain "close" either.
         *)
        if test_conn_close (msg # request_header `Effective) then
          false
        else (
          let resp_hdr = msg # private_api # response_header in
          let is_http_11 = test_http_1_1 (msg # private_api # response_proto) in
          if test_conn_close resp_hdr then
            false
          else if peer_is_proxy then
            not (test_proxy_conn_close resp_hdr)
            && (is_http_11 || test_proxy_conn_keep_alive resp_hdr)
          else
            is_http_11 || test_conn_keep_alive resp_hdr
        )
	  
      (* The message (call) this transmitter works for *)
      method message = msg
	
      (* The object ID of the message, used for identifying the call in
	 debug messages *)
      method call_id = Oo.id msg
	
    end
  )
;;


(* Returns an engine that calls [f_done m] from the event loop (after a
 * delay of 0 seconds). An exception raised by [f_done] does not make the
 * engine fail: it is logged (if verbose_status is set), and raised again
 * from a separate event handler. The engine is `Done() in either case.
 *)
let drive_postprocessing_e esys options m f_done =
  let log_status mk_text =
    if !options.verbose_status then dlogr mk_text in
  let postprocess () =
    ( try
        log_status
          (fun () -> sprintf "Call %d - postprocessing" (Oo.id m));
        (f_done m : unit)
      with
        | any ->
            log_status
              (fun () ->
                 sprintf "Call %d - Exception in postprocessing: %s"
                   (Oo.id m) (Netexn.to_string any));
            let g = Unixqueue.new_group esys in
            Unixqueue.once esys g 0.0 (fun () -> raise any)
    );
    eps_e (`Done ()) esys in
  Uq_engines.delay_engine 0.0 postprocess esys


(* Same as [drive_postprocessing_e], but the engine is not returned *)
let drive_postprocessing_msg esys options m f_done =
  let _postprocessing_e = drive_postprocessing_e esys options m f_done in
  ()


(**********************************************************************)
(**********************************************************************)
(**********************************************************************)
(***                                                                ***)
(***           THE PROTOCOL STATE OF THE CONNECTION                 ***)
(***                                                                ***)
(**********************************************************************)
(**********************************************************************)
(**********************************************************************)

(* The peer of a connection. Hosts and ports are given as pairs. For the
 * variants with two pairs, the first pair is the first hop (the proxy),
 * and the second pair is the content hop (see [first_hop] and
 * [content_hop]).
 *)
type peer =
    [ `Direct of string * int
    | `Http_proxy of string * int
    | `Http_proxy_connect of (string * int) * (string * int)
    | `Socks5 of (string * int) * (string * int)
    ]

(* Returns host and port of the endpoint the TCP connection is opened to *)
let first_hop peer =
  match peer with
    | `Direct (host,port)
    | `Http_proxy (host,port) ->
        (host,port)
    | `Http_proxy_connect ((host,port),_)
    | `Socks5 ((host,port),_) ->
        (host,port)

(* Returns host and port of the endpoint that delivers the content, i.e.
 * the second hop if there are two
 *)
let content_hop peer =
  match peer with
    | `Direct (host,port) ->
        (host,port)
    | `Http_proxy (host,port) ->
        (host,port)  (* This case does not work *)
    | `Http_proxy_connect (_,(host,port))
    | `Socks5 (_,(host,port)) ->
        (host,port)


(* Replaces the host name of the first hop by [s]. Ports and the second
 * hop (if any) remain unchanged.
 *)
let rewrite_first_hop s peer =
  match peer with
    | `Direct (_,port) ->
        `Direct(s,port)
    | `Http_proxy (_,port) ->
        `Http_proxy(s,port)
    | `Http_proxy_connect ((_,port1),(host2,port2)) ->
        `Http_proxy_connect ((s,port1),(host2,port2))
    | `Socks5 ((_,port1),(host2,port2)) ->
        `Socks5 ((s,port1),(host2,port2))


let proxy_connect_e esys fd fd_open host port options proxy_auth_handler_opt
                    cur_proxy_session tcp_real_connect_e setup_e =
  (* Send the CONNECT line plus header, and wait for 200.

     The continuation of this engine is either setup_e() if successful,
     or tcp_real_connect_e() if another connection to the proxy needs to
     be opened.

     fd is the connection to the proxy, and host/port is the target of
     the CONNECT request. fd_open is set to false when fd is closed here.
     cur_proxy_session holds the proxy authentication session (if any);
     it is set here after a 407 response when proxy_auth_handler_opt
     can create a session.

     Errors: Proxy_error for redirection, client error, and server error
     responses; Failure for EOF and protocol errors; Garbage_received
     is turned into Bad_message.
   *)
  let mplex =
    Uq_engines.create_multiplex_controller_for_connected_socket
      ~supports_half_open_connection:true
      fd esys in
  (* N.B. No timeout here required because this activity is covered by the
     connect timeout
   *)
  let io = io_buffer options fd mplex Up_rw in
  let host_url = sprintf "%s:%d" host port in
  let msg = new connect host_url in
  let hdr = msg # request_header `Effective in
  hdr # update_field "Host" host_url;
  hdr # update_field "Proxy-Connection" "keep-alive";

  (* Sends the CONNECT request (with authentication fields if there is
     already a session), and evaluates the response
   *)
  let rec request_e () =
    ( match !cur_proxy_session with   (* authentication *)
	| None -> ()
	| Some sess ->
	    let pah = sess # authenticate msg in
	    List.iter
	      (fun (n,v) ->
		 hdr # update_field n v
	      )
	      pah
    );
    io#add (Send_header(0, msg#request_method, msg#effective_request_uri, 
			( hdr :> Netmime.mime_header_ro )
		       ));
    io#configure_read ~fetch_call:(fun () -> msg) ();
    io#write_e esys
    ++ (fun () ->
	  io#read_e esys
	  ++ (function
		| None ->
		    failwith "EOF from proxy server"
		| Some m ->
		    assert(m = msg);
		    (* Not_found is used here for leaving the special
		       handling of status 407, and for falling through to
		       the normal evaluation of the status in the
		       exception handler
		     *)
		    ( try
			if m#response_status_code = 407 && 
			  !cur_proxy_session = None &&
			  proxy_auth_handler_opt <> None
			then (
			  let ah =
			    match proxy_auth_handler_opt with
			      | None -> assert false
			      | Some ah -> ah in
			  let sess =
			    match ah # create_proxy_session msg options with
			      | None -> raise Not_found
			      | Some sess -> sess in
			  cur_proxy_session := Some sess;
			  (* It is now possible that the proxy closes the
			     connection. If so, we do this too, and reopen
			     another one. Otherwise, just go on.
			   *)
			  let rh = m#response_header in
			  let ps = m#response_protocol in
			  let close_flag =
			    test_conn_close rh || test_proxy_conn_close rh ||
			      (not (test_http_1_1 ps) && 
				 not (test_conn_keep_alive rh) &&
				 not (test_proxy_conn_keep_alive rh)) in
			  if close_flag then (
			    Unix.close fd;
			    fd_open := false;
			    tcp_real_connect_e()
			      (* The value of !cur_proxy_session is kept,
				 so we will use the authentication knowledge
				 we already gathered
			       *)
			  )
			  else
			    request_e ()
			)
			else 
			  raise Not_found
		      with Not_found ->
			match m#status with
			  | `Unserved ->
			      assert false
			  | `Http_protocol_error err ->
			      failwith ("Exception from proxy connection: " ^ 
					  Netexn.to_string err)
			  | `Redirection
			  | `Client_error
			  | `Server_error ->
			      raise(Proxy_error m#response_status_code)
			  | `Successful ->
			      setup_e()
		    )
	   )
       )
  in
  request_e ()
  >> (function
	| `Error (Garbage_received msg) ->
	    `Error (Bad_message msg)
	| st -> st
     )


let tcp_connect_e esys tp cb (peer:peer) conn_cache conn_owner options 
                  proxy_auth_handler_opt =
  (* An engine connecting to peer_host:peer_port. If a connection is still
     available in conn_cache, use this instead.

     Output is:
     - `Done(fd, mplex, conn_time): fd is the new connection (established
       in conn_time seconds), and mplex is the multiplex controller
       created by the transport [tp] for it. For a connection taken from
       the cache conn_time is 0.0.

     In either case the connection is entered as `Active conn_owner into
     conn_cache. The name of the first hop is resolved with the resolver
     from the options; a failure is reported as Name_resolution_error.
   *)
  let timeout_value = !options.connection_timeout in
  
  (* resolve_e is signalled by the callback of the resolver below *)
  let resolve_e, signal_res =
    Uq_engines.signal_engine esys in

  (* Start of the time measurement (this includes the name resolution) *)
  let t0 = Unix.gettimeofday() in

  let (hop1_host,hop1_port) = first_hop peer in

  !options.resolver
    esys
    hop1_host
    (function 
       | None ->
	   if !options.verbose_events then
	     dlog "HTTP events: reset after DNS failure";
	   let err = Name_resolution_error hop1_host in
	   signal_res (`Error err)
	     
       | Some addr ->
	   signal_res (`Done addr)
    );

  (* Description of the connection for log and error messages *)
  let descr =
    match peer with
      | `Direct (host,port) -> 
	  sprintf "direct connection to %s:%d" host port
      | `Http_proxy (host,port) ->
	  sprintf "proxy connection via %s:%d" host port
      | `Http_proxy_connect ((host1,port1),(host2,port2)) ->
	  sprintf "proxy connection via %s:%d to %s:%d" host1 port1 host2 port2
      | `Socks5 ((host1,port1),(host2,port2)) ->
	  sprintf "SOCKS connection via %s:%d to %s:%d" host1 port1 host2 port2
  in

  (* The exception passed to the transport together with the timeout *)
  let tmo_x =
    Timeout descr in

  let real_host, real_port = content_hop peer in

  resolve_e
  ++ (fun hop1_ip ->
	(* The cache is searched with a peer where the first hop is given
	   as IP address
	 *)
	let hop1_host_ip =
	  Unix.string_of_inet_addr hop1_ip in
	let cache_peer =
	  rewrite_first_hop hop1_host_ip peer in
	(* NOTE(review): the [try] also covers set_connection_state,
	   tp#continue and eps_e. A Not_found raised by one of these would
	   be taken for a cache miss - confirm that they cannot raise it.
	 *)
	try
	  let fd = conn_cache # find_inactive_connection cache_peer cb in
	  (* Case: reuse old connection *)
	  conn_cache # set_connection_state fd cache_peer (`Active conn_owner);
	  if !options.verbose_events then
	    dlog (sprintf 
		    "FD %Ld - HTTP events: config input (reused fd) - target %s"
		    (Netsys.int64_of_file_descr fd) descr);
	  let mplex = 
	    tp # continue 
	      fd cb !options.connection_timeout tmo_x 
	      real_host real_port esys in
	  eps_e (`Done(fd, mplex, 0.0)) esys
	with
	  | Not_found ->
	      (* Case: create a new connection *)
	      if !options.verbose_connection then
		dlog ("HTTP connection: creating " ^ 
			descr);
	      (* For SOCKS, the connection is made via a proxy client that
		 talks to the first hop, and the target is the second hop,
		 given by name
	       *)
	      let proxy_opt = (* SOCKS5 *)
		match peer with
		  | `Socks5 _ ->
		      Some(new Uq_socks5.proxy_client 
			     (`Socket(`Sock_inet(Unix.SOCK_STREAM,
						 hop1_ip,
						 hop1_port),
				      Uq_engines.default_connect_options)))
		  | _ -> None in
	      let sockspec =
		match peer with
		  | `Direct _
		  | `Http_proxy _ 
		  | `Http_proxy_connect _ ->
		      `Sock_inet(Unix.SOCK_STREAM, hop1_ip, hop1_port)
		  | `Socks5 (_,(host2,port2)) ->
		      `Sock_inet_byname(Unix.SOCK_STREAM, host2, port2) in

	      (* The proxy authentication session survives reconnects done
		 by proxy_connect_e
	       *)
	      let cur_proxy_session = ref None in

	      (* Connects the socket, and sets the transport up. This
		 function is recursive because proxy_connect_e may demand
		 a new connection to the proxy.
	       *)
	      let rec tcp_real_connect_e () =
		Uq_engines.connector 
		  ?proxy:proxy_opt
		  (`Socket(sockspec, Uq_engines.default_connect_options))
		  esys 
		++ (function
		      | `Socket(fd,_) ->
			  (* fd_open tracks whether fd still needs to be
			     closed in the error case
			   *)
			  let fd_open = ref true in
			  !options.configure_socket fd;
			  let setup_e() =
			    tp # setup_e
			      fd cb !options.connection_timeout tmo_x
			      real_host real_port esys
			    >> (function
				  | `Done mplex -> `Done(fd,mplex)
				  | `Error err -> `Error err
				  | `Aborted -> `Aborted
			       ) in
			  ( match peer with
			      | `Http_proxy_connect(_,(host2,port2)) ->
				  proxy_connect_e
				    esys fd fd_open host2 port2 options 
				    proxy_auth_handler_opt
				    cur_proxy_session
				    tcp_real_connect_e
				    setup_e
			      | _ ->
				  setup_e()
			  )
			  >> (fun st ->
				match st with
				  | `Error _ | `Aborted ->
				      if !fd_open then Unix.close fd;
				      fd_open := false;
				      st
				  | _ -> st
			     )
		      | _ -> assert false
		   ) in

	      let eng = tcp_real_connect_e() in

	      (* The whole connection setup is limited by the connection
		 timeout
	       *)
	      Uq_engines.timeout_engine 
		timeout_value
		(Timeout (sprintf
			    "creating %s" descr))
		eng
	      ++ (fun (fd,mplex) ->
		    let t1 = Unix.gettimeofday() in
		    let d = t1 -. t0 in

		    if !options.verbose_connection then
		      dlog (sprintf 
			      "FD %Ld - HTTP %s: Connected!"
			      (Netsys.int64_of_file_descr fd) descr);

		    Netlog.Debug.track_fd
		      ~owner:"Http_client"
		      ~descr:(sprintf 
				"HTTP %s" descr)
		      fd;
		    (* The release_fd is in Http_client_conncache! *)

		    conn_cache # set_connection_state
		      fd cache_peer (`Active conn_owner);
		    eps_e (`Done(fd, mplex, d)) esys
		 )
	      >> (function
		    | `Error err ->
			if !options.verbose_connection then (
			  dlog(sprintf 
				 "HTTP connection: Cannot create %s: \
                                  Exception %s"
				 descr (Netexn.to_string err))
			);
			`Error err
		    | st -> st
		 )
     )

(**********************************************************************)

(* A transport creates the multiplex controller for a connected socket.
 * As used by tcp_connect_e, the arguments are: the file descriptor, the
 * channel binding ID, the timeout value, the exception belonging to the
 * timeout, host and port of the content hop, and the event system.
 *)
class type transport_channel_type =
object
  (* Called by tcp_connect_e for a newly created connection. The result
     is an engine because the setup may take some time. *)
  method setup_e : Unix.file_descr -> channel_binding_id -> float -> exn ->
                   string -> int -> Unixqueue.event_system ->
                   Uq_engines.multiplex_controller Uq_engines.engine
  (* Called by tcp_connect_e for a connection that is taken from the
     connection cache *)
  method continue : Unix.file_descr -> channel_binding_id -> float -> exn ->
                   string -> int -> Unixqueue.event_system ->
                   Uq_engines.multiplex_controller
end

(* The transport for plain HTTP: the multiplex controller works directly
 * on the socket. Setting up a new connection and continuing a cached
 * connection are done in the same way; the channel binding, the host
 * and the port are not needed.
 *)
let http_transport_channel_type : transport_channel_type =
  let socket_mplex fd tmo tmo_x esys =
    Uq_engines.create_multiplex_controller_for_connected_socket
      ~close_inactive_descr:true
      ~supports_half_open_connection:true
      ~timeout:( tmo, tmo_x )
      fd esys in
  ( object
      method continue fd _cb tmo tmo_x _host _port esys =
        socket_mplex fd tmo tmo_x esys
      method setup_e fd _cb tmo tmo_x _host _port esys =
        let mplex = socket_mplex fd tmo tmo_x esys in
        eps_e (`Done mplex) esys
    end
  )


(**********************************************************************)

let fragile_pipeline 
       esys  cb
       peer
       proxy_auth_state proxy_auth_handler_opt
       fd mplex connect_time no_pipelining conn_cache
       auth_cache
       counters options =
  (* Implements a pipeline for an existing connection [fd]. This object
     does not implement any way of recovering after errors.
   *)
  let connection_e, signal_connection = Uq_engines.signal_engine esys in
    (* Indicates that the connection is closed *)
  let queue_e, signal_queue = Uq_engines.signal_engine esys in
    (* Indicates that all processing of queued messages is done *)
  let finished_e =
    Uq_engines.sync_engine connection_e queue_e 
    >> (function 
	  | `Done _ -> `Done() 
	  | `Error e -> `Error e
	  | `Aborted -> `Aborted
       ) in

  let fd_str =
    Int64.to_string (Netsys.int64_of_file_descr fd) in

  let peer_is_proxy =  (* whether we have a normal HTTP proxy *)
    match peer with
      | `Http_proxy _ -> true
      | _ -> false in

  ( object(self)
      val mutable io = io_buffer options fd mplex Up_rw

      val mutable write_queue = Q.create()
      val mutable read_queue = Q.create()
	(* Invariant: write_queue is a suffix of read_queue *)

      (* The following two variables control whether pipelining is enabled or
       * not. The problem is that it is unclear how old servers react if we
       * send to them several requests at once. The solution is that the first
       * "round" of requests and replies is done in a compatibility mode: After
       * the first request has been sent sending stops, and the client waits
       * until the reply is received. If the reply indicates HTTP/1.1 and does
       * not contain a "connection: close" header, all further requests and 
       * replies will be performed in pipelining mode, i.e. the requests are
       * sent independent of whether the replies of the previous requests have
       * been received or not.
       *
       * sending_first_message: 'true' means that the first request has not yet
       *    been completely sent to the server. 
       * done_first_message: 'true' means that the reply to the first request
       *    has arrived.
       *)
      val mutable sending_first_message = true
      val mutable done_first_message = false
	
      (* 'inhibit_pipelining_byserver': becomes true if the server is able
       * to keep persistent connections but is not able to use pipelining.
       * (HTTP/1.0 "keep alive" connections)
       *)
      val mutable inhibit_pipelining_byserver = no_pipelining
 
(*
      (* Proxy authorization: If 'proxy_user' is non-empty, the variables
       * 'proxy_user' and 'proxy_password' are interpreted as user and
       * password for proxy authentication. More precisely, once the proxy
       * responds with status code 407, 'proxy_credentials_required' becomes
       * true, and all following requests will include the credentials identifying
       * the user (with the "basic" authentication method).
       * If the proxy responds again with code 407, this reaction will not
       * be handled again but will be visible to the outside.
       *)
      val mutable proxy_credentials_required = false
      val mutable proxy_cookie = ""
 *)

      (* Whether this connection ended without even a status line being
       * received (set in [after_eof]):
       *)
      val mutable total_failure = false

      (* 'close_connection' indicates that a HTTP/1.0 response was received or 
       * that a response contained a 'connection: close' header.
       *)
      val mutable close_connection = false
      
      (* Whether the [after_eof] cleanup has already been done (it must only
       * happen once).
       *)
      val mutable after_eof = false

      (* The vars govern whether there is an engine writing/reading *)
      val mutable drive_output_active = None
      val mutable drive_input_active = None

      
      method length =
	(* Returns the number of open requests (requests without response),
	   i.e. the current length of [read_queue].
	 *)
	Q.length read_queue


      method active =
        (* Whether something is to be done: true as long as at least one
           request is still waiting in [read_queue].
         *)
        not (Q.is_empty read_queue)


      method iter_unserved_messages f =
	(* Calls [f] with every transmitter that is still in [read_queue],
	   i.e. with all messages that have not been served yet.

	   The queue is accessed through [Q] like everywhere else in this
	   object (it used to go through [Queue] directly).
	 *)
	Q.iter f read_queue


      (* Whether the connection ended before any status line was seen.
	 The flag is computed in [after_eof] from [io#status_seen].
       *)
      method is_total_failure = total_failure


      (* Whether pipelining is inhibited for this peer. Starts with the
	 value of the [no_pipelining] argument and becomes true in
	 [update_characteristics] for HTTP/1.0 style persistent connections.
       *)
      method no_pipelining = inhibit_pipelining_byserver


      method finished_e = finished_e
	(* This engine transitions to [`Done()] when the
	   connection is finished, and all messages are processed
	   (it synchronizes [connection_e] and [queue_e]).

	   At this point, the file descriptor is closed or given back
	   to the cache. The read and write queues are unmodified, and
	   can be used to recover.

	   Errors are reported via the messages.
	 *)


      method add urgent m f_done =
	(* add: wraps the call [m] into a transmitter and adds it to the
	   read_queue/write_queue. This must be possible
	   at any time - even after shutting down the socket and stopping
	   any event processing.

	   urgent: whether to insert the message [m] into the front-most
	     possible place instead of appending it (to be used after
	     authorization)
	   f_done: handed over to the transmitter together with [m]
	 *)

	if !options.verbose_connection then
	  dlogr (fun () -> 
		   sprintf "HTTP Connection: adding call %d"
		     (Oo.id m));
	
	(* Create the transport container for the message and add it to the
	 * queues:
	 *)
	let trans = 
	  transmitter peer_is_proxy proxy_auth_state m f_done options in
	
(* (* would not work, so leave disabled *)
	if !proxy_auth_state = `None then (
	  match proxy_auth_handler_opt with
	    | None -> ()
	    | Some ah ->
		(* enable in-advance authentication *)
		( match ah # create_proxy_session m with
		    | None -> ()
		    | Some sess ->
			proxy_auth_state := `In_advance sess
		)
	);
 *)

	(* Initialize [trans] for transmission: *)
	trans # init();
	
	(* Both queues are always updated together so that write_queue
	   remains a suffix of read_queue. The length is taken via [Q],
	   consistently with all other accesses to the queues.
	 *)
	let n = Q.length write_queue in
	if urgent && n > 0 then (
	  (* Insert [trans] at the earliest possible place *)
	  Q.add_at (n-1) trans write_queue;
	  Q.add_at (n-1) trans read_queue;
	)
	else (
	  Q.add trans write_queue;
	  Q.add trans read_queue;
	)

      method attach() =
	(* Enables event processing. To be called after [add].

	   The first-message bookkeeping and [close_connection] are reset,
	   [drive_input_fetch] is installed as fetch callback of [io], the
	   read loop is started, and finally [maintain_polling] decides
	   whether output has to be started.
	 *)
	sending_first_message <- true;
	done_first_message <- false;
	close_connection <- false;

(* FIXME: reset other vars? *)

	(* [io] calls back [drive_input_fetch] to learn which call an
	   incoming response belongs to
	 *)
	io # configure_read 
	  ~fetch_call:self#drive_input_fetch ();
	self # drive_input();

	if !options.verbose_events then
	  dlog (sprintf 
		  "FD %Ld - HTTP events: config input"
		  (Netsys.int64_of_file_descr fd));
	self # maintain_polling();

      method reset() =
        (* Hard stop of this pipeline: the connection is shut down (counted
           as crashed), every message still in [read_queue] is failed with
           [No_reply] unless it was already served, and is postprocessed.
           Finally the queue side of [finished_e] is signalled as aborted.

           Compare also with after_eof!
         *)
        self # abort ~reusable:false ~count:`Crashed;

        let give_up tr =
          tr # cleanup();    (* release resources *)
          tr # error_if_unserved No_reply;
          self # drive_postprocessing tr in
        Q.iter give_up read_queue;

        signal_queue `Aborted


    (**********************************************************************)
    (* End of interface.                                                  *)	
    (**********************************************************************)

    method private maintain_polling() =

      (* Decides what has to happen next on the write side of the socket.
       * Nothing is done at all if the socket is [Down], and no new action
       * is started while an output engine is still active or the socket
       * is no longer [Up_rw].
       *
       * Otherwise exactly one of the following actions is chosen, in this
       * order of priority:
       * - [close_output] if [close_connection] is set
       * - [release_io] if both queues are empty
       * - [drive_output] if there is a request that may be sent now
       *
       * A request is NOT sent if one of the following conditions is true:
       * - The write_queue is empty
       * - The difference between the read and the write queue is too big
       * - We wait for the reply of the first request sent to a server
       * - The head of the write_queue is not in state [Unprocessed]
       *)

      if !options.verbose_events then
	dlogr
	  (fun () -> 
	     sprintf "FD %s - HTTP events: maintain_polling"
	     fd_str
	  );

      if io # socket_state <> Down then (
	(* How many more requests than responses may be outstanding *)
	let actual_max_drift =
	  if inhibit_pipelining_byserver then 0 else
	    match !options.synchronization with
	      | Pipeline drift -> min drift max_pipeline
	      | _              -> 0
		  (* 0: Allow no drift if pipelining is not allowed *)
	in
	
	let have_requests =
	  Q.length read_queue - Q.length write_queue <= actual_max_drift
	  && Q.length write_queue > 0 
          && (Q.peek write_queue) # state = Unprocessed
	  && (done_first_message || sending_first_message) in

	(* Note: sending_first_message is true while the first request is sent.
         * done_first_message becomes true when the response arrived. In the
         * meantime both variables are false, and nothing is sent.
         *)

	let do_release =
	  (io # socket_state = Up_rw &&
	      Q.length read_queue = 0 && Q.length write_queue = 0) in
	(* The socket is still fully up, but there is nothing left to send
	 * or to receive. In this case the connection is handed over to
	 * [release_io] (see below).
	 *)
	
	let do_close_output = close_connection in
	(* close_connection: this is set in update_characteristics after
	 * receiving the first response 
	 *)

	if !options.verbose_events then
	  dlogr
	    (fun () -> 
	       sprintf "FD %s - HTTP events: maintain_polling \
                        n_read=%d n_write=%d \
                        actual_max_drift=%d have_requests=%B \
                        do_close_output=%B do_release=%B"
		 fd_str
		 (Q.length read_queue) (Q.length write_queue)
		 actual_max_drift have_requests do_close_output do_release
	    );
	
	(* Only start something new if no output engine is running *)
	if drive_output_active = None && io#socket_state = Up_rw then (
	  if !options.verbose_events then
	    dlogr
	      (fun () -> 
		 sprintf "FD %s - HTTP events: config output=%s"
		   fd_str
		   (if do_close_output then "close" else
		      if do_release then "release" else
			if have_requests then "enabled" else
			  "none"
		   )
	      );
	  if do_close_output then
	    self # close_output()
	  else
	    if do_release then
	      self # release_io ()
	    else
	      if have_requests then
		self # drive_output()
	)
      )

    method private abort ~reusable ~count =
      (* This method is called when the connection is in a final (maybe
       * erroneous) state, and the protocol handler decides to stop
       * processing. Nothing happens if the socket is already [Down].
       * 
       * reusable: whether it is possible to reuse this connection later.
       *   This is only honoured while the socket is [Up_rw]; the
       *   descriptor is then marked as [`Inactive] in the connection
       *   cache instead of being closed.
       * count: selects the connection counter to increment. [`Failed]
       *   is not allowed here.
       *)
      if io # socket_state <> Down then (
	if !options.verbose_connection then 
	  dlog (sprintf 
		  "FD %s - HTTP connection: Shutdown!"
		  fd_str);
	(* [followup] signals the connection side of [finished_e] *)
	let followup() =
	  signal_connection(`Done());
	  if !options.verbose_events then
	    dlogr
	      (fun () ->
		 sprintf "FD %s - HTTP event: Connection processing done" 
		   fd_str) in

	begin match io#socket_state with
	    Down -> 
	      (* excluded by the enclosing test *)
	      assert false
	  | Up_r -> 
	      (* half-open connections are never given back to the cache *)
	      io # close
		~followup ()
	  | Up_rw ->
	      if reusable then (
		( try
		    conn_cache # set_connection_state fd peer (`Inactive cb);
		    io # down();
		  with
		    | Not_found ->
			(* NOTE(review): presumably the cache does not know
			   [fd] in this case - confirm.
			 *)
			(* We can do an orderly shutdown: *)
			io # close ~followup:(fun () -> ()) ()
		);
		followup()
	      )
	      else
		io # close
		  ~followup ()
	end;
	( match count with
	    | `Timed_out ->
		counters.timed_out_connections <- 
		  counters.timed_out_connections + 1;
	    | `Crashed ->
		counters.crashed_connections <- 
		  counters.crashed_connections + 1;
	    | `Server_eof ->
		counters.server_eof_connections <- 
		  counters.server_eof_connections + 1;
	    | `Successful ->
		counters.successful_connections <- 
		  counters.successful_connections + 1
	    | `Failed ->
		(* By definition of the counter, an abort cannot be failed *)
		assert false;
	);
	if !options.verbose_events then
	  dlog (sprintf "FD %s - HTTP events: reset after shutdown"
		  fd_str);

	(* Stop the engines that may still be reading or writing *)
	self # cancel_output();   (* FIXME: check whether sufficient *)
	self # cancel_input();
      )
 
    (**********************************************************************)
    (***                     THE OUTPUT HANDLER                         ***)
    (**********************************************************************)

    method private drive_output() =
      (* Starts sending the request at the head of [write_queue]. The
       * sending engine is remembered in [drive_output_active] until it
       * reaches a final state.
       *
       * Preconditions (asserted): no output engine is active, the socket
       * is [Up_rw], the write queue is not empty, and its head is still
       * [Unprocessed].
       *)
      if !options.verbose_events then
	dlogr (fun() -> 
		 sprintf "FD %s - HTTP events: drive_output" fd_str);

      let can_output =
	drive_output_active = None &&
	io#socket_state = Up_rw &&
	not(Q.is_empty write_queue) in

      assert(can_output);

      if can_output then (
	let trans = Q.peek write_queue in
	assert(trans#state = Unprocessed);
	
	let close_flag =
	  !options.inhibit_persistency in

	(* [Some timeout] if the request announces [Expect: 100-continue]
	 * and status 100 has not been seen yet for this message
	 *)
	let handshake_cfg =
	  try
	    (* Proper parsing not required, because [Expect] is
             * set by the user.
             * [continue]: Already seen status 100
             *)
	    let rh = trans # message # request_header `Effective in
	    if (not (trans # message # private_api # continue) &&
		  String.lowercase(rh # field "expect") = "100-continue")
	    then Some !options.handshake_timeout
	    else None
	  with
	    | Not_found -> None
	in

	let e =
	  trans # send_e io handshake_cfg close_flag esys
	  >> (fun st ->
		drive_output_active <- None;
		if !options.verbose_events then
		  dlogr (fun() -> 
			   sprintf "FD %s - HTTP events: done with output \
                                    state=%s"
			     fd_str (string_of_state trans#state));
		match st with
		  | `Done () ->
		      assert
			(trans#state = Sent_request || trans#state = Complete
			  || trans#state = Handshake
			);
		      (* The request is out: drop it from the write queue
			 and check whether the next one can be sent
		       *)
		      sending_first_message <- false;
		      ignore (Q.take write_queue);
		      self # maintain_polling();
		      st
		  | `Error err ->
		      if !options.verbose_connection then
			dlogr
			  (fun () ->
			     sprintf "FD %s - HTTP connection: Exception %s"
			       fd_str (Netexn.to_string err));
		      self # abort ~reusable:false ~count:`Crashed;
		      self # after_eof (Some err);
		      st
		  | `Aborted -> 
		      (* Only logged; no further cleanup is done here *)
		      if !options.verbose_connection then
			dlogr
			  (fun () ->
			     sprintf "FD %s - HTTP connection: Aborting"
			       fd_str
			  );
		      st
	     ) in

	drive_output_active <- Some e;
      )

    method private close_output() =
      (* Queues [Send_eof] on [io] and starts an engine writing it. When
       * the engine is done, the connection is aborted as not reusable and
       * [after_eof] is run.
       *
       * Preconditions (asserted): no output engine is active, the socket
       * is [Up_rw], and [io] has no write activity.
       *)
      if !options.verbose_events then
	dlogr (fun() -> 
		 sprintf "FD %s - HTTP events: close_output" fd_str);

      let can_close =
	drive_output_active = None &&
	io#socket_state = Up_rw in

      assert(can_close);

      if can_close then (
	assert (not(io # write_activity));      
	io # add Send_eof;
	let e =
	  io # write_e esys
	  >> (fun st ->
		drive_output_active <- None;
		if !options.verbose_events then
		  dlogr (fun() -> 
			   sprintf "FD %s - HTTP events: close_output done"
			     fd_str);
		match st with
		  | `Done () -> 
		      (* The connection closure can be driven by several
			 reasons, so there can still be messages on the
			 queues!
		       *)
		      self # abort ~reusable:false ~count:`Successful;
		      self # after_eof None;
		      st
		  | `Error err ->
		      if !options.verbose_connection then
			dlogr
			  (fun () ->
			     sprintf "FD %s - HTTP connection: Exception %s"
			       fd_str (Netexn.to_string err));
		      self # abort ~reusable:false ~count:`Crashed;
		      self # after_eof (Some err);
		      st
		  | `Aborted -> 
		      (* Only logged; no further cleanup is done here *)
		      if !options.verbose_connection then
			dlogr
			  (fun () ->
			     sprintf "FD %s - HTTP connection: Aborting"
			       fd_str
			  );
		      st
	     )
	in
	drive_output_active <- Some e;
      )


    method private cancel_output() =
      (* Aborts the running output engine, if any. The engine is
         unregistered before it is aborted.
       *)
      let running = drive_output_active in
      drive_output_active <- None;
      match running with
        | None -> ()
        | Some e -> e # abort()
      

    method private release_io() =
      (* Give fd/mplex back to the connection cache: the connection is
       * aborted as reusable (counted as successful), and then the
       * [after_eof] cleanup runs without error.
       * Precondition (asserted): the socket is [Up_rw].
       *)
      assert(io # socket_state = Up_rw);
      self # abort ~reusable:true ~count:`Successful;
      self # after_eof None
	(* FIXME: check whether sufficient *)


    (**********************************************************************)
    (***                     THE INPUT HANDLER                          ***)
    (**********************************************************************)

    method private drive_input() =
      (* Starts the read loop. The loop engine is remembered in
       * [drive_input_active] until it reaches a final state.
       *
       * Every round reads from [io]:
       * - On EOF the sender is interrupted (if any), the connection is
       *   aborted (counted as server EOF), [after_eof] runs, and the loop
       *   ends.
       * - On a complete response the head of [read_queue] is finished,
       *   removed and postprocessed; the loop continues unless the socket
       *   went [Down] in the meantime.
       *
       * If the loop ends with an error, the connection is aborted as
       * crashed, and [after_eof] gets the error.
       *)
      
      let rec read_loop_e() =
	io # read_e esys
	++ (fun call_opt ->
	      match call_opt with
		| None ->
		    if !options.verbose_connection then
		      dlogr
			(fun () ->
			   sprintf "FD %s - HTTP connection: Got EOF!"  fd_str);
		    (* The sender may be still writing, so try to stop it *)
		    if Q.length write_queue > 0 then
		      (Queue.peek write_queue) # send_interrupt();
		    self # abort ~reusable:false ~count:`Server_eof;
		    self # after_eof None;
		    eps_e (`Done()) esys
		| Some call ->
		    if !options.verbose_connection then
		      dlogr
			(fun () ->
			   sprintf "FD %s - HTTP connection: Got Call %d!" 
			     fd_str (Oo.id call));
		    (* The response must belong to the head of read_queue
		       (this is what [drive_input_fetch] returned)
		     *)
		    let trans =
		      try Q.peek read_queue
		      with Q.Empty -> assert false in
		    assert(trans#message = call);
		    (* It is possible that the write is still ongoing.
		       By calling send_interrupt we try to stop this,
		       but nevertheless we have to wait until the
		       writer is done! The special state Finishing
		       indicates that it is not worth-while to stop
		       sending because it will end soon anyway.

		       Note that send_finish_e may also report errors
		       of the sending side.
		     *)
		    if trans#state <> Finishing then
		      trans # send_interrupt();
		    trans # send_finish_e esys
		    ++ (fun () ->
			  trans # receive_complete();
			  self # update_characteristics trans;
			  ignore(Q.take read_queue);
			  (* Note that postprocessing may imply a
			     re-initialization of [trans]! So don't access
			     [trans] later than this point.
			   *)
			  self # postprocess_complete_message_e trans
			  ++ (fun () ->
				self # maintain_polling();
				if io#socket_state <> Down then
				  read_loop_e()
				else
				  eps_e (`Done()) esys
			     )
		       )
	   )
      in

      if !options.verbose_events then
	dlogr
	  (fun () ->
	     sprintf "FD %s - HTTP event: Starting read loop" 
	       fd_str);
      let e = 
	read_loop_e() in
      drive_input_active <- Some e;
      (* Observe the final state of the loop: unregister, log, and handle
	 errors
       *)
      ignore(e >> 
	       (fun st -> 
		  drive_input_active <- None;
		  if !options.verbose_events then
		    dlogr
		      (fun () ->
			 sprintf "FD %s - HTTP event: Leaving read loop: %s" 
			   fd_str
			   ( match st with
			       | `Done _ -> "regular"
			       | `Error e -> "exception " ^ Netexn.to_string e
			       | `Aborted -> "aborted"
			   )
		      );
		  ( match st with
		      | `Error err ->
			  self # abort ~reusable:false ~count:`Crashed;
			  self # after_eof (Some err)
		      | _ -> ()
		  );
		  st
	       )
	    )

    method cancel_input() =
      (* Aborts the running read loop engine, if any. The engine is
         unregistered before it is aborted.
       *)
      let running = drive_input_active in
      drive_input_active <- None;
      match running with
        | None -> ()
        | Some e -> e # abort()


    method drive_input_fetch() =
      (* This is called by io when the next status line has been received.
         It returns the call the response belongs to, which is always the
         message at the head of [read_queue].

         Raises [Not_found] if there is no outstanding request at all
         (spontaneous response).
       *)
      let log_connection mk_text =
        if !options.verbose_connection then dlogr mk_text in

      if Q.is_empty read_queue then (
        log_connection
          (fun () ->
             sprintf 
               "FD %s - HTTP connection: \
                  Got data of spontaneous response" fd_str);
        raise Not_found
      );

      let head = Q.peek read_queue in
      ( match head # state with
          | Unprocessed ->
              (* We get response data before we even tried to send
               * the request. We allow this, although this is weird.
               *)
              log_connection
                (fun () ->
                   sprintf 
                     "FD %s - HTTP connection: \
                     Got response before sending request" fd_str)

          | Sending_hdr ->
              (* We get response data before we finished
               * the request. We allow this for pragmatic reasons.
               *)
              log_connection
                (fun () ->
                   sprintf
                     "FD %s - HTTP connection: \
                      Got response before finishing request" fd_str)

          | Handshake 
          | Sending_body 
          | Sent_request ->
              (* Perfectly legal: at least the header has been sent
                 ([Sent_request] is the normal case)
               *)
              ()

          | _ ->
              (* Should not happen *)
              assert false
      );
      head # message
	    

    method private update_characteristics trans =
      (* Looks at the response just received for [trans] and adapts the
         transmission style of the connection:

         - The server is assumed to manage pipelined connections
           ([trans # indicate_pipelining]): nothing is changed.
         - Otherwise, if the server indicated a persistent connection
           (HTTP/1.0 style): pipelining is disabled for this peer, but
           [close_connection] is left alone.
         - Otherwise: [close_connection] is set, which forces that the
           CLIENT closes the connection (this is tested in
           maintain_polling).

         In all cases the first request/reply round is over afterwards.
       *)
      let close_before = close_connection in
      let pipelining_ok = trans # indicate_pipelining in
      let http10_keep_alive =
        not pipelining_ok && trans # indicate_sequential_persistency in

      if http10_keep_alive then (
        if !options.verbose_connection then 
          dlogr
            (fun () ->
               sprintf "FD %s - HTTP connection: \
                               using HTTP/1.0 style persistent connection"
                 fd_str);
        inhibit_pipelining_byserver <- true
      )
      else if not pipelining_ok then
        close_connection <- true;

      if !options.verbose_connection then 
        dlogr
          (fun () ->
             sprintf "FD %s - HTTP connection: pipelining=%B persistency=%B close_connection=%B->%B"
               fd_str pipelining_ok http10_keep_alive
               close_before close_connection
          );

      (* Remember that the first request/reply round is over: *)
      done_first_message <- true



    method private after_eof err_opt =
      (* Postprocess error information after the connection has ended.
       * The body runs only once per pipeline object; later calls are
       * no-ops.
       *
       * err_opt: the error that ended the connection, or [None] for a
       *   regular end
       *)
      if not after_eof then (
	(* THINK: We could here process Garbage_received exceptions differently.
	   It is questionable to attribute such exceptions to specific 
	   calls.
	 *)

	(* First check if the connection was a total failure, i.e. if not
	 * even a status line was received. This is just reported to the
	 * robust_pipeline, where the counter for totally failed connections
	 * can be increased.
	 *)
	
	total_failure <- not io#status_seen;

	(* Assertions about queues: write queue is a suffix of read queue *)
	
	let n_read  = Q.length read_queue in
	let n_write = Q.length write_queue in
	assert (n_read >= n_write);
	
	(* Clean up the transmitters: *)
	
	Q.iter
	  (fun trans ->
	     trans # cleanup();    (* release resources *)
	     match err_opt with
	       | None ->
		   (* Without error, messages are only failed when the
		      connection was a total failure
		    *)
		   if total_failure then
		     trans # error_if_unserved (Bad_message "Protocol error")
	       | Some err ->
		   (* Garbage and missing replies are both reported as
		      protocol errors
		    *)
		   let err' =
		     match err with
		       | Garbage_received _ ->
			   Bad_message "Protocol error"
		       | No_reply ->
			   Bad_message "Protocol error"
		       | err -> err in
		   trans # error_if_unserved err'
	  )
	  read_queue;
	
	(* Increase the error counter of the head of read_queue: *)
	if n_read > 0 && err_opt <> None then (
	  let trans = Q.peek read_queue in
	  let m = trans # message in
	  let e = m # private_api # get_error_counter in
	  m # private_api # set_error_counter (e+1);
	);
	
	(* We have reached the logical end: *)
	signal_queue (`Done());
	if !options.verbose_events then
	  dlogr
	    (fun () ->
	       sprintf "FD %s - HTTP event: Queue processing done" 
		 fd_str);

	after_eof <- true
      )


    (**********************************************************************)
    (***                     AUTHENTICATION                             ***)
    (**********************************************************************)

    method private postprocess_complete_message_e trans =
      (* This method is invoked for every complete reply. The following
       * cases are handled at this stage of processing:
       *
       * - Status code 407 (only if the peer is an HTTP proxy): The proxy
       *   demands authorization. If a previous attempt failed and the
       *   session does not want to continue, or if no proxy session can
       *   be created, this status code isn't handled here. Otherwise,
       *   the new session is remembered in [proxy_auth_state], and the
       *   request is added again onto the queue as urgent message.
       *
       * - Status code 401: The content server demands authorization. This
       *   works like the 407 case, but the session is created by
       *   [auth_cache] and remembered in the auth state of the message.
       *
       * - Status codes 200 to 399: If the message was sent with a session
       *   created in reply to a 401, [auth_cache] is told that the
       *   session was successful.
       *
       * All other status codes are not handled here. Note that it is not
       * possible to react on the redirection codes because this object
       * represents the connection to exactly one server.
       * As default behaviour, the transmitter is cleaned up, and
       * [drive_postprocessing_e] is invoked for the message.
       *
       * The result is an engine; it is immediately done when the request
       * was queued again.
       *)

      let default_action_e() =
	trans#cleanup();
	drive_postprocessing_e esys options trans#message trans#f_done in

      let msg = trans # message in
      let code = msg # private_api # response_code in
      (* The two headers are currently fetched but not used: *)
      let _req_hdr = msg # request_header `Effective in
      let _resp_hdr = msg # private_api # response_header in
      match code with
	| 407 when peer_is_proxy ->
	    (* --------- Proxy authorization required: ---------- *)
	    let try_again =
	      match !proxy_auth_state with
		| `None
		| `In_advance _ -> 
		    true
		| `In_reply sess ->
		    (* A previous attempt failed. *)
		    let continue = sess # invalidate msg in
		    if not continue then proxy_auth_state := `None;
		    continue
	    in
	    if try_again then (
	      match proxy_auth_handler_opt with
		| None ->
		    default_action_e()
		| Some ah ->
		    (match ah # create_proxy_session msg options with
		       | None ->
			   (* Authentication failed immediately *)
			   proxy_auth_state := `None;
			   default_action_e()
		       | Some sess ->
			   (* Remember the new session: *)
			   proxy_auth_state := `In_reply sess;
			   ignore (self # add true trans#message trans#f_done);
			   eps_e (`Done()) esys
		    )
    	    )
	    else
	      default_action_e()
	| 401 ->
	    (* -------- Content server authorization required: ---------- *)
	    (* Unless a previous authentication attempt failed, just create
             * a new session, and repeat the request.
             *)
	    let try_again =
	      match msg # private_api # auth_state with
		| `None
		| `In_advance _ -> 
		    true
		| `In_reply sess ->
		    (* A previous attempt failed. *)
		    let continue = sess # invalidate msg in
		    if not continue then auth_cache # tell_failed_session sess;
		    continue
	    in
	    if try_again then (
	      match auth_cache # create_session msg options with
		| None ->
		    (* Authentication failed immediately *)
		    default_action_e()
		| Some sess ->
		    (* Remember the new session: *)
		    msg # private_api # set_auth_state (`In_reply sess);
		    ignore (self # add true trans#message trans#f_done);
		    eps_e (`Done()) esys
    	    )
	    else
	      default_action_e()
	| n when n >= 200 && n < 400 ->
	    (* Check whether authentication was successful *)
	    ( match msg # private_api # auth_state with
		| `None -> ()
		| `In_advance _ -> ()
		| `In_reply session ->
		    auth_cache # tell_successful_session session
	    );
	    default_action_e()
	| _ ->
	    default_action_e()

    (**********************************************************************)
    (***    POSTPROCESS = INVOKE USER CALLBACK FUNCTION                 ***)
    (**********************************************************************)
	
    method private drive_postprocessing trans =
      (* Releases the resources of the transmitter, and passes its message
       * and [f_done] callback on to [drive_postprocessing_msg].
       *)
      trans#cleanup();
      drive_postprocessing_msg esys options trans#message trans#f_done
     
    end
  )

(**********************************************************************)

let robust_pipeline 
      esys tp cb
      peer
      proxy_auth_handler_opt
      conn_cache conn_owner 
      auth_cache
      counters options =
  (* Implements a pipeline that connects to the peer after the first
     message is added, and that is able to reconnect as often as
     necessary
   *)

  let proxy_auth_state =  ref `None in

  ( object(self)
      val mutable fp_opt = None
	(* The fragile_pipeline, if any *)

      val queue = Q.create()
	(* Queued requests before being connected *)

      (* 'connecting' may be set to [Some e] where [e] is the connecting engine.
       *)
      val mutable connecting = None
	
      (* 'connect_pause': seconds to wait until the next connection is tried.
       * There seems to be a problem with some operating systems (namely
       * Linux 2.2) which do not like immediate reconnects if the previous
       * connection did not end in a sane state.
       * A value of 0.5 seems to be sufficient for reconnects (see below).
       *)			      
      val mutable connect_pause = 0.0
	
      (* no_pipelining: true if the server has somehow indicated that it
	 is better not to rely on pipelining
       *)
      val mutable no_pipelining = false
	
      (* 'connect_time': how many seconds the last 'connect' took
       *)
      val mutable connect_time = 0.0
	
      (* If a connection does not lead to any type of response, the client 
       * simply re-opens a new connection and tries again. The following 
       * variables limit the number of attempts until the client gives up.
       *
       * 'totally_failed_connections': counts the number of failed connections
       *      without any response from the server.
       *)
      val mutable totally_failed_connections = 0
	
    method length =
      (* Number of open requests of the current fragile pipeline; 0 while
         there is no such pipeline.
       *)
      match fp_opt with
        | Some fp -> fp#length
        | None -> 0


    method active =
      (* Whether the current fragile pipeline has something to do; false
         while there is no such pipeline.
       *)
      match fp_opt with
        | Some fp -> fp#active
        | None -> false


    method add urgent (m : http_call) f_done =
      (* Adds the call [m] to the pipeline. [urgent] and [f_done] are
         passed through to the fragile pipeline.

         Without a fragile pipeline the call is only queued, and a
         connection attempt is started unless one is already under way.
       *)

      (* Check whether we can authenticate in advance: *)
      ( match m # private_api # auth_state with
          | `None ->
              ( try
                  let sess = auth_cache # find_session_in_advance m in
                  m # private_api # set_auth_state (`In_advance sess)
                with
                    Not_found -> ()
              )
          | _ -> ()
      );

      match fp_opt with
        | Some fp ->
            (* Even if fp is already shut down! This only means that
               fp#finish_e has not yet signalled that we are actually
               finished. This will happen soon, though, and the added
               message will be picked up then
             *)
            fp # add urgent m f_done

        | None ->
            Q.add (urgent,m,f_done) queue;
            if connecting = None then
              self # reconnect()


    method reset () =
      (* Gives up everything: a running connection attempt is aborted, the
         fragile pipeline (if any) is reset and dropped, and all calls that
         are still waiting for a connection are failed with [No_reply]
         (unless already served) and postprocessed.
       *)
      if !options.verbose_connection then
        dlog "HTTP connection: reset";

      ( match connecting with
          | Some e ->
              e#abort();
              connecting <- None
          | None -> ()
      );

      ( match fp_opt with
          | Some fp ->
              fp # reset();
              fp_opt <- None
                (* note that [fp#reset] causes that finished_e transitions
                   to `Aborted. Hence, conn_is_done is not called 

                   The remaining messages in fp are already postprocessed.
                 *)
          | None -> ()
      );

      let give_up (_,m,f_done) =
        m # private_api # error_if_unserved 
          !options.verbose_connection No_reply;
        self # drive_postprocessing_msg m f_done in
      Q.iter give_up queue;
      Q.clear queue;

      totally_failed_connections <- 0



    method private reconnect() =
      (* Starts a new connection attempt after waiting [connect_pause]
       * seconds. The attempt is remembered in [connecting].
       *
       * On success a new fragile pipeline is created for the connection,
       * all queued calls are moved into it, and event processing is
       * enabled. On error [conn_is_error] takes over.
       *
       * Preconditions (asserted): no attempt is under way, and there is
       * no fragile pipeline.
       *)
      assert(connecting = None);
      assert(fp_opt = None);
      let e =
	Uq_engines.delay_engine
	  connect_pause
	  (fun () ->
	     tcp_connect_e 
	       esys tp cb peer conn_cache conn_owner options 
	       proxy_auth_handler_opt
	  )
	  esys in
      connecting <- Some e;
      Uq_engines.when_state
	~is_error:(fun err ->
		     connecting <- None;
		     self#conn_is_error err
		  )
	~is_done:(fun (fd,mplex,t) ->
		    (* [t] is the connect time reported by tcp_connect_e *)
		    connect_time <- t;
		    connect_pause <- 0.0;
		    connecting <- None;
		    let fp =
		      fragile_pipeline
			esys cb peer
			proxy_auth_state proxy_auth_handler_opt
			fd mplex t no_pipelining conn_cache
			auth_cache
			counters options in
		    fp_opt <- Some fp;
		    (* Move the waiting calls into the new pipeline: *)
		    Q.iter
		      (fun (urgent,m,f_done) -> 
			 fp # add urgent m f_done
		      )
		      queue;
		    Q.clear queue;
		    fp # attach();
		    (* Get notified when the pipeline is finished: *)
		    Uq_engines.when_state
		      ~is_done:(self#conn_is_done fp)
		      fp#finished_e
		 )
	e

    method private conn_is_error error =
      (* It was not possible to reach the peer. The attempt is counted as
         crashed connection, and all waiting calls are passed to
         [conn_retry] as total failure with [error] as substitute error.
       *)
      let crashed = counters.crashed_connections + 1 in
      counters.crashed_connections <- crashed;

      let waiting = Q.create() in
      Q.transfer queue waiting;

      self # conn_retry true error waiting

    method private conn_is_done fp () =
      (* Called when the pipeline [fp] has finished. [fp] is forgotten, its
         unserved messages are collected (after [cleanup]), and
         [conn_retry] decides which of them are moved back to [queue] for
         another attempt.
       *)
      fp_opt <- None;
      no_pipelining <- fp#no_pipelining;  (* keep this for later *)

      let unserved = Q.create() in
      let collect trans =
        trans # cleanup();
        Q.add (false, trans#message, trans#f_done) unserved in
      fp # iter_unserved_messages collect;

      self # conn_retry fp#is_total_failure No_reply unserved


    method private conn_retry is_total_failure subst_error q =
      (* Decides for every message in [q] whether it is moved back to
         [queue] and sent again on a new connection, or whether it is
         finished now. For the latter, [subst_error] is passed to
         [error_if_unserved] before the message is postprocessed.

         [is_total_failure] says that no response was received from the
         server at all. Such failures are counted; as soon as
         [!options.maximum_connection_failures] is reached, no message is
         tried again.
       *)
      if is_total_failure then begin
        totally_failed_connections <- totally_failed_connections + 1;
        if !options.verbose_connection then
          dlog "HTTP connection: total failure"
      end
      else begin
        totally_failed_connections <- 0
      end;

      let too_many_total_failures =
        is_total_failure &&
          totally_failed_connections >= !options.maximum_connection_failures
      in

      if !options.verbose_connection then
        dlog "HTTP connection: checking remaining pipeline requests";

      if too_many_total_failures then
        counters.failed_connections <- counters.failed_connections + 1;

      (* The reconnect policy of a single message *)
      let mode_permits_retry m =
        match m # get_reconnect_mode with
          | Send_again -> true
          | Request_fails -> false
          | Send_again_if_idem -> m # is_idempotent
          | Inquire f ->
              (* The user function [f] decides. If it raises an exception,
                 the exception is logged with [dlog], and the answer is
                 taken as [false].
               *)
              ( try f m
                with x ->
                  dlog ("Exception caught in Http_client: "
                        ^ (Netexn.to_string x));
                  false
              )
      in

      (* Check all requests individually *)
      let check_message (urgent,m,f_done) =
        let e = m # private_api # get_error_counter in
        let try_again =
          not too_many_total_failures &&
            e <= !options.maximum_message_errors &&
            (m # private_api # error_exception <> Some Response_too_large) &&
            mode_permits_retry m in
        if try_again then begin
          if !options.verbose_status then
            dlogr (fun () -> sprintf "Call %d - rescheduling" (Oo.id m));
          Q.add (urgent,m,f_done) queue
        end
        else begin
          m # private_api # error_if_unserved
            !options.verbose_connection subst_error;
          self # drive_postprocessing_msg m f_done
        end
      in
      Q.iter check_message q;

      (* Connect again if at least one message was rescheduled *)
      if Q.length queue > 0 then begin
        if !options.verbose_connection then
          dlog "HTTP connection: retrying after failure";
        connect_pause <- 1.0;
        self#reconnect()
      end


    (**********************************************************************)
    (***    POSTPROCESS = INVOKE USER CALLBACK FUNCTION                 ***)
    (**********************************************************************)

    (* Same as for fragile_pipeline *)
	
    method private drive_postprocessing_msg m f_done =
      (* Forwards to the function [drive_postprocessing_msg] defined outside
         of this object, adding the [esys] and [options] of this
         connection. [m] is the message, [f_done] its completion callback.
       *)
      drive_postprocessing_msg esys options m f_done

    end
  )


(**********************************************************************)


(**********************************************************************)
(**********************************************************************)
(**********************************************************************)
(***                                                                ***)
(***                 THE PIPELINE INTERFACE                         ***)
(***                                                                ***)
(**********************************************************************)
(**********************************************************************)
(**********************************************************************)

(* The following class, 'pipeline' defines the interface for the outside
 * world.
 *)

class pipeline =
  object (self)
    (* The event system used for all connections of this pipeline. Can be
       replaced with [set_event_system].
     *)
    val mutable esys = Unixqueue.create_unix_event_system()

    (* Proxy configuration. [proxy = ""] means that no proxy is used.
       [proxy_auth] says whether [proxy_user] and [proxy_password] are
       passed to newly created connections. [proxy_type] is either
       `Http_proxy or `Socks5.
     *)
    val mutable proxy = ""
    val mutable proxy_port = 80
    val mutable proxy_auth = false
    val mutable proxy_user = ""
    val mutable proxy_password = ""
    val mutable proxy_type = `Http_proxy

    (* Hosts and domains (entries with leading '.') that are contacted
       directly even if a proxy is configured
     *)
    val mutable no_proxy_for = []

    (* Maps (peer, channel binding) to a reference to the list of
       connections to that peer
     *)
    val mutable connections = Hashtbl.create 10

    (* Number of messages that are currently being processed *)
    val mutable open_messages = 0

    (* Number of connections that are currently counted as open *)
    val mutable open_connections = 0

    (* The options record. It is kept in a reference that is shared with
       the connection objects, so [set_options] also affects connections
       that already exist.
     *)
    val options =
      let http_syn =
	Hashtbl.find Neturl.common_url_syntax "http" in
      let https_syn =
	Hashtbl.find Neturl.common_url_syntax "https" in
      let ipp_syn =
	Hashtbl.find Neturl.common_url_syntax "ipp" in
      ref
	{ (* Default values: *)
	  synchronization = Pipeline 5;
	  maximum_connection_failures = 2;
	  maximum_message_errors = 2;
	  inhibit_persistency = false;
	  connection_timeout = 300.0;
	  number_of_parallel_connections = 2;
	  maximum_redirections = 10;
	  handshake_timeout = 1.0;
	  resolver = sync_resolver;
	  configure_socket = (fun _ -> ());
	  (* Each scheme: name, URL syntax, default port, channel binding *)
	  schemes = [ "http", http_syn, Some 80, http_cb_id;
		      "https", https_syn, Some 443, https_cb_id;
		      "ipp", ipp_syn, Some 631, http_cb_id;
		    ];
	  (* The messages selected here are emitted with dlog/dlogr, i.e.
	     they only become visible when [Debug.enable] is set
	   *)
	  verbose_status = true;
	  verbose_request_header = false;
	  verbose_response_header = false;
	  verbose_request_contents = false;
	  verbose_response_contents = false;
	  verbose_connection = true;
	  verbose_events = false;
	}

    (* The authentication handlers, see [add_auth_handler] *)
    val auth_cache = new auth_cache

    (* The connection cache passed to all connections *)
    val mutable conn_cache = create_restrictive_cache()

    (* Statistics, see the [cnt_*] methods and [reset_counters] *)
    val counters =
      { new_connections = 0;
	timed_out_connections = 0;
	crashed_connections = 0;
	server_eof_connections = 0;
	successful_connections = 0;
	failed_connections = 0;
      }

    (* Maps channel binding IDs to transport channel types. Only the
       bindings [http_cb_id] and [proxy_only_cb_id] are registered here;
       further ones (e.g. for [https_cb_id]) have to be added with
       [configure_transport].
     *)
    val transports = Hashtbl.create 5

    initializer (
      Hashtbl.add transports http_cb_id http_transport_channel_type;
      Hashtbl.add transports proxy_only_cb_id http_transport_channel_type;
    )

    (* Returns the event system currently used by this pipeline *)
    method event_system = esys

    (* Sets a new event system for connections created from now on. The
       table of known connections is emptied; the connection objects
       themselves are not reset by this method.
     *)
    method set_event_system new_esys =
      esys <- new_esys;
      Hashtbl.clear connections;

    (* Returns the connection cache *)
    method connection_cache = conn_cache

    (* Sets the connection cache used for connections created from now on *)
    method set_connection_cache cc = conn_cache <- cc

    (* Registers the authentication method [m] by converting it to an
       authentication handler
     *)
    method add_authentication_method ( m : auth_method ) =
      self # add_auth_handler (m # as_auth_handler)

    (* Registers the authentication handler [h] in [auth_cache] *)
    method add_auth_handler (h : auth_handler) =
      auth_cache # add_auth_handler h

    method set_proxy the_proxy the_port =
      (* Selects the HTTP proxy [the_proxy]:[the_port] for subsequently
         added requests. An empty host name disables the proxy.
       *)
      proxy_type  <- `Http_proxy;
      proxy_port  <- the_port;
      proxy       <- the_proxy

    method set_proxy_auth user passwd =
      (* Remembers the credentials to present when the proxy demands
         authentication. Proxy authentication is only enabled if [user]
         is non-empty.
       *)
      proxy_user     <- user;
      proxy_password <- passwd;
      proxy_auth     <- (user <> "")


    method avoid_proxy_for l =
      (* l: List of hosts or domains that are contacted directly, i.e.
         without proxy. An entry starting with '.' is a domain and matches
         all hosts ending with it. The list replaces the previous one.
       *)
      no_proxy_for <- l


    method set_proxy_from_environment() =
      (* Configures the proxy from the process environment:

         - "http_proxy": The URL of the proxy. Host and port are passed to
           [set_proxy]. If the URL includes both a user name and a
           password, these are passed to [set_proxy_auth]. A missing,
           unparseable or malformed value leaves the corresponding
           settings untouched.
         - "no_proxy": A comma-separated list of hosts and domains, passed
           to [avoid_proxy_for]. A missing variable counts as empty list.

         NOTE(review): [Neturl.Malformed_URL] is handled here in the same
         way as [prepare_url] does it for [parse_url]. Without that, a
         malformed "http_proxy" value could make this method fail -
         confirm against [parse_url].
       *)
      let getenv_or_empty name =
        try Sys.getenv name with Not_found -> "" in

      (* Is the environment variable "http_proxy" set? *)
      let http_proxy = getenv_or_empty "http_proxy" in
      begin try
        let (nu,_) = parse_url options http_proxy in  (* may raise Not_found *)
        self # set_proxy (Neturl.url_host nu) (Neturl.url_port nu);
        let u = Neturl.url_user nu in       (* may raise Not_found *)
        let p = Neturl.url_password nu in   (* may raise Not_found *)
        self # set_proxy_auth u p
      with
        | Not_found -> ()
        | Neturl.Malformed_URL ->
            dlog "Http_client: ignoring malformed http_proxy setting"
      end;

      (* Is the environment variable "no_proxy" set? *)
      let no_proxy = getenv_or_empty "no_proxy" in
      self # avoid_proxy_for (split_words_by_commas no_proxy)


    (* Selects the SOCKS version 5 proxy [host]:[port] for subsequently
       added requests. This replaces a proxy set with [set_proxy].
     *)
    method set_socks5_proxy host port =
      proxy       <- host;
      proxy_port  <- port;
      proxy_type  <- `Socks5


    (* Registers [tp] as transport for the channel binding ID [cb]. A
       previous registration for [cb] is replaced.
     *)
    method configure_transport (cb:int) (tp:transport_channel_type) =
      Hashtbl.replace transports cb tp

    method reset () =
      (* Resets all connections known to this pipeline (see the [reset]
         method of the connection objects), and sets the statistics
         counters back to 0.

         The connections that the cache [conn_cache] holds for this
         pipeline are not explicitly forgotten or closed here; this was
         considered as unnecessary ("should do nothing").
       *)
      let reset_all conns =
        List.iter (fun c -> c # reset()) !conns in
      Hashtbl.iter (fun _ conns -> reset_all conns) connections;

      self # reset_counters()
      

    method private add_with_callback_no_redirection (request : http_call) f_done =
      (* Adds [request] to the queue of a connection to the effective peer
         (the target host, or the configured proxy), and arranges that
         [f_done] is called with the message when it has been processed.
         Redirections are not followed here, see [add_with_callback].

         Up to [!options.number_of_parallel_connections] connections are
         kept per pair (peer, channel binding). As long as this limit is
         not reached, a new connection is created for the request.
         Otherwise the connection with the shortest queue is taken. If
         there is no connection at all, one is always created, even when
         the limit is configured as 0 or less.

         Raises [Failure] if no transport is configured for the channel
         binding of [request].
       *)
      let host = request # get_host() in
      let port = request # get_port() in
      let cb = request # channel_binding in

      (* Does the entry [dom] of [no_proxy_for] match [host]? Entries with
         a leading '.' are domains and match as suffix; other entries must
         be equal to the host name. Host names are case-insensitive, so
         both kinds of entries are compared in lowercase.
       *)
      let host_lc = String.lowercase host in
      let is_exempted dom =
        let dom_lc = String.lowercase dom in
        if dom <> "" && dom.[0] = '.' &&
           String.length host > String.length dom
        then (
          let ld = String.length dom in
          String.sub host_lc (String.length host - ld) ld = dom_lc
        )
        else
          dom_lc = host_lc
      in

      let use_proxy =
        proxy <> "" &&
        request # proxy_enabled &&
        not (List.exists is_exempted no_proxy_for)
      in

      (* find out the effective peer: *)
      let peer =
        if use_proxy then
          ( match proxy_type with
              | `Http_proxy ->
                  if request # proxy_use_connect then
                    `Http_proxy_connect((proxy,proxy_port),(host,port))
                  else
                    `Http_proxy(proxy,proxy_port)
              | `Socks5 ->
                  `Socks5((proxy,proxy_port),(host,port))
          )
        else
          `Direct(host,port) in

      (* Creates a new connection to [peer], and enters it into
         [connlist] *)
      let create_connection connlist =
        let tp =
          try
            if cb = proxy_only_cb_id && not use_proxy then raise Not_found;
            Hashtbl.find transports cb
          with Not_found ->
            failwith "Http_client: No transport for this channel binding" in
        let proxy_auth_handler_opt =
          if proxy_auth then (
            let kh = new proxy_key_handler proxy_user proxy_password in
            Some(new unified_auth_handler kh)
          )
          else
            None in
        let new_conn =
          robust_pipeline
            esys tp cb
            peer
            proxy_auth_handler_opt
            conn_cache
            (self :> < >)
            auth_cache
            counters
            options in
        open_connections <- open_connections + 1;
        counters.new_connections <- counters.new_connections + 1;
        connlist := new_conn :: !connlist;
        new_conn
      in

      (* Find out if there is already a connection to this peer: *)
      let conn =
        let connlist =
          ( try
              Hashtbl.find connections (peer, cb)
            with
                Not_found ->
                  let new_connlist = ref [] in
                  Hashtbl.add connections (peer, cb) new_connlist;
                  new_connlist
          ) in
        match !connlist with
          | [] ->
              create_connection connlist
          | first :: others ->
              if List.length !connlist <
                   !options.number_of_parallel_connections
              then
                create_connection connlist
              else
                (* Find the connection with the lowest number of queue
                   entries: *)
                List.fold_left
                  (fun best_conn a_conn ->
                     if a_conn # length < best_conn # length then
                       a_conn
                     else
                       best_conn)
                  first
                  others
      in

      (* Add the request to the queue of this connection: *)
      conn # add false request
        (fun m ->
           (* Update 'open_connections', 'connections', and 'open_messages' *)
           if not conn#active then begin
             (* Check whether the connection is still in the [connections]
              * hash. It is possible that it is already deleted here.
              *)
             let connlist =
               try
                 Hashtbl.find connections (peer, cb)
               with
                   Not_found -> ref []
             in
             if List.exists (fun c -> c == conn) !connlist then (
               open_connections <- open_connections - 1;
               connlist := List.filter (fun c -> c != conn) !connlist;
               if !connlist = [] then
                 Hashtbl.remove connections (peer, cb)
             )
           end;
           self # update_open_messages;
           (* Do user action: *)
           f_done m
        );

      open_messages <- open_messages + 1

    method private update_open_messages =
      (* Recomputes [open_messages] as the sum of the queue lengths of all
         connections in [connections] that are still active
       *)
      let count_conn c =
        if c # active then
          open_messages <- open_messages + (c # length) in
      open_messages <- 0;
      Hashtbl.iter
        (fun _ conns -> List.iter count_conn !conns)
        connections


    method add_with_callback (request : http_call) f_done =
      (* Adds [request] to the pipeline. [f_done] is called with the
         message when it is finally processed.

         Responses with status 301 or 302 are followed if the redirect
         mode of the message permits it: the request URI is replaced by
         the "Location" header, and the message is added again. At most
         [!options.maximum_redirections] redirections are followed;
         after that the message fails with [Too_many_redirections]. A
         location that is not accepted as request URI makes the message
         fail with [URL_syntax_error].
       *)

      (* Does the message [m] want that the redirection is followed? *)
      let wants_redirect (m : http_call) =
        match m # get_redirect_mode with
          | Redirect -> true
          | Do_not_redirect -> false
          | Redirect_if_idem -> m # is_idempotent
          | Redirect_inquire f ->
              (* The user function [f] decides. If it raises an exception,
                 the exception is logged with [dlog], and the answer is
                 taken as [false].
               *)
              ( try f m
                with x ->
                  dlog (sprintf
                          "Call %d - Exception caught in Http_client %s"
                          (Oo.id m)
                          (Netexn.to_string x));
                  false
              )
      in

      (* The "Location" header must be an absolute URL according to the RFC
         specs. As workaround for servers sending only a full path, such
         a location is interpreted relative to the URI of the old
         request. If this fails, the location is taken as it is.
       *)
      let absolute_location (m : http_call) location =
        if location <> "" && location.[0] = '/' then
          ( try
              let base_url = m # private_api # request_uri_with () in
              let (nu,_) = parse_url ~base_url options location in
              Neturl.string_of_url nu
            with _ -> location
          )
        else
          location
      in

      (* Simply repeat the request with a different URI. May raise
         [Not_found] if there is no "Location" header.
       *)
      let follow_redirect (m : http_call) =
        let rc = m # private_api # get_redir_counter in
        if rc >= !options.maximum_redirections then begin
          (* The redirection limit is exceeded *)
          m # private_api # set_error_exception Too_many_redirections;
          f_done m
        end
        else begin
          let location = m # assoc_resp_header "location" in
          let target = absolute_location m location in
          let accepted =
            ( try
                m # set_request_uri target;
                true
              with _ ->
                (* Bad URL! *)
                m # private_api # set_error_exception
                  (URL_syntax_error target);
                false
            ) in
          if accepted then begin
            m # private_api # set_redir_counter (rc+1);
            m # private_api # set_error_counter 0;
            self # add_with_callback m f_done
          end
          else
            f_done m
        end
      in

      (* Invoked when the message has been processed without following
         redirections *)
      let on_reply (m : http_call) =
        try
          let (_,code,_) = m # dest_status() in
          if (code = 301 || code = 302) && wants_redirect m then
            follow_redirect m
          else
            f_done m
        with
          | Http_protocol _ | Not_found ->
              f_done m
      in

      request # private_api # parse_request_uri options;
      self # add_with_callback_no_redirection request on_reply


    method add request =
      (* Like [add_with_callback], but without notification *)
      self # add_with_callback request ignore

    method add_e request =
      (* Like [add], but an engine is returned that goes to the state
         [`Done ()] when the callback of the request is invoked
       *)
      let (e, signal) = Uq_engines.signal_engine esys in
      let notify _ = signal (`Done ()) in
      self # add_with_callback request notify;
      e

    (* Runs the event system of this pipeline, and thus processes the
       added requests
     *)
    method run () =
	 Unixqueue.run esys

    (* Returns the current options record *)
    method get_options = !options

    (* Replaces the options record. Because the reference is shared, this
       also affects the connections that already exist.
     *)
    method set_options p =
      options := p

    (* Returns the current value of the message counter *)
    method number_of_open_messages = open_messages

    (* Returns the current value of the connection counter *)
    method number_of_open_connections = open_connections

    method connections =
      (* Returns a list with one triple (host, port, queue length) for
         every connection. Host and port are those of the first hop of
         the connection (as determined by [first_hop]).
       *)
      Hashtbl.fold
        (fun (peer, _) conns acc ->
           List.fold_left
             (fun acc conn ->
                let host, port = first_hop peer in
                (host, port, conn#length) :: acc)
             acc
             !conns)
        connections
        []

    (* The following methods return the statistics counters of this
       pipeline. [new_connections] is incremented when a connection
       object is created, [crashed_connections] when the peer could not
       be reached, and [failed_connections] when the maximum number of
       total failures is hit. The other counters are maintained outside
       of this part of the file.
     *)

    method cnt_new_connections = counters.new_connections

    method cnt_timed_out_connections = counters.timed_out_connections

    method cnt_crashed_connections = counters.crashed_connections

    method cnt_server_eof_connections = counters.server_eof_connections

    method cnt_successful_connections = counters.successful_connections

    method cnt_failed_connections = counters.failed_connections

    (* Sets all statistics counters back to 0 *)
    method reset_counters() =
      counters.new_connections <- 0;
      counters.timed_out_connections <- 0;
      counters.crashed_connections <- 0;
      counters.server_eof_connections <- 0;
      counters.successful_connections <- 0;
      counters.failed_connections <- 0;


  end
;;

(**********************************************************************)
(**********************************************************************)
(**********************************************************************)
(***                                                                ***)
(***                 THE CONVENIENCE MODULE                         ***)
(***                                                                ***)
(**********************************************************************)
(**********************************************************************)
(**********************************************************************)

(* This module is intended for beginners and for simple applications
 * of this HTTP implementation.
 *)

(* The thread provider currently configured in Netsys_oothr, and the
 * mutex created with it. The mutex is used by [serialize] below.
 *)
let omtp = !Netsys_oothr.provider
let mutex = omtp # create_mutex()

(* [serialize f arg]: Computes [f arg] while [mutex] is locked, and returns
 * the result. An exception raised by [f] is raised again after the mutex
 * has been unlocked.
 *
 * The mutex is unlocked exactly once in both cases: the unlock call of
 * the success case is not covered by the exception handler, so a failing
 * unlock cannot trigger a second unlock attempt.
 *)
let serialize f arg =
  mutex # lock();
  let outcome =
    ( try f arg
      with err -> mutex # unlock(); raise err
    ) in
  mutex # unlock();
  outcome
;;
	

module Convenience =
  struct

    (* The number of trials, passed to [request] by the functions for
       GET, HEAD and PUT.
       NOTE(review): [request] as defined in this module ignores its
       trials argument, so this setting appears to have no effect.
     *)
    let http_trials = ref 3

    (* The default credentials. They are used by [simple_key_handler] if
       [this_user] is empty and [http_user] is non-empty.
     *)
    let http_user = ref ""
    let http_password = ref ""

    (* The credentials taken from the URL of the current request (set by
       [prepare_url]). If [this_user] is non-empty, these credentials are
       preferred over the default ones.
     *)
    let this_user = ref ""
    let this_password = ref ""

    (* Set to [true] by [http_verbose] *)
    let conv_verbose = ref false

    (* The key handler of this module: it returns the credentials from the
       URL ([this_user], [this_password]) if there is such a user, and
       otherwise the default credentials ([http_user], [http_password]).
       If neither user is set, [inquire_key] raises [Not_found].

       The returned key looks up the variables at the time its methods
       are called, and reports the first element of [realms] as realm.
     *)
    class simple_key_handler : key_handler =
    object
      method inquire_key ~domain ~realms ~auth =
        let key_of user_var password_var =
          ( object
              method user = !user_var
              method password = !password_var
              method realm = List.hd realms
              method domain = domain
            end ) in
        if !this_user <> "" then
          key_of this_user this_password
        else if !http_user <> "" then
          key_of http_user http_password
        else
          raise Not_found

      (* Nothing is cached, so there is nothing to invalidate *)
      method invalidate_key (_ : key) = ()
    end


    (* The handler for basic authentication, getting the credentials from
       [simple_key_handler]. It is created with the option
       [enable_auth_in_advance].
     *)
    let auth_basic =
      new basic_auth_handler 
	~enable_auth_in_advance:true (new simple_key_handler)

    (* The handler for digest authentication (RFC 2617), getting the
       credentials from [simple_key_handler]. This has to be a
       [digest_auth_handler]; a second basic handler would mean that the
       default pipeline cannot respond to digest challenges.
       NOTE(review): assumes that [digest_auth_handler] is defined earlier
       in this file and accepts [~enable_auth_in_advance] like
       [basic_auth_handler] - confirm.
     *)
    let auth_digest =
      new digest_auth_handler
        ~enable_auth_in_advance:true (new simple_key_handler)

    (* Creates the pipeline used by this module: the proxy settings are
       taken from the environment, and the handlers [auth_basic] and
       [auth_digest] are registered (in this order).
     *)
    let get_default_pipe() =
      let p = new pipeline in
      p # set_proxy_from_environment();
      List.iter (p # add_auth_handler) [ auth_basic; auth_digest ];
      p


    (* The pipeline shared by all functions of this module. It is created
       when it is needed for the first time.
     *)
    let pipe = lazy (get_default_pipe())

    (* NOTE(review): not referenced in the visible part of this module *)
    let pipe_empty = ref true


    (* Calls [f] with the shared pipeline (creating it if necessary), and
       returns the result of [f]. The mutex of [serialize] is not held
       during the call.
     *)
    let configure_pipeline f =
      f (Lazy.force pipe)

    (* [request m trials]: Adds the message [m] to the shared pipeline, and
       runs the pipeline until it is done. This happens under the mutex
       of [serialize]. If running the pipeline raises an exception, the
       pipeline is reset, and the exception is raised again.

       The argument [trials] is accepted, but not used.
     *)
    let request m =
      let perform _trials =
        let p = Lazy.force pipe in
        m # set_accept_encoding();
        p # add m;
        ( try
            p # run()
          with
            | error -> p # reset(); raise error
        )
      in
      serialize perform

    (* [prepare_url url]: Extracts the user name and the password from
       [url], stores them in [this_user] and [this_password], and returns
       the URL without these parts. [this_user] is always cleared first.
       If the URL cannot be processed ([Not_found] or
       [Neturl.Malformed_URL]), it is returned unchanged. The function
       runs under the mutex of [serialize].
     *)
    let prepare_url =
      let split_credentials url =
        let p = Lazy.force pipe in
        try
          this_user := "";
          let (nu, _) = parse_url (ref p#get_options) url in
          if Neturl.url_provides ~user:true nu then begin
            this_user := Neturl.url_user nu;
            this_password := "";
            if Neturl.url_provides ~password:true nu then
              this_password := Neturl.url_password nu
          end;
          let stripped =
            Neturl.remove_from_url
              ~user:true ~user_param:true ~password:true
              nu in
          Neturl.string_of_url stripped
        with
          | Not_found | Neturl.Malformed_URL ->
              url
      in
      serialize split_credentials

    (* Performs a GET request for [url], and returns the processed
       message
     *)
    let http_get_message url =
      let call = new get (prepare_url url) in
      request call !http_trials;
      call

    (* Performs a GET request for [url], and returns the response body *)
    let http_get url =
      let call = http_get_message url in
      call # get_resp_body()

    (* Performs a HEAD request for [url], and returns the processed
       message
     *)
    let http_head_message url =
      let call = new head (prepare_url url) in
      request call !http_trials;
      call

    (* Performs a POST request for [url] with the parameters [params], and
       returns the processed message. The trials argument passed to
       [request] is 1.
     *)
    let http_post_message url params =
      let call = new post (prepare_url url) params in
      request call 1;
      call

    (* Performs a POST request, and returns the response body *)
    let http_post url params =
      let call = http_post_message url params in
      call # get_resp_body()

    (* Performs a PUT request for [url] sending [content], and returns the
       processed message
     *)
    let http_put_message url content =
      let call = new put (prepare_url url) content in
      request call !http_trials;
      call

    (* Performs a PUT request, and returns the response body *)
    let http_put url content =
      let call = http_put_message url content in
      call # get_resp_body()

    (* Performs a DELETE request for [url], and returns the processed
       message. The trials argument passed to [request] is 1.
     *)
    let http_delete_message url =
      let call = new delete (prepare_url url) in
      request call 1;
      call

    (* Performs a DELETE request, and returns the response body *)
    let http_delete url =
      let call = http_delete_message url in
      call # get_resp_body()


    (* Configures the debug messages of the shared pipeline. Every
       optional argument corresponds to the option of the same name, and
       defaults to [true]. Additionally, [conv_verbose] and
       [Debug.enable] are set to [true]. The changes are done under the
       mutex of [serialize].
     *)
    let http_verbose 
         ?(verbose_status=true)
         ?(verbose_request_header=true)
         ?(verbose_response_header=true)
         ?(verbose_request_contents=true)
         ?(verbose_response_contents=true)
         ?(verbose_connection=true)
         ?(verbose_events=true) =
      let apply () =
        let p = Lazy.force pipe in
        let current = p # get_options in
        let updated =
          { current with
              verbose_status = verbose_status;
              verbose_request_header = verbose_request_header;
              verbose_response_header = verbose_response_header;
              verbose_request_contents = verbose_request_contents;
              verbose_response_contents = verbose_response_contents;
              verbose_connection = verbose_connection;
              verbose_events = verbose_events
          } in
        p # set_options updated;
        conv_verbose := true;
        Debug.enable := true
      in
      serialize apply
  end

(* This web site is published by Informatikbüro Gerd Stolpmann.
   Powered by Caml. *)