(* $Id: pfs_condition.ml 435 2011-10-07 14:32:54Z gerd $ *)

(* Literature:
   "Implementing Condition Variables with Semaphores", by Andrew D. Birrell,
   Microsoft Research Silicon Valley, January 2003

   http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.125.3384&rep=rep1&type=pdf
 *)

open Uq_engines.Operators
open Printf

let dlogr = Plasma_util.dlogr
let dlog = Plasma_util.dlog

(** A condition variable. [waiters] is the head of a singly linked list
    (chained through [wait_entry.next]) that is terminated by [null].
    [lock] is the semaphore that guards the list.  A value with
    [dummy_cond = true] is only a placeholder and is rejected by
    [wait_e], [signal] and [broadcast]. *)
type condition =
    { dummy_cond : bool;
      mutable waiters : wait_entry;
      mutable null : wait_entry;   (* means: "no more entry" *)
      mutable lock : Netmcore_sem.semaphore;
    }

(** One slot a waiter can block on.  [pipe] is the path of a FIFO that
    is used to wake up the waiter through the event system; [sem] is
    posted by the waker and consumed by the waiter.  An entry that is
    not enqueued in any condition has [next] pointing to itself. *)
and wait_entry =
    { mutable empty : bool;
      (* if empty, this entry is the last in the set, and it is
         considered as representing no waiter
       *)
      mutable sem : Netmcore_sem.semaphore;
      mutable pipe : string;
      mutable next : wait_entry;      (* may be cond.null *)
      mutable set_next : wait_entry;  (* not meaningful if [empty] *)
    }

(** The pool of allocated wait entries, chained through [set_next] and
    terminated by an entry with [empty = true].  [alloc_lock] guards the
    chain; [prefix] is passed to [create_fifo] for every new entry. *)
and wait_set =
    { dummy_set : bool;
      mutable alloc_lock : Netmcore_sem.semaphore;
      mutable head : wait_entry;
      prefix : string
    }


(** Returns a fresh entry with [empty = true], a dummy semaphore, no
    pipe, and both links pointing to the entry itself. *)
let empty_wait_entry() =
  let rec we =
    { empty = true;
      sem = Netmcore_sem.dummy();
      next = we;
      set_next = we;
      pipe = "";
    } in
  we


(** Placeholder condition ([dummy_cond = true]) for initializing record
    fields before the real condition exists. *)
let dummy_condition () =
  { dummy_cond = true;
    waiters = empty_wait_entry ();
    null = empty_wait_entry ();
    lock = Netmcore_sem.dummy()
  }


(** Placeholder wait set ([dummy_set = true]). *)
let dummy_wait_set() =
  { dummy_set = true;
    alloc_lock = Netmcore_sem.dummy();
    head = empty_wait_entry();
    prefix = ""
  }


(** [create_condition mut] adds a new condition to the heap behind
    [mut], pins it, and then creates its lock semaphore with the
    initial value 1.  The waiter list starts out as [null]. *)
let create_condition mut =
  let null = empty_wait_entry () in
  let cond_orig =
    { dummy_cond = false;
      waiters = null;
      null = null;
      lock = Netmcore_sem.dummy()
    } in
  let cond = Netmcore_heap.add mut cond_orig in
  Netmcore_heap.pin mut cond;
  (* The semaphore is created only after the record has been added to
     the heap, i.e. it is stored into the heap copy. *)
  cond.lock <- Netmcore_sem.create mut 1;
  cond


(** Destroys the lock semaphore of [c].  Does nothing for a dummy
    condition. *)
let destroy_condition c =
  if not c.dummy_cond then (
    Netmcore_sem.destroy c.lock
  )


(** [create_wait_set mut prefix] adds a new, empty wait set to the heap
    behind [mut], pins it, and creates its allocation lock with the
    initial value 1.  [prefix] is the name prefix for the FIFOs of the
    entries allocated later. *)
let create_wait_set mut prefix =
  let wset_orig =
    { dummy_set = false;
      alloc_lock = Netmcore_sem.dummy();
      head = empty_wait_entry();
      prefix = prefix;
    } in
  let wset = Netmcore_heap.add mut wset_orig in
  Netmcore_heap.pin mut wset;
  wset.alloc_lock <- Netmcore_sem.create mut 1;
  wset


(** Destroys the semaphores of all entries reachable from [wset.head]
    via [set_next] (including the terminating empty entry), and finally
    the allocation lock.  Does nothing for a dummy wait set. *)
let destroy_wait_set wset =
  if not wset.dummy_set then (
    let we = ref wset.head in
    Netmcore_sem.destroy !we.sem;
    while not !we.empty do
      we := !we.set_next;
      Netmcore_sem.destroy !we.sem;
    done;
    Netmcore_sem.destroy wset.alloc_lock
  )


(** [with_sem_lock sem f] waits (blocking) on [sem], runs [f ()], and
    posts [sem] exactly once afterwards, whether [f] returns or raises.
    Exceptions of [f] are re-raised after the post. *)
let with_sem_lock sem f =
  Netmcore_sem.wait sem Netsys_posix.SEM_WAIT_BLOCK;
  let r =
    try f ()
    with error ->
      Netmcore_sem.post sem;
      raise error in
  (* The normal-path post is outside the [try], so a failing post is
     not answered by a second post. *)
  Netmcore_sem.post sem;
  r


(** Runs [f ()] while holding the allocation lock of [wset]. *)
let with_alloc_lock wset f =
  with_sem_lock wset.alloc_lock f


(** [create_fifo prefix] creates a FIFO with mode 0o600.  The name is
    [Netsys_tmp.tmp_prefix prefix] followed by an underscore and the
    first counter value (starting at 0) for which [Unix.mkfifo] does not
    fail with [EEXIST].  Returns the path.
    @raise Unix.Unix_error for any other [mkfifo] failure *)
let create_fifo prefix =
  let p = Netsys_tmp.tmp_prefix prefix in
  let rec loop n =
    try
      let path = p ^ "_" ^ string_of_int n in
      Unix.mkfifo path 0o600;
      path
    with
      | Unix.Unix_error(Unix.EEXIST,_,_) -> loop(n+1) in
  loop 0


(** [alloc_wait_entry mut wset] turns the terminating empty entry of
    [wset] into a usable one (new semaphore with value 0, new FIFO) and
    appends a fresh empty terminator.  Returns the activated entry.
    Runs under the allocation lock.
    @raise Failure if [wset] is a dummy wait set *)
let alloc_wait_entry mut wset =
  if wset.dummy_set then
    failwith "Netmcore_condition.alloc_wait_entry: dummy wait_set";
  with_alloc_lock wset
    (fun () ->
       (* not really fast *)
       let we = ref wset.head in
       while not !we.empty do
         we := !we.set_next
       done;
       let tail_orig = empty_wait_entry () in
       let tail = Netmcore_heap.add mut tail_orig in
       !we.set_next <- tail;
       !we.empty <- false;
       !we.sem <- Netmcore_sem.create mut 0;
       !we.pipe <- Netmcore_heap.add mut (create_fifo wset.prefix);
       !we
    )


(** [free_wait_entry mut wset we_to_free] unlinks [we_to_free] from the
    chain of [wset], resets both of its links to itself, and removes its
    FIFO from the file system (best effort).  Runs under the allocation
    lock.  [mut] is not used.
    @raise Failure if [wset] is a dummy wait set, or if the entry is not
    found in the chain *)
let free_wait_entry mut wset we_to_free =
  if wset.dummy_set then
    failwith "Netmcore_condition.free_wait_entry: dummy wait_set";
  with_alloc_lock wset
    (fun () ->
       (* not really fast *)
       let we = ref wset.head in
       let prev = ref None in
       while not !we.empty && !we != we_to_free do
         prev := Some !we;
         we := !we.set_next
       done;
       if !we.empty then
         failwith "Netmcore_condition.free_wait_entry: not found";
       ( match !prev with
           | None ->
               wset.head <- !we.set_next
           | Some p ->
               p.set_next <- !we.set_next
       );
       !we.set_next <- !we;
       !we.next <- !we;
       (* NOTE(review): the semaphore of the entry is not destroyed
          here, and [destroy_wait_set] cannot reach the entry anymore
          after unlinking - confirm whether this is intended. *)
       (* Removing the FIFO is best effort: only system call failures
          are ignored. *)
       ( try Unix.unlink !we.pipe
         with Unix.Unix_error(_,_,_) -> ()
       );
    )


(** Returns the path of the FIFO of the wait entry. *)
let pipe_file we = we.pipe


(** [wait_e we c m esys] enqueues [we] at the front of the waiter list
    of [c], unlocks the mutex [m], and returns an engine that finishes
    when the FIFO of [we] becomes readable.  On [`Done] the engine reads
    one byte from the FIFO, consumes the semaphore of [we], and locks
    [m] again before it reports [`Done ()].  On [`Error] or [`Aborted]
    only the descriptor is closed; [m] is not locked again.

    NOTE(review): on [`Error]/[`Aborted] the entry is not removed from
    [c.waiters], so [we.next] keeps pointing into the list - confirm
    that callers never reuse such an entry.

    @raise Failure if [c] is a dummy condition, if [we] is the empty
    guard entry, or if [we] is already enqueued ([we.next != we]) *)
let wait_e we c m esys =
  dlog "Pfs_condition.wait_e";
  if c.dummy_cond then
    failwith "Netmcore_condition.wait_e: dummy condition";
  if we.empty then
    failwith "Netmcore_condition.wait_e: this is the reserved guard wait_entry";
  if we.next != we then
    failwith "Netmcore_condition.wait_e: the wait entry is being used";
  (* The read side is opened before the entry is registered. *)
  let fd = Unix.openfile we.pipe [Unix.O_RDONLY; Unix.O_NONBLOCK] 0 in
  let fd_open = ref true in
  let close() =
    (* The flag is cleared first so that the descriptor is never closed
       twice, even when [Unix.close] raises. *)
    if !fd_open then (
      fd_open := false;
      Unix.close fd
    ) in
  let e =
    try
      with_sem_lock c.lock
        (fun () ->
           let old_waiters = c.waiters in
           c.waiters <- we;
           we.next <- old_waiters
        );
      dlog "Pfs_condition.wait_e: unlocking mutex";
      Netmcore_mutex.unlock m;
      dlog "Pfs_condition.wait_e: waiting for pipe signal";
      new Uq_engines.poll_engine [ Unixqueue.Wait_in fd, (-1.0) ] esys
    with error ->
      (* Do not leak the descriptor when the setup fails. *)
      close();
      raise error in
  e >> (fun st ->
          match st with
            | `Done _ ->
                dlog "Pfs_condition.wait_e: got pipe signal";
                let s = String.make 1 ' ' in
                ignore(Unix.read fd s 0 1);
                close();
                Netmcore_sem.wait we.sem Netsys_posix.SEM_WAIT_BLOCK;
                dlog "Pfs_condition.wait_e: locking mutex";
                Netmcore_mutex.lock m;
                `Done ()
            | (`Error _ | `Aborted) as st ->
                close();
                st
       )


(** [post c] wakes up the first waiter of [c]: the entry is dequeued,
    its [next] link is reset to itself, its semaphore is posted, and one
    byte is written to its FIFO.  The caller must hold [c.lock] and must
    have checked that the waiter list is not empty.  The descriptor is
    closed exactly once. *)
let post c =
  dlog "Pfs_condition.post";
  let we = c.waiters in
  assert(not we.empty);
  c.waiters <- we.next;
  we.next <- we;
  Netmcore_sem.post we.sem;
  let fd = Unix.openfile we.pipe [Unix.O_WRONLY; Unix.O_NONBLOCK] 0 in
  ( try
      let s = "X" in
      ignore(Unix.single_write fd s 0 1)
    with error ->
      Unix.close fd;
      raise error
  );
  Unix.close fd


(** Wakes up one waiter of [c], if there is any.  Runs under [c.lock].
    @raise Failure if [c] is a dummy condition *)
let signal c =
  if c.dummy_cond then
    failwith "Netmcore_condition.signal: dummy condition";
  with_sem_lock c.lock
    (fun () ->
       if c.waiters != c.null then
         post c
    )


(** Wakes up all waiters of [c].  Runs under [c.lock].
    @raise Failure if [c] is a dummy condition *)
let broadcast c =
  if c.dummy_cond then
    failwith "Netmcore_condition.broadcast: dummy condition";
  with_sem_lock c.lock
    (fun () ->
       while c.waiters != c.null do
         post c
       done
    )


(** State of the serializer: [ser_m] protects [ser_busy]; [ser_c] is
    signalled when [ser_busy] becomes [false]; [ser_ws] provides the
    wait entries. *)
type ser =
    { mutable ser_m : Netmcore_mutex.mutex;
      mutable ser_c : condition;
      mutable ser_busy : bool;
      mutable ser_ws : wait_set;
    }


(** [serializer_heap pool] creates the serializer state as a
    [Netmcore_ref] value in [pool], and initializes the mutex (of kind
    [`Errorcheck]), the condition and the wait set inside that heap. *)
let serializer_heap pool =
  let s0 =
    { ser_m = Netmcore_mutex.dummy();
      ser_c = dummy_condition();
      ser_busy = false;
      ser_ws = dummy_wait_set();
    } in
  let sref = Netmcore_ref.sref pool s0 in
  Netmcore_heap.modify
    (Netmcore_ref.heap sref)
    (fun mut ->
       let s = Netmcore_ref.deref_ro sref in
       s.ser_m <- Netmcore_mutex.create mut `Errorcheck;
       s.ser_c <- create_condition mut;
       s.ser_ws <- create_wait_set mut "/tmp/plasmacond-";  (* FIXME *)
    );
  sref


(** [serializer sref esys name] returns a serializer object on top of
    the state [sref].  [name] only appears in log messages.  Wait
    entries are allocated on demand and recycled through a queue that is
    local to the returned object. *)
let serializer sref esys name : _ Uq_engines.serializer_t =
  let local_serializer = Uq_engines.serializer esys in
  let we_unused = Queue.create () in
  ( object(self)
      (** Acquires the busy flag (waiting on the condition as long as
          it is set), runs [f esys], and clears the flag again when the
          engine returned by [f] reaches a final state. *)
      method private proc_serialized f =
        (* Called with [ser_m] locked; returns with [ser_m] locked. *)
        let rec wait_loop_e we () =
          let busy =
            Netmcore_ref.deref_p sref (fun s -> s.ser_busy) in
          if busy then (
            dlogr
              (fun () ->
                 sprintf
                   "Pfs_condition.serializer %s: is busy, need to wait"
                   name);
            let e =
              Netmcore_ref.deref_p sref
                (fun s -> wait_e we s.ser_c s.ser_m esys) in
            e ++ wait_loop_e we
          )
          else (
            dlogr
              (fun () ->
                 sprintf
                   "Pfs_condition.serializer %s: is idle, no waiting"
                   name);
            eps_e (`Done()) esys
          ) in
        (* Reuse a recycled wait entry if possible; otherwise allocate
           one and register its FIFO with [Pfs_pmanage]. *)
        let we =
          try Queue.take we_unused
          with Queue.Empty ->
            Netmcore_heap.modify
              (Netmcore_ref.heap sref)
              (fun mut ->
                 let s = Netmcore_ref.deref_ro sref in
                 let we = alloc_wait_entry mut s.ser_ws in
                 Pfs_pmanage.register_file (String.copy we.pipe);
                 we
              ) in
        let dealloc _ =
          (*
          Netmcore_heap.modify
            (Netmcore_ref.heap sref)
            (fun mut ->
               let s = Netmcore_ref.deref_ro sref in
               free_wait_entry mut s.ser_ws we
            ) in
           *)
          Queue.add we we_unused in
        dlogr
          (fun () ->
             sprintf "Pfs_condition.serializer %s: locking" name);
        Netmcore_ref.deref_p sref (fun s -> Netmcore_mutex.lock s.ser_m);
        dlogr
          (fun () ->
             sprintf
               "Pfs_condition.serializer %s: checking for potential waiters"
               name);
        let e =
          wait_loop_e we ()
          ++ (fun () ->
                dlogr
                  (fun () ->
                     sprintf
                       "Pfs_condition.serializer %s: busy / unlocking"
                       name);
                Netmcore_ref.deref_p sref
                  (fun s ->
                     s.ser_busy <- true;
                     Netmcore_mutex.unlock s.ser_m;
                  );
                let post_action _ =
                  Netmcore_ref.deref_p sref
                    (fun s ->
                       dlogr
                         (fun () ->
                            sprintf
                              "Pfs_condition.serializer %s: getting idle"
                              name);
                       Netmcore_mutex.lock s.ser_m;
                       s.ser_busy <- false;
                       signal s.ser_c;
                       Netmcore_mutex.unlock s.ser_m;
                       dlogr
                         (fun () ->
                            sprintf
                              "Pfs_condition.serializer %s: idle"
                              name);
                    ) in
                (* If [f] raises before it returns an engine, the busy
                   flag must still be cleared; otherwise all later
                   users would wait forever. *)
                let r =
                  try f esys
                  with error ->
                    post_action ();
                    raise error in
                Uq_engines.when_state
                  ~is_done:post_action
                  ~is_error:post_action
                  ~is_aborted:post_action
                  r;
                r
             ) in
        Uq_engines.when_state
          ~is_done:dealloc
          ~is_error:dealloc
          ~is_aborted:dealloc
          e;
        e

      method serialized f =
        (* First select one of the event threads in the local process,
           and then compete against the other processes. Otherwise we
           will deadlock on s.ser_m.
         *)
        local_serializer # serialized
          (fun _ -> self # proc_serialized f)
    end
  )


(** A serializer keyed by [int64] ids: [serialized i f] runs [f] while
    holding the id [i]; [multi_serialized l f] runs [f] while holding
    all ids of [l]. *)
class type ['a] int64_serializer_t =
object
  method serialized :
           int64 ->
           (Unixqueue.event_system -> 'a Uq_engines.engine) ->
             'a Uq_engines.engine
  method multi_serialized :
           int64 list ->
           (Unixqueue.event_system -> 'a Uq_engines.engine) ->
             'a Uq_engines.engine
end


module Int64Set =
  Set.Make(struct
             type t = int64
             (* Type-specific comparison; same order as the polymorphic
                [compare] on [int64]. *)
             let compare = Int64.compare
           end)


(** One bucket of the int64 serializer. *)
type int64_ser_element =
    { mutable iser_m : Netmcore_mutex.mutex;
      mutable iser_c : condition;
      mutable iser_set : Int64Set.t;
      (* The set of [i] values currently locked *)
    }

(** State of the int64 serializer: an array of buckets plus the wait
    set the wait entries are taken from. *)
type int64_ser =
    { iser : int64_ser_element array;
      mutable iser_ws : wait_set;
    }


(** [int64_serializer_heap pool n] creates the state with [n] buckets as
    a [Netmcore_ref] value in [pool], and initializes the mutexes (of
    kind [`Errorcheck]), the conditions and the wait set inside that
    heap.  [n] is later used as a divisor for selecting the bucket, so
    it has to be positive. *)
let int64_serializer_heap pool n =
  let s0 =
    { iser =
        Array.init n
          (fun _ ->
             { iser_m = Netmcore_mutex.dummy();
               iser_c = dummy_condition();
               iser_set = Int64Set.empty;
             }
          );
      iser_ws = dummy_wait_set();
    } in
  let sref = Netmcore_ref.sref pool s0 in
  (*
  dlogf "init root=%nx"
    (Netsys_mem.obj_address (Netmcore_heap.root (Netmcore_ref.heap sref)));
   *)
  Netmcore_heap.modify
    (Netmcore_ref.heap sref)
    (fun mut ->
       let s = Netmcore_ref.deref_ro sref in
       Array.iter
         (fun sk ->
            sk.iser_m <- Netmcore_mutex.create mut `Errorcheck;
            sk.iser_c <- create_condition mut;
         )
         s.iser;
       s.iser_ws <- create_wait_set mut "/tmp/plasmacond-";  (* FIXME *)
    );
  sref


(** [int64_serializer sref esys name] returns an int64 serializer on top
    of the state [sref].  [name] only appears in log messages.  Wait
    entries are allocated on demand and recycled through a queue that is
    local to the returned object. *)
let int64_serializer sref esys name : _ int64_serializer_t =
  (* For this implementation it is essential that the same process never
     locks several times (resulting in a deadlock). When we pass over
     control with ">>" and "++" it is ensured that no other execution
     thread can run in parallel (where eps_e would not guarantee this).
   *)
  let n = Netmcore_ref.deref_p sref (fun s -> Array.length s.iser) in
  let we_unused = Queue.create () in
  ( object(self)
      (** Acquires the id [i] in bucket [k] (waiting on the condition of
          the bucket as long as [i] is in the locked set), runs
          [f esys], and releases [i] when the engine returned by [f]
          reaches a final state. *)
      method private proc_serialized i k f =
        (* Adds [i] to the locked set and unlocks the bucket mutex. *)
        let add_i() =
          dlogr
            (fun () ->
               sprintf
                 "Pfs_condition.int_serializer %s: busy / unlocking"
                 name);
          Netmcore_heap.modify
            (Netmcore_ref.heap sref)
            (fun mut ->
               let s = Netmcore_ref.deref_ro sref in
               let iser = s.iser.(k) in
               (* Netmcore_heap.pin mut iser; *)  (* careful *)
               let set' = Int64Set.add i iser.iser_set in
               let set'' = Netmcore_heap.add mut set' in
               iser.iser_set <- set'';
               Netmcore_mutex.unlock iser.iser_m;
            ) in
        (* Removes [i] from the locked set and wakes up all waiters of
           the bucket; locks and unlocks the bucket mutex itself. *)
        let del_notify_i() =
          let s = Netmcore_ref.deref_ro sref in
          dlogr
            (fun () ->
               sprintf
                 "Pfs_condition.int_serializer %s: getting idle"
                 name);
          Netmcore_mutex.lock s.iser.(k).iser_m;
          Netmcore_heap.modify
            (Netmcore_ref.heap sref)
            (fun mut ->
               let iser = s.iser.(k) in
               Netmcore_heap.pin mut iser;   (* careful *)
               iser.iser_set <-
                 Netmcore_heap.add mut (Int64Set.remove i iser.iser_set);
               (* We do not know exactly who is waiting, and need to
                  wake up all waiters. Btw this broadcast is the reason
                  for the array.
                *)
               broadcast iser.iser_c;
               Netmcore_mutex.unlock iser.iser_m;
            );
          dlogr
            (fun () ->
               sprintf "Pfs_condition.int_serializer %s: idle" name) in
        (* Called with the bucket mutex locked.  When [i] is free, it is
           added to the locked set and the mutex is unlocked. *)
        let rec wait_loop_e we () =
          let busy =
            (* we need [modify] because a new iser_set may be being
               built right now *)
            Netmcore_heap.modify
              (Netmcore_ref.heap sref)
              (fun mut ->
                 let s = Netmcore_ref.deref_ro sref in
                 Int64Set.mem i s.iser.(k).iser_set
              ) in
          if busy then (
            dlogr
              (fun () ->
                 sprintf
                   "Pfs_condition.int_serializer %s: is busy, need to wait"
                   name);
            let s = Netmcore_ref.deref_ro sref in
            let e =
              wait_e we s.iser.(k).iser_c s.iser.(k).iser_m esys in
            e ++ wait_loop_e we
          )
          else (
            dlogr
              (fun () ->
                 sprintf
                   "Pfs_condition.int_serializer %s: is idle, no waiting"
                   name);
            add_i();
            eps_e (`Done()) esys
          ) in
        (* Reuse a recycled wait entry if possible; otherwise allocate
           one and register its FIFO with [Pfs_pmanage]. *)
        let we =
          try Queue.take we_unused
          with Queue.Empty ->
            Netmcore_heap.modify
              (Netmcore_ref.heap sref)
              (fun mut ->
                 let s = Netmcore_ref.deref_ro sref in
                 let we = alloc_wait_entry mut s.iser_ws in
                 Pfs_pmanage.register_file (String.copy we.pipe);
                 we
              ) in
        let dealloc _ =
          (*
          Netmcore_heap.modify
            (Netmcore_ref.heap sref)
            (fun mut ->
               let s = Netmcore_ref.deref_ro sref in
               free_wait_entry mut s.iser_ws we
            ) in
           *)
          Queue.add we we_unused in
        dlogr
          (fun () ->
             sprintf "Pfs_condition.int_serializer %s: locking" name);
        Netmcore_ref.deref_p sref
          (fun s -> Netmcore_mutex.lock s.iser.(k).iser_m);
        dlogr
          (fun () ->
             sprintf
               "Pfs_condition.int_serializer %s: checking for potential waiters"
               name);
        let e =
          wait_loop_e we ()
          ++ (fun () ->
                let post_action _ = del_notify_i() in
                (* If [f] raises before it returns an engine, [i] must
                   still be released; otherwise it would stay locked
                   forever. *)
                let r =
                  try f esys
                  with error ->
                    post_action ();
                    raise error in
                Uq_engines.when_state
                  ~is_done:post_action
                  ~is_error:post_action
                  ~is_aborted:post_action
                  r;
                r
             ) in
        Uq_engines.when_state
          ~is_done:dealloc
          ~is_error:dealloc
          ~is_aborted:dealloc
          e;
        e

      method serialized i f =
        (* [Int64.rem] has the sign of the dividend, so the remainder is
           shifted into the range 0..n-1 for negative [i].  For
           non-negative [i] the bucket is the plain remainder. *)
        let n64 = Int64.of_int n in
        let r = Int64.rem i n64 in
        let r_pos =
          if Int64.compare r 0L < 0 then Int64.add r n64 else r in
        let k = Int64.to_int r_pos in
        self # proc_serialized i k f

      method multi_serialized l f =
        let rec loop l =
          match l with
            | [] ->
                f esys
            | i :: l' ->
                self # serialized i (fun _ -> loop l') in
        (* Sorting the list prevents deadlocks *)
        let l_sorted = List.sort Int64.compare l in
        (* Duplicates must be dropped: acquiring an id that is already
           held by this call would wait forever. *)
        let l_uniq =
          List.rev
            (List.fold_left
               (fun acc i ->
                  match acc with
                    | j :: _ when Int64.compare i j = 0 -> acc
                    | _ -> i :: acc
               )
               []
               l_sorted
            ) in
        loop l_uniq
    end
  )