(* $Id: cache_client.ml 11914 2007-06-28 23:06:44Z gerd $ *)
open Cache_util
(* Keys of cache entries, as defined in [Cache_util]. The [set], [get] and
 * [delete] methods of the clients below turn a key into a hash with
 * [hash_of_key] before it is sent to a server.
 *)
type key = Cache_util.key
(* Timestamps are 64 bit integers.
 * NOTE(review): presumably seconds since the epoch -- the unit is not
 * visible in this file, confirm against the cache protocol definition.
 *)
type timestamp = int64
(* A cache entry. This is a re-export of [Cache_aux.entry], so the field
 * names can be used without qualifying them with the module name. Entries
 * are delivered by the [get] methods below as [`Found e].
 * NOTE(review): the meaning of the individual fields is defined together
 * with [Cache_aux] and is not visible here. The names suggest the hashed
 * key, three timestamps, a deletion marker, the value with its digest and
 * length, and bookkeeping data -- confirm there before relying on it.
 *)
type entry = Cache_aux.entry =
{ mutable e_key : Digest.t;
mutable e_creation : timestamp;
mutable e_modification : timestamp;
mutable e_expiration : timestamp;
mutable e_delete_flag : bool;
mutable e_value : string;
mutable e_value_hash : Digest.t;
mutable e_value_length : int;
mutable e_counter : int;
mutable e_bucket : int;
}
(* Options of the [set] operation (re-export of [Cache_aux.set_options]).
 * The record is passed unchanged to the server as part of the [set] call.
 * NOTE(review): the effect of each flag is decided by the server and is
 * not visible in this file -- see the cache protocol definition.
 *)
type set_options = Cache_aux.set_options =
{
mutable opt_overwrite : bool;
mutable opt_add : bool;
mutable opt_undelete : bool;
mutable opt_setifchanged : bool;
}
(* Options of the [get] operation (re-export of [Cache_aux.get_options]).
 * The record is passed unchanged to the server as part of the [get] call.
 * NOTE(review): the effect of each option is decided by the server and is
 * not visible in this file -- see the cache protocol definition.
 *)
type get_options = Cache_aux.get_options =
{
mutable opt_novalue : bool;
mutable opt_getifmodifiedsince : timestamp option;
mutable opt_getifnotmd5 : Digest.t option;
}
(* Options of the [delete] operation (re-export of
 * [Cache_aux.delete_options]). The record is passed unchanged to the
 * server as part of the [delete] call.
 * NOTE(review): the effect of each option is decided by the server and is
 * not visible in this file -- see the cache protocol definition.
 *)
type delete_options = Cache_aux.delete_options =
{
mutable opt_strictlock : bool;
mutable opt_delifolderthan : timestamp option;
mutable opt_delifmd5 : Digest.t option;
mutable opt_delifnotmd5 : Digest.t option;
}
(* Server-side configuration of one cache server (re-export of
 * [Cache_aux.config]). It is read with [get_config] and written with
 * [set_config], always paired with the connector of the server.
 * NOTE(review): units and exact meaning of the fields are not visible in
 * this file -- confirm against the server documentation.
 *)
type config = Cache_aux.config =
{
mutable max_size : int64;
mutable save_cache_period : int;
mutable save_cache_speed : int;
}
(* Statistics of one cache server (re-export of [Cache_aux.stats]), as
 * returned by [get_stats] together with the connector of the server.
 * NOTE(review): the names suggest entry and byte totals plus call and hit
 * counters that [clear_counters] resets -- the exact semantics are defined
 * by the server, confirm there.
 *)
type stats = Cache_aux.stats =
{
mutable num_entries : int;
mutable num_bytes : int64;
mutable num_calls_get : int;
mutable num_calls_set : int;
mutable num_calls_delete : int;
mutable num_hits : int;
mutable counters_reset_on : timestamp;
}
(* Callback type of the asynchronous client. The callback does not get the
 * result directly but a function returning it: calling this function
 * either yields the result of type ['a] or raises the exception that made
 * the operation fail.
 *)
type 'a async_reply =
((unit -> 'a) -> unit)
(* Raised by the [set], [get] and [delete] methods of the clients in this
 * file when [is_alive] of the client configuration returns [false] for
 * the bucket responsible for the key.
 *)
exception Server_not_alive
(* Configuration of the clients defined in this file *)
class type client_config =
object
(* The connectors of the cache servers, indexed by bucket number. The
 * length of the array is the modulus used to map key hashes to buckets.
 * The same connector may occur at several positions.
 *)
method buckets : Rpc_client.connector array
(* Called with a bucket number before every [set], [get] and [delete].
 * If the result is [false], an existing connection to the server of this
 * bucket is shut down and [Server_not_alive] is raised.
 *)
method is_alive : int -> bool
(* Handed to [Rpc_client.configure] (together with a retransmission count
 * of 0) for every new connection.
 * NOTE(review): presumably seconds to wait for a reply -- confirm against
 * the documentation of [Rpc_client.configure].
 *)
method query_timeout : float
(* What happens to a connection once no call is in flight on it any
 * longer: a positive value closes it after that many time units (the
 * value is handed to [Unixqueue.once]), 0 closes it immediately, and a
 * negative value keeps it open.
 *)
method idle_timeout : float
end
(* The asynchronous client. Every operation takes the reply callback as
 * first argument (see [async_reply]). The notes below describe the
 * implementation [async_client_impl] in this file, where the callbacks
 * are invoked while the event system runs.
 *)
class type async_client =
object
(* The configuration the client was created with *)
method client_config : client_config
(* [set reply key value ts opts]: sends a [set] request to the server of
 * the bucket of [key]. The server answer is passed on as [`Stored] or
 * [`Not_stored]; an RPC message timeout becomes [`Timeout].
 * NOTE(review): [ts] is presumably the expiration time of the entry --
 * confirm against the cache protocol.
 *)
method set : [`Stored|`Not_stored|`Timeout] async_reply ->
key ->
string ->
timestamp ->
set_options ->
unit
(* [get reply key opts]: looks [key] up on the server of its bucket. The
 * result is [`Found entry], [`Not_found], or [`Timeout] on an RPC message
 * timeout.
 *)
method get : [`Found of entry|`Not_found|`Timeout] async_reply ->
key ->
get_options ->
unit
(* [delete reply key ts opts]: sends a [delete] request for [key]. Every
 * regular server answer is reported as [`Ok].
 * NOTE(review): the meaning of [ts] is decided by the server --
 * confirm against the cache protocol.
 *)
method delete : [`Ok|`Timeout] async_reply ->
key ->
timestamp ->
delete_options ->
unit
(* Sends [clear] to every distinct connector of the configuration. The
 * callback is invoked once, after all servers have answered.
 *)
method clear : [`Ok|`Timeout] async_reply ->
unit
(* Fetches the configuration of every distinct connector of the client
 * configuration, paired with the connector.
 *)
method get_config : [ `Ok of ( Rpc_client.connector * config ) list
| `Timeout
] async_reply ->
unit
(* Sends each configuration of the list to the connector it is paired
 * with. Only the connectors of the list are contacted.
 *)
method set_config : [`Ok|`Timeout] async_reply ->
( Rpc_client.connector * config ) list ->
unit
(* Fetches the statistics of every distinct connector of the client
 * configuration, paired with the connector.
 *)
method get_stats : [ `Ok of ( Rpc_client.connector * stats ) list
| `Timeout
] async_reply ->
unit
(* Sends [clear_counters] to every distinct connector of the
 * configuration.
 *)
method clear_counters : [`Ok|`Timeout] async_reply -> unit
(* Shuts down all RPC clients this object currently holds *)
method shutdown : unit -> unit
end
(* The synchronous client: the same operations as [async_client], but the
 * result is returned directly instead of being passed to a callback. The
 * implementation [sync_client_impl] in this file runs a private event
 * system until the reply has arrived, and re-raises the exception if the
 * operation failed.
 *)
class type sync_client =
object
(* The configuration in effect. For [sync_client_impl] this is a copy of
 * the user configuration whose [idle_timeout] is fixed to a negative
 * value.
 *)
method client_config : client_config
(* [set key value ts opts]: see [async_client # set] *)
method set : key ->
string ->
int64 ->
set_options ->
[`Stored|`Not_stored|`Timeout]
(* [get key opts]: see [async_client # get] *)
method get : key ->
get_options ->
[`Found of entry|`Not_found|`Timeout]
(* [delete key ts opts]: see [async_client # delete] *)
method delete : key ->
int64 ->
delete_options ->
[`Ok|`Timeout]
(* Sends [clear] to all servers of the configuration *)
method clear : unit -> [`Ok|`Timeout]
(* Returns the configurations of all servers, paired with the
 * connectors
 *)
method get_config : unit ->
[ `Ok of ( Rpc_client.connector * config ) list
| `Timeout
]
(* Sends each configuration to the connector it is paired with *)
method set_config : ( Rpc_client.connector * config ) list ->
[`Ok|`Timeout]
(* Returns the statistics of all servers, paired with the connectors *)
method get_stats : unit ->
[ `Ok of ( Rpc_client.connector * stats ) list
| `Timeout
]
(* Sends [clear_counters] to all servers of the configuration *)
method clear_counters : unit -> [`Ok|`Timeout]
(* Shuts down all RPC clients this object currently holds *)
method shutdown : unit -> unit
end
(* [transform_reply pass_reply_to_user transform f_anyway get_reply_from_rpc]
 * adapts an RPC reply for the user callback:
 *
 * - [f_anyway] is always run first.
 * - If [get_reply_from_rpc] returns a value, the user callback gets a
 *   function applying [transform] to this value.
 * - If it raises [Rpc_client.Message_timeout], the user callback gets a
 *   function returning [`Timeout].
 * - If it raises any other exception, the user callback gets a function
 *   raising the same exception again.
 *
 * Only exceptions of [get_reply_from_rpc] are intercepted. Exceptions
 * raised by the user callback itself fall through to the caller.
 *)
let transform_reply pass_reply_to_user transform f_anyway get_reply_from_rpc =
  f_anyway();
  (* Fetch the RPC result first and remember how it ended, so the user
   * callback is invoked outside of any exception handler
   *)
  let outcome =
    try
      `Reply (get_reply_from_rpc())
    with
      | err -> `Failed err in
  match outcome with
    | `Reply rpc_reply ->
        pass_reply_to_user (fun () -> transform rpc_reply)
    | `Failed Rpc_client.Message_timeout ->
        pass_reply_to_user (fun () -> `Timeout)
    | `Failed err ->
        pass_reply_to_user (fun () -> raise err)
;;
(* [handle_rpc_error client pass_reply get_reply_from_rpc] forwards the
 * reply to [pass_reply], but wraps the reply function first: if fetching
 * the reply raises an exception, [client] is shut down before the
 * exception is raised again.
 *)
let handle_rpc_error client pass_reply get_reply_from_rpc =
  let guarded_reply () =
    try
      get_reply_from_rpc()
    with
      | err ->
          Rpc_client.shut_down client;
          raise err
  in
  pass_reply guarded_reply
(* The asynchronous client, attached to the event system [esys].
 *
 * For every connector an RPC client is created on demand and kept in
 * [clients]. The table [client_usage] counts the calls that are in flight
 * per connector, which is the basis of the idle timeout of the
 * configuration.
 *
 * [set], [get] and [delete] raise [Server_not_alive] immediately when
 * [config # is_alive] is false for the bucket of the key. All other
 * results, including exceptions of the RPC layer, are delivered through
 * the reply callback.
 *)
class async_client_impl config esys : async_client =
object(self)
  (* The RPC clients created so far, by connector *)
  val clients = Hashtbl.create 50

  (* Per connector either [`Count n], the number of calls in flight, or
   * [`Unused g] when no call is in flight and the idle timer of the
   * Unixqueue group [g] is running
   *)
  val client_usage =
    (Hashtbl.create 50 :
       ('c, [`Count of int | `Unused of Unixqueue.group] ref) Hashtbl.t)

  method client_config = config

  (* Returns the connector for a bucket number or for a key. If the
   * server is reported as not alive, an existing connection to it is
   * shut down, and [Server_not_alive] is raised.
   *)
  method private connector_of x =
    let modulo = Array.length config#buckets in
    let j =
      match x with
        | `Bucket b -> b
        | #key as k -> bucket_of_hash modulo (hash_of_key k) in
    let conn = (config # buckets).(j) in
    if not (config # is_alive j) then (
      self # disconnect_client conn;
      raise Server_not_alive
    );
    conn

  (* Returns the RPC client for [connector]. If there is none yet, a TCP
   * client is created, configured with the query timeout and without
   * retransmissions, and remembered in [clients].
   *)
  method private client_of_connector connector =
    try
      Hashtbl.find clients connector
    with
      | Not_found ->
          let m =
            `Socket(Rpc.Tcp, connector, Rpc_client.default_socket_config) in
          let c =
            Cache_clnt.Cache.V1.create_client2 ~esys m in
          Rpc_client.configure c 0 config#query_timeout;
          Hashtbl.add clients connector c;
          c

  (* Shuts the client for [connector] down and forgets it. Nothing happens
   * if there is no such client.
   *)
  method private disconnect_client connector =
    try
      let client = Hashtbl.find clients connector in
      Rpc_client.shut_down client;
      Hashtbl.remove clients connector
    with
      | Not_found -> ()

  (* Shuts down all RPC clients. In addition, the idle timers that are
   * pending at this moment are cancelled: their connections are closed
   * here anyway, and a pending timer would only keep the event system
   * busy until it expires. Usage entries that still count calls are
   * kept, because the callbacks of these calls look them up in
   * [decr_usage].
   *)
  method shutdown() =
    Hashtbl.iter
      (fun _ client ->
         Rpc_client.shut_down client
      )
      clients;
    Hashtbl.clear clients;
    (* Collect first, the table must not be modified while iterating *)
    let idle =
      Hashtbl.fold
        (fun connector usage acc ->
           match !usage with
             | `Unused tmo_group -> (connector, tmo_group) :: acc
             | `Count _ -> acc
        )
        client_usage
        [] in
    List.iter
      (fun (connector, tmo_group) ->
         Unixqueue.clear esys tmo_group;
         Hashtbl.remove client_usage connector
      )
      idle

  (* Notes that one more call is in flight on [connector]. A running idle
   * timer is cancelled.
   *)
  method private incr_usage connector =
    try
      let usage = Hashtbl.find client_usage connector in
      ( match !usage with
          | `Count n ->
              usage := `Count(n+1)
          | `Unused tmo_group ->
              Unixqueue.clear esys tmo_group;
              usage := `Count 1
      )
    with
      | Not_found ->
          Hashtbl.add client_usage connector (ref (`Count 1))

  (* Notes that a call on [connector] is finished. When the last call is
   * finished, the idle timeout decides: a positive timeout starts a timer
   * that disconnects the client, a timeout of 0 disconnects at once, and
   * a negative timeout keeps the connection. The trailing unit argument
   * allows it to pass [self # decr_usage connector] around as a function.
   *)
  method private decr_usage connector () =
    try
      let usage = Hashtbl.find client_usage connector in
      ( match !usage with
          | `Count n ->
              assert(n >= 1);
              let n' = n-1 in
              if n' = 0 then (
                let tmo = config#idle_timeout in
                if tmo > 0.0 then (
                  let tmo_group = Unixqueue.new_group esys in
                  usage := `Unused tmo_group;
                  Unixqueue.once esys tmo_group tmo
                    (fun () ->
                       self # disconnect_client connector;
                       Hashtbl.remove client_usage connector
                    )
                )
                else
                  if tmo = 0.0 then
                    (
                      self # disconnect_client connector;
                      Hashtbl.remove client_usage connector
                    )
                  else
                    usage := `Count 0
              )
              else
                usage := `Count n'
          | `Unused _ ->
              assert false
      )
    with
      | Not_found ->
          assert false

  (* Counts a call on [connector] and starts it by running [start]. If
   * [start] raises [Rpc_client.Client_is_down], the call has not been
   * accepted and no reply callback will ever decrement the counter, so
   * the increment is undone before the exception is passed on. Without
   * this, every retry done by [with_client] would leave the counter one
   * too high, and the idle timeout would never close the connection.
   *)
  method private tracked_call connector start =
    self # incr_usage connector;
    try
      start ()
    with
      | Rpc_client.Client_is_down as down ->
          self # decr_usage connector ();
          raise down

  method private connectors =
    (* Return all connectors mentioned in client_config *)
    let l = ref [] in
    for k = 0 to Array.length config#buckets - 1 do
      let c = (config#buckets).(k) in
      if not (List.mem c !l) then l := c :: !l
    done;
    !l

  (* Runs [f connector client] with the RPC client for [connector]. If
   * the client turns out to be down, it is forgotten, and the whole
   * operation is tried again from the event loop with a new client.
   *)
  method private with_client connector f =
    try
      let client = self # client_of_connector connector in
      f connector client
    with
      | Rpc_client.Client_is_down ->
          (* CHECK: Maybe make it configurable what to do in this case *)
          Hashtbl.remove clients connector;
          let g = Unixqueue.new_group esys in
          Unixqueue.once esys g 0.0
            (fun () -> self # with_client connector f)

  (* Stores [v] under the key [k] on the server of the bucket of [k] *)
  method set pass_reply k v ts opts =
    let h = hash_of_key k in
    let modulo = Array.length config#buckets in
    let bucket = bucket_of_hash modulo h in
    self # with_client
      (self # connector_of (`Bucket bucket))
      (fun connector client ->
         self # tracked_call connector
           (fun () ->
              Cache_clnt.Cache.V1.set'async
                client
                (h, bucket, v, ts, opts)
                (handle_rpc_error
                   client
                   (transform_reply
                      pass_reply
                      (function
                         | `stored -> `Stored
                         | `not_stored -> `Not_stored
                      )
                      (self # decr_usage connector)
                   )
                )
           )
      )

  (* Looks the key [k] up on the server of the bucket of [k] *)
  method get pass_reply k opts =
    let h = hash_of_key k in
    let modulo = Array.length config#buckets in
    let bucket = bucket_of_hash modulo h in
    self # with_client
      (self # connector_of (`Bucket bucket))
      (fun connector client ->
         self # tracked_call connector
           (fun () ->
              Cache_clnt.Cache.V1.get'async
                client
                (h, opts)
                (handle_rpc_error
                   client
                   (transform_reply
                      pass_reply
                      (function
                         | `found e -> `Found e
                         | `not_found -> `Not_found
                      )
                      (self # decr_usage connector)
                   )
                )
           )
      )

  (* Sends a delete request for the key [k] to the server of its bucket *)
  method delete pass_reply k ts opts =
    let h = hash_of_key k in
    let modulo = Array.length config#buckets in
    let bucket = bucket_of_hash modulo h in
    self # with_client
      (self # connector_of (`Bucket bucket))
      (fun connector client ->
         self # tracked_call connector
           (fun () ->
              Cache_clnt.Cache.V1.delete'async
                client
                (h, ts, opts)
                (handle_rpc_error
                   client
                   (transform_reply
                      pass_reply
                      (fun () -> `Ok)
                      (self # decr_usage connector)
                   )
                )
           )
      )

  method private multi_invoke : 'arg 'res .
      [ `Ok of (Rpc_client.connector * 'res) list
      | `Timeout
      ] async_reply ->
      (Rpc_client.t -> 'arg -> 'res async_reply -> unit) ->
      (Rpc_client.connector -> 'arg) ->
      Rpc_client.connector list ->
        unit =
    fun pass_reply rpc_call arg_of_conn connectors ->
      (* Call [rpc_call] for every connector in [connectors]. The argument
       * is the result of [arg_of_conn]. The result is accumulated, and
       * passed back using [pass_reply] when all replies have arrived.
       *
       * A timeout of any server turns the overall result into [`Timeout],
       * and any other error makes the reply function raise this error.
       * In both cases the affected client is shut down.
       *)
      let result = ref (`Ok []) in
      let n = ref 0 in    (* number of replies that are still missing *)
      match connectors with
        | [] ->
            (* No call is started, so no RPC callback would ever pass a
             * reply back. Answer with the empty result. This is done from
             * the event loop, like all other replies.
             *)
            let g = Unixqueue.new_group esys in
            Unixqueue.once esys g 0.0
              (fun () -> pass_reply (fun () -> `Ok []))
        | _ :: _ ->
            List.iter
              (fun connector ->
                 self # with_client
                   connector
                   (fun connector client ->
                      self # tracked_call connector
                        (fun () ->
                           rpc_call
                             client
                             (arg_of_conn connector)
                             (fun get_reply ->
                                decr n;
                                self # decr_usage connector ();
                                ( try
                                    let r = get_reply() in
                                    ( match !result with
                                        | `Ok l ->
                                            result := `Ok ((connector, r) :: l)
                                        | _ -> ()
                                    )
                                  with
                                    | Rpc_client.Message_timeout ->
                                        ( match !result with
                                            | `Ok _ -> result := `Timeout
                                            | _ -> ()
                                        );
                                        Rpc_client.shut_down client
                                    | error ->
                                        result := `Error error;
                                        Rpc_client.shut_down client
                                );
                                if !n = 0 then
                                  pass_reply
                                    (fun () ->
                                       match !result with
                                         | `Error e -> raise e
                                         | (`Ok _|`Timeout) as regular ->
                                             regular
                                    )
                             )
                        )
                   );
                 incr n
              )
              connectors

  (* Sends [clear] to all connectors of the configuration *)
  method clear pass_reply =
    self # multi_invoke
      (transform_reply
         pass_reply
         (function
            | `Ok _ -> `Ok
            | `Timeout -> `Timeout
         )
         (fun () -> ())
      )
      Cache_clnt.Cache.V1.clear'async
      (fun connector -> ())
      (self # connectors)

  (* Sends [clear_counters] to all connectors of the configuration *)
  method clear_counters pass_reply =
    self # multi_invoke
      (transform_reply
         pass_reply
         (function
            | `Ok _ -> `Ok
            | `Timeout -> `Timeout
         )
         (fun () -> ())
      )
      Cache_clnt.Cache.V1.clear_counters'async
      (fun connector -> ())
      (self # connectors)

  (* Gets the server configurations of all connectors *)
  method get_config pass_reply =
    self # multi_invoke
      pass_reply
      Cache_clnt.Cache.V1.get_config'async
      (fun connector -> ())
      (self # connectors)

  (* Gets the statistics of all connectors *)
  method get_stats pass_reply =
    self # multi_invoke
      pass_reply
      Cache_clnt.Cache.V1.get_stats'async
      (fun connector -> ())
      (self # connectors)

  (* Sends every configuration of [new_configs] to the connector it is
   * paired with. Only these connectors are contacted.
   *)
  method set_config pass_reply new_configs =
    self # multi_invoke
      (transform_reply
         pass_reply
         (function
            | `Ok _ -> `Ok
            | `Timeout -> `Timeout
         )
         (fun () -> ())
      )
      Cache_clnt.Cache.V1.set_config'async
      (fun connector -> List.assoc connector new_configs)
      (List.map fst new_configs)
end
(* [create_async_client config esys] creates an asynchronous client for
 * the client configuration [config] that is attached to the event system
 * [esys].
 *)
let create_async_client config esys =
  new async_client_impl config esys
(* [synchronize esys f_async] turns an asynchronous operation into a
 * synchronous one: [f_async] is started with a callback that only stores
 * the reply function, then [esys] is run until it has nothing to do any
 * more, and finally the stored reply function is called. This returns the
 * result or raises the exception of the operation. If the callback was
 * never invoked, the initial reply function is still in place, and the
 * [assert false] in it fails.
 *)
let synchronize esys f_async =
  let stored_reply = ref (fun () -> assert false) in
  let remember reply_fn = stored_reply := reply_fn in
  f_async remember;
  Unixqueue.run esys;
  let reply_fn = !stored_reply in
  reply_fn ()
(* The synchronous client. It is a wrapper around an [async_client_impl]
 * that lives in an event system of its own. Every method starts the
 * asynchronous operation and uses [synchronize] to run the event system
 * until the reply is available.
 *
 * The user configuration is copied, and the idle timeout of the copy is
 * fixed to a negative value, so idle connections stay open.
 *)
class sync_client_impl config : sync_client =
  let sync_config =
    ( object
        method buckets = config # buckets
        method query_timeout = config # query_timeout
        method idle_timeout = (-1.0)  (* Not supported by the sync client *)
        method is_alive = config # is_alive
      end
    ) in
  let esys = Unixqueue.create_unix_event_system() in
  let ac = new async_client_impl sync_config esys in
object
  method client_config = sync_config

  method set k v ts opts =
    synchronize esys (fun reply -> ac # set reply k v ts opts)

  method get k opts =
    synchronize esys (fun reply -> ac # get reply k opts)

  method delete k ts opts =
    synchronize esys (fun reply -> ac # delete reply k ts opts)

  method clear () =
    synchronize esys (fun reply -> ac # clear reply)

  method get_config() =
    synchronize esys (fun reply -> ac # get_config reply)

  method set_config new_configs =
    synchronize esys (fun reply -> ac # set_config reply new_configs)

  method get_stats() =
    synchronize esys (fun reply -> ac # get_stats reply)

  method clear_counters() =
    synchronize esys (fun reply -> ac # clear_counters reply)

  method shutdown() =
    ac # shutdown()
end
(* [create_sync_client config] creates a synchronous client for the
 * client configuration [config].
 *)
let create_sync_client config =
  new sync_client_impl config