(* $Id: netshm.ml 1742 2012-02-29 17:39:17Z gerd $ *)
(* FILE FORMAT:
*
* Page 0 is called the master page, and is usally not modified:
*
* 0/0: File magic 0
* 0/1: File magic 1
* 0/2: File magic 2
* 0/3: File magic 3
* 0/4: Base_shm version number
* 0/5: Pointer to descriptor page
*
* The descriptor page [d] contains global data that must be frequently
* updated:
*
* d/0: Descriptor magic
* d/1: Total number of pages
* d/2: Size of the hash table S (used plus free entries)
* d/3: Number of used entries in the hash table
* d/4: Total number of bindings
* d/5: Pointer to the first free content page (head of free list)
* d/6: Number of free content pages
* d/7: unused
* d/8: Pointer to first extent of hash table
* d/9: Size of first extent of hash table in pages
* d/10: Pointer to first extent of hash table or 0 if unused
* d/11: Size of first extent of hash table in pages or 0 if unused
* ...
* d/38: Pointer to 16th extent of hash table or 0 if unused
* d/39: Size of first extent of hash table in pages or 0 if unused
*
* The hash table is the array concatenation of 1 to 16 extents. Extents
* are added when the hash table is resized. Extents are contiguous
* sequences of pages.
*
* The hash table array consists of S cells. Every cell has four words:
*
* 0: Key
* 1: Pointer to content page.
* 0 means: Unused cell
* -1 means: Used cell but there is currenty no binding
* 2: Spare word used for resizing
* 3: Spare word used for resizing
*
* Every hash table page contains P/4-1 cells (when P is the number of
* words per page). The four first words of the page have a special meaning:
*
* 0: Hash table page magic
* 1: Sequential number of the page in the table
* 2: unused
* 3: unused
*
* Used content pages have this layout:
*
* c/0: Content page magic
* c/1: Key
* c/2: Pointer to next content page for this entry (or 0)
* c/3: Pointer to first content page of next binding for this key, or 0
* c/4: Position where the value begins (which word)
* c/5: Length of the value in words
* c/6 - P-1: Content words
*
* Follow-up content pages:
*
* c'/0: Follow-up content page magic
* c'/1: Key
* c'/2: Pointer to next content page for this entry (or 0)
* c'/3: Sequence number of this content page
* c'/4 - P-1: Content words
*
* The position in c/4 is relative to the concatenated content
* areas, i.e. position 0 is c/6 etc.
*
* Free content pages:
*
* f/0: Free page magic
* f/1: Pointer to next free page or 0
*
* LOCKING
*
* Locking is page-wise.
*
* A Lock of the descriptor page is also interpreted as a lock for the
* whole hash table.
*
* A lock of the first content page is also interpreted as a lock of
* the remaining content pages of the entry.
*
* - Reading a single entry:
*
* 1. Read-lock descriptor page
* 2. Look up entry
* 3. Read-lock content page
* 4, Unlock descriptor page
* 5. Read content
* 6. Unlock content page
*
* - Reading all bindings:
*
* 1. Read-lock descriptor page
* 2. Look up entry
* 3. Read-lock 1st content page
* 4, Unlock descriptor page
* 5. Read 1st content
* 6. Lock 2nd content page
* 7. Unlock 1st content page
* 8. Read 2nd content
* 9. Lock 3rd content page
* 10. Unlock 2nd content page
* ...
* N. Unlock n-th content page
*
* - Adding a single entry:
*
* 1. Write-lock descriptor page
* 2. If necessary, do a resize pass (see below)
* 4. Find entry in hash table and fill it
* 4. Find free content page(s) and fill it/them
* 5. Unlock descriptor page
*
* - Removing a single entry:
*
* 1. Write-lock descriptor page
* 2. Modify hash table
* 3. Write-lock old content page
* 4. Make the old content page a free page
* 5. Unlock descriptor page
* 6. Unlock content page
*)
open Printf
module Int = struct
type t = int
let compare = (Pervasives.compare : int -> int -> int)
end
module IntMap = Map.Make(Int)
(**********************************************************************)
(* shm_descr *)
(**********************************************************************)
type shm_descr =
[ `POSIX of string * Unix.file_descr * bool ref
| `File of string * Unix.file_descr * bool ref
]
(* (name, fd, is_open) *)
type shm_type = [ `POSIX | `File ]
let supported_types =
( if Netsys.have_posix_shm() then [ `POSIX ] else [] )
@ [ `File ]
type shm_name = [ `POSIX of string | `File of string ]
let shm_type_of_name =
function
| `POSIX _ -> `POSIX
| `File _ -> `File
let open_shm name flags perm =
match name with
| `File n ->
let fd = Unix.openfile n flags perm in
`File(n, fd, ref true)
| `POSIX n ->
let flags' =
List.flatten
(List.map
(function
| Unix.O_RDONLY -> [ Netsys.SHM_O_RDONLY ]
| Unix.O_RDWR -> [ Netsys.SHM_O_RDWR ]
| Unix.O_CREAT -> [ Netsys.SHM_O_CREAT ]
| Unix.O_EXCL -> [ Netsys.SHM_O_EXCL ]
| Unix.O_TRUNC -> [ Netsys.SHM_O_TRUNC ]
| Unix.O_WRONLY ->
invalid_arg "Netsys.open_shm: O_WRONLY not supported"
| _ ->
[]
)
flags) in
let fd = Netsys.shm_open n flags' perm in
`POSIX(n, fd, ref true)
let rndsrc = Random.State.make_self_init()
let chars =
[| '0'; '1'; '2'; '3'; '4'; '5'; '6'; '7';
'8'; '9'; 'A'; 'B'; 'C'; 'D'; 'E'; 'F';
'G'; 'H'; 'I'; 'J'; 'K'; 'L'; 'M'; 'N';
'O'; 'P'; 'Q'; 'R'; 'S'; 'T'; 'U'; 'V';
'W'; 'X'; 'Y'; 'Z'
|]
(* We do not include lower case chars because there are case-insensitive
* file systems
*)
let create_unique_shm name perm =
let inst_string n =
(* Replace 'X' with random chars *)
let n' = String.copy n in
for k = 0 to String.length n - 1 do
if n'.[ k ] = 'X' then
n'.[ k ] <- chars.( Random.State.int rndsrc (Array.length chars) )
done;
n'
in
let rec create iter =
if iter = 1000 then
failwith "Netshm.create_unique_shm: Unable to generate name for shared memory object";
let name' =
match name with
| `File n ->
`File(inst_string n)
| `POSIX n ->
`POSIX(inst_string n) in
try
open_shm name' [ Unix.O_RDWR; Unix.O_CREAT; Unix.O_EXCL ] perm
with
| Unix.Unix_error(Unix.EEXIST,_,_) ->
create (iter+1)
in
create 0
let name_of_shm =
function
| `File(n,_,_) -> `File n
| `POSIX(n,_,_) -> `POSIX n
let close_shm =
function
| `File(_,fd,is_open) -> if !is_open then Unix.close fd; is_open := false
| `POSIX(_,fd,is_open) -> if !is_open then Unix.close fd; is_open := false
let unlink_shm =
function
| `File n -> Unix.unlink n
| `POSIX n -> Netsys.shm_unlink n
let chmod_shm_fd fd is_open perm =
if not !is_open then
failwith "Netshm.chmod_shm: descriptor is not open";
( try Unix.fchmod fd perm
with Unix.Unix_error(Unix.EINVAL,_,_) -> ()
(* OSX seems to throw EINVAL here *)
)
let chmod_shm =
function
| `File(_,fd,is_open) -> chmod_shm_fd fd is_open
| `POSIX(_,fd,is_open) -> chmod_shm_fd fd is_open
let chown_shm_fd fd is_open uid gid =
if not !is_open then
failwith "Netshm.chown_shm: descriptor is not open";
Unix.fchown fd uid gid
let chown_shm =
function
| `File(_,fd,is_open) -> chown_shm_fd fd is_open
| `POSIX(_,fd,is_open) -> chown_shm_fd fd is_open
let size_of_shm_fd fd is_open =
if not !is_open then
failwith "Netshm.size_of_shm: descriptor is not open";
let n64 =
(Unix.LargeFile.fstat fd).Unix.LargeFile.st_size in
if n64 > Int64.of_int max_int then
failwith "Netshm.size_of_shm: Shared memory object too large";
Int64.to_int n64
let size_of_shm =
function
| `File(_,fd,is_open) -> size_of_shm_fd fd is_open
| `POSIX(_,fd,is_open) -> size_of_shm_fd fd is_open
let resize_shm_fd fd is_open n =
if not !is_open then
failwith "Netshm.resize_shm: descriptor is not open";
Unix.LargeFile.ftruncate fd (Int64.of_int n)
let resize_shm =
function
| `File(_,fd,is_open) -> resize_shm_fd fd is_open
| `POSIX(_,fd,is_open) -> resize_shm_fd fd is_open
let map_int32matrix_fd fd is_open rows cols =
Bigarray.Array2.map_file
fd Bigarray.int32 Bigarray.c_layout true rows cols
let map_int32matrix =
function
| `File(_,fd,is_open) -> map_int32matrix_fd fd is_open
| `POSIX(_,fd,is_open) -> map_int32matrix_fd fd is_open
let dummy_int32matrix() =
Bigarray.Array2.create Bigarray.int32 Bigarray.c_layout 0 0
(**********************************************************************)
(* locking *)
(**********************************************************************)
type locking_method =
[ `No_locking | `Record_locking ]
let best_locking_method _ = `Record_locking
type lock_type = Read | Write
exception Deadlock
(* record_locking_descr represents the locking requirements. rld_table
* maps pages to a list of locking requirements for the pages. The
* strongest requirement is passed through to the lockf interface.
*)
type record_locking_descr =
{ rld_sd : shm_descr;
mutable rld_table : (lock_type list ref) IntMap.t;
rld_pagesize : int;
}
let rec rl_do_lock rld page lt =
let fd, is_open =
match rld.rld_sd with
| `POSIX(_,fd,is_open) -> fd, is_open
| `File(_,fd,is_open) -> fd, is_open in
if not !is_open then
failwith "Netshm: Shared memory object is not open";
try
ignore(Unix.lseek fd (page * rld.rld_pagesize) Unix.SEEK_SET);
Unix.lockf fd (if lt = Read then Unix.F_RLOCK else Unix.F_LOCK) 1;
with
| Unix.Unix_error(Unix.EDEADLK,_,_) ->
raise Deadlock
| Unix.Unix_error(Unix.EINTR,_,_) ->
rl_do_lock rld page lt
let rl_lock rld page lt =
try
let lt_list = IntMap.find page rld.rld_table in
( match lt with
| Read -> lt_list := Read :: !lt_list
| Write ->
if not(List.mem Write !lt_list) then
rl_do_lock rld page Write;
lt_list := Write :: !lt_list
)
with
| Not_found ->
rl_do_lock rld page lt;
rld.rld_table <- IntMap.add page (ref [ lt ]) rld.rld_table
let rl_unlock rld page =
(* releases the last (most recent) lock requirement for [page] *)
let fd, is_open =
match rld.rld_sd with
| `POSIX(_,fd,is_open) -> fd, is_open
| `File(_,fd,is_open) -> fd, is_open in
if not !is_open then
failwith "Netshm: Shared memory object is not open";
try
let lt_list = IntMap.find page rld.rld_table in
( match !lt_list with
| [ _ ] ->
(* release lock entirely *)
ignore(Unix.lseek fd (page * rld.rld_pagesize) Unix.SEEK_SET);
Unix.lockf fd Unix.F_ULOCK 1;
rld.rld_table <- IntMap.remove page rld.rld_table;
| Read :: lt_list' ->
lt_list := lt_list'
| Write :: lt_list' ->
if not(List.mem Write lt_list') then
rl_do_lock rld page Read; (* downgrade from Write to Read *)
lt_list := lt_list'
| [] ->
assert false
)
with
| Not_found ->
()
type locking_descr =
[ `No_locking
| `Record_locking of record_locking_descr
]
let create_locking_descr sd pagesize lm =
match lm with
| `No_locking ->
`No_locking
| `Record_locking ->
`Record_locking
{ rld_sd = sd;
rld_table = IntMap.empty;
rld_pagesize = pagesize;
}
(* A groupable locking descriptor has the property that one can wrap
* several operations as a "group". The locks are not released while
* the operations of the group are executed. This has the effect that
* the group is executed atomically.
*)
type groupable_locking_descr =
{ gld : locking_descr;
mutable gld_groups : int;
mutable gld_deferred_unlocks : int list
}
let gld_lock gld p ltype =
( match gld.gld with
| `Record_locking rld ->
rl_lock rld p ltype;
| `No_locking ->
()
);
if gld.gld_groups > 0 then
gld.gld_deferred_unlocks <- p :: gld.gld_deferred_unlocks
;;
let gld_unlock gld p =
if gld.gld_groups = 0 then (
match gld.gld with
| `Record_locking rld ->
rl_unlock rld p
| `No_locking ->
()
);
;;
let gld_start gld =
gld.gld_groups <- gld.gld_groups + 1
let gld_end gld =
gld.gld_groups <- gld.gld_groups - 1;
if gld.gld_groups = 0 then (
List.iter
(fun p ->
gld_unlock gld p
)
gld.gld_deferred_unlocks;
gld.gld_deferred_unlocks <- []
)
let create_gld sd pagesize lm =
{ gld = create_locking_descr sd pagesize lm;
gld_groups = 0;
gld_deferred_unlocks = []
}
(**********************************************************************)
(* shm_table *)
(**********************************************************************)
type shm_table =
{ sd : shm_descr;
lm : groupable_locking_descr;
mutable mem : (int32, Bigarray.int32_elt, Bigarray.c_layout) Bigarray.Array2.t;
pagesize : int; (* in words! *)
dpage : int; (* descriptor page *)
mutable self_locked : bool;
(* Whether locked against mutations by this process *)
}
type int32_array =
(int32, Bigarray.int32_elt, Bigarray.c_layout) Bigarray.Array1.t
exception Corrupt_file of string
exception Next
exception Break
let file_magic1 = 0xde871209l
let file_magic2 = 0xde881209l
let file_magic3 = 0xde871309l
let file_magic4 = 0xde87120al
let descriptor_magic = 0x6712DE9Fl
let content1_magic = 0x12DE9F67l
let content2_magic = 0xDE9F6712l
let hash_table_magic = 0x9F6712DEl
let free_page_magic = 0xf3eef3eel
let to_int =
match Sys.word_size with
| 32 ->
let max32 = Int32.of_int max_int in
let min32 = Int32.of_int min_int in
(fun n ->
if n > max32 || n < min32 then
raise(Corrupt_file "Integer too large");
Int32.to_int n
)
| 64 ->
Int32.to_int
| _ -> assert false
;;
let of_int =
match Sys.word_size with
| 32 ->
Int32.of_int
| 64 ->
let max32 = Int32.to_int Int32.max_int in
let min32 = Int32.to_int Int32.min_int in
(fun n ->
if n > max32 || n < min32 then
raise(Corrupt_file "Integer too large");
Int32.of_int n
)
| _ -> assert false
;;
let remap t pages =
let mem = map_int32matrix t.sd pages t.pagesize in
t.mem <- mem;
;;
let lock_page t p ltype =
gld_lock t.lm p ltype
;;
let unlock_page t p =
gld_unlock t.lm p
;;
let unlock_protect t l f arg =
try f arg
with
| error -> List.iter (fun p -> unlock_page t p) l; raise error
let self_locked t f arg =
let old = t.self_locked in
t.self_locked <- true;
try
let r = f arg in
t.self_locked <- old;
r
with
| error ->
t.self_locked <- old;
raise error
let group t f arg =
try
gld_start t.lm;
let r = f arg in
gld_end t.lm;
r
with
| error ->
gld_end t.lm;
raise error
let hash_cell t k =
(* Returns the k-th hash cell as pair (page, position), or Not_found *)
let entries_per_page = t.pagesize / 4 - 1 in
let rec find_hash_cell ext k =
if ext < 16 then (
let pos = 2 * ext + 8 in
let ext_page = t.mem.{ t.dpage, pos } in
let ext_size = t.mem.{ t.dpage, pos+1 } in
if ext_page = 0l || ext_size = 0l then raise Not_found;
let ext_length = (to_int ext_size) * entries_per_page in
if k < ext_length then
let ext_offset = k / entries_per_page in
let ext_pos = k mod entries_per_page in
((to_int ext_page) + ext_offset, ext_pos * 4 + 4)
else
find_hash_cell (ext+1) (k - ext_length)
)
else raise Not_found
in
find_hash_cell 0 k
;;
let quad_probing =
[| 0; 1; 4; 9; 16; 25; 36; 49; 64; 81 |]
let rec hash_lookup t key seq spare_flag =
(* Finds [key] in the hash table and returns the entry as triple
* (page, position, found). [found] indicates whether the [key]
* was actually found. If not, the entry is the unused entry that
* will store [key].
*
* seq is used to find the hash entry and must be initially 0l.
*
* spare_flag: If set, the spare entries are returned instead of the
* normal ones.
*)
let remainder x y =
let r = Int32.rem x y in
if r >= 0l then
r
else
Int32.add y r in
let total_pages = t.mem.{ t.dpage, 1 } in
let ht_size = t.mem.{ t.dpage, 2 } in
let (ht_page, ht_pos) =
try
let offset =
(* The first ten positions in this probing sequence are the same as
* for quadratic probing. Then we simply continue with linear probing.
* This way, we get most of the good properties of quadratic hashing,
* but there is no size restriction on the table.
*)
if seq <= 9l then
Int32.of_int quad_probing.( Int32.to_int seq )
else
Int32.add 72l seq in
hash_cell t (to_int (remainder (Int32.add key offset) ht_size))
with
| Not_found ->
raise(Corrupt_file "Too few hash table extents found") in
if ht_page <= 0 || (of_int ht_page) >= total_pages then
raise(Corrupt_file "Bad page number found");
if ht_pos < 0 || ht_pos >= t.pagesize then
raise(Corrupt_file "Bad page position found");
let spare_offset =
if spare_flag then 2 else 0 in
let ht_key = t.mem.{ ht_page, ht_pos+spare_offset } in
let ht_ptr = t.mem.{ ht_page, ht_pos+spare_offset+1 } in
if ht_ptr = 0l then
(ht_page, ht_pos, false)
else
if ht_key = key then
(ht_page, ht_pos, true)
else
hash_lookup t key (Int32.succ seq) spare_flag
;;
let alloc_hash_extent t seq n =
let q_ht = (t.pagesize-4) / 4 in (* entries per ht page *)
let total_pages = t.mem.{ t.dpage, 1 } in
let total_pages' = to_int total_pages + n in
remap t total_pages';
t.mem.{ t.dpage, 1 } <- of_int total_pages';
t.mem.{ t.dpage, 2 } <- Int32.add t.mem.{ t.dpage, 2 } (of_int (n * q_ht));
for k = 0 to n-1 do
let p = to_int total_pages + k in
t.mem.{ p, 0 } <- hash_table_magic;
t.mem.{ p, 1 } <- of_int (seq + k);
t.mem.{ p, 2 } <- 0l;
t.mem.{ p, 3 } <- 0l;
for k = 0 to q_ht - 1 do
t.mem.{ p, 0 + 4*k } <- 0l;
t.mem.{ p, 1 + 4*k } <- 0l;
t.mem.{ p, 2 + 4*k } <- 0l;
t.mem.{ p, 3 + 4*k } <- 0l;
done
done;
to_int total_pages
;;
let add_hash_extent t =
(* The new extent is twice as large as the largest extent *)
let rec loop count largest ext =
if ext < 16 then (
let pos = 2 * ext + 8 in
let ext_page = t.mem.{ t.dpage, pos } in
let ext_size = t.mem.{ t.dpage, pos+1 } in
if ext_page = 0l || ext_size = 0l then (
(* found the empty slot *)
let n = 2 * largest in
let pg = alloc_hash_extent t count n in
t.mem.{ t.dpage, pos } <- of_int pg;
t.mem.{ t.dpage, pos+1 } <- of_int n;
)
else (
loop (count + to_int ext_size) (max (to_int ext_size) largest) (ext+1)
)
)
else
failwith "add: cannot add further hash table extent"
in
loop 0 0 0
;;
let iter_hash_table t f =
let q_ht = (t.pagesize-4) / 4 in (* entries per ht page *)
let rec loop ext =
if ext < 16 then (
let pos = 2 * ext + 8 in
let ext_page = t.mem.{ t.dpage, pos } in
let ext_size = t.mem.{ t.dpage, pos+1 } in
if ext_page <> 0l && ext_size <> 0l then (
for k = 0 to to_int ext_size - 1 do
let p = to_int ext_page + k in
for j = 0 to q_ht - 1 do
let q = 4 + j*4 in
f p q
done
done;
loop (ext+1)
)
) in
loop 0
;;
let refill_hash_table t =
let fill_spare () =
iter_hash_table
t
(fun p q ->
let ht_key = t.mem.{ p, q } in
let ht_ptr = t.mem.{ p, q+1 } in
if ht_ptr <> 0l && ht_ptr <> (-1l) then (
let (ht_page, ht_pos, ht_found) =
hash_lookup t ht_key 0l true in
if ht_found then
raise(Corrupt_file "Duplicate key found in hash table");
t.mem.{ ht_page, ht_pos + 2 } <- ht_key;
t.mem.{ ht_page, ht_pos + 3 } <- ht_ptr;
)
) in
let copy_back () =
iter_hash_table
t
(fun p q ->
t.mem.{ p, q } <- t.mem.{ p, q+2 };
t.mem.{ p, q+1 } <- t.mem.{ p, q+3 };
t.mem.{ p, q+2 } <- 0l;
t.mem.{ p, q+3 } <- 0l;
) in
fill_spare ();
copy_back ()
;;
let hash_lookup_for_addition t key =
(* Returns (page, pos, inc).
* inc is true if a new ht cell was allocated
*)
let (ht_page_0, ht_pos_0, ht_found_0) = hash_lookup t key 0l false in
if ht_found_0 then
(ht_page_0, ht_pos_0, false)
else (
(* We have to allocate another cell of the hash table. Check whether
* we have to resize the hash table!
*)
let n_used = to_int t.mem.{ t.dpage, 3 } in
let n_total = to_int t.mem.{ t.dpage, 2 } in
if n_used * 2 > n_total then (
(* more than 1/2 is used, so resize *)
add_hash_extent t;
refill_hash_table t;
(* Note: total_pages has changed! *)
let (ht_page_1, ht_pos_1, ht_found_1) = hash_lookup t key 0l false in
assert(not ht_found_1);
(ht_page_1, ht_pos_1, true)
)
else (
(ht_page_0, ht_pos_0, true)
)
)
;;
(* Iterate over all bindings for a given key *)
type iterator =
{ i_table : shm_table;
i_key : int32;
i_total_pages : int32;
mutable i_binding_pg : int; (* 1st page of current binding *)
mutable i_next_binding_pg : int; (* 1st page of next binding *)
mutable i_current_pg : int; (* current page of current binding *)
mutable i_current : int32_array; (* same, the page itself *)
mutable i_next_pg : int; (* next page of current binding *)
mutable i_seq : int; (* sequence number *)
mutable i_start_val : int; (* start position of value fragment *)
mutable i_len_val : int; (* length of value fragment *)
mutable i_rest_len_val : int; (* rem. length of value after fragment *)
}
(* Note: the iterator functions do not do any locking *)
let start_binding it pg =
(* Go to the start of the binding at page [pg] *)
let current = Bigarray.Array2.slice_left it.i_table.mem pg in
let cp_magic = current.{ 0 } in
let cp_key = current.{ 1 } in
let cp_cpage' = current.{ 2 } in
let cp_cpage_next = current.{ 3 } in
let cp_val_pos = current.{ 4 } in
let cp_val_len = current.{ 5 } in
if cp_magic <> content1_magic then
raise(Corrupt_file "Bad content1 magic");
if cp_key <> it.i_key then
raise(Corrupt_file "Wrong content page");
if cp_cpage' < 0l || cp_cpage' >= it.i_total_pages then
raise(Corrupt_file "Page pointer out of bounds");
if cp_cpage_next < 0l || cp_cpage' >= it.i_total_pages then
raise(Corrupt_file "Page pointer out of bounds");
if cp_val_pos < 0l || cp_val_len < 0l then
raise(Corrupt_file "Negative string pointers");
let ca_start = 6 in (* first word of content area is at pos 6 *)
let ca_len = it.i_table.pagesize - ca_start in
let cp_cpage' = to_int cp_cpage' in
let cp_cpage_next = to_int cp_cpage_next in
let cp_val_pos = to_int cp_val_pos in
let cp_val_len = to_int cp_val_len in
if cp_val_pos >= ca_len then
raise(Corrupt_file
"value does not start in first content page");
let len_val = min ca_len cp_val_len in
it.i_binding_pg <- pg;
it.i_next_binding_pg <- cp_cpage_next;
it.i_current_pg <- pg;
it.i_current <- current;
it.i_next_pg <- cp_cpage';
it.i_seq <- 0;
it.i_start_val <- cp_val_pos + ca_start;
it.i_len_val <- len_val;
it.i_rest_len_val <- cp_val_len - len_val;
;;
let create_iterator t key total_pages pg =
(* Create an iterator for the list of bindings starting at page [pg] *)
let current = Bigarray.Array2.slice_left t.mem pg in
let it =
{ i_table = t;
i_key = key;
i_total_pages = total_pages;
i_binding_pg = 0;
i_next_binding_pg = 0;
i_current_pg = 0;
i_current = current;
i_next_pg = 0;
i_seq = 0;
i_start_val = 0;
i_len_val = 0;
i_rest_len_val = 0;
} in
start_binding it pg;
it
;;
let next_page it =
(* Switches to the next page of the current binding.
* Precondition: it.i_next_pg <> 0
*)
let pg = it.i_next_pg in
assert(pg <> 0);
let current = Bigarray.Array2.slice_left it.i_table.mem pg in
let cp'_magic = current.{ 0 } in
let cp'_key = current.{ 1 } in
let cp'_cpage' = current.{ 2 } in
let cp'_seq = current.{ 3 } in
if cp'_magic <> content2_magic then
raise(Corrupt_file "Bad content2 magic");
if cp'_key <> it.i_key then
raise(Corrupt_file "Wrong content page");
if cp'_cpage' < 0l || cp'_cpage' >= it.i_total_pages then
raise(Corrupt_file "Page pointer out of bounds");
if cp'_seq <> of_int (it.i_seq + 1) then
raise(Corrupt_file "Bad sequence number");
let ca_start = 4 in (* first word of content area is at pos 4 *)
let ca_len = it.i_table.pagesize - ca_start in
let len_val = min ca_len it.i_rest_len_val in
it.i_current_pg <- pg;
it.i_current <- current;
it.i_next_pg <- to_int cp'_cpage';
it.i_seq <- it.i_seq + 1;
it.i_start_val <- ca_start;
it.i_len_val <- len_val;
it.i_rest_len_val <- it.i_rest_len_val - len_val
;;
let next_binding it =
start_binding it it.i_next_binding_pg
;;
(* the read_blocks implementation bases on iterators *)
let rec read_blocks_start t key f total_pages prev_pg pg () =
lock_page t pg Read;
if prev_pg > 0 then unlock_page t prev_pg;
let followup =
unlock_protect t [ pg ]
(fun () ->
let it =
create_iterator t key total_pages pg in
read_blocks_extract it f pg
)
() in
followup ()
and read_blocks_extract it f pg () =
let followup =
unlock_protect it.i_table [ pg ]
(fun () ->
let val_frag =
Bigarray.Array1.sub it.i_current it.i_start_val it.i_len_val in
try
self_locked it.i_table
f (Some val_frag);
if it.i_next_pg <> 0 then (
next_page it;
read_blocks_extract it f pg
)
else (
self_locked it.i_table
f None;
raise Next
)
with
| Next ->
read_blocks_next it f pg
| Break ->
(fun () ->
unlock_page it.i_table pg)
)
() in
followup()
and read_blocks_next it f prev_pg () =
let pg = it.i_next_binding_pg in
if pg <> 0 then (
lock_page it.i_table pg Read;
unlock_page it.i_table prev_pg;
next_binding it;
read_blocks_extract it f pg ()
)
else
unlock_page it.i_table prev_pg
;;
let read_blocks t key f =
lock_page t t.dpage Read;
unlock_protect t [ t.dpage ]
(fun () ->
let total_pages = t.mem.{ t.dpage, 1 } in
if Bigarray.Array2.dim1 t.mem <> (to_int total_pages) then
remap t (to_int total_pages);
let (ht_page, ht_pos, ht_found) = hash_lookup t key 0l false in
if ht_found then (
let ht_ptr = t.mem.{ ht_page, ht_pos+1 } in
if ht_ptr = (-1l) then (
(* key is unbound *)
unlock_page t t.dpage;
()
)
else
if ht_ptr > 0l && ht_ptr < total_pages then (
(* The following call will unlock t.dpage *)
read_blocks_start
t key f total_pages t.dpage (to_int ht_ptr) ()
)
else
raise(Corrupt_file "Bad page number found")
)
else (
(* key is unbound *)
unlock_page t t.dpage;
()
)
)
()
;;
let find_all t key =
let l = ref [] in
let v = ref [] in
let v_size = ref 0 in
read_blocks t key
(fun val_frag_opt ->
match val_frag_opt with
| Some val_frag ->
v := val_frag :: !v;
v_size := !v_size + Bigarray.Array1.dim val_frag
| None ->
let v_total =
Bigarray.Array1.create
Bigarray.int32 Bigarray.c_layout !v_size in
List.iter
(fun val_frag ->
let len = Bigarray.Array1.dim val_frag in
v_size := !v_size - len;
Bigarray.Array1.blit
val_frag
(Bigarray.Array1.sub v_total !v_size len)
)
!v;
assert(!v_size = 0);
l := v_total :: !l;
v := []
);
List.rev !l
;;
exception Done of int32_array
let find t key =
let v = ref [] in
let v_size = ref 0 in
try
read_blocks t key
(fun val_frag_opt ->
match val_frag_opt with
| Some val_frag ->
v := val_frag :: !v;
v_size := !v_size + Bigarray.Array1.dim val_frag
| None ->
(* Important: we must concatenate the fragments while
* the binding is read-locked!
*)
let v_total =
Bigarray.Array1.create
Bigarray.int32 Bigarray.c_layout !v_size in
List.iter
(fun val_frag ->
let len = Bigarray.Array1.dim val_frag in
v_size := !v_size - len;
Bigarray.Array1.blit
val_frag
(Bigarray.Array1.sub v_total !v_size len)
)
!v;
assert(!v_size = 0);
raise (Done v_total)
);
raise Not_found
with
| Done v_total ->
v_total
;;
let mem t key =
try
read_blocks t key
(fun _ -> raise Exit);
false
with
| Exit ->
true
;;
let iter_keys f t =
lock_page t t.dpage Read;
unlock_protect t [ t.dpage ]
(fun () ->
let total_pages = t.mem.{ t.dpage, 1 } in
if Bigarray.Array2.dim1 t.mem <> (to_int total_pages) then
remap t (to_int total_pages);
iter_hash_table
t
(fun p q ->
let ht_key = t.mem.{ p, q } in
let ht_ptr = t.mem.{ p, q+1 } in
if ht_ptr <> 0l && ht_ptr <> (-1l) then
self_locked t
f ht_key
)
)
();
unlock_page t t.dpage
;;
let iter f t =
let v = ref [] in
let v_size = ref 0 in
let reassemble key val_frag_opt =
match val_frag_opt with
| Some val_frag ->
v := val_frag :: !v;
v_size := !v_size + Bigarray.Array1.dim val_frag
| None ->
let v_total =
Bigarray.Array1.create
Bigarray.int32 Bigarray.c_layout !v_size in
List.iter
(fun val_frag ->
let len = Bigarray.Array1.dim val_frag in
v_size := !v_size - len;
Bigarray.Array1.blit
val_frag
(Bigarray.Array1.sub v_total !v_size len)
)
!v;
assert(!v_size = 0);
v := [];
f key v_total
in
lock_page t t.dpage Read;
unlock_protect t [ t.dpage ]
(fun () ->
let total_pages = t.mem.{ t.dpage, 1 } in
if Bigarray.Array2.dim1 t.mem <> (to_int total_pages) then
remap t (to_int total_pages);
iter_hash_table
t
(fun p q ->
let ht_key = t.mem.{ p, q } in
let ht_ptr = t.mem.{ p, q+1 } in
if ht_ptr <> 0l && ht_ptr <> (-1l) then (
read_blocks_start
t ht_key (reassemble ht_key) total_pages 0 (to_int ht_ptr) ()
)
)
)
();
unlock_page t t.dpage
;;
let fold f t x0 =
let acc = ref x0 in
iter
(fun key v ->
acc := f key v !acc
)
t;
!acc
;;
let length t =
lock_page t t.dpage Read;
let n =
unlock_protect t [ t.dpage ]
(fun () ->
to_int t.mem.{ t.dpage, 4 } )
() in
unlock_page t t.dpage;
n
;;
let get_page_from_free_list t =
(* Assumes that there is a page! *)
let p = t.mem.{ t.dpage, 5 } in
if p = 0l then
raise(Corrupt_file "Free list is empty when it should not be empty");
let p = to_int p in
if t.mem.{ p, 0 } <> free_page_magic then
raise(Corrupt_file "Bad free page magic");
let p' = t.mem.{ p, 1} in
t.mem.{ t.dpage, 5 } <- p';
t.mem.{ t.dpage, 6 } <- Int32.pred t.mem.{ t.dpage, 6 };
p
;;
let add t key v =
if t.self_locked then
failwith "Netshm: Cannot modify table locked by caller";
lock_page t t.dpage Write;
unlock_protect t [ t.dpage ]
(fun () ->
let total_pages = t.mem.{ t.dpage, 1 } in
if Bigarray.Array2.dim1 t.mem <> (to_int total_pages) then
remap t (to_int total_pages);
let (ht_page, ht_pos, ht_inc) =
hash_lookup_for_addition t key in
let old_cpage = t.mem.{ ht_page, ht_pos+1 } in
let old_cpage =
if old_cpage = (-1l) then 0l else old_cpage in
(* How many pages do we need? *)
let words = Bigarray.Array1.dim v in
let pages = (words + 2 - 1) / (t.pagesize - 4) + 1 in
(* How many pages do we need to allocate? *)
let pages_in_free_list = to_int t.mem.{ t.dpage, 6 } in
let pages_to_allocate = max 0 (pages - pages_in_free_list) in
(* Allocate pages and enter into free list: *)
if pages_to_allocate > 0 then (
let total_pages = to_int t.mem.{ t.dpage, 1 } in
remap t (total_pages + pages_to_allocate);
t.mem.{ t.dpage, 1 } <- of_int (total_pages + pages_to_allocate);
for k = 0 to pages_to_allocate - 1 do
let p = total_pages + k in
t.mem.{ p, 0 } <- free_page_magic;
t.mem.{ p, 1 } <- t.mem.{ t.dpage, 5 };
t.mem.{ t.dpage, 5 } <- of_int p;
done;
t.mem.{ t.dpage, 6 } <-
of_int (pages_in_free_list + pages_to_allocate)
);
(* MAYBE TODO: Unlock before writing to content pages *)
(* Write content pages: *)
let val_idx = ref 0 in
let val_len = Bigarray.Array1.dim v in
let cpage = get_page_from_free_list t in
t.mem.{ cpage, 0 } <- content1_magic;
t.mem.{ cpage, 1 } <- key;
t.mem.{ cpage, 2 } <- 0l; (* later updated if necessary *)
t.mem.{ cpage, 3 } <- old_cpage;
t.mem.{ cpage, 4 } <- 0l;
t.mem.{ cpage, 5 } <- of_int val_len;
let cur_cpage = ref cpage in
let pos = ref 6 in
let seq = ref 0 in
while !val_idx < val_len do
if !pos >= t.pagesize then (
let cpage = get_page_from_free_list t in
incr seq;
t.mem.{ cpage, 0 } <- content2_magic;
t.mem.{ cpage, 1 } <- key;
t.mem.{ cpage, 2 } <- 0l; (* later updated if necessary *)
t.mem.{ cpage, 3 } <- of_int !seq;
t.mem.{ !cur_cpage, 2 } <- of_int cpage;
cur_cpage := cpage;
pos := 4
);
let len = val_len - !val_idx in
let ca_len = t.pagesize - !pos in
let words = min len ca_len in
let mem_pg = Bigarray.Array2.slice_left t.mem !cur_cpage in
Bigarray.Array1.blit
(Bigarray.Array1.sub v !val_idx words)
(Bigarray.Array1.sub mem_pg !pos words);
val_idx := !val_idx + words;
pos := !pos + words;
done;
(* Write hash table entry: *)
t.mem.{ ht_page, ht_pos } <- key;
t.mem.{ ht_page, ht_pos + 1 } <- of_int cpage;
if ht_inc then
t.mem.{ t.dpage, 3 } <- Int32.succ t.mem.{ t.dpage, 3 };
t.mem.{ t.dpage, 4 } <- Int32.succ t.mem.{ t.dpage, 4 };
)
();
(* Unlock: *)
unlock_page t t.dpage
;;
let rec unalloc_pages_of_binding t cpage =
let cpage' = to_int t.mem.{ cpage, 2 } in
t.mem.{ cpage, 0 } <- free_page_magic;
t.mem.{ cpage, 1 } <- t.mem.{ t.dpage, 5 };
t.mem.{ t.dpage, 5 } <- of_int cpage;
t.mem.{ t.dpage, 6 } <- Int32.succ t.mem.{ t.dpage, 6 };
if cpage' <> 0 then (
if t.mem.{ cpage', 0 } <> content2_magic then
raise(Corrupt_file "Bad content2 magic");
unalloc_pages_of_binding t cpage'
)
;;
(* the write_blocks implementation bases on iterators *)
type write_op =
[ `Remove_binding ]
type ctrl_op =
[ `Nop | write_op ]
type op_announcement =
{ op_remove_binding : bool }
type pointer_to_binding =
[ `Prev_binding of int
| `Hash_entry of int * int
]
let remove_binding t key ptr pg =
let next_cpage = t.mem.{ pg, 3 } in
( match ptr with
| `Hash_entry(ht_page, ht_pos) ->
t.mem.{ ht_page, ht_pos + 1 } <- (if next_cpage = 0l then
(-1l)
else
next_cpage);
| `Prev_binding pg' ->
t.mem.{ pg', 3 } <- next_cpage;
);
t.mem.{ t.dpage, 4 } <- Int32.pred t.mem.{ t.dpage, 4 };
(* Note: mem.{t.dpage, 3}, i.e. the number of used entries, does not change,
* even if the hash cell is set to (-1).
*)
unalloc_pages_of_binding t pg;
(* The new pointer to the next binding is the old after the deletion: *)
ptr
;;
let write_op it ops ptr op =
match op with
| `Nop ->
`Prev_binding it.i_binding_pg
| `Remove_binding ->
if not ops.op_remove_binding then
failwith "Netshm.write_blocks: Cannot do write operation unless announced";
remove_binding it.i_table it.i_key ptr it.i_binding_pg
;;
let rec write_blocks_start t ops key f total_pages (ptr : pointer_to_binding) pg () =
(* ptr: If [`Prev_binding pg], there is a previous binding that starts on
* page pg. If [`Hash_entry(pg,pos)] this is the first binding, and the
* hash entry on page [pg] and [pos] points to it.
*
* locking: the current, and if `Remove_binding is announced in ops,
* the previous binding (if any) are write locked.
* Additionally, t.dpage is locked over the whole time, too.
*)
lock_page t pg Write;
let followup =
unlock_protect t [ pg ]
(fun () ->
let it =
create_iterator t key total_pages pg in
write_blocks_extract it ops f ptr pg `Nop
)
() in
followup ()
and write_blocks_extract it ops f ptr pg old_op () =
let locked =
if ops.op_remove_binding then
match ptr with
| `Prev_binding pg' -> [ pg'; pg ]
| `Hash_entry(_,_) -> [ pg ]
else
[ pg ]
in
let followup =
unlock_protect it.i_table locked
(fun () ->
let val_frag =
Bigarray.Array1.sub it.i_current it.i_start_val it.i_len_val in
let op = ref old_op in
try
let new_op =
self_locked it.i_table
f (Some val_frag) in
if new_op <> `Nop then
op := new_op;
if it.i_next_pg <> 0 then (
next_page it;
write_blocks_extract it ops f ptr pg !op
)
else (
let new_op =
self_locked it.i_table
f None in
if new_op <> `Nop then
op := new_op;
raise Next
)
with
| Next ->
let ptr' =
write_op it ops ptr !op in
write_blocks_next it ops f ptr pg ptr'
| Break ->
let _ptr' =
write_op it ops ptr !op in
(fun () ->
unlock_page it.i_table pg)
)
()in
followup()
and write_blocks_next it ops f prev_ptr prev_pg ptr' () =
let pg = it.i_next_binding_pg in
if pg <> 0 then (
lock_page it.i_table pg Write;
if ops.op_remove_binding then
( match prev_ptr with
| `Prev_binding pg' -> unlock_page it.i_table pg'
| `Hash_entry(_,_) -> ()
)
else
unlock_page it.i_table prev_pg;
next_binding it;
write_blocks_extract it ops f ptr' pg `Nop ()
)
else (
if ops.op_remove_binding then
( match prev_ptr with
| `Prev_binding pg' -> unlock_page it.i_table pg'
| `Hash_entry(_,_) -> ()
);
unlock_page it.i_table prev_pg
)
;;
let write_blocks t ops key f =
let ops' =
{ op_remove_binding = List.mem `Remove_binding ops } in
if t.self_locked then
failwith "Netshm: Cannot modify table locked by caller";
lock_page t t.dpage Write;
unlock_protect t [ t.dpage ]
(fun () ->
let total_pages = t.mem.{ t.dpage, 1 } in
if Bigarray.Array2.dim1 t.mem <> (to_int total_pages) then
remap t (to_int total_pages);
let (ht_page, ht_pos, ht_found) = hash_lookup t key 0l false in
if ht_found then (
let ht_ptr = t.mem.{ ht_page, ht_pos+1 } in
if ht_ptr = (-1l) then (
(* key is unbound *)
()
)
else
if ht_ptr > 0l && ht_ptr < total_pages then (
let ptr = `Hash_entry(ht_page,ht_pos) in
write_blocks_start
t ops' key f total_pages ptr (to_int ht_ptr) ()
)
else
raise(Corrupt_file "Bad page number found")
)
)
();
unlock_page t t.dpage;
;;
let remove t key =
let first = ref true in
write_blocks t [`Remove_binding] key
(fun _ ->
if !first then (
first := false;
`Remove_binding
)
else
raise Break)
;;
let replace t key v =
(* This is the trivial implementation... Not really what we want. Anything
* better that reuses the pages of the old binding is a lot of work...
*)
group t
(fun () ->
remove t key;
add t key v
)
()
;;
let manage ?(pagesize = 256) ?init lm sd =
if pagesize < 160 then
failwith "open_table: minimum pagesize is 160";
if pagesize mod 4 <> 0 then
failwith "open_table: pagesize must be divisible by 4";
let pagesize = pagesize/4 in
if size_of_shm sd = 0 || init <> None then (
(* re-initialize table *)
let ld = create_gld sd pagesize lm in
let t =
{ sd = sd;
lm = ld;
mem = dummy_int32matrix();
pagesize = pagesize;
dpage = 0;
self_locked = false;
} in
let n =
match init with
| None -> 1000
| Some n -> n in
let n_ht = n * 2 in (* need 2 times more ht entries *)
let q_ht = (pagesize-4) / 4 in (* entries per ht page *)
let p_ht = (n_ht - 1) / q_ht + 1 in
(* number of hash table pages *)
remap t 2;
t.mem.{ 0, 0 } <- file_magic1;
t.mem.{ 0, 1 } <- file_magic2;
t.mem.{ 0, 2 } <- file_magic3;
t.mem.{ 0, 3 } <- file_magic4;
t.mem.{ 0, 4 } <- 1l;
t.mem.{ 1, 0 } <- descriptor_magic;
t.mem.{ 1, 1 } <- 2l;
t.mem.{ 1, 2 } <- 0l;
t.mem.{ 1, 3 } <- 0l;
t.mem.{ 1, 4 } <- 0l;
t.mem.{ 1, 5 } <- 0l;
t.mem.{ 1, 6 } <- 0l;
t.mem.{ 1, 7 } <- 0l;
for k = 0 to 15 do
t.mem.{ 1, 8 + 2*k } <- 0l;
t.mem.{ 1, 9 + 2*k } <- 0l;
done;
let t =
{ t with
dpage = 1
} in
let p = alloc_hash_extent t 0 p_ht in
t.mem.{ 1, 8 } <- of_int p;
t.mem.{ 1, 9 } <- of_int p_ht;
t
)
else (
(* manage existing table *)
let ld = create_gld sd pagesize lm in
let t =
{ sd = sd;
lm = ld;
mem = dummy_int32matrix();
pagesize = pagesize;
dpage = 0;
self_locked = false;
} in
remap t 1;
if (t.mem.{ 0, 0 } <> file_magic1 ||
t.mem.{ 0, 1 } <> file_magic2 ||
t.mem.{ 0, 2 } <> file_magic3 ||
t.mem.{ 0, 3 } <> file_magic4)
then
raise(Corrupt_file "Bad file magic");
let dpage = to_int t.mem.{ 0, 4 } in
remap t (dpage+1);
if t.mem.{dpage, 0} <> descriptor_magic then
raise(Corrupt_file "Bad descriptor magic");
let total_size = to_int t.mem.{ dpage, 1 } in
remap t total_size;
{ t with
dpage = dpage
}
)
(* Debug stuff: *)
let dump t =
printf "<shm length=%d\n" (length t);
iter
(fun key v ->
printf " %ld => [| " key;
let l = Bigarray.Array1.dim v in
for k = 0 to l - 1 do
if k > 0 then printf "; ";
printf "%ld" v.{ k }
done;
printf " |]\n";
)
t;
printf ">\n%!"
;;
let bigarray x =
Bigarray.Array1.of_array Bigarray.int32 Bigarray.c_layout
(Array.map Int32.of_int x)
;;
let memory t = t.mem