(* $Id: http_client.ml 1417 2010-02-16 14:59:02Z gerd $
* ----------------------------------------------------------------------
*
*)
(* Reference documents:
* RFC 2068, 2616: HTTP 1.1
* RFC 2069, 2617: Digest Authentication
*)
(* Debug switch of this module: [Debug.enable] is the flag handed to Netlog
 * below, registered under the module name "Http_client".
 *)
module Debug = struct
let enable = ref false
end
(* Debug logging functions tied to [Debug.enable].  [dlog] takes the message
 * string (see its uses in this file).  NOTE(review): [dlogr] is not used in
 * the visible part of the file; its argument form is defined by Netlog.
 *)
let dlog = Netlog.Debug.mk_dlog "Http_client" Debug.enable
let dlogr = Netlog.Debug.mk_dlogr "Http_client" Debug.enable
(* Make the debug flag known to Netlog under the name "Http_client". *)
let () =
Netlog.Debug.register_module "Http_client" Debug.enable
open Printf
(* Exceptions of this module.  In the visible part of the file:
 * [Http_error (code, body)] is raised by [get_resp_body] for a non-2xx
 * response, and [Http_protocol e] by the response accessors of a call that
 * ended with the protocol error [e].  NOTE(review): the raise sites of
 * [Bad_message], [No_reply], [Too_many_redirections],
 * [Name_resolution_error] and [URL_syntax_error] are not visible here.
 *)
exception Bad_message of string;;
exception Http_error of (int * string);;
exception Http_protocol of exn;;
exception No_reply;;
exception Too_many_redirections;;
exception Name_resolution_error of string
exception URL_syntax_error of string
(* Printer for [Netexn.to_string]: renders [Http_protocol e] as
 * Http_client.Http_protocol(...) with the inner exception printed
 * recursively.  The value [Http_protocol Not_found] presumably only
 * identifies the constructor the printer is registered for, so the
 * catch-all branch is not expected to be reached.
 *)
let () =
Netexn.register_printer
(Http_protocol Not_found)
(fun e ->
match e with
| Http_protocol e' ->
"Http_client.Http_protocol(" ^ Netexn.to_string e' ^ ")"
| _ ->
assert false
)
(* NOTE(review): presumably sets up Netsys signal handling needed by this
 * library; the actual effect is defined in Netsys_signal.
 *)
let() =
Netsys_signal.init()
(* Processing state of a call.  In [generic_call]: [`Unserved] is the
 * initial state and is restored by [prepare_transmission];
 * [`Http_protocol_error e] is set by [set_error_exception]; the others are
 * set by [finish] from the response code: 2xx, 3xx, 4xx, and every other
 * code respectively.
 *)
type status =
[ `Unserved
| `Http_protocol_error of exn
| `Successful
| `Redirection
| `Client_error
| `Server_error
]
(* Where [prepare_transmission] stores the response body: [`Memory] in a
 * memory body, [`File f] in a file body for the file name returned by
 * [f ()], [`Body f] in the body object returned by [f ()].
 *)
type response_body_storage =
[ `Memory
| `File of unit -> string
| `Body of unit -> Netmime.mime_body
]
(* What to do with a call after a connection problem; calls default to
 * [Send_again_if_idem].  NOTE(review): interpreted outside the visible part
 * of the file; from the names, [Inquire f] presumably lets [f] decide and
 * [Send_again_if_idem] consults [is_idempotent].
 *)
type 'message_class how_to_reconnect =
Send_again
| Request_fails
| Inquire of ('message_class -> bool)
| Send_again_if_idem
;;
(* What to do with a redirection response; calls default to
 * [Redirect_if_idem].  NOTE(review): interpreted outside the visible part
 * of the file; presumably analogous to [how_to_reconnect].
 *)
type 'message_class how_to_redirect =
Redirect
| Do_not_redirect
| Redirect_inquire of ('message_class -> bool)
| Redirect_if_idem
;;
(* Name resolver: called with the event system, the host name and a
 * callback that receives the address, or [None] on failure
 * ([sync_resolver] is an implementation).
 *)
type resolver =
Unixqueue.unix_event_system ->
string ->
(Unix.inet_addr option -> unit) ->
unit
;;
(* Connection statistics.  NOTE(review): not updated in the visible part
 * of the file.
 *)
type counters =
{ mutable new_connections : int;
mutable timed_out_connections : int;
mutable crashed_connections: int;
mutable server_eof_connections : int;
mutable successful_connections : int;
mutable failed_connections : int;
}
(* [better_unix_error f arg] returns [f arg].  A [Unix.Unix_error] escaping
 * from [f] is turned into [Failure msg]: [msg] is the error text, prefixed
 * with the error parameter and ": " when that parameter is non-empty.  The
 * name of the failing system call is not part of the message.
 *)
let better_unix_error f arg =
  try f arg
  with Unix.Unix_error (code, _, param) ->
    let text = Unix.error_message code in
    failwith (if param <> "" then param ^ ": " ^ text else text)
(* [syscall f] returns [f ()].  When [f] fails with
 * [Unix.Unix_error (Unix.EINTR, _, _)], i.e. a signal interrupted a
 * blocking system call, the call is simply repeated.  Every other exception
 * propagates unchanged.
 *)
let rec syscall f =
  match (try `Done (f ()) with Unix.Unix_error (Unix.EINTR, _, _) -> `Retry) with
  | `Done result -> result
  | `Retry -> syscall f
let hex_digits = [| '0'; '1'; '2'; '3'; '4'; '5'; '6'; '7';
                    '8'; '9'; 'a'; 'b'; 'c'; 'd'; 'e'; 'f' |]

(* [encode_hex s] returns the lowercase hexadecimal form of [s]: each byte
 * becomes two digits, high nibble first, so the result is twice as long as
 * [s].
 *
 * The result is accumulated in a [Buffer].  Filling a fresh string in place
 * with [t.[i] <- c] is deprecated and rejected when strings are immutable
 * (-safe-string, the default since OCaml 4.06).
 *)
let encode_hex s =
  let len = String.length s in
  let buf = Buffer.create (2 * len) in
  for n = 0 to len - 1 do
    let x = Char.code s.[n] in
    Buffer.add_char buf hex_digits.(x lsr 4);
    Buffer.add_char buf hex_digits.(x land 15)
  done;
  Buffer.contents buf
(* How requests are sent over a connection.  NOTE(review): consumed outside
 * the visible part of the file; presumably [Sync] sends one request at a
 * time and [Pipeline n] allows up to [n] outstanding requests.
 *)
type synchronization =
| Sync
| Pipeline of int
;;
(* Upper limit related to pipelining; its use is not visible here. *)
let max_pipeline = 8 ;;
(* Regexps identifying servers for which pipelining should be avoided
 * (presumably matched against the server identification of a response).
 * NOTE(review): the dot in the second pattern is an unescaped regexp
 * metacharacter, so it matches any character, not only a literal dot.
 *)
let pipeline_blacklist =
(* Stolen from Mozilla *)
[ Netstring_pcre.regexp "Microsoft-IIS/";
Netstring_pcre.regexp "Netscape-Enterprise/3.";
]
;;
(* Options of the HTTP client.  NOTE(review): the fields are consumed
 * outside the visible part of the file; their meaning is only suggested by
 * their names (e.g. [resolver] is a name resolver like [sync_resolver],
 * [configure_socket] is applied to a socket, the [verbose_*] flags select
 * debug output).
 *)
type http_options =
{ synchronization : synchronization;
maximum_connection_failures : int;
maximum_message_errors : int;
inhibit_persistency : bool;
connection_timeout : float;
number_of_parallel_connections : int;
maximum_redirections : int;
handshake_timeout : float;
resolver : resolver;
configure_socket : Unix.file_descr -> unit;
verbose_status : bool;
verbose_request_header : bool;
verbose_response_header : bool;
verbose_request_contents : bool;
verbose_response_contents : bool;
verbose_connection : bool;
}
;;
(* Selects a request header in [request_header]: [`Base] is the header
 * set by the user ([set_request_header]); [`Effective] is the copy that
 * [prepare_transmission] rebuilds from it and then adjusts (fixup_request,
 * Date, User-agent) for transmission.
 *)
type header_kind = [ `Base | `Effective ]
(* Regexp for absolute http URLs.  Groups: 2 = user, 4 = password,
 * 5 = host, 7 = port, 8 = path (including any query).
 *)
let http_re =
  Netstring_pcre.regexp
    "^http://(([^/:@]+)(:([^/:@]+))?@)?([^/:@]+)(:([0-9]+))?([/?].*)?$"

(* [parse_http_url url] splits an absolute http URL into
 * [(user, password, host, port, path)].  [user] and [password] are [None]
 * when absent, [port] defaults to 80, and [path] (which includes the query)
 * is "" when absent.
 *
 * Raises [Not_found] if [url] does not match [http_re], and also if the
 * port is too large for an int.  [int_of_string] raises [Failure] in that
 * case, but the callers in this file only handle [Not_found].  One of them
 * parses the server-supplied digest domain parameter.
 *)
let parse_http_url url =
  match Netstring_pcre.string_match http_re url 0 with
  | None ->
      raise Not_found
  | Some m ->
      let group_opt n =
        try Some (Netstring_pcre.matched_group m n url)
        with Not_found -> None in
      let user = group_opt 2 in
      let password = group_opt 4 in
      let host = Netstring_pcre.matched_group m 5 url in
      let port =
        match group_opt 7 with
        | None -> 80
        | Some digits ->
            (try int_of_string digits with Failure _ -> raise Not_found) in
      let path =
        match group_opt 8 with
        | None -> ""
        | Some p -> p in
      (user, password, host, port, path)
(* [split_words_by_commas str] splits [str] at commas; whitespace around
 * each comma is dropped together with the comma. *)
let comma_re = Netstring_pcre.regexp "[ \t\n\r]*,[ \t\n\r]*"
let split_words_by_commas str =
  Netstring_pcre.split comma_re str

(* [split_words str] splits [str] at runs of space, tab, CR and LF. *)
let space_re = Netstring_pcre.regexp "[ \t\n\r]+"
let split_words str =
  Netstring_pcre.split space_re str
(* [sync_resolver esys name reply] resolves [name] synchronously and passes
 * the result to [reply].  [esys] is unused; it only gives the function the
 * shape of a [resolver].
 *
 * [name] is first tried as a numeric address literal and then looked up
 * with [Unix.gethostbyname].  On success [reply] receives the first address
 * found.  It receives [None] when the lookup fails and also when it returns
 * no addresses; that case used to index an empty array.
 *)
let sync_resolver esys name reply =
  let addr =
    try
      Some (Unix.inet_addr_of_string name)
    with
      Failure _ ->
        (try
           let h = Unix.gethostbyname name in
           if Array.length h.Unix.h_addr_list > 0 then
             Some h.Unix.h_addr_list.(0)
           else
             None
         with Not_found ->
           None) in
  reply addr
(**********************************************************************)
(*** BUFFERS ***)
(**********************************************************************)
(* Similar to Buffer, but may be accessed in a more efficient way. *)
(* [B] is a short alias of [Netbuffer]; [buffer_type] names its buffer type. *)
module B = Netbuffer;;
type buffer_type = B.t;;
module Q =
struct
  (* A FIFO queue, i.e. [Queue.t], extended with [add_at], which inserts an
   * element in front of the last [n] elements.  All other functions are
   * plain aliases of the corresponding [Queue] functions.
   *)
  type 'a t = 'a Queue.t
  exception Empty = Queue.Empty
  let create = Queue.create
  let add = Queue.add
  let push = add

  (* [add_at n x q] inserts [x] so that the last [min n (length q)] elements
   * of [q] come after it:
   *   add_at 0 x q          = add x q (append at the end)
   *   add_at 1 x q          = insert before the last element
   *   add_at n x q, n >= length q: insert at the front
   * It moves [length q - n] elements and then does two O(1) transfers, so
   * it is cheap in the usual case where [n] is close to [length q].
   *
   * Raises [Invalid_argument] for a negative [n].  Without this check, the
   * loop below would move every element of [q] into the temporary queue
   * and then fail with [Empty], losing the contents of [q].
   *)
  let add_at n x q =
    if n < 0 then invalid_arg "Http_client.Q.add_at";
    let l = Queue.length q in
    let q' = Queue.create() in
    for _k = 1 to l-n do
      Queue.add (Queue.take q) q'
    done;
    Queue.add x q';
    Queue.transfer q q';
    Queue.transfer q' q

  let take = Queue.take
  let pop = take
  let peek = Queue.peek
  let top = peek
  let clear = Queue.clear
  let copy = Queue.copy
  let is_empty = Queue.is_empty
  let length = Queue.length
  let iter = Queue.iter
  let transfer = Queue.transfer
  (* fold: omitted *)
end
(**********************************************************************)
(*** THE HTTP CALL CONTAINER ***)
(**********************************************************************)
(* [dump_header prefix h] writes one debug line per [(name, value)] pair of
 * [h], in list order, formatted as prefix, name, ": ", value.
 *)
let dump_header prefix h =
  let log_field (name, value) = dlog (prefix ^ name ^ ": " ^ value) in
  List.iter log_field h
(* Authentication progress of a call.  [private_api] instantiates the
 * parameter with [auth_session]; [generic_call] starts with [`None] and
 * [same_call] resets copies to [`None].
 *)
type 'session auth_state = (* 'session = auth_session, defined below *)
[ `None
(* Authentication has not yet been tried *)
| `In_advance of 'session
(* This session had been tried before a 401 response was seen *)
| `In_reply of 'session
(* This session was tried after a 401 response was seen *)
]
(* Public interface of an HTTP call.  [generic_call] below is the
 * implementation in this file; the notes describe its behaviour.
 *)
class type http_call =
object
(* Call state: [is_served] is true once [status] left [`Unserved] *)
method is_served : bool
method status : status
(* Request, new-style accessors.  [set_request_uri] accepts only http URLs
 * without user and password and fails otherwise.  [request_header `Base]
 * is the user-supplied header, [request_header `Effective] the header
 * prepared for transmission.
 *)
method request_method : string
method request_uri : string
method set_request_uri : string -> unit
method request_header : header_kind -> Netmime.mime_header
method set_request_header : Netmime.mime_header -> unit
method effective_request_uri : string
method request_body : Netmime.mime_body
method set_request_body : Netmime.mime_body -> unit
(* Response, new-style accessors.  They fail while the call is unserved
 * and raise [Http_protocol e] if the call ended with protocol error [e].
 *)
method response_status_code : int
method response_status_text : string
method response_status : Nethttp.http_status
method response_protocol : string
method response_header : Netmime.mime_header
method response_body : Netmime.mime_body
(* Options *)
method response_body_storage : response_body_storage
method set_response_body_storage : response_body_storage -> unit
method get_reconnect_mode : http_call how_to_reconnect
method set_reconnect_mode : http_call how_to_reconnect -> unit
method get_redirect_mode : http_call how_to_redirect
method set_redirect_mode : http_call how_to_redirect -> unit
method proxy_enabled : bool
method set_proxy_enabled : bool -> unit
method no_proxy : unit -> unit
method is_proxy_allowed : unit -> bool
(* Characteristics of the HTTP method, defined by each subclass *)
method empty_path_replacement : string
method is_idempotent : bool
method has_req_body : bool
method has_resp_body : bool
(* Returns a copy of this call in the unserved state *)
method same_call : unit -> http_call
(* Old-style accessors.  The header lists have lowercased names;
 * [get_resp_body] raises [Http_error (code, body)] for a non-2xx response.
 *)
method get_req_method : unit -> string
method get_host : unit -> string
method get_port : unit -> int
method get_path : unit -> string
method get_uri : unit -> string
method get_req_body : unit -> string
method get_req_header : unit -> (string * string) list
method assoc_req_header : string -> string
method assoc_multi_req_header : string -> string list
method set_req_header : string -> string -> unit
method get_resp_header : unit -> (string * string) list
method assoc_resp_header : string -> string
method assoc_multi_resp_header : string -> string list
method get_resp_body : unit -> string
method dest_status : unit -> (string * int * string)
(* Internal interface, see [private_api] *)
method private_api : private_api
end
(* Internal operations on a call.  NOTE(review): presumably used by the
 * protocol engine, which is outside the visible part of the file.
 *)
and private_api =
object
(* Error and redirection counters.  In [generic_call],
 * [set_error_exception] increments the error counter only when the status
 * is not already a protocol error.
 *)
method get_error_counter : int
method set_error_counter : int -> unit
method set_error_exception : exn -> unit
method get_redir_counter : int
method set_redir_counter : int -> unit
(* Whether a 100-Continue response has been seen *)
method continue : bool
method set_continue : unit -> unit
method auth_state : auth_session auth_state
method set_auth_state : auth_session auth_state -> unit
method set_effective_request_uri : string -> unit
method prepare_transmission : unit -> unit
(* Initializes the `Effective request header, and creates the response
* objects. Sets the status to [`Unserved]. The error and redirection
* counters are not changed.
*)
method set_response_status : int -> string -> string -> unit
(* code, text, proto *)
method set_response_header : Netmime.mime_header -> unit
(* Sets the response header *)
method response_body_open_wr : unit -> Netchannels.out_obj_channel
(* Opens the response body for writing *)
method finish : unit -> unit
(* The call is finished. The [status] is set to [`Successful],
* [`Redirection], [`Client_error], or [`Server_error] depending on
* the response.
*)
method cleanup : unit -> unit
(* Release resources *)
method response_code : int
method response_proto : string
method response_header : Netmime.mime_header
method dump_status : unit -> unit
method dump_response_header : unit -> unit
method dump_response_body : unit -> unit
end
(* An authentication session.  [authenticate call] returns the header fields
 * to add to [call] (an Authorization field for the Basic and Digest
 * sessions here).  [invalidate call] returns whether the session may still
 * be used: the Digest session returns true for a stale nonce; otherwise the
 * sessions invalidate their key and return false.  NOTE(review): presumably
 * called after an authentication failure.
 *)
and auth_session =
object
method auth_scheme : string
method auth_domain : string list
method auth_realm : string
method auth_user : string
method auth_in_advance : bool
method authenticate : http_call -> (string * string) list
method invalidate : http_call -> bool
end
(* Class type of [generic_call]: [http_call] plus the private hooks that
 * each HTTP method subclass defines.
 *)
class type virtual gen_call =
object
inherit http_call
(* Adjusts the effective request; run by [prepare_transmission] *)
method private virtual fixup_request : unit -> unit
(* The HTTP method name, e.g. GET *)
method private virtual def_request_method : string
(* Path to use when the URL path is empty *)
method private virtual def_empty_path_replacement : string
method private virtual def_is_idempotent : bool
method private virtual def_has_req_body : bool
method private virtual def_has_resp_body : bool
end
(* Base implementation of [http_call].  Subclasses supply the HTTP method
 * and its characteristics (the [def_*] methods) and [fixup_request], which
 * [prepare_transmission] runs to adjust the effective request header and
 * body before sending.
 *)
class virtual generic_call : gen_call =
object(self)
val mutable status = (`Unserved : status)
val mutable req_uri = ""
val mutable req_host = "" (* derived from req_uri *)
val mutable req_port = 80 (* derived from req_uri *)
val mutable req_path = "" (* derived from req_uri *)
val mutable req_base_header = new Netmime.basic_mime_header []
val mutable req_work_header = new Netmime.basic_mime_header []
val mutable req_body = new Netmime.memory_mime_body ""
val mutable eff_req_uri = ""
val mutable resp_code = 0
val mutable resp_text = ""
val mutable resp_proto = ""
val mutable resp_header = new Netmime.basic_mime_header []
val mutable resp_body = new Netmime.memory_mime_body ""
val mutable resp_body_storage = (`Memory : response_body_storage)
val mutable reconn_mode = Send_again_if_idem
val mutable redir_mode = Redirect_if_idem
val mutable proxy_enabled = true
(* Cache of the object built by [def_private_api] *)
val mutable private_api = None
val mutable continue = false (* Whether 100-Continue has been seen *)
val mutable error_counter = 0
val mutable redir_counter = 0
(* Output channel of the response body while it is being written *)
val mutable resp_ch = None
val mutable auth_state = `None
(* Builds the private API object.  It is an immediate object, so it reads
 * and writes the instance variables of this call directly.  [fixup_request]
 * is passed in instead of being called as a method of [self], see below.
 *)
method private def_private_api fixup_request =
( object(pself)
(* We cannot call methods of [self] due to a bug in O'Caml 3.08
* (present until 3.08.3, fixed in 3.08.4 and 3.09)
* PR#3576, PR#3678
*)
method private close_resp_ch() =
match resp_ch with
| None -> ()
| Some ch -> ch # close_out(); resp_ch <- None
method get_error_counter = error_counter
method set_error_counter n = error_counter <- n
method get_redir_counter = redir_counter
method set_redir_counter n = redir_counter <- n
method continue = continue
method set_continue() = continue <- true
method auth_state = auth_state
method set_auth_state s = auth_state <- s
method set_effective_request_uri s = eff_req_uri <- s
(* Resets the call for a new transmission: closes a pending response
 * channel, rebuilds the effective header from the base header, clears the
 * response, creates the response body according to [resp_body_storage],
 * runs [fixup_request], and adds Date and User-agent if missing.
 *)
method prepare_transmission () =
pself # close_resp_ch();
status <- `Unserved;
req_work_header <- new Netmime.basic_mime_header
req_base_header#fields;
resp_code <- 0;
resp_text <- "";
resp_proto <- "";
resp_header <- new Netmime.basic_mime_header [];
resp_body <- ( match resp_body_storage with
| `Memory ->
new Netmime.memory_mime_body ""
| `File f ->
let name = f() in
new Netmime.file_mime_body name
| `Body f ->
f ()
);
fixup_request();
( try ignore(req_work_header # field "Date")
with Not_found ->
Nethttp.Header.set_date req_work_header (Unix.time())
);
( try ignore(req_work_header # field "User-agent")
with Not_found ->
req_work_header # update_field "User-agent" "Netclient"
);
method set_response_status code text proto =
resp_code <- code;
resp_text <- text;
resp_proto <- proto
method set_response_header h =
resp_header <- h
(* Opens the body for writing; a previously opened channel is closed *)
method response_body_open_wr() =
pself # close_resp_ch();
let ch = resp_body # open_value_wr() in
resp_ch <- Some ch;
ch
method response_code = resp_code
method response_proto = resp_proto
method response_header = resp_header
(* Requires a response code.  Codes outside 200-499, including 1xx, are
 * classified as [`Server_error].
 *)
method finish() =
assert(resp_code <> 0);
pself # close_resp_ch();
status <- (if resp_code >= 200 && resp_code <= 299 then
`Successful
else if resp_code >= 300 && resp_code <= 399 then
`Redirection
else if resp_code >= 400 && resp_code <= 499 then
`Client_error
else
`Server_error);
(* Records a protocol error.  [Http_error] must not be passed.  The error
 * counter is incremented only if the status is not yet a protocol error.
 *)
method set_error_exception x =
pself # close_resp_ch();
match x with
Http_error(_,_) -> assert false (* not allowed *)
| _ ->
let do_increment =
match status with
| `Http_protocol_error _ -> false
| _ -> true in
if do_increment then
error_counter <- error_counter + 1;
status <- (`Http_protocol_error x);
method cleanup () =
pself # close_resp_ch()
method dump_status () =
dlog (sprintf
"Call %d - HTTP response code: %d (%s)"
(Oo.id self) resp_code resp_text);
dlog (sprintf
"Call %d - HTTP response protocol: %s"
(Oo.id self) resp_proto);
method dump_response_header () =
dump_header
(sprintf
"Call %d - HTTP response "
(Oo.id self))
(resp_header # fields)
method dump_response_body () =
dlog (sprintf "Call %d - HTTP response body:\n%s\n"
(Oo.id self) resp_body#value)
end
)
(* Call state *)
method is_served = status <> `Unserved
method status = status
(* Accessing the request message (new style) *)
method request_method = self # def_request_method
method request_uri = req_uri
(* Parses [uri] with [parse_http_url] and stores host, port and path.
 * Fails for URLs carrying user or password, and for unparsable URLs.
 *)
method set_request_uri uri =
try
let u, pw, h, pt, ph = parse_http_url uri in
if u <> None || pw <> None then
failwith "Http_client, set_request_uri: URL must not contain user or password";
req_uri <- uri;
req_host <- h;
req_port <- pt;
req_path <- ph
with
Not_found ->
failwith "Http_client: bad URL"
method request_header (k:header_kind) =
match k with
| `Base -> req_base_header
| `Effective -> req_work_header
method set_request_header h =
req_base_header <- h
method request_body = req_body
method set_request_body b = req_body <- b
method effective_request_uri = eff_req_uri
(* Accessing the response message (new style) *)
(* Guard for the response accessors: fails while unserved, raises
 * [Http_protocol e] after a protocol error [e].
 *)
method private check_response() =
match status with
| `Unserved ->
failwith "Http_client: HTTP call is unserved, no response yet"
| `Http_protocol_error e ->
raise (Http_protocol e)
| _ -> ()
method response_status_code = self#check_response(); resp_code
method response_status_text = self#check_response(); resp_text
method response_status =
self#check_response(); Nethttp.http_status_of_int resp_code
method response_protocol = self#check_response(); resp_proto
method response_header = self#check_response(); resp_header
method response_body = self#check_response(); resp_body
(* Options *)
method response_body_storage = resp_body_storage
method set_response_body_storage s = resp_body_storage <- s
method get_reconnect_mode = reconn_mode
method set_reconnect_mode m = reconn_mode <- m
method get_redirect_mode = redir_mode
method set_redirect_mode m = redir_mode <- m
method proxy_enabled = proxy_enabled
method set_proxy_enabled e = proxy_enabled <- e
method no_proxy() = proxy_enabled <- false
method is_proxy_allowed() = proxy_enabled
(* Method characteristics *)
method empty_path_replacement = self # def_empty_path_replacement
method is_idempotent = self # def_is_idempotent
method has_req_body = self # def_has_req_body
method has_resp_body = self # def_has_resp_body
(* Repeating calls *)
(* Copies this object with the response side and the counters reset.
 * [private_api] must be reset: the cached object refers to the instance
 * variables of the original call.
 * NOTE(review): instance variables not listed in the copy, in particular
 * req_base_header, req_work_header and req_body, keep pointing to the same
 * objects as in the original, so mutating the request header or body of
 * the copy also changes the original.  Confirm that this sharing is
 * intended.
 *)
method same_call() =
let same =
{< status = `Unserved;
resp_header = new Netmime.basic_mime_header [];
resp_body = new Netmime.memory_mime_body "";
eff_req_uri = "";
private_api = None;
error_counter = 0;
redir_counter = 0;
continue = false;
resp_ch = None;
auth_state = `None
>} in
(same : #http_call :> http_call)
(* Old style access methods *)
method get_req_method() = self # def_request_method
method get_host() = req_host
method get_port() = req_port
method get_path() = req_path
method get_uri() = req_uri
method get_req_body() = req_body # value
method get_req_header () =
List.map (fun (n,v) -> (String.lowercase n, v)) req_base_header#fields
method assoc_req_header n =
req_base_header # field n
method assoc_multi_req_header n =
req_base_header # multiple_field n
method set_req_header n v =
req_base_header # update_field n v
method get_resp_header() =
self#check_response();
List.map (fun (n,v) -> (String.lowercase n, v)) resp_header#fields
method assoc_resp_header n =
self#check_response();
resp_header # field n
method assoc_multi_resp_header n =
self#check_response();
resp_header # multiple_field n
(* Returns the body of a 2xx response; otherwise raises
 * [Http_error (code, body)].
 *)
method get_resp_body() =
self#check_response();
if resp_code >= 200 && resp_code <= 299 then
resp_body # value
else
raise(Http_error(resp_code, resp_body#value))
method dest_status() =
self#check_response();
(resp_proto, resp_code, resp_text)
(* Private *)
(* Creates the private API on first use and caches it *)
method private_api =
match private_api with
| None ->
let api = self # def_private_api self#fixup_request in
private_api <- Some api;
api
| Some api ->
api
(* Virtual methods *)
method virtual private fixup_request : unit -> unit
method virtual private def_request_method : string
method virtual private def_empty_path_replacement : string
method virtual private def_is_idempotent : bool
method virtual private def_has_req_body : bool
method virtual private def_has_resp_body : bool
end
(******** SUBCLASSES IMPLEMENTING HTTP METHODS ************************)
(* One subclass of [generic_call] per HTTP method.  Each fixes the method
 * name, its characteristics and the replacement for an empty path, and
 * leaves [fixup_request] empty.
 * NOTE(review): RFC 2616 section 9.1.2 also classifies PUT, OPTIONS and
 * TRACE as idempotent, while they are marked non-idempotent here.  This is
 * the more conservative choice wherever [is_idempotent] is consulted
 * (presumably the [_if_idem] reconnect/redirect modes); confirm it is
 * intended.
 *)
class get_call =
  object
    inherit generic_call
    method private def_request_method = "GET"
    method private def_empty_path_replacement = "/"
    method private def_is_idempotent = true
    method private def_has_req_body = false
    method private def_has_resp_body = true
    method private fixup_request () = ()
  end

(* HEAD: like GET, but no response body is expected *)
class head_call =
  object
    inherit generic_call
    method private def_request_method = "HEAD"
    method private def_empty_path_replacement = "/"
    method private def_is_idempotent = true
    method private def_has_req_body = false
    method private def_has_resp_body = false
    method private fixup_request () = ()
  end

class trace_call =
  object
    inherit generic_call
    method private def_request_method = "TRACE"
    method private def_empty_path_replacement = "/"
    method private def_is_idempotent = false
    method private def_has_req_body = false
    method private def_has_resp_body = true
    method private fixup_request () = ()
  end

(* OPTIONS: an empty path stands for the whole server, written as * *)
class options_call =
  object
    inherit generic_call
    method private def_request_method = "OPTIONS"
    method private def_empty_path_replacement = "*"
    method private def_is_idempotent = false
    method private def_has_req_body = true
    method private def_has_resp_body = true
    method private fixup_request () = ()
  end

class post_call =
  object
    inherit generic_call
    method private def_request_method = "POST"
    method private def_empty_path_replacement = "/"
    method private def_is_idempotent = false
    method private def_has_req_body = true
    method private def_has_resp_body = true
    method private fixup_request () = ()
  end

class put_call =
  object
    inherit generic_call
    method private def_request_method = "PUT"
    method private def_empty_path_replacement = "/"
    method private def_is_idempotent = false
    method private def_has_req_body = true
    method private def_has_resp_body = true
    method private fixup_request () = ()
  end

class delete_call =
  object
    inherit generic_call
    method private def_request_method = "DELETE"
    method private def_empty_path_replacement = "/"
    method private def_is_idempotent = true
    method private def_has_req_body = false
    method private def_has_resp_body = true
    method private fixup_request () = ()
  end
(* Convenience calls.  Each takes the request URL, which must be an http URL
 * without user and password ([set_request_uri] fails otherwise), and sets
 * it when the object is created.
 *)

(* GET of [the_query] *)
class get the_query =
  object (self)
    inherit get_call
    initializer self # set_request_uri the_query
  end

(* TRACE of [the_query]; sets the max-forwards header to [max_hops] *)
class trace the_query max_hops =
  object (self)
    inherit trace_call
    method private fixup_request () =
      let effective = self # request_header `Effective in
      effective # update_field "max-forwards" (string_of_int max_hops)
    initializer self # set_request_uri the_query
  end

(* OPTIONS of [the_query] *)
class options the_query =
  object (self)
    inherit options_call
    initializer self # set_request_uri the_query
  end

(* HEAD of [the_query] *)
class head the_query =
  object (self)
    inherit head_call
    initializer self # set_request_uri the_query
  end
(* POST of an HTML form to [query].  [params] is a list of (name, value)
 * pairs sent as an application/x-www-form-urlencoded body, with
 * Content-length set accordingly.
 *
 * Both names and values are URL-encoded.  Encoding only the values, as
 * before, produced a corrupt body for names containing characters such as
 * &, = or space.
 *)
class post query params =
  object (self)
    inherit post_call
    initializer
      self # set_request_uri query
    method private fixup_request () =
      let rh = self # request_header `Effective in
      rh # update_field "Content-type" "application/x-www-form-urlencoded";
      let encode_pair (name, value) =
        Netencoding.Url.encode name ^ "=" ^ Netencoding.Url.encode value in
      let s = String.concat "&" (List.map encode_pair params) in
      rh # update_field "Content-length" (string_of_int (String.length s));
      self # request_body # set_value s
  end
;;
(* POST of the raw body [s] to [the_query].  Content-length is set to the
 * length of [s]; no Content-type is set here.
 *)
class post_raw the_query s =
  object (self)
    inherit post_call
    initializer self # set_request_uri the_query
    method private fixup_request () =
      self # request_body # set_value s;
      let len = string_of_int (String.length s) in
      (self # request_header `Effective) # update_field "Content-length" len
  end
;;

(* PUT of the body [s] to [the_query], with Content-length set *)
class put the_query s =
  object (self)
    inherit put_call
    initializer self # set_request_uri the_query
    method private fixup_request () =
      self # request_body # set_value s;
      let len = string_of_int (String.length s) in
      (self # request_header `Effective) # update_field "Content-length" len
  end
;;

(* DELETE of [the_query] *)
class delete the_query =
  object (self)
    inherit delete_call
    initializer self # set_request_uri the_query
  end
;;
(**********************************************************************)
(*** AUTHENTICATION METHODS ***)
(**********************************************************************)
(* Credentials for one (domain, realm) pair.  [domain] is a list of URI
 * strings; the sessions in this file compare it as a whole with the domain
 * they computed.
 *)
class type key =
object
method user : string
method password : string
method realm : string
method domain : string list
end
(* Supplier of keys.  [inquire_key ~domain ~realms ~auth] returns a key for
 * [domain] and one of [realms], or raises [Not_found]; [auth] names the
 * scheme (the sessions here pass "basic" or "digest").  The sessions call
 * [invalidate_key] from their [invalidate] method.
 *)
class type key_handler =
object
method inquire_key :
domain:string list -> realms:string list -> auth:string -> key
method invalidate_key : key -> unit
end
(* [key_ring ?uplink ()]: an in-memory [key_handler].  Keys are stored under
 * their (domain, realm) pair together with a flag telling whether they came
 * from [uplink].
 * - [inquire_key] returns the stored key for the first realm of [realms],
 *   in order, that has an entry for [domain].  Otherwise it asks [uplink]
 *   and caches the answer under the key's own domain and realm.  It raises
 *   [Not_found] when there is no uplink or the uplink raises it.
 * - [invalidate_key] removes the key's entry and, for keys obtained from
 *   the uplink, forwards the invalidation; a [Not_found] is ignored.
 * - [add_key] stores a local key, replacing an existing entry.
 *)
class key_ring ?uplink () =
  object
    val mutable keys = (Hashtbl.create 10 :
                          (string list * string, key * bool) Hashtbl.t)

    method inquire_key ~domain ~realms ~(auth:string) =
      let rec first_local = function
        | [] -> None
        | realm :: more ->
            (try Some (fst (Hashtbl.find keys (domain, realm)))
             with Not_found -> first_local more) in
      match first_local realms with
        | Some key -> key
        | None ->
            (match uplink with
               | None -> raise Not_found
               | Some h ->
                   (* may raise Not_found *)
                   let key = h # inquire_key ~domain ~realms ~auth in
                   Hashtbl.replace keys (key#domain, key#realm) (key, true);
                   key)

    method invalidate_key (key : key) =
      let domain = key # domain in
      let realm = key # realm in
      match
        (try Some (Hashtbl.find keys (domain, realm)) with Not_found -> None)
      with
        | None -> ()
        | Some (_, from_uplink) ->
            Hashtbl.remove keys (domain, realm);
            if from_uplink then
              (match uplink with
                 | None -> assert false
                 | Some h ->
                     (try h # invalidate_key key with Not_found -> ()))

    method clear () =
      Hashtbl.clear keys

    method add_key key =
      let domain = key # domain in
      let realm = key # realm in
      Hashtbl.replace keys (domain, realm) (key, false)

    method keys =
      Hashtbl.fold (fun _ (key, _) acc -> key :: acc) keys []
  end
(* Factory of authentication sessions: [create_session call] returns a
 * session able to answer the challenge in the response of [call], or
 * [None] (see [basic_auth_handler] and [digest_auth_handler]).
 *)
class type auth_handler =
object
method create_session : http_call -> auth_session option
end
(* Raised by the session constructors when no session can be created; the
 * handlers turn it into [None].
 *)
exception Not_applicable
(* [get_domain_uri call] is the root URI of the server of [call], of the
 * form http://host:port/ with an explicit port.  It is used as the default
 * protection domain.
 *)
let get_domain_uri call =
  let host = call # get_host () in
  let port = call # get_port () in
  Printf.sprintf "http://%s:%d/" host port
(* Basic authentication session for the 401 response of [init_call].  The
 * domain is the root URI of the server.  The constructor raises
 * [Not_applicable] in these cases:
 * - the response has no usable www-authenticate header;
 * - it contains no Basic challenge with a realm;
 * - [key_handler] has no key;
 * - the key's realm is not among the offered realms;
 * - the key's domain differs.
 *)
class basic_auth_session enable_auth_in_advance
                         key_handler init_call
                         : auth_session =
  let domain = [ get_domain_uri init_call ] in
  (* Realms of all Basic challenges, in header order *)
  let basic_realms =
    let challenges =
      try Nethttp.Header.get_www_authenticate init_call#response_header
      with
        | Not_found | Nethttp.Bad_header_field _ -> raise Not_applicable in
    let collect (scheme, params) acc =
      if String.lowercase scheme <> "basic" then acc
      else
        try
          let (_, realm) =
            List.find (fun (pname, _) -> String.lowercase pname = "realm") params in
          realm :: acc
        with Not_found -> acc in
    match List.fold_right collect challenges [] with
      | [] -> raise Not_applicable
      | realms -> realms in
  let key =
    try key_handler # inquire_key ~domain ~realms:basic_realms ~auth:"basic"
    with Not_found -> raise Not_applicable in
  let () =
    if not (List.mem key#realm basic_realms) || key#domain <> domain then
      raise Not_applicable in
  object
    method auth_scheme = "basic"
    method auth_domain = domain
    method auth_realm = key # realm
    method auth_user = key # user
    method auth_in_advance = enable_auth_in_advance
    (* Authorization: Basic base64(user:password) *)
    method authenticate _call =
      let user_pass = key#user ^ ":" ^ key#password in
      [ "Authorization", "Basic " ^ Netencoding.Base64.encode user_pass ]
    (* The key was rejected: drop it; the session cannot be retried *)
    method invalidate _call =
      key_handler # invalidate_key key;
      false
  end
(* Creates [basic_auth_session]s; [None] whenever the session constructor
 * finds the response or the key unsuitable ([Not_applicable]).
 *)
class basic_auth_handler ?(enable_auth_in_advance=false)
                         (key_handler : #key_handler)
                         : auth_handler =
  object
    method create_session call =
      let make () =
        new basic_auth_session enable_auth_in_advance key_handler call in
      try Some (make ()) with Not_applicable -> None
  end
(* [contains_auth v] tells whether the qop option list [v] of a Digest
 * challenge contains the token auth.
 *
 * RFC 2617 defines qop-options as a comma-separated list, e.g.
 * auth,auth-int, and its own example uses that form.  The tokens are
 * therefore separated at commas as well as at whitespace.  Splitting only
 * at whitespace rejected such standard challenges.
 *)
let contains_auth v =
  let is_sep c = c = ',' || c = ' ' || c = '\t' || c = '\r' || c = '\n' in
  let len = String.length v in
  (* first position at or after [j] that is a separator, or [len] *)
  let rec token_end j =
    if j < len && not (is_sep v.[j]) then token_end (j + 1) else j in
  let rec scan i =
    if i >= len then false
    else if is_sep v.[i] then scan (i + 1)
    else
      let j = token_end i in
      String.sub v i (j - i) = "auth" || scan j in
  scan 0
(* Digest authentication session (RFC 2617; RFC 2069 mode when the server
 * offers no qop) for the 401 response of [init_call].
 *
 * The constructor raises [Not_applicable] unless the response contains a
 * usable Digest challenge and [key_handler] supplies a matching key.  A
 * usable challenge has a realm and a nonce, algorithm MD5 or MD5-sess if
 * given, and a qop containing auth if given.  A matching key has the same
 * realm and domain as the challenge.  Only the first usable challenge is
 * considered.
 *
 * The Authorization header follows RFC 2617 section 3.2.2:
 * - nc is the 8-digit hexadecimal count of requests sent with this nonce,
 *   starting at 00000001.  It is the same value that enters the digest;
 *   previously the header carried a decimal rendering that disagreed from
 *   the tenth request on.
 * - qop and nc are only sent when the server offered qop.  cnonce is sent
 *   then, and also for MD5-sess, whose A1 depends on it.
 * - An opaque value from the challenge is echoed back unchanged.
 *)
class digest_auth_session enable_auth_in_advance
                          key_handler init_call
                          : auth_session =
  (* Makes one entry of the domain parameter an absolute http://host:port/...
   * string.  Absolute paths are resolved against the server of
   * [init_call]; http URLs get an explicit port.  Anything else is kept
   * verbatim, including http URLs whose port does not fit into an int.
   *)
  let normalize_domain s =
    if s <> "" && s.[0] = '/' then
      let host = init_call#get_host() in
      let port = init_call#get_port() in
      "http://" ^ host ^ ":" ^ string_of_int port ^ s
    else
      ( try
          let (_,_,host,port,path) = parse_http_url s in
          "http://" ^ host ^ ":" ^ string_of_int port ^ path
        with
          | Not_found | Failure _ -> s
      )
  in
  let digest_request =
    (* The parameters of the first acceptable Digest challenge in the
     * www-authenticate header; raises Not_applicable if there is none *)
    let auth_list =
      try
        Nethttp.Header.get_www_authenticate init_call#response_header
      with
        | Not_found -> raise Not_applicable
        | Nethttp.Bad_header_field _ -> raise Not_applicable in
    let digest_auth_list =
      List.filter
        (fun (scheme,params) ->
           String.lowercase scheme = "digest"
           && List.mem_assoc "realm" params
           && (try List.mem (List.assoc "algorithm" params) ["MD5";"MD5-sess"]
               with Not_found -> true)
           && (try contains_auth (List.assoc "qop" params)
               with Not_found -> true)
           && List.mem_assoc "nonce" params
        )
        auth_list in
    match digest_auth_list with
      | [] ->
          raise Not_applicable
      | (_,params) :: _ ->
          (* Restriction: only the first challenge can be processed *)
          params
  in
  let domain =
    try
      List.map
        normalize_domain
        (split_words (List.assoc "domain" digest_request))
    with
      Not_found -> [ get_domain_uri init_call ] in
  let realm =
    try List.assoc "realm" digest_request
    with Not_found -> assert false in
  let key =
    (* The selected key, or Not_applicable *)
    try
      key_handler # inquire_key ~domain ~realms:[realm] ~auth:"digest"
    with
      Not_found -> raise Not_applicable
  in
  (* The key must be for exactly this realm and domain *)
  let () =
    if key#realm <> realm then raise Not_applicable;
    if key#domain <> domain then raise Not_applicable
  in
  let algorithm =
    try List.assoc "algorithm" digest_request
    with Not_found -> "MD5" in
  (* "" means RFC 2069 mode: the server did not offer qop *)
  let qop =
    if List.mem_assoc "qop" digest_request then "auth" else "" in
  let nonce =
    try List.assoc "nonce" digest_request
    with Not_found -> assert false in
  (* Must be returned unchanged in the Authorization header *)
  let opaque =
    try Some (List.assoc "opaque" digest_request)
    with Not_found -> None in
  object(self)
    val mutable cnonce_init = string_of_float (Unix.time())
    val mutable cnonce_incr = 0
    val mutable nc = 0
    val mutable a1 = None

    (* The cnonce of the first request; it enters A1 for MD5-sess *)
    method private first_cnonce =
      Digest.to_hex
        (Digest.string (cnonce_init ^ ":0"))

    method private next_cnonce() =
      let cnonce =
        Digest.to_hex
          (Digest.string (cnonce_init ^ ":" ^ string_of_int cnonce_incr)) in
      cnonce_incr <- cnonce_incr + 1;
      cnonce

    (* Number of the current request with this nonce: 1, 2, 3, ... *)
    method private next_nc() =
      nc <- nc + 1;
      nc

    method private fn_h data =
      encode_hex (Digest.string data)

    method private fn_kd secret data =
      encode_hex (Digest.string (secret ^ ":" ^ data))

    (* A1 of RFC 2617 section 3.2.2.2, computed once and cached *)
    method private a1 =
      match a1 with
        | Some v -> v
        | None ->
            let v =
              match algorithm with
                | "MD5" ->
                    key#user ^ ":" ^ realm ^ ":" ^ key#password
                | "MD5-sess" ->
                    (self # fn_h
                       (key#user ^ ":" ^ realm ^ ":" ^ key#password)) ^
                    ":" ^ nonce ^ ":" ^ self#first_cnonce
                | _ ->
                    assert false
            in
            a1 <- Some v;
            v

    (* A2 for qop auth or no qop: request method, colon, digest URI *)
    method private a2 call =
      let meth = call # request_method in
      let uri = call # effective_request_uri in
      meth ^ ":" ^ uri

    method authenticate call =
      let cnonce = self#next_cnonce() in
      let count = self#next_nc() in
      let nc_hex = Printf.sprintf "%08x" count in
      let digest =
        match qop with
          | "auth" ->
              self#fn_kd
                (self#fn_h self#a1)
                (nonce ^ ":" ^ nc_hex ^ ":" ^ cnonce ^ ":" ^
                 "auth:" ^ (self#fn_h (self#a2 call)))
          | "" ->
              self#fn_kd
                (self#fn_h self#a1)
                (nonce ^ ":" ^ (self#fn_h (self#a2 call)))
          | _ ->
              assert false (* such digests are not accepted *)
      in
      let cnonce_param =
        if qop = "auth" || algorithm = "MD5-sess" then
          ",cnonce=\"" ^ cnonce ^ "\""
        else
          "" in
      let opaque_param =
        match opaque with
          | None -> ""
          | Some s -> ",opaque=\"" ^ s ^ "\"" in
      let qop_params =
        match qop with
          | "" -> ""
          | "auth" -> ",qop=auth,nc=" ^ nc_hex
          | _ -> assert false in
      let creds =
        Printf.sprintf
          "Digest username=\"%s\",realm=\"%s\",nonce=\"%s\",uri=\"%s\",response=\"%s\",algorithm=%s%s%s%s"
          key#user
          realm
          nonce
          call#effective_request_uri
          digest
          algorithm
          cnonce_param
          opaque_param
          qop_params in
      [ "Authorization", creds ]

    method auth_scheme = "digest"
    method auth_domain = domain
    method auth_realm = key # realm
    method auth_user = key # user
    method auth_in_advance = enable_auth_in_advance

    (* Returns true if the server marked our nonce as stale, so the session
     * may be retried; otherwise the key is invalidated and false returned.
     *)
    method invalidate call =
      let is_stale =
        try
          let auth_list =
            Nethttp.Header.get_www_authenticate call#response_header in
          List.exists
            (fun (scheme,params) ->
               String.lowercase scheme = "digest"
               && (try List.assoc "realm" params = realm
                   with Not_found -> false)
               && (try List.assoc "nonce" params = nonce
                   with Not_found -> false)
               && (try String.lowercase (List.assoc "stale" params) = "true"
                   with Not_found -> false)
            )
            auth_list
        with
          | Not_found -> false (* No www-authenticate header *)
          | Nethttp.Bad_header_field _ -> false in
      is_stale || (
        key_handler # invalidate_key key;
        false
      )
  end
(* Creates [digest_auth_session]s; [None] whenever the session constructor
 * finds the response or the key unsuitable ([Not_applicable]).
 *)
class digest_auth_handler ?(enable_auth_in_advance=false)
                          (key_handler : #key_handler)
                          : auth_handler =
  object
    method create_session call =
      let make () =
        new digest_auth_session enable_auth_in_advance key_handler call in
      try Some (make ()) with Not_applicable -> None
  end
(* Scheme table containing only the http syntax from
 * [Neturl.common_url_syntax]. *)
let only_http =
  let table = Hashtbl.create 1 in
  Hashtbl.add table "http" (Hashtbl.find Neturl.common_url_syntax "http");
  table

(* [parse_http_neturl s] parses the http URL [s] with Neturl, allowing 8-bit
 * characters and a fragment.  The callers in this file handle
 * [Neturl.Malformed_URL] from it.
 *)
let parse_http_neturl s =
  Neturl.parse_url ~schemes:only_http ~accept_8bits:true ~enable_fragment:true s
(* [norm_neturl neturl] rebuilds [neturl] from its decoded components and
 * returns it as a string.  Percent sequences are thereby normalized, the
 * port is always explicit (80 if missing), and a missing path becomes the
 * empty path.  Query and fragment are kept when present.  Other components
 * (e.g. user information) are not carried over.
 *)
let norm_neturl neturl =
  let port = try Neturl.url_port neturl with Not_found -> 80 in
  let path = try Neturl.url_path neturl with Not_found -> [] in
  let query = try Some (Neturl.url_query neturl) with Not_found -> None in
  let fragment = try Some (Neturl.url_fragment neturl) with Not_found -> None in
  let normalized =
    Neturl.make_url
      ~encoded:false
      ~scheme:(Neturl.url_scheme neturl)
      ~host:(Neturl.url_host neturl)
      ~port ~path ?query ?fragment
      (Neturl.url_syntax_of_url neturl) in
  Neturl.string_of_url normalized
(* [prefixes_of_neturl s_url] lists the URLs under which a session for
 * authentication in advance may be registered for [s_url].  The list is:
 * [s_url] itself, [s_url] without fragment, and then the URL without
 * fragment and query for each path prefix computed by [rev_path_prefixes],
 * longest first.
 * NOTE(review): the prefix computation depends on how Neturl represents
 * paths (component lists, an absolute path starting with an empty
 * component).  Please confirm whether the root path comes back as a
 * one-element or a two-element list of empty strings: the reversed
 * two-element form hits the [assert false] case below.
 *)
let prefixes_of_neturl s_url =
(* Returns a list of all legal prefixes of the absolute URI s.
* The prefixes are in Neturl format.
*)
(* [rev_path_prefixes] works on the reversed component list, so the head
 * is the last path component.  A non-empty last component is replaced by
 * an empty one (the containing directory with a trailing slash); an empty
 * last component is dropped.
 *)
let rec rev_path_prefixes rev_path =
match rev_path with
| [] -> []
| [ "" ] -> [ rev_path; []  ]
| [ ""; "" ] -> assert false
| [ _; "" ] -> rev_path :: rev_path_prefixes [ "" ]
| "" :: rev_path' ->
if rev_path' = [ ""; "" ] then
rev_path :: rev_path_prefixes [ "" ]
else
rev_path :: rev_path_prefixes rev_path'
| _ :: rev_path' ->
rev_path :: (rev_path_prefixes ("" :: rev_path'))
in
let path_prefixes path =
List.map List.rev (rev_path_prefixes (List.rev path)) in
let s_nofrag_url = Neturl.remove_from_url ~fragment:true s_url in
let s_noquery_url = Neturl.remove_from_url ~query:true s_nofrag_url in
let path = Neturl.url_path s_noquery_url in
s_url :: s_nofrag_url ::
(List.map
(fun prefix -> Neturl.modify_url ~path:prefix s_noquery_url
)
(path_prefixes path))
(* Registry of authentication handlers, plus a cache of sessions that may
 * be reused for authentication in advance. Cached sessions are keyed by
 * the normalized form (see [norm_neturl]) of each of their domain URIs.
 *)
class auth_cache =
  (* Cache key for a domain URI, or [None] if the URI does not parse *)
  let domain_key dom_uri =
    try Some (norm_neturl (parse_http_neturl dom_uri))
    with Neturl.Malformed_URL -> None in
object
  val mutable auth_handlers = []
  (* Registered handlers, in registration order *)

  val sessions = Hashtbl.create 10
  (* Only sessions that can be used for authentication in advance.
   * Maps normalized domain URIs to sessions.
   *)

  method add_auth_handler (h : auth_handler) =
    auth_handlers <- auth_handlers @ [h]

  method create_session (call : http_call) =
    (* Create a new session after a 401 reply: the first registered
     * handler that returns a session wins.
     *)
    let rec first_accepting = function
      | [] -> None
      | handler :: others ->
          ( match handler # create_session call with
              | None -> first_accepting others
              | found -> found
          )
    in
    first_accepting auth_handlers

  method tell_successful_session (sess : auth_session) =
    (* Called by [postprocess_complete_message] when authentication was
     * successful. If the session allows authentication in advance, it
     * is remembered under every parseable domain URI.
     *)
    if sess # auth_in_advance then
      List.iter
        (fun dom_uri ->
           match domain_key dom_uri with
             | Some key -> Hashtbl.replace sessions key sess
             | None -> ())
        sess#auth_domain

  method tell_failed_session (sess : auth_session) =
    (* Called by [postprocess_complete_message] when authentication
     * failed: forget the session under all of its domain URIs.
     *)
    List.iter
      (fun dom_uri ->
         match domain_key dom_uri with
           | Some key -> Hashtbl.remove sessions key
           | None -> ())
      sess#auth_domain

  method find_session_in_advance (call : http_call) =
    (* Find a session suitable for authentication in advance. The
     * request URI and all of its prefixes are tried, most specific
     * first. Raises [Not_found] if there is none, or if the URI
     * is malformed.
     *)
    let uri = call # request_uri in
    let rec lookup = function
      | [] -> raise Not_found
      | candidate :: rest ->
          let key = norm_neturl candidate in
          if Hashtbl.mem sessions key then
            Hashtbl.find sessions key
          else
            lookup rest in
    try
      lookup (prefixes_of_neturl (parse_http_neturl uri))
    with
      | Neturl.Malformed_URL ->
          raise Not_found
end
(* Backwards compatibility: *)
class key_backing_store =
object
  val db = (Hashtbl.create 10 : (string, (string*string)) Hashtbl.t)
  (* realm -> (user, password) *)

  method set_realm realm user password =
    Hashtbl.replace db realm (user, password)

  method inquire_key ~domain ~realms ~(auth:string) =
    (* Returns a key for the first of [realms] that has stored
     * credentials. Raises [Not_found] if none has. [auth] is not used.
     *)
    let realm = List.find (Hashtbl.mem db) realms in
    let user, password = Hashtbl.find db realm in
    object
      method user = user
      method password = password
      method realm = realm
      method domain = (domain : string list)
    end

  method invalidate_key (_ : key) = ()
end
(* Wraps an authentication handler built by [mk_auth_handler]. The handler
 * receives a [key_ring] whose uplink is a private [key_backing_store];
 * credentials are registered through [set_realm].
 *)
class auth_method name (mk_auth_handler : key_ring -> auth_handler) =
  let store = new key_backing_store in
  let handler = mk_auth_handler (new key_ring ~uplink:store ()) in
object
  method name = (name : string)
  method set_realm realm user password =
    store # set_realm realm user password
  method as_auth_handler = handler
end
(* "basic" authentication method, with authentication in advance enabled *)
class basic_auth_method =
  auth_method "basic"
    (fun ring -> new basic_auth_handler ~enable_auth_in_advance:true ring)
(* "digest" authentication method, with authentication in advance enabled *)
class digest_auth_method =
  auth_method "digest"
    (fun ring -> new digest_auth_handler ~enable_auth_in_advance:true ring)
(**********************************************************************)
(*** THE CONNECTION CACHE ***)
(**********************************************************************)
(** State of a TCP connection as seen by a connection cache:
 * - [`Inactive]: the connection is not used by any pipeline;
 * - [`Active obj]: the connection is in use by the pipeline [obj].
 *)
type conn_state =
  [ `Inactive
  | `Active of < >
  ]
class type connection_cache =
object
method get_connection_state : Unix.file_descr -> conn_state
(** Returns the state of the file descriptor. The implementations in
* this module raise [Not_found] for descriptors they do not track
* ([restrictive_cache] tracks only active descriptors).
*)
method set_connection_state : Unix.file_descr -> conn_state -> unit
(** Sets the state of the file descriptor. It is allowed that
* inactive descriptors are simply closed and forgotten
* ([restrictive_cache] does exactly this).
*)
method find_inactive_connection : Unix.sockaddr -> Unix.file_descr
(** Returns an inactive connection to the passed peer, or raises
* [Not_found].
*)
method find_my_connections : < > -> Unix.file_descr list
(** Returns all active connections owned by the object (the empty
* list if there are none) *)
method close_connection : Unix.file_descr -> unit
(** Deletes the connection from the cache, and closes it *)
method close_all : unit -> unit
(** Closes all descriptors known to the cache (active and inactive)
* and forgets them *)
end
(* Closes every descriptor known to [cache] by calling its [close_all]. *)
let close_connection_cache cache =
  cache # close_all ()
(* Connection cache that never keeps idle connections. Setting a
 * descriptor to [`Inactive] closes it immediately, and
 * [find_inactive_connection] always raises [Not_found].
 *)
class restrictive_cache : connection_cache =
object(self)
  val mutable active_conns = Hashtbl.create 10
  (* maps file_descr to owner *)
  val mutable rev_active_conns = Hashtbl.create 10
  (* maps owner to its (non-empty) file_descr list *)

  method get_connection_state fd =
    (* Only active descriptors are tracked; raises [Not_found] otherwise *)
    `Active(Hashtbl.find active_conns fd)

  method set_connection_state fd state =
    match state with
      | `Active owner ->
          Hashtbl.replace active_conns fd owner;
          let fd_list =
            try Hashtbl.find rev_active_conns owner with Not_found -> [] in
          if not (List.mem fd fd_list) then
            Hashtbl.replace rev_active_conns owner (fd :: fd_list)
      | `Inactive ->
          self # close_connection fd

  method find_inactive_connection _ = raise Not_found

  method find_my_connections owner =
    try
      Hashtbl.find rev_active_conns owner
    with
      Not_found -> []

  method private forget_active_connection fd =
    (* Removes [fd] from both tables. An owner whose descriptor list
     * becomes empty is removed entirely, so that the table does not keep
     * references to owners that no longer hold any descriptor.
     *)
    ( try
        let owner = Hashtbl.find active_conns fd in
        let fd_list =
          try Hashtbl.find rev_active_conns owner with Not_found -> [] in
        let fd_list' =
          List.filter (fun fd' -> fd' <> fd) fd_list in
        if fd_list' <> [] then
          Hashtbl.replace rev_active_conns owner fd_list'
        else
          Hashtbl.remove rev_active_conns owner
      with
        Not_found -> ()
    );
    Hashtbl.remove active_conns fd

  method close_connection fd =
    self # forget_active_connection fd;
    Netlog.Debug.release_fd fd;
    Unix.close fd

  method close_all () =
    Hashtbl.iter
      (fun fd _ ->
         Netlog.Debug.release_fd fd;
         Unix.close fd)
      active_conns;
    Hashtbl.clear active_conns;
    Hashtbl.clear rev_active_conns
end

let create_restrictive_cache() = new restrictive_cache
(* Connection cache that keeps idle connections open for reuse.
 * A descriptor set to [`Inactive] is indexed by its peer address, so
 * that [find_inactive_connection] can hand it out again. Descriptors
 * whose peer address cannot be determined are closed instead.
 *)
class aggressive_cache : connection_cache =
object(self)
  val mutable active_conns = Hashtbl.create 10
  (* maps file_descr to owner *)
  val mutable rev_active_conns = Hashtbl.create 10
  (* maps owner to file_descr list *)
  val mutable inactive_conns = Hashtbl.create 10
  (* maps file_descr to sockaddr *)
  val mutable rev_inactive_conns = Hashtbl.create 10
  (* maps sockaddr to file_descr list *)

  method get_connection_state fd =
    (* Raises [Not_found] for descriptors unknown to the cache *)
    try
      `Active(Hashtbl.find active_conns fd)
    with
      Not_found ->
        if Hashtbl.mem inactive_conns fd then
          `Inactive
        else
          raise Not_found

  method set_connection_state fd state =
    match state with
      | `Active owner ->
          self # forget_inactive_connection fd;
          Hashtbl.replace active_conns fd owner;
          let fd_list =
            try Hashtbl.find rev_active_conns owner with Not_found -> [] in
          if not (List.mem fd fd_list) then
            Hashtbl.replace rev_active_conns owner (fd :: fd_list)
      | `Inactive ->
          (* The peer address is the lookup key for reuse. If it cannot be
           * determined, the socket is not reusable. Close it rather than
           * raising while the descriptor is still registered as active.
           * Besides ENOTCONN, BSD-derived systems may report EINVAL for a
           * socket that has been shut down.
           *)
          let peer_opt =
            try Some (Netsys.getpeername fd)
            with
              | Unix.Unix_error((Unix.ENOTCONN | Unix.EINVAL),_,_) -> None in
          ( match peer_opt with
              | Some peer ->
                  self # forget_active_connection fd;
                  Hashtbl.replace inactive_conns fd peer;
                  let fd_list =
                    try Hashtbl.find rev_inactive_conns peer
                    with Not_found -> [] in
                  if not (List.mem fd fd_list) then
                    Hashtbl.replace rev_inactive_conns peer (fd :: fd_list)
              | None ->
                  self # close_connection fd
          )

  method find_inactive_connection peer =
    match Hashtbl.find rev_inactive_conns peer with
      | [] -> raise Not_found
      | fd :: _ -> fd

  method find_my_connections owner =
    try
      Hashtbl.find rev_active_conns owner
    with
      Not_found -> []

  method private forget_active_connection fd =
    (* Removes [fd] from the active tables; owners without descriptors
     * are dropped from [rev_active_conns]. *)
    ( try
        let owner = Hashtbl.find active_conns fd in
        let fd_list =
          try Hashtbl.find rev_active_conns owner with Not_found -> [] in
        let fd_list' =
          List.filter (fun fd' -> fd' <> fd) fd_list in
        if fd_list' <> [] then
          Hashtbl.replace rev_active_conns owner fd_list'
        else
          Hashtbl.remove rev_active_conns owner
      with
        Not_found -> ()
    );
    Hashtbl.remove active_conns fd

  method private forget_inactive_connection fd =
    (* Removes [fd] from the inactive tables, if present *)
    try
      let peer = Hashtbl.find inactive_conns fd in
      (* Do not use getpeername! fd might be disconnected in the meantime! *)
      let fd_list =
        try Hashtbl.find rev_inactive_conns peer with Not_found -> [] in
      let fd_list' =
        List.filter (fun fd' -> fd' <> fd) fd_list in
      if fd_list' <> [] then
        Hashtbl.replace rev_inactive_conns peer fd_list'
      else
        Hashtbl.remove rev_inactive_conns peer;
      Hashtbl.remove inactive_conns fd
    with
      | Not_found ->
          ()

  method close_connection fd =
    self # forget_active_connection fd;
    self # forget_inactive_connection fd;
    Netlog.Debug.release_fd fd;
    Unix.close fd

  method close_all () =
    Hashtbl.iter
      (fun fd _ ->
         Netlog.Debug.release_fd fd;
         Unix.close fd)
      active_conns;
    Hashtbl.clear active_conns;
    Hashtbl.clear rev_active_conns;
    Hashtbl.iter
      (fun fd _ ->
         Netlog.Debug.release_fd fd;
         Unix.close fd)
      inactive_conns;
    Hashtbl.clear inactive_conns;
    Hashtbl.clear rev_inactive_conns
end

let create_aggressive_cache() = new aggressive_cache
(**********************************************************************)
(*** THE I/O BUFFER ***)
(**********************************************************************)
(* io_buffer performs the socket I/O of one connection.
*
* Input side: octets received with Unix.recv are appended to the buffer
* [in_buf]; [in_pos] marks how far the response parser has got.
* The parser is resumable: every parsing step records in [restart_parser]
* where to continue, so it can simply return when not enough octets
* are available yet and pick up again after the next read. 1XX responses
* are parsed (their header is discarded) and the parser continues with
* the next status line. After each parser run, the consumed octets are
* deleted from the beginning of [in_buf]; the remaining octets are the
* beginning of the following message.
*
* Output side: either a fixed string (the request line and header) or
* successive fragments of the request body, filled into a reusable send
* buffer, are written to the socket.
*)
exception Garbage_received of string
(* Raised by the response parser of [io_buffer] (not caught by
* [parse_response]) when the status line of a response is malformed,
* too long or cut off by EOF, or when the header of a 1XX response is
* broken. Such errors are not transferred to the http_call;
* [transmitter#receive] puts the message into the [Broken] state instead.
*)
(* None of the following character classes accepts the NUL byte. That
* byte is kept after the end of the input buffer as a guard (see
* [io_buffer#unix_read]). *)
(* One non-empty line terminated by LF or CRLF (the status line) *)
let line_end_re = Netstring_pcre.regexp "[^\\x00\r\n]+\r?\n";;
(* Any number of such lines followed by an empty line: a complete header
* or trailer block including the terminating blank line *)
let line_end2_re = Netstring_pcre.regexp "([^\\x00\r\n]+\r?\n)*\r?\n";;
(* "<protocol> <3-digit code>[ <reason>]" plus line end.
* Group 1 = protocol, group 2 = code, group 4 = reason text *)
let status_re = Netstring_pcre.regexp "^([^ \t]+)[ \t]+([0-9][0-9][0-9])([ \t]+([^\r\n]*))?\r?\n$" ;;
(* Chunk-size line: hex size (group 1), optional ";extension" (group 2) *)
let chunk_re = Netstring_pcre.regexp "[ \t]*([0-9a-fA-F]+)[ \t]*(;[^\r\n\\x00]*)?\r?\n" ;;
(* Line end expected after the data of each chunk *)
let crlf_re = Netstring_pcre.regexp "\r?\n";;
(* Local view of the socket held by an [io_buffer] *)
type sockstate =
  | Down   (* no usable socket: closed or released to the cache *)
  | Up_rw  (* open for reading and writing *)
  | Up_r   (* sending side shut down by [close_out]; still readable *)
;;
(* Performs the socket I/O of one HTTP connection.
 *
 * [options] supplies the verbosity flags. [conn_cache] is told when the
 * socket is closed or released. [fd] is the socket, and [fd_state] is
 * its initial [sockstate].
 *
 * Input: [unix_read] appends received octets to [in_buf], and [in_pos]
 * is the parse position. The response parser is a resumable state
 * machine. Every [parse_*] step first stores a continuation in
 * [restart_parser], so it can return when data is missing, and
 * [parse_response] resumes at the same place after the next read.
 * Malformed data raises [Garbage_received] (status line, 1XX headers)
 * or [Bad_message] (final response; recorded in the call).
 *
 * Output: [send_this_string] or [send_by_buffer] provide the data that
 * [unix_write] writes to the socket.
 *)
class io_buffer options conn_cache fd fd_state =
object (self)
  (****************************** SOCKET ********************************)

  val mutable socket_state = fd_state

  method socket_state = socket_state

  method socket =
    (* Fails if our view of the socket is [Down] *)
    match socket_state with
      | Down -> failwith "Socket is down"
      | _ -> fd

  method socket_str =
    (* Descriptor number for log messages, "n/a" if unavailable *)
    try
      Int64.to_string (Netsys.int64_of_file_descr self # socket)
    with _ -> "n/a"

  method close_out() =
    (* Shut down only the sending direction (Up_rw -> Up_r) *)
    match socket_state with
      | Down -> ()
      | Up_rw ->
          if options.verbose_connection then
            dlogr (fun () ->
                     sprintf "FD %Ld - HTTP connection: Sending EOF!"
                       (Netsys.int64_of_file_descr fd));
          Unix.shutdown fd Unix.SHUTDOWN_SEND;
          socket_state <- Up_r
      | Up_r ->
          ()

  method close() =
    (* Close the socket through the connection cache *)
    match socket_state with
      | Down -> ()
      | _ ->
          if options.verbose_connection then
            dlogr (fun () ->
                     sprintf "FD %Ld - HTTP connection: Closing socket!"
                       (Netsys.int64_of_file_descr fd));
          socket_state <- Down;
          conn_cache # close_connection fd

  method release() =
    (* Give socket back to cache. A half-closed socket cannot be reused
     * and is closed instead. *)
    match socket_state with
      | Down -> ()
      | Up_r -> self # close()
      | Up_rw ->
          conn_cache # set_connection_state fd `Inactive;
          socket_state <- Down (* our view *)

  (****************************** INPUT ********************************)

  val mutable in_buf = B.create 8192
  val mutable in_eof = false
  val mutable in_eof_parsed = false (* whether eof has been seen by parser *)
  val mutable in_pos = 0 (* parse position *)
  val mutable status_seen = false
  val mutable timeout = false
  val mutable restart_parser = (fun _ -> assert false)

  initializer
    restart_parser <- self # parse_status_line;
    (* Ensure the 0-byte invariant holds (see comment in unix_read): *)
    let b = B.unsafe_buffer in_buf in
    if B.length in_buf < String.length b then
      b.[B.length in_buf] <- '\000'

  method unix_read () =
    if not in_eof && not timeout then (
      let n =
        B.add_inplace
          in_buf
          (fun io_buf pos len ->
             syscall (fun() -> Unix.recv fd io_buf pos len []))
      in
      if n = 0 then
        in_eof <- true;
      (* None of the regexps matches the null byte. So this byte can be
       * used as guard that indicates the end of the buffer. We only
       * have to make sure it is always there.
       *)
      let b = B.unsafe_buffer in_buf in
      if B.length in_buf < String.length b then
        b.[B.length in_buf] <- '\000'
    )

  method timeout =
    timeout

  method set_timeout =
    timeout <- true

  method has_unparsed_data =
    (* whether data has been received that must go through the parser *)
    (in_eof && not in_eof_parsed) || (in_pos < B.length in_buf)

  method has_unparsed_bytes =
    (* whether there are unparsed data consisting of at least one byte *)
    in_pos < B.length in_buf

  method status_seen =
    (* Whether a final status line (status >= 200) has been seen *)
    status_seen

  method in_eof =
    (* end of stream indicator *)
    in_eof

  method parse_response (call : http_call) =
    (* Parses the [in_buf] buffer and puts the parsed data into
     * [call].
     *
     * This function must only be called if at least one additional
     * byte has been received. The function returns when
     * - not enough bytes are available, or
     * - the response has been completely parsed. This latter case
     *   can be recognized because the [call] is finished then.
     * [Bad_message] is recorded in [call]; [Garbage_received] escapes.
     *)
    try
      restart_parser call;
      assert(in_pos >= 0 && in_pos <= B.length in_buf);
      (* Many versions of Netbuffer do not check arguments of [delete]
       * correctly.
       *)
      B.delete in_buf 0 in_pos;
      in_pos <- 0;
      in_eof_parsed <- in_eof;
      (* Ensure the 0-byte invariant holds (see comment in unix_read): *)
      let b = B.unsafe_buffer in_buf in
      if B.length in_buf < String.length b then
        b.[B.length in_buf] <- '\000'
    with
      | Bad_message _ as err ->
          call # private_api # set_error_exception err;
          assert(status_seen) (* otherwise cleanup code will not work *)
          (* Garbage_received not caught! *)

  method private parse_status_line call =
    (* Parses the status line. The header follows in [parse_header],
     * which also handles 1XX responses. *)
    restart_parser <- self # parse_status_line;
    let b = B.unsafe_buffer in_buf in
    match Netstring_pcre.string_match line_end_re b in_pos with
      | None ->
          if B.length in_buf - in_pos > 500 then
            raise (Garbage_received "Status line too long");
          if in_eof then
            raise (Garbage_received "EOF where status line expected")
      | Some m ->
          in_pos <- Netstring_pcre.match_end m;
          assert(in_pos <= B.length in_buf);
          let s = Netstring_pcre.matched_string m b in
          ( match Netstring_pcre.string_match status_re s 0 with
              | None ->
                  raise (Garbage_received "Bad status line")
              | Some m ->
                  let proto = Netstring_pcre.matched_group m 1 s in
                  let code_str = Netstring_pcre.matched_group m 2 s in
                  let code = int_of_string code_str in
                  let text =
                    try Netstring_pcre.matched_group m 4 s
                    with Not_found -> "" in
                  if code < 100 || code > 599 then
                    raise (Garbage_received "Bad status code");
                  status_seen <- code >= 200;
                  call # private_api # set_response_status code text proto;
                  self # parse_header code call
          )

  method private parse_header code call =
    (* Parses the HTTP header following the status line. After a 1XX
     * header, parsing continues with the next status line. *)
    restart_parser <- self # parse_header code;
    let b = B.unsafe_buffer in_buf in
    match Netstring_pcre.string_match line_end2_re b in_pos with
      | None ->
          if B.length in_buf - in_pos > 100000 then (
            if code < 200 then
              raise (Garbage_received "Response header too long")
            else
              raise (Bad_message "Response header too long")
          );
          if in_eof then (
            if code < 200 then
              raise (Garbage_received "EOF where response header expected")
            else
              raise (Bad_message "EOF where response header expected")
          )
      | Some m ->
          let start = in_pos in
          in_pos <- Netstring_pcre.match_end m;
          assert(in_pos <= B.length in_buf);
          let header_l, real_end_pos =
            try
              Mimestring.scan_header
                ~downcase:false ~unfold:true ~strip:true b
                ~start_pos:start ~end_pos:in_pos
            with
              | Failure _ ->
                  if code < 200 then
                    raise (Garbage_received "Bad response header")
                  else
                    raise (Bad_message "Bad response header")
          in
          assert(real_end_pos = in_pos);
          let header = new Netmime.basic_mime_header header_l in
          if code >= 100 && code <= 199 then (
            call # private_api # set_continue();
            self # parse_status_line call
          )
          else (
            call # private_api # set_response_header header;
            self # parse_body code header call
          )

  method private parse_body code header call =
    (* Parses the whole HTTP body *)
    restart_parser <- self # parse_body code header;
    (* First determine whether a body is expected: *)
    let have_body =
      call # has_resp_body && code <> 204 && code <> 304 in
    if have_body then (
      let ch = call # private_api # response_body_open_wr() in
      (* Check if we have chunked encoding: *)
      let is_chunked =
        try header # field "Transfer-encoding" <> "identity"
        with Not_found -> false in
      if is_chunked then
        self # parse_chunked_body code header ch call
      else (
        let length_opt =
          try
            let l = Int64.of_string (header # field "Content-Length") in
            if l < 0L then raise (Bad_message "Bad Content-Length field");
            Some l
          with
            | Failure _ -> raise (Bad_message "Bad Content-Length field")
            | Not_found -> None in
        self # parse_plain_body code header ch length_opt call
      )
    )
    else
      self # parse_end call

  method private parse_plain_body code header ch length_opt call =
    (* Parses a non-chunked HTTP body. If length_opt=None, the message
     * is terminated by EOF. If length_opt=Some len, the message has
     * this length.
     *)
    restart_parser <- self # parse_plain_body code header ch length_opt;
    let av_len = B.length in_buf - in_pos in
    match length_opt with
      | None ->
          ch # really_output (B.unsafe_buffer in_buf) in_pos av_len;
          in_pos <- in_pos + av_len;
          assert(in_pos <= B.length in_buf);
          if in_eof then self # parse_end call
      | Some len ->
          let l = Int64.to_int (min (Int64.of_int av_len) len) in
          ch # really_output (B.unsafe_buffer in_buf) in_pos l;
          in_pos <- in_pos + l;
          assert(in_pos <= B.length in_buf);
          let len' = Int64.sub len (Int64.of_int l) in
          if len' > 0L then (
            if in_eof then
              raise (Bad_message "Response body too short");
            restart_parser <-
              self # parse_plain_body code header ch (Some len')
          ) else (
            self # parse_end call
          )

  method private parse_chunked_body code header ch call =
    (* Parses a chunk-size line, then the chunk (or the trailer) *)
    restart_parser <- self # parse_chunked_body code header ch;
    let b = B.unsafe_buffer in_buf in
    match Netstring_pcre.string_match chunk_re b in_pos with
      | None ->
          if B.length in_buf - in_pos > 5000 then
            raise (Bad_message "Cannot parse chunk of response body");
          if in_eof then
            raise (Bad_message "EOF where next response chunk expected")
      | Some m ->
          in_pos <- Netstring_pcre.match_end m;
          assert(in_pos <= B.length in_buf);
          let hex_len = Netstring_pcre.matched_group m 1 b in
          let len =
            try Int64.of_string ("0x" ^ hex_len)
            with Failure _ ->
              raise (Bad_message "Chunk too large") in
          (* Int64.of_string accepts hexadecimal values up to 2^64-1 and
           * maps the upper half to negative numbers. Such sizes are too
           * large as well, and would otherwise produce a negative output
           * length in [parse_chunk_data].
           *)
          if len < 0L then
            raise (Bad_message "Chunk too large");
          if len = 0L then
            self # parse_trailer code header ch call
          else
            self # parse_chunk_data code header ch len call

  method private parse_chunk_data code header ch len call =
    (* Parses the chunk data following the chunk size field *)
    restart_parser <- self # parse_chunk_data code header ch len;
    let av_len = B.length in_buf - in_pos in
    let l = Int64.to_int (min (Int64.of_int av_len) len) in
    ch # really_output (B.unsafe_buffer in_buf) in_pos l;
    in_pos <- in_pos + l;
    assert(in_pos <= B.length in_buf);
    let len' = Int64.sub len (Int64.of_int l) in
    if len' > 0L then (
      if in_eof then
        raise (Bad_message "Response chunk terminated by EOF");
      restart_parser <-
        self # parse_chunk_data code header ch len'
    ) else
      self # parse_chunk_end code header ch call

  method private parse_chunk_end code header ch call =
    (* Parses the CRLF after the chunk, and the next chunks *)
    restart_parser <- self # parse_chunk_end code header ch;
    let b = B.unsafe_buffer in_buf in
    match Netstring_pcre.string_match crlf_re b in_pos with
      | None ->
          if B.length in_buf - in_pos > 2 then
            raise (Bad_message "CR/LF after response chunk is missing");
          if in_eof then
            raise (Bad_message "EOF where next response chunk expected")
      | Some m ->
          in_pos <- Netstring_pcre.match_end m;
          assert(in_pos <= B.length in_buf);
          self # parse_chunked_body code header ch call

  method private parse_trailer code header ch call =
    (* Parses the trailer *)
    restart_parser <- self # parse_trailer code header ch;
    let b = B.unsafe_buffer in_buf in
    match Netstring_pcre.string_match line_end2_re b in_pos with
      | None ->
          if B.length in_buf - in_pos > 10000 then
            raise (Bad_message "Response trailer too large");
          if in_eof then
            raise (Bad_message "EOF where response trailer expected")
      | Some m ->
          let start = in_pos in
          in_pos <- Netstring_pcre.match_end m;
          assert(in_pos <= B.length in_buf);
          let trailer_l, real_end_pos =
            try
              Mimestring.scan_header
                ~downcase:false ~unfold:true ~strip:true b
                ~start_pos:start ~end_pos:in_pos
            with
              | Failure _ -> raise(Bad_message "Bad trailer") in
          assert(real_end_pos = in_pos);
          (* The trailer is simply added to the header: *)
          let new_header =
            new Netmime.basic_mime_header (header#fields @ trailer_l) in
          call # private_api # set_response_header new_header;
          self # parse_end call

  method private parse_end call =
    (* The message ends here! *)
    restart_parser <- self # parse_status_line; (* for the next message *)
    call # private_api # finish()

  (****************************** OUTPUT ********************************)

  val mutable string_to_send = ""
  val mutable send_buffer = String.create 8192
  val mutable send_position = 0
  val mutable send_length = 0

  method send_this_string s =
    string_to_send <- s;
    send_position <- 0;
    send_length <- String.length s

  method send_by_buffer f =
    (* [f buf pos len] fills [send_buffer] and returns the number of octets
     * it wrote. That many octets are then queued for sending. *)
    string_to_send <- send_buffer;
    send_position <- 0;
    send_length <- 0;
    let m = String.length send_buffer in
    let n = f send_buffer 0 m in
    send_length <- n;
    n

  method nothing_to_send =
    send_position = send_length

  method unix_write() =
    (* One send call; a partial write just advances [send_position] *)
    let n_to_send = send_length - send_position in
    let n =
      syscall (fun () ->
                 Unix.send fd string_to_send send_position n_to_send [])
    in
    send_position <- send_position + n

  method dump_send_buffer () =
    dlogr (fun () ->
             sprintf "FD %Ld - HTTP request body fragment:\n%s\n"
               (Netsys.int64_of_file_descr fd)
               (String.sub send_buffer 0 send_length)
          )
end
;;
(**********************************************************************)
(*** PROTOCOL STATE OF A SINGLE MESSAGE ***)
(**********************************************************************)
(* The class 'transmitter' is a container for the message and the
* associated protocol state, and defines the methods to send
* the request, and to receive the reply.
* The protocol state stored here mainly controls the send and receive
* buffers (e.g. how many octets have already been sent? Are the octets
* received so far already a complete message or not?)
*)
(* Protocol state of a single message (request sent / reply received) *)
type message_state =
  | Unprocessed     (* Not even a byte sent or received *)
  | Sending_hdr     (* Sending currently the header *)
  | Handshake       (* The header has been sent, now waiting for status 100 *)
  | Sending_body    (* The header has been sent, now sending the body *)
  | Sent_request    (* The body has been sent; the reply is being received *)
  | Complete        (* The whole reply has been received *)
  | Complete_broken (* The whole reply has been received, but the connection
                     * is in a state that does not permit further requests
                     *)
  | Broken          (* Garbage was received instead of a reply *)
;;
(* Transitions:
*
* (1) Normal transitions:
*
* Unprocessed -> Sending_hdr: Always done (a)
* Sending_hdr -> Handshake: Only if we are handshaking (a)
* Handshake -> Sending_body: When the 100 Continue status arrives (b)
* Sending_hdr -> Sending_body: usually (a)
* Sending_body -> Sent_request: usually (a)
* Sending_hdr -> Sent_request: if no body is sent (a)
* Sent_request -> Complete: when the response has arrived (b)
* Handshake -> Complete_broken: when the response arrives instead of 100 Continue (b)
*
* (2) Unusual transitions:
*
* (Unprocessed | Sending_hdr | Sending_body) -> Complete_broken:
*     The response arrives while/before sending the header/body (b)
* (any state) -> Complete_broken:
*     Parsing error in the response (b)
*
* (any state) -> Broken:
*     Garbage arrives (b)
*
* (a) transition is done by the sending side of the transmitter
* (b) transition is done by the receiving side of the transmitter
*)
(* Protocol state of a single message on a connection. It sends the
 * request of [m] through an [io_buffer] and feeds received octets to the
 * response parser, tracking progress in [state] (see [message_state]).
 * [f_done] is invoked by [postprocess]. [peer_is_proxy] selects the form
 * of the request URI and the persistency rules.
 *)
class transmitter
  peer_is_proxy
  (m : http_call)
  (f_done : http_call -> unit)
  options
  =
object (self)
  val mutable state = Unprocessed
  val indicate_done = f_done
  val msg = m
  val mutable auth_headers = []
  (* Additional header for _proxy_ authentication *)
  val mutable body = new Netchannels.input_string ""
  val mutable body_length = None
  (* Set by [init] from the Content-length header. While the body is sent
   * it counts the octets still to be sent. *)

  method state = state

  method f_done = indicate_done

  method init() =
    (* Prepare for (re)transmission:
     * - Set the `Effective request header
     * - Reset the status info of the http_call
     * - Initialize transmission state
     *)
    msg # private_api # prepare_transmission();
    (* Set the effective URI. This must happen before authentication. *)
    let eff_uri =
      if peer_is_proxy then
        msg # request_uri
      else
        let path = msg # get_path() in
        if path = "" then
          msg # empty_path_replacement
        else
          path
    in
    msg # private_api # set_effective_request_uri eff_uri;
    let ah =
      ( match msg # private_api # auth_state with
          | `None -> []
          | `In_advance session
          | `In_reply session ->
              session # authenticate msg
      ) in
    let rh = msg # request_header `Effective in
    List.iter
      (fun (n,v) ->
         rh # update_field n v
      )
      (auth_headers @ ah);
    state <- Unprocessed;
    body_length <- ( try
                       let s = rh # field "Content-length" in
                       Some(Int64.of_string s)
                     with Not_found -> None
                   );
    self # close_body();
    let have_body =
      msg # has_req_body &&
      (body_length = None || body_length <> Some 0L) in
    if not have_body then
      (* Remove certain headers *)
      rh # delete_field "Expect"

  method cleanup() =
    (* release resources *)
    self # close_body();
    msg # private_api # cleanup()

  method add_auth_header n v =
    auth_headers <- (n,v) :: auth_headers

  method private open_body() =
    body <- msg # request_body # open_value_rd()

  method close_body() =
    try body # close_in() with _ -> ()
    (* also called by [clear_write_queue] *)

  method send (io : io_buffer) do_handshake =
    (* do_handshake: If true, only the header is transmitted, and the
     * state transitions to [Handshake]. This flag is ignored if we have
     * already [Sending_body].
     *)
    assert
      (state = Unprocessed || state = Sending_hdr || state = Sending_body);
    (* First check whether we have to refill [string_to_send]: *)
    ( match state with
        | Unprocessed ->
            (* Fill [string_to_send] with the request line and the header. *)
            let buf = Buffer.create 1000 in
            let req_meth = msg # request_method in
            Buffer.add_string buf req_meth;
            Buffer.add_string buf " ";
            Buffer.add_string buf msg#effective_request_uri;
            let rh = msg # request_header `Effective in
            (* Setting Host even for proxies does not harm, and some
               cheap proxy implementations require it
             *)
            let host = msg # get_host() in
            let port = msg # get_port() in
            let host_str =
              host ^ (if port = 80 then "" else ":" ^ string_of_int port) in
            rh # update_field "Host" host_str;
            Buffer.add_string buf " HTTP/1.1";
            if options.verbose_status then
              dlogr
                (fun () ->
                   sprintf "FD %s - Call %d - HTTP request: %s"
                     io#socket_str (Oo.id m) (Buffer.contents buf));
            Buffer.add_string buf "\r\n";
            let ch = new Netchannels.output_buffer buf in
            Mimestring.write_header ch rh#fields;
            ch # close_out();
            if options.verbose_request_header then
              dump_header "HTTP request " rh#fields;
            io # send_this_string (Buffer.contents buf);
            state <- Sending_hdr
        | Sending_body when io # nothing_to_send ->
            ( try
                (* Number of body octets that may still be sent *)
                let remaining =
                  match body_length with
                    | None ->
                        max_int
                    | Some l ->
                        Int64.to_int
                          (min (Int64.of_int max_int) l) in
                if remaining = 0 then raise End_of_file;
                (* Never read more than [remaining] octets from the body.
                 * If the body is longer than its Content-length header, the
                 * excess must not be sent, because the server would take
                 * it as the beginning of the next request.
                 *)
                let n =
                  io # send_by_buffer
                    (fun s pos len -> body # input s pos (min len remaining)) in
                ( match body_length with
                    | None -> ()
                    | Some l ->
                        body_length <- Some(Int64.sub l (Int64.of_int n))
                );
                if options.verbose_request_contents then
                  io # dump_send_buffer()
              with
                | End_of_file ->
                    self # close_body();
                    if body_length = None then
                      io # close_out();
                    state <- Sent_request
                | error ->
                    self # close_body();
                    raise error
            )
        | _ ->
            ()
    );
    (* Send: *)
    if state = Sending_hdr || state = Sending_body then
      io # unix_write();
    (* Update state: *)
    if io # nothing_to_send then (
      match state with
        | Sending_hdr ->
            let have_body =
              msg # has_req_body &&
              (body_length = None || body_length <> Some 0L) in
            if have_body then (
              if do_handshake then
                state <- Handshake
              else
                state <- Sending_body;
              self # open_body()
            )
            else
              state <- Sent_request
        | _ ->
            ()
    )

  method receive (io : io_buffer) =
    (* This method is invoked if some additional octets have been received
     * that
     * - may begin this reply
     * - or continue this reply
     * - or make this reply complete
     * - or make this reply complete and begin another reply
     * It is checked if the reply is complete, and if so, it is recorded
     * and the octets forming the reply are removed from the input buffer.
     * Protocol errors put the message into [Complete_broken], garbage
     * into [Broken].
     *)
    try
      io # parse_response msg;
      ( match msg # status with
          | `Unserved ->
              if state = Handshake && msg # private_api # continue then
                state <- Sending_body
          | `Http_protocol_error e ->
              state <- Complete_broken;
              let e_msg =
                ( match e with
                    | Bad_message s -> ": Bad message: " ^ s
                    | _ -> ": " ^ Netexn.to_string e
                ) in
              if options.verbose_status then
                dlogr
                  (fun () ->
                     sprintf "FD %s - Call %d - HTTP status: protocol error %s"
                       io#socket_str (Oo.id m) e_msg)
          | _ ->
              if state = Sent_request then
                state <- Complete
              else (
                if options.verbose_status then
                  dlogr
                    (fun () ->
                       sprintf "FD %s - Call %d - HTTP status: Got response before request was completely sent"
                         io#socket_str (Oo.id m));
                state <- Complete_broken;
                io # close_out()
              );
              if options.verbose_status then
                msg # private_api # dump_status();
              if options.verbose_response_header then
                msg # private_api # dump_response_header();
              if options.verbose_response_contents then
                msg # private_api # dump_response_body()
      )
    with
      | Garbage_received s ->
          state <- Broken;
          if options.verbose_status then
            dlogr
              (fun () ->
                 sprintf "FD %s - Call %d - HTTP status: Garbage received: %s"
                   io#socket_str (Oo.id m) s)

  method handshake_timeout() =
    (* No 100 Continue in time: send the body anyway *)
    if state = Handshake then
      state <- Sending_body

  method indicate_pipelining =
    (* Return 'true' iff the reply is HTTP/1.1 compliant and does not
     * contain the 'connection: close' header, and the Server header
     * matches no entry of [pipeline_blacklist].
     *)
    let resp_header = msg # private_api # response_header in
    let proto_str = msg # private_api # response_proto in
    let b1 =
      try
        let proto = Nethttp.protocol_of_string proto_str in
        match proto with
          | `Http((1,n),_) when n >= 1 -> (* HTTP/1.1 <= proto < HTTP/2.0 *)
              let conn_list =
                try Nethttp.Header.get_connection resp_header
                with _ (* incl. syntax error *) -> [] in
              not (List.mem "close" conn_list)
          | _ ->
              false
      with _ -> false in
    b1 && (
      try
        let server = resp_header # field "Server" in
        not (List.exists
               (fun re ->
                  Netstring_pcre.string_match re server 0 <> None
               )
               pipeline_blacklist)
      with
        | Not_found -> true (* Nothing known ... Assume the best! *)
    )

  method indicate_sequential_persistency =
    (* Returns 'true' if persistency without pipelining
     * is possible.
     *)
    let resp_header = msg # private_api # response_header in
    let proto_str = msg # private_api # response_proto in
    let is_http_11 =
      try
        let proto = Nethttp.protocol_of_string proto_str in
        match proto with
          | `Http((1,n),_) when n >= 1 -> (* HTTP/1.1 <= proto < HTTP/2.0 *)
              true
          | _ ->
              false
      with _ -> false in
    let proxy_connection =
      List.map String.lowercase
        (resp_header # multiple_field "proxy-connection") in
    let connection =
      try Nethttp.Header.get_connection resp_header
      with _ (* incl. syntax error *) -> [] in
    let normal_persistency =
      not peer_is_proxy &&
      (not (List.mem "close" connection)) &&
      (is_http_11 || List.mem "keep-alive" connection) in
    let proxy_persistency =
      peer_is_proxy && List.mem "keep-alive" proxy_connection in
    normal_persistency || proxy_persistency

  method postprocess =
    self#cleanup();
    indicate_done msg

  method message = msg
end
;;
(**********************************************************************)
(**********************************************************************)
(**********************************************************************)
(*** ***)
(*** THE PROTOCOL STATE OF THE CONNECTION ***)
(*** ***)
(**********************************************************************)
(**********************************************************************)
(**********************************************************************)
(* This is the core of this HTTP implementation: The object controlling
* the state of the connection to a server (or a proxy).
*)
class connection the_esys
(peer_host,peer_port)
peer_is_proxy
(proxy_user,proxy_password)
auth_cache
conn_cache
conn_owner (* i.e. the pipeline coerced to < > *)
counters (* we count here only when connections close *)
the_options
=
object (self)
val mutable esys = the_esys
(* group: the Unixqueue event group of the current connection cycle.
 * [None] means that the connection is detached from the event system
 * (no events are received); [attach_to_esys] creates a new group.
 *)
val mutable group = None
(* timeout_groups: Unixqueue groups that must be deleted when the
 * connection is closed. These groups represent timeout conditions.
 *)
val mutable timeout_groups = []
(* io: the I/O buffer of the current socket. Initially a dummy (on
 * Unix.stdin, in state Down). A new io_buffer is created whenever a
 * socket becomes available (reused or freshly connected).
 *)
val mutable io =
new io_buffer the_options conn_cache Unix.stdin Down (* dummy *)
(* write_queue: transmitters whose request has not yet been completely
 * sent. read_queue: transmitters whose response is still outstanding.
 *)
val mutable write_queue = Q.create()
val mutable read_queue = Q.create()
(* Invariant: write_queue is a suffix of read_queue *)
(* polling_wr is 'true' iff the write side of the socket is currently
 * polled. (The read side is always polled.)
 *)
val mutable polling_wr = false
(* 'connecting': [Some eng] while the connect engine [eng] establishes
 * the connection in the background (the connect could not complete
 * immediately), otherwise [None].
 *)
val mutable connecting = None
(* 'connect_pause': seconds to wait until the next connection is tried.
 * There seems to be a problem with some operating systems (namely
 * Linux 2.2) which do not like immediate reconnects if the previous
 * connection did not end in a sane state.
 * A value of 0.5 seems to be sufficient for reconnects. cleanup_on_eof
 * sets it to 1.0 before reconnecting; it is reset to 0.0 when the next
 * connect actually starts.
 *)
val mutable connect_pause = 0.0
(* The following two variables control whether pipelining is enabled or
 * not. The problem is that it is unclear how old servers react if we
 * send to them several requests at once. The solution is that the first
 * "round" of requests and replies is done in a compatibility mode: After
 * the first request has been sent sending stops, and the client waits
 * until the reply is received. If the reply indicates HTTP/1.1 and does
 * not contain a "connection: close" header, all further requests and
 * replies will be performed in pipelining mode, i.e. the requests are
 * sent independent of whether the replies of the previous requests have
 * been received or not.
 *
 * sending_first_message: 'true' means that the first request has not yet
 * been completely sent to the server.
 * done_first_message: 'true' means that the reply to the first request
 * has arrived.
 *)
val mutable sending_first_message = true
val mutable done_first_message = false
(* 'inhibit_pipelining_byserver': becomes true if the server is able
 * to keep persistent connections but is not able to use pipelining.
 * (HTTP/1.0 "keep alive" connections)
 * It is never reset to false once it has become true.
 *)
val mutable inhibit_pipelining_byserver = false
(* 'connect_started': when the last 'connect' operation started.
 * 'connect_time': how many seconds the last 'connect' lasted
 * (0.0 if an inactive cached connection was reused).
 *)
val mutable connect_started = 0.0
val mutable connect_time = 0.0
(* If a connection does not lead to any type of response, the client
 * simply re-opens a new connection and tries again. The following
 * variables limit the number of attempts until the client gives up.
 *
 * 'totally_failed_connections': counts the number of failed connections
 * without any response from the server.
 *)
val mutable totally_failed_connections = 0
(* 'close_connection' indicates that a HTTP/1.0 response was received or
 * that a response contained a 'connection: close' header.
 *)
val mutable close_connection = false
(* Proxy authorization: If 'proxy_user' is non-empty, the variables
 * 'proxy_user' and 'proxy_password' are interpreted as user and
 * password for proxy authentication. More precisely, once the proxy
 * responds with status code 407, 'proxy_credentials_required' becomes
 * true, and all following requests will include the credentials identifying
 * the user (with the "basic" authentication method).
 * If the proxy responds again with code 407, this reaction will not
 * be handled again but will be visible to the outside.
 *
 * 'proxy_cookie': the Base64 encoding of user:password, computed lazily
 * in add_msg; the empty string means "not yet computed".
 *)
val mutable proxy_credentials_required = false
val mutable proxy_cookie = ""
(* 'critical_section': true if further additions to the queues and
 * every change of the internal state must be avoided. In this case,
 * additions are put into 'deferred_additions' and actually added later.
 *)
val mutable deferred_additions = Q.create()
val mutable critical_section = false
(* 'options': the options currently in effect (see set_options). *)
val mutable options = the_options
method length =
  (* Number of requests still waiting for their response, including the
   * ones added from callbacks that are parked in [deferred_additions].
   *)
  let parked = Q.length deferred_additions in
  Q.length read_queue + parked
method active =
  (* [true] iff at least one request is not yet finished *)
  0 < self # length
method add (m : http_call) f_done =
  (* Enqueue the call [m] (non-urgent); [f_done] is handed over to the
   * transmitter. The transmitter created by add_msg is not returned.
   *)
  let _trans = self # add_msg false m f_done in
  ()
method private add_msg ?(critical=critical_section) urgent m f_done =
  (* Creates the transmitter for [m] (with completion callback [f_done])
   * and enqueues it. In contrast to [add], the new transmitter is
   * returned.
   *
   * [critical] (default: the current value of [critical_section]):
   *   when [true] (e.g. because we are called from a callback) the
   *   transmitter is only put into [deferred_additions]; it is moved to
   *   the real queues later by [leave_critical_section].
   * [urgent]: when [true] and the write queue is not empty, the
   *   transmitter is inserted into both queues with [Q.add_at (n-1)]
   *   (n = length of the write queue) instead of being appended. This
   *   only has an effect for non-critical additions.
   *)
  let trans = new transmitter peer_is_proxy m f_done options in
  (* If proxy authentication is enabled, and it is already known that
   * the proxy demands authentication, add the necessary header fields:
   * (See also the code and the comments in method
   * 'postprocess_complete_message')
   *)
  if proxy_credentials_required && peer_is_proxy then begin
    (* If the cookie is not yet computed, do this now: *)
    if proxy_cookie = "" then
      proxy_cookie <- Netencoding.Base64.encode
          (proxy_user ^ ":" ^ proxy_password);
    (* Add the "proxy-authorization" header: *)
    trans # add_auth_header "proxy-authorization" ("Basic " ^ proxy_cookie)
  end;
  (* Check whether we can authenticate in advance: *)
  if m # private_api # auth_state = `None then (
    try
      let sess = auth_cache # find_session_in_advance m in
      m # private_api # set_auth_state (`In_advance sess)
    with
        Not_found -> ()
  );
  (* Initialize [trans] for transmission: (Must be after in-advance
   * authentication)
   *)
  trans # init();
  if critical then begin
    (* This 'add_msg' invocation was done in a callback. This may
     * interfere with other queue modifications, so only park it.
     *)
    Q.add trans deferred_additions;
  end
  else begin
    (* Use the queue module Q, like every other queue access here. *)
    let n = Q.length write_queue in
    if urgent && n > 0 then (
      (* Insert [trans] at the earliest possible place *)
      Q.add_at (n-1) trans write_queue;
      Q.add_at (n-1) trans read_queue;
    )
    else (
      Q.add trans write_queue;
      Q.add trans read_queue;
    );
    (* If there is currently no event group, we are detached from the
     * event system (i.e. not receiving events). Attach again.
     *)
    if group = None then self # attach_to_esys;
    (* Update the polling state. *)
    self # maintain_polling;
  end;
  trans
method leave_critical_section : unit =
  (* Flushes [deferred_additions] (transmitters added while a callback
   * was running) into the write and read queues. If anything was
   * moved, the connection is re-attached to the event system when it
   * is detached, and the polling state is refreshed.
   *)
  if Q.length deferred_additions > 0 then begin
    Q.iter
      (fun pending ->
         Q.add pending write_queue;
         Q.add pending read_queue)
      deferred_additions;
    Q.clear deferred_additions;
    if group = None then self # attach_to_esys;
    self # maintain_polling
  end
method private add_again m_trans =
  (* Re-submits the call carried by [m_trans] (as a consequence of
   * authorization). The addition is always critical (deferred until
   * leave_critical_section) and urgent. Returns the new transmitter.
   *)
  let call = m_trans # message in
  let on_done = m_trans # f_done in
  self # add_msg ~critical:true true call on_done
method set_options p =
  (* Replace the options used from now on *)
  options <- p
method private attach_to_esys =
(* Starts a new connection cycle: creates a fresh event group, resets
 * the per-connection state, and resolves [peer_host] with
 * [options.resolver].
 * - Resolution failure: the group is removed again and the queued
 *   messages go through the total-failure path of cleanup_on_eof with
 *   [Name_resolution_error peer_host].
 * - Success: an inactive connection to the peer is reused from
 *   [conn_cache] if there is one. Otherwise, after [connect_pause]
 *   seconds, a connect engine is started; it is aborted if it has not
 *   finished within [options.connection_timeout] seconds.
 * Precondition: detached ([group = None]) and the socket is Down.
 *)
assert (group = None);
assert (io#socket_state = Down);
let g = Unixqueue.new_group esys in
group <- Some g;
connecting <- None;
sending_first_message <- true;
done_first_message <- false;
close_connection <- false;
polling_wr <- false;
critical_section <- false;
(*
inhibit_pipelining_byserver <- false;
-- never reset once it is [true]! Reason: Failed connections are often
caused by missing pipelining capabilities of the server (e.g. IIS)
*)
(* Now check how to get a socket connection: *)
let timeout_value = options.connection_timeout in
options.resolver
esys
peer_host
(function
| None ->
(* Very early error! The name could not be resolved. *)
( match group with
Some g ->
Unixqueue.clear esys g; (* clean up esys *)
group <- None
| None -> ()
);
let err = Name_resolution_error peer_host in
self # cleanup_on_eof ~force_total_failure:true err;
counters.crashed_connections <-
counters.crashed_connections + 1;
self # leave_critical_section
| Some addr ->
( let peer = Unix.ADDR_INET(addr, peer_port) in
try
(* Prefer an inactive, already open connection to the same peer: *)
let fd = conn_cache # find_inactive_connection peer in
connect_time <- 0.0;
conn_cache # set_connection_state fd (`Active conn_owner);
io <- new io_buffer options conn_cache fd Up_rw;
Unixqueue.add_resource esys g (Unixqueue.Wait_in fd,
timeout_value);
Unixqueue.add_handler esys g (self # handler);
with
Not_found ->
(* No inactive connection found. Connect: *)
Unixqueue.once
esys
g
connect_pause
(fun () ->
(* g1 only holds the connect timeout; it is cleared when the
 * engine reaches a final state.
 *)
let g1 = Unixqueue.new_group esys in
connect_pause <- 0.0;
if options.verbose_connection then
dlog ("HTTP connection: Connecting to " ^
peer_host);
let eng =
Uq_engines.connector
(`Socket(`Sock_inet(Unix.SOCK_STREAM,
addr,
peer_port),
Uq_engines.default_connect_options))
esys in
connect_started <- Unix.gettimeofday();
connecting <- Some eng;
Uq_engines.when_state
~is_done:(function
| `Socket(s,_) ->
Unixqueue.clear esys g1;
self # connected
peer_host g s peer
| _ -> assert false
)
~is_error:(function
| err ->
Unixqueue.clear esys g1;
self # connect_error
peer_host g err
)
~is_aborted:(fun () ->
Unixqueue.clear esys g1;
connecting <- None
)
eng;
Unixqueue.once esys g1 timeout_value eng#abort
)
)
)
method private connected peer_host g s peer =
  (* Called when the connect engine succeeded with socket [s]: registers
   * the descriptor for debug tracking, records how long connecting
   * took, wraps [s] into a new io_buffer, and starts watching it for
   * input within the event group [g].
   *)
  Netlog.Debug.track_fd
    ~owner:"Http_client"
    ~descr:("HTTP to " ^ Netsys.string_of_sockaddr peer)
    s;
  let now = Unix.gettimeofday () in
  connect_time <- now -. connect_started;
  connecting <- None;
  if options.verbose_connection then begin
    let fd_id = Netsys.int64_of_file_descr s in
    dlog (sprintf "FD %Ld - HTTP connection to %s: Connected!"
            fd_id peer_host)
  end;
  options.configure_socket s;
  conn_cache # set_connection_state s (`Active conn_owner);
  io <- new io_buffer options conn_cache s Up_rw;
  Unixqueue.add_resource esys g
    (Unixqueue.Wait_in s, options.connection_timeout);
  Unixqueue.add_handler esys g (self # handler);
  self # maintain_polling
method private connect_error peer_host g err =
  (* Called when connecting failed with [err]. abort_connection cannot
   * be used because the connection state is only partially set up, so
   * the event group [g] is removed by hand. Then the regular failure
   * path follows: the connection is counted as crashed, and the queues
   * are handled as a total failure by cleanup_on_eof.
   *)
  connecting <- None;
  if options.verbose_connection then begin
    let line =
      match err with
        | Unix.Unix_error(code, _, _) ->
            sprintf
              "HTTP connection: Cannot connect to %s: Unix error %s"
              peer_host (Unix.error_message code)
        | other ->
            sprintf
              "HTTP connection: Cannot connect to %s: Exception %s"
              peer_host (Netexn.to_string other)
    in
    dlog line
  end;
  Unixqueue.clear esys g;
  group <- None;
  counters.crashed_connections <- counters.crashed_connections + 1;
  self # cleanup_on_eof ~force_total_failure:true err;
  self # leave_critical_section
method private maintain_polling =
(* Starts or stops polling the write side of the socket, depending on
 * whether there is something to write. It only acts while an event
 * group exists and the socket is not Down; polling_wr is updated to
 * the new state.
 *
 * If one of the following conditions is true, we need not poll
 * the write side of the socket:
 * - The write_queue is empty but the read_queue not
 * - The difference between the read and the write queue is too big
 *   (more than the allowed pipelining drift)
 * - We wait for the reply of the first request sent to a server
 * - The write side of the socket is closed
 * - The head of the read queue waits for the "100 Continue" handshake
 * Polling is also enabled when both queues are empty, so that the
 * output handler can release the idle connection.
 *)
if group <> None && io # socket_state <> Down then (
let timeout_value = options.connection_timeout in
let actual_max_drift =
if inhibit_pipelining_byserver then 0 else
match options.synchronization with
Pipeline drift -> min drift max_pipeline
| _ -> 0
(* 0: Allow no drift if pipelining is not allowed *)
in
let have_requests =
Q.length read_queue - Q.length write_queue <= actual_max_drift
&& Q.length write_queue > 0
&& (done_first_message || sending_first_message) in
(* Note: sending_first_message is true while the first request is sent.
 * done_first_message becomes true when the response arrived. In the
 * meantime both variables are false, and nothing is sent.
 *)
let do_close_output =
Q.length read_queue = 0 && Q.length write_queue = 0 in
(* If the socket is still up, we must send EOF. Normally, this
 * should have happened already, just to be sure we check this here.
 *)
let waiting_for_handshake =
Q.length read_queue > 0 &&
(Q.peek read_queue) # state = Handshake in
let do_poll_wr =
io#socket_state = Up_rw
&& ( ( have_requests && not waiting_for_handshake) ||
do_close_output ) in
if do_poll_wr && not polling_wr then (
let g = match group with
Some x -> x
| None -> assert false
in
Unixqueue.add_resource esys g (Unixqueue.Wait_out io#socket,
timeout_value
);
);
if not do_poll_wr && polling_wr then (
let g = match group with
Some x -> x
| None -> assert false
in
Unixqueue.remove_resource esys g (Unixqueue.Wait_out io#socket);
);
polling_wr <- do_poll_wr;
)
(* Conversely, all of the following conditions must be true to
* (re-)enable polling of the write side (see maintain_polling):
* - The write_queue is not empty, or
* both the write_queue and the read_queue are empty !!!CHECK!!!
* - The difference between the read and the write queue is small enough
* - We are sending the first request to a server, or we are pipelining
* - The write side of the socket is open
* - pe_waiting_for_status is false.
* - waiting_for_status100 is not `Waiting
* NOTE(review): maintain_polling does not test the last two names; it
* instead checks whether the head of the read queue is in state
* Handshake.
*)
method private clear_timeout g =
  (* Remove the timeout group [g] from the event system and forget it *)
  Unixqueue.clear esys g;
  timeout_groups <- List.filter ((<>) g) timeout_groups
method private abort_connection =
  (* Gives up the current connection. It is called when the connection
   * is in an erroneous state, and also when it is no longer needed.
   * A connect engine that is still running is aborted. If the socket
   * is up and an event group exists, the socket is closed (state Up_r)
   * or released (state Up_rw). Then the event group and all pending
   * timeout groups are removed from the event system.
   *)
  ( match connecting with
      | Some eng -> eng # abort()
      | None -> ()
  );
  let state = io # socket_state in
  match group with
    | Some g when state <> Down ->
        if options.verbose_connection then
          dlog (sprintf
                  "FD %s - HTTP connection: Shutdown!"
                  io#socket_str);
        ( match state with
            | Up_r -> io # close()
            | Up_rw -> io # release() (* hope this is right *)
            | Down -> ()
        );
        Unixqueue.clear esys g;
        polling_wr <- false;
        group <- None;
        List.iter (Unixqueue.clear esys) timeout_groups;
        timeout_groups <- []
    | _ ->
        ()
method private handler _ _ ev =
  (* Event dispatcher. Input, output readiness and timeout events of the
   * current event group are routed to the corresponding handle_*
   * method. Every other event is rejected, as is any event that arrives
   * while no group is attached (possible while shutting down the
   * socket).
   *)
  match group with
    | None ->
        raise Equeue.Reject
    | Some g ->
        ( match ev with
            | Unixqueue.Input_arrived (g0, _) when g0 = g ->
                self # handle_input
            | Unixqueue.Output_readiness (g0, _) when g0 = g ->
                self # handle_output
            | Unixqueue.Timeout (g0, _) when g0 = g ->
                self # handle_timeout
            | _ ->
                raise Equeue.Reject
        )
(**********************************************************************)
(*** THE TIMEOUT HANDLER ***)
(**********************************************************************)
method private handle_timeout =
  (* Nothing happened on the socket within the timeout period. This can
   * only happen for an established connection. The io buffer is marked
   * as timed out, and the input handler treats this like an EOF.
   *)
  io # set_timeout;
  self # handle_input
(**********************************************************************)
(*** THE INPUT HANDLER ***)
(**********************************************************************)
method private handle_input =
(* Input handler; it is also invoked on timeouts (via handle_timeout).
 * It reads the available data and parses it into the responses of the
 * transmitters in the read queue, postprocessing every completed call.
 * If the connection was shut down in the course of this (EOF, timeout,
 * I/O error, protocol error, or a regular close), the remaining queue
 * entries are handled by cleanup_on_eof. Finally, deferred additions
 * are flushed and the polling state is updated.
 * Exceptions raised by user callbacks during postprocessing are not
 * caught here.
 *)
(* Data have arrived on the 'socket'. First we receive as much as we
 * can; then the data are interpreted as sequence of messages.
 * This method is also called when the connection times out. In
 * this case [io # timeout] is set.
 *)
(* Ignore this event if the socket is Down (this may happen
 * if the input side is closed while there are still input
 * events in the queue):
 *)
if io#socket_state = Down then
raise Equeue.Reject;
if options.verbose_connection then
dlogr
(fun () ->
sprintf "FD %s - HTTP connection: Input event!" io#socket_str);
let _g = match group with
Some x -> x
| None -> assert false
in
let end_of_queueing = ref false in
(* 'end_of_queueing': becomes true whenever the connection has been
 * shut down during this invocation (EOF, timeout, I/O error, protocol
 * error, or a regular close). Then cleanup_on_eof is called below.
 *)
(************ ACCEPT THE RECEIVED OCTETS ************)
if not io # timeout then
begin try
io # unix_read();
with
Unix.Unix_error(Unix.EAGAIN,_,_) ->
(); (* Ignore! *)
| Unix.Unix_error(Unix.ECONNRESET,_,_) ->
if options.verbose_connection then
dlogr
(fun () ->
sprintf
"FD %s - HTTP connection: Connection reset by peer"
io#socket_str
);
self # abort_connection;
end_of_queueing := true;
counters.crashed_connections <-
counters.crashed_connections + 1;
| Unix.Unix_error(e,a,b) as err ->
if options.verbose_connection then
dlogr
(fun () ->
sprintf
"FD %s - HTTP connection: Unix error %s"
io#socket_str (Unix.error_message e));
counters.crashed_connections <-
counters.crashed_connections + 1;
self # abort_connection;
end_of_queueing := true;
(* This exception is reported to the message currently being read
 * only.
 *)
if Q.length read_queue > 0 then (
let t = Q.peek read_queue in
t # message # private_api # set_error_exception err
)
end;
if io # in_eof || io # timeout then begin
(* shutdown the connection, and clean up the event system: *)
if io # in_eof then (
if options.verbose_connection then
dlogr
(fun () ->
sprintf "FD %s - HTTP connection: Got EOF!" io#socket_str);
counters.server_eof_connections <-
counters.server_eof_connections + 1
);
if io # timeout then (
if options.verbose_connection then
dlogr
(fun () ->
sprintf "FD %s - HTTP connection: Connection timeout!"
io#socket_str);
counters.timed_out_connections <-
counters.timed_out_connections + 1;
);
self # abort_connection;
end_of_queueing := true
end;
(************ TRY TO INTERPRET THE OCTETS AS MESSAGES **************)
(* The [read_loop] parses the data in [io] and fills the
 * [read_queue]. It may raise the exceptions:
 * - Q.Empty: No message is expected, but there is data to
 *   parse (or the connection is at EOF)
 * It recurses only after a Complete response, i.e. as long as further
 * responses may follow on the same connection.
 *)
let rec read_loop() =
if io # has_unparsed_data then begin
let this = Q.peek read_queue in (* may raise Q.Empty *)
(*** Process 'this' ***)
this # receive io;
(* Parse received data for this message. Set this#state. *)
( match this # state with
| Unprocessed ->
(* We get response data before we even tried to send
 * the request. We allow this for pragmatic reasons.
 *)
if options.verbose_connection then (
if io # has_unparsed_bytes then
dlogr
(fun () ->
sprintf
"FD %s - HTTP connection: \
Got data of spontaneous response" io#socket_str);
if io # in_eof then
dlogr
(fun () ->
sprintf
"FD %s - HTTP connection: premature EOF"
io#socket_str);
);
()
| Sending_hdr ->
(* We get response data before we finished
 * the request. We allow this for pragmatic reasons.
 *)
assert (try Q.peek write_queue == this
with Q.Empty -> false);
if options.verbose_connection then (
if io # has_unparsed_bytes then
dlogr
(fun () ->
sprintf
"FD %s - HTTP connection: \
Got data of premature response" io#socket_str);
if io # in_eof then
dlogr
(fun () ->
sprintf
"FD %s - HTTP connection: premature EOF"
io#socket_str)
);
()
| Handshake
| Sending_body ->
(* This is perfectly legal: We have sent the request header,
 * and the server may directly reply with whatever.
 *)
assert (try Q.peek write_queue == this
with Q.Empty -> false);
()
| Sent_request ->
(* Somewhere in the middle of the response... *)
()
| Complete ->
(* The response has been received, and the connection
 * is still usable
 *)
ignore (Q.take read_queue);
if Q.length write_queue >= 1 &&
Q.peek write_queue == this then
ignore (Q.take write_queue);
(* Initialize for next request/response: *)
let able_to_pipeline =
this # indicate_pipelining in
(* able_to_pipeline: is true if we assume that the server
 * is HTTP/1.1-compliant and thus is able to manage pipelined
 * connections.
 * Update first 'close_connection': This variable becomes
 * true if the connection is not assumed to be pipelined
 * which forces that the CLIENT closes the connection
 * immediately (see the code in the output handler).
 *)
let only_sequential_persistency =
not able_to_pipeline &&
this # indicate_sequential_persistency in
(* only_sequential_persistency: is true if the connection is
 * HTTP/1.0, and the server indicated a persistent connection.
 * In this case, pipelining is disabled.
 *)
if only_sequential_persistency then begin
(* 'close_connection': not set.
 *)
if options.verbose_connection then
dlogr
(fun () ->
sprintf "FD %s - HTTP connection: \
using HTTP/1.0 style persistent connection"
io#socket_str);
inhibit_pipelining_byserver <- true;
end
else
close_connection <- close_connection || not able_to_pipeline;
if close_connection || options.inhibit_persistency then (
self # abort_connection;
end_of_queueing := true;
(* We do not count this event - it is regular! *)
);
(* Remember that the first request/reply round is over: *)
done_first_message <- true;
(* postprocess 'this' (may raise exceptions! (callbacks)) *)
this # cleanup();
self # postprocess_complete_message this;
read_loop()
| Complete_broken ->
(* The response has been received, but the connection
 * is in a problematic state, and must be aborted.
 *)
if options.verbose_connection then
dlogr
(fun () ->
sprintf "FD %s - HTTP connection: \
Aborting the invalidated connection"
io#socket_str);
self # abort_connection;
end_of_queueing := true;
counters.crashed_connections <-
counters.crashed_connections + 1;
(* If the response has a proper status, we can remove it
 * from the queue. Otherwise leave it on the queue, so
 * it can be scheduled again.
 *)
( match this # message # status with
| `Unserved
| `Http_protocol_error _ ->
() (* leave it *)
| _ ->
(* Remove the message from the queues: We must
 * also check write_queue, because we have the
 * invariant that write_queue is a suffix of
 * read_queue.
 *)
ignore (Q.take read_queue);
if Q.length write_queue >= 1 &&
Q.peek write_queue == this then
ignore (Q.take write_queue);
(* postprocess 'this' (Exceptions! (callbacks)) *)
this # cleanup();
self # postprocess_complete_message this;
);
(* Do not continue [read_loop] *)
| Broken ->
(* Simply stop here. *)
if options.verbose_connection then
dlogr
(fun () ->
sprintf "FD %s - HTTP connection: \
Aborting the errorneuos connection"
io#socket_str);
self # abort_connection;
end_of_queueing := true;
counters.crashed_connections <-
counters.crashed_connections + 1;
);
end
in (* of "let rec read_loop() = " *)
begin try
(* Start the interpretation formulated in 'read_loop' and catch
 * exceptions.
 *)
read_loop();
with
| Q.Empty ->
(* Either we hit EOF, or there are additional bytes but no
 * message is expected:
 *)
if io # has_unparsed_bytes then begin
(* No more responses expected, but still octets to interpret.
 * This is a protocol error, too.
 *)
if options.verbose_connection then
dlogr
(fun () ->
sprintf "FD %s - HTTP connection: \
Extra octets -- aborting connection"
io#socket_str);
self # abort_connection;
end_of_queueing := true;
counters.crashed_connections <-
counters.crashed_connections + 1;
end
end;
(************** CLOSE THE CONNECTION IF NECESSARY, ****************)
(************** AND PREPARE RECONNECTION ****************)
if !end_of_queueing then begin
assert (group = None);
self # cleanup_on_eof (Bad_message "Incomplete or missing response")
end;
(*************** UPDATE THE POLLING STATE **************)
(* If there were 'add' invocations from callbacks, move these additions
 * to the real queues now.
 *)
self # leave_critical_section;
(* Update polling state: *)
self # maintain_polling;
(************** CLOSE THE CONNECTION IF NECESSARY, ****************)
(************** AND PREPARE RECONNECTION ****************)
method private cleanup_on_eof ?(force_total_failure=false) err : unit =
(* Called after the connection has been shut down (precondition:
 * [group = None]). It decides what happens to the requests that are
 * still queued: they are either resent over a new connection, or
 * dropped and postprocessed. It also updates the connection counters.
 * [force_total_failure]: count this connection as a total failure even
 *   if a status line was seen (used for connect and name resolution
 *   errors).
 * [err]: the error recorded for the remaining unserved calls when
 *   options.maximum_connection_failures total failures are reached.
 *)
assert (group = None);
(* If the socket is closed, it is necessary to check whether all
 * requests sent so far have got their replies.
 * Cases:
 * - write_queue and read_queue are empty: all is done.
 * - write_queue and read_queue have the same number of elements:
 *   reconnect
 * - else: some replies are missing. The requests are NOT tried again
 *   by default because the operations might not be idempotent.
 *   The messages carry a flag with them indicating whether reconnection
 *   is allowed or not.
 * It is not possible that the write_queue is longer than the read_queue.
 *)
(* First check if the connection was a total failure, i.e. if not
 * even a status line was received. In this case
 * increase the counter for totally failed connections. If the
 * counter reaches the limit, all messages on the queues are discarded.
 *)
if force_total_failure || not io#status_seen then begin
(* It was a total failure. *)
totally_failed_connections <- totally_failed_connections + 1;
if options.verbose_connection then
dlog "HTTP connection: total failure";
if totally_failed_connections >= options.maximum_connection_failures then
begin
(* Set the error exception of all remaining messages, and
 * clear the queues.
 *)
self # clear_read_queue err;
self # clear_write_queue ();
(* Reset state variables *)
totally_failed_connections <- 0;
(* Simply continue with the following piece of code, which then
 * finds empty queues and only updates the counters.
 *)
end
end
else (
(* This connection breaks the series of total failures (if there
 * was such a series).
 *)
totally_failed_connections <- 0;
(* Turn pipelining off to be on the safe side: *)
inhibit_pipelining_byserver <- true;
);
(* Now examine the queues, and decide what to do. *)
let n_read = Q.length read_queue in
let n_write = Q.length write_queue in
if n_read > 0 || n_write > 0 then begin
assert (n_read >= n_write);
assert (group = None);
connect_pause <- 1.0;
(* ASSERTION:
 * read_queue = q1 . q2
 * write_queue = q2
 * i.e. the extra elements of the read queue are at the beginning
 * of the read queue.
 *
 * PLAN: Make that
 * read_queue = q2 . q1'
 * write_queue = q2 . q1'
 * where q1' are the elements of q1 for which a reconnection is
 * allowed. Reset the error exception for these elements.
 * For the other elements (q1 \ q1') leave the error
 * exception as it is, but change every No_exception into
 * No_reply.
 *
 * Note: q1 = empty is possible.
 *)
for i = 1 to n_read - n_write do
let m_trans = Q.take read_queue in
let m = m_trans # message in
m_trans # cleanup(); (* release resources *)
(* Increase error counter *)
let e = m # private_api # get_error_counter in
m # private_api # set_error_counter (e+1);
(* Test: Are reconnections allowed? *)
if e+1 <= options.maximum_message_errors then begin
let do_reconnect =
match m # get_reconnect_mode with
| Send_again -> true
| Request_fails -> false
| Send_again_if_idem -> m # is_idempotent
| Inquire f ->
(* Ask the function 'f' whether to reconnect or not: *)
begin
try f m (* returns true or false *)
with
(* The invocation of 'f' may raise an exception.
 * It is logged with dlog (there is no other
 * way to report it), and no reconnect happens.
 *)
x ->
dlog ("Exception caught in Http_client: "
^ (Netexn.to_string x));
false
end
in
if do_reconnect then begin
(* Ok, this request is tried again. *)
m_trans # init(); (* Reinit call *)
Q.add m_trans write_queue; (* ... add it to the queue of open *)
Q.add m_trans read_queue; (* ...to the unfinished requests. *)
end
else begin
(* Drop this message because reconnection is not allowed *)
(* If status of the message is unset, change it into No_reply.
 *)
( match m # status with
| `Unserved ->
m # private_api # set_error_exception No_reply;
| _ ->
()
);
(* We do not reconnect, so postprocess now. *)
self # critical_postprocessing m_trans;
end
end
else (
(* drop this message because of too many errors *)
(* We do not reconnect, so postprocess now. *)
( match m # status with
| `Unserved ->
m # private_api # set_error_exception No_reply;
| _ ->
()
);
self # critical_postprocessing m_trans;
)
done;
let n_read = Q.length read_queue in
let n_write = Q.length write_queue in
assert (n_read = n_write);
(* It is now possible that n_read = n_write = 0, in which case
 * no more is to do, or that there are remaining requests.
 *)
if n_write > 0 then begin
assert (Q.peek read_queue == Q.peek write_queue);
(* Force reinitialisation of all queue elements: *)
Q.iter (fun m -> m#init()) read_queue;
(* Process the queues: *)
self # attach_to_esys;
end
else (
if options.verbose_connection then
dlog "HTTP connection: Nothing left to do";
counters.failed_connections <- counters.failed_connections + 1
)
end else (
(* n_read = 0 && n_write = 0 *)
counters.successful_connections <- counters.successful_connections + 1
)
method clear_read_queue err =
  (* Drops every transmitter of the read queue. Each one releases its
   * resources, and [err] is recorded as error of every call that has
   * not been served yet. Then each dropped call is postprocessed
   * (callbacks). Inside a critical section, the postprocessing is not
   * run immediately; it is scheduled with a 0-second Unixqueue timer
   * on a new group instead.
   *)
  let dropped = Q.create () in
  Q.transfer read_queue dropped;
  Q.iter
    (fun trans ->
       trans # cleanup ();
       match trans # message # status with
         | `Unserved ->
             trans # message # private_api # set_error_exception err
         | _ ->
             ())
    dropped;
  let run_callbacks () =
    Q.iter (fun trans -> self # critical_postprocessing trans) dropped in
  if not critical_section then
    run_callbacks ()
  else
    Unixqueue.once esys (Unixqueue.new_group esys) 0.0 run_callbacks
method clear_write_queue () =
  (* Releases the resources of every request that has not been
   * completely sent, then empties the write queue.
   *)
  Q.iter (fun pending -> pending # cleanup ()) write_queue;
  Q.clear write_queue
method critical_postprocessing m =
  (* Runs [m # postprocess] (which may invoke user callbacks) with
   * [critical_section] set, so that queue additions made by those
   * callbacks are deferred. The flag is reset afterwards, also when
   * postprocess raises; in that case the exception is re-raised.
   *)
  critical_section <- true;
  ( try m # postprocess
    with error ->
      critical_section <- false;
      raise error
  );
  critical_section <- false
(**********************************************************************)
(*** THE OUTPUT HANDLER ***)
(**********************************************************************)
method private handle_output =
(* Output handler: the socket is writable. It sends data of the request
 * at the head of the write queue (at most one send call per event),
 * arms the "100 Continue" handshake timer if needed, and removes
 * requests that are completely sent. If both queues are empty
 * afterwards, the connection is released and counted as successful.
 * Finally, the polling state is updated.
 *)
(* Ignore this event if the socket is not Up_rw (this may happen
 * if the output side is closed while there are still output
 * events in the queue):
 *)
if io#socket_state <> Up_rw then
raise Equeue.Reject;
if options.verbose_connection then
dlogr
(fun () ->
sprintf "FD %s - HTTP connection: Output event!"
io#socket_str);
let _g = match group with
Some x -> x
| None -> assert false
in
(* Leave the write_loop by exceptions:
 * - Q.Empty: No more request to write
 *)
let rec write_loop () =
let this = Q.peek write_queue in (* or Q.Empty *)
begin match this # state with
(Unprocessed | Sending_hdr | Sending_body) ->
let rh = this # message # request_header `Effective in
(* if no_persistency, set 'connection: close' *)
if options.inhibit_persistency && this # state = Unprocessed then
rh # update_field "connection" "close";
(* If a "100 Continue" handshake is requested,
 * transmit only the header of the request; the transmitter then
 * enters state Handshake. Furthermore, add a timeout handler
 * that calls handshake_timeout if the transmitter is still
 * waiting after options.handshake_timeout seconds.
 *)
let do_handshake =
try
(* Proper parsing not required, because [Expect] is
 * set by the user.
 * [continue]: Already seen status 100
 *)
not (this # message # private_api # continue) &&
String.lowercase(rh # field "expect") = "100-continue"
with
Not_found -> false
in
( try
this # send io do_handshake;
(* If a handshake is requested: set the variable and the
 * timer.
 *)
if do_handshake && this # state = Handshake
then (
let timeout = options.handshake_timeout in
let tm = Unixqueue.new_group esys in
timeout_groups <- tm :: timeout_groups;
Unixqueue.once
esys tm timeout
(fun () ->
( try
let this' = Q.peek write_queue in
let still_waiting =
this == this' && this # state = Handshake in
if still_waiting then (
this # handshake_timeout();
self # maintain_polling;
);
with Q.Empty -> ()
);
self # clear_timeout tm;
);
if options.verbose_connection then
dlogr
(fun () ->
sprintf "FD %s - HTTP connection: \
waiting for 100 CONTINUE"
io#socket_str);
);
with
Unix.Unix_error(Unix.EPIPE,_,_) ->
(* Broken pipe: This can happen if the server decides
 * to close the connection in the same moment when the
 * client wants to send another request after the
 * connection has been idle for a period of time.
 * Reaction: Close our side of the connection, too,
 * and open a new connection. The current request will
 * be silently resent because it is sure that the
 * request was not received completely; it does not
 * matter whether the request is idempotent or not.
 *
 * Broken pipes are very unlikely because this must
 * happen between the 'select' and 'write' system calls.
 * The [read] syscall will get ECONNRESET, so we do
 * nothing here.
 *)
if options.verbose_connection then
dlogr
(fun () ->
sprintf "FD %s - HTTP connection: broken pipe"
io#socket_str);
()
| Unix.Unix_error(Unix.EAGAIN,_,_) ->
()
| Unix.Unix_error(e,a,b) ->
if options.verbose_connection then
dlogr
(fun () ->
sprintf "FD %s - HTTP connection: Unix error %s"
io#socket_str (Unix.error_message e));
(* Hope the input handler will do the right thing *)
()
);
if sending_first_message && this # state = Sent_request then
sending_first_message <- false;
if this # state = Sent_request then
ignore (Q.take write_queue);
(* Do not call write_loop: Otherwise other handlers would
 * not have any chance to run
 *)
| Handshake ->
(* Should normally not happen. *)
()
| (Sent_request | Complete | Complete_broken) ->
(* If Complete_broken, we also have Up_r. *)
ignore (Q.take write_queue);
if io # socket_state = Up_rw then
write_loop() (* continue with the next message *)
| Broken ->
(* This case is fully handled by the input handler *)
()
end
in
begin try
write_loop()
with
Q.Empty -> ()
end;
(* Release the connection if the queues have become empty *)
if (Q.length write_queue = 0 && Q.length read_queue = 0) then (
self # abort_connection;
counters.successful_connections <- counters.successful_connections + 1
);
self # maintain_polling;
(**********************************************************************)
(*** AUTHENTICATION ***)
(**********************************************************************)
method private postprocess_complete_message msg_trans =
  (* Invoked for every complete reply. Authentication-related status codes
   * are handled here; everything else ends in the default action
   * [self # critical_postprocessing msg_trans], which incorporates the
   * remaining logic.
   *
   * - 407 (proxy authorization required): handled only when the request
   *   did not already carry a "proxy-authorization" header, the peer was
   *   contacted as a proxy, and a proxy user name is configured. Then
   *   [proxy_credentials_required] is set (clearing [proxy_cookie] if the
   *   flag was not yet set), and the message is queued again with
   *   [self # add_again]. The authentication header fields themselves are
   *   added in the 'add' method. The 'proxy-authenticate' method/realm of
   *   the response is not evaluated: the method is always "basic" and
   *   realms are not distinguished. In every other 407 case the default
   *   action runs.
   * - 401 (server authorization required): if the message has no auth
   *   state or an [`In_advance] one, or if its [`In_reply] session agrees
   *   to continue after [invalidate], a new session is requested from
   *   [auth_cache]. When one is obtained, it is stored as [`In_reply]
   *   auth state and the message is queued again. Otherwise (including a
   *   session that gave up, which is reported as failed) the default
   *   action runs.
   * - 200..399: an [`In_reply] session is reported to [auth_cache] as
   *   successful, then the default action runs.
   *
   * Redirection codes are not acted upon here, because this object
   * represents the connection to exactly one server.
   *)
  let default_action () = self # critical_postprocessing msg_trans in
  let trace message =
    if options.verbose_status then dlogr message in
  let msg = msg_trans # message in
  let code = msg # private_api # response_code in
  let req_hdr = msg # request_header `Effective in
  let _response_header = msg # private_api # response_header in
  match code with
    | 407 ->
        let has_proxy_authorization =
          try
            ignore (req_hdr # field "proxy-authorization");
            true
          with Not_found -> false in
        if has_proxy_authorization then begin
          (* Credentials were sent already and were rejected. *)
          trace
            (fun () ->
               sprintf "Call %d - HTTP auth: proxy authentication \
                        required again" (Oo.id msg_trans));
          default_action()
        end
        else if not peer_is_proxy then begin
          (* The peer was not contacted as a proxy, yet it demanded proxy
           * authorization: regard this as an intrusion. *)
          trace
            (fun () ->
               sprintf "Call %d - HTTP auth: intrusion by proxy \
                        authentication" (Oo.id msg_trans));
          default_action()
        end
        else begin
          trace
            (fun () ->
               sprintf "Call %d - HTTP auth: proxy authentication \
                        required" (Oo.id msg_trans));
          if proxy_user = "" then begin
            (* No user/password pair: we cannot authorize ourselves. *)
            trace
              (fun () ->
                 sprintf "Call %d - HTTP auth: user/password missing"
                   (Oo.id msg_trans));
            default_action()
          end
          else begin
            if not proxy_credentials_required then begin
              proxy_credentials_required <- true;
              proxy_cookie <- ""
            end;
            trace
              (fun () ->
                 sprintf "Call %d - HTTP auth: proxy credentials \
                          added" (Oo.id msg_trans));
            ignore (self # add_again msg_trans)
          end
        end
    | 401 ->
        let may_retry =
          match msg # private_api # auth_state with
            | `None | `In_advance _ ->
                true
            | `In_reply sess ->
                (* An earlier attempt with [sess] failed. *)
                let keep_going = sess # invalidate msg in
                if not keep_going then auth_cache # tell_failed_session sess;
                keep_going
        in
        if not may_retry then
          default_action()
        else begin
          match auth_cache # create_session msg with
            | None ->
                (* No session could be created: authentication failed *)
                default_action()
            | Some sess ->
                msg # private_api # set_auth_state (`In_reply sess);
                ignore (self # add_again msg_trans)
        end
    | n when 200 <= n && n < 400 ->
        ( match msg # private_api # auth_state with
            | `In_reply session ->
                auth_cache # tell_successful_session session
            | `None | `In_advance _ ->
                ()
        );
        default_action()
    | _ ->
        default_action()
(**********************************************************************)
(*** RESET COMPLETELY ***)
(**********************************************************************)
(* [reset] is called by the pipeline object to shutdown any processing *)
method reset =
  (* [reset] is the shutdown hook used by the pipeline object.
   * Steps:
   * - Close the socket and clear the Unixqueue ([abort_connection]).
   * - Discard both message queues; read-queue messages get [No_reply].
   * - Reset [totally_failed_connections].
   * - Drop messages added from callbacks ([deferred_additions]). The
   *   still [`Unserved] ones get [No_reply] as error exception, and all
   *   of them are passed to [critical_postprocessing].
   * Because [reset] might be called from a critical section, those
   * callbacks are deferred to a zero-delay timer on [esys] (see also
   * [clear_read_queue]).
   *)
  self # abort_connection;
  self # clear_read_queue No_reply;
  self # clear_write_queue();
  totally_failed_connections <- 0;
  Queue.iter
    (fun trans ->
       let m = trans # message in
       match m # status with
         | `Unserved -> m # private_api # set_error_exception No_reply
         | _ -> ())
    deferred_additions;
  (* Move the additions out; this also empties [deferred_additions]. *)
  let dropped = Queue.create() in
  Queue.transfer deferred_additions dropped;
  let group = Unixqueue.new_group esys in
  Unixqueue.once esys group 0.0
    (fun () ->
       Queue.iter (fun trans -> self # critical_postprocessing trans) dropped)
end
;;
(**********************************************************************)
(**********************************************************************)
(**********************************************************************)
(*** ***)
(*** THE PIPELINE INTERFACE ***)
(*** ***)
(**********************************************************************)
(**********************************************************************)
(**********************************************************************)
(* The following class, 'pipeline', defines the interface to the outside
* world.
*)
(* [pipeline] keeps, per effective peer (the origin server or the proxy), a
 * list of [connection] objects. It distributes submitted requests among
 * them, follows redirections, and runs the event system [esys].
 *)
class pipeline =
object (self)
  val mutable esys = Unixqueue.create_unix_event_system()
  (* Proxy host name; "" means that no proxy is used. *)
  val mutable proxy = ""
  val mutable proxy_port = 80
  val mutable proxy_auth = false
  val mutable proxy_user = ""
  val mutable proxy_password = ""
  (* Entries are host names (exact match) or domains (leading '.', suffix
   * match) that are contacted directly, bypassing the proxy. *)
  val mutable no_proxy_for = []
  (* Keyed by (peer, peer_port, use_proxy); the value is the list of
   * connections currently managed for that peer. *)
  val mutable connections = Hashtbl.create 10
  val mutable open_messages = 0
  val mutable open_connections = 0
  val mutable options =
    { (* Default values: *)
      synchronization = Pipeline 5;
      maximum_connection_failures = 2;
      maximum_message_errors = 2;
      inhibit_persistency = false;
      connection_timeout = 300.0;
      number_of_parallel_connections = 2;
      maximum_redirections = 10;
      handshake_timeout = 1.0;
      resolver = sync_resolver;
      configure_socket = (fun _ -> ());
      verbose_status = true;
      verbose_request_header = false;
      verbose_response_header = false;
      verbose_request_contents = false;
      verbose_response_contents = false;
      verbose_connection = true;
    }
  val auth_cache = new auth_cache
  val mutable conn_cache = create_restrictive_cache()
  val counters =
    { new_connections = 0;
      timed_out_connections = 0;
      crashed_connections = 0;
      server_eof_connections = 0;
      successful_connections = 0;
      failed_connections = 0;
    }

  method event_system = esys

  method set_event_system new_esys =
    (* Connections created so far are forgotten. Their completion
     * callbacks (see [add_with_callback_no_redirection]) can no longer
     * find themselves in [connections] and therefore never decrement
     * [open_connections]. The counters are reset here so that they agree
     * with the now empty table. *)
    esys <- new_esys;
    Hashtbl.clear connections;
    open_connections <- 0;
    open_messages <- 0

  method connection_cache = conn_cache

  method set_connection_cache cc = conn_cache <- cc

  method add_authentication_method ( m : auth_method ) =
    self # add_auth_handler (m # as_auth_handler)

  method add_auth_handler (h : auth_handler) =
    auth_cache # add_auth_handler h

  method set_proxy the_proxy the_port =
    (* proxy="": disables proxy *)
    proxy <- the_proxy;
    proxy_port <- the_port;
    ()

  method set_proxy_auth user passwd =
    (* sets 'user' and 'password' if demanded by a proxy *)
    proxy_auth <- user <> "";
    proxy_user <- user;
    proxy_password <- passwd

  method avoid_proxy_for l =
    (* l: List of hosts or domains *)
    no_proxy_for <- l

  method set_proxy_from_environment() =
    (* Configure the proxy from the environment variable "http_proxy" (if
     * it parses as an http URL; Not_found from parsing is ignored). Proxy
     * credentials are set only when both user and password are present.
     * Then "no_proxy" is split at commas into the bypass list. *)
    let http_proxy =
      try Sys.getenv "http_proxy" with Not_found -> "" in
    begin try
      let (user,password,host,port,_path) = parse_http_url http_proxy in
      self # set_proxy (Netencoding.Url.decode host) port;
      match user, password with
        | Some user_s, Some password_s ->
            self # set_proxy_auth
              (Netencoding.Url.decode user_s)
              (Netencoding.Url.decode password_s)
        | _ ->
            ()
    with
      Not_found -> ()
    end;
    let no_proxy =
      try Sys.getenv "no_proxy" with Not_found -> "" in
    let no_proxy_list =
      split_words_by_commas no_proxy in
    self # avoid_proxy_for no_proxy_list

  method reset () =
    (* deletes all pending requests; closes connection *)
    (* Reset all connections: *)
    Hashtbl.iter
      (fun _ cl ->
         List.iter (fun c -> c # reset) !cl)
      connections;
    (*
      - well, this _should_ do nothing
      List.iter
        (fun fd ->
          conn_cache # forget_connection fd;
          Unix.close fd;
        )
        (conn_cache # find_my_connections (self :> < >));
    *)
    self # reset_counters()

  method private add_with_callback_no_redirection (request : http_call) f_done =
    (* Queue [request] on a connection to the effective peer (the proxy
     * unless disabled or bypassed via [no_proxy_for]). A new connection is
     * opened while fewer than [options.number_of_parallel_connections]
     * exist for that peer (and always when none exists); otherwise the
     * connection with the shortest queue is chosen. On completion the
     * bookkeeping is updated and [f_done] is called with the message. *)
    let host = request # get_host() in
    let port = request # get_port() in
    (* Does the [no_proxy_for] entry [dom] cover [host]? An entry with a
     * leading '.' is a domain suffix; any other entry must equal the host
     * name. Host names are compared case-insensitively in both cases. *)
    let excluded_by dom =
      let ld = String.length dom in
      let lh = String.length host in
      if dom <> "" && dom.[0] = '.' && lh > ld then
        String.lowercase (String.sub host (lh - ld) ld)
        = String.lowercase dom
      else
        String.lowercase dom = String.lowercase host
    in
    let use_proxy =
      proxy <> "" &&
      request # proxy_enabled &&
      not (List.exists excluded_by no_proxy_for)
    in
    (* find out the effective peer: *)
    let peer, peer's_port =
      if use_proxy then proxy, proxy_port else host, port in
    let key = (peer, peer's_port, use_proxy) in
    let conn =
      let connlist =
        try
          Hashtbl.find connections key
        with
          Not_found ->
            let new_connlist = ref [] in
            Hashtbl.add connections key new_connlist;
            new_connlist
      in
      match !connlist with
        | first :: rest
            when List.length !connlist
                 >= options.number_of_parallel_connections ->
            (* All slots in use: take the connection with the fewest
             * queue entries. *)
            List.fold_left
              (fun best_conn a_conn ->
                 if a_conn # length < best_conn # length then
                   a_conn
                 else
                   best_conn)
              first
              rest
        | _ ->
            (* A free slot, or no connection at all yet (this also covers
             * a limit <= 0, which previously failed in List.hd). *)
            let new_conn = new connection
                             esys
                             (peer, peer's_port)
                             use_proxy
                             (proxy_user, proxy_password)
                             auth_cache
                             conn_cache
                             (self :> < >)
                             counters
                             options in
            open_connections <- open_connections + 1;
            counters.new_connections <- counters.new_connections + 1;
            connlist := new_conn :: !connlist;
            new_conn
    in
    (* Add the request to the queue of this connection: *)
    conn # add request
      (fun m ->
         (* Update 'open_connections', 'connections', and 'open_messages' *)
         if not conn#active then begin
           (* The connection may already have been removed from the
            * [connections] hash (e.g. by [set_event_system]). *)
           let connlist =
             try Hashtbl.find connections key with Not_found -> ref [] in
           if List.exists (fun c -> c == conn) !connlist then begin
             open_connections <- open_connections - 1;
             connlist := List.filter (fun c -> c != conn) !connlist;
             match !connlist with
               | [] -> Hashtbl.remove connections key
               | _ -> ()
           end
         end;
         self # update_open_messages;
         (* Do user action: *)
         f_done m);
    open_messages <- open_messages + 1

  method private update_open_messages =
    (* Recompute [open_messages] as the total queue length of all active
     * connections. *)
    open_messages <- 0;
    Hashtbl.iter
      (fun _ cl ->
         List.iter
           (fun c ->
              if c # active then
                open_messages <- open_messages + (c # length))
           !cl)
      connections

  method add_with_callback (request : http_call) f_done =
    (* Like [add_with_callback_no_redirection], but follows 301/302
     * replies as permitted by the message's redirect mode and by
     * [options.maximum_redirections]. [f_done] is called with the final
     * message. It is invoked outside the exception handler below, so an
     * exception raised by [f_done] itself can no longer cause a second
     * invocation of [f_done]. *)
    self # add_with_callback_no_redirection
      request
      (fun m ->
         (* Does the redirect mode permit following the redirection?
          * Only evaluated for 301/302 replies. *)
         let redirect_allowed () =
           match m # get_redirect_mode with
             | Redirect -> true
             | Do_not_redirect -> false
             | Redirect_if_idem -> m # is_idempotent
             | Redirect_inquire f ->
                 (* Ask [f]. An exception raised by [f] can only be
                  * reported by logging it. *)
                 (try f m (* returns true or false *)
                  with
                    x ->
                      dlog (sprintf
                              "Call %d - \
                               Exception caught in Http_client %s"
                              (Oo.id m)
                              (Netexn.to_string x));
                      false)
         in
         (* Resubmit [m] to the URI of its "location" header. Returns true
          * if [m] was resubmitted; false if [m] must be passed to [f_done]
          * as is (with an error exception set where appropriate).
          * Raises Not_found if there is no "location" header. *)
         let resubmit () =
           let rc = m # private_api # get_redir_counter in
           if rc >= options.maximum_redirections then begin
             m # private_api # set_error_exception Too_many_redirections;
             false
           end
           else begin
             let location = m # assoc_resp_header "location" in
             let location' =
               if location <> "" && location.[0] = '/' then
                 (* The "Location" header must be absolute per the RFC;
                  * some servers send an absolute path instead. Interpret
                  * it relative to the old server. *)
                 let host = m # get_host() in
                 let port = m # get_port() in
                 "http://" ^ host ^
                 (if port = 80 then "" else ":" ^ string_of_int port) ^
                 location
               else
                 location in
             let ok =
               try
                 m # set_request_uri location';
                 true
               with
                 | _ ->
                     (* Bad URL! *)
                     m # private_api # set_error_exception
                       (URL_syntax_error location');
                     false
             in
             if ok then begin
               m # private_api # set_redir_counter (rc+1);
               m # private_api # set_error_counter 0;
               self # add_with_callback m f_done;
               true
             end
             else
               false
           end
         in
         let redirected =
           try
             let (_,code,_) = m # dest_status() in
             (code = 301 || code = 302)
             && redirect_allowed ()
             && resubmit ()
           with
             Http_protocol _ | Not_found -> false
         in
         if not redirected then f_done m)

  method add request =
    self # add_with_callback request (fun _ -> ())

  method run () =
    (* Runs through the requests in the pipeline. If a request can be
     * fulfilled, i.e. the server sends a response, the status of the
     * request is set and the request is removed from the pipeline.
     * If a request cannot be fulfilled (no response, bad response,
     * network error), an exception is raised and the request remains in
     * the pipeline (and is even the head of the pipeline).
     *
     * Exception Broken_connection:
     * - The server has closed the connection before the full request
     *   could be sent. It is unclear if something happened or not.
     *   The application should figure out the current state and
     *   retry the request.
     * - Also raised if only parts of the response have been received
     *   and the server closed the connection. This is the same problem.
     *   Note that this can only be detected if a "content-length" has
     *   been sent or "chunked encoding" was chosen. Should normally
     *   work for persistent connections.
     * - NOT raised if the server forces a "broken pipe" (normally
     *   indicates a serious server problem). The intention of
     *   Broken_connection is that retrying the request will probably
     *   succeed.
     *
     * NOTE(review): Broken_connection is not among the exceptions
     * declared at the top of this file; this description may be
     * outdated -- confirm against the current error reporting.
     *)
    Unixqueue.run esys

  method get_options = options

  method set_options p =
    options <- p;
    Hashtbl.iter
      (fun _ cl ->
         List.iter (fun c -> c # set_options p) !cl)
      connections

  method number_of_open_messages = open_messages

  method number_of_open_connections = open_connections

  method connections =
    (* One (peer, port, queue length) triple per managed connection. *)
    let l = ref [] in
    Hashtbl.iter
      (fun (peer, port, _) conns ->
         List.iter
           (fun conn ->
              l := (peer, port, conn#length) :: !l)
           !conns)
      connections;
    !l

  method cnt_new_connections = counters.new_connections
  method cnt_timed_out_connections = counters.timed_out_connections
  method cnt_crashed_connections = counters.crashed_connections
  method cnt_server_eof_connections = counters.server_eof_connections
  method cnt_successful_connections = counters.successful_connections
  method cnt_failed_connections = counters.failed_connections

  method reset_counters() =
    counters.new_connections <- 0;
    counters.timed_out_connections <- 0;
    counters.crashed_connections <- 0;
    counters.server_eof_connections <- 0;
    counters.successful_connections <- 0;
    counters.failed_connections <- 0
end
;;
(**********************************************************************)
(**********************************************************************)
(**********************************************************************)
(*** ***)
(*** THE CONVENIENCE MODULE ***)
(*** ***)
(**********************************************************************)
(**********************************************************************)
(**********************************************************************)
(* This module is intended for beginners and for simple applications
* of this HTTP implementation.
*)
let omtp = !Netsys_oothr.provider
let mutex = omtp # create_mutex()

(* [serialize f arg] evaluates [f arg] while holding the module-wide
 * [mutex]. The mutex is released when [f arg] returns, and also before an
 * exception raised inside the protected region is re-raised.
 *)
let serialize f arg =
  let release () = mutex # unlock() in
  mutex # lock();
  try
    let value = f arg in
    release ();
    value
  with
    exn -> release (); raise exn
;;
module Convenience =
struct
  (* Trial count passed to [request] by the GET, HEAD and PUT helpers;
   * POST and DELETE pass 1.
   * NOTE(review): [request] ignores its trial count, so no retry is
   * actually performed -- confirm whether retries were intended. *)
  let http_trials = ref 3
  (* Fallback credentials, used when the URL carries no user name. *)
  let http_user = ref ""
  let http_password = ref ""
  (* Credentials extracted from the URL by the latest [prepare_url]. *)
  let this_user = ref ""
  let this_password = ref ""
  (* Set to true by [http_verbose]. *)
  let conv_verbose = ref false

  (* Answers every key inquiry with the URL credentials if a URL user is
   * set, else with the fallback credentials if those are set; otherwise
   * raises Not_found. The key's realm is the first offered realm. The
   * key methods read the refs when they are called, not at inquiry time.
   *)
  class simple_key_handler : key_handler =
  object
    method inquire_key ~domain ~realms ~auth =
      let key_from user_ref password_ref =
        ( object
            method user = !user_ref
            method password = !password_ref
            method realm = List.hd realms
            method domain = domain
          end ) in
      if !this_user <> "" then
        key_from this_user this_password
      else if !http_user <> "" then
        key_from http_user http_password
      else
        raise Not_found
    method invalidate_key (_ : key) = ()
  end

  let auth_basic =
    new basic_auth_handler
      ~enable_auth_in_advance:true (new simple_key_handler)

  (* NOTE(review): despite its name, [auth_digest] is constructed with
   * [basic_auth_handler], exactly like [auth_basic]; this module thus
   * registers no digest handler. It was probably meant to be a digest
   * handler -- confirm and fix. *)
  let auth_digest =
    new basic_auth_handler
      ~enable_auth_in_advance:true (new simple_key_handler)

  (* A fresh pipeline, configured from the "http_proxy"/"no_proxy"
   * environment variables, with [auth_basic] and [auth_digest]. *)
  let get_default_pipe() =
    let p = new pipeline in
    p # set_proxy_from_environment();
    List.iter (fun h -> p # add_auth_handler h) [ auth_basic; auth_digest ];
    p

  (* The shared pipeline, created on first use. *)
  let pipe = lazy (get_default_pipe())
  (* False while a request submitted by [request] has not completed. *)
  let pipe_empty = ref true

  (* Run [m] through the shared pipeline (serialized by [mutex]). If an
   * earlier request did not complete, the pipeline is reset first. The
   * trial count is accepted but not used. *)
  let request m =
    serialize
      (fun _trials ->
         let p = Lazy.force pipe in
         if not !pipe_empty then p # reset();
         p # add_with_callback m (fun _ -> pipe_empty := true);
         pipe_empty := false;
         p # run())

  (* Remove credentials from an http URL: the decoded user (and password,
   * if any) are stored in [this_user]/[this_password], and the URL is
   * rebuilt as "http://host:port" followed by the path. When parsing
   * raises Not_found, [url] is returned unchanged (with [this_user]
   * cleared). *)
  let prepare_url =
    serialize
      (fun url ->
         try
           this_user := "";
           let (user, password, host, port, path) = parse_http_url url in
           ( match user with
               | None -> ()
               | Some user_s ->
                   this_user := Netencoding.Url.decode user_s;
                   this_password := "";
                   ( match password with
                       | None -> ()
                       | Some password_s ->
                           this_password := Netencoding.Url.decode password_s
                   )
           );
           String.concat "" [ "http://"; host; ":"; string_of_int port; path ]
         with
           Not_found -> url)

  let http_get_message url =
    let msg = new get (prepare_url url) in
    request msg !http_trials;
    msg

  let http_get url =
    let msg = http_get_message url in
    msg # get_resp_body()

  let http_head_message url =
    let msg = new head (prepare_url url) in
    request msg !http_trials;
    msg

  let http_post_message url params =
    let msg = new post (prepare_url url) params in
    request msg 1;
    msg

  let http_post url params =
    let msg = http_post_message url params in
    msg # get_resp_body()

  let http_put_message url content =
    let msg = new put (prepare_url url) content in
    request msg !http_trials;
    msg

  let http_put url content =
    let msg = http_put_message url content in
    msg # get_resp_body()

  let http_delete_message url =
    let msg = new delete (prepare_url url) in
    request msg 1;
    msg

  let http_delete url =
    let msg = http_delete_message url in
    msg # get_resp_body()

  (* Turn on every verbose option of the shared pipeline, and enable the
   * module's debug log. *)
  let http_verbose =
    serialize
      (fun () ->
         let p = Lazy.force pipe in
         let opt = p # get_options in
         p # set_options
           { opt with
               verbose_status = true;
               verbose_request_header = true;
               verbose_response_header = true;
               verbose_request_contents = true;
               verbose_response_contents = true;
               verbose_connection = true
           };
         conv_verbose := true;
         Debug.enable := true)
end