(* $Id: nethttpd_engine.ml 1590 2011-05-05 11:30:08Z gerd $ * *)

(* * Copyright 2005 Baretta s.r.l. and Gerd Stolpmann
 *
 * This file is part of Nethttpd.
 *
 * Nethttpd is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * Nethttpd is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Nethttpd; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *)

(* Per-module debug switch; registered with [Netlog.Debug] below so it can
   be toggled centrally. *)
module Debug = struct
  let enable = ref false
end

(* [dlog] logs a plain string, [dlogr] a lazily-built string (the closure is
   only evaluated when debugging is enabled). *)
let dlog = Netlog.Debug.mk_dlog "Nethttpd_engine" Debug.enable
let dlogr = Netlog.Debug.mk_dlogr "Nethttpd_engine" Debug.enable

let () =
  Netlog.Debug.register_module "Nethttpd_engine" Debug.enable

open Nethttp
open Nethttp.Header
open Nethttpd_types
open Nethttpd_kernel
open Printf

(* Progress of a single request as observed by the engine user:
   header seen, body being received, request complete, or being torn down. *)
type engine_req_state =
    [ `Received_header
    | `Receiving_body
    | `Received_request
    | `Finishing
    ]

(* Reactor processor config extended with the engine's two flow-control
   switches (see [http_engine_input]/[http_engine_output] below for how
   they are consulted). *)
class type http_engine_config =
object
  inherit Nethttpd_reactor.http_processor_config
  method config_input_flow_control : bool
  method config_output_flow_control : bool
end

(* Environment that additionally exposes request body and response as
   asynchronous channels. *)
class type extended_async_environment =
object
  inherit extended_environment
  method input_ch_async : Uq_engines.async_in_channel
  method output_ch_async : Uq_engines.async_out_channel
end

(* Handed to the user when a request header has arrived; the user must
   schedule acceptance or rejection of the request body (or finish). *)
class type http_request_header_notification =
object
  method req_state : engine_req_state
  method environment : extended_async_environment
  method schedule_accept_body :
    on_request:(http_request_notification -> unit) ->
    ?on_error:(unit -> unit) -> unit -> unit
  method schedule_reject_body :
    on_request:(http_request_notification -> unit) ->
    ?on_error:(unit -> unit) -> unit
-> unit
  (* (completes the type of [schedule_reject_body] begun above) *)
  method schedule_finish : unit -> unit
end

(* Notification object for a request whose header phase is over; it can only
   report state and finish the request. *)
and http_request_notification =
object
  method req_state : engine_req_state
  method environment : extended_async_environment
  method schedule_finish : unit -> unit
end

(* Life cycle of the TCP connection: actively speaking HTTP, draining the
   socket before close ("lingering close"), or fully closed. *)
type conn_state =
    [ `Active of http_protocol
    | `Closing of lingering_close
    | `Closed
    ]

(* Called when the condition event has been consumed, so the condition can
   be signalled again. *)
type reset_cond = unit -> unit

exception Ev_output_filled of Unixqueue.group * reset_cond
  (** condition: The output channel filled data into the [http_response] object, and
    * notifies now the [http_engine]
   *)
  (* FIXME: This event is also sent for response objects that are still
     buffering but not yet writing to the output descriptor. This is not
     harmful, because it means this event is sent too often. This is
     inelegant, though, and might be a performance problem. *)

exception Ev_input_empty of Unixqueue.group * reset_cond
  (** condition: The input channel became empty, and the engine must be notified
    * (used for input flow control)
   *)

(* A [condition] posts the event built by [mk_ev] onto [ues] at most once
   until that event is consumed; [mk_ev] receives the [reset] function that
   re-arms the condition. *)
class condition ues mk_ev =
object(self)
  val mutable signaled = false
    (* Records whether there is [ev] on the event queue. As [ev] is used only
       for expressing a condition, it is nonsense to add [ev] twice to the
       queue. *)
  val mutable ev = lazy(assert false)
    (* Placeholder; replaced in the initializer. [lazy] breaks the cycle
       between the event value and [self#reset]. *)

  initializer (
    let ev0 = mk_ev self#reset in
    ev <- lazy ev0
  )

  method signal() =
    if not signaled then (
      Unixqueue.add_event ues (Lazy.force ev);
      signaled <- true
    )

  method reset() =
    (* To be called when [ev] is consumed by the event handler *)
    signaled <- false
end


class http_engine_input config ues group in_cnt fdi rqid =
  (* Signalled when the buffered input is exhausted, so the engine can
     re-enable reading from the socket (input flow control). *)
  let cond_input_empty =
    new condition ues
      (fun reset -> Unixqueue.Extra(Ev_input_empty(group,reset))) in
object(self)
  (* The input channel is fed with data by the main event handler that invokes
   * [add_data] and [add_eof] to forward new input data to this channel.
   *)
  val mutable front = None
    (* the chunk currently being consumed, as (string, pos, remaining len) *)
  val mutable data_queue = Queue.create()
    (* chunks received but not yet touched *)
  val mutable eof = false
  val mutable pos_in = 0
  val mutable closed = false
  val mutable locked = true
    (* locked until the user accepts or rejects the request body *)
  val mutable aborted = false
  val mutable notify_list = []
  val mutable notify_list_new = []

  method cond_input_empty = cond_input_empty

  (* [raw_in_channel] read: copy at most [slen] bytes into [s] at [spos].
     Returns 0 on buffer underrun; raises [End_of_file] after EOF and
     [Netchannels.Closed_channel] after close. *)
  method input s spos slen =
    dlogr (fun () -> sprintf "FD %Ld req-%d: input.input" fdi !rqid);
    if closed then raise Netchannels.Closed_channel;
    if locked then failwith "Nethttpd_engine: channel is locked";
    if aborted then failwith "Nethttpd_engine: channel aborted";
    (* Refill [front] from the queue if it is empty: *)
    (match front with
       | None ->
           ( try front <- Some(Queue.take data_queue)
             with Queue.Empty -> () )
       | Some _ -> ()
    );
    (match front with
       | None ->
           if eof then raise End_of_file else 0 (* buffer underrun *)
       | Some (u,upos,ulen) ->
           let len = min slen ulen in
           String.blit u upos s spos len;
           if len = ulen then
             front <- None
           else
             front <- Some(u,upos+len,ulen-len);
           pos_in <- pos_in + len;
           if not (self # can_input) then (
             (* The buffer just ran empty: maybe wake the engine so it reads
                from the socket again, and tell interested parties. *)
             if config#config_input_flow_control then
               cond_input_empty # signal();
             self # notify();
           );
           len
    )

  method pos_in = pos_in

  method close_in() =
    dlogr (fun () -> sprintf "FD %Ld req-%d: input.close_in" fdi !rqid);
    if not closed then (
      if locked then failwith "Nethttpd_engine: channel is locked";
      front <- None;
      Queue.clear data_queue;
      closed <- true
    )

  (* True while a read would not block: data is buffered, or EOF is
     pending. *)
  method can_input =
    not closed && (front <> None || not(Queue.is_empty data_queue) || eof)

  method request_notification f =
    notify_list_new <- f :: notify_list_new

  (* Run the registered callbacks; a callback returning [false] is removed.
     New registrations are merged in first so callbacks added during a
     notification round are not lost. *)
  method private notify() =
    notify_list <- notify_list @ notify_list_new;
    notify_list_new <- [];
    notify_list <- List.filter (fun f -> f()) notify_list

  (* Called by the engine's event handler with a new body chunk. *)
  method add_data ((_,_,len) as data_chunk) =
    dlogr (fun () -> sprintf "FD %Ld req-%d: input.add_data" fdi !rqid);
    assert(len > 0);
    if not eof then (
      let old_can_input = self # can_input in
      Queue.push data_chunk data_queue;
      in_cnt := Int64.add !in_cnt (Int64.of_int len);
      if not old_can_input && not closed then self # notify()
    )
    (* else: we are probably dropping all data! *)

  method add_eof() =
    dlogr (fun () -> sprintf "FD %Ld req-%d: input.add_eof" fdi !rqid);
    if not eof then (
      let old_can_input = self # can_input in
      eof <- true;
      if not old_can_input && not closed then self # notify()
    )

  method unlock() =
    dlogr (fun () -> sprintf "FD %Ld req-%d: input.unlock" fdi !rqid);
    locked <- false

  (* Unlock and swallow all further body data (used when the body is
     rejected or the request is finished early). *)
  method drop() =
    dlogr (fun () -> sprintf "FD %Ld req-%d: input.drop" fdi !rqid);
    locked <- false;
    eof <- true

  method abort() =
    dlogr (fun () -> sprintf "FD %Ld req-%d: input.abort" fdi !rqid);
    aborted <- true

end


class http_engine_output config ues group resp output_state
                         (f_access : unit->unit) fdi =
  let cond_output_filled =
    new condition ues
      (fun reset -> Unixqueue.Extra(Ev_output_filled(group,reset))) in
object(self)
  (* The output channel adds the incoming data to the [http_response] object
   * [resp]. The main [http_engine] is notified about new data by a
   * [Ev_output_filled] event. This gives the [http_engine] the chance to
   * check whether it again enables output because new data is to be output.
   *
   * Note that [resp] is setup such that it calls [resp_notify] whenever the state
   * of [resp] changes, or the [resp] queue becomes empty. We do not immediately
   * forward this notification, but delay it a bit by pushing the invocation
   * onto the event queue. This indirection
   * is necessary because the moment of the [resp_notify] invocation is quite
   * hairy, and failures must not be risked.
   *)
  val mutable pos_out = 0
  val mutable closed = false
  val mutable locked = true
  val mutable aborted = false
  val mutable notify_list = []
  val mutable notify_list_new = []

  initializer (
    resp # set_callback self#resp_notify
  )

  method cond_output_filled = cond_output_filled

  method output s spos slen =
    (* In principle we always accept any amount of output data. For practical reasons,
     * the length of an individual chunk is limited to 8K - it just prevents
     * some dumb programming errors.
     *)
    dlogr (fun () -> sprintf "FD %Ld resp-%d: output.output" fdi (Oo.id resp));
    if closed then raise Netchannels.Closed_channel;
    if locked then failwith "Nethttpd_engine: channel is locked";
    if aborted then failwith "Nethttpd_engine: channel aborted";
    if !output_state <> `Sending then
      failwith "output channel: cannot output now";
    let len = min slen 8192 in
    if len > 0 then (
      let old_can_output = self # can_output in
      let u = String.sub s spos len in
      resp # send (`Resp_body(u,0,String.length u));
      (* Wake the engine so it can enable writing on the socket: *)
      cond_output_filled # signal();
      pos_out <- pos_out + len;
      if old_can_output <> self # can_output then self # notify();
    );
    len

  method pos_out = pos_out

  method flush() =
    (* No-op: [resp] queues and writes data on its own. *)
    dlogr (fun () -> sprintf "FD %Ld resp-%d: output.flush" fdi (Oo.id resp));
    ()

  method close_out() =
    dlogr (fun () -> sprintf "FD %Ld resp-%d: output.close_out"
             fdi (Oo.id resp));
    if not closed then (
      if locked then failwith "Nethttpd_engine: channel is locked";
      let old_can_output = self # can_output in
      resp # send `Resp_end;
      closed <- true;
      output_state := `End;
      f_access();   (* write the access log entry now *)
      cond_output_filled # signal();
      if old_can_output <> self # can_output then self # notify();
    )

  (* Variant of close used after [send_file]: [resp] already got the end of
     the response, so only mark closed and log. *)
  method close_after_send_file() =
    dlogr (fun () -> sprintf "FD %Ld resp-%d: output.close_after_send_file"
             fdi (Oo.id resp));
    closed <- true;
    f_access();

  (* With output flow control: writable only while [resp] is active and its
     queue has drained; without it, always writable. *)
  method can_output =
    (not config#config_output_flow_control) ||
    ((resp # state = `Active) && resp # send_queue_empty)

  method unlock() =
    dlogr (fun () -> sprintf "FD %Ld resp-%d: output.unlock" fdi (Oo.id resp));
    locked <- false

  method resp_notify() =
    (* Delay the notification by pushing it onto the event queue (see the
       class comment above for why). *)
    Unixqueue.once ues group 0.0 self#notify
    (* CHECK: It is assumed that self#notify is called before the condition
     * is again invalid.
     * If this turns out to be wrong, we have to implement a quicker way of
     * notification. It is known that [resp_notify] is called back from the
     * current [cycle]. One could check after every [cycle] whether the
     * notification is still valid.
     *)

  method request_notification f =
    notify_list_new <- f :: notify_list_new

  (* Same callback discipline as in [http_engine_input]. *)
  method private notify() =
    notify_list <- notify_list @ notify_list_new;
    notify_list_new <- [];
    notify_list <- List.filter (fun f -> f()) notify_list

  method abort() =
    dlogr (fun () -> sprintf "FD %Ld resp-%d: output.abort" fdi (Oo.id resp));
    aborted <- true

end


class http_async_environment config ues group
                             ((req_meth, req_uri), req_version) req_hdr
                             fd_addr peer_addr
                             in_ch_async in_cnt out_ch_async output_state
                             resp reqrej fdi =
  let in_ch =
    Netchannels.lift_in
      ~buffered:false (`Raw (in_ch_async :> Netchannels.raw_in_channel)) in
  let out_ch =
    Netchannels.lift_out
      ~buffered:false (`Raw (out_ch_async :> Netchannels.raw_out_channel)) in
  (* [in_ch] and [out_ch] are standard channels corresponding to [in_ch_async] and
   * [out_ch_async]. Note that there is no buffering. A buffer would introduce
   * a delay between the standard and the asynchronous channels - a very surprising
   * effect. Furthermore, there is already a lot of buffering in [http_protocol],
   * so buffers are probably not needed here.
 *)
object (self)
  (* Builds the reactor's synchronous environment on top of the two
     asynchronous channels, and re-exports the async views. *)
  inherit Nethttpd_reactor.http_environment config
            req_meth req_uri req_version req_hdr
            fd_addr peer_addr in_ch in_cnt out_ch output_state resp
            out_ch_async#close_after_send_file reqrej fdi
            as super

  method input_ch_async = (in_ch_async :> Uq_engines.async_in_channel)
  method output_ch_async = (out_ch_async :> Uq_engines.async_out_channel)

  method send_output_header() =
    (* Only signal the engine when the header actually goes out now
       (i.e. the state was still `Start before the call): *)
    let ok = !output_state = `Start in
    super # send_output_header();
    if ok then
      out_ch_async # cond_output_filled # signal()

  method send_file fd length =
    super # send_file fd length;
      (* does not block, because [synch] is now [ fun () -> () ] *)
    out_ch_async # cond_output_filled # signal()

end


class http_request_manager config ues group req_line req_hdr
                           expect_100_continue fd_addr peer_addr resp fdi =
  let f_access = ref (fun () -> ()) in   (* set below *)
  let in_cnt = ref 0L in                 (* number of request body bytes *)
  let reqrej = ref false in              (* whether the body was rejected *)
  let rqid = ref (-1) in                 (* request id, set in the initializer *)
  let output_state = ref `Start in
  let in_ch = new http_engine_input config ues group in_cnt fdi rqid in
  let out_ch = new http_engine_output config ues group resp output_state
                 (fun () -> !f_access()) fdi in
  let env = new http_async_environment
              config ues group req_line req_hdr fd_addr peer_addr
              in_ch in_cnt out_ch output_state resp reqrej fdi in
              (* may raise Standard_response! *)
  let () = f_access := env # log_access in
object(self)
  (* This class also satisfies the types [http_request_notification] and
   * [http_request_header_notification]
   *)

  initializer (
    dlogr (fun () -> sprintf "FD=%Ld: new request_manager req-%d resp-%d"
             fdi (Oo.id self) (Oo.id resp));
    rqid := (Oo.id self)
  )

  val mutable req_state = ( `Received_header : engine_req_state )
    (* When this object is created, the header has just been received *)

  val mutable req_handler =
    (fun _ -> failwith "Nethttpd_engine: No [on_request] function")
  val mutable error_handler = (fun () -> ())

  method real_input_ch = in_ch     (* full type! *)
  method real_output_ch = out_ch   (* full type! *)
  method environment = (env :> extended_async_environment)
  method req_state = req_state
  method set_req_state s = req_state <- s
  method req_handler = (req_handler : http_request_notification -> unit)
  method error_handler = error_handler
  method log_access = env#log_access

  method abort() =
    dlogr (fun () -> sprintf "FD %Ld req-%d: abort" fdi (Oo.id self));
    in_ch # abort();
    out_ch # abort();

  method schedule_accept_body ~on_request ?(on_error = fun ()->()) () =
    dlogr (fun () -> sprintf "FD %Ld req-%d: accept_body" fdi (Oo.id self));
    (* Send the "100 Continue" response if requested: *)
    if expect_100_continue then (
      resp # send resp_100_continue;
      out_ch # cond_output_filled # signal();
    );
    (* Unlock everything: *)
    in_ch # unlock();
    out_ch # unlock();
    env # unlock();
    (* Remember the callback functions: *)
    req_handler <- on_request;
    error_handler <- on_error

  method schedule_reject_body ~on_request ?(on_error = fun ()->()) () =
    dlogr (fun () -> sprintf "FD %Ld req-%d: reject_body" fdi (Oo.id self));
    (* Unlock everything (the input channel drops all body data): *)
    in_ch # drop();
    out_ch # unlock();
    env # unlock();
    reqrej := true;
    (* Remember the callback functions: *)
    req_handler <- on_request;
    error_handler <- on_error

  method schedule_finish() =
    (* This is quite tricky:
     * - Any remaining data in the input channel is dropped. The [drop] method
     *   does this. This has also the effect that any additional data still
     *   arriving is thrown away.
     * - We have to check the output state for the response. If it is still `Start,
     *   the whole response is missing. We generate a "Server Error" in this case.
     *   Otherwise we just close the output channel and hope we are done.
     * - We also set [req_state] to `Finishing to inform all other parts of the
     *   engine what is going on.
     *)
    dlogr (fun () -> sprintf "FD %Ld req-%d: schedule_finish"
             fdi (Oo.id self));
    in_ch # drop();
    out_ch # unlock();
    env # unlock();
    req_state <- `Finishing;
    match !(env # output_state) with
      | `Start ->
          (* The whole response is missing! Generate a "Server Error": *)
          dlogr (fun () -> sprintf "FD %Ld req-%d: missing response error"
                   fdi (Oo.id self));
          output_std_response config env `Internal_server_error None
            (Some "Nethttpd: Missing response, replying 'Server Error'");
          (env # output_state) := `End;
      | `Sending ->
          (* The response body is probably incomplete or missing. Try to close
           * the channel. *)
          ( try env # output_ch # close_out()
            with Netchannels.Closed_channel -> () );
          (env # output_state) := `End;
      | `End ->
          (* Everything ok, just to be sure... *)
          ( try env # output_ch # close_out()
            with Netchannels.Closed_channel -> () )

end


(* Guards [f arg] against reentrant invocation: [lock] must be [false] on
   entry; it is set for the duration of the call and reset on both normal
   and exceptional exit. Reentrancy is a programming error and aborts. *)
let ensure_not_reentrant name lock f arg =
  if !lock then (
    prerr_endline ("Illegal reentrancy: " ^ name);
    assert false
  );
  lock := true;
  try
    let r = f arg in
    lock := false;
    r
  with
    | err ->
        lock := false;
        raise err


class http_engine ~on_request_header () config fd ues =
  (* note that "new http_engine" can already raise exceptions, e.g.
     Unix.ENOTCONN *)
  let _proto = new http_protocol config fd in
  let handle_event_lock = ref false in
  let fdi = Netsys.int64_of_file_descr fd in
object(self)
  inherit [unit] Uq_engines.engine_mixin (`Working 0) ues

  val fd_addr = Unix.getsockname fd
  val peer_addr = Netsys.getpeername fd

  val mutable conn_state = (`Active _proto : conn_state)
  val mutable group = Unixqueue.new_group ues

  val mutable enable_in = false
  val mutable in_timeout = 0.0
  val mutable enable_out = false
    (* - When to listen for input events? In principle, when [proto # do_input]
     *   indicates this. This flag can only change after every [proto # cycle].
     * - When to listen for output events? In principle, when [proto # do_output]
     *   indicates this. This flag can change after [proto # cycle], but also
     *   after new data have been output. Our output channel will tell us, and
     *   sends [Ev_output_filled] events to us.
     *)

  val mutable cur_request_manager = None
    (* manager of the request currently being received, if any *)
  val mutable eof_seen = false

  initializer (
    self # start();
    Netlog.Debug.track_fd     (* configure tracking as last step of init *)
      ~owner:"Nethttpd_engine"
      ~descr:(sprintf "HTTP %s->%s"
                (Netsys.string_of_sockaddr peer_addr)
                (Netsys.string_of_sockaddr fd_addr))
      fd
  )

  method private start() =
    Unixqueue.add_handler ues group (fun _ _ -> self # handle_event);
    Unixqueue.add_abort_action ues group (fun _ -> self # handle_exception);
    self # enable_input true

  (* Switch socket-read events on/off, also refreshing the resource when
     only the timeout value changed. *)
  method private enable_input flag =
    let timeout =
      match conn_state with
        | `Active proto ->
            ( match proto # input_timeout_class with
                | `None -> (-1.0)
                | `Normal -> config#config_timeout
                | `Next_message -> config#config_timeout_next_request
            )
        | `Closing lc -> 1.0
            (* i.e. check every second whether the lingering phase is over *)
        | `Closed -> assert false in
    ( match (flag, enable_in) with
        | (true, false) ->
            Unixqueue.add_resource ues group (Unixqueue.Wait_in fd, timeout);
        | (false, true) ->
            Unixqueue.remove_resource ues group (Unixqueue.Wait_in fd);
        | (true, true) when timeout <> in_timeout ->
            (* re-register with the new timeout *)
            Unixqueue.remove_resource ues group (Unixqueue.Wait_in fd);
            Unixqueue.add_resource ues group (Unixqueue.Wait_in fd, timeout);
        | _ -> ()
    );
    enable_in <- flag;
    in_timeout <- timeout;

  method private enable_output flag =
    if flag && not enable_out then
      Unixqueue.add_resource ues group
        (Unixqueue.Wait_out fd, config#config_timeout);
    if not flag && enable_out then
      Unixqueue.remove_resource ues group (Unixqueue.Wait_out fd);
    enable_out <- flag

  (* Central event dispatcher; guarded against reentrancy because the
     callbacks it triggers may themselves add events. *)
  method private handle_event ev =
    ensure_not_reentrant
      "Nethttpd_engine.http_engine#handle_event"
      handle_event_lock
      (fun () ->
         match ev with
           | Unixqueue.Input_arrived(g,_) when g = group ->
               (* Input data from the HTTP client *)
               ( match conn_state with
                   | `Active proto -> self # cycle_proto proto
                   | `Closing lc -> self # cycle_lc lc
                   | `Closed -> ()   (* drop this event *)
               );
               self # count()
           | Unixqueue.Output_readiness(g,_) when g = group ->
               (* The HTTP client accepts new data *)
               ( match conn_state with
                   | `Active proto -> self # cycle_proto proto
                   | `Closing lc -> ()   (* Strange. Ignore for now. *)
                   | `Closed -> ()       (* drop this event *)
               );
               self # count()
           | Unixqueue.Timeout(g,_) when g = group ->
               (* Register a timeout *)
               ( match conn_state with
                   | `Active proto -> self # timeout_proto proto
                   | `Closing lc -> self # cycle_lc lc
                   | `Closed -> ()   (* drop this event *)
               )
           | Unixqueue.Extra (Ev_output_filled(g,reset)) when g = group ->
               (* The output channel is filled with fresh data *)
               reset();
               ( match conn_state with
                   | `Active proto ->
                       (* Check whether to enable output now: *)
                       self # enable_output proto#do_output
                   | `Closing lc -> ()   (* drop this event *)
                   | `Closed -> ()       (* drop this event *)
               )
           | Unixqueue.Extra (Ev_input_empty(g,reset)) when g = group ->
               (* The input channel is empty. CHECK: why is this a no-op?
                  (input is re-enabled from [goon_proto] anyway) *)
               reset();
               ( match conn_state with
                   | `Active proto -> ()
                   | `Closing lc -> ()   (* drop this event *)
                   | `Closed -> ()       (* drop this event *)
               )
           | _ ->
               raise Equeue.Reject
                 (* Other engines may want to see this event *)
      )
      ()

  (* Bump the `Working counter so [when_state] observers see progress. *)
  method private count() =
    match self#state with
      | `Working n -> self # set_state(`Working(n+1))
      | _ -> ()

  method private cycle_proto proto =
    (* Do a HTTP protocol cycle, and check whether I/O is still enabled *)
    dlogr (fun () -> sprintf "FD %Ld: cycle" (Netsys.int64_of_file_descr fd));
    proto # cycle();   (* do not block! *)
    self # goon_proto proto

  method private timeout_proto proto =
    (* Either input or output has timed out. *)
    dlogr (fun () -> sprintf "FD %Ld: timeout" (Netsys.int64_of_file_descr fd));
    proto # timeout();
    self # goon_proto proto

  (* After every cycle: forward tokens to the request manager, recompute
     I/O enabling, and close/linger when the connection is over. *)
  method private goon_proto proto =
    dlogr (fun () -> sprintf "FD %Ld: go on" (Netsys.int64_of_file_descr fd));
    let enabled_input_flow =
      (not config#config_input_flow_control) ||
      ( (* Input flow control: We stop reading from the descriptor when there are
         * unprocessed request tokens. *)
        match cur_request_manager with
          | None -> true
              (* CHECK: This might be nonsense. It is possible that in the
               * last request manager the input channel has unprocessed data.
               * Don't know how to handle this. (I don't think it is possible
               * to attack the server because of this issue, because the
               * pipeline length option limits the number of unresponded
               * requests anyway.) *)
          | Some rm ->
              (* If the input channel is empty, we enable input, and vice versa: *)
              not(rm # environment # input_ch_async # can_input)
      ) in
    self # forward_input_tokens proto;
    self # enable_input (enabled_input_flow && proto#do_input);
    self # enable_output proto#do_output;
    (* Check whether the HTTP connection is processed and can be closed: *)
    if eof_seen && not proto#do_output then (
      if proto # need_linger then (
        (* FIXME: It is strange to check here for a lingering close. This
           should never be necessary after getting an EOF from the client *)
        dlogr (fun () -> sprintf "FD %Ld: lingering"
                 (Netsys.int64_of_file_descr fd));
        let lc =
          new lingering_close
            ~preclose:(fun () -> Netlog.Debug.release_fd fd) fd in
        conn_state <- `Closing lc;
        self # enable_input true;
        self # enable_output false;
      )
      else (
        (* Just close the descriptor and shut down the engine: *)
        dlogr (fun () -> sprintf "FD %Ld: closing"
                 (Netsys.int64_of_file_descr fd));
        Netlog.Debug.release_fd fd;
        conn_state <- `Closed;
        Unix.close fd;
        Unixqueue.clear ues group;    (* Stop in Unixqueue terms *)
        self # set_state (`Done());   (* Report the new state *)
      )
    )

  method private receive proto =
    let tok = proto # receive() in
    dlogr (fun () ->
             sprintf "FD %Ld: next token: %s"
               (Netsys.int64_of_file_descr fd)
               (Nethttpd_kernel.string_of_req_token tok) );
    tok

  method private forward_input_tokens proto =
    (* Interpret all available input tokens: *)
    dlogr (fun () -> sprintf "FD %Ld: forward_input_tokens"
             (Netsys.int64_of_file_descr fd));
    while proto # recv_queue_len > 0 do
      match self # receive proto with
        | `Req_header(req_line, req_hdr, resp) ->
            (* The next request starts. *)
            assert(cur_request_manager = None);
            (* Consume an immediately following 100-continue token here so
               the manager knows whether to send "100 Continue": *)
            let expect_100_continue =
              try
                proto # peek_recv() = `Req_expect_100_continue
              with
                  Recv_queue_empty -> false in
            if expect_100_continue then
              ignore(self # receive proto);
            let f_access = ref (fun () -> ()) in
            ( try
                let rm =
                  new http_request_manager   (* or Standard_response *)
                    config ues group req_line req_hdr expect_100_continue
                    fd_addr peer_addr resp fdi in
                f_access := rm # log_access;
                cur_request_manager <- Some rm;
                dlogr (fun () ->
                         sprintf "FD %Ld: got request req-%d, notifying user"
                           (Netsys.int64_of_file_descr fd) (Oo.id rm));
                (* Notify the user that we have received the header: *)
                let () =
                  on_request_header (rm :> http_request_header_notification) in
                (* Note: the callback may raise an arbitrary exception *)
                dlogr (fun () ->
                         sprintf "FD %Ld: back from user for req-%d"
                           (Netsys.int64_of_file_descr fd) (Oo.id rm))
              with
                | Standard_response(status, hdr_opt, msg_opt) ->
                    (* Probably a problem when decoding a header field! *)
                    dlogr (fun () ->
                             sprintf "FD %Ld: got request -> std response"
                               (Netsys.int64_of_file_descr fd));
                    let (req_meth, req_uri) = fst req_line in
                    let (in_cnt,reqrej,env_opt) =
                      match cur_request_manager with
                        | Some rm ->
                            let env =
                              (rm # environment :> extended_environment) in
                            (env#input_body_size,
                             env#request_body_rejected,
                             Some env)
                        | None -> (0L, false, None) in
                    Nethttpd_reactor.logged_error_response
                      fd_addr peer_addr (Some(req_meth,req_uri)) in_cnt reqrej
                      status (Some req_hdr) msg_opt env_opt (Some resp)
                      (config :> Nethttpd_reactor.http_processor_config);
                    Unixqueue.add_event ues
                      (Unixqueue.Extra(Ev_output_filled(group,(fun () -> ()))));
                    (* Now [cur_request_manager = None]. This has the effect that
                     * all further input tokens for this request are silently
                     * dropped. *)
            )
        | `Req_expect_100_continue ->
            assert false   (* already processed *)
        | `Req_body data_chunk ->
            (* Just forward data to the current request manager: *)
            dlogr (fun () -> sprintf "FD %Ld: got body data"
                     (Netsys.int64_of_file_descr fd));
            ( match cur_request_manager with
                | Some rm ->
                    if rm # req_state <> `Finishing then
                      rm # set_req_state `Receiving_body;
                    rm # real_input_ch # add_data data_chunk
                | None -> ()   (* drop it *)
            )
        | `Req_trailer _ ->
            (* Don't know what to do with the trailer. *)
            dlogr (fun () -> sprintf "FD %Ld: got trailer"
                     (Netsys.int64_of_file_descr fd));
            ()
        | `Req_end ->
            (* Just forward this event to the current request manager: *)
            dlogr (fun () -> sprintf "FD %Ld: got request end"
                     (Netsys.int64_of_file_descr fd));
            ( match cur_request_manager with
                | Some rm ->
                    dlogr (fun () ->
                             sprintf "FD %Ld: req end for req-%d"
                               (Netsys.int64_of_file_descr fd) (Oo.id rm));
                    cur_request_manager <- None;
                    ( match rm # req_state with
                        | `Finishing ->
                            (* The request has been dropped, so call the error
                             * handler. *)
                            rm # error_handler()
                        | _ ->
                            (* Normal end of request: *)
                            rm # set_req_state `Received_request;
                            rm # real_input_ch # add_eof ();
                            (* Now call the function given by the [on_request]
                               argument: *)
                            rm # req_handler (rm :> http_request_notification);
                            (* Note: the callback may raise an arbitrary
                               exception *)
                    )
                | None -> ()   (* drop *)
            );
        | `Eof ->
            (* If there is still a request manager, finish the current
               request. *)
            dlogr (fun () -> sprintf "FD %Ld: got eof"
                     (Netsys.int64_of_file_descr fd));
            ( match cur_request_manager with
                | Some rm ->
                    cur_request_manager <- None;
                    ( match rm # req_state with
                        | `Received_request ->
                            (* This is impossible! *)
                            assert false
                        | `Finishing ->
                            (* The current request has not been arrived
                             * completely. It has been dropped, so call the
                             * error handler. *)
                            rm # error_handler()
                        | _ ->
                            (* Same as before, but the request was not yet
                             * properly finished. *)
                            rm # schedule_finish();
                            rm # error_handler()
                    );
                | None -> ()
            );
            (* Record this event. Will be checked in [cycle_proto] *)
            eof_seen <- true
        | `Fatal_error e ->
            (* The connection is already down. Just log the incident: *)
            dlogr (fun () -> sprintf "FD %Ld: got fatal error"
                     (Netsys.int64_of_file_descr fd));
            if e <> `Broken_pipe_ignore then (
              let msg = Nethttpd_kernel.string_of_fatal_error e in
              Nethttpd_reactor.logged_error_response
                fd_addr peer_addr None 0L false `Internal_server_error
                None (Some msg) None None
                (config :> Nethttpd_reactor.http_processor_config)
              (* Note: The kernel ensures that the following token will be
               * [`Eof]. Any necessary cleanup will be done when [`Eof] is
               * processed. *)
            )
        | `Bad_request_error (e, resp) ->
            (* Log the incident, and reply with a 400 response. There isn't any
             * request manager, because `Bad_request_error replaces `Req_header
             * when bad requests arrive. *)
            dlogr (fun () -> sprintf "FD %Ld: got bad request"
                     (Netsys.int64_of_file_descr fd));
            assert(cur_request_manager = None);
            let msg = string_of_bad_request_error e in
            let status = status_of_bad_request_error e in
            Nethttpd_reactor.logged_error_response
              fd_addr peer_addr None 0L false status None (Some msg)
              None (Some resp)
              (config :> Nethttpd_reactor.http_processor_config);
            Unixqueue.add_event ues
              (Unixqueue.Extra(Ev_output_filled(group,(fun () -> ()))));
            (* Note: The kernel ensures that the following token will be
             * [`Eof]. Any necessary cleanup will be done when [`Eof] is
             * processed. *)
        | `Timeout ->
            (* A non-fatal timeout. Always followed by [`Eof] *)
            dlogr (fun () -> sprintf "FD %Ld: got timeout"
                     (Netsys.int64_of_file_descr fd));
            ()
    done

  method private cycle_lc lc =
    (* Do a cycle of the [lc] engine. *)
    dlogr (fun () -> sprintf "FD %Ld: cycle_lc"
             (Netsys.int64_of_file_descr fd));
    lc # cycle();   (* do not block! *)
    let cont = lc # lingering in
    self # enable_output false;
    self # enable_input cont;
    if not cont then (
      (* Now stop the whole engine! *)
      dlogr (fun () -> sprintf "FD %Ld: cycle_lc transitioning to Done"
               (Netsys.int64_of_file_descr fd));
      conn_state <- `Closed;
      Unixqueue.clear ues group;    (* Stop in Unixqueue terms *)
      self # set_state (`Done());   (* Report the new state *)
    )

  method private handle_exception err =
    (* In general this should not happen. The HTTP kernel already handles all kinds
     * of I/O errors. This means all remaining exceptions are programming errors.
     *)
    assert false

  method event_system = ues

  method abort() =
    (* The hard way to stop everything: *)
    dlogr (fun () -> sprintf "FD %Ld: abort" (Netsys.int64_of_file_descr fd));
    match self#state with
      | `Working _ ->
          Unixqueue.clear ues group;   (* Stop the queue immediately *)
          if conn_state <> `Closed then (
            Netlog.Debug.release_fd fd;
            try Unix.close fd with _ -> ()
          );
          ( match conn_state with
              | `Active proto ->
                  proto # abort `Broken_pipe
                    (* This closes the file descriptors of all files currently
                     * being sent by [send_file_response]. *)
              | _ -> ()
          );
          ( match cur_request_manager with
              | Some rm ->
                  rm # abort()
                    (* The input and output channels are forced to fail when
                     * data is input/output. *)
              | None -> ()
          );
          self # set_state `Aborted
      | _ ->
          ()   (* already in a final state *)

end


(* How to bridge the engine's asynchronous channels to the synchronous
   channels expected by [http_service] generators. *)
class type http_engine_processing_config =
object
  method config_synch_input :
    (Netchannels.in_obj_channel -> unit) ->
    Uq_engines.async_in_channel -> unit
  method config_synch_output :
    (Netchannels.out_obj_channel -> unit) ->
    Uq_engines.async_out_channel -> unit
end


class buffering_engine_processing_config : http_engine_processing_config =
object
  (* Buffers the whole request body in [queue], and calls [f] with a
     synchronous channel over the buffered data once EOF is reached. *)
  method config_synch_input f (ch : Uq_engines.async_in_channel) =
    let queue = Queue.create() in
    let ch' =
      object(self)
        val mutable closed = false
        val mutable token = None
          (* chunk currently being consumed, as (string, pos, len) *)
        val mutable pos_in = 0
        method input u up ul =
          if closed then raise Netchannels.Closed_channel;
          try
            let (s,sp,sl) = self # next_token in
            let n = min ul sl in
            String.blit s sp u up n;
            token <- if n = sl then None else Some(s,sp+n,sl-n);
            pos_in <- pos_in + n;
            n
          with
              Queue.Empty -> raise End_of_file
        method close_in() = closed <- true
        method pos_in = pos_in
        method private next_token =
          match token with
            | None ->
                let tok = Queue.take queue in
                token <- Some tok;
                tok
            | Some(s,p,l) -> (s,p,l)
      end in
    let s = String.create 8192 in
    let on_data() =
      try
        (* Drain everything currently available into the queue: *)
        while ch # can_input do
          let n = ch # input s 0 8192 in
          Queue.push (String.sub s 0 n, 0, n) queue
        done;
        true   (* notify again *)
      with
          End_of_file ->
            (* Body complete: hand the buffered data to [f] now. *)
            let ch' = Netchannels.lift_in ~buffered:false (`Raw ch') in
            f ch';
            false   (* don't notify any longer *)
    in
    ch # request_notification on_data;
    if ch # can_input then
      ignore(on_data());

  method config_synch_output f (ch : Uq_engines.async_out_channel) =
    let ch' =
      Netchannels.lift_out
        ~buffered:false (`Raw (ch :> Netchannels.raw_out_channel)) in
    f ch'
    (* The output channel of the engine buffers anyway!
*) end class type http_engine_processing_context = object method engine : unit Uq_engines.engine end type x_reaction = [ http_service_reaction | `Redirect_request of string * http_header ] exception Ev_stage2_processed of (http_service_generator option * http_request_header_notification * Unixqueue.group) (* Event: Stage 2 has been processed. The argument is the follow-up action: * - None: Finish request immediately * - Some g: Proceed with generator g * The other args only allow identification of the event. *) exception Ev_stage3_processed of ((string * http_header) option * http_request_notification * Unixqueue.group) (* Event: Stage 2 has been processed. The argument is the follow-up action: * - None: Finish request * - Some(uri,hdr): Redirect response to this location *) class redrained_environment ~out_channel (env : extended_environment) = object(self) inherit redirected_environment env method output_ch = out_channel method out_channel = out_channel (* CHECK: send_file? maybe it needs also to be overridden *) end class fake_rhn (req:http_request_notification) : http_request_header_notification = object method req_state = `Received_header method environment = req # environment method schedule_accept_body ~on_request ?(on_error = fun () -> ()) () = on_request req method schedule_reject_body ~on_request ?(on_error = fun () -> ()) () = on_request req method schedule_finish() = req # schedule_finish() end let process_connection config pconfig fd ues (stage1 : _ http_service) : http_engine_processing_context = let fd_addr = Unix.getsockname fd in let peer_addr = Netsys.getpeername fd in dlogr (fun () -> sprintf "FD %Ld (%s -> %s) processing connection" (Netsys.int64_of_file_descr fd) (Netsys.string_of_sockaddr peer_addr) (Netsys.string_of_sockaddr fd_addr) ); let on_req_hdr = ref (fun _ -> ()) in let eng_config = object inherit Nethttpd_reactor.modify_http_processor_config (config :> Nethttpd_reactor.http_processor_config) method config_input_flow_control = true 
method config_output_flow_control = true end in let log_error req msg = let env = (req # environment :> extended_environment) in let meth = env # cgi_request_method in let uri = env # cgi_request_uri in Nethttpd_reactor.logged_error_response fd_addr peer_addr (Some(meth,uri)) 0L false `Internal_server_error (Some env#input_header) (Some msg) (Some env) None (config :> Nethttpd_reactor.http_processor_config) in let group = Unixqueue.new_group ues in (* for the extra events *) let fdi = Netsys.int64_of_file_descr fd in (* debug msg *) object(self) val mutable watched_groups = [] val mutable engine = lazy (assert false) initializer ( on_req_hdr := self # on_request_header; (* Create the http_engine, but be careful in case of errors: *) try let eng = new http_engine ~on_request_header:(fun req -> !on_req_hdr req) () eng_config fd ues in Uq_engines.when_state ~is_aborted:(fun _ -> List.iter (Unixqueue.clear ues) watched_groups; watched_groups <- [] ) eng; engine <- lazy eng with | error -> Unix.close fd; (* fd is not yet tracked here *) let eng = new Uq_engines.epsilon_engine (`Error error) ues in engine <- lazy eng ) method private do_stage3 req_id (req : http_request_notification) env redir_count stage3 = (* We call the response generator with a synchronized output channel. By sending * the [Ev_stage3_processed] event, we catch the point in time when the whole * request is over, and can be finished. 
*) dlogr (fun () -> sprintf "FD %Ld req-%d: preparing stage3 env=%d" fdi req_id (Oo.id env)); (* This construction just catches the [Ev_stage3_processed] event: *) let pe = new Uq_engines.poll_engine ~extra_match:(function | Ev_stage3_processed (_,r,g) -> r=req && g=group | _ -> false) [] ues in watched_groups <- pe#group :: watched_groups; Uq_engines.when_state ~is_done:(function | Unixqueue.Extra(Ev_stage3_processed(redirect_opt,_,_)) -> (* Maybe we have to perform a redirection: *) ( match redirect_opt with | Some (new_uri,new_hdr) -> dlogr (fun () -> sprintf "FD %Ld req-%d: redirect_response \ to %s" fdi req_id new_uri); if !(env # output_state) <> `Start then log_error req "Nethttpd: Redirect_response is not allowed \ after output has started" else ( let (new_script_name, new_query_string) = decode_query new_uri in new_hdr # update_field "Content-length" "0"; let new_properties = update_alist [ "REQUEST_URI", new_uri; "SCRIPT_NAME", new_script_name; "QUERY_STRING", new_query_string; "REQUEST_METHOD", "GET" ] env#cgi_properties in let new_env = new redirected_environment ~properties:new_properties ~in_header:new_hdr env in let new_req = new fake_rhn req in (* The fake_rhn accepts/rejects the body immediately. 
* In reality this is already done, but when we * redirect the response we must fake a fresh * request header object *) self # process_request new_req new_env (redir_count+1) ) | None -> dlogr (fun () -> sprintf "FD %Ld req-%d: stage3 done" fdi req_id); req # schedule_finish()) | _ -> assert false) pe; pconfig # config_synch_output (fun out_ch -> let env' = new redrained_environment ~out_channel:out_ch env in dlogr (fun () -> sprintf "FD %Ld req-%d: stage3 reqenv=%d env=%d" fdi req_id (Oo.id req#environment) (Oo.id env')); let redirect_opt = try stage3 # generate_response env'; None with | Redirect_request(_,_) -> log_error req "Nethttpd: Caught Redirect_request in stage 3, but it is only allowed in stage 1"; None | Redirect_response(new_uri, new_hdr) -> Some(new_uri, new_hdr) | Standard_response(status, hdr_opt, errmsg_opt) when !(env'#output_state) = `Start -> output_std_response config env' status hdr_opt errmsg_opt; None | err when !(env#output_state) = `Start -> output_std_response config env' `Internal_server_error None (Some("Nethttpd: Exception (sending server error): " ^ Netexn.to_string err)); None | err -> log_error req ("Nethttpd: Exception: " ^ Netexn.to_string err); (* Better do an abort here. We probably cannot finish the cycle regularly *) self # engine # abort(); None in dlogr (fun () -> sprintf "FD %Ld req-%d: stage3 postprocessing" fdi req_id); (* Send the event that we are done here: *) ues # add_event (Unixqueue.Extra(Ev_stage3_processed(redirect_opt,req,group))) ) req # environment # output_ch_async method private do_stage2 (req : http_request_header_notification) env redir_count stage2 = (* This is quite complicated. First, we schedule the body acceptance. Second, * we call the synchronized stage2 processor. In an MT environment, both * processes may run in parallel, so we have to synchronize them again (i.e. * determine the point in time when the body has accepted due to * [schedule_accept_body], and when the stage2 processor is done). 
To do so, * we send a [Ev_stage2_processed] event, and catch that by the main event loop. * (NB. We could also use a Uq_engines.signal_engine for this purpose, * but signal engines are not thread-safe (the signal function cannot be * called from other threads). Thread-safety is announced in the API, * though.) *) dlogr (fun () -> sprintf "FD %Ld req-%d: preparing stage2 env=%d" fdi (Oo.id req) (Oo.id env)); let accepted_request = ref None in let stage3_opt = ref None in (* When both variables are [Some], we are in synch again. *) let check_synch() = match (!accepted_request, !stage3_opt) with | (Some req', Some(Some stage3)) -> (* Synch + stage2 was successful. Continue with stage3: *) dlogr (fun () -> sprintf "FD %Ld synch req-%d req'-%d" fdi (Oo.id req) (Oo.id req')); (* assertion: req = req' *) self # do_stage3 (Oo.id req) req' env redir_count stage3 | (Some req', Some None) -> (* Synch, but stage2 was not successful. Finish the request immediately. *) dlogr (fun () -> sprintf "FD %Ld synch-err req-%d req'-%d" fdi (Oo.id req) (Oo.id req')); (* assertion: req = req' *) req' # schedule_finish() | _ -> (* All other cases: not yet in synch. Do nothing. *) () in req # schedule_accept_body ~on_request:(fun req' -> accepted_request := Some req'; check_synch()) (* ~on_error:XXX *) (* CHECK: close async in channel? 
*) (); (* This construction just catches the [Ev_stage2_processed] event: *) let pe = new Uq_engines.poll_engine ~extra_match:(function | Ev_stage2_processed(_,r,g) -> r=req && g=group | _ -> false) [] ues in watched_groups <- pe#group :: watched_groups; Uq_engines.when_state ~is_done:(function | Unixqueue.Extra(Ev_stage2_processed(st3_opt,_,_)) -> stage3_opt := Some st3_opt; check_synch() | _ -> assert false) pe; pconfig # config_synch_input (fun in_ch -> let env' = new redirected_environment ~in_channel:in_ch env in dlogr (fun () -> sprintf "FD %Ld req-%d: stage2 reqenv=%d env=%d" fdi (Oo.id req) (Oo.id req#environment) (Oo.id env')); let stage3_opt = try Some(stage2 # process_body env') with | Redirect_request(_,_) -> log_error req "Nethttpd: Caught Redirect_request in stage 2, \ but it is only allowed in stage 1"; None | Redirect_response(_,_) -> log_error req "Nethttpd: Caught Redirect_response in stage 2, \ but it is only allowed in stage 3"; None | Standard_response(status, hdr_opt, errmsg_opt) when !(env'#output_state) = `Start -> output_std_response config env' status hdr_opt errmsg_opt; None | err when !(env#output_state) = `Start -> output_std_response config env' `Internal_server_error None (Some("Nethttpd: Exception (sending server error): " ^ Netexn.to_string err)); None | err -> (* Unlikely case *) log_error req ("Nethttpd: Exception: " ^ Netexn.to_string err); (* Better do an abort here. We probably cannot finish the cycle regularly *) self # engine # abort(); None in (* Send the event that we are done here: *) dlogr (fun () -> sprintf "FD %Ld req-%d: stage2 done" fdi (Oo.id req)); ues # add_event (Unixqueue.Extra(Ev_stage2_processed(stage3_opt,req,group))) ) req # environment # input_ch_async method private process_request (req:http_request_header_notification) redir_env redir_count = (* [redir_env]: The environment of the request, possibly rewritten by redirects. 
* [redir_count]: The number of already performed redirections * [req]: Contains always the original environment *) dlogr (fun () -> sprintf "FD %Ld req-%d: process_request FD=%Ld env=%d redir_count=%d" fdi (Oo.id req) (Netsys.int64_of_file_descr fd) (Oo.id redir_env) redir_count); if redir_count > 10 then failwith "Too many redirections"; dlogr (fun () -> sprintf "FD %Ld req-%d: stage1" fdi (Oo.id req)); let reaction = try (stage1 # process_header redir_env :> x_reaction) with | Redirect_request(new_uri, new_hdr) -> `Redirect_request(new_uri, new_hdr) | Redirect_response(_,_) -> failwith "Caught Redirect_response in stage 1, but it is only allowed in stage 3" in dlogr (fun () -> let s_reaction = match reaction with | `Accept_body stage2 -> "Accept_body (next stage: stage2)" | `Reject_body stage3 -> "Reject_body (next stage: stage3)" | `Static _ -> "Static" | `File _ -> "File" | `Std_response _ -> "Std_response" | `Redirect_request _ -> "Redirect_request" in sprintf "FD %Ld req-%d: stage1 results in: %s" fdi (Oo.id req) s_reaction ); ( match reaction with | `Accept_body stage2 -> self # do_stage2 req redir_env redir_count stage2 | `Reject_body stage3 -> req # schedule_reject_body ~on_request:(fun req' -> self # do_stage3 (Oo.id req) req' redir_env redir_count stage3) (); | `Static(status, resp_hdr_opt, resp_str) -> req # schedule_reject_body ~on_request:(fun req' -> output_static_response redir_env status resp_hdr_opt resp_str; req' # schedule_finish()) (); | `File(status, resp_hdr_opt, resp_filename, pos, length) -> req # schedule_reject_body ~on_request:(fun req' -> output_file_response redir_env status resp_hdr_opt resp_filename pos length; req' # schedule_finish()) (); | `Std_response(status, resp_hdr_opt, errlog_opt) -> req # schedule_reject_body ~on_request:(fun req' -> output_std_response config redir_env status resp_hdr_opt errlog_opt; req' # schedule_finish()) (); | `Redirect_request(new_uri, new_hdr) -> dlogr (fun () -> sprintf "FD %Ld req-%d: 
redirect_request to: %s" fdi (Oo.id req) new_uri); let (new_script_name, new_query_string) = decode_query new_uri in new_hdr # update_multiple_field "Content-length" (redir_env # multiple_input_header_field "Content-length"); let new_properties = update_alist [ "REQUEST_URI", new_uri; "SCRIPT_NAME", new_script_name; "QUERY_STRING", new_query_string ] redir_env#cgi_properties in let new_env = new redirected_environment ~properties:new_properties ~in_header:new_hdr ~in_channel:(redir_env # input_channel) redir_env in self # process_request req new_env (redir_count+1) ) method private on_request_header req = try self # process_request req (req#environment :> extended_environment) 0 with | err -> log_error req ("Nethttpd: Exception: " ^ Netexn.to_string err); (* Better do an abort here. We probably cannot finish the cycle regularly *) self # engine # abort() method engine = Lazy.force engine end let override v opt = match opt with | None -> v | Some x -> x let default_http_engine_config = ( object inherit Nethttpd_reactor.modify_http_processor_config Nethttpd_reactor.default_http_processor_config method config_input_flow_control = false method config_output_flow_control = true end ) class modify_http_engine_config ?modify_http_protocol_config ?modify_http_processor_config:(m2 = fun cfg -> cfg) ?config_input_flow_control ?config_output_flow_control (config : http_engine_config) : http_engine_config = let config_input_flow_control = override config#config_input_flow_control config_input_flow_control in let config_output_flow_control = override config#config_output_flow_control config_output_flow_control in object inherit Nethttpd_reactor.modify_http_processor_config ?modify_http_protocol_config (m2 (config:>Nethttpd_reactor.http_processor_config)) method config_input_flow_control = config_input_flow_control method config_output_flow_control = config_output_flow_control end