(* Plasma GitLab Archive
 * Projects Blog Knowledge
 *)

(* $Id: http_client.ml 1417 2010-02-16 14:59:02Z gerd $
 * ----------------------------------------------------------------------
 *
 *)

(* Reference documents:
 * RFC 2068, 2616:      HTTP 1.1
 * RFC 2069, 2617:      Digest Authentication
 *)

(* Debugging switch of this module. [Debug.enable] starts out as [false];
 * it is handed to [Netlog.Debug] below, which presumably consults it before
 * emitting messages (confirm against Netlog).
 *)
module Debug = struct
  let enable = ref false
end

(* Logging functions obtained from [Netlog.Debug] for the module name
 * "Http_client", both tied to [Debug.enable].
 *)
let dlog = Netlog.Debug.mk_dlog "Http_client" Debug.enable
let dlogr = Netlog.Debug.mk_dlogr "Http_client" Debug.enable

(* Announce the debug switch to [Netlog.Debug] under the name "Http_client".
 * This runs once, when the module is initialized.
 *)
let () =
  Netlog.Debug.register_module "Http_client" Debug.enable


open Printf

(* Exceptions of this module.
 *
 * - [Http_error(code,body)] is raised by [get_resp_body] of [generic_call]
 *   when the response code is outside 200..299; it carries the response
 *   code and the response body.
 * - [Http_protocol e] is raised by the response accessors of [generic_call]
 *   when the call status is [`Http_protocol_error e].
 * - The remaining exceptions are not raised in the part of the file
 *   reviewed here; their meaning is presumably what their names say.
 *)
exception Bad_message of string;;
exception Http_error of (int * string);;
exception Http_protocol of exn;;
exception No_reply;;
exception Too_many_redirections;;
exception Name_resolution_error of string
exception URL_syntax_error of string

(* Register with [Netexn] a printer for [Http_protocol] that also renders
 * the wrapped exception. The printer is registered for [Http_protocol]
 * only, so any other exception reaching it is a programming error.
 *)
let () =
  let print_http_protocol = function
    | Http_protocol inner ->
        "Http_client.Http_protocol(" ^ Netexn.to_string inner ^ ")"
    | _ ->
        assert false
  in
  Netexn.register_printer (Http_protocol Not_found) print_http_protocol

(* Run the initialization of [Netsys_signal] when this module is loaded. *)
let() =
  Netsys_signal.init()


(* Condition of an HTTP call, as maintained by [generic_call]:
 * - [`Unserved]: initial state, and the state after [prepare_transmission]
 * - [`Http_protocol_error e]: set by [set_error_exception e]
 * - [`Successful]: response code 200..299
 * - [`Redirection]: response code 300..399
 * - [`Client_error]: response code 400..499
 * - [`Server_error]: any other response code
 *)
type status =
  [ `Unserved
  | `Http_protocol_error of exn
  | `Successful
  | `Redirection
  | `Client_error
  | `Server_error
  ]

(* Where the response body of a call is stored. [generic_call] creates the
 * body object in [prepare_transmission]:
 * - [`Memory]: a fresh [Netmime.memory_mime_body]
 * - [`File f]: a [Netmime.file_mime_body] for the file name returned by
 *   [f()]
 * - [`Body f]: the body object returned by [f()]
 *)
type response_body_storage =
    [ `Memory
    | `File of unit -> string
    | `Body of unit -> Netmime.mime_body
    ]

(* Reconnect mode of a call. The code interpreting the mode is not part of
 * the reviewed excerpt, so the following is inferred from the names
 * (TODO confirm): whether a call is sent again after the connection broke.
 * [Inquire f] presumably asks [f] with the call; [Send_again_if_idem]
 * presumably repeats only idempotent calls. [generic_call] uses
 * [Send_again_if_idem] as initial value.
 *)
type 'message_class how_to_reconnect =
    Send_again
  | Request_fails
  | Inquire of ('message_class -> bool)
  | Send_again_if_idem
;;

(* Redirect mode of a call. The code interpreting the mode is not part of
 * the reviewed excerpt, so the following is inferred from the names
 * (TODO confirm): whether a redirection response is followed
 * automatically. [Redirect_inquire f] presumably asks [f] with the call;
 * [Redirect_if_idem] presumably follows only for idempotent calls.
 * [generic_call] uses [Redirect_if_idem] as initial value.
 *)
type 'message_class how_to_redirect =
    Redirect
  | Do_not_redirect
  | Redirect_inquire of ('message_class -> bool)
  | Redirect_if_idem
;;

(* A name resolver in callback style: it gets the event system, the host
 * name, and a reply function. The reply function is called with
 * [Some addr] on success and with [None] if the name cannot be resolved
 * (see [sync_resolver] for an implementation).
 *)
type resolver =
    Unixqueue.unix_event_system -> 
    string -> 
    (Unix.inet_addr option -> unit) -> 
      unit
;;

(* Connection statistics. The fields are mutable counters; the code that
 * updates them is not part of the reviewed excerpt, so the exact events
 * counted are presumably those named by the fields (TODO confirm).
 *)
type counters =
    { mutable new_connections : int;
      mutable timed_out_connections : int;
      mutable crashed_connections: int;
      mutable server_eof_connections : int;
      mutable successful_connections : int;
      mutable failed_connections : int;
    }


(* [better_unix_error f arg] evaluates [f arg]. A [Unix.Unix_error] is
 * turned into [Failure] carrying the textual error message, prefixed with
 * the error parameter and a colon if that parameter is non-empty. Other
 * exceptions pass through unchanged.
 *)
let better_unix_error f arg =
  try
    f arg
  with
    Unix.Unix_error (code, _, param) ->
      let text = Unix.error_message code in
      failwith (if param = "" then text else param ^ ": " ^ text)


let syscall f =
  (* Run [f()], restarting it as often as it fails with EINTR
   * ("interrupted system call", i.e. a signal arrived while the call
   * blocked). Any other exception is passed on to the caller.
   *)
  let rec attempt () =
    try
      f()
    with
        Unix.Unix_error(Unix.EINTR,_,_) -> attempt ()
  in
  attempt ()
;;


(* The sixteen lowercase hexadecimal digits, indexed by their value *)
let hex_digits = [| '0'; '1'; '2'; '3'; '4'; '5'; '6'; '7';
		    '8'; '9'; 'a'; 'b'; 'c'; 'd'; 'e'; 'f' |];;

let encode_hex s =
  (* Encode every byte of [s] as two lowercase hex digits (high nibble
   * first). The result has twice the length of [s].
   *
   * The result is accumulated in a [Buffer] rather than by assigning to
   * the characters of a string, so the function does not depend on strings
   * being mutable.
   *)
  let buf = Buffer.create (2 * String.length s) in
  String.iter
    (fun c ->
       let x = Char.code c in
       Buffer.add_char buf hex_digits.( x lsr 4 );
       Buffer.add_char buf hex_digits.( x land 15 ))
    s;
  Buffer.contents buf
;;


(* Synchronization mode of a connection. The interpreting code is not part
 * of the reviewed excerpt; presumably [Sync] means one outstanding request
 * at a time, and [Pipeline n] allows up to [n] outstanding requests
 * (TODO confirm).
 *)
type synchronization =
  | Sync
  | Pipeline of int
;;


(* Presumably the upper bound for the argument of [Pipeline] - the use is
 * not visible in the reviewed excerpt (TODO confirm). *)
let max_pipeline = 8 ;;

(* Regular expressions for server identifications; going by the name,
 * servers matching one of them are presumably excluded from pipelining
 * (the use is not visible in the reviewed excerpt).
 * NOTE(review): the final dot of the second expression is an unescaped
 * regexp metacharacter and therefore matches any character.
 *)
let pipeline_blacklist =
  (* Stolen from Mozilla *)
  [ Netstring_pcre.regexp "Microsoft-IIS/";
    Netstring_pcre.regexp "Netscape-Enterprise/3.";
  ]
;;


(* The option record of the HTTP client. The code reading these options is
 * not part of the reviewed excerpt; the notes below are inferred from the
 * field names and types only (TODO confirm):
 * - [synchronization]: see type [synchronization]
 * - the [maximum_*] fields and [number_of_parallel_connections] are integer
 *   limits
 * - [connection_timeout], [handshake_timeout]: floats, presumably seconds
 * - [resolver]: the name resolver to use, see type [resolver]
 * - [configure_socket]: a function applied to a socket descriptor
 * - the [verbose_*] fields are switches selecting debug output
 *)
type http_options = 
    { synchronization : synchronization;
      maximum_connection_failures : int;
      maximum_message_errors : int;
      inhibit_persistency : bool;
      connection_timeout : float;
      number_of_parallel_connections : int;
      maximum_redirections : int;
      handshake_timeout : float;
      resolver : resolver;
      configure_socket : Unix.file_descr -> unit;
      verbose_status : bool;
      verbose_request_header : bool;
      verbose_response_header : bool;
      verbose_request_contents : bool;
      verbose_response_contents : bool;
      verbose_connection : bool;
    }
;;

(* Selects one of the two request headers of a call: [`Base] is the header
 * as set by the user, [`Effective] is the working copy derived from it in
 * [prepare_transmission] (see [generic_call]).
 *)
type header_kind = [ `Base | `Effective ]

(* Regular expression for absolute http URLs. The scheme prefix is matched
 * case-sensitively. Groups as used by [parse_http_url]:
 *   2 = user (optional), 4 = password (optional), 5 = host,
 *   7 = port digits (optional), 8 = rest starting with a slash or a
 *   question mark (optional).
 *)
let http_re =
  Netstring_pcre.regexp
    "^http://(([^/:@]+)(:([^/:@]+))?@)?([^/:@]+)(:([0-9]+))?([/?].*)?$"

let parse_http_url url =
  (* Splits the http URL [url] into [(user,password,host,port,path)].
   * [user] and [password] are [None] if missing, the port defaults to 80,
   * and [path] (everything after the host/port part) defaults to "".
   *
   * Raises [Not_found] if [url] does not match [http_re]. This includes
   * the case that the port number is too large to be represented as [int]:
   * [int_of_string] signals this with [Failure], which is mapped to
   * [Not_found] here, so callers need only handle the one documented
   * exception.
   *)
  match Netstring_pcre.string_match http_re url 0 with
    | None ->
	raise Not_found
    | Some m ->
	let group n =
	  (* The n-th group, or None if it did not take part in the match *)
	  try Some(Netstring_pcre.matched_group m n url)
	  with Not_found -> None in
	let user = group 2 in
	let password = group 4 in
	let host =
	  Netstring_pcre.matched_group m 5 url in
	let port =
	  match group 7 with
	    | None -> 80
	    | Some digits ->
		( try int_of_string digits
		  with Failure _ -> raise Not_found
		) in
	let path =
	  match group 8 with
	    | Some p -> p
	    | None -> "" in
	(user,password,host,port,path)
;;


(* A comma, together with any whitespace (space, tab, LF, CR) around it *)
let comma_re = Netstring_pcre.regexp "[ \t\n\r]*,[ \t\n\r]*" ;;

(* Splits [s] at the occurrences of [comma_re] *)
let split_words_by_commas s =
  Netstring_pcre.split comma_re s ;;

(* A non-empty run of whitespace (space, tab, LF, CR) *)
let space_re = Netstring_pcre.regexp "[ \t\n\r]+" ;;

(* Splits [s] at the occurrences of [space_re] *)
let split_words s =
  Netstring_pcre.split space_re s ;;


let sync_resolver esys name reply =
  (* A resolver that works synchronously: [name] is first tried as
   * numeric IP address; if this fails it is looked up with
   * [Unix.gethostbyname]. [reply] is called exactly once, with
   * [Some addr] (the first address of the host entry) or with [None] if
   * the name is unknown. [esys] is not used.
   *
   * A host entry with an empty address list also counts as "unknown";
   * indexing the list unconditionally would raise [Invalid_argument].
   *)
  let addr =
    try
      Some (Unix.inet_addr_of_string name)
    with
	Failure _ ->
	  try
	    let h = Unix.gethostbyname name in
	    if Array.length h.Unix.h_addr_list > 0 then
	      Some h.Unix.h_addr_list.(0)
	    else
	      None
	  with Not_found ->
	    None in
  reply addr
;;


(**********************************************************************)
(***                          BUFFERS                               ***)
(**********************************************************************)

(* Similar to Buffer, but may be accessed in a more efficient way. *)

(* [B] abbreviates [Netbuffer]; [buffer_type] is the buffer type of that
 * module. *)
module B = Netbuffer;;

type buffer_type = B.t;;


module Q = 
struct
  (* A thin layer over the standard [Queue] that additionally supports
   * inserting an element at a position counted from the end ([add_at]).
   * All other functions are the ones of [Queue].
   *)

  type 'a t = 'a Queue.t

  exception Empty = Queue.Empty

  let create = Queue.create

  let add = Queue.add

  let push = add 

  let add_at n x q =
    (* Insert x so that exactly n of the old elements come after it:
     * add_at 0 x q = append at the end (same as add x q)
     * add_at 1 x q = insert before the last element
     * ...
     * The elements in front of the insertion point are moved one by one
     * into a scratch queue, so the cost of this step grows with
     * (length - n).
     *)
    let n_front = Queue.length q - n in
    let scratch = Queue.create() in
    for _k = 1 to n_front do
      Queue.add (Queue.take q) scratch
    done;
    Queue.add x scratch;
    (* Append the remaining n elements, then move everything back: *)
    Queue.transfer q scratch;
    Queue.transfer scratch q

  let take = Queue.take
     
  let pop = take

  let peek = Queue.peek

  let top = peek

  let clear = Queue.clear

  let copy = Queue.copy

  let is_empty = Queue.is_empty

  let length = Queue.length

  let iter = Queue.iter

  let transfer = Queue.transfer

  (* fold: omitted *)

end



(**********************************************************************)
(***                  THE HTTP CALL CONTAINER                       ***)
(**********************************************************************)


(* Write the header fields [h] (pairs of name and value) to the debug log,
 * one message per field, each starting with [prefix].
 *)
let dump_header prefix h =
  let log_field (name, value) =
    dlog (prefix ^ name ^ ": " ^ value) in
  List.iter log_field h



(* Authentication state of a call. [generic_call] starts with [`None], and
 * [same_call] resets the state to [`None].
 *)
type 'session auth_state =  (* 'session = auth_session, defined below *)
    [ `None
         (* Authentication has not yet been tried *)
    | `In_advance of 'session
         (* This session had been tried before a 401 response was seen *)
    | `In_reply of 'session
         (* This session was tried after a 401 response was seen *)
    ]

(* Three mutually recursive class types:
 * - [http_call]: the user-visible side of one HTTP request/response
 *   exchange
 * - [private_api]: the internal side of a call, reachable through the
 *   method [private_api]; presumably intended for the code that transmits
 *   the call (not part of the reviewed excerpt)
 * - [auth_session]: an authentication session that computes credentials
 *   for calls
 * The remarks on behaviour below describe the implementation in
 * [generic_call].
 *)
class type http_call =
object
  (* Call state: [is_served] is true once [status] is no longer
   * [`Unserved].
   *)
  method is_served : bool
  method status : status

  (* The request. [set_request_uri] accepts absolute http URLs without user
   * and password, and fails with [Failure] otherwise. The [`Base] request
   * header is the one set by the user, the [`Effective] header is the
   * working copy created by [prepare_transmission].
   *)
  method request_method : string
  method request_uri : string
  method set_request_uri : string -> unit
  method request_header : header_kind -> Netmime.mime_header
  method set_request_header : Netmime.mime_header -> unit
  method effective_request_uri : string
  method request_body : Netmime.mime_body
  method set_request_body : Netmime.mime_body -> unit

  (* The response. These methods fail with [Failure] while the call is
   * unserved, and raise [Http_protocol] if the call ended with a protocol
   * error.
   *)
  method response_status_code : int
  method response_status_text : string
  method response_status : Nethttp.http_status
  method response_protocol : string
  method response_header : Netmime.mime_header
  method response_body : Netmime.mime_body

  (* Options *)
  method response_body_storage : response_body_storage
  method set_response_body_storage : response_body_storage -> unit
  method get_reconnect_mode : http_call how_to_reconnect
  method set_reconnect_mode : http_call how_to_reconnect -> unit
  method get_redirect_mode : http_call how_to_redirect
  method set_redirect_mode : http_call how_to_redirect -> unit
  method proxy_enabled : bool
  method set_proxy_enabled : bool -> unit
  method no_proxy : unit -> unit
    (* Same as [set_proxy_enabled false] *)
  method is_proxy_allowed : unit -> bool
    (* Same as [proxy_enabled] *)

  (* Characteristics of the HTTP method, as defined by the subclasses *)
  method empty_path_replacement : string
  method is_idempotent : bool
  method has_req_body : bool
  method has_resp_body : bool

  method same_call : unit -> http_call
    (* A copy of the call that is unserved again: status, response header
     * and body, counters and authentication state are reset.
     *)

  (* Old style access methods. The header lists have lowercase field names.
   * [get_resp_body] raises [Http_error] if the response code is not in the
   * range 200..299. [dest_status] returns (protocol, code, text).
   *)
  method get_req_method : unit -> string
  method get_host : unit -> string
  method get_port : unit -> int
  method get_path : unit -> string
  method get_uri : unit -> string
  method get_req_body : unit -> string
  method get_req_header : unit -> (string * string) list
  method assoc_req_header : string -> string
  method assoc_multi_req_header : string -> string list
  method set_req_header : string -> string -> unit
  method get_resp_header : unit -> (string * string) list
  method assoc_resp_header : string -> string
  method assoc_multi_resp_header : string -> string list
  method get_resp_body : unit -> string
  method dest_status : unit -> (string * int * string)
  method private_api : private_api
    (* The internal side of the call; created on first access *)
end

and private_api =
object
  (* Counter of errors: incremented by [set_error_exception] unless the
   * status is already [`Http_protocol_error].
   *)
  method get_error_counter : int
  method set_error_counter : int -> unit
  method set_error_exception : exn -> unit
    (* Sets the status to [`Http_protocol_error]. The exception must not
     * be [Http_error].
     *)

  (* Counter of redirections *)
  method get_redir_counter : int
  method set_redir_counter : int -> unit

  (* Flag whether 100-Continue has been seen *)
  method continue : bool
  method set_continue : unit -> unit

  method auth_state : auth_session auth_state
  method set_auth_state : auth_session auth_state -> unit

  method set_effective_request_uri : string -> unit

  method prepare_transmission : unit -> unit
    (* Initializes the `Effective request header, and creates the response
     * objects. Sets the status to [`Unserved]. The error and redirection
     * counters are not changed.
     *)
  method set_response_status : int -> string -> string -> unit
    (* code, text, proto *)
  method set_response_header : Netmime.mime_header -> unit
    (* Sets the response header *)
  method response_body_open_wr : unit -> Netchannels.out_obj_channel
    (* Opens the response body for writing *)
  method finish : unit -> unit
    (* The call is finished. The [status] is set to [`Successful], 
     * [`Redirection], [`Client_error], or [`Server_error] depending on
     * the response.
     *)
  method cleanup : unit -> unit
    (* Release resources *)

  (* Unchecked access to the response (no test of the call status), and
   * debug output via [dlog]
   *)
  method response_code : int
  method response_proto : string
  method response_header : Netmime.mime_header
  method dump_status : unit -> unit
  method dump_response_header : unit -> unit
  method dump_response_body : unit -> unit
end

and auth_session =
object
  method auth_scheme : string
    (* Name of the scheme; the sessions of this file return "basic" and
     * "digest" *)
  method auth_domain : string list
  method auth_realm : string
  method auth_user : string
  method auth_in_advance : bool
  method authenticate : http_call -> (string * string) list
    (* Header fields with credentials for the call; the sessions of this
     * file return a single "Authorization" field *)
  method invalidate : http_call -> bool
    (* Called with a call whose credentials were presumably rejected
     * (the caller is not visible here). The digest session returns [true]
     * if the server only flagged the nonce as stale; otherwise the key is
     * invalidated and [false] is returned.
     *)
end


(* The type of [generic_call]: an [http_call] whose method-specific
 * properties are left open as private virtual methods. Subclasses define:
 * - [fixup_request]: called by [prepare_transmission] after the working
 *   request header has been created
 * - [def_request_method]: the name of the HTTP method
 * - [def_empty_path_replacement], [def_is_idempotent], [def_has_req_body],
 *   [def_has_resp_body]: the values returned by the public methods of the
 *   same names without the [def_] prefix
 *)
class type virtual gen_call =
object
  inherit http_call
  method private virtual fixup_request : unit -> unit
  method private virtual def_request_method : string
  method private virtual def_empty_path_replacement : string
  method private virtual def_is_idempotent : bool
  method private virtual def_has_req_body : bool
  method private virtual def_has_resp_body : bool
end


(* The common implementation of all HTTP calls. It stores the request (URI,
 * base and working header, body), the response (status line, header,
 * body), the options, and the internal bookkeeping (counters,
 * authentication state). The properties that depend on the HTTP method are
 * virtual and defined by the subclasses below.
 *)
class virtual generic_call : gen_call =
object(self)
  val mutable status = (`Unserved : status)

  val mutable req_uri = ""
  val mutable req_host = ""   (* derived from req_uri *)
  val mutable req_port = 80   (* derived from req_uri *)
  val mutable req_path = ""   (* derived from req_uri *)
  val mutable req_base_header = new Netmime.basic_mime_header []
  val mutable req_work_header = new Netmime.basic_mime_header []
  val mutable req_body = new Netmime.memory_mime_body ""

  val mutable eff_req_uri = ""

  val mutable resp_code = 0
  val mutable resp_text = ""
  val mutable resp_proto = ""
  val mutable resp_header = new Netmime.basic_mime_header []
  val mutable resp_body = new Netmime.memory_mime_body ""

  val mutable resp_body_storage = (`Memory : response_body_storage)
  val mutable reconn_mode = Send_again_if_idem
  val mutable redir_mode = Redirect_if_idem
  val mutable proxy_enabled = true

  val mutable private_api = None
    (* Created on demand by method [private_api] *)

  val mutable continue = false   (* Whether 100-Continue has been seen *)    
  val mutable error_counter = 0
  val mutable redir_counter = 0
  val mutable resp_ch = None
    (* The channel returned by [response_body_open_wr], while it is open *)
  val mutable auth_state = `None


  (* Creates the [private_api] object. It reads and modifies the instance
   * variables of the enclosing call directly. [fixup_request] is passed
   * as function because the methods of [self] cannot be called from the
   * inner object (see below).
   *)
  method private def_private_api fixup_request =
    ( object(pself) 

	  (* We cannot call methods of [self] due to a bug in O'Caml 3.08
           * (present until 3.08.3, fixed in 3.08.4 and 3.09)
           * PR#3576, PR#3678
           *)

	(* Closes the response body channel if one is open *)
	method private close_resp_ch() =
	  match resp_ch with
	    | None -> ()
	    | Some ch -> ch # close_out(); resp_ch <- None

	method get_error_counter = error_counter
	method set_error_counter n = error_counter <- n
	method get_redir_counter = redir_counter
	method set_redir_counter n = redir_counter <- n

	method continue = continue
	method set_continue() = continue <- true

	method auth_state = auth_state
	method set_auth_state s = auth_state <- s

	method set_effective_request_uri s = eff_req_uri <- s

	(* Resets the response, copies the base header to the working
	 * header, lets the subclass adjust the request ([fixup_request]),
	 * and finally adds the fields Date and User-agent to the working
	 * header unless they are already present.
	 *)
	method prepare_transmission () =
	  pself # close_resp_ch();
	  status <- `Unserved;
	  req_work_header <- new Netmime.basic_mime_header 
	                             req_base_header#fields;
	  resp_code <- 0;
	  resp_text <- "";
	  resp_proto <- "";
	  resp_header <- new Netmime.basic_mime_header [];
	  resp_body <- ( match resp_body_storage with
			   | `Memory ->
			       new Netmime.memory_mime_body ""
			   | `File f ->
			       let name = f() in
			       new Netmime.file_mime_body name
			   | `Body f ->
			       f ()
		       );
	  fixup_request();
	  ( try ignore(req_work_header # field "Date")
	    with Not_found ->
	      Nethttp.Header.set_date req_work_header (Unix.time())
	  );
	  ( try ignore(req_work_header # field "User-agent")
	    with Not_found ->
	      req_work_header # update_field "User-agent" "Netclient"
	  );

	method set_response_status code text proto =
	  resp_code <- code;
	  resp_text <- text;
	  resp_proto <- proto

	method set_response_header h =
	  resp_header <- h

	(* A previously opened channel is closed first *)
	method response_body_open_wr() =
	  pself # close_resp_ch();
	  let ch = resp_body # open_value_wr() in
	  resp_ch <- Some ch;
	  ch

	method response_code = resp_code
	method response_proto = resp_proto
	method response_header = resp_header

	(* Requires that a response status has been set. Note that every
	 * code outside 200..499 is classified as [`Server_error].
	 *)
	method finish() =
	  assert(resp_code <> 0);
	  pself # close_resp_ch();
	  status <- (if resp_code >= 200 && resp_code <= 299 then
		       `Successful
		     else if resp_code >= 300 && resp_code <= 399 then
		       `Redirection
		     else if resp_code >= 400 && resp_code <= 499 then
		       `Client_error
		     else
		       `Server_error);

	(* The error counter is only incremented when the call enters the
	 * error state, not when an error is replaced by another error.
	 *)
	method set_error_exception x =
	  pself # close_resp_ch();
	  match x with
	      Http_error(_,_) -> assert false   (* not allowed *)
	    | _ ->
		let do_increment =
		  match status with
		    | `Http_protocol_error _ -> false
		    | _ -> true in
		if do_increment then
		  error_counter <- error_counter + 1;
		status <- (`Http_protocol_error x);

	method cleanup () =
	  pself # close_resp_ch()

	(* The dump methods identify the call by the object ID of the
	 * enclosing call object.
	 *)
	method dump_status () =
	  dlog (sprintf
		  "Call %d - HTTP response code: %d (%s)"
		  (Oo.id self) resp_code resp_text);
	  dlog (sprintf
		  "Call %d - HTTP response protocol: %s"
		  (Oo.id self) resp_proto);

	method dump_response_header () =
	  dump_header 
	    (sprintf 
	       "Call %d - HTTP response "
	       (Oo.id self))
	    (resp_header # fields)

	method dump_response_body () =
	  dlog (sprintf "Call %d - HTTP response body:\n%s\n"
		  (Oo.id self) resp_body#value)
      end
    )


  (* Call state *)

  method is_served = status <> `Unserved
  method status = status

  (* Accessing the request message (new style) *)

  method request_method = self # def_request_method
  method request_uri = req_uri
  (* Parses the URI and stores it together with host, port, and path.
   * Fails with [Failure] if the URI is not an http URL, or if it contains
   * user or password. In both cases the stored values remain unchanged.
   *)
  method set_request_uri uri = 
    try
      let u, pw, h, pt, ph = parse_http_url uri in
      if u <> None || pw <> None then
	failwith "Http_client, set_request_uri: URL must not contain user or password";
      req_uri <- uri;
      req_host <- h;
      req_port <- pt;
      req_path <- ph
    with
	Not_found ->
	  failwith "Http_client: bad URL"


  method request_header (k:header_kind) =
    match k with
      | `Base -> req_base_header
      | `Effective -> req_work_header
  method set_request_header h =
    req_base_header <- h
  method request_body = req_body
  method set_request_body b = req_body <- b

  method effective_request_uri = eff_req_uri

  (* Accessing the response message (new style) *)

  (* Ensures that there is a response: fails with [Failure] if the call is
   * unserved, and raises [Http_protocol] if it ended with an error.
   *)
  method private check_response() =
    match status with
      | `Unserved -> 
	  failwith "Http_client: HTTP call is unserved, no response yet"
      | `Http_protocol_error e -> 
	  raise (Http_protocol e)
      | _ -> ()

  method response_status_code = self#check_response(); resp_code
  method response_status_text = self#check_response(); resp_text
  method response_status = 
    self#check_response(); Nethttp.http_status_of_int resp_code
  method response_protocol = self#check_response(); resp_proto
  method response_header = self#check_response(); resp_header
  method response_body = self#check_response(); resp_body

  (* Options *)

  method response_body_storage = resp_body_storage
  method set_response_body_storage s = resp_body_storage <- s
  method get_reconnect_mode = reconn_mode
  method set_reconnect_mode m = reconn_mode <- m
  method get_redirect_mode = redir_mode
  method set_redirect_mode m = redir_mode <- m
  method proxy_enabled = proxy_enabled
  method set_proxy_enabled e = proxy_enabled <- e
  method no_proxy() = proxy_enabled <- false
  method is_proxy_allowed() = proxy_enabled

  (* Method characteristics *)

  method empty_path_replacement = self # def_empty_path_replacement
  method is_idempotent = self # def_is_idempotent
  method has_req_body = self # def_has_req_body
  method has_resp_body = self # def_has_resp_body

  (* Repeating calls *)
    
  (* Returns a copy of the call in which the listed variables are reset.
   * All other variables (request, options, and also resp_code, resp_text,
   * resp_proto) are taken over from this object. The copy gets its own
   * private API object on first access.
   *)
  method same_call() =
    let same =
      {< status = `Unserved;
	 resp_header = new Netmime.basic_mime_header [];
	 resp_body = new Netmime.memory_mime_body "";
	 eff_req_uri = "";
	 private_api = None;
	 error_counter = 0;
	 redir_counter = 0;
	 continue = false;
	 resp_ch = None;
	 auth_state = `None
      >} in
    (same : #http_call :> http_call)

  (* Old style access methods *)

  method get_req_method() = self # def_request_method
  method get_host() = req_host
  method get_port() = req_port
  method get_path() = req_path
  method get_uri() = req_uri
  method get_req_body() = req_body # value
  (* The request methods of this group work on the base header *)
  method get_req_header () =
    List.map (fun (n,v) -> (String.lowercase n, v)) req_base_header#fields
  method assoc_req_header n =
    req_base_header # field n
  method assoc_multi_req_header n =
    req_base_header # multiple_field n
  method set_req_header n v =
    req_base_header # update_field n v
  method get_resp_header() =
    self#check_response(); 
    List.map (fun (n,v) -> (String.lowercase n, v)) resp_header#fields
  method assoc_resp_header n =
    self#check_response(); 
    resp_header # field n
  method assoc_multi_resp_header n =
    self#check_response(); 
    resp_header # multiple_field n
  (* Returns the body only for codes 200..299; for all other codes the
   * body is passed as argument of [Http_error].
   *)
  method get_resp_body() =
    self#check_response();
    if resp_code >= 200 && resp_code <= 299 then
      resp_body # value
    else
      raise(Http_error(resp_code, resp_body#value))
  method dest_status() =
    self#check_response();
    (resp_proto, resp_code, resp_text)

  (* Private *)

  (* The private API object is created at the first access, and then
   * reused.
   *)
  method private_api = 
    match private_api with
      | None ->
	  let api = self # def_private_api self#fixup_request in
	  private_api <- Some api;
	  api
      | Some api ->
	  api

  (* Virtual methods *)

  method virtual private fixup_request : unit -> unit
  method virtual private def_request_method : string
  method virtual private def_empty_path_replacement : string
  method virtual private def_is_idempotent : bool
  method virtual private def_has_req_body : bool
  method virtual private def_has_resp_body : bool
end

(******** SUBCLASSES IMPLEMENTING HTTP METHODS ************************)

(* GET: flagged as idempotent; no request body, but a response body.
 * The empty path replacement is "/". The request is not modified by
 * [fixup_request].
 *)
class get_call =
object
  inherit generic_call
  method private def_request_method = "GET"
  method private def_empty_path_replacement = "/"
  method private def_is_idempotent = true
  method private def_has_req_body = false
  method private def_has_resp_body = true
  method private fixup_request() = ()
end

(* HEAD: flagged as idempotent; neither request body nor response body.
 * The empty path replacement is "/". The request is not modified by
 * [fixup_request].
 *)
class head_call =
object
  inherit generic_call
  method private def_request_method = "HEAD"
  method private def_empty_path_replacement = "/"
  method private def_is_idempotent = true
  method private def_has_req_body = false
  method private def_has_resp_body = false
  method private fixup_request() = ()
end

(* TRACE: not flagged as idempotent; no request body, but a response body.
 * The empty path replacement is "/". The request is not modified by
 * [fixup_request].
 *)
class trace_call =
object
  inherit generic_call
  method private def_request_method = "TRACE"
  method private def_empty_path_replacement = "/"
  method private def_is_idempotent = false
  method private def_has_req_body = false
  method private def_has_resp_body = true
  method private fixup_request() = ()
end

(* OPTIONS: not flagged as idempotent; with request body and response
 * body. The empty path replacement is "*". The request is not modified
 * by [fixup_request].
 *)
class options_call =
object
  inherit generic_call
  method private def_request_method = "OPTIONS"
  method private def_empty_path_replacement = "*"
  method private def_is_idempotent = false
  method private def_has_req_body = true
  method private def_has_resp_body = true
  method private fixup_request() = ()
end

(* POST: not flagged as idempotent; with request body and response body.
 * The empty path replacement is "/". The request is not modified by
 * [fixup_request].
 *)
class post_call =
object
  inherit generic_call
  method private def_request_method = "POST"
  method private def_empty_path_replacement = "/"
  method private def_is_idempotent = false
  method private def_has_req_body = true
  method private def_has_resp_body = true
  method private fixup_request() = ()
end

(* PUT: not flagged as idempotent; with request body and response body.
 * The empty path replacement is "/". The request is not modified by
 * [fixup_request].
 *)
class put_call =
object
  inherit generic_call
  method private def_request_method = "PUT"
  method private def_empty_path_replacement = "/"
  method private def_is_idempotent = false
  method private def_has_req_body = true
  method private def_has_resp_body = true
  method private fixup_request() = ()
end

(* DELETE: flagged as idempotent; no request body, but a response body.
 * The empty path replacement is "/". The request is not modified by
 * [fixup_request].
 *)
class delete_call =
object
  inherit generic_call
  method private def_request_method = "DELETE"
  method private def_empty_path_replacement = "/"
  method private def_is_idempotent = true
  method private def_has_req_body = false
  method private def_has_resp_body = true
  method private fixup_request() = ()
end


(* A GET call for the URL [the_query]. The URL is checked at object
 * creation time by [set_request_uri].
 *)
class get the_query =
  object (self)
    inherit get_call
    initializer self#set_request_uri the_query
  end


(* A TRACE call for the URL [the_query]. When the request is prepared, the
 * field "max-forwards" of the effective header is set to [max_hops].
 *)
class trace the_query max_hops =
  object (self)
    inherit trace_call
    initializer self#set_request_uri the_query
    method private fixup_request() =
      let eff_header = self#request_header `Effective in
      eff_header#update_field "max-forwards" (string_of_int max_hops)
  end


(* An OPTIONS call for the URL [the_query]. The URL is checked at object
 * creation time by [set_request_uri].
 *)
class options the_query =
  object (self)
    inherit options_call
    initializer self#set_request_uri the_query
  end


(* A HEAD call for the URL [the_query]. The URL is checked at object
 * creation time by [set_request_uri].
 *)
class head the_query =
  object (self)
    inherit head_call
    initializer self#set_request_uri the_query
  end


(* A POST call for the URL [query] that submits [params], a list of
 * (name,value) pairs, as form data.
 *
 * When the request is prepared, the body is set to the URL-encoded
 * parameters, joined by an ampersand, and the fields Content-type and
 * Content-length of the effective header are set accordingly.
 *
 * Both the names and the values are URL-encoded. Names must be encoded,
 * too, because otherwise a name containing an ampersand, an equals sign
 * or a space would change the structure of the body.
 *)
class post query params =
  object (self)
    inherit post_call
    initializer
      self # set_request_uri query
    method private fixup_request() =
      let rh = self # request_header `Effective in
      rh # update_field "Content-type" "application/x-www-form-urlencoded";
      let encode_pair (n,v) =
	Netencoding.Url.encode n ^ "=" ^ Netencoding.Url.encode v in
      let s = String.concat "&" (List.map encode_pair params) in
      rh # update_field "Content-length" (string_of_int (String.length s));
      self # request_body # set_value s
  end
;;


(* A POST call for the URL [the_query] with the string [s] as request
 * body. When the request is prepared, the body is set to [s], and the
 * field Content-length of the effective header to the length of [s].
 * No Content-type is set here.
 *)
class post_raw the_query s =
  object (self)
    inherit post_call
    initializer self#set_request_uri the_query
    method private fixup_request() =
      let eff_header = self#request_header `Effective in
      let len = String.length s in
      self#request_body#set_value s;
      eff_header#update_field "Content-length" (string_of_int len)
  end
;;


(* A PUT call for the URL [the_query] with the string [s] as request
 * body. When the request is prepared, the body is set to [s], and the
 * field Content-length of the effective header to the length of [s].
 *)
class put the_query s =
  object (self)
    inherit put_call
    initializer self#set_request_uri the_query
    method private fixup_request() =
      let eff_header = self#request_header `Effective in
      let len = String.length s in
      self#request_body#set_value s;
      eff_header#update_field "Content-length" (string_of_int len)
  end
;;


(* A DELETE call for the URL [the_query]. The URL is checked at object
 * creation time by [set_request_uri].
 *)
class delete the_query =
  object (self)
    inherit delete_call
    initializer self#set_request_uri the_query
  end
;;


(**********************************************************************)
(***                  AUTHENTICATION METHODS                        ***)
(**********************************************************************)

(* A key consists of the credentials ([user], [password]) and of the
 * protection space they are valid for: the [realm] and the [domain], a
 * list of URIs. The authentication sessions of this file only accept a key
 * whose [domain] is equal to the domain they computed for the call.
 *)
class type key =
object
  method user : string
  method password : string
  method realm : string
  method domain : string list
end


(* A source of keys.
 * - [inquire_key] is asked for a key for a domain, for one of several
 *   realms, and for an authentication scheme (the sessions of this file
 *   pass "basic" or "digest"). If there is no key, [Not_found] is to be
 *   raised - this is what the callers in this file catch.
 * - [invalidate_key] is called by the sessions of this file from their
 *   method [invalidate].
 *)
class type key_handler =
object
  method inquire_key :
            domain:string list -> realms:string list -> auth:string -> key
  method invalidate_key : key -> unit
end


(* A key handler that stores keys in memory, indexed by (domain, realm).
 * If an [uplink] handler is given, it is asked for the keys the ring does
 * not have; such keys are remembered in the ring, too.
 *)
class key_ring ?uplink () =
object
  val mutable keys = (Hashtbl.create 10 : 
			(string list * string, key * bool) Hashtbl.t)
    (* Maps (domain, realm) to (key, from_uplink) *)

  (* Returns the key for [domain] and the first realm of [realms] the ring
   * has a key for. If there is none, the uplink is asked, and its answer
   * is stored under the domain and realm of the returned key. Raises
   * [Not_found] if neither the ring nor the uplink has a key. [auth] is
   * only passed on to the uplink.
   *)
  method inquire_key ~domain ~realms ~(auth:string) =
    let rec first_known = function
      | [] -> None
      | realm :: rest ->
	  ( try Some (fst (Hashtbl.find keys (domain, realm)))
	    with Not_found -> first_known rest
	  ) in
    match first_known realms with
      | Some key ->
	  key
      | None ->
	  ( match uplink with
	      | None -> raise Not_found
	      | Some h ->
		  let key = h # inquire_key ~domain ~realms ~auth in
		  (* or Not_found *)
		  Hashtbl.replace keys (key#domain, key#realm) (key,true);
		  key
	  )

  (* Removes the entry for the domain and realm of [key], if any. If the
   * entry came from the uplink, the uplink is told to invalidate the key
   * as well. [Not_found] is suppressed for the whole operation.
   *)
  method invalidate_key (key : key) =
    let domain = key # domain in
    let realm = key # realm in
    try
      let (_, from_uplink) = Hashtbl.find keys (domain, realm) in
      Hashtbl.remove keys (domain, realm);
      ( match from_uplink, uplink with
	  | false, _ -> ()
	  | true, Some h -> h # invalidate_key key
	  | true, None -> assert false
      )
    with
	Not_found -> ()

  (* Removes all keys from the ring *)
  method clear () =
    Hashtbl.clear keys

  (* Stores [key] under its domain and realm, replacing an existing entry.
   * The key counts as not coming from the uplink.
   *)
  method add_key key =
    let domain = key # domain in
    let realm = key # realm in
    Hashtbl.replace keys (domain, realm) (key, false)

  (* All keys of the ring, in the order of the hash table traversal *)
  method keys =
    let collect _ (key,_) acc = key :: acc in
    Hashtbl.fold collect keys []

end



(* An authentication handler creates sessions for one authentication
 * scheme. The handlers of this file inspect the response of the passed
 * call, and return [None] if the scheme is not applicable or no key is
 * available.
 *)
class type auth_handler =
object
  method create_session : http_call -> auth_session option
end


(* Raised while a session object is being created if the authentication
 * scheme cannot be used for the call. The handler classes catch it and
 * return [None].
 *)
exception Not_applicable

(* The root URI of the server addressed by [call], always with explicit
 * port: "http://" followed by host, colon, port, and a slash.
 *)
let get_domain_uri call =
  let host = call # get_host() in
  let port = call # get_port() in
  Printf.sprintf "http://%s:%d/" host port


(* A session for the "basic" authentication scheme. [init_call] must
 * already have a response; its www-authenticate header is searched for
 * "Basic" challenges, and [key_handler] is asked for a key for one of the
 * realms found there. The domain is always the root URI of the server of
 * [init_call].
 *
 * Creating the object raises [Not_applicable] if the header is missing or
 * malformed, if it does not contain a Basic challenge with realm, if no
 * key is available, or if the returned key does not fit to realms and
 * domain.
 *)
class basic_auth_session enable_auth_in_advance 
                         key_handler init_call
                         : auth_session =
  let domain = [ get_domain_uri init_call ] in
  let basic_realms =
    (* The realms of all "Basic" challenges, in header order *)
    let challenges =
      try
	Nethttp.Header.get_www_authenticate init_call#response_header
      with
	| Not_found | Nethttp.Bad_header_field _ -> raise Not_applicable in
    let collect (scheme,params) acc =
      if String.lowercase scheme <> "basic" then
	acc
      else
	( try
	    let (_,realm) =
	      List.find
		(fun (pname,_) -> String.lowercase pname = "realm") params in
	    realm :: acc
	  with
	      Not_found -> acc   (* challenge without realm: skipped *)
	) in
    match List.fold_right collect challenges [] with
      | [] -> raise Not_applicable
      | realms -> realms
  in
  let key =
    (* The key selected by the handler *)
    try
      key_handler # inquire_key ~domain ~realms:basic_realms ~auth:"basic"
    with
	Not_found -> raise Not_applicable
  in
  (* Do not trust the handler: the key must be for one of our realms and
   * for exactly our domain
   *)
  let () =
    if not (List.mem key#realm basic_realms) || key#domain <> domain then
      raise Not_applicable
  in
object
  method auth_scheme = "basic"
  method auth_domain = domain
  method auth_realm = key # realm
  method auth_user = key # user
  method auth_in_advance = enable_auth_in_advance

  (* The credentials are the Base64 encoding of user, colon, password.
   * They do not depend on the call.
   *)
  method authenticate _call =
    let user_pass = key#user ^ ":" ^ key#password in
    [ "Authorization", "Basic " ^ Netencoding.Base64.encode user_pass ]

  (* The key is given up; the result is always [false] *)
  method invalidate _call =
    key_handler # invalidate_key key;
    false
end


(* The handler for the "basic" scheme: [create_session] tries to create a
 * [basic_auth_session] for the call, and returns [None] if the session
 * class signals [Not_applicable]. [enable_auth_in_advance] (default:
 * false) is passed on to the session.
 *)
class basic_auth_handler ?(enable_auth_in_advance=false) 
                         (key_handler : #key_handler)
                         : auth_handler =
  let open_session call =
    new basic_auth_session enable_auth_in_advance key_handler call in
object
  method create_session call =
    try Some (open_session call)
    with Not_applicable -> None
end



(* Whether the qop value [v] of a Digest challenge offers the option
 * "auth". The options are a comma-separated list (RFC 2617, qop-options),
 * e.g. "auth,auth-int". For compatibility with the previous behaviour of
 * this function, whitespace-separated lists are accepted as well, so the
 * value is split at commas, and every part again at whitespace.
 *)
let contains_auth v =
  List.exists
    (fun part -> List.mem "auth" (split_words part))
    (split_words_by_commas v)


(* A session for the "digest" authentication scheme (RFC 2617, with
 * fallback to RFC 2069 if the server does not send qop). [init_call] must
 * already have a response; the first acceptable Digest challenge of its
 * www-authenticate header is used, and [key_handler] is asked for a key
 * for the realm of this challenge.
 *
 * Creating the object raises [Not_applicable] if the header is missing or
 * malformed, if there is no acceptable challenge, if no key is available,
 * or if the returned key does not fit to realm and domain.
 *
 * Details of the generated Authorization header:
 * - The nonce count is sent as eight hexadecimal digits, i.e. in the same
 *   form that enters the digest computation, and the first request of the
 *   session has the count 1 (RFC 2617, section 3.2.2).
 * - The opaque value of the challenge, if any, is returned to the server
 *   unchanged.
 * NOTE(review): cnonce and nc are also sent in RFC 2069 mode (no qop),
 * where RFC 2617 does not want them. This is left as it was.
 *)
class digest_auth_session enable_auth_in_advance 
                          key_handler init_call
                          : auth_session =
  let normalize_domain s =
    (* A domain starting with a slash is relative to the server of
     * init_call. Absolute http URLs are rewritten such that the port is
     * always explicit. Everything else is taken as is.
     *)
    if s <> "" && s.[0] = '/' then
      let host = init_call#get_host() in
      let port = init_call#get_port() in
      "http://" ^ host ^ ":" ^ string_of_int port ^ s
    else
      ( try
	  let (_,_,host,port,path) = parse_http_url s in
	  "http://" ^ host ^ ":" ^ string_of_int port ^ path
	with
	    Not_found -> s
      )
  in
  let digest_request =
    (* Return the "Digest" params in www-authenticate, or raise Not_applicable *)
    let auth_list = 
      try
	Nethttp.Header.get_www_authenticate init_call#response_header
      with
	| Not_found -> raise Not_applicable
	| Nethttp.Bad_header_field _ -> raise Not_applicable in
    (* Acceptable challenges have realm and nonce, an algorithm we
     * implement (or none), and a qop list including "auth" (or none)
     *)
    let digest_auth_list =
      List.filter 
	(fun (scheme,params) -> 
	   String.lowercase scheme = "digest"
	    && List.mem_assoc "realm" params
	    && (try List.mem (List.assoc "algorithm" params) ["MD5";"MD5-sess"]
		with Not_found -> true)
	    && (try contains_auth (List.assoc "qop" params)
		with Not_found -> true)
	    && List.mem_assoc "nonce" params
	) 
	auth_list in
    match  digest_auth_list with
      | [] ->
	  raise Not_applicable
      | (_,params) :: _ ->
	  (* Restriction: only the first request can be processed *)
	  params
  in
  let domain =
    (* The domain parameter is a whitespace-separated list of URIs. Without
     * it, the whole server is the domain.
     *)
    try 
      List.map
	normalize_domain
	(split_words (List.assoc "domain" digest_request))
    with
	Not_found -> [ get_domain_uri init_call ] in
  let realm =
    try List.assoc "realm" digest_request
    with Not_found -> assert false in   (* ensured by the filter above *)
  let key =
    (* Return the selected key, or raise Not_applicable *)
    try
      key_handler # inquire_key ~domain ~realms:[realm] ~auth:"digest"
    with
	Not_found -> raise Not_applicable
  in
  (* Check the key: *)
  let () =
    if key#realm <> realm then raise Not_applicable;
    if key#domain <> domain then raise Not_applicable;
  in
  let algorithm =
    try List.assoc "algorithm" digest_request
    with Not_found -> "MD5" in
  let qop =
    if List.mem_assoc "qop" digest_request then "auth" else "" in
    (* "" = RFC 2069 mode *)
  let nonce =
    try List.assoc "nonce" digest_request
    with Not_found -> assert false in   (* ensured by the filter above *)
  let opaque =
    (* The opaque string of the server must be sent back as it is *)
    try Some(List.assoc "opaque" digest_request)
    with Not_found -> None in
object(self)
  val mutable cnonce_init = string_of_float (Unix.time())
  val mutable cnonce_incr = 0
  val mutable nc = 0
    (* Number of requests authenticated by this session so far *)
  val mutable a1 = None
    (* Cache for the A1 string *)

  (* The cnonce of the first request (the one with increment 0). It is
   * also an ingredient of A1 for MD5-sess.
   *)
  method private first_cnonce =
    Digest.to_hex
      (Digest.string (cnonce_init ^ ":0"))

  (* A new cnonce for every request *)
  method private next_cnonce() =
    let cnonce =
      Digest.to_hex
	(Digest.string (cnonce_init ^ ":" ^ string_of_int cnonce_incr)) in
    cnonce_incr <- cnonce_incr + 1;
    cnonce

  (* The nonce count of the next request: 1, 2, 3, ... *)
  method private next_nc() =
    nc <- nc + 1;
    nc

  (* H(data) = MD5 of data in hex notation *)
  method private fn_h data =
    encode_hex (Digest.string data)

  (* KD(secret,data) = H(secret ":" data) *)
  method private fn_kd secret data =
    encode_hex (Digest.string (secret ^ ":" ^ data))

  (* The A1 string; computed once per session *)
  method private a1 =
    match a1 with
      | Some v -> v
      | None ->
	  let v =
	    match algorithm with
	      | "MD5" ->
		  key#user ^ ":" ^ realm ^ ":" ^ key#password
	      | "MD5-sess" ->
		  (self # fn_h
		     (key#user ^ ":" ^ realm ^ ":" ^ key#password)) ^ 
		  ":" ^ nonce ^ ":" ^ self#first_cnonce
	      | _ ->
		  assert false   (* other algorithms are filtered out *)
	  in
	  a1 <- Some v;
	  v

  (* The A2 string: method, colon, effective request URI *)
  method private a2 call =
    let meth = call # request_method in
    let uri = call # effective_request_uri in
    meth ^ ":" ^ uri

  (* Returns the Authorization field for [call]. Every invocation uses a
   * new cnonce and increments the nonce count.
   *)
  method authenticate call =
    let cnonce = self#next_cnonce() in
    let nc = self # next_nc() in
    let digest =
      match qop with
	| "auth" ->
	    self#fn_kd
	      (self#fn_h self#a1)
	      (nonce ^ ":" ^ (Printf.sprintf "%08x" nc) ^ ":" ^ cnonce ^ ":" ^ 
		 "auth:" ^ (self#fn_h (self#a2 call)))
	| "" ->
	    self#fn_kd
	      (self#fn_h self#a1)
	      (nonce ^ ":" ^ (self#fn_h (self#a2 call))) 
	| _ ->
	    assert false  (* such digests are not accepted *)
    in
    let creds =
      (* nc must be printed with %08x, exactly as in the digest above *)
      Printf.sprintf
	"Digest username=\"%s\",realm=\"%s\",nonce=\"%s\",uri=\"%s\",response=\"%s\",algorithm=%s,cnonce=\"%s\",%s%snc=%08x"
	key#user
	realm
	nonce
	call#effective_request_uri
	digest
	algorithm
	cnonce
	(match opaque with
	   | None -> ""
	   | Some s -> "opaque=\"" ^ s ^ "\",")
	(match qop with
	   | "" -> ""
	   | "auth" -> "qop=auth,"
	   | _ -> assert false)
	nc in
    [ "Authorization", creds ]

  method auth_scheme = "digest"
  method auth_domain = domain
  method auth_realm = key # realm
  method auth_user = key # user
  method auth_in_advance = enable_auth_in_advance

  (* Returns [true] if the response of [call] only says that our nonce is
   * stale; the key remains valid in this case. Otherwise the key is
   * invalidated, and [false] is returned.
   *)
  method invalidate call =
    (* Check if the [stale] flag is set for our nonce: *)
    let is_stale =
      try
	let auth_list = 
	  Nethttp.Header.get_www_authenticate call#response_header in
	List.exists
	  (fun (scheme,params) -> 
	     String.lowercase scheme = "digest"
	      && (try List.assoc "realm" params = realm
		  with Not_found -> false) 
	      && (try List.assoc "nonce" params = nonce
		  with Not_found -> false)
	      && (try String.lowercase (List.assoc "stale" params) = "true"
		  with Not_found -> false)
	  ) 
	  auth_list
      with
	| Not_found -> false  (* No www-authenticate header *)
	| Nethttp.Bad_header_field _ -> false in
    is_stale || (
      key_handler # invalidate_key key;
      false
    )
end


class digest_auth_handler ?(enable_auth_in_advance=false) 
                         (key_handler : #key_handler)
                         : auth_handler =
object
  (* Tries to set up a digest session for [call]. The session constructor
   * signals with [Not_applicable] that it cannot handle the call; this is
   * mapped to [None].
   *)
  method create_session call =
    let session_opt =
      try
        Some (new digest_auth_session enable_auth_in_advance key_handler call)
      with
        | Not_applicable -> None in
    session_opt
end


(* Scheme table containing only the http entry of
 * [Neturl.common_url_syntax].
 *)
let only_http =
  let table = Hashtbl.create 1 in
  Hashtbl.add table "http" (Hashtbl.find Neturl.common_url_syntax "http");
  table

(* Parses the string [s] as URL, accepting only the schemes in
 * [only_http], with 8 bit characters and fragments enabled.
 *)
let parse_http_neturl s =
  Neturl.parse_url
    ~enable_fragment:true
    ~accept_8bits:true
    ~schemes:only_http
    s

(* Rebuilds [neturl] from its components and returns the result as
 * string (esp. with normalized % sequences). A missing port defaults
 * to 80, a missing path to the empty path; query and fragment are only
 * passed on when present.
 *)
let norm_neturl neturl =
  let port = try Neturl.url_port neturl with Not_found -> 80 in
  let path = try Neturl.url_path neturl with Not_found -> [] in
  let query =
    try Some (Neturl.url_query neturl) with Not_found -> None in
  let fragment =
    try Some (Neturl.url_fragment neturl) with Not_found -> None in
  let rebuilt =
    Neturl.make_url
      ~encoded:false
      ~scheme:(Neturl.url_scheme neturl)
      ~host:(Neturl.url_host neturl)
      ~port
      ~path
      ?query
      ?fragment
      (Neturl.url_syntax_of_url neturl) in
  Neturl.string_of_url rebuilt

let prefixes_of_neturl s_url =
  (* Returns a list of all legal prefixes of the absolute URI s.
   * The prefixes are in Neturl format.
   *
   * The result starts with [s_url] itself, followed by [s_url] without
   * fragment, followed by one URL per path prefix; these latter URLs
   * have neither query nor fragment.
   *)
  let rec rev_path_prefixes rev_path =
    (* Works on the path in reversed component order, so that shortening
     * the path means dropping or replacing the head of the list. A prefix
     * ending in a name is followed by the same prefix where the name is
     * replaced by the empty component (i.e. the directory form), and a
     * directory form is followed by the prefix without the trailing empty
     * component. Hand-traced example: the path with the components
     * empty, a, b yields the prefixes (empty, a, b), (empty, a, empty),
     * (empty, a), (empty), and finally the empty list.
     *)
    match rev_path with
      | [] -> []
      | [ "" ] -> [ rev_path; [] ]
      | [ ""; "" ] -> assert false
      | [ _; "" ] -> rev_path :: rev_path_prefixes [ "" ]
      | "" :: rev_path' ->
	  if rev_path' = [ ""; "" ] then
	    rev_path :: rev_path_prefixes [ "" ]
	  else
	    rev_path :: rev_path_prefixes rev_path'
      | _ :: rev_path' ->
	  rev_path :: (rev_path_prefixes ("" :: rev_path'))
  in
  (* Same as [rev_path_prefixes], but for paths in normal order: *)
  let path_prefixes path =
    List.map List.rev (rev_path_prefixes (List.rev path)) in
  let s_nofrag_url = Neturl.remove_from_url ~fragment:true s_url in
  let s_noquery_url = Neturl.remove_from_url ~query:true s_nofrag_url in
  let path = Neturl.url_path s_noquery_url in
  s_url :: s_nofrag_url ::
    (List.map
       (fun prefix -> Neturl.modify_url ~path:prefix s_noquery_url
       )
       (path_prefixes path))



class auth_cache =
object
  val mutable auth_handlers = []
    (* The registered handlers, in the order of registration *)
  val sessions = Hashtbl.create 10
    (* Only sessions that can be used for authentication in advance. 
     * The hash table maps domain URIs to sessions.
     *)

  (* Appends [h] to the list of handlers *)
  method add_auth_handler (h : auth_handler) =
    auth_handlers <- List.append auth_handlers [h]

  (* Create a new session after a 401 reply: the handlers are asked in
   * order, and the first session one of them creates is returned.
   *)
  method create_session (call : http_call) =
    let rec first_session = function
      | [] -> 
          None
      | handler :: rest ->
          ( match handler # create_session call with
              | Some _ as found -> found
              | None -> first_session rest
          )
    in
    first_session auth_handlers

  (* Called by [postprocess_complete_message] when authentication was
   * successful. If enabled, [sess] can be used for authentication
   * in advance: it is registered under every (normalized) domain URI
   * of the session. Domain URIs that cannot be parsed are skipped.
   *)
  method tell_successful_session (sess : auth_session) =
    let register dom_uri =
      try
        let key = norm_neturl (parse_http_neturl dom_uri) in
        Hashtbl.replace sessions key sess
      with
        | Neturl.Malformed_URL -> () in
    if sess # auth_in_advance then
      List.iter register sess#auth_domain

  (* Called by [postprocess_complete_message] when authentication 
   * failed: the entries for all (normalized) domain URIs of [sess]
   * are removed. Domain URIs that cannot be parsed are skipped.
   *)
  method tell_failed_session (sess : auth_session) =
    let unregister dom_uri =
      try
        let key = norm_neturl (parse_http_neturl dom_uri) in
        Hashtbl.remove sessions key
      with
        | Neturl.Malformed_URL -> () in
    List.iter unregister sess#auth_domain

  (* Find a session suitable for authentication in advance. We are not
   * only looking for the request URI, but also for all prefixes of it;
   * the first prefix with a registered session wins. Raises [Not_found]
   * if there is no such session, or if the URI cannot be parsed.
   *)
  method find_session_in_advance (call : http_call) =
    let rec search = function
      | [] -> 
          raise Not_found
      | prefix :: rest ->
          let key = norm_neturl prefix in
          if Hashtbl.mem sessions key then
            Hashtbl.find sessions key
          else
            search rest
    in
    let uri = call # request_uri in
    try
      search (prefixes_of_neturl (parse_http_neturl uri))
    with
      | Neturl.Malformed_URL ->
          raise Not_found
end


(* Backwards compatibility: *)

class key_backing_store =
object
  val db = (Hashtbl.create 10 : (string, (string*string)) Hashtbl.t)
    (* maps realm to (user, password) *)

  (* Stores (or overwrites) the credentials for [realm] *)
  method set_realm realm user password =
    Hashtbl.replace db realm (user,password)

  (* Returns a key object for the first of [realms] that has stored
   * credentials. Raises [Not_found] if there is none. The [auth]
   * argument is not used.
   *)
  method inquire_key ~domain ~realms ~(auth:string) =
    let is_known r = Hashtbl.mem db r in
    let chosen_realm = List.find is_known realms in
    let (u, p) = Hashtbl.find db chosen_realm in
    ( object
        method user = u
        method password = p
        method realm = chosen_realm
        method domain = (domain : string list)
      end
    )

  (* Keys are never dropped from this store *)
  method invalidate_key (_ : key) = ()
end


class auth_method name (mk_auth_handler : key_ring -> auth_handler) =
  (* Each instance gets its own backing store, a key ring using this
   * store as uplink, and the handler made for this key ring.
   *)
  let store = new key_backing_store in
  let ring = new key_ring ~uplink:store () in
  let handler = mk_auth_handler ring in
object
  method name = (name : string)

  (* Puts the credentials for [realm] into the backing store *)
  method set_realm realm user password =
    store # set_realm realm user password

  method as_auth_handler =
    handler
end


(* The [auth_method] named basic; its handler is a [basic_auth_handler]
 * with authentication in advance enabled.
 *)
class basic_auth_method =
  auth_method 
    "basic"
    (fun key_ring -> 
       new basic_auth_handler ~enable_auth_in_advance:true key_ring)

(* The [auth_method] named digest; its handler is a [digest_auth_handler]
 * with authentication in advance enabled.
 *)
class digest_auth_method =
  auth_method
    "digest"
    (fun key_ring -> 
       new digest_auth_handler ~enable_auth_in_advance:true key_ring)


(**********************************************************************)
(***                 THE CONNECTION CACHE                           ***)
(**********************************************************************)

type conn_state = [ `Inactive | `Active of < > ]
  (** A TCP connection may be either [`Inactive], i.e. it is not used
    * by any pipeline, or [`Active obj], i.e. it is in use by the pipeline
    * [obj]. The owner is given as a value of the empty object type, so
    * the cache does not depend on the methods of the owner.
   *)

class type connection_cache =
object
  method get_connection_state : Unix.file_descr -> conn_state
    (** Returns the state of the file descriptor. The cache classes
      * defined in this module raise [Not_found] for descriptors they
      * do not know.
     *)
  method set_connection_state : Unix.file_descr -> conn_state -> unit
    (** Sets the state of the file descriptor. It is allowed that
      * inactive descriptors are simply closed and forgotten.
     *)
  method find_inactive_connection : Unix.sockaddr -> Unix.file_descr
    (** Returns an inactive connection to the passed peer, or raise
      * [Not_found].
     *)
  method find_my_connections : < > -> Unix.file_descr list
    (** Returns all active connections owned by the object *)
  method close_connection : Unix.file_descr -> unit
    (** Deletes the connection from the cache, and closes it *)
  method close_all : unit -> unit
    (** Closes all descriptors known to the cache *)
end


(* Closes all descriptors known to [cache] by calling its [close_all]
 * method.
 *)
let close_connection_cache cache =
  cache # close_all ()


(* A connection cache that never keeps inactive connections: setting
 * a descriptor to [`Inactive] closes it immediately, and
 * [find_inactive_connection] never finds anything.
 *)
class restrictive_cache : connection_cache =
object(self)
  val mutable active_conns = Hashtbl.create 10
    (* maps file_descr to owner *)
  val mutable rev_active_conns = Hashtbl.create 10
    (* maps owner to file_descr list *)

  (* Raises [Not_found] if [fd] is not an active connection *)
  method get_connection_state fd =
    `Active(Hashtbl.find active_conns fd)

  method set_connection_state fd state =
    match state with
      | `Active owner ->
          Hashtbl.replace active_conns fd owner;
          let fd_list = 
            try Hashtbl.find rev_active_conns owner with Not_found -> [] in
          if not (List.mem fd fd_list) then
            Hashtbl.replace rev_active_conns owner (fd :: fd_list)
      | `Inactive ->
          self # close_connection fd

  method find_inactive_connection _ = raise Not_found

  method find_my_connections owner =
    try
      Hashtbl.find rev_active_conns owner
    with
        Not_found -> []

  (* Removes [fd] from both tables, and closes it. When the owner has no
   * descriptors left, its entry is removed from [rev_active_conns]
   * instead of being left behind with an empty list; otherwise the table
   * would keep every former owner object alive for the lifetime of the
   * cache. [find_my_connections] returns the empty list in both cases.
   *)
  method close_connection fd =
    ( try
        let owner = Hashtbl.find active_conns fd in
        let fd_list = 
          try Hashtbl.find rev_active_conns owner with Not_found -> [] in
        let fd_list' =
          List.filter (fun fd' -> fd' <> fd) fd_list in
        if fd_list' <> [] then
          Hashtbl.replace rev_active_conns owner fd_list'
        else
          Hashtbl.remove rev_active_conns owner
      with
          Not_found -> ()
    );
    Hashtbl.remove active_conns fd;
    Netlog.Debug.release_fd fd;
    Unix.close fd

  (* Closes all active descriptors, and empties both tables *)
  method close_all () =
    Hashtbl.iter
      (fun fd _ ->
         Netlog.Debug.release_fd fd;
         Unix.close fd)
      active_conns;
    Hashtbl.clear active_conns;
    Hashtbl.clear rev_active_conns
          
end


(* Creates a new [restrictive_cache] object *)
let create_restrictive_cache() = new restrictive_cache


(* A connection cache that keeps inactive connections, indexed by peer
 * address, so that they can be found by [find_inactive_connection].
 *)
class aggressive_cache : connection_cache =
object(self)
  val mutable active_conns = Hashtbl.create 10
    (* maps file_descr to owner *)
  val mutable rev_active_conns = Hashtbl.create 10
    (* maps owner to file_descr list *)
  val mutable inactive_conns = Hashtbl.create 10
    (* maps file_descr to sockaddr *)
  val mutable rev_inactive_conns = Hashtbl.create 10
    (* maps sockaddr to file_descr list *)

  (* Raises [Not_found] if [fd] is neither active nor inactive *)
  method get_connection_state fd =
    if Hashtbl.mem active_conns fd then
      `Active (Hashtbl.find active_conns fd)
    else if Hashtbl.mem inactive_conns fd then
      `Inactive
    else
      raise Not_found

  method set_connection_state fd state =
    match state with
      | `Inactive ->
          (* The peer address is determined first. If the socket turns
           * out to be unconnected, it is closed instead of being kept.
           *)
          ( try
              let peer = Netsys.getpeername fd in
              self # forget_active_connection fd;
              Hashtbl.replace inactive_conns fd peer;
              let peer_fds =
                try Hashtbl.find rev_inactive_conns peer 
                with Not_found -> [] in
              if not (List.mem fd peer_fds) then
                Hashtbl.replace rev_inactive_conns peer (fd :: peer_fds)
            with
              | Unix.Unix_error(Unix.ENOTCONN,_,_) ->
                  self # close_connection fd
          )
      | `Active owner ->
          self # forget_inactive_connection fd;
          Hashtbl.replace active_conns fd owner;
          let owned_fds = 
            try Hashtbl.find rev_active_conns owner 
            with Not_found -> [] in
          if not (List.mem fd owned_fds) then
            Hashtbl.replace rev_active_conns owner (fd :: owned_fds)

  (* Returns the first inactive descriptor recorded for [peer], or
   * raises [Not_found].
   *)
  method find_inactive_connection peer =
    match Hashtbl.find rev_inactive_conns peer with
      | fd :: _ -> fd
      | [] -> raise Not_found

  method find_my_connections owner =
    try
      Hashtbl.find rev_active_conns owner
    with
      | Not_found -> []

  (* Removes [fd] from the tables of active connections; the owner entry
   * is dropped when no descriptor remains.
   *)
  method private forget_active_connection fd =
    ( try
        let owner = Hashtbl.find active_conns fd in
        let owned_fds = 
          try Hashtbl.find rev_active_conns owner 
          with Not_found -> [] in
        let remaining =
          List.filter (fun other -> other <> fd) owned_fds in
        ( match remaining with
            | [] -> Hashtbl.remove rev_active_conns owner
            | _  -> Hashtbl.replace rev_active_conns owner remaining
        )
      with
        | Not_found -> ()
    );
    Hashtbl.remove active_conns fd

  (* Removes [fd] from the tables of inactive connections; the peer entry
   * is dropped when no descriptor remains. Nothing happens if [fd] is
   * not an inactive connection.
   *)
  method private forget_inactive_connection fd =
    try
      let peer = Hashtbl.find inactive_conns fd in
      (* Do not use getpeername! fd might be disconnected in the meantime! *)
      let peer_fds = 
        try Hashtbl.find rev_inactive_conns peer 
        with Not_found -> [] in
      let remaining =
        List.filter (fun other -> other <> fd) peer_fds in
      ( match remaining with
          | [] -> Hashtbl.remove rev_inactive_conns peer
          | _  -> Hashtbl.replace rev_inactive_conns peer remaining
      );
      Hashtbl.remove inactive_conns fd
    with
      | Not_found ->
          ()

  (* Forgets [fd] in all tables, and closes it *)
  method close_connection fd =
    self # forget_active_connection fd;
    self # forget_inactive_connection fd;
    Netlog.Debug.release_fd fd;
    Unix.close fd

  (* Closes first all active and then all inactive descriptors, and
   * empties the tables.
   *)
  method close_all () =
    let close_entry fd _ =
      Netlog.Debug.release_fd fd;
      Unix.close fd in
    Hashtbl.iter close_entry active_conns;
    Hashtbl.clear active_conns;
    Hashtbl.clear rev_active_conns;
    Hashtbl.iter close_entry inactive_conns;
    Hashtbl.clear inactive_conns;
    Hashtbl.clear rev_inactive_conns
end


(* Creates a new [aggressive_cache] object *)
let create_aggressive_cache() = new aggressive_cache


(**********************************************************************)
(***                       THE I/O BUFFER                           ***)
(**********************************************************************)

(* io_buffer performs the socket I/O.
 *
 * TODO: COMMENT OUT OF DATE
 * The input side is a queue of octets which can be filled by
 * a Unix.read call at its end, and from which octets can be removed
 * at its beginning ("consuming octets").
 * There is also functionality to remove 1XX responses at the beginning
 * of the buffer, and to interpret the beginning of the buffer as HTTP
 * status line.
 * The idea of the buffer is that octets can be added at the end of the
 * buffer while the beginning of the buffer is interpreted as the beginning
 * of the next message. Once enough octets have been added that the message
 * is complete, it is removed (consumed) from the buffer, and the possibly
 * remaining octets are the beginning of the following message.
 *)

exception Garbage_received of string
  (* This exception is raised by [parse_response] when a protocol error
   * occurred before the response status line has been completely received.
   * Such errors are not transferred to the http_call. The argument is
   * a short description of the problem.
   *)


(* The regular expressions used by the response parser. None of them
 * can match a null byte, which is why a null byte can serve as guard
 * at the end of the input buffer.
 *)

(* One non-empty line including its line terminator (LF or CR LF) *)
let line_end_re = Netstring_pcre.regexp "[^\\x00\r\n]+\r?\n";;

(* Any number of non-empty lines, followed by an empty line: a complete
 * header or trailer block.
 *)
let line_end2_re = Netstring_pcre.regexp "([^\\x00\r\n]+\r?\n)*\r?\n";;

(* A whole status line. Group 1 is the protocol token, group 2 the three
 * digit status code, group 4 the optional status text.
 *)
let status_re = Netstring_pcre.regexp "^([^ \t]+)[ \t]+([0-9][0-9][0-9])([ \t]+([^\r\n]*))?\r?\n$" ;;

(* The size line of a chunk. Group 1 is the hexadecimal chunk size,
 * group 2 the optional part starting with a semicolon.
 *)
let chunk_re = Netstring_pcre.regexp "[ \t]*([0-9a-fA-F]+)[ \t]*(;[^\r\n\\x00]*)?\r?\n" ;;

(* A single line terminator (LF or CR LF) *)
let crlf_re = Netstring_pcre.regexp "\r?\n";;

(* The state of the socket as seen by [io_buffer]:
 * - [Down]: the socket is closed, or has been given back to the
 *   connection cache
 * - [Up_rw]: the socket can be used for reading and writing
 * - [Up_r]: the sending direction has been shut down; reading is still
 *   possible
 *)
type sockstate =
    Down
  | Up_rw 
  | Up_r
;;


(* io_buffer performs the socket I/O for one connection: it reads octets
 * from [fd] into an input buffer and parses HTTP responses from it, and
 * it writes the data to send to [fd].
 *
 * - [options]: the field [verbose_connection] enables debug messages
 * - [conn_cache]: the connection cache that is told when the socket is
 *   closed or released
 * - [fd]: the socket
 * - [fd_state]: the initial [sockstate] of [fd]
 *)
class io_buffer options conn_cache fd fd_state =
  object (self)

    (****************************** SOCKET ********************************)

    val mutable socket_state = fd_state

    method socket_state = socket_state

    (* Returns the socket, or fails if the socket is down *)
    method socket = 
      match socket_state with
        | Down -> failwith "Socket is down"
        | _ -> fd

    (* The socket as string for log messages; n/a if this is not possible *)
    method socket_str =
      try
        Int64.to_string (Netsys.int64_of_file_descr self # socket)
      with _ -> "n/a"


    (* Shuts down the sending direction if this has not yet been done *)
    method close_out() =
      match socket_state with
        | Down  -> ()
        | Up_rw -> 
            if options.verbose_connection then 
              dlogr (fun () ->
                       sprintf "FD %Ld - HTTP connection: Sending EOF!"
                         (Netsys.int64_of_file_descr fd));
            Unix.shutdown fd Unix.SHUTDOWN_SEND; 
            socket_state <- Up_r
        | Up_r  -> 
            ()

    (* Closes the socket via the connection cache unless it is already
     * down.
     *)
    method close() =
      match socket_state with
        | Down  -> ()
        | _     -> 
            if options.verbose_connection then 
              dlogr (fun () ->
                       sprintf "FD %Ld - HTTP connection: Closing socket!"
                         (Netsys.int64_of_file_descr fd));
            socket_state <- Down;
            conn_cache # close_connection fd

    method release() =
      (* Give socket back to cache. A socket whose sending direction is
       * already shut down cannot be reused, and is closed instead.
       *)
      match socket_state with
        | Down  -> ()
        | Up_r  -> self # close()
        | Up_rw -> 
            conn_cache # set_connection_state fd `Inactive;
            socket_state <- Down  (* our view *)


    (****************************** INPUT ********************************)

    val mutable in_buf = B.create 8192
    val mutable in_eof = false
    val mutable in_eof_parsed = false (* whether eof has been seen by parser *)
    val mutable in_pos = 0            (* parse position *)
    val mutable status_seen = false
    val mutable timeout = false
    val mutable restart_parser = (fun _ -> assert false)
      (* The parser state: the function that continues parsing when
       * [parse_response] is called the next time.
       *)

    initializer
      restart_parser <- self # parse_status_line;

      (* Ensure the 0-byte invariant holds (see comment in unix_read): *)
      let b = B.unsafe_buffer in_buf in
      if B.length in_buf < String.length b then
        b.[B.length in_buf] <- '\000'

    (* Reads once from the socket and appends the data to the input
     * buffer. Nothing is done after EOF or after a timeout has been set.
     * Reading zero bytes sets the EOF flag.
     *)
    method unix_read () =
      if not in_eof && not timeout then (
        let n = 
          B.add_inplace 
            in_buf
            (fun io_buf pos len ->
               syscall (fun() -> Unix.recv fd io_buf pos len []))
        in
        if n = 0 then
          in_eof <- true;
        (* None of the regexps matches the null byte. So this byte can be
         * used as guard that indicates the end of the buffer. We only
         * have to make sure it is always there.
         *)
        let b = B.unsafe_buffer in_buf in
        if B.length in_buf < String.length b then
          b.[B.length in_buf] <- '\000';
      )
            
    method timeout =
      timeout

    method set_timeout =
      timeout <- true

    method has_unparsed_data =
      (* whether data has been received that must go through the parser *)
      (in_eof && not in_eof_parsed) || (in_pos < B.length in_buf)

    method has_unparsed_bytes =
      (* whether there are unparsed data consisting of at least one byte *)
      in_pos < B.length in_buf

    method status_seen =
      (* Whether the most recently parsed status line was a final one,
       * i.e. had a code >= 200. A 1XX status line sets the flag to
       * false.
       *)
      status_seen

    method in_eof = 
      (* end of stream indicator *)
      in_eof

    method parse_response (call : http_call) =
      (* Parses the [in_buf] buffer and puts the parsed data into
       * [call].
       *
       * This function must only be called if at least one additional
       * byte has been received. The function returns when
       * - not enough bytes are available, or
       * - the response has been completely parsed. This latter case
       *   can be recognized because the [call] is finished then.
       *
       * [Bad_message] errors are stored in [call]. [Garbage_received]
       * is passed on to the caller.
       *)
      try
        restart_parser call;
        assert(in_pos >= 0 && in_pos <= B.length in_buf);
           (* Many versions of Netbuffer do not check arguments of [delete]
            * correctly.
            *)
        B.delete in_buf 0 in_pos;
        in_pos <- 0;
        in_eof_parsed <- in_eof;
        (* Ensure the 0-byte invariant holds (see comment in unix_read): *)
        let b = B.unsafe_buffer in_buf in
        if B.length in_buf < String.length b then
          b.[B.length in_buf] <- '\000'
      with
        | Bad_message _ as err -> 
            call # private_api # set_error_exception err;
            assert(status_seen)  (* otherwise cleanup code will not work *)
        (* Garbage_received not caught! *)
      

    method private parse_status_line call =
      (* Parses the status line, and continues with the header. For a 1XX
       * status, [parse_header] comes back to this method to parse the
       * next status line.
       *)
      restart_parser <- self # parse_status_line;
      let b = B.unsafe_buffer in_buf in
      match Netstring_pcre.string_match line_end_re b in_pos with
        | None ->
            if B.length in_buf - in_pos > 500 then 
              raise (Garbage_received "Status line too long");
            if in_eof then 
              raise (Garbage_received "EOF where status line expected")
        | Some m ->
            in_pos <- Netstring_pcre.match_end m;
            assert(in_pos <= B.length in_buf);
            let s = Netstring_pcre.matched_string m b in
            ( match Netstring_pcre.string_match status_re s 0 with
                | None ->
                    raise (Garbage_received "Bad status line")
                | Some m ->
                    let proto = Netstring_pcre.matched_group m 1 s in
                    let code_str = Netstring_pcre.matched_group m 2 s in
                    let code = int_of_string code_str in
                    let text =
                      try Netstring_pcre.matched_group m 4 s 
                      with Not_found -> "" in
                    if code < 100 || code > 599 then 
                      raise (Garbage_received "Bad status code");
                    status_seen <- code >= 200;
                    call # private_api # set_response_status code text proto;
                    self # parse_header code call
            )

    method private parse_header code call =
      (* Parses the HTTP header following the status line. Errors in the
       * header of a 1XX response are reported as [Garbage_received],
       * errors in other headers as [Bad_message].
       *)
      restart_parser <- self # parse_header code;
      let b = B.unsafe_buffer in_buf in
      match Netstring_pcre.string_match line_end2_re b in_pos with
        | None ->
            if B.length in_buf - in_pos > 100000 then (
              if code < 200 then
                raise (Garbage_received "Response header too long")
              else
                raise (Bad_message "Response header too long")
            );
            if in_eof then (
              if code < 200 then
                raise (Garbage_received "EOF where response header expected")
              else
                raise (Bad_message "EOF where response header expected")
            )
        | Some m ->
            let start = in_pos in
            in_pos <- Netstring_pcre.match_end m;
            assert(in_pos <= B.length in_buf);
            let header_l, real_end_pos =
              try
                Mimestring.scan_header
                  ~downcase:false ~unfold:true ~strip:true b 
                  ~start_pos:start ~end_pos:in_pos 
              with
                | Failure _ ->
                    if code < 200 then
                      raise (Garbage_received "Bad response header")
                    else
                      raise (Bad_message "Bad response header")
            in
            assert(real_end_pos = in_pos);
            let header = new Netmime.basic_mime_header header_l in
            if code >= 100 && code <= 199 then (
              (* The header of a 1XX response is dropped *)
              call # private_api # set_continue();
              self # parse_status_line call
            )
            else (
              call # private_api # set_response_header header;
              self # parse_body code header call
            )

    method private parse_body code header call =
      (* Parses the whole HTTP body *)
      restart_parser <- self # parse_body code header;
      (* First determine whether a body is expected: *)
      let have_body =
        call # has_resp_body && code <> 204 && code <> 304 in
      if have_body then (
        let ch = call # private_api # response_body_open_wr() in
        (* Check if we have chunked encoding: *)
        let is_chunked =
          try header # field "Transfer-encoding" <> "identity"
          with Not_found -> false in
        if is_chunked then
          self # parse_chunked_body code header ch call
        else (
          let length_opt =
            try 
              let l = Int64.of_string (header # field "Content-Length") in
              if l < 0L then raise (Bad_message "Bad Content-Length field");
              Some l
            with
              | Failure _ -> raise (Bad_message "Bad Content-Length field")
              | Not_found -> None in
          self # parse_plain_body code header ch length_opt call
        )
      )
      else
        self # parse_end call

    method private parse_plain_body code header ch length_opt call =
      (* Parses a non-chunked HTTP body. If length_opt=None, the message
       * is terminated by EOF. If length_opt=Some len, the message has
       * this length.
       *)
      restart_parser <- self # parse_plain_body code header ch length_opt;
      let av_len = B.length in_buf - in_pos in
      match length_opt with
        | None ->
            ch # really_output (B.unsafe_buffer in_buf) in_pos av_len;
            in_pos <- in_pos + av_len;
            assert(in_pos <= B.length in_buf);
            if in_eof then self # parse_end call

        | Some len ->
            let l = Int64.to_int (min (Int64.of_int av_len) len) in
            ch # really_output (B.unsafe_buffer in_buf) in_pos l;
            in_pos <- in_pos + l;
            assert(in_pos <= B.length in_buf);
            let len' = Int64.sub len (Int64.of_int l) in
            if len' > 0L then (
              if in_eof then 
                raise (Bad_message "Response body too short");
              (* Continue later with the remaining length: *)
              restart_parser <- 
                self # parse_plain_body code header ch (Some len');
            ) else (
              self # parse_end call
            )

    method private parse_chunked_body code header ch call =
      (* Parses a chunked HTTP body, starting at a chunk size line *)
      restart_parser <- self # parse_chunked_body code header ch;
      let b = B.unsafe_buffer in_buf in
      match Netstring_pcre.string_match chunk_re b in_pos with
        | None ->
            if B.length in_buf - in_pos > 5000 then 
              raise (Bad_message "Cannot parse chunk of response body");
            if in_eof then 
              raise (Bad_message "EOF where next response chunk expected")
        | Some m ->
            in_pos <- Netstring_pcre.match_end m;
            assert(in_pos <= B.length in_buf);
            let hex_len = Netstring_pcre.matched_group m 1 b in
            let len =
              try Int64.of_string ("0x" ^ hex_len)
              with Failure _ -> 
                raise (Bad_message "Chunk too large") in
            (* Hexadecimal numbers above Int64.max_int are accepted by
             * Int64.of_string, but wrap around to negative values. Such
             * sizes must not reach [parse_chunk_data].
             *)
            if len < 0L then
              raise (Bad_message "Chunk too large");
            if len = 0L then
              self # parse_trailer code header ch call
            else
              self # parse_chunk_data code header ch len call

    method private parse_chunk_data code header ch len call =
      (* Parses the chunk data following the chunk size field. [len] is
       * the number of bytes of the chunk that are still missing.
       *)
      restart_parser <- self # parse_chunk_data code header ch len;
      let av_len = B.length in_buf - in_pos in
      let l = Int64.to_int (min (Int64.of_int av_len) len) in
      ch # really_output (B.unsafe_buffer in_buf) in_pos l;
      in_pos <- in_pos + l;
      assert(in_pos <= B.length in_buf);
      let len' = Int64.sub len (Int64.of_int l) in
      if len' > 0L then (
        if in_eof then 
          raise (Bad_message "Repsonse chunk terminated by EOF");
        restart_parser <- 
          self # parse_chunk_data code header ch len'
      ) else
        self # parse_chunk_end code header ch call

    method private parse_chunk_end code header ch call =
      (* Parses the CRLF after the chunk, and the next chunks *)
      restart_parser <- self # parse_chunk_end code header ch;
      let b = B.unsafe_buffer in_buf in
      match Netstring_pcre.string_match crlf_re b in_pos with
        | None ->
            if B.length in_buf - in_pos > 2 then 
              raise (Bad_message "CR/LF after response chunk is missing");
            if in_eof then 
              raise (Bad_message "EOF where next response chunk expected")
        | Some m ->
            in_pos <- Netstring_pcre.match_end m;
            assert(in_pos <= B.length in_buf);
            self # parse_chunked_body code header ch call

    method private parse_trailer code header ch call =
      (* Parses the trailer *)
      restart_parser <- self # parse_trailer code header ch;
      let b = B.unsafe_buffer in_buf in
      match Netstring_pcre.string_match line_end2_re b in_pos with
        | None ->
            if B.length in_buf - in_pos > 10000 then 
              raise (Bad_message "Response trailer too large");
            if in_eof then 
              raise (Bad_message "EOF where response trailer expected")
        | Some m ->
            let start = in_pos in
            in_pos <- Netstring_pcre.match_end m;
            assert(in_pos <= B.length in_buf);
            let trailer_l, real_end_pos = 
              try
                Mimestring.scan_header
                  ~downcase:false ~unfold:true ~strip:true b 
                  ~start_pos:start ~end_pos:in_pos 
              with
                | Failure _ -> raise(Bad_message "Bad trailer") in
            assert(real_end_pos = in_pos);
            (* The trailer is simply added to the header: *)
            let new_header =
              new Netmime.basic_mime_header (header#fields @ trailer_l) in
            call # private_api # set_response_header new_header;
            self # parse_end call
              
    method private parse_end call =
      (* The message ends here! *)
      restart_parser <- self # parse_status_line;  (* for the next message *)
      call # private_api # finish()


    (****************************** OUTPUT ********************************)

    val mutable string_to_send = ""
      (* The string [unix_write] takes the data from *)
    val mutable send_buffer = String.create 8192
      (* The buffer filled by [send_by_buffer] *)
    val mutable send_position = 0
      (* The position in [string_to_send] of the next byte to send *)
    val mutable send_length = 0
      (* The end position of the data in [string_to_send] *)

    (* Arranges that the whole string [s] is sent by the following
     * [unix_write] calls.
     *)
    method send_this_string s =
      string_to_send <- s;
      send_position <- 0;
      send_length <- String.length s

    (* Calls [f buf pos len] to fill the internal send buffer, and arranges
     * that the number of bytes returned by [f] are sent by the following
     * [unix_write] calls. Returns this number.
     *)
    method send_by_buffer f =
      string_to_send <- send_buffer;
      send_position <- 0;
      send_length <- 0;
      let m = String.length send_buffer in
      let n = f send_buffer 0 m in
      send_length <- n;
      n

    method nothing_to_send =
      send_position = send_length

    (* Writes once to the socket, and advances the send position by the
     * number of bytes actually written.
     *)
    method unix_write() =
      let n_to_send = send_length - send_position in
      let n = 
        syscall (fun () -> 
                   Unix.send fd string_to_send send_position n_to_send [])
      in
      send_position <- send_position + n;

    (* Logs the data currently set up for sending. The data are taken
     * from [string_to_send] because [send_length] refers to this string;
     * after [send_by_buffer] this is the send buffer.
     *)
    method dump_send_buffer () =
      dlogr (fun () ->
               sprintf "FD %Ld - HTTP request body fragment:\n%s\n"
                 (Netsys.int64_of_file_descr fd)
                 (String.sub string_to_send 0 send_length)
            )

  end
;;

(**********************************************************************)
(***                PROTOCOL STATE OF A SINGLE MESSAGE              ***)
(**********************************************************************)

(* The class 'transmitter' is a container for the message and the
 * associated protocol state, and defines the methods to send
 * the request, and to receive the reply.
 * The protocol state stored here mainly controls the send and receive
 * buffers (e.g. how many octets have already been sent? Are the octets
 * received so far already a complete message or not?)
 *)


(* The protocol state of a single message (request plus reply) *)
type message_state =
    Unprocessed     (* Not even a byte sent or received *)
  | Sending_hdr     (* Sending currently the header *)
  | Handshake       (* The header has been sent, now waiting for status 100 *)
  | Sending_body    (* The header has been sent, now sending the body *)
  | Sent_request    (* The body has been sent; the reply is being received *)
  | Complete        (* The whole reply has been received *)
  | Complete_broken (* The whole reply has been received, but the connection
                     * is in a state that does not permit further requests
                     *)
  | Broken          (* Garbage was received instead of a reply *)
;;

(* Transitions:
 *
 * (1) Normal transitions:
 * 
 * Unprocessed -> Sending_hdr: Always done (a)
 * Sending_hdr -> Handshake: Only if we are handshaking (a)
 * Handshake -> Sending_body: When the 100 Continue status arrives (b)
 * Sending_hdr -> Sending_body: usually (a)
 * Sending_body -> Sent_request: usually (a)
 * Sending_hdr -> Sent_request: if no body is sent (a)
 * Sent_request -> Complete: when the response has arrived (b)
 * Handshake -> Complete_broken:  when the response arrives instead of 100 Continue (b)
 *
 * (2) Unusual transitions:
 *
 * (Unprocessed | Sending_hdr | Sending_body) -> Complete_broken: 
 *    The response arrives while/before sending the header/body (b)
 *    or: Parsing error in the response (b)
 *
 * (Unprocessed | Sending_hdr | Sending_body) -> Broken
 *    Garbage arrives (b)
 *
 *      (a) transition is done by the sending side of the transmitter
 *      (b) transition is done by the receiving side of the transmitter
 *)



class transmitter
  peer_is_proxy
  (m : http_call) 
  (f_done : http_call -> unit)
  options
  =
  (* Container for one HTTP call [m] plus the protocol state needed to
   * transmit it over a connection.
   *
   * - peer_is_proxy: whether the connection peer is a proxy. This selects
   *   the effective request URI and the persistency rules.
   * - f_done: callback invoked with the call by [postprocess].
   * - options: the client options (only the verbosity flags are read here).
   *
   * The sending side ([send]) and the receiving side ([receive]) both
   * advance [state]; see the transition table preceding this class.
   *)
  object (self) 
    val mutable state = Unprocessed
    val indicate_done = f_done
    val msg = m

    val mutable auth_headers = []
      (* Additional header fields for _proxy_ authentication; merged into
       * the effective request header by [init]
       *)

    val mutable body = new Netchannels.input_string ""
      (* Placeholder channel; replaced by the real request body in
       * [open_body]
       *)
    val mutable body_length = None
      (* Set by [init] from the Content-length field. While the body is
       * being sent it holds the number of octets still to send; [None]
       * means that no Content-length field is present.
       *)

    method state = state

    method f_done = indicate_done

    method init() =
      (* Prepare for (re)transmission:
       * - Set the `Effective request header
       * - Reset the status info of the http_call
       * - Initialize transmission state
       *
       * Only [Not_found] is caught when Content-length is read. Whatever
       * [Int64.of_string] raises for an unparsable value propagates to the
       * caller.
       *)
      msg # private_api # prepare_transmission();
      (* Set the effective URI. This must happen before authentication. *)
      let eff_uri =
        if peer_is_proxy then
          msg # request_uri
        else
          let path = msg # get_path() in
          if path = "" then msg # empty_path_replacement else path
      in
      msg # private_api # set_effective_request_uri eff_uri;
      let ah = 
        match msg # private_api # auth_state with
          | `None -> []
          | `In_advance session 
          | `In_reply session ->
              session # authenticate msg in
      let rh = msg # request_header `Effective in
      List.iter
        (fun (n,v) -> rh # update_field n v)
        (auth_headers @ ah);
      state <- Unprocessed;
      body_length <- ( try
                         let s = rh # field "Content-length" in
                         Some(Int64.of_string s)
                       with Not_found -> None
                     );
      self # close_body();
      (* A body is sent unless the length is explicitly announced as 0 *)
      let have_body =
        msg # has_req_body && body_length <> Some 0L in
      if not have_body then
        (* Without a body there is nothing to handshake about *)
        rh # delete_field "Expect"

    method cleanup() =
      (* Release resources: the body channel and whatever the call itself
       * holds
       *)
      self # close_body();
      msg # private_api # cleanup()

    method add_auth_header n v =
      (* Remember an extra header field [n: v]. It takes effect at the next
       * [init].
       *)
      auth_headers <- (n,v) :: auth_headers

    method private open_body() =
      body <- msg # request_body # open_value_rd()

    method close_body() =
      (* Best effort: the channel may already be closed, so every exception
       * is deliberately ignored here.
       * Also called by [clear_write_queue].
       *)
      try body # close_in() with _ -> ()

    method send (io : io_buffer) do_handshake =
      (* Push the request one step further through [io].
       *
       * do_handshake: If true, only the header is transmitted, and the
       * state transitions to [Handshake]. This flag is ignored if we have
       * already [Sending_body].
       *
       * May only be called in the states [Unprocessed], [Sending_hdr], and
       * [Sending_body]. Exceptions raised while the body is read are
       * re-raised after the body channel has been closed.
       *)

      assert 
        (state = Unprocessed || state = Sending_hdr || state = Sending_body);

      (* First check whether the send buffer of [io] has to be refilled: *)

      ( match state with
          | Unprocessed ->
              (* Queue the request line and the header. *)
              let buf = Buffer.create 1000 in
              let req_meth = msg # request_method in

              Buffer.add_string buf req_meth;
              Buffer.add_string buf " ";
              Buffer.add_string buf msg#effective_request_uri;
              let rh = msg # request_header `Effective in
              (* Host is set even when talking to a proxy: it does no harm,
               * and some cheap proxy implementations require it.
               *)
              let host = msg # get_host() in
              let port = msg # get_port() in
              let host_str =
                host ^ (if port = 80 then "" else ":" ^ string_of_int port) in
              rh # update_field "Host" host_str;
              Buffer.add_string buf " HTTP/1.1";

              (* Log the request line before the line terminator is added *)
              if options.verbose_status then
                dlogr
                  (fun () ->
                     sprintf "FD %s - Call %d - HTTP request: %s"
                       io#socket_str (Oo.id m) (Buffer.contents buf));

              Buffer.add_string buf "\r\n";

              let ch = new Netchannels.output_buffer buf in
              Mimestring.write_header ch rh#fields;
              ch # close_out();

              if options.verbose_request_header then
                dump_header "HTTP request " rh#fields;

              io # send_this_string (Buffer.contents buf);
              state <- Sending_hdr

          | Sending_body when io # nothing_to_send ->
              ( try 
                  (* Upper bound for the octets that may still be sent: *)
                  let max_len =
                    match body_length with
                      | None -> 
                          max_int
                      | Some l -> 
                          Int64.to_int
                            (min (Int64.of_int max_int) l) in
                  if max_len <= 0 then raise End_of_file;
                  (* Never read more from the body than Content-length
                   * announces. Otherwise the surplus octets would follow
                   * the request on the wire and break the message framing.
                   *)
                  let n =
                    io # send_by_buffer
                      (fun s pos len -> body # input s pos (min len max_len)) in
                  ( match body_length with
                      | None -> ()
                      | Some l ->
                          body_length <- Some(Int64.sub l (Int64.of_int n))
                  );
                  if options.verbose_request_contents then
                    io # dump_send_buffer()
                with
                  | End_of_file ->
                      self # close_body();
                      (* Without Content-length the end of the body can only
                       * be signalled by closing the output side.
                       *)
                      if body_length = None then io # close_out();
                      state <- Sent_request
                  | error ->
                      self # close_body();
                      raise error
              )

          | _ ->
              ()
      );

      (* Send: *)

      if state = Sending_hdr || state = Sending_body then
        io # unix_write();

      (* Update state: *)

      if io # nothing_to_send then (
        match state with
          | Sending_hdr ->
              let have_body =
                msg # has_req_body && body_length <> Some 0L in

              if have_body then (
                state <- (if do_handshake then Handshake else Sending_body);
                self # open_body()
              )
              else
                state <- Sent_request

          | _ ->
              ()
      )

    method receive (io : io_buffer) =
      (* This method is invoked if some additional octets have been received
       * that 
       * - may begin this reply
       * - or continue this reply
       * - or make this reply complete
       * - or make this reply complete and begin another reply
       * [io # parse_response] does the parsing; afterwards [state] is
       * updated according to the status of the call. [Garbage_received] is
       * caught here and turns the state into [Broken].
       *)
      try
        io # parse_response msg;
        ( match msg # status with
            | `Unserved -> 
                (* No final status yet. A pending handshake ends when the
                 * call reports [continue].
                 *)
                if state = Handshake && msg # private_api # continue then
                  state <- Sending_body
            | `Http_protocol_error e ->
                state <- Complete_broken;
                let e_msg =
                  match e with
                    | Bad_message s -> ": Bad message: " ^ s
                    | _ -> ": " ^ Netexn.to_string e in
                if options.verbose_status then
                  dlogr
                    (fun () ->
                       sprintf "FD %s - Call %d - HTTP status: protocol error %s"
                         io#socket_str (Oo.id m) e_msg)
            | _ -> 
                if state = Sent_request then 
                  state <- Complete 
                else (
                  (* The response overtook the request: the connection
                   * cannot be used for further requests.
                   *)
                  if options.verbose_status then
                    dlogr
                      (fun () ->
                         sprintf "FD %s - Call %d - HTTP status: Got response before request was completely sent"
                           io#socket_str (Oo.id m));
                  state <- Complete_broken;
                  io # close_out()
                );
                if options.verbose_status then
                  msg # private_api # dump_status();
                if options.verbose_response_header then
                  msg # private_api # dump_response_header();
                if options.verbose_response_contents then
                  msg # private_api # dump_response_body()
        )
      with
        | Garbage_received s ->
            state <- Broken;
            if options.verbose_status then
              dlogr
                (fun () ->
                   sprintf "FD %s - Call %d - HTTP status: Garbage received: %s"
                     io#socket_str (Oo.id m) s)

    method handshake_timeout() =
      (* Give up waiting for status 100 and start sending the body *)
      if state = Handshake then
        state <- Sending_body

    method indicate_pipelining =
      (* Return 'true' iff the reply is HTTP/1.1 compliant, does not
       * contain the 'connection: close' header, and its Server field (if
       * any) matches no entry of [pipeline_blacklist].
       *)
      let resp_header = msg # private_api # response_header in
      let proto_str = msg # private_api # response_proto in
      let b1 =
        try
          let proto = Nethttp.protocol_of_string proto_str in
          match proto with
            | `Http((1,n),_) when n >= 1 ->  (* HTTP/1.1 <= proto < HTTP/2.0 *)
                let conn_list = 
                  try Nethttp.Header.get_connection resp_header 
                  with _ (* incl. syntax error *) -> [] in
                not (List.mem "close" conn_list)
            | _ ->
                false
        with _ -> false in
      b1 && (
        try
          let server = resp_header # field "Server" in
          not (List.exists 
                 (fun re -> 
                    Netstring_pcre.string_match re server 0 <> None
                 ) 
                 pipeline_blacklist)
        with
          | Not_found -> true  (* Nothing known ... Assume the best! *)
      )

    method indicate_sequential_persistency =
      (* Returns 'true' if persistency without pipelining
       * is possible. For a direct peer this needs the absence of
       * 'connection: close' plus either HTTP/1.1 or 'connection: keep-alive'.
       * For a proxy, 'proxy-connection: keep-alive' is required.
       *)
      let resp_header = msg # private_api # response_header in
      let proto_str = msg # private_api # response_proto in
      let is_http_11 =
        try
          let proto = Nethttp.protocol_of_string proto_str in
          match proto with
            | `Http((1,n),_) when n >= 1 ->  (* HTTP/1.1 <= proto < HTTP/2.0 *)
                true
            | _ ->
                false
        with _ -> false in
      let proxy_connection =
        List.map String.lowercase
          (resp_header # multiple_field "proxy-connection") in
      let connection = 
        try Nethttp.Header.get_connection resp_header 
        with _ (* incl. syntax error *) -> [] in
      let normal_persistency =
        not peer_is_proxy && 
          (not (List.mem "close" connection)) &&
          (is_http_11 || List.mem "keep-alive" connection) in
      let proxy_persistency =
        peer_is_proxy && List.mem "keep-alive" proxy_connection in
      normal_persistency || proxy_persistency

    method postprocess =
      (* Release the resources, then report the call to its owner *)
      self#cleanup();
      indicate_done msg

    method message = msg
  
  end
;;


(**********************************************************************)
(**********************************************************************)
(**********************************************************************)
(***                                                                ***)
(***           THE PROTOCOL STATE OF THE CONNECTION                 ***)
(***                                                                ***)
(**********************************************************************)
(**********************************************************************)
(**********************************************************************)

(* This is the core of this HTTP implementation: The object controlling
 * the state of the connection to a server (or a proxy).
 *)

class connection the_esys 
                 (peer_host,peer_port)
                 peer_is_proxy
                 (proxy_user,proxy_password) 
                 auth_cache
                 conn_cache
		 conn_owner   (* i.e. the pipeline coerced to < > *)
		 counters     (* we count here only when connections close *)
		 the_options
  =
  object (self)
    val mutable esys = the_esys
    val mutable group = None

    (* timeout_groups: Unixqueue groups that must be deleted when the 
     * connection is closed. These groups represent timeout conditions.
     *)
    val mutable timeout_groups = []

    val mutable io = 
      new io_buffer the_options conn_cache Unix.stdin Down  (* dummy *)

    val mutable write_queue = Q.create()
    val mutable read_queue = Q.create()
      (* Invariant: write_queue is a suffix of read_queue *)

    (* polling_wr is 'true' iff the write side of the socket is currently
     * polled. (The read side is always polled.)
     *)

    val mutable polling_wr = false

    (* 'connecting' holds [Some eng] while the connect engine [eng] is still
     * establishing the connection in the background, and [None] otherwise.
     *)

    val mutable connecting = None

    (* 'connect_pause': seconds to wait until the next connection is tried.
     * There seems to be a problem with some operating systems (namely
     * Linux 2.2) which do not like immediate reconnects if the previous
     * connection did not end in a sane state.
     * A value of 0.5 seems to be sufficient for reconnects (see below).
     *)			      

    val mutable connect_pause = 0.0

    (* The following two variables control whether pipelining is enabled or
     * not. The problem is that it is unclear how old servers react if we
     * send to them several requests at once. The solution is that the first
     * "round" of requests and replies is done in a compatibility mode: After
     * the first request has been sent sending stops, and the client waits
     * until the reply is received. If the reply indicates HTTP/1.1 and does
     * not contain a "connection: close" header, all further requests and 
     * replies will be performed in pipelining mode, i.e. the requests are
     * sent independent of whether the replies of the previous requests have
     * been received or not.
     *
     * sending_first_message: 'true' means that the first request has not yet
     *    been completely sent to the server. 
     * done_first_message: 'true' means that the reply to the first request
     *    has arrived.
     *)

    val mutable sending_first_message = true
    val mutable done_first_message = false

    (* 'inhibit_pipelining_byserver': becomes true if the server is able
     * to keep persistent connections but is not able to use pipelining.
     * (HTTP/1.0 "keep alive" connections)
     *)

    val mutable inhibit_pipelining_byserver = false
 
    (* 'connect_started': when the last 'connect' operation started.
     * 'connect_time': how many seconds the last 'connect' lasted. 
     *)

    val mutable connect_started = 0.0
    val mutable connect_time = 0.0

    (* If a connection does not lead to any type of response, the client 
     * simply re-opens a new connection and tries again. The following 
     * variables limit the number of attempts until the client gives up.
     *
     * 'totally_failed_connections': counts the number of failed connections
     *      without any response from the server.
     *)

    val mutable totally_failed_connections = 0

    (* 'close_connection' indicates that a HTTP/1.0 response was received or 
     * that a response contained a 'connection: close' header.
     *)

    val mutable close_connection = false

    (* Proxy authorization: If 'proxy_user' is non-empty, the variables
     * 'proxy_user' and 'proxy_password' are interpreted as user and
     * password for proxy authentication. More precisely, once the proxy
     * responds with status code 407, 'proxy_credentials_required' becomes
     * true, and all following requests will include the credentials identifying
     * the user (with the "basic" authentication method).
     * If the proxy responds again with code 407, this reaction will not
     * be handled again but will be visible to the outside.
     *)

    val mutable proxy_credentials_required = false
    val mutable proxy_cookie = ""


    (* 'critical_section': true if further additions to the queues and
     * every change of the internal state must be avoided. In this case,
     * additions are put into 'deferred_additions' and actually added later.
     *)

    val mutable deferred_additions = Q.create()
    val mutable critical_section = false

    val mutable options = the_options

    method length =
      (* Number of open requests, i.e. requests still lacking a response:
       * those in the read queue plus those whose queueing was deferred.
       *)
      let queued = Q.length read_queue in
      let deferred = Q.length deferred_additions in
      queued + deferred


    method active =
      (* True iff at least one request is still open on this connection *)
      0 < self # length


    method add (m : http_call) f_done =
      (* Queue the call 'm' as a regular (non-urgent) request. 'f_done' is
       * passed on to the transmitter created for it; the transmitter itself
       * is of no interest to the caller and is dropped.
       *)
      let _trans = self # add_msg false m f_done in
      ()


    method private add_msg ?(critical=critical_section) urgent m f_done =
      (* Wrap the call 'm' into a new transmitter, queue it, and return the
       * transmitter (in contrast to 'add').
       *
       * critical: if true, the transmitter is only parked in
       *   'deferred_additions'; 'leave_critical_section' moves it to the
       *   real queues later. Defaults to the current 'critical_section'.
       * urgent: if true and the write queue is not empty, the transmitter
       *   is inserted with [Q.add_at] instead of being appended.
       * f_done: completion callback handed to the transmitter.
       *)

      (* Create the transport container for the message: *)
      let trans = new transmitter peer_is_proxy m f_done options in

      (* If proxy authentication is enabled, and it is already known that
       * the proxy demands authentication, add the necessary header fields: 
       * (See also the code and the comments in method 
       * 'postprocess_complete_message')
       *)
      if proxy_credentials_required  &&  peer_is_proxy then begin
        (* If the cookie is not yet computed, do this now: *)
        if proxy_cookie = "" then
          proxy_cookie <- Netencoding.Base64.encode
                             (proxy_user ^ ":" ^ proxy_password);
        (* Add the "proxy-authorization" header: *)
        trans # add_auth_header "proxy-authorization" ("Basic " ^ proxy_cookie)
      end;

      (* Check whether we can authenticate in advance: *)
      if m # private_api # auth_state = `None then (
        try
          let sess = auth_cache # find_session_in_advance m in
          m # private_api # set_auth_state (`In_advance sess)
        with
            Not_found -> ()
      );

      (* Initialize [trans] for transmission: (Must be after in-advance
       * authentication)
       *)
      trans # init();

      if critical then begin
        (* This 'add_msg' invocation was done in a callback. This may
         * interfere with other queue modifications, so only park [trans].
         *)
        Q.add trans deferred_additions;
      end
      else begin
        (* Use [Q] like every other queue access of this class *)
        let n = Q.length write_queue in
        if urgent && n > 0 then (
          (* Insert [trans] at the earliest possible place *)
          Q.add_at (n-1) trans write_queue;
          Q.add_at (n-1) trans read_queue;
        )
        else (
          Q.add trans write_queue;
          Q.add trans read_queue;
        );

        (* If there is currently no event group, we are detached from the
         * event system (i.e. not receiving events). Attach again.
         *)
        if group = None then self # attach_to_esys;

        (* Update the polling state. *)
        self # maintain_polling;
      end;

      trans


    method leave_critical_section : unit =
      (* Flush 'deferred_additions': every parked transmitter is appended to
       * both real queues. Nothing happens if no addition was deferred.
       *)
      if Q.length deferred_additions > 0 then begin
        Q.iter
          (fun pending ->
             Q.add pending write_queue;
             Q.add pending read_queue
          )
          deferred_additions;
        Q.clear deferred_additions;

        (* Without an event group we are detached from the event system and
         * would not receive any events, so attach again.
         *)
        if group = None then self # attach_to_esys;

        (* The queues have changed, so recompute what has to be polled. *)
        self # maintain_polling
      end
 

    method private add_again m_trans =
      (* Queue the call carried by 'm_trans' once more (as a consequence of
       * authorization). A fresh transmitter is created for it and returned;
       * the addition is urgent and always deferred.
       *)
      let callback = m_trans # f_done in
      let call = m_trans # message in
      self # add_msg ~critical:true true call callback


    method set_options new_options =
      (* Replace the options used by this connection from now on *)
      options <- new_options


    method private attach_to_esys =
      (* Start receiving events again: create a new event group, reset the
       * per-connection protocol flags, and obtain a socket. The peer host
       * is passed to [options.resolver]; its callback either reuses an
       * inactive connection from [conn_cache] or starts a new connect.
       *
       * Precondition (asserted): there is no event group, and the socket
       * is Down.
       *)
      assert (group = None);
      assert (io#socket_state = Down);

      let g = Unixqueue.new_group esys in
      group <- Some g;

      connecting <- None;
      sending_first_message <- true;
      done_first_message <- false;
      close_connection <- false;
      polling_wr <- false;
      critical_section <- false;

      (*
      inhibit_pipelining_byserver <- false;
       -- never reset once it is [true]! Reason: Failed connections are often
          caused by missing pipelining capabilities of the server (e.g. IIS)
       *)

      (* Now check how to get a socket connection: *)
      
      let timeout_value = options.connection_timeout in
      
      options.resolver
	esys
	peer_host
	(function 
	   | None ->
	       (* Very early error! The name could not be resolved. *)
	       ( match group with
		     Some g ->
		       Unixqueue.clear esys g;         (* clean up esys *)
		       group <- None
		   | None -> ()
	       );
	       let err = Name_resolution_error peer_host in
	       self # cleanup_on_eof ~force_total_failure:true err;
	       counters.crashed_connections <-
		 counters.crashed_connections + 1;
	       self # leave_critical_section

	   | Some addr ->
	       ( let peer = Unix.ADDR_INET(addr, peer_port) in
		 try
		   (* Reuse an inactive connection to the same peer if the
		    * cache has one ([Not_found] otherwise).
		    * NOTE(review): unlike [connected], this branch does not
		    * call [maintain_polling] itself; presumably the caller
		    * does so right after [attach_to_esys] returns - confirm
		    * that this also holds when the resolver invokes this
		    * callback asynchronously.
		    *)
		   let fd = conn_cache # find_inactive_connection peer in
		   connect_time <- 0.0;
		   conn_cache # set_connection_state fd (`Active conn_owner);
		   io <- new io_buffer options conn_cache fd Up_rw;
		   Unixqueue.add_resource esys g (Unixqueue.Wait_in fd, 
						  timeout_value);
		   Unixqueue.add_handler esys g (self # handler);
		 with
		     Not_found ->
		       (* No inactive connection found. Connect, but only
			* after [connect_pause] seconds:
			*)
		       Unixqueue.once 
			 esys
			 g
			 connect_pause
			 (fun () -> 
			    (* g1 only carries the connect timeout below *)
			    let g1 = Unixqueue.new_group esys in
			    connect_pause <- 0.0;

			    if options.verbose_connection then
			      dlog ("HTTP connection: Connecting to " ^ 
					       peer_host);
			    let eng =
			      Uq_engines.connector 
				(`Socket(`Sock_inet(Unix.SOCK_STREAM,
						    addr,
						    peer_port),
					 Uq_engines.default_connect_options))
				esys in

			    connect_started <- Unix.gettimeofday();
			    connecting <- Some eng;

			    (* Every final state of the engine first cancels
			     * the connect timeout registered in g1.
			     *)
			    Uq_engines.when_state
			      ~is_done:(function
					  | `Socket(s,_) ->
					      Unixqueue.clear esys g1;
					      self # connected 
						peer_host g s peer
					  | _ -> assert false
				       )
			      ~is_error:(function
					   | err ->
					       Unixqueue.clear esys g1;
					       self # connect_error 
						 peer_host g err
					)
			      ~is_aborted:(fun () -> 
					     Unixqueue.clear esys g1;
					     connecting <- None
					  )
			      eng;
			    (* Abort the connect after [timeout_value] seconds *)
			    Unixqueue.once esys g1 timeout_value eng#abort
			 )
	       )
	)


    method private connected peer_host g s peer =
      (* Called when the connect engine has delivered the socket 's' for
       * 'peer': record the connect duration, install 's' as the active
       * socket of this connection, and start listening for input in the
       * event group 'g'.
       *)
      let descr = "HTTP to " ^ Netsys.string_of_sockaddr peer in
      Netlog.Debug.track_fd ~owner:"Http_client" ~descr s;

      connect_time <- Unix.gettimeofday() -. connect_started;
      connecting <- None;

      if options.verbose_connection then begin
        let fd_num = Netsys.int64_of_file_descr s in
        dlog (sprintf "FD %Ld - HTTP connection to %s: Connected!"
                fd_num peer_host)
      end;

      options.configure_socket s;

      conn_cache # set_connection_state s (`Active conn_owner);
      io <- new io_buffer options conn_cache s Up_rw;

      (* The read side is always polled; the write side is decided by
       * maintain_polling.
       *)
      let input_wait = (Unixqueue.Wait_in s, options.connection_timeout) in
      Unixqueue.add_resource esys g input_wait;
      Unixqueue.add_handler esys g (self # handler);
      self # maintain_polling


    method private connect_error peer_host g err =
      (* Called when the connect engine failed with 'err'. abort_connection
       * cannot be used here because the state is not fully initialized, so
       * the event group 'g' is torn down by hand before the regular error
       * path is entered.
       *)
      connecting <- None;
      if options.verbose_connection then begin
        let text =
          match err with
            | Unix.Unix_error(e,_,_) ->
                sprintf
                  "HTTP connection: Cannot connect to %s: Unix error %s"
                  peer_host (Unix.error_message e)
            | _ ->
                sprintf
                  "HTTP connection: Cannot connect to %s: Exception %s"
                  peer_host (Netexn.to_string err) in
        dlog text
      end;
      (* Detach from the event system: *)
      Unixqueue.clear esys g;
      group <- None;
      (* From here on this is the regular error path: *)
      counters.crashed_connections <- succ counters.crashed_connections;
      self # cleanup_on_eof ~force_total_failure:true err;
      self # leave_critical_section
	

    method private maintain_polling =
      (* Decide whether the write side of the socket has to be polled, and
       * add or remove the corresponding resource if that decision differs
       * from 'polling_wr'. Nothing is done while there is no event group
       * or the socket is Down.
       *
       * The write side is NOT polled if one of these holds:
       * - The write_queue is empty but the read_queue is not
       * - The read queue is too far ahead of the write queue
       * - We wait for the reply of the first request sent to a server
       * - We wait for the handshake (status 100) of the current request
       * - The write side of the socket is closed
       *)
      match group with
        | Some g when io # socket_state <> Down ->
            let timeout_value = options.connection_timeout in

            (* 0: Allow no drift if pipelining is not allowed *)
            let actual_max_drift =
              match options.synchronization with
                | Pipeline drift when not inhibit_pipelining_byserver ->
                    min drift max_pipeline
                | _ ->
                    0
            in

            let n_read = Q.length read_queue in
            let n_write = Q.length write_queue in

            (* sending_first_message is true while the first request is
             * sent, and done_first_message becomes true when its response
             * has arrived. In between both are false, and nothing is sent.
             *)
            let have_requests =
              n_read - n_write <= actual_max_drift
              && n_write > 0
              && (done_first_message || sending_first_message) in

            (* Both queues empty: if the socket is still up, EOF must be
             * sent. Normally this has happened already; this is just to
             * be sure.
             *)
            let do_close_output = (n_read = 0) && (n_write = 0) in

            let waiting_for_handshake =
              n_read > 0 && (Q.peek read_queue) # state = Handshake in

            let do_poll_wr =
              io#socket_state = Up_rw
              && ( (have_requests && not waiting_for_handshake)
                   || do_close_output ) in

            ( match do_poll_wr, polling_wr with
                | true, false ->
                    Unixqueue.add_resource
                      esys g (Unixqueue.Wait_out io#socket, timeout_value)
                | false, true ->
                    Unixqueue.remove_resource
                      esys g (Unixqueue.Wait_out io#socket)
                | _ ->
                    ()
            );

            polling_wr <- do_poll_wr
        | _ ->
            ()

      (* On the other hand, all of the following conditions must be true
       * to enable polling again:
       * - The write_queue is not empty, or
       *   both the write_queue and the read_queue are empty !!!CHECK!!!
       * - The difference between the read and the write queue is small enough
       * - We send the first request to a server, or do pipelining
       * - The write side of the socket is open
       * - pe_waiting_for_status is false.
       * - waiting_for_status100 is not `Waiting
       *)



    method private clear_timeout g =
      (* Cancel the timeout group 'g' and forget about it *)
      Unixqueue.clear esys g;
      let remaining = List.filter (fun tg -> tg <> g) timeout_groups in
      timeout_groups <- remaining


    method private abort_connection =
      (* Give the connection up because it is in an erroneous state (or is
       * simply no longer wanted): abort a pending connect, shut the socket
       * down, and remove the event group and all timeout groups.
       *)
      ( match connecting with
          | Some eng -> eng # abort()
          | None -> ()
      );
      if io # socket_state <> Down then (
        match group with
          | None ->
              ()
          | Some g ->
              if options.verbose_connection then
                dlog (sprintf
                        "FD %s - HTTP connection: Shutdown!"
                        io#socket_str);
              ( match io#socket_state with
                  | Up_rw ->
                      io # release()    (* hope this is right *)
                  | Up_r ->
                      io # close()
                  | Down ->
                      ()
              );
              Unixqueue.clear esys g;
              group <- None;
              polling_wr <- false;
              List.iter (fun tg -> Unixqueue.clear esys tg) timeout_groups;
              timeout_groups <- []
      )


    method private handler _ _ ev =
      (* Event handler registered with the event system. Input, output and
       * timeout events of our own group are dispatched to the respective
       * method; every other event is rejected.
       *)
      match group with
        | None ->
            (* This is possible while shutting down the socket *)
            raise Equeue.Reject
        | Some g ->
            (* Run 'action' only for events addressed to our group *)
            let dispatch g0 action =
              if g0 = g then action() else raise Equeue.Reject in
            ( match ev with
                | Unixqueue.Input_arrived (g0,_) ->
                    dispatch g0 (fun () -> self # handle_input)
                | Unixqueue.Output_readiness (g0,_) ->
                    dispatch g0 (fun () -> self # handle_output)
                | Unixqueue.Timeout (g0,_) ->
                    dispatch g0 (fun () -> self # handle_timeout)
                | _ ->
                    raise Equeue.Reject
            )

    (**********************************************************************)
    (***                    THE TIMEOUT HANDLER                         ***)
    (**********************************************************************)

    method private handle_timeout =
      (* No network packet arrived for a period of time.
       * May only happen when a connection is already established.
       * The timeout is first flagged on [io]; the input handler then does
       * the actual work.
       *)
      io # set_timeout;
      self # handle_input;   (* timeout is similar to EOF *)


    (**********************************************************************)
    (***                     THE INPUT HANDLER                          ***)
    (**********************************************************************)

    method private handle_input =
      (* Data have arrived on the 'socket'. First we receive as much as we
       * can; then the data are interpreted as sequence of messages.
       * This method is also called when the connection times out. In
       * this case [io # timeout] is set.
       *)
      
      (* Ignore this event if the socket is Down (this may happen
       * if the input side is closed while there are still input
       * events in the queue):
       *)
      if io#socket_state = Down then
	raise Equeue.Reject;

      if options.verbose_connection then 
	dlogr 
	  (fun () ->
	     sprintf "FD %s - HTTP connection: Input event!" io#socket_str);

      let _g = match group with
	  Some x -> x
	| None -> assert false
      in

      let end_of_queueing = ref false in
      (* 'end_of_queueing': stores whether there was an EOF or not *)


      (************ ACCEPT THE RECEIVED OCTETS ************)

      if not io # timeout then
	begin try
	  io # unix_read();
	with
	    Unix.Unix_error(Unix.EAGAIN,_,_) ->
	      ();  (* Ignore! *)
	  | Unix.Unix_error(Unix.ECONNRESET,_,_) ->
	      if options.verbose_connection then
		dlogr
		  (fun () ->
		     sprintf
		       "FD %s - HTTP connection: Connection reset by peer"
		       io#socket_str
		  );
	      self # abort_connection;
	      end_of_queueing := true;
	      counters.crashed_connections <-
		counters.crashed_connections + 1;
	  | Unix.Unix_error(e,a,b) as err ->
	      if options.verbose_connection then
		dlogr 
		  (fun () ->
		     sprintf 
		       "FD %s - HTTP connection: Unix error %s"
		       io#socket_str (Unix.error_message e));
	      counters.crashed_connections <- 
		counters.crashed_connections + 1;
	      self # abort_connection;
	      end_of_queueing := true;
	      (* This exception is reported to the message currently being read
               * only.
               *)
	      if Q.length read_queue > 0 then (
		let t = Q.peek read_queue in
		t # message # private_api # set_error_exception err
	      )
	end;

      if io # in_eof || io # timeout then begin
	(* shutdown the connection, and clean up the event system: *)
	if io # in_eof then (
	  if options.verbose_connection	then
	    dlogr
	      (fun () ->
		 sprintf "FD %s - HTTP connection: Got EOF!" io#socket_str);
	  counters.server_eof_connections <- 
	    counters.server_eof_connections + 1
	);
	if io # timeout then (
	  if options.verbose_connection then 
	    dlogr
	      (fun () ->
		 sprintf "FD %s - HTTP connection: Connection timeout!" 
		 io#socket_str);
	  counters.timed_out_connections <- 
	    counters.timed_out_connections + 1;
	);
	self # abort_connection;
	end_of_queueing := true
      end;


      (************ TRY TO INTERPRET THE OCTETS AS MESSAGES **************)

      (* The [read_loop] parses the data in [io] and fills the 
       * [read_queue]. It may raise the excepions:
       * - Q.Empty: No message is expected, but there is data to
       *   parse (or the connection is at EOF)
       *)

      let rec read_loop() =
	if io # has_unparsed_data then begin
	  let this = Q.peek read_queue in    (* may raise Q.Empty *)

	  (*** Process 'this' ***)

	  this # receive io;
	  (* Parse received data for this message. Set this#state. *)

	  ( match this # state with
	      | Unprocessed ->
		  (* We get response data before we even tried to send
                   * the request. We allow this for pragmatic reasons.
                   *)
		  if options.verbose_connection then (
		    if io # has_unparsed_bytes then
		      dlogr
			(fun () ->
			   sprintf 
			     "FD %s - HTTP connection: \
                              Got data of spontaneous response" io#socket_str);
		    if io # in_eof then
		      dlogr
			(fun () ->
			   sprintf 
			     "FD %s - HTTP connection: premature EOF"
			     io#socket_str);
		  );
		  ()

	      | Sending_hdr ->
		  (* We get response data before we finished
                   * the request. We allow this for pragmatic reasons.
                   *)
		  assert (try Q.peek write_queue == this
			  with Q.Empty -> false);
		  if options.verbose_connection then (
		    if io # has_unparsed_bytes then
		      dlogr
			(fun () ->
			   sprintf
			     "FD %s - HTTP connection: \
                              Got data of premature response" io#socket_str);
		    if io # in_eof then
		      dlogr
			(fun () ->
			   sprintf
			     "FD %s - HTTP connection: premature EOF"
			     io#socket_str)
		  );
		  ()

	      | Handshake 
	      | Sending_body ->
		  (* This is perfectly legal: We have sent the request header,
                   * and the server may directly reply with whatever.
                   *)
		  assert (try Q.peek write_queue == this
			  with Q.Empty -> false);
		  ()

	      | Sent_request ->
		  (* Somewhere in the middle of the response... *)
		  ()

	      | Complete ->
		  (* The response has been received, and the connection
                   * is still usable
                   *)
		  ignore (Q.take read_queue);
		  if Q.length write_queue >= 1 &&
		    Q.peek write_queue == this then
		      ignore (Q.take write_queue);

		  (* Initialize for next request/response: *)

		  let able_to_pipeline = 
		    this # indicate_pipelining in
		  (* able_to_pipeline: is true if we assume that the server
		   * is HTTP/1.1-compliant and thus is able to manage pipelined
		   * connections.
		   * Update first 'close_connection': This variable becomes
		   * true if the connection is not assumed to be pipelined
		   * which forces that the CLIENT closes the connection 
		   * immediately (see the code in the output handler).
		   *)

		  let only_sequential_persistency =
		    not able_to_pipeline && 
		    this # indicate_sequential_persistency in
		  (* only_sequential_persistency: is true if the connection is
		   * HTTP/1.0, and the server indicated a persistent connection.
		   * In this case, pipelining is disabled.
		   *)

		  if only_sequential_persistency then begin
		    (* 'close_connection': not set.
		     *)
		    if options.verbose_connection then 
		      dlogr
			(fun () ->
			   sprintf "FD %s - HTTP connection: \
                               using HTTP/1.0 style persistent connection"
			     io#socket_str);
		    inhibit_pipelining_byserver <- true;
		  end
		  else
		    close_connection  <- close_connection  || not able_to_pipeline;

		  if close_connection || options.inhibit_persistency then (
		    self # abort_connection;
		    end_of_queueing := true;
		    (* We do not count this event - it is regular! *)
		  );

		  (* Remember that the first request/reply round is over: *)
		  done_first_message <- true;

		  (* postprocess 'this' (may raise exceptions! (callbacks)) *)
		  this # cleanup();
		  self # postprocess_complete_message this;

		  read_loop()

	      | Complete_broken ->
		  (* The response has been received, but the connection
                   * is in a problematic state, and must be aborted.
                   *)

		  if options.verbose_connection then
		    dlogr
		      (fun () ->
			 sprintf "FD %s - HTTP connection: \
                            Aborting the invalidated connection"
			   io#socket_str);

		  self # abort_connection;
		  end_of_queueing := true;
		  counters.crashed_connections <-
		    counters.crashed_connections + 1;

		  (* If the response has a proper status, we can remove it
                   * from the queue. Otherwise leave it on the queue, so
                   * it can be scheduled again.
                   *)
		  ( match this # message # status with
		      | `Unserved
		      | `Http_protocol_error _ ->
			  ()    (* leave it *)
		      | _ ->
			  (* Remove the message from the queues: We must 
                           * also check write_queue, because we have the
                           * invariant that write_queue is a suffix of
                           * read_queue.
			   *)
			  ignore (Q.take read_queue);
			  if Q.length write_queue >= 1 &&
			     Q.peek write_queue == this then
			       ignore (Q.take write_queue);
			  (* postprocess 'this' (Exceptions! (callbacks)) *)
			  this # cleanup();
			  self # postprocess_complete_message this;
		  );

		  (* Do not continue [read_loop] *)
	      | Broken ->
		  (* Simply stop here. *)
		  if options.verbose_connection then
		    dlogr
		      (fun () ->
			 sprintf "FD %s - HTTP connection: \
                            Aborting the errorneuos connection"
			   io#socket_str);
		  self # abort_connection;
		  end_of_queueing := true;
		  counters.crashed_connections <-
		    counters.crashed_connections + 1;
	  );
	end
      in         (* of "let rec read_loop() = " *)

      begin try
	(* Start the interpretation formulated in 'read_loop' and catch
	 * exceptions.
	 *)
	read_loop();
      with
	| Q.Empty ->
	    (* Either we hit EOF, or there are additional bytes but no
             * message is expected:
             *)
	    if io # has_unparsed_bytes then begin
	      (* No more responses expected, but still octets to interpret.
	       * This is a protocol error, too.
	       *)
	      if options.verbose_connection then
		dlogr
		  (fun () ->
		     sprintf "FD %s - HTTP connection: \
                              Extra octets -- aborting connection"
		       io#socket_str);
	      self # abort_connection;
	      end_of_queueing := true;
	      counters.crashed_connections <-
		counters.crashed_connections + 1;
	    end
      end;

      (************** CLOSE THE CONNECTION IF NECESSARY, ****************)
      (************** AND PREPARE RECONNECTION           ****************)

      if !end_of_queueing then begin
	assert (group = None);
	self # cleanup_on_eof (Bad_message "Incomplete or missing response")
      end;

      (*************** UPDATE THE POLLING STATE **************)

      (* If there were 'add' invocations from callbacks, move these additions
       * to the real queues now.
       *)
      self # leave_critical_section; 

      (* Update polling state: *)
      self # maintain_polling;


    (************** CLOSE THE CONNECTION IF NECESSARY, ****************)
    (************** AND PREPARE RECONNECTION           ****************)

    (* [cleanup_on_eof ?force_total_failure err] decides what happens to the
     * messages that are still on [read_queue] and [write_queue] after the
     * connection has gone away.
     *
     * - [force_total_failure] (default [false]): count this connection as
     *   a total failure even if a status line was seen.
     * - [err]: the exception recorded (via [clear_read_queue]) for unserved
     *   messages when the failure limit is reached and the queues are
     *   discarded.
     *
     * The method asserts [group = None]; presumably the caller has already
     * aborted the connection -- confirm against [abort_connection].
     * User callbacks may run from here (through [critical_postprocessing]).
     *)
    method private cleanup_on_eof ?(force_total_failure=false) err : unit =
      assert (group = None);

      (* If the socket is closed, it is necessary to check whether all 
       * requests sent so far have got their replies. 
       * Cases:
       * - write_queue and read_queue are empty: all is done.
       * - write_queue and read_queue have the same number of elements:
       *   reconnect
       * - else: some replies are missing. The requests are NOT tried again
       *   by default because the operations might not be idempotent. 
       *   The messages carry a flag with them indicating whether reconnection
       *   is allowed or not.
       * It is not possible that the write_queue is longer than the read_queue.
       *)

      (* First check if the connection was a total failure, i.e. if not
       * even a status line was received. In this case
       * increase the counter for totally failed connections. If the
       * counter reaches the limit [options.maximum_connection_failures],
       * all messages on the queues are discarded.
       *)
	
      if force_total_failure || not io#status_seen then begin
	(* It was a total failure. *)
	totally_failed_connections <- totally_failed_connections + 1;
	if options.verbose_connection then
	  dlog "HTTP connection: total failure";
	
	if totally_failed_connections >= options.maximum_connection_failures then
	  begin
	    (* Set the error exception of all remaining messages, and
	     * clear the queues.
	     *)
	    self # clear_read_queue err;
	    self # clear_write_queue ();
	    
	    (* Reset state variables *)

	    totally_failed_connections <- 0;

	    (* Simply continue with the following piece of code, which will
	     * do nothing because both queues are empty now.
	     *)
	  end
	  
      end
      else (
	(* This connection breaks the series of total failures (if there 
	 * was such a series).
	 *)
	totally_failed_connections <- 0;

	(* Turn pipelining off to be on the safe side: *)
	inhibit_pipelining_byserver <- true;
      );

      (* Now examine the queues, and decide what to do. *)
	
      let n_read  = Q.length read_queue in
      let n_write = Q.length write_queue in
      if n_read > 0 || n_write > 0 then begin
	assert (n_read >= n_write);
	assert (group = None);
	
	(* NOTE(review): presumably the delay before the next connection
	 * attempt; the unit is not visible here -- confirm.
	 *)
	connect_pause <- 1.0;
	
	(* ASSERTION:
	 *     read_queue  = q1 . q2
	 *     write_queue =      q2
	 * i.e. the extra elements of the read queue are at the beginning
	 * of the read queue.
	 * 
	 * PLAN: Make that
	 *     read_queue  = q2 . q1'
	 *     write_queue = q2 . q1'
	 * where q1' are the elements of q1 for which a reconnection is
	 * allowed. Reset the error exception for these elements.
	 * For the other elements (q1 \ q1') leave the error
	 * exception as it is, but change every No_exception into 
	 * No_reply.
         *
         * Note: q1 = empty is possible.
	 *)
	for i = 1 to n_read - n_write do
	  (* Take the next element of q1 from the front of the read queue *)
	  let m_trans = Q.take read_queue in
	  let m = m_trans # message in
	  m_trans # cleanup();  (* release resources *)
	  (* Increase error counter *)
	  let e = m # private_api # get_error_counter in
	  m # private_api # set_error_counter (e+1);
	  (* Test: Are reconnections allowed? Only if the message has not
	   * yet failed more than [options.maximum_message_errors] times.
	   *)
	  if e+1 <= options.maximum_message_errors then begin
	    let do_reconnect =
	      match m # get_reconnect_mode with
		| Send_again -> true
		| Request_fails -> false
		| Send_again_if_idem -> m # is_idempotent
		| Inquire f ->
		    (* Ask the function 'f' whether to reconnect or not: *)
		    begin 
		      try f m    (* returns true or false *)
		      with
			  (* The invocation of 'f' may raise an exception.
			   * It is only logged with [dlog] (there is no other
			   * way to report it), and counts as the answer
			   * "do not reconnect".
			   *)
			  x ->
			    dlog ("Exception caught in Http_client: " 
				  ^ (Netexn.to_string x));
			    false
		    end
	    in
	    if do_reconnect then begin
	      (* Ok, this request is tried again. *)
	      m_trans # init();               (* Reinit call *)
	      Q.add m_trans write_queue;  (* ... add it to the queue of open *)
	      Q.add m_trans read_queue;   (* ...to the unfinished requests. *)
	    end
	    else begin
	      (* Drop this message because reconnection is not allowed *)
	      (* If status of the message is unset, change it into No_reply. 
	       *)
	      ( match m # status with
		  | `Unserved ->
		      m # private_api # set_error_exception No_reply;
		  | _ -> 
		      ()
	      );
	      (* We do not reconnect, so postprocess now. *)
	      self # critical_postprocessing m_trans;
	    end
	  end
	  else (
	    (* drop this message because of too many errors *)
	    (* We do not reconnect, so postprocess now. *)
	    ( match m # status with
		| `Unserved ->
		    m # private_api # set_error_exception No_reply;
		| _ -> 
		    ()
	    );
	    self # critical_postprocessing m_trans;
	  )
	done;

	(* Both queues must now hold the same elements (q2 . q1') *)
	let n_read  = Q.length read_queue in
	let n_write = Q.length write_queue in
	assert (n_read = n_write);
	  
	(* It is now possible that n_read = n_write = 0, in which case
	 * nothing more is to do, or that there are remaining requests.
	 *)

	if n_write > 0 then begin
	  assert (Q.peek read_queue == Q.peek write_queue);
	  (* Force reinitialisation of all queue elements: *)
	  Q.iter (fun m -> m#init()) read_queue;
	  (* Process the queues: *)
	  self # attach_to_esys;
	end
	else (
	  (* Every outstanding message was dropped: count as failed *)
	  if options.verbose_connection then
	    dlog "HTTP connection: Nothing left to do";
	  counters.failed_connections <- counters.failed_connections + 1
	)

      end else (
	(* n_read = 0 && n_write = 0 *)
	counters.successful_connections <- counters.successful_connections + 1
      )
      

    method clear_read_queue err =
      (* Empties [read_queue]. Every transmitter found there is cleaned up,
       * messages that are still unserved get [err] as error exception,
       * and finally all of them are postprocessed (which runs the user
       * callbacks). Inside a critical section the postprocessing is
       * deferred to a zero-delay timer of the event system.
       *)

      (* Move the pending transmitters into a private queue: *)
      let dropped = Q.create() in
      Q.transfer read_queue dropped;

      (* Release resources, and record the error where no reply exists: *)
      let mark_failed trans =
        trans # cleanup();
        match trans # message # status with
          | `Unserved ->
              trans # message # private_api # set_error_exception err
          | _ ->
              ()
      in
      Q.iter mark_failed dropped;

      (* The transmitters are dropped, so they must be postprocessed: *)
      let notify_all () =
        Q.iter (fun trans -> self # critical_postprocessing trans) dropped
      in

      if not critical_section then
        notify_all()
      else begin
        let g = Unixqueue.new_group esys in
        Unixqueue.once esys g 0.0 notify_all
      end


    method clear_write_queue() =
      (* Cleans up every transmitter that is still waiting on [write_queue],
       * then empties the queue. Neither an error is recorded nor a
       * callback invoked here.
       *)
      let release trans = trans # cleanup() in
      Q.iter release write_queue;
      Q.clear write_queue


    method critical_postprocessing m =
      (* Runs [m # postprocess] with the flag [critical_section] set. The
       * flag is cleared again in any case, i.e. also when [postprocess]
       * raises an exception; the exception is then passed on to the caller.
       *)
      let leave () = critical_section <- false in
      critical_section <- true;
      ( try
          m # postprocess
        with
            any ->
              leave ();
              raise any
      );
      leave ()

    (**********************************************************************)
    (***                     THE OUTPUT HANDLER                         ***)
    (**********************************************************************)

    (* [handle_output] is the reaction to an output event. At most one
     * request (or a part of it) is handed to [send] per invocation;
     * messages at the head of [write_queue] that are already sent are
     * removed. When both queues are empty afterwards, the connection is
     * aborted and counted as successful. Finally the polling state is
     * updated.
     *
     * Raises [Equeue.Reject] if the socket is not in state [Up_rw].
     *)
    method private handle_output =

      (* Ignore this event if the socket is not Up_rw (this may happen
       * if the output side is closed while there are still output
       * events in the queue):
       *)
      if io#socket_state <> Up_rw then
	raise Equeue.Reject;

      if options.verbose_connection then 
	dlogr
	  (fun () ->
	     sprintf "FD %s - HTTP connection: Output event!"
	       io#socket_str);
      (* The value is not used; the binding only checks that there is
       * a group.
       *)
      let _g = match group with
	  Some x -> x
	| None -> assert false
      in

      (* Leave the write_loop by exceptions:
       * - Q.Empty: No more request to write
       *)

      let rec write_loop () =
	let this = Q.peek write_queue in  (* or Q.Empty *)
	begin match this # state with
	    (Unprocessed | Sending_hdr | Sending_body) ->

	      let rh = this # message # request_header `Effective in

	      (* if no_persistency, set 'connection: close' *)

	      if options.inhibit_persistency && this # state = Unprocessed then
		rh # update_field "connection" "close";

	      (* If a "100 Continue" handshake is requested,
	       * transmit only the header of the request. The transmitter
	       * is then expected to enter the state [Handshake], and a
	       * timeout handler is added (see below) that ends the
	       * waiting after some time.
	       *)

	      let do_handshake =
		try
		  (* Proper parsing not required, because [Expect] is
                   * set by the user.
                   * [continue]: Already seen status 100
                   *)
		  not (this # message # private_api # continue) &&
		  String.lowercase(rh # field "expect") = "100-continue"
		with
		    Not_found -> false 
	      in

	      ( try
		  this # send io do_handshake;

		  (* If a handshake is requested and the transmitter is
		   * really waiting for it: start the timer.
		   *)
		  if do_handshake  &&  this # state = Handshake
		  then (
		    let timeout = options.handshake_timeout in
		    let tm = Unixqueue.new_group esys in
		    timeout_groups <- tm :: timeout_groups;
		    Unixqueue.once
		      esys tm timeout
		      (fun () ->
			 (* Only react if the same message is still at the
			  * head of the write queue, and still waiting.
			  *)
			 ( try
			     let this' = Q.peek write_queue in
			     let still_waiting =
			       this == this' && this # state = Handshake in
			     if still_waiting then (
			       this # handshake_timeout();
			       self # maintain_polling;
			     );
			   with Q.Empty -> ()
			 );
			 self # clear_timeout tm;
		      );
		    if options.verbose_connection then
		      dlogr
			(fun () ->
			   sprintf "FD %s - HTTP connection: \
                                    waiting for 100 CONTINUE"
			     io#socket_str);
		  );
		  
		with
		    Unix.Unix_error(Unix.EPIPE,_,_) ->
		      (* Broken pipe: This can happen if the server decides
		       * to close the connection in the same moment when the
		       * client wants to send another request after the 
		       * connection has been idle for a period of time.
		       * The intended overall reaction is to close our side
		       * of the connection, too, and to open a new
		       * connection. The current request can be silently
		       * resent because it is sure that the request was not
		       * received completely; it does not matter whether the
		       * request is idempotent or not.
		       *
		       * Broken pipes are very unlikely because this must 
		       * happen between the 'select' and 'write' system calls.
                       * The [read] syscall will get ECONNRESET, so nothing
                       * is done here except logging; the reaction is left
                       * to the input side.
		       *)
		      if options.verbose_connection then
			dlogr
			  (fun () ->
			     sprintf "FD %s - HTTP connection: broken pipe"
			       io#socket_str);
		      ()

		  | Unix.Unix_error(Unix.EAGAIN,_,_) ->
		      (* Nothing could be written right now; silently
		       * ignored here.
		       *)
		      ()
		      
		  | Unix.Unix_error(e,a,b) ->
		      if options.verbose_connection then
			dlogr
			  (fun () ->
			     sprintf "FD %s - HTTP connection: Unix error %s"
			       io#socket_str (Unix.error_message e));
		      (* Hope the input handler will do the right thing *)
		      ()
	      );
	      (* The first message counts as sent once its request is out *)
	      if sending_first_message && this # state = Sent_request then
		sending_first_message <- false;
	      (* A completely sent request leaves the write queue *)
	      if this # state = Sent_request then
		ignore (Q.take write_queue);
	      (* Do not call write_loop: Otherwise other handlers would
               * not have any chance to run
               *)
	  | Handshake ->
	      (* Should normally not happen. *)
	      ()
	  | (Sent_request | Complete | Complete_broken) ->
	      (* If Complete_broken, we also have Up_r. *)
	      ignore (Q.take write_queue);
	      if io # socket_state = Up_rw then
		write_loop()        (* continue with the next message *)
	  | Broken ->
	      (* This case is fully handled by the input handler *)
	      ()
	end
      in

      begin try
	write_loop()
      with
	  Q.Empty -> ()
      end;

      (* Release the connection if the queues have become empty *)

      if (Q.length write_queue = 0 && Q.length read_queue = 0) then (
	self # abort_connection;
	counters.successful_connections <- counters.successful_connections + 1
      );

      self # maintain_polling;


    (**********************************************************************)
    (***                     AUTHENTICATION                             ***)
    (**********************************************************************)

    method private postprocess_complete_message msg_trans =
      (* Called for every completely received reply. Only authentication
       * is dealt with at this level:
       *
       * - Status 407 (proxy authentication required): if the request had
       *   no "proxy-authorization" field, the peer is a proxy, and a proxy
       *   user is configured, proxy credentials are switched on and the
       *   request is queued again with [add_again].
       *
       * - Status 401 (authentication required): unless a previous
       *   session for this message was invalidated for good, a new
       *   session is requested from the [auth_cache], and the request is
       *   queued again.
       *
       * - Status 200..399: a session used for this message is reported
       *   to the [auth_cache] as successful.
       *
       * In all remaining situations the default action applies, i.e. the
       * method [postprocess] of the transmitter is invoked (through
       * [critical_postprocessing]). Redirections cannot be handled here
       * because this object represents the connection to exactly one
       * server.
       *)

      let default_action() =
        self # critical_postprocessing msg_trans
      in

      let msg = msg_trans # message in
      let code = msg # private_api # response_code in
      let req_hdr = msg # request_header `Effective in
      let _resp_hdr = msg # private_api # response_header in

      (* --------- 407: Proxy authorization required ---------- *)
      let handle_proxy_auth() =
        let already_authorized =
          try
            ignore (req_hdr # field "proxy-authorization");
            true
          with Not_found -> false
        in
        if already_authorized then begin
          (* The request did already contain "proxy-authorization", and
           * the proxy rejected it.
           *)
          if options.verbose_status then
            dlogr
              (fun () ->
                 sprintf "Call %d - HTTP auth: proxy authentication \
                          required again" (Oo.id msg_trans));
          default_action()
        end
        else if not peer_is_proxy then begin
          (* The server was not contacted as a proxy, but it demanded
           * proxy authorization. Regard this as an intrusion.
           *)
          if options.verbose_status then
            dlogr
              (fun () ->
                 sprintf "Call %d - HTTP auth: intrusion by proxy \
                          authentication" (Oo.id msg_trans));
          default_action()
        end
        else begin
          if options.verbose_status then
            dlogr
              (fun () ->
                 sprintf "Call %d - HTTP auth: proxy authentication \
                          required" (Oo.id msg_trans));
          if proxy_user = "" then begin
            (* No user/password pair: We cannot authorize ourselves. *)
            if options.verbose_status then
              dlogr
                (fun () ->
                   sprintf "Call %d - HTTP auth: user/password missing"
                     (Oo.id msg_trans));
            default_action()
          end
          else begin
            (* There is a user/password pair: enable proxy authentication
             * and queue the message again. The 'proxy-authenticate' field
             * of the response (method and realm) is not evaluated; the
             * method is always "basic" and realms are not distinguished.
             * The header fields themselves are added elsewhere (the
             * original note names the 'add' method).
             *)
            if not proxy_credentials_required then begin
              proxy_credentials_required <- true;
              proxy_cookie <- ""
            end;
            if options.verbose_status then
              dlogr
                (fun () ->
                   sprintf "Call %d - HTTP auth: proxy credentials \
                            added" (Oo.id msg_trans));
            ignore (self # add_again msg_trans)
          end
        end
      in

      (* -------- 401: Content server authorization required ---------- *)
      let handle_server_auth() =
        (* Unless a previous authentication attempt failed for good, a
         * new session is created, and the request is repeated.
         *)
        let retry =
          match msg # private_api # auth_state with
            | `In_reply failed_sess ->
                (* A previous attempt failed. *)
                let may_continue = failed_sess # invalidate msg in
                if not may_continue then
                  auth_cache # tell_failed_session failed_sess;
                may_continue
            | `None
            | `In_advance _ ->
                true
        in
        let new_session =
          if retry then auth_cache # create_session msg else None
        in
        match new_session with
          | Some sess ->
              (* Remember the new session, and try again: *)
              msg # private_api # set_auth_state (`In_reply sess);
              ignore (self # add_again msg_trans)
          | None ->
              (* No retry, or authentication failed immediately *)
              default_action()
      in

      (* -------- 2xx/3xx: the session (if any) was accepted ---------- *)
      let confirm_auth_success() =
        match msg # private_api # auth_state with
          | `In_reply session ->
              auth_cache # tell_successful_session session
          | `None
          | `In_advance _ ->
              ()
      in

      match code with
        | 407 ->
            handle_proxy_auth()
        | 401 ->
            handle_server_auth()
        | _ ->
            if code >= 200 && code < 400 then
              confirm_auth_success();
            default_action()


    (**********************************************************************)
    (***                   RESET COMPLETELY                             ***)
    (**********************************************************************)

    (* [reset] is called by the pipeline object to shutdown any processing *)

    method reset =
      (* Shuts down all processing of this connection: the connection is
       * aborted, both queues are emptied (unserved messages get the error
       * [No_reply]), and additions deferred by callbacks are discarded in
       * the same way. The callbacks for the discarded deferred additions
       * are run from a zero-delay timer.
       *)

      (* Close the socket; clear the Unixqueue *)
      self # abort_connection;

      (* Discard all messages on the queues. *)
      self # clear_read_queue No_reply;
      self # clear_write_queue();

      (* Reset state variables *)
      totally_failed_connections <- 0;

      (* Additions made by callbacks but not yet moved to the real queues
       * are collected in [orphans], and marked as not replied:
       *)
      let orphans = Queue.create() in
      let disown trans =
        Queue.push trans orphans;
        let m = trans # message in
        match m # status with
          | `Unserved ->
              m # private_api # set_error_exception No_reply
          | _ ->
              ()
      in
      Queue.iter disown deferred_additions;
      Queue.clear deferred_additions;

      (* [reset] might be called from a critical section, so the
       * callbacks are deferred (compare with [clear_read_queue]).
       *)
      let notify_orphans () =
        Queue.iter (fun trans -> self # critical_postprocessing trans) orphans
      in
      let g = Unixqueue.new_group esys in
      Unixqueue.once esys g 0.0 notify_orphans

  end
;;


(**********************************************************************)
(**********************************************************************)
(**********************************************************************)
(***                                                                ***)
(***                 THE PIPELINE INTERFACE                         ***)
(***                                                                ***)
(**********************************************************************)
(**********************************************************************)
(**********************************************************************)

(* The following class, 'pipeline' defines the interface for the outside
 * world.
 *)

class pipeline =
  object (self)
    (* The event system driving all connections of this pipeline. A
     * private one is created by default; see [set_event_system].
     *)
    val mutable esys = Unixqueue.create_unix_event_system()

    (* Proxy configuration. [proxy = ""] means that no proxy is used.
     * [proxy_auth] is set by [set_proxy_auth] to whether the user name
     * is non-empty.
     *)
    val mutable proxy = ""
    val mutable proxy_port = 80
    val mutable proxy_auth = false
    val mutable proxy_user = ""
    val mutable proxy_password = ""

    (* Hosts or domains that are contacted directly even if a proxy is
     * configured (see [avoid_proxy_for]).
     *)
    val mutable no_proxy_for = []

    (* Maps (peer, port, use_proxy) to a reference to the list of
     * connection objects for that peer.
     *)
    val mutable connections = Hashtbl.create 10

    (* Number of messages still to be processed; recomputed by
     * [update_open_messages].
     *)
    val mutable open_messages = 0

    (* Number of connection objects currently registered in [connections] *)
    val mutable open_connections = 0

    val mutable options =
	    { (* Default values: *)
	      synchronization = Pipeline 5;
	      maximum_connection_failures = 2;
	      maximum_message_errors = 2;
	      inhibit_persistency = false;
	      connection_timeout = 300.0;
	      number_of_parallel_connections = 2;
	      maximum_redirections = 10;
	      handshake_timeout = 1.0;
	      resolver = sync_resolver;
	      configure_socket = (fun _ -> ());
	      verbose_status = true;
	      verbose_request_header = false;
	      verbose_response_header = false;
	      verbose_request_contents = false;
	      verbose_response_contents = false;
	      verbose_connection = true;
	    }

    (* Authentication sessions; passed to every connection object *)
    val auth_cache = new auth_cache

    (* Cache of connections; passed to every connection object. See
     * [set_connection_cache].
     *)
    val mutable conn_cache = create_restrictive_cache()

    (* Statistics; the same record is passed to every connection object,
     * which updates it.
     *)
    val counters =
      { new_connections = 0;
	timed_out_connections = 0;
	crashed_connections = 0;
	server_eof_connections = 0;
	successful_connections = 0;
	failed_connections = 0;
      }

    (* Returns the event system this pipeline currently uses. *)
    method event_system =
      esys

    method set_event_system new_esys =
      (* Switches to another event system. The table of known connections
       * is emptied, so connections are created anew for later requests.
       *)
      Hashtbl.clear connections;
      esys <- new_esys

    (* Returns the connection cache that is handed to new connections. *)
    method connection_cache =
      conn_cache

    (* Installs [cc] as the connection cache handed to connections that
     * are created from now on.
     *)
    method set_connection_cache cc =
      conn_cache <- cc

    method add_authentication_method ( m : auth_method ) =
      (* Registers [m] after converting it to an auth handler. *)
      let handler = m # as_auth_handler in
      self # add_auth_handler handler

    (* Registers the handler [h] with the authentication cache of this
     * pipeline.
     *)
    method add_auth_handler (h : auth_handler) =
      auth_cache # add_auth_handler h

    method set_proxy the_proxy the_port =
      (* Remembers host and port of the proxy. Passing the empty string
       * as [the_proxy] disables the proxy.
       *)
      proxy_port <- the_port;
      proxy <- the_proxy

    method set_proxy_auth user passwd =
      (* Remembers the user/password pair to present when a proxy demands
       * authentication. [proxy_auth] records whether a user name is
       * given at all.
       *)
      proxy_user     <- user;
      proxy_password <- passwd;
      proxy_auth     <- not (user = "")


    (* [l] is the list of hosts or domains that are contacted directly,
     * without proxy. It replaces the previous list.
     *)
    method avoid_proxy_for l =
      no_proxy_for <- l


    method set_proxy_from_environment() =
      (* Configures the proxy from the environment variables "http_proxy"
       * (an HTTP URL, optionally with user and password) and "no_proxy"
       * (a list of words separated by commas).
       *)

      (* An unset variable counts as the empty string: *)
      let env_or_empty name =
        try Sys.getenv name with Not_found -> ""
      in

      (* "http_proxy": [Not_found] while evaluating the URL means that
       * the proxy settings are left as they are.
       *)
      ( try
          let (user, password, host, port, _path) =
            parse_http_url (env_or_empty "http_proxy") in
          self # set_proxy (Netencoding.Url.decode host) port;
          (* Credentials are only taken over if both parts are present *)
          ( match (user, password) with
              | (Some user_s, Some password_s) ->
                  self # set_proxy_auth
                    (Netencoding.Url.decode user_s)
                    (Netencoding.Url.decode password_s)
              | _ ->
                  ()
          )
        with
            Not_found -> ()
      );

      (* "no_proxy": hosts and domains to contact directly *)
      self # avoid_proxy_for
        (split_words_by_commas (env_or_empty "no_proxy"))


    method reset () =
      (* Deletes all pending requests and closes the connections: every
       * known connection object is reset, then the counters.
       *)
      let reset_all _key conn_list =
        List.iter (fun c -> c # reset) !conn_list
      in
      Hashtbl.iter reset_all connections;

      (* The connection cache is not purged here. The historical code for
       * this is kept for reference (with the remark that it should do
       * nothing anyway):
       *
       *   List.iter
       *     (fun fd -> 
       *        conn_cache # forget_connection fd;
       *        Unix.close fd;
       *     )
       *     (conn_cache # find_my_connections (self :> < >));
       *)

      self # reset_counters()
      

    (* [add_with_callback_no_redirection request f_done] queues [request]
     * on a connection to the effective peer (the proxy or the host of the
     * request), and arranges that [f_done] is called with the message
     * when it has been processed. Redirections are not followed here.
     *
     * A new connection object is created as long as fewer than
     * [options.number_of_parallel_connections] exist for the peer;
     * otherwise the existing connection with the shortest queue is taken.
     *)
    method private add_with_callback_no_redirection (request : http_call) f_done =

      let host = request # get_host() in
      let port = request # get_port() in

      (* Host names are case-insensitive, so both kinds of entries of
       * [no_proxy_for] are compared in lowercase:
       * - an entry starting with '.' matches every host that is longer
       *   than the entry and ends with it
       * - any other entry must be equal to the host
       *)
      let lc_host = String.lowercase host in
      let matches_no_proxy dom =
        let lc_dom = String.lowercase dom in
        let ld = String.length lc_dom in
        let lh = String.length lc_host in
        if ld > 0 && lc_dom.[0] = '.' && lh > ld then
          String.sub lc_host (lh - ld) ld = lc_dom
        else
          lc_dom = lc_host
      in

      let use_proxy =
        proxy <> "" &&
        request # proxy_enabled &&
        not (List.exists matches_no_proxy no_proxy_for)
      in

      (* find out the effective peer: *)
      let peer, peer's_port =
        if use_proxy then
          proxy, proxy_port
        else
          host, port
      in
      let key = (peer, peer's_port, use_proxy) in

      (* Find out if there is already a connection to this peer: *)

      let conn =
        let connlist =
          try
            Hashtbl.find connections key
          with
              Not_found ->
                let new_connlist = ref [] in
                Hashtbl.add connections key new_connlist;
                new_connlist
        in
        let open_new_connection () =
          let new_conn = new connection
                           esys
                           (peer, peer's_port)
                           use_proxy
                           (proxy_user, proxy_password)
                           auth_cache
                           conn_cache
                           (self :> < >)
                           counters
                           options in
          open_connections <- open_connections + 1;
          counters.new_connections <- counters.new_connections + 1;
          connlist := new_conn :: !connlist;
          new_conn
        in
        match !connlist with
          | first :: others
              when List.length !connlist >=
                     options.number_of_parallel_connections ->
              (* The limit is reached: take the connection with the lowest
               * number of queue entries.
               *)
              List.fold_left
                (fun best_conn a_conn ->
                   if a_conn # length < best_conn # length then
                     a_conn
                   else
                     best_conn)
                first
                others
          | _ ->
              (* Below the limit. An empty list also ends up here, even if
               * the configured limit is zero or negative: at least one
               * connection is needed to process the request.
               *)
              open_new_connection ()
      in

      (* Add the request to the queue of this connection: *)

      conn # add request
        (fun m ->
           (* Update 'open_connections', 'connections', and 'open_messages' *)
           if not conn#active then begin
             (* Check whether the connection is still in the [connections]
              * hash. It is possible that it is already deleted here.
              *)
             let connlist =
               try
                 Hashtbl.find connections key
               with
                   Not_found -> ref []
             in
             if List.exists (fun c -> c == conn) !connlist then (
               open_connections <- open_connections - 1;
               connlist := List.filter (fun c -> c != conn) !connlist;
               ( match !connlist with
                   | [] -> Hashtbl.remove connections key
                   | _ -> ()
               )
             )
           end;
           self # update_open_messages;
           (* Do user action: *)
           f_done m
        );

      open_messages <- open_messages + 1

    method private update_open_messages =
      (* Recomputes [open_messages] from scratch as the sum of the queue
       * lengths of all active connections in [connections].
       *)
      let count_connection c =
        if c # active then
          open_messages <- open_messages + (c # length)
      in
      open_messages <- 0;
      Hashtbl.iter
        (fun _ conn_list -> List.iter count_connection !conn_list)
        connections


    (* [add_with_callback request f_done] queues [request]. When it has
     * been processed, [f_done] is called with the message, unless the
     * response is a redirection (status 301 or 302) that is to be
     * followed: then the request URI is replaced by the "Location" of the
     * response, and the message is queued again.
     *
     * [f_done] is called exactly once per final result. Exceptions
     * raised by [f_done] are passed on to the caller of the callback;
     * they never cause a second invocation of [f_done].
     *)
    method add_with_callback (request : http_call) f_done =
      self # add_with_callback_no_redirection
        request
        (fun m ->
           (* Step 1: decide whether to follow a redirection, and prepare
            * [m] for it. [Http_protocol] (no proper status) and
            * [Not_found] (e.g. no "Location" header) mean that the
            * message is final. The user callback is deliberately NOT
            * called inside this [try], so its own exceptions cannot be
            * mistaken for these cases.
            *)
           let follow_redirection =
             try
               let (_,code,_) = m # dest_status() in
               match code with
                 | (301|302) ->
                     (* Simply repeat the request with a different URI *)
                     let do_redirection =
                       match m # get_redirect_mode with
                         | Redirect -> true
                         | Do_not_redirect -> false
                         | Redirect_if_idem -> m # is_idempotent
                         | Redirect_inquire f ->
                             (* Ask the function 'f' whether to redirect.
                              * An exception raised by 'f' is only logged
                              * (there is no other way to report it), and
                              * counts as the answer "no".
                              *)
                             begin
                               try f m    (* returns true or false *)
                               with
                                   x ->
                                     dlog (sprintf
                                             "Call %d - \
                                              Exception caught in Http_client %s"
                                             (Oo.id m)
                                             (Netexn.to_string x));
                                     false
                             end
                     in
                     if not do_redirection then
                       false
                     else begin
                       (* Maybe the redirection limit is exceeded: *)
                       let rc = m # private_api # get_redir_counter in
                       if rc >= options.maximum_redirections then begin
                         m # private_api # set_error_exception
                           Too_many_redirections;
                         false
                       end
                       else begin
                         let location = m # assoc_resp_header "location" in
                         (* or raise Not_found *)
                         let location' =
                           if location <> "" && location.[0] = '/' then
                             (* Problem: "Location" header must be absolute
                              * due to RFC specs. Now it is relative (with
                              * full path).
                              * Workaround: Interpret relative to old server
                              *)
                             let host = m # get_host() in
                             let port = m # get_port() in
                             let prefix =
                               "http://" ^ host ^
                                 (if port = 80 then ""
                                  else ":" ^ string_of_int port)
                             in
                             prefix ^ location
                           else
                             location in
                         let ok =
                           try
                             m # set_request_uri location';
                             true
                           with
                             | _ ->
                                 (* Bad URL! *)
                                 let e = URL_syntax_error location' in
                                 m # private_api # set_error_exception e;
                                 false
                         in
                         if ok then begin
                           m # private_api # set_redir_counter (rc+1);
                           m # private_api # set_error_counter 0
                         end;
                         ok
                       end
                     end

                 | _ ->
                     false
             with
                 (Http_protocol _ | Not_found) ->
                   false
           in

           (* Step 2: act. If queueing the message again fails with one
            * of the two exceptions above, the message is reported as
            * final, as before.
            *)
           let resubmitted =
             follow_redirection &&
             ( try
                 self # add_with_callback m f_done;
                 true
               with
                   (Http_protocol _ | Not_found) -> false
             )
           in
           if not resubmitted then
             f_done m
        )


    method add request =
      (* Same as [add_with_callback], with a callback that does nothing. *)
      let no_callback _ = () in
      self # add_with_callback request no_callback

    method run () =
      (* Runs the event system [esys] by calling [Unixqueue.run]; this call
       * is the whole body of the method.
       *
       * Description of the intended effect on the pipeline:
       *
       * Runs through the requests in the pipeline. If a request can be
       * fulfilled, i.e. the server sends a response, the status of the
       * request is set and the request is removed from the pipeline.
       * If a request cannot be fulfilled (no response, bad response,
       * network error), an exception is raised and the request remains in
       * the pipeline (and is even the head of the pipeline).
       *
       * Exception Broken_connection:
       *  - The server has closed the connection before the full request
       *    could be sent. It is unclear if something happened or not.
       *    The application should figure out the current state and
       *    retry the request.
       *  - Also raised if only parts of the response have been received
       *    and the server closed the connection. This is the same problem.
       *    Note that this can only be detected if a "content-length" has
       *    been sent or "chunked encoding" was chosen. Should normally
       *    work for persistent connections.
       *  - NOT raised if the server forces a "broken pipe" (normally
       *    indicates a serious server problem). The intention of
       *    Broken_connection is that retrying the request will probably
       *    succeed.
       *
       * NOTE(review): no exception named Broken_connection is among the
       * exceptions declared at the beginning of this file, so the part
       * about exceptions is possibly outdated - TODO confirm.
       *)

	 Unixqueue.run esys

    (* Returns the option record currently stored in this pipeline. *)
    method get_options = options

    method set_options p =
      (* Stores [p] as the option record of the pipeline, and passes it on
       * to every connection found in the [connections] table.
       *)
      options <- p;
      let propagate _key conn_list =
        List.iter (fun conn -> conn # set_options p) !conn_list in
      Hashtbl.iter propagate connections

    (* Returns the current value of the [open_messages] counter. *)
    method number_of_open_messages = open_messages

    (* Returns the current value of the [open_connections] counter. *)
    method number_of_open_connections = open_connections

    method connections =
      (* Returns one triple [(peer, port, length)] for every connection in
       * the [connections] table, where [length] is the result of the
       * [length] method of the connection. The third component of the
       * table key is not reported. Triples are prepended while the table
       * is traversed, so the result is in reverse traversal order.
       *)
      Hashtbl.fold
        (fun (peer, port, _) conns acc ->
           List.fold_left
             (fun acc conn -> (peer, port, conn # length) :: acc)
             acc
             !conns
        )
        connections
        []

    (* Returns the current value of the counter [new_connections]. *)
    method cnt_new_connections = counters.new_connections

    (* Returns the current value of the counter [timed_out_connections]. *)
    method cnt_timed_out_connections = counters.timed_out_connections

    (* Returns the current value of the counter [crashed_connections]. *)
    method cnt_crashed_connections = counters.crashed_connections

    (* Returns the current value of the counter [server_eof_connections]. *)
    method cnt_server_eof_connections = counters.server_eof_connections

    (* Returns the current value of the counter [successful_connections]. *)
    method cnt_successful_connections = counters.successful_connections

    (* Returns the current value of the counter [failed_connections]. *)
    method cnt_failed_connections = counters.failed_connections

    (* Sets all six fields of the [counters] record back to zero. *)
    method reset_counters() =
      counters.new_connections <- 0;
      counters.timed_out_connections <- 0;
      counters.crashed_connections <- 0;
      counters.server_eof_connections <- 0;
      counters.successful_connections <- 0;
      counters.failed_connections <- 0;


  end
;;

(**********************************************************************)
(**********************************************************************)
(**********************************************************************)
(***                                                                ***)
(***                 THE CONVENIENCE MODULE                         ***)
(***                                                                ***)
(**********************************************************************)
(**********************************************************************)
(**********************************************************************)

(* This module is intended for beginners and for simple applications
 * of this HTTP implementation.
 *)

(* [omtp] is the value found in [Netsys_oothr.provider] at the time this
 * module is initialized. [mutex] is one mutex created by that provider;
 * it is the mutex locked by [serialize] below.
 *)
let omtp = !Netsys_oothr.provider
let mutex = omtp # create_mutex()

let serialize f arg =
  (* Evaluates [f arg] while [mutex] is locked. The mutex is unlocked
   * again before the result of [f] is returned, or before the exception
   * raised by [f] is raised again.
   *)
  mutex # lock();
  let outcome =
    try `Result (f arg)
    with err -> `Raised err
  in
  mutex # unlock();
  match outcome with
    | `Result r -> r
    | `Raised err -> raise err
;;
	

module Convenience =
  struct

    (* Trial count handed to [request] by [http_get_message],
     * [http_head_message] and [http_put_message]. See [request] for how
     * the value is treated.
     *)
    let http_trials = ref 3

    (* Default credentials. They are used by [simple_key_handler] when the
     * URL of the request did not contain a user name.
     *)
    let http_user = ref ""
    let http_password = ref ""

    (* Credentials extracted from the most recently prepared URL, see
     * [prepare_url]. An empty [this_user] means that there are none.
     *)
    let this_user = ref ""
    let this_password = ref ""

    (* Set to [true] by [http_verbose]. *)
    let conv_verbose = ref false

    (* Key handler backed by the references above. The returned keys read
     * the references every time [user] or [password] is called, i.e. they
     * always reflect the current contents.
     *)
    class simple_key_handler : key_handler =
    object
      method inquire_key ~domain ~realms ~auth =
        (* The key refers to the first of the passed realms. Without any
         * realm no key can be offered, which is signalled by [Not_found]
         * like a missing user name (formerly an empty list made the
         * [realm] method of the key fail with [Failure]).
         *)
        let realm =
          match realms with
            | first :: _ -> first
            | [] -> raise Not_found in
        if !this_user <> "" then
          (* Credentials from the URL take precedence *)
          ( object
              method user = !this_user
              method password = !this_password
              method realm = realm
              method domain = domain
            end )
        else
          if !http_user <> "" then
            ( object
                method user = !http_user
                method password = !http_password
                method realm = realm
                method domain = domain
              end )
          else
            raise Not_found

      (* Nothing is cached here, so there is nothing to invalidate. *)
      method invalidate_key (_ : key) = ()
    end


    let auth_basic =
      new basic_auth_handler
        ~enable_auth_in_advance:true (new simple_key_handler)

    (* This handler was formerly created with [basic_auth_handler], too,
     * so that the default pipeline had two handlers for basic
     * authentication and none for digest authentication.
     *)
    let auth_digest =
      new digest_auth_handler
        ~enable_auth_in_advance:true (new simple_key_handler)

    (* Creates a pipeline with the proxy settings from the environment and
     * both authentication handlers.
     *)
    let get_default_pipe() =

      let p = new pipeline in

      p # set_proxy_from_environment();

      (* Add authentication methods: *)
      p # add_auth_handler auth_basic;
      p # add_auth_handler auth_digest;

      (* That's it: *)
      p


    (* The pipeline shared by all functions of this module. It is created
     * on first use. [pipe_empty] is [false] from the moment a call is
     * added until the callback of that call has been run.
     *)
    let pipe = lazy (get_default_pipe())
    let pipe_empty = ref true

    (* [request m trials] adds [m] to the shared pipeline and runs the
     * pipeline, all while holding the global mutex (see [serialize]). If
     * the callback of the previous call was never reached, the pipeline
     * is reset first.
     *
     * The second argument is accepted for compatibility with the callers
     * but is not used by this function.
     *)
    let request m =
      serialize
        (fun _trials ->
           let p = Lazy.force pipe in
           if not !pipe_empty then
             p # reset();
           p # add_with_callback m (fun _ -> pipe_empty := true);
           pipe_empty := false;
           p # run()
        )

    (* [prepare_url url] splits [url] with [parse_http_url]. User name and
     * password, if present, are URL-decoded and stored in [this_user] and
     * [this_password]; the returned URL consists only of host, port and
     * path. If [parse_http_url] raises [Not_found], [url] is returned
     * unchanged. In any case [this_user] is cleared first.
     *)
    let prepare_url =
      serialize
        (fun url ->
           try
             this_user := "";
             let (user,password,host,port,path) = parse_http_url url in
             begin match user with
                 Some user_s ->
                   this_user := Netencoding.Url.decode user_s;
                   this_password := "";
                   begin match password with
                       Some password_s ->
                         this_password := Netencoding.Url.decode password_s
                     | None -> ()
                   end
               | None -> ()
             end;
             "http://" ^ host ^ ":" ^ string_of_int port ^ path
           with
               Not_found ->
                 url
        )

    (* The [*_message] functions create the call object for the prepared
     * URL, run it with [request], and return the call object. The
     * functions without this suffix return the response body of the call
     * instead.
     *)

    let http_get_message url =
      let m = new get (prepare_url url) in
      request m !http_trials;
      m

    let http_get url = (http_get_message url) # get_resp_body()

    let http_head_message url =
      let m = new head (prepare_url url) in
      request m !http_trials;
      m

    let http_post_message url params =
      let m = new post (prepare_url url) params in
      request m 1;
      m

    let http_post url params = (http_post_message url params) # get_resp_body()

    let http_put_message url content =
      let m = new put (prepare_url url) content in
      request m !http_trials;
      m

    let http_put url content = (http_put_message url content) # get_resp_body()

    let http_delete_message url =
      let m = new delete (prepare_url url) in
      request m 1;
      m

    let http_delete url = (http_delete_message url) # get_resp_body()


    (* Switches on all six verbosity options of the shared pipeline, and
     * sets [conv_verbose] and [Debug.enable] to [true]. Runs under the
     * global mutex.
     *)
    let http_verbose =
      serialize
        (fun () ->
           let p = Lazy.force pipe in
           let opt = p # get_options in
           p # set_options
             { opt with verbose_status = true;
                 verbose_request_header = true;
                 verbose_response_header = true;
                 verbose_request_contents = true;
                 verbose_response_contents = true;
                 verbose_connection = true
             };
           conv_verbose := true;
           Debug.enable := true;
        )
  end

(* This web site is published by Informatikbüro Gerd Stolpmann.
 * Powered by Caml.
 *)