(* * $Id: uq_engines.ml 1997 2014-08-24 20:21:33Z gerd $ *)

open Printf

(* Exceptions used throughout the engine/channel framework: *)
exception Closed_channel
exception Broken_communication
exception Addressing_method_not_supported
exception Watchdog_timeout
exception Cancelled
exception Timeout

(* Asynchronous output channel: [output s pos len] takes up to [len]
 * bytes from [s] starting at [pos] and returns how many were accepted.
 * [can_output] reports whether output is currently possible.
 * [request_notification f] registers [f]; it is invoked on state
 * changes and returns [false] to unregister itself.
 *)
class type async_out_channel = object
  method output : string -> int -> int -> int
  method close_out : unit -> unit
  method pos_out : int
  method flush : unit -> unit
  method can_output : bool
  method request_notification : (unit -> bool) -> unit
end ;;

(* Asynchronous input channel, the dual of [async_out_channel]. *)
class type async_in_channel = object
  method input : string -> int -> int -> int
  method close_in : unit -> unit
  method pos_in : int
  method can_input : bool
  method request_notification : (unit -> bool) -> unit
end ;;

(* The state of an engine: still working (with a progress counter),
 * or one of the three final states.
 *)
type 't engine_state =
  [ `Working of int
  | `Done of 't
  | `Error of exn
  | `Aborted
  ] ;;

(* Only the final states, a subtype of [engine_state]. *)
type 't final_state =
  [ `Done of 't
  | `Error of exn
  | `Aborted
  ]

(* An engine performs an asynchronous computation on an event system
 * and eventually reaches a final state.
 *)
class type [ 't ] engine = object
  method state : 't engine_state
  method abort : unit -> unit
  method request_notification : (unit -> bool) -> unit
  method event_system : Unixqueue.event_system
end ;;

class type async_out_channel_engine = object
  inherit [ unit ] engine
  inherit async_out_channel
end ;;

class type async_in_channel_engine = object
  inherit [ unit ] engine
  inherit async_in_channel
end ;;

(* Runs engines strictly one after the other (see [serializer] below). *)
class type ['a] serializer_t = object
  method serialized : (Unixqueue.event_system -> 'a engine) -> 'a engine
end

(* Runs engines ordered by priority (see [prioritizer] below). *)
class type ['a] prioritizer_t = object
  method prioritized : (Unixqueue.event_system -> 'a engine) -> int -> 'a engine
end

(* A value computed on demand by an engine and cached (see [cache] below). *)
class type ['a] cache_t = object
  method get_engine : unit -> 'a engine
  method get_opt : unit -> 'a option
  method put : 'a -> unit
  method invalidate : unit -> unit
  method abort : unit -> unit
end

module Debug = struct
  let enable = ref false
end

let dlog = Netlog.Debug.mk_dlog "Uq_engines" Debug.enable
let dlogr = Netlog.Debug.mk_dlogr "Uq_engines" Debug.enable

let () =
  Netlog.Debug.register_module "Uq_engines" Debug.enable

(* Human-readable rendering of an [engine_state], for debug output. *)
let string_of_state =
  function
    | `Working n -> "Working(" ^ string_of_int n ^ ")"
    |
      `Done v -> "Done(_)"
    | `Error e -> "Error(" ^ Netexn.to_string e ^ ")"
    | `Aborted -> "Aborted"

(* Whether the engine is still running. *)
let is_active state =
  match state with
      `Working _ -> true
    | _ -> false
;;

module IntSet =
  Set.Make
    (struct
       type t = int
       let compare (x:t) (y:t) = Pervasives.compare x y
     end
    )

(* Common implementation of the [engine] interface: keeps the state in
 * the [state] ref and manages the notification callbacks.  Newly
 * registered callbacks are collected in [notify_list_new] and merged
 * into [notify_list] only at notification time, so callbacks
 * registered from within a running callback do not fire in the same
 * round.
 *)
class [ 't ] engine_mixin_i (state : 't engine_state ref) esys =
  let notify_list = ref [] in
  let notify_list_new = ref [] in
object(self)
  method state = !state

  method event_system = esys

  method request_notification f =
    (* Registering after the final state is reached is legal but
       pointless; warn so misuse can be diagnosed *)
    if (not (is_active !state)) then
      dlog "engine_mixin warning: the method request_notification was called \
            when the engine already reached the final state";
    notify_list_new := f :: !notify_list_new

  (* Transition to [s] and notify; ignored once a final state was reached. *)
  method private set_state s =
    if is_active !state then (
      state := s;
      self # notify();
    )

  (* Invoke all registered callbacks; a callback returning [false] (or
     raising) is dropped from the list.  Exceptions from callbacks are
     re-raised later via an epsilon event so the remaining callbacks
     still run. *)
  method private notify() =
    ( match !notify_list_new with
        | [] -> ()
        | n ->
            notify_list := !notify_list @ n;
            notify_list_new := [];
    );
    (* Optimize the case that we only have 1 element in the list. The
       expensive part here is the assignment (calls caml_modify).
     *)
    ( match !notify_list with
        | [] -> ()
        | [ f ] ->
            let keep =
              try f()
              with
                | error ->
                    Unixqueue.epsilon esys (fun () -> raise error);
                    false in
            if not keep then
              notify_list := []
        | _ ->
            notify_list :=
              ( List.filter
                  (fun f ->
                     try f()
                     with
                       | error ->
                           Unixqueue.epsilon esys (fun () -> raise error);
                           false
                  )
                  !notify_list
              )
    )
end ;;

(* Convenience wrapper: same as [engine_mixin_i] with a fresh state ref. *)
class [ 't ] engine_mixin init_state esys =
  ['t] engine_mixin_i (ref init_state) esys

let when_state ?(is_done = fun _ -> ())
               ?(is_error = fun _ -> ())
               ?(is_aborted = fun _ -> ())
               ?(is_progressing = fun _ -> ())
               (eng : 'a #engine) =
  (* Execute is_done when the state of eng goes to `Done,
   * execute is_error when the state goes to `Error, and
   * execute is_aborted when the state goes to `Aborted.
   * The argument of the callback function is the argument
   * of the state value.
   *)
  (* Remember the last seen progress counter so [is_progressing] fires
     only when it actually changes *)
  let last_n =
    match eng#state with
      | `Done _ | `Error _ | `Aborted -> ref 0
      | `Working n -> ref n in
  eng # request_notification
    (fun () ->
       match eng#state with
           `Done v -> is_done v; false
         | `Error x -> is_error x; false
         | `Aborted -> is_aborted(); false
         | `Working n ->
             if n <> !last_n then is_progressing n;
             last_n := n;
             true
    )
;;

(* [map_engine] transforms the final state of [eng] with the mapping
 * functions.  `Working states pass through (subject to
 * [propagate_working]); `Done is mapped by [map_done], which may also
 * turn success into an error; `Error and `Aborted are mapped when the
 * corresponding optional function is given, and pass through otherwise.
 *)
class ['a,'b] map_engine ~map_done ?map_error ?map_aborted
                         ?(propagate_working = true)
                         (eng : 'a #engine) =
  let map_state eng_state =
    match eng_state with
        (`Working _ as wrk_state) -> wrk_state
      | `Done x -> map_done x
      | (`Error x as err_state) ->
          ( match map_error with
                Some f -> f x
              | None -> err_state
          )
      | `Aborted ->
          ( match map_aborted with
                Some f -> f ()
              | None -> `Aborted
          ) in
object(self)
  inherit ['b] engine_mixin (map_state eng#state) eng#event_system

  initializer
    if is_active eng#state then
      eng # request_notification self#map_forward_notification;

  method private map_forward_notification() =
    (* This method is called when [eng] changes its state. We compute our
     * mapped state, and notify our own listeners.
     *)
    let eng_state = eng#state in
    let state' = map_state eng_state in
    (* keep the notification registered only while still `Working *)
    let cont =
      match state' with
          (`Working _) -> true
        | (`Done _) | (`Error _) | `Aborted -> false in
    if not cont || propagate_working then
      self # set_state state';
    cont

  method event_system = eng#event_system

  method abort() = eng#abort()

end;;

let map_engine = new map_engine

(* Like [map_engine], but the mapping is one function on final states;
 * a mapped state can never re-enter `Working.
 *)
class ['a,'b] fmap_engine e (f : 'a final_state -> 'b final_state) =
  [_,_] map_engine
    ~map_done:(fun x -> (f (`Done x) :> 'b engine_state))
    ~map_error:(fun e -> (f (`Error e) :> 'b engine_state))
    ~map_aborted:(fun () -> (f `Aborted :> 'b engine_state))
    e

let fmap_engine = new fmap_engine

(* Wraps the final state into `Done, so the resulting engine always
 * succeeds and its value is the original final state.
 *)
class ['a] meta_engine e =
  ['a,'a final_state] map_engine
    ~map_done:(fun x -> `Done (`Done x))
    ~map_error:(fun e -> `Done (`Error e))
    ~map_aborted:(fun () -> `Done `Aborted)
    e

let meta_engine = new meta_engine

(* An engine that is immediately in state [st]; [abort] is a no-op. *)
let const_engine st esys =
  ( object (self)
      inherit [_] engine_mixin st esys
      method abort() = ()
    end
  )

let aborted_engine esys =
  const_engine `Aborted esys

(* Sequential composition: run [eng_a]; when it is `Done, feed its
 * result to [make_b] and run the second engine.  With [~nocount:true]
 * the progress counter is not incremented (used by [qseq_engine]).
 *)
class ['a,'b] iseq_engine ~nocount (eng_a : 'a #engine)
                          (make_b : 'a -> 'b #engine) =
  let esys = eng_a # event_system in
  let eng_a_state = ref (eng_a # state) in
  let eng_a = ref (Some (eng_a :> 'a engine)) in
  (* to get rid of the eng_a value when it is done *)
  let eng_b = ref None in
  let eng_b_state = ref (`Working 0) in
object(self)
  inherit ['b] engine_mixin (`Working 0) esys

  initializer
    match !eng_a with
      | Some e ->
          if is_active e#state then
            e # request_notification self#update_a
          else (
            (* eng_a is already in a final state *)
            ignore(self#update_a())
          )
      | None ->
          assert false

  method private update_a() =
    (* eng_a is running, eng_b not yet existing *)
    let ea =
      match !eng_a with Some e -> e | None -> assert false in
    let s = ea # state in
    match s with
      | `Working n ->
          ( match !eng_a_state with
              | `Working n' when n = n' -> ()
              | _ ->
                  (* i.e.
s <> !eng_a_state *) self # seq_count(); eng_a_state := s ); true | `Done arg -> (* Create eng_b *) (* get rid of eng_a - otherwise mem leak: *) eng_a := None; let e = try (make_b arg :> _ engine) with error -> const_engine (`Error error) esys in eng_b := Some e; let s' = e # state in eng_b_state := s'; self # seq_count(); if is_active s' then e # request_notification self#update_b else ignore(self#update_b()); false | `Error arg -> self # set_state (`Error arg); false | `Aborted -> self # set_state `Aborted; false method private update_b() = (* eng_a is `Done, eng_b is running *) let e = match !eng_b with Some e -> e | None -> assert false in let s = e # state in match s with | `Working n -> ( match !eng_b_state with | `Working n' when n=n' -> () | _ -> self # seq_count(); eng_b_state := s ); true | `Done arg -> self # set_state s; false | `Error arg -> self # set_state s; false | `Aborted -> self # set_state s; false method private seq_count() = match self#state with | `Working n -> if not nocount then self # set_state (`Working (n+1)) | _ -> assert false method abort() = ( match !eng_a with | Some e -> e # abort() | None -> () ); ( match !eng_b with | Some e -> e # abort() | None -> () ) end;; class ['a,'b] seq_engine = ['a,'b] iseq_engine ~nocount:false let seq_engine = new seq_engine class ['a,'b] qseq_engine = ['a,'b] iseq_engine ~nocount:true let qseq_engine = new qseq_engine class ['a] delegate_engine e = object(self) inherit ['a] engine_mixin e#state e#event_system initializer ( if is_active e#state then when_state ~is_done:(fun x -> self # set_state (`Done x)) ~is_error:(fun e -> self # set_state (`Error e)) ~is_aborted:(fun () -> self # set_state `Aborted) ~is_progressing:(fun n -> self # set_state (`Working n)) e ) method abort() = e#abort(); self # set_state `Aborted end class ['a] stream_seq_engine x0 (s : ('a -> 'a #engine) Stream.t) esys = object(self) inherit ['a] engine_mixin (`Working 0) esys val mutable x = x0 val mutable cur_e = aborted_engine 
esys initializer self#next() method private next() = match Stream.peek s with | None -> self # set_state (`Done x) | Some f -> let _ = Stream.next s in (* yep, it's "partial" *) let e = try (f x :> _ engine) with error -> const_engine (`Error error) esys in cur_e <- e; if is_active e#state then when_state ~is_done:(fun x1 -> x <- x1; Unixqueue.epsilon esys self#next (* avoids stack overflow *) ) ~is_error:(fun e -> self # set_state (`Error e)) ~is_aborted:(fun () -> self # set_state `Aborted) ~is_progressing:(fun _ -> self # sseq_count()) e else self # set_state e#state method abort() = cur_e # abort(); self # set_state `Aborted method private sseq_count() = match self#state with `Working n -> self # set_state (`Working (n+1)) | _ -> () end let stream_seq_engine = new stream_seq_engine let abort_if_working eng = match eng#state with `Working _ -> eng # abort() | _ -> () ;; class ['a,'b] sync_engine (eng_a : 'a #engine) (eng_b : 'b #engine) = object(self) val mutable eng_a_state = eng_a # state val mutable eng_b_state = eng_b # state inherit ['a * 'b] engine_mixin (`Working 0) eng_a#event_system initializer if is_active eng_a#state then eng_a # request_notification self#sy_update_a else ignore(self#sy_update_a()); if is_active eng_b#state then eng_b # request_notification self#sy_update_b else ignore(self#sy_update_b()) method private sy_update_a() = let s = eng_a # state in match s with | `Working n -> if s <> eng_a_state then self # transition(); eng_a_state <- s; true | `Done _ -> eng_a_state <- s; self # transition(); false | _ -> eng_a_state <- s; self # transition(); abort_if_working eng_b; false method private sy_update_b() = let s = eng_b # state in match s with | `Working n -> if s <> eng_b_state then self # transition(); eng_b_state <- s; true | `Done _ -> eng_b_state <- s; self # transition(); false | _ -> eng_b_state <- s; self # transition(); abort_if_working eng_a; false method private transition() = (* Compute new state from eng_a_state and 
eng_b_state: *) let state' = match self#state with `Working n -> ( match (eng_a_state, eng_b_state) with (`Working _, `Working _) -> `Working (n+1) | (`Working _, `Done _) -> `Working (n+1) | (`Done _, `Working _) -> `Working (n+1) | (`Done a, `Done b) -> `Done (a,b) | (`Error x, _) -> `Error x | (_, `Error x) -> `Error x | (`Aborted, _) -> `Aborted | (_, `Aborted) -> `Aborted ) | _ -> (* The state will never change again! *) self#state in self # set_state state' method abort() = eng_a # abort(); eng_b # abort(); end;; let sync_engine = new sync_engine class ['t] epsilon_engine (target_state:'t engine_state) ues : ['t] engine = let aborted = ref false in object(self) inherit ['t] engine_mixin (`Working 0) ues initializer ( Unixqueue.epsilon ues (fun () -> if not !aborted then self # set_state target_state ) ) method abort() = aborted := true; self # set_state `Aborted end let epsilon_engine = new epsilon_engine class poll_engine ?(extra_match = fun _ -> false) oplist ues = let state = ref (`Working 0) in object(self) inherit [Unixqueue.event] engine_mixin_i state ues val mutable group = Unixqueue.new_group ues initializer self # restart() method group = group method restart() = group <- Unixqueue.new_group ues; state := (`Working 0 : Unixqueue.event engine_state); (* N.B. 
set_state would not work here *) (* Define the event handler: *) Unixqueue.add_handler ues group (fun _ _ -> self # handle_event); (* Add the resources: *) List.iter (Unixqueue.add_resource ues group) oplist; method private handle_event ev = match ev with Unixqueue.Input_arrived(g,fd) when g = group -> self # accept_event ev | Unixqueue.Output_readiness(g,fd) when g = group -> self # accept_event ev | Unixqueue.Out_of_band(g,fd) when g = group -> self # accept_event ev | Unixqueue.Timeout(g,op) when g = group -> self # accept_event ev | Unixqueue.Extra x -> if extra_match x then self # accept_event ev else raise Equeue.Reject | _ -> raise Equeue.Reject method private accept_event ev = Unixqueue.clear ues group; self # set_state (`Done ev); method private handle_exception x = self # set_state (`Error x) method abort() = match self#state with `Working _ -> Unixqueue.clear ues group; self # set_state `Aborted; | _ -> () method event_system = ues end ;; class ['a] delay_engine t f esys = let wid = Unixqueue.new_wait_id esys in [_,'a] seq_engine (new poll_engine [ Unixqueue.Wait wid, t ] esys) (fun _ -> f()) let delay_engine = new delay_engine let signal_engine esys = let wid = Unixqueue.new_wait_id esys in let op = Unixqueue.Wait wid in let p = new poll_engine [op, (-1.0)] esys in let r = ref `Aborted in let flag = ref false in let e = new map_engine ~map_done:(fun _ -> (!r :> _ engine_state)) ~map_aborted:(fun _ -> (!r :> _ engine_state)) p in let signal st = if not !flag then ( (* atomic *) r := st; flag := true ); (* p#abort() - old implementation *) Unixqueue.add_event esys (Unixqueue.Timeout(p#group, op)) in (e, signal) class ['a] signal_engine esys = let (e, signal) = signal_engine esys in object(self) inherit ['a] delegate_engine e method signal x = signal (x : _ final_state) end let timeout_engine d exn eng = let esys = eng#event_system in let g = Unixqueue.new_group esys in let timeout_flag = ref false in Unixqueue.once esys g d (fun () -> timeout_flag := 
true; eng#abort(); ); map_engine ~map_done:(fun r -> Unixqueue.clear esys g; `Done r) ~map_aborted:(fun _ -> if !timeout_flag then `Error exn else ( Unixqueue.clear esys g; `Aborted)) ~map_error:(fun e -> Unixqueue.clear esys g; `Error e) eng class ['a] timeout_engine d exn (eng : _ engine) = ['a] delegate_engine(timeout_engine d exn eng) class poll_process_engine ?(period = 0.1) ~pid ues = object(self) inherit [Unix.process_status] engine_mixin (`Working 0) ues val group = Unixqueue.new_group ues val wait_id = Unixqueue.new_wait_id ues initializer (* Define the event handler: *) Unixqueue.add_handler ues group (fun _ _ -> self # handle_event); (* Define the abort (exception) handler: *) Unixqueue.add_abort_action ues group (fun _ -> self # handle_exception); (* Add the resources: *) Unixqueue.add_resource ues group (Unixqueue.Wait wait_id, period); method private handle_event ev = match ev with Unixqueue.Timeout(g, Unixqueue.Wait wid) when g = group && wid = wait_id -> self # check_process() | Unixqueue.Signal -> self # check_process(); raise Equeue.Reject (* Signal must not be accepted! *) | _ -> raise Equeue.Reject method private check_process () = try let (w_pid, w_status) = Unix.waitpid [ Unix.WNOHANG ] pid in if w_pid > 0 then ( Unixqueue.clear ues group; self # set_state (`Done w_status); ) with error -> raise(Unixqueue.Abort(group,error)) method private handle_exception x = self # set_state (`Error x) method abort() = match self#state with `Working _ -> Unixqueue.clear ues group; self # set_state `Aborted; | _ -> () method event_system = ues end ;; class watchdog period eng = let ues = eng#event_system in let wid = Unixqueue.new_wait_id ues in object (self) inherit [unit] engine_mixin (`Working 0) ues val mutable last_eng_state = eng # state val timer_eng = new poll_engine [ Unixqueue.Wait wid, 0.1 *. 
period ] ues val mutable aborted = false val mutable inactivity = 0 (* Counts to 10 *) initializer let rec watch() = when_state ~is_done:(fun _ -> let eng_state = eng # state in if eng_state = last_eng_state then ( inactivity <- inactivity + 1; if inactivity >= 10 then ( aborted <- true; self # set_state (`Error Watchdog_timeout) ) else ( timer_eng # restart(); watch(); ) ) else ( last_eng_state <- eng_state; inactivity <- 0; timer_eng # restart(); watch() ) ) timer_eng in watch(); when_state ~is_done:(fun _ -> if not aborted then self # set_state (`Done())) ~is_error:(fun _ -> if not aborted then self # set_state (`Done())) ~is_aborted:(fun _ -> if not aborted then self # set_state (`Done())) eng method abort() = match self#state with `Working _ -> aborted <- true; timer_eng # abort(); self # set_state `Aborted; | _ -> () method event_system = ues end ;; let watchdog = new watchdog let rec msync_engine l f x0 esys = match l with | [] -> new epsilon_engine (`Done x0) esys | [e] -> new map_engine ~map_done:(fun r -> `Done (f r x0)) e | e1 :: l' -> new map_engine ~map_done:(fun (r,x) -> `Done (f r x)) (new sync_engine e1 (msync_engine l' f x0 esys)) class ['a,'b] msync_engine (l : 'a #engine list) f (x0:'b) esys = ['b] delegate_engine (msync_engine l f x0 esys) class ['a] serializer (esys : Unixqueue.event_system) = object(self) val mutable running = None val mutable queue = Queue.create() method serialized ( f : (Unixqueue.event_system -> 'a engine) ) = (** Will call [f esys] when it is time to start the engine *) let rec next f signal = let e = try (f esys : 'a engine) with error -> epsilon_engine (`Error error) esys in running <- Some e; signal e; if is_active e#state then when_state ~is_done:(fun _ -> check()) ~is_error:(fun _ -> check()) ~is_aborted:(fun _ -> check()) e else ( Unixqueue.epsilon esys check; ); e and check() = running <- None; if not (Queue.is_empty queue) then ( let (f,signal) = Queue.take queue in ignore(next f signal) ) in match running with | 
Some _ -> (** Create a wrapper engine. When [f] is finally called, the wrapper is terminated and "replaced" by the engine returned by [f] *) let sig_e, do_signal = signal_engine esys in let eff_e = ref None in let wrap_e = new seq_engine sig_e (fun _ -> match !eff_e with | None -> assert false | Some e -> e ) in let signal e = eff_e := Some e; do_signal(`Done()) in Queue.push (f,signal) queue; wrap_e | None -> next f (fun _ -> ()) end let serializer = new serializer class ['a] prioritizer (esys : Unixqueue.event_system) = object(self) val mutable prio = 0 (* priority of [running] engines *) val mutable running = 0 (* # running engines *) val mutable prios = IntSet.empty (* all waiting priorities *) val mutable preempting = false (* whether there is a bigger prio in prios *) val mutable waiting = Hashtbl.create 3 (* the waiting engines by prio *) method prioritized f p = let rec next f signal = running <- running + 1; prio <- p; let e = try (f esys : 'a engine) with error -> epsilon_engine (`Error error) esys in signal e; if is_active e#state then when_state ~is_done:(fun _ -> check()) ~is_error:(fun _ -> check()) ~is_aborted:(fun _ -> check()) e else ( Unixqueue.epsilon esys check; ); e and check () = running <- running - 1; if running = 0 && prios <> IntSet.empty then ( let highest = IntSet.min_elt prios in prios <- IntSet.remove highest prios; preempting <- false; let l = try Hashtbl.find waiting highest with Not_found -> assert false in Hashtbl.remove waiting highest; List.iter (fun (f,signal) -> ignore(next f signal) ) (List.rev l); ) in if running = 0 || prio = p || not preempting then ( (* we can start immediately *) next f (fun _ -> ()) ) else ( (* push f onto the queue *) let sig_e, do_signal = signal_engine esys in let eff_e = ref None in let wrap_e = seq_engine sig_e (fun _ -> match !eff_e with | None -> assert false | Some e -> e ) in let signal e = eff_e := Some e; do_signal(`Done()) in let l = try Hashtbl.find waiting p with Not_found -> [] in 
Hashtbl.replace waiting p ((f,signal)::l); prios <- IntSet.add p prios; if p < prio then preempting <- true; wrap_e ) end let prioritizer = new prioritizer class ['a] cache call_get_e esys = object(self) val mutable value_opt = (None : 'a option) val mutable value_gen = 0 val mutable getting = None method get_opt() = value_opt method get_engine() = match value_opt with | None -> ( match getting with | None -> (* No get engine is running. Start a new one *) let get_e = call_get_e esys in getting <- Some get_e; let gen = value_gen in let get_e' = new map_engine ~map_done:(fun v -> (* There could be a [put] writing: *) if value_gen = gen then ( value_opt <- Some v; value_gen <- gen+1 ); `Done v ) get_e in get_e' | Some get_e -> (* Some previous user already called [get] but it was not yet finished. For simplicity we return here the same engine. This is ok except that when one user aborts this engine, all other users are also affected. *) get_e ) | Some v -> new epsilon_engine (`Done v) esys method put v' = value_opt <- Some v'; value_gen <- value_gen + 1 method invalidate () = value_opt <- None; value_gen <- value_gen + 1 method abort() = ( match value_opt with | None -> ( match getting with | None -> () | Some get_e -> get_e # abort() ) | Some _ -> () ); self#invalidate() end let cache = new cache class ['a] input_engine f fd tmo esys = [Unixqueue.event, 'a] seq_engine (new poll_engine [ Unixqueue.Wait_in fd, tmo ] esys) (fun ev -> match ev with | Unixqueue.Input_arrived(_,_) -> ( try let r = f fd in epsilon_engine (`Done r) esys with | error -> epsilon_engine (`Error error) esys ) | Unixqueue.Timeout(_,_) -> epsilon_engine (`Error Timeout) esys | _ -> assert false ) class ['a] output_engine f fd tmo esys = [Unixqueue.event, 'a] seq_engine (new poll_engine [ Unixqueue.Wait_out fd, tmo ] esys) (fun ev -> match ev with | Unixqueue.Output_readiness(_,_) -> ( try let r = f fd in epsilon_engine (`Done r) esys with | error -> epsilon_engine (`Error error) esys ) | 
Unixqueue.Timeout(_,_) -> epsilon_engine (`Error Timeout) esys | _ -> assert false ) class pseudo_async_in_channel ch : async_in_channel = object method input = ch # input method close_in = ch # close_in method pos_in = ch # pos_in method can_input = true method request_notification _ = () end let pseudo_async_in_channel = new pseudo_async_in_channel class pseudo_async_out_channel ch : async_out_channel = object method output = ch # output method close_out = ch # close_out method pos_out = ch # pos_out method flush = ch # flush method can_output = true method request_notification _ = () end let pseudo_async_out_channel = new pseudo_async_out_channel (* TODO: Avoid the usage of Extra events here. Extra events are more * expensive than other events because all handlers see them. * Can be substituted with Timeout events. *) exception Receiver_attn of Unixqueue.group ;; let receiver_attn g = Unixqueue.Extra(Receiver_attn g);; exception Sender_attn of Unixqueue.group ;; let sender_attn g = Unixqueue.Extra(Sender_attn g);; let buf_max_size = 4096;; class receiver ~src ~(dst : #async_out_channel) ?(close_src=true) ?(close_dst=true) ues : [ unit ] engine = object(self) (* The receiver has to copy data if (1) the src file descriptor is * readable, and (2) the dst channel accepts output. There is also * an internal buffer that stored read data that cannot yet be * written into the dst channel. * * We implement the following logic: * * - The src file descriptor is polled when there is space in the * internal buffer. Every time new data is added to the buffer, * the event Receiver_attn is generated * - When the dst state changes, the event Receiver_attn is generated * - The event handler catches Receiver_attn, and checks whether * the output channel is ready. If so, data of the internal * buffer is written to the output channel, and a new Receiver_attn * event is generated. If the output channel is not ready, nothing * will happen. 
*) inherit [unit] engine_mixin (`Working 0 : unit engine_state) ues val group = Unixqueue.new_group ues val buf = String.create buf_max_size val mutable buf_size = 0 val mutable in_eof = false val mutable in_polling = false val mutable out_eof = false val mutable deferred_exn = None initializer (* Arrange that Receiver_attn is generated when the dst state changes: *) dst # request_notification (fun () -> (* Note: With MT, we do not know which thread calls this function. * Fortunately, add_event is thread-safe. *) if is_active self#state then ( Unixqueue.add_event ues (receiver_attn group); true (* Continue notifications *) ) else false (* The engine is no longer active: disable any further * notification *) ); (* Define the event handler: *) Unixqueue.add_handler ues group (fun _ _ -> self # handle_event); (* Define the abort (exception) handler: *) Unixqueue.add_abort_action ues group (fun _ -> self # handle_exception); (* Because the internal buffer is empty initially, we can poll * src: *) Unixqueue.add_resource ues group (Unixqueue.Wait_in src, -1.0); in_polling <- true (* Remember add_resource *) method abort() = match self#state with `Working _ -> if not in_eof && close_src then Unix.close src; in_eof <- true; if not out_eof && close_dst then dst # close_out(); out_eof <- true; self # set_state `Aborted; Unixqueue.clear ues group | _ -> () method event_system = ues method private rcv_count() = match self#state with `Working n -> self # set_state (`Working (n+1)) | _ -> () method private handle_event ev = match ev with Unixqueue.Input_arrived(g,_) when g = group -> self # handle_input(); self # check_input_polling(); | Unixqueue.Extra (Receiver_attn g) when g = group -> self # handle_output(); if out_eof then ( Unixqueue.clear ues group; (* Delete the whole group *) raise Equeue.Terminate (* Deactivate this handler *) ) | _ -> raise Equeue.Reject method private handle_exception exn = (* Unixqueue already ensures that the whole group will be deleted, * so we 
need not to do it here *) if not in_eof && close_src then ( try Unix.close src with error -> Netlog.logf `Err "Uq_engines.receiver#handle_exception: %s" (Netexn.to_string error) ); in_eof <- true; if not out_eof && close_dst then ( try dst # close_out() with error -> Netlog.logf `Err "Uq_engines.receiver#handle_exception: %s" (Netexn.to_string error) ); out_eof <- true; self # set_state (`Error exn) method private handle_input() = if not in_eof && buf_size < buf_max_size then try let n = Unix.read src buf buf_size (buf_max_size - buf_size) in buf_size <- buf_size + n; in_eof <- (n = 0); if in_eof && close_src then Unix.close src; Unixqueue.add_event ues (receiver_attn group); self # rcv_count(); with Unix.Unix_error(Unix.EAGAIN,_,_) | Unix.Unix_error(Unix.EWOULDBLOCK,_,_) | Unix.Unix_error(Unix.EINTR,_,_) -> (* These exceptions are expected, and can be ignored *) () | error -> (* Any other exception stops the engine. But first it is tried to * process the buffer contents: *) in_eof <- true; deferred_exn <- Some error; if in_eof && close_src then Unix.close src; Unixqueue.add_event ues (receiver_attn group); self # rcv_count(); method private check_input_polling() = let need_polling = not in_eof && buf_size < buf_max_size in ( if need_polling && not in_polling then Unixqueue.add_resource ues group (Unixqueue.Wait_in src, -1.0) else if not need_polling && in_polling then Unixqueue.remove_resource ues group (Unixqueue.Wait_in src); ); in_polling <- need_polling method private handle_output() = (* If this method is called when out_eof, we assume that this is * an event coming too late. Just ignore. *) if not out_eof then ( (* First check the state of dst: If [pos_out] raises an exception, * we assume that the output channel is broken. 
*) ( try ignore(dst#pos_out) with _ -> (* dst is in an error state, or somebody has closed it *) raise(Unixqueue.Abort(group,Broken_communication)) ); (* It is possible that dst#can_output is false, because we get * Reciever_attn events for many conditions, not just that * output is again accepted. Ignore this case. *) try if dst#can_output then ( if buf_size > 0 then ( let n = dst # output buf 0 buf_size in if n > 0 then ( String.blit buf n buf 0 (buf_size - n); buf_size <- buf_size - n; if (buf_size > 0 && dst#can_output) || in_eof then Unixqueue.add_event ues (receiver_attn group); self # check_input_polling(); self # rcv_count(); ) ) else if in_eof then ( (* Note: we do not close dst. out_eof just means that copying * is done *) if close_dst then dst # close_out(); out_eof <- true; ( match deferred_exn with | None -> self # set_state (`Done()); | Some err -> self # set_state (`Error err); ) ) ) with error -> (* In most cases coming from dst#output *) raise(Unixqueue.Abort(group,error)) ) end ;; class sender ~(src : #async_in_channel) ~dst ?(close_src=true) ?(close_dst=true) ues : [ unit ] engine = object(self) (* The sender has to copy data if (1) the src channel is * readable, and (2) the dst descriptor accepts output. There is also * an internal buffer that stored read data that cannot yet be * written into the dst descriptor. * * We implement the following logic: * * - The dst file descriptor is polled when there is data in the * internal buffer. Every time new data is added to the buffer, * the event Sender_attn is generated * - When the src state changes, the event Sender_attn is generated * - The event handler catches Sender_attn, and checks whether * the input channel has data. If so, the data is appended to the internal * buffer, and a new Sender_attn * event is generated. 
*)
    (* --- Continuation of the [sender] copy engine (class header is above
       this view). It pumps data from the async channel [src] into the file
       descriptor [dst], buffering at most [buf_max_size] bytes in [buf]. *)

    inherit [unit] engine_mixin (`Working 0 : unit engine_state) ues

    val group = Unixqueue.new_group ues
    val buf = String.create buf_max_size
    val mutable buf_size = 0         (* used bytes at the start of [buf] *)
    val mutable in_eof = false       (* EOF seen on / forced for [src]? *)
    val mutable out_eof = false      (* [dst] side finished? *)
    val mutable out_polling = false  (* Wait_out resource currently added? *)

    initializer
      (* Arrange that Sender_attn is generated when the src state changes: *)
      src # request_notification
        (fun () ->
           (* Note: With MT, we do not know which thread calls this function.
            * Fortunately, add_event is thread-safe.
            *)
           if is_active self#state then (
             Unixqueue.add_event ues (sender_attn group);
             true   (* Continue notifications *)
           )
           else
             false  (* The engine is no longer active: disable any further
                     * notification
                     *)
        );
      (* Define the event handler: *)
      Unixqueue.add_handler ues group (fun _ _ -> self # handle_event);
      (* Define the abort (exception) handler: *)
      Unixqueue.add_abort_action ues group (fun _ -> self # handle_exception);
      (* Because the internal buffer is empty initially, we cannot poll
       * dst.
       *)
      out_polling <- false;
      (* Immediately check for input: *)
      Unixqueue.add_event ues (sender_attn group);

    (* Close both endpoints (as configured by close_src/close_dst), go to
       [`Aborted] and drop all queued events. No-op in a final state. *)
    method abort() =
      match self#state with
          `Working _ ->
            if not in_eof && close_src then src # close_in();
            in_eof <- true;
            if not out_eof && close_dst then Unix.close dst;
            out_eof <- true;
            self # set_state `Aborted;
            Unixqueue.clear ues group
        | _ ->
            ()

    method event_system = ues

    (* Bump the progress counter carried in the [`Working n] state. *)
    method private snd_count() =
      match self#state with
          `Working n -> self # set_state (`Working (n+1))
        | _ -> ()

    (* Dispatcher for this engine's event group: Sender_attn triggers a read
       from [src], Output_readiness a write to [dst]. *)
    method private handle_event ev =
      match ev with
          Unixqueue.Extra (Sender_attn g) when g = group ->
            self # handle_input();
        | Unixqueue.Output_readiness(g,_) when g = group ->
            self # handle_output();
            self # check_output_polling();
            if out_eof then (
              Unixqueue.clear ues group;   (* Delete the whole group *)
              raise Equeue.Terminate       (* Deactivate this handler *)
            )
        | _ ->
            raise Equeue.Reject

    method private handle_exception exn =
      (* Unixqueue already ensures that the whole group will be deleted,
       * so we need not to do it here
       *)
      if not in_eof && close_src then (
        try
          src # close_in();
        with error ->
          Netlog.logf `Err
            "Uq_engines.sender#handle_exception: %s"
            (Netexn.to_string error)
      );
      in_eof <- true;
      if not out_eof && close_dst then (
        try Unix.close dst
        with error ->
          Netlog.logf `Err
            "Uq_engines.sender#handle_exception: %s"
            (Netexn.to_string error)
      );
      out_eof <- true;
      self # set_state (`Error exn)

    (* Write as much of [buf] as [dst] accepts right now; finishes the
       engine when the buffer drains after EOF. *)
    method private handle_output() =
      if not out_eof then
        try
          let n = Unix.single_write dst buf 0 buf_size in
          String.blit buf n buf 0 (buf_size - n);
          buf_size <- buf_size - n;
          if buf_size = 0 && in_eof then (
            out_eof <- true;
            if close_dst then Unix.close dst;
            self # set_state (`Done());
          )
          else (
            self # snd_count();
            if n > 0 && not in_eof && src#can_input then
              Unixqueue.add_event ues (sender_attn group);
            (* if not src#can_input, we will be notified when input is
             * again possible.
             *)
          )
        with
            Unix.Unix_error(Unix.EAGAIN,_,_)
          | Unix.Unix_error(Unix.EWOULDBLOCK,_,_)
          | Unix.Unix_error(Unix.EINTR,_,_) ->
              (* These exceptions are expected, and can be ignored *)
              ()
          | error ->
              (* Any other exception stops the engine *)
              raise(Unixqueue.Abort(group,error))

    (* Add/remove the Wait_out resource so [dst] is only polled while there
       is buffered data (or a pending EOF) to flush. *)
    method private check_output_polling() =
      let need_polling = not out_eof && (buf_size > 0 || in_eof) in
      ( if need_polling && not out_polling then
          Unixqueue.add_resource ues group (Unixqueue.Wait_out dst, -1.0)
        else
          if not need_polling && out_polling then
            Unixqueue.remove_resource ues group (Unixqueue.Wait_out dst);
      );
      out_polling <- need_polling

    (* Pull available data from [src] into [buf]; turns channel failures
       into an Abort of the whole group. *)
    method private handle_input() =
      (* If this method is called when in_eof, we assume that this is
       * an event coming too late. Just ignore.
       *)
      if not in_eof then (
        (* First check the state of src: If [pos_in] raises an exception,
         * we assume that the input channel is broken.
         *)
        ( try ignore(src#pos_in)
          with _ ->
            (* src is in an error state, or somebody has closed it *)
            raise(Unixqueue.Abort(group,Broken_communication))
        );
        (* It is possible that src#can_input is false, because we get
         * Sender_attn events for many conditions, not just that
         * input data is again available. Ignore this case.
         *)
        try
          if src#can_input then (
            let l = String.length buf in
            if buf_size < l then (
              try
                let n = src # input buf buf_size (l-buf_size) in
                if n > 0 then (
                  buf_size <- buf_size + n;
                  (* Check for more input data immediately: *)
                  if buf_size < l then
                    Unixqueue.add_event ues (sender_attn group);
                  self # check_output_polling();
                  self # snd_count();
                )
              with
                  End_of_file ->
                    (* We do see EOF for the first time! *)
                    if close_src then src # close_in();
                    in_eof <- true;
                    self # check_output_polling();
                    self # snd_count();
            )
          )
        with
            error ->
              (* In most cases coming from src#input *)
              raise(Unixqueue.Abort(group,error))
      )
  end
;;

(* Raised by the [start_mem_*] methods when a controller cannot do I/O
   directly from/to bigarray memory. *)
exception Mem_not_supported

(* Abstraction for callback-driven, cancellable read/write/shutdown on a
   descriptor-like endpoint. *)
class type multiplex_controller =
object
  method alive : bool
  method mem_supported : bool
  method event_system : Unixqueue.event_system
  method reading : bool
  method start_reading :
    ?peek:(unit -> unit) ->
    when_done:(exn option -> int -> unit) -> string -> int -> int -> unit
  method start_mem_reading :
    ?peek:(unit -> unit) ->
    when_done:(exn option -> int -> unit) -> Netsys_mem.memory -> int -> int ->
    unit
  method cancel_reading : unit -> unit
  method writing : bool
  method start_writing :
    when_done:(exn option -> int -> unit) -> string -> int -> int -> unit
  method start_mem_writing :
    when_done:(exn option -> int -> unit) -> Netsys_mem.memory -> int -> int ->
    unit
  method supports_half_open_connection : bool
  method start_writing_eof :
    when_done:(exn option -> unit) -> unit -> unit
  method cancel_writing : unit -> unit
  method read_eof : bool
  method wrote_eof : bool
  method shutting_down : bool
  method start_shutting_down :
    ?linger : float -> when_done:(exn option -> unit) -> unit -> unit
  method cancel_shutting_down : unit -> unit
  method
         inactivate : unit -> unit
end

(* A multiplex controller that additionally exposes datagram peer
   addresses. *)
class type datagram_multiplex_controller =
object
  inherit multiplex_controller
  method received_from : Unix.sockaddr
  method send_to : Unix.sockaddr -> unit
end

(* Policies for what the channel engines do when they shut down: *)
type onshutdown_out_spec =
    [ `Ignore
    | `Initiate_shutdown
    | `Action of async_out_channel_engine -> multiplex_controller ->
                   unit engine_state -> unit
    ]

type onshutdown_in_spec =
    [ `Ignore
    | `Initiate_shutdown
    | `Action of async_in_channel_engine -> multiplex_controller ->
                   unit engine_state -> unit
    ]

type onclose_spec = [ `Ignore | `Write_eof ]

(* Async output channel engine: [output] queues strings, and the queue is
   drained through [mplex] in the background. *)
class output_async_mplex ?(onclose = (`Ignore : onclose_spec) )
                         ?(onshutdown = (`Ignore : onshutdown_out_spec) )
                         ?buffer_size
                         (mplex : multiplex_controller)
                         : async_out_channel_engine =
object (self)
  inherit [unit] engine_mixin (`Working 0 : unit engine_state) mplex#event_system

  val data_queue = Queue.create()
          (* The queue of strings to output *)

  val mutable data_top_pos = 0
          (* How many bytes of the first string of data_queue
           * have already been copied to buf.
           *)

  val mutable data_queue_length = 0
          (* The sum of all strings in data_queue, not counting
           * data_top_pos
           *)

  val buf = String.create buf_max_size
          (* The output buffer. The strings from data_queue are
           * appended to this buffer to reduce the number of
           * Unix.write syscalls
           *)

  val mutable buf_size = 0
          (* The number of bytes used at the beginning of [buf].
           *)

  val mutable pos_out = 0
          (* The position of the channel *)

  (* Note that the object buffers the strings in data_queue plus the
   * string in buf, and buffer_size is the limit for
   * data_queue_length + buf_size
   *)

  val mutable in_eof = false
  val mutable shutdown_done = false

  (* Queue up to [l] bytes of [s] starting at [p] (less if the buffer
     limit is hit); returns how many bytes were accepted.
     @raise Invalid_argument on a bad substring range.
     @raise Closed_channel after close_out. *)
  method output s p l =
    if p < 0 || l < 0 || p > String.length s || p+l > String.length s then
      invalid_arg "Uq.engines.output_async_mplex#output";
    if in_eof then raise Closed_channel;
    let l' =
      match buffer_size with
          None ->
            (* Unrestricted buffers *)
            if l > 0 then
              Queue.add (String.sub s p l) data_queue;
            l
        | Some max_size ->
            let size = data_queue_length + buf_size in
            let n = min l (max_size - size) in
            if n > 0 then
              Queue.add (String.sub s p n) data_queue;
            n
    in
    pos_out <- pos_out + l';
    data_queue_length <- data_queue_length + l';
    assert(data_queue_length >= 0);   (* must never overflow *)
    if not mplex#writing && l' > 0 then
      self # check_for_output();
    if l' > 0 then self # oam_count();
       (* If l' = 0, there was no space in the buffer.
          No need for notification *)
    l'

  (* Mark EOF; remaining buffered data is still flushed. *)
  method close_out () =
    if not in_eof then (
      in_eof <- true;
      if not mplex#writing then
        self # check_for_output();
    )

  method pos_out =
    if in_eof then raise Closed_channel;
    pos_out

  method flush () =
    if in_eof then raise Closed_channel;
    ()

  method abort() =
    match self#state with
        `Working _ ->
          mplex # cancel_writing();
          self # shutdown `Aborted;
      | _ ->
          ()

  method event_system =
    mplex # event_system

  (* Bump the [`Working n] progress counter. *)
  method private oam_count() =
    match self#state with
        `Working n -> self # set_state (`Working (n+1))
      | _ -> ()

  method can_output =
    not in_eof &&
    match buffer_size with
        None ->
          (* Unrestricted buffers *)
          true
      | Some max_size ->
          let size = data_queue_length + buf_size in
          size < max_size

  method private handle_exception exn =
    mplex # cancel_writing();
    self # shutdown (`Error exn)

  (* Move queued strings into [buf] and hand [buf] to the multiplexer;
     must only be called while no write is in progress. *)
  method private check_for_output() =
    assert(not mplex#writing);
    if not mplex#wrote_eof then (
      (* Refill buf: *)
      while buf_size < buf_max_size && not (Queue.is_empty data_queue) do
        let s0 = Queue.top data_queue in
        let m = String.length s0 - data_top_pos in
        let space = buf_max_size - buf_size in
        let n = min space m in
        String.blit s0 data_top_pos buf buf_size n;
        buf_size <- buf_size + n;
        data_top_pos <- data_top_pos + n;
        data_queue_length <- data_queue_length - n;
        if data_top_pos >= String.length s0 then (
          ignore(Queue.take data_queue);
          data_top_pos <- 0
        );
        assert(data_queue_length >= 0);   (* must never overflow *)
        assert(data_top_pos >= 0);        (* must never overflow *)
      done;
      (* Have something to write?
       *)
      if buf_size > 0 then (
        let cur_buf_size = buf_size in
        mplex # start_writing
          ~when_done:(fun exn_opt n ->
                        match exn_opt with
                          | None ->
                              assert(buf_size = cur_buf_size);
                              String.blit buf n buf 0 (buf_size - n);
                              buf_size <- buf_size - n;
                              self # check_for_output();
                              if n > 0 then self # oam_count();
                              (* Note: this also implies notification because
                               * [can_output] returns true
                               *)
                          | Some Cancelled ->
                              (* Called from [abort], so ignore any data *)
                              ()
                          | Some error ->
                              self # handle_exception error
                     )
          buf 0 cur_buf_size;
      )
      else
        if in_eof then (
          match onclose with
            | `Write_eof ->
                mplex # start_writing_eof
                  ~when_done:(fun exn_opt ->
                                match exn_opt with
                                  | None ->
                                      self # shutdown (`Done());
                                  | Some Cancelled ->
                                      ()
                                  | Some error ->
                                      self # handle_exception error
                             )
                  ()
            | `Ignore ->
                self # shutdown (`Done());
        )
    )

  (* Drop buffered data, run the [onshutdown] policy, and enter
     [next_state]. Idempotent via [shutdown_done]. *)
  method private shutdown next_state =
    (* See also input_async_mplex # shutdown *)
    if not shutdown_done then (
      shutdown_done <- true;
      in_eof <- true;
      Queue.clear data_queue;
      data_queue_length <- 0;
      data_top_pos <- 0;
      ( match onshutdown with
          | `Ignore -> ()
          | `Initiate_shutdown ->
              mplex # start_shutting_down
                ~when_done:(fun _ -> ())
                ()
              (* CHECK: What to do if shutdown not possible? E.g. because
               * there is also a reader?
               *)
          | `Action f ->
              ( try
                  f
                    (self : #async_out_channel_engine :>
                              async_out_channel_engine)
                    mplex
                    next_state
                with
                    error ->
                      (* CHECK: We could map that also to state Error *)
                      Netlog.logf `Err
                        "Uq_engines.output_async_mplex#shutdown: %s"
                        (Netexn.to_string error)
              )
      );
      self # set_state next_state
    )
end
;;

(* Async input channel engine: data read via [mplex] is buffered in
   [data_queue] until the user calls [input]. *)
class input_async_mplex ?(onshutdown = (`Ignore : onshutdown_in_spec) )
                        ?buffer_size
                        (mplex : multiplex_controller)
                        : async_in_channel_engine =
object (self)
  inherit [unit] engine_mixin (`Working 0 : unit engine_state) mplex#event_system

  val data_queue = Queue.create()
          (* The queue of the read strings *)

  val mutable data_top_pos = 0
          (* How many bytes of the first string of data_queue
           * have already been copied to the reading user.
           *)

  val mutable data_queue_length = 0
          (* The sum of all strings in data_queue, not counting
           * data_top_pos
           *)

  val buf = String.create buf_max_size
          (* The input buffer *)

  val mutable pos_in = 0

  val mutable in_eof = false
  val mutable shutdown_done = false

  initializer
    self # check_for_input()

  (* Copy up to [l] buffered bytes into [s] at [p]; returns the number
     copied.
     @raise Invalid_argument on a bad substring range.
     @raise Closed_channel after close_in.
     @raise End_of_file when the buffer is empty and the peer sent EOF. *)
  method input s p l =
    if p < 0 || l < 0 || p > String.length s || p+l > String.length s then
      invalid_arg "Uq.engines.input_async_mplex#input";
    if in_eof then raise Closed_channel;
    let l' = min l data_queue_length in
    let l_todo = ref l' in
    let s_pos = ref p in
    while !l_todo > 0 do
      let u = try Queue.peek data_queue with Queue.Empty -> assert false in
      let n = min !l_todo (String.length u - data_top_pos) in
      String.blit u data_top_pos s !s_pos n;
      s_pos := !s_pos + n;
      data_top_pos <- data_top_pos + n;
      l_todo := !l_todo - n;
      if data_top_pos = String.length u then (
        let _ = Queue.take data_queue in
        data_top_pos <- 0
      )
    done;
    pos_in <- pos_in + l';
    data_queue_length <- data_queue_length - l';
    assert(data_queue_length >= 0);   (* must never overflow *)
    if not mplex#reading then
      self # check_for_input();
    if l' > 0 then self # iam_count();
       (* If l' = 0, there were no data in the buffer.
          No need for notification *)
    if l' = 0 && mplex # read_eof then
      raise End_of_file
    else
      l'

  method close_in () =
    if not in_eof then (
      mplex # cancel_reading();
      self # shutdown (`Done())
    )

  method pos_in =
    if in_eof then raise Closed_channel;
    pos_in

  method abort() =
    match self#state with
        `Working _ ->
          mplex # cancel_reading();
          self # shutdown `Aborted;
      | _ ->
          ()

  method event_system =
    mplex # event_system

  (* Bump the [`Working n] progress counter. *)
  method private iam_count() =
    match self#state with
        `Working n -> self # set_state (`Working (n+1))
      | _ -> ()

  method can_input =
    not in_eof && (data_queue_length > 0 || mplex#read_eof)

  method private handle_exception exn =
    mplex # cancel_reading();
    self # shutdown (`Error exn)

  (* Ask the multiplexer for more data if the buffer limit allows it;
     must only be called while no read is in progress. *)
  method private check_for_input() =
    assert(not mplex#reading);
    if not mplex#read_eof then (
      (* Space to read something?
       *)
      let space =
        match buffer_size with
          | None -> String.length buf
          | Some m -> min (String.length buf) (m - data_queue_length) in
      if space > 0 then (
        mplex # start_reading
          ~when_done:(fun exn_opt n ->
                        match exn_opt with
                          | None ->
                              if n > 0 then (
                                let s = String.sub buf 0 n in
                                Queue.add s data_queue;
                                data_queue_length <- data_queue_length + n;
                                assert(data_queue_length >= 0);
                                  (* must never overflow *)
                              );
                              self # check_for_input();
                              if n > 0 then self # iam_count()
                          | Some End_of_file ->
                              self # iam_count()
                          | Some Cancelled ->
                              (* Called from [abort], so ignore any data *)
                              ()
                          | Some error ->
                              self # handle_exception error
                     )
          buf 0 space
      )
    )

  (* Drop buffered data, run the [onshutdown] policy, and enter
     [next_state]. Idempotent via [shutdown_done]. *)
  method private shutdown next_state =
    (* See also output_async_mplex # shutdown *)
    if not shutdown_done then (
      shutdown_done <- true;
      in_eof <- true;
      Queue.clear data_queue;
      data_top_pos <- 0;
      data_queue_length <- 0;
      ( match onshutdown with
          | `Ignore -> ()
          | `Initiate_shutdown ->
              mplex # start_shutting_down
                ~when_done:(fun _ -> ())
                ()
          | `Action f ->
              ( try
                  f
                    (self : #async_in_channel_engine :>
                              async_in_channel_engine)
                    mplex
                    next_state
                with
                    error ->
                      Netlog.logf `Err
                        "Uq_engines.input_async_mplex#shutdown: %s"
                        (Netexn.to_string error)
              )
      );
      self # set_state next_state
    )
end
;;

(* [anyway ~finally f arg] runs [f arg] and guarantees that [finally()]
   is executed afterwards, whether [f] returns normally or raises. *)
let anyway ~finally f arg =
  try
    let r = f arg in
    finally();
    r
  with
    | error ->
        finally();
        raise error
;;

(* Multiplex controller driving a connected or datagram socket (or other
   descriptor style supported by Netsys) through the event system [esys].
   An optional [timeout] cancels pending operations with the given
   exception. *)
class socket_multiplex_controller
        ?(close_inactive_descr = true)
        ?(preclose = fun () -> ())
        ?(supports_half_open_connection = false)
        ?timeout
        fd esys : datagram_multiplex_controller =
  let fd_style = Netsys.get_fd_style fd in
  let get_ph() = Netsys_win32.lookup_pipe fd in
    (* To be used only when fd_style = `Pipe *)
  let supports_half_open_connection =
    match fd_style with
      | `W32_pipe -> false
      | _ -> supports_half_open_connection in
  let mem_supported =
    match fd_style with
      | `Read_write -> true
      | `Recv_send _ -> true
      | `Recv_send_implied -> true
      | _ -> false in
  let start_timer f =
    (* Call [f x] when the timer fires *)
    match timeout with
      | None ->
          None
      | Some (tmo, x) ->
          let tmo_g = Unixqueue.new_group esys in
          Unixqueue.once esys tmo_g tmo (fun () -> f x);
          Some (tmo_g, f) in
  let stop_timer r =
    match !r with
      | None -> ()
      | Some (old_tmo_g, f) ->
          Unixqueue.clear esys old_tmo_g;
          r := None in
(*
  let () =
    prerr_endline ("fd style: " ^ Netsys.string_of_fd_style fd_style) in
 *)
object(self)
  val mutable alive = true
  val mutable read_eof = false
  val mutable wrote_eof = false
  (* [reading]/[writing] hold the pending operation and its callback;
     [*_tmo] refs hold the matching timeout timer, if any. *)
  val mutable reading = `None
  val mutable reading_tmo = ref None
  val mutable writing = `None
  val mutable writing_tmo = ref None
  val mutable writing_eof = None
  val mutable shutting_down = None
  val mutable shutting_down_tmo = ref None
  val mutable disconnecting = None
  val mutable need_linger = false
  val mutable have_handler = false
  val mutable rcvd_from = None
  val mutable send_to = None

  val group = Unixqueue.new_group esys

  method alive = alive
  method mem_supported = mem_supported
  method reading = reading <> `None
  method writing = writing <> `None || writing_eof <> None
  method shutting_down = shutting_down <> None
  method read_eof = read_eof
  method wrote_eof = wrote_eof

  method supports_half_open_connection = supports_half_open_connection

  method received_from =
    match rcvd_from with
      | None ->
          failwith "#received_from: Nothing received yet, or unknown address"
      | Some a -> a

  method send_to a =
    send_to <- Some a

  initializer
    ( match fd_style with
        | `W32_pipe -> ()
        | `W32_event | `W32_pipe_server | `W32_input_thread
        | `W32_output_thread | `W32_process ->
            invalid_arg "Uq_engines.socket_multiplex_controller: \
                         invalid type of file descriptor"
        | _ ->
            Unix.set_nonblock fd
    );
    dlogr (fun () ->
             sprintf "new socket_multiplex_controller mplex=%d fd=%Ld"
               (Oo.id self) (Netsys.int64_of_file_descr fd))

  (* Restart every armed timeout timer (called after I/O progress). *)
  method private restart_all_timers() =
    match timeout with
      | None -> ()
      | Some (tmo, x) ->
          List.iter
            (fun r ->
               match !r with
                 | None -> ()
                 | Some (old_tmo_g, f) ->
                     Unixqueue.clear esys old_tmo_g;
                     r := start_timer f
            )
            [ reading_tmo; writing_tmo; shutting_down_tmo ]

  method start_reading ?(peek = fun ()->())
                       ~when_done s pos len =
    (* Begin a string read of [len] bytes into [s] at [pos]; [when_done]
       is invoked with the result. Only one read may be pending. *)
    if pos < 0 || len < 0 || pos > String.length s - len then
      invalid_arg "#start_reading";
    if reading <> `None then
      failwith "#start_reading: already reading";
    if shutting_down <> None then
      failwith "#start_reading: already shutting down";
    if not alive then
      failwith "#start_reading: inactive connection";
    self # check_for_connect();
    Unixqueue.add_resource esys group (Unixqueue.Wait_in fd, -1.0);
    reading <- `String(when_done, peek, s, pos, len);
    reading_tmo := start_timer self#cancel_reading_with;
    disconnecting <- None;
    dlogr (fun () ->
             sprintf "start_reading socket_multiplex_controller mplex=%d fd=%Ld"
               (Oo.id self) (Netsys.int64_of_file_descr fd))

  (* Same as start_reading, but into bigarray memory.
     NOTE(review): the log message says "start_reading", identical to the
     string variant — presumably intentional; confirm before changing. *)
  method start_mem_reading ?(peek = fun ()->()) ~when_done m pos len =
    if not mem_supported then raise Mem_not_supported;
    if pos < 0 || len < 0 || pos > Bigarray.Array1.dim m - len then
      invalid_arg "#start_mem_reading";
    if reading <> `None then
      failwith "#start_mem_reading: already reading";
    if shutting_down <> None then
      failwith "#start_mem_reading: already shutting down";
    if not alive then
      failwith "#start_mem_reading: inactive connection";
    self # check_for_connect();
    Unixqueue.add_resource esys group (Unixqueue.Wait_in fd, -1.0);
    reading <- `Mem(when_done, peek, m, pos, len);
    reading_tmo := start_timer self#cancel_reading_with;
    disconnecting <- None;
    dlogr (fun () ->
             sprintf "start_reading socket_multiplex_controller mplex=%d fd=%Ld"
               (Oo.id self) (Netsys.int64_of_file_descr fd))

  method cancel_reading () =
    self # cancel_reading_with Cancelled

  (* Cancel a pending read and report exception [x] to its callback. *)
  method private cancel_reading_with x =
    match reading with
      | `None ->
          ()
      | `String(f_when_done, _, _, _, _) ->
          self # really_cancel_reading();
          anyway
            ~finally:self#check_for_disconnect
            (f_when_done (Some x)) 0
      | `Mem(f_when_done, _, _, _, _) ->
          self # really_cancel_reading();
          anyway
            ~finally:self#check_for_disconnect
            (f_when_done (Some x)) 0

  (* Clear the read state and its resource/timer without notifying. *)
  method private really_cancel_reading() =
    stop_timer reading_tmo;
    if reading <> `None then (
      Unixqueue.remove_resource esys group (Unixqueue.Wait_in fd);
      reading <- `None;
      dlogr (fun () ->
               sprintf "cancel_reading socket_multiplex_controller mplex=%d fd=%Ld"
                 (Oo.id self) (Netsys.int64_of_file_descr fd))
    )

  (* Begin a string write of [len] bytes of [s] at [pos]. Only one write
     (or EOF write) may be pending. *)
  method start_writing ~when_done s pos len =
    if pos < 0 || len < 0 || pos > String.length s - len then
      invalid_arg "#start_writing";
    if writing <> `None || writing_eof <> None then
      failwith "#start_writing: already writing";
    if shutting_down <> None then
      failwith "#start_writing: already shutting down";
    if wrote_eof then
      failwith "#start_writing: already past EOF";
    if not alive then
      failwith "#start_writing: inactive connection";
    self # check_for_connect();
    Unixqueue.add_resource esys group (Unixqueue.Wait_out fd, -1.0);
    writing <- `String(when_done, s, pos, len);
    writing_tmo := start_timer self#cancel_writing_with;
    disconnecting <- None;
    dlogr (fun () ->
             sprintf "start_writing socket_multiplex_controller mplex=%d fd=%Ld"
               (Oo.id self) (Netsys.int64_of_file_descr fd))

  (* Same as start_writing, but from bigarray memory. *)
  method start_mem_writing ~when_done m pos len =
    if not mem_supported then raise Mem_not_supported;
    if pos < 0 || len < 0 || pos > Bigarray.Array1.dim m - len then
      invalid_arg "#start_mem_writing";
    if writing <> `None || writing_eof <> None then
      failwith "#start_mem_writing: already writing";
    if shutting_down <> None then
      failwith "#start_mem_writing: already shutting down";
    if wrote_eof then
      failwith "#start_mem_writing: already past EOF";
    if not alive then
      failwith "#start_mem_writing: inactive connection";
    self # check_for_connect();
    Unixqueue.add_resource esys group (Unixqueue.Wait_out fd, -1.0);
    writing <- `Mem(when_done, m, pos, len);
    writing_tmo := start_timer self#cancel_writing_with;
    disconnecting <- None;
    dlogr (fun () ->
             sprintf "start_writing socket_multiplex_controller mplex=%d fd=%Ld"
               (Oo.id self) (Netsys.int64_of_file_descr fd))

  (* Schedule sending an EOF (shutdown of the send side) once the
     descriptor becomes writable. *)
  method start_writing_eof ~when_done () =
    if not supports_half_open_connection then
      failwith "#start_writing_eof: operation not supported";
    (* From here on we know fd is not a named pipe
     *)
    if writing <> `None || writing_eof <> None then
      failwith "#start_writing_eof: already writing";
    if shutting_down <> None then
      failwith "#start_writing_eof: already shutting down";
    if wrote_eof then
      failwith "#start_writing_eof: already past EOF";
    if not alive then
      failwith "#start_writing_eof: inactive connection";
    self # check_for_connect();
    Unixqueue.add_resource esys group (Unixqueue.Wait_out fd, -1.0);
    writing_eof <- Some when_done;
    writing_tmo := start_timer self#cancel_writing_with;
    disconnecting <- None;
    dlogr (fun () ->
             sprintf "start_writing_eof socket_multiplex_controller mplex=%d fd=%Ld"
               (Oo.id self) (Netsys.int64_of_file_descr fd))

  method cancel_writing () =
    self # cancel_writing_with Cancelled

  (* Cancel a pending write / EOF write and report [x] to its callback. *)
  method private cancel_writing_with x =
    match writing, writing_eof with
      | `None, None ->
          ()
      | (`String(f_when_done, _, _, _) | `Mem(f_when_done, _, _, _)), None ->
          self # really_cancel_writing();
          anyway
            ~finally:self#check_for_disconnect
            (f_when_done (Some x)) 0
      | `None, Some f_when_done ->
          self # really_cancel_writing();
          anyway
            ~finally:self#check_for_disconnect
            f_when_done (Some x)
      | _ ->
          assert false

  (* Clear the write state and its resource/timer without notifying. *)
  method private really_cancel_writing() =
    stop_timer writing_tmo;
    if writing <> `None || writing_eof <> None then (
      Unixqueue.remove_resource esys group (Unixqueue.Wait_out fd);
      writing <- `None;
      writing_eof <- None;
      dlogr (fun () ->
               sprintf "cancel_writing socket_multiplex_controller mplex=%d fd=%Ld"
                 (Oo.id self) (Netsys.int64_of_file_descr fd))
    )

  (* Begin shutting the connection down. With [need_linger] the close
     waits (up to [linger] seconds) for the peer's EOF first. *)
  method start_shutting_down ?(linger = 60.0) ~when_done () =
    if reading <> `None || writing <> `None || writing_eof <> None then
      failwith "#start_shutting_down: still reading or writing";
    if shutting_down <> None then
      failwith "#start_shutting_down: already shutting down";
    if not alive then
      failwith "#start_shutting_down: inactive connection";
    self # check_for_connect();
    let linger_timeout =
      if need_linger then linger else 0.0 in
    let wid = Unixqueue.new_wait_id esys in
    let (op, tmo) =
      if linger_timeout = 0.0 then
        (Unixqueue.Wait wid, 0.0)
      else
        (Unixqueue.Wait_in fd, linger_timeout) in
    Unixqueue.add_resource esys group (op,tmo);
    shutting_down <- Some(when_done, op);
    shutting_down_tmo := start_timer self#cancel_shutting_down_with;
    disconnecting <- None;
    dlogr (fun () ->
             sprintf "start_shutting_down socket_multiplex_controller mplex=%d fd=%Ld"
               (Oo.id self) (Netsys.int64_of_file_descr fd))

  method cancel_shutting_down () =
    self # cancel_shutting_down_with Cancelled

  (* Cancel a pending shutdown and report [x] to its callback. *)
  method private cancel_shutting_down_with x =
    match shutting_down with
      | None ->
          ()
      | Some (f_when_done, _) ->
          self # really_cancel_shutting_down ();
          anyway
            ~finally:self#check_for_disconnect
            f_when_done (Some x)

  (* Clear the shutdown state and its resource/timer without notifying. *)
  method private really_cancel_shutting_down () =
    stop_timer shutting_down_tmo;
    match shutting_down with
      | None ->
          ()
      | Some (_, op) ->
          Unixqueue.remove_resource esys group op;
          shutting_down <- None;
          dlogr (fun () ->
                   sprintf "cancel_shutting_down \
                            socket_multiplex_controller mplex=%d fd=%Ld"
                     (Oo.id self) (Netsys.int64_of_file_descr fd))

  (* Install the event handler lazily, on the first I/O request. *)
  method private check_for_connect() =
    if not have_handler then (
      Unixqueue.add_handler esys group (fun _ _ -> self # handle_event);
      have_handler <- true
    );
    disconnecting <- None

  (* When nothing is pending anymore, schedule an artificial Timeout event
     that lets the handler terminate itself. *)
  method private check_for_disconnect() =
    if reading = `None && writing = `None && writing_eof = None &&
       shutting_down = None && disconnecting = None
    then (
      let wid = Unixqueue.new_wait_id esys in
      let disconnector = Unixqueue.Wait wid in
      Unixqueue.add_event esys (Unixqueue.Timeout(group,disconnector));
      disconnecting <- Some disconnector
    )

  (* Finish a read: clear state, restart timers, invoke the callback. *)
  method private notify_rd f_when_done exn_opt n =
    self # really_cancel_reading();
    self # restart_all_timers();
    dlogr (fun () ->
             sprintf "input_done \
                      socket_multiplex_controller mplex=%d fd=%Ld"
               (Oo.id self) (Netsys.int64_of_file_descr fd));
    anyway
      ~finally:self#check_for_disconnect
      (f_when_done exn_opt) n

  (* Finish a write: clear state, restart timers, invoke the callback. *)
  method private notify_wr f_when_done exn_opt n =
    self # really_cancel_writing();
    self # restart_all_timers();
    dlogr (fun () ->
             sprintf "output_done \
                      socket_multiplex_controller mplex=%d fd=%Ld"
               (Oo.id self) (Netsys.int64_of_file_descr fd));
    anyway
      ~finally:self#check_for_disconnect
      (f_when_done exn_opt) n

  (* Central dispatcher for this descriptor's event group: performs the
     actual system calls when the queue signals readiness. *)
  method private handle_event ev =
    match ev with
      | Unixqueue.Input_arrived(g, _) when g = group ->
          dlogr (fun () ->
                   sprintf "input_event \
                            socket_multiplex_controller mplex=%d fd=%Ld"
                     (Oo.id self) (Netsys.int64_of_file_descr fd));
          ( match reading with
              | `None -> ()
              | `String (f_when_done, peek, _, _, _)
              | `Mem (f_when_done, peek, _, _, _) -> (
                  peek();
                  try
                    rcvd_from <- None;
                    let n =
                      match reading with
                        | `None -> assert false
                        | `String(_,_, s, pos, len) ->
                            ( match fd_style with
                                | `Recv_send(_,a) ->
                                    let n = Unix.recv fd s pos len [] in
                                    rcvd_from <- Some a;
                                    n
                                | `Recvfrom_sendto ->
                                    let (n, a) =
                                      Unix.recvfrom fd s pos len [] in
                                    rcvd_from <- Some a;
                                    n
                                | _ ->
                                    Netsys.gread fd_style fd s pos len
                            )
                        | `Mem(_,_, m, pos, len) ->
                            ( match fd_style with
                                | `Recv_send(_,a) ->
                                    let n =
                                      Netsys_mem.mem_recv fd m pos len [] in
                                    rcvd_from <- Some a;
                                    n
                                | `Recv_send_implied ->
                                    Netsys_mem.mem_recv fd m pos len []
                                | `Read_write ->
                                    Netsys_mem.mem_read fd m pos len
                                | _ ->
                                    assert false
                            )
                    in
                    if n = 0 then (
                      read_eof <- true;
                      need_linger <- false;
                      self # notify_rd f_when_done (Some End_of_file) 0
                    )
                    else
                      self # notify_rd f_when_done None n
                  with
                    | Unix.Unix_error(Unix.EAGAIN,_,_)
                    | Unix.Unix_error(Unix.EWOULDBLOCK,_,_)
                    | Unix.Unix_error(Unix.EINTR,_,_) ->
                        ()
                    | error ->
                        self # notify_rd f_when_done (Some error) 0
                )
          );
          (* A lingering shutdown also waits for input readiness: *)
          ( match shutting_down with
              | Some (f_when_done, op) when op = Unixqueue.Wait_in fd ->
                  let exn_opt, notify =
                    try
                      Netsys.gshutdown fd_style fd Unix.SHUTDOWN_ALL;
                      (None, true)
                    with
                      | Unix.Unix_error(Unix.EAGAIN,_,_)
                      | Unix.Unix_error(Unix.EWOULDBLOCK,_,_)
                      | Unix.Unix_error(Unix.EINTR,_,_) ->
                          (None, false)
                      | Unix.Unix_error(Unix.ENOTCONN,_,_) ->
                          (None, true)
                      | Unix.Unix_error(Unix.EPERM,_,_) ->
                          (None, true)
                      | error ->
                          (Some error, true)
                  in
                  if notify then (
                    self # really_cancel_shutting_down();
                    stop_timer shutting_down_tmo;
                    read_eof <- true;
                    wrote_eof <- true;
                    dlogr (fun () ->
                             sprintf "shutdown_done \
                                      socket_multiplex_controller mplex=%d fd=%Ld"
                               (Oo.id self) (Netsys.int64_of_file_descr fd));
                    anyway
                      ~finally:self#check_for_disconnect
                      f_when_done exn_opt
                  )
              | _ -> ()
          )
      | Unixqueue.Output_readiness(g, _) when g = group ->
          dlogr (fun () ->
                   sprintf "output_event \
                            socket_multiplex_controller mplex=%d fd=%Ld"
                     (Oo.id self) (Netsys.int64_of_file_descr fd));
          ( match writing with
              | `None -> ()
              | `String (f_when_done, _, _, _)
              | `Mem (f_when_done, _, _, _) -> (
                  try
                    let n =
                      match writing with
                        | `None -> assert false
                        | `String(_, s, pos, len) ->
                            ( match fd_style with
                                | `Recvfrom_sendto ->
                                    ( match send_to with
                                        | None ->
                                            failwith "socket_multiplex_controller: Unknown receiver of message to send"
                                        | Some a ->
                                            Unix.sendto fd s pos len [] a
                                    )
                                | _ ->
                                    Netsys.gwrite fd_style fd s pos len
                            )
                        | `Mem(_, m, pos, len) ->
                            ( match fd_style with
                                | `Recv_send _ ->
                                    Netsys_mem.mem_send fd m pos len []
                                | `Recv_send_implied ->
                                    Netsys_mem.mem_send fd m pos len []
                                | `Read_write ->
                                    Netsys_mem.mem_write fd m pos len
                                | _ ->
                                    assert false
                            )
                    in
                    self # notify_wr f_when_done None n
                  with
                    | Unix.Unix_error(Unix.EAGAIN,_,_)
                    | Unix.Unix_error(Unix.EWOULDBLOCK,_,_)
                    | Unix.Unix_error(Unix.EINTR,_,_) ->
                        ()
                    | error ->
                        self # notify_wr f_when_done (Some error) 0
                )
          );
          (* A pending EOF write is performed on output readiness: *)
          ( match writing_eof with
              | None -> ()
              | Some f_when_done ->
                  let exn_opt, notify =
                    try
                      if not wrote_eof then (
                        Netsys.gshutdown fd_style fd Unix.SHUTDOWN_SEND;
                        if not read_eof then need_linger <- true;
                        wrote_eof <- true
                      );
                      (None, true)
                    with
                      | Unix.Unix_error(Unix.EAGAIN,_,_)
                      | Unix.Unix_error(Unix.EWOULDBLOCK,_,_)
                      | Unix.Unix_error(Unix.EINTR,_,_) ->
                          (None, false)
                      | Netsys.Shutdown_not_supported ->
                          (None, true)
                  in
                  if notify then (
                    self # really_cancel_writing();
                    dlogr (fun () ->
                             sprintf "output_eof_done \
                                      socket_multiplex_controller mplex=%d fd=%Ld"
                               (Oo.id self) (Netsys.int64_of_file_descr fd));
                    anyway
                      ~finally:self#check_for_disconnect
                      f_when_done exn_opt
                  )
          )
      | Unixqueue.Timeout (g, op) when g = group ->
          (* Note: The following is incompatible with [once] because we
           * always accept timeout events!
           *)
          dlogr (fun () ->
                   sprintf "other_event \
                            socket_multiplex_controller mplex=%d fd=%Ld"
                     (Oo.id self) (Netsys.int64_of_file_descr fd));
          (* Linger timeout elapsed: force the shutdown now. *)
          ( match shutting_down with
              | Some (f_when_done, op') when op = op' ->
                  let exn_opt, notify =
                    try
                      match fd_style with
                        | `W32_pipe ->
                            let ph = get_ph() in
                            Netsys_win32.pipe_shutdown ph;
                            (None, true)
                        | _ ->
                            Unix.shutdown fd
                              (if wrote_eof then Unix.SHUTDOWN_RECEIVE else
                                 Unix.SHUTDOWN_ALL);
                            (None, true)
                    with
                      | Unix.Unix_error(Unix.EAGAIN,_,_)
                      | Unix.Unix_error(Unix.EWOULDBLOCK,_,_)
                      | Unix.Unix_error(Unix.EINTR,_,_) ->
                          assert false  (* Not documented in man pages *)
                      | Unix.Unix_error(Unix.ENOTCONN,_,_) ->
                          (None, true)
                      | error ->
                          (Some error, true)
                  in
                  if notify then (
                    self # really_cancel_shutting_down();
                    read_eof <- true;
                    wrote_eof <- true;
                    dlogr (fun () ->
                             sprintf "shutdown_done \
                                      socket_multiplex_controller mplex=%d fd=%Ld"
                               (Oo.id self) (Netsys.int64_of_file_descr fd));
                    anyway
                      ~finally:self#check_for_disconnect
                      f_when_done exn_opt
                  )
              | _ -> ()
          );
          (* Disconnector event: nothing pending, drop the handler. *)
          ( match disconnecting with
              | Some op' when op = op' ->
                  disconnecting <- None;
                  have_handler <- false;
                  raise Equeue.Terminate
              | _ -> ()
          )
      | _ ->
          raise Equeue.Reject

  (* Deactivate the controller: cancel everything, clear the event group
     and optionally close the descriptor. *)
  method inactivate() =
    dlogr (fun () ->
             sprintf "inactivate \
                      socket_multiplex_controller mplex=%d fd=%Ld alive=%b \
                      close_inactive_descr=%b"
               (Oo.id self) (Netsys.int64_of_file_descr fd) alive
               close_inactive_descr);
    if alive then (
      alive <- false;
      self # really_cancel_reading();
      self # really_cancel_writing();
      self # really_cancel_shutting_down();
      stop_timer reading_tmo;
      stop_timer writing_tmo;
      stop_timer shutting_down_tmo;
      disconnecting <- None;
      have_handler <- false;
      Unixqueue.clear esys group;
      if close_inactive_descr then (
        preclose();
        Netsys.gclose fd_style fd
        (* It is important that Unix.close (or substitute) is the very
           last action.
           From here on, a thread running in parallel can allocate this
           descriptor again, so it is essential that there are no
           references anymore to it when the old descriptor is closed.
         *)
      )
    )

  method event_system = esys

end
;;

(* Wrap [fd] (a connected stream socket) as a multiplex controller. *)
let create_multiplex_controller_for_connected_socket
      ?close_inactive_descr ?preclose ?supports_half_open_connection ?timeout
      fd esys =
  let mplex =
    new socket_multiplex_controller
      ?close_inactive_descr ?preclose ?supports_half_open_connection ?timeout
      fd esys in
  (mplex :> multiplex_controller)
;;

(* Wrap [fd] (a datagram socket) as a multiplex controller. *)
let create_multiplex_controller_for_datagram_socket
      ?close_inactive_descr ?preclose ?timeout fd esys =
  let mplex =
    new socket_multiplex_controller
      ?close_inactive_descr ?preclose ~supports_half_open_connection:false
      ?timeout fd esys in
  (mplex :> datagram_multiplex_controller)
;;

(* Async output channel engine writing to descriptor [dst]; closes [dst]
   on shutdown if [close_dst] (default). *)
class output_async_descr ~dst ?buffer_size ?(close_dst=true) esys =
  (* Map to output_async_mplex. Be careful not to depend on socket
   * functionaliy (esp. shutdown).
   *)
  let mplex = create_multiplex_controller_for_connected_socket dst esys in
  let shutdown ach mplex _ =
    if close_dst then
      Unix.close dst in
  output_async_mplex
    ~onclose:`Ignore
    ~onshutdown:(`Action shutdown)
    ?buffer_size
    mplex
;;

(* Async input channel engine reading from descriptor [src]; closes [src]
   on shutdown if [close_src] (default). *)
class input_async_descr ~src ?buffer_size ?(close_src=true) esys =
  (* Map to input_async_mplex. Be careful not to depend on socket
   * functionaliy (esp. shutdown).
   *)
  let mplex = create_multiplex_controller_for_connected_socket src esys in
  let shutdown ach mplex _ =
    if close_src then
      Unix.close src in
  input_async_mplex
    ~onshutdown:(`Action shutdown)
    ?buffer_size
    mplex
;;

(* The four copy topologies supported by [copier]: *)
type copy_task =
    [ `Unidirectional of (Unix.file_descr * Unix.file_descr)
    | `Uni_socket of (Unix.file_descr * Unix.file_descr)
    | `Bidirectional of (Unix.file_descr * Unix.file_descr)
    | `Tridirectional of (Unix.file_descr * Unix.file_descr * Unix.file_descr)
    ]
;;

(* Engine that copies data between descriptors according to [copy_task].
   Built from receiver/output_async_descr sub-engines whose states are
   aggregated by [state] below. *)
class copier (copy_task : copy_task) ues : [unit] engine =
object(self)

  val mutable engines = []
  val mutable last_eng_states = []
  val mutable last_count = 0

  initializer
    ( match copy_task with
        `Unidirectional(fd1, fd2) ->
          self # init_unidirectional fd1 fd2
      | `Uni_socket(fd1, fd2) ->
          self # init_uni_socket fd1 fd2
      | `Bidirectional(fd1, fd2) ->
          (* fd2 = fd3: a single socket copied in both directions *)
          self # init_tridirectional true fd1 fd2 fd2
      | `Tridirectional(fd1, fd2, fd3) ->
          self # init_tridirectional false fd1 fd2 fd3
(*    | _ ->
          assert false *)
    );
    last_eng_states <- List.map (fun eng -> eng # state) engines;

  method private init_unidirectional fd1 fd2 =
    (* This is quite simple. fd2_ch is an output channel
     * writing data to fd2. fd1_rcv is a receiver transferring
     * data from fd1 to fd2_ch. If fd1_rcv is at EOF, it will
     * close fd1, and close fd2_ch. fd2_ch closes fd2 after
     * it has written all buffered data.
     *)
    let fd2_ch = new output_async_descr
                   ~dst:fd2
                   ~buffer_size:buf_max_size
                   ues in
    let fd1_rcv = new receiver
                    ~src:fd1
                    ~dst:(fd2_ch :> async_out_channel)
                    ues in
    engines <- [ fd1_rcv;
                 (fd2_ch :> unit engine);
               ]

  method private init_uni_socket fd1 fd2 =
    (* Here, we have to modify the EOF behaviour. First,
     * fd1_rcv must not close src. Of course, it must
     * close the output channel fd2_ch, otherwise the
     * channel would not know that it is at the end of
     * the data stream. However, fd2_ch must not close
     * dst; instead we catch the EOF situation, and
     * shutdown the socket.
     *)
    let fd2_ch = new output_async_descr
                   ~dst:fd2
                   ~close_dst:false
                   ~buffer_size:buf_max_size
                   ues in
    let fd1_rcv = new receiver
                    ~src:fd1
                    ~close_src:false
                    ~dst:(fd2_ch :> async_out_channel)
                    ues in
    when_state
      ~is_done:(fun () ->
                  Unix.shutdown fd2 Unix.SHUTDOWN_SEND)
      fd2_ch;
    engines <- [ fd1_rcv;
                 (fd2_ch :> unit engine);
               ]

  method private init_tridirectional bi_case fd1 fd2 fd3 =
    (* Basically, we have two `Uni_socket copiers where one copier
     * transfers data into the reverse direction as the other
     * copier. Additionally, we have to close the descriptors
     * when work is done, either successfully or with error.
     *)
    (* bi_case: fd2 = fd3 is assumed, and fd2 must be a socket *)
    (* Copy fd1 to fd2: *)
    let fd2_ch = new output_async_descr
                   ~dst:fd2
                   ~close_dst:false
                   ~buffer_size:buf_max_size
                   ues in
    let fd1_rcv = new receiver
                    ~src:fd1
                    ~close_src:false
                    ~dst:(fd2_ch :> async_out_channel)
                    ues in
    (* Copy fd3 to fd1: *)
    let fd1_ch = new output_async_descr
                   ~dst:fd1
                   ~close_dst:false
                   ~buffer_size:buf_max_size
                   ues in
    let fd3_rcv = new receiver
                    ~src:fd3
                    ~close_src:false
                    ~dst:(fd1_ch :> async_out_channel)
                    ues in
    (* Check state: *)
    let fd1_eof = ref false in    (* whether output to fd1 @ eof *)
    let fd1_closed = ref false in
    let fd2_eof = ref false in    (* whether output to fd2 @ eof *)
    let fd2_closed = ref false in
    let fd3_closed = ref false in
    (* Close everything that is still open (error/abort, or second EOF): *)
    let full_close _ =
      if not !fd1_closed then Unix.close fd1;
      fd1_eof := true;
      fd1_closed := true;
      if not !fd2_closed then (
        Unix.close fd2;
        if bi_case then fd3_closed := true;
      );
      fd2_eof := true;
      fd2_closed := true;
      if not !fd3_closed then Unix.close fd3;
      fd3_closed := true;
    in
    (* EOF on one direction: shut down that side, or finish completely
       if the other direction already reached EOF: *)
    let half_close_fd1() =
      if !fd2_eof then (
        full_close()
      )
      else (
        if not !fd1_eof then Unix.shutdown fd1 Unix.SHUTDOWN_SEND;
        fd1_eof := true
      ) in
    let half_close_fd2() =
      if !fd1_eof then (
        full_close()
      )
      else (
        if not !fd2_eof then (
          if bi_case then
            Unix.shutdown fd2 Unix.SHUTDOWN_SEND
          else (
            Unix.close fd2;
            fd2_closed := true;
          )
        );
        fd2_eof := true
      ) in
    when_state
      ~is_done:half_close_fd2
      ~is_error:full_close
      ~is_aborted:full_close
      (fd2_ch :> 'a engine);
    when_state
      ~is_done:half_close_fd1
      ~is_error:full_close
      ~is_aborted:full_close
      fd1_ch;
    engines <- [ fd1_rcv;
                 (fd2_ch :> unit engine);
                 fd3_rcv;
                 (fd1_ch :> unit engine);
               ]

  method state =
    (* We inspect the states of all engines. If there is an engine
     * in error state, this state will be returned (Broken_communication
     * has lower priority than other errors). Otherwise: If there is
     * an aborted engine, we return that the copier is aborted.
     * Otherwise: If there is at least one working engine, we return
     * working state. The last case is that all engines are done, and
     * we return done.
     *
     * Note that the progress meter for `Working is emulated, and
     * the more often [state] is invoked, the more frequent the progress
     * meter is increased. But it is only increased if at least one
     * engine has made some progress.
     *
     * CHECK: This seems to be a generalization of the sync_engine above.
     * Maybe we want it as basic engine construct?
     *)
    let eng_states =
      List.map (fun eng -> eng # state) engines in
    let our_state = ref(`Done()) in
    List.iter
      (fun st ->
         match st with
             `Done _ -> ()
           | `Working _ ->
               ( match !our_state with
                     `Done _ -> our_state := `Working 0
                   | _ -> ()
               )
           | `Aborted ->
               ( match !our_state with
                     `Done _
                   | `Working _ -> our_state := `Aborted
                   | _ -> ()
               )
           | `Error err ->
               ( match !our_state with
                     `Done _
                   | `Working _
                   | `Aborted
                   | `Error Broken_communication ->
                       our_state := st
                   | `Error _ ->
                       ()
               )
      )
      eng_states;
    ( match !our_state with
          `Working _ ->
            if eng_states <> last_eng_states then
              last_count <- last_count + 1;
            our_state := `Working last_count
        | _ ->
            ()
    );
    last_eng_states <- eng_states;
    !our_state

  method abort () =
    (* Simply abort all engines *)
    List.iter (fun eng -> eng # abort()) engines
      (* CHECK: Hopefully, no engine goes to an error state because the
       * other engine aborts...
       *)

  method request_notification f =
    (* Simply forward the request to all engines *)
    let enabled = ref true in
    (* After the first notification has disabled further notifications,
     * it must be ensured that no more notifications will happen.
     * [enabled] is [true] as long as notifications are enabled.
     *)
    let f'() =
      !enabled && (
        let enabled' = f() in
        enabled := !enabled && enabled';
        !enabled
      )
    in
    List.iter (fun eng -> eng # request_notification f') engines

  method event_system = ues

end
;;

(* Symbolic internet socket addresses: *)
type inetspec =
  [ `Sock_inet of (Unix.socket_type * Unix.inet_addr * int)
  | `Sock_inet_byname of (Unix.socket_type * string * int)
  ]

(* Symbolic socket addresses (Unix domain or internet): *)
type sockspec =
  [ `Sock_unix of (Unix.socket_type * string)
  | inetspec
  ]
;;

(* Translate a [Unix.sockaddr] into a [sockspec] with socket type [st]. *)
let sockspec_of_sockaddr st =
  function
    | Unix.ADDR_INET(ip,port) -> `Sock_inet(st, ip, port)
    | Unix.ADDR_UNIX path -> `Sock_unix(st, path)

(* Translate a socksymbol into a [sockspec] with socket type [st]. *)
let sockspec_of_socksymbol st =
  function
    | `Inet(ip,port) -> `Sock_inet(st, ip, port)
    | `Inet_byname(n,port) -> `Sock_inet_byname(st, n, port)
    | `Unix p -> `Sock_unix(st, p)

(* Ways of connecting a client endpoint: *)
type connect_address =
    [ `Socket of sockspec * connect_options
    | `Command of string * (int -> Unixqueue.event_system -> unit)
    | `W32_pipe of Netsys_win32.pipe_mode * string
    ]

and connect_options =
    { conn_bind : sockspec option;
    }
;;

let default_connect_options = { conn_bind = None } ;;

(* Result of a successful connect: *)
type connect_status =
    [ `Socket of Unix.file_descr * sockspec
    | `Command of Unix.file_descr * int
    | `W32_pipe of Unix.file_descr
    ]
;;

(* The file descriptor carried by a [connect_status]. *)
let client_endpoint =
  function
      `Socket(fd,_) -> fd
    | `Command(fd,_) -> fd
    | `W32_pipe fd -> fd
;;

(* Legacy alias. *)
let client_socket = client_endpoint

class type client_endpoint_connector =
object
  method connect : connect_address -> Unixqueue.event_system ->
                     connect_status engine
end
;;

class type client_socket_connector = client_endpoint_connector

(* Resolve [name] to its first IP address via the Uq_resolver. *)
let addr_of_name name =
  let entry = Uq_resolver.get_host_by_name name in
  entry.Unix.h_addr_list.(0)
;;

(* Describe the local address of socket [s] as a [sockspec].
   (Definition continues past this view.) *)
let getsockspec stype s =
  match Unix.getsockname s with
      Unix.ADDR_UNIX path ->
        `Sock_unix(stype, path)
    | Unix.ADDR_INET(addr,
port) ->
	`Sock_inet(stype, addr, port)
;;


(*
let getpeerspec stype s =
  (* Warning: this function may fail if the socket is connected and
     the peer does not have an address (e.g. older OSX) *)
  match Netsys.getpeername s with
      Unix.ADDR_UNIX path ->
	`Sock_unix(stype, path)
    | Unix.ADDR_INET(addr, port) ->
	`Sock_inet(stype, addr, port)
;;
*)

(* Return the peer address of socket [s] as an [inetspec] if the peer is
   an Internet address; [None] for Unix-domain peers or on any error. *)
let getinetpeerspec stype s =
  try
    match Netsys.getpeername s with
	Unix.ADDR_UNIX path ->
	  None
      | Unix.ADDR_INET(addr, port) ->
	  Some(`Sock_inet(stype, addr, port))
  with
    | _ -> None
;;


(*
let getconnerror s =
  (* Call this after getpeerspec raised ENOTCONN *)
  try
    let b = String.make 1 ' ' in
    let _ = Unix.recv s b 0 1 [] in
    assert false
  with
    | error -> error
*)

(*
type xdom =
    [ `Socket of socket_domain
    | `Pipe
    ]
*)

(* - new impl, not yet ready *)
(*
class direct_socket_connector() : client_socket_connector =
object (self)
  method connect connaddr ues =
    let const_eng v = new epsilon_engine(`Done v) ues in
    let sock_prim_data_eng sockspec =
      match sockspec with
	| `Sock_unix(stype, path) ->
	    let addr = Unix.ADDR_UNIX path in
	    const_eng(`Socket Unix.PF_UNIX, stype, addr)
	| `Sock_inet(stype, ip, port) ->
	    let dom = Netsys.domain_of_inet_addr ip in
	    let addr = Unix.ADDR_INET(ip,port) in
	    const_eng(`Socket dom, stype, addr)
	| `Sock_inet_by_name(stype, name, port) ->
	    let ip_opt = XXX in
	    ( match ip_opt with
		| Some ip ->
		    let dom = Netsys.domain_of_inet_addr ip in
		    let addr = Unix.ADDR_INET(ip, port) in
		    const_eng(`Socket dom, stype, addr)
		| None ->
		    (* Now need a real host lookup *)
		    let r = Uq_resolver.current_resolver() in
		    let eng = r # host_by_name name ues in
		    new map_engine
		      ~map_done:(fun he ->
				   let dom = he.Unix.h_addrtype in
				   let ip = he.Unix.h_addr_list.(0) in
				   let addr = Unix.ADDR_INET(ip, port) in
				   `Done(`Socket dom, stype, addr)
				)
		      eng
	    )
	| `Pipe XXX ->
    in
    let sock_data_eng sockspec opts =
      (* Creates an engine that finds out the relevant data to create
	 the socket *)
      match opts.conn_bind with
	| None ->
	    new map_engine
	      ~map_done:(fun (dom, stype, addr) ->
			   `Done(dom, stype, addr, None))
	      (sock_prim_data_eng sockspec)
	| Some bind_sockspec ->
	    (* Run two resolver engines in parallel *)
	    let d1_eng = sock_prim_data_eng sockspec in
	    let d2_eng = sock_prim_data_eng bind_sockspec in
	    new map_engine
	      ~map_done:(fun ((dom1, stype1, addr1), (dom2, _, addr2)) ->
			   if dom1 <> dom2 then
			     `Error(Failure("direct_socket_connector: Socket domain mismatch"))
			   else
			     `Done(dom1, stype1, addr1, Some addr2)
			)
	      (new sync_engine d1_eng d2_eng)
    in
    let sock_data_check sockspec opts =
      (* Check whether params are valid *)
      match (sockspec, opts.conn_bind) with
	| `Sock_unix(stype,_), None -> ()
	| `Sock_unix(stype,_), Some(`Sock_unix(stype, _)) -> ()
	| `Sock_inet(stype,_,_), None -> ()
	| `Sock_inet_byname(stype,_,_), None -> ()
	| `Sock_inet(stype,_,_), Some(`Sock_inet(stype,_,_)) -> ()
	| `Sock_inet(stype,_,_), Some(`Sock_inet_byname(stype,_,_)) -> ()
	| `Sock_inet_byname(stype,_,_), Some(`Sock_inet(stype,_,_)) -> ()
	| `Sock_inet_byname(stype,_,_), Some(`Sock_inet_byname(stype,_,_)) -> ()
	| `Pipe(_,_), None -> ()
	| `Pipe(_,_), Some(`Pipe(_,_)) -> ()
	| _ ->
	    invalid_arg "direct_socket_connector: socket type mismatch"
    in
    let create_eng sockspec (dom, stype, addr, bind_addr_opt) ->
      (* Create and connect the socket s, and return the connect engine *)
      match dom with
	| `Socket sockdom ->
	    let connect_tried = ref false in
	    let s = Unix.socket sockdom 0 in
	    ( try
		Netsys.set_close_on_exec s;
		Unix.set_nonblock s;
		( match bind_addr_opt with
		    | None -> ()
		    | Some bind_addr -> Unix.bind s bind_addr
		);
		connect_tried := true;
		Unix.connect s addr;
		Netsys.connect_check s;
		let fake_conn_eng =
		  const_eng(`Socket(s, getsockspec stype s)) in
		when_state ~is_aborted:(fun _ -> Unix.close s) fake_conn_eng;
		fake_conn_eng
	      with
		| Unix.Unix_error((Unix.EINPROGRESS|Unix.EWOULDBLOCK),_,_)
		    when !connect_tried ->
		    (* Note: Win32 returns EWOULDBLOCK instead of EINPROGRESS *)
		    (* Wait until the socket is writeable. Win32 reports
		       connect errors by signaling that out-of-band data can
		       be received (funny, right?), so we wait for that
		       condition, too.
		     *)
		    let poll_eng =
		      new poll_engine [ Unixqueue.Wait_out s, (-1.0);
					Unixqueue.Wait_oob s, (-1.0) ] ues in
		    let conn_eng =
		      new map_engine
			~map_done:(fun _ ->
				     try
				       Netsys.connect_check s;
				       `Done(getsockspec stype s)
				     with
				       | error ->
					   Unix.close s;
					   `Error error
				  )
			~map_error:(fun e -> Unix.close s; `Error e)
			~map_aborted:(fun _ -> Unix.close s; `Aborted)
			(poll_eng :> Unixqueue.event engine) in
		    conn_eng
		| e ->
		    Unix.close s; raise e
	    )
	| `Pipe ->
	    ( match sockspec with
		| `Pipe(mode,name) ->
		    let ph = Netsys_win32.pipe_connect name mode in
		    let s = Netsys_win32.pipe_descr ph in
		    (s, None)
		    (* CHECK: do we need Netsys_win32.pipe_shutdown ph *)
		| _ -> assert false
	    )
    in
    match connaddr with
      | `Socket(sockspec,opts) ->
	  (* Check on wrong arguments: *)
	  sock_data_check sockspec opts;
	  (* Create and use the engines: *)
	  let data_eng = sock_data_eng sockspec opts in
	  new seq_engine
	    data_eng
	    (fun sock_data -> create_eng sockspec sock_data)
      | _ ->
	  raise Addressing_method_not_supported
end
*)

(* Old impl with sync name lookup *)
(* [direct_connector] connects without a proxy: it creates a socket (or
 * Win32 pipe handle), optionally binds it ([conn_bind]), and starts a
 * non-blocking connect. The returned engine reaches `Done when the
 * connection is established. Name resolution ([`Sock_inet_byname]) is
 * performed synchronously via [addr_of_name].
 *)
class direct_connector() : client_endpoint_connector =
object (self)
  method connect connaddr ues =
    (* [setup_socket s stype dest_addr opts] prepares [s] (close-on-exec,
       non-blocking, optional bind) and starts the connect. Returns
       [(s, stype, connected)] where [connected] tells whether the connect
       finished immediately; EINPROGRESS/EWOULDBLOCK means "in progress".
       Any other error closes [s] and is re-raised. *)
    let setup_socket s stype dest_addr opts =
      try
	Netsys.set_close_on_exec s;
	Unix.set_nonblock s;
	( match opts.conn_bind with
	    | Some bind_spec ->
		( match bind_spec with
		    | `Sock_unix(stype', path) ->
			if stype <> stype' then
			  invalid_arg "Socket type mismatch";
			Unix.bind s (Unix.ADDR_UNIX path)
		    | `Sock_inet(stype', addr, port) ->
			if stype <> stype' then
			  invalid_arg "Socket type mismatch";
			Unix.bind s (Unix.ADDR_INET(addr,port))
		    | `Sock_inet_byname(stype', name, port) ->
			if stype <> stype' then
			  invalid_arg "Socket type mismatch";
			let addr = addr_of_name name in
			Unix.bind s (Unix.ADDR_INET(addr,port))
		)
	    | None -> ()
	);
	Unix.connect s dest_addr;
	(s, stype, true)
      with
	  Unix.Unix_error((Unix.EINPROGRESS|Unix.EWOULDBLOCK),_,_) ->
	    (s,stype,false)
	    (* Note: Win32 returns EWOULDBLOCK instead of EINPROGRESS *)
	| error ->
	    (* Remarks:
	     * We can get here EAGAIN. Unfortunately, this is a kind of
	     * "catch-all" error for Unix.connect, e.g. you can get it when
	     * you are run out of local ports, or if the backlog limit is
	     * exceeded. It is totally unclear what to do in this case,
	     * so we do not handle it here. The user is supposed to connect
	     * later again.
	     *)
	    Unix.close s;
	    raise error
    in
    match connaddr with
	`Socket(sockspec,opts) ->
	  let (s, stype, is_connected) =
	    match sockspec with
	      | `Sock_unix(stype, path) ->
		  let s = Unix.socket Unix.PF_UNIX stype 0 in
		  setup_socket s stype (Unix.ADDR_UNIX path) opts;
	      | `Sock_inet(stype, addr, port) ->
		  let dom = Netsys.domain_of_inet_addr addr in
		  let s = Unix.socket dom stype 0 in
		  setup_socket s stype (Unix.ADDR_INET(addr,port)) opts;
	      | `Sock_inet_byname(stype, name, port) ->
		  (* synchronous name lookup *)
		  let addr = addr_of_name name in
		  let dom = Netsys.domain_of_inet_addr addr in
		  let s = Unix.socket dom stype 0 in
		  setup_socket s stype (Unix.ADDR_INET(addr,port)) opts;
	  in
	  let conn_eng =
	    if is_connected then (
	      (* Connected immediately: just verify and deliver the result *)
	      let status =
		try
		  Netsys.connect_check s;
		  `Done(`Socket(s, getsockspec stype s))
		with
		  | error -> `Error error in
	      new epsilon_engine status ues
	    )
	    else (
	      (* Now wait until the socket is writeable. Win32 reports
		 connect errors by signaling that out-of-band data can
		 be received (funny, right?), so we wait for that
		 condition, too.
	       *)
	      let e = new poll_engine [ Unixqueue.Wait_out s, (-1.0);
					Unixqueue.Wait_oob s, (-1.0) ] ues in
	      new map_engine
		~map_done:(fun _ ->
			     try
			       Netsys.connect_check s;
			       `Done(`Socket(s, getsockspec stype s))
			     with
			       | error -> `Error error
			  )
		(e :> Unixqueue.event engine)
	    ) in
	  (* It is possible that somebody aborts conn_eng. In this case,
	   * the socket must be closed. Same when we enter an error state.
	   *)
	  when_state
	    ~is_aborted:(fun () -> Unix.close s)
	    ~is_error:(fun _ -> Unix.close s)
	    conn_eng;
	  (* conn_eng is what the user sees: *)
	  conn_eng
      | `W32_pipe(mode,name) ->
	  let ph = Netsys_win32.pipe_connect name mode in
	  let s = Netsys_win32.pipe_descr ph in
	  let status = `Done(`W32_pipe s) in
	  let conn_eng = new epsilon_engine status ues in
	  (* It is possible that somebody aborts conn_eng. In this case,
	   * the descr must be closed. The error state cannot be reached.
	   *)
	  let close() =
	    Netsys_win32.pipe_shutdown ph;
	    Unix.close s in
	  when_state
	    ~is_aborted:(fun () -> close())
	    conn_eng;
	  (* conn_eng is what the user sees: *)
	  conn_eng
      | _ ->
	  raise Addressing_method_not_supported
end
;;


(* TODO: Close u, v on abort/error *)
(* TODO: port to Win32 *)

(* [command_connector] runs a shell command via "/bin/sh -c" and connects
 * its stdin/stdout to one end of a socketpair through a [copier]; the
 * other end of the socketpair is returned as the connected descriptor.
 * The [cmdcb] callback receives the child's pid and the event system.
 *)
class command_connector () : client_endpoint_connector =
object(self)
  method connect connaddr ues =
    match connaddr with
	`Command (cmdstr,cmdcb) ->
	  let (u,v) = Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0 in
	  Unix.set_nonblock u;
	  Unix.set_nonblock v;
	  Netsys.set_close_on_exec u;
	  Netsys.set_close_on_exec v;

	  (* Pipes shared with the child process: s_in_sub/s_out_sub are
	     the child's stdin/stdout ends. *)
	  let (s_in_sub, s_in) = Unix.pipe() in
	  let (s_out, s_out_sub) = Unix.pipe() in

	  (* Shuffle data between the socketpair end [v] and the two
	     pipe ends: *)
	  let _e1 = new copier (`Tridirectional(v, s_in, s_out)) ues in

	  Netsys.set_close_on_exec s_in;
	  Netsys.set_close_on_exec s_out;

	  let e2 = new epsilon_engine (`Done ()) ues in

	  new map_engine
	    ~map_done:(fun _ ->
			 let pid =
			   Unix.create_process
			     "/bin/sh"
			     [| "/bin/sh"; "-c"; cmdstr |]
			     s_in_sub s_out_sub Unix.stderr in
			 (* CHECK: Are the other descriptors closed? *)
			 Unix.close s_in_sub;
			 Unix.close s_out_sub;
			 cmdcb pid ues;
			 `Done (`Command(u, pid)))
	    e2
      | _ ->
	  raise Addressing_method_not_supported
end
;;


(* CHECK: Maybe we want a combinator for engines that:
 * - Is only `Done when all engines are done
 * - Goes to `Error when one engine is `Error
 * - Goes to `Aborted when one engine is `Aborted
 *
 * See copier. It implements a similar idea.
 *)

(* [connector ?proxy connaddr ues] picks the connector implementation:
   the given [proxy] if any, otherwise [direct_connector] for sockets and
   Win32 pipes, or [command_connector] for [`Command]. *)
let connector ?proxy connaddr ues =
  let eff_proxy =
    match proxy with
	Some p ->
	  ( p :> client_socket_connector )
      | None ->
	  ( match connaddr with
	      | `Socket _
	      | `W32_pipe _ ->
		  new direct_connector()
	      | `Command _ ->
		  new command_connector()
	  )
  in
  eff_proxy # connect connaddr ues
;;


(* Where to listen: a socket address or a Win32 named pipe. *)
type listen_address =
    [ `Socket of sockspec * listen_options
    | `W32_pipe of Netsys_win32.pipe_mode * string * listen_options
    ]

and listen_options =
    { lstn_backlog : int;          (* length of the listen backlog *)
      lstn_reuseaddr : bool;       (* whether to set SO_REUSEADDR *)
    }
;;


let default_listen_options =
  { lstn_backlog = 20;
    lstn_reuseaddr = false;
  } ;;


class type server_endpoint_acceptor = object
  method server_address : connect_address
  method multiple_connections : bool
  method accept : unit -> (Unix.file_descr * inetspec option) engine
  method shut_down : unit -> unit
end
;;

(* Older name of [server_endpoint_acceptor]. *)
class type server_socket_acceptor = server_endpoint_acceptor


class type server_endpoint_listener = object
  method listen : listen_address ->
                  Unixqueue.event_system ->
		    server_endpoint_acceptor engine
end
;;

(* Older name of [server_endpoint_listener]. *)
class type server_socket_listener = server_endpoint_listener


(* [direct_acceptor fd ues] accepts connections on the already-listening
 * descriptor [fd]. Both stream sockets and Win32 pipe servers are
 * supported; the style of [fd] is determined via [Netsys.get_fd_style].
 * Only one [accept] may be pending at a time. [shut_down] closes [fd]
 * (unless ~close_on_shutdown:false) and aborts a pending accept.
 *)
class direct_acceptor ?(close_on_shutdown=true) ?(preclose=fun()->())
                      fd ues : server_socket_acceptor =
  let fd_style = Netsys.get_fd_style fd in
  (* Win32 pipe server objects; only forced when fd is a pipe server *)
  let pipe_objects = lazy (
    let psrv = Netsys_win32.lookup_pipe_server fd in
    let cn_ev = Netsys_win32.pipe_connect_event psrv in
    let cn_ev_descr = Netsys_win32.event_descr cn_ev in
    (psrv, cn_ev, cn_ev_descr)
  ) in
  let () =
    match fd_style with
      | `Read_write | `W32_pipe | `W32_event ->
	  failwith "Uq_engines.direct_acceptor: endpoint not supported"
      | `W32_pipe_server ->
	  ignore(Lazy.force pipe_objects)
      | _ -> () in
object(self)
  val mutable acc_engine = None
    (* The engine currently accepting connections *)

  method server_address =
    match fd_style with
      | `W32_pipe_server ->
	  let (psrv, _, _) = Lazy.force pipe_objects in
	  let name = Netsys_win32.pipe_server_name psrv in
	  let mode = Netsys_win32.pipe_server_mode psrv in
	  let mode' = Netsys_win32.rev_mode mode in
	  `W32_pipe(mode',name)
      | `Recv_send _ | `Recvfrom_sendto | `Recv_send_implied ->
	  `Socket(getsockspec Unix.SOCK_STREAM fd,
		  default_connect_options)
      | _ -> assert false

  method multiple_connections = true

  method accept () =
    (* Poll until the socket becomes readable, then accept it *)
    if acc_engine <> None then
      failwith "Uq_engines.direct_acceptor: Already waiting for connection";
    let socket_accept_eng() =
      let eng = new poll_engine [ Unixqueue.Wait_in fd, (-1.0) ] ues in
      new map_engine
	~map_done:(fun _ ->
		     try
		       let (fd',_) = Unix.accept fd in
		       Unix.set_nonblock fd';
		       (* There seem to be buggy kernels out there where
			* [accept] does not always return a connected socket.
			* We get ENOTCONN for [getpeername] then.
			* (Observed with certain virtualization kernels,
			* e.g. Virtuozzo.)
			*)
		       let ps =
			 try getinetpeerspec Unix.SOCK_STREAM fd'
			 with
			   | Unix.Unix_error(Unix.ENOTCONN,_,_) as e ->
			       Unix.close fd';
			       raise e in
		       acc_engine <- None;
		       `Done(fd', ps)
		     with
		       | Unix.Unix_error( (Unix.EAGAIN | Unix.EINTR |
					   Unix.ENOTCONN), _, _) ->
			   (* spurious wakeup or bogus accept: poll again *)
			   eng # restart();
			   `Working 0
		       | error ->
			   `Error error
		  )
	eng
    in
    let w32_pipe_accept_eng() =
      let (psrv, cn_ev, cn_ev_descr) = Lazy.force pipe_objects in
      let eng = new poll_engine [ Unixqueue.Wait_in cn_ev_descr, (-1.0) ] ues in
      new map_engine
	~map_done:(fun _ ->
		     try
		       let pipe = Netsys_win32.pipe_accept psrv in
		       let pipe_fd = Netsys_win32.pipe_descr pipe in
		       acc_engine <- None;
		       `Done(pipe_fd, None)
		     with
		       | Unix.Unix_error( (Unix.EAGAIN | Unix.EINTR |
					   Unix.ENOTCONN), _, _) ->
			   eng # restart();
			   `Working 0
		       | error ->
			   `Error error
		  )
	eng
    in
    let acc_eng =
      match fd_style with
	| `Recv_send _ | `Recvfrom_sendto | `Recv_send_implied ->
	    socket_accept_eng()
	| `W32_pipe_server ->
	    w32_pipe_accept_eng()
	| _ -> assert false in
    when_state
      ~is_error:(fun x -> acc_engine <- None)
      ~is_aborted:(fun () -> acc_engine <- None)
      acc_eng;
    acc_engine <- Some acc_eng;
    acc_eng

  method shut_down() =
    if close_on_shutdown then (
      preclose();
      ( match fd_style with
	  | `Recv_send _ | `Recvfrom_sendto | `Recv_send_implied ->
	      Unix.close fd
	  | `W32_pipe_server ->
	      let (psrv, _, cn_ev_descr) = Lazy.force pipe_objects in
	      Unix.close cn_ev_descr;
	      Netsys_win32.pipe_shutdown_server psrv;
	      Unix.close fd
	  | _ -> assert false
      )
    );
    (* else: if not close_on_shutdown, there is no portable way of
       achieving that further connection attempts are refused by the
       kernel. listen(fd,0) works on some systems, but not on all.
     *)
    match acc_engine with
	None -> ()
      | Some acc -> acc # abort()
end
;;


(* Older name of [direct_acceptor]. *)
class direct_socket_acceptor fd esys =
  direct_acceptor fd esys


(* [direct_listener] creates, binds and puts into listening state a socket
 * (or Win32 pipe server) for a [listen_address], and wraps it into a
 * [direct_acceptor] delivered by an immediately-done engine.
 *)
class direct_listener () : server_socket_listener =
object(self)
  method listen lstnaddr ues =
    (* Wrap [fd] into an acceptor; shut it down if the delivering engine
       is aborted or fails. *)
    let accept fd =
      let acc = new direct_acceptor fd ues in
      let eng = new epsilon_engine (`Done acc) ues in
      when_state
	~is_aborted:(fun () -> acc # shut_down())
	~is_error:(fun _ -> acc # shut_down())
	eng;
      eng
    in
    match lstnaddr with
	`Socket (sockspec, opts) ->
	  ( match sockspec with
		`Sock_unix(stype, path) ->
		  let s = Unix.socket Unix.PF_UNIX stype 0 in
		  Unix.set_nonblock s;
		  if opts.lstn_reuseaddr then
		    Unix.setsockopt s Unix.SO_REUSEADDR true;
		  Unix.bind s (Unix.ADDR_UNIX path);
		  Unix.listen s opts.lstn_backlog;
		  accept s
	      | `Sock_inet(stype, addr, port) ->
		  let dom = Netsys.domain_of_inet_addr addr in
		  let s = Unix.socket dom stype 0 in
		  Unix.set_nonblock s;
		  if opts.lstn_reuseaddr then
		    Unix.setsockopt s Unix.SO_REUSEADDR true;
		  Unix.bind s (Unix.ADDR_INET(addr,port));
		  Unix.listen s opts.lstn_backlog;
		  accept s
	      | `Sock_inet_byname(stype, name, port) ->
		  (* synchronous name lookup *)
		  let addr = addr_of_name name in
		  let dom = Netsys.domain_of_inet_addr addr in
		  let s = Unix.socket dom stype 0 in
		  Unix.set_nonblock s;
		  if opts.lstn_reuseaddr then
		    Unix.setsockopt s Unix.SO_REUSEADDR true;
		  Unix.bind s (Unix.ADDR_INET(addr,port));
		  Unix.listen s opts.lstn_backlog;
		  accept s
	  )
      | `W32_pipe (mode, name, opts) ->
	  let backlog = opts.lstn_backlog in
	  let psrv =
	    Netsys_win32.create_local_pipe_server name mode max_int in
	  Netsys_win32.pipe_listen psrv backlog;
	  let fd = Netsys_win32.pipe_server_descr psrv in
	  accept fd
      (* | _ -> raise
Addressing_method_not_supported *)
end
;;


(* [listener ?proxy lstnaddr ues] picks the listener implementation:
   the given [proxy] if any, otherwise [direct_listener]. *)
let listener ?proxy lstnaddr ues =
  let eff_proxy =
    match proxy with
	Some p ->
	  ( p :> server_socket_listener )
      | None ->
	  ( match lstnaddr with
	      | `Socket _
	      | `W32_pipe _ ->
		  new direct_listener()
	  )
  in
  eff_proxy # listen lstnaddr ues
;;


(* Supported datagram socket flavours. *)
type datagram_type =
    [ `Unix_dgram
    | `Inet_udp
    | `Inet6_udp
    ]
;;


class type wrapped_datagram_socket = object
  method descriptor : Unix.file_descr
  method sendto : string -> int -> int -> Unix.msg_flag list -> sockspec -> int
  method recvfrom : string -> int -> int -> Unix.msg_flag list ->
                      (int * sockspec)
  method shut_down : unit -> unit
  method datagram_type : datagram_type
  method socket_domain : Unix.socket_domain
  method socket_type : Unix.socket_type
  method socket_protocol : int
end;;


class type datagram_socket_provider = object
  method create_datagram_socket : datagram_type ->
                                  Unixqueue.event_system ->
				    wrapped_datagram_socket engine
end ;;


(* [direct_datagram_socket] wraps a freshly created non-blocking,
 * close-on-exec datagram socket of the given domain/type/protocol.
 * [sendto]/[recvfrom] translate between [sockspec] and [Unix.sockaddr];
 * both check that the socket type of the given [sockspec] matches.
 *)
class direct_datagram_socket dgtype (sdom,stype,sproto)
      : wrapped_datagram_socket =
  let sock = Unix.socket sdom stype sproto in
  let _ =
    Unix.set_nonblock sock;
    Netsys.set_close_on_exec sock in
object(self)
  method descriptor = sock

  method sendto s p n flags spec =
    let sockaddr =
      match spec with
	  `Sock_unix(stype', path) ->
	    if stype <> stype' then invalid_arg "Socket type mismatch";
	    Unix.ADDR_UNIX path
	| `Sock_inet(stype', addr, port) ->
	    if stype <> stype' then invalid_arg "Socket type mismatch";
	    Unix.ADDR_INET(addr,port)
	| `Sock_inet_byname(stype', name, port) ->
	    if stype <> stype' then invalid_arg "Socket type mismatch";
	    (* synchronous name lookup *)
	    let addr = addr_of_name name in
	    Unix.ADDR_INET(addr,port)
    in
    Unix.sendto sock s p n flags sockaddr

  method recvfrom s p n flags =
    let (n, sockaddr) = Unix.recvfrom sock s p n flags in
    let sockspec =
      match sockaddr with
	  Unix.ADDR_UNIX path ->
	    `Sock_unix(stype, path)
	| Unix.ADDR_INET(addr,port) ->
	    `Sock_inet(stype, addr, port)
    in
    (n,sockspec)

  method shut_down() =
    Unix.close sock

  method datagram_type = dgtype
  method socket_domain = sdom
  method socket_type = stype
  method socket_protocol = sproto
end
;;


(* [datagram_provider ?proxy dgtype ues] creates a wrapped datagram socket,
   either via the given [proxy] or directly via [direct_datagram_socket];
   the socket is shut down if the delivering engine is aborted or fails. *)
let datagram_provider ?proxy dgtype ues =
  match proxy with
      Some p ->
	( p :> datagram_socket_provider )
	  # create_datagram_socket dgtype ues
    | None ->
	let (sdom,stype,sproto) =
	  match dgtype with
	      `Unix_dgram -> (Unix.PF_UNIX, Unix.SOCK_DGRAM, 0)
	    | `Inet_udp  -> (Unix.PF_INET, Unix.SOCK_DGRAM, 0)
	    | `Inet6_udp -> (Unix.PF_INET6, Unix.SOCK_DGRAM, 0)
	in
	let wsock =
	  new direct_datagram_socket dgtype (sdom,stype,sproto) in
	let eng = new epsilon_engine (`Done wsock) ues in
	when_state
	  ~is_aborted:(fun () -> wsock # shut_down())
	  ~is_error:(fun _ -> wsock # shut_down())
	  eng;
	eng
;;


(* Convenience operators for composing engines (aliases of combinators
   defined earlier in this file). *)
module Operators = struct
  let ( ++ ) = qseq_engine
  let ( >> ) = fmap_engine
  let eps_e = epsilon_engine
end