(* $Id: uq_lwt.ml 1616 2011-06-10 15:08:57Z gerd $ *)
open Printf
(* Extract the file descriptor from a Wait_in or Wait_out operation.
   Any other operation is a programming error here and trips the assertion. *)
let fd_of_op op =
  match op with
  | Unixqueue.Wait_in descr
  | Unixqueue.Wait_out descr -> descr
  | _ -> assert false
(* Raised by the [when_blocking] hook that [lwt_backend#iter] installs in
   order to leave [Unixqueue.run]; [iter] catches it again. *)
exception Esys_exit
(** Event-loop backend that maps readable/writable/timer registrations onto
    the Unixqueue event system [esys].

    Each [register_*] method returns a lazy value; forcing it undoes the
    registration.  [iter] runs [esys] for one bounded step.  The method
    names presumably mirror the hooks expected by an Lwt engine (the class
    name suggests so) - confirm against the Lwt_engine interface. *)
class lwt_backend (esys:Unixqueue.event_system) =
  (* Group owning the fd resources, the dispatch handler, and the deferred
     callbacks scheduled by [handler]. *)
  let g = Unixqueue.new_group esys in
  (* Per-descriptor callback lists for input and output readiness.  The most
     recently registered callback is at the head of each list. *)
  let rd_ht = Hashtbl.create 5 in
  let wr_ht = Hashtbl.create 5 in
  (* Timer groups that are currently armed (the table is used as a set). *)
  let timers = Hashtbl.create 5 in

  (* Prepend callback [x] to the list stored for [fd] in [ht]. *)
  let add ht fd x =
    let l =
      try Hashtbl.find ht fd with Not_found -> [] in
    Hashtbl.replace ht fd (x :: l) in

  (* Ask [esys] to watch [op] if callbacks exist for its descriptor.  The
     timeout -1.0 is passed through to Unixqueue (presumably meaning
     no timeout - see the Unixqueue documentation). *)
  let add_res ht op =
    let fd = fd_of_op op in
    if Hashtbl.mem ht fd then
      Unixqueue.add_resource esys g (op, (-1.0)) in

  (* Drop callback [x] from the list stored for [fd].  Callbacks are closures,
     so they are compared by physical identity; every physically equal entry
     is removed.  An empty list removes the table entry altogether. *)
  let remove ht fd x =
    let l =
      try Hashtbl.find ht fd with Not_found -> [] in
    match List.filter (fun y -> y != x) l with
    | [] -> Hashtbl.remove ht fd
    | l' -> Hashtbl.replace ht fd l' in

  (* Stop watching [op] once no callback is left for its descriptor. *)
  let remove_res ht op =
    let fd = fd_of_op op in
    if not(Hashtbl.mem ht fd) then
      Unixqueue.remove_resource esys g op in

  object(self)
    initializer (
      Unixqueue.add_handler esys g self#handler
    )

    (* Dispatch an input/output event to the callbacks registered for its
       descriptor, in registration order.  Other events are rejected with
       [Equeue.Reject]. *)
    method private handler _ _ ev =
      let l =
        match ev with
        | Unixqueue.Input_arrived(_,fd) ->
            ( try Hashtbl.find rd_ht fd with Not_found -> [] )
        | Unixqueue.Output_readiness(_,fd) ->
            ( try Hashtbl.find wr_ht fd with Not_found -> [] )
        | _ ->
            raise Equeue.Reject in
      (* The tables store newest-first; reverse to get registration order. *)
      let l1 = List.rev l in
      match l1 with
      | [] -> ()
      | [f] -> f()
      | _ ->
          (* Run from the event queue, so exceptions are separately handled *)
          List.iter
            (fun f -> Unixqueue.once esys g 0.0 f)
            l1

    (* Call [f] whenever [fd] is reported readable.  Forcing the returned lazy
       value unregisters [f] and stops watching [fd] for input when no other
       reader is left. *)
    method private register_readable fd f =
      (* eprintf "+reg_rd fd=%Ld\n%!" (Netsys.int64_of_file_descr fd); *)
      let op = Unixqueue.Wait_in fd in
      add rd_ht fd f;
      add_res rd_ht op;
      lazy (
        (* eprintf "-reg_rd fd=%Ld\n%!" (Netsys.int64_of_file_descr fd); *)
        remove rd_ht fd f; remove_res rd_ht op)

    (* Call [f] whenever [fd] is reported writable.  Forcing the returned lazy
       value unregisters [f] and stops watching [fd] for output when no other
       writer is left. *)
    method private register_writable fd f =
      (* eprintf "+reg_wr fd=%Ld\n%!" (Netsys.int64_of_file_descr fd);*)
      let op = Unixqueue.Wait_out fd in
      add wr_ht fd f;
      add_res wr_ht op;
      lazy (
        (* eprintf "-reg_wr fd=%Ld\n%!" (Netsys.int64_of_file_descr fd);*)
        remove wr_ht fd f; remove_res wr_ht op)

    (* Schedule [f] via [Unixqueue.once] with delay [tmo], in a group of its
       own.  With [repeat_flag] the timer re-arms itself before each call of
       [f]; otherwise it is dropped from [timers] before [f] runs.  Forcing
       the returned lazy value clears the timer group and forgets the timer. *)
    method private register_timer tmo repeat_flag f =
      let tg = Unixqueue.new_group esys in
      let rec loop() =
        Unixqueue.once esys tg tmo
          (fun () ->
             (* Bookkeeping first, then the callback: [f] runs in both
                cases. *)
             ( if repeat_flag then
                 loop()
               else
                 Hashtbl.remove timers tg
             );
             f()
          ) in
      loop();
      Hashtbl.add timers tg ();
      lazy (Unixqueue.clear esys tg; Hashtbl.remove timers tg)

    (* Clear the main group and every armed timer group, then forget all
       registered callbacks and timers. *)
    method private cleanup =
      Unixqueue.clear esys g;
      Hashtbl.iter (fun tg _ -> Unixqueue.clear esys tg) timers;
      Hashtbl.clear rd_ht;
      Hashtbl.clear wr_ht;
      Hashtbl.clear timers

    (* Run [esys] until its [when_blocking] hook has fired once
       ([block = false]) or twice ([block = true]); the hook then raises
       [Esys_exit] to leave [Unixqueue.run].  Presumably the hook fires each
       time the event system is about to block, so [block = true] permits
       one blocking wait - confirm against the Unixqueue documentation.

       The hook is reset to a no-op on every exit path (normal return of
       [Unixqueue.run], [Esys_exit], or any other exception), so that a later
       run of [esys] outside of [iter] cannot have [Esys_exit] raised into
       it.  Other exceptions are re-raised to the caller. *)
    method iter block =
      let n = if block then 2 else 1 in
      let k = ref 0 in
      let uninstall () = esys # when_blocking (fun () -> ()) in
      esys # when_blocking
        (fun () ->
           incr k;
           if !k >= n then (
             uninstall();
             raise Esys_exit
           )
        );
      try
        Unixqueue.run esys;
        (* The queue ran dry before the hook fired often enough. *)
        uninstall()
      with
      | Esys_exit -> ()
      | error -> uninstall(); raise error
  end