(* Plasma GitLab Archive *)
(* Projects Blog Knowledge *)

(* $Id: unixqueue_pollset.ml 1710 2012-02-16 15:15:27Z gerd $ *)

(* pset # dispose: we try to call it when there are no more operations in
   the queue. This is tested when
    - the resource is removed (and we are not waiting)
    - the group is cleared (and we are not waiting)
    - after every wait
   Note that we must not call [dispose] while [wait] is running!

   Also, we call it when [run] is left by an exception.
 *)


open Unixqueue_util
open Printf

module Float = struct
  type t = float

  (* Ordering used for the keys of [FloatMap]. Both arguments are
     constrained to [float]. The result is -1, 0 or 1.

     NaN is not handled in any meaningful way: every comparison involving
     NaN falls through to the last branch and yields 1. This is accepted
     here.
   *)
  let compare (x:float) (y:float) =
    if x = y then 0
    else if x < y then (-1)
    else 1
end


(* Maps keyed by floats, ordered by [Float.compare]. Used below for
   [ops_of_tmo], where the key is the point in time at which the
   operations stored under it time out.
 *)
module FloatMap = Map.Make(Float)


(* Object id of [nogroup], computed once. Groups are compared with
   [nogroup] via their [Oo.id] in several places below.
 *)
let nogroup_id = Oo.id nogroup

let min_key m =
  (* Return the smallest key of the FloatMap [m], or raise [Not_found]
     if [m] is empty.

     [FloatMap.iter] visits the bindings in increasing key order, so the
     first visited key is the minimum; the iteration is left immediately
     with [Exit]. (ocaml-3.12 already has a function for this in Map.)
   *)
  let smallest = ref None in
  let grab_first key _ =
    smallest := Some key;
    raise Exit in
  ( try FloatMap.iter grab_first m
    with Exit -> ()
  );
  match !smallest with
    | None -> raise Not_found
    | Some key -> key


let ops_until tmax m =
  (* Return the bindings [(t, ops)] of the FloatMap [m] with t <= tmax,
     in increasing order of [t].

     The map is walked in key order and the walk is aborted with [Exit]
     at the first key that is greater than [tmax].
   *)
  let collected = ref [] in
  let visit t ops =
    if t > tmax then
      raise Exit
    else
      collected := (t, ops) :: !collected in
  ( try FloatMap.iter visit m
    with Exit -> ()
  );
  List.rev !collected


exception Term of Unixqueue_util.group
  (* Extra (Term g) is now the group termination event for g.
     It is queued by [clear_wl]; [uq_handler] reacts to it by removing
     all handlers registered for g.
   *)

exception Keep_alive
  (* Sometimes used to keep the event system alive: [source] queues
     [Extra Keep_alive] when a wait delivered no other event although
     there are strong resources. [uq_handler] always rejects it.
   *)

exception Exit_loop
  (* Internal control flow only: raised inside [forward_event_to_all]
     (in [uq_handler]) to leave the iteration over the groups as soon as
     one handler has accepted the event.
   *)


(* Register a printer for [Term] exceptions with [Netexn]. [nogroup] is
   printed by name, any other group by its object id.
 *)
let () =
  let print_term =
    function
      | Term g when g = nogroup ->
          "Term(nogroup)"
      | Term g ->
          sprintf "Term(%d)" (Oo.id g)
      | _ ->
          assert false in
  Netexn.register_printer (Term nogroup) print_term


(* Set the events watched for [fd] in [pset] to the triple
   (input, output, priority). If at least one flag is set, the
   descriptor is added with the corresponding poll request; if none is
   set, the descriptor is removed from [pset].
 *)
let pset_set (pset:Netsys_pollset.pollset) fd (i,o,p) =
  if i || o || p then
    pset # add fd (Netsys_posix.poll_req_events i o p)
  else
    pset # remove fd


(* Return the triple (input, output, priority) of events currently
   requested for [fd] in [pset]. When the lookup raises [Not_found] the
   result is (false,false,false).
 *)
let pset_find (pset:Netsys_pollset.pollset) fd =
  try
    let requested = pset # find fd in
    Netsys_posix.poll_req_triple requested
  with
    | Not_found -> (false, false, false)



(* Map a resource event back to the operation it belongs to. Only the
   three descriptor events and [Timeout] are supported; any other event
   triggers [assert false].
 *)
let op_of_event =
  function
    | Input_arrived(_,fd)    -> Wait_in fd
    | Output_readiness(_,fd) -> Wait_out fd
    | Out_of_band(_,fd)      -> Wait_oob fd
    | Timeout(_,op)          -> op
    | _                      -> assert false

(* [list_mem_op op l] is true iff some element [h] of [l] satisfies
   [is_op_eq h op]. The list is scanned from the front and the scan stops
   at the first match.
 *)
let list_mem_op op l =
  List.exists (fun h -> is_op_eq h op) l

(* Run [f ()] through [Netsys_oothr.serialize] on [mutex] and return its
   result. NOTE(review): presumably [serialize] holds the mutex for the
   duration of the call and releases it also on exceptions - confirm
   against Netsys_oothr.
 *)
let while_locked mutex f =
  let run_serialized = Netsys_oothr.serialize mutex f in
  run_serialized ()


(* The inverse of [while_locked]: the caller holds [mutex]; it is
   unlocked, [f ()] is run, and the mutex is locked again before the
   result of [f] is returned or the exception raised by [f] is re-raised.
 *)
let escape_lock mutex f =
  mutex # unlock();
  let outcome =
    try `Value (f ())
    with e -> `Raised e in
  mutex # lock();
  match outcome with
    | `Value r -> r
    | `Raised e -> raise e
 

let flatten_map f l =
  (* = List.flatten (List.map f l), computed with an accumulator.
     [f] is applied to the elements of [l] from left to right; each result
     is pushed in reverse onto the accumulator, which is reversed once at
     the end.
   *)
  let push acc x = List.rev_append (f x) acc in
  List.rev (List.fold_left push [] l)


(* A little encapsulation so we can easily identify handlers by Oo.id *)
class ohandler (h:handler) =
object
  (* Call the wrapped handler [h] with exactly the given arguments. The
     object exists only so that a handler has an identity ([Oo.id]).
   *)
  method run sys queue event =
    h sys queue event
end


(* Event system on top of the pollset [pset]. The [let] bindings before
   [object] are the private state shared by all methods.
 *)
class pollset_event_system (pset : Netsys_pollset.pollset) =
  (* Thread provider, and whether more than one thread may be involved.
     [source] takes [mutex] explicitly only if [is_mt] holds.
   *)
  let mtp = !Netsys_oothr.provider in
  let is_mt = not (mtp#single_threaded) in
  (* The underlying Equeue. A dummy until the initializer has run. *)
  let sys = ref (lazy (assert false)) in   (* initialized below *)
  (* The operations registered per group (groups other than nogroup
     only, see [sched_add_wl])
   *)
  let ops_of_group = 
    (Hashtbl.create 10 : (group, OpSet.t) Hashtbl.t) in
        (* is for [clear] only *)
  (* The registered operations. An operation is a resource of this event
     system iff it is a key of this table (see [exists_resource_wl]).
   *)
  let tmo_of_op =
    (OpTbl.create 10 : (float * float * group * bool) OpTbl.t) in
      (* first number: duration of timeout (or -1)
         second number: point in time (or -1)
         group: the group the operation was added for
         bool: whether strong
       *)
  (* Reverse index: point in time -> operations timing out then. Only
     operations with a timeout >= 0 are entered.
   *)
  let ops_of_tmo = ref(FloatMap.empty : OpSet.t FloatMap.t) in
  let strong_ops = ref 0 in (* number of strong (non-weak) ops *)

  (* True while an abort action runs (see [abort]). [remove_resource]
     does not run close actions as long as this flag is set.
   *)
  let aborting = ref false in
  (* Close action per descriptor, together with the group that set it *)
  let close_tab = 
    (Hashtbl.create 10 :
       (Unix.file_descr, (group * (Unix.file_descr -> unit))) Hashtbl.t) in
  (* Abort action per group *)
  let abort_tab = 
     (Hashtbl.create 10 : (group, (group -> exn -> unit)) Hashtbl.t) in
  (* Handlers per group; new handlers are prepended (see [add_handler]) *)
  let handlers = (Hashtbl.create 10 : (group, ohandler list) Hashtbl.t) in
  let handled_groups = ref 0 in
            (* the number of keys in [handlers] *)

  (* True while [source] is inside [pset # wait]. [pset # dispose] must
     not be called then.
   *)
  let waiting = ref false in
  (* Hook that [source] calls at its very beginning; set with the
     [when_blocking] method.
   *)
  let when_blocking = ref (fun () -> ()) in

  (* Protects the state above *)
  let mutex = mtp # create_mutex() in


  let event_of_op_wl_nf op =
    (* Suffix _wl = while locked, _nf = may raise Not_found.
       Return the I/O event for the descriptor operation [op], tagged with
       the group recorded for [op] in [tmo_of_op]. Raises [Not_found] if
       [op] is not registered. Must not be called for [Wait] operations.
     *)
    let (_, _, grp, _) = OpTbl.find tmo_of_op op in (* or Not_found *)
    match op with
      | Wait_in fd  -> Input_arrived(grp, fd)
      | Wait_out fd -> Output_readiness(grp, fd)
      | Wait_oob fd -> Out_of_band(grp, fd)
      | Wait _      -> assert false in

  let events_of_op_wl op =
    (* Like [event_of_op_wl_nf], but returns a list: one event if [op] is
       registered, and the empty list otherwise. The latter case is a
       "ghost event", i.e. there is no handler anymore for it, but it
       wasn't deleted quickly enough from pset.
     *)
    let ev_opt =
      try Some (event_of_op_wl_nf op)
      with Not_found -> None in
    match ev_opt with
      | Some ev -> [ ev ]
      | None -> [] in

  let tmo_event_of_op_wl_nf op =
    (* Return the [Timeout] event for [op], tagged with the group recorded
       for [op] in [tmo_of_op]. Raises [Not_found] if [op] is not
       registered.
     *)
    let (_, _, grp, _) = OpTbl.find tmo_of_op op in (* or Not_found *)
    Timeout(grp, op) in

  let tmo_events_of_op_wl op =
    (* List variant of [tmo_event_of_op_wl_nf]: the empty list is returned
       for operations that are no longer registered (ghost events).
     *)
    let ev_opt =
      try Some (tmo_event_of_op_wl_nf op)
      with Not_found -> None in
    match ev_opt with
      | Some ev -> [ ev ]
      | None -> [] in
	    

  let add_event_wl e =
    (* Append [e] to the event queue, and set the cancel bit of [pset], so
       that any pending [wait] is interrupted.
     *)
    let esys = Lazy.force !sys in
    Equeue.add_event esys e;
    pset # cancel_wait true in


object(self)

  initializer (
    (* Create the Equeue with [self#source] as event source, store it in
       [sys] (forcing the lazy value once), and install the single
       handler [uq_handler].

       The handler is added now at object creation time. The only drawback
       is that we no longer raise [Equeue.Out_of_handlers] - but this is
       questionable anyway since the addition of [Immediate] events. In
       order to generate [Out_of_handlers] we would have to count
       [Immediate] events in addition to handlers.
     *)
    let new_esys = Equeue.create ~string_of_event self#source in
    sys := lazy new_esys;
    ignore(Lazy.force !sys);
    self#equeue_add_handler ()
  )


  method private source  _sys =
    (* The event source callback that is registered with Equeue in the
       initializer. One invocation
        - runs the [when_blocking] hook,
        - waits on [pset], at most until the earliest scheduled timeout
          (the wait is skipped if there are no strong resources),
        - turns the poll results and the expired timeouts into events and
          adds them to [_sys],
        - re-arms the timeout of every operation for which an event was
          delivered.

       locking: the lock is not held when called, because we are called back
       from Equeue. It is taken here, released while [pset # wait] runs,
       and released on every exit path. [locked] tracks whether we
       currently hold it.
     *)
    assert(Lazy.force !sys == _sys);

    !when_blocking();

    let dbg = !Unixqueue_util.Debug.enable in

    let locked = ref true in   (* keep track of locking state *)
    if is_mt then
      mutex # lock();

    let t0 = Unix.gettimeofday() in
    (* tmin: earliest point in time at which an op times out, or -1 if no
       timeout is scheduled. delta: how long to wait at most; negative if
       there is no timeout.
     *)
    let tmin = try min_key !ops_of_tmo with Not_found -> (-1.0) in
    let delta = if tmin < 0.0 then (-1.0) else max (tmin -. t0) 0.0 in

    if dbg then
      dlogr (fun () ->
               sprintf "t0 = %f,   #tmo_of_op = %d"
                 t0 (OpTbl.length tmo_of_op)
            );

    let nothing_to_do =
      (* For this test only non-weak resources count, so... *)
      !strong_ops = 0 in

    let have_eintr = ref false in

    let pset_events =
      try
        if nothing_to_do then (
          if dbg then
            dlogr (fun () -> "nothing_to_do");
          []
        )
        else (
          if dbg then
            dlogr (fun () -> (
                     let ops =
                       OpTbl.fold (fun op _ l -> op::l) tmo_of_op [] in
                     let op_str =
                       String.concat ";" (List.map string_of_op ops) in
                     sprintf "wait tmo=%f ops=<%s>" delta op_str));
          (* Reset the cancel bit immediately before calling [wait]. Any
             event added up to now is considered anyway by [wait] because
             our lock is still held. Any new event added after we unlock will
             set the cancel_wait flag, and cause the [wait] to exit (if it is
             still running).
           *)
          pset # cancel_wait false;
          waiting := true;
          if is_mt then
            mutex # unlock();
          locked := false;
          pset # wait delta
        )
      with
        | Unix.Unix_error(Unix.EINTR,_,_) ->
            (* Not an error: reported below as a [Signal] event *)
            if dbg then
              dlogr (fun () -> "wait signals EINTR");
            have_eintr := true;
            []
        | e ->
            (* Usually from [wait], but one never knows... *)
            if !locked && is_mt then mutex#unlock();
            waiting := false;
            raise e
    in

    waiting := false;
    if not !locked && is_mt then mutex # lock();
    locked := true;
    try
      (* Catch exceptions and unlock *)

      if dbg then
        dlogr (fun () -> (
                 sprintf "wait returns <%d pset events>"
                   (List.length pset_events)));

      let t1 = Unix.gettimeofday() in
      if dbg then
        dlogr (fun () -> (sprintf "t1 = %f" t1));
      (* t1 is the reference for determining the timeouts *)

      (* while waiting somebody might have removed resources, so ... *)
      if OpTbl.length tmo_of_op = 0 then
        pset # dispose();

      let operations =  (* flatten_map *)
        (* The possible operations *)
        List.fold_left
          (fun acc (fd,ev_in,ev_out) ->
             (* Note that POLLHUP and POLLERR can also mean that we
                have data to read/write!
              *)
             let (in_rd,in_wr,in_pri) =
               Netsys_posix.poll_req_triple ev_in in
             let out_rd =
               Netsys_posix.poll_rd_result ev_out in
             let out_wr =
               Netsys_posix.poll_wr_result ev_out in
             let out_pri =
               Netsys_posix.poll_pri_result ev_out in
             let out_hup =
               Netsys_posix.poll_hup_result ev_out in
             let out_err =
               Netsys_posix.poll_err_result ev_out in
             let have_input  =
               in_rd && (out_rd || out_hup || out_err) in
             let have_output =
               in_wr && (out_wr || out_hup || out_err) in
             let have_pri =
               in_pri && (out_pri || out_hup || out_err) in
             let e1 =
               if have_pri then Unixqueue_util.Wait_oob fd :: acc else acc in
             let e2 =
               if have_input then Unixqueue_util.Wait_in fd :: e1 else e1 in
             let e3 =
               if have_output then Unixqueue_util.Wait_out fd :: e2 else e2 in
             e3
          )
          []
          pset_events in

      let ops_timed_out =
        ops_until t1 !ops_of_tmo in
      (* Note: this _must_ include operations until <= t1 (not <t1), otherwise
         a timeout value of 0.0 won't work
       *)

      let ops_timed_out_l =
        (* Determine the operations in [tmo_of_op] that have timed
           out and that are not in [operations]
           FIXME: [List.mem op operations] is not scalable.
         *)
        List.fold_left
          (fun oacc (_, ops) ->
             OpSet.fold
               (fun op iacc ->
                  if list_mem_op op operations then iacc else op::iacc)
               ops
               oacc
          )
          []
          ops_timed_out in

      if dbg then (
        dlogr
          (fun() ->
             sprintf "delivering events <%s>"
               (String.concat ";"
                  (flatten_map
                     (fun op ->
                        List.map string_of_event (events_of_op_wl op)
                     )
                     operations
                  )));
        dlogr
          (fun() ->
             sprintf "delivering timeouts <%s>"
               (String.concat ";"
                  (flatten_map
                     (fun op ->
                        List.map string_of_event (tmo_events_of_op_wl op)
                     )
                     ops_timed_out_l
                  )));
      );

      (* deliver events *)
      let delivered = ref false in
      let deliver get_ev op =
        (* Ops that are no longer registered make [get_ev] raise
           [Not_found]; they are skipped silently.
         *)
        try
          let ev = get_ev op in
          delivered := true;
          Equeue.add_event _sys ev
        with Not_found -> () in
      List.iter (deliver event_of_op_wl_nf)     operations;
      List.iter (deliver tmo_event_of_op_wl_nf) ops_timed_out_l;

      if !have_eintr then (
        dlogr (fun () -> "delivering Signal");
        Equeue.add_event _sys Unixqueue_util.Signal
      ) else
        if not !delivered && not nothing_to_do then (
          (* Ensure we always add an event to keep the event loop running: *)
          if dbg then
            dlogr (fun () -> "delivering Keep_alive");
          Equeue.add_event _sys (Unixqueue_util.Extra Keep_alive)
        );

      (* Update ops_of_tmo: *)
      List.iter
        (fun (t,_) ->
           ops_of_tmo := FloatMap.remove t !ops_of_tmo
        )
        ops_timed_out;

      (* Set a new timeout for all delivered events:
         (Note that [pset] remains unchanged, because the set of watched
         resources remains unchanged.)
         rm_done is true when the old timeout is already removed from
         ops_of_tmo.
       *)
      let update_tmo rm_done oplist =
        List.iter
          (fun op ->
             try
               (* old_t1 is the point in time stored for the op so far. It
                  must not be bound to the name t1: that would shadow the
                  current time t1 from above, and the new timeout would
                  be computed relative to the old point in time instead
                  of relative to now.
                *)
               let (tmo,old_t1,g,is_strong) =
                 OpTbl.find tmo_of_op op in (* or Not_found *)
               if tmo >= 0.0 then (
                 let t2 = t1 +. tmo in
                 self#sched_upd_tmo_wl g op tmo t2 is_strong
                   (if rm_done then (-1.0) else old_t1)
               )
             with
               | Not_found -> ()
                   (* It is possible that resources were removed while
                      we were waiting for events. This can lead to
                      [Not_found] here. We just ignore this.
                    *)
          )
          oplist in
      update_tmo false operations;
      update_tmo true  ops_timed_out_l;

      if is_mt then mutex # unlock();
      locked := false

    with
      | e ->
          (* exceptions are unexpected, but we want to make sure not to mess
             with locks
           *)
          if !locked && is_mt then mutex # unlock();
          raise e


  (* Note: suffix _wl = while locked *)

  method private sched_remove_wl op =
    (* Remove [op] from the scheduling tables: from [tmo_of_op], from
       [ops_of_tmo] (if it has a timeout), and from [ops_of_group] (if its
       group is not nogroup). [strong_ops] is decremented for strong ops.
       The pollset is not touched - see [pset_remove_wl] for that.

       Nothing happens if [op] is not registered. No exception is raised.
     *)
    try
      let tmo, t1, g, is_strong = OpTbl.find tmo_of_op op in  (* or Not_found *)
      dlogr(fun () -> (sprintf "sched_remove %s" (string_of_op op)));
      OpTbl.remove tmo_of_op op;
      if is_strong then decr strong_ops;
      (* Ops without timeout (tmo < 0) are not entered into ops_of_tmo *)
      if tmo >= 0.0 then (
        try
          let l_ops = FloatMap.find t1 !ops_of_tmo in
          let l_ops' = OpSet.remove op l_ops in
          (* Emptiness is tested with the set's own predicate, not with
             polymorphic equality on the set representation.
           *)
          if OpSet.is_empty l_ops' then
            ops_of_tmo := FloatMap.remove t1 !ops_of_tmo
          else
            ops_of_tmo := FloatMap.add t1 l_ops' !ops_of_tmo
        with Not_found -> ()
      );
      if Oo.id g <> nogroup_id then (
        (* A missing entry raises Not_found, which ends up in the outer
           handler, i.e. is ignored
         *)
        let old_set = Hashtbl.find ops_of_group g in
        let new_set = OpSet.remove op old_set in
        if OpSet.is_empty new_set then
          Hashtbl.remove ops_of_group g
        else
          Hashtbl.replace ops_of_group g new_set
      )

    with
      | Not_found -> ()


  method private pset_remove_wl op =
    (* Stop watching the descriptor of [op] for the kind of event [op]
       stands for. The other two kinds remain as they are in [pset].
       [Wait] operations have no descriptor, so nothing is done for them.
     *)
    match op with
      | Wait _ ->
          ()
      | Wait_in fd ->
          let (_, o, p) = pset_find pset fd in
          pset_set pset fd (false, o, p)
      | Wait_out fd ->
          let (i, _, p) = pset_find pset fd in
          pset_set pset fd (i, false, p)
      | Wait_oob fd ->
          let (i, o, _) = pset_find pset fd in
          pset_set pset fd (i, o, false)
	    

  method private sched_add_wl g op tmo t1 is_strong =
    (* Enter [op] into the scheduling tables:
        - [tmo_of_op] gets the entry (tmo, t1, g, is_strong)
        - [strong_ops] is incremented for strong ops
        - if tmo >= 0, [op] is added to the ops timing out at [t1]
        - if [g] is not nogroup, [op] is added to the ops of [g]
       The pollset is not touched - see [pset_add_wl] for that.
     *)
    dlogr(fun () -> (sprintf "sched_add %s tmo=%f t1=%f is_strong=%b"
                       (string_of_op op) tmo t1 is_strong));
    OpTbl.replace tmo_of_op op (tmo, t1, g, is_strong);
    if is_strong then
      incr strong_ops;
    if tmo >= 0.0 then (
      let same_time_ops =
        try FloatMap.find t1 !ops_of_tmo with Not_found -> OpSet.empty in
      ops_of_tmo := FloatMap.add t1 (OpSet.add op same_time_ops) !ops_of_tmo
    );
    if Oo.id g <> nogroup_id then (
      let group_ops =
        try Hashtbl.find ops_of_group g with Not_found -> OpSet.empty in
      Hashtbl.replace ops_of_group g (OpSet.add op group_ops)
    )

  method private pset_add_wl op =
    (* Start watching the descriptor of [op] for the kind of event [op]
       stands for, in addition to what is already requested for this
       descriptor in [pset]. Nothing is done for [Wait] operations.
     *)
    match op with
      | Wait _ ->
          ()
      | Wait_in fd ->
          let (_, o, p) = pset_find pset fd in
          pset_set pset fd (true, o, p)
      | Wait_out fd ->
          let (i, _, p) = pset_find pset fd in
          pset_set pset fd (i, true, p)
      | Wait_oob fd ->
          let (i, o, _) = pset_find pset fd in
          pset_set pset fd (i, o, true)

  method private sched_upd_tmo_wl g op tmo t1 is_strong old_t1 =
    (* only for tmo>=0 *)
    (* Move the timeout of the already registered [op] to the point in
       time [t1]: the entry in [tmo_of_op] is overwritten with
       (tmo, t1, g, is_strong), and [op] is moved in [ops_of_tmo] from the
       key [old_t1] to the key [t1]. [strong_ops] and [ops_of_group] are
       not changed.

       A negative [old_t1] means that the caller has already removed the
       old entry from [ops_of_tmo].
     *)
    dlogr(fun () -> (sprintf "sched_upd_tmo %s tmo=%f t1=%f is_strong=%b"
                       (string_of_op op) tmo t1 is_strong));
    OpTbl.replace tmo_of_op op (tmo, t1, g, is_strong);

    (* We assume old_t1 is already removed from ops_of_tmo if old_t1 < 0 *)
    if old_t1 >= 0.0 then (
      try
        let l_ops =
          FloatMap.find old_t1 !ops_of_tmo in
        let l_ops' =
          OpSet.remove op l_ops in
        (* Emptiness is tested with the set's own predicate, not with
           polymorphic equality on the set representation.
         *)
        if OpSet.is_empty l_ops' then
          ops_of_tmo := FloatMap.remove old_t1 !ops_of_tmo
        else
          ops_of_tmo := FloatMap.add old_t1 l_ops' !ops_of_tmo
      with Not_found -> ()
    );

    let l_ops_new =
      try FloatMap.find t1 !ops_of_tmo with Not_found -> OpSet.empty in
    ops_of_tmo := FloatMap.add t1 (OpSet.add op l_ops_new) !ops_of_tmo

  method exists_resource op =
    (* Whether [op] is currently registered. Takes the lock. *)
    let check () = self # exists_resource_wl op in
    while_locked mutex check

  method private exists_resource_wl op =
    (* An op is registered iff it is a key of [tmo_of_op] *)
    let registered = OpTbl.mem tmo_of_op op in
    registered


  method private exists_descriptor_wl fd =
    (* Whether any of the three descriptor operations (input, output,
       out-of-band) is registered for [fd]. The checks are done in this
       order and stop at the first hit.
     *)
    List.exists
      self#exists_resource_wl
      [ Wait_in fd; Wait_out fd; Wait_oob fd ]


  method add_resource g (op, tmo) =
    (* Register [op] with timeout [tmo] for group [g] as a strong
       resource. Takes the lock; see [add_resource_wl] for the details.
     *)
    let is_strong = true in
    while_locked mutex
      (fun () -> self # add_resource_wl g (op, tmo) is_strong)

  method add_weak_resource g (op, tmo) =
    (* Register [op] with timeout [tmo] for group [g] as a weak resource,
       i.e. one that does not count for [strong_ops]. Takes the lock; see
       [add_resource_wl] for the details.
     *)
    let is_strong = false in
    while_locked mutex
      (fun () -> self # add_resource_wl g (op, tmo) is_strong)


  method private add_resource_wl g (op, tmo) is_strong =
    (* Register [op] for group [g]: the descriptor is added to the
       pollset, the op is entered into the scheduling tables, and a
       running [wait] is interrupted. A negative [tmo] is passed on as it
       is (no timeout); otherwise the op times out [tmo] seconds from
       now.

       Raises [Invalid_argument] if [g] is terminating.

       Note: The second addition of a resource is silently ignored...
       Maybe this should be fixed, so that the timeout can be lowered
     *)
    if g # is_terminating then
      invalid_arg "Unixqueue.add_resource: the group is terminated";
    let already_registered = OpTbl.mem tmo_of_op op in
    if not already_registered then (
      self#pset_add_wl op;
      let t1 =
        if tmo < 0.0 then tmo
        else Unix.gettimeofday() +. tmo in
      self#sched_add_wl g op tmo t1 is_strong;
      (* Multi-threading: interrupt [wait] *)
      pset # cancel_wait true
      (* CHECK: In the select-based impl we add Keep_alive to equeue.
              This idea (probably): If [wait] is about to return, and the
              queue becomes empty, the whole esys terminates. The Keep_alive
              prevents that.
              My current thinking is that this delays the race condition
              a bit, but does not prevent it from happening
       *)
    )


  method remove_resource g op =
    (* Unregister [op], which must have been added for group [g]. Takes
       the lock. Afterwards a running [wait] is interrupted, and the
       pollset is disposed if no op is left (and we are not waiting).

       If [op] refers to a descriptor with a close action, and no other
       op for this descriptor remains, the close action is removed and
       run (without the lock) - except while an abort action is running.

       Raises [Invalid_argument] if [g] is terminating, [Not_found] if
       [op] is not registered, and [Failure] if [op] belongs to a
       different group.
     *)
    while_locked mutex
      (fun () ->
         if g # is_terminating then
           invalid_arg "remove_resource: the group is terminated";
         let (_, _, owner, _) = OpTbl.find tmo_of_op op in
         if Oo.id g <> Oo.id owner then
           failwith "remove_resource: descriptor belongs to different group";
         self#sched_remove_wl op;
         self#pset_remove_wl op;

         if not !waiting && OpTbl.length tmo_of_op = 0 then
           pset # dispose();

         pset # cancel_wait true;    (* interrupt [wait] *)

         (* is there a close action ? *)
         match op with
           | Wait _ ->
               ()
           | Wait_in fd | Wait_out fd | Wait_oob fd ->
               if not !aborting then (
                 let close_action =
                   try Some (snd (Hashtbl.find close_tab fd))
                   with Not_found -> None in
                 match close_action with
                   | None ->
                       ()
                   | Some run_close ->
                       (* any open resource? *)
                       (* FIXME MT: We don't know yet whether fd can be
                          closed. This shouldn't be done before [wait]
                          returns.
                        *)
                       if not (self#exists_descriptor_wl fd) then (
                         dlogr
                           (fun () ->
                              sprintf
                                "remove_resource <running close action for fd %s>"
                                (string_of_fd fd));
                         Hashtbl.remove close_tab fd;
                         escape_lock mutex (fun () -> run_close fd)
                       )
               )
      )


  (* Create a fresh group: a new instance of [group_object]. No state of
     the event system is touched, and the lock is not taken.
   *)
  method new_group () =
    new group_object

  (* Create a fresh wait identifier: a new instance of [wait_object]. No
     state of the event system is touched, and the lock is not taken.
   *)
  method new_wait_id () =
    new wait_object


  method add_close_action g (d,a) =
    (* Set [a] as the close action of descriptor [d], recorded together
       with group [g]. There can be only one close action per descriptor,
       so an existing one is replaced (TODO: Rename to set_close_action).
       Takes the lock.

       Raises [Invalid_argument] if [g] is terminating, and [Failure] if
       no resource is registered for [d].

       CHECK: Maybe we should fail if g is a different group than
       the existing group
     *)
    while_locked mutex
      (fun () ->
         if g # is_terminating then
           invalid_arg "add_close_action: the group is terminated";
         if not (self#exists_descriptor_wl d) then
           failwith "add_close_action";
         Hashtbl.replace close_tab d (g,a)
      )


  method add_abort_action g a =
    (* Set [a] as the abort action of group [g], replacing an existing
       one. Takes the lock.

       Raises [Invalid_argument] if [g] is terminating or if [g] is
       nogroup (checked in this order).
     *)
    let set_action () =
      if g # is_terminating then
        invalid_arg "add_abort_action: the group is terminated";
      if Oo.id g = nogroup_id then
        invalid_arg "add_abort_action: the group is nogroup";
      Hashtbl.replace abort_tab g a in
    while_locked mutex set_action

  method add_event e =
    (* Append [e] to the event queue and interrupt a running [wait] (see
       [add_event_wl]). Takes the lock.
     *)
    let enqueue () = add_event_wl e in
    while_locked mutex enqueue


  method private uq_handler (esys : event Equeue.t) ev =
    (* The single Unixqueue handler. For all added (sub) handlers,
       uq_handler gets the events, and delivers them:
        - resource events and timeouts go to the handlers of their group
        - [Signal] and other [Extra] events are offered to all groups
        - [Extra (Term g)] removes all handlers of [g]
        - [Immediate] events run their function directly
       Events of terminating groups and [Extra Keep_alive] are rejected.

       locking: it is assumed that we do not have the lock when uq_handler
       is called. It is a callback from Equeue. The lock is only taken
       for accessing [handlers], never while a handler runs.
     *)

    (* Log only if debugging is enabled at the time of the call *)
    let debug_log mk_msg =
      if !Unixqueue_util.Debug.enable then dlogr mk_msg in

    (* Remove the handler [h] from the handlers of [g]. If it was the last
       one, [g] is dropped from [handlers] altogether.
       (Self-termination of uq_handler when the last group disappears
       is deliberately not done.)
     *)
    let terminate_handler_wl g h =
      debug_log
        (fun () ->
           sprintf "uq_handler <terminating handler group %d, handler %d>"
             (Oo.id g) (Oo.id h));
      let registered =
        try Hashtbl.find handlers g with Not_found -> [] in
      let remaining =
        List.filter (fun h' -> Oo.id h' <> Oo.id h) registered in
      match remaining with
        | [] ->
            Hashtbl.remove handlers g;
            decr handled_groups
        | _ :: _ ->
            Hashtbl.replace handlers g remaining
    in

    (* Offer [ev] to the handlers in [hlist], one after the other, until
       one of them does not reject it. Raises [Equeue.Reject] if all
       reject. A handler raising [Equeue.Terminate] has accepted the
       event, and is removed.
     *)
    let rec forward_event_to g (hlist : ohandler list) =
      (* The caller does not have the lock when this fn is called! *)
      match hlist with
        | [] ->
            debug_log (fun () -> "uq_handler <empty list>");
            raise Equeue.Reject
        | h :: rest ->
            ( try
                (* Note: the event system _must not_ be locked now *)
                debug_log
                  (fun () ->
                     sprintf
                       "uq_handler <invoke handler group %d, handler %d>"
                       (Oo.id g) (Oo.id h));
                h#run (self :> event_system) esys ev;
                debug_log
                  (fun () ->
                     sprintf
                       "uq_handler <invoke_success handler group %d, handler %d>"
                       (Oo.id g) (Oo.id h))
              with
                | Equeue.Reject ->
                    forward_event_to g rest
                | Equeue.Terminate ->
                    (* Terminate only this handler. *)
                    while_locked mutex
                      (fun () -> terminate_handler_wl g h)
                    (* Any error exceptions simply fall through. Equeue will
                       catch them, and will add the event to the error queue
                     *)
            )
    in

    (* Offer [ev] to the handlers of group [g] *)
    let forward_event g =
      (* The caller does not have the lock when this fn is called! *)
      debug_log
        (fun () -> sprintf "uq_handler <forward_event group %d>" (Oo.id g));
      let group_handlers =
        while_locked mutex
          (fun () ->
             try Hashtbl.find handlers g with Not_found -> []) in
      forward_event_to g group_handlers
    in

    (* Offer [ev] to the groups one after the other, working on a snapshot
       of [handlers]. Raises [Equeue.Reject] if no group accepts.
     *)
    let forward_event_to_all() =
      (* The caller does not have the lock when this fn is called! *)
      let snapshot =
        while_locked mutex
          (fun () ->
             Hashtbl.fold
               (fun g hlist acc -> (g,hlist) :: acc) handlers []) in
      let offer_to (g,hlist) =
        try
          forward_event_to g hlist;
          raise Exit_loop   (* event is delivered, so exit iteration *)
        with
          | Equeue.Reject -> ()  (* event is rejected: try next group *)
      in
      try
        List.iter offer_to snapshot;
        raise Equeue.Reject (* no handler has accepted the event, so reject *)
      with
        | Exit_loop -> ()
    in

    debug_log
      (fun () -> sprintf "uq_handler <event %s>" (string_of_event ev));

    match ev with
      | Extra (Term g) ->
          (* Terminate all handlers of group g *)
          while_locked mutex
            (fun () ->
               if not (Hashtbl.mem handlers g) then
                 raise Equeue.Reject; (* strange, should not happen *)
               dlogr
                 (fun () ->
                    sprintf "uq_handler <terminating group %d>" (Oo.id g));
               Hashtbl.remove handlers g;
               decr handled_groups
            )
      | Extra Keep_alive ->
          raise Equeue.Reject
      | Input_arrived(g,_)
      | Output_readiness(g,_)
      | Out_of_band(g,_)
      | Timeout(g,_) ->
          if g # is_terminating then raise Equeue.Reject;
          forward_event g
      | Signal
      | Extra _ ->
          forward_event_to_all()
      | Immediate(g,f) ->
          if g # is_terminating then raise Equeue.Reject;
          ( try f()
            with Equeue.Terminate -> ()
          )

  method private equeue_add_handler () =
    (* Install [uq_handler] as handler of the underlying Equeue *)
    let esys = Lazy.force !sys in
    Equeue.add_handler esys self#uq_handler

  (* CHECK: There is a small difference between Equeue.add_handler and
   * this add_handler: Here, the handler is immediately active (if
   * uq_handler is already active). Can this lead to problems?
   *)

  method add_handler g h =
    (* Register the handler [h] for group [g]. [h] is wrapped into an
       [ohandler] object and put in front of the handlers already
       registered for [g]. If [g] has no handlers yet, it is counted as a
       new handled group. Takes the lock.

       Raises [Invalid_argument] if [g] is terminating.
     *)
    while_locked mutex
      (fun () ->
         let oh = new ohandler h in
         dlogr
           (fun () ->
              sprintf
                "add_handler <group %d, handler %d>" (Oo.id g) (Oo.id oh));

         if g # is_terminating then
           invalid_arg "Unixqueue.add_handler: the group is terminated";

         let existing =
           try Some (Hashtbl.find handlers g)
           with Not_found -> None in
         match existing with
           | Some old_handlers ->
               Hashtbl.replace handlers g (oh :: old_handlers)
           | None ->
               (* The group g is new. (uq_handler is already installed by
                  the initializer, so nothing else needs to be done.)
                *)
               Hashtbl.add handlers g [oh];
               incr handled_groups
      )

  method clear g =
    (* Terminate the group [g] and remove everything belonging to it (see
       [clear_wl]). Takes the lock.

       Raises [Invalid_argument] if [g] is nogroup.
     *)
    if Oo.id g = nogroup_id then
      invalid_arg "Unixqueue.clear: nogroup"
    else
      while_locked mutex
        (fun () -> self # clear_wl g)


  method private clear_wl g =
    (* Mark [g] as terminating, and remove its resources, close actions
       and abort action. The handlers of [g] are removed later, when
       [uq_handler] processes the [Term] event queued here. Finally, the
       pollset is disposed if no op is left (and we are not waiting).
     *)
    dlogr (fun () -> (sprintf "clear <group %d>" (Oo.id g)));

    (* Set that g is terminating now: *)
    g # terminate();

    (* (i) delete all resources of g: first from the scheduling tables,
       then from the pollset
     *)
    let group_ops =
      try Hashtbl.find ops_of_group g with Not_found -> OpSet.empty in
    OpSet.iter (fun op -> self#sched_remove_wl op) group_ops;
    OpSet.iter (fun op -> self#pset_remove_wl op) group_ops;
    Hashtbl.remove ops_of_group g;

    (* (ii) delete all handlers of g: *)
    add_event_wl (Extra (Term g));
    (* side effect: we also interrupt [wait] *)

    (* (iii) delete special actions of g. The descriptors are collected
       first, so that close_tab is not modified while it is traversed.
     *)
    let owned_fds =
      Hashtbl.fold
        (fun d (g',_) acc -> if g = g' then d :: acc else acc)
        close_tab
        [] in
    List.iter (fun d -> Hashtbl.remove close_tab d) owned_fds;

    Hashtbl.remove abort_tab g;

    (* Note: the Term event isn't caught after all handlers have been
       deleted. The Equeue module simply discards events that are not
       handled.
     *)

    if not !waiting && OpTbl.length tmo_of_op = 0 then
      pset # dispose()


  method private abort g ex =
    (* Run the abort action of group [g] (if any) with the exception [ex],
       and clear the group afterwards. [aborting] is set while the action
       runs. If the action itself raises an exception, the group is
       cleared nevertheless, and the exception is re-raised at the end.

       The caller doesn't have the lock.

       Note: If g has been terminated, the abort action is removed. So
       we will never find here one.
     *)
    dlogr (fun () -> (sprintf "abort <group %d, exception %s>"
                         (Oo.id g) (Netexn.to_string ex)));
    let lookup () =
      try Some (Hashtbl.find abort_tab g) with Not_found -> None in
    match while_locked mutex lookup with
      | None ->
          ()
      | Some action ->
          dlogr (fun () -> "abort <running abort action>");
          while_locked mutex
            (fun () -> aborting := true);
          (* The action runs without the lock *)
          let mistake =
            try action g ex; None
            with any -> Some any (* Wow *) in
          while_locked mutex
            (fun () ->
               self#clear_wl g;
               aborting := false
            );
          ( match mistake with
              | None ->
                  ()
              | Some m ->
                  dlogr (fun () -> (sprintf "abort <propagating exception %s>"
                                       (Netexn.to_string m)));
                  raise m
          )

  method run () =
    (* Run the event loop until [Equeue.run] returns normally. When it is
       left by an [Abort(g,e)] exception, the abort action of [g] is
       executed, and the loop is entered again. An [Abort] whose argument
       is [Equeue.Reject], [Equeue.Terminate], or another [Abort] is a
       programming error, and is turned into a [Failure].

       Any exception leaving this method disposes the pollset first.

       The caller doesn't have the lock.
     *)
    let rec loop () =
      let aborted =
        try
          Equeue.run (Lazy.force !sys);
          None
        with
          | Abort (g,an_exception) -> Some (g,an_exception) in
      match aborted with
        | None ->
            ()
        | Some (g,an_exception) ->
            ( match an_exception with
                | (Equeue.Reject|Equeue.Terminate) ->
                    (* A serious programming error: *)
                    failwith "Caught 'Abort' exception with Reject or Terminate exception as argument; this is a programming error"
                | Abort(_,_) ->
                    failwith "Caught 'Abort' exception with an 'Abort' exception as argument; this is a programming error"
                | _ -> ()
            );
            self#abort g an_exception;
            loop ()
    in
    try
      loop ()
    with
      | error ->
          pset # dispose();
          raise error

  method is_running =
    (* Delegates to [Equeue.is_running] on the underlying Equeue *)
    let esys = Lazy.force !sys in
    Equeue.is_running esys


  method when_blocking hook =
    (* Replace the hook that [source] calls at its very beginning. The
       assignment is done without taking the lock.
     *)
    when_blocking := hook

end


(* Create an event system on top of [pset], and return it coerced to the
   general type [Unixqueue_util.event_system].
 *)
let pollset_event_system pset =
  let esys = new pollset_event_system pset in
  (esys :> Unixqueue_util.event_system)


(* This web site is published by Informatikbüro Gerd Stolpmann *)
(* Powered by Caml *)