(* Plasma GitLab Archive
   Projects Blog Knowledge *)

(* $Id: netcamlbox.ml 1755 2012-03-24 20:18:30Z gerd $ *)

(* Format of the shm object:

   - Const: CAPACITY

     Number of message slots

   - Const: MSG_SIZE

     Max size of messages

   - Sem: NOTIFICATIONS

     This semaphore is increased by the sender to wake up the receiver,
     and decreased by the receiver.

   - Sem: FREE_SLOTS

     Number of free slots. Increased by the receiver on [delete].
     Decreased by the sender when waiting for space.

   - For each slot: SLOT_READABLE (one byte per slot).

     The number 1 if the slot is filled and has a readable message,
     or 0 if it is free or should be ignored by the receiver.

   - Array SLOT_LIST and number SPLIT_IDX:

     The array positions 0 .. SPLIT_IDX-1 contain the numbers of the
     filled slots. The array positions SPLIT_IDX .. CAPACITY-1 contain
     the numbers of the free slots.

     Sem SLOT_LOCK: mutex for SLOT_LIST and SPLIT_IDX.


  Sending:
   - sem_wait(FREE_SLOTS)
   - lock SLOT_LOCK
   - find a free slot, and move it to the part of the slot list with the
     filled slots
   - unlock SLOT_LOCK
   - copy the value to the chosen slot
   - lock SLOT_LOCK; SLOT_READABLE := 1; unlock SLOT_LOCK
   - sem_post(NOTIFICATIONS)

  Receiving:
  1. Wait for message:
      - sem_wait(NOTIFICATIONS)
      - lock SLOT_LOCK
      - get the part of the slot list with the filled slots
      - unlock SLOT_LOCK
      - for each filled slot:
          - get the value of SLOT_READABLE. If it is 0 skip the slot.
          - if the slot was already reported to the user, skip the slot
          - otherwise include the slot in the result list

  2. Get message:
      - test SLOT_READABLE. If it is 1 read the message

  3. Delete the message:
      - Mark the slot as "not yet reported", so that "wait" reports it
        again once it has been refilled
      - lock SLOT_LOCK
      - move the slot to the part of the slot list with the free slots
      - SLOT_READABLE := 0
      - unlock SLOT_LOCK
      - sem_post(FREE_SLOTS)


   FORMAT OF THE SHARED MEMORY OBJECT:

   byte 0: <header>
   byte <sem_offset>: <array of semaphores>
   byte <sl_offset>:  <slot list>
   byte <offset>:     first slot. Slots come one after the other, and
                      are aligned at 128 byte boundaries

   <header>: the ocaml representation of type [header]
   <array of semaphores>: the semaphores one after the other
   <slot list>: the ocaml representation of type [int array]

 *)

(* Debugging switch of this module. It is registered with [Netlog.Debug]
   under the name "Netcamlbox" below. *)
module Debug = struct
  let enable : bool ref = ref false
end

(* [dlog msg] logs a plain message; [dlogr f] logs the string produced by
   the thunk [f] (see the uses below). Output is presumably suppressed
   unless [Debug.enable] is set. *)
let dlog =
  Netlog.Debug.mk_dlog "Netcamlbox" Debug.enable

let dlogr =
  Netlog.Debug.mk_dlogr "Netcamlbox" Debug.enable

let () = Netlog.Debug.register_module "Netcamlbox" Debug.enable



open Netsys_mem
open Printf

(* Name of a camlbox without the leading slash. Both the POSIX shm object
   and the semaphore container are named ["/" ^ address]. *)
type camlbox_address = string

(* The header is copied into the shm object with [init_value] and read back
   (also by other processes) with [as_value]. It must therefore not contain
   pointers, so it is relocatable.
   NOTE: the field order defines the in-memory layout that sender and
   receiver share - do not reorder the fields. *)
type header =
    { mutable address : int;    (* receiver's mapping address, lsr 1 *)
      mutable sem_offset : int; (* byte offset of the semaphore area *)
      mutable sl_offset : int;  (* byte offset of the slot list *)
      mutable offset : int;     (* byte offset of the first slot *)
      capacity : int;           (* number of slots *)
      msg_size : int;           (* max message size requested at creation *)
      mutable free_slots : int; (* number of free slots; changed only
                                   while SLOT_LOCK is held *)
      mutable split_idx : int;  (* slot_list.(0 .. split_idx-1) are the
                                   used slots, the rest are free *)
      (* Addresses (lsr 1) of the custom-operations tables of the receiver
         process. The sender relocates custom blocks in messages to these
         tables, so they are valid in the receiver.
       *)
      bigarray_custom_ops : int;
      int32_custom_ops : int;
      int64_custom_ops : int;
      nativeint_custom_ops : int;
    }

(* Views of the synchronization objects stored in the shm object *)
type semaphores =
    { s_notifications : Netsys_sem.anon_semaphore; (* NOTIFICATIONS *)
      s_free_slots : Netsys_sem.anon_semaphore;    (* FREE_SLOTS *)
      s_slot_lock : Netsys_sem.anon_semaphore ;    (* SLOT_LOCK (mutex) *)
      slot_readable : int array;   (* per slot: byte offset in [mem] of its
                                      SLOT_READABLE flag byte *)
    }

type 'a camlbox =
    { mem : memory;            (* the mapped shm object *)
      hdr : header;            (* pointing into mem *)
      slot_list : int array;   (* pointing into mem *)
      sem : semaphores;        (* pointing into mem *)
      is_new : bool array;     (* local, receiver only: per slot, [true] if
                                  [camlbox_wait] has not reported it yet.
                                  Empty for boxes opened by senders. *)
      addr : string;           (* the camlbox address *)
      cont : Netsys_sem.container; (* local *)
    }

(* Senders use the same representation as the receiver *)
type 'a camlbox_sender = 'a camlbox

(* Raised by [camlbox_get] and [camlbox_delete] when the slot holds no
   readable message *)
exception Empty

(* Raised by [camlbox_send] when the message cannot be copied because
   [init_value] runs out of space *)
exception Message_too_big

(* Alignment in bytes of the first slot and of the slot size: both are
   rounded up to a multiple of this value. *)
let align = 0x80

(* Size of a machine word in bytes: 4 on 32-bit, 8 on 64-bit systems *)
let ws_bytes = Sys.word_size lsr 3

(* Custom operations of this process for the kinds of custom blocks that
   may occur in messages, as returned by [Netsys_mem.get_custom_ops] for a
   sample value of each kind. The [snd] component (an address) is stored
   in the header of a new box. The [fst] component pairs this process's
   ops with the receiver's addresses when a sender relocates a message
   (see [camlbox_send]). *)
let this_bigarray_custom_ops =
  let sample = Bigarray.Array1.create Bigarray.char Bigarray.c_layout 1 in
  Netsys_mem.get_custom_ops sample

let this_int32_custom_ops = Netsys_mem.get_custom_ops Int32.zero

let this_int64_custom_ops = Netsys_mem.get_custom_ops Int64.zero

let this_nativeint_custom_ops = Netsys_mem.get_custom_ops Nativeint.zero

(* Addresses are kept in OCaml ints, which are one bit narrower than a
   machine word, so they are stored shifted right by one bit. [ntoi] packs
   an address into an int and [iton] unpacks it. The round trip is exact
   only for addresses whose lowest bit is zero. *)
let ntoi (n : nativeint) : int =
  let halved = Nativeint.shift_right n 1 in
  Nativeint.to_int halved

let iton (i : int) : nativeint =
  let wide = Nativeint.of_int i in
  Nativeint.shift_left wide 1


(* [create_header capacity msg_size] computes the layout of a new camlbox
   and returns [(hdr, slot_list)]:
   - [hdr] is a fresh header (still in the normal heap). All offsets are
     filled in, but [address] is left 0. It records this process's
     custom-ops addresses.
   - [slot_list] is the initial slot list [| 0; 1; ...; capacity-1 |]. All
     slots are free because [split_idx = 0].
   Sizes are measured by simulating [init_value] ([Copy_simulate]) against
   an empty dummy bigarray, so nothing is written. *)
let create_header capacity msg_size =
  let dummy = Bigarray.Array1.create Bigarray.char Bigarray.c_layout  0 in
  let hdr =
    { address = 0;
      sem_offset = 0;
      sl_offset = 0;
      offset = 0;
      capacity = capacity;
      msg_size = msg_size;
      free_slots = capacity;
      split_idx = 0;
      bigarray_custom_ops = ntoi (snd this_bigarray_custom_ops);
      int32_custom_ops = ntoi (snd this_int32_custom_ops);
      int64_custom_ops = ntoi (snd this_int64_custom_ops);
      nativeint_custom_ops = ntoi (snd this_nativeint_custom_ops);
    } in
  let _, hdr_bytelen = init_value dummy 0 hdr [ Copy_simulate ] in
  (* the semaphore area directly follows the header *)
  hdr.sem_offset <- hdr_bytelen;
  (* 3 semaphores, then one SLOT_READABLE byte per slot *)
  let sem_bytelen = 3 * Netsys_sem.sem_size + capacity in
  (* pad the semaphore area to a multiple of 8 bytes *)
  let cap8 = sem_bytelen mod 8 in
  let sem_padding = if cap8 = 0 then 0 else 8 - cap8 in
  let slot_list = Array.init capacity (fun k -> k) in
  let _, sl_bytelen = init_value dummy 0 slot_list [ Copy_simulate ] in
  (* the slot list follows the (padded) semaphore area *)
  hdr.sl_offset <- hdr_bytelen + sem_bytelen + sem_padding;
  let bytelen = hdr_bytelen + sem_bytelen + sem_padding + sl_bytelen in
  (* the first slot starts at the next multiple of [align] *)
  hdr.offset <- (((bytelen-1) / align) + 1) * align;
  dlogr
    (fun () ->
       sprintf "create_header cap=%d msgsize=%d sem_offs=%d sl_offs=%d offs=%d"
         capacity msg_size hdr.sem_offset hdr.sl_offset hdr.offset
    );
  (hdr, slot_list)

(* Initialize the synchronization area of a freshly created box at
   [hdr.sem_offset]. It holds three anonymous semaphores:
   - NOTIFICATIONS, initial value 0
   - FREE_SLOTS, initial value [hdr.capacity]
   - SLOT_LOCK, initial value 1 (unlocked)
   They are followed by one SLOT_READABLE byte per slot, all cleared to 0.
   NOTE(review): the [true] passed to [sem_init] presumably requests
   process-shared semaphores - confirm in Netsys_sem. *)
let init_semaphores hdr mem cont =
  let base = hdr.sem_offset in
  List.iteri
    (fun idx initial ->
       let pos = base + idx * Netsys_sem.sem_size in
       ignore (Netsys_sem.sem_init cont mem pos true initial))
    [ 0;              (* notifications *)
      hdr.capacity;   (* free_slots *)
      1               (* slot_lock *)
    ];
  let flags_start = base + 3 * Netsys_sem.sem_size in
  for slot = 0 to hdr.capacity - 1 do
    mem.{ flags_start + slot } <- '\000'
  done

(* Destroy the three semaphores (NOTIFICATIONS, FREE_SLOTS, SLOT_LOCK) of
   the box whose header is [hdr].
   Must be called before shm is unmapped (only in the receiver). *)
let destroy_semaphores hdr mem cont =
  let base = hdr.sem_offset in
  let rec destroy_from idx =
    if idx <= 2 then begin
      let sem =
        Netsys_sem.as_sem cont mem (base + idx * Netsys_sem.sem_size) in
      Netsys_sem.sem_destroy cont sem;
      destroy_from (idx + 1)
    end in
  destroy_from 0

(* GC finaliser for the receiver's mapping (installed by [mk_camlbox]):
   read the header stored in [mem] and destroy the box's semaphores. *)
let free_mem cont mem =
  let hdr : header = as_value mem ws_bytes in
  destroy_semaphores hdr mem cont

(* Build the [semaphores] view of a box whose header is [hdr]. It holds the
   three semaphores stored at [hdr.sem_offset] and the byte offsets of the
   per-slot SLOT_READABLE flags that follow them. *)
let get_semaphores hdr mem cont =
  let base = hdr.sem_offset in
  let sem_at idx =
    Netsys_sem.as_sem cont mem (base + idx * Netsys_sem.sem_size) in
  let flags_start = base + 3 * Netsys_sem.sem_size in
  let notifications = sem_at 0 in
  let free_slots = sem_at 1 in
  let slot_lock = sem_at 2 in
  let readable = Array.init hdr.capacity (fun k -> flags_start + k) in
  dlog "get_semaphores";
  { s_notifications = notifications;
    s_free_slots = free_slots;
    s_slot_lock = slot_lock;
    slot_readable = readable;
  }


(* [mk_camlbox fn_name addr capacity msg_size cont f] creates a new, empty
   camlbox on the receiver side.

   [f shm_size] must return a writable mapping of [shm_size] bytes; the
   box is laid out in it. The parameters are validated first, including
   overflow checks on the rounded slot size and on the total size.
   [fn_name] is only used in error messages and debug output.

   A GC finaliser on the mapping destroys the semaphores once the mapping
   becomes unreachable.

   @raise Invalid_argument if anonymous semaphores are not available, or
   on bad or too large parameters. *)
let mk_camlbox fn_name addr capacity msg_size cont f = 
  if not (Netsys_sem.have_anon_semaphores()) then
    invalid_arg (sprintf "%s (no anonymous semaphores)" fn_name);
  if capacity < 1 || capacity > Sys.max_array_length || msg_size < 1 then
    invalid_arg (sprintf "%s (bad params)" fn_name);
  (* FREE_SLOTS is initialized to [capacity] *)
  if capacity > Netsys_sem.sem_value_max then
    invalid_arg (sprintf "%s (capacity exceeds sem_value_max)" fn_name);
  let slot_size = (((msg_size-1) / align) + 1) * align in
  if slot_size < msg_size then (* overflow *)
    invalid_arg (sprintf "%s (too large)" fn_name);
  let (hdr0, slot_list0) = create_header capacity msg_size in
  let shm_size = hdr0.offset + capacity * slot_size in
  if shm_size < hdr0.offset then (* overflow *)
    invalid_arg (sprintf "%s (too large)" fn_name);
  if (shm_size - hdr0.offset) / slot_size <> capacity then (* overflow *)
    invalid_arg (sprintf "%s (too large)" fn_name);
  let mem = f shm_size in
  (* NOTE(review): presumably registers [mem] with Netsys_mem as an area
     that holds OCaml values (they are placed into it below) - confirm *)
  value_area mem;
  (* Remember where the receiver mapped the area. Senders use this as the
     target address when relocating messages. *)
  hdr0.address <- ntoi (memory_address mem);
  (* Copy the header into shared memory and continue with that copy *)
  let hdr_voffs, _ = init_value mem 0 hdr0 [] in
  let hdr = (as_value mem hdr_voffs : header) in
  init_semaphores hdr mem cont;
  (* Same for the slot list *)
  let sl_voffs, _ = init_value mem hdr.sl_offset slot_list0 [] in
  let slot_list = (as_value mem sl_voffs : int array) in
  Gc.finalise (free_mem cont) mem;
  let sem = get_semaphores hdr mem cont in
  dlogr
    (fun () ->
       sprintf 
         "mk_camlbox fn_name=%s cap=%d msgsize=%d slot_size=%d shm_size=%d"
         fn_name capacity msg_size slot_size shm_size
    );
  { mem = mem;
    hdr = hdr;
    slot_list = slot_list;
    sem = sem;
    is_new = Array.make capacity true;
    addr = addr;
    cont = cont
  }

(* [format_camlbox addr fd capacity msg_size] lays out a new camlbox in
   the file [fd], which is resized with [ftruncate] and mapped. It uses
   the existing semaphore container ["/" ^ addr].
   @raise Invalid_argument without POSIX shm support, if [addr] contains
   a slash, or for the reasons listed at [mk_camlbox]. *)
let format_camlbox addr fd capacity msg_size =
  if not (Netsys_posix.have_posix_shm ()) then
    invalid_arg "format_camlbox (no POSIX shm)";
  if String.contains addr '/' then
    invalid_arg "format_camlbox (bad name)";
  let cont = Netsys_sem.container ("/" ^ addr) in
  let map_fd size =
    Unix.ftruncate fd size;
    Netsys_mem.memory_map_file fd true size in
  mk_camlbox "format_camlbox" addr capacity msg_size cont map_fd

(* [create_camlbox addr capacity msg_size] creates a new camlbox (receiver
   side) in a new POSIX shm object ["/" ^ addr]. The object is opened
   exclusively, so it must not exist yet, with mode 0o600. A new semaphore
   container with the same name is created as well.
   @raise Invalid_argument without POSIX shm support, if [addr] contains
   a slash, or for the reasons listed at [mk_camlbox]. *)
let create_camlbox addr capacity msg_size =
  if not (Netsys_posix.have_posix_shm ()) then
    invalid_arg "create_camlbox (no POSIX shm)";
  if String.contains addr '/' then
    invalid_arg "create_camlbox (bad name)";
  let cont = Netsys_sem.create_container ("/" ^ addr) in
  (* Allocator for [mk_camlbox]: create, size and map the shm object. The
     descriptor is closed once the object is mapped. If sizing or mapping
     fails, the descriptor is closed, the container is dropped, and the
     exception is re-raised. *)
  let alloc_shm size =
    let flags =
      [ Netsys_posix.SHM_O_RDWR;
        Netsys_posix.SHM_O_CREAT;
        Netsys_posix.SHM_O_EXCL ] in
    let fd = Netsys_posix.shm_open ("/" ^ addr) flags 0o600 in
    try
      Unix.ftruncate fd size;
      let mapped = Netsys_mem.memory_map_file fd true size in
      Unix.close fd;
      mapped
    with error ->
      Unix.close fd;
      Netsys_sem.drop cont;
      raise error in
  let box = mk_camlbox "create_camlbox" addr capacity msg_size cont alloc_shm in
  dlog "create_camlbox returning";
  box


(* Remove the shm object and the semaphore container of camlbox [addr]
   from the system namespace. *)
let unlink_camlbox addr =
  let name = "/" ^ addr in
  Netsys_posix.shm_unlink name;
  Netsys_sem.unlink name

(* Open the existing shm object of camlbox [addr] read-write and return
   the descriptor. *)
let camlbox_fd addr =
  let flags = [ Netsys_posix.SHM_O_RDWR ] in
  Netsys_posix.shm_open ("/" ^ addr) flags 0o600

(* [with_fd fd f] returns [f fd] and closes [fd] afterwards. If [f] raises
   an exception, [fd] is closed as well and the exception is re-raised. *)
let with_fd fd f =
  let close () = Unix.close fd in
  try
    let result = f fd in
    close ();
    result
  with e ->
    close ();
    raise e

(* A header whose fields are all zero. It is only used to measure the
   size of a header. *)
let dummy_header () =
  { address = 0; sem_offset = 0; sl_offset = 0; offset = 0;
    capacity = 0; msg_size = 0;
    free_slots = 0; split_idx = 0;
    bigarray_custom_ops = 0; int32_custom_ops = 0;
    int64_custom_ops = 0; nativeint_custom_ops = 0;
  }

(* Number of bytes [init_value] uses at offset 0 to store a [header].
   All fields are ints, so this does not depend on the field values.
   It is computed on first use by simulating the copy. *)
let header_size =
  lazy (
    let empty = Bigarray.Array1.create Bigarray.char Bigarray.c_layout 0 in
    let (_, bytelen) =
      init_value empty 0 (dummy_header ()) [ Copy_simulate ] in
    bytelen
  )


(* [camlbox_open cont addr fd] maps the existing camlbox stored in [fd].
   First only the header is mapped to learn the total size of the box;
   then the whole object is mapped.
   NOTE: the returned box has an empty [is_new] array, so receiver-only
   operations that index it must not be used on it. *)
let camlbox_open cont addr fd =
  let hdr_size = Lazy.force header_size in
  let probe_mem = Netsys_mem.memory_map_file fd true hdr_size in
  let probe : header = as_value probe_mem ws_bytes in
  let offset = probe.offset in
  let msg_size = probe.msg_size in
  let capacity = probe.capacity in
  let slot_size = align * (((msg_size - 1) / align) + 1) in
  let shm_size = offset + capacity * slot_size in
  let mem = Netsys_mem.memory_map_file fd true shm_size in
  let hdr : header = as_value mem ws_bytes in
  let slot_list : int array = as_value mem (ws_bytes + hdr.sl_offset) in
  dlogr
    (fun () ->
       sprintf
         "camlbox_open hdr_size=%d offset=%d slot_size=%d shm_size=%d"
         hdr_size offset slot_size shm_size
    );
  let sem = get_semaphores hdr mem cont in
  { mem = mem;
    hdr = hdr;
    slot_list = slot_list;
    sem = sem;
    is_new = [| |];
    addr = addr;
    cont = cont;
  }

(* The address the box was created or opened with *)
let camlbox_addr box = box.addr

let camlbox_saddr box = camlbox_addr box

(* Open the existing camlbox [addr] for sending. The shm descriptor is
   only needed while mapping and is closed afterwards. *)
let camlbox_sender addr =
  let cont = Netsys_sem.container ("/" ^ addr) in
  let fd = camlbox_fd addr in
  with_fd fd (fun fd -> camlbox_open cont addr fd)

(* Like [camlbox_sender], but maps the box from a descriptor supplied by
   the caller, which is not closed here. The semaphore container is looked
   up as soon as [addr] is given. *)
let camlbox_sender_of_fd addr =
  let cont = Netsys_sem.container ("/" ^ addr) in
  fun fd -> camlbox_open cont addr fd

(* [query_camlbox addr f] opens camlbox [addr] temporarily and returns
   [f box]. The descriptor is closed afterwards. *)
let query_camlbox addr f =
  let cont = Netsys_sem.container ("/" ^ addr) in
  with_fd (camlbox_fd addr) (fun fd -> f (camlbox_open cont addr fd))

(* Number of slots of a box *)
let camlbox_bcapacity box = box.hdr.capacity

let camlbox_scapacity = camlbox_bcapacity

let camlbox_capacity addr = query_camlbox addr camlbox_bcapacity

(* Maximum message size the box was created with *)
let camlbox_bmsg_size box = box.hdr.msg_size

let camlbox_smsg_size = camlbox_bmsg_size

let camlbox_msg_size addr = query_camlbox addr camlbox_bmsg_size

(* Number of occupied slots, computed as capacity minus free slots. It
   includes slots that a sender has reserved but not yet filled. Where
   [sem_getvalue] is not implemented (ENOSYS, e.g. OS X), the free-slot
   counter in the header is read under SLOT_LOCK instead. *)
let camlbox_bmessages box =
  let free_slots_under_lock () =
    Netsys_sem.sem_wait box.sem.s_slot_lock Netsys_sem.SEM_WAIT_BLOCK;
    let n = box.hdr.free_slots in
    Netsys_sem.sem_post box.sem.s_slot_lock;
    n in
  let free =
    try Netsys_sem.sem_getvalue box.sem.s_free_slots
    with Unix.Unix_error (Unix.ENOSYS, _, _) -> free_slots_under_lock () in
  box.hdr.capacity - free

let camlbox_smessages = camlbox_bmessages

let camlbox_messages addr = query_camlbox addr camlbox_bmessages


(* Whether slot [k] holds a readable message, i.e. whether its
   SLOT_READABLE byte is non-zero. Does not take SLOT_LOCK. [fn_name] is
   unused; it is kept for symmetry with [is_slot_readable]. *)
let is_slot_readable_nolock fn_name box k =
  box.mem.{ box.sem.slot_readable.(k) } <> '\000'

(* Same test as [is_slot_readable_nolock], but performed while holding
   SLOT_LOCK. [fn_name] only appears in the debug output. *)
let is_slot_readable fn_name box k =
  Netsys_sem.sem_wait box.sem.s_slot_lock Netsys_sem.SEM_WAIT_BLOCK;
  let used = is_slot_readable_nolock fn_name box k in
  Netsys_sem.sem_post box.sem.s_slot_lock;
  dlogr
    (fun () ->
       let p = box.sem.slot_readable.(k) in
       sprintf "is_slot_readable fn_name=%s slot=%d used=%B p=%d p_used=%B" 
         fn_name k used p (box.mem.{p} <> '\000')
    );
  used


(* [camlbox_get box k] returns the message in slot [k]. The value lives
   directly in the shared memory and is not copied.
   @raise Invalid_argument if [k] is not a valid slot number
   @raise Empty if slot [k] holds no readable message *)
let camlbox_get box k =
  let out_of_range = k < 0 || k >= box.hdr.capacity in
  if out_of_range then invalid_arg "camlbox_get";
  if not (is_slot_readable "camlbox_get" box k) then raise Empty;
  let slot_size = align * (((box.hdr.msg_size - 1) / align) + 1) in
  let slot_offset = box.hdr.offset + k * slot_size in
  dlogr
    (fun () ->
       sprintf
         "camlbox_get slot=%d slot_offset=%d slot_size=%d"
         k slot_offset slot_size
    );
  (* the value starts one word into the slot, after its block header *)
  as_value box.mem (ws_bytes + slot_offset)

(* Like [camlbox_get], but returns a copy made with
   [Netsys_mem.copy_value], including custom int blocks and bigarrays.
   Presumably the copy stays usable after the slot is deleted. *)
let camlbox_get_copy box k =
  let in_shm = camlbox_get box k in
  Netsys_mem.copy_value
    [ Netsys_mem.Copy_custom_int; Netsys_mem.Copy_bigarray ]
    in_shm

(* Exchange the elements at positions [p] and [q] of [x] in place *)
let swap (x : int array) p q =
  let at_p = x.(p) and at_q = x.(q) in
  x.(p) <- at_q;
  x.(q) <- at_p

(* [camlbox_delete box k] frees slot [k] on the receiver side. The slot is
   moved back to the free part of the slot list and its SLOT_READABLE flag
   is cleared under SLOT_LOCK. Then FREE_SLOTS is posted, so a waiting
   sender can use the slot.
   @raise Invalid_argument if [k] is not a valid slot number
   @raise Empty if slot [k] holds no readable message *)
let camlbox_delete box k =
  if k < 0 || k >= box.hdr.capacity then
    invalid_arg "camlbox_delete";
  if not (is_slot_readable "camlbox_delete" box k) then raise Empty;
  (* let a later camlbox_wait report this slot again once it is refilled *)
  box.is_new.(k) <- true;
  Netsys_sem.sem_wait box.sem.s_slot_lock Netsys_sem.SEM_WAIT_BLOCK;
  let slots = box.slot_list in
  let last_used = box.hdr.split_idx - 1 in
  (* a readable slot is always in the used part 0 .. split_idx-1 *)
  let rec position i =
    if i > last_used then assert false
    else if slots.(i) = k then i
    else position (i + 1) in
  swap slots (position 0) last_used;
  box.hdr.split_idx <- last_used;
  box.hdr.free_slots <- box.hdr.free_slots + 1;
  box.mem.{ box.sem.slot_readable.(k) } <- '\000';
  Netsys_sem.sem_post box.sem.s_slot_lock;
  Netsys_sem.sem_post box.sem.s_free_slots
  
(* [camlbox_wait box] blocks until NOTIFICATIONS can be decremented (receiver
   side). It then returns the numbers of the slots that hold readable
   messages not reported by a previous call, and marks them as reported.

   All pending notifications are consumed at once (non-blocking waits until
   EAGAIN), so one call may report the messages of several sends. The result
   may be empty, e.g. after [camlbox_cancel_wait], or when the slot of the
   notifying send was already reported by an earlier call. *)
let camlbox_wait box =
  dlog "camlbox_wait <waiting>";
  Netsys_sem.sem_wait box.sem.s_notifications Netsys_sem.SEM_WAIT_BLOCK;
  let n = ref 0 in
  ( try
      while true do   (* decrement to 0 *)
	Netsys_sem.sem_wait 
	  box.sem.s_notifications Netsys_sem.SEM_WAIT_NONBLOCK;
        incr n;
      done
    with
      | Unix.Unix_error(Unix.EAGAIN,_,_) -> ()
  );
  dlogr (fun () -> sprintf "camlbox_wait #notifications=%d" !n);
  (* Scan the used part of the slot list under SLOT_LOCK *)
  Netsys_sem.sem_wait box.sem.s_slot_lock Netsys_sem.SEM_WAIT_BLOCK;
  let l = box.slot_list in
  let filled = ref [] in
  for i = 0 to box.hdr.split_idx - 1 do
    let k = l.(i) in
    if is_slot_readable_nolock "camlbox_wait" box k then
      filled := k :: !filled
    (* NB. Because of the 2-phase sending (first allocate, then fill the
       slot), it is possible to find a used slot that is not yet completely
       filled with a message
     *)
  done;
  dlogr
    (fun () ->
       sprintf "camlbox_wait slots=%s"
         (String.concat " "
            (List.map 
               (fun i -> 
                  sprintf "%d<%s>" 
                    i (if box.is_new.(i) then "new" else "old")
               )
               !filled
            ))
    );
  (* Keep only the slots not yet reported *)
  let r =
    List.filter
      (fun i ->
	 box.is_new.(i)
      )
      !filled in
  Netsys_sem.sem_post box.sem.s_slot_lock;
  (* [is_new] is local to the receiver, so it is updated after the unlock *)
  List.iter
    (fun k -> box.is_new.(k) <- false)
    r;
  r

(* Wake up a receiver blocked in [camlbox_wait] by posting NOTIFICATIONS
   without sending a message. The woken [camlbox_wait] may then return a
   list without new slots. *)
let camlbox_cancel_wait box =
  let notifications = box.sem.s_notifications in
  Netsys_sem.sem_post notifications

let camlbox_wake box = camlbox_cancel_wait box

(* [find_free_slot box p]: if slot number [p] is free, i.e. it occurs in
   the free part [split_idx .. capacity-1] of the slot list, return
   [Some (p, i)] where [i] is its position in the list. Otherwise return
   [None]. It is called with SLOT_LOCK held (see [camlbox_send]). *)
let find_free_slot box p =
  let found = ref None in
  let pos = ref box.hdr.split_idx in
  let searching () =
    match !found with
      | None -> !pos < box.hdr.capacity
      | Some _ -> false in
  while searching () do
    if box.slot_list.(!pos) = p then found := Some (p, !pos);
    incr pos
  done;
  !found


(* [camlbox_send ?prefer ?slot box value] copies [value] into a free slot
   of the box and notifies the receiver (sender side).

   - Blocks while all slots are occupied (waits on FREE_SLOTS).
   - [prefer]: if this slot number is free, it is used; otherwise the next
     free slot is taken.
   - [slot]: if given, it is set to the number of the chosen slot. This
     happens even if copying the message fails afterwards.

   Sending has three phases:
   1. reserve a slot under SLOT_LOCK;
   2. copy the message into the slot without holding the lock;
   3. set SLOT_READABLE under SLOT_LOCK and post NOTIFICATIONS.
   If phase 2 fails, the reservation is undone, so the slot is not lost.

   @raise Message_too_big if the message does not fit into one slot (its
   message size rounded up to a multiple of [align]). *)
let camlbox_send ?prefer ?slot box value =
  (* Phase 1: reserve a free slot *)
  Netsys_sem.sem_wait box.sem.s_free_slots Netsys_sem.SEM_WAIT_BLOCK;
  Netsys_sem.sem_wait box.sem.s_slot_lock Netsys_sem.SEM_WAIT_BLOCK;
  assert(box.hdr.split_idx < box.hdr.capacity);
  let i_opt =
    match prefer with
      | None -> None
      | Some p -> find_free_slot box p in
  let k =
    match i_opt with
      | None ->
          box.slot_list.(box.hdr.split_idx)
      | Some(p,i) ->
          (* move the preferred slot to the front of the free part *)
          let save = box.slot_list.(box.hdr.split_idx) in
          box.slot_list.(box.hdr.split_idx) <- p;
          box.slot_list.(i) <- save;
          p in
  box.hdr.split_idx <- box.hdr.split_idx + 1;
  ( match slot with
      | None -> ()
      | Some s -> s := k
  );
  box.hdr.free_slots <- box.hdr.free_slots - 1;
  Netsys_sem.sem_post box.sem.s_slot_lock;
  (* Undo phase 1 if the slot cannot be filled. Otherwise the slot would
     stay in the used part of the slot list with SLOT_READABLE = 0. The
     receiver skips such slots and never deletes them, so the slot would
     be lost for good. The receiver may have reordered the used part in
     the meantime, so the position of [k] is searched again. *)
  let release_slot () =
    Netsys_sem.sem_wait box.sem.s_slot_lock Netsys_sem.SEM_WAIT_BLOCK;
    let l = box.slot_list in
    let last = box.hdr.split_idx - 1 in
    let rec position i =
      if i > last then assert false
      else if l.(i) = k then i
      else position (i+1) in
    let j = position 0 in
    l.(j) <- l.(last);
    l.(last) <- k;
    box.hdr.split_idx <- last;
    box.hdr.free_slots <- box.hdr.free_slots + 1;
    Netsys_sem.sem_post box.sem.s_slot_lock;
    Netsys_sem.sem_post box.sem.s_free_slots in
  (* Phase 2: fill the slot. The slot_lock is released *)
  let slot_size = (((box.hdr.msg_size-1) / align) + 1) * align in
  let slot_offset = box.hdr.offset + k * slot_size in
  let target_custom_ops =
    [ fst this_bigarray_custom_ops, iton box.hdr.bigarray_custom_ops;
      fst this_int32_custom_ops, iton box.hdr.int32_custom_ops;
      fst this_int64_custom_ops, iton box.hdr.int64_custom_ops;
      fst this_nativeint_custom_ops, iton box.hdr.nativeint_custom_ops;
    ] in
  ( try
      (* Restrict the copy to this slot. init_value only checks against the
         end of the memory it is given, so passing all of [box.mem] would
         let an oversized message overwrite the following slots. The
         receiver sees the slot at [address + slot_offset], hence the
         shifted target address. *)
      let slot_mem = Bigarray.Array1.sub box.mem slot_offset slot_size in
      let targetaddr =
        Nativeint.add (iton box.hdr.address) (Nativeint.of_int slot_offset) in
      let (_, _) =
        init_value
          ~targetaddr
          ~target_custom_ops
          slot_mem
          0
          value
          [ Netsys_mem.Copy_bigarray;
            Netsys_mem.Copy_custom_int;
            Netsys_mem.Copy_atom
          ] in
      ()
    with
      | Out_of_space ->
          release_slot ();
          raise Message_too_big
      | error ->
          release_slot ();
          raise error
  );
  (* Phase 3: make the filled slot visible for the receiver *)
  Netsys_sem.sem_wait box.sem.s_slot_lock Netsys_sem.SEM_WAIT_BLOCK;
  let p = box.sem.slot_readable.(k) in
  dlogr
    (fun () ->
       sprintf "camlbox_send slot=%d p=%d" k p
    );
  assert(box.mem.{p} = '\000');
  box.mem.{p} <- '\001';
  Netsys_sem.sem_post box.sem.s_slot_lock;
  Netsys_sem.sem_post box.sem.s_notifications

    

(* This web site is published by Informatikbüro Gerd Stolpmann
   Powered by Caml *)