(* $Id: netsys_sem.ml 1879 2013-08-19 09:46:14Z gerd $ *)

(* Anonymous semaphores with an optional emulation on top of named POSIX
   semaphores. The module [Emu] implements the emulation, the module
   [Native] forwards to the anonymous semaphores of [Netsys_posix], and the
   definitions at the end of the file dispatch between both.
 *)

module Debug = struct
  let enable = ref false
end

let dlog = Netlog.Debug.mk_dlog "Netsys_sem" Debug.enable
let dlogr = Netlog.Debug.mk_dlogr "Netsys_sem" Debug.enable

let () =
  Netlog.Debug.register_module "Netsys_sem" Debug.enable

open Printf

type sem_open_flag = Netsys_posix.sem_open_flag =
  | SEM_O_CREAT
  | SEM_O_EXCL

type sem_wait_behavior = Netsys_posix.sem_wait_behavior =
  | SEM_WAIT_BLOCK
  | SEM_WAIT_NONBLOCK

type prefix = string


module Emu = struct
  let n = 16384
    (* We support at most this number of semaphores per container.
       This number can be increased to at most 65535 without changing
       the code (the semaphore number is stored in two bytes, see
       [sem_init] and [as_sem]).
     *)

  let n_active =
    try Netsys_posix.sysconf_open_max() / 4
    with _ -> 64
    (* max number of open semaphores (normally, each open sem consumes
       a file descr). It is tried not to exceed this value, but in some
       situations this will nevertheless occur. Falls back to 64 when
       the limit cannot be queried.
     *)

  (* Note that there is a basic problem with [mutex]: If the process dies
     while the mutex is in locked state, no other process will ever again
     get the lock.

     TODO: We could protect against this by recording the PID of the
     lock holder.
   *)

  type anon_semaphore =
      { mutable sem : Netsys_posix.named_semaphore option;
        mutable use_count : int;
        num : int;                    (* number in the container *)
        mutable freshness : int;      (* freshness for lru cache *)
        cont_id : int;                (* Container ID for cross-lookups *)
      }
    (* The active version of a semaphore. There is only one instance of
       this record per process and semaphore.

       [sem] is set to the backing named semaphore, or to [None] while
       the named semaphore is closed in this process.

       [use_count] counts the number of active uses in this process
       (values > 1 are possible in multithreaded programs), such as
       ongoing semaphore operations. When [use_count > 0], the semaphore
       cannot be closed.

       [num] is the number of this semaphore in the container. This
       means that [cont.used.{num}] is marked as used (byte value 1).

       The value [freshness] is set to a new maximum whenever the
       semaphore is used.
     *)

  type container =
      { prefix : string;
        id : int;
        used : (char,Bigarray.int8_unsigned_elt,Bigarray.c_layout)
                 Bigarray.Array1.t;
        active : (int, anon_semaphore) Hashtbl.t;
        mutex : Netsys_posix.named_semaphore;   (* protects only [used] *)
        mutable open_sems : int;
        mutable maxfresh : int;
      }
    (* The container. There is only one instance of this record per
       process and [prefix]. This instance also gets a unique [id].

       The possible semaphores in the container have numbers 0 to n-1,
       where n is the size of the [used] array. When a number is
       allocated, the [used] array is set to the byte value 1 at the
       corresponding index (and back to 0 when it is released).

       If a semaphore is opened, it is put into [active]. If a semaphore
       is closed, it is not removed from [active], however. This first
       happens when it is destroyed.

       The counter [open_sems] contains the number of open semaphores
       in [active] (with [sem<>None]).
     *)

  (* Forks: the construction needs to be compatible with fork(), at least
     for single-threaded apps. Both memory mappings ([used]) and named
     semaphores survive forks, so there should not be any problem.

     In mt apps: there are two problems:
      - [use_count] would have to be reset to 0 after a fork
      - [proc_mutex] would have to be reinitialized
     We do not attempt here to solve these problems.
   *)

  (* Process-local mutex. It guards the two container tables below and the
     mutable per-process bookkeeping ([active], [open_sems], [maxfresh],
     [use_count]).

     Locking rule: functions that wrap their body in
     [Netsys_oothr.serialize proc_mutex] must not call each other from
     inside that body, because the mutex would then be requested twice by
     the same thread. Code that already runs under the mutex has to use
     the [_lk] variants or access the tables directly.
   *)
  let proc_mutex = !Netsys_oothr.provider # create_mutex()

  let cont_by_id = Hashtbl.create 7
  let cont_by_prefix = Hashtbl.create 7

  (* Name of the named semaphore backing semaphore number [k] of [cont] *)
  let sem_name cont k =
    cont.prefix ^ "_sem" ^ string_of_int k

  (* Container registered for [id]. Raises [Not_found] if there is none. *)
  let lookup_cont_by_id =
    Netsys_oothr.serialize
      proc_mutex
      (fun id ->
         Hashtbl.find cont_by_id id
      )

  (* Container registered for prefix [p]. Raises [Not_found] if there is
     none.
   *)
  let lookup_cont_by_prefix =
    Netsys_oothr.serialize
      proc_mutex
      (fun p ->
         Hashtbl.find cont_by_prefix p
      )

  (* Registers [c] under both its [id] and its [prefix] *)
  let add_cont =
    Netsys_oothr.serialize
      proc_mutex
      (fun c ->
         Hashtbl.replace cont_by_id c.id c;
         Hashtbl.replace cont_by_prefix c.prefix c
      )

  (* Removes [c] from both tables. Takes [proc_mutex] itself, so it must
     not be called while the mutex is already held.
   *)
  let del_cont =
    Netsys_oothr.serialize
      proc_mutex
      (fun c ->
         Hashtbl.remove cont_by_id c.id;
         Hashtbl.remove cont_by_prefix c.prefix
      )

  (* [lookup_or_add_cont f p]: returns the container registered for the
     prefix [p]. If there is none yet, [f p] is called to create it, and
     the result is registered in both tables. The check and the
     registration happen under the same lock.
   *)
  let lookup_or_add_cont f =
    Netsys_oothr.serialize
      proc_mutex
      (fun p ->
         try
           Hashtbl.find cont_by_prefix p
         with
           | Not_found ->
               let c = f p in
               Hashtbl.replace cont_by_id c.id c;
               Hashtbl.replace cont_by_prefix c.prefix c;
               c
      )

  (* Increments [maxfresh] of the container and returns the new value *)
  let freshest =
    Netsys_oothr.serialize
      proc_mutex
      (fun c ->
         let fn = c.maxfresh+1 in
         c.maxfresh <- fn;
         fn
      )

  (* [mod_use_count d usem]: adds [d] to the use count of [usem] *)
  let mod_use_count d =
    Netsys_oothr.serialize
      proc_mutex
      (fun usem ->
         usem.use_count <- usem.use_count + d
      )

  let have_anon_semaphores() =
    Netsys_posix.have_named_posix_semaphores()

  let prefix cont =
    cont.prefix

  (* Opens (creating if necessary) the persistent objects of the container
     with the given [prefix]: the shared memory object [prefix ^ _sems]
     holding the [used] flags, and the named semaphore
     [prefix ^ _contsem] serving as inter-process mutex (initial value 1).
     Returns a fresh container record; the caller is responsible for
     registering it. The file descriptor is closed on all paths.
   *)
  let load_container prefix =
    let fd =
      Netsys_posix.shm_open
        (prefix ^ "_sems")
        [ Netsys_posix.SHM_O_RDWR;
          Netsys_posix.SHM_O_CREAT
        ]
        0o600 in
    let fd_open = ref true in
    ( try
        let st = Unix.fstat fd in
        (* A size of 0 means that the object has just been created. Note
           that it is sized to 8*n bytes although only n bytes are mapped
           below.
         *)
        if st.Unix.st_size = 0 then
          Unix.ftruncate fd (8 * n);
        let used =
          Netsys_mem.memory_map_file fd true n in
        Unix.close fd;
        fd_open := false;
        let mutex =
          Netsys_posix.sem_open
            (prefix ^ "_contsem")
            [ Netsys_posix.SEM_O_CREAT ]
            0o600
            1 in
        dlogr
          (fun () ->
             sprintf "Opened container prefix=%s" prefix
          );
        { prefix = prefix;
          used = used;
          active = Hashtbl.create 47;
          mutex = mutex;
          open_sems = 0;
          maxfresh = 0;
          id = Oo.id (object end);
        }
      with
        | error ->
            if !fd_open then Unix.close fd;
            raise error
    )

  (* Returns the container for [prefix], loading it on first use *)
  let container prefix =
    lookup_or_add_cont load_container prefix

  (* Acquires the inter-process mutex of the container (blocking) *)
  let lock cont =
    dlogr (fun () -> sprintf "lock waiting prefix=%s" cont.prefix);
    Netsys_posix.sem_wait cont.mutex Netsys_posix.SEM_WAIT_BLOCK;
    dlogr (fun () -> sprintf "lock acquired prefix=%s" cont.prefix)

  (* Releases the inter-process mutex of the container *)
  let unlock cont =
    dlogr (fun () -> sprintf "unlock prefix=%s" cont.prefix);
    Netsys_posix.sem_post cont.mutex

  let unlink prefix =
    (* A radical way to get rid of all persistent objects, without
       acquiring mutex. Errors are deliberately ignored: most of the
       n possible semaphore names normally do not exist.
     *)
    dlogr (fun () -> sprintf "unlink prefix=%s" prefix);
    let attempt f arg =
      try f arg
      with _ -> () in
    attempt Netsys_posix.shm_unlink (prefix ^ "_sems");
    attempt Netsys_posix.sem_unlink (prefix ^ "_contsem");
    for k = 0 to n-1 do
      attempt Netsys_posix.sem_unlink (prefix ^ "_sem" ^ string_of_int k)
    done

  (* Removes all persistent objects for [prefix], and then opens the
     container. If a container record for [prefix] is already registered
     in this process, that record is returned.
   *)
  let create_container prefix =
    unlink prefix;
    container prefix

  let drop_active_lk cont =
    (* While locked (the caller holds [proc_mutex]): find the sem with the
       lowest freshness and close it. Consider only sems without users.
       This is repeated until there is room for one more open semaphore,
       or until there is no candidate left.
     *)
    let old_open_sems = cont.open_sems in
    ( try
        while cont.open_sems+1 > n_active do
          let usem_opt =
            Hashtbl.fold
              (fun _ usem best_usem_opt ->
                 if usem.use_count = 0 && usem.sem <> None then (
                   match best_usem_opt with
                     | None ->
                         (Some usem)
                     | Some b ->
                         if usem.freshness < b.freshness then
                           Some usem
                         else
                           best_usem_opt
                 )
                 else best_usem_opt
              )
              cont.active
              None in
          match usem_opt with
            | None ->
                raise Not_found  (* leave loop *)
            | Some usem ->
                ( match usem.sem with
                    | None -> assert false
                    | Some sem ->
                        dlogr
                          (fun () ->
                             sprintf "drop_active: closing prefix=%s k=%d"
                               cont.prefix usem.num);
                        Netsys_posix.sem_close sem;
                        usem.sem <- None;
                        cont.open_sems <- cont.open_sems - 1
                )
        done
      with
        | Not_found -> ()
    );
    dlogr
      (fun () ->
         sprintf "drop_active: old_open=%d now_open=%d"
           old_open_sems cont.open_sems
      )

  (* Like [drop_active_lk], but takes [proc_mutex] itself. Must not be
     called by code that already holds the mutex.
   *)
  let drop_active cont =
    Netsys_oothr.serialize
      proc_mutex
      (fun () -> drop_active_lk cont)
      ()

  (* [use_sem cont usem]: returns the backing named semaphore of [usem],
     reopening it if it is currently closed in this process, and increases
     [use_count] by 1. The caller has to decrease [use_count] again when
     the operation is done.
   *)
  let use_sem cont =
    Netsys_oothr.serialize
      proc_mutex
      (fun usem ->
         match usem.sem with
           | None ->
               dlogr
                 (fun () ->
                    sprintf "Reloading sem prefix=%s k=%d"
                      cont.prefix usem.num
                 );
               (* [proc_mutex] is already held here, so the [_lk] variant
                  has to be used. [drop_active] would request the mutex a
                  second time from the same thread.
                *)
               if cont.open_sems >= n_active then
                 drop_active_lk cont;
               let sem =
                 Netsys_posix.sem_open
                   (sem_name cont usem.num)
                   []
                   0
                   0 in
               usem.sem <- Some sem;
               cont.open_sems <- cont.open_sems + 1;
               usem.use_count <- usem.use_count + 1;
               sem
           | Some sem ->
               usem.use_count <- usem.use_count + 1;
               sem
      )

  let lookup cont k =
    (* Look the existing semaphore #k up. ENOENT if not found.
       If the semaphore is not yet known in this process, the named
       semaphore is opened and a new record is entered into [active].
     *)
    let fn = freshest cont in
    Netsys_oothr.serialize
      proc_mutex
      (fun () ->
         try
           ( try
               let usem = Hashtbl.find cont.active k in
               dlogr
                 (fun () ->
                    sprintf "lookup prefix=%s k=%d active" cont.prefix k);
               (* Might still be closed! *)
               usem
             with
               | Not_found ->
                   if cont.open_sems >= n_active then
                     drop_active_lk cont;
                   let sem =
                     Netsys_posix.sem_open
                       (sem_name cont k)
                       []
                       0
                       0 in
                   let usem =
                     { sem = Some sem;
                       num = k;
                       freshness = fn;
                       cont_id = cont.id;
                       use_count = 0;
                     } in
                   dlogr
                     (fun () ->
                        sprintf "lookup prefix=%s k=%d opened"
                          cont.prefix k);
                   Hashtbl.add cont.active k usem;
                   cont.open_sems <- cont.open_sems + 1;
                   usem
           )
         with
           | error ->
               dlogr
                 (fun () ->
                    sprintf "lookup prefix=%s k=%d error=%s"
                      cont.prefix k (Netexn.to_string error));
               raise error
      )
      ()

  let find_unused cont =
    (* Find a free k, and mark it as used. The scan runs under the
       inter-process mutex of the container. Raises ENOMEM if all n
       numbers are taken.
     *)
    lock cont;
    let k = ref 0 in
    while !k < n && cont.used.{ !k } = '\001' do
      incr k
    done;
    if !k < n then (
      cont.used.{ !k } <- '\001';
      dlogr
        (fun () -> sprintf "find_unused prefix=%s k=%d" cont.prefix !k);
      unlock cont;
      !k
    )
    else (
      unlock cont;
      raise(Unix.Unix_error(Unix.ENOMEM,
                            "Netsys_shm.find_unused (too many semaphores)",
                            ""))
    )

  (* Marks number [k] as free again. Does not lock; see [sem_release]. *)
  let mark_unused cont k =
    cont.used.{ k } <- '\000'

  (* Reads the semaphore number stored at [mem.{pos}] and [mem.{pos+1}]
     (low byte first, as written by [sem_init]), and looks the semaphore
     up.
   *)
  let as_sem cont mem pos =
    let k =
      Char.code mem.{ pos } + 256 * Char.code mem.{ pos+1 } in
    dlogr (fun () -> sprintf "as_sem prefix=%s k=%d" cont.prefix k);
    lookup cont k

  (* Allocates a free semaphore number, creates the backing named
     semaphore exclusively with the value [init_value], and stores the
     number in the two bytes at [mem.{pos}] (low byte first). The argument
     [pshared] is not used by the emulation.

     If creating the named semaphore fails, the exception is passed on,
     and the allocated number remains marked as used.
   *)
  let sem_init cont mem pos pshared init_value =
    let k = find_unused cont in
    let fn = freshest cont in
    Netsys_oothr.serialize
      proc_mutex
      (fun () ->
         (* First check whether k is still in [active]. This can happen
            when another process destroyed the semaphore, but this
            process did not see it yet. We don't destroy it here again
            (too error-prone).
          *)
         ( try
             let usem = Hashtbl.find cont.active k in
             if usem.sem <> None then
               cont.open_sems <- cont.open_sems - 1;
             Hashtbl.remove cont.active k
           with Not_found -> ()
         );
         try
           if cont.open_sems >= n_active then
             drop_active_lk cont;
           let sem =
             Netsys_posix.sem_open
               (sem_name cont k)
               [ Netsys_posix.SEM_O_CREAT; Netsys_posix.SEM_O_EXCL ]
               0o600
               init_value in
           let usem =
             { sem = Some sem;
               num = k;
               freshness = fn;
               cont_id = cont.id;
               use_count = 0;
             } in
           Hashtbl.replace cont.active k usem;
           cont.open_sems <- cont.open_sems + 1;
           mem.{ pos } <- Char.chr (k land 0xff);
           mem.{ pos+1 } <- Char.chr (k lsr 8);
           dlogr
             (fun () -> sprintf "sem_init prefix=%s k=%d" cont.prefix k);
           usem
         with
           | error ->
               dlogr
                 (fun () ->
                    sprintf "sem_init prefix=%s k=%d error=%s"
                      cont.prefix k (Netexn.to_string error));
               raise error
      )
      ()

  (* Internal part of the destruction: forgets semaphore [k] in [active]
     (adjusting [open_sems] if the record still counts as open), and
     unlinks the named semaphore. Unlink errors are ignored. The function
     does not lock; the callers hold [proc_mutex].
   *)
  let sem_idestroy cont k =
    ( try
        let usem = Hashtbl.find cont.active k in
        if usem.sem <> None then
          cont.open_sems <- cont.open_sems - 1;
        Hashtbl.remove cont.active k
      with Not_found -> ()
    );
    ( try
        Netsys_posix.sem_unlink (sem_name cont k)
      with _ -> ()
    )

  (* Marks number [k] as free under the inter-process mutex *)
  let sem_release cont k =
    lock cont;
    mark_unused cont k;
    unlock cont

  (* [sem_destroy cont usem]: closes and unlinks the backing named
     semaphore, and releases the semaphore number. Fails if the semaphore
     is still in use in this process ([use_count > 0]).
   *)
  let sem_destroy cont =
    Netsys_oothr.serialize
      proc_mutex
      (fun usem ->
         let w = usem.use_count in
         dlogr
           (fun () ->
              sprintf "sem_destroy prefix=%s k=%d use_count=%d"
                cont.prefix usem.num w);
         if w > 0 then
           failwith
             "Netsys_sem.sem_destroy: there are still threads in sem_wait";
         ( match usem.sem with
             | None -> ()
             | Some sem ->
                 Netsys_posix.sem_close sem;
                 usem.sem <- None;
                 (* Keep [open_sems] in sync with the close. Once [sem] is
                    [None], [sem_idestroy] below no longer adjusts the
                    counter for this record, so it has to be done here.
                    Only a record that is still the registered one is
                    counted in [open_sems].
                  *)
                 let registered =
                   try Hashtbl.find cont.active usem.num == usem
                   with Not_found -> false in
                 if registered then
                   cont.open_sems <- cont.open_sems - 1
         );
         sem_idestroy cont usem.num;
         sem_release cont usem.num
      )

  (* Destroys all semaphores marked as used in the container, unlinks the
     persistent objects of the container, and unregisters the container
     in this process. Unlink errors are ignored.
   *)
  let drop_container cont =
    dlogr (fun () -> sprintf "drop_container start prefix=%s" cont.prefix);
    Netsys_oothr.serialize
      proc_mutex
      (fun () ->
         lock cont;
         for k = 0 to n-1 do
           if cont.used.{k} = '\001' then
             sem_idestroy cont k
         done;
         ( try Netsys_posix.shm_unlink (cont.prefix ^ "_sems")
           with _ -> () );
         ( try Netsys_posix.sem_unlink (cont.prefix ^ "_contsem")
           with _ -> () );
         (* remove from hash tables. [proc_mutex] is already held here, so
            the tables are updated directly; [del_cont] would request the
            mutex a second time from the same thread.
          *)
         Hashtbl.remove cont_by_id cont.id;
         Hashtbl.remove cont_by_prefix cont.prefix;
         dlogr
           (fun () ->
              sprintf "drop_container finish prefix=%s" cont.prefix);
         unlock cont
      )
      ()

  (* Returns the backing named semaphore of [usem] for an operation:
     bumps the freshness, and increases [use_count] by 1. The container
     of [usem] must still be registered.
   *)
  let get_sem usem =
    let cont =
      try lookup_cont_by_id usem.cont_id
      with
        | Not_found -> assert false in
    let fn = freshest cont in
    usem.freshness <- fn;
    use_sem cont usem   (* also increases use_count by 1 *)

  (* The three operations below all follow the same scheme: get the
     backing semaphore (use_count + 1), run the operation, and decrease
     use_count again, also when the operation raises an exception.
   *)

  let sem_getvalue usem =
    let sem = get_sem usem in
    try
      let v = Netsys_posix.sem_getvalue sem in
      mod_use_count (-1) usem;
      v
    with
      | error ->
          mod_use_count (-1) usem;
          raise error

  let sem_post usem =
    dlogr (fun () -> sprintf "sem_post k=%d" usem.num);
    let sem = get_sem usem in
    try
      Netsys_posix.sem_post sem;
      mod_use_count (-1) usem
    with
      | error ->
          mod_use_count (-1) usem;
          raise error

  let sem_wait usem wb =
    dlogr (fun () -> sprintf "sem_wait waiting k=%d" usem.num);
    let sem = get_sem usem in
    try
      Netsys_posix.sem_wait sem wb;
      mod_use_count (-1) usem;
      dlogr (fun () -> sprintf "sem_wait acquired k=%d" usem.num)
    with
      | error ->
          mod_use_count (-1) usem;
          dlogr
            (fun () ->
               sprintf "sem_wait exn=%s k=%d"
                 (Netexn.to_string error) usem.num);
          raise error

end


(* The native implementation: anonymous semaphores as provided by
   [Netsys_posix]. A container is just the prefix string, and the
   container management functions have nothing to do.
 *)
module Native = struct
  type container = string   (* the prefix *)
  type anon_semaphore = Netsys_posix.anon_semaphore

  let have_anon_semaphores() =
    Netsys_posix.have_anon_posix_semaphores()

  let container p = p
  let create_container p = p
  let drop_container _ = ()
  let unlink _ = ()
  let prefix p = p

  (* The container argument is ignored by the following three functions *)
  let sem_init _ = Netsys_posix.sem_init
  let sem_destroy _ = Netsys_posix.sem_destroy
  let as_sem _ = Netsys_posix.as_sem

  let sem_getvalue = Netsys_posix.sem_getvalue
  let sem_post = Netsys_posix.sem_post
  let sem_wait = Netsys_posix.sem_wait
end


(* Values are tagged with [`E] (emulation) or [`N] (native), and every
   function below dispatches on this tag.
 *)
type container =
  [ `E of Emu.container
  | `N of Native.container
  ]

type anon_semaphore =
  [ `E of Emu.anon_semaphore
  | `N of Native.anon_semaphore
  ]

let force_emu = ref false

(* After calling this function the emulation is used for everything that
   is created from then on.
 *)
let force_emulation() =
  force_emu := true

(* Whether to use the emulation: either it is forced, or named semaphores
   are available but anonymous semaphores are not.
 *)
let emu() =
  !force_emu ||
    (Netsys_posix.have_named_posix_semaphores() &&
       not (Netsys_posix.have_anon_posix_semaphores()))

let have_anon_semaphores() =
  if emu() then
    Emu.have_anon_semaphores()
  else
    Native.have_anon_semaphores()

let container prefix : container =
  if emu() then
    `E(Emu.container prefix)
  else
    `N(Native.container prefix)

let create_container prefix =
  if emu() then
    `E(Emu.create_container prefix)
  else
    `N(Native.create_container prefix)

let unlink prefix =
  if emu() then
    Emu.unlink prefix
  else
    Native.unlink prefix

let prefix =
  function
  | `E c -> Emu.prefix c
  | `N c -> Native.prefix c

let drop cont =
  match cont with
    | `E c -> Emu.drop_container c
    | `N c -> Native.drop_container c

let sem_init cont mem pos pshared init_value =
  match cont with
    | `E c -> `E(Emu.sem_init c mem pos pshared init_value)
    | `N c -> `N(Native.sem_init c mem pos pshared init_value)

(* The container and the semaphore must be of the same kind; mixing an
   emulated with a native value is a programming error.
 *)
let sem_destroy cont sem =
  match (cont,sem) with
    | `E c, `E s -> Emu.sem_destroy c s
    | `N c, `N s -> Native.sem_destroy c s
    | _ -> assert false

let as_sem cont mem pos =
  match cont with
    | `E c -> `E(Emu.as_sem c mem pos)
    | `N c -> `N(Native.as_sem c mem pos)

let sem_getvalue sem =
  match sem with
    | `E s -> Emu.sem_getvalue s
    | `N s -> Native.sem_getvalue s

let sem_post sem =
  match sem with
    | `E s -> Emu.sem_post s
    | `N s -> Native.sem_post s

let sem_wait sem =
  match sem with
    | `E s -> Emu.sem_wait s
    | `N s -> Native.sem_wait s

let sem_value_max =
  Netsys_posix.sem_value_max

let sem_size =
  (* should be a multiple of the word size, so we round it up to the next
     multiple of 8. The minimum of 4 ensures that there is room for the
     two bytes the emulation stores.
   *)
  let s = max 4 Netsys_posix.sem_size in
  (((s-1) / 8)+1) * 8