(* $Id: cache_server.ml 2 2015-01-13 21:46:03Z gerd $ *)
open Cache_aux
open Cache_util
open Printf
(* Start-up configuration of a cache server instance. [create] reads every
 * method once and copies the values into the cache record.
 *)
class type cache_config =
object
method directory : string
(* Directory that holds the persistent "cache-<bucket>.data" files and
 * the "FORMAT" file (see [save] and [load]) *)
method max_size : int64
(* Upper bound for [stats.num_bytes]; [prune_by_size] evicts entries
 * while the estimate exceeds it *)
method save_cache_period : int
(* Delay passed to the save timer in [check_for_save]; [activate] only
 * schedules saving when this is > 0 *)
method save_cache_speed : int
(* Throttle used by [save_thread]: bytes written divided by this value
 * gives the target elapsed time; 0 disables throttling *)
method rpc_program_number : int32
(* RPC program number under which [bind] registers the procedures *)
end
module StringMap = Map.Make(String)
module Int64Map = Map.Make(Int64)

module Int = struct
  (* Ordered type over native ints; key type of [IntMap]. *)
  type t = int

  (* Total order on ints.
   *
   * A subtraction-based comparison ([x - y]) wraps around when the
   * operands have opposite signs and large magnitude (for example
   * [max_int] and [-1]), which yields the wrong sign and breaks the
   * ordering that [Map.Make] relies on. The built-in comparison,
   * specialised to [int] by the annotations, has no such problem.
   *)
  let compare (x : int) (y : int) = compare x y
end

module IntMap = Map.Make(Int)
module DList = struct
  (* Mutable doubly linked list. Cells are handed out to the caller so
   * that a known cell can be unlinked or moved without searching.
   *)

  type 'a cell =
    { data : 'a;
      mutable prev : 'a cell option;
      mutable next : 'a cell option;
    }

  type 'a t =
    { mutable head : 'a cell option;
      mutable last : 'a cell option;
    }

  (* Consistency check: walks the list from the head and asserts that
   * every back link points to the preceding cell and that [last] is the
   * final cell reached. Fails with [Assert_failure] otherwise.
   *)
  let verify l =
    match l.head with
    | None ->
        assert (l.last = None)
    | Some start ->
        assert (start.prev = None);
        (* Follow the [next] links, checking each back link on the way;
         * returns the final cell. *)
        let rec walk cur =
          match cur.next with
          | None -> cur
          | Some n ->
              (match n.prev with
               | Some p -> assert (p == cur)
               | None -> assert false);
              walk n
        in
        let final = walk start in
        (match l.last with
         | Some z -> assert (z == final)
         | None -> assert false)

  (* A fresh empty list *)
  let create () =
    { head = None; last = None }

  (* First cell, or Not_found if the list is empty *)
  let head l =
    match l with
    | { head = Some cell; _ } -> cell
    | { head = None; _ } -> raise Not_found

  (* Final cell, or Not_found if the list is empty *)
  let last l =
    match l with
    | { last = Some cell; _ } -> cell
    | { last = None; _ } -> raise Not_found

  (* Successor of [cell], or Not_found if there is none *)
  let next cell =
    match cell with
    | { next = Some succ; _ } -> succ
    | { next = None; _ } -> raise Not_found

  (* Predecessor of [cell], or Not_found if there is none *)
  let prev cell =
    match cell with
    | { prev = Some pred; _ } -> pred
    | { prev = None; _ } -> raise Not_found

  (* Payload stored in [cell] *)
  let contents cell =
    cell.data

  (* Links the existing [cell] in front of the current head *)
  let prepend_cell l cell =
    let old_head = l.head in
    cell.prev <- None;
    cell.next <- old_head;
    (match old_head with
     | Some h -> h.prev <- Some cell
     | None -> ());
    l.head <- Some cell;
    (* First cell of a previously empty list is also the last one *)
    if l.last = None then l.last <- Some cell

  (* Allocates a cell for [data] and puts it at the head *)
  let prepend l data =
    prepend_cell l { data = data; prev = None; next = None }

  (* Links the existing [cell] behind the current last cell *)
  let append_cell l cell =
    let old_last = l.last in
    cell.prev <- old_last;
    cell.next <- None;
    (match old_last with
     | Some z -> z.next <- Some cell
     | None -> ());
    l.last <- Some cell;
    (* First cell of a previously empty list is also the head *)
    if l.head = None then l.head <- Some cell

  (* Allocates a cell for [data] and puts it at the end *)
  let append l data =
    append_cell l { data = data; prev = None; next = None }

  (* Unlinks [cell] from [l] and clears its own links.
   * There is no check that [cell] is actually a member of [l]!
   *)
  let delete l cell =
    let before = cell.prev in
    let after = cell.next in
    (match before with
     | Some p -> p.next <- after
     | None -> l.head <- after);
    (match after with
     | Some n -> n.prev <- before
     | None -> l.last <- before);
    cell.prev <- None;
    cell.next <- None

  (* Payloads in list order (head first). The list is walked backwards
   * from [last] so that consing yields the right order directly.
   *)
  let to_list l =
    let rec collect acc = function
      | None -> acc
      | Some cell -> collect (cell.data :: acc) cell.prev
    in
    collect [] l.last
end
(* What the index maps can point to for a key:
 * - `Entry: a regular entry, given as its cell in the usage list
 * - `Locked_key: a key that has no entry but is locked until the given
 *   expiration timestamp (created by [lock_key])
 *)
type gen_entry =
[ `Entry of entry DList.cell
| `Locked_key of string * int64 (* key, expiration *)
]
(* The complete mutable state of one cache server. The three index
 * structures [by_key], [by_usage] and [by_expiration] describe the same
 * set of entries and must be kept in sync; [verify_entries] checks this.
 *)
type cache_instance =
{ mutable config : config;
(* Current runtime configuration; replaced by [set_config] *)
rpc_prognr : int32;
(* RPC program number used by [bind] *)
directory : string;
(* Directory of the persistent cache files *)
mutable esys : Unixqueue.event_system option;
(* Event system; set by [activate], reset to None by [shutdown] *)
mutable group : Unixqueue.group option; (* Group for additional timers *)
mutable by_key : gen_entry StringMap.t;
(* Maps the key to the entry *)
mutable by_usage : entry DList.t;
(* Orders the entries by usage: The most recently accessed entry is
 * at the head of the list. The least recently accessed entry is at
 * the end of the list.
 *)
mutable by_expiration : gen_entry StringMap.t Int64Map.t;
(* Maps expiration timestamp to mappings from key to entry. Entries that
 * do not expire are not member of this mapping. Expiration means that
 * the entry is completely deleted from the cache.
 *)
stats : stats;
(* Counters and size estimates; the record is updated in place *)
mutable saving : bool
(* Flag consulted by [save] and [check_for_save] to avoid starting a
 * second save operation *)
}
(* Renders [s] as lowercase hex digits (two per byte) enclosed in angle
 * brackets, e.g. "ab" becomes "<6162>". Used for logging binary keys.
 *)
let to_hex s =
  let buf = Buffer.create 100 in
  Buffer.add_char buf '<';
  String.iter
    (fun c -> Buffer.add_string buf (sprintf "%02x" (Char.code c)))
    s;
  Buffer.add_char buf '>';
  Buffer.contents buf
(* Removes [key] from the [by_expiration] bucket of timestamp [exp].
 * Nothing happens when [exp] is 0 (the "never expires" marker) or when
 * no bucket exists for [exp]. A bucket that becomes empty is dropped
 * from [by_expiration] altogether.
 *)
let delete_expiration cache key exp =
  if exp <> 0L then (
    let bucket =
      try Some (Int64Map.find exp cache.by_expiration)
      with Not_found -> None in
    match bucket with
    | None -> ()
    | Some exp_map ->
        let exp_map' = StringMap.remove key exp_map in
        (* Use the dedicated emptiness test rather than polymorphic
         * equality: the map values reference cyclic list cells, which
         * structural comparison must never be allowed to traverse. *)
        cache.by_expiration <-
          if StringMap.is_empty exp_map' then
            Int64Map.remove exp cache.by_expiration
          else
            Int64Map.add exp exp_map' cache.by_expiration
  )
(* Registers [ge] under [key] in the [by_expiration] bucket of timestamp
 * [exp], creating the bucket if needed. An [exp] of 0 means "does not
 * expire", in which case nothing is recorded.
 *)
let add_expiration cache key exp ge =
  if exp = 0L then
    ()
  else begin
    let bucket =
      try Int64Map.find exp cache.by_expiration
      with Not_found -> StringMap.empty in
    cache.by_expiration <-
      Int64Map.add exp (StringMap.add key ge bucket) cache.by_expiration
  end
(* Size of a machine word in bytes *)
let wsize = Sys.word_size / 8

(* Rough estimate, in bytes, of the memory taken by entry [e]: a fixed
 * per-entry overhead plus the lengths of key and value.
 *)
let entry_size e =
  let fixed_overhead = 44 * wsize + 40 in
  let payload = String.length e.e_key + e.e_value_length in
  fixed_overhead + payload
(* Removes [cell], which must be a member of [by_usage], from all three
 * index structures and subtracts it from the entry and byte counters.
 *)
let delete_entry cache cell =
  let entry = DList.contents cell in
  let freed = Int64.of_int (entry_size entry) in
  DList.delete cache.by_usage cell;
  cache.by_key <- StringMap.remove entry.e_key cache.by_key;
  delete_expiration cache entry.e_key entry.e_expiration;
  let st = cache.stats in
  st.num_entries <- st.num_entries - 1;
  st.num_bytes <- Int64.sub st.num_bytes freed
(* Removes whatever is registered for [key] - a regular entry or a mere
 * lock - from the cache. Unknown keys are silently ignored.
 *)
let delete_key cache key =
  let found =
    try Some (StringMap.find key cache.by_key)
    with Not_found -> None in
  match found with
  | None ->
      ()
  | Some (`Locked_key (_, exp)) ->
      cache.by_key <- StringMap.remove key cache.by_key;
      delete_expiration cache key exp
  | Some (`Entry cell) ->
      delete_entry cache cell
(* Inserts the new regular entry [e] into all index structures and adds
 * it to the entry and byte counters. [where] selects the end of the
 * usage list that receives the entry (default: the head, i.e. most
 * recently used).
 * Precondition: e.e_key is not yet member of the cache.
 *)
let add_entry ?(where = `Prepend) cache e =
  let usage = cache.by_usage in
  let cell =
    match where with
    | `Append ->
        DList.append usage e;
        DList.last usage
    | `Prepend ->
        DList.prepend usage e;
        DList.head usage
  in
  let ge = `Entry cell in
  cache.by_key <- StringMap.add e.e_key ge cache.by_key;
  add_expiration cache e.e_key e.e_expiration ge;
  let st = cache.stats in
  st.num_entries <- st.num_entries + 1;
  st.num_bytes <- Int64.add st.num_bytes (Int64.of_int (entry_size e))
(* Locks the existing [cell]: sets its delete flag and reschedules it so
 * that it is removed from the cache at timestamp [expires].
 *)
let lock_entry cache cell expires =
  let e = DList.contents cell in
  (* The previous expiration must be captured BEFORE the field is
   * overwritten. Otherwise the binding under the old timestamp is never
   * removed from [by_expiration] and later triggers a bogus deletion. *)
  let old_exp = e.e_expiration in
  e.e_delete_flag <- true;
  e.e_expiration <- expires;
  delete_expiration cache e.e_key old_exp;
  add_expiration cache e.e_key expires (`Entry cell)
(* Records a lock for [key], which has no entry in the cache, and
 * schedules the lock for removal at timestamp [expires].
 *)
let lock_key cache key expires =
  let marker = `Locked_key (key, expires) in
  let () = cache.by_key <- StringMap.add key marker cache.by_key in
  add_expiration cache key expires marker
(* Evicts least recently used entries (from the end of [by_usage]) until
 * the byte estimate no longer exceeds [config.max_size]. It is treated
 * as an internal error if the limit is still exceeded once the usage
 * list is empty.
 *)
let prune_by_size cache =
  let rec evict () =
    if cache.stats.num_bytes > cache.config.max_size then begin
      let victim =
        try DList.last cache.by_usage
        with Not_found -> assert false in
      delete_entry cache victim;
      evict ()
    end
  in
  evict ()
(* Deletes every entry and every key lock whose expiration timestamp is
 * not later than the current time. The buckets of [by_expiration] are
 * visited in increasing timestamp order, so the scan stops at the first
 * bucket that lies in the future.
 *)
let prune_by_expiration cache =
  let now = Int64.of_float (Unix.gettimeofday ()) in
  let drop _ ge =
    match ge with
    | `Locked_key (k, _) -> delete_key cache k
    | `Entry cell -> delete_entry cache cell
  in
  let visit exp bucket =
    if exp > now then raise Exit;
    StringMap.iter drop bucket
  in
  try Int64Map.iter visit cache.by_expiration
  with Exit -> ()
(* Key as used by the [*_directly] functions: the client-level key, which
 * is turned into the internal key by [hash_of_key], plus the modulus that
 * [set_directly] passes to [bucket_of_hash] to derive the bucket number.
 *)
type full_key =
{ key : Cache_client.key;
modulo : int;
}
(* Stores [data] under the (already hashed) [key].
 *
 * - If a regular entry exists, it is replaced only when [opt_overwrite]
 *   is set, the entry is not flagged as deleted (unless [opt_undelete]
 *   is set), and - with [opt_setifchanged] - the value hash differs.
 * - If no entry exists, a new one is created only when [opt_add] is set
 *   and the key is not locked (unless [opt_undelete] is set).
 *
 * A stored entry is put at the head of the usage list, and the cache is
 * pruned to [max_size] afterwards.
 *
 * Returns `Stored or `Not_stored. Always counts as one "set" call.
 *)
let set_directly_1 cache key bucket data expires opts =
  cache.stats.num_calls_set <- cache.stats.num_calls_set + 1;
  let existing =
    try Some (StringMap.find key cache.by_key)
    with Not_found -> None in
  match existing with
  | Some (`Entry cell) ->
      (* [key] is stored as [e] *)
      let e = DList.contents cell in
      if not opts.opt_overwrite then
        `Not_stored (* not allowed to overwrite *)
      else if e.e_delete_flag && not opts.opt_undelete then
        `Not_stored (* not allowed to overwrite deleted entry *)
      else begin
        let data_md5 = Digest.string data in
        let is_change = e.e_value_hash <> data_md5 in
        if is_change || not opts.opt_setifchanged then begin
          (* Unregister first: size and expiration bookkeeping must see
           * the old field values. *)
          delete_entry cache cell;
          let now = Int64.of_float (Unix.gettimeofday ()) in
          e.e_modification <- now;
          e.e_expiration <- expires;
          (* The entry is live again. Without clearing the flag an
           * undeleted entry would stay "deleted" and could never be
           * deleted by a later request. *)
          e.e_delete_flag <- false;
          e.e_value <- data;
          e.e_value_hash <- data_md5;
          e.e_value_length <- String.length data;
          e.e_bucket <- bucket;
          add_entry cache e;
          prune_by_size cache;
          `Stored
        end
        else
          `Not_stored
      end
  | Some (`Locked_key _) | None ->
      (* No entry for [key]; the key may be locked, though *)
      if not opts.opt_add then
        `Not_stored (* not allowed to add *)
      else begin
        let key_is_locked =
          match existing with
          | Some _ -> true
          | None -> false in
        if key_is_locked && not opts.opt_undelete then
          `Not_stored (* key is locked *)
        else begin
          if key_is_locked then delete_key cache key;
          let now = Int64.of_float (Unix.gettimeofday ()) in
          let e =
            { e_key = key;
              e_creation = now;
              e_modification = now;
              e_expiration = expires;
              e_delete_flag = false;
              e_value = data;
              e_value_hash = Digest.string data;
              e_value_length = String.length data;
              e_counter = 0;
              e_bucket = bucket
            } in
          add_entry cache e;
          prune_by_size cache;
          `Stored
        end
      end
;;
(* Like [set_directly_1], but takes a [full_key]: the key is hashed and
 * the bucket is derived from the hash and [fk.modulo].
 *)
let set_directly cache fk data expires opts =
  let hashed = hash_of_key fk.key in
  set_directly_1
    cache hashed (bucket_of_hash fk.modulo hashed) data expires opts
;;
(* RPC procedure "set": runs [set_directly_1] on the unpacked arguments
 * and replies with the result translated to the RPC-level tag.
 *)
let proc_set cache _sess (key, bucket, data, expires, opts) reply =
  match set_directly_1 cache key bucket data expires opts with
  | `Stored -> reply `stored
  | `Not_stored -> reply `not_stored
;;
(* Looks up the (already hashed) [key].
 *
 * Returns `Not_found when there is no regular entry (a locked key
 * counts as absent), when [opt_getifmodifiedsince] is later than the
 * entry's modification time, or when [opt_getifnotmd5] equals the
 * entry's value hash. Otherwise returns `Found with the entry (with an
 * empty value if [opt_novalue] is set), counts a hit, increments the
 * entry's access counter and moves it to the head of the usage list.
 * Always counts as one "get" call.
 *)
let get_directly_1 cache key opts =
  cache.stats.num_calls_get <- cache.stats.num_calls_get + 1;
  let hit =
    try
      match StringMap.find key cache.by_key with
      | `Locked_key _ ->
          (* [key] is locked but the entry is non-existent *)
          None
      | `Entry cell ->
          let e = DList.contents cell in
          let too_old =
            match opts.opt_getifmodifiedsince with
            | Some ts -> ts > e.e_modification
            | None -> false in
          let same_md5 =
            match opts.opt_getifnotmd5 with
            | Some md5 -> md5 = e.e_value_hash
            | None -> false in
          if too_old || same_md5 then None else Some (cell, e)
    with Not_found -> None in
  match hit with
  | None ->
      `Not_found
  | Some (cell, e) ->
      (* The reply copy is taken before the counter is bumped *)
      let e' =
        if opts.opt_novalue then { e with e_value = "" } else e in
      cache.stats.num_hits <- cache.stats.num_hits + 1;
      e.e_counter <- e.e_counter + 1;
      (* Move e to the beginning of the usage list: *)
      DList.delete cache.by_usage cell;
      DList.prepend_cell cache.by_usage cell;
      `Found e'
;;
(* Like [get_directly_1], but takes a [full_key] whose key is hashed
 * first.
 *)
let get_directly cache fk opts =
  get_directly_1 cache (hash_of_key fk.key) opts
;;
(* RPC procedure "get": runs [get_directly_1] on the unpacked arguments
 * and replies with the result translated to the RPC-level tag.
 *)
let proc_get cache _sess (key, opts) reply =
  match get_directly_1 cache key opts with
  | `Found e -> reply (`found e)
  | `Not_found -> reply `not_found
;;
(* Deletes or locks the (already hashed) [key].
 *
 * - A key that is already locked is left alone.
 * - An existing entry is processed only if it is not yet flagged as
 *   deleted and all given conditions hold: [opt_delifolderthan] (entry
 *   modified before the timestamp), [opt_delifmd5] (value hash equals)
 *   and [opt_delifnotmd5] (value hash differs). It is then removed at
 *   once if [expires] is not in the future; otherwise it is locked and
 *   scheduled for removal at [expires].
 * - For an unknown key, a lock until [expires] is recorded only when
 *   [opt_strictlock] is set and [expires] is in the future.
 *
 * Always counts as one "delete" call.
 *)
let delete_directly_1 cache key expires opts =
  cache.stats.num_calls_delete <- cache.stats.num_calls_delete + 1;
  let current_time () = Int64.of_float (Unix.gettimeofday ()) in
  let existing =
    try Some (StringMap.find key cache.by_key)
    with Not_found -> None in
  match existing with
  | Some (`Locked_key _) ->
      (* [key] is already locked *)
      ()
  | Some (`Entry cell) ->
      let e = DList.contents cell in
      let old_enough =
        match opts.opt_delifolderthan with
        | None -> true
        | Some ts -> e.e_modification < ts in
      let md5_matches =
        match opts.opt_delifmd5 with
        | None -> true
        | Some md5 -> e.e_value_hash = md5 in
      (* The "differs" test belongs to its own option. Testing
       * [opt_delifmd5] for both equality and inequality would make any
       * request carrying that option impossible to satisfy. *)
      let md5_differs =
        match opts.opt_delifnotmd5 with
        | None -> true
        | Some md5 -> e.e_value_hash <> md5 in
      let do_delete =
        not e.e_delete_flag && old_enough && md5_matches && md5_differs in
      if do_delete then (
        if expires <= current_time () then
          (* Delete immediately *)
          delete_entry cache cell
        else
          (* Lock entry and delete in the future *)
          lock_entry cache cell expires
      )
  | None ->
      if opts.opt_strictlock && expires > current_time () then
        lock_key cache key expires
;;
(* Like [delete_directly_1], but takes a [full_key] whose key is hashed
 * first.
 *)
let delete_directly cache fk expires opts =
  delete_directly_1 cache (hash_of_key fk.key) expires opts
;;
(* RPC procedure "delete": runs [delete_directly_1] on the unpacked
 * arguments and sends the empty reply.
 *)
let proc_delete cache _sess (key, expires, opts) reply =
  delete_directly_1 cache key expires opts;
  reply ()
;;
(* Empties the cache: all three index structures are replaced by empty
 * ones and the entry and byte counters are reset to zero. The call
 * counters are not touched.
 *)
let clear_directly cache =
  let st = cache.stats in
  st.num_entries <- 0;
  st.num_bytes <- 0L;
  cache.by_expiration <- Int64Map.empty;
  cache.by_usage <- DList.create ();
  cache.by_key <- StringMap.empty
;;
(* RPC procedure "clear": empties the cache and sends the empty reply *)
let proc_clear cache _sess () reply =
  let () = clear_directly cache in
  reply ()
;;
(* The configuration currently in effect *)
let get_config cache =
  cache.config

(* RPC procedure "get_config": replies with the current configuration *)
let proc_get_config cache _sess () reply =
  reply (get_config cache)
;;
(* Installs [new_config] and prunes the cache right away, because the
 * new [max_size] may be smaller than the old one.
 *)
let set_config cache new_config =
  let () = cache.config <- new_config in
  prune_by_size cache
;;
(* RPC procedure "set_config": installs the configuration and sends the
 * empty reply.
 *)
let proc_set_config cache _sess new_config reply =
  let () = set_config cache new_config in
  reply ()
;;
(* The statistics record of the cache (the live record, not a copy) *)
let get_stats cache =
  cache.stats

(* RPC procedure "get_stats": replies with the statistics record *)
let proc_get_stats cache _sess () reply =
  reply (get_stats cache)
;;
(* Resets the call and hit counters to zero and records the current time
 * as the moment of the reset. The entry and byte counters are kept.
 *)
let clear_counters cache =
  let st = cache.stats in
  st.counters_reset_on <- Int64.of_float (Unix.gettimeofday ());
  st.num_hits <- 0;
  st.num_calls_delete <- 0;
  st.num_calls_set <- 0;
  st.num_calls_get <- 0
;;
(* RPC procedure "clear_counters": resets the counters and sends the
 * empty reply.
 *)
let proc_clear_counters cache _sess () reply =
  let () = clear_counters cache in
  reply ()
;;
(* Raised internally by [verify_entries] to leave its scan loop when the
 * end of the usage list is reached (or the list is empty).
 *)
exception End_of_cache
(* Cross-checks the three index structures of [cache] against each other.
 * Every inconsistency is reported through [log]; a summary line is logged
 * at the end. Fails with [Failure "verify_entries"] if at least one
 * inconsistency was found.
 *)
let verify_entries ~log cache =
let now = Int64.of_float (Unix.gettimeofday()) in
let n_regular = ref 0 in
let success = ref true in
(* Pass 1: walk by_usage from the head. Each cell must be the one that
 * by_key has for its key and, if the entry can expire, also the one in
 * the by_expiration bucket of its timestamp. *)
( try
let cur = ref (try DList.head cache.by_usage
with Not_found -> raise End_of_cache) in
while true do
incr n_regular;
let e = DList.contents !cur in
( try
let ge = StringMap.find e.e_key cache.by_key in
match ge with
| `Entry cell ->
(* Physical comparison: it must be the very same cell *)
if cell != !cur then (
log (sprintf "* Wrong cell found for key: %s" (to_hex e.e_key));
success := false;
)
| `Locked_key _ ->
log (sprintf "* Lock found for key: %s" (to_hex e.e_key));
success := false;
with
| Not_found ->
log (sprintf "* Key not found: %s" (to_hex e.e_key));
success := false
);
if e.e_expiration <> 0L then
( try
let m = Int64Map.find e.e_expiration cache.by_expiration in
( try
let ge = StringMap.find e.e_key m in
match ge with
| `Entry cell ->
if cell != !cur then (
log (sprintf "* Wrong cell found for key in expiration map: %s" (to_hex e.e_key));
success := false;
)
| `Locked_key _ ->
log (sprintf "* Lock found for key in expiration map: %s" (to_hex e.e_key));
success := false;
with Not_found ->
log (sprintf "* Key not found in expiration map: %s" (to_hex e.e_key));
success := false
)
with Not_found ->
log (sprintf "* Expiration entry not found: %s" (to_hex e.e_key));
success := false
);
cur := ( try DList.next !cur with Not_found -> raise End_of_cache )
done;
assert false
with
| End_of_cache -> ()
);
(* Pass 2: count the members of by_key, and how many of them must also
 * appear in by_expiration (expiring entries and all locked keys). *)
let n_by_key = ref 0 in
let n_by_key_can_expire = ref 0 in
StringMap.iter
(fun _ ge ->
incr n_by_key;
( match ge with
| `Entry cell ->
let e = DList.contents cell in
if e.e_expiration <> 0L then incr n_by_key_can_expire
| `Locked_key _ ->
incr n_by_key_can_expire
)
)
cache.by_key;
(* Pass 3: count the members of by_expiration. A timestamp of 0 is
 * never valid there, and buckets more than 100 seconds in the past are
 * reported as overdue. *)
let n_by_expiration = ref 0 in
Int64Map.iter
(fun d m ->
if d = 0L then (
log "* Expiration date 0 is invalid in by_expiration";
success := false
)
else if d < Int64.sub now 100L then (
log (sprintf "* Very old entries found in by_expiration, age = %Ld seconds"
(Int64.sub now d));
success := false
);
StringMap.iter
(fun _ _ -> incr n_by_expiration)
m)
cache.by_expiration;
(* n_by_key > n_regular because we can also have locked keys in by_key *)
if !n_by_key < !n_regular then (
log "* Too few entries in by_key";
success := false;
);
if !n_by_key_can_expire <> !n_by_expiration then (
log "* Wrong number of entries in by_key/by_expiration";
success := false;
);
log (sprintf "* Statistics: %d regular entries, %d locked keys, %d expirable"
!n_regular
(!n_by_key - !n_regular)
!n_by_key_can_expire);
if not !success then failwith "verify_entries"
(* Runs all structural checks on [cache], reporting progress and the
 * outcome through [log]. Never raises: an exception raised by a check
 * is logged instead.
 *)
let verify ~log cache =
  let run_checks () =
    log "Starting verification of cache structure";
    log "- Starting DList.verify";
    DList.verify cache.by_usage;
    log "- Starting verify_entries";
    verify_entries ~log cache;
    log "Verification successful!"
  in
  try run_checks ()
  with error ->
    log (sprintf "Exception in verification: %s" (Printexc.to_string error))
;;
(* Builds an empty, not yet activated cache from the start-up
 * configuration object [cconfig]. All counters start at zero, and the
 * counter reset time is the time of creation.
 *)
let create cconfig =
  let config =
    { max_size = cconfig # max_size;
      save_cache_period = cconfig # save_cache_period;
      save_cache_speed = cconfig # save_cache_speed;
    } in
  let stats =
    { num_entries = 0;
      num_bytes = 0L;
      num_calls_get = 0;
      num_calls_set = 0;
      num_calls_delete = 0;
      num_hits = 0;
      counters_reset_on = Int64.of_float (Unix.gettimeofday ());
    } in
  { config;
    rpc_prognr = cconfig # rpc_program_number;
    directory = cconfig # directory;
    esys = None;
    group = None;
    by_key = StringMap.empty;
    by_usage = DList.create ();
    by_expiration = Int64Map.empty;
    stats;
    saving = false
  }
;;
(* Registers the cache procedures with the RPC server [srv] under the
 * program number of [cache]. "ping" replies immediately; every other
 * procedure is served by the matching [proc_*] function.
 *)
let bind srv cache =
  let program_number = Netnumber.uint4_of_int32 cache.rpc_prognr in
  Cache_srv.Cache.V1.bind_async
    ~program_number
    ~proc_ping:(fun _ _ reply -> reply ())
    ~proc_set:(proc_set cache)
    ~proc_get:(proc_get cache)
    ~proc_delete:(proc_delete cache)
    ~proc_clear:(proc_clear cache)
    ~proc_get_config:(proc_get_config cache)
    ~proc_set_config:(proc_set_config cache)
    ~proc_get_stats:(proc_get_stats cache)
    ~proc_clear_counters:(proc_clear_counters cache)
    srv
;;
(* Arms a one-second timer in [group] that removes expired entries and
 * then re-arms itself, so the check repeats until the group is cleared.
 *)
let rec check_for_expired_entries cache esys group =
  let on_timer () =
    prune_by_expiration cache;
    check_for_expired_entries cache esys group
  in
  Unixqueue.once esys group 1.0 on_timer
(* File format:
*
* Every file "cache-<bucket>.data" is a sequence of:
* - Ordinal number as 4 bytes (big endian)
* - Size of the following record, as 4 bytes (big endian)
* - Record, as XDR of type entry.
*
* The ordinal numbers express the order in by_usage.
*)
(* Sys.readdir has a memory leak in O'Caml 3.09. So we use our own function
* to read a directory. Note: Fixed in 3.09.3!
*)
(* Returns the names of the entries of directory [d], without "." and
 * "..", in the order the system delivers them. The directory handle is
 * closed in every case. Any [Unix.Unix_error] is turned into a
 * [Sys_error] that mentions [d].
 *)
let read_dir d =
  try
    let dh = Unix.opendir d in
    let rec collect acc =
      let entry =
        try Some (Unix.readdir dh)
        with End_of_file -> None in
      match entry with
      | None -> List.rev acc
      | Some n ->
          if n = "." || n = ".." then collect acc else collect (n :: acc)
    in
    let names =
      try collect []
      with error ->
        Unix.closedir dh; (* Cannot read entry *)
        raise error in
    Unix.closedir dh;
    names
  with
  | Unix.Unix_error (e, _, _) ->
      raise (Sys_error (Unix.error_message e ^ ": " ^ d))
;;
(* Scans the files of directory [d]: every file with suffix ".new" is
 * removed; the full paths of all other files are returned (in reverse
 * scan order).
 *)
let prepare_directory d =
  List.fold_left
    (fun kept f ->
       let path = Filename.concat d f in
       if Filename.check_suffix f ".new" then begin
         Sys.remove path;
         kept
       end
       else
         path :: kept)
    []
    (read_dir d)
;;
(* Validated XDR type of a cache entry. It is computed on first use and
 * shared by the code that packs ([save_one]) and unpacks ([load_one])
 * entries.
 *)
let xdr_entry_type =
lazy (Netxdr.validate_xdr_type Cache_aux.xdrt_entry)
(* Appends entry [e] with ordinal number [ord] to the ".new" file of its
 * bucket. The hash table [files] maps bucket numbers to open files; a
 * file that is not open yet is opened (and created if necessary) and
 * added to the table. Returns the size of the packed record in bytes,
 * not counting the two 4-byte header fields.
 *)
let save_one cache ord e files =
  let data =
    Netxdr.pack_xdr_value_as_string
      (Cache_aux._of_entry e)
      (Lazy.force xdr_entry_type)
      [] in
  let bucket = e.e_bucket in
  let file =
    try Hashtbl.find files bucket
    with Not_found ->
      let fullname =
        Filename.concat
          cache.directory
          ("cache-" ^ string_of_int bucket ^ ".new") in
      let ch =
        open_out_gen [Open_wronly; Open_append; Open_creat] 0o666 fullname in
      Hashtbl.add files bucket ch;
      ch in
  let size = String.length data in
  (* Header fields are 4-byte big-endian integers *)
  let put_int4 n =
    output_string file (Netnumber.BE.int4_as_string (Netnumber.int4_of_int n))
  in
  put_int4 ord;
  put_int4 size;
  output_string file data;
  size
;;
(* Format tag of the persistent cache files: [save] writes it to the
 * "FORMAT" file, and [load] only reads the data files if the tag found
 * there is equal to this string.
 *)
let version = "2006-11-20"
(* One step of the throttled save operation.
 *
 * Writes the first entry of [l] with ordinal number [ord], then
 * schedules the step for the remaining entries on the cache's timer
 * group. [t0] is the start time of the whole operation and [n] the
 * number of bytes written so far; the delay is chosen such that the
 * overall rate does not exceed [save_cache_speed] (0 = no limit). Once
 * [l] is exhausted, all files in [files] are closed and [f] is called.
 *
 * On any exception the files are closed, the [saving] flag is cleared
 * and the exception is re-raised.
 *)
let rec save_thread cache ord l files t0 n f =
  let close_files () =
    Hashtbl.iter (fun _ ch -> close_out ch) files
  in
  let step () =
    match l with
    | [] ->
        close_files ();
        f ()
    | e :: rest ->
        let written = n +. float (save_one cache ord e files) in
        let now = Unix.gettimeofday () in
        (* Point in time at which [written] bytes are due *)
        let due =
          match cache.config.save_cache_speed with
          | 0 -> t0
          | speed -> t0 +. written /. float speed in
        let delay = max 0.0 (due -. now) in
        let esys =
          match cache.esys with Some es -> es | None -> assert false in
        let g =
          match cache.group with Some g -> g | None -> assert false in
        Unixqueue.once esys g delay
          (fun () ->
             save_thread cache (ord + 1) rest files t0 written f)
  in
  try step ()
  with error ->
    close_files ();
    cache.saving <- false;
    raise error
;;
(* Starts saving the cache contents to [cache.directory].
 *
 * The entries are written in usage order to "cache-<bucket>.new" files
 * by [save_thread], throttled by the event system. When all entries are
 * written, the files found in the directory at the start are removed,
 * the ".new" files are renamed to ".data", and the "FORMAT" file is
 * written.
 *
 * Fails with [Failure] if a save operation is already running or if the
 * cache is not activated.
 *)
let save cache =
  if cache.saving then
    failwith "Save operation is already started";
  if cache.esys = None || cache.group = None then
    failwith "Cache is not activated";
  (* Mark the operation as running; otherwise the guard above could
   * never trigger and overlapping saves would write to the same ".new"
   * files. The flag is cleared on completion and on every error path. *)
  cache.saving <- true;
  let old_files =
    try prepare_directory cache.directory
    with error ->
      cache.saving <- false;
      raise error in
  let l =
    DList.to_list cache.by_usage in
  let files = Hashtbl.create 50 in
  let t0 = Unix.gettimeofday () in
  save_thread cache 0 l files t0 0.0
    (fun () ->
       cache.saving <- false;
       (* Removing the old files is best effort *)
       List.iter
         (fun old_file ->
            try Sys.remove old_file with Sys_error _ -> ()
         )
         old_files;
       Hashtbl.iter
         (fun bucket _ ->
            let name = "cache-" ^ string_of_int bucket in
            let fullname = Filename.concat cache.directory name in
            Sys.rename (fullname ^ ".new") (fullname ^ ".data")
         )
         files;
       let format_file =
         Filename.concat cache.directory "FORMAT" in
       Netchannels.with_out_obj_channel
         (new Netchannels.output_channel (open_out format_file))
         (fun ch ->
            ch # output_string version
         )
    )
;;
(* Arms a timer of [save_cache_period] seconds in [group]. When it
 * fires, the timer is re-armed first, and then a save operation is
 * started unless one is still running.
 *)
let rec check_for_save cache esys group =
  let period = float cache.config.save_cache_period in
  let on_timer () =
    check_for_save cache esys group;
    if not cache.saving then save cache
  in
  Unixqueue.once esys group period on_timer
;;
(* Closes every input channel held in [openfiles], a map from ordinal
 * number to the list of channels waiting at that number.
 *)
let close_files openfiles =
  let close_all _ord channels = List.iter close_in channels in
  IntMap.iter close_all openfiles
(* Opens the data files named [files] (relative to [cache.directory])
 * and reads the first ordinal number of each. Returns a map from
 * ordinal number to the list of files whose next record has that
 * number. Empty files are closed and ignored.
 *
 * If a file cannot be opened or read, all files opened so far are
 * closed before the exception is re-raised.
 *)
let load_from cache files =
  let openfiles = ref IntMap.empty in
  List.iter
    (fun name ->
       let file =
         try open_in (Filename.concat cache.directory name)
         with error ->
           (* Do not leak the channels that are already open *)
           close_files !openfiles;
           raise error in
       try
         let s = String.create 4 in
         really_input file s 0 4; (* or End_of_file *)
         let ord = Netnumber.int_of_int4 (Netnumber.BE.read_int4 s 0) in
         let other_files =
           try IntMap.find ord !openfiles
           with Not_found -> [] in
         openfiles := IntMap.add ord (file :: other_files) !openfiles
       with
       | End_of_file ->
           close_in file (* ... and ignore *)
       | error ->
           close_in file;
           close_files !openfiles;
           raise error
    )
    files;
  !openfiles
;;
(* Reads one record from [file], positioned right after an ordinal
 * number: a 4-byte big-endian size followed by the XDR-encoded entry.
 * The entry is appended to the cache unless it has expired with respect
 * to [now] or would push the byte estimate above [max_size].
 * Raises End_of_file if the record is incomplete.
 *)
let load_one cache now file =
  let read_block len =
    let buf = String.create len in
    really_input file buf 0 len;
    buf
  in
  let size = Netnumber.int_of_int4 (Netnumber.BE.read_int4 (read_block 4) 0) in
  let data = read_block size in
  let e =
    Cache_aux._to_entry
      (Netxdr.unpack_xdr_value ~fast:true data (Lazy.force xdr_entry_type) [])
  in
  let expired =
    e.e_expiration <> 0L && e.e_expiration <= now in
  let too_big =
    Int64.add cache.stats.num_bytes (Int64.of_int (entry_size e))
    > cache.config.max_size in
  if not (expired || too_big) then
    add_entry ~where:`Append cache e
;;
(* Returns the binding (key, value) with the smallest key of [m].
 * Raises Not_found if [m] is empty.
 *)
let int_map_min m =
  IntMap.min_binding m
;;
(* Merges the records of all open data files in the order of their
 * ordinal numbers.
 *
 * [openfiles] maps the ordinal number of the next record to the files
 * positioned right after that number. In each step a file with the
 * smallest number is taken, its record is loaded with [load_one], and
 * the file is re-queued under the next ordinal number it contains, or
 * closed when it is exhausted.
 *
 * An End_of_file raised by [load_one] (truncated record) propagates to
 * the caller.
 *)
let rec load_loop cache now openfiles =
  if IntMap.is_empty openfiles then
    ()
  else begin
    let (ord, files) = int_map_min openfiles in
    let (file, files') =
      match files with
      | file :: rest -> (file, rest)
      | [] -> assert false (* empty lists are never stored in the map *)
    in
    load_one cache now file;
    let openfiles = (* remove [file] *)
      match files' with
      | [] -> IntMap.remove ord openfiles
      | _ :: _ -> IntMap.add ord files' openfiles in
    (* Only the read of the next ordinal number is protected. This keeps
     * the recursive calls below in tail position (constant stack usage
     * regardless of the number of entries), and it prevents an
     * End_of_file from a later record being mistaken for the regular
     * end of this file. *)
    let next_ord =
      try
        let s = String.create 4 in
        really_input file s 0 4; (* or End_of_file *)
        Some (Netnumber.int_of_int4 (Netnumber.BE.read_int4 s 0))
      with End_of_file -> None in
    match next_ord with
    | Some ord' ->
        let queued =
          try IntMap.find ord' openfiles
          with Not_found -> [] in
        load_loop cache now (IntMap.add ord' (file :: queued) openfiles)
    | None ->
        close_in file;
        load_loop cache now openfiles
  end
;;
(* Loads the cache contents from the ".data" files in [cache.directory].
 *
 * Nothing is done if there is no "FORMAT" file or if its first line is
 * not equal to [version]. Otherwise the cache is emptied and refilled
 * from the files in usage order, and finally pruned to [max_size].
 *
 * An exception raised while the entries are read ends the load: the
 * data files are closed and the entries loaded so far are kept.
 *)
let load cache =
  let format_file =
    Filename.concat cache.directory "FORMAT" in
  if Sys.file_exists format_file then (
    let format =
      Netchannels.with_in_obj_channel
        (new Netchannels.input_channel (open_in format_file))
        (fun ch ->
           ch # input_line ()
        ) in
    if format = version then (
      let files = read_dir cache.directory in
      let files =
        List.filter (fun n -> Filename.check_suffix n ".data") files in
      let openfiles = load_from cache files in
      try
        (* reinit: the entry and byte counters must be reset together
         * with the indexes, or the loaded entries would be counted on
         * top of the previous contents *)
        clear_directly cache;
        let now = Int64.of_float (Unix.gettimeofday ()) in
        load_loop cache now openfiles;
        prune_by_size cache
      with
      | _error ->
          (* NOTE(review): the error is dropped without a trace; there
           * is no logger in reach here *)
          close_files openfiles
    )
    else (
      (* Maybe we want a special message... *)
      ()
    )
  )
;;
(* Attaches [cache] to the event system [esys]: creates the timer group,
 * starts the periodic expiration check and - if [save_cache_period] is
 * positive - the periodic save, and finally loads the contents from
 * disk. Fails with [Failure] if the cache is already activated.
 *)
let activate esys cache =
  (match cache.group with
   | Some _ -> failwith "Cache_server.activate: already activated"
   | None -> ());
  cache.esys <- Some esys;
  let group = Unixqueue.new_group esys in
  cache.group <- Some group;
  (* Check every second for expired entries: *)
  check_for_expired_entries cache esys group;
  (* Arrange that the cache is regularly saved: *)
  let save_regularly = cache.config.save_cache_period > 0 in
  if save_regularly then check_for_save cache esys group;
  (* Load cache contents from disk *)
  load cache
;;
(* Detaches [cache] from its event system: all timers of the cache's
 * group are cancelled, and the cache returns to the non-activated
 * state. Does nothing if the cache is not activated.
 *)
let shutdown cache =
  match cache.esys, cache.group with
  | Some esys, Some group ->
      Unixqueue.clear esys group;
      cache.esys <- None;
      cache.group <- None;
      (* Cancelling the timers also cancels the pending step of a
       * throttled save operation, which can then neither finish nor
       * fail. Clear the flag so that saving is possible again after a
       * later activation. *)
      cache.saving <- false
  | _ -> ()
;;