(* $Id: unixqueue_pollset.ml 1272 2009-10-01 01:33:31Z gerd $ *)
(* pset # dispose: we try to call it when there are no more operations in
the queue. This is tested when
- the resource is removed (and we are not waiting)
- the group is cleared (and we are not waiting)
- after every wait
Note that we must not call [dispose] while [wait] is running!
*)
open Unixqueue_util
open Printf
(* Float keys with a total order, as required by [Map.Make].
   Note: within this file this module shadows any other [Float] module
   that is in scope.
*)
module Float = struct
  type t = float
  let compare (a : float) (b : float) : int = Pervasives.compare a b
end

(* Maps keyed by floats (used below with points in time as keys) *)
module FloatMap = Map.Make(Float)
(* [min_key m]: returns the smallest key of the FloatMap [m].
   Raises [Not_found] if [m] is empty.

   The iteration is aborted with [Exit] right after the first binding has
   been seen; this relies on [FloatMap.iter] visiting keys in increasing
   order.
*)
let min_key m =
  let first_key = ref None in
  let record_and_stop key _ =
    first_key := Some key;
    raise Exit in
  let () =
    try FloatMap.iter record_and_stop m
    with Exit -> () in
  match !first_key with
  | None -> raise Not_found
  | Some key -> key
(* [ops_until tmax m]: collects all bindings [(t, ops)] of the FloatMap [m]
   with [t <= tmax].

   Bindings are visited in increasing key order and prepended to the
   accumulator, so the result is ordered by decreasing [t]. The iteration
   stops (via [Exit]) at the first key greater than [tmax].
*)
let ops_until tmax m =
  let collected = ref [] in
  let collect t ops =
    if t > tmax then
      raise Exit
    else
      collected := (t, ops) :: !collected in
  ( try FloatMap.iter collect m
    with Exit -> ()
  );
  !collected
exception Term of Unixqueue_util.group
(* Extra (Term g) is now the group termination event for g *)
exception Keep_alive
(* Sometimes used to keep the event system alive *)
exception Exit_loop
(* Purely local control flow: raised inside a [List.iter] once an event has
   been delivered, and caught right outside of the iteration to leave it
   early. It is not meant to escape to callers.
*)
(* [pset_set pset fd (i,o,p)]: sets the interest of [fd] in [pset] to the
   triple of flags (input, output, priority). If at least one flag is set,
   the descriptor is added with the corresponding request mask; if all
   three are [false] the descriptor is removed from [pset].
*)
let pset_set (pset:Netsys_pollset.pollset) fd (i,o,p) =
  if i || o || p then
    pset # add fd (Netsys_posix.poll_req_events i o p)
  else
    pset # remove fd
(* [pset_find pset fd]: returns the interest of [fd] in [pset] as a triple
   of flags in the same order as taken by [pset_set]. If [Not_found] is
   raised (the descriptor is not a member) the result is
   [(false,false,false)].
*)
let pset_find (pset:Netsys_pollset.pollset) fd =
  try
    let requested = pset # find fd in
    Netsys_posix.poll_req_triple requested
  with
  | Not_found -> (false, false, false)
(* [op_of_event ev]: maps a resource event back to the operation it was
   generated for. Only the four resource-related events are supported;
   any other event is a programming error ([assert false]).
*)
let op_of_event = function
  | Unixqueue.Input_arrived (_, fd) -> Unixqueue.Wait_in fd
  | Unixqueue.Output_readiness (_, fd) -> Unixqueue.Wait_out fd
  | Unixqueue.Out_of_band (_, fd) -> Unixqueue.Wait_oob fd
  | Unixqueue.Timeout (_, op) -> op
  | _ -> assert false
(* [while_locked mutex critical_section]: evaluates [critical_section ()]
   through [Netsys_oothr.serialize] and returns its result.
   NOTE(review): presumably [serialize] holds [mutex] during the call and
   releases it on both normal return and exception - confirm against
   Netsys_oothr.
*)
let while_locked mutex critical_section =
  Netsys_oothr.serialize mutex critical_section ()
(* [escape_lock mutex f]: the inverse of a critical section. The caller
   must hold [mutex]. The mutex is released, [f ()] is run, and the mutex
   is acquired again before the result of [f] is returned. If [f] raises
   an exception the mutex is also re-acquired, and the exception is
   propagated afterwards.
*)
let escape_lock mutex f =
  mutex # unlock();
  (* Capture the outcome first so that there is a single re-lock point *)
  let outcome =
    try `Ok (f ())
    with e -> `Error e in
  mutex # lock();
  match outcome with
  | `Ok r -> r
  | `Error e -> raise e
(* Wraps a plain handler function into an object. The only purpose is that
   every registered handler gets an identity ([Oo.id]), so handlers can be
   told apart (for logging and for removing a single handler from a list).
*)
class ohandler (wrapped:handler) =
object
  (* Simply forwards all three arguments to the wrapped function *)
  method run esys equeue event =
    wrapped esys equeue event
end
(* Event system on top of the pollset [pset]. All mutable state declared
   below is shared between the public methods and the callbacks invoked by
   Equeue; the methods access it while holding [mutex] (methods with the
   suffix _wl expect the caller to hold it).
*)
class pollset_event_system (pset : Netsys_pollset.pollset) =
let mtp = !Netsys_oothr.provider in
object(self)
(* The underlying Equeue. The lazy value is only a placeholder; it is
   replaced in the initializer because the Equeue needs [self#source].
*)
val mutable sys =
lazy (assert false) (* initialized below *)
(*
val mutable ops_of_group =
(Hashtbl.create 10 : (group, operation list) Hashtbl.t)
-- would be for [clear] only
*)
(* All currently watched operations with their timeout data and group *)
val mutable tmo_of_op =
(Hashtbl.create 10 : (operation, float * float * group) Hashtbl.t)
(* first number: duration of timeout (or -1)
second number: point in time (or -1)
*)
(* Reverse index: point in time -> operations timing out then. Only
   operations with a non-negative timeout duration are entered here.
*)
val mutable ops_of_tmo =
(FloatMap.empty : operation list FloatMap.t)
val mutable strong_op =
(Hashtbl.create 10 : (operation, unit) Hashtbl.t)
(* contains all non-weak ops *)
(* Set while an abort action is being executed; close actions are not run
   by [remove_resource] during that time
*)
val mutable aborting = false
(* At most one close action per descriptor, together with its group *)
val mutable close_tab = (Hashtbl.create 10 : (Unix.file_descr, (group * (Unix.file_descr -> unit))) Hashtbl.t)
(* Association list of abort actions by group *)
val mutable abort_tab = ([] : (group * (group -> exn -> unit)) list)
(* The handlers of each group *)
val mutable handlers = (Hashtbl.create 10 : (group, ohandler list) Hashtbl.t)
val mutable handled_groups = 0
(* the number of keys in [handlers] *)
(* Set by [source] around the call of [pset # wait]. While it is set,
   [pset # dispose] is not called by the other methods.
*)
val mutable waiting = false
val mutex = mtp # create_mutex()
initializer (
(* Create the Equeue with [self#source] as event source, and store it *)
let equeue_sys = Equeue.create ~string_of_event self#source in
sys <- lazy equeue_sys;
ignore(Lazy.force sys)
)
(* The event source passed to [Equeue.create] in the initializer. It is
   called with the Equeue as argument [_sys], and adds new events to it:

   1. Compute [delta], the time until the earliest scheduled timeout
      (or -1.0 if no timeout is scheduled).
   2. If there is at least one strong (non-weak) operation, unlock the
      mutex and call [pset # wait delta]. [Unix.EINTR] is not an error;
      it results in a [Signal] event.
   3. With the mutex held again: translate the poll results into
      operations and events, determine the operations that have timed
      out, and add all these events to [_sys].
   4. Restart the timeout of every operation for which an event was
      delivered (relative to [t1], the time after [wait]).

   The local reference [locked] mirrors whether this method currently
   holds [mutex], so that the mutex can be released on every exit path.
   Any exception is re-raised after unlocking.

   NOTE(review): a negative [delta] is presumably understood by
   [pset # wait] as "no timeout" - confirm against Netsys_pollset.
*)
method private source _sys =
(* locking: the lock is not held when called, because we are called back
from Equeue
*)
assert(Lazy.force sys == _sys);
let locked = ref true in (* keep track of locking state *)
mutex # lock();
let t0 = Unix.gettimeofday() in
(* tmin: the earliest point in time when an operation times out *)
let tmin = try min_key ops_of_tmo with Not_found -> (-1.0) in
(* delta: remaining time until tmin, never negative unless "no timeout" *)
let delta = if tmin < 0.0 then (-1.0) else max (tmin -. t0) 0.0 in
dlogr (fun () -> (
sprintf "t0 = %f" t0));
let nothing_to_do =
(* For this test only non-weak resources count, so... *)
Hashtbl.length strong_op = 0 in
let pset_events, have_eintr =
try
if nothing_to_do then (
(* No wait at all in this case; the lock remains held *)
dlogr (fun () -> "nothing_to_do");
([], false)
)
else (
dlogr (fun () -> (
let ops =
Hashtbl.fold (fun op _ l -> op::l) tmo_of_op [] in
let op_str =
String.concat ";" (List.map string_of_op ops) in
sprintf "wait tmo=%f ops=<%s>" delta op_str));
(* Reset the cancel bit immediately before calling [wait]. Any
event added up to now is considered anyway by [wait] because
our lock is still held. Any new event added after we unlock will
set the cancel_wait flag, and cause the [wait] to exit (if it is
still running).
*)
pset # cancel_wait false;
waiting <- true;
mutex # unlock();
locked := false;
(pset # wait delta, false)
)
with
| Unix.Unix_error(Unix.EINTR,_,_) ->
(* Interrupted by a signal: no poll results, but remember the fact *)
dlogr (fun () -> "wait signals EINTR");
([], true)
| e ->
(* Usually from [wait], but one never knows... *)
if !locked then mutex#unlock();
waiting <- false;
raise e
in
waiting <- false;
(* Re-acquire the lock if it was released for [wait] *)
if not !locked then mutex # lock();
locked := true;
try
(* Catch exceptions and unlock *)
dlogr (fun () -> (
sprintf "wait returns <%d pset events>"
(List.length pset_events)));
let t1 = Unix.gettimeofday() in
dlogr (fun () -> (sprintf "t1 = %f" t1));
(* t1 is the reference for determining the timeouts *)
(* while waiting somebody might have removed resources, so ... *)
if Hashtbl.length tmo_of_op = 0 then
pset # dispose();
let operations =
(* The possible operations *)
List.flatten
(List.map
(fun (fd,ev_in,ev_out) ->
(* Note that POLLHUP and POLLERR can also mean that we
have data to read/write!
*)
let (in_rd,in_wr,in_pri) =
Netsys_posix.poll_req_triple ev_in in
let out_rd =
Netsys_posix.poll_rd_result ev_out in
let out_wr =
Netsys_posix.poll_wr_result ev_out in
let out_pri =
Netsys_posix.poll_pri_result ev_out in
let out_hup =
Netsys_posix.poll_hup_result ev_out in
let out_err =
Netsys_posix.poll_err_result ev_out in
(* An operation is only reported if it was requested (in_*), and
   the result signals readiness, hangup, or error
*)
let have_input =
in_rd && (out_rd || out_hup || out_err) in
let have_output =
in_wr && (out_wr || out_hup || out_err) in
let have_pri =
in_pri && (out_pri || out_hup || out_err) in
if have_input || have_output || have_pri then (
let e1 = if have_pri then [Unixqueue.Wait_oob fd] else [] in
let e2 = if have_input then [Unixqueue.Wait_in fd] else [] in
let e3 = if have_output then [Unixqueue.Wait_out fd] else [] in
e1 @ e2 @ e3
)
else
[]
)
pset_events) in
let events =
(* The events corresponding to [operations] *)
List.flatten
(List.map
(fun op -> self#events_of_op_wl op)
operations) in
let ops_timed_out =
ops_until t1 ops_of_tmo in
(* Note: this _must_ include operations until <= t1 (not <t1), otherwise
a timeout value of 0.0 won't work
*)
let timeout_events =
(* Generate timeout events for all ops in [tmo_of_op] that have timed
out and that are not in [events]
*)
List.flatten
(List.map
(fun (_, ops) ->
let ops' =
List.filter (fun op -> not(List.mem op operations)) ops in
List.flatten
(List.map
(fun op -> self#events_of_op_wl op)
ops')
)
ops_timed_out) in
dlogr(fun() -> (sprintf "delivering <%s>"
(String.concat ";"
(List.map
string_of_event
(events @ timeout_events)))));
(* deliver events *)
List.iter (Equeue.add_event _sys) events;
List.iter (Equeue.add_event _sys) timeout_events;
if have_eintr then (
dlogr (fun () -> "delivering Signal");
Equeue.add_event _sys Unixqueue.Signal
)
else
if events = [] && timeout_events = [] && not nothing_to_do then (
(* Ensure we always add an event to keep the event loop running: *)
dlogr (fun () -> "delivering Keep_alive");
Equeue.add_event _sys (Unixqueue.Extra Keep_alive)
);
(* Update ops_of_tmo: *)
List.iter
(fun (t,_) ->
ops_of_tmo <- FloatMap.remove t ops_of_tmo
)
ops_timed_out;
(* Set a new timeout for all delivered events:
(Note that [pset] remains unchanged, because the set of watched
resources remains unchanged.)
*)
List.iter
(fun evlist ->
List.iter
(fun ev ->
try
let op = op_of_event ev in
let (tmo,_,g) = Hashtbl.find tmo_of_op op in (* or Not_found *)
(* remember the strength; [sched_remove_wl] deletes it from strong_op *)
let is_strong =
try Hashtbl.find strong_op op; true
with Not_found -> false in
self#sched_remove_wl op;
(* t2: the new point in time for the timeout, relative to t1 *)
let t2 = if tmo < 0.0 then tmo else t1 +. tmo in
self#sched_add_wl g op tmo t2 is_strong
with
| Not_found -> assert false
)
evlist)
[ events; timeout_events ];
mutex # unlock();
locked := false
with
| e ->
(* exceptions are unexpected, but we want to make sure not to mess
with locks
*)
if !locked then mutex # unlock();
raise e
(* Note: suffix _wl = while locked *)
(* [events_of_op_wl op]: returns the list of events to deliver for the
   operation [op]. The list has exactly one element if [op] is currently
   registered in [tmo_of_op] (the event carries the group found there),
   and is empty otherwise.
*)
method private events_of_op_wl op =
  let registration =
    try Some (Hashtbl.find tmo_of_op op)
    with Not_found -> None in
  match registration with
  | None ->
      (* A "ghost event", i.e. there is no handler anymore for it, but
         it wasn't deleted quickly enough from pset
      *)
      []
  | Some (_, _, g) ->
      ( match op with
        | Unixqueue.Wait_in fd ->
            [Unixqueue.Input_arrived(g,fd)]
        | Unixqueue.Wait_out fd ->
            [Unixqueue.Output_readiness(g,fd)]
        | Unixqueue.Wait_oob fd ->
            [Unixqueue.Out_of_band(g,fd)]
        | Unixqueue.Wait _ ->
            [Unixqueue.Timeout(g,op)]
      )
(* [sched_remove_wl op]: removes [op] from the scheduling tables
   [tmo_of_op], [strong_op], and [ops_of_tmo]. Nothing happens if [op] is
   not registered. The pollset is not touched here.
*)
method private sched_remove_wl op =
  try
    let (duration, deadline, _) =
      Hashtbl.find tmo_of_op op in (* or Not_found *)
    dlogr(fun () -> (sprintf "sched_remove %s" (string_of_op op)));
    Hashtbl.remove tmo_of_op op;
    Hashtbl.remove strong_op op;
    (* The operations that remain scheduled for the same point in time.
       Operations without timeout (negative duration) are never entered
       into [ops_of_tmo], so nothing remains in this case.
    *)
    let remaining =
      if duration < 0.0 then
        []
      else (
        let scheduled =
          try FloatMap.find deadline ops_of_tmo with Not_found -> [] in
        List.filter (fun other -> op <> other) scheduled
      ) in
    ops_of_tmo <-
      ( if remaining = [] then
          FloatMap.remove deadline ops_of_tmo
        else
          FloatMap.add deadline remaining ops_of_tmo
      )
  with
  | Not_found -> ()
(* [pset_remove_wl op]: clears the interest flag corresponding to [op] for
   its descriptor in [pset], and keeps the other two flags as they are.
   [Wait] operations have no descriptor, so nothing is done for them.
*)
method private pset_remove_wl op =
  match op with
  | Wait _ ->
      ()
  | Wait_in fd ->
      let (_, want_out, want_pri) = pset_find pset fd in
      pset_set pset fd (false, want_out, want_pri)
  | Wait_out fd ->
      let (want_in, _, want_pri) = pset_find pset fd in
      pset_set pset fd (want_in, false, want_pri)
  | Wait_oob fd ->
      let (want_in, want_out, _) = pset_find pset fd in
      pset_set pset fd (want_in, want_out, false)
(* [sched_add_wl g op tmo t1 is_strong]: enters [op] into the scheduling
   tables. [tmo] is the duration of the timeout and [t1] the point in time
   when it expires. [op] is always added to [tmo_of_op], to [strong_op]
   only if [is_strong], and to [ops_of_tmo] only if [tmo] is not negative.
   The pollset is not touched here.
*)
method private sched_add_wl g op tmo t1 is_strong =
  dlogr(fun () -> (sprintf "sched_add %s tmo=%f t1=%f is_strong=%b"
                     (string_of_op op) tmo t1 is_strong));
  Hashtbl.add tmo_of_op op (tmo, t1, g);
  if is_strong then
    Hashtbl.add strong_op op ();
  if tmo >= 0.0 then (
    let same_deadline =
      try FloatMap.find t1 ops_of_tmo with Not_found -> [] in
    ops_of_tmo <- FloatMap.add t1 (op :: same_deadline) ops_of_tmo
  )
(* [pset_add_wl op]: sets the interest flag corresponding to [op] for its
   descriptor in [pset], and keeps the other two flags as they are.
   [Wait] operations have no descriptor, so nothing is done for them.
*)
method private pset_add_wl op =
  match op with
  | Wait _ ->
      ()
  | Wait_in fd ->
      let (_, want_out, want_pri) = pset_find pset fd in
      pset_set pset fd (true, want_out, want_pri)
  | Wait_out fd ->
      let (want_in, _, want_pri) = pset_find pset fd in
      pset_set pset fd (want_in, true, want_pri)
  | Wait_oob fd ->
      let (want_in, want_out, _) = pset_find pset fd in
      pset_set pset fd (want_in, want_out, true)
(* [exists_resource op]: whether [op] is currently registered. Evaluated
   inside [while_locked mutex].
*)
method exists_resource op =
  let check () = self # exists_resource_wl op in
  while_locked mutex check
(* Same as [exists_resource], but for callers already holding the lock:
   tests whether [op] is a key of [tmo_of_op]
*)
method private exists_resource_wl op = Hashtbl.mem tmo_of_op op
(* [exists_descriptor_wl fd]: whether at least one of the three possible
   descriptor operations (input, output, out-of-band) is registered for
   [fd]. The operations are checked in this order, and checking stops at
   the first registered one.
*)
method private exists_descriptor_wl fd =
  List.exists
    (fun op -> self#exists_resource_wl op)
    [ Unixqueue.Wait_in fd; Unixqueue.Wait_out fd; Unixqueue.Wait_oob fd ]
(* [add_resource g (op, tmo)]: registers [op] with timeout [tmo] for group
   [g] as a strong (non-weak) resource. See [add_resource_wl].
*)
method add_resource g (op, tmo) =
  let is_strong = true in
  while_locked mutex
    (fun () -> self # add_resource_wl g (op, tmo) is_strong)
(* [add_weak_resource g (op, tmo)]: registers [op] with timeout [tmo] for
   group [g] as a weak resource, i.e. it is not entered into [strong_op].
   See [add_resource_wl].
*)
method add_weak_resource g (op, tmo) =
  let is_strong = false in
  while_locked mutex
    (fun () -> self # add_resource_wl g (op, tmo) is_strong)
(* [add_resource_wl g (op, tmo) is_strong]: the common implementation of
   [add_resource] and [add_weak_resource].

   Raises [Invalid_argument] if [g] is terminating. If [op] is already
   registered, the call is silently ignored (maybe this should be fixed,
   so that the timeout can be lowered). Otherwise [op] is added to the
   pollset and to the scheduling tables; the timeout starts now.
*)
method private add_resource_wl g (op, tmo) is_strong =
  if g # is_terminating then
    invalid_arg "Unixqueue.add_resource: the group is terminated";
  if Hashtbl.mem tmo_of_op op then
    ()
  else begin
    self#pset_add_wl op;
    (* Point in time when the timeout expires (negative: no timeout) *)
    let expires =
      if tmo < 0.0 then tmo else Unix.gettimeofday() +. tmo in
    self#sched_add_wl g op tmo expires is_strong;
    (* Multi-threading: interrupt [wait] *)
    pset # cancel_wait true
    (* CHECK: In the select-based impl we add Keep_alive to equeue.
       This idea (probably): If [wait] is about to return, and the
       queue becomes empty, the whole esys terminates. The Keep_alive
       prevents that.
       My current thinking is that this delays the race condition
       a bit, but does not prevent it from happening
    *)
  end
(* [remove_resource g op]: unregisters [op], which must belong to group
   [g].

   Raises [Invalid_argument] if [g] is terminating, [Not_found] if [op]
   is not registered, and [Failure] if [op] is registered for another
   group.

   After the removal the pollset is disposed if nothing is watched anymore
   (and no [wait] is running), and a running [wait] is interrupted.
   If [op] was the last registered operation of its descriptor, and there
   is a close action for the descriptor, the close action is removed from
   [close_tab] and executed - with the mutex temporarily released. No
   close action is run while an abort action is in progress.
*)
method remove_resource g op =
  while_locked mutex
    (fun () ->
       if g # is_terminating then
         invalid_arg "remove_resource: the group is terminated";
       let (_, _, owner) = Hashtbl.find tmo_of_op op in
       if g <> owner then
         failwith "remove_resource: descriptor belongs to different group";
       self#sched_remove_wl op;
       self#pset_remove_wl op;
       if not waiting && Hashtbl.length tmo_of_op = 0 then
         pset # dispose();
       pset # cancel_wait true; (* interrupt [wait] *)
       (* The descriptor of [op], if it has one *)
       let fd_opt =
         match op with
         | Wait_in d | Wait_out d | Wait_oob d -> Some d
         | Wait _ -> None in
       match fd_opt with
       | None -> ()
       | Some fd ->
           if not aborting then (
             (* is there a close action ? *)
             let close_action =
               try Some (snd(Hashtbl.find close_tab fd))
               with Not_found -> None in
             match close_action with
             | None ->
                 ()
             | Some a ->
                 (* any open resource? *)
                 (* FIXME MT: We don't know yet whether fd can be closed.
                    This shouldn't be done before [wait] returns.
                 *)
                 if not (self#exists_descriptor_wl fd) then (
                   dlogr (fun () -> (sprintf "remove_resource <running close action for fd %s>"
                                       (string_of_fd fd)));
                   Hashtbl.remove close_tab fd;
                   escape_lock mutex (fun () -> a fd)
                 )
           )
    )
(* Returns a fresh [group_object] *)
method new_group () = new group_object
(* Returns a fresh [wait_object] *)
method new_wait_id () = new wait_object
(* [add_close_action g (d,a)]: sets [a] as the close action of the
   descriptor [d], on behalf of group [g]. There can be only one close
   action per descriptor; a previous one is replaced.
   (TODO: Rename to set_close_action)

   Raises [Invalid_argument] if [g] is terminating, and
   [Failure "add_close_action"] if no operation is registered for [d].
*)
method add_close_action g (d,a) =
  while_locked mutex
    (fun () ->
       if g # is_terminating then
         invalid_arg "add_close_action: the group is terminated";
       (* CHECK: Maybe we should fail if g is a different group than
        * the existing group
        *)
       if not (self#exists_descriptor_wl d) then
         failwith "add_close_action";
       Hashtbl.replace close_tab d (g,a)
    )
(* [add_abort_action g a]: prepends the abort action [a] for group [g] to
   [abort_tab]. Raises [Invalid_argument] if [g] is terminating.
*)
method add_abort_action g a =
  let register () =
    if g # is_terminating then
      invalid_arg "add_abort_action: the group is terminated";
    abort_tab <- (g,a) :: abort_tab in
  while_locked mutex register
(* [add_event e]: adds the event [e] to the queue; the locking wrapper
   around [add_event_wl]
*)
method add_event e =
  let enqueue () = self # add_event_wl e in
  while_locked mutex enqueue
(* [add_event_wl e]: adds [e] to the underlying Equeue, and then sets the
   cancel bit of the pollset, so that any pending [wait] is interrupted
*)
method private add_event_wl e =
  let esys = Lazy.force sys in
  Equeue.add_event esys e;
  pset # cancel_wait true
(* The single Unixqueue handler that is registered in the Equeue. For all
   added (sub) handlers, uq_handler gets the events, and delivers them:

   - [Extra (Term g)] removes all handlers of group [g]
   - [Extra Keep_alive] is always rejected
   - events of a certain group are forwarded to the handlers of this
     group, unless the group is terminating (then the event is rejected)
   - [Signal] and other [Extra] events are offered to the handlers of
     all groups until one accepts

   [Equeue.Reject] is raised if nobody accepts the event, and
   [Equeue.Terminate] if the last group with handlers has gone (this
   deletes uq_handler from esys).

   locking: it is assumed that we do not have the lock when uq_handler
   is called. It is a callback from Equeue.
*)
method private uq_handler (esys : event Equeue.t) ev =
  (* Removes the handler [h] from the list of [g]. The caller must hold
     the lock.
  *)
  let terminate_handler_wl g h =
    dlogr
      (fun() ->
         (sprintf "uq_handler <terminating handler group %d, handler %d>"
            (Oo.id g) (Oo.id h)));
    let current =
      try Hashtbl.find handlers g with Not_found -> [] in
    match List.filter (fun h' -> h' <> h) current with
    | [] ->
        Hashtbl.remove handlers g;
        handled_groups <- handled_groups - 1;
        if handled_groups = 0 then (
          dlogr (fun () -> "uq_handler <self-terminating>");
          raise Equeue.Terminate (* delete uq_handler from esys *)
        )
    | remaining ->
        Hashtbl.replace handlers g remaining
  in
  (* Tries the handlers of [hlist] one after the other until the first
     one does not reject the event.
     The caller does not have the lock when this fn is called!
  *)
  let rec forward_event_to g (hlist : ohandler list) =
    match hlist with
    | [] ->
        dlogr (fun () -> "uq_handler <empty list>");
        raise Equeue.Reject
    | h :: rest ->
        ( try
            (* Note: ues _must not_ be locked now *)
            dlogr
              (fun () ->
                 (sprintf
                    "uq_handler <invoke handler group %d, handler %d>"
                    (Oo.id g) (Oo.id h)));
            h#run (self :> event_system) esys ev;
            dlogr
              (fun () ->
                 (sprintf
                    "uq_handler <invoke_success handler group %d, handler %d>"
                    (Oo.id g) (Oo.id h)))
          with
          | Equeue.Reject ->
              forward_event_to g rest
          | Equeue.Terminate ->
              (* Terminate only this handler. *)
              while_locked mutex
                (fun () -> terminate_handler_wl g h)
          (* Any error exceptions simply fall through. Equeue will
           * catch them, and will add the event to the error queue
           *)
        )
  in
  (* Forwards the event to the handlers of group [g].
     The caller does not have the lock when this fn is called!
  *)
  let forward_event g =
    dlogr
      (fun () ->
         (sprintf "uq_handler <forward_event group %d>" (Oo.id g)));
    let group_handlers =
      while_locked mutex
        (fun () ->
           try Hashtbl.find handlers g with Not_found -> []) in
    forward_event_to g group_handlers
  in
  (* Offers the event to the groups in turn.
     The caller does not have the lock when this fn is called!
  *)
  let forward_event_to_all() =
    let snapshot =
      while_locked mutex
        (fun () ->
           Hashtbl.fold
             (fun g hlist acc -> (g,hlist) :: acc)
             handlers
             []) in
    try
      List.iter
        (fun (g,hlist) ->
           try
             forward_event_to g hlist;
             raise Exit_loop (* event is delivered, so exit iteration *)
           with
           | Equeue.Reject -> () (* event is rejected: try next group *)
        )
        snapshot;
      raise Equeue.Reject (* no handler has accepted the event, so reject *)
    with
    | Exit_loop -> ()
  in
  dlogr
    (fun () ->
       (sprintf "uq_handler <event %s>"
          (string_of_event ev)));
  match ev with
  | Extra (Term g) ->
      (* Terminate all handlers of group g *)
      while_locked mutex
        (fun () ->
           if not (Hashtbl.mem handlers g) then
             raise Equeue.Reject; (* strange, should not happen *)
           dlogr
             (fun () ->
                (sprintf "uq_handler <terminating group %d>" (Oo.id g)));
           Hashtbl.remove handlers g;
           handled_groups <- handled_groups - 1;
           if handled_groups = 0 then (
             dlogr (fun () -> "uq_handler <self-terminating>");
             raise Equeue.Terminate (* delete uq_handler from esys *)
           )
        )
  | Extra Keep_alive ->
      raise Equeue.Reject
  | Input_arrived(g,_)
  | Output_readiness(g,_)
  | Out_of_band(g,_)
  | Timeout(g,_) ->
      if g # is_terminating then raise Equeue.Reject;
      forward_event g
  | Signal
  | Extra _ ->
      forward_event_to_all()
(* Registers [uq_handler] as handler of the underlying Equeue.

   CHECK: There is a small difference between Equeue.add_handler and
   the add_handler of this class: Here, the handler is immediately
   active (if uq_handler is already active). Can this lead to problems?
*)
method private equeue_add_handler () =
  let esys = Lazy.force sys in
  Equeue.add_handler esys self#uq_handler
(* [add_handler g h]: adds the handler function [h] to group [g]. The
   function is wrapped into an [ohandler], and put at the head of the
   handler list of [g]. If [g] did not have handlers before, the number
   of handled groups is increased, and for the first such group
   [uq_handler] is registered in the Equeue.

   Raises [Invalid_argument] if [g] is terminating.
*)
method add_handler g h =
  while_locked mutex
    (fun () ->
       let oh = new ohandler h in
       dlogr
         (fun () ->
            (sprintf
               "add_handler <group %d, handler %d>" (Oo.id g) (Oo.id oh)));
       if g # is_terminating then
         invalid_arg "Unixqueue.add_handler: the group is terminated";
       let previous =
         try Some (Hashtbl.find handlers g)
         with Not_found -> None in
       match previous with
       | Some old_handlers ->
           Hashtbl.replace handlers g (oh :: old_handlers)
       | None ->
           (* The group g is new *)
           Hashtbl.add handlers g [oh];
           handled_groups <- handled_groups + 1;
           if handled_groups = 1 then
             self#equeue_add_handler ()
    )
(* [clear g]: terminates the group [g] and removes everything that
   belongs to it; the locking wrapper around [clear_wl]
*)
method clear g =
  let do_clear () = self # clear_wl g in
  while_locked mutex do_clear
(* [clear_wl g]: marks [g] as terminating, and then removes
   (i) all resources of [g] from the scheduling tables and the pollset,
   (ii) all handlers of [g] (indirectly, by queuing [Extra (Term g)]),
   (iii) the close and abort actions of [g].
   Finally, the pollset is disposed if nothing is watched anymore and no
   [wait] is running.
*)
method private clear_wl g =
  dlogr (fun () -> (sprintf "clear <group %d>" (Oo.id g)));
  (* Set that g is terminating now: *)
  g # terminate();
  (* (i) delete all resources of g: *)
  let owned_ops =
    Hashtbl.fold
      (fun op (_,_,owner) acc -> if g=owner then op::acc else acc)
      tmo_of_op
      [] in
  List.iter (fun op -> self#sched_remove_wl op) owned_ops;
  List.iter (fun op -> self#pset_remove_wl op) owned_ops;
  (* (ii) delete all handlers of g: *)
  self#add_event_wl (Extra (Term g));
  (* side effect: we also interrupt [wait] *)
  (* Note: the Term event isn't caught after all handlers have been
   * deleted. The Equeue module simply discards events that are not
   * handled.
   *)
  (* (iii) delete special actions of g: *)
  let owned_fds = (* to remove from close_tab *)
    Hashtbl.fold
      (fun fd (owner,_) acc -> if g = owner then fd :: acc else acc)
      close_tab
      [] in
  List.iter (fun fd -> Hashtbl.remove close_tab fd) owned_fds;
  abort_tab <- List.filter (fun (owner,_) -> g<>owner) abort_tab;
  if not waiting && Hashtbl.length tmo_of_op = 0 then
    pset # dispose()
(* [abort g ex]: runs the abort action of group [g] (if there is one) with
   the exception [ex], and clears the group afterwards.

   While the action runs, and until the group is cleared, [aborting] is
   set, which prevents [remove_resource] from running close actions.
   If the abort action itself raises an exception, the group is cleared
   nevertheless, and the exception is propagated to the caller after
   that.

   [aborting] is reset in all cases, even if clearing the group fails
   with an exception (which is then propagated). Otherwise the flag would
   remain set forever, and no close action would ever be run again.

   The caller doesn't have the lock.

   Note: If g has been terminated, the abort action is removed. So
   we will never find here one.
*)
method private abort g ex =
  dlogr (fun () -> (sprintf "abort <group %d, exception %s>"
                      (Oo.id g) (Netexn.to_string ex)));
  let action =
    while_locked mutex
      (fun () ->
         try Some (List.assoc g abort_tab) with Not_found -> None) in
  match action with
  | None ->
      ()
  | Some a ->
      dlogr (fun () -> "abort <running abort action>");
      (* An exception raised by the abort action is remembered here *)
      let mistake = ref None in
      while_locked mutex
        (fun () -> aborting <- true);
      begin try
        a g ex
      with
      | any ->
          mistake := Some any (* Wow *)
      end;
      while_locked mutex
        (fun () ->
           ( try
               self#clear_wl g
             with
             | cleanup_error ->
                 (* Don't leave the flag set when the cleanup fails *)
                 aborting <- false;
                 raise cleanup_error
           );
           aborting <- false
        );
      ( match !mistake with
        | None -> ()
        | Some m ->
            dlogr (fun () -> (sprintf "abort <propagating exception %s>"
                                (Netexn.to_string m)));
            raise m
      )
(* [run ()]: runs the underlying Equeue until it returns normally.

   When [Equeue.run] is left by an [Abort (g, e)] exception, the abort
   action of [g] is executed (see [abort]), and the Equeue is simply
   started again. It is a programming error ([Failure]) if [e] is
   [Equeue.Reject], [Equeue.Terminate], or another [Abort]. All other
   exceptions are passed through to the caller.

   The caller doesn't have the lock.
*)
method run () =
  let rec loop () =
    (* [restart] says whether the Equeue was left because of an abort *)
    let restart =
      try
        Equeue.run (Lazy.force sys);
        false
      with
      | Abort (g,an_exception) ->
          ( match an_exception with
            | (Equeue.Reject|Equeue.Terminate) ->
                (* A serious programming error: *)
                failwith "Caught 'Abort' exception with Reject or Terminate exception as argument; this is a programming error"
            | Abort(_,_) ->
                failwith "Caught 'Abort' exception with an 'Abort' exception as argument; this is a programming error"
            | _ -> ()
          );
          self#abort g an_exception;
          true
    in
    if restart then loop ()
  in
  loop ()
(* Whether the underlying Equeue is running, as reported by
   [Equeue.is_running]
*)
method is_running =
  let esys = Lazy.force sys in
  Equeue.is_running esys
end
(* [pollset_event_system pset]: creates a new event system on top of the
   pollset [pset]. The object is returned coerced to the general type
   [Unixqueue_util.event_system].
*)
let pollset_event_system pset =
  let esys = new pollset_event_system pset in
  (esys :> Unixqueue_util.event_system)