module Netplex_semaphore: sig .. end
This implementation works in both multi-processing and multi-threading netplex environments. It is, however, not very fast, because the counters live in the controller and the increment/decrement operations are carried out by RPCs. It is good enough when these operations are called only infrequently, e.g. in the post-start and pre-finish processor callbacks.
This interface is designed so that a later re-implementation with POSIX semaphores is relatively straightforward.
Thread safety: Full. The functions can be called from any thread.
val plugin : Netplex_types.plugin
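To enable semaphores, call the controller's add_plugin method with this object as argument. This can e.g. be done in the post_add_hook of the processor.
The following functions can only be invoked in container contexts. Outside of such a context, the exception Netplex_cenv.Not_in_container_thread is raised.
val increment : string -> int64
Increment the named semaphore by 1 and return the new value.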
Semaphore names are global to the whole netplex system. By convention, these names are formed like "service_name.local_name", i.e. they are prefixed by the socket service to which they refer.
val decrement : ?wait:bool -> string -> int64
Decrement the named semaphore by 1 and return the new value. Semaphore values cannot become negative: if the value is already 0 and wait = false, the semaphore is not decremented. However, (-1) is then returned nevertheless.
If the value is already 0 and wait=true, the operation waits until the value exceeds 0 and then decrements the semaphore. If several waiters exist, only one waiter gets the chance to decrement.
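The return values described above can be illustrated with a small in-process model. This is only a sketch of the documented counter semantics for the non-waiting case, not the Netplex implementation: the names table, get, increment and decrement below are hypothetical stand-ins, no controller or RPC is involved, and the wait=true case is left out.

```ocaml
(* Toy in-process model of the documented counter semantics.
   This is NOT Netplex_semaphore: [table], [get], [increment] and
   [decrement] are hypothetical stand-ins, and waiting is not modelled. *)
let table : (string, int64) Hashtbl.t = Hashtbl.create 16

(* Current value; a name that was never used is treated as 0,
   mirroring automatic creation with an initial value of 0. *)
let get name =
  Option.value (Hashtbl.find_opt table name) ~default:0L

(* Add 1 and return the new value. *)
let increment name =
  let v = Int64.succ (get name) in
  Hashtbl.replace table name v;
  v

(* Non-waiting decrement: the value never drops below 0;
   if it is already 0, (-1L) is returned and nothing changes. *)
let decrement name =
  let v = get name in
  if v = 0L then (-1L)
  else begin
    let v' = Int64.pred v in
    Hashtbl.replace table name v';
    v'
  end

let () =
  let name = "my_service.counter" in
  assert (increment name = 1L);      (* first user *)
  assert (increment name = 2L);
  assert (decrement name = 1L);
  assert (decrement name = 0L);      (* last user *)
  assert (decrement name = (-1L));   (* already 0: not decremented *)
  assert (get name = 0L)
```

A caller can therefore tell "I was the last user" (result 0) apart from "there was nothing left to decrement" (result -1).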
val get : string -> int64
Get the value of the named semaphore. get can also be invoked from the controller process.
val create : ?protected:bool -> string -> int64 -> bool
Create the semaphore with this initial value. Returns true if the creation is successful, and false if the semaphore already existed.
If protected, the semaphore is automatically decremented by some value when the container calling this function terminates. This value is pi - d, where pi is the number of increments and d is the number of (successful) decrements requested by the container.
A semaphore does not need to be created explicitly by calling create. It is automatically created at first use with a value of 0 and protected=true.
create can also be invoked from the controller process.
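The pi - d rule for protected semaphores can be checked with a little arithmetic. The following is a minimal sketch of that bookkeeping only, assuming the formula is applied exactly as stated. The record type container_stats and the function release_on_exit are hypothetical names for illustration and are not part of the Netplex API.

```ocaml
(* Toy model of the [protected] bookkeeping described above.
   [container_stats] and [release_on_exit] are hypothetical names,
   not part of Netplex. *)
type container_stats = {
  increments : int64;  (* pi: increments requested by the container *)
  decrements : int64;  (* d: successful decrements by the container *)
}

(* Value of the semaphore after the container has terminated:
   the current value minus the outstanding amount pi - d. *)
let release_on_exit ~value stats =
  let outstanding = Int64.sub stats.increments stats.decrements in
  Int64.sub value outstanding

let () =
  (* Two containers incremented once each; one of them terminates
     without having decremented, so its increment is taken back. *)
  let crashed = { increments = 1L; decrements = 0L } in
  assert (release_on_exit ~value:2L crashed = 1L);
  (* A container whose operations are balanced changes nothing. *)
  let clean = { increments = 3L; decrements = 3L } in
  assert (release_on_exit ~value:5L clean = 5L)
```

The effect is that a container that dies between its increment and its matching decrement does not leave the counter permanently too high.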
val destroy : string -> unit
Destroy this semaphore. Any waiting decrement will immediately get (-1L).
Note that there is no protection against unintended re-creation after destroy.
destroy can also be invoked from the controller process.
val ctrl_increment : string -> Netplex_types.container_id -> int64
Example: override the processor callbacks as follows to count the number of containers for the service:
  (* Register the semaphore plugin with the controller. *)
  method post_add_hook sockserv ctrl =
    ctrl # add_plugin Netplex_semaphore.plugin

  (* Count this container in; the first one sees the value 1. *)
  method post_start_hook container =
    let sem_name = container#socket_service#name ^ ".counter" in
    let n = Netplex_semaphore.increment sem_name in
    if n = 1L then
      prerr_endline "First container"

  (* Count this container out; the last one sees the value 0. *)
  method pre_finish_hook container =
    let sem_name = container#socket_service#name ^ ".counter" in
    let n = Netplex_semaphore.decrement sem_name in
    if n = 0L then
      prerr_endline "Last container"