(* $Id$ *) (** Client to access cache servers *) (** {2 Theory of operation} *) (** {b Buckets.} The client can be used to access one or several cache servers. * The whole cache store is divided up in so-called buckets. The user * is free to * * - use only a single bucket on a single server * - put every bucket on a different server * - even have multiple buckets per server * * This scheme is primarily intended for distributing the whole * store over multiple servers. Having multiple buckets per server * gives some freedom for reorganizing the distribution scheme (e.g. * add servers later). The library determines which * key lives on which server by computing the [e_bucket] component * of the entries. Functions for doing this can be found in * {!Cache_util}: * * {[ * let e_key = Cache_util.hash_of_key k in * let e_bucket = Cache_util.bucket_of_hash n e_key * ]} * * The number [n] is the number of buckets. *) (** {b Expiration.} Entries can have a timestamp that says when the entries * are removed from the cache. It is ensured that the [get] operation never * returns an expired entry when both client and server have the same clock. * Independent of expiration by time, the cache may also expire entries * because the cache becomes too large. The cache implements a LRU semantics: * The least recently used entries are removed first. As "use" any [get] * or [set] operation qualifies. *) (** {b Deletion.} There is a third mechanism for removing * entries: One can also actively delete entries. The [delete] operation * takes a delete timestamp as argument. If this timestamp is in the past * (e.g. 0), the deletion is performed immediately. Otherwise the entry * is scheduled for deletion at the given time. The [e_delete_flag] says * whether such a deletion will happen. * * In order to coordinate parallel accesses to the cache, all entries * that are scheduled for deletion are locked for overwriting. Any * [set] operation is refused by default. 
You can break this lock by * setting the [opt_undelete] option. In this case, [set] not only overwrites * but also cancels the scheduled deletion. * * The [delete] operation has effectively a higher precedence than the * [set] operation unless you specify [opt_undelete]. This makes [delete] * different from setting a new expiration timestamp in the entry by * calling [set]. The higher precedence is useful if you have new knowledge * about the validity period of entries, and you want to prevent that * restricting the validity period interferes with the regular use of [set]. * * Note that a scheduled deletion does not prevent the entry from expiring * because of the expiration timestamp or because of exceeding the maximum * size of the cache. *) (** {b Atomicity.} The operations [set] and [delete] are either fully * conducted, or not conducted at all. For example, if you call [set] * with [opt_setifchanged], there are only two outcomes: * * - The [set] operation is done: The [value] is overwritten, the * new modification timestamp is set, the expiration timestamp is * set to the passed value, and any delete flag is removed. * - The [set] operation is not done: The entry is not modified at all. * * As [opt_setifchanged] causes [set] to be performed only for * value changes, you get implicitly the effect of only setting * the modification and expiration timestamps when the value is * distinct from the previous one. 
*) (** {2 Interface} *) type key = [ `Hash of Digest.t (* Provide the hash value *) | `String of string (* Use the hash value of this string as key *) ] (** Keys of hash entries can be given in two ways: * - [`Hash dg]: As digest of a string * - [`String s]: As string (to be digested) *) type timestamp = int64 (** The time in seconds since the epoch *) type entry = Cache_aux.entry = { mutable e_key : Digest.t; (** The key in hashed form *) mutable e_creation : timestamp; (** Time of first addition to the cache *) mutable e_modification : timestamp; (** Time of last modification in the cache *) mutable e_expiration : timestamp; (** Time when the entry will expire; 0 = never *) mutable e_delete_flag : bool; (** Whether this entry is scheduled for deletion *) mutable e_value : string; (** The value of the entry, an arbitrary string *) mutable e_value_hash : Digest.t; (** The MD5 sum of the value *) mutable e_value_length : int; (** The length of the value *) mutable e_counter : int; (** The number of [get] accesses to this entry *) mutable e_bucket : int; (** The bucket number of this entry *) } (** Entries are the primary contents of the cache *) type set_options = Cache_aux.set_options = { mutable opt_overwrite : bool; (** Allow overwriting existing entries *) mutable opt_add : bool; (** Allow adding new entries *) mutable opt_undelete : bool; (** Allow the modification of entries * that are scheduled for deletion. 
Such entries are revived, and * count no longer as deleted *) mutable opt_setifchanged : bool; (** Modifies [opt_overwrite]: * Overwriting is only permitted if the new value is distinct from * the old one *) } (** Modifies the [set] operation *) type get_options = Cache_aux.get_options = { mutable opt_novalue : bool; (** Do not return the [e_value] * component of entries, but an empty string as replacement (for * getting meta data only) *) mutable opt_getifmodifiedsince : timestamp option; (** When [Some t]: Only return * entries that have [e_modification >= t]. When [None]: No * restriction. *) mutable opt_getifnotmd5 : Digest.t option; (** When [Some dg]: Only return * entries when [e_value_hash <> dg]. When [None]: No restriction. *) } (** Modifies the [get] operation *) type delete_options = Cache_aux.delete_options = { mutable opt_strictlock : bool; (** Forces that the entry is even * scheduled for deletion if it does not exist. Unless * [opt_undelete] is given, a [set] operation cannot be successful * for the key while the entry is in the delete queue, i.e. the * key is locked until the deletion is really conducted. *) mutable opt_delifolderthan : timestamp option; (** When [Some t]: It is required * that [e_modification < t] in order to schedule an entry for * deletion. When [None]: no such requirement. *) mutable opt_delifmd5 : Digest.t option; (** When [Some dg]: It is required * that [e_value_hash = dg] in order to schedule an entry for * deletion. When [None]: no such requirement. *) mutable opt_delifnotmd5 : Digest.t option; (** When [Some dg]: It is required * that [e_value_hash <> dg] in order to schedule an entry for * deletion. When [None]: no such requirement. *) } (** Modifies the [delete] operation *) type config = Cache_aux.config = { mutable max_size : int64; (** The maximum size in bytes *) mutable save_cache_period : int; (** Save the cache to disk every * this number of seconds. 
0 or negative number disables saving * to disk entirely *) mutable save_cache_speed : int; (** How fast to save to disk in * bytes per second. Setting this to a reasonable value increases * the responsiveness of the cache server while a save operation * is in progress. Setting to 0 means as fast as possible. *) } (** Settings for the cache server *) type stats = Cache_aux.stats = { mutable num_entries : int; (** The number of entries in the * cache, including entries scheduled for deletion. *) mutable num_bytes : int64; (** The size of the cache in bytes, * excluding overhead space *) mutable num_calls_get : int; (** The number of [get] calls *) mutable num_calls_set : int; (** The number of [set] calls *) mutable num_calls_delete : int; (** The number of [delete] calls *) mutable num_hits : int; (** The number of [get] calls * returning [`Found]. *) mutable counters_reset_on : timestamp; (** When the counters were last reset *) } (** Statistics of a cache server *) type 'a async_reply = ((unit -> 'a) -> unit) (** Result type of an asynchronous call. A typical example of such * a callback function is * {[ let process_result get_result = * try * let result = get_result() in (* May raise exception *) * ... * with error -> ... * ]} *) exception Server_not_alive (** Raised by RPC calls if the config returns [false] for the * [is_alive] test. *) (** Configures the RPC client. *) class type client_config = object method buckets : Rpc_client.connector array (** For every bucket [i=0..n-1] the array element [buckets.(i)] contains the * address of the server storing this bucket. It is allowed to use the * same server for several buckets. * * This array must be non-empty. If you only use one bucket/one server * just return a one-element array. *) method is_alive : int -> bool (** Before a connection to an RPC server is used, this function is called with the bucket index [i]. If it returns [true] the server is assumed to be alive. 
Otherwise, the exception [Server_not_alive] is raised immediately. This feature is useful for checking the liveliness of servers proactively, e.g. by pinging them in regular intervals. If this feature is not needed, just return [true] for every [i]. Implementation restriction: works only for the RPC calls [get], [set], [delete], but not for the configuration and statistics calls. *) method query_timeout : float (** After this number of seconds the queries time out and reply * [`Timeout]. [(-1.0)] disables this feature. *) method idle_timeout : float (** After this number of seconds idle TCP connections are closed *) end (** Client access to cache by asynchronous RPC calls *) class type async_client = object method client_config : client_config (** The configuration of this client *) method set : [`Stored|`Not_stored|`Timeout] async_reply -> key -> string -> timestamp -> set_options -> unit (** [set r k v t_exp opts]: Tries to set key [k] to value [v] with * expiration timestamp [t_exp]. The options [opts] determine which * kind of set operation is allowed. The result can be [`Stored] * meaning a successful modification, [`Not_stored] meaning the * refusal of a modification, or [`Timeout] meaning a timeout * contacting the server. When the result is available, the * function [r] is called back. *) method get : [`Found of entry|`Not_found|`Timeout] async_reply -> key -> get_options -> unit (** [get r k opts]: Tries to get the entry for key [k]. The options * [opts] determine which entries can be retrieved in this way. * The result can be [`Found e] with the entry [e], [`Not_found] * if the key cannot be located or the options prevent the entry * from being returned, or [`Timeout] if the server is not reachable. * When the result is available, the * function [r] is called back. 
*) method delete : [`Ok|`Timeout] async_reply -> key -> timestamp -> delete_options -> unit (** [delete r k t_del opts]: Tries to move the entry for key [k] from * the regular store to the delete queue where it is marked as being * scheduled for deletion at time [t_del]. If [t_del] is in the past, * the deletion is conducted immediately. If the entry does not * qualify for deletion because of a failed requirement in [opts], * it is silently not moved to the delete queue, without telling the * caller. The result is [`Ok] meaning the server could be contacted, * or [`Timeout] meaning a timeout happened. * When the result is available, the * function [r] is called back. *) method clear : [`Ok|`Timeout] async_reply -> unit (** [clear r]: Delete all entries. The result is [`Ok] meaning the * deletion was successful, or [`Timeout] meaning a timeout happened. * When the result is available, the * function [r] is called back. *) method get_config : [ `Ok of ( Rpc_client.connector * config ) list | `Timeout ] async_reply -> unit (** [get_config r]: Return the connector and the server config for * all servers as list [l]. The connector may serve as identifier * for the server. The result is [`Ok l] meaning the * config of all servers could be retrieved, or [`Timeout] meaning a * timeout happened for at least one server. * When the result is available, the * function [r] is called back. *) method set_config : [`Ok|`Timeout] async_reply -> ( Rpc_client.connector * config ) list -> unit (** [set_config r l]: Set the server config for the servers identified * by connectors in list [l]. The result is [`Ok] meaning the * config of all named servers could be set, or [`Timeout] meaning a * timeout happened for at least one server. * When the result is available, the * function [r] is called back. 
*) method get_stats : [ `Ok of ( Rpc_client.connector * stats ) list | `Timeout ] async_reply -> unit (** [get_stats r]: Return the connector and the statistics record for * all servers as list [l]. The connector may serve as identifier * for the server. The result is [`Ok l] meaning the * statistics of all servers could be retrieved, or [`Timeout] meaning a * timeout happened for at least one server. * When the result is available, the * function [r] is called back. *) method clear_counters : [`Ok|`Timeout] async_reply -> unit (** [clear_counters r]: Reset the statistics counters for all servers. * The result is [`Ok] meaning the counters could be reset on all * servers, or [`Timeout] meaning a timeout happened for at least one * server. * When the result is available, the * function [r] is called back. *) method shutdown : unit -> unit (** Shutdown all network connections. If further RPC calls are requested, * new network connections are opened. *) end (** Client access to cache by synchronous RPC calls *) class type sync_client = object method client_config : client_config (** The configuration of this client *) method set : key -> string -> int64 -> set_options -> [`Stored|`Not_stored|`Timeout] (** [set k v t_exp opts]: Tries to set key [k] to value [v] with * expiration timestamp [t_exp]. The options [opts] determine which * kind of set operation is allowed. The result can be [`Stored] * meaning a successful modification, [`Not_stored] meaning the * refusal of a modification, or [`Timeout] meaning a timeout * contacting the server. *) method get : key -> get_options -> [`Found of entry|`Not_found|`Timeout] (** [get k opts]: Tries to get the entry for key [k]. The options * [opts] determine which entries can be retrieved in this way. * The result can be [`Found e] with the entry [e], [`Not_found] * if the key cannot be located or the options prevent the entry * from being returned, or [`Timeout] if the server is not reachable. 
*) method delete : key -> int64 -> delete_options -> [`Ok|`Timeout] (** [delete k t_del opts]: Tries to move the entry for key [k] from * the regular store to the delete queue where it is marked as being * scheduled for deletion at time [t_del]. If [t_del] is in the past, * the deletion is conducted immediately. If the entry does not * qualify for deletion because of a failed requirement in [opts], * it is silently not moved to the delete queue, without telling the * caller. The result is [`Ok] meaning the server could be contacted, * or [`Timeout] meaning a timeout happened. *) method clear : unit -> [`Ok|`Timeout] (** [clear ()]: Delete all entries. The result is [`Ok] meaning the * deletion was successful, or [`Timeout] meaning a timeout happened. *) method get_config : unit -> [ `Ok of ( Rpc_client.connector * config ) list | `Timeout ] (** [get_config ()]: Return the connector and the server config for * all servers as list [l]. The connector may serve as identifier * for the server. The result is [`Ok l] meaning the * config of all servers could be retrieved, or [`Timeout] meaning a * timeout happened for at least one server. *) method set_config : ( Rpc_client.connector * config ) list -> [`Ok|`Timeout] (** [set_config l]: Set the server config for the servers identified * by connectors in list [l]. The result is [`Ok] meaning the * config of all named servers could be set, or [`Timeout] meaning a * timeout happened for at least one server. *) method get_stats : unit -> [ `Ok of ( Rpc_client.connector * stats ) list | `Timeout ] (** [get_stats ()]: Return the connector and the statistics record for * all servers as list [l]. The connector may serve as identifier * for the server. The result is [`Ok l] meaning the * statistics of all servers could be retrieved, or [`Timeout] meaning a * timeout happened for at least one server. *) method clear_counters : unit -> [`Ok|`Timeout] (** [clear_counters ()]: Reset the statistics counters for all servers. 
* The result is [`Ok] meaning the counters could be reset on all * servers, or [`Timeout] meaning a timeout happened for at least one * server. *) method shutdown : unit -> unit (** Shutdown all network connections. If further RPC calls are requested, * new network connections are opened. *) end val create_async_client : client_config -> Unixqueue.event_system -> async_client (** Create an asynchronous client for the passed config and the passed * event system. Note that you must [Unixqueue.run] the event system in * order to conduct the requested operations. *) val create_sync_client : client_config -> sync_client (** Create a synchronous client for the passed config *)