Plasma GitLab Archive
Projects Blog Knowledge

(* $Id: nethttpd_reactor.ml 2195 2015-01-01 12:23:39Z gerd $
 *
 *)

(*
 * Copyright 2005 Baretta s.r.l. and Gerd Stolpmann
 *
 * This file is part of Nethttpd.
 *
 * Nethttpd is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * Nethttpd is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Nethttpd; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *)

(* Debug switch of this module. *)
module Debug = struct
  (* Shared flag handed to the [Netlog.Debug] loggers and to the module
   * registration below. Initially [false].
   *)
  let enable = ref false
end

(* Debug loggers for this module, both created with the module name and
 * the [Debug.enable] flag. In this file only [dlogr] is used, and it is
 * always called with a function that builds the message string.
 *)
let dlog = Netlog.Debug.mk_dlog "Nethttpd_reactor" Debug.enable
let dlogr = Netlog.Debug.mk_dlogr "Nethttpd_reactor" Debug.enable

(* Register the module name together with its debug flag in [Netlog.Debug] *)
let () =
  Netlog.Debug.register_module "Nethttpd_reactor" Debug.enable


open Nethttp
open Nethttp.Header
open Nethttpd_types
open Nethttpd_kernel
open Netchannels
open Printf

(* Configuration consumed by the request processing code of this module.
 * It extends the protocol-level configuration of [Nethttpd_kernel].
 *)
class type http_processor_config =
object
  inherit Nethttpd_kernel.http_protocol_config
  method config_timeout_next_request : float
    (* Passed as [~block] to the protocol cycle while the protocol is
     * waiting for the next message (see the private [cycle] method of
     * [http_reactor]). Presumably seconds - TODO confirm.
     *)
  method config_timeout : float
    (* Passed as [~block] to the protocol cycle in all other situations.
     * Presumably seconds - TODO confirm.
     *)
  method config_cgi : Netcgi.config
    (* Becomes the [config] of every [http_environment] *)
  method config_error_response : error_response_params -> string
    (* Produces the body text of an error response (used by
     * [logged_error_response])
     *)
  method config_log_error : request_info -> string -> unit
    (* Called with the request description and the message for every
     * logged error
     *)
  method config_log_access : full_info -> unit
    (* Called with the full request/response description for access
     * logging
     *)
  method config_tls_cert_props : bool
    (* If true, [http_environment] adds the SSL_SERVER_* and SSL_CLIENT_*
     * certificate properties to the CGI properties of TLS requests
     *)
  method config_tls_remote_user : bool
    (* If true, [http_environment] tries to derive REMOTE_USER from the
     * X509 peer credentials of TLS requests
     *)
end

(* Processor configuration plus the synchronization level of the reactor. *)
class type http_reactor_config =
object
  inherit http_processor_config
  method config_reactor_synch : [ `Connection | `Close | `Flush | `Write ]
    (* Controls when [http_reactor_output] waits until the queued response
     * data are transmitted:
     *  - `Write: after every [output] call, on [flush], and when the
     *    channel is closed (or after [send_file])
     *  - `Flush: on [flush], and when the channel is closed
     *  - `Close: only when the channel is closed
     *  - `Connection: the output channel never waits by itself
     *)
end


(* The environment as seen inside this module: the public
 * [extended_environment] plus a few methods needed by the reactor.
 *)
class type internal_environment =
object
  inherit extended_environment

  method unlock : unit -> unit
    (* Lifts the lock; in [http_environment] a locked environment rejects
     * [send_output_header] and [send_file]
     *)
  method req_method : http_method
    (* In [http_environment]: the pair of request method and request URI *)
  method response : http_response
    (* The response object of the protocol kernel for this request *)
  method log_access : unit -> unit
    (* Writes the access log entry (at most once in [http_environment]) *)
end


(* The request object returned by the [next_request] method of
 * [http_reactor]. The descriptions below follow
 * [http_reactive_request_impl].
 *)
class type http_reactive_request =
object
  method environment : extended_environment
    (* The environment of the request *)
  method accept_body : unit -> unit
    (* Unlocks the input channel, the output channel and the environment;
     * a pending 100-continue expectation is answered first
     *)
  method reject_body : unit -> unit
    (* Drops the request body (the input channel reports EOF), unlocks
     * the output channel and the environment, and remembers the rejection
     * for access logging
     *)
  method finish : unit -> unit
    (* Calls [finish_request], and then completes the response: a missing
     * response is replaced by a server error, an open output channel is
     * closed
     *)
  method finish_request : unit -> unit
    (* Skips the rest of the request. Only the first call has an effect. *)
end


(* [get_this_host addr] converts a socket address into a pair
 * [(host, port_opt)]. For Internet sockets the host is the textual IP
 * address and the port is returned as [Some port]. Unix domain sockets
 * have neither, so the result is the empty string and [None].
 *)
let get_this_host addr =
  match addr with
    | Unix.ADDR_INET(inet, port_number) ->
        let host = Unix.string_of_inet_addr inet in
        (host, Some port_number)
    | Unix.ADDR_UNIX _ ->
        (* questionable: there is no sensible host name for such sockets *)
        ("", None)


(* [logged_error_response] logs an error and optionally sends the
 * corresponding error response.
 *
 *  - [fd_addr], [peer_addr]: socket addresses reported in the log info
 *  - [req_meth_uri_opt]: request method and URI if known
 *  - [in_cnt]: size of the request body read so far
 *  - [req_rej]: whether the request body was rejected (access log only)
 *  - [status]: the HTTP status of the error
 *  - [hdr_opt]: response header to send; an empty header if [None]
 *  - [msg_opt]: if present, the message is passed to [config_log_error]
 *  - [env_opt]: environment providing the input header, the CGI properties
 *    and the TLS properties if available
 *  - [resp_opt]: if present, the error response is sent with this response
 *    object, and an access log entry is written
 *
 * The accessors of the info object for data that is not available raise
 * [Not_found] at the time they are called.
 *)
let logged_error_response fd_addr peer_addr req_meth_uri_opt 
                          in_cnt req_rej status hdr_opt msg_opt env_opt 
                          resp_opt config =
  let require opt =
    match opt with
      | Some v -> v
      | None -> raise Not_found in
  let msg =
    match msg_opt with
      | Some m -> m
      | None -> "" in
  let code = int_of_http_status status in
  let info =
    ( object
        method server_socket_addr = fd_addr
        method remote_socket_addr = peer_addr
        method tls_session_props =
          match env_opt with
            | Some env -> env # tls_session_props
            | None -> None
        method request_method = fst(require req_meth_uri_opt)
        method request_uri = snd(require req_meth_uri_opt)
        method input_header = (require env_opt) # input_header
        method cgi_properties = (require env_opt) # cgi_properties
        method input_body_size = in_cnt
        method response_status_code = code
        method error_message = msg
      end
    ) in
  (* The error log is only written when the caller passed a message *)
  ( match msg_opt with
      | Some _ -> config # config_log_error (info:>request_info) msg
      | None -> ()
  );
  match resp_opt with
    | Some resp ->
        let body = config # config_error_response info in
        let eff_header = 
          match hdr_opt with
            | Some hdr -> hdr
            | None -> new Netmime.basic_mime_header [] in
        Nethttpd_kernel.send_static_response 
          resp status (Some eff_header) body;
        (* The response is on its way, so this is also an access *)
        let body_size = Int64.of_int (String.length body) in
        let full =
          new create_full_info
            ~response_status_code:code
            ~request_body_rejected:req_rej
            ~output_header:eff_header
            ~output_body_size:body_size
            (info:>request_info) in
        config # config_log_access full
    | None ->
        ()


(* A placeholder info object: every accessor raises [Not_found] when it is
 * called, except [tls_session_props], which returns [None]. The exceptions
 * are raised at method call time, not when this object is created.
 * NOTE(review): no use of [no_info] is visible in this part of the file.
 *)
let no_info =
  ( object
      method server_socket_addr = raise Not_found
      method remote_socket_addr = raise Not_found
      method tls_session_props = None
      method request_method = raise Not_found
      method request_uri = raise Not_found
      method input_header = raise Not_found
      method cgi_properties = raise Not_found
      method input_body_size = raise Not_found
    end
  )


(* Table of the distinguished name attributes that [gen_dn_props] exports
 * as separate properties. Each entry pairs the attribute (as defined in
 * [Netx509.DN_attributes]) with the suffix appended to the property name.
 *)
let cert_atts =
  [ Netx509.DN_attributes.at_commonName, "_CN";
    Netx509.DN_attributes.at_emailAddress, "_Email";    
    Netx509.DN_attributes.at_organizationName, "_O";
    Netx509.DN_attributes.at_organizationalUnitName, "_OU";
    Netx509.DN_attributes.at_countryName, "_C";
    Netx509.DN_attributes.at_stateOrProvinceName, "_SP";
    Netx509.DN_attributes.at_localityName, "_L";
  ]


(* [gen_dn_props dn prefix] returns the properties describing the
 * distinguished name [dn]. The first property is [prefix ^ "_DN"] with
 * the string form of the whole name. It is followed by one property per
 * entry of [cert_atts] for which the name has a value; the property is
 * called [prefix ^ "_DN" ^ suffix]. Attributes that cannot be found in
 * the name are silently left out.
 *)
let gen_dn_props dn prefix =
  let base = prefix ^ "_DN" in
  let prop_of_attribute (oid, suffix) =
    try
      [ base ^ suffix, Netx509.lookup_dn_ava_utf8 dn oid ]
    with
      | Not_found -> [] in
  let attribute_props =
    List.concat (List.map prop_of_attribute cert_atts) in
  (base, dn#string) :: attribute_props


(* [gen_cert_props cert prefix] returns three properties of the
 * certificate [cert]: the serial number in hexadecimal notation
 * ([prefix ^ "_M_SERIAL"]), and the start and the end of the validity
 * period as Internet dates for time zone 0 ([prefix ^ "_V_START"] and
 * [prefix ^ "_V_END"]).
 *)
let gen_cert_props cert prefix =
  let key suffix = prefix ^ suffix in
  let internet_date t = Netdate.mk_internet_date ~zone:0 t in
  let serial_hex = Netencoding.to_hex (cert#serial_number) in
  let not_before = internet_date cert#valid_not_before in
  let not_after = internet_date cert#valid_not_after in
  [ key "_M_SERIAL", serial_hex;
    key "_V_START", not_before;
    key "_V_END", not_after;
  ]


(* The environment of a single request as it is handed to the service.
 *
 * Arguments:
 *  - [proc_config]: the processor configuration
 *  - [req_meth], [req_uri], [req_version], [req_hdr]: the decoded request
 *    line and the request header
 *  - [fd_addr], [peer_addr]: the socket addresses of the connection
 *  - [in_ch], [in_cnt]: the input channel for the request body, and the
 *    counter of the body bytes read so far
 *  - [out_ch], [output_state]: the output channel for the response body,
 *    and the shared state variable of the response
 *  - [resp]: the response object of the protocol kernel
 *  - [after_send_file]: called by [send_file] after the file response is
 *    set up
 *  - [reqrej]: shared flag saying whether the request body was rejected
 *  - [fdi]: the descriptor number, only used in debug messages
 *  - [tls_props]: the TLS session properties, or [None] without TLS
 *
 * Creating the object raises [Standard_response] with status
 * `Bad_request if the [Host] header is missing where it is required, or
 * if it cannot be decoded.
 *
 * The environment is created in locked state; [send_output_header] and
 * [send_file] fail until [unlock] is called.
 *)
class http_environment (proc_config : #http_processor_config)
                       req_meth req_uri req_version req_hdr 
                       fd_addr peer_addr
                       in_ch in_cnt 
                       out_ch output_state resp after_send_file
		       reqrej fdi
                       tls_props
                      : internal_environment =

  (* Decode important input header fields: *)
  let (in_host, in_port_opt) =
    (* Host and port of the [Host] header *)
    try get_host req_hdr
    with 
      | Not_found -> 
	  (* For HTTP/1.1 and later this is not allowed. For earlier protocols, we
	   * just fill in the IP address that accepted the request. 
	   *)
	  (* NOTE(review): the guard below only matches major version 1. A
	   * request with a major version above 1 would take the fallback
	   * branch. Confirm that the protocol kernel never passes such
	   * versions to this class.
	   *)
	  ( match req_version with
	      | `Http((1,n),_) when n>= 1 ->
		  raise(Standard_response(`Bad_request, 
				        None,
				        Some "Nethttpd: Bad request: [Host] header is missing"))
	      | _ ->
		  get_this_host fd_addr
	  )
      | Bad_header_field _ ->
	  raise(Standard_response(`Bad_request,
                                   None,
			        Some "Nethttpd: Bad request: Cannot decode [Host] header")) in

  (* Split the request URI into the script name and the query string *)
  let (script_name, query_string) = decode_query req_uri in

(*
  let full_uri =
    "http://" ^ in_host ^ 
    (match in_port with Some n -> ":" ^ string_of_int n | None -> "") ^ 
    req_uri
 *)

object(self)
  inherit empty_environment

  (* While locked, [send_output_header] and [send_file] fail *)
  val mutable locked = true

  (* The properties reported as [cgi_properties] in the error and access
   * log. Initially the same as [properties]; replaced by [log_props].
   *)
  val mutable logged_props = []

  val out_state = (output_state : output_state ref)

  (* Fill the inherited instance variables of [empty_environment] and
   * compute the CGI properties of the request
   *)
  initializer (
    config <- proc_config # config_cgi;
    in_header <- req_hdr;
    in_channel <- in_ch;
    out_channel <- out_ch;
    protocol <- req_version;
    properties <- 
      [ "GATEWAY_INTERFACE",   "Nethttpd/0.0";
	"SERVER_SOFTWARE",     "Nethttpd/0.0";
	"SERVER_NAME",         in_host;
	"SERVER_PROTOCOL",     string_of_protocol req_version;
	"REQUEST_METHOD",      req_meth;
	"SCRIPT_NAME",         script_name;
	(* "PATH_INFO",        ""; *)
	(* "PATH_TRANSLATED",  ""; *)
	"QUERY_STRING",        query_string;
	(* "REMOTE_HOST",      ""; *)
	"REMOTE_ADDR",         fst(get_this_host peer_addr);
	(* "AUTH_TYPE",        ""; *)
	(* "REMOTE_USER",      ""; *)
	(* "REMOTE_IDENT",     ""; *)
	"HTTPS",               (if tls_props = None then "off" else "on");
	"REQUEST_URI",         req_uri;
      ] @
        (* SERVER_PORT is only set if a port is known *)
        ( match in_port_opt with
	    | Some p -> [ "SERVER_PORT", string_of_int p ]
	    | None   -> []
        ) @
        (* The SSL_* properties are only set for TLS connections *)
        ( match tls_props with
            | None -> []
            | Some (p:Nettls_support.tls_session_props) ->
                 (* see http://httpd.apache.org/docs/2.4/mod/mod_ssl.html
                    for a reference. Not everything is supported here, though.
                  *)
                 [ "SSL_PROTOCOL", p#protocol;
                   "SSL_VERSION_LIBRARY", "Nethttpd/0.0";
                   "SSL_CIPHER", p#kx_algo ^ "-" ^ p#cipher_algo ^ "-" ^ 
                                   p#mac_algo;
                   "SSL_SESSION_ID", Netencoding.to_hex p#id
                 ] 
                 @
                 (* REMOTE_USER: only on request, only for X509 peer
                    credentials, and only if a user name can be found
                  *)
                 (if proc_config # config_tls_remote_user then
                    match p # peer_credentials with
                      | `X509 cert ->
                           ( try
                               let name =
                                 Nettls_support.get_tls_user_name p in
                               [ "REMOTE_USER", name ]
                             with Not_found -> []
                           )
                      | _ -> []

                  else
                    []
                 )
                 @
                 (* Certificate details of both endpoints, only on request *)
                 (if proc_config # config_tls_cert_props then
                    ( match p # endpoint_credentials with
                        | `X509 cert ->
                             gen_dn_props cert#subject "SSL_SERVER_S"
                             @ gen_dn_props cert#issuer "SSL_SERVER_I"
                             @ gen_cert_props cert "SSL_SERVER"
                        | _ -> []
                    ) @
                    ( match p # peer_credentials with
                        | `X509 cert ->
                             gen_dn_props cert#subject "SSL_CLIENT_S"
                             @ gen_dn_props cert#issuer "SSL_CLIENT_I"
                             @ gen_cert_props cert "SSL_CLIENT"
                        | _ -> []
                    )
                  else
                    []
                 )
        );
    logged_props <- properties;
    tls_session_props <- tls_props;
  )

  (* Lifts the lock, so that the response can be started *)
  method unlock() =
    dlogr (fun () -> sprintf "FD=%Ld: env.unlock" fdi);
    locked <- false

  method server_socket_addr = fd_addr
  method remote_socket_addr = peer_addr

  method response = resp
  method req_method = (req_meth, req_uri)

  (* Status code and header of the response as they were sent. Both are
   * only recorded for the access log.
   *)
  val mutable sent_status = 0
  val mutable sent_resp_hdr = new Netmime.basic_mime_header []

  (* Sends the status line and the response header if the response has not
   * yet been started, and switches the output state to `Sending. Fails if
   * the environment is still locked.
   *)
  method send_output_header() =
    dlogr (fun () -> sprintf "FD=%Ld: env.send_output_header out_state=%s"
	     fdi (string_of_output_state !out_state));
    if locked then failwith "Nethttpd_reactor: channel is locked";
    if !out_state = `Start then (
      (* The response status is encoded in the [Status] pseudo header *)
      let (code, phrase) = status_of_cgi_header out_header in
      resp # send (`Resp_status_line(code, phrase));
      (* Create a copy of the header without [Status]: *)
      let h = new Netmime.basic_mime_header out_header#fields in
      h # delete_field "Status";
      sent_status <- code;   (* remember what has been sent for access logging *)
      sent_resp_hdr <- h;
      resp # send (`Resp_header h);
      out_state := `Sending;
    )
      (* netcgi2 may call send_output_header several times, so we have to 
         ignore the later calls after the first one
       *)

  (* Responds with [length] bytes read from the descriptor [fd]. This is
   * only possible before the response has been started. Fails if the
   * environment is still locked. Finally, the [after_send_file] callback
   * is run.
   *)
  method send_file fd length =
    dlogr (fun () -> sprintf "FD=%Ld: env.send_file out_state=%s" 
	     fdi (string_of_output_state !out_state));
    if locked then failwith "Nethttpd_reactor: channel is locked";
    if !out_state <> `Start then
      failwith "send_file";
    (* The response status is encoded in the [Status] pseudo header *)
    let (code, phrase) = status_of_cgi_header out_header in
    let status = http_status_of_int code in
     (* Create a copy of the header without [Status]: *)
    let h = new Netmime.basic_mime_header out_header#fields in
    h # delete_field "Status";
    sent_status <- code;   (* remember what has been sent for access logging *)
    sent_resp_hdr <- h;
    send_file_response resp status (Some h) fd length;
    out_state := `Sending;
    after_send_file()

  (* Passes the message [s] together with a description of this request
   * to [config_log_error]
   *)
  method log_error s =
    let info =
      ( object 
	  method server_socket_addr = fd_addr
	  method remote_socket_addr = peer_addr
	  method request_method = req_meth
	  method request_uri = req_uri
	  method input_header = req_hdr
	  method cgi_properties = logged_props
	  method input_body_size = !in_cnt
          method tls_session_props = tls_props
	end
      ) in
    proc_config # config_log_error info s

  (* Replaces the properties that are reported to the loggers *)
  method log_props l =
    logged_props <- l


  (* Ensures that the access log entry is written only once *)
  val mutable access_logged = false

  method log_access () =
    (* Called when the whole response is written. Do now access logging *)
    if not access_logged then (
      let full_info =
	( object
	    method server_socket_addr = fd_addr
	    method remote_socket_addr = peer_addr
	    method request_method = req_meth
	    method request_uri = req_uri
	    method input_header = req_hdr
	    method cgi_properties = logged_props
	    method input_body_size = !in_cnt
	    method response_status_code = sent_status
	    method request_body_rejected = !reqrej
	    method output_header = sent_resp_hdr
	    method output_body_size = resp#body_size
            method tls_session_props = tls_props
 	  end
	) in
      proc_config # config_log_access full_info;
      access_logged <- true
    )


  method input_body_size = !in_cnt
  method request_body_rejected = !reqrej

  method output_state = out_state
end


(* The input channel delivering the request body. The data are taken from
 * the tokens returned by [next_token]; the number of body bytes is added
 * to [in_cnt]. [fdi] is only used in debug messages.
 *
 * The channel is created in locked state. While it is locked, [input]
 * and [close_in] fail. The lock is lifted by [unlock] or [drop].
 *)
class http_reactor_input next_token in_cnt fdi =
  (* an extension of rec_in_channel *)
object(self)
  val mutable current_chunk = None
  val mutable eof = false
  val mutable closed = false
  val mutable locked = true

  (* Fetches the next body chunk from the token stream. Trailers are
   * skipped. At the end of the body - and also for every unexpected
   * token - the EOF flag is set, and [End_of_file] is raised.
   *)
  method private refill() =
    dlogr (fun () -> sprintf "FD=%Ld: input.refill" fdi);
    let hit_end() =
      current_chunk <- None;
      eof <- true;
      raise End_of_file in
    match next_token() with
      | `Req_body(data,data_pos,data_len) ->
	  assert(data_len > 0);
	  in_cnt := Int64.add !in_cnt (Int64.of_int data_len);
	  current_chunk <- Some(data,data_pos,data_len)
      | `Req_trailer _ ->
	  (* ignore *)
	  self # refill ()
      | `Req_end ->
	  hit_end()
      | _ ->
	  (* Something else... Handle this as `Req_end! *)
	  hit_end()


  (* Copies up to [slen] bytes of the body to [s] at position [spos], and
   * returns the number of copied bytes. Raises [Closed_channel] after
   * [close_in], and [End_of_file] at the end of the body.
   *)
  method input s spos slen =
    dlogr (fun () -> sprintf "FD=%Ld: input.input" fdi);
    if closed then raise Closed_channel;
    if locked then failwith "Nethttpd_reactor: channel is locked";
    if eof then raise End_of_file;
    ( match current_chunk with
	| None -> self#refill()   (* may raise End_of_file *)
	| Some _ -> ()
    );
    match current_chunk with
      | None ->
	  (* After [refill] this is not possible *)
	  assert false
      | Some(u,upos,ulen) ->
	  (* There are [ulen] bytes available; take as much as fits *)
	  let n = min slen ulen in
	  String.blit u upos s spos n;
	  current_chunk <-
	    ( if n = ulen then None else Some(u,upos+n,ulen-n) );
	  n

  (* Closes the channel. Closing an already closed channel is accepted
   * even in locked state.
   *)
  method close_in() =
    dlogr (fun () -> sprintf "FD=%Ld: input.close_in" fdi);
    if closed then
      ()
    else (
      if locked then failwith "Nethttpd_reactor: channel is locked";
      (* Tokens arriving after this point need not be read here. They are
       * skipped by [finish_request] later. (Calling [finish_request] right
       * here would be possible, too, but would probably defer the
       * generation of responses.)
       *)
      closed <- true
    )

  (* Lifts the lock, so the body can be read *)
  method unlock() =
    dlogr (fun () -> sprintf "FD=%Ld: input.unlock" fdi);
    locked <- false

  (* Lifts the lock, and pretends that the body is already at its end *)
  method drop() =
    dlogr (fun () -> sprintf "FD=%Ld: input.drop" fdi);
    locked <- false;
    eof <- true

end


(* The output channel accepting the response body. The data are sent as
 * body tokens to [resp]. Depending on the configured synchronization
 * level, the channel calls [synch] after certain operations. When the
 * response is complete, [f_access] is called once for access logging.
 * [out_state] is the shared state of the response, and [fdi] is only used
 * in debug messages.
 *
 * The channel is created in locked state. While it is locked, [output],
 * [flush] and [close_out] fail. The lock is lifted by [unlock].
 *)
class http_reactor_output config resp synch out_state 
                          (f_access : unit -> unit) fdi =   
  (* an extension of rec_out_channel *)
  let lock_error = "Nethttpd_reactor: channel is locked" in
  (* The following helpers call [synch] if the configuration demands it
   * for the respective event
   *)
  let synch_after_write() =
    match config#config_reactor_synch with
      | `Write -> synch()
      | _ -> () in
  let synch_after_flush() =
    match config#config_reactor_synch with
      | `Write | `Flush -> synch()
      | _ -> () in
  let synch_after_close() =
    match config#config_reactor_synch with
      | `Write | `Flush | `Close -> synch()
      | _ -> () in
object
  val mutable closed = false
  val mutable locked = true
  val mutable access_logged = false

  (* Sends the [slen] bytes of [s] at [spos] as one body token, and
   * returns [slen]. This is only possible while the response is in
   * state `Sending.
   *)
  method output s spos slen =
    dlogr (fun () -> sprintf "FD=%Ld: output.output" fdi);
    if closed then raise Closed_channel;
    if locked then failwith lock_error;
    if !out_state <> `Sending then 
      failwith "output channel: Cannot output now";
    let chunk = String.sub s spos slen in
    let chunk_len = String.length chunk in
    resp # send (`Resp_body(chunk, 0, chunk_len));
    synch_after_write();
    slen

  (* Waits for the transmission of the data sent so far if configured *)
  method flush() =
    dlogr (fun () -> sprintf "FD=%Ld: output.flush" fdi);
    if closed then raise Closed_channel;
    if locked then failwith lock_error;
    if !out_state = `Sending then
      synch_after_flush()

  (* Finishes the response: sends the end token if the response is being
   * sent, sets the state to `End, synchronizes if configured, and writes
   * the access log entry. Further calls have no effect.
   *)
  method close_out() =
    dlogr (fun () -> sprintf "FD=%Ld: output.close_out" fdi);
    if closed then
      ()
    else (
      if locked then failwith lock_error;
      closed <- true;
      if !out_state = `Sending then
	resp # send `Resp_end;
      out_state := `End;
      synch_after_close();
      (* This is the right time for writing the access log entry: *)
      if not access_logged then (
	f_access();
	access_logged <- true
      )
    )



  method after_send_file() =
    (* Flush after send_file. This method does not count as close_out,
       though. It is possible that more flushes follow.
     *)
    dlogr (fun () -> sprintf "FD=%Ld: output.after_send_file" fdi);
    out_state := `End;
    synch_after_close();
    (* This is the right time for writing the access log entry: *)
    if not access_logged then (
      f_access();
      access_logged <- true
    )

  (* Lifts the lock, so the response can be written *)
  method unlock() =
    dlogr (fun () -> sprintf "FD=%Ld: output.unlock" fdi);
    locked <- false

end


(* The implementation of [http_reactive_request].
 *
 *  - [config]: the configuration (used for the fallback error response)
 *  - [env]: the environment of the request
 *  - [inch], [outch]: the unlifted input and output channels
 *  - [resp]: the response object of the protocol kernel
 *  - [expect_100_continue]: whether the client waits for the permission
 *    to send the body
 *  - [finish_request]: the function skipping the rest of the request
 *  - [reqrej]: shared flag that is set when the body is rejected
 *)
class http_reactive_request_impl config env inch outch resp expect_100_continue
                                 finish_request reqrej
                                 : http_reactive_request =
  (* NB. inch and outch must only be used here to control the locks in these
     channels, not for doing real I/O
   *)
object(self)

  initializer (
    let (meth, uri) = env # req_method in
    dlogr (fun () -> sprintf "req-%d: %s %s" (Oo.id self) meth uri);
  )

  method environment = 
    (env : internal_environment :> extended_environment)

  method accept_body() =
    let id = Oo.id self in
    dlogr (fun () -> sprintf "req-%d: accept_body" id);
    if expect_100_continue then
      resp # send resp_100_continue;
    (* No synchronization is needed at this point, because reading the
     * body synchronizes implicitly.
     * Note, however, that the "100 Continue" might be transmitted very
     * late when the body is never read. This should not be a
     * disadvantage.
     *)
    inch # unlock();
    outch # unlock();
    env # unlock()

  method reject_body() =
    let id = Oo.id self in
    dlogr (fun () -> sprintf "req-%d: reject_body" id);
    inch # drop();
    outch # unlock();
    env # unlock();
    (* for access logging only *)
    reqrej := true

  val mutable fin_req = false

  method finish_request() =
    (* Only the first call does something *)
    if fin_req then
      ()
    else (
      let id = Oo.id self in
      dlogr (fun () -> sprintf "req-%d: finish_request" id);
      fin_req <- true;
      inch # drop();
      outch # unlock();
      env # unlock();
      (* Read the rest of the request until `Req_end *)
      finish_request()
    )

  method finish() =
    self # finish_request();
    let id = Oo.id self in
    dlogr (fun () -> sprintf "req-%d: finish" id);
    let close_quietly() =
      try env # output_ch # close_out() with Closed_channel -> () in
    match !(env#output_state) with
      | `Start ->
	  (* There is no response at all. Reply with a "Server Error": *)
	  dlogr (fun () -> 
		   sprintf "req-%d: no response, generating server error" id);
	  output_std_response config env `Internal_server_error None 
	    (Some "Nethttpd: Missing response, replying 'Server Error'");
	  (env#output_state) := `End
      | `Sending ->
	  (* The response body is probably incomplete or missing. Closing
	   * the channel is the best we can do.
	   *)
	  dlogr (fun () -> 
		   sprintf "req-%d: incomplete response, trying to fix" id);
	  close_quietly();
	  (env#output_state) := `End
      | `End ->
	  (* Everything ok, just to be sure... *)
	  close_quietly()
end



(* The reactor processes the requests arriving on the connection [fd]
 * one after the other in a synchronous way.
 *
 *  - [config_hooks]: called once with the hooks object of the protocol
 *    when the reactor is created (default: does nothing)
 *  - [config]: the reactor configuration
 *  - [fd]: the connected socket. It is closed by [close].
 *
 * [next_request] waits for the next request and returns it, or returns
 * [None] when the connection is at its end. [close] transmits pending
 * data and closes the descriptor.
 *)
class http_reactor ?(config_hooks = fun _ -> ())
                   (config : #http_reactor_config) fd =
  (* note that "new http_reactor" can already raise exceptions, e.g.
     Unix.ENOTCONN
   *)
object(self)
  val proto = new http_protocol config fd
  val fd_addr = Unix.getsockname fd
  val peer_addr = Netsys.getpeername fd

  initializer (
    Netlog.Debug.track_fd
      ~owner:"Nethttpd_reactor"
      ~descr:(sprintf "HTTP %s->%s"
		(Netsys.string_of_sockaddr peer_addr)
		(Netsys.string_of_sockaddr fd_addr))
      fd;
    config_hooks proto#hooks
  )

  (* Runs one cycle of the protocol kernel. The blocking timeout depends
   * on whether the kernel waits for the next message or is in the middle
   * of a message.
   *)
  method private cycle() =
    let block = 
      if proto # waiting_for_next_message then 
	config#config_timeout_next_request
      else
	config#config_timeout in
    dlogr (fun () -> 
	     sprintf "FD %Ld: cycle block_tmo=%f" 
	       (Netsys.int64_of_file_descr fd) block);
    proto # cycle ~block ();

  (* Removes the next token from the receive queue and returns it. The
   * protocol is cycled until a token is available.
   *)
  method private next_token() =
    if proto # recv_queue_len = 0 then (
      self # cycle();
      self # next_token()
    )
    else (
      let tok = proto # receive() in
      dlogr (fun () ->
	       sprintf "FD %Ld: next_token=%s"
		 (Netsys.int64_of_file_descr fd)
		 (Nethttpd_kernel.string_of_req_token tok));
      tok
    )

  (* Like [next_token], but the token remains in the receive queue *)
  method private peek_token() =
    if proto # recv_queue_len = 0 then (
      self # cycle();
      self # peek_token()
    )
    else (
      let tok = proto # peek_recv() in
      dlogr (fun () ->
	       sprintf "FD %Ld: peek_token=%s"
		 (Netsys.int64_of_file_descr fd)
		 (Nethttpd_kernel.string_of_req_token tok));
      tok
    )

  (* Skips the tokens of the current request that have not yet been read.
   * Tokens that do not belong to the body of the current request
   * (the header of the next request, EOF, errors, timeout) are left in
   * the queue.
   *)
  method private finish_request() =
    (* Read the rest of the previous request, ignoring it *)
    dlogr (fun () -> 
	     sprintf "FD %Ld: reactor_finish_request" 
	       (Netsys.int64_of_file_descr fd));
    match self # peek_token() with
      | `Req_header _
      | `Eof
      | `Fatal_error _
      | `Bad_request_error _
      | `Timeout ->
	  (* Everything ok, do nothing *)
	  ()
      | `Req_end ->
	  (* Just drop this token, the next token starts the new request *)
	  ignore(proto # receive ())
      | `Req_expect_100_continue
      | `Req_body _
      | `Req_trailer _ ->
	  (* Continue to read this request until its end *)
	  while
	    match self # peek_token () with
	      | `Req_header _
	      | `Eof 
	      | `Fatal_error _
	      | `Bad_request_error _ 
	      | `Timeout ->
		  false   (* Do not read further *)
	      | `Req_end ->
		  (* This is the end of the current request. Drop the token
		   * and stop. We must not peek again: the queue may be
		   * empty now, and peeking would block until the client
		   * sends the next request (or until the timeout), which
		   * would also delay the actions following in [finish].
		   *)
		  ignore(self # next_token());
		  false
	      | _ ->
		  ignore(self # next_token());
		  true    (* Continue *)
	  do
	    ()
	  done


  method private synch() =
    (* Ensure that all written data are actually transmitted: *)
    dlogr (fun () -> 
	     sprintf "FD %Ld: synch loop" (Netsys.int64_of_file_descr fd));
    while proto # resp_queue_filled do
      self # cycle();
    done;
    dlogr (fun () -> 
	     sprintf "FD %Ld: leaving synch loop"
	       (Netsys.int64_of_file_descr fd));
    (* CHECK: Maybe we have to throw away the remaining tokens of the current request! *)


  (* Waits for the next request, and returns the request object as
   * [Some req]. Requests that cannot be processed (bad requests, header
   * problems) are answered with an error response, and the method goes on
   * with the following request. [None] is returned at EOF and after fatal
   * errors.
   *)
  method next_request () =
    let tok = self # next_token() in
    match tok with
      | `Req_header (req, req_hdr, resp) ->
	  (* Ok, we have a new request. Initialize the new environment processing
	   * it
	   *)
	  dlogr (fun () -> 
		   sprintf "FD %Ld: reactor_next_request: got header" 
		     (Netsys.int64_of_file_descr fd));
	  (* If the header is immediately followed by the 100-continue
	   * token, remove this token and remember the expectation
	   *)
	  let expect_100_continue =
	    try
	      proto # peek_recv() = `Req_expect_100_continue
	    with
		Recv_queue_empty -> false in
	  if expect_100_continue then
	    ignore(proto # receive());

	  let ((req_meth, req_uri), req_version) = req in

	  let env_opt = ref None in  (* set below *)
	  let in_cnt = ref 0L in
	  let input_ch = 
	    new http_reactor_input self#next_token in_cnt
	      (Netsys.int64_of_file_descr fd)  in
	  let output_state = ref `Start in
	  let output_ch = 
	    new http_reactor_output config resp self#synch 
	      output_state 
	      (fun () -> 
		 match !env_opt with
		   | None -> ()
		   | Some env -> env#log_access()
	      )
	      (Netsys.int64_of_file_descr fd) in
	  let lifted_input_ch = 
	    lift_in ~buffered:false (`Rec (input_ch :> rec_in_channel)) in
	  let lifted_output_ch = 
	    lift_out (`Rec (output_ch :> rec_out_channel)) in
	  (* The input channel needs no additional buffer here. The httpd kernel
	   * already implements a good buffer.
	   *
	   * The output channel profits from a buffer. The effect is that the
	   * kernel is driven with output chunks of uniform size. Furthermore,
	   * `Write synchronization is only performed after every of these chunks,
	   * and not after every output method invocation.
	   *)

	  let after_send_file() =
	    output_ch # after_send_file();
	    lifted_output_ch # close_out() in

	  let reqrej = ref false in
	  ( try
	      let env = new http_environment 
			      config 
                              req_meth req_uri req_version req_hdr 
                              fd_addr peer_addr
		              lifted_input_ch in_cnt 
			      lifted_output_ch output_state
			      resp after_send_file reqrej
			      (Netsys.int64_of_file_descr fd)
                              proto#tls_session_props
	      in
	      env_opt := Some env;
	      let req_obj = new http_reactive_request_impl 
			      config env input_ch output_ch resp expect_100_continue 
			      self#finish_request reqrej
	      in
	      dlogr (fun () -> 
		       sprintf 
			 "FD %Ld: reactor_next_request: returning req-%d" 
			 (Netsys.int64_of_file_descr fd)
			 (Oo.id req_obj));
	      Some req_obj
	    with
		Standard_response(status, hdr_opt, msg_opt) ->
		  (* Probably a problem when decoding a header field! *)
		  dlogr (fun () -> 
			   sprintf 
			     "FD %Ld: reactor_next_request: standard response" 
			     (Netsys.int64_of_file_descr fd));
		  logged_error_response
		    fd_addr peer_addr (Some(req_meth,req_uri))
		    !in_cnt !reqrej status hdr_opt msg_opt !env_opt
		    (Some resp) config;
		  (* Transmit the error response, skip the rest of the
		   * request, and go on with the next one
		   *)
		  self # synch();
		  self # finish_request();
		  self # next_request()
	  )

      | `Eof ->
	  dlogr (fun () -> 
		   sprintf 
		     "FD %Ld: reactor_next_request: got eof" 
		     (Netsys.int64_of_file_descr fd));
	  self # synch();
	  None
	  
      | `Fatal_error e ->
	  (* The connection is already down. Just log the incident: *)
	  dlogr (fun () -> 
		   sprintf 
		     "FD %Ld: reactor_next_request: got fatal error" 
		     (Netsys.int64_of_file_descr fd));
	  if e <> `Broken_pipe_ignore then (
	    let msg = Nethttpd_kernel.string_of_fatal_error e in
	    let status = `Internal_server_error in
	    logged_error_response 
	      fd_addr peer_addr None 0L false status None (Some msg)
	      None None config;
	  );
	  None

      | `Bad_request_error (e, resp) ->
	  (* Log the incident, and reply with a 400 response: *)
	  dlogr (fun () -> 
		   sprintf 
		     "FD %Ld: reactor_next_request: got bad request" 
		     (Netsys.int64_of_file_descr fd));
	  let msg = string_of_bad_request_error e in
	  let status = status_of_bad_request_error e in
	  logged_error_response
	    fd_addr peer_addr None 0L false status None (Some msg)
	    None (Some resp) config;
	  self # next_request()

      | `Timeout ->
	  (* Just ignore. The next token will be `Eof *)
	  dlogr (fun () -> 
		   sprintf 
		     "FD %Ld: reactor_next_request: got timeout" 
		     (Netsys.int64_of_file_descr fd));
	  self # next_request()

      | _ ->
	  (* Everything else means that we lost synchronization, and this is a
	   * fatal error!
	   *)
	  dlogr (fun () -> 
		   sprintf 
		     "FD %Ld: reactor_next_request: out of sync" 
		     (Netsys.int64_of_file_descr fd));
	  logged_error_response
	    fd_addr peer_addr None 0L false `Internal_server_error
	    None (Some "Nethttpd: Reactor out of synchronization")
	    None None config;
	  proto # abort `Server_error;
	  self # next_request()

  (* Transmits the pending response data, shuts the protocol down, and
   * closes the descriptor, after a lingering phase if the protocol asks
   * for it. If the transmission fails, the descriptor is closed
   * immediately, and the exception is raised again.
   *)
  method close () =
    dlogr (fun () -> 
	     sprintf "FD %Ld: reactor close" (Netsys.int64_of_file_descr fd));
    ( try
	self # synch();
      with
	| err -> 
	    Netlog.Debug.release_fd fd;
	    Unix.close fd; 
	    raise err
    );
    proto # shutdown();
    if proto # need_linger then (
      dlogr (fun () -> 
	       sprintf "FD %Ld: lingering" (Netsys.int64_of_file_descr fd));
      (* The descriptor is passed on to the lingering_close object; the
       * preclose hook releases it from debug tracking
       *)
      let lc = 
	new Nethttpd_kernel.lingering_close
	  ~preclose:(fun () -> Netlog.Debug.release_fd fd)
	  fd in
      while lc # lingering do
	lc # cycle ~block:true ()
      done
    )
    else (
      Netlog.Debug.release_fd fd;
      Unix.close fd
    )
end


(* Internal exception used by [process_connection]: it is raised when a
 * [Redirect_response] is caught in stage 3 while the output state is
 * still `Start, i.e. while the redirection is still possible. The
 * arguments are the arguments of the caught [Redirect_response].
 *)
exception Redirect_response_legal of string * http_header

(* The reactions of [http_service_reaction], extended by the additional
 * case `Redirect_request carrying a string and a header.
 * NOTE(review): the string is presumably the new request URI - the use of
 * this type is outside the visible part of the file, please confirm.
 *)
type x_reaction = 
    [ http_service_reaction
    | `Redirect_request of string * http_header
    ]


(* [process_connection ?config_hooks config fd stage1] creates a reactor
 * for the descriptor [fd] and handles the requests one after the other
 * until [next_request] returns [None].  For every request,
 * [stage1 # process_header] decides how to continue: accept or reject the
 * body, send a static/file/standard response, or redirect the request.
 *
 * Exceptions escaping the request loop, and exceptions raised while
 * closing the reactor, are passed to [config # config_log_error] and are
 * not re-raised by this function.  If the reactor cannot be created, [fd]
 * is closed and the exception is propagated to the caller.
 *)
let process_connection ?config_hooks config fd (stage1 : 'a http_service) =

  (* The addresses are only needed for the debug message below; any
   * failure to get them is replaced by a placeholder.
   *)
  let fd_addr_str =
    try Netsys.string_of_sockaddr (Unix.getsockname fd)
    with _ -> "(noaddr)" in
  let peer_addr_str =
    try Netsys.string_of_sockaddr (Netsys.getpeername fd)
    with _ -> "(noaddr)" in

  dlogr
    (fun () ->
       sprintf "FD %Ld (%s -> %s) processing connection"
         (Netsys.int64_of_file_descr fd) peer_addr_str fd_addr_str
    );

  (* [protect env f arg] runs [f arg].  As long as no output has been
   * started (output state is `Start), exceptions are turned into a standard
   * response.  [Redirect_response_legal] is always passed through.  Once
   * output has started, exceptions are not caught here and propagate.
   *)
  let protect env f arg =
    try
      f arg
    with
      | Redirect_response_legal(_,_) as e -> raise e

      | Standard_response(status, hdr_opt, errmsg_opt)
          when !(env#output_state) = `Start -> (
            output_std_response config env status hdr_opt errmsg_opt
          )

      | err when !(env#output_state) = `Start ->
          output_std_response config env `Internal_server_error None
            (Some("Nethttpd: Handler exception: " ^ Netexn.to_string err))
  in

  (* Stage 3: generate the response.  A response redirection is only legal
   * here, and only before output has started; it is then re-raised as
   * [Redirect_response_legal] for [process_request].
   *)
  let do_stage3 req env stage3 =
    dlogr (fun () -> sprintf "req-%d: stage3" (Oo.id req));
    try
      stage3 # generate_response env;
      dlogr (fun () -> sprintf "req-%d: stage3 done" (Oo.id req));
    with
      | Redirect_request(_,_) ->
          failwith "Caught Redirect_request in stage 3, \
                    but it is only allowed in stage 1"
      | Redirect_response(uri,hdr) ->
          if !(env#output_state) <> `Start then
            failwith "Caught Redirect_response, \
                      but it is too late for redirections";
          dlogr (fun () -> sprintf "req-%d: stage3 redirect_response"
                   (Oo.id req));
          raise (Redirect_response_legal(uri,hdr))
  in

  (* Stage 2: process the request body, finish the request, and continue
   * with the stage 3 handler returned by [process_body].  Redirections of
   * either kind are errors in this stage.
   *)
  let do_stage2 req env stage2 =
    dlogr (fun () -> sprintf "req-%d: stage2" (Oo.id req));
    let stage3 =
      try
        stage2 # process_body env
      with
        | Redirect_request(_,_) ->
            failwith "Caught Redirect_request in stage 2, \
                      but it is only allowed in stage 1"
        | Redirect_response(_,_) ->
            failwith "Caught Redirect_response in stage 2, \
                      but it is only allowed in stage 3"
    in
    dlogr (fun () -> sprintf "req-%d: stage2 done" (Oo.id req));
    req # finish_request();
    do_stage3 req env stage3
  in

  let rec process_request req (redir_env:extended_environment) redir_count =
    (* [redir_env]: The environment of the request, possibly rewritten by
     *   redirects.
     * [redir_count]: The number of already performed redirections
     * [req]: Contains always the original environment
     *)
    dlogr
      (fun () ->
         sprintf "req-%d: process_request redir_count=%d"
           (Oo.id req) redir_count);
    (* Guard against redirection loops *)
    if redir_count > 10 then
      failwith "Too many redirections";
    dlogr (fun () -> sprintf "req-%d: stage1" (Oo.id req));
    (* Stage 1: let the service look at the header.  A request redirection
     * raised as exception is mapped to the extra `Redirect_request case.
     *)
    let reaction =
      try (stage1 # process_header
             (redir_env :> extended_environment) :> x_reaction)
      with
        | Redirect_request(new_uri, new_hdr) ->
            `Redirect_request(new_uri, new_hdr)
        | Redirect_response(_,_) ->
            failwith "Caught Redirect_response in stage 1, \
                      but it is only allowed in stage 3"
    in
    dlogr
      (fun () ->
         let s_reaction =
           match reaction with
             | `Accept_body stage2 -> "Accept_body (next stage: stage2)"
             | `Reject_body stage3 -> "Reject_body (next stage: stage3)"
             | `Static _ -> "Static"
             | `File _ -> "File"
             | `Std_response _ -> "Std_response"
             | `Redirect_request _ -> "Redirect_request" in
         sprintf "req-%d: stage1 results in: %s" (Oo.id req) s_reaction
      );
    ( try
        ( match reaction with
            | `Accept_body stage2 ->
                req # accept_body();
                protect redir_env (do_stage2 req redir_env) stage2
            | `Reject_body stage3 ->
                req # reject_body();
                protect redir_env (do_stage3 req redir_env) stage3
            | `Static(status, resp_hdr_opt, resp_str) ->
                req # reject_body();
                output_static_response redir_env status resp_hdr_opt resp_str
            | `File(status, resp_hdr_opt, resp_filename, pos, length) ->
                req # accept_body();
                protect
                  redir_env
                  (output_file_response
                     redir_env status resp_hdr_opt resp_filename pos)
                  length
            | `Std_response(status, resp_hdr_opt, errlog_opt) ->
                req # reject_body();
                output_std_response
                  config redir_env status resp_hdr_opt errlog_opt
            | `Redirect_request(new_uri, new_hdr) ->
                dlogr (fun () -> sprintf "req-%d: redirect_request to: %s"
                         (Oo.id req) new_uri);
                let (new_script_name, new_query_string) =
                  decode_query new_uri in
                (* The body has not been touched yet, so the new header
                 * takes over the Content-length of the current one, and
                 * the new environment reads from the original input
                 * channel.
                 *)
                new_hdr # update_multiple_field
                  "Content-length"
                  (redir_env # multiple_input_header_field "Content-length");
                let new_properties =
                  update_alist
                    [ "REQUEST_URI", new_uri;
                      "SCRIPT_NAME", new_script_name;
                      "QUERY_STRING", new_query_string ]
                    redir_env#cgi_properties in
                let new_env =
                  new redirected_environment
                    ~properties:new_properties
                    ~in_header:new_hdr
                    ~in_channel:req#environment#input_channel
                    redir_env in
                process_request req new_env (redir_count+1)
        )
      with
        | Redirect_response_legal(new_uri, new_hdr) ->
            dlogr (fun () -> sprintf "req-%d: redirect_response to: %s"
                     (Oo.id req) new_uri);
            if !(redir_env#output_state) <> `Start then
              failwith "Redirect_response is not allowed after \
                        output has started";
            let (new_script_name, new_query_string) = decode_query new_uri in
            (* The redirected request is a GET with Content-length 0; no
             * input channel is passed on to the new environment.
             *)
            new_hdr # update_field "Content-length" "0";
            let new_properties =
              update_alist
                [ "REQUEST_URI", new_uri;
                  "SCRIPT_NAME", new_script_name;
                  "QUERY_STRING", new_query_string;
                  "REQUEST_METHOD", "GET"
                ]
                redir_env#cgi_properties in
            let new_env =
              new redirected_environment
                ~properties:new_properties
                ~in_header:new_hdr
                redir_env in
            process_request req new_env (redir_count+1)

    );
    dlogr (fun () -> sprintf "req-%d: finish" (Oo.id req));
    req # finish();
    dlogr (fun () -> sprintf "req-%d: done" (Oo.id req));
  in

  (* Process requests until the reactor has no further request *)
  let rec fetch_requests reactor =
    match reactor # next_request() with
      | None ->
          dlogr (fun () -> sprintf "FD %Ld - no next request"
                   (Netsys.int64_of_file_descr fd));
          ()
      | Some req ->
          dlogr (fun () -> sprintf "FD %Ld - next request req-%d"
                   (Netsys.int64_of_file_descr fd) (Oo.id req)
                );
          process_request req req#environment 0;
          fetch_requests reactor
  in

  let reactor =
    try
      new http_reactor ?config_hooks config fd
    with
        err ->
          (* An exception means here that getsockname or getpeername failed.
             We can only close the descriptor!
           *)
          Unix.close fd;
             (* No need for release_fd - the descriptor has not yet been
                tracked!
              *)
          raise err
  in

  (* Forward [err] to the error log.  This must be the first thing done in
   * an exception handler because the backtrace is fetched right at the
   * beginning, before any other code can overwrite it.  The backtrace is
   * only appended to the message when backtrace recording is enabled.
   *)
  let log_protocol_exception err =
    let bt = Printexc.get_backtrace() in
    let msg1 = Netexn.to_string err in
    let msg =
      msg1 ^
        (if Printexc.backtrace_status() then " (backtrace: " ^ bt ^ ")"
         else "") in
    dlogr (fun () ->
             sprintf "Exception forwarding to error log: %s" msg);
    config # config_log_error no_info
      ("Nethttpd: Protocol exception: " ^ msg)
  in

  (* Run the request loop.  Whatever happens, the reactor is closed
   * afterwards; errors of both phases are only logged.
   *)
  ( try fetch_requests reactor
    with err -> log_protocol_exception err
  );
  ( try reactor # close()
    with err -> log_protocol_exception err
  )
;;


(* The default processor configuration.  The protocol-level settings are
 * inherited from [Nethttpd_kernel.default_http_protocol_config]; the
 * methods below define the processor-level defaults.
 *)
let default_http_processor_config =
  object
    inherit Nethttpd_kernel.modify_http_protocol_config
              Nethttpd_kernel.default_http_protocol_config

    (* Timeouts (presumably in seconds - confirm against the users of
     * these settings)
     *)
    method config_timeout_next_request = 15.0
    method config_timeout = 300.0

    method config_cgi = Netcgi.default_config

    (* Error pages and logging: errors go to Netlog with level `Err,
     * accesses are not logged at all.
     *)
    method config_error_response = Nethttpd_util.std_error_response
    method config_log_error p msg =
      Netlog.log `Err (Nethttpd_util.std_error_log_string p msg)
    method config_log_access p = ()

    (* TLS-related switches, both enabled by default *)
    method config_tls_cert_props = true
    method config_tls_remote_user = true
  end


(* [override v opt] is [x] if [opt] is [Some x], and the fallback [v]
 * otherwise.
 *)
let override v = function
  | Some x -> x
  | None -> v


(* [new modify_http_processor_config ... config] is a copy of [config]
 * where every setting passed as optional argument replaces the
 * corresponding setting of [config].  The settings are read from [config]
 * once, when the object is created.  The protocol-level part of the
 * configuration is first passed through [modify_http_protocol_config]
 * (default: identity).
 *)
class modify_http_processor_config
      ?modify_http_protocol_config:(adapt_protocol = fun cfg -> cfg)
      ?config_timeout_next_request
      ?config_timeout
      ?config_cgi
      ?config_error_response
      ?config_log_error
      ?config_log_access
      ?config_tls_cert_props
      ?config_tls_remote_user
      (config : http_processor_config) : http_processor_config =
  (* Effective values: the optional argument wins over the base config *)
  let timeout_next_request =
    override config#config_timeout_next_request config_timeout_next_request in
  let timeout =
    override config#config_timeout config_timeout in
  let cgi =
    override config#config_cgi config_cgi in
  let error_response =
    override config#config_error_response config_error_response in
  let log_error =
    override config#config_log_error config_log_error in
  let log_access =
    override config#config_log_access config_log_access in
  let tls_cert_props =
    override config#config_tls_cert_props config_tls_cert_props in
  let tls_remote_user =
    override config#config_tls_remote_user config_tls_remote_user in
object
  inherit modify_http_protocol_config
            (adapt_protocol (config :> http_protocol_config))
  method config_timeout_next_request = timeout_next_request
  method config_timeout = timeout
  method config_cgi = cgi
  method config_error_response = error_response
  method config_log_error = log_error
  method config_log_access = log_access
  method config_tls_cert_props = tls_cert_props
  method config_tls_remote_user = tls_remote_user
end


(* The default reactor configuration: [default_http_processor_config]
 * plus the reactor-specific synchronization mode, which is `Write.
 *)
let default_http_reactor_config =
  object
    inherit modify_http_processor_config default_http_processor_config
    method config_reactor_synch = `Write
  end


(* [new modify_http_reactor_config ... config] is a copy of [config] where
 * [config_reactor_synch] is replaced if the optional argument is given.
 * The processor-level part is first passed through
 * [modify_http_processor_config] (default: identity), and
 * [modify_http_protocol_config] is handed on to the inherited class.
 *)
class modify_http_reactor_config
      ?modify_http_protocol_config
      ?modify_http_processor_config:(adapt_processor = fun cfg -> cfg)
      ?config_reactor_synch
      (config : http_reactor_config) : http_reactor_config =
  (* Read once at object creation; the optional argument wins *)
  let reactor_synch =
    override config#config_reactor_synch config_reactor_synch in
object
  inherit modify_http_processor_config
            ?modify_http_protocol_config
            (adapt_processor (config :> http_processor_config))
  method config_reactor_synch = reactor_synch
end

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml