(* $Id: unixqueue_select.ml 1616 2011-06-10 15:08:57Z gerd $ *)
open Printf
open Unixqueue_util
(* Object type of the select-based event system: the public Unixqueue
 * operations plus three private hooks that the implementation uses to
 * drive the underlying Equeue.
 *)
class type sb_event_system =
object
(* Public interface *)
method new_group : unit -> group
method new_wait_id : unit -> wait_id
method exists_resource : operation -> bool
method add_resource : group -> (operation * float) -> unit
method add_weak_resource : group -> (operation * float) -> unit
method add_close_action : group -> (Unix.file_descr * (Unix.file_descr -> unit)) -> unit
method add_abort_action : group -> (group -> exn -> unit) -> unit
method remove_resource : group -> operation -> unit
method add_handler : group -> (event_system -> event Equeue.t -> event -> unit) -> unit
method add_event : event -> unit
method clear : group -> unit
method run : unit -> unit
method is_running : bool
method when_blocking : (unit -> unit) -> unit
(* Protected interface *)
(* [setup]: the descriptors to watch for input, output and out-of-band
 * data, and the timeout to pass to select (negative = no timeout) *)
method private setup : unit -> (Unix.file_descr list * Unix.file_descr list * Unix.file_descr list * float)
(* [queue_events]: takes the descriptor lists reported by select and
 * returns whether at least one event was added to the queue *)
method private queue_events : (Unix.file_descr list * Unix.file_descr list * Unix.file_descr list) -> bool
(* [source]: the event source function handed to [Equeue.create] *)
method private source : event Equeue.t -> unit
end
(**********************************************************************)
(* A set of file descriptors: *)
(**********************************************************************)
module Fdescr = struct
  (* File descriptors as an ordered type, in the shape [Set.Make] expects.
     The ordering is the generic structural one. *)
  type t = Unix.file_descr
  let compare : t -> t -> int = Pervasives.compare
end;;
module Fd_Set = Set.Make(Fdescr);;
(* [set_of_list fds] is the set holding exactly the descriptors of [fds];
   duplicates collapse into one member. *)
let set_of_list fds =
  List.fold_left (fun acc fd -> Fd_Set.add fd acc) Fd_Set.empty fds
;;
(**********************************************************************)
exception Term of group
(* [Extra (Term g)] is the group termination event for [g]: [clear_nolock]
 * queues it, and [uq_handler] reacts to it by dropping all handlers of [g].
 *)
exception Keep_alive
(* [Extra Keep_alive] is queued only to keep the event queue non-empty;
 * [uq_handler] always rejects it.
 *)
module RID = struct
  (* RID = resource identifier. A resource is identified by its operation
     alone; this module packages the operation type as a hashed type for
     [Hashtbl.Make]. (An ordering function for [Map.Make] used to live
     here; the map-based variant is no longer in use.)
   *)
  type t = operation

  (* Structural equality of operations *)
  let equal (a : t) (b : t) = (a = b)

  (* The hash function is selected once, when the module is initialized.
     On Win32 the generic [Hashtbl.hash] is used. Everywhere else the
     descriptor is reinterpreted as an int via [Obj.magic] (presumably
     valid because descriptors are plain ints on those platforms - TODO
     confirm), shifted by a per-constructor offset so that the different
     operations on one descriptor get different hash values.
   *)
  let hash =
    let by_descriptor (rid : t) =
      match rid with
      | Wait_in fd -> (Obj.magic fd : int)
      | Wait_out fd -> 1031 + (Obj.magic fd : int)
      | Wait_oob fd -> 2029 + (Obj.magic fd : int)
      | Wait wid -> 3047 + Oo.id wid
    in
    if Sys.os_type = "Win32" then Hashtbl.hash else by_descriptor
end
(* module RID_Map = Map.Make(RID) *)
module RID_Table = Hashtbl.Make(RID)
(* Raised internally by [rid_map_exists] to leave the iteration early *)
exception Exists;;
(* [rid_map_exists f map] is [true] iff [f key value] holds for at least
   one binding of [map]. The traversal stops at the first hit. *)
let rid_map_exists f map =
  let stop_on_hit key value =
    if f key value then raise Exists in
  try
    RID_Table.iter stop_on_hit map;
    false
  with
    Exists -> true
;;
(* The control pipe:
*
* The control pipe is only used in multi-threaded programs, namely when
* thread A adds an event/resource to the event system while thread B runs
* the system (and blocks). In this case, A must wake up B so that B can
* react to the changed conditions of the system.
*
* The control pipe has a read end and a write end. The read end is added
* to the watched resources such that when bytes arrive thread B wakes
* up. Thread A signals changed conditions by writing a single byte into
* the write end of the control pipe.
*
* The control pipe is now created every time [run] is called, and it is
* closed before this method is left. This avoids that we run out of
* file descriptors when lots of queues are created.
*)
(**********************************************************************)
(* The class unix_event_system *)
(**********************************************************************)
(* Event system whose event source blocks in [Unix.select]. The mutex and
 * the thread-related decisions come from the thread provider [mtp].
 *)
class select_based_event_system () =
let mtp = !Netsys_oothr.provider in
object(self)
val mutable sys = lazy (assert false) (* initialized below *)
val mutable res = (RID_Table.create 47 : resource_prop RID_Table.t)
(* current resources. Note that this map may contain terminated
groups! These groups are first removed at the next [setup]
time.
*)
val mutable strong_res = (RID_Table.create 47 : unit RID_Table.t)
(* only the non-weak resources *)
val mutable res_may_contain_terminated_groups = false
(* set by [clear_nolock]; tells [setup] to purge terminated groups *)
val mutable new_res = (RID_Table.create 47 : (resource_prop*bool) RID_Table.t)
(* added resources. Note that this map may contain terminated
groups! These groups are first removed at the next [setup]
time.
The bool is the is_strong flag.
*)
val mutable close_tab = (Hashtbl.create 10 : (Unix.file_descr, (group * (Unix.file_descr -> unit))) Hashtbl.t)
(* close action per descriptor, together with the owning group *)
val mutable abort_tab = ([] : (group * (group -> exn -> unit)) list)
(* abort actions, newest first *)
val mutable aborting = false
(* true while [abort_nolock] runs an abort action; [remove_resource]
does not run close actions during that time *)
val mutable handlers = (Hashtbl.create 10 : (group, handler list) Hashtbl.t)
val mutable handled_groups = 0
(* the number of keys in [handlers] *)
val mutable mutex = mtp#create_mutex()
val mutable blocking = false
(* true while [source] has released the mutex around [Unix.select] *)
val mutable ctrl_pipe_rd = None
val mutable ctrl_pipe_wr = None
(* both ends of the control pipe; set by [add_ctrl_pipe] and reset by
[close_ctrl_pipe] *)
val mutable ctrl_pipe_group = new Unixqueue_util.group_object
(* the group under which the read end of the control pipe is registered *)
val mutable when_blocking = (fun () -> ())
(* callback invoked at the start of every [source] call *)
(**********************************************************************)
(* Implementation of the queue *)
(**********************************************************************)
initializer
  (* Create the underlying Equeue with [self#source] as its event source
     and store it in [sys]; forcing the lazy value right away makes every
     later [Lazy.force sys] a plain lookup. *)
  let esys =
    Equeue.create
      ~string_of_event:Unixqueue_util.string_of_event
      self#source in
  sys <- lazy esys;
  ignore (Lazy.force sys);
  (* The Unixqueue handler is registered at object creation time. The
     price is that [Equeue.Out_of_handlers] is no longer raised, which
     was questionable anyway since [Immediate] events exist: generating
     it would require counting [Immediate] events in addition to
     handlers.
   *)
  self#equeue_add_handler ()
method private setup () =
  (* Computes what [Unix.select] has to watch.
   *
   * Result: (infiles, outfiles, oobfiles, time) - the descriptors to
   * observe for input, output and out-of-band data, and the longest time
   * select may block. A negative time means: no timeout.
   *
   * Before that, [res] is brought up to date: resources of terminated
   * groups are purged (deferred from [clear] for performance reasons),
   * and the pending additions in [new_res] are merged in.
   *)
  let now = Unix.gettimeofday () in
  if res_may_contain_terminated_groups then begin
    let stale_ops =
      RID_Table.fold
        (fun op (g,_,_) acc -> if g#is_terminating then op :: acc else acc)
        res
        [] in
    List.iter
      (fun op ->
         RID_Table.remove res op;
         RID_Table.remove strong_res op)
      stale_ops;
    res_may_contain_terminated_groups <- false
  end;
  if RID_Table.length new_res <> 0 then begin
    (* A freshly added resource carries a negative last-event time. It is
     * treated as if its last event had been seen right now. *)
    let adopt op ((g, tout, _), is_strong) =
      if not g#is_terminating then begin
        RID_Table.replace res op (g, tout, ref now);
        if is_strong then RID_Table.replace strong_res op ()
      end in
    RID_Table.iter adopt new_res;
    RID_Table.clear new_res
  end;
  dlogr (fun () ->
    let describe op (g, tout, tlast) acc =
      sprintf "%s => group %d, timeout %f, lastevent %f"
        (string_of_op op) (Oo.id g) tout !tlast :: acc in
    let reslst = RID_Table.fold describe res [] in
    sprintf "setup <resources: %s>" (String.concat "; " reslst));
  let nothing_to_watch = ([], [], [], (-1.0)) in
  (* With only weak resources, or none at all, nothing is watched *)
  if RID_Table.length strong_res = 0 then
    nothing_to_watch
  else begin
    (* Time left until the resource times out, never negative:
     * now -. tlast is the time since its last event. *)
    let remaining tout tlast = max 0.0 (tout -. (now -. !tlast)) in
    let collect op (_, tout, tlast) (inf, outf, oobf, t) =
      (* t is the smallest timeout found so far (negative = none yet) *)
      let t' =
        if tout < 0.0 then t
        else if t < 0.0 then remaining tout tlast
        else min t (remaining tout tlast) in
      match op with
      | Wait_in d -> (d :: inf, outf, oobf, t')
      | Wait_out d -> (inf, d :: outf, oobf, t')
      | Wait_oob d -> (inf, outf, d :: oobf, t')
      | Wait _ -> (inf, outf, oobf, t')
    in
    RID_Table.fold collect res nothing_to_watch
  end
(* Turns the result of [Unix.select] into events.
 * Argument: the descriptors reported as readable, writable, and having
 * out-of-band data, respectively.
 * Returns [true] iff at least one event was added to the queue. Activity
 * on the control pipe is drained and does not count as an event.
 * An EINTR seen while draining the control pipe is re-raised at the very
 * end, after all events have been queued.
 *)
method private queue_events (infiles', outfiles', oobfiles') =
let dummy_buf = String.create 1 in
let deferred_exn = ref None in
let have_event = ref false in
(* Compare the watched resources infiles/outfiles/oobfiles with the
* actually happened system events infiles'/outfiles'/oobfiles',
* and add the events to the queue resulting from that.
*)
let read_ctrl_pipe() =
(* Read bytes from the control pipe and ignore them. The control pipe
* is in non-blocking mode.
* Problem: Compiled with -vmthread, there is a bug in Unix.read
* that EAGAIN is not returned. Instead, reading is restarted. To
* avoid this, we first test with Unix.select, which seems to work
* correctly.
* Update: Does not work. Don't know how to fix this problem.
*)
match ctrl_pipe_rd with
| None ->
() (* this should not happen *)
| Some p ->
( try
while true do
let have_ctrl_data = Netsys.is_readable `Read_write p in
(* The loop is left by raising an artificial EAGAIN once the pipe
is drained *)
if not have_ctrl_data then
raise(Unix.Unix_error(Unix.EAGAIN,"(artificial)",""));
ignore(Unix.read p dummy_buf 0 1)
done;
assert false
with
Unix.Unix_error(Unix.EAGAIN,_,_) ->
()
| Unix.Unix_error(Unix.EINTR,_,_) as err ->
(* It is unclear whether this can happen or not. *)
deferred_exn := Some err
)
in
(* first process file descriptors, and add events resulting from
* file descriptors to the event queue.
*)
let add tag ev_constructor res_constructor descr_list =
List.iter
(fun d ->
try
let r = res_constructor d in
let g, _, _ = RID_Table.find res r in
if g = ctrl_pipe_group then
(* It's only the control pipe. Drop any bytes from the pipe. *)
read_ctrl_pipe()
else (
have_event := true;
Equeue.add_event (Lazy.force sys) (ev_constructor g d))
(* Add the event *)
with
Not_found ->
prerr_endline ("Unixqueue: Got event without resource (implementation error?), for descriptor " ^ string_of_fd d ^ " (" ^ tag ^ ")")
)
descr_list in
add "in" (fun g d -> Input_arrived(g,d)) (fun d -> Wait_in d) infiles';
add "out" (fun g d -> Output_readiness(g,d)) (fun d -> Wait_out d) outfiles';
add "oob" (fun g d -> Out_of_band(g,d)) (fun d -> Wait_oob d) oobfiles';
(* determine which descriptors are timed out: *)
let tref' = Unix.gettimeofday() in
(* For faster lookups in these sets: *)
let infiles'_set = set_of_list infiles' in
let outfiles'_set = set_of_list outfiles' in
let oobfiles'_set = set_of_list oobfiles' in
RID_Table.iter
(fun op (g, tout, tlast) ->
(* Note: In previous versions of this library, we compared the
* group of xxxfiles with g. Actually, the group is guaranteed
* to match, because it is not possible to change the groups
* in res between [setup] and [queue_events]. So this test
* is superfluous.
*)
if match op with
Wait_in d -> Fd_Set.mem d infiles'_set
| Wait_out d -> Fd_Set.mem d outfiles'_set
| Wait_oob d -> Fd_Set.mem d oobfiles'_set
| Wait _ -> false
then
(* r denotes a file descriptor resource that must be updated
* because an event happened
*)
tlast := tref'
else
(* r is some other resource. Find out if it is timed out
* and generate a Timeout event in this case.
*)
if tout >= 0.0 then begin
if tref' >= tout +. !tlast then begin
have_event := true;
Equeue.add_event (Lazy.force sys) (Timeout(g,op));
tlast := tref'
end
end
)
res;
(* Report the EINTR seen while draining the control pipe, if any *)
match !deferred_exn with
None -> !have_event
| Some exn -> raise exn
method private source _sys =
  (* The event source of the underlying Equeue: waits in [Unix.select]
   * for the resources computed by [setup] and queues the resulting
   * events via [queue_events].
   *
   * Locking: the mutex is held except while select blocks; [blocking] is
   * true exactly during that window so that [wake_up] knows when it has
   * to write to the control pipe. The mutex is released on every exit
   * path.
   *
   * An EINTR, whether raised by select or reported by [queue_events],
   * is turned into a [Signal] event. Any other exception propagates to
   * the caller.
   *)
  assert(Lazy.force sys == _sys);
  when_blocking();
  mutex#lock();
  try
    (* Find out which system events are interesting now: *)
    let (infiles, outfiles, oobfiles, time) = self#setup() in
    dlogr
      (fun() ->
         ( sprintf "setup result <infiles=%s; outfiles=%s; oobfiles=%s; timeout=%f>"
             (String.concat "," (List.map string_of_fd infiles))
             (String.concat "," (List.map string_of_fd outfiles))
             (String.concat "," (List.map string_of_fd oobfiles))
             time
         ));
    let interesting =
      infiles <> [] || outfiles <> [] || oobfiles <> [] || time >= 0.0 in
    if interesting then begin
      (* Wait for the events. The exception of select is captured as a
       * value, so that the mutex is re-acquired and [blocking] is reset
       * exactly once, no matter how select ends. Handling the exception
       * in a handler shared with [queue_events] would lock the mutex a
       * second time when the EINTR comes from [queue_events], and would
       * leave [blocking] set for errors other than EINTR.
       *)
      blocking <- true;
      mutex#unlock(); (* Unlock because Unix.select may block *)
      let select_outcome =
        try `Selected (Unix.select infiles outfiles oobfiles time)
        with error -> `Failed error in
      mutex#lock();
      blocking <- false;
      (* automatically generate a Signal event: *)
      let interrupted() =
        Equeue.add_event (Lazy.force sys) Signal in
      match select_outcome with
      | `Selected actual_tuple ->
          (* actual_tuple contains the descriptors for which events
           * actually happened. Pure timeouts are not indicated
           * specially; [queue_events] detects them itself.
           *)
          ( try
              let have_event = self#queue_events actual_tuple in
              (* Ensure we always add an event to keep the event loop
               * running: *)
              if not have_event then
                Equeue.add_event (Lazy.force sys) (Extra Keep_alive)
            with
              Unix.Unix_error(Unix.EINTR,_,_) ->
                (* the mutex is already held here *)
                interrupted()
          )
      | `Failed (Unix.Unix_error(Unix.EINTR,_,_)) ->
          interrupted()
      | `Failed error ->
          raise error
    end;
    mutex#unlock()
  with
    any ->
      mutex#unlock();
      raise any
(**********************************************************************)
(* External interface *)
(**********************************************************************)
method private protect : 's 't . ('s -> 't) -> 's -> 't =
  (* [protect f arg] evaluates [f arg] while holding the mutex. The mutex
   * is released again both when [f] returns and when it raises; an
   * exception of [f] is re-raised after the release.
   * Note that only the application [f arg] itself is covered: if it
   * yields a function, that function runs without the mutex.
   *)
  fun f arg ->
    mutex#lock();
    let outcome =
      try f arg
      with any -> mutex#unlock(); raise any in
    mutex#unlock();
    outcome
(* Creates a fresh group object. Nothing is registered in the event
 * system by this call. *)
method new_group () =
new Unixqueue_util.group_object
(* Creates a fresh wait object, to be used with the [Wait] operation *)
method new_wait_id () =
new Unixqueue_util.wait_object
method private exists_resource_nolock op =
  (* Says whether [op] is currently registered for a group that is not
   * terminating. The caller must hold the mutex.
   *
   * Both tables are consulted independently: [res] may still contain an
   * entry of a terminated group (such entries are purged lazily by
   * [setup]) while [new_res] already holds a live registration of the
   * same operation. A stale entry in [res] must not hide the live one.
   *)
  let live_in_res =
    try
      let (g,_,_) = RID_Table.find res op in
      not (g#is_terminating)
    with
      Not_found -> false in
  live_in_res ||
    ( try
        let ( (g,_,_), _) = RID_Table.find new_res op in
        not (g#is_terminating)
      with
        Not_found -> false
    )
method exists_resource op =
  (* Locked variant of [exists_resource_nolock] *)
  self#protect (fun () -> self#exists_resource_nolock op) ()
method private wake_up keep_alive =
  (* Wakes up the thread blocking in [source] by writing one byte to the
   * control pipe. Does nothing in single-threaded mode or when no thread
   * is blocking. With [keep_alive], a [Keep_alive] event is queued first
   * so that the queue cannot become empty.
   * wake_up requires the lock!
   *)
  let wakeup_needed = (not mtp#single_threaded) && blocking in
  if wakeup_needed then begin
    try
      if keep_alive then
        Equeue.add_event (Lazy.force sys) (Extra Keep_alive);
      ( match ctrl_pipe_wr with
        | Some p_wr ->
            let buf = String.make 1 'X' in
            ignore(Unix.single_write p_wr buf 0 1)
        | None ->
            ()
      )
    with
      Unix.Unix_error(Unix.EINTR,_,_) ->
        (* Interrupted: record the signal and try again *)
        Equeue.add_event (Lazy.force sys) Signal;
        self#wake_up keep_alive
  end
(* Registers [op] with timeout [t] for group [g] as a strong resource
 * (is_strong = true); [setup] only watches descriptors while at least
 * one strong resource exists. *)
method add_resource g (op,t) =
self # add_resource_1 g (op,t) true
(* Like [add_resource], but the resource is weak (is_strong = false): it
 * is not entered into [strong_res]. *)
method add_weak_resource g (op,t) =
self # add_resource_1 g (op,t) false
method private add_resource_1 g (op,t) is_strong =
  (* Common implementation of [add_resource] and [add_weak_resource].
   * The resource is put into [new_res] and becomes effective at the next
   * [setup]. If [op] is already registered, the call is silently
   * ignored.
   * Raises [Invalid_argument] if [g] is terminating.
   *)
  dlogr (fun () -> (sprintf "add_resource <group %d, %s, timeout %f>"
                      (Oo.id g) (string_of_op op) t));
  if g # is_terminating then
    invalid_arg "Unixqueue.add_resource: the group is terminated";
  let register () =
    if not (self#exists_resource_nolock op) then begin
      (* The negative time marks the resource as just added *)
      RID_Table.replace new_res op ((g,t,ref (-1.0)), is_strong);
      (* wake the thread up that runs the system (if in mt mode): *)
      self#wake_up true
    end
    else
      (* CHECK: Maybe we should fail if g is a different group than
       * the already existing group
       *)
      dlogr (fun () -> "add_resouce <resource exists already>")
  in
  self#protect register ()
method private exists_descriptor_nolock d =
  (* True iff any of the three descriptor operations is registered for
   * [d]. The operations are checked in the order in, out, oob, stopping
   * at the first hit. The caller must hold the mutex. *)
  List.exists
    self#exists_resource_nolock
    [ Wait_in d; Wait_out d; Wait_oob d ]
method private exists_descriptor d =
  (* Locked variant of [exists_descriptor_nolock] *)
  self#protect (fun () -> self#exists_descriptor_nolock d) ()
method add_close_action g (d,a) =
  (* Sets [a] as the close action of descriptor [d], owned by group [g].
   * There can be only one close action per descriptor; a previous one is
   * replaced. (TODO: Rename to set_close_action)
   * Raises [Invalid_argument] if [g] is terminating, and
   * [Failure] if no resource is registered for [d].
   *)
  if g # is_terminating then
    invalid_arg "Unixqueue.add_close_action: the group is terminated";
  let install () =
    (* CHECK: Maybe we should fail if g is a different group than
     * the existing group
     *)
    if not (self#exists_descriptor_nolock d) then
      failwith "add_close_action";
    Hashtbl.replace close_tab d (g,a)
  in
  self#protect install ()
method add_abort_action g a =
  (* Puts the abort action [a] for group [g] in front of [abort_tab].
   * Raises [Invalid_argument] if [g] is terminating. *)
  if g # is_terminating then
    invalid_arg "Unixqueue.add_abort_action: the group is terminated";
  let register () =
    abort_tab <- (g,a) :: abort_tab in
  self#protect register ()
method private detach_resource_nolock g op =
  (* Unregisters the resource [op] of group [g]. The caller must hold the
   * mutex.
   *
   * Returns [Some thunk] if this was the last resource of its descriptor
   * and a close action is set for it; the thunk runs that action. The
   * action has then already been removed from [close_tab]. It is handed
   * back rather than run here so that the caller can run it after
   * releasing the mutex. No close action is returned while an abort
   * action is running.
   *
   * Raises [Invalid_argument] if [g] is terminating, [Not_found] if [op]
   * is not registered for a live group, and [Failure] if it is
   * registered for a group other than [g].
   *)
  dlogr (fun () -> (sprintf "remove_resource <group %d, %s>"
                      (Oo.id g) (string_of_op op)));
  if g # is_terminating then
    invalid_arg "Unixqueue.remove_resource: the group is terminated";
  (* Entries of terminated groups may still sit in [res] until the next
   * [setup]. Such a stale entry must not hide a live registration of the
   * same operation in [new_res]. *)
  let live_in_res =
    try
      let (g',_,_) = RID_Table.find res op in
      if g' # is_terminating then None else Some g'
    with
      Not_found -> None in
  let g_found =
    match live_in_res with
    | Some g' -> g'
    | None ->
        (* If there is no resource for op at all, this raises Not_found *)
        let ((g',_,_),_) = RID_Table.find new_res op in
        if g' # is_terminating then raise Not_found;
        g' in
  if g <> g_found then
    failwith "remove_resource: descriptor belongs to different group";
  RID_Table.remove res op;
  RID_Table.remove new_res op;
  RID_Table.remove strong_res op;
  (* is there a close action ? *)
  let sd =
    match op with
    | Wait_in d' | Wait_out d' | Wait_oob d' -> Some d'
    | Wait _ -> None
  in
  match sd with
  | None -> None
  | Some d ->
      let action =
        if aborting then None
        else
          try Some (snd(Hashtbl.find close_tab d)) with Not_found -> None in
      ( match action with
        | None -> None
        | Some a ->
            (* any open resource? *)
            if self#exists_descriptor_nolock d then None
            else begin
              dlogr (fun () -> (sprintf "remove_resource <running close action for fd %s>"
                                  (string_of_fd d)));
              Hashtbl.remove close_tab d;
              Some (fun () -> a d)
            end
      )
method private remove_resource_nolock g op =
  (* Unregisters [op] and immediately runs the close action that became
   * due, if any. The caller must hold the mutex. *)
  match self#detach_resource_nolock g op with
  | Some run_close_action -> run_close_action()
  | None -> ()
method remove_resource g op =
  (* Unregisters the resource [op] of group [g]; see
   * [detach_resource_nolock] for the exceptions.
   *
   * [protect] is applied to the fully applied function so that all table
   * updates really happen under the mutex (protecting only the partial
   * application to [g] would run the body after the mutex was released).
   * The close action is a user callback and runs after the mutex has
   * been released.
   *)
  match self#protect (self#detach_resource_nolock g) op with
  | Some run_close_action -> run_close_action()
  | None -> ()
(* Appends [e] to the event queue and wakes up a thread blocking in
 * [source], without queueing a keep-alive event. The caller is expected
 * to hold the mutex, as [wake_up] requires it. *)
method private add_event_nolock e =
Equeue.add_event (Lazy.force sys) e;
self#wake_up false
(* Locked variant of [add_event_nolock] *)
method add_event e =
self#protect self#add_event_nolock e
method private uq_handler (esys : event Equeue.t) ev =
  (* The single handler that Unixqueue registers with the Equeue. It
   * receives every event and dispatches it to the handlers that were
   * added with [add_handler]:
   *  - descriptor and timeout events go to the handlers of their group;
   *  - [Signal] and unknown [Extra] events are offered to all groups;
   *  - [Extra (Term g)] removes the handlers of [g];
   *  - [Immediate] events run their function;
   *  - [Extra Keep_alive] is always rejected.
   * Events for terminating groups are rejected. [Equeue.Reject] is
   * raised when nobody accepts the event. Other exceptions of handlers
   * are not caught here.
   *)
  let terminate_handler_nolock g h =
    dlogr (fun () -> (sprintf "uq_handler <terminating handler group %d>"
                        (Oo.id g)));
    let registered =
      try Hashtbl.find handlers g with Not_found -> [] in
    match List.filter (fun h' -> h' != h) registered with
    | [] ->
        (* That was the last handler of the group *)
        Hashtbl.remove handlers g;
        handled_groups <- handled_groups - 1
    | remaining ->
        Hashtbl.replace handlers g remaining
  in
  (* Offer ev to the handlers of hlist in turn until one accepts it *)
  let rec forward_event_to g (hlist : handler list) =
    match hlist with
    | [] ->
        raise Equeue.Reject
    | h :: rest ->
        ( try
            (* Note: the mutex _must not_ be held now *)
            h (self :> event_system) esys ev
          with
          | Equeue.Reject ->
              forward_event_to g rest
          | Equeue.Terminate ->
              (* Terminate only this handler. *)
              self#protect (terminate_handler_nolock g) h
        )
  in
  let forward_event g =
    let lookup () =
      try Hashtbl.find handlers g with Not_found -> [] in
    forward_event_to g (self#protect lookup ())
  in
  let forward_event_to_all() =
    let collect g hlist acc =
      if g#is_terminating then acc else (g,hlist) :: acc in
    let candidates =
      self#protect (fun () -> Hashtbl.fold collect handlers []) () in
    (* Exit signals that a group has accepted the event *)
    let offer (g,hlist) =
      try
        forward_event_to g hlist;
        raise Exit
      with
        Equeue.Reject -> ()    (* rejected: try next group *)
    in
    try
      List.iter offer candidates;
      raise Equeue.Reject      (* no group has accepted the event *)
    with
      Exit -> ()
  in
  let forward_unless_terminating g =
    if g # is_terminating then raise Equeue.Reject;
    forward_event g
  in
  match ev with
  | Extra (Term g) ->
      (* Terminate all handlers of group g *)
      let drop_group () =
        if not (Hashtbl.mem handlers g) then
          raise Equeue.Reject;   (* strange, should not happen *)
        Hashtbl.remove handlers g;
        handled_groups <- handled_groups - 1
      in
      self#protect drop_group ()
  | Extra Keep_alive ->
      raise Equeue.Reject
  | Input_arrived(g,_)
  | Output_readiness(g,_)
  | Out_of_band(g,_)
  | Timeout(g,_) ->
      forward_unless_terminating g
  | Signal
  | Extra _ ->
      forward_event_to_all()
  | Immediate(g,f) ->
      if g # is_terminating then raise Equeue.Reject;
      ( try f()
        with Equeue.Terminate -> ()
      )
(* Registers [uq_handler] as a handler of the underlying Equeue *)
method private equeue_add_handler () =
Equeue.add_handler (Lazy.force sys) self#uq_handler
(* It is not necessary to call wake_up *)
(* CHECK: There is a small difference between Equeue.add_handler and
* this add_handler: Here, the handler is immediately active (if
* uq_handler is already active). Can this lead to problems?
*)
method private add_handler_nolock g h =
  (* Adds [h] in front of the handlers of group [g]. The caller must hold
   * the mutex. Raises [Invalid_argument] if [g] is terminating.
   *
   * [uq_handler] is not registered with the Equeue here. The initializer
   * registers it once when the object is created, and it is never
   * removed, because it never lets [Equeue.Terminate] escape. Registering
   * it again whenever the first group appears would pile up duplicate
   * handlers in the Equeue, and rejected events would be offered to the
   * group handlers several times.
   *)
  dlogr (fun () -> (sprintf "add_handler <group %d>" (Oo.id g)));
  if g # is_terminating then
    invalid_arg "Unixqueue.add_handler: the group is terminated";
  ( try
      let old_handlers = Hashtbl.find handlers g in
      Hashtbl.replace handlers g (h :: old_handlers)
    with
      Not_found ->
        (* The group g is new *)
        Hashtbl.add handlers g [h];
        handled_groups <- handled_groups + 1
  )
(* Locked variant of [add_handler_nolock]: the mutex is taken when the
 * handler argument is supplied. *)
method add_handler g =
self#protect (self#add_handler_nolock g)
method private clear_nolock g =
  (* Terminates group [g]. The caller must hold the mutex.
   *
   * (i) The resources of [g] are not deleted here. The group is only
   *     marked as terminating, and [setup] purges its resources the next
   *     time it runs; until then, access functions like
   *     [exists_resource] hide them. This is much faster than filtering
   *     the tables when there are a lot of resources.
   * (ii) The handlers of [g] are deleted later, when the [Term] event
   *     queued here is processed.
   * (iii) The close and abort actions of [g] are deleted right away.
   *)
  dlogr (fun () -> (sprintf "clear <group %d>" (Oo.id g)));
  g # terminate();
  res_may_contain_terminated_groups <- true;
  self#add_event_nolock (Extra (Term g));
  let owned_descriptors =
    Hashtbl.fold
      (fun d (g',_) acc -> if g = g' then d :: acc else acc)
      close_tab
      [] in
  List.iter (fun d -> Hashtbl.remove close_tab d) owned_descriptors;
  abort_tab <- List.filter (fun (g',_) -> g <> g') abort_tab;
  (* Note: the Term event is not caught after all handlers have been
   * deleted. The Equeue module simply discards events that are not
   * handled.
   *)
  self#wake_up false
(* Locked variant of [clear_nolock]: terminates the given group *)
method clear =
self#protect self#clear_nolock
method private abort_nolock g ex =
  (* Runs the abort action of group [g] for the exception [ex], if there
   * is one, and then terminates the group with [clear]. An exception
   * raised by the abort action is re-raised after the group has been
   * cleared.
   *
   * Despite its name, this method must be called WITHOUT the mutex held:
   * it calls the locking method [clear], and it runs a user callback.
   *
   * [aborting] is set while the action and the clean-up run; it keeps
   * [remove_resource] from running close actions. The flag is reset on
   * every exit path, including when [clear] raises; otherwise close
   * actions would stay suppressed for the rest of the lifetime of the
   * event system.
   *
   * Note: If g has been terminated, the abort action is removed. So
   * we will never find one here.
   *)
  dlogr (fun () -> (sprintf "abort <group %d, exception %s>"
                      (Oo.id g) (Netexn.to_string ex)));
  let action = try Some (List.assoc g abort_tab) with Not_found -> None in
  match action with
  | None ->
      ()
  | Some a ->
      dlogr (fun () -> "abort <running abort action>");
      aborting <- true;
      let mistake =
        try a g ex; None
        with any -> Some any in
      ( try self#clear g
        with any ->
          aborting <- false;
          raise any
      );
      aborting <- false;
      ( match mistake with
        | None -> ()
        | Some m ->
            dlogr (fun () -> (sprintf "abort <propagating exception %s>"
                                (Netexn.to_string m)));
            raise m
      )
(* Aborts group [g]; called by [run] with the group and the exception
 * carried by an [Abort] exception.
 * NOTE(review): [abort_nolock] takes two arguments, so [protect] only
 * covers the partial application [self#abort_nolock g]. The body of
 * [abort_nolock] executes when the exception argument is supplied, i.e.
 * after the mutex has been released again. [abort_nolock] depends on
 * that, because it calls the locking method [clear].
 *)
method private abort g =
self#protect self#abort_nolock g
method private add_ctrl_pipe() =
  (* In multi-threaded mode: creates the control pipe and registers its
   * read end, so that at least the control pipe is watched. Does nothing
   * in single-threaded mode. Asserts that no control pipe exists yet.
   *)
  if not mtp#single_threaded then begin
    assert(ctrl_pipe_rd = None && ctrl_pipe_wr = None);
    let (p_rd, p_wr) = Unix.pipe() in
    ctrl_pipe_rd <- Some p_rd;
    ctrl_pipe_wr <- Some p_wr;
    (* The control pipe is weak - does not keep the event system running.
     * It has no timeout. *)
    let weak_entry = ((ctrl_pipe_group, (-1.0), ref (-1.0)), false) in
    RID_Table.add new_res (Wait_in p_rd) weak_entry
  end
method private close_ctrl_pipe() =
  (* Unregisters the read end of the control pipe from all resource
   * tables, closes both ends (write end first), and forgets them. Does
   * nothing if there is no control pipe. Both ends are expected to exist
   * or not to exist together.
   *)
  match ctrl_pipe_rd, ctrl_pipe_wr with
  | Some p_rd, Some p_wr ->
      let ctrl_op = Wait_in p_rd in
      RID_Table.remove res ctrl_op;
      RID_Table.remove new_res ctrl_op;
      RID_Table.remove strong_res ctrl_op;
      Unix.close p_wr;
      Unix.close p_rd;
      ctrl_pipe_rd <- None;
      ctrl_pipe_wr <- None
  | None, None ->
      ()
  | _ ->
      assert false
method run () =
  (* Runs the event system until [Equeue.run] returns normally.
   *
   * When a handler raises [Abort(g,x)], the abort action of [g] is run
   * and the event loop is entered again. It is a programming error if
   * [x] is [Equeue.Reject], [Equeue.Terminate] or another [Abort]; this
   * is reported with [Failure].
   *
   * The control pipe exists only for the duration of this call; it is
   * closed again on normal return as well as when an exception escapes.
   *)
  self # add_ctrl_pipe();
  let rec loop () =
    let aborted =
      try
        Equeue.run (Lazy.force sys);
        None
      with
        Abort (g,an_exception) -> Some (g,an_exception) in
    match aborted with
    | None ->
        ()
    | Some (g,an_exception) ->
        ( match an_exception with
          | Equeue.Reject | Equeue.Terminate ->
              (* A serious programming error: *)
              failwith "Caught 'Abort' exception with Reject or Terminate exception as argument; this is a programming error"
          | Abort(_,_) ->
              failwith "Caught 'Abort' exception with an 'Abort' exception as argument; this is a programming error"
          | _ -> ()
        );
        self#abort g an_exception;
        loop ()
  in
  try
    loop ();
    self # close_ctrl_pipe()
  with
  | error ->
      self # close_ctrl_pipe();
      raise error
(* Delegates to [Equeue.is_running] on the underlying event queue *)
method is_running =
Equeue.is_running (Lazy.force sys)
(* Sets the callback that [source] invokes at the start of every call,
 * before it takes the mutex. A previously set callback is replaced. *)
method when_blocking f =
when_blocking <- f
end
;;
(* Function form of the constructor: creates a fresh
 * [select_based_event_system] object. *)
let select_based_event_system() =
new select_based_event_system()