(* $Id: unixqueue_pollset.ml 1710 2012-02-16 15:15:27Z gerd $ *)

(* pset # dispose: we try to call it when there are no more operations
   in the queue. This is tested when
   - the resource is removed (and we are not waiting)
   - the group is cleared (and we are not waiting)
   - after every wait
   Note that we must not call [dispose] while [wait] is running!

   Also, we call it when [run] is left by an exception.
 *)

open Unixqueue_util
open Printf

(* Ordered float keys for [FloatMap]. *)
module Float = struct
  type t = float
  (* let compare = ( Pervasives.compare : float -> float -> int ) *)
  let compare (x:float) y =
    if x < y then (-1) else if x = y then 0 else 1
    (* does not work for non-normal numbers but we don't care *)
end

module FloatMap = Map.Make(Float)

let nogroup_id = Oo.id nogroup

(* [min_key m] returns the smallest key of [m].
   Raises [Not_found] if [m] is empty. *)
let min_key m =
  fst (FloatMap.min_binding m)

(* [ops_until tmax m] looks into the FloatMap [m], and returns all
   bindings [(t,ops)] for which [t <= tmax], in ascending order of [t].
   The iteration is cut short with [Exit] at the first key beyond [tmax]. *)
let ops_until tmax m =
  let l = ref [] in
  ( try
      FloatMap.iter
        (fun t ops ->
           if t > tmax then raise Exit;
           l := (t,ops) :: !l
        )
        m
    with
      | Exit -> ()
  );
  List.rev !l

exception Term of Unixqueue_util.group
  (* Extra (Term g) is now the group termination event for g *)

exception Keep_alive
  (* Sometimes used to keep the event system alive *)

exception Exit_loop
  (* Used internally to leave the iteration over handler groups *)

(* Register a printer for [Term]. Groups are compared by their object id,
   as everywhere else in this module. *)
let () =
  Netexn.register_printer
    (Term nogroup)
    (function
       | Term g ->
           if Oo.id g = nogroup_id then
             "Term(nogroup)"
           else
             "Term(" ^ string_of_int (Oo.id g) ^ ")"
       | _ -> assert false
    )

(* [pset_set pset fd (i,o,p)] sets the events polled for [fd] to the
   triple input/output/priority. If all three are [false], [fd] is removed
   from [pset]. *)
let pset_set (pset:Netsys_pollset.pollset) fd (i,o,p) =
  if not i && not o && not p then
    pset # remove fd
  else
    pset # add fd (Netsys_posix.poll_req_events i o p)

(* [pset_find pset fd] returns the triple of events currently polled
   for [fd], or [(false,false,false)] if [fd] is not a member of [pset]. *)
let pset_find (pset:Netsys_pollset.pollset) fd =
  try Netsys_posix.poll_req_triple(pset#find fd)
  with
    | Not_found -> (false,false,false)

(* Maps an I/O or timeout event back to the operation it belongs to.
   Must not be called for other kinds of events. *)
let op_of_event ev =
  match ev with
    | Unixqueue_util.Input_arrived(_,fd)    -> Unixqueue_util.Wait_in fd
    | Unixqueue_util.Output_readiness(_,fd) -> Unixqueue_util.Wait_out fd
    | Unixqueue_util.Out_of_band(_,fd)      -> Unixqueue_util.Wait_oob fd
    | Unixqueue_util.Timeout(_,op)          -> op
    | _ -> assert false

(* [list_mem_op op l]: whether [l] contains an operation that is equal
   to [op] in the sense of [is_op_eq]. *)
let list_mem_op op l =
  List.exists (fun h -> is_op_eq h op) l

(* Runs [f ()] while holding [mutex]. *)
let while_locked mutex f =
  Netsys_oothr.serialize mutex f ()

(* Runs [f ()] with [mutex] temporarily released. The mutex is re-acquired
   before returning, also when [f] raises an exception. *)
let escape_lock mutex f =
  mutex # unlock();
  let r =
    try f ()
    with e -> mutex # lock(); raise e in
  mutex # lock();
  r

(* = List.flatten (List.map f l), but tail-recursive in [l] *)
let flatten_map f l =
  let rec loop l h =
    match l with
      | [] -> List.rev h
      | x :: l' ->
          let y = f x in
          loop l' (List.rev_append y h) in
  loop l []

(* A little encapsulation so we can easily identify handlers by Oo.id *)
class ohandler (h:handler) =
object
  method run esys eq ev =
    h esys eq ev
end

(* Event system on top of a [Netsys_pollset.pollset].

   All mutable state below is shared between the methods of the object
   and is protected by [mutex]. Methods and local functions with the
   suffix [_wl] expect that the caller already holds the lock
   ("while locked").
 *)
class pollset_event_system (pset : Netsys_pollset.pollset) =
  let mtp = !Netsys_oothr.provider in
  let is_mt = not (mtp#single_threaded) in
  let sys = ref (lazy (assert false)) in   (* initialized below *)
  let ops_of_group =
    (Hashtbl.create 10 : (group, OpSet.t) Hashtbl.t) in
    (* is for [clear] only *)
  let tmo_of_op =
    (OpTbl.create 10 : (float * float * group * bool) OpTbl.t) in
    (* first number: duration of timeout (or -1)
       second number: point in time (or -1)
       bool: whether strong
     *)
  let ops_of_tmo = ref(FloatMap.empty : OpSet.t FloatMap.t) in
  let strong_ops = ref 0 in (* number of strong (non-weak) ops *)

  let aborting = ref false in
  let close_tab =
    (Hashtbl.create 10 :
       (Unix.file_descr, (group * (Unix.file_descr -> unit))) Hashtbl.t) in
  let abort_tab =
    (Hashtbl.create 10 : (group, (group -> exn -> unit)) Hashtbl.t) in
  let handlers = (Hashtbl.create 10 : (group, ohandler list) Hashtbl.t) in
  let handled_groups = ref 0 in
    (* the number of keys in [handlers] *)

  let waiting = ref false in
    (* true while [pset # wait] may be running *)
  let when_blocking = ref (fun () -> ()) in
  let mutex = mtp # create_mutex() in

  (* The I/O event for [op], tagged with the group [op] is registered for.
     Raises [Not_found] if [op] is not (or no longer) registered. *)
  let event_of_op_wl_nf op =
    let (_,_,g,_) = OpTbl.find tmo_of_op op in (* or Not_found *)
    match op with
      | Unixqueue_util.Wait_in fd ->
          Unixqueue_util.Input_arrived(g,fd)
      | Unixqueue_util.Wait_out fd ->
          Unixqueue_util.Output_readiness(g,fd)
      | Unixqueue_util.Wait_oob fd ->
          Unixqueue_util.Out_of_band(g,fd)
      | Unixqueue_util.Wait _ ->
          assert false in

  (* Like [event_of_op_wl_nf], but returns a list that is empty when [op]
     is unknown. *)
  let events_of_op_wl op =
    try
      [ event_of_op_wl_nf op ]
    with
      | Not_found ->
          (* A "ghost event", i.e. there is no handler anymore
             for it, but wasn't deleted quickly enough from
             pset
           *)
          [] in

  (* The timeout event for [op]. Raises [Not_found] if [op] is unknown. *)
  let tmo_event_of_op_wl_nf op =
    let (_,_,g,_) = OpTbl.find tmo_of_op op in (* or Not_found *)
    Unixqueue_util.Timeout(g,op) in

  let tmo_events_of_op_wl op =
    try
      [ tmo_event_of_op_wl_nf op ]
    with
      | Not_found -> [] (* Ghost event, see above *) in

  let add_event_wl e =
    Equeue.add_event (Lazy.force !sys) e;
    pset # cancel_wait true
      (* Set the cancel bit, so that any pending [wait] is interrupted *) in

object(self)

  initializer (
    let equeue_sys = Equeue.create ~string_of_event self#source in
    sys := lazy equeue_sys;
    ignore(Lazy.force !sys);
    (* Add ourselves now at object creation time. The only drawback is that
       we no longer raise [Equeue.Out_of_handlers] - but this is
       questionable anyway since the addition of [Immediate] events.

       In order to generate [Out_of_handlers] we would have to count
       [Immediate] events in addition to handlers.
     *)
    self#equeue_add_handler ()
  )

  (* The event source passed to [Equeue.create]: waits on [pset] until the
     earliest timeout, and adds the resulting I/O, timeout, [Signal] or
     [Keep_alive] events to [_sys]. *)
  method private source _sys =
    (* locking: the lock is not held when called, because we are called back
       from Equeue
     *)
    assert(Lazy.force !sys == _sys);

    !when_blocking();

    let dbg = !Unixqueue_util.Debug.enable in

    let locked = ref true in   (* keep track of locking state *)
    if is_mt then mutex # lock();

    let t0 = Unix.gettimeofday() in
    let tmin = try min_key !ops_of_tmo with Not_found -> (-1.0) in
    (* A negative [delta] is passed on to [wait] when no timeout is
       pending *)
    let delta = if tmin < 0.0 then (-1.0) else max (tmin -. t0) 0.0 in

    if dbg then
      dlogr
        (fun () ->
           sprintf "t0 = %f, #tmo_of_op = %d" t0 (OpTbl.length tmo_of_op)
        );

    let nothing_to_do =
      (* For this test only non-weak resources count, so... *)
      !strong_ops = 0 in

    let have_eintr = ref false in

    let pset_events =
      try
        if nothing_to_do then (
          if dbg then dlogr (fun () -> "nothing_to_do");
          []
        )
        else (
          if dbg then
            dlogr
              (fun () -> (
                 let ops =
                   OpTbl.fold (fun op _ l -> op::l) tmo_of_op [] in
                 let op_str =
                   String.concat ";" (List.map string_of_op ops) in
                 sprintf "wait tmo=%f ops=<%s>" delta op_str));
          (* Reset the cancel bit immediately before calling [wait]. Any
             event added up to now is considered anyway by [wait] because
             our lock is still held. Any new event added after we unlock
             will set the cancel_wait flag, and cause the [wait] to exit
             (if it is still running).
           *)
          pset # cancel_wait false;
          waiting := true;
          if is_mt then mutex # unlock();
          locked := false;
          pset # wait delta
        )
      with
        | Unix.Unix_error(Unix.EINTR,_,_) ->
            if dbg then dlogr (fun () -> "wait signals EINTR");
            have_eintr := true;
            []
        | e ->
            (* Usually from [wait], but one never knows... *)
            if !locked && is_mt then mutex#unlock();
            waiting := false;
            raise e
    in

    waiting := false;
    if not !locked && is_mt then mutex # lock();
    locked := true;

    try
      (* Catch exceptions and unlock *)

      if dbg then
        dlogr
          (fun () -> (
             sprintf "wait returns <%d pset events>"
               (List.length pset_events)));

      let t1 = Unix.gettimeofday() in
      if dbg then dlogr (fun () -> (sprintf "t1 = %f" t1));
      (* t1 is the reference for determining the timeouts *)

      (* while waiting somebody might have removed resources, so ... *)
      if OpTbl.length tmo_of_op = 0 then
        pset # dispose();

      let operations =
        (* The possible operations *)
        List.fold_left
          (fun acc (fd,ev_in,ev_out) ->
             (* Note that POLLHUP and POLLERR can also mean that we
                have data to read/write!
              *)
             let (in_rd,in_wr,in_pri) =
               Netsys_posix.poll_req_triple ev_in in
             let out_rd =
               Netsys_posix.poll_rd_result ev_out in
             let out_wr =
               Netsys_posix.poll_wr_result ev_out in
             let out_pri =
               Netsys_posix.poll_pri_result ev_out in
             let out_hup =
               Netsys_posix.poll_hup_result ev_out in
             let out_err =
               Netsys_posix.poll_err_result ev_out in
             let have_input =
               in_rd && (out_rd || out_hup || out_err) in
             let have_output =
               in_wr && (out_wr || out_hup || out_err) in
             let have_pri =
               in_pri && (out_pri || out_hup || out_err) in
             let e1 =
               if have_pri then Unixqueue_util.Wait_oob fd :: acc
               else acc in
             let e2 =
               if have_input then Unixqueue_util.Wait_in fd :: e1
               else e1 in
             let e3 =
               if have_output then Unixqueue_util.Wait_out fd :: e2
               else e2 in
             e3
          )
          []
          pset_events in

      let ops_timed_out =
        ops_until t1 !ops_of_tmo in
      (* Note: this _must_ include operations until <= t1 (not <t1),
         otherwise a timeout value of 0.0 won't work
       *)

      let ops_timed_out_l =
        (* Determine the operations in [tmo_of_op] that have timed
           out and that are not in [operations]
           FIXME: [List.mem op operations] is not scalable.
         *)
        List.fold_left
          (fun oacc (_, ops) ->
             OpSet.fold
               (fun op iacc ->
                  if list_mem_op op operations then iacc else op::iacc)
               ops
               oacc
          )
          []
          ops_timed_out in

      if dbg then (
        dlogr
          (fun() ->
             sprintf "delivering events <%s>"
               (String.concat ";"
                  (flatten_map
                     (fun op ->
                        List.map string_of_event (events_of_op_wl op)
                     )
                     operations
                  )));
        dlogr
          (fun() ->
             sprintf "delivering timeouts <%s>"
               (String.concat ";"
                  (flatten_map
                     (fun op ->
                        List.map string_of_event (tmo_events_of_op_wl op)
                     )
                     ops_timed_out_l
                  )))
      );

      (* deliver events *)
      let delivered = ref false in
      let deliver get_ev op =
        try
          let ev = get_ev op in
          delivered := true;
          Equeue.add_event _sys ev
        with Not_found -> () in
      List.iter (deliver event_of_op_wl_nf) operations;
      List.iter (deliver tmo_event_of_op_wl_nf) ops_timed_out_l;

      if !have_eintr then (
        if dbg then dlogr (fun () -> "delivering Signal");
        Equeue.add_event _sys Unixqueue_util.Signal
      )
      else
        if not !delivered && not nothing_to_do then (
          (* Ensure we always add an event to keep the event loop running: *)
          if dbg then dlogr (fun () -> "delivering Keep_alive");
          Equeue.add_event _sys (Unixqueue_util.Extra Keep_alive)
        );

      (* Update ops_of_tmo: *)
      List.iter
        (fun (t,_) ->
           ops_of_tmo := FloatMap.remove t !ops_of_tmo
        )
        ops_timed_out;

      (* Set a new timeout for all delivered events:
         (Note that [pset] remains unchanged, because the set of watched
         resources remains unchanged.)
         rm_done is true when the old timeout is already removed from
         ops_of_tmo.

         The new deadline is counted from [t1], the time [wait] returned,
         in the same way as [add_resource_wl] counts from the time of the
         addition. The deadline stored so far ([old_t1]) is only needed
         for finding the entry in [ops_of_tmo] that has to be removed.
         (Earlier, the tuple pattern re-bound the name [t1], so the new
         deadline was computed as old deadline + tmo, and every delivered
         I/O event moved the deadline further into the future.)
       *)
      let update_tmo rm_done oplist =
        List.iter
          (fun op ->
             try
               let (tmo,old_t1,g,is_strong) =
                 OpTbl.find tmo_of_op op in (* or Not_found *)
               if tmo >= 0.0 then (
                 let t2 = t1 +. tmo in
                 self#sched_upd_tmo_wl g op tmo t2 is_strong
                   (if rm_done then (-1.0) else old_t1)
               )
             with
               | Not_found -> ()
                   (* It is possible that resources were removed while
                      we were waiting for events. This can lead to
                      [Not_found] here. We just ignore this.
                    *)
          )
          oplist in
      update_tmo false operations;
      update_tmo true  ops_timed_out_l;

      if is_mt then mutex # unlock();
      locked := false

    with
      | e ->
          (* exceptions are unexpected, but we want to make sure not to mess
             with locks
           *)
          if !locked && is_mt then mutex # unlock();
          raise e


  (* Note: suffix _wl = while locked *)

  (* Removes [op] from [tmo_of_op], [ops_of_tmo] and [ops_of_group], and
     adjusts [strong_ops]. Does nothing if [op] is unknown. *)
  method private sched_remove_wl op =
    try
      let tmo, t1, g, is_strong =
        OpTbl.find tmo_of_op op in (* or Not_found *)
      dlogr(fun () -> (sprintf "sched_remove %s" (string_of_op op)));
      OpTbl.remove tmo_of_op op;
      if is_strong then decr strong_ops;
      ( try
          let l_ops =
            if tmo >= 0.0 then FloatMap.find t1 !ops_of_tmo
            else raise Not_found in
          let l_ops' = OpSet.remove op l_ops in
          if OpSet.is_empty l_ops' then
            ops_of_tmo := FloatMap.remove t1 !ops_of_tmo
          else
            ops_of_tmo := FloatMap.add t1 l_ops' !ops_of_tmo
        with Not_found -> ()
      );
      if Oo.id g <> nogroup_id then (
        (* A missing entry raises Not_found, which is caught below *)
        let old_set = Hashtbl.find ops_of_group g in
        let new_set = OpSet.remove op old_set in
        if OpSet.is_empty new_set then
          Hashtbl.remove ops_of_group g
        else
          Hashtbl.replace ops_of_group g new_set
      )
    with
      | Not_found -> ()

  (* Stops polling for the event kind of [op], keeping the other event
     kinds polled for the same descriptor. No-op for [Wait]. *)
  method private pset_remove_wl op =
    match op with
      | Wait_in fd ->
          let (_,o,p) = pset_find pset fd in
          pset_set pset fd (false,o,p)
      | Wait_out fd ->
          let (i,_,p) = pset_find pset fd in
          pset_set pset fd (i,false,p)
      | Wait_oob fd ->
          let (i,o,_) = pset_find pset fd in
          pset_set pset fd (i,o,false)
      | Wait _ ->
          ()

  (* Registers [op] for group [g] with timeout duration [tmo] and
     deadline [t1]. The caller ensures that [op] is not yet registered
     (otherwise [strong_ops] would be counted twice). *)
  method private sched_add_wl g op tmo t1 is_strong =
    dlogr(fun () -> (sprintf "sched_add %s tmo=%f t1=%f is_strong=%b"
                       (string_of_op op) tmo t1 is_strong));
    OpTbl.replace tmo_of_op op (tmo, t1, g, is_strong);
    if is_strong then incr strong_ops;
    let l_ops =
      try FloatMap.find t1 !ops_of_tmo with Not_found -> OpSet.empty in
    if tmo >= 0.0 then
      ops_of_tmo := FloatMap.add t1 (OpSet.add op l_ops) !ops_of_tmo;
    if Oo.id g <> nogroup_id then (
      let old_set =
        try Hashtbl.find ops_of_group g with Not_found -> OpSet.empty in
      let new_set = OpSet.add op old_set in
      Hashtbl.replace ops_of_group g new_set
    )

  (* Starts polling for the event kind of [op], keeping the other event
     kinds polled for the same descriptor. No-op for [Wait]. *)
  method private pset_add_wl op =
    match op with
      | Wait_in fd ->
          let (_,o,p) = pset_find pset fd in
          pset_set pset fd (true,o,p)
      | Wait_out fd ->
          let (i,_,p) = pset_find pset fd in
          pset_set pset fd (i,true,p)
      | Wait_oob fd ->
          let (i,o,_) = pset_find pset fd in
          pset_set pset fd (i,o,true)
      | Wait _ ->
          ()

  (* Moves the deadline of the registered [op] from [old_t1] to [t1]. *)
  method private sched_upd_tmo_wl g op tmo t1 is_strong old_t1 =
    (* only for tmo>=0 *)
    dlogr(fun () -> (sprintf "sched_upd_tmo %s tmo=%f t1=%f is_strong=%b"
                       (string_of_op op) tmo t1 is_strong));
    OpTbl.replace tmo_of_op op (tmo, t1, g, is_strong);

    (* We assume old_t1 is already removed from ops_of_tmo if old_t1 < 0 *)
    if old_t1 >= 0.0 then (
      try
        let l_ops = FloatMap.find old_t1 !ops_of_tmo in
        let l_ops' = OpSet.remove op l_ops in
        if OpSet.is_empty l_ops' then
          ops_of_tmo := FloatMap.remove old_t1 !ops_of_tmo
        else
          ops_of_tmo := FloatMap.add old_t1 l_ops' !ops_of_tmo
      with Not_found -> ()
    );
    let l_ops_new =
      try FloatMap.find t1 !ops_of_tmo with Not_found -> OpSet.empty in
    ops_of_tmo := FloatMap.add t1 (OpSet.add op l_ops_new) !ops_of_tmo

  (* Whether [op] is currently registered as resource *)
  method exists_resource op =
    while_locked mutex
      (fun () -> self # exists_resource_wl op)

  method private exists_resource_wl op =
    OpTbl.mem tmo_of_op op

  (* Whether any of the three I/O operations on [fd] is registered *)
  method private exists_descriptor_wl fd =
    self#exists_resource_wl (Unixqueue_util.Wait_in fd) ||
    self#exists_resource_wl (Unixqueue_util.Wait_out fd) ||
    self#exists_resource_wl (Unixqueue_util.Wait_oob fd)

  (* Adds [op] with timeout [tmo] as strong resource of [g] *)
  method add_resource g (op, tmo) =
    while_locked mutex
      (fun () ->
         self # add_resource_wl g (op, tmo) true
      )

  (* Adds [op] with timeout [tmo] as weak resource of [g]. Weak resources
     do not count for the [nothing_to_do] test in [source]. *)
  method add_weak_resource g (op, tmo) =
    while_locked mutex
      (fun () ->
         self # add_resource_wl g (op, tmo) false
      )

  (* Raises [Invalid_argument] if [g] is terminating. *)
  method private add_resource_wl g (op, tmo) is_strong =
    if g # is_terminating then
      invalid_arg "Unixqueue.add_resource: the group is terminated";
    if not (OpTbl.mem tmo_of_op op) then (
      self#pset_add_wl op;
      let t1 = if tmo < 0.0 then tmo else Unix.gettimeofday() +. tmo in
      self#sched_add_wl g op tmo t1 is_strong;
      (* Multi-threading: interrupt [wait] *)
      pset # cancel_wait true
      (* CHECK: In the select-based impl we add Keep_alive to equeue.
         This idea (probably): If [wait] is about to return, and the
         queue becomes empty, the whole esys terminates. The Keep_alive
         prevents that.
         My current thinking is that this delays the race condition
         a bit, but does not prevent it from happening
       *)
    )
    (* Note: The second addition of a resource is silently ignored...
       Maybe this should be fixed, so that the timeout can be lowered
     *)

  (* Removes the resource [op] of group [g], and runs the close action of
     the descriptor if this was the last resource on it.

     Raises [Invalid_argument] if [g] is terminating, [Not_found] if [op]
     is not registered, and [Failure] if [op] belongs to another group. *)
  method remove_resource g op =
    while_locked mutex
      (fun () ->
         if g # is_terminating then
           invalid_arg "remove_resource: the group is terminated";
         let _, _, g_found, _ = OpTbl.find tmo_of_op op in
         if Oo.id g <> Oo.id g_found then
           failwith "remove_resource: descriptor belongs to different group";
         self#sched_remove_wl op;
         self#pset_remove_wl op;

         if not !waiting && OpTbl.length tmo_of_op = 0 then
           pset # dispose();

         pset # cancel_wait true;    (* interrupt [wait] *)
         (* is there a close action ? *)
         let fd_opt =
           match op with
             | Wait_in  d -> Some d
             | Wait_out d -> Some d
             | Wait_oob d -> Some d
             | Wait _     -> None in
         match fd_opt with
           | Some fd ->
               if not !aborting then (
                 let action =
                   try Some (snd(Hashtbl.find close_tab fd))
                   with Not_found -> None in
                 match action with
                   | Some a ->
                       (* any open resource? *)
                       (* FIXME MT: We don't know yet whether fd can be
                          closed. This shouldn't be done before [wait]
                          returns.
                        *)
                       if not (self#exists_descriptor_wl fd) then (
                         dlogr
                           (fun () ->
                              (sprintf
                                 "remove_resource <running close action for fd %s>"
                                 (string_of_fd fd)));
                         Hashtbl.remove close_tab fd;
                         (* The user callback runs without the lock *)
                         escape_lock mutex (fun () -> a fd)
                       )
                   | None ->
                       ()
               )
           | None -> ()
      )

  method new_group () =
    new group_object

  method new_wait_id () =
    new wait_object

  (* Sets [a] as the close action of descriptor [d]. Raises [Failure] if
     no resource is registered for [d]. *)
  method add_close_action g (d,a) =
    while_locked mutex
      (fun () ->
         if g # is_terminating then
           invalid_arg "add_close_action: the group is terminated";
         (* CHECK: Maybe we should fail if g is a different group than
          * the existing group
          *)
         if self#exists_descriptor_wl d then
           Hashtbl.replace close_tab d (g,a)
             (* There can be only one close action.
              * TODO: Rename to set_close_action
              *)
         else
           failwith "add_close_action"
      )

  (* Sets [a] as the abort action of [g] *)
  method add_abort_action g a =
    while_locked mutex
      (fun () ->
         if g # is_terminating then
           invalid_arg "add_abort_action: the group is terminated";
         if Oo.id g = nogroup_id then
           invalid_arg "add_abort_action: the group is nogroup";
         Hashtbl.replace abort_tab g a
      )

  (* Adds [e] to the queue, and interrupts a running [wait] *)
  method add_event e =
    while_locked mutex
      (fun () -> add_event_wl e)

  method private uq_handler (esys : event Equeue.t) ev =
    (* locking: it is assumed that we do not have the lock when uq_handler
       is called. It is a callback from Equeue
     *)
    (* The single Unixqueue handler. For all added (sub) handlers, uq_handler
     * gets the events, and delivers them
     *)

    (* Removes the handler [h] from the handler list of [g] *)
    let terminate_handler_wl g h =
      if !Unixqueue_util.Debug.enable then
        dlogr
          (fun() ->
             (sprintf
                "uq_handler <terminating handler group %d, handler %d>"
                (Oo.id g) (Oo.id h)));
      let hlist =
        try Hashtbl.find handlers g with Not_found -> [] in
      let hlist' =
        List.filter (fun h' -> Oo.id h' <> Oo.id h) hlist in
      match hlist' with
        | [] ->
            Hashtbl.remove handlers g;
            decr handled_groups
            (*
            if handled_groups = 0 then (
              dlogr (fun () -> "uq_handler <self-terminating>");
              raise Equeue.Terminate  (* delete uq_handler from esys *)
            )
             *)
        | _ :: _ ->
            Hashtbl.replace handlers g hlist'
    in

    (* Offers [ev] to the handlers of [hlist] in turn until one accepts it.
       Raises [Equeue.Reject] if all handlers reject the event. *)
    let rec forward_event_to g (hlist : ohandler list) =
      (* The caller does not have the lock when this fn is called! *)
      match hlist with
        | [] ->
            if !Unixqueue_util.Debug.enable then
              dlogr (fun () -> "uq_handler <empty list>");
            raise Equeue.Reject
        | h :: hlist' ->
            ( try
                (* Note: the event system _must not_ be locked now *)
                if !Unixqueue_util.Debug.enable then
                  dlogr
                    (fun () ->
                       (sprintf
                          "uq_handler <invoke handler group %d, handler %d>"
                          (Oo.id g) (Oo.id h)));
                h#run (self :> event_system) esys ev;
                if !Unixqueue_util.Debug.enable then
                  dlogr
                    (fun () ->
                       (sprintf
                          "uq_handler <invoke_success handler group %d, handler %d>"
                          (Oo.id g) (Oo.id h)))
              with
                | Equeue.Reject ->
                    forward_event_to g hlist'
                | Equeue.Terminate ->
                    (* Terminate only this handler. *)
                    while_locked mutex
                      (fun () -> terminate_handler_wl g h)
                    (* Any error exceptions simply fall through. Equeue will
                     * catch them, and will add the event to the error queue
                     *)
            )
    in

    (* Offers [ev] to the handlers of group [g] *)
    let forward_event g =
      (* The caller does not have the lock when this fn is called! *)
      if !Unixqueue_util.Debug.enable then
        dlogr
          (fun () ->
             (sprintf "uq_handler <forward_event group %d>" (Oo.id g)));
      let hlist =
        while_locked mutex
          (fun () ->
             try Hashtbl.find handlers g with Not_found -> []) in
      forward_event_to g hlist
    in

    (* Offers [ev] to the handlers of all groups until one accepts it.
       Raises [Equeue.Reject] if nobody accepts. *)
    let forward_event_to_all() =
      (* The caller does not have the lock when this fn is called! *)
      let hlist_all =
        while_locked mutex
          (fun () ->
             Hashtbl.fold (fun g hlist l -> (g,hlist) :: l) handlers []) in
      try
        List.iter
          (fun (g,hlist) ->
             try
               forward_event_to g hlist;
               raise Exit_loop
                 (* event is delivered, so exit iteration *)
             with
               (* event is rejected: try next group *)
               | Equeue.Reject -> ()
          )
          hlist_all;
        raise Equeue.Reject
          (* no handler has accepted the event, so reject *)
      with
        | Exit_loop -> ()
    in

    if !Unixqueue_util.Debug.enable then
      dlogr
        (fun () ->
           (sprintf "uq_handler <event %s>" (string_of_event ev)));

    match ev with
      | Extra (Term g) ->
          (* Terminate all handlers of group g *)
          while_locked mutex
            (fun () ->
               if Hashtbl.mem handlers g then (
                 dlogr
                   (fun () ->
                      (sprintf "uq_handler <terminating group %d>"
                         (Oo.id g)));
                 Hashtbl.remove handlers g;
                 decr handled_groups
                 (*
                 if handled_groups = 0 then (
                   dlogr (fun () -> "uq_handler <self-terminating>");
                   raise Equeue.Terminate  (* delete uq_handler from esys *)
                 )
                  *)
               )
               else raise Equeue.Reject (* strange, should not happen *)
            )
      | Extra Keep_alive ->
          raise Equeue.Reject
      | Input_arrived(g,_)
      | Output_readiness(g,_)
      | Out_of_band(g,_)
      | Timeout(g,_) ->
          if g # is_terminating then raise Equeue.Reject;
          forward_event g
      | Signal ->
          forward_event_to_all()
      | Extra _ ->
          forward_event_to_all()
      | Immediate(g,f) ->
          if g # is_terminating then raise Equeue.Reject;
          ( try f()
            with Equeue.Terminate -> ()
          )

  method private equeue_add_handler () =
    Equeue.add_handler (Lazy.force !sys) self#uq_handler

  (* CHECK: There is a small difference between Equeue.add_handler and
   * this add_handler: Here, the handler is immediately active (if
   * uq_handler is already active). Can this lead to problems?
   *)

  (* Adds [h] as handler of group [g]. The newest handler is tried first.
     Raises [Invalid_argument] if [g] is terminating. *)
  method add_handler g h =
    while_locked mutex
      (fun () ->
         let oh = new ohandler h in
         dlogr
           (fun () ->
              (sprintf "add_handler <group %d, handler %d>"
                 (Oo.id g) (Oo.id oh)));

         if g # is_terminating then
           invalid_arg "Unixqueue.add_handler: the group is terminated";

         ( try
             let old_handlers = Hashtbl.find handlers g in
             Hashtbl.replace handlers g (oh :: old_handlers)
           with
             | Not_found ->
                 (* The group g is new *)
                 Hashtbl.add handlers g [oh];
                 incr handled_groups
                 (* if handled_groups = 1 then
                    self#equeue_add_handler ()
                  *)
         )
      )

  (* Terminates the group [g]: removes its resources, handlers and
     actions. Raises [Invalid_argument] for [nogroup]. *)
  method clear g =
    if Oo.id g = nogroup_id then
      invalid_arg "Unixqueue.clear: nogroup";
    while_locked mutex
      (fun () -> self # clear_wl g)

  method private clear_wl g =
    dlogr (fun () -> (sprintf "clear <group %d>" (Oo.id g)));

    (* Set that g is terminating now: *)
    g # terminate();

    (* (i) delete all resources of g: *)
    let ops =
      try Hashtbl.find ops_of_group g with Not_found -> OpSet.empty in
    OpSet.iter
      self#sched_remove_wl
      ops;
    OpSet.iter
      self#pset_remove_wl
      ops;
    Hashtbl.remove ops_of_group g;

    (* (ii) delete all handlers of g: *)
    add_event_wl (Extra (Term g));
    (* side effect: we also interrupt [wait] *)

    (* (iii) delete special actions of g: *)
    let to_remove =   (* remove from close_tab *)
      Hashtbl.fold
        (fun d (g',_) l -> if Oo.id g = Oo.id g' then d :: l else l)
        close_tab [] in
    List.iter
      (Hashtbl.remove close_tab) to_remove;

    Hashtbl.remove abort_tab g;

    (* Note: the Term event isn't caught after all handlers have been
     * deleted. The Equeue module simply discards events that are not
     * handled.
     *)

    if not !waiting && OpTbl.length tmo_of_op = 0 then
      pset # dispose()

  (* Runs the abort action of [g] (if any) for the exception [ex], and
     clears the group afterwards. An exception raised by the action is
     re-raised after the group has been cleared. *)
  method private abort g ex =
    (* caller doesn't have the lock *)
    (* Note: If g has been terminated, the abort action is removed. So
     * we will never find here one.
     *)
    dlogr
      (fun () ->
         (sprintf "abort <group %d, exception %s>"
            (Oo.id g) (Netexn.to_string ex)));
    let action =
      while_locked mutex
        (fun () ->
           try Some (Hashtbl.find abort_tab g) with Not_found -> None) in
    match action with
      | Some a ->
          begin
            dlogr (fun () -> "abort <running abort action>");
            let mistake = ref None in
            while_locked mutex
              (fun () -> aborting := true);
            begin try
              a g ex
            with
              | any ->
                  mistake := Some any (* Wow *)
            end;
            while_locked mutex
              (fun () ->
                 self#clear_wl g;
                 aborting := false
              );
            match !mistake with
              | None -> ()
              | Some m ->
                  dlogr
                    (fun () ->
                       (sprintf "abort <propagating exception %s>"
                          (Netexn.to_string m)));
                  raise m
          end
      | None ->
          ()

  (* Runs the event loop. [Abort] exceptions are handled by running the
     abort action of the group and restarting the loop; all other
     exceptions dispose [pset] and are passed on to the caller. *)
  method run () =
    (* caller doesn't have the lock *)
    let continue = ref true in
    try
      while !continue do
        continue := false;
        try
          Equeue.run (Lazy.force !sys)
        with
          | Abort (g,an_exception) ->
              begin
                match an_exception with
                  | (Equeue.Reject|Equeue.Terminate) ->
                      (* A serious programming error: *)
                      failwith "Caught 'Abort' exception with Reject or Terminate exception as argument; this is a programming error"
                  | Abort(_,_) ->
                      failwith "Caught 'Abort' exception with an 'Abort' exception as argument; this is a programming error"
                  | _ -> ()
              end;
              self#abort g an_exception;
              continue := true
      done
    with
      | error ->
          pset # dispose();
          raise error

  method is_running =
    Equeue.is_running (Lazy.force !sys)

  (* Sets the function that [source] calls before it starts waiting *)
  method when_blocking f =
    when_blocking := f

end


(* Creates the event system for [pset], coerced to the public
   [event_system] type *)
let pollset_event_system pset =
  (new pollset_event_system pset :> Unixqueue_util.event_system)