(* $Id$ *)

open Cache_aux
open Cache_util
open Printf

(* Construction-time configuration of a cache server; consumed by [create]. *)
class type cache_config =
object
  method directory : string
  method max_size : int64
  method save_cache_period : int
  method save_cache_speed : int
  method rpc_program_number : int32
end

module StringMap = Map.Make(String)
module Int64Map = Map.Make(Int64)

module Int = struct
  type t = int
  (* Explicit three-way comparison: the former [x - y] can overflow and
     return the wrong sign when the operands are far apart. *)
  let compare (x : int) (y : int) =
    if x < y then -1 else if x > y then 1 else 0
end

module IntMap = Map.Make(Int)

(* Doubly linked list whose cells are exposed, so that an element can be
   unlinked or moved in O(1) once its cell is known. *)
module DList = struct
  type 'a cell = {
    data : 'a;
    mutable prev : 'a cell option;
    mutable next : 'a cell option;
  }

  type 'a t = {
    mutable head : 'a cell option;
    mutable last : 'a cell option;
  }

  (* Checks the link invariants of [l]: the head has no predecessor, every
     forward link is mirrored by the matching back link, and [last] is the
     cell reached by following [next] from [head].  An empty list has
     neither [head] nor [last].  Violations raise [Assert_failure]. *)
  let verify l =
    match l.head with
    | None ->
        assert (match l.last with None -> true | Some _ -> false)
    | Some start ->
        assert (match start.prev with None -> true | Some _ -> false);
        let rec walk cur =
          match cur.next with
          | None -> cur
          | Some n ->
              (match n.prev with
               | Some p -> assert (p == cur)
               | None -> assert false);
              walk n
        in
        let final = walk start in
        (match l.last with
         | Some z -> assert (z == final)
         | None -> assert false)

  let create () = { head = None; last = None }

  (* First cell of [l]. @raise Not_found if [l] is empty. *)
  let head l =
    match l.head with Some cell -> cell | None -> raise Not_found

  (* Last cell of [l]. @raise Not_found if [l] is empty. *)
  let last l =
    match l.last with Some cell -> cell | None -> raise Not_found

  (* Successor of [cell]. @raise Not_found if [cell] is the last one. *)
  let next cell =
    match cell.next with Some c -> c | None -> raise Not_found

  (* Predecessor of [cell]. @raise Not_found if [cell] is the first one. *)
  let prev cell =
    match cell.prev with Some c -> c | None -> raise Not_found

  let contents cell = cell.data

  (* Links the detached [cell] in at the front of [l]. *)
  let prepend_cell l cell =
    cell.prev <- None;
    cell.next <- l.head;
    (match l.head with
     | Some p -> p.prev <- Some cell
     | None -> ());
    l.head <- Some cell;
    (match l.last with
     | None -> l.last <- Some cell
     | Some _ -> ())

  let prepend l data =
    prepend_cell l { data; prev = None; next = None }

  (* Links the detached [cell] in at the end of [l]. *)
  let append_cell l cell =
    cell.prev <- l.last;
    cell.next <- None;
    (match l.last with
     | Some p -> p.next <- Some cell
     | None -> ());
    l.last <- Some cell;
    (match l.head with
     | None -> l.head <- Some cell
     | Some _ -> ())

  let append l data =
    append_cell l { data; prev = None; next = None }

  (* Unlinks [cell] from [l] and clears its links. *)
  let delete l cell =
    (* There is no check that [cell] is actually a member of [l]! *)
    (match cell.prev with
     | None -> l.head <- cell.next
     | Some p -> p.next <- cell.next);
    (match cell.next with
     | None -> l.last <- cell.prev
     | Some n -> n.prev <- cell.prev);
    cell.prev <- None;
    cell.next <- None

  (* Elements of [l] from head to last.  Walks backwards from [last] so the
     accumulator comes out in head-first order. *)
  let to_list l =
    let rec collect acc = function
      | None -> acc
      | Some cell -> collect (cell.data :: acc) cell.prev
    in
    collect [] l.last
end

(* What [by_key] and [by_expiration] map a key to: either a regular entry
   (its cell in [by_usage]) or a lock on a key that has no entry. *)
type gen_entry =
  [ `Entry of entry DList.cell
  | `Locked_key of string * int64  (* key, expiration *)
  ]

type cache_instance = {
  mutable config : config;
  rpc_prognr : int32;
  directory : string;
  mutable esys : Unixqueue.event_system option;
  mutable group : Unixqueue.group option;
    (* Group for additional timers *)
  mutable by_key : gen_entry StringMap.t;
    (* Maps the key to the entry *)
  mutable by_usage : entry DList.t;
    (* Orders the entries by usage: The most recently accessed entry is
       at the head of the list. The least recently accessed entry is at
       the end of the list. *)
  mutable by_expiration : gen_entry StringMap.t Int64Map.t;
    (* Maps expiration timestamp to mappings from key to entry. Entries that
       do not expire are not members of this mapping. Expiration means that
       the entry is completely deleted from the cache. *)
  stats : stats;
  mutable saving : bool;
    (* Set while a [save] is in progress *)
}

(* Current time in whole seconds, as an int64 like the other timestamps
   handled in this module. *)
let now_seconds () = Int64.of_float (Unix.gettimeofday ())

(* Renders [s] as "<" ^ lowercase hex bytes ^ ">" for log messages. *)
let to_hex s =
  let b = Buffer.create (2 * String.length s + 2) in
  Buffer.add_char b '<';
  String.iter (fun c -> bprintf b "%02x" (Char.code c)) s;
  Buffer.add_char b '>';
  Buffer.contents b

(* Removes [key] from the [by_expiration] bucket for [exp], dropping the
   bucket when it becomes empty.  [exp = 0L] means "never expires" and is
   never stored, so it is a no-op. *)
let delete_expiration cache key exp =
  if exp <> 0L then begin
    match Int64Map.find exp cache.by_expiration with
    | exception Not_found -> ()
    | exp_map ->
        let exp_map' = StringMap.remove key exp_map in
        cache.by_expiration <-
          (if StringMap.is_empty exp_map' then
             Int64Map.remove exp cache.by_expiration
           else
             Int64Map.add exp exp_map' cache.by_expiration)
  end

(* Registers [key] -> [ge] under expiration date [exp] (no-op for 0L). *)
let add_expiration cache key exp ge =
  if exp <> 0L then begin
    let exp_map =
      try Int64Map.find exp cache.by_expiration
      with Not_found -> StringMap.empty
    in
    cache.by_expiration <-
      Int64Map.add exp (StringMap.add key ge exp_map) cache.by_expiration
  end

let wsize = Sys.word_size / 8

(* Rough guess about the size in bytes of entry [e] *)
let entry_size e =
  44 * wsize + 40 + String.length e.e_key + e.e_value_length

(* Deletes [cell], a member of [by_usage], entirely from the cache and
   updates the entry/byte statistics. *)
let delete_entry cache cell =
  let e = DList.contents cell in
  DList.delete cache.by_usage cell;
  cache.by_key <- StringMap.remove e.e_key cache.by_key;
  delete_expiration cache e.e_key e.e_expiration;
  cache.stats.num_entries <- cache.stats.num_entries - 1;
  cache.stats.num_bytes <-
    Int64.sub cache.stats.num_bytes (Int64.of_int (entry_size e))

(* Deletes [key] entirely from the cache, whether it is a regular entry or
   a lock.  Unknown keys are ignored. *)
let delete_key cache key =
  match StringMap.find key cache.by_key with
  | exception Not_found -> ()
  | `Entry cell -> delete_entry cache cell
  | `Locked_key (_, exp) ->
      cache.by_key <- StringMap.remove key cache.by_key;
      delete_expiration cache key exp

(* Adds the new regular entry [e] to the cache, at the front (most recently
   used, the default) or at the back of [by_usage].
   Precondition: e.e_key is not yet member of the cache. *)
let add_entry ?(where = `Prepend) cache e =
  let cell =
    match where with
    | `Prepend -> DList.prepend cache.by_usage e; DList.head cache.by_usage
    | `Append -> DList.append cache.by_usage e; DList.last cache.by_usage
  in
  cache.by_key <- StringMap.add e.e_key (`Entry cell) cache.by_key;
  add_expiration cache e.e_key e.e_expiration (`Entry cell);
  cache.stats.num_entries <- cache.stats.num_entries + 1;
  cache.stats.num_bytes <-
    Int64.add cache.stats.num_bytes (Int64.of_int (entry_size e))

(* Locks the existing [cell] and schedules it for expiration at [expires].
   The previous expiration must be read BEFORE it is overwritten; otherwise
   the stale [by_expiration] mapping survives, and [prune_by_expiration]
   later deletes the same cell twice, which corrupts [by_usage]. *)
let lock_entry cache cell expires =
  let e = DList.contents cell in
  let old_exp = e.e_expiration in
  e.e_delete_flag <- true;
  e.e_expiration <- expires;
  delete_expiration cache e.e_key old_exp;
  add_expiration cache e.e_key expires (`Entry cell)

(* Locks the non-existing [key] until it expires *)
let lock_key cache key expires =
  let lk = `Locked_key (key, expires) in
  cache.by_key <- StringMap.add key lk cache.by_key;
  add_expiration cache key expires lk

(* Evicts least recently used entries until the accounted size fits into
   [max_size].  Stops when nothing is left to evict; this can only happen
   with a negative [max_size], which previously hit [assert false]. *)
let prune_by_size cache =
  let rec loop () =
    if cache.stats.num_bytes > cache.config.max_size then
      match DList.last cache.by_usage with
      | exception Not_found -> ()
      | cell ->
          delete_entry cache cell;
          loop ()
  in
  loop ()

(* Deletes every entry and lock whose expiration date is not in the
   future.  Map.iter visits dates in increasing order, so iteration stops
   at the first future date.  The iteration runs over a snapshot of the
   immutable map, so mutating [cache.by_expiration] meanwhile is safe. *)
let prune_by_expiration cache =
  let now = now_seconds () in
  try
    Int64Map.iter
      (fun exp m ->
         if exp > now then raise Exit;
         StringMap.iter
           (fun _ ge ->
              match ge with
              | `Entry cell -> delete_entry cache cell
              | `Locked_key (key, _) -> delete_key cache key)
           m)
      cache.by_expiration
  with Exit -> ()

type full_key = {
  key : Cache_client.key;
  modulo : int;
}

(* Stores [data] under the hashed [key].  Returns [`Stored] or
   [`Not_stored] according to the options:
   - an existing entry is only replaced with [opt_overwrite], a deleted
     (locked) one additionally needs [opt_undelete], and with
     [opt_setifchanged] an identical value is not stored again;
   - a missing key is only added with [opt_add], and a locked key
     additionally needs [opt_undelete].
   Storing prunes the cache by size afterwards. *)
let set_directly_1 cache key bucket data expires opts =
  cache.stats.num_calls_set <- cache.stats.num_calls_set + 1;
  (* [key] has no regular entry in the cache (it may be locked) *)
  let add_new ~key_is_locked =
    if not opts.opt_add then
      `Not_stored  (* not allowed to add *)
    else if key_is_locked && not opts.opt_undelete then
      `Not_stored  (* key is locked *)
    else begin
      if key_is_locked then delete_key cache key;
      let now = now_seconds () in
      let e = {
        e_key = key;
        e_creation = now;
        e_modification = now;
        e_expiration = expires;
        e_delete_flag = false;
        e_value = data;
        e_value_hash = Digest.string data;
        e_value_length = String.length data;
        e_counter = 0;
        e_bucket = bucket;
      } in
      add_entry cache e;
      prune_by_size cache;
      `Stored
    end
  in
  match StringMap.find key cache.by_key with
  | exception Not_found -> add_new ~key_is_locked:false
  | `Locked_key _ -> add_new ~key_is_locked:true
  | `Entry cell ->
      let e = DList.contents cell in
      if not opts.opt_overwrite then
        `Not_stored  (* not allowed to overwrite *)
      else if e.e_delete_flag && not opts.opt_undelete then
        `Not_stored  (* not allowed to overwrite deleted entry *)
      else begin
        let data_md5 = Digest.string data in
        let is_change = e.e_value_hash <> data_md5 in
        if is_change || not opts.opt_setifchanged then begin
          delete_entry cache cell;
          e.e_modification <- now_seconds ();
          e.e_expiration <- expires;
          (* Storing a new value (possibly via undelete) cancels a pending
             deletion.  Keeping the flag would leave an entry that later
             deletes ignore and plain overwrites refuse. *)
          e.e_delete_flag <- false;
          e.e_value <- data;
          e.e_value_hash <- data_md5;
          e.e_value_length <- String.length data;
          e.e_bucket <- bucket;
          add_entry cache e;
          prune_by_size cache;
          `Stored
        end
        else `Not_stored
      end

let set_directly cache fk data expires opts =
  let key = hash_of_key fk.key in
  let bucket = bucket_of_hash fk.modulo key in
  set_directly_1 cache key bucket data expires opts

let proc_set cache sess (key, bucket, data, expires, opts) reply =
  let result' =
    match set_directly_1 cache key bucket data expires opts with
    | `Stored -> `stored
    | `Not_stored -> `not_stored
  in
  reply result'

(* Looks up the hashed [key].  Locked keys count as missing, and so do
   entries filtered out by [opt_getifmodifiedsince] / [opt_getifnotmd5].
   A hit bumps [num_hits] and the entry's counter, and moves the entry to
   the front of [by_usage].  With [opt_novalue] the returned copy has an
   empty value. *)
let get_directly_1 cache key opts =
  cache.stats.num_calls_get <- cache.stats.num_calls_get + 1;
  match StringMap.find key cache.by_key with
  | exception Not_found -> `Not_found
  | `Locked_key _ -> `Not_found  (* locked, but the entry is non-existent *)
  | `Entry cell ->
      let e = DList.contents cell in
      let too_old =
        match opts.opt_getifmodifiedsince with
        | None -> false
        | Some ts -> ts > e.e_modification
      in
      let unchanged =
        match opts.opt_getifnotmd5 with
        | None -> false
        | Some md5 -> md5 = e.e_value_hash
      in
      if too_old || unchanged then `Not_found
      else begin
        (* The copy is taken before the counter is bumped, as before *)
        let e' = if opts.opt_novalue then { e with e_value = "" } else e in
        cache.stats.num_hits <- cache.stats.num_hits + 1;
        e.e_counter <- e.e_counter + 1;
        (* Move e to the beginning of the usage list: *)
        DList.delete cache.by_usage cell;
        DList.prepend_cell cache.by_usage cell;
        `Found e'
      end

let get_directly cache fk opts =
  let key = hash_of_key fk.key in
  get_directly_1 cache key opts

let proc_get cache sess (key, opts) reply =
  let result' =
    match get_directly_1 cache key opts with
    | `Found e -> `found e
    | `Not_found -> `not_found
  in
  reply result'

(* Deletes the hashed [key], immediately when [expires] is not in the
   future, otherwise by locking the entry until [expires].  An entry is
   only deleted if it is not already marked deleted and passes the
   [opt_delifolderthan] / [opt_delifmd5] filters.  With [opt_strictlock]
   a missing key is locked until [expires] (if that is in the future). *)
let delete_directly_1 cache key expires opts =
  cache.stats.num_calls_delete <- cache.stats.num_calls_delete + 1;
  match StringMap.find key cache.by_key with
  | exception Not_found ->
      if opts.opt_strictlock && expires > now_seconds () then
        lock_key cache key expires
  | `Locked_key _ -> ()  (* [key] is already locked *)
  | `Entry cell ->
      let e = DList.contents cell in
      let old_enough =
        match opts.opt_delifolderthan with
        | None -> true
        | Some ts -> e.e_modification < ts
      in
      let md5_matches =
        match opts.opt_delifmd5 with
        | None -> true
        | Some md5 -> e.e_value_hash = md5
      in
      (* NOTE(review): a previous version also ANDed in
         [e.e_value_hash <> md5] for the same [opt_delifmd5], which
         contradicts the test above and made [Some md5] never delete.  If
         the options record has a separate "delete if md5 differs" field,
         it is presumably meant to be tested here -- TODO confirm. *)
      if not e.e_delete_flag && old_enough && md5_matches then begin
        if expires <= now_seconds () then
          delete_entry cache cell  (* Delete immediately *)
        else
          lock_entry cache cell expires  (* Lock entry and delete in the future *)
      end

let delete_directly cache fk expires opts =
  let key = hash_of_key fk.key in
  delete_directly_1 cache key expires opts

let proc_delete cache sess (key, expires, opts) reply =
  delete_directly_1 cache key expires opts;
  reply ()

(* Drops all entries and locks and resets the size statistics *)
let clear_directly cache =
  cache.by_key <- StringMap.empty;
  cache.by_usage <- DList.create ();
  cache.by_expiration <- Int64Map.empty;
  cache.stats.num_entries <- 0;
  cache.stats.num_bytes <- 0L

let proc_clear cache sess () reply =
  clear_directly cache;
  reply ()

let get_config cache = cache.config

let proc_get_config cache sess () reply =
  reply cache.config

let set_config cache new_config =
  cache.config <- new_config;
  prune_by_size cache  (* max_size may have shrunk *)

let proc_set_config cache sess new_config reply =
  set_config cache new_config;
  reply ()

let get_stats cache = cache.stats

let proc_get_stats cache sess () reply =
  reply cache.stats

(* Resets the call/hit counters (not the size statistics) *)
let clear_counters cache =
  let s = cache.stats in
  s.num_calls_get <- 0;
  s.num_calls_set <- 0;
  s.num_calls_delete <- 0;
  s.num_hits <- 0;
  s.counters_reset_on <- now_seconds ()

let proc_clear_counters cache sess () reply =
  clear_counters cache;
  reply ()

exception End_of_cache

(* Cross-checks [by_usage], [by_key] and [by_expiration] against each
   other, logging every inconsistency through [log].
   @raise Failure "verify_entries" if any check failed. *)
let verify_entries ~log cache =
  let now = now_seconds () in
  let n_regular = ref 0 in
  let success = ref true in
  let fail msg =
    log msg;
    success := false
  in
  (* Checks one cell of [by_usage]: it must be reachable as this very cell
     from [by_key] and, if it expires, from [by_expiration]. *)
  let check_cell cur =
    incr n_regular;
    let e = DList.contents cur in
    let hex_key = to_hex e.e_key in
    (match StringMap.find e.e_key cache.by_key with
     | exception Not_found ->
         fail (sprintf "* Key not found: %s" hex_key)
     | `Entry cell ->
         if cell != cur then
           fail (sprintf "* Wrong cell found for key: %s" hex_key)
     | `Locked_key _ ->
         fail (sprintf "* Lock found for key: %s" hex_key));
    if e.e_expiration <> 0L then begin
      match Int64Map.find e.e_expiration cache.by_expiration with
      | exception Not_found ->
          fail (sprintf "* Expiration entry not found: %s" hex_key)
      | m ->
          (match StringMap.find e.e_key m with
           | exception Not_found ->
               fail (sprintf "* Key not found in expiration map: %s" hex_key)
           | `Entry cell ->
               if cell != cur then
                 fail (sprintf "* Wrong cell found for key in expiration map: %s"
                         hex_key)
           | `Locked_key _ ->
               fail (sprintf "* Lock found for key in expiration map: %s"
                       hex_key))
    end
  in
  (try
     let cur =
       ref (try DList.head cache.by_usage with Not_found -> raise End_of_cache)
     in
     while true do
       check_cell !cur;
       cur := (try DList.next !cur with Not_found -> raise End_of_cache)
     done
   with End_of_cache -> ());
  let n_by_key = ref 0 in
  let n_by_key_can_expire = ref 0 in
  StringMap.iter
    (fun _ ge ->
       incr n_by_key;
       match ge with
       | `Entry cell ->
           if (DList.contents cell).e_expiration <> 0L then
             incr n_by_key_can_expire
       | `Locked_key _ -> incr n_by_key_can_expire)
    cache.by_key;
  let n_by_expiration = ref 0 in
  Int64Map.iter
    (fun d m ->
       if d = 0L then
         fail "* Expiration date 0 is invalid in by_expiration"
       else if d < Int64.sub now 100L then
         fail (sprintf "* Very old entries found in by_expiration, age = %Ld seconds"
                 (Int64.sub now d));
       n_by_expiration := !n_by_expiration + StringMap.cardinal m)
    cache.by_expiration;
  (* n_by_key > n_regular because we can also have locked keys in by_key *)
  if !n_by_key < !n_regular then fail "* Too few entries in by_key";
  if !n_by_key_can_expire <> !n_by_expiration then
    fail "* Wrong number of entries in by_key/by_expiration";
  log (sprintf "* Statistics: %d regular entries, %d locked keys, %d expirable"
         !n_regular (!n_by_key - !n_regular) !n_by_key_can_expire);
  if not !success then failwith "verify_entries"

(* Runs all consistency checks; any exception is logged, not raised *)
let verify ~log cache =
  try
    log "Starting verification of cache structure";
    log "- Starting DList.verify";
    DList.verify cache.by_usage;
    log "- Starting verify_entries";
    verify_entries ~log cache;
    log "Verification successful!"
  with error ->
    log (sprintf "Exception in verification: %s" (Printexc.to_string error))

(* Creates an empty, not yet activated cache instance *)
let create cconfig =
  let config = {
    max_size = cconfig # max_size;
    save_cache_period = cconfig # save_cache_period;
    save_cache_speed = cconfig # save_cache_speed;
  } in
  let stats = {
    num_entries = 0;
    num_bytes = 0L;
    num_calls_get = 0;
    num_calls_set = 0;
    num_calls_delete = 0;
    num_hits = 0;
    counters_reset_on = now_seconds ();
  } in
  {
    config = config;
    rpc_prognr = cconfig # rpc_program_number;
    directory = cconfig # directory;
    esys = None;
    group = None;
    by_key = StringMap.empty;
    by_usage = DList.create ();
    by_expiration = Int64Map.empty;
    stats = stats;
    saving = false;
  }

(* Registers the RPC procedures of [cache] with the server [srv] *)
let bind srv cache =
  Cache_srv.Cache.V1.bind_async
    ~program_number:(Netnumber.uint4_of_int32 cache.rpc_prognr)
    ~proc_ping:(fun _ _ reply -> reply ())
    ~proc_set:(proc_set cache)
    ~proc_get:(proc_get cache)
    ~proc_delete:(proc_delete cache)
    ~proc_clear:(proc_clear cache)
    ~proc_get_config:(proc_get_config cache)
    ~proc_set_config:(proc_set_config cache)
    ~proc_get_stats:(proc_get_stats cache)
    ~proc_clear_counters:(proc_clear_counters cache)
    srv

(* Prunes expired entries once a second, forever (until the group is
   cleared) *)
let rec check_for_expired_entries cache esys group =
  Unixqueue.once esys group 1.0
    (fun () ->
       prune_by_expiration cache;
       check_for_expired_entries cache esys group)

(* File format:
 *
 * Every file "cache-<bucket>.data" is a sequence of:
 * - Ordinal number as 4 bytes (big endian)
 * - Size of the following record, as 4 bytes (big endian)
 * - Record, as XDR of type entry.
 *
 * The ordinal numbers express the order in by_usage.
 *)

(* Sys.readdir has a memory leak in O'Caml 3.09, so we use our own function
 * to read a directory. Note: Fixed in 3.09.3!
 * Returns the entry names other than "." and "..".
 * Unix errors are re-raised as [Sys_error]. *)
let read_dir d =
  try
    let dh = Unix.opendir d in
    let names = ref [] in
    (try
       while true do
         let n = Unix.readdir dh in
         if n <> "." && n <> ".." then names := n :: !names
       done;
       assert false
     with
     | End_of_file ->
         Unix.closedir dh;
         List.rev !names
     | error ->
         Unix.closedir dh;  (* Cannot read entry *)
         raise error)
  with Unix.Unix_error (e, _, _) ->
    raise (Sys_error (Unix.error_message e ^ ": " ^ d))

(* Scans through the files in directory [d], removes all files with
 * suffix ".new", and returns the full paths of the remaining files. *)
let prepare_directory d =
  List.fold_left
    (fun acc f ->
       let df = Filename.concat d f in
       if Filename.check_suffix f ".new" then begin
         Sys.remove df;
         acc
       end
       else df :: acc)
    [] (read_dir d)

let xdr_entry_type = lazy (Netxdr.validate_xdr_type Cache_aux.xdrt_entry)

(* Appends entry [e] with ordinal [ord] to "cache-<bucket>.new".  The hash
 * table [files] maps bucket numbers to open files; missing files are
 * opened (in binary mode: the records are raw XDR) and registered.
 * Returns the size of the XDR record. *)
let save_one cache ord e files =
  let e_xdr = Cache_aux._of_entry e in
  let data =
    Netxdr.pack_xdr_value_as_string e_xdr (Lazy.force xdr_entry_type) [] in
  let bucket = e.e_bucket in
  let file =
    try Hashtbl.find files bucket
    with Not_found ->
      let name = "cache-" ^ string_of_int bucket ^ ".new" in
      let fullname = Filename.concat cache.directory name in
      let f =
        open_out_gen [Open_wronly; Open_append; Open_creat; Open_binary]
          0o666 fullname in
      Hashtbl.add files bucket f;
      f
  in
  let size = String.length data in
  output_string file (Netnumber.BE.int4_as_string (Netnumber.int4_of_int ord));
  output_string file (Netnumber.BE.int4_as_string (Netnumber.int4_of_int size));
  output_string file data;
  size

let version = "2006-11-20"

(* Writes the entries [l] one per event-loop step, throttled so that no
 * more than [save_cache_speed] bytes/second are written (0 = unlimited).
 * [ord] is the ordinal of the head of [l], [t0] the start time, [n] the
 * bytes written so far.  Calls [f] after closing all files.  On error the
 * files are closed, [saving] is reset, and the exception is re-raised. *)
let rec save_thread cache ord l files t0 n f =
  let close_files () = Hashtbl.iter (fun _ ch -> close_out ch) files in
  try
    match l with
    | [] ->
        close_files ();
        f ()
    | e :: l' ->
        let size = save_one cache ord e files in
        let n' = n +. float size in
        let now = Unix.gettimeofday () in
        let speed = cache.config.save_cache_speed in
        let t1 = if speed = 0 then t0 else t0 +. n' /. float speed in
        let tdelta = max 0.0 (t1 -. now) in
        let esys = match cache.esys with Some es -> es | None -> assert false in
        let g = match cache.group with Some g -> g | None -> assert false in
        Unixqueue.once esys g tdelta
          (fun () -> save_thread cache (ord + 1) l' files t0 n' f)
  with error ->
    close_files ();
    cache.saving <- false;
    raise error

(* Starts an asynchronous save of the whole cache into the directory.
 * Entries go to "*.new" files first; when all are written, the old files
 * are removed, the new ones renamed to "*.data", and FORMAT is rewritten.
 * @raise Failure if a save is running or the cache is not activated. *)
let save cache =
  if cache.saving then failwith "Save operation is already started";
  (match cache.esys, cache.group with
   | Some _, Some _ -> ()
   | _ -> failwith "Cache is not activated");
  let old_files = prepare_directory cache.directory in
  let l = DList.to_list cache.by_usage in
  let files = Hashtbl.create 50 in
  let t0 = Unix.gettimeofday () in
  (* Mark the save as running: the periodic timer relies on this flag to
     avoid starting a second save that would remove our ".new" files. *)
  cache.saving <- true;
  save_thread cache 0 l files t0 0.0
    (fun () ->
       cache.saving <- false;
       List.iter
         (fun old_file -> try Sys.remove old_file with Sys_error _ -> ())
         old_files;
       Hashtbl.iter
         (fun bucket _ ->
            let name = "cache-" ^ string_of_int bucket in
            let fullname = Filename.concat cache.directory name in
            Sys.rename (fullname ^ ".new") (fullname ^ ".data"))
         files;
       let format_file = Filename.concat cache.directory "FORMAT" in
       Netchannels.with_out_obj_channel
         (new Netchannels.output_channel (open_out format_file))
         (fun ch -> ch # output_string version))

(* Starts a save every [save_cache_period] seconds unless one is already
 * running.  The period is re-read each round; once it is <= 0 the timer
 * stops (a zero delay would otherwise spin the event loop). *)
let rec check_for_save cache esys group =
  let period = cache.config.save_cache_period in
  if period > 0 then
    Unixqueue.once esys group (float period)
      (fun () ->
         check_for_save cache esys group;
         if not cache.saving then save cache)

let close_files openfiles =
  IntMap.iter (fun _ files -> List.iter close_in files) openfiles

(* Reads a 4-byte big-endian integer from [file].
 * @raise End_of_file if fewer than 4 bytes remain. *)
let input_be_int4 file =
  let s = Bytes.create 4 in
  really_input file s 0 4;
  Netnumber.int_of_int4 (Netnumber.BE.read_int4 s 0)

(* Opens the data [files] (names relative to the cache directory) and reads
 * the first ordinal of each.  Returns a map ordinal -> channels positioned
 * just after that ordinal.  Empty files are closed and ignored.  On any
 * other error every channel opened so far is closed before re-raising. *)
let load_from cache files =
  let openfiles = ref IntMap.empty in
  List.iter
    (fun name ->
       let file =
         try open_in_bin (Filename.concat cache.directory name)
         with Sys_error _ as error ->
           close_files !openfiles;
           raise error
       in
       match input_be_int4 file with
       | exception End_of_file -> close_in file  (* ... and ignore *)
       | exception error ->
           close_in file;
           close_files !openfiles;
           raise error
       | ord ->
           let others = try IntMap.find ord !openfiles with Not_found -> [] in
           openfiles := IntMap.add ord (file :: others) !openfiles)
    files;
  !openfiles

(* Reads one record (size + XDR entry) from [file] and appends the entry to
 * [by_usage] unless it has expired or would exceed [max_size]. *)
let load_one cache now file =
  let size = input_be_int4 file in
  let data = Bytes.create size in
  really_input file data 0 size;
  let xdr =
    Netxdr.unpack_xdr_value ~fast:true data (Lazy.force xdr_entry_type) [] in
  let e = Cache_aux._to_entry xdr in
  let esize = Int64.of_int (entry_size e) in
  let not_expired = e.e_expiration = 0L || e.e_expiration > now in
  let fits_by_size =
    Int64.add cache.stats.num_bytes esize <= cache.config.max_size in
  if not_expired && fits_by_size then add_entry ~where:`Append cache e

(* Returns (key,value) of the smallest key in m.
 * @raise Not_found if m is empty. *)
let int_map_min m = IntMap.min_binding m

(* Merges the per-bucket files in ordinal order: repeatedly loads the
 * record with the smallest pending ordinal, then reads that file's next
 * ordinal.  A file is closed at end of file, including when its last
 * record is truncated.  All recursive calls are tail calls, so large
 * caches cannot overflow the stack. *)
let rec load_loop cache now openfiles =
  if not (IntMap.is_empty openfiles) then begin
    let (ord, files) = int_map_min openfiles in
    let file, rest =
      match files with
      | f :: rest -> (f, rest)
      | [] -> assert false  (* lists in [openfiles] are never empty *)
    in
    let openfiles =
      match rest with
      | [] -> IntMap.remove ord openfiles
      | _ -> IntMap.add ord rest openfiles
    in
    match (load_one cache now file; input_be_int4 file) with
    | exception End_of_file ->
        close_in file;
        load_loop cache now openfiles
    | next_ord ->
        let others =
          try IntMap.find next_ord openfiles with Not_found -> [] in
        load_loop cache now (IntMap.add next_ord (file :: others) openfiles)
  end

(* Replaces the cache contents with the saved "*.data" files, if FORMAT
 * matches [version].  Loading is best effort: on an error during loading,
 * the files are closed and the partially loaded contents are kept. *)
let load cache =
  let format_file = Filename.concat cache.directory "FORMAT" in
  if Sys.file_exists format_file then begin
    let format =
      Netchannels.with_in_obj_channel
        (new Netchannels.input_channel (open_in format_file))
        (fun ch -> ch # input_line ())
    in
    (* Maybe we want a special message for other format versions... *)
    if format = version then begin
      let files =
        List.filter
          (fun n -> Filename.check_suffix n ".data")
          (read_dir cache.directory)
      in
      let openfiles = load_from cache files in
      try
        (* reinit, including num_entries/num_bytes, so that the size
           accounting matches the reloaded contents *)
        clear_directly cache;
        let now = now_seconds () in
        load_loop cache now openfiles;
        prune_by_size cache
      with _ ->
        close_files openfiles
    end
  end

(* Attaches [cache] to [esys]: starts the expiration and save timers in a
 * fresh group, then loads the saved contents from disk.
 * @raise Failure if the cache is already activated. *)
let activate esys cache =
  (match cache.group with
   | Some _ -> failwith "Cache_server.activate: already activated"
   | None -> ());
  cache.esys <- Some esys;
  let group = Unixqueue.new_group esys in
  cache.group <- Some group;
  (* Check every second for expired entries: *)
  check_for_expired_entries cache esys group;
  (* Arrange that the cache is regularly saved: *)
  if cache.config.save_cache_period > 0 then check_for_save cache esys group;
  (* Load cache contents from disk *)
  load cache

(* Cancels all timers of the cache.  A save in progress is abandoned with
 * its group, so [saving] is reset; otherwise it would stay set forever. *)
let shutdown cache =
  match cache.esys, cache.group with
  | Some esys, Some group ->
      Unixqueue.clear esys group;
      cache.esys <- None;
      cache.group <- None;
      cache.saving <- false
  | _ -> ()