(* Plasma GitLab Archive
   Projects Blog Knowledge *)

(* $Id: pfs_condition.ml 435 2011-10-07 14:32:54Z gerd $ *)

(* Literature:
   "Implementing Condition Variables with Semaphores", by Andrew D. Birrell,
   Microsoft Research Silicon Valley, January 2003

   http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.125.3384&rep=rep1&type=pdf
 *)

open Uq_engines.Operators
open Printf

(* Local shorthands for two helpers of [Plasma_util] (presumably debug
   logging - confirm in Plasma_util). In this file [dlogr] is always given
   a thunk that builds the message, and [dlog] a ready-made string.
 *)
let dlogr = Plasma_util.dlogr
let dlog = Plasma_util.dlog

(* The three mutually recursive records used by this module:

   - [condition]: a condition variable. Its waiters are chained via the
     [next] field of [wait_entry]; [wait_e] pushes new waiters at the
     front, [post] pops from the front.
   - [wait_entry]: one slot a waiter blocks on (a semaphore plus a named
     pipe).
   - [wait_set]: the pool of wait entries, chained via [set_next] and
     terminated by an entry with [empty = true].
 *)
type condition =
    { dummy_cond : bool;
      (* [true] only in values built by [dummy_condition]; [wait_e],
         [signal] and [broadcast] refuse such values
       *)
      mutable waiters : wait_entry;
      (* front of the waiter chain; physically equal to [null] when
         nobody is waiting
       *)
      mutable null : wait_entry;   (* means: "no more entry" *)
      mutable lock : Netmcore_sem.semaphore;
      (* taken by [wait_e], [signal] and [broadcast] while they modify
         [waiters]
       *)
    }

and wait_entry =
    { mutable empty : bool;  
      (* if empty, this entry is the last in the set, and it is considered
	 as representing no waiter
       *)
      mutable sem : Netmcore_sem.semaphore;
      (* posted by [post], waited for by [wait_e] after the pipe signal *)
      mutable pipe : string;
      (* path of the FIFO made by [create_fifo]; the empty string for
         entries that were never allocated
       *)
      mutable next : wait_entry;      (* may be cond.null *)
      (* [next] points to the entry itself while the entry is not enqueued
         on a condition (checked by [wait_e], restored by [post])
       *)
      mutable set_next : wait_entry;  (* not meaningful if [empty] *)
    }

and wait_set = 
    { dummy_set : bool;
      (* [true] only in values built by [dummy_wait_set] *)
      mutable alloc_lock : Netmcore_sem.semaphore;
      (* serializes [alloc_wait_entry] and [free_wait_entry] *)
      mutable head : wait_entry;
      (* first entry of the [set_next] chain *)
      prefix : string
      (* passed to [create_fifo] when a new entry is allocated *)
    }

(* Builds a fresh guard entry: it is marked [empty], carries a dummy
   semaphore and no pipe name, and both of its links point back to the
   entry itself.
 *)
let empty_wait_entry() =
  let rec guard =
    { empty = true;
      sem = Netmcore_sem.dummy();
      pipe = "";
      next = guard;
      set_next = guard;
    } in
  guard


(* Returns a placeholder condition ([dummy_cond = true]). It only serves to
   fill a field of type [condition]; [wait_e], [signal] and [broadcast]
   fail when given such a value.
 *)
let dummy_condition () =
  let placeholder_lock = Netmcore_sem.dummy() in
  let sentinel = empty_wait_entry () in
  let no_waiters = empty_wait_entry () in
  { dummy_cond = true;
    waiters = no_waiters;
    null = sentinel;
    lock = placeholder_lock
  }


(* Returns a placeholder wait set ([dummy_set = true]) with a dummy
   semaphore, a lone guard entry and an empty prefix. [alloc_wait_entry]
   and [free_wait_entry] fail when given such a value.
 *)
let dummy_wait_set() =
  let placeholder_lock = Netmcore_sem.dummy() in
  let guard = empty_wait_entry() in
  { dummy_set = true;
    alloc_lock = placeholder_lock;
    head = guard;
    prefix = ""
  }


(* [create_condition mut] adds a new condition to the heap that [mut]
   belongs to, pins it, and then gives it a real semaphore with initial
   value 1. One and the same guard entry is used for [waiters] and [null],
   i.e. the condition starts without waiters.
 *)
let create_condition mut =
  let sentinel = empty_wait_entry () in
  let shared =
    Netmcore_heap.add mut
      { dummy_cond = false;
        waiters = sentinel;
        null = sentinel;
        lock = Netmcore_sem.dummy()
      } in
  Netmcore_heap.pin mut shared;
  (* The real semaphore is created only after the record is in the heap *)
  shared.lock <- Netmcore_sem.create mut 1;
  shared

(* Destroys the semaphore of [c]. Nothing happens for dummy conditions. *)
let destroy_condition c =
  if c.dummy_cond then
    ()
  else
    Netmcore_sem.destroy c.lock


(* [create_wait_set mut prefix] adds a new wait set to the heap that [mut]
   belongs to, pins it, and then gives it a real allocation semaphore with
   initial value 1. The set initially consists of a guard entry only.
   [prefix] is remembered for naming the pipes of future entries.
 *)
let create_wait_set mut prefix =
  let shared =
    Netmcore_heap.add mut
      { dummy_set = false;
        alloc_lock = Netmcore_sem.dummy();
        head = empty_wait_entry();
        prefix = prefix;
      } in
  Netmcore_heap.pin mut shared;
  (* The real semaphore is created only after the record is in the heap *)
  shared.alloc_lock <- Netmcore_sem.create mut 1;
  shared
  

(* Destroys the semaphores of all entries of [wset] (following [set_next]
   up to and including the first entry marked [empty]), and finally the
   allocation semaphore. Nothing happens for dummy wait sets.
 *)
let destroy_wait_set wset =
  let rec destroy_from entry =
    Netmcore_sem.destroy entry.sem;
    if not entry.empty then
      destroy_from entry.set_next in
  if not wset.dummy_set then (
    destroy_from wset.head;
    Netmcore_sem.destroy wset.alloc_lock
  )


(* [with_alloc_lock wset f] runs [f ()] while holding the allocation
   semaphore of [wset]. The semaphore is posted again when [f] returns
   and also when an exception escapes; exceptions are re-raised.
 *)
let with_alloc_lock wset f =
  let release () = Netmcore_sem.post wset.alloc_lock in
  Netmcore_sem.wait wset.alloc_lock Netsys_posix.SEM_WAIT_BLOCK;
  try
    let result = f () in
    release ();
    result
  with exn ->
    release ();
    raise exn


(* [create_fifo prefix] creates a new FIFO with mode 0o600 and returns its
   path. The path is [Netsys_tmp.tmp_prefix prefix], followed by an
   underscore and a counter; the counter starts at 0 and is incremented
   while [mkfifo] reports EEXIST. Other Unix errors are passed to the
   caller.
 *)
let create_fifo prefix =
  let base = Netsys_tmp.tmp_prefix prefix in
  let rec attempt counter =
    let path = base ^ "_" ^ string_of_int counter in
    try
      Unix.mkfifo path 0o600;
      path
    with
      | Unix.Unix_error(Unix.EEXIST,_,_) ->
          attempt (counter+1) in
  attempt 0


(* [alloc_wait_entry mut wset] turns the guard entry at the end of [wset]
   into a usable entry and returns it. A new guard entry is appended
   behind it, and the entry gets a new semaphore (initial value 0) and a
   new FIFO. The whole operation runs under the allocation lock.

   Fails with [Failure] if [wset] is a dummy wait set.
 *)
let alloc_wait_entry mut wset =
  if wset.dummy_set then
    failwith "Netmcore_condition.alloc_wait_entry: dummy wait_set";
  (* Linear walk to the guard entry at the end of the chain *)
  let rec find_guard entry =
    if entry.empty then entry else find_guard entry.set_next in
  with_alloc_lock wset
    (fun () ->
       let slot = find_guard wset.head in
       let new_guard = Netmcore_heap.add mut (empty_wait_entry ()) in
       slot.set_next <- new_guard;
       slot.empty <- false;
       slot.sem <- Netmcore_sem.create mut 0;
       slot.pipe <- Netmcore_heap.add mut (create_fifo wset.prefix);
       slot
    )


(* [free_wait_entry mut wset we_to_free] removes [we_to_free] from [wset]
   and releases its resources. The entry is looked up by physical
   equality. After unlinking, both links of the entry point to the entry
   itself, its semaphore is destroyed, and its FIFO is removed from the
   file system. The whole operation runs under the allocation lock.

   [mut] is not used; it is kept for the sake of the interface.

   Fails with [Failure] if [wset] is a dummy wait set, or if the entry is
   not a member of [wset].
 *)
let free_wait_entry mut wset we_to_free =
  if wset.dummy_set then
    failwith "Netmcore_condition.free_wait_entry: dummy wait_set";
  with_alloc_lock wset
    (fun () ->
       (* not really fast: linear search, remembering the predecessor *)
       let we = ref wset.head in
       let prev = ref None in
       while not !we.empty && !we != we_to_free do
         prev := Some !we;
         we := !we.set_next
       done;
       (* Reaching the guard entry means that the entry is not in the set *)
       if !we.empty then
         failwith "Netmcore_condition.free_wait_entry: not found";
       ( match !prev with
           | None ->
               wset.head <- !we.set_next
           | Some p ->
               p.set_next <- !we.set_next
       );
       !we.set_next <- !we;
       !we.next <- !we;
       (* The entry is no longer reachable from the set, so
          [destroy_wait_set] cannot release this semaphore later. It has
          to be done here.
        *)
       Netmcore_sem.destroy !we.sem;
       (* Removing the FIFO is best-effort: file system errors are
          ignored, but unrelated exceptions are no longer swallowed.
        *)
       ( try Unix.unlink !we.pipe
         with Unix.Unix_error(_,_,_) -> ()
       );
    )


(* Returns the path of the FIFO that belongs to the wait entry *)
let pipe_file entry =
  entry.pipe


(* [wait_e we c m esys] enqueues the wait entry [we] on the condition [c],
   unlocks the mutex [m], and returns an engine that waits until the FIFO
   of [we] becomes readable. When this happens, the engine reads one byte
   from the FIFO, waits on the semaphore of [we], locks [m] again, and
   finishes with [`Done ()].

   Fails with [Failure] if [c] is a dummy condition, if [we] is a guard
   entry, or if [we] is already enqueued ([we.next] is not [we] itself).

   NOTE(review): when the poll engine ends with [`Error] or [`Aborted],
   only the file descriptor is closed. In this case [m] is not locked
   again, and [we] is not removed from [c.waiters].
 *)
let wait_e we c m esys =
  dlog "Pfs_condition.wait_e";
  if c.dummy_cond then
    failwith "Netmcore_condition.wait_e: dummy condition";
  if we.empty then
    failwith "Netmcore_condition.wait_e: this is the reserved guard wait_entry";
  if we.next != we then
    failwith "Netmcore_condition.wait_e: the wait entry is being used";
  (* The read side of the FIFO is opened before [we] becomes visible to
     signallers - presumably so that the non-blocking open for writing in
     [post] always finds a reader (to be confirmed).
   *)
  let fd = Unix.openfile we.pipe [Unix.O_RDONLY; Unix.O_NONBLOCK] 0 in
  let fd_open = ref true in
  (* [close] closes [fd] at most once *)
  let close() = 
    if !fd_open then Unix.close fd;
    fd_open := false in
  (* Push [we] at the front of the waiter chain, protected by [c.lock] *)
  Netmcore_sem.wait c.lock Netsys_posix.SEM_WAIT_BLOCK;
  let old_waiters = c.waiters in
  c.waiters <- we;
  we.next <- old_waiters;
  Netmcore_sem.post c.lock;
  dlog "Pfs_condition.wait_e: unlocking mutex";
  Netmcore_mutex.unlock m;
  dlog "Pfs_condition.wait_e: waiting for pipe signal";
  (* The timeout -1.0 presumably means "wait without limit" - confirm
     against the Unixqueue documentation
   *)
  let e =
    new Uq_engines.poll_engine [ Unixqueue.Wait_in fd, (-1.0) ] esys in
  e >>
     (fun st ->
	match st with
	  | `Done ev ->
	      dlog "Pfs_condition.wait_e: got pipe signal";
	      (* Consume the byte written by [post] *)
	      let s = String.make 1 ' ' in
	      ignore(Unix.read fd s 0 1);
	      close();
	      (* [post] has posted this semaphore before writing to the
	         FIFO
	       *)
	      Netmcore_sem.wait we.sem Netsys_posix.SEM_WAIT_BLOCK;
	      dlog "Pfs_condition.wait_e: locking mutex";
	      Netmcore_mutex.lock m;
	      `Done ()
	  | (`Error _ | `Aborted) as st ->
	      close(); st
     )


(* [post c] wakes up the waiter at the front of [c.waiters]: the entry is
   taken out of the chain (its [next] link is reset to the entry itself),
   its semaphore is posted, and one byte is written to its FIFO. The FIFO
   is opened for writing in non-blocking mode and closed again in any
   case.

   This function does not take [c.lock]; [signal] and [broadcast] call it
   while holding the lock. The condition must have a waiter (checked with
   an assertion).
 *)
let post c =
  dlog "Pfs_condition.post";
  let woken = c.waiters in
  assert(not woken.empty);
  c.waiters <- woken.next;
  woken.next <- woken;
  Netmcore_sem.post woken.sem;
  let wr = Unix.openfile woken.pipe [Unix.O_WRONLY; Unix.O_NONBLOCK] 0 in
  try
    ignore(Unix.single_write wr "X" 0 1);
    Unix.close wr
  with failure ->
    Unix.close wr;
    raise failure


(* [signal c] wakes up one waiter of [c], if there is any. The waiter
   chain is accessed while holding [c.lock]; the lock is released again
   when an exception occurs, and the exception is re-raised.

   Fails with [Failure] if [c] is a dummy condition.
 *)
let signal c =
  if c.dummy_cond then
    failwith "Netmcore_condition.signal: dummy condition";
  let release () = Netmcore_sem.post c.lock in
  Netmcore_sem.wait c.lock Netsys_posix.SEM_WAIT_BLOCK;
  try
    let has_waiter = c.waiters != c.null in
    if has_waiter then post c;
    release ()
  with
    | exn ->
        release ();
        raise exn


(* [broadcast c] wakes up all waiters of [c], one after the other, until
   the waiter chain is empty. The chain is accessed while holding
   [c.lock]; the lock is released again when an exception occurs, and the
   exception is re-raised.

   Fails with [Failure] if [c] is a dummy condition.
 *)
let broadcast c =
  if c.dummy_cond then
    failwith "Netmcore_condition.broadcast: dummy condition";
  let release () = Netmcore_sem.post c.lock in
  let rec drain () =
    if c.waiters != c.null then (
      post c;
      drain ()
    ) in
  Netmcore_sem.wait c.lock Netsys_posix.SEM_WAIT_BLOCK;
  try
    drain ();
    release ()
  with
    | exn ->
        release ();
        raise exn


(* The shared state of a [serializer], created by [serializer_heap] *)
type ser =
    { mutable ser_m : Netmcore_mutex.mutex;
      (* locked while [ser_busy] is inspected or changed *)
      mutable ser_c : condition;
      (* signalled when [ser_busy] is reset to [false] *)
      mutable ser_busy : bool;
      (* [true] while a serialized function is running *)
      mutable ser_ws : wait_set;
      (* the wait entries for [ser_c] are allocated from this set *)
    }

(* [serializer_heap pool] creates the shared state of a serializer in
   [pool] and returns the reference to it. The record is first stored with
   dummy members only; the real mutex (of kind [`Errorcheck]), condition
   and wait set are created in a second step inside [Netmcore_heap.modify].
 *)
let serializer_heap pool =
  let placeholder =
    { ser_m = Netmcore_mutex.dummy();
      ser_c = dummy_condition();
      ser_busy = false;
      ser_ws = dummy_wait_set();
    } in
  let sref = Netmcore_ref.sref pool placeholder in
  let heap = Netmcore_ref.heap sref in
  Netmcore_heap.modify
    heap
    (fun mut ->
       let shared = Netmcore_ref.deref_ro sref in
       shared.ser_m <- Netmcore_mutex.create mut `Errorcheck;
       shared.ser_c <- create_condition mut;
       (* FIXME: the location of the pipes is hard-coded *)
       shared.ser_ws <- create_wait_set mut "/tmp/plasmacond-";
    );
  sref


(* [serializer sref esys name] returns a serializer object on top of the
   shared state [sref] (as created by [serializer_heap]). [name] only
   occurs in log messages.

   [serialized f] first passes through a process-local
   [Uq_engines.serializer], and then competes for the shared [ser_busy]
   flag: the caller waits on [ser_c] while the flag is set, then sets it,
   runs [f esys], and resets the flag (signalling one waiter) when the
   engine returned by [f] reaches a final state.

   If [f esys] raises an exception instead of returning an engine, the
   flag is reset as well before the exception is re-raised. Otherwise the
   serializer would remain busy forever.

   Wait entries are never given back to the wait set; once allocated, they
   are recycled via the process-local queue [we_unused].
 *)
let serializer sref esys name : _ Uq_engines.serializer_t =
  let local_serializer =
    Uq_engines.serializer esys in
  let we_unused = Queue.create () in
  ( object(self)
      method private proc_serialized f =
        (* Wait on the condition until [ser_busy] is false. [ser_m] is
           expected to be locked when the loop is entered.
         *)
        let rec wait_loop_e we () =
          let busy = Netmcore_ref.deref_p sref (fun s -> s.ser_busy) in
          if busy then (
            dlogr
              (fun () ->
                 sprintf
                   "Pfs_condition.serializer %s: is busy, need to wait" name);
            let e =
              Netmcore_ref.deref_p sref
                (fun s -> wait_e we s.ser_c s.ser_m esys) in
            e ++ wait_loop_e we
          )
          else (
            dlogr
              (fun () ->
                 sprintf
                   "Pfs_condition.serializer %s: is idle, no waiting" name);
            eps_e (`Done()) esys
          ) in

        (* Reuse a wait entry of this process if possible, otherwise
           allocate a new one and register its pipe file
         *)
        let we =
          try Queue.take we_unused
          with Queue.Empty ->
            Netmcore_heap.modify
              (Netmcore_ref.heap sref)
              (fun mut ->
                 let s = Netmcore_ref.deref_ro sref in
                 let we = alloc_wait_entry mut s.ser_ws in
                 Pfs_pmanage.register_file (String.copy we.pipe);
                 we
              ) in

        (* The entry is not freed with [free_wait_entry], but kept for
           later reuse
         *)
        let dealloc _ =
          Queue.add we we_unused in

        dlogr
          (fun () -> sprintf "Pfs_condition.serializer %s: locking" name);
        Netmcore_ref.deref_p sref
          (fun s -> Netmcore_mutex.lock s.ser_m);
        dlogr
          (fun () ->
             sprintf
               "Pfs_condition.serializer %s: checking for potential waiters"
               name);
        let e =
          wait_loop_e we ()
          ++ (fun () ->
                dlogr
                  (fun () ->
                     sprintf
                       "Pfs_condition.serializer %s: busy / unlocking" name);
                Netmcore_ref.deref_p sref
                  (fun s ->
                     s.ser_busy <- true;
                     Netmcore_mutex.unlock s.ser_m;
                  );
                (* Resets the busy flag and wakes up one waiter *)
                let post_action _ =
                  Netmcore_ref.deref_p sref
                    (fun s ->
                       dlogr
                         (fun () ->
                            sprintf
                              "Pfs_condition.serializer %s: getting idle" name);
                       Netmcore_mutex.lock s.ser_m;
                       s.ser_busy <- false;
                       signal s.ser_c;
                       Netmcore_mutex.unlock s.ser_m;
                       dlogr
                         (fun () ->
                            sprintf
                              "Pfs_condition.serializer %s: idle" name);
                    ) in
                let r =
                  ( try f esys
                    with error ->
                      (* No engine exists that could trigger [post_action]
                         later, so the flag has to be reset right now
                       *)
                      post_action ();
                      raise error
                  ) in
                Uq_engines.when_state
                  ~is_done:post_action
                  ~is_error:post_action
                  ~is_aborted:post_action
                  r;
                r
             ) in
        Uq_engines.when_state
          ~is_done:dealloc
          ~is_error:dealloc
          ~is_aborted:dealloc
          e;
        e


      method serialized f =
        (* First select one of the event threads in the local process,
           and then compete against the other processes. Otherwise we
           will deadlock on s.ser_m.
         *)
        local_serializer # serialized
          (fun _ ->
             self # proc_serialized f
          )

    end
  )


(* A serializer that is parameterized by int64 values: functions are only
   serialized against other functions using the same value(s).

   - [serialized i f]: runs [f] while the value [i] is locked
   - [multi_serialized l f]: runs [f] while all values of the list [l] are
     locked
 *)
class type ['a] int64_serializer_t =
object
  method serialized : int64 -> (Unixqueue.event_system -> 'a Uq_engines.engine) -> 'a Uq_engines.engine
  method multi_serialized : int64 list -> (Unixqueue.event_system -> 'a Uq_engines.engine) ->
                              'a Uq_engines.engine
end


(* Sets of int64 values, ordered by the natural order of the numbers *)
module Int64Set = Set.Make(Int64)
    

(* One slot of an int64 serializer. A value [i] is mapped to a slot by
   taking the remainder modulo the number of slots (see
   [int64_serializer]).
 *)
type int64_ser_element =
    { mutable iser_m : Netmcore_mutex.mutex;
      (* locked while [iser_set] is inspected or changed *)
      mutable iser_c : condition;
      (* all waiters are woken up when a value is removed from
         [iser_set]
       *)
      mutable iser_set : Int64Set.t; (* The set of [i] values currently locked *)
    }

(* The shared state of an int64 serializer, created by
   [int64_serializer_heap]
 *)
type int64_ser =
    { iser : int64_ser_element array;
      mutable iser_ws : wait_set;
      (* the wait entries for all slots are allocated from this set *)
    }
      

(* [int64_serializer_heap pool n] creates the shared state of an int64
   serializer with [n] slots in [pool], and returns the reference to it.
   The record is first stored with dummy members only; the real mutexes
   (of kind [`Errorcheck]), conditions and the wait set are created in a
   second step inside [Netmcore_heap.modify].
 *)
let int64_serializer_heap pool n =
  let blank_element _ =
    { iser_m = Netmcore_mutex.dummy();
      iser_c = dummy_condition();
      iser_set = Int64Set.empty;
    } in
  let sref =
    Netmcore_ref.sref
      pool
      { iser = Array.init n blank_element;
        iser_ws = dummy_wait_set();
      } in
  Netmcore_heap.modify
    (Netmcore_ref.heap sref)
    (fun mut ->
       let shared = Netmcore_ref.deref_ro sref in
       let init_element elem =
         elem.iser_m <- Netmcore_mutex.create mut `Errorcheck;
         elem.iser_c <- create_condition mut in
       Array.iter init_element shared.iser;
       (* FIXME: the location of the pipes is hard-coded *)
       shared.iser_ws <- create_wait_set mut "/tmp/plasmacond-";
    );
  sref


(* [int64_serializer sref esys name] returns a serializer object on top of
   the shared state [sref] (as created by [int64_serializer_heap]). [name]
   only occurs in log messages.

   [serialized i f] maps [i] to the slot [k = i mod n] where [n] is the
   number of slots. The result of the modulo operation is normalized to
   the range 0..n-1, so negative values of [i] are accepted, too. The
   caller then waits until [i] is not member of the slot's set of locked
   values, adds [i], runs [f esys], and removes [i] again (waking up all
   waiters of the slot) when the engine returned by [f] reaches a final
   state.

   If [f esys] raises an exception instead of returning an engine, [i] is
   removed as well before the exception is re-raised. Otherwise the value
   would remain locked forever.

   [multi_serialized l f] locks the values of [l] one after the other in
   ascending order. Duplicates in [l] are dropped, because locking the
   same value a second time would wait for a lock this process already
   holds.

   Wait entries are never given back to the wait set; once allocated, they
   are recycled via the process-local queue [we_unused].
 *)
let int64_serializer sref esys name : _ int64_serializer_t =
  (* For this implementation it is essential that the same process never
     locks several times (resulting in a deadlock). When we pass over control
     with ">>" and "++" it is ensured that no other execution thread can
     run in parallel (where eps_e would not guarantee this).
   *)
  let n =
    Netmcore_ref.deref_p sref (fun s -> Array.length s.iser) in
  let we_unused = Queue.create () in
  ( object(self)
      method private proc_serialized i k f =
        (* Adds [i] to the set of slot [k] and unlocks the mutex of the
           slot. The mutex must be locked by the caller.
         *)
        let add_i() =
          dlogr
            (fun () ->
               sprintf
                 "Pfs_condition.int_serializer %s: busy / unlocking" name);
          Netmcore_heap.modify
            (Netmcore_ref.heap sref)
            (fun mut ->
               let s = Netmcore_ref.deref_ro sref in
               let iser = s.iser.(k) in
               (* Netmcore_heap.pin mut iser; *) (* careful *)
               let set' = Int64Set.add i iser.iser_set in
               let set'' = Netmcore_heap.add mut set' in
               iser.iser_set <- set'';
               Netmcore_mutex.unlock iser.iser_m;
            ) in

        (* Locks the mutex of slot [k], removes [i] from the set, wakes up
           all waiters of the slot, and unlocks the mutex again
         *)
        let del_notify_i() =
          let s = Netmcore_ref.deref_ro sref in
          dlogr
            (fun () ->
               sprintf "Pfs_condition.int_serializer %s: getting idle" name);
          Netmcore_mutex.lock s.iser.(k).iser_m;
          Netmcore_heap.modify
            (Netmcore_ref.heap sref)
            (fun mut ->
               let iser = s.iser.(k) in
               Netmcore_heap.pin mut iser; (* careful *)
               iser.iser_set <-
                 Netmcore_heap.add mut
                 (Int64Set.remove i iser.iser_set);
               (* We do not know exactly who is waiting, and need to wake
                  up all waiters. Btw this broadcast is the reason for the
                  array.
                *)
               broadcast iser.iser_c;
               Netmcore_mutex.unlock iser.iser_m;
            );
          dlogr
            (fun () -> sprintf "Pfs_condition.int_serializer %s: idle" name) in

        (* Wait on the condition of slot [k] until [i] is not locked, and
           lock it then. The mutex of the slot is expected to be locked
           when the loop is entered; it is unlocked by [add_i].
         *)
        let rec wait_loop_e we () =
          let busy =
            (* we need [modify] because a new iser_set may be being built right
               now
             *)
            Netmcore_heap.modify
              (Netmcore_ref.heap sref)
              (fun mut ->
                 let s = Netmcore_ref.deref_ro sref in
                 Int64Set.mem i s.iser.(k).iser_set
              ) in
          if busy then (
            dlogr
              (fun () ->
                 sprintf
                   "Pfs_condition.int_serializer %s: is busy, need to wait"
                   name);
            let s = Netmcore_ref.deref_ro sref in
            let e = wait_e we s.iser.(k).iser_c s.iser.(k).iser_m esys in
            e ++ wait_loop_e we
          )
          else (
            dlogr
              (fun () ->
                 sprintf
                   "Pfs_condition.int_serializer %s: is idle, no waiting" name);
            add_i();
            eps_e (`Done()) esys
          ) in

        (* Reuse a wait entry of this process if possible, otherwise
           allocate a new one and register its pipe file
         *)
        let we =
          try Queue.take we_unused
          with Queue.Empty ->
            Netmcore_heap.modify
              (Netmcore_ref.heap sref)
              (fun mut ->
                 let s = Netmcore_ref.deref_ro sref in
                 let we = alloc_wait_entry mut s.iser_ws in
                 Pfs_pmanage.register_file (String.copy we.pipe);
                 we
              ) in

        (* The entry is not freed with [free_wait_entry], but kept for
           later reuse
         *)
        let dealloc _ =
          Queue.add we we_unused in

        dlogr
          (fun () ->
             sprintf "Pfs_condition.int_serializer %s: locking" name);
        Netmcore_ref.deref_p sref
          (fun s -> Netmcore_mutex.lock s.iser.(k).iser_m);
        dlogr
          (fun () ->
             sprintf
               "Pfs_condition.int_serializer %s: checking for potential waiters"
               name);
        let e =
          wait_loop_e we ()
          ++ (fun () ->
                let post_action _ = del_notify_i() in
                let r =
                  ( try f esys
                    with error ->
                      (* No engine exists that could trigger [post_action]
                         later, so [i] has to be released right now
                       *)
                      del_notify_i();
                      raise error
                  ) in
                Uq_engines.when_state
                  ~is_done:post_action
                  ~is_error:post_action
                  ~is_aborted:post_action
                  r;
                r
             ) in
        Uq_engines.when_state
          ~is_done:dealloc
          ~is_error:dealloc
          ~is_aborted:dealloc
          e;
        e


      method serialized i f =
        (* [Int64.rem] returns a negative remainder for negative [i]. The
           remainder is shifted into the range 0..n-1 so that it is a
           valid array index.
         *)
        let r = Int64.to_int(Int64.rem i (Int64.of_int n)) in
        let k = if r < 0 then r + n else r in
        self # proc_serialized i k f

      method multi_serialized l f =
        let rec loop l =
          match l with
            | [] -> f esys
            | i :: l' -> self # serialized i (fun _ -> loop l') in
        (* Drops repeated values from a sorted list *)
        let dedup sorted =
          List.rev
            (List.fold_left
               (fun acc x ->
                  match acc with
                    | y :: _ when Int64.compare x y = 0 -> acc
                    | _ -> x :: acc
               )
               []
               sorted
            ) in
        (* Sorting the list prevents deadlocks. Duplicates must go away
           because a value can only be locked once per process.
         *)
        let l_sorted = dedup (List.sort Int64.compare l) in
        loop l_sorted

    end
  )

(* This web site is published by Informatikbüro Gerd Stolpmann
   Powered by Caml *)