(* $Id: pfs_condition.ml 435 2011-10-07 14:32:54Z gerd $ *)
(* Literature:
"Implementing Condition Variables with Semaphores", by Andrew D. Birrell,
Microsoft Research Silicon Valley, January 2003
http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.125.3384&rep=rep1&type=pdf
*)
open Uq_engines.Operators
open Printf
let dlogr = Plasma_util.dlogr
let dlog = Plasma_util.dlog
type condition =
{ dummy_cond : bool;
mutable waiters : wait_entry;
mutable null : wait_entry; (* means: "no more entry" *)
mutable lock : Netmcore_sem.semaphore;
}
and wait_entry =
{ mutable empty : bool;
(* if empty, this entry is the last in the set, and it is considered
as representing no waiter
*)
mutable sem : Netmcore_sem.semaphore;
mutable pipe : string;
mutable next : wait_entry; (* may be cond.null *)
mutable set_next : wait_entry; (* not meaningful if [empty] *)
}
and wait_set =
{ dummy_set : bool;
mutable alloc_lock : Netmcore_sem.semaphore;
mutable head : wait_entry;
prefix : string
}
let empty_wait_entry() =
let rec we =
{ empty = true;
sem = Netmcore_sem.dummy();
next = we;
set_next = we;
pipe = "";
} in
we
let dummy_condition () =
{ dummy_cond = true;
waiters = empty_wait_entry ();
null = empty_wait_entry ();
lock = Netmcore_sem.dummy()
}
let dummy_wait_set() =
{ dummy_set = true;
alloc_lock = Netmcore_sem.dummy();
head = empty_wait_entry();
prefix = ""
}
let create_condition mut =
let null = empty_wait_entry () in
let cond_orig =
{ dummy_cond = false;
waiters = null;
null = null;
lock = Netmcore_sem.dummy()
} in
let cond = Netmcore_heap.add mut cond_orig in
Netmcore_heap.pin mut cond;
cond.lock <- Netmcore_sem.create mut 1;
cond
let destroy_condition c =
if not c.dummy_cond then (
Netmcore_sem.destroy c.lock
)
let create_wait_set mut prefix =
let wset_orig =
{ dummy_set = false;
alloc_lock = Netmcore_sem.dummy();
head = empty_wait_entry();
prefix = prefix;
} in
let wset = Netmcore_heap.add mut wset_orig in
Netmcore_heap.pin mut wset;
wset.alloc_lock <- Netmcore_sem.create mut 1;
wset
let destroy_wait_set wset =
if not wset.dummy_set then (
let we = ref wset.head in
Netmcore_sem.destroy !we.sem;
while not !we.empty do
we := !we.set_next;
Netmcore_sem.destroy !we.sem;
done;
Netmcore_sem.destroy wset.alloc_lock
)
let with_alloc_lock wset f =
Netmcore_sem.wait wset.alloc_lock Netsys_posix.SEM_WAIT_BLOCK;
try
let r = f () in
Netmcore_sem.post wset.alloc_lock;
r
with error ->
Netmcore_sem.post wset.alloc_lock;
raise error
let create_fifo prefix =
let p = Netsys_tmp.tmp_prefix prefix in
let rec loop n =
try
let n = p ^ "_" ^ string_of_int n in
Unix.mkfifo n 0o600;
n
with
| Unix.Unix_error(Unix.EEXIST,_,_) ->
loop(n+1) in
loop 0
let alloc_wait_entry mut wset =
if wset.dummy_set then
failwith "Netmcore_condition.alloc_wait_entry: dummy wait_set";
with_alloc_lock wset
(fun () ->
(* not really fast *)
let we = ref wset.head in
while not !we.empty do
we := !we.set_next
done;
let tail_orig = empty_wait_entry () in
let tail = Netmcore_heap.add mut tail_orig in
!we.set_next <- tail;
!we.empty <- false;
!we.sem <- Netmcore_sem.create mut 0;
!we.pipe <- Netmcore_heap.add mut (create_fifo wset.prefix);
!we
)
let free_wait_entry mut wset we_to_free =
if wset.dummy_set then
failwith "Netmcore_condition.free_wait_entry: dummy wait_set";
with_alloc_lock wset
(fun () ->
(* not really fast *)
let we = ref wset.head in
let prev = ref None in
while not !we.empty && !we != we_to_free do
prev := Some !we;
we := !we.set_next
done;
if !we.empty then
failwith "Netmcore_condition.free_wait_entry: not found";
( match !prev with
| None ->
wset.head <- !we.set_next
| Some p ->
p.set_next <- !we.set_next
);
!we.set_next <- !we;
!we.next <- !we;
( try Unix.unlink !we.pipe with _ -> () );
)
let pipe_file we =
we.pipe
let wait_e we c m esys =
dlog "Pfs_condition.wait_e";
if c.dummy_cond then
failwith "Netmcore_condition.wait_e: dummy condition";
if we.empty then
failwith "Netmcore_condition.wait_e: this is the reserved guard wait_entry";
if we.next != we then
failwith "Netmcore_condition.wait_e: the wait entry is being used";
let fd = Unix.openfile we.pipe [Unix.O_RDONLY; Unix.O_NONBLOCK] 0 in
let fd_open = ref true in
let close() =
if !fd_open then Unix.close fd;
fd_open := false in
Netmcore_sem.wait c.lock Netsys_posix.SEM_WAIT_BLOCK;
let old_waiters = c.waiters in
c.waiters <- we;
we.next <- old_waiters;
Netmcore_sem.post c.lock;
dlog "Pfs_condition.wait_e: unlocking mutex";
Netmcore_mutex.unlock m;
dlog "Pfs_condition.wait_e: waiting for pipe signal";
let e =
new Uq_engines.poll_engine [ Unixqueue.Wait_in fd, (-1.0) ] esys in
e >>
(fun st ->
match st with
| `Done ev ->
dlog "Pfs_condition.wait_e: got pipe signal";
let s = String.make 1 ' ' in
ignore(Unix.read fd s 0 1);
close();
Netmcore_sem.wait we.sem Netsys_posix.SEM_WAIT_BLOCK;
dlog "Pfs_condition.wait_e: locking mutex";
Netmcore_mutex.lock m;
`Done ()
| (`Error _ | `Aborted) as st ->
close(); st
)
let post c =
dlog "Pfs_condition.post";
let we = c.waiters in
assert(not we.empty);
c.waiters <- we.next;
we.next <- we;
Netmcore_sem.post we.sem;
let fd = Unix.openfile we.pipe [Unix.O_WRONLY; Unix.O_NONBLOCK] 0 in
try
let s = "X" in
ignore(Unix.single_write fd s 0 1);
Unix.close fd
with error ->
Unix.close fd; raise error
let signal c =
if c.dummy_cond then
failwith "Netmcore_condition.signal: dummy condition";
Netmcore_sem.wait c.lock Netsys_posix.SEM_WAIT_BLOCK;
try
if c.waiters != c.null then
post c;
Netmcore_sem.post c.lock
with
| error ->
Netmcore_sem.post c.lock;
raise error
let broadcast c =
if c.dummy_cond then
failwith "Netmcore_condition.broadcast: dummy condition";
Netmcore_sem.wait c.lock Netsys_posix.SEM_WAIT_BLOCK;
try
while c.waiters != c.null do
post c
done;
Netmcore_sem.post c.lock
with
| error ->
Netmcore_sem.post c.lock;
raise error
type ser =
{ mutable ser_m : Netmcore_mutex.mutex;
mutable ser_c : condition;
mutable ser_busy : bool;
mutable ser_ws : wait_set;
}
let serializer_heap pool =
let s0 =
{ ser_m = Netmcore_mutex.dummy();
ser_c = dummy_condition();
ser_busy = false;
ser_ws = dummy_wait_set();
} in
let sref = Netmcore_ref.sref pool s0 in
Netmcore_heap.modify
(Netmcore_ref.heap sref)
(fun mut ->
let s = Netmcore_ref.deref_ro sref in
s.ser_m <- Netmcore_mutex.create mut `Errorcheck;
s.ser_c <- create_condition mut;
s.ser_ws <- create_wait_set mut "/tmp/plasmacond-"; (* FIXME *)
);
sref
let serializer sref esys name : _ Uq_engines.serializer_t =
let local_serializer =
Uq_engines.serializer esys in
let we_unused = Queue.create () in
( object(self)
method private proc_serialized f =
let rec wait_loop_e we () =
let busy = Netmcore_ref.deref_p sref (fun s -> s.ser_busy) in
if busy then (
dlogr
(fun () ->
sprintf
"Pfs_condition.serializer %s: is busy, need to wait" name);
let e =
Netmcore_ref.deref_p sref
(fun s -> wait_e we s.ser_c s.ser_m esys) in
e ++ wait_loop_e we
)
else (
dlogr
(fun () ->
sprintf
"Pfs_condition.serializer %s: is idle, no waiting" name);
eps_e (`Done()) esys
) in
let we =
try Queue.take we_unused
with Queue.Empty ->
Netmcore_heap.modify
(Netmcore_ref.heap sref)
(fun mut ->
let s = Netmcore_ref.deref_ro sref in
let we = alloc_wait_entry mut s.ser_ws in
Pfs_pmanage.register_file (String.copy we.pipe);
we
) in
let dealloc _ =
(*
Netmcore_heap.modify
(Netmcore_ref.heap sref)
(fun mut ->
let s = Netmcore_ref.deref_ro sref in
free_wait_entry mut s.ser_ws we
) in
*)
Queue.add we we_unused in
dlogr
(fun () -> sprintf "Pfs_condition.serializer %s: locking" name);
Netmcore_ref.deref_p sref
(fun s -> Netmcore_mutex.lock s.ser_m);
dlogr
(fun () ->
sprintf
"Pfs_condition.serializer %s: checking for potential waiters"
name);
let e =
wait_loop_e we ()
++ (fun () ->
dlogr
(fun () ->
sprintf
"Pfs_condition.serializer %s: busy / unlocking" name);
Netmcore_ref.deref_p sref
(fun s ->
s.ser_busy <- true;
Netmcore_mutex.unlock s.ser_m;
);
let post_action _ =
Netmcore_ref.deref_p sref
(fun s ->
dlogr
(fun () ->
sprintf
"Pfs_condition.serializer %s: getting idle" name);
Netmcore_mutex.lock s.ser_m;
s.ser_busy <- false;
signal s.ser_c;
Netmcore_mutex.unlock s.ser_m;
dlogr
(fun () ->
sprintf
"Pfs_condition.serializer %s: idle" name);
) in
let r = f esys in
Uq_engines.when_state
~is_done:post_action
~is_error:post_action
~is_aborted:post_action
r;
r
) in
Uq_engines.when_state
~is_done:dealloc
~is_error:dealloc
~is_aborted:dealloc
e;
e
method serialized f =
(* First select one of the event threads in the local process,
and then compete against the other processes. Otherwise we
will deadlock on s.ser_m.
*)
local_serializer # serialized
(fun _ ->
self # proc_serialized f
)
end
)
class type ['a] int64_serializer_t =
object
method serialized : int64 -> (Unixqueue.event_system -> 'a Uq_engines.engine) -> 'a Uq_engines.engine
method multi_serialized : int64 list -> (Unixqueue.event_system -> 'a Uq_engines.engine) ->
'a Uq_engines.engine
end
module Int64Set =
Set.Make(struct
type t = int64
let compare = compare
end)
type int64_ser_element =
{ mutable iser_m : Netmcore_mutex.mutex;
mutable iser_c : condition;
mutable iser_set : Int64Set.t; (* The set of [i] values currently locked *)
}
type int64_ser =
{ iser : int64_ser_element array;
mutable iser_ws : wait_set;
}
let int64_serializer_heap pool n =
let s0 =
{ iser =
Array.init n
(fun k ->
{ iser_m = Netmcore_mutex.dummy();
iser_c = dummy_condition();
iser_set = Int64Set.empty;
}
);
iser_ws = dummy_wait_set();
} in
let sref = Netmcore_ref.sref pool s0 in
(* dlogf "init root=%nx" (Netsys_mem.obj_address (Netmcore_heap.root (Netmcore_ref.heap sref))); *)
Netmcore_heap.modify
(Netmcore_ref.heap sref)
(fun mut ->
let s = Netmcore_ref.deref_ro sref in
Array.iter
(fun sk ->
sk.iser_m <- Netmcore_mutex.create mut `Errorcheck;
sk.iser_c <- create_condition mut;
)
s.iser;
s.iser_ws <- create_wait_set mut "/tmp/plasmacond-"; (* FIXME *)
);
sref
let int64_serializer sref esys name : _ int64_serializer_t =
(* For this implementation it is essential that the same process never
locks several times (resulting in a deadlock). When we pass over control
with ">>" and "++" it is ensured that no other execution thread can
run in parallel (where eps_e would not guarantee this).
*)
let n =
Netmcore_ref.deref_p sref (fun s -> Array.length s.iser) in
let we_unused = Queue.create () in
( object(self)
method private proc_serialized i k f =
let add_i() =
dlogr
(fun () ->
sprintf
"Pfs_condition.int_serializer %s: busy / unlocking" name);
Netmcore_heap.modify
(Netmcore_ref.heap sref)
(fun mut ->
let s = Netmcore_ref.deref_ro sref in
let iser = s.iser.(k) in
(* Netmcore_heap.pin mut iser; *) (* careful *)
let set' = Int64Set.add i iser.iser_set in
let set'' = Netmcore_heap.add mut set' in
iser.iser_set <- set'';
Netmcore_mutex.unlock iser.iser_m;
) in
let del_notify_i() =
let s = Netmcore_ref.deref_ro sref in
dlogr
(fun () ->
sprintf "Pfs_condition.int_serializer %s: getting idle" name);
Netmcore_mutex.lock s.iser.(k).iser_m;
Netmcore_heap.modify
(Netmcore_ref.heap sref)
(fun mut ->
let iser = s.iser.(k) in
Netmcore_heap.pin mut iser; (* careful *)
iser.iser_set <-
Netmcore_heap.add mut
(Int64Set.remove i iser.iser_set);
(* We do not know exactly who is waiting, and need to wake
up all waiters. Btw this broadcast is the reason for the
array.
*)
broadcast iser.iser_c;
Netmcore_mutex.unlock iser.iser_m;
);
dlogr
(fun () -> sprintf "Pfs_condition.int_serializer %s: idle" name) in
let rec wait_loop_e we () =
let busy =
(* we need [modify] because a new iser_set may be being built right
now
*)
Netmcore_heap.modify
(Netmcore_ref.heap sref)
(fun mut ->
let s = Netmcore_ref.deref_ro sref in
Int64Set.mem i s.iser.(k).iser_set
) in
if busy then (
dlogr
(fun () ->
sprintf
"Pfs_condition.int_serializer %s: is busy, need to wait"
name);
let s = Netmcore_ref.deref_ro sref in
let e = wait_e we s.iser.(k).iser_c s.iser.(k).iser_m esys in
e ++ wait_loop_e we
)
else (
dlogr
(fun () ->
sprintf
"Pfs_condition.int_serializer %s: is idle, no waiting" name);
add_i();
eps_e (`Done()) esys
) in
let we =
try Queue.take we_unused
with Queue.Empty ->
Netmcore_heap.modify
(Netmcore_ref.heap sref)
(fun mut ->
let s = Netmcore_ref.deref_ro sref in
let we = alloc_wait_entry mut s.iser_ws in
Pfs_pmanage.register_file (String.copy we.pipe);
we
) in
let dealloc _ =
(*
Netmcore_heap.modify
(Netmcore_ref.heap sref)
(fun mut ->
let s = Netmcore_ref.deref_ro sref in
free_wait_entry mut s.ser_ws we
) in
*)
Queue.add we we_unused in
dlogr
(fun () ->
sprintf "Pfs_condition.int_serializer %s: locking" name);
Netmcore_ref.deref_p sref
(fun s -> Netmcore_mutex.lock s.iser.(k).iser_m);
dlogr
(fun () ->
sprintf
"Pfs_condition.int_serializer %s: checking for potential waiters"
name);
let e =
wait_loop_e we ()
++ (fun () ->
let post_action _ = del_notify_i() in
let r = f esys in
Uq_engines.when_state
~is_done:post_action
~is_error:post_action
~is_aborted:post_action
r;
r
) in
Uq_engines.when_state
~is_done:dealloc
~is_error:dealloc
~is_aborted:dealloc
e;
e
method serialized i f =
let k = Int64.to_int(Int64.rem i (Int64.of_int n)) in
self # proc_serialized i k f
method multi_serialized l f =
let rec loop l =
match l with
| [] -> f esys
| i :: l' -> self # serialized i (fun _ -> loop l') in
(* Sorting the list prevents deadlocks *)
let l_sorted = List.sort compare l in
loop l_sorted
end
)