Plasma GitLab Archive
Projects Blog Knowledge

(* $Id: unixqueue_select.ml 1262 2009-08-31 18:14:21Z gerd $ *)

open Printf
open Unixqueue_util



class type sb_event_system =
object
  (* Public interface *)
  method new_group : unit -> group
  method new_wait_id : unit -> wait_id
  method exists_resource : operation -> bool
  method add_resource : group -> (operation * float) -> unit
  method add_weak_resource : group -> (operation * float) -> unit
  method add_close_action : group -> (Unix.file_descr * (Unix.file_descr -> unit)) -> unit
  method add_abort_action : group -> (group -> exn -> unit) -> unit
  method remove_resource : group -> operation -> unit
  method add_handler : group -> (event_system -> event Equeue.t -> event -> unit) -> unit
  method add_event : event -> unit
  method clear : group -> unit
  method run : unit -> unit
  method is_running : bool
  (* Protected interface *)
  method private setup : unit -> (Unix.file_descr list * Unix.file_descr list * Unix.file_descr list * float)
  method private queue_events : (Unix.file_descr list * Unix.file_descr list * Unix.file_descr list) -> bool
  method private source : event Equeue.t -> unit
end

(**********************************************************************)
(* A set of file descriptors:                                         *)
(**********************************************************************)

module Fdescr = struct
  type t = Unix.file_descr

  let compare (a:t) (b:t) = Pervasives.compare a b

end;;

module Fd_Set = Set.Make(Fdescr);;

let set_of_list =
  List.fold_left (fun set x -> Fd_Set.add x set) Fd_Set.empty 
;;

(**********************************************************************)

exception Term of group
  (* Extra (Term g) is now the group termination event for g *)

exception Keep_alive
  (* Sometimes used to keep the event system alive *)


module RID = struct
  (* RID = resource identifier, i.e. the operation *)
  type t = operation

  (* let compare (rid1:t) (rid2:t) =
        Pervasives.compare rid1 rid2
   *)

  let equal (rid1:t) (rid2:t) =
    rid1 = rid2
    
  let hash =
    match Sys.os_type with
      | "Win32" ->
	  Hashtbl.hash
      | _ ->
	  (fun (rid:t) ->
	     match rid with
	       | Wait_in fd -> (Obj.magic fd : int)
	       | Wait_out fd -> 1031 + (Obj.magic fd : int)
	       | Wait_oob fd -> 2029 + (Obj.magic fd : int)
	       | Wait wid -> 3047 + Oo.id wid
	  )


end

(* module RID_Map = Map.Make(RID) *)
module RID_Table = Hashtbl.Make(RID)

exception Exists;;

let rid_map_exists f map =
  try
    RID_Table.iter
      (fun k v -> if f k v then raise Exists)
      map;
    false
  with
      Exists -> true
;;



(* The control pipe:
 * 
 * The control pipe is only used in multi-threaded programs: if thread
 * A adds an event/resource to the event system while thread B runs the
 * system (and blocks). In this case, A must wake up B such that B can
 * react on the changed conditions of the system.
 *
 * The control pipe has a read end and a write end. The read end is added
 * to the watched resources such that when bytes arrive thread B wakes
 * up. Thread A signals changed conditions by writing a single byte into
 * the write end of the control pipe.
 *
 * The control pipe is now created every time [run] is called, and it is
 * closed before this method is left. This avoids that we run out of 
 * file descriptors when lots of queues are created. 
 *)

(**********************************************************************)
(* The class unix_event_system                                        *)
(**********************************************************************)


class select_based_event_system () =
  let mtp = !Netsys_oothr.provider in
object(self)

  val mutable sys = lazy (assert false)   (* initialized below *)
  val mutable res = (RID_Table.create 47 : resource_prop RID_Table.t)
            (* current resources. Note that this map may contain terminated
               groups! These groups are first removed at the next [setup]
               time.
	     *)
  val mutable strong_res = (RID_Table.create 47 : unit RID_Table.t)
            (* only the non-weak resources *)
  val mutable res_may_contain_terminated_groups = false
  val mutable new_res = (RID_Table.create 47 : (resource_prop*bool) RID_Table.t)
            (* added resources. Note that this map may contain terminated
               groups! These groups are first removed at the next [setup]
               time.
               The bool is the is_strong flag.
	     *)
  val mutable close_tab = (Hashtbl.create 10 : (Unix.file_descr, (group * (Unix.file_descr -> unit))) Hashtbl.t)
  val mutable abort_tab = ([] : (group * (group -> exn -> unit)) list)
  val mutable aborting = false
  val mutable handlers = (Hashtbl.create 10 : (group, handler list) Hashtbl.t)
  val mutable handled_groups = 0
            (* the number of keys in [handlers] *)
  val mutable mutex = mtp#create_mutex()
  val mutable blocking = false
  val mutable ctrl_pipe_rd = None
  val mutable ctrl_pipe_wr = None
  val mutable ctrl_pipe_group = new Unixqueue_util.group_object

  (**********************************************************************)
  (* Implementation of the queue                                        *)
  (**********************************************************************)

  initializer
    let equeue_sys = 
      Equeue.create 
	~string_of_event:Unixqueue_util.string_of_event self#source in
    sys <- lazy equeue_sys;
    ignore(Lazy.force sys);

  method private setup () =
    (* Find out which resources should be watched for interesting events *)

    (* CHECK: Can we do this faster? Only new_res? *)
    let tref = Unix.gettimeofday () in

    if res_may_contain_terminated_groups then (
      (* This is the right time to remove references to terminated groups
         from [res]. We don't do it directly in [clear] to avoid bad
         performance when there are a lot of resources.
       *)
      let to_del =
	RID_Table.fold 
	  (fun op (g,_,_) acc ->
	     if g#is_terminating then op :: acc else acc
	  )
	  res
	  [] in
      List.iter
	(fun op ->
	   RID_Table.remove res op;
	   RID_Table.remove strong_res op;
	)
	to_del;
      res_may_contain_terminated_groups <- false;
    );

    if RID_Table.length new_res <> 0 then begin
      (* Append new_res to res, and change negative tlast to tref.
       * A negative event time tlast indicates that the
       * resource has just been added; such a resource is handled as if 
       * the last event has just been seen.
       *)    
      RID_Table.iter
	(fun op ((g, tout, _), is_strong) ->
	   if not(g#is_terminating) then (
	     RID_Table.replace res op (g, tout, ref tref);
	     if is_strong then
	       RID_Table.replace strong_res op ()
	   );
	)
	new_res;
      RID_Table.clear new_res
    end;

    dlogr (fun () ->
	     let reslst =
	       RID_Table.fold
		 (fun op (g, tout, tlast) acc ->
		    (sprintf "%s => group %d, timeout %f, lastevent %f"
		       (string_of_op op) (Oo.id g) tout !tlast) :: acc)
		 res
		 [] in
	     sprintf "setup <resources: %s>"
	       (String.concat "; " reslst));

    (* When we only have weak resources, or nothing, we return the empty set *)
    if RID_Table.length strong_res <> 0 then begin
      let infiles, outfiles, oobfiles, time =
	(* (infiles, outfiles, oobfiles, time): Lists of file descriptors
	 * that must be observed and the maximum period of time until
	 * something must have been happened (-1 means infinite period)
	 *)
	RID_Table.fold
	  (fun op (g, tout,tlast) (inf, outf, oobf, t) ->
	     (* (inf, outf, oobf, t): Current intermediate result. The descrip-
	      *      tors inf, outf, oobf are already known to be observed, and
	      *      t is the smallest timeout value of these descriptors
	      * rid = (g,op):
	      *      The resource being examined. g is the group; op is
	      *      the operation
	      * tout: The timeout value of the operation
	      * tlast: The point in time when the resource caused last an
	      *      event
	      *)
	     (* t': Compute the smallest timeout value if r is considered to
	      *     happen
	      *)
	     let t' =
	       if tout < 0.0 then 
		 (* Infinite timeout was specified for r; so t' = t *)
		 t 
	       else 
		 (* tref -. tlast: time since the last event from resource r
		  * tout -. (tref -. tlast): how much time may elapse until
		  *     the timeout happens. If this number is negative
		  *     the timeout should already have been happened.
		  * tdelta: how much time may elapse until the timeout
		  *     happens in the future. No negative values.
		  *)
		 let tdelta = max 0.0 (tout -. (tref -. !tlast)) in
		 if t < 0.0 then 
		   (* t: was infinite timeout, and t' is now tdelta *)
		   tdelta 
		 else 
		   (* t: was finite timeout, and t' is the minimum *)
		   min t tdelta
	     in
	     (* Add the descriptor contained in op to one of the descriptor
	      * lists
	      *)
	     match op with
		 Wait_in  d -> (d :: inf, outf,      oobf,      t')
	       | Wait_out d -> (inf,      d :: outf, oobf,      t')
	       | Wait_oob d -> (inf,      outf,      d :: oobf, t')
	       | Wait _     -> (inf,      outf,      oobf,      t')
		   (* NOTE: In previous version of this library, we stored the
		    * group into the three lists intf, outf, oobf. The group can
		    * be retrieved by looking at res at any time, and it
		    * cannot be changed. So we do not do this any longer.
		    *)
	  )
	  res
	  ([],[],[],(-1.0))
      in
      (infiles, outfiles, oobfiles, time)
    end
    else ([],[],[],(-1.0))


  method private queue_events (infiles', outfiles', oobfiles') =
    let dummy_buf = String.create 1 in
    let deferred_exn = ref None in
    let have_event = ref false in
  
    (* Compare the watched resources infiles/outfiles/oobfiles with the
     * actually happened system events infiles'/outfiles'/oobfiles',
     * and add the events to the queue resulting from that.
     *)

    let read_ctrl_pipe() =
      (* Read bytes from the control pipe and ignore them. The control pipe
       * is in non-blocking mode.
       * Problem: Compiled with -vmthread, there is a bug in Unix.read
       * that EAGAIN is not returned. Instead, reading is restarted. To
       * avoid this, we first test with Unix.select, which seems to work
       * correctly.
       * Update: Does not work. Don't know how to fix this problem.
       *)
      match ctrl_pipe_rd with
	| None ->
	    ()   (* this should not happen *)
	| Some p ->
	    ( try
		while true do
		  let have_ctrl_data = Netsys.is_readable `Read_write p in
		  if not have_ctrl_data then
		    raise(Unix.Unix_error(Unix.EAGAIN,"(artificial)",""));
		  ignore(Unix.read p dummy_buf 0 1)
		done;
		assert false
	      with
		  Unix.Unix_error(Unix.EAGAIN,_,_) ->
		    ()
		| Unix.Unix_error(Unix.EINTR,_,_) as err ->
		    (* It is unclear whether this can happen or not. *)
		    deferred_exn := Some err
	    )
    in

    (* first process file descriptors, and add events resulting from 
     * file descriptors to the event queue.
     *)
    let add tag ev_constructor res_constructor descr_list =
      List.iter 
	(fun d ->
	   try
	     let r = res_constructor d in
	     let g, _, _ = RID_Table.find res r in
	     if g = ctrl_pipe_group then
	       (* It's only the control pipe. Drop any bytes from the pipe. *)
	       read_ctrl_pipe()
	     else (
	       have_event := true;
	       Equeue.add_event (Lazy.force sys) (ev_constructor g d))
   	       (* Add the event *)
	   with
	       Not_found ->
		 prerr_endline ("Unixqueue: Got event without resource (implementation error?), for descriptor " ^ string_of_fd d ^ " (" ^ tag ^ ")")
	)
	descr_list in
    add "in"  (fun g d -> Input_arrived(g,d))    (fun d -> Wait_in d)  infiles';
    add "out" (fun g d -> Output_readiness(g,d)) (fun d -> Wait_out d) outfiles';
    add "oob" (fun g d -> Out_of_band(g,d))      (fun d -> Wait_oob d) oobfiles';

    (* determine which descriptors are timed out: *)
    let tref' = Unix.gettimeofday() in
    
    (* For faster lookups in these sets: *)
    let infiles'_set  = set_of_list infiles' in
    let outfiles'_set = set_of_list outfiles' in
    let oobfiles'_set = set_of_list oobfiles' in
    
    RID_Table.iter
      (fun op (g, tout, tlast) ->
	 (* Note: In previous versions of this library, we compared the
	  * group of xxxfiles with g. Actually, the group is guaranteed
	  * to match, because it is not possible to change the groups
	  * in res between [setup] and [queue_events]. So this test
	  * is superflous.
	  *)
	 if match op with 
	     Wait_in d  -> Fd_Set.mem d infiles'_set
	   | Wait_out d -> Fd_Set.mem d outfiles'_set
	   | Wait_oob d -> Fd_Set.mem d oobfiles'_set
	   | Wait _     -> false
	 then
	   (* r denotes a file descriptor resource that must be updated 
	    * because an event happened
	    *)
	   tlast := tref'
	 else
	   (* r is some other resource. Find out if it is timed out
	    * and generate a Timeout event in this case.
	    *)
	   if tout >= 0.0 then begin
	     if tref' >= tout +. !tlast then begin
	       have_event := true;
	       Equeue.add_event (Lazy.force sys) (Timeout(g,op));
	       tlast := tref'
	     end
	   end
      )
      res;

    match !deferred_exn with
	None -> !have_event
      | Some exn -> raise exn


  method private source _sys =
    assert(Lazy.force sys == _sys);
    mutex#lock();
    try
      (* Find out which system events are interesting now: *)
      let (infiles, outfiles, oobfiles, time) = self#setup() in

      dlogr
	(fun() ->
	   ( sprintf "setup result <infiles=%s; outfiles=%s; oobfiles=%s; timeout=%f>"
	       (String.concat "," (List.map string_of_fd infiles))
	       (String.concat "," (List.map string_of_fd outfiles))
	       (String.concat "," (List.map string_of_fd oobfiles))
	       time
	   ));

      let interesting =
	infiles <> [] || outfiles <> [] || oobfiles <> [] || time >= 0.0 in

      if interesting then begin
	(* There ARE interesting situations. *)
	
        (* Wait until these events happen: *)
	
	try
	  (* Maybe we get an EINTR *)

	  blocking <- true;
	  mutex#unlock();  (* Unlock because Unix.select may block *)

	  let (infiles', outfiles', oobfiles') as actual_tuple =
	      (* (infiles', outfiles', oobfiles'): Lists of file descriptors
	       * that can be handled
	       *)
	    Unix.select infiles outfiles oobfiles time in

	  mutex#lock();
	  blocking <- false;
	
          (* Now we have in infiles', outfiles', oobfiles' the actually
	   * happened file descriptor events.
	   * Furthermore, pure timeout events may have happened, but this
	   * is not indicated specially.
	   *)
    	  let have_event = self#queue_events actual_tuple in

	  (* Ensure we always add an event to keep the event loop running: *)
	  if not have_event then
	    Equeue.add_event (Lazy.force sys) (Extra Keep_alive)
	with
	    Unix.Unix_error(Unix.EINTR,_,_) ->
	      (* automatically generate a Signal event: *)
		mutex#lock();
		blocking <- false;
		Equeue.add_event (Lazy.force sys) Signal
      end;
      mutex#unlock()
    with
	any -> 
	  mutex#unlock(); 
	  raise any

  (**********************************************************************)
  (* External interface                                                 *)
  (**********************************************************************)

  method private protect : 's 't . ('s -> 't) -> 's -> 't =
    fun f arg ->
      mutex#lock();
      try
	let r = f arg in
	mutex#unlock();
	r
      with
	  any -> mutex#unlock(); raise any

  method new_group () =
    new Unixqueue_util.group_object

  method new_wait_id () =
    new Unixqueue_util.wait_object

  method private exists_resource_nolock op =
    try
      let (g,_,_) = RID_Table.find res op in
      not (g#is_terminating)
    with
	Not_found ->
	  try
	    let ( (g,_,_), _) = RID_Table.find new_res op in
	    not (g#is_terminating)
	  with
	      Not_found ->
		false

  method exists_resource op =
    self#protect self#exists_resource_nolock op

  method private wake_up keep_alive =
    (* wake_up requires the lock! *)
    if (not mtp#single_threaded) && blocking then begin
      try
	(* avoid that the queue becomes empty: *)
	if keep_alive then 
	  Equeue.add_event (Lazy.force sys) (Extra Keep_alive); 
	( match ctrl_pipe_wr with
	    | None -> ()
	    | Some p_wr ->
		let buf = String.make 1 'X' in
		ignore(Unix.single_write p_wr buf 0 1);
	)
      with
	  Unix.Unix_error(Unix.EINTR,_,_) ->
	    Equeue.add_event (Lazy.force sys) Signal;
	    self#wake_up keep_alive
    end

  method add_resource g (op,t) =
    self # add_resource_1 g (op,t) true

  method add_weak_resource g (op,t) =
    self # add_resource_1 g (op,t) false

  method private add_resource_1 g (op,t) is_strong =
    dlogr (fun () -> (sprintf "add_resource <group %d, %s, timeout %f>"
			 (Oo.id g) (string_of_op op) t));
    if g # is_terminating then
      invalid_arg "Unixqueue.add_resource: the group is terminated";
    self#protect
      (fun () ->
	 if self#exists_resource_nolock op then (
	   (* CHECK: Maybe we should fail if g is a different group than
	    * the already existing group
	    *)
	   dlogr (fun () -> "add_resouce <resource exists already>");
	   ()
	 )
	 else begin
	   (* add the resource: *)
	   RID_Table.replace new_res op ((g,t,ref (-1.0)), is_strong);
	   (* wake the thread up that runs the system (if in mt mode): *)
           self#wake_up true
	 end
      )
      ()

  method private exists_descriptor_nolock d =
    self#exists_resource_nolock (Wait_in d) ||
    self#exists_resource_nolock (Wait_out d) ||
    self#exists_resource_nolock (Wait_oob d)

  method private exists_descriptor d =
    self#protect self#exists_descriptor_nolock d

  method add_close_action g (d,a) =
    if g # is_terminating then
      invalid_arg "Unixqueue.add_close_action: the group is terminated";
    self#protect
      (fun () ->
	 (* CHECK: Maybe we should fail if g is a different group than
	  * the existing group
	  *)
	 if self#exists_descriptor_nolock d then begin
	   Hashtbl.replace close_tab d (g,a)
	     (* There can be only one close action.
	      * TODO: Rename to set_close_action
	      *)
	 end else
	   failwith "add_close_action"
      )
      ()

  method add_abort_action g a =
    if g # is_terminating then
      invalid_arg "Unixqueue.add_abort_action: the group is terminated";
    self#protect
      (fun () ->
	 abort_tab <- (g,a) :: abort_tab
      )
      ()

  method private remove_resource_nolock g op =
    dlogr (fun () -> (sprintf "remove_resource <group %d, %s>"
			 (Oo.id g) (string_of_op op)));
    if g # is_terminating then
      invalid_arg "Unixqueue.remove_resource: the group is terminated";
    (* If there is no resource (g,op) raise Not_found *)
    let g_found, _, _ = 
      try RID_Table.find res op 
      with Not_found -> fst(RID_Table.find new_res op) in
    if g_found # is_terminating then raise Not_found; (* ADD TO WINK PATCH *)
    if g <> g_found then
      failwith "remove_resource: descriptor belongs to different group";
    RID_Table.remove res op;
    RID_Table.remove new_res op;
    RID_Table.remove strong_res op;
    (* is there a close action ? *)
    let sd =
      match op with
	  Wait_in  d' -> Some d'
	| Wait_out d' -> Some d'
	| Wait_oob d' -> Some d'
	| Wait _      -> None
    in
    match sd with
      | Some d ->
	  if not aborting then begin
    	    let action = try Some (snd(Hashtbl.find close_tab d)) with Not_found -> None in
    	    match action with
		Some a -> 
		  (* any open resource? *)
      		  if not (self#exists_descriptor_nolock d) then begin
		    dlogr (fun () -> (sprintf "remove_resource <running close action for fd %s>"
					 (string_of_fd d)));
		    Hashtbl.remove close_tab d;
      		    a d;
      		  end
    	      | None ->
      		  ()
	  end
      | None -> ()


  method remove_resource =
    self#protect self#remove_resource_nolock

  method private add_event_nolock e =
    Equeue.add_event (Lazy.force sys) e; 
    self#wake_up false

  method add_event e =
    self#protect self#add_event_nolock e


  method private uq_handler (esys : event Equeue.t) ev =
    (* The single Unixqueue handler. For all added (sub) handlers, uq_handler
     * gets the events, and delivers them
     *)

    let terminate_handler_nolock g h =
      dlogr (fun () -> (sprintf "uq_handler <terminating handler group %d>" 
			   (Oo.id g)));
      let hlist =
	try Hashtbl.find handlers g with Not_found -> [] in
      let hlist' =
	List.filter (fun h' -> h' != h) hlist in
      if hlist' = [] then (
	Hashtbl.remove handlers g;
	handled_groups <- handled_groups - 1;
	if handled_groups = 0 then
	  raise Equeue.Terminate  (* delete uq_handler from esys *)
      ) else (
	Hashtbl.replace handlers g hlist'
      )
    in

    let rec forward_event_to g (hlist : handler list) =
      match hlist with
	| [] -> 
	    raise Equeue.Reject
	| h :: hlist' ->
	    ( try
		(* Note: ues _must not_ be locked now *)
		h (self (*: #event_system*) :> event_system) esys ev
	      with
		  Equeue.Reject ->
		    forward_event_to g hlist'
		| Equeue.Terminate ->
		    (* Terminate only this handler. *)
		    self#protect (terminate_handler_nolock g) h
		    (* Any error exceptions simply fall through. Equeue will
		     * catch them, and will add the event to the error queue
		     *)
	    )
    in
    
    let forward_event g =
      let hlist = 
	self#protect 
	  (fun () -> try Hashtbl.find handlers g with Not_found -> []) 
	  () in
      forward_event_to g hlist
    in
  
    let forward_event_to_all() =
      let hlist_all =
	self#protect
	  (fun () ->
	     Hashtbl.fold
	       (fun g hlist l -> 
		  if g#is_terminating then l else (g,hlist) :: l)
	       handlers []
	  )
	  ()
      in
      try
	List.iter
	  (fun (g,hlist) ->
	     try
	       forward_event_to g hlist;
	       raise Exit   (* event is delivered, so exit iteration *)
	     with
		 (* event is rejected: try next group *)
		 Equeue.Reject -> ()
	  )
	  hlist_all;
	raise Equeue.Reject (* no handler has accepted the event, so reject *)
      with
	  Exit -> ()
    in

    match ev with
	Extra (Term g) ->
	  (* Terminate all handlers of group g *)
	  self#protect
	    (fun () ->
	       if Hashtbl.mem handlers g then (
		 Hashtbl.remove handlers g;
		 handled_groups <- handled_groups - 1;
		 if handled_groups = 0 then
		   raise Equeue.Terminate  (* delete uq_handler from esys *)
	       )
	       else raise Equeue.Reject (* strange, should not happen *)
	    )
	    ()
      | Extra Keep_alive ->
	  raise Equeue.Reject
      | Input_arrived(g,_) ->
	  if g # is_terminating then raise Equeue.Reject;
	  forward_event g;
      | Output_readiness(g,_) ->
	  if g # is_terminating then raise Equeue.Reject;
	  forward_event g;
      | Out_of_band(g,_) ->
	  if g # is_terminating then raise Equeue.Reject;
	  forward_event g;
      | Timeout(g,_) ->
	  if g # is_terminating then raise Equeue.Reject;
	  forward_event g;
      | Signal ->
	  forward_event_to_all();
      | Extra x ->
	  forward_event_to_all();

  method private equeue_add_handler () =
    Equeue.add_handler (Lazy.force sys) self#uq_handler
      (* It is not necessary to call wake_up *)

  (* CHECK: There is a small difference between Equeue.add_handler and
   * this add_handler: Here, the handler is immediately active (if
   * uq_handler is already active). Can this lead to problems?
   *)

  method private add_handler_nolock g h =
    dlogr (fun () -> (sprintf "add_handler <group %d>" (Oo.id g)));
    
    if g # is_terminating then
      invalid_arg "Unixqueue.add_handler: the group is terminated";

    ( try
	let old_handlers = Hashtbl.find handlers g in
	Hashtbl.replace handlers g (h :: old_handlers)
      with
	  Not_found ->
	    (* The group g is new *)
	    Hashtbl.add handlers g [h];
	    handled_groups <- handled_groups + 1;
	    if handled_groups = 1 then
	      self#equeue_add_handler ()
    )

  method add_handler g =
    self#protect (self#add_handler_nolock g)

  method private clear_nolock g = 
    dlogr (fun () -> (sprintf "clear <group %d>" (Oo.id g)));
    
    (* Set that g is terminating now: *)
    g # terminate();

    (* (i) delete all resources of g: *)
    (* This is no longer done immediately like in the two following assignments
       that are commented out. Instead, the resources are deleted the next
       time [setup] is running. Also, the access functions like
       [exists_resource] hide the existence of terminated groups.
     *)
    (*
    res <- RID_Map.fold
             (fun op (g',tout,tlast) res ->
		if g <> g' then RID_Map.add op (g',tout,tlast) res
		else res)
             res
             RID_Map.empty;
    new_res <- RID_Map.fold
                 (fun op (g',tout,tlast) res ->
		    if g <> g' then RID_Map.add op (g',tout,tlast) res
		    else res)
                 new_res
                 RID_Map.empty;
     *)
    res_may_contain_terminated_groups <- true;  (* Faster replacement *)

    (* (ii) delete all handlers of g: (delayed *)
    self#add_event_nolock (Extra (Term g));

    (* (iii) delete special actions of g: *)
    let to_remove =   (* remove from close_tab *)
      Hashtbl.fold 
	(fun d (g',_) l -> if g = g' then d :: l else l) close_tab [] in
    List.iter
      (Hashtbl.remove close_tab) to_remove;

    abort_tab <- List.filter (fun (g',_) -> g<>g') abort_tab;

    self#wake_up false;

    (* Note: the Term event isn't caught after all handlers have been
     * deleted. The Equeue module simply discards events that are not
     * handled.
     *)
    ()

  method  clear =
    self#protect self#clear_nolock

  method private abort_nolock g ex =
    (* is there an abort action ? *)
    (* Note: If g has been terminated, the abort action is removed. So
     * we will never find here one.
     *)
    dlogr (fun () -> (sprintf "abort <group %d, exception %s>"
			 (Oo.id g) (Netexn.to_string ex)));
    let action = try Some (List.assoc g abort_tab) with Not_found -> None in
    match action with
	Some a ->
	  begin
	    dlogr (fun () -> "abort <running abort action>");
            aborting <- true;
	    let mistake = ref None in
	    begin try
      	      a g ex;
	    with
		any ->
		  mistake := Some any (* Wow *)
	    end;
	    self#clear g;
	    aborting <- false;
	    match !mistake with
		None -> ()
	      | Some m -> 
		  dlogr (fun () -> (sprintf "abort <propagating exception %s>"
				       (Netexn.to_string m)));
		  raise m
	  end
      | None ->
	  ()

  method private abort g =
  self#protect self#abort_nolock g


  method private add_ctrl_pipe() =
    (* In multi-threaded mode: watch at least the control pipe *)
    if (not mtp#single_threaded) then (
      assert(ctrl_pipe_rd = None && ctrl_pipe_wr = None);
      let (p_rd, p_wr) = Unix.pipe() in
      ctrl_pipe_rd <- Some p_rd;
      ctrl_pipe_wr <- Some p_wr;
      (* The control pipe is weak - does not keep the event system running *)
      RID_Table.add 
	new_res (Wait_in p_rd) ((ctrl_pipe_group,-1.0, ref(-1.0)), false)
    );

  method private close_ctrl_pipe() =
    match ctrl_pipe_rd, ctrl_pipe_wr with
      | None, None -> ()
      | (Some p_rd), (Some p_wr) ->
	  let op = Wait_in p_rd in
	  RID_Table.remove res op;
	  RID_Table.remove new_res op;
	  RID_Table.remove strong_res op;
	  Unix.close p_wr;
	  Unix.close p_rd;
	  ctrl_pipe_rd <- None;
	  ctrl_pipe_wr <- None;
      | _ ->
	  assert false

  method run () =
    let continue = ref true in
    self # add_ctrl_pipe();
    try
      while !continue do
	continue := false;
	try
	  Equeue.run (Lazy.force sys);
	with
	  | Abort (g,an_exception) ->
	      begin
		match an_exception with
		    (Equeue.Reject|Equeue.Terminate) ->
		      (* A serious programming error: *)
		      failwith "Caught 'Abort' exception with Reject or Terminate exception as argument; this is a programming error"
		  | Abort(_,_) ->
		      failwith "Caught 'Abort' exception with an 'Abort' exception as argument; this is a programming error"
		  | _ -> ()
	      end;
	      self#abort g an_exception;
	      continue := true
      done;
      self # close_ctrl_pipe();
    with
      | error ->
	  self # close_ctrl_pipe();
	  raise error

  method is_running =
    Equeue.is_running (Lazy.force sys)

end
;;


let select_based_event_system() =
  new select_based_event_system()

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml