(* Plasma GitLab Archive
   Projects Blog Knowledge *)

(* $Id: netsys_sem.ml 1879 2013-08-19 09:46:14Z gerd $ *)

(* Debugging switch of this module. The flag is handed to the Netlog.Debug
   functions below (logger creation and module registration).
 *)
module Debug = struct
  let enable : bool ref = ref false
end

(* Debug loggers of this module. Both are created by Netlog.Debug for the
   module name "Netsys_sem" and are controlled by [Debug.enable]. In this
   file [dlogr] is always called with a closure that builds the message.
 *)
let dlog = Netlog.Debug.mk_dlog "Netsys_sem" Debug.enable
let dlogr = Netlog.Debug.mk_dlogr "Netsys_sem" Debug.enable

(* Register the debug flag under the module name "Netsys_sem" *)
let () =
  Netlog.Debug.register_module "Netsys_sem" Debug.enable

open Printf


(* Re-export of [Netsys_posix.sem_open_flag] together with its constructors,
   so that they can be used without module qualification.
 *)
type sem_open_flag = Netsys_posix.sem_open_flag =
  | SEM_O_CREAT
  | SEM_O_EXCL

(* Re-export of [Netsys_posix.sem_wait_behavior] together with its
   constructors.
 *)
type sem_wait_behavior = Netsys_posix.sem_wait_behavior =
  | SEM_WAIT_BLOCK
  | SEM_WAIT_NONBLOCK

(* A prefix is the string from which the names of all persistent objects of
   a container are derived (see module [Emu]).
 *)
type prefix = string


module Emu = struct
  (* We support at most this number of semaphores per container.
     This number can be increased to at most 65535 without changing
     the code.
   *)
  let n = 1 lsl 14   (* = 16384 *)

  (* Max number of open semaphores (normally, each open sem consumes
     a file descr): a quarter of the descriptor limit reported by
     [sysconf_open_max], or 64 if the limit cannot be determined.
     It is tried not to exceed this value, but in some situations this
     will nevertheless occur.
   *)
  let n_active =
    try
      let open_max = Netsys_posix.sysconf_open_max() in
      open_max / 4
    with _ ->
      64

  (* Note that there is a basic problem with [mutex]: If the process dies
     while the mutex is in locked state, no other process will ever
     again get the lock. TODO: We could protect against this by recording
     the PID of the lock holder.
   *)

  type anon_semaphore =
      { mutable sem : Netsys_posix.named_semaphore option;
        mutable use_count : int;
	num : int;                              (* number in the container *)
        mutable freshness : int;                (* freshness for lru cache *)
        cont_id : int;                   (* Container ID for cross-lookups *)
      }
      (* The active version of a semaphore. There is only one instance
         of this record per process and semaphore. [sem] is set to the
         backing named semaphore while it is open in this process, and
         is [None] while it is closed. [use_count] counts the number of
         active uses in this process (values > 1 are possible in
         multithreaded programs), such as ongoing semaphore operations.
         When [use_count > 0], the semaphore cannot be closed.

         [num] is the number of this semaphore in the container. This means,
         [cont.used.{num} = '\001'].

         The value [freshness] is set to a new maximum whenever the semaphore
         is used.
       *)
         
  type container =
      { prefix : string;
        id : int;
	used : (char,Bigarray.int8_unsigned_elt,Bigarray.c_layout) Bigarray.Array1.t;
	active : (int, anon_semaphore) Hashtbl.t;
	mutex : Netsys_posix.named_semaphore;  (* protects only [used] *)
        mutable open_sems : int;
        mutable maxfresh : int;
      }
      (* The container. There is only one instance of this record per process
         and [prefix]. This instance also gets a unique [id].

         The possible semaphores in the container have numbers 0 to n-1,
         where n is the size of the [used] array. When a number is allocated,
         the [used] array is set to ['\001'] at the corresponding index
         (and back to ['\000'] when the number is released).

         If a semaphore is opened, it is put into [active]. If a semaphore
         is closed, it is not removed from [active], however. This first
         happens when it is destroyed. The counter [open_sems] contains the
         number of open semaphores in [active] (with [sem<>None]).

         [maxfresh] is the highest freshness value handed out so far
         (see [freshest]).
       *)

  (* Forks: the construction needs to be compatible with fork(), at least for
     single-threaded apps. Both memory mappings ([used]) and named semaphores
     survive forks, so there should not be any problem.

     In mt apps: there are two problems:
     - [use_count] would have to be reset to 0 after a fork
     - [proc_mutex] would have to be reinitialized

     We do not attempt here to solve these problems.
   *)
     


  (* Mutex created by the current Netsys_oothr provider. It is used with
     [Netsys_oothr.serialize] around all accesses to the process-local
     bookkeeping below (registries, [active], [open_sems], [use_count],
     [maxfresh]).
   *)
  let proc_mutex = !Netsys_oothr.provider # create_mutex()

  (* Process-local registries of the known containers, keyed by the
     container [id] and by the [prefix], respectively
   *)
  let cont_by_id = Hashtbl.create 7
  let cont_by_prefix = Hashtbl.create 7

  (* Name of the named semaphore backing number [k] of [cont]:
     the prefix, followed by "_sem" and the decimal number
   *)
  let sem_name cont k =
    Printf.sprintf "%s_sem%d" cont.prefix k


  (* Find the container with the given [id] (under [proc_mutex]).
     Raises [Not_found] if there is none.
   *)
  let lookup_cont_by_id =
    let find id = Hashtbl.find cont_by_id id in
    Netsys_oothr.serialize proc_mutex find

  (* Find the container with the given prefix (under [proc_mutex]).
     Raises [Not_found] if there is none.
   *)
  let lookup_cont_by_prefix =
    let find p = Hashtbl.find cont_by_prefix p in
    Netsys_oothr.serialize proc_mutex find

  (* Enter the container into both registries (under [proc_mutex]),
     replacing previous entries with the same id or prefix
   *)
  let add_cont =
    let register c =
      Hashtbl.replace cont_by_id c.id c;
      Hashtbl.replace cont_by_prefix c.prefix c in
    Netsys_oothr.serialize proc_mutex register

  (* Remove the container from both registries (under [proc_mutex]) *)
  let del_cont =
    let unregister c =
      Hashtbl.remove cont_by_id c.id;
      Hashtbl.remove cont_by_prefix c.prefix in
    Netsys_oothr.serialize proc_mutex unregister

  (* [lookup_or_add_cont f p]: return the container registered for the
     prefix [p]. If there is none, [f p] is called to create it, and the
     result is entered into both registries. Everything happens under
     [proc_mutex], so [f] must not try to acquire this mutex.
   *)
  let lookup_or_add_cont f =
    let find_or_create p =
      if Hashtbl.mem cont_by_prefix p then
        Hashtbl.find cont_by_prefix p
      else (
        let c = f p in
        Hashtbl.replace cont_by_id c.id c;
        Hashtbl.replace cont_by_prefix c.prefix c;
        c
      ) in
    Netsys_oothr.serialize proc_mutex find_or_create

  (* Increment [maxfresh] of the container by one (under [proc_mutex]),
     and return the new value
   *)
  let freshest =
    let bump c =
      c.maxfresh <- c.maxfresh + 1;
      c.maxfresh in
    Netsys_oothr.serialize proc_mutex bump

  (* [mod_use_count d usem]: add [d] to the [use_count] of [usem]
     (under [proc_mutex])
   *)
  let mod_use_count d =
    let adjust usem =
      usem.use_count <- usem.use_count + d in
    Netsys_oothr.serialize proc_mutex adjust


  (* The emulation is built on named semaphores, so it is available
     exactly when [Netsys_posix] reports named POSIX semaphores
   *)
  let have_anon_semaphores () =
    Netsys_posix.have_named_posix_semaphores ()

  (* The prefix the container was loaded with *)
  let prefix { prefix = p; _ } = p

  (* [load_container prefix] opens (and creates, if necessary) the persistent
     objects of the container and returns a fresh container record:

     - the shared memory object [prefix ^ "_sems"], which is sized with
       [ftruncate] when it is still empty, and of which [n] bytes are
       mapped as the [used] array
     - the named semaphore [prefix ^ "_contsem"] (initial value 1 when
       created), which becomes the container [mutex]

     The file descriptor of the shared memory object is closed in any
     case. Exceptions are passed through to the caller.

     The record is not entered into the registries here (see [container]).
   *)
  let load_container prefix =
    let fd =
      Netsys_posix.shm_open
        (prefix ^ "_sems")
        [ Netsys_posix.SHM_O_RDWR; Netsys_posix.SHM_O_CREAT ]
        0o600 in
    let fd_closed = ref false in
    let close_fd() =
      (* closes fd at most once successfully *)
      if not !fd_closed then (
        Unix.close fd;
        fd_closed := true
      ) in
    try
      if (Unix.fstat fd).Unix.st_size = 0 then
        Unix.ftruncate fd (8 * n);
      let used = Netsys_mem.memory_map_file fd true n in
      (* The mapping survives closing the descriptor *)
      close_fd();
      let mutex =
        Netsys_posix.sem_open
          (prefix ^ "_contsem")
          [ Netsys_posix.SEM_O_CREAT ]
          0o600
          1 in
      dlogr (fun () -> sprintf "Opened container prefix=%s" prefix);
      { prefix = prefix;
        used = used;
        active = Hashtbl.create 47;
        mutex = mutex;
        open_sems = 0;
        maxfresh = 0;
        id = Oo.id (object end);
      }
    with error ->
      close_fd();
      raise error

  (* Return the container for [prefix]. It is loaded with [load_container]
     unless this process already has a container for this prefix.
   *)
  let container prefix =
    lookup_or_add_cont load_container prefix

  (* Acquire the container mutex (a named semaphore), blocking until it
     is available
   *)
  let lock cont =
    let pfx = cont.prefix in
    dlogr (fun () -> sprintf "lock waiting prefix=%s" pfx);
    Netsys_posix.sem_wait cont.mutex SEM_WAIT_BLOCK;
    dlogr (fun () -> sprintf "lock acquired prefix=%s" pfx)

  (* Release the container mutex by posting the named semaphore *)
  let unlock cont =
    let pfx = cont.prefix in
    dlogr (fun () -> sprintf "unlock prefix=%s" pfx);
    Netsys_posix.sem_post cont.mutex

  (* A radical way to get rid of all persistent objects of [prefix], without
     acquiring the container mutex: the shared memory object, the container
     semaphore, and the named semaphores for all numbers 0 to n-1 are
     unlinked. All errors are ignored (normally most of these objects do
     not exist).
   *)
  let unlink prefix =
    dlogr (fun () -> sprintf "unlink prefix=%s" prefix);
    let quietly f name =
      try f name with _ -> () in
    quietly Netsys_posix.shm_unlink (prefix ^ "_sems");
    quietly Netsys_posix.sem_unlink (prefix ^ "_contsem");
    let sem_base = prefix ^ "_sem" in
    for k = 0 to n-1 do
      quietly Netsys_posix.sem_unlink (sem_base ^ string_of_int k)
    done

  (* Like [container], but the persistent objects of [prefix] are unlinked
     first
   *)
  let create_container prefix =
    let () = unlink prefix in
    container prefix

  (* While locked (i.e. the caller holds [proc_mutex]): as long as opening
     one more semaphore would exceed [n_active], find the open sem with the
     lowest freshness and close it. Consider only sems without users. Stops
     early when there is no such sem anymore.
   *)
  let drop_active_lk cont =
    let old_open_sems = cont.open_sems in
    (* Fold step: keep the least fresh open semaphore without users *)
    let older_idle _ usem best =
      if usem.use_count = 0 && usem.sem <> None then
        match best with
          | None -> Some usem
          | Some b ->
              if b.freshness <= usem.freshness then best else Some usem
      else
        best in
    let rec evict() =
      if cont.open_sems+1 > n_active then (
        match Hashtbl.fold older_idle cont.active None with
          | None ->
              ()   (* nothing closeable left *)
          | Some usem ->
              ( match usem.sem with
                  | None -> assert false
                  | Some sem ->
                      dlogr
                        (fun () ->
                           sprintf "drop_active: closing prefix=%s k=%d"
                             cont.prefix usem.num);
                      Netsys_posix.sem_close sem;
                      usem.sem <- None;
                      cont.open_sems <- cont.open_sems - 1
              );
              evict()
      ) in
    evict();
    dlogr
      (fun () ->
         sprintf "drop_active: old_open=%d now_open=%d"
           old_open_sems cont.open_sems
      )

  (* Same as [drop_active_lk], but acquires [proc_mutex] itself. Must not
     be called when [proc_mutex] is already held.
   *)
  let drop_active cont =
    Netsys_oothr.serialize proc_mutex drop_active_lk cont

  (* [use_sem cont usem] returns the backing named semaphore of [usem] and
     increments [usem.use_count] by 1. The caller has to decrement the
     counter again when the operation is finished (see [mod_use_count]).

     If the semaphore is currently closed in this process, it is reopened
     by name. Before that, unused semaphores are closed if the limit
     [n_active] is reached. Exceptions of [Netsys_posix.sem_open] are
     passed through; [use_count] is not modified in this case.

     The whole function runs with [proc_mutex] held. For this reason the
     eviction has to be done with [drop_active_lk]: [drop_active] would
     try to acquire [proc_mutex] a second time, which blocks forever (or
     fails) with a real mutex implementation.
   *)
  let use_sem cont =
    Netsys_oothr.serialize 
      proc_mutex
      (fun usem ->
         let sem =
           match usem.sem with
             | Some sem ->
                 sem
             | None ->
                 dlogr
                   (fun () ->
                      sprintf "Reloading sem prefix=%s k=%d"
                        cont.prefix usem.num
                   );
                 if cont.open_sems >= n_active then
                   drop_active_lk cont;   (* we already hold proc_mutex *)
                 let sem =
                   Netsys_posix.sem_open
                     (sem_name cont usem.num)
                     []
                     0
                     0 in
                 usem.sem <- Some sem;
                 cont.open_sems <- cont.open_sems + 1;
                 sem in
         usem.use_count <- usem.use_count + 1;
         sem
      )
      

  (* Look the existing semaphore #k up. If this process already has a
     record for k in [active], this record is returned (the backing
     semaphore might be closed at this moment). Otherwise the named
     semaphore is opened, and a new record is entered into [active].
     Errors of [sem_open] (e.g. ENOENT if not found) are logged and
     passed through.
   *)
  let lookup cont k =
    let fn = freshest cont in
    let open_and_register() =
      if cont.open_sems >= n_active then
        drop_active_lk cont;
      let sem =
        Netsys_posix.sem_open (sem_name cont k) [] 0 0 in
      let usem =
        { sem = Some sem;
          num = k;
          freshness = fn;
          cont_id = cont.id;
          use_count = 0;
        } in
      dlogr (fun () ->
               sprintf "lookup prefix=%s k=%d opened" cont.prefix k);
      Hashtbl.add cont.active k usem;
      cont.open_sems <- cont.open_sems + 1;
      usem in
    let find_or_open() =
      try
        if Hashtbl.mem cont.active k then (
          let usem = Hashtbl.find cont.active k in
          dlogr (fun () ->
                   sprintf "lookup prefix=%s k=%d active" cont.prefix k);
          (* Might still be closed! *)
          usem
        )
        else
          open_and_register()
      with error ->
        dlogr (fun () ->
                 sprintf "lookup prefix=%s k=%d error=%s"
                   cont.prefix k (Netexn.to_string error));
        raise error in
    Netsys_oothr.serialize proc_mutex find_or_open ()


  (* Find a free k, and mark it as used. The [used] array is scanned and
     modified while the container mutex is held; the mutex is released
     again before the function returns or raises.

     Returns the allocated number (0 <= k < n).
     Raises [Unix_error(ENOMEM,_,_)] if all n numbers are in use.
   *)
  let find_unused cont =
    lock cont;
    let rec first_free k =
      if k < n && cont.used.{ k } = '\001' then first_free (k+1) else k in
    let k = first_free 0 in
    let found = k < n in
    if found then (
      cont.used.{ k } <- '\001';
      dlogr (fun () -> 
               sprintf "find_unused prefix=%s k=%d" cont.prefix k)
    );
    unlock cont;
    if found then
      k
    else
      (* The function name reported here used to refer to a non-existing
         "Netsys_shm" module
       *)
      raise(Unix.Unix_error(Unix.ENOMEM,
                            "Netsys_sem.find_unused (too many semaphores)",
                            ""))

  (* Reset the entry for number [k] in the [used] array. The function does
     not lock the container mutex itself.
   *)
  let mark_unused { used; _ } k =
    used.{ k } <- '\000'


  (* Interpret the two bytes at [pos] and [pos+1] of [mem] as semaphore
     number (low byte first), and look this semaphore up in [cont]
   *)
  let as_sem cont mem pos =
    let hi = Char.code mem.{ pos+1 } in
    let lo = Char.code mem.{ pos } in
    let k = lo + (hi lsl 8) in
    dlogr (fun () -> 
             sprintf "as_sem prefix=%s k=%d" cont.prefix k);
    lookup cont k

  (* [sem_init cont mem pos pshared init_value]: allocate a free number k in
     the container, create the backing named semaphore exclusively with the
     value [init_value], and store k in the two bytes at [pos] and [pos+1]
     of [mem] (low byte first). Returns the record for the new semaphore,
     which is also entered into [active].

     [pshared] is not used by the emulation.

     Errors (e.g. from [find_unused] or [sem_open]) are passed through.
   *)
  let sem_init cont mem pos pshared init_value =
    let k = find_unused cont in
    let fn = freshest cont in
    (* First check whether k is still in [active]. This can happen when
       another process destroyed the semaphore, but this process did not
       see it yet. We don't destroy it here again (too error-prone).
     *)
    let forget_stale() =
      if Hashtbl.mem cont.active k then (
        let stale = Hashtbl.find cont.active k in
        if stale.sem <> None then
          cont.open_sems <- cont.open_sems - 1;
        Hashtbl.remove cont.active k
      ) in
    let create() =
      forget_stale();
      try
        if cont.open_sems >= n_active then
          drop_active_lk cont;
        let sem =
          Netsys_posix.sem_open
            (sem_name cont k)
            [ Netsys_posix.SEM_O_CREAT; Netsys_posix.SEM_O_EXCL ]
            0o600
            init_value in
        let usem =
          { sem = Some sem;
            num = k;
            freshness = fn;
            cont_id = cont.id;
            use_count = 0;
          } in
        Hashtbl.replace cont.active k usem;
        cont.open_sems <- cont.open_sems + 1;
        mem.{ pos } <- Char.chr (k land 0xff);
        mem.{ pos+1 } <- Char.chr (k lsr 8);
        dlogr (fun () ->
                 sprintf "sem_init prefix=%s k=%d" cont.prefix k);
        usem
      with error ->
        dlogr (fun () ->
                 sprintf "sem_init prefix=%s k=%d error=%s"
                   cont.prefix k (Netexn.to_string error));
        raise error in
    Netsys_oothr.serialize proc_mutex create ()

  (* Internal destroy: remove the record for number [k] from [active]
     (adjusting [open_sems] if it is still open), and unlink the backing
     named semaphore, ignoring errors. The function does not lock anything
     itself, and it does not close the semaphore.
   *)
  let sem_idestroy cont k =
    if Hashtbl.mem cont.active k then (
      let usem = Hashtbl.find cont.active k in
      if usem.sem <> None then
        cont.open_sems <- cont.open_sems - 1;
      Hashtbl.remove cont.active k
    );
    try
      Netsys_posix.sem_unlink (sem_name cont k)
    with _ -> ()

  (* Give the number [k] back: the entry in the [used] array is reset
     while the container mutex is held
   *)
  let sem_release cont k =
    lock cont;
    cont.used.{ k } <- '\000';
    unlock cont

  (* [sem_destroy cont usem]: close the semaphore in this process (if open),
     remove it from [active], unlink the backing named semaphore, and
     release its number in the container. Runs under [proc_mutex].

     Raises [Failure] if [usem.use_count > 0], i.e. if there are still
     ongoing operations on the semaphore in this process. Errors from
     closing the semaphore or from locking the container are passed
     through.
   *)
  let sem_destroy cont =
    Netsys_oothr.serialize 
      proc_mutex
      (fun usem ->
         let w = usem.use_count in
         dlogr (fun () -> 
                  sprintf "sem_destroy prefix=%s k=%d use_count=%d" 
                    cont.prefix usem.num w);
         if w > 0 then
           failwith "Netsys_sem.sem_destroy: there are still threads in sem_wait";
         ( match usem.sem with
             | None -> ()
             | Some sem ->
                 (* [open_sems] counts only the records in [active]. Is
                    this record the one registered there?
                  *)
                 let tracked =
                   try Hashtbl.find cont.active usem.num == usem
                   with Not_found -> false in
                 Netsys_posix.sem_close sem;
                 usem.sem <- None;
                 (* sem_idestroy below sees [sem = None] for this record
                    and does not adjust the counter anymore, so this must
                    be done here. Otherwise [open_sems] would grow by one
                    with every destroyed open semaphore.
                  *)
                 if tracked then
                   cont.open_sems <- cont.open_sems - 1
         );
         sem_idestroy cont usem.num;
         sem_release cont usem.num
      )

  (* [drop_container cont]: destroy all semaphores that are marked as used
     in the container, unlink the shared memory object and the container
     semaphore (errors of the unlink operations are deliberately ignored),
     and remove the container from the process-local registries.

     Everything is done while [proc_mutex] and the container mutex are
     held. Because [proc_mutex] is already held, the registries are updated
     directly: [del_cont] would try to acquire [proc_mutex] a second time,
     which blocks forever (or fails) with a real mutex implementation.
   *)
  let drop_container cont =
    dlogr (fun () -> 
             sprintf "drop_container start prefix=%s" cont.prefix);
    Netsys_oothr.serialize 
      proc_mutex
      (fun () ->
         lock cont;
         for k = 0 to n-1 do
           if cont.used.{k} = '\001' then
             sem_idestroy cont k
         done;
         ( try
             Netsys_posix.shm_unlink (cont.prefix ^ "_sems")
           with _ -> ()
         );
         ( try
             Netsys_posix.sem_unlink (cont.prefix ^ "_contsem")
           with _ -> ()
         );
         (* remove from hash tables (proc_mutex is held, see above) *)
         Hashtbl.remove cont_by_id cont.id;
         Hashtbl.remove cont_by_prefix cont.prefix;
         dlogr (fun () -> 
                  sprintf "drop_container finish prefix=%s" cont.prefix);
         unlock cont
      )
      ()


  (* Get the backing named semaphore of [usem] for an operation: the
     container is found via [cont_id] (it is a programming error if it is
     not registered), the semaphore gets a new freshness value, and it is
     (re)opened if necessary. Also increases [use_count] by 1.
   *)
  let get_sem usem =
    let cont =
      try lookup_cont_by_id usem.cont_id
      with Not_found -> assert false in
    usem.freshness <- freshest cont;
    use_sem cont usem

  (* Return the value of the semaphore as reported by
     [Netsys_posix.sem_getvalue]. The use count taken by [get_sem] is
     given back in any case, also when the operation raises.
   *)
  let sem_getvalue usem =
    let sem = get_sem usem in
    let v =
      try Netsys_posix.sem_getvalue sem
      with error ->
        mod_use_count (-1) usem;
        raise error in
    mod_use_count (-1) usem;
    v

  (* Post the semaphore. The use count taken by [get_sem] is given back
     in any case, also when the operation raises.
   *)
  let sem_post usem =
    dlogr (fun () -> sprintf "sem_post k=%d" usem.num);
    let sem = get_sem usem in
    ( try Netsys_posix.sem_post sem
      with error ->
        mod_use_count (-1) usem;
        raise error
    );
    mod_use_count (-1) usem

  (* Wait on the semaphore with the behavior [wb]. The use count taken by
     [get_sem] is held during the wait, and it is given back in any case,
     also when the operation raises. Errors are logged and passed through.
   *)
  let sem_wait usem wb =
    dlogr (fun () -> sprintf "sem_wait waiting k=%d" usem.num);
    let sem = get_sem usem in
    ( try Netsys_posix.sem_wait sem wb
      with error ->
        mod_use_count (-1) usem;
        dlogr (fun () ->
                 sprintf "sem_wait exn=%s k=%d"
                   (Netexn.to_string error) usem.num);
        raise error
    );
    mod_use_count (-1) usem;
    dlogr (fun () -> sprintf "sem_wait acquired k=%d" usem.num)
end


(* Implementation on top of the anonymous semaphores of [Netsys_posix].
   It has the same function names as [Emu]. A container is only the prefix
   string here: no persistent objects are managed, and the container
   argument of the semaphore functions is ignored.
 *)
module Native = struct
  type container = string (* the prefix *)

  type anon_semaphore =
      Netsys_posix.anon_semaphore

  let have_anon_semaphores() =
    Netsys_posix.have_anon_posix_semaphores()

  (* The container is the prefix itself *)
  let container p = p

  let create_container p = p

  (* Nothing to drop or to unlink *)
  let drop_container _ = ()

  let unlink _ = ()

  let prefix p = p

  (* The following functions forward to the function of the same name in
     [Netsys_posix]; the first three drop the container argument
   *)
  let sem_init _ = 
    Netsys_posix.sem_init

  let sem_destroy _ =
    Netsys_posix.sem_destroy

  let as_sem _ =
    Netsys_posix.as_sem

  let sem_getvalue =
    Netsys_posix.sem_getvalue

  let sem_post =
    Netsys_posix.sem_post

  let sem_wait =
    Netsys_posix.sem_wait

end


(* A container is either an emulated container ([`E]) or a native
   one ([`N])
 *)
type container =
  [ `E of Emu.container
  | `N of Native.container
  ]

(* A semaphore is either an emulated semaphore ([`E]) or a native
   one ([`N])
 *)
type anon_semaphore =
  [ `E of Emu.anon_semaphore
  | `N of Native.anon_semaphore
  ]


(* Whether the emulation is requested unconditionally
   (set by [force_emulation])
 *)
let force_emu = ref false

(* Request that the emulation is used from now on, even if the system
   provides anonymous semaphores
 *)
let force_emulation () =
  force_emu := true

(* Whether the emulation is to be used: either it is forced, or the system
   has named POSIX semaphores but no anonymous ones
 *)
let emu () =
  if !force_emu then
    true
  else
    Netsys_posix.have_named_posix_semaphores()
    && not (Netsys_posix.have_anon_posix_semaphores())

(* Whether anonymous semaphores are available in the selected
   implementation (emulated or native)
 *)
let have_anon_semaphores () =
  match emu() with
    | true -> Emu.have_anon_semaphores()
    | false -> Native.have_anon_semaphores()


(* The container for [prefix] in the selected implementation *)
let container prefix : container =
  match emu() with
    | true -> `E(Emu.container prefix)
    | false -> `N(Native.container prefix)

(* Like [container], but calls the [create_container] function of the
   selected implementation
 *)
let create_container prefix =
  match emu() with
    | true -> `E(Emu.create_container prefix)
    | false -> `N(Native.create_container prefix)

(* Unlink the persistent objects of [prefix] in the selected implementation
   (the native implementation has none)
 *)
let unlink prefix =
  let impl =
    if emu() then Emu.unlink else Native.unlink in
  impl prefix

(* The prefix of a container *)
let prefix cont =
  match cont with
    | `E c -> Emu.prefix c
    | `N c -> Native.prefix c

(* Drop the container with the [drop_container] function of its
   implementation
 *)
let drop =
  function
    | `E c -> Emu.drop_container c
    | `N c -> Native.drop_container c

(* Initialize a semaphore at position [pos] of [mem] with the
   implementation of the container. The result carries the same tag
   as the container.
 *)
let sem_init cont mem pos pshared init_value =
  match cont with
    | `N c ->
        let s = Native.sem_init c mem pos pshared init_value in
        `N s
    | `E c ->
        let s = Emu.sem_init c mem pos pshared init_value in
        `E s

(* Destroy the semaphore. Container and semaphore must belong to the same
   implementation; any other combination is a programming error
   (assertion failure).
 *)
let sem_destroy cont sem =
  match cont with
    | `E c ->
        ( match sem with
            | `E s -> Emu.sem_destroy c s
            | _ -> assert false
        )
    | `N c ->
        ( match sem with
            | `N s -> Native.sem_destroy c s
            | _ -> assert false
        )
    | _ ->
        assert false

(* Access the semaphore at position [pos] of [mem] with the implementation
   of the container. The result carries the same tag as the container.
 *)
let as_sem cont mem pos =
  match cont with
    | `N c ->
        let s = Native.as_sem c mem pos in
        `N s
    | `E c ->
        let s = Emu.as_sem c mem pos in
        `E s

(* The value of the semaphore, taken from its implementation *)
let sem_getvalue =
  function
    | `E s -> Emu.sem_getvalue s
    | `N s -> Native.sem_getvalue s

(* Post the semaphore with its implementation *)
let sem_post =
  function
    | `E s -> Emu.sem_post s
    | `N s -> Native.sem_post s

(* [sem_wait sem wb]: wait on the semaphore with its implementation;
   [wb] is the wait behavior passed on to the implementation
 *)
let sem_wait sem wb =
  match sem with
    | `E s -> Emu.sem_wait s wb
    | `N s -> Native.sem_wait s wb

(* Same value as [Netsys_posix.sem_value_max] *)
let sem_value_max =
  Netsys_posix.sem_value_max

(* The size of a semaphore in memory: the size reported by [Netsys_posix]
   (but at least 4). It should be a multiple of the word size, so we round
   it up to the next multiple of 8.
 *)
let sem_size =
  let raw = max 4 Netsys_posix.sem_size in
  ((raw + 7) / 8) * 8


(* This web site is published by Informatikbüro Gerd Stolpmann
   Powered by Caml *)