(* $Id: netcamlbox.ml 1755 2012-03-24 20:18:30Z gerd $ *)
(* Format of the shm object:
- Const: CAPACITY
Number of message slots
- Const: MSG_SIZE
Max size of messages
- Sem: NOTIFICATIONS
This semaphore is increased by the sender to wake up the receiver,
and decreased by the receiver.
- Sem: FREE_SLOTS
Number of free slots. Increased by the receiver on [delete].
Decreased by the sender when waiting for space.
- For each slot: SLOT_READABLE (one byte per slot).
The number 1 if the slot is filled and has a readable message,
or 0 if it is free or should be ignored by the receiver.
- Array SLOT_LIST and number SPLIT_IDX:
The array positions 0 .. SPLIT_IDX-1 contain the numbers of the
filled slots. The array positions SPLIT_IDX .. CAPACITY-1 contain
the numbers of the free slots.
Sem SLOT_LOCK: mutex for SLOT_LIST and SPLIT_IDX.
Sending:
- sem_wait(FREE_SLOTS)
- lock SLOT_LOCK
- find a free slot, and move it to the part of the slot list with the
filled slots
- unlock SLOT_LOCK
- copy the value to the chosen slot
- SLOT_READABLE := 1
- sem_post(NOTIFICATIONS)
Receiving:
1. Wait for message:
- sem_wait(NOTIFICATIONS)
- lock SLOT_LOCK
- get the part of the slot list with the filled slots
- unlock SLOT_LOCK
- for each filled slot:
- get the value of SLOT_READABLE. If it is 0 skip the slot.
- if the slot was already reported to the user, skip the slot
- otherwise include the slot in the result list
2. Get message:
- test SLOT_READABLE. If it is 1 read the message
3. Delete the message:
- Remove the message from the messages that "wait" already reported
- SLOT_READABLE := 0
- lock SLOT_LOCK
- move the slot to the part of the slot list with the empty slots
- unlock SLOT_LOCK
- sem_post(FREE_SLOTS)
FORMAT OF THE SHARED MEMORY OBJECT:
byte 0: <header>
byte <sem_offset>: <array of semaphores>
byte <sl_offset>: <slot list>
byte <offset>: first slot. Slots come one after the other, and
are aligned at 128 byte boundaries
<header>: the ocaml representation of type [header]
<array of semaphores>: the semaphores one after the other
<slot list>: the ocaml representation of type [int array]
*)
module Debug = struct
let enable = ref false
end
let dlog = Netlog.Debug.mk_dlog "Netcamlbox" Debug.enable
let dlogr = Netlog.Debug.mk_dlogr "Netcamlbox" Debug.enable
let () =
Netlog.Debug.register_module "Netcamlbox" Debug.enable
open Netsys_mem
open Printf
type camlbox_address = string
(* The header must not contain pointers, so it is relocatable *)
type header =
{ mutable address : int; (* address lsr 1 *)
mutable sem_offset : int; (* offset to semaphore area *)
mutable sl_offset : int; (* offset to slot_list *)
mutable offset : int; (* offset to first slot *)
capacity : int;
msg_size : int;
mutable free_slots : int;
mutable split_idx : int;
(* function pointers for custom operations: the header contains the
pointers the receiver uses.
*)
bigarray_custom_ops : int;
int32_custom_ops : int;
int64_custom_ops : int;
nativeint_custom_ops : int;
}
type semaphores =
{ s_notifications : Netsys_sem.anon_semaphore;
s_free_slots : Netsys_sem.anon_semaphore;
s_slot_lock : Netsys_sem.anon_semaphore ;
slot_readable : int array; (* pointer to the byte *)
}
type 'a camlbox =
{ mem : memory;
hdr : header; (* pointing into mem *)
slot_list : int array; (* pointing into mem *)
sem : semaphores; (* pointing into mem *)
is_new : bool array; (* local *)
addr : string;
cont : Netsys_sem.container; (* local *)
}
type 'a camlbox_sender = 'a camlbox
exception Empty
exception Message_too_big
let align = 128
(* header and slots are aligned by this *)
let ws_bytes = (* word size in bytes *)
Sys.word_size / 8
let this_bigarray_custom_ops =
Netsys_mem.get_custom_ops
(Bigarray.Array1.create Bigarray.char Bigarray.c_layout 1)
let this_int32_custom_ops =
Netsys_mem.get_custom_ops 0l
let this_int64_custom_ops =
Netsys_mem.get_custom_ops 0L
let this_nativeint_custom_ops =
Netsys_mem.get_custom_ops 0n
let ntoi n = (* for addresses *)
Nativeint.to_int(Nativeint.shift_right n 1)
let iton i = (* for addresses *)
Nativeint.shift_left (Nativeint.of_int i) 1
let create_header capacity msg_size =
let dummy = Bigarray.Array1.create Bigarray.char Bigarray.c_layout 0 in
let hdr =
{ address = 0;
sem_offset = 0;
sl_offset = 0;
offset = 0;
capacity = capacity;
msg_size = msg_size;
free_slots = capacity;
split_idx = 0;
bigarray_custom_ops = ntoi (snd this_bigarray_custom_ops);
int32_custom_ops = ntoi (snd this_int32_custom_ops);
int64_custom_ops = ntoi (snd this_int64_custom_ops);
nativeint_custom_ops = ntoi (snd this_nativeint_custom_ops);
} in
let _, hdr_bytelen = init_value dummy 0 hdr [ Copy_simulate ] in
hdr.sem_offset <- hdr_bytelen;
let sem_bytelen = 3 * Netsys_sem.sem_size + capacity in
let cap8 = sem_bytelen mod 8 in
let sem_padding = if cap8 = 0 then 0 else 8 - cap8 in
let slot_list = Array.init capacity (fun k -> k) in
let _, sl_bytelen = init_value dummy 0 slot_list [ Copy_simulate ] in
hdr.sl_offset <- hdr_bytelen + sem_bytelen + sem_padding;
let bytelen = hdr_bytelen + sem_bytelen + sem_padding + sl_bytelen in
hdr.offset <- (((bytelen-1) / align) + 1) * align;
dlogr
(fun () ->
sprintf "create_header cap=%d msgsize=%d sem_offs=%d sl_offs=%d offs=%d"
capacity msg_size hdr.sem_offset hdr.sl_offset hdr.offset
);
(hdr, slot_list)
let init_semaphores hdr mem cont =
let offset = hdr.sem_offset in
let init_sem k v =
ignore(
Netsys_sem.sem_init cont mem (offset+k*Netsys_sem.sem_size) true v) in
init_sem 0 0; (* notifications *)
init_sem 1 hdr.capacity; (* free_slots *)
init_sem 2 1; (* slot_lock *)
let b = 3 in
let b_bytes = offset+b*Netsys_sem.sem_size in
for k = 0 to hdr.capacity-1 do
mem.{ b_bytes + k } <- '\000'
done
let destroy_semaphores hdr mem cont =
(* Must be called before shm is unmapped (only in the receiver) *)
let offset = hdr.sem_offset in
for k = 0 to 2 do
let s = Netsys_sem.as_sem cont mem (offset+k*Netsys_sem.sem_size) in
Netsys_sem.sem_destroy cont s
done
let free_mem cont mem =
let hdr =
(as_value mem ws_bytes : header) in
destroy_semaphores hdr mem cont
let get_semaphores hdr mem cont =
let offset = hdr.sem_offset in
let get_sem k =
Netsys_sem.as_sem cont mem (offset+k*Netsys_sem.sem_size) in
let semrec =
{ s_notifications = get_sem 0; (* notifications *)
s_free_slots = get_sem 1; (* free_slots *)
s_slot_lock = get_sem 2; (* slot_lock *)
slot_readable =
Array.init hdr.capacity
(fun k -> offset + 3*Netsys_sem.sem_size + k)
} in
dlog "get_semaphores";
semrec
let mk_camlbox fn_name addr capacity msg_size cont f =
if not (Netsys_sem.have_anon_semaphores()) then
invalid_arg (sprintf "%s (no anonymous semaphores)" fn_name);
if capacity < 1 || capacity > Sys.max_array_length || msg_size < 1 then
invalid_arg (sprintf "%s (bad params)" fn_name);
if capacity > Netsys_sem.sem_value_max then
invalid_arg (sprintf "%s (capacity exceeds sem_value_max)" fn_name);
let slot_size = (((msg_size-1) / align) + 1) * align in
if slot_size < msg_size then (* overflow *)
invalid_arg (sprintf "%s (too large)" fn_name);
let (hdr0, slot_list0) = create_header capacity msg_size in
let shm_size = hdr0.offset + capacity * slot_size in
if shm_size < hdr0.offset then (* overflow *)
invalid_arg (sprintf "%s (too large)" fn_name);
if (shm_size - hdr0.offset) / slot_size <> capacity then (* overflow *)
invalid_arg (sprintf "%s (too large)" fn_name);
let mem = f shm_size in
value_area mem;
hdr0.address <- ntoi (memory_address mem);
let hdr_voffs, _ = init_value mem 0 hdr0 [] in
let hdr = (as_value mem hdr_voffs : header) in
init_semaphores hdr mem cont;
let sl_voffs, _ = init_value mem hdr.sl_offset slot_list0 [] in
let slot_list = (as_value mem sl_voffs : int array) in
Gc.finalise (free_mem cont) mem;
let sem = get_semaphores hdr mem cont in
dlogr
(fun () ->
sprintf
"mk_camlbox fn_name=%s cap=%d msgsize=%d slot_size=%d shm_size=%d"
fn_name capacity msg_size slot_size shm_size
);
{ mem = mem;
hdr = hdr;
slot_list = slot_list;
sem = sem;
is_new = Array.make capacity true;
addr = addr;
cont = cont
}
let format_camlbox addr fd capacity msg_size =
if not (Netsys_posix.have_posix_shm()) then
invalid_arg "format_camlbox (no POSIX shm)";
if String.contains addr '/' then
invalid_arg "format_camlbox (bad name)";
let cont = Netsys_sem.container ("/" ^ addr) in
mk_camlbox "format_camlbox" addr capacity msg_size cont
(fun shm_size ->
Unix.ftruncate fd shm_size;
Netsys_mem.memory_map_file fd true shm_size
)
let create_camlbox addr capacity msg_size =
if not (Netsys_posix.have_posix_shm()) then
invalid_arg "create_camlbox (no POSIX shm)";
if String.contains addr '/' then
invalid_arg "create_camlbox (bad name)";
let cont = Netsys_sem.create_container ("/" ^ addr) in
let box =
mk_camlbox "create_camlbox" addr capacity msg_size cont
(fun shm_size ->
let shm =
Netsys_posix.shm_open
("/" ^ addr)
[ Netsys_posix.SHM_O_RDWR;
Netsys_posix.SHM_O_CREAT;
Netsys_posix.SHM_O_EXCL
]
0o600 in
let mem =
try
Unix.ftruncate shm shm_size;
let mem = Netsys_mem.memory_map_file shm true shm_size in
Unix.close shm;
mem
with
| error ->
Unix.close shm;
Netsys_sem.drop cont;
raise error in
mem
) in
dlog "create_camlbox returning";
box
let unlink_camlbox addr =
Netsys_posix.shm_unlink ("/" ^ addr);
Netsys_sem.unlink ("/" ^ addr)
let camlbox_fd addr =
Netsys_posix.shm_open
("/" ^ addr)
[ Netsys_posix.SHM_O_RDWR ]
0o600
let with_fd fd f =
try
let r = f fd in
Unix.close fd;
r
with
| error ->
Unix.close fd;
raise error
let dummy_header () =
{ address = 0;
sem_offset = 0;
sl_offset = 0;
offset = 0;
capacity = 0;
msg_size = 0;
split_idx = 0;
free_slots = 0;
bigarray_custom_ops = 0;
int32_custom_ops = 0;
int64_custom_ops = 0;
nativeint_custom_ops = 0;
}
let header_size = lazy(
let dummy = Bigarray.Array1.create Bigarray.char Bigarray.c_layout 0 in
let hdr = dummy_header () in
let voffset, bytelen =
init_value
dummy 0 hdr
[ Copy_simulate ] in
bytelen
)
let camlbox_open cont addr fd =
(* does not initialize is_new! *)
let hdr_size = Lazy.force header_size in
let mini_mem = Netsys_mem.memory_map_file fd true hdr_size in
let hdr0 = (as_value mini_mem ws_bytes : header) in
let offset = hdr0.offset in
let slot_size = (((hdr0.msg_size-1) / align) + 1) * align in
let shm_size = offset + hdr0.capacity * slot_size in
let mem = Netsys_mem.memory_map_file fd true shm_size in
let hdr = (as_value mem ws_bytes : header) in
let sl = (as_value mem (hdr.sl_offset+ws_bytes) : int array) in
dlogr
(fun () ->
sprintf
"camlbox_open hdr_size=%d offset=%d slot_size=%d shm_size=%d"
hdr_size offset slot_size shm_size
);
{ mem = mem;
hdr = hdr;
slot_list = sl;
sem = get_semaphores hdr mem cont;
is_new = [| |];
addr = addr;
cont = cont;
}
let camlbox_addr box =
box.addr
let camlbox_saddr box =
box.addr
let camlbox_sender addr =
let cont = Netsys_sem.container ("/" ^ addr) in
with_fd (camlbox_fd addr) (camlbox_open cont addr)
let camlbox_sender_of_fd addr =
let cont = Netsys_sem.container ("/" ^ addr) in
camlbox_open cont addr
let camlbox_bcapacity box =
box.hdr.capacity
let camlbox_scapacity = camlbox_bcapacity
let camlbox_capacity addr =
let cont = Netsys_sem.container ("/" ^ addr) in
with_fd
(camlbox_fd addr)
(fun fd ->
let box = camlbox_open cont addr fd in
camlbox_bcapacity box
)
let camlbox_bmsg_size box =
box.hdr.msg_size
let camlbox_smsg_size = camlbox_bmsg_size
let camlbox_msg_size addr =
let cont = Netsys_sem.container ("/" ^ addr) in
with_fd
(camlbox_fd addr)
(fun fd ->
let box = camlbox_open cont addr fd in
camlbox_bmsg_size box
)
let camlbox_bmessages box =
let free =
try
Netsys_sem.sem_getvalue box.sem.s_free_slots
with Unix.Unix_error(Unix.ENOSYS,_,_) -> (* OS X *)
Netsys_sem.sem_wait box.sem.s_slot_lock Netsys_sem.SEM_WAIT_BLOCK;
let free = box.hdr.free_slots in
Netsys_sem.sem_post box.sem.s_slot_lock;
free in
box.hdr.capacity - free
let camlbox_smessages = camlbox_bmessages
let camlbox_messages addr =
let cont = Netsys_sem.container ("/" ^ addr) in
with_fd
(camlbox_fd addr)
(fun fd ->
let box = camlbox_open cont addr fd in
camlbox_bmessages box
)
let is_slot_readable_nolock fn_name box k =
let p = box.sem.slot_readable.(k) in
box.mem.{p} <> '\000'
let is_slot_readable fn_name box k =
let used =
Netsys_sem.sem_wait box.sem.s_slot_lock Netsys_sem.SEM_WAIT_BLOCK;
let p = box.sem.slot_readable.(k) in
let used = box.mem.{p} <> '\000' in
Netsys_sem.sem_post box.sem.s_slot_lock;
used in
dlogr
(fun () ->
sprintf "is_slot_readable fn_name=%s slot=%d used=%B p=%d p_used=%B"
fn_name k used (box.sem.slot_readable.(k))
(box.mem.{box.sem.slot_readable.(k)} <> '\000')
);
used
let camlbox_get box k =
if k < 0 || k >= box.hdr.capacity then
invalid_arg "camlbox_get";
if not (is_slot_readable "camlbox_get" box k) then raise Empty;
let slot_size = (((box.hdr.msg_size-1) / align) + 1) * align in
let slot_offset = box.hdr.offset + k * slot_size in
dlogr
(fun () ->
sprintf
"camlbox_get slot=%d slot_offset=%d slot_size=%d"
k slot_offset slot_size
);
as_value box.mem (slot_offset + ws_bytes)
let camlbox_get_copy box k =
Netsys_mem.copy_value
[Netsys_mem.Copy_custom_int; Netsys_mem.Copy_bigarray]
(camlbox_get box k)
let swap (x:int array) p q =
let u = x.(p) in
x.(p) <- x.(q);
x.(q) <- u
let camlbox_delete box k =
if k < 0 || k >= box.hdr.capacity then
invalid_arg "camlbox_delete";
if not (is_slot_readable "camlbox_delete" box k) then raise Empty;
box.is_new.(k) <- true;
Netsys_sem.sem_wait box.sem.s_slot_lock Netsys_sem.SEM_WAIT_BLOCK;
let l = box.slot_list in
let j = ref 0 in
( try
for i = 0 to box.hdr.split_idx - 1 do
if l.(i) = k then ( j := i; raise Exit )
done;
assert false
with
| Exit -> ()
);
swap l !j (box.hdr.split_idx-1);
box.hdr.split_idx <- box.hdr.split_idx - 1;
box.hdr.free_slots <- box.hdr.free_slots + 1;
let p = box.sem.slot_readable.(k) in
box.mem.{p} <- '\000';
Netsys_sem.sem_post box.sem.s_slot_lock;
Netsys_sem.sem_post box.sem.s_free_slots
let camlbox_wait box =
dlog "camlbox_wait <waiting>";
Netsys_sem.sem_wait box.sem.s_notifications Netsys_sem.SEM_WAIT_BLOCK;
let n = ref 0 in
( try
while true do (* decrement to 0 *)
Netsys_sem.sem_wait
box.sem.s_notifications Netsys_sem.SEM_WAIT_NONBLOCK;
incr n;
done
with
| Unix.Unix_error(Unix.EAGAIN,_,_) -> ()
);
dlogr (fun () -> sprintf "camlbox_wait #notifications=%d" !n);
Netsys_sem.sem_wait box.sem.s_slot_lock Netsys_sem.SEM_WAIT_BLOCK;
let l = box.slot_list in
let filled = ref [] in
for i = 0 to box.hdr.split_idx - 1 do
let k = l.(i) in
if is_slot_readable_nolock "camlbox_wait" box k then
filled := k :: !filled
(* NB. Because of the 2-phase sending (first allocate, then fill the
slot), it is possible to find a used slot that is not yet completely
filled with a message
*)
done;
dlogr
(fun () ->
sprintf "camlbox_wait slots=%s"
(String.concat " "
(List.map
(fun i ->
sprintf "%d<%s>"
i (if box.is_new.(i) then "new" else "old")
)
!filled
))
);
let r =
List.filter
(fun i ->
box.is_new.(i)
)
!filled in
Netsys_sem.sem_post box.sem.s_slot_lock;
List.iter
(fun k -> box.is_new.(k) <- false)
r;
r
let camlbox_cancel_wait box =
Netsys_sem.sem_post box.sem.s_notifications
let camlbox_wake = camlbox_cancel_wait
let find_free_slot box p =
let rec loop i =
if i < box.hdr.capacity then (
if box.slot_list.(i) = p then
Some(p,i)
else
loop (i+1)
)
else
None in
loop box.hdr.split_idx
let camlbox_send ?prefer ?slot box value =
(* First phase: find a free slot *)
Netsys_sem.sem_wait box.sem.s_free_slots Netsys_sem.SEM_WAIT_BLOCK;
Netsys_sem.sem_wait box.sem.s_slot_lock Netsys_sem.SEM_WAIT_BLOCK;
assert(box.hdr.split_idx < box.hdr.capacity);
let i_opt =
match prefer with
| None -> None
| Some p -> find_free_slot box p in
let k =
match i_opt with
| None ->
box.slot_list.(box.hdr.split_idx)
| Some(p,i) ->
let save = box.slot_list.(box.hdr.split_idx) in
box.slot_list.(box.hdr.split_idx) <- p;
box.slot_list.(i) <- save;
p in
box.hdr.split_idx <- box.hdr.split_idx + 1;
( match slot with
| None -> ()
| Some s -> s := k
);
box.hdr.free_slots <- box.hdr.free_slots - 1;
Netsys_sem.sem_post box.sem.s_slot_lock;
(* Phase 2: Fill the slot. The slot_lock is released *)
let slot_size = (((box.hdr.msg_size-1) / align) + 1) * align in
let slot_offset = box.hdr.offset + k * slot_size in
let target_custom_ops =
[ fst this_bigarray_custom_ops, iton box.hdr.bigarray_custom_ops;
fst this_int32_custom_ops, iton box.hdr.int32_custom_ops;
fst this_int64_custom_ops, iton box.hdr.int64_custom_ops;
fst this_nativeint_custom_ops, iton box.hdr.nativeint_custom_ops;
] in
let (_, _) =
try
init_value
~targetaddr:(iton box.hdr.address)
~target_custom_ops
box.mem
slot_offset
value
[ Netsys_mem.Copy_bigarray;
Netsys_mem.Copy_custom_int;
Netsys_mem.Copy_atom
]
with
Out_of_space -> raise Message_too_big in
(* Finally make the filled slot visible for the receiver *)
Netsys_sem.sem_wait box.sem.s_slot_lock Netsys_sem.SEM_WAIT_BLOCK;
let p = box.sem.slot_readable.(k) in
dlogr
(fun () ->
sprintf "camlbox_send slot=%d p=%d" k p
);
assert(box.mem.{p} = '\000');
box.mem.{p} <- '\001';
Netsys_sem.sem_post box.sem.s_slot_lock;
Netsys_sem.sem_post box.sem.s_notifications