(* $Id: cache_client.mli 15019 2007-11-09 18:54:46Z gerd $ *)
(** Client to access cache servers *)
(** {2 Theory of operation} *)
(** {b Buckets.} The client can be used to access one or several cache servers.
 * The whole cache store is divided into so-called buckets. The user
 * is free to
*
* - use only a single bucket on a single server
* - put every bucket on a different server
* - even have multiple buckets per server
*
* This scheme is primarily intended for distributing the whole
* store over multiple servers. Having multiple buckets per server
 * gives some freedom for reorganizing the distribution scheme (e.g.
 * adding servers later). The library determines which
* key lives on which server by computing the [e_bucket] component
* of the entries. Functions for doing this can be found in
* {!Cache_util}:
*
* {[
* let e_key = Cache_util.hash_of_key k in
* let e_bucket = Cache_util.bucket_of_hash n e_key
* ]}
*
* The number [n] is the number of buckets.
*)
(** {b Expiration.} Entries can have a timestamp that says when they
 * are removed from the cache. It is ensured that the [get] operation never
 * returns an expired entry, provided that client and server have the same clock.
 * Independent of expiration by time, the cache may also expire entries
 * because it becomes too large. In that case the cache applies LRU semantics:
 * the least recently used entries are removed first. Any [get]
 * or [set] operation counts as a "use".
 *)
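(** As a small sketch (not part of the interface), an expiration timestamp
 * lying five minutes in the future can be derived from the system clock;
 * the helper name [expire_in] is made up for this example:
 *
 * {[
 * (* Hypothetical helper: a timestamp [seconds] from now *)
 * let expire_in seconds =
 *   Int64.add (Int64.of_float (Unix.time ())) (Int64.of_int seconds)
 *
 * let t_exp = expire_in 300    (* expire in five minutes *)
 * ]}
 *
 * Passing [0L] as expiration timestamp means that the entry never expires
 * by time (see [e_expiration] below).
 *)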
(** {b Deletion.} There is a third mechanism for removing
* entries: One can also actively delete entries. The [delete] operation
* takes a delete timestamp as argument. If this timestamp is in the past
* (e.g. 0), the deletion is performed immediately. Otherwise the entry
* is scheduled for deletion at the given time. The [e_delete_flag] says
* whether such a deletion will happen.
*
 * In order to coordinate parallel accesses to the cache, all entries
 * that are scheduled for deletion are locked against overwriting: any
 * [set] operation on them is refused by default. You can break this lock by
 * setting the [opt_undelete] option. In this case, [set] not only overwrites
 * the entry but also cancels the scheduled deletion.
*
 * The [delete] operation effectively has a higher precedence than the
 * [set] operation unless you specify [opt_undelete]. This makes [delete]
 * different from setting a new expiration timestamp in the entry by
 * calling [set]. The higher precedence is useful if you have new knowledge
 * about the validity period of entries and want to prevent the regular use
 * of [set] from interfering with the restricted validity period.
*
* Note that a scheduled deletion does not prevent the entry from expiring
* because of the expiration timestamp or because of exceeding the maximum
* size of the cache.
*)
(** {b Atomicity.} The operations [set] and [delete] are either fully
 * conducted or not conducted at all. For example, if you call [set]
* with [opt_setifchanged], there are only two outcomes:
*
* - The [set] operation is done: The [value] is overwritten, the
* new modification timestamp is set, the expiration timestamp is
* set to the passed value, and any delete flag is removed.
* - The [set] operation is not done: The entry is not modified at all.
*
 * Because [opt_setifchanged] restricts [set] to value changes, the
 * modification and expiration timestamps are implicitly only updated
 * when the value is distinct from the previous one.
*)
(** {2 Interface} *)
type key =
[ `Hash of Digest.t (* Provide the hash value *)
| `String of string (* Use the hash value of this string as key *)
]
(** Keys of hash entries can be given in two ways:
* - [`Hash dg]: As digest of a string
* - [`String s]: As string (to be digested)
*)
type timestamp = int64
(** The time in seconds since the epoch *)
type entry = Cache_aux.entry =
{ mutable e_key : Digest.t; (** The key in hashed form *)
mutable e_creation : timestamp; (** Time of first addition to the cache *)
mutable e_modification : timestamp; (** Time of last modification in the cache *)
mutable e_expiration : timestamp; (** Time when the entry will expire; 0 = never *)
mutable e_delete_flag : bool; (** Whether this entry is scheduled for deletion *)
mutable e_value : string; (** The value of the entry, an arbitrary string *)
mutable e_value_hash : Digest.t; (** The MD5 sum of the value *)
mutable e_value_length : int; (** The length of the value *)
mutable e_counter : int; (** The number of [get] accesses to this entry *)
mutable e_bucket : int; (** The bucket number of this entry *)
}
(** Entries are the primary contents of the cache *)
type set_options = Cache_aux.set_options =
{
mutable opt_overwrite : bool; (** Allow overwriting existing entries *)
mutable opt_add : bool; (** Allow adding new entries *)
mutable opt_undelete : bool; (** Allow the modification of entries
* that are scheduled for deletion. Such entries are revived, and
 * no longer count as deleted
*)
mutable opt_setifchanged : bool; (** Modifies [opt_overwrite]:
* Overwriting is only permitted if the new value is distinct from
* the old one
*)
}
(** Modifies the [set] operation *)
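(** For illustration, a [set_options] record that adds or overwrites an
 * entry, but skips the write when the value is unchanged (a sketch,
 * assuming the module is opened as [Cache_client] by the caller):
 *
 * {[
 * open Cache_client
 *
 * let opts : set_options =
 *   { opt_overwrite = true;        (* existing entries may be replaced *)
 *     opt_add = true;              (* missing entries may be created *)
 *     opt_undelete = false;        (* entries scheduled for deletion stay locked *)
 *     opt_setifchanged = true;     (* only write if the value differs *)
 *   }
 * ]}
 *)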
type get_options = Cache_aux.get_options =
{
mutable opt_novalue : bool; (** Do not return the [e_value]
* component of entries, but an empty string as replacement (for
* getting meta data only)
*)
mutable opt_getifmodifiedsince :
timestamp option; (** When [Some t]: Only return
* entries that have [e_modification >= t]. When [None]: No
* restriction.
*)
mutable opt_getifnotmd5 :
Digest.t option; (** When [Some dg]: Only return
* entries when [e_value_hash <> dg]. When [None]: No restriction.
*)
}
(** Modifies the [get] operation *)
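(** For illustration, a [get_options] record for a conditional fetch that
 * only transfers the value when it differs from a copy the caller already
 * has (a sketch; [cached_value] stands for whatever was kept locally):
 *
 * {[
 * open Cache_client
 *
 * let cached_value = "old value"       (* the copy the caller kept locally *)
 *
 * let gopts : get_options =
 *   { opt_novalue = false;             (* also return e_value *)
 *     opt_getifmodifiedsince = None;   (* no timestamp restriction *)
 *     opt_getifnotmd5 = Some (Digest.string cached_value)
 *       (* only return the entry when its value hash differs *)
 *   }
 * ]}
 *)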
type delete_options = Cache_aux.delete_options =
{
mutable opt_strictlock : bool; (** Forces that the entry is scheduled
 * for deletion even if it does not exist. Unless
 * [opt_undelete] is given, a [set] operation cannot succeed
 * for the key while the entry is in the delete queue, i.e. the
 * key is locked until the deletion is actually performed.
*)
mutable opt_delifolderthan :
timestamp option; (** When [Some t]: It is required
* that [e_modification < t] in order to schedule an entry for
* deletion. When [None]: no such requirement.
*)
mutable opt_delifmd5 :
Digest.t option; (** When [Some dg]: It is required
* that [e_value_hash = dg] in order to schedule an entry for
* deletion. When [None]: no such requirement.
*)
mutable opt_delifnotmd5 :
Digest.t option; (** When [Some dg]: It is required
* that [e_value_hash <> dg] in order to schedule an entry for
* deletion. When [None]: no such requirement.
*)
}
(** Modifies the [delete] operation *)
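(** For illustration, a [delete_options] record that only schedules the
 * deletion if the entry has not been modified within the last hour
 * (a sketch):
 *
 * {[
 * open Cache_client
 *
 * let one_hour_ago =
 *   Int64.sub (Int64.of_float (Unix.time ())) 3600L
 *
 * let dopts : delete_options =
 *   { opt_strictlock = false;                 (* do not lock non-existing keys *)
 *     opt_delifolderthan = Some one_hour_ago; (* require e_modification < t *)
 *     opt_delifmd5 = None;
 *     opt_delifnotmd5 = None;
 *   }
 * ]}
 *)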
type config = Cache_aux.config =
{
mutable max_size : int64; (** The maximum size in bytes *)
mutable save_cache_period : int; (** How often to save the cache to
 * disk, in seconds. A zero or negative value disables saving
* to disk entirely
*)
mutable save_cache_speed : int; (** How fast to save to disk in
* bytes per second. Setting this to a reasonable value increases
* the responsiveness of the cache server while a save operation
* is in progress. Setting to 0 means as fast as possible.
*)
}
(** Settings for the cache server *)
type stats = Cache_aux.stats =
{
mutable num_entries : int; (** The number of entries in the
* cache, including entries scheduled for deletion.
*)
mutable num_bytes : int64; (** The size of the cache in bytes,
* excluding overhead space
*)
mutable num_calls_get : int; (** The number of [get] calls *)
mutable num_calls_set : int; (** The number of [set] calls *)
mutable num_calls_delete : int; (** The number of [delete] calls *)
mutable num_hits : int; (** The number of [get] calls
* returning [`Found].
*)
mutable counters_reset_on :
timestamp; (** When the counters were last reset *)
}
(** Statistics of a cache server *)
type 'a async_reply =
((unit -> 'a) -> unit)
(** Type of the callback that receives the result of an asynchronous call.
 * A typical example of such a callback function is
* {[ let process_result get_result =
* try
* let result = get_result() in (* May raise exception *)
* ...
* with error -> ...
* ]}
*)
exception Server_not_alive
(** Raised by RPC calls if the config returns [false] for the
* [is_alive] test.
*)
(** Configures the RPC client. *)
class type client_config =
object
method buckets : Rpc_client.connector array
(** For every bucket [i=0..n-1] the array element [buckets.(i)] contains the
* address of the server storing this bucket. It is allowed to use the
* same server for several buckets.
*
 * This array must be non-empty. If you only use one bucket on one server,
 * just return a one-element array.
*)
method is_alive : int -> bool
(** Before a connection to an RPC server is used, this function is
 * called with the bucket index [i]. If it returns [true] the server
 * is assumed to be alive. Otherwise, the exception [Server_not_alive]
 * is raised immediately. This feature is useful for checking the
 * liveness of servers proactively, e.g. by pinging them at regular
 * intervals. If this feature is not needed, just return [true] for
 * every [i].
 *
 * Implementation restriction: this works only for the RPC calls
 * [get], [set], and [delete], but not for the configuration and
 * statistics calls.
 *)
method query_timeout : float
(** After this number of seconds the queries time out and reply
 * [`Timeout]. [(-1.0)] disables this feature.
*)
method idle_timeout : float
(** After this number of seconds idle TCP connections are closed *)
end
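(** A [client_config] can be given as an immediate object. The following
 * sketch distributes two buckets over two servers; host names, port, and
 * timeout values are placeholders, and the [Rpc_client.Inet (host, port)]
 * connector is assumed to be available (as provided by Ocamlnet):
 *
 * {[
 * let config : Cache_client.client_config =
 *   object
 *     method buckets =
 *       [| Rpc_client.Inet ("cache1.example.net", 4444);    (* bucket 0 *)
 *          Rpc_client.Inet ("cache2.example.net", 4444) |]  (* bucket 1 *)
 *     method is_alive _ = true          (* no proactive liveness check *)
 *     method query_timeout = 5.0        (* give up after 5 seconds *)
 *     method idle_timeout = 60.0        (* close idle TCP connections after 1 min *)
 *   end
 * ]}
 *)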
(** Client access to cache by asynchronous RPC calls *)
class type async_client =
object
method client_config : client_config
(** The configuration of this client *)
method set : [`Stored|`Not_stored|`Timeout] async_reply ->
key ->
string ->
timestamp ->
set_options ->
unit
(** [set r k v t_exp opts]: Tries to set key [k] to value [v] with
 * expiration timestamp [t_exp]. The options [opts] determine which
* kind of set operation is allowed. The result can be [`Stored]
* meaning a successful modification, [`Not_stored] meaning the
* refusal of a modification, or [`Timeout] meaning a timeout
* contacting the server. When the result is available, the
* function [r] is called back.
*)
method get : [`Found of entry|`Not_found|`Timeout] async_reply ->
key ->
get_options ->
unit
(** [get r k opts]: Tries to get the entry for key [k]. The options
* [opts] determine which entries can be retrieved in this way.
* The result can be [`Found e] with the entry [e], [`Not_found]
* if the key cannot be located or the options prevent the entry
* from being returned, or [`Timeout] if the server is not reachable.
* When the result is available, the
* function [r] is called back.
*)
method delete : [`Ok|`Timeout] async_reply ->
key ->
timestamp ->
delete_options ->
unit
(** [delete r k t_del opts]: Tries to move the entry for key [k] from
* the regular store to the delete queue where it is marked as being
* scheduled for deletion at time [t_del]. If [t_del] is in the past,
* the deletion is conducted immediately. If the entry does not
* qualify for deletion because of a failed requirement in [opts],
 * it is silently not moved to the delete queue; the caller is not
 * informed of this. The result is [`Ok] meaning the server could be contacted,
* or [`Timeout] meaning a timeout happened.
* When the result is available, the
* function [r] is called back.
*)
method clear : [`Ok|`Timeout] async_reply ->
unit
(** [clear r]: Delete all entries. The result is [`Ok] meaning the
* deletion was successful, or [`Timeout] meaning a timeout happened.
* When the result is available, the
* function [r] is called back.
*)
method get_config : [ `Ok of ( Rpc_client.connector * config ) list
| `Timeout
] async_reply ->
unit
(** [get_config r]: Return the connector and the server config for
* all servers as list [l]. The connector may serve as identifier
* for the server. The result is [`Ok l] meaning the
* config of all servers could be retrieved, or [`Timeout] meaning a
* timeout happened for at least one server.
* When the result is available, the
* function [r] is called back.
*)
method set_config : [`Ok|`Timeout] async_reply ->
( Rpc_client.connector * config ) list ->
unit
(** [set_config r l]: Set the server config for the servers identified
 * by connectors in the list [l]. The result is [`Ok] meaning the
 * config of all named servers could be set, or [`Timeout] meaning a
* timeout happened for at least one server.
* When the result is available, the
* function [r] is called back.
*)
method get_stats : [ `Ok of ( Rpc_client.connector * stats ) list
| `Timeout
] async_reply ->
unit
(** [get_stats r]: Return the connector and the statistics record for
* all servers as list [l]. The connector may serve as identifier
* for the server. The result is [`Ok l] meaning the
 * statistics of all servers could be retrieved, or [`Timeout] meaning a
* timeout happened for at least one server.
* When the result is available, the
* function [r] is called back.
*)
method clear_counters : [`Ok|`Timeout] async_reply -> unit
(** [clear_counters r]: Reset the statistics counters for all servers.
* The result is [`Ok] meaning the counters could be reset on all
* servers, or [`Timeout] meaning a timeout happened for at least one
* server.
* When the result is available, the
* function [r] is called back.
*)
method shutdown : unit -> unit
(** Shutdown all network connections. If further RPC calls are requested,
* new network connections are opened.
*)
end
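(** A sketch of an asynchronous [set] call. It assumes that [config] is the
 * [client_config] from the sketch above, that Ocamlnet's
 * [Unixqueue.create_unix_event_system] and [Unixqueue.run] are available,
 * and that key and value are placeholders. The call is only executed once
 * the event system is run:
 *
 * {[
 * open Cache_client
 *
 * let () =
 *   let esys = Unixqueue.create_unix_event_system () in
 *   let client = create_async_client config esys in
 *   let opts : set_options =
 *     { opt_overwrite = true; opt_add = true;
 *       opt_undelete = false; opt_setifchanged = false } in
 *   client # set
 *     (fun get_result ->
 *        try
 *          (match get_result () with
 *           | `Stored     -> print_endline "stored"
 *           | `Not_stored -> print_endline "refused"
 *           | `Timeout    -> print_endline "timeout")
 *        with error ->
 *          prerr_endline ("cache error: " ^ Printexc.to_string error))
 *     (`String "greeting")
 *     "hello world"
 *     0L          (* 0 = the entry never expires by time *)
 *     opts;
 *   Unixqueue.run esys   (* the RPC call is only executed by the event loop *)
 * ]}
 *)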
(** Client access to cache by synchronous RPC calls *)
class type sync_client =
object
method client_config : client_config
(** The configuration of this client *)
method set : key ->
string ->
timestamp ->
set_options ->
[`Stored|`Not_stored|`Timeout]
(** [set k v t_exp opts]: Tries to set key [k] to value [v] with
 * expiration timestamp [t_exp]. The options [opts] determine which
* kind of set operation is allowed. The result can be [`Stored]
* meaning a successful modification, [`Not_stored] meaning the
* refusal of a modification, or [`Timeout] meaning a timeout
* contacting the server.
*)
method get : key ->
get_options ->
[`Found of entry|`Not_found|`Timeout]
(** [get k opts]: Tries to get the entry for key [k]. The options
* [opts] determine which entries can be retrieved in this way.
* The result can be [`Found e] with the entry [e], [`Not_found]
* if the key cannot be located or the options prevent the entry
* from being returned, or [`Timeout] if the server is not reachable.
*)
method delete : key ->
timestamp ->
delete_options ->
[`Ok|`Timeout]
(** [delete k t_del opts]: Tries to move the entry for key [k] from
* the regular store to the delete queue where it is marked as being
* scheduled for deletion at time [t_del]. If [t_del] is in the past,
* the deletion is conducted immediately. If the entry does not
* qualify for deletion because of a failed requirement in [opts],
 * it is silently not moved to the delete queue; the caller is not
 * informed of this. The result is [`Ok] meaning the server could be contacted,
* or [`Timeout] meaning a timeout happened.
*)
method clear : unit -> [`Ok|`Timeout]
(** [clear ()]: Delete all entries. The result is [`Ok] meaning the
* deletion was successful, or [`Timeout] meaning a timeout happened.
*)
method get_config : unit ->
[ `Ok of ( Rpc_client.connector * config ) list
| `Timeout
]
(** [get_config ()]: Return the connector and the server config for
* all servers as list [l]. The connector may serve as identifier
* for the server. The result is [`Ok l] meaning the
* config of all servers could be retrieved, or [`Timeout] meaning a
* timeout happened for at least one server.
*)
method set_config : ( Rpc_client.connector * config ) list ->
[`Ok|`Timeout]
(** [set_config l]: Set the server config for the servers identified
 * by connectors in the list [l]. The result is [`Ok] meaning the
* config of all named servers could be set, or [`Timeout] meaning a
* timeout happened for at least one server.
*)
method get_stats : unit ->
[ `Ok of ( Rpc_client.connector * stats ) list
| `Timeout
]
(** [get_stats ()]: Return the connector and the statistics record for
* all servers as list [l]. The connector may serve as identifier
* for the server. The result is [`Ok l] meaning the
 * statistics of all servers could be retrieved, or [`Timeout] meaning a
* timeout happened for at least one server.
*)
method clear_counters : unit -> [`Ok|`Timeout]
(** [clear_counters ()]: Reset the statistics counters for all servers.
* The result is [`Ok] meaning the counters could be reset on all
* servers, or [`Timeout] meaning a timeout happened for at least one
* server.
*)
method shutdown : unit -> unit
(** Shutdown all network connections. If further RPC calls are requested,
* new network connections are opened.
*)
end
val create_async_client : client_config -> Unixqueue.event_system -> async_client
(** Create an asynchronous client for the passed config and the passed
* event system. Note that you must [Unixqueue.run] the event system in
* order to conduct the requested operations.
*)
val create_sync_client : client_config -> sync_client
(** Create a synchronous client for the passed config *)
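(** A sketch of a complete synchronous round trip, assuming the
 * [Rpc_client.Inet (host, port)] connector (as provided by Ocamlnet);
 * host, port, key, and value are placeholders:
 *
 * {[
 * open Cache_client
 *
 * let () =
 *   (* A single bucket on a single server *)
 *   let config : client_config =
 *     object
 *       method buckets = [| Rpc_client.Inet ("localhost", 4444) |]
 *       method is_alive _ = true
 *       method query_timeout = 5.0
 *       method idle_timeout = 60.0
 *     end in
 *   let client = create_sync_client config in
 *   let sopts : set_options =
 *     { opt_overwrite = true; opt_add = true;
 *       opt_undelete = false; opt_setifchanged = false } in
 *   ( match client # set (`String "greeting") "hello world" 0L sopts with
 *     | `Stored     -> ()
 *     | `Not_stored -> prerr_endline "set refused"
 *     | `Timeout    -> prerr_endline "set timed out" );
 *   let gopts : get_options =
 *     { opt_novalue = false;
 *       opt_getifmodifiedsince = None;
 *       opt_getifnotmd5 = None } in
 *   ( match client # get (`String "greeting") gopts with
 *     | `Found e    -> print_endline e.e_value
 *     | `Not_found  -> print_endline "not found"
 *     | `Timeout    -> prerr_endline "get timed out" );
 *   client # shutdown ()
 * ]}
 *)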