(* $Id$ *) open Cache_util type key = Cache_util.key type timestamp = int64 type entry = Cache_aux.entry = { mutable e_key : Digest.t; mutable e_creation : timestamp; mutable e_modification : timestamp; mutable e_expiration : timestamp; mutable e_delete_flag : bool; mutable e_value : string; mutable e_value_hash : Digest.t; mutable e_value_length : int; mutable e_counter : int; mutable e_bucket : int; } type set_options = Cache_aux.set_options = { mutable opt_overwrite : bool; mutable opt_add : bool; mutable opt_undelete : bool; mutable opt_setifchanged : bool; } type get_options = Cache_aux.get_options = { mutable opt_novalue : bool; mutable opt_getifmodifiedsince : timestamp option; mutable opt_getifnotmd5 : Digest.t option; } type delete_options = Cache_aux.delete_options = { mutable opt_strictlock : bool; mutable opt_delifolderthan : timestamp option; mutable opt_delifmd5 : Digest.t option; mutable opt_delifnotmd5 : Digest.t option; } type config = Cache_aux.config = { mutable max_size : int64; mutable save_cache_period : int; mutable save_cache_speed : int; } type stats = Cache_aux.stats = { mutable num_entries : int; mutable num_bytes : int64; mutable num_calls_get : int; mutable num_calls_set : int; mutable num_calls_delete : int; mutable num_hits : int; mutable counters_reset_on : timestamp; } type 'a async_reply = ((unit -> 'a) -> unit) exception Server_not_alive class type client_config = object method buckets : Rpc_client.connector array method is_alive : int -> bool method query_timeout : float method idle_timeout : float end class type async_client = object method client_config : client_config method set : [`Stored|`Not_stored|`Timeout] async_reply -> key -> string -> timestamp -> set_options -> unit method get : [`Found of entry|`Not_found|`Timeout] async_reply -> key -> get_options -> unit method delete : [`Ok|`Timeout] async_reply -> key -> timestamp -> delete_options -> unit method clear : [`Ok|`Timeout] async_reply -> unit method get_config : [ `Ok of ( Rpc_client.connector * config ) list | `Timeout ] async_reply -> unit method set_config : [`Ok|`Timeout] async_reply -> ( Rpc_client.connector * config ) list -> unit method get_stats : [ `Ok of ( Rpc_client.connector * stats ) list | `Timeout ] async_reply -> unit method clear_counters : [`Ok|`Timeout] async_reply -> unit method shutdown : unit -> unit end class type sync_client = object method client_config : client_config method set : key -> string -> int64 -> set_options -> [`Stored|`Not_stored|`Timeout] method get : key -> get_options -> [`Found of entry|`Not_found|`Timeout] method delete : key -> int64 -> delete_options -> [`Ok|`Timeout] method clear : unit -> [`Ok|`Timeout] method get_config : unit -> [ `Ok of ( Rpc_client.connector * config ) list | `Timeout ] method set_config : ( Rpc_client.connector * config ) list -> [`Ok|`Timeout] method get_stats : unit -> [ `Ok of ( Rpc_client.connector * stats ) list | `Timeout ] method clear_counters : unit -> [`Ok|`Timeout] method shutdown : unit -> unit end let transform_reply pass_reply_to_user transform f_anyway get_reply_from_rpc = f_anyway(); let ok = ref true in try let rpc_reply = get_reply_from_rpc() in try pass_reply_to_user (fun () -> transform rpc_reply) with | e' -> ok := false; raise e' with | Rpc_client.Message_timeout when !ok -> pass_reply_to_user (fun () -> `Timeout) | e when !ok -> (* ok = exception comes from get_reply_from_rpc *) pass_reply_to_user (fun () -> raise e) ;; let handle_rpc_error client pass_reply get_reply_from_rpc = pass_reply (fun () -> try get_reply_from_rpc() with | e -> Rpc_client.shut_down client; raise e ) class async_client_impl config esys : async_client = object(self) val clients = Hashtbl.create 50 val client_usage = (Hashtbl.create 50 : ('c, [`Count of int | `Unused of Unixqueue.group] ref) Hashtbl.t) method client_config = config method private connector_of x = let modulo = Array.length config#buckets in let j = match x with | `Bucket b -> b | #key as k -> bucket_of_hash modulo (hash_of_key k) in let conn = (config # buckets).(j) in if not (config # is_alive j) then ( self # disconnect_client conn; raise Server_not_alive ); conn method private client_of_connector connector = try Hashtbl.find clients connector with | Not_found -> let m = `Socket(Rpc.Tcp, connector, Rpc_client.default_socket_config) in let c = Cache_clnt.Cache.V1.create_client2 ~esys m in Rpc_client.configure c 0 config#query_timeout; Hashtbl.add clients connector c; c method private disconnect_client connector = try let client = Hashtbl.find clients connector in Rpc_client.shut_down client; Hashtbl.remove clients connector with | Not_found -> () method shutdown() = Hashtbl.iter (fun conn client -> Rpc_client.shut_down client ) clients; Hashtbl.clear clients method private incr_usage connector = try let usage = Hashtbl.find client_usage connector in ( match !usage with | `Count n -> usage := `Count(n+1) | `Unused tmo_group -> Unixqueue.clear esys tmo_group; usage := `Count 1 ) with | Not_found -> Hashtbl.add client_usage connector (ref (`Count 1)) method private decr_usage connector () = try let usage = Hashtbl.find client_usage connector in ( match !usage with | `Count n -> assert(n >= 1); let n' = n-1 in if n' = 0 then ( let tmo = config#idle_timeout in if tmo > 0.0 then ( let tmo_group = Unixqueue.new_group esys in usage := `Unused tmo_group; Unixqueue.once esys tmo_group tmo (fun () -> self # disconnect_client connector; Hashtbl.remove client_usage connector ) ) else if tmo = 0.0 then ( self # disconnect_client connector; Hashtbl.remove client_usage connector ) else usage := `Count 0 ) else usage := `Count n' | `Unused _ -> assert false ) with | Not_found -> assert false method private connectors = (* Return all connectors mentioned in client_config *) let l = ref [] in for k = 0 to Array.length config#buckets - 1 do let c = (config#buckets).(k) in if not (List.mem c !l) then l := c :: !l done; !l method private with_client connector f = try let client = self # client_of_connector connector in f connector client with | Rpc_client.Client_is_down -> (* CHECK: Maybe make it configurable what to do in this case *) Hashtbl.remove clients connector; let g = Unixqueue.new_group esys in Unixqueue.once esys g 0.0 (fun () -> self # with_client connector f) method set pass_reply k v ts opts = let h = hash_of_key k in let modulo = Array.length config#buckets in let bucket = bucket_of_hash modulo h in self # with_client (self # connector_of (`Bucket bucket)) (fun connector client -> self # incr_usage connector; Cache_clnt.Cache.V1.set'async client (h, bucket, v, ts, opts) (handle_rpc_error client (transform_reply pass_reply (function | `stored -> `Stored | `not_stored -> `Not_stored ) (self # decr_usage connector) ) ) ) method get pass_reply k opts = let h = hash_of_key k in let modulo = Array.length config#buckets in let bucket = bucket_of_hash modulo h in self # with_client (self # connector_of (`Bucket bucket)) (fun connector client -> self # incr_usage connector; Cache_clnt.Cache.V1.get'async client (h, opts) (handle_rpc_error client (transform_reply pass_reply (function | `found e -> `Found e | `not_found -> `Not_found ) (self # decr_usage connector) ) ) ) method delete pass_reply k ts opts = let h = hash_of_key k in let modulo = Array.length config#buckets in let bucket = bucket_of_hash modulo h in self # with_client (self # connector_of (`Bucket bucket)) (fun connector client -> self # incr_usage connector; Cache_clnt.Cache.V1.delete'async client (h, ts, opts) (handle_rpc_error client (transform_reply pass_reply (fun () -> `Ok) (self # decr_usage connector) ) ) ) method private multi_invoke : 'arg 'res . [ `Ok of (Rpc_client.connector * 'res) list | `Timeout ] async_reply -> (Rpc_client.t -> 'arg -> 'res async_reply -> unit) -> (Rpc_client.connector -> 'arg) -> Rpc_client.connector list -> unit = fun pass_reply rpc_call arg_of_conn connectors -> (* Call [rpc_call] for every connector in [connectors]. The argument * is the result of [arg_of_conn]. The result is accumulated, and * passed back using [pass_reply] when all replies have arrived. *) let result = ref (`Ok []) in let n = ref 0 in List.iter (fun connector -> self # with_client connector (fun connector client -> self # incr_usage connector; rpc_call client (arg_of_conn connector) (fun get_reply -> decr n; self # decr_usage connector (); ( try let r = get_reply() in ( match !result with | `Ok l -> result := `Ok ((connector, r) :: l) | _ -> () ) with | Rpc_client.Message_timeout -> ( match !result with | `Ok _ -> result := `Timeout | _ -> () ); Rpc_client.shut_down client | error -> result := `Error error; Rpc_client.shut_down client ); if !n = 0 then pass_reply (fun () -> match !result with | `Error e -> raise e | (`Ok _|`Timeout) as regular -> regular ) ) ); incr n ) connectors method clear pass_reply = self # multi_invoke (transform_reply pass_reply (function | `Ok _ -> `Ok | `Timeout -> `Timeout ) (fun () -> ()) ) Cache_clnt.Cache.V1.clear'async (fun connector -> ()) (self # connectors) method clear_counters pass_reply = self # multi_invoke (transform_reply pass_reply (function | `Ok _ -> `Ok | `Timeout -> `Timeout ) (fun () -> ()) ) Cache_clnt.Cache.V1.clear_counters'async (fun connector -> ()) (self # connectors) method get_config pass_reply = self # multi_invoke pass_reply Cache_clnt.Cache.V1.get_config'async (fun connector -> ()) (self # connectors) method get_stats pass_reply = self # multi_invoke pass_reply Cache_clnt.Cache.V1.get_stats'async (fun connector -> ()) (self # connectors) method set_config pass_reply new_configs = self # multi_invoke (transform_reply pass_reply (function | `Ok _ -> `Ok | `Timeout -> `Timeout ) (fun () -> ()) ) Cache_clnt.Cache.V1.set_config'async (fun connector -> List.assoc connector new_configs) (List.map fst new_configs) end let create_async_client = new async_client_impl let synchronize esys f_async = let get_reply = ref (fun () -> assert false) in f_async (fun gr -> get_reply := gr); Unixqueue.run esys; !get_reply() class sync_client_impl config : sync_client = let config = ( object method buckets = config # buckets method query_timeout = config # query_timeout method idle_timeout = (-1.0) (* Not supported by the sync client *) method is_alive = config # is_alive end ) in let esys = Unixqueue.create_unix_event_system() in let ac = new async_client_impl config esys in object(self) method client_config = config method set k v ts opts = synchronize esys (fun pr -> ac # set pr k v ts opts) method get k opts = synchronize esys (fun pr -> ac # get pr k opts) method delete k ts opts = synchronize esys (fun pr -> ac # delete pr k ts opts) method clear () = synchronize esys ac#clear method get_config() = synchronize esys ac#get_config method set_config new_configs = synchronize esys (fun pr -> ac # set_config pr new_configs) method get_stats() = synchronize esys ac#get_stats method clear_counters() = synchronize esys ac#clear_counters method shutdown() = ac # shutdown() end let create_sync_client = new sync_client_impl