Plasma GitLab Archive
Projects Blog Knowledge

(* $Id$ *)

open Cache_aux
open Cache_util
open Printf

(* Static configuration passed to [create].
 * - [directory]: where [save] writes and [load] reads the cache files.
 * - [max_size]: upper bound for [stats.num_bytes]; least recently used
 *   entries are evicted beyond it.
 * - [save_cache_period]: delay between periodic saves; 0 (or less)
 *   disables them.
 * - [save_cache_speed]: throttles saving to about this many bytes per
 *   second; 0 means unthrottled.
 * - [rpc_program_number]: RPC program number used by [bind].
 * The first three values are copied into the mutable runtime [config];
 * they can be changed later with [set_config].
 *)
class type cache_config =
object
  method directory : string
  method max_size : int64
  method save_cache_period : int
  method save_cache_speed : int
  method rpc_program_number : int32
end


module StringMap = Map.Make(String)
module Int64Map = Map.Make(Int64)


(* Ordered integer keys for [IntMap].
 * [compare] is the polymorphic comparison specialised to [int]. It must not
 * be implemented as [x - y]: the subtraction overflows for operands of
 * large magnitude (e.g. [max_int - (-1)] wraps to [min_int]) and then
 * reports the wrong ordering, silently breaking the map invariant.
 *)
module Int = struct
  type t = int
  let compare (x : int) (y : int) = compare x y
end


(* Maps keyed by integers, e.g. record ordinals when loading a saved cache. *)
module IntMap = Map.Make(Int)



module DList = struct
  (* Mutable doubly linked list whose cells are visible to the caller: a
   * cell obtained once can later be unlinked ([delete]) and linked in again
   * ([prepend_cell] / [append_cell]) in constant time.
   *)

  type 'a cell =
      { data : 'a;
	mutable prev : 'a cell option;
	mutable next : 'a cell option;
      }

  type 'a t = 
      { mutable head : 'a cell option;
	mutable last : 'a cell option;
      }


  (* Checks the link structure: the first cell has no predecessor, every
   * forward link is mirrored by a backward link, and [last] is the cell
   * reached by following [next] from [head]. An empty list has neither
   * [head] nor [last]. Fails with [Assert_failure] on a violation.
   *)
  let verify l =
    let rec final_cell cur =
      match cur.next with
	| None -> cur
	| Some n ->
	    ( match n.prev with
		| Some p -> assert (p == cur)
		| None -> assert false
	    );
	    final_cell n
    in
    match l.head with
      | None ->
	  assert (match l.last with None -> true | Some _ -> false)
      | Some start ->
	  assert (match start.prev with None -> true | Some _ -> false);
	  let z = final_cell start in
	  ( match l.last with
	      | Some tail -> assert (tail == z)
	      | None -> assert false
	  )

  let create () =
    { head = None; last = None }

  (* Unwraps an optional link; a missing link is reported as [Not_found]. *)
  let cell_of_link = function
    | Some c -> c
    | None -> raise Not_found

  let head l = cell_of_link l.head

  let last l = cell_of_link l.last

  let next cell = cell_of_link cell.next

  let prev cell = cell_of_link cell.prev

  let contents cell = cell.data

  (* Links the detached [cell] in as the new first element of [l]. *)
  let prepend_cell l cell =
    let old_head = l.head in
    cell.prev <- None;
    cell.next <- old_head;
    ( match old_head with
	| None -> ()
	| Some h -> h.prev <- Some cell
    );
    l.head <- Some cell;
    match l.last with
      | None -> l.last <- Some cell
      | Some _ -> ()

  let prepend l data =
    prepend_cell l { data = data; prev = None; next = None }

  (* Links the detached [cell] in as the new last element of [l]. *)
  let append_cell l cell =
    let old_last = l.last in
    cell.prev <- old_last;
    cell.next <- None;
    ( match old_last with
	| None -> ()
	| Some p -> p.next <- Some cell
    );
    l.last <- Some cell;
    match l.head with
      | None -> l.head <- Some cell
      | Some _ -> ()

  let append l data =
    append_cell l { data = data; prev = None; next = None }

  (* Unlinks [cell] from [l] and detaches it. The caller must ensure that
   * [cell] currently belongs to [l]; this is not checked.
   *)
  let delete l cell =
    ( match cell.prev with
	| Some p -> p.next <- cell.next
	| None -> l.head <- cell.next
    );
    ( match cell.next with
	| Some n -> n.prev <- cell.prev
	| None -> l.last <- cell.prev
    );
    cell.prev <- None;
    cell.next <- None

  (* Elements from head to last, built by walking backwards from [last]. *)
  let to_list l =
    let rec gather acc link =
      match link with
	| None -> acc
	| Some c -> gather (c.data :: acc) c.prev
    in
    gather [] l.last

end


(* Value bound to a key in [by_key] and in the per-timestamp maps of
 * [by_expiration]: either a regular entry, represented by its cell in the
 * [by_usage] list, or a key that is locked although no entry exists for it
 * (see [lock_key]).
 *)
type gen_entry =
    [ `Entry of entry DList.cell
    | `Locked_key of string * int64    (* key, expiration *)
    ]



(* State of one cache server instance, as built by [create]. Regular
 * entries are indexed three ways (by key, by usage, by expiration); the
 * indexes must be kept consistent with each other (checked by [verify]).
 *)
type cache_instance =
    { mutable config : config;
      (* Runtime configuration; replaced by [set_config]. *)
      rpc_prognr : int32;
      (* RPC program number registered by [bind]. *)
      directory : string;
      (* Directory of the saved cache files ([save] / [load]). *)
      mutable esys : Unixqueue.event_system option;
      (* Set by [activate], reset to [None] by [shutdown]. *)
      mutable group : Unixqueue.group option;  (* Group for additional timers *)

      mutable by_key : gen_entry StringMap.t;
      (* Maps the key to the entry *)

      mutable by_usage : entry DList.t;
      (* Orders the entries by usage: The most recently accessed entry is
       * at the head of the list. The least recently accessed entry is at
       * the end of the list.
       *)

      mutable by_expiration : gen_entry StringMap.t Int64Map.t;
      (* Maps expiration timestamp to mappings from key to entry. Entries that
       * do not expire are not member of this mapping. Expiration means that
       * the entry is completely deleted from the cache.
       *)

      stats : stats;
      (* Counters; [num_entries] and [num_bytes] are updated by [add_entry]
       * and [delete_entry]. *)

      mutable saving : bool
      (* Guards against overlapping saves: [save] refuses to start and
       * [check_for_save] skips while it is true. Reset to false when a
       * save finishes or fails. *)
    }

(* Renders [s] as lowercase hex digits enclosed in angle brackets, e.g.
 * "\001\171" becomes "<01ab>". Used to print binary keys in log messages.
 *)
let to_hex s =
  let buf = Buffer.create 100 in
  Buffer.add_char buf '<';
  String.iter
    (fun c -> Buffer.add_string buf (sprintf "%02x" (Char.code c)))
    s;
  Buffer.add_char buf '>';
  Buffer.contents buf


(* Removes [key] from the [by_expiration] bucket of timestamp [exp] and
 * drops the bucket once it becomes empty. [exp = 0L] means "never
 * expires" and has no bucket, so nothing happens then. A missing bucket
 * is ignored as well.
 *)
let delete_expiration cache key exp =
  if exp <> 0L then
    match (try Some (Int64Map.find exp cache.by_expiration)
           with Not_found -> None) with
      | None -> ()
      | Some bucket ->
          let bucket' = StringMap.remove key bucket in
          cache.by_expiration <-
            (if StringMap.is_empty bucket' then
               Int64Map.remove exp cache.by_expiration
             else
               Int64Map.add exp bucket' cache.by_expiration)


(* Records [ge] under [key] in the [by_expiration] bucket of timestamp
 * [exp], creating the bucket if needed. Does nothing for [exp = 0L]
 * ("never expires").
 *)
let add_expiration cache key exp ge =
  if exp <> 0L then begin
    let bucket =
      if Int64Map.mem exp cache.by_expiration then
        Int64Map.find exp cache.by_expiration
      else
        StringMap.empty in
    cache.by_expiration <-
      Int64Map.add exp (StringMap.add key ge bucket) cache.by_expiration
  end



(* Size of a machine word in bytes. *)
let wsize = Sys.word_size / 8

(* Rough estimate, in bytes, of the memory used by entry [e]: a fixed
 * overhead of 44 words plus 40 bytes, plus the lengths of key and value.
 * The same estimate is added to [stats.num_bytes] on insertion and
 * subtracted on removal.
 *)
let entry_size e =
  let overhead = 44 * wsize + 40 in
  let payload = String.length e.e_key + e.e_value_length in
  overhead + payload


(* Removes the regular entry held by [cell] (a member of [by_usage]) from
 * all three indexes and decreases the entry and byte counters.
 *)
let delete_entry cache cell =
  let e = DList.contents cell in
  let stats = cache.stats in
  DList.delete cache.by_usage cell;
  cache.by_key <- StringMap.remove e.e_key cache.by_key;
  delete_expiration cache e.e_key e.e_expiration;
  stats.num_entries <- stats.num_entries - 1;
  stats.num_bytes <- Int64.sub stats.num_bytes (Int64.of_int (entry_size e))


(* Removes [key] from the cache, whether it names a regular entry or only
 * a lock. Unknown keys are ignored.
 *)
let delete_key cache key =
  try
    begin match StringMap.find key cache.by_key with
      | `Locked_key (_, exp) ->
          cache.by_key <- StringMap.remove key cache.by_key;
          delete_expiration cache key exp
      | `Entry cell ->
          delete_entry cache cell
    end
  with Not_found -> ()


(* Inserts the regular entry [e] into all three indexes and increases the
 * entry and byte counters. [where] chooses whether [e] becomes the most
 * recently used entry ([`Prepend], the default) or the least recently used
 * one ([`Append], as used when restoring a saved cache in usage order).
 * Precondition: [e.e_key] is not yet a member of the cache.
 *)
let add_entry ?(where = `Prepend) cache e =
  let usage = cache.by_usage in
  let cell =
    match where with
      | `Append -> DList.append usage e; DList.last usage
      | `Prepend -> DList.prepend usage e; DList.head usage
  in
  let ge = `Entry cell in
  cache.by_key <- StringMap.add e.e_key ge cache.by_key;
  add_expiration cache e.e_key e.e_expiration ge;
  cache.stats.num_entries <- succ cache.stats.num_entries;
  cache.stats.num_bytes <-
    Int64.add cache.stats.num_bytes (Int64.of_int (entry_size e))



(* Marks the regular entry in [cell] as deleted and reschedules its removal
 * from the cache for [expires]. Until then the entry stays in the cache,
 * but [set] refuses to overwrite it unless [opt_undelete] is given.
 *)
let lock_entry cache cell expires =
  let e = DList.contents cell in
  (* The previous timestamp must be read before it is overwritten. Otherwise
   * its [by_expiration] binding survives: the entry is then removed too
   * early, or, if the old timestamp is the later one, removed a second time
   * after it is already gone, which corrupts [by_usage], [by_key] and the
   * statistics.
   *)
  let old_exp = e.e_expiration in
  e.e_delete_flag <- true;
  e.e_expiration <- expires;
  delete_expiration cache e.e_key old_exp;
  add_expiration cache e.e_key expires (`Entry cell)

  

(* Locks the absent [key] until [expires] by binding a [`Locked_key] marker
 * both in [by_key] and in [by_expiration].
 *)
let lock_key cache key expires =
  let marker = `Locked_key (key, expires) in
  add_expiration cache key expires marker;
  cache.by_key <- StringMap.add key marker cache.by_key


(* Evicts least recently used entries (from the tail of [by_usage]) until
 * [stats.num_bytes] no longer exceeds [config.max_size].
 *)
let prune_by_size cache =
  let rec evict () =
    if cache.stats.num_bytes > cache.config.max_size then begin
      let victim =
        try DList.last cache.by_usage with Not_found -> assert false in
      delete_entry cache victim;
      evict ()
    end
  in
  evict ()


(* Removes every entry and lock whose expiration timestamp is not later
 * than the current time (in whole seconds). Buckets are processed in
 * ascending timestamp order.
 *)
let prune_by_expiration cache =
  let now = Int64.of_float (Unix.gettimeofday ()) in
  let expire_bucket _ bucket =
    StringMap.iter
      (fun _ ge ->
         match ge with
           | `Entry cell -> delete_entry cache cell
           | `Locked_key (k, _) -> delete_key cache k)
      bucket in
  (* [overdue] holds the timestamps < now; [due_now] the bucket at = now. *)
  let (overdue, due_now, _) = Int64Map.split now cache.by_expiration in
  Int64Map.iter expire_bucket overdue;
  match due_now with
    | Some bucket -> expire_bucket now bucket
    | None -> ()


(* A client key together with the modulus from which its bucket number is
 * derived (see [set_directly]).
 *)
type full_key =
    { key : Cache_client.key;
      modulo : int;
    }


(* Core of the [set] operation for the already hashed [key] in [bucket].
 * - [key] holds a regular entry: it is replaced by [data] (expiring at
 *   [expires]) only if [opts.opt_overwrite] is set, the entry is not marked
 *   as deleted or [opts.opt_undelete] is set, and, when
 *   [opts.opt_setifchanged] is set, the MD5 of the value changes.
 * - Otherwise (no entry, or only a bare lock): a new entry is created only
 *   if [opts.opt_add] is set; a bare lock prevents this unless
 *   [opts.opt_undelete] is set.
 * Returns [`Stored] or [`Not_stored]. After storing, least recently used
 * entries may be evicted to respect [config.max_size].
 *)
let set_directly_1 cache key bucket data expires opts =
  (* Is [key] already stored in the cache? *)
  cache.stats.num_calls_set <- cache.stats.num_calls_set + 1;
  try
    let ge = StringMap.find key cache.by_key in  (* or Not_found *)
    match ge with
      | `Locked_key _ -> 
	  (* [key] is locked but the entry is non-existent *)
	  raise Not_found
      | `Entry cell ->
	  let e = DList.contents cell in
	  (* Yes, stored as [e] *)
	  if opts.opt_overwrite then (
	    if e.e_delete_flag && not opts.opt_undelete then
	      `Not_stored  (* not allowed to overwrite deleted entry *)
	    else (
	      let data_md5 = Digest.string data in
	      let is_change = e.e_value_hash <> data_md5 in
	      let do_store = is_change || not opts.opt_setifchanged in
	      if do_store then (
		delete_entry cache cell;
		let now = Int64.of_float(Unix.gettimeofday()) in
		e.e_modification <- now;
		e.e_expiration <- expires;
		(* Storing a value revives a deleted entry, exactly as re-adding
		 * a locked key below does. If the flag were left set, the fresh
		 * value would stay locked: later sets without [opt_undelete]
		 * and every delete would be refused.
		 *)
		e.e_delete_flag <- false;
		e.e_value <- data;
		e.e_value_hash <- data_md5;
		e.e_value_length <- String.length data;
		e.e_bucket <- bucket;
		add_entry cache e;
		prune_by_size cache;
		`Stored
	      )
	      else
		`Not_stored
	    )
	  )
	  else
	    `Not_stored  (* not allowed to overwrite *)

  with
    | Not_found ->
	(* [key] is not member of the cache! *)
	if opts.opt_add then (
	  let key_is_locked =
	    try 
	      match StringMap.find key cache.by_key with
		| `Locked_key _ -> true
		| _ -> assert false
	    with Not_found -> false in
	  let do_add = not key_is_locked || opts.opt_undelete in
	  if do_add then (
	    if key_is_locked then delete_key cache key;
	    let now = Int64.of_float(Unix.gettimeofday()) in
	    let e =
	      { e_key = key;
		e_creation = now;
		e_modification = now;
		e_expiration = expires;
		e_delete_flag = false;
		e_value = data;
		e_value_hash = Digest.string data;
		e_value_length = String.length data;
		e_counter = 0;
		e_bucket = bucket
	      } in
	    add_entry cache e;
	    prune_by_size cache;
	    `Stored
	  )
	  else
	    `Not_stored   (* key is locked *)
	)
	else
	  `Not_stored   (* not allowed to add *)
;;


(* Local [set] entry point: hashes the client key [fk.key], derives the
 * bucket from [fk.modulo] and delegates to [set_directly_1].
 *)
let set_directly cache fk data expires opts =
  let hashed = hash_of_key fk.key in
  set_directly_1 cache hashed (bucket_of_hash fk.modulo hashed) data expires opts
;;


(* RPC handler for [set]. Key and bucket are passed to [set_directly_1]
 * unchanged; the result is mapped to the RPC's lowercase variant tags.
 *)
let proc_set cache sess (key,bucket,data,expires,opts) reply =
  reply
    (match set_directly_1 cache key bucket data expires opts with
       | `Stored -> `stored
       | `Not_stored -> `not_stored)
;;


(* Core of the [get] operation for the already hashed [key].
 * Returns [`Found e] for a regular entry that passes the conditional
 * options ([opt_getifmodifiedsince], [opt_getifnotmd5]), and [`Not_found]
 * otherwise, including for bare locks. With [opt_novalue] a copy with an
 * empty [e_value] is returned. A hit increases [num_hits] and [e_counter]
 * and makes the entry the most recently used one. The delete flag is not
 * checked here.
 *)
let get_directly_1 cache key opts =
  cache.stats.num_calls_get <- cache.stats.num_calls_get + 1;
  let found =
    try Some (StringMap.find key cache.by_key) with Not_found -> None in
  match found with
    | None
    | Some (`Locked_key _) ->
        `Not_found
    | Some (`Entry cell) ->
        let e = DList.contents cell in
        let not_modified_since =
          match opts.opt_getifmodifiedsince with
            | Some ts -> ts > e.e_modification
            | None -> false in
        let same_md5 =
          match opts.opt_getifnotmd5 with
            | Some md5 -> md5 = e.e_value_hash
            | None -> false in
        if not_modified_since || same_md5 then
          `Not_found
        else begin
          (* The copy is taken before [e_counter] is bumped below. *)
          let result =
            if opts.opt_novalue then { e with e_value = "" } else e in
          cache.stats.num_hits <- cache.stats.num_hits + 1;
          e.e_counter <- e.e_counter + 1;
          (* Make [e] the most recently used entry: *)
          DList.delete cache.by_usage cell;
          DList.prepend_cell cache.by_usage cell;
          `Found result
        end
;;


(* Local [get] entry point: hashes [fk.key] and delegates. *)
let get_directly cache fk opts =
  get_directly_1 cache (hash_of_key fk.key) opts
;;


(* RPC handler for [get]; maps the result to the RPC's variant tags. *)
let proc_get cache sess (key,opts) reply =
  reply
    (match get_directly_1 cache key opts with
       | `Not_found -> `not_found
       | `Found e -> `found e)
;;


(* Core of the [delete] operation for the already hashed [key].
 * A regular entry that is not yet marked as deleted and satisfies every
 * given condition ([opt_delifolderthan], [opt_delifmd5],
 * [opt_delifnotmd5]) is removed at once if [expires] is not in the future.
 * Otherwise it is marked as deleted and kept (locked) until [expires].
 * For an absent key, [opt_strictlock] with a future [expires] installs a
 * bare lock. An already locked key is left untouched.
 *)
let delete_directly_1 cache key expires opts =
  cache.stats.num_calls_delete <- cache.stats.num_calls_delete + 1;
  ( try
      let ge = StringMap.find key cache.by_key in  (* or Not_found *)
      match ge with
	| `Locked_key _ -> 
	    (* [key] is already locked *)
	    ()
	| `Entry cell ->
	    let e = DList.contents cell in
	    let do_delete =
	      not e.e_delete_flag &&
		( match opts.opt_delifolderthan with
		    | None -> true
		    | Some ts -> e.e_modification < ts
		) &&
		( match opts.opt_delifmd5 with
		    | None -> true
		    | Some md5 -> e.e_value_hash = md5
		) &&
		(* Counterpart of [opt_delifmd5]: delete only if the value
		 * differs. (Testing [opt_delifmd5] again here, with the
		 * opposite condition, made deletion impossible whenever
		 * [opt_delifmd5] was given.) *)
		( match opts.opt_delifnotmd5 with
		    | None -> true
		    | Some md5 -> e.e_value_hash <> md5
		)
	    in

	    if do_delete then (
	      let now = Int64.of_float(Unix.gettimeofday()) in
	      if expires <= now then (
		(* Delete immediately *)
		delete_entry cache cell
	      )
	      else (
		(* Lock entry and delete in the future *)
		lock_entry cache cell expires
	      )
	    )
    with
      | Not_found ->
	  if opts.opt_strictlock then (
	    let now = Int64.of_float(Unix.gettimeofday()) in
	    if expires > now then (
	      lock_key cache key expires
	    )
	  )
  );
  ()
;;


(* Local [delete] entry point: hashes [fk.key] and delegates. *)
let delete_directly cache fk expires opts =
  delete_directly_1 cache (hash_of_key fk.key) expires opts
;;


(* RPC handler for [delete]; always replies with unit. *)
let proc_delete cache sess (key,expires,opts) reply =
  (delete_directly_1 cache key expires opts : unit);
  reply ()
;;


(* Drops every entry and lock. Only the size counters are reset; the call
 * and hit counters are kept (see [clear_counters]).
 *)
let clear_directly cache =
  cache.by_expiration <- Int64Map.empty;
  cache.by_usage <- DList.create ();
  cache.by_key <- StringMap.empty;
  let s = cache.stats in
  s.num_bytes <- 0L;
  s.num_entries <- 0
;;


(* RPC handler for [clear]. *)
let proc_clear cache sess () reply =
  let () = clear_directly cache in
  reply ()
;;


(* Current runtime configuration. *)
let get_config cache =
  cache.config


(* RPC handler returning the current runtime configuration. *)
let proc_get_config cache sess () reply =
  reply (get_config cache)
;;

(* Replaces the runtime configuration and evicts entries at once, in case
 * the new [max_size] is below the current cache size.
 *)
let set_config cache new_config =
  cache.config <- new_config;
  prune_by_size cache
;;

(* RPC handler for [set_config]. *)
let proc_set_config cache sess new_config reply =
  let () = set_config cache new_config in
  reply ()
;;

(* The (mutable) statistics record of the cache. *)
let get_stats cache =
  cache.stats

(* RPC handler returning the statistics. *)
let proc_get_stats cache sess () reply =
  reply (get_stats cache)
;;


(* Zeroes the call and hit counters and records the time of the reset.
 * The entry and byte counts are not touched.
 *)
let clear_counters cache =
  let s = cache.stats in
  s.num_calls_get <- 0;
  s.num_calls_set <- 0;
  s.num_calls_delete <- 0;
  s.num_hits <- 0;
  s.counters_reset_on <- Int64.of_float (Unix.gettimeofday ())
;;


(* RPC handler for [clear_counters]. *)
let proc_clear_counters cache sess () reply =
  let () = clear_counters cache in
  reply ()
;;



(* Internal: ends the walk over [by_usage] in [verify_entries]. *)
exception End_of_cache

(* Cross-checks the indexes of [cache] and reports each inconsistency
 * through [log]:
 * - every cell of [by_usage] must be bound to exactly that cell in
 *   [by_key] and, if the entry can expire, in [by_expiration];
 * - [by_expiration] must not contain timestamp 0, nor timestamps more than
 *   100 seconds in the past (expired entries are pruned about once per
 *   second by [check_for_expired_entries]);
 * - [by_key] must have at least as many bindings as [by_usage] has cells,
 *   and its number of expirable bindings must equal the number of bindings
 *   in [by_expiration].
 * Finally logs a line of statistics. Raises [Failure "verify_entries"] if
 * any check failed.
 *)
let verify_entries ~log cache =
  let now = Int64.of_float (Unix.gettimeofday()) in
  let n_regular = ref 0 in
  let success = ref true in
  (* 1. Walk [by_usage] from the head and look up each entry in the maps. *)
  ( try
      let cur = ref (try DList.head cache.by_usage
		     with Not_found -> raise End_of_cache) in
      while true do
	incr n_regular;
	let e = DList.contents !cur in
	( try
	    let ge = StringMap.find e.e_key cache.by_key in
	    match ge with
	      | `Entry cell ->
		  if cell != !cur then (
		    log (sprintf "* Wrong cell found for key: %s" (to_hex e.e_key));
		    success := false;
		  )
	      | `Locked_key _ ->
		  log (sprintf "* Lock found for key: %s" (to_hex e.e_key));
		  success := false;
	  with
	    | Not_found -> 
		log (sprintf "* Key not found: %s" (to_hex e.e_key));
		success := false
	);
	(* Expirable entries must also be in their [by_expiration] bucket: *)
	if e.e_expiration <> 0L then
	  ( try
	      let m = Int64Map.find e.e_expiration cache.by_expiration in
	      ( try
		  let ge = StringMap.find e.e_key m in
		  match ge with
		    | `Entry cell ->
			if cell != !cur then (
			  log (sprintf "* Wrong cell found for key in expiration map: %s" (to_hex e.e_key));
			  success := false;
			)
		    | `Locked_key _ ->
			log (sprintf "* Lock found for key in expiration map: %s" (to_hex e.e_key));
			success := false;
		with Not_found ->
		  log (sprintf "* Key not found in expiration map: %s" (to_hex e.e_key));
		  success := false
	      )
	    with Not_found ->
	      log (sprintf "* Expiration entry not found: %s" (to_hex e.e_key));
	      success := false
	  );
	cur := ( try DList.next !cur with Not_found -> raise End_of_cache )
      done;
      assert false
    with
      | End_of_cache -> ()
  );

  (* 2. Count all bindings of [by_key], and those that can expire. *)
  let n_by_key = ref 0 in
  let n_by_key_can_expire = ref 0 in
  StringMap.iter
    (fun _ ge -> 
       incr n_by_key;
       ( match ge with
	   | `Entry cell ->
	       let e = DList.contents cell in
	       if e.e_expiration <> 0L then incr n_by_key_can_expire
	   | `Locked_key _ ->
	       incr n_by_key_can_expire
       )
    )
    cache.by_key;

  (* 3. Count the bindings of [by_expiration] and check the timestamps. *)
  let n_by_expiration = ref 0 in
  Int64Map.iter
    (fun d m ->
       if d = 0L then (
	 log "* Expiration date 0 is invalid in by_expiration";
	 success := false
       )
       else if d < Int64.sub now 100L then (
	 log (sprintf "* Very old entries found in by_expiration, age = %Ld seconds"
		(Int64.sub now d));
	 success := false
       );
       StringMap.iter
	 (fun _ _ -> incr n_by_expiration)
	 m)
    cache.by_expiration;

  (* n_by_key > n_regular because we can also have locked keys in by_key *)
  if !n_by_key < !n_regular then (
    log "* Too few entries in by_key";
    success := false;
  );

  if !n_by_key_can_expire <> !n_by_expiration then (
    log "* Wrong number of entries in by_key/by_expiration";
    success := false;
  );

  log (sprintf "* Statistics: %d regular entries, %d locked keys, %d expirable"
	 !n_regular
	 (!n_by_key - !n_regular)
	 !n_by_key_can_expire);

  if not !success then failwith "verify_entries"


(* Runs all consistency checks on [cache] and reports progress and outcome
 * through [log]. Exceptions raised by the checks are caught and logged as
 * "Exception in verification: ..." instead of being propagated.
 *)
let verify ~log cache =
  let step msg f = log msg; f () in
  try
    log "Starting verification of cache structure";
    step "- Starting DList.verify" (fun () -> DList.verify cache.by_usage);
    step "- Starting verify_entries" (fun () -> verify_entries ~log cache);
    log "Verification successful!"
  with error ->
    log (sprintf "Exception in verification: %s" (Printexc.to_string error))
;;


(* Builds a fresh, empty cache instance from the static configuration
 * [cconfig]. The instance is not yet activated: no timers run and nothing
 * is loaded from disk until [activate] is called.
 *)
let create cconfig =
  let config =
    { max_size = cconfig # max_size;
      save_cache_period = cconfig # save_cache_period;
      save_cache_speed = cconfig # save_cache_speed;
    } in
  let stats =
    { num_entries = 0;
      num_bytes = 0L;
      num_calls_get = 0;
      num_calls_set = 0;
      num_calls_delete = 0;
      num_hits = 0;
      counters_reset_on = Int64.of_float (Unix.gettimeofday ());
    } in
  { config;
    rpc_prognr = cconfig # rpc_program_number;
    directory = cconfig # directory;
    esys = None;
    group = None;
    by_key = StringMap.empty;
    by_usage = DList.create ();
    by_expiration = Int64Map.empty;
    stats;
    saving = false;
  }
;;


(* Registers the asynchronous RPC procedures of this cache instance on the
 * RPC server [srv], under the configured program number. [ping] just
 * replies with unit.
 *)
let bind srv cache =
  let program_number = Netnumber.uint4_of_int32 cache.rpc_prognr in
  let proc_ping _ _ reply = reply () in
  Cache_srv.Cache.V1.bind_async
    ~program_number
    ~proc_ping
    ~proc_set:(proc_set cache)
    ~proc_get:(proc_get cache)
    ~proc_delete:(proc_delete cache)
    ~proc_clear:(proc_clear cache)
    ~proc_get_config:(proc_get_config cache)
    ~proc_set_config:(proc_set_config cache)
    ~proc_get_stats:(proc_get_stats cache)
    ~proc_clear_counters:(proc_clear_counters cache)
    srv
;;


(* Timer loop: every second, prunes expired entries and locks and then
 * re-arms itself in [group]. Clearing [group] (as [shutdown] does) stops it.
 *)
let rec check_for_expired_entries cache esys group =
  let tick () =
    prune_by_expiration cache;
    check_for_expired_entries cache esys group in
  Unixqueue.once esys group 1.0 tick


(* File format:
 *
 * Every file "cache-<bucket>.data" is a sequence of:
 * - Ordinal number as 4 bytes (big endian)
 * - Size of the following record, as 4 bytes (big endian)
 * - Record, as XDR of type entry.
 *
 * The ordinal numbers give the position in by_usage, counting from the
 * most recently used entry (ordinal 0); they are unique across all files.
 *)


(* Sys.readdir has a memory leak in O'Caml 3.09. So we use our own function
 * to read a directory. Note: Fixed in 3.09.3!
 *)
(* Lists the entries of directory [d] except "." and "..", in the order
 * returned by [Unix.readdir]. The directory handle is closed in all cases.
 * Unix errors are reported as [Sys_error "<message>: <d>"].
 *)
let read_dir d =
  let collect dh =
    let rec loop acc =
      match Unix.readdir dh with
        | "." | ".." -> loop acc
        | n -> loop (n :: acc)
        | exception End_of_file -> List.rev acc
    in
    loop []
  in
  try
    let dh = Unix.opendir d in
    let names =
      try collect dh
      with error ->
        Unix.closedir dh;   (* Cannot read entry *)
        raise error in
    Unix.closedir dh;
    names
  with Unix.Unix_error (e, _, _) ->
    raise (Sys_error (Unix.error_message e ^ ": " ^ d))
;;


let prepare_directory d =
  (* Deletes every "*.new" file in [d] (partial output of an earlier save
   * that did not complete) and returns the full paths of all remaining
   * entries, in reverse [read_dir] order.
   *)
  List.fold_left
    (fun kept f ->
       let path = Filename.concat d f in
       if Filename.check_suffix f ".new" then (Sys.remove path; kept)
       else path :: kept)
    []
    (read_dir d)
;;


(* Validated XDR type of [entry], computed on first use. *)
let xdr_entry_type =
  lazy (let t = Cache_aux.xdrt_entry in Netxdr.validate_xdr_type t)


let save_one cache ord e files =
  (* Appends entry [e] to the "cache-<bucket>.new" file of its bucket: the
   * ordinal [ord] and the length of the XDR data as 4-byte big-endian
   * integers, then the XDR data itself. [files] maps bucket numbers to the
   * output channels opened so far and is extended on a bucket's first use.
   * Returns the length of the XDR data in bytes.
   *)
  let payload = Cache_aux._of_entry e in
  let data =
    Netxdr.pack_xdr_value_as_string payload (Lazy.force xdr_entry_type) [] in
  let bucket = e.e_bucket in
  let out =
    match (try Some (Hashtbl.find files bucket) with Not_found -> None) with
      | Some ch -> ch
      | None ->
          let path =
            Filename.concat cache.directory
              ("cache-" ^ string_of_int bucket ^ ".new") in
          let ch =
            open_out_gen [Open_wronly; Open_append; Open_creat] 0o666 path in
          Hashtbl.add files bucket ch;
          ch in
  let int4 n = Netnumber.BE.int4_as_string (Netnumber.int4_of_int n) in
  let size = String.length data in
  output_string out (int4 ord);
  output_string out (int4 size);
  output_string out data;
  size
;;



(* Format tag written to the "FORMAT" file by [save] and compared by [load];
 * saved data carrying a different tag is not loaded.
 *)
let version = "2006-11-20"


(* Writes the entries of [l] one at a time via [save_one], numbering them
 * consecutively from [ord]. After each entry, the next step is scheduled on
 * the cache's event group with a delay chosen so that, counted from the
 * start time [t0], the rate stays at about [config.save_cache_speed] bytes
 * per second ([n] is the number of bytes written so far; speed 0 means no
 * delay). When [l] is exhausted, all channels in [files] are closed and the
 * continuation [f] is called. On any exception (including one from [f]),
 * the channels are closed, [cache.saving] is reset to false and the
 * exception is re-raised.
 *)
let rec save_thread cache ord l files t0 n f =
  let close_files() =
    Hashtbl.iter (fun _ f -> close_out f) files
  in
  try
    match l with
      | [] ->
	  close_files();
	  f()
      | e :: l' ->
	  let size = save_one cache ord e files in
	  let n' = n +. (float size) in
	  let now = Unix.gettimeofday() in
	  let speed = cache.config.save_cache_speed in
	  (* Point in time by which [n'] bytes are due at the configured rate: *)
	  let t1 = 
	    if speed = 0 then
	      t0 
	    else
	      t0  +.  n' /. (float speed) in
	  let tdelta = max 0.0 (t1 -. now) in
	  let esys = 
	    match cache.esys with Some es -> es | None -> assert false in
	  let g = 
	    match cache.group with Some g -> g | None -> assert false in
	  (* Schedule the next step instead of recursing directly: *)
	  Unixqueue.once esys g tdelta
	    (fun () ->
	       save_thread cache (ord+1) l' files t0 n' f)
  with
    | error ->
	close_files();
	cache.saving <- false;
	raise error
;;


let save cache =
  (* Starts writing a snapshot of all regular entries to [cache.directory]
   * in the background: one "cache-<bucket>.data" file per bucket plus a
   * "FORMAT" file containing [version]. Entries are written by
   * [save_thread] at the rate allowed by [config.save_cache_speed] into
   * ".new" files. Only after the whole snapshot is written are the old
   * files removed and the new ones renamed into place.
   * Raises [Failure] if a save is already running or the cache has not been
   * activated.
   *)
  if cache.saving then 
    failwith "Save operation is already started";

  ( match cache.esys, cache.group with
      | Some _, Some _ -> ()
      | _ -> failwith "Cache is not activated"
  );

  let old_files =
    prepare_directory cache.directory in
  
  let l =
    DList.to_list cache.by_usage in

  let files = Hashtbl.create 50 in
  let t0 = Unix.gettimeofday() in

  (* Mark the save as running for its whole (asynchronous) duration, so that
   * neither [check_for_save] nor another call of [save] starts a second save
   * that would delete and rewrite the ".new" files of this one. The flag is
   * reset by the completion callback below, and by [save_thread] on error.
   *)
  cache.saving <- true;
  save_thread cache 0 l files t0 0.0
    (fun () ->
       cache.saving <- false;
       List.iter
	 (fun old_file ->
	    try Sys.remove old_file with _ -> ()
	 )
	 old_files;
       Hashtbl.iter
	 (fun bucket _ ->
	    let name = "cache-" ^ string_of_int bucket in
	    let fullname = Filename.concat cache.directory name in
	    Sys.rename (fullname ^ ".new") (fullname ^ ".data")
	 )
	 files;
       let format_file =
	 Filename.concat cache.directory "FORMAT" in
       Netchannels.with_out_obj_channel
	 (new Netchannels.output_channel (open_out format_file))
	 (fun ch ->
	    ch # output_string version
	 )
    )
;;


let rec check_for_save cache esys group =
  (* Timer loop: after [config.save_cache_period] seconds (re-read from the
   * current configuration on every round) it re-arms itself and starts a
   * background [save] unless one is already in progress.
   *)
  let period = float cache.config.save_cache_period in
  Unixqueue.once esys group period
    (fun () ->
       check_for_save cache esys group;
       if not cache.saving then save cache)
;;


(* Closes every input channel stored in the ordinal map [openfiles]. *)
let close_files openfiles =
  IntMap.iter (fun _ -> List.iter close_in) openfiles


let load_from cache files =
  (* Opens the data files [files] (names relative to [cache.directory]) and
   * reads the ordinal of the first record of each. Returns a map from that
   * ordinal to the channels positioned right after it. Files too short to
   * hold an ordinal are closed and skipped. If a file cannot be opened or
   * read, every channel opened so far is closed and the exception is
   * re-raised.
   *)
  let openfiles = ref IntMap.empty in
  List.iter
    (fun name ->
       let file =
         try open_in (Filename.concat cache.directory name)
         with error ->
           (* Do not leak the channels already opened for earlier files. *)
           close_files !openfiles;
           raise error in
       try
	 let s = Bytes.create 4 in
	 really_input file s 0 4;   (* or End_of_file *)
	 let ord = Netnumber.int_of_int4(Netnumber.BE.read_int4 s 0) in
	 let others =
	   try IntMap.find ord !openfiles with Not_found -> [] in
	 openfiles := IntMap.add ord (file :: others) !openfiles
       with
	 | End_of_file ->
	     close_in file  (* ... and ignore *)
	 | error ->
	     close_in file;
	     close_files !openfiles;
	     raise error
    )
    files;
  !openfiles
;;


let load_one cache now file =
  (* Reads one record body from [file]: a 4-byte big-endian length followed
   * by that many bytes of XDR-encoded [entry]. The entry is appended to the
   * cache as least recently used, unless it has expired by [now] or would
   * push [stats.num_bytes] over [config.max_size]. A truncated record
   * raises [End_of_file].
   *)
  let read_exactly n =
    let buf = Bytes.create n in
    really_input file buf 0 n;
    buf in
  let size = Netnumber.int_of_int4 (Netnumber.BE.read_int4 (read_exactly 4) 0) in
  let data = read_exactly size in
  let xdr =
    Netxdr.unpack_xdr_value ~fast:true data (Lazy.force xdr_entry_type) [] in
  let e = Cache_aux._to_entry xdr in
  let esize = Int64.of_int (entry_size e) in
  let expired = e.e_expiration <> 0L && e.e_expiration <= now in
  let too_big = Int64.add cache.stats.num_bytes esize > cache.config.max_size in
  if not (expired || too_big) then
    add_entry ~where:`Append cache e
;;


let int_map_min m =
  (* Returns the binding (key, value) with the smallest key of [m].
   * Raises [Not_found] if [m] is empty. *)
  IntMap.min_binding m
;;


let rec load_loop cache now openfiles =
  (* Merges the records of all open data files in ascending ordinal order,
   * so that entries are appended to [by_usage] in the order they were
   * saved. [openfiles] maps the ordinal of each file's next record to the
   * channels positioned on that record's length field. A channel is
   * closed when it reaches end of file.
   *
   * The recursive call is kept outside of any [try ... with]. Inside a
   * handler it would not be a tail call, so loading a large cache would use
   * stack space proportional to the number of entries. Worse, an
   * [End_of_file] from a truncated record deeper down would be caught by an
   * outer frame, which would then continue with stale state. Such errors
   * now propagate to the caller.
   *)
  if not (IntMap.is_empty openfiles) then begin
    let (ord, files) = int_map_min openfiles in
    let file = List.hd files in
    let files' = List.tl files in

    load_one cache now file;

    let openfiles =   (* remove [file] *)
      match files' with
	| [] -> IntMap.remove ord openfiles
	| _ -> IntMap.add ord files' openfiles in
    (* Ordinal of the next record in [file], or [None] at end of file: *)
    let next_ord =
      try
	let s = Bytes.create 4 in
	really_input file s 0 4;
	Some (Netnumber.int_of_int4 (Netnumber.BE.read_int4 s 0))
      with
	| End_of_file -> None in
    let openfiles =
      match next_ord with
	| None ->
	    close_in file;
	    openfiles
	| Some ord' ->
	    let others =
	      try IntMap.find ord' openfiles with Not_found -> [] in
	    IntMap.add ord' (file :: others) openfiles in
    load_loop cache now openfiles
  end
;;


let load cache =
  (* Restores the contents written by [save] from [cache.directory]. Nothing
   * happens unless a "FORMAT" file exists whose first line equals
   * [version]; an empty FORMAT file counts as an unknown format. Otherwise
   * all previous contents of the cache are discarded first. Loading is best
   * effort: if reading fails part-way, the entries loaded so far are kept,
   * the remaining files are closed, and the error is not reported.
   *)
  let format_file =
    Filename.concat cache.directory "FORMAT" in
  if Sys.file_exists format_file then (
    let format =
      try
	Netchannels.with_in_obj_channel
	  (new Netchannels.input_channel (open_in format_file))
	  (fun ch ->
	     ch # input_line ()
	  )
      with End_of_file -> "" in
    if format = version then (
      let files = read_dir cache.directory in
      let files = List.filter (fun n -> Filename.check_suffix n ".data") files in
      let openfiles = load_from cache files in
      
      try
	(* reinit. The size counters must be reset together with the maps:
	 * [load_one] uses [stats.num_bytes] to decide whether an entry still
	 * fits, and [prune_by_size] evicts based on it. *)
	cache.by_key <- StringMap.empty;
	cache.by_usage <- DList.create();
	cache.by_expiration <- Int64Map.empty;
	cache.stats.num_entries <- 0;
	cache.stats.num_bytes <- 0L;
	
	let now = Int64.of_float(Unix.gettimeofday()) in
	load_loop cache now openfiles;

	prune_by_size cache
      with
	| _ -> 
	    (* Best effort: keep what was loaded and release the files. *)
	    close_files openfiles
    )
    else (
      (* Maybe we want a special message... *)
      ()
    )
  )
;;


let activate esys cache =
  (* Attaches [cache] to the event system [esys]: creates a new event group
   * for its timers, starts the once-per-second expiration check, schedules
   * periodic saving if [config.save_cache_period > 0], and finally loads
   * the saved contents from disk.
   * Raises [Failure] if the cache is already activated.
   *)
  match cache.group with
    | Some _ ->
        failwith "Cache_server.activate: already activated"
    | None ->
        cache.esys <- Some esys;
        let group = Unixqueue.new_group esys in
        cache.group <- Some group;
        check_for_expired_entries cache esys group;
        if cache.config.save_cache_period > 0 then
          check_for_save cache esys group;
        load cache
;;


let shutdown cache =
  (* Detaches an activated cache from its event system: clears its event
   * group (which holds the expiration and save timers) and returns it to
   * the not-activated state. Does nothing if it is not activated.
   *)
  match cache.group, cache.esys with
    | Some group, Some esys ->
        Unixqueue.clear esys group;
        cache.group <- None;
        cache.esys <- None
    | _ -> ()
;;

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml