(* Plasma GitLab Archive
   Projects Blog Knowledge *)

(* $Id$ *)

(** Client to access cache servers *)

(** {2 Theory of operation} *)

(** {b Buckets.} The client can be used to access one or several cache servers.
  * The whole cache store is divided up in so-called buckets. The user
  * is free to
  *
  * - use only a single bucket on a single server
  * - put every bucket on a different server
  * - even have multiple buckets per server
  *
  * This scheme is primarily intended for distributing the whole
  * store over multiple servers. Having multiple buckets per server
  * gives some freedom for reorganizing the distribution scheme (e.g.
  * add servers later). The library determines which 
  * key lives on which server by computing the [e_bucket] component
  * of the entries. Functions for doing this can be found in
  * {!Cache_util}:
  *
  * {[ 
  *    let e_key = Cache_util.hash_of_key k in
  *    let e_bucket = Cache_util.bucket_of_hash n e_key
  * ]}
  *
  * The number [n] is the number of buckets.
 *)

(** {b Expiration.} Entries can have a timestamp that says when the entries
  * are removed from the cache. It is ensured that the [get] operation never
  * returns an expired entry when both client and server have the same clock.
  * Independent of expiration by time, the cache may also expire entries
  * because the cache becomes too large. The cache implements a LRU semantics:
  * The least recently used entries are removed first. Any [get]
  * or [set] operation counts as a "use".
 *)

(** {b Deletion.} There is a third mechanism for removing
 * entries: One can also actively delete entries. The [delete] operation
 * takes a delete timestamp as argument. If this timestamp is in the past
 * (e.g. 0), the deletion is performed immediately. Otherwise the entry
 * is scheduled for deletion at the given time. The [e_delete_flag] says
 * whether such a deletion will happen.
 *
 * In order to coordinate parallel accesses to the cache, all entries
 * that are scheduled for deletion are locked for overwriting. Any
 * [set] operation is refused by default. You can break this lock by
 * setting the [opt_undelete] option. In this case, [set] not only overwrites
 * but also cancels the scheduled deletion. 
 * 
 * The [delete] operation has effectively a higher precedence than the
 * [set] operation unless you specify [opt_undelete]. This makes [delete]
 * different from setting a new expiration timestamp in the entry by
 * calling [set]. The higher precedence is useful if you have new knowledge
 * about the validity period of entries, and you want to prevent that
 * restricting the validity period interferes with the regular use of [set].
 *
 * Note that a scheduled deletion does not prevent the entry from expiring
 * because of the expiration timestamp or because of exceeding the maximum 
 * size of the cache.
 *)

(** {b Atomicity.} The operations [set] and [delete] are either fully
 * conducted, or not conducted at all. For example, if you call [set]
 * with [opt_setifchanged], there are only two outcomes:
 *
 * - The [set] operation is done: The [value] is overwritten, the
 *   new modification timestamp is set, the expiration timestamp is
 *   set to the passed value, and any delete flag is removed.
 * - The [set] operation is not done: The entry is not modified at all.
 *
 * As [opt_setifchanged] causes [set] to be performed only for
 * value changes, you get implicitly the effect of only setting
 * the modification and expiration timestamps when the value is
 * distinct from the previous one.
 *)

(** {2 Interface} *)

type key =
    [ `Hash of Digest.t   (* Provide the hash value directly *)
    | `String of string   (* Use the hash value of this string as key *)
    ]
    (** Keys of hash entries can be given in two ways:
      * - [`Hash dg]: directly as the digest [dg] of a string
      * - [`String s]: as the string [s], which the library digests itself
     *)

type timestamp = int64
    (** The time in seconds since the epoch. Several operations give the
      * value 0 a special meaning (e.g. "never expire" for [e_expiration],
      * "delete immediately" for [delete]); see the individual descriptions.
     *)

type entry = Cache_aux.entry =
    { mutable e_key : Digest.t;            (** The key in hashed (digest) form *)
      mutable e_creation : timestamp;      (** Time of first addition to the cache *)
      mutable e_modification : timestamp;  (** Time of last modification in the cache *)
      mutable e_expiration : timestamp;    (** Time when the entry will expire; 0 = never *)
      mutable e_delete_flag : bool;        (** Whether this entry is scheduled for deletion *)
      mutable e_value : string;            (** The value of the entry, an arbitrary string *)
      mutable e_value_hash : Digest.t;     (** The MD5 sum of [e_value] *)
      mutable e_value_length : int;        (** The length of [e_value] in bytes *)
      mutable e_counter : int;             (** The number of [get] accesses to this entry *)
      mutable e_bucket : int;              (** The bucket number of this entry *)
    }
    (** Entries are the primary contents of the cache. This type equation
      * re-exports {!Cache_aux.entry}.
     *)


type set_options = Cache_aux.set_options =
    {
      mutable opt_overwrite : bool;     (** Allow overwriting existing entries *)
      mutable opt_add : bool;           (** Allow adding new entries *)
      mutable opt_undelete : bool;      (** Allow the modification of entries
          * that are scheduled for deletion. Such entries are revived, and
          * no longer count as deleted.
          *)
      mutable opt_setifchanged : bool;  (** Modifies [opt_overwrite]:
          * overwriting is only permitted if the new value is distinct from
          * the old one.
          *)
    }
    (** Options modifying the behavior of the [set] operation *)


type get_options = Cache_aux.get_options =
    {
      mutable opt_novalue : bool;       (** Do not return the [e_value]
          * component of entries, but an empty string as replacement (for
          * retrieving meta data only).
          *)
      mutable opt_getifmodifiedsince : 
                 timestamp option;      (** When [Some t]: Only return
          * entries that have [e_modification >= t]. When [None]: No
          * restriction.
          *)
      mutable opt_getifnotmd5 : 
                  Digest.t option;      (** When [Some dg]: Only return
          * entries where [e_value_hash <> dg]. When [None]: No restriction.
          *)
    }
    (** Options modifying the behavior of the [get] operation *)

type delete_options = Cache_aux.delete_options =
    {
      mutable opt_strictlock : bool;    (** Forces that the entry is
          * scheduled for deletion even if it does not exist yet. Unless
          * [opt_undelete] is given, a [set] operation cannot be successful
          * for the key while the entry is in the delete queue, i.e. the
          * key is locked until the deletion is really conducted.
          *)
      mutable opt_delifolderthan : 
                  timestamp option;     (** When [Some t]: It is required
          * that [e_modification < t] in order to schedule an entry for
          * deletion. When [None]: no such requirement.
          *)
      mutable opt_delifmd5 : 
                  Digest.t option;      (** When [Some dg]: It is required
          * that [e_value_hash = dg] in order to schedule an entry for
          * deletion. When [None]: no such requirement.
          *)
      mutable opt_delifnotmd5 : 
                  Digest.t option;      (** When [Some dg]: It is required
          * that [e_value_hash <> dg] in order to schedule an entry for
          * deletion. When [None]: no such requirement.
          *)
    }
    (** Options modifying the behavior of the [delete] operation *)

type config = Cache_aux.config =
    {
      mutable max_size : int64;         (** The maximum size of the cache in bytes *)
      mutable save_cache_period : int;  (** Save the cache to disk every
          * this number of seconds. A value of 0 or a negative number
          * disables saving to disk entirely.
          *)
      mutable save_cache_speed : int;   (** How fast to save to disk in
          * bytes per second. Setting this to a reasonable value increases
          * the responsiveness of the cache server while a save operation
          * is in progress. Setting it to 0 means "as fast as possible".
          *)
    }
    (** Settings for the cache server *)

type stats = Cache_aux.stats =
    {
       mutable num_entries : int;      (** The number of entries in the
           * cache, including entries scheduled for deletion.
           *)
       mutable num_bytes : int64;      (** The size of the cache in bytes,
           * excluding overhead space.
           *)
       mutable num_calls_get : int;    (** The number of [get] calls *)
       mutable num_calls_set : int;    (** The number of [set] calls *)
       mutable num_calls_delete : int; (** The number of [delete] calls *)
       mutable num_hits : int;         (** The number of [get] calls
           * returning [`Found].
           *)
       mutable counters_reset_on : 
                  timestamp;           (** When the counters were last reset *)
    }
    (** Statistics of a cache server *)
 


type 'a async_reply =
    ((unit -> 'a) -> unit)
      (** Result type of an asynchronous call: a callback receiving a thunk
        * [get_result]. Invoking [get_result()] either returns the result
        * of the call, or raises the exception that occurred during the
        * call. A typical example of such a callback function is
        * {[ let process_result get_result =
        *      try 
        *        let result = get_result() in  (* May raise an exception *)
        *        ...
        *      with error -> ...
        * ]}
       *)

exception Server_not_alive
  (** Raised by RPC calls if the client config returns [false] for the
    * [is_alive] test of the bucket's server (see {!client_config}).
   *)


(** Configures the RPC client. *)
class type client_config =
object
  method buckets : Rpc_client.connector array
    (** For every bucket [i=0..n-1] the array element [buckets.(i)] contains the
      * address of the server storing this bucket. It is allowed to use the
      * same server for several buckets.
      *
      * This array must be non-empty. If you only use one bucket/one server,
      * just return a one-element array.
     *)

  method is_alive : int -> bool
    (** Before a connection to an RPC server is used, this function is
        called with the bucket index [i]. If it returns [true] the server
        is assumed to be alive. Otherwise, the exception {!Server_not_alive}
        is raised immediately. This feature is useful for checking the
        liveliness of servers proactively, e.g. by pinging them in regular
        intervals.

        If this feature is not needed, just return [true] for every [i].

        Implementation restriction: works only for the RPC calls
        [get], [set], [delete], but not for the configuration and statistics
        calls.
     *)

  method query_timeout : float
    (** After this number of seconds the queries time out and reply
      * [`Timeout]. The value [(-1.0)] disables this feature.
     *)

  method idle_timeout : float
    (** After this number of seconds idle TCP connections are closed *)
end


(** Client access to cache by asynchronous RPC calls *)
class type async_client =
object
  method client_config : client_config
    (** The configuration of this client *)

  method set : [`Stored|`Not_stored|`Timeout] async_reply ->
               key ->
               string ->
               timestamp ->
               set_options ->
		 unit
    (** [set r k v t_exp opts]: Tries to set key [k] to value [v] with
      * expiration timestamp [t_exp]. The options [opts] determine which
      * kind of set operation is allowed. The result can be [`Stored]
      * meaning a successful modification, [`Not_stored] meaning the
      * refusal of a modification, or [`Timeout] meaning a timeout
      * contacting the server. When the result is available, the
      * function [r] is called back.
     *)

  method get : [`Found of entry|`Not_found|`Timeout] async_reply ->
               key ->
               get_options ->
                 unit
    (** [get r k opts]: Tries to get the entry for key [k]. The options
      * [opts] determine which entries can be retrieved in this way.
      * The result can be [`Found e] with the entry [e], [`Not_found]
      * if the key cannot be located or the options prevent the entry
      * from being returned, or [`Timeout] if the server is not reachable.
      * When the result is available, the
      * function [r] is called back.
     *)

  method delete : [`Ok|`Timeout] async_reply ->
                  key ->
                  timestamp ->
                  delete_options ->
                    unit
    (** [delete r k t_del opts]: Tries to move the entry for key [k] from
      * the regular store to the delete queue where it is marked as being
      * scheduled for deletion at time [t_del]. If [t_del] is in the past,
      * the deletion is conducted immediately. If the entry does not
      * qualify for deletion because of a failed requirement in [opts],
      * it is silently not moved to the delete queue, without telling the
      * caller. The result is [`Ok] meaning the server could be contacted,
      * or [`Timeout] meaning a timeout happened.
      * When the result is available, the
      * function [r] is called back.
     *)

  method clear : [`Ok|`Timeout] async_reply ->
                   unit
    (** [clear r]: Delete all entries. The result is [`Ok] meaning the
      * deletion was successful, or [`Timeout] meaning a timeout happened.
      * When the result is available, the
      * function [r] is called back.
     *)

  method get_config : [ `Ok of ( Rpc_client.connector * config ) list
                      | `Timeout
		      ] async_reply ->
                        unit
    (** [get_config r]: Return the connector and the server config for
      * all servers as list [l]. The connector may serve as identifier
      * for the server. The result is [`Ok l] meaning the
      * config of all servers could be retrieved, or [`Timeout] meaning a
      * timeout happened for at least one server.
      * When the result is available, the
      * function [r] is called back.
     *)

  method set_config : [`Ok|`Timeout] async_reply ->
                      ( Rpc_client.connector * config ) list ->
                        unit
    (** [set_config r l]: Set the server config for the servers identified
      * by connectors in list [l]. The result is [`Ok] meaning the
      * config of all named servers could be set, or [`Timeout] meaning a
      * timeout happened for at least one server.
      * When the result is available, the
      * function [r] is called back.
     *)

  method get_stats : [ `Ok of ( Rpc_client.connector * stats ) list
                     | `Timeout
		     ] async_reply ->
                       unit
    (** [get_stats r]: Return the connector and the statistics record for
      * all servers as list [l]. The connector may serve as identifier
      * for the server. The result is [`Ok l] meaning the
      * statistics of all servers could be retrieved, or [`Timeout] meaning
      * a timeout happened for at least one server.
      * When the result is available, the
      * function [r] is called back.
     *)

  method clear_counters : [`Ok|`Timeout] async_reply -> unit
    (** [clear_counters r]: Reset the statistics counters for all servers.
      * The result is [`Ok] meaning the counters could be reset on all
      * servers, or [`Timeout] meaning a timeout happened for at least one
      * server.
      * When the result is available, the
      * function [r] is called back.
     *)

  method shutdown : unit -> unit
    (** Shutdown all network connections. If further RPC calls are requested,
      * new network connections are opened.
     *)
end


(** Client access to cache by synchronous RPC calls *)
class type sync_client =
object
  method client_config : client_config
    (** The configuration of this client *)

  method set : key ->
               string ->
               timestamp ->
               set_options ->
		 [`Stored|`Not_stored|`Timeout]
    (** [set k v t_exp opts]: Tries to set key [k] to value [v] with
      * expiration timestamp [t_exp]. The options [opts] determine which
      * kind of set operation is allowed. The result can be [`Stored]
      * meaning a successful modification, [`Not_stored] meaning the
      * refusal of a modification, or [`Timeout] meaning a timeout
      * contacting the server.
     *)

  method get : key ->
               get_options ->
                 [`Found of entry|`Not_found|`Timeout]
    (** [get k opts]: Tries to get the entry for key [k]. The options
      * [opts] determine which entries can be retrieved in this way.
      * The result can be [`Found e] with the entry [e], [`Not_found]
      * if the key cannot be located or the options prevent the entry
      * from being returned, or [`Timeout] if the server is not reachable.
     *)

  method delete : key ->
                  timestamp ->
                  delete_options ->
                    [`Ok|`Timeout]
    (** [delete k t_del opts]: Tries to move the entry for key [k] from
      * the regular store to the delete queue where it is marked as being
      * scheduled for deletion at time [t_del]. If [t_del] is in the past,
      * the deletion is conducted immediately. If the entry does not
      * qualify for deletion because of a failed requirement in [opts],
      * it is silently not moved to the delete queue, without telling the
      * caller. The result is [`Ok] meaning the server could be contacted,
      * or [`Timeout] meaning a timeout happened.
     *)

  method clear : unit -> [`Ok|`Timeout]
    (** [clear()]: Delete all entries. The result is [`Ok] meaning the
      * deletion was successful, or [`Timeout] meaning a timeout happened.
     *)

  method get_config : unit -> 
                      [ `Ok of ( Rpc_client.connector * config ) list
		      | `Timeout
		      ]
    (** [get_config()]: Return the connector and the server config for
      * all servers as list [l]. The connector may serve as identifier
      * for the server. The result is [`Ok l] meaning the
      * config of all servers could be retrieved, or [`Timeout] meaning a
      * timeout happened for at least one server.
     *)

  method set_config : ( Rpc_client.connector * config ) list ->
                        [`Ok|`Timeout]
    (** [set_config l]: Set the server config for the servers identified
      * by connectors in list [l]. The result is [`Ok] meaning the
      * config of all named servers could be set, or [`Timeout] meaning a
      * timeout happened for at least one server.
     *)

  method get_stats : unit -> 
                     [ `Ok of ( Rpc_client.connector * stats ) list
		     | `Timeout
		     ]
    (** [get_stats()]: Return the connector and the statistics record for
      * all servers as list [l]. The connector may serve as identifier
      * for the server. The result is [`Ok l] meaning the
      * statistics of all servers could be retrieved, or [`Timeout] meaning
      * a timeout happened for at least one server.
     *)

  method clear_counters : unit -> [`Ok|`Timeout]
    (** [clear_counters()]: Reset the statistics counters for all servers.
      * The result is [`Ok] meaning the counters could be reset on all
      * servers, or [`Timeout] meaning a timeout happened for at least one
      * server.
     *)

  method shutdown : unit -> unit
    (** Shutdown all network connections. If further RPC calls are requested,
      * new network connections are opened.
     *)
end


val create_async_client : client_config -> Unixqueue.event_system -> async_client
  (** [create_async_client cfg esys]: Create an asynchronous client for the
    * passed config [cfg] and the passed event system [esys]. Note that you
    * must [Unixqueue.run] the event system in order to conduct the
    * requested operations.
   *)

val create_sync_client : client_config -> sync_client
  (** [create_sync_client cfg]: Create a synchronous client for the passed
    * config [cfg].
   *)

(* This web site is published by Informatikbüro Gerd Stolpmann
   Powered by Caml *)