Plasma GitLab Archive
Projects Blog Knowledge

(* $Id$ *)

open Cache_util

type key = Cache_util.key

type timestamp = int64

type entry = Cache_aux.entry =
    { mutable e_key : Digest.t;
      mutable e_creation : timestamp;
      mutable e_modification : timestamp;
      mutable e_expiration : timestamp;
      mutable e_delete_flag : bool;
      mutable e_value : string;
      mutable e_value_hash : Digest.t;
      mutable e_value_length : int;
      mutable e_counter : int;
      mutable e_bucket : int;
    }

type set_options = Cache_aux.set_options =
    {
        mutable opt_overwrite : bool;
        mutable opt_add : bool;
        mutable opt_undelete : bool;
        mutable opt_setifchanged : bool;
      }

type get_options = Cache_aux.get_options =
     {
        mutable opt_novalue : bool;
        mutable opt_getifmodifiedsince : timestamp option;
        mutable opt_getifnotmd5 : Digest.t option;
      }

type delete_options = Cache_aux.delete_options =
    {
        mutable opt_strictlock : bool;
        mutable opt_delifolderthan : timestamp option;
        mutable opt_delifmd5 : Digest.t option;
        mutable opt_delifnotmd5 : Digest.t option;
      }

type config = Cache_aux.config =
    {
        mutable max_size : int64;
        mutable save_cache_period : int;
        mutable save_cache_speed : int;
      }

type stats = Cache_aux.stats =
     {
        mutable num_entries : int;
        mutable num_bytes : int64;
        mutable num_calls_get : int;
        mutable num_calls_set : int;
        mutable num_calls_delete : int;
        mutable num_hits : int;
        mutable counters_reset_on : timestamp;
      }


type 'a async_reply =
    ((unit -> 'a) -> unit)

exception Server_not_alive


class type client_config =
object
  method buckets : Rpc_client.connector array
  method is_alive : int -> bool
  method query_timeout : float
  method idle_timeout : float
end


class type async_client =
object
  method client_config : client_config

  method set : [`Stored|`Not_stored|`Timeout] async_reply ->
               key ->
               string ->
               timestamp ->
               set_options ->
		 unit

  method get : [`Found of entry|`Not_found|`Timeout] async_reply ->
               key ->
               get_options ->
                 unit

  method delete : [`Ok|`Timeout] async_reply ->
                  key ->
                  timestamp ->
                  delete_options ->
                    unit

  method clear : [`Ok|`Timeout] async_reply ->
                   unit

  method get_config : [ `Ok of ( Rpc_client.connector * config ) list
                      | `Timeout
		      ] async_reply ->
                        unit

  method set_config : [`Ok|`Timeout] async_reply ->
                      ( Rpc_client.connector * config ) list ->
                        unit

  method get_stats : [ `Ok of ( Rpc_client.connector * stats ) list
                     | `Timeout
		     ] async_reply ->
                       unit

  method clear_counters : [`Ok|`Timeout] async_reply -> unit

  method shutdown : unit -> unit

end


class type sync_client =
object
  method client_config : client_config

  method set : key ->
               string ->
               int64 ->
               set_options ->
		 [`Stored|`Not_stored|`Timeout]

  method get : key ->
               get_options ->
                 [`Found of entry|`Not_found|`Timeout]

  method delete : key ->
                  int64 ->
                  delete_options ->
                    [`Ok|`Timeout]

  method clear : unit -> [`Ok|`Timeout]

  method get_config : unit -> 
                      [ `Ok of ( Rpc_client.connector * config ) list
		      | `Timeout
		      ]

  method set_config : ( Rpc_client.connector * config ) list ->
                        [`Ok|`Timeout]

  method get_stats : unit -> 
                     [ `Ok of ( Rpc_client.connector * stats ) list
		     | `Timeout
		     ]

  method clear_counters : unit -> [`Ok|`Timeout]

  method shutdown : unit -> unit
end


let transform_reply pass_reply_to_user transform f_anyway get_reply_from_rpc =
  f_anyway();
  let ok = ref true in
  try
    let rpc_reply = get_reply_from_rpc() in
    try
      pass_reply_to_user (fun () -> transform rpc_reply)
    with
      | e' -> ok := false; raise e'
  with
    | Rpc_client.Message_timeout when !ok ->
	pass_reply_to_user (fun () -> `Timeout)
    | e when !ok ->   (* ok = exception comes from get_reply_from_rpc *)
	pass_reply_to_user (fun () -> raise e)
;;


let handle_rpc_error client pass_reply get_reply_from_rpc =
  pass_reply
    (fun () ->
       try
	 get_reply_from_rpc()
       with
	 | e ->
	     Rpc_client.shut_down client;
	     raise e
    )


class async_client_impl config esys : async_client =
object(self)
  val clients = Hashtbl.create 50
  val client_usage = 
    (Hashtbl.create 50 :
       ('c, [`Count of int | `Unused of Unixqueue.group] ref) Hashtbl.t)

  method client_config = config

  method private connector_of x =
    let modulo = Array.length config#buckets in
    let j = 
      match x with
	| `Bucket b -> b
	| #key as k -> bucket_of_hash modulo (hash_of_key k) in
    let conn = (config # buckets).(j)  in
    if not (config # is_alive j) then (
      self # disconnect_client conn;
      raise Server_not_alive
    );
    conn


  method private client_of_connector connector =
    try
      Hashtbl.find clients connector
    with
      | Not_found ->
	  let m =
	    `Socket(Rpc.Tcp, connector, Rpc_client.default_socket_config) in
	  let c = 
	    Cache_clnt.Cache.V1.create_client2 ~esys m in
	  Rpc_client.configure c 0 config#query_timeout;
	  Hashtbl.add clients connector c;
	  c


  method private disconnect_client connector =
    try
      let client = Hashtbl.find clients connector in
      Rpc_client.shut_down client;
      Hashtbl.remove clients connector
    with
      | Not_found -> ()


  method shutdown() =
    Hashtbl.iter
      (fun conn client ->
	 Rpc_client.shut_down client
      )
      clients;
    Hashtbl.clear clients


  method private incr_usage connector =
    try
      let usage = Hashtbl.find client_usage connector in
      ( match !usage with
	  | `Count n ->
	      usage := `Count(n+1)
	  | `Unused tmo_group -> 
	      Unixqueue.clear esys tmo_group;
	      usage := `Count 1
      )
    with
      | Not_found ->
	  Hashtbl.add client_usage connector (ref (`Count 1))
	    

  method private decr_usage connector () =
    try
      let usage = Hashtbl.find client_usage connector in
      ( match !usage with
	  | `Count n ->
	      assert(n >= 1);
	      let n' = n-1 in
	      if n' = 0 then (
		let tmo = config#idle_timeout in
		if tmo > 0.0 then (
		  let tmo_group = Unixqueue.new_group esys in
		  usage := `Unused tmo_group;
		  Unixqueue.once esys tmo_group tmo
		    (fun () ->
		       self # disconnect_client connector;
		       Hashtbl.remove client_usage connector
		    )
		)
		else
		  if tmo = 0.0 then
		    (
		      self # disconnect_client connector;
		      Hashtbl.remove client_usage connector
		    )
		  else
		    usage := `Count 0
	      )
	      else
		usage := `Count n'
	  | `Unused _ ->
	      assert false
      )
    with
      | Not_found ->
	  assert false


  method private connectors =
    (* Return all connectors mentioned in client_config *)
    let l = ref [] in
    for k = 0 to Array.length config#buckets - 1 do
      let c = (config#buckets).(k) in
      if not (List.mem c !l) then l := c :: !l
    done;
    !l


  method private with_client connector f =
    try
      let client = self # client_of_connector connector in
      f connector client
    with
      | Rpc_client.Client_is_down ->
	  (* CHECK: Maybe make it configurable what to do in this case *)
	  Hashtbl.remove clients connector;
	  let g = Unixqueue.new_group esys in
	  Unixqueue.once esys g 0.0
	    (fun () -> self # with_client connector f)


  method set pass_reply k v ts opts =
    let h = hash_of_key k in
    let modulo = Array.length config#buckets in
    let bucket = bucket_of_hash modulo h in
    self # with_client
      (self # connector_of (`Bucket bucket))
      (fun connector client ->
	 self # incr_usage connector;
	 Cache_clnt.Cache.V1.set'async
	   client
	   (h, bucket, v, ts, opts)
	   (handle_rpc_error 
	       client
	       (transform_reply
		  pass_reply
		  (function 
		     | `stored -> `Stored
		     | `not_stored -> `Not_stored
		  )
		  (self # decr_usage connector)
	       )
	   )
      )


  method get pass_reply k opts =
    let h = hash_of_key k in
    let modulo = Array.length config#buckets in
    let bucket = bucket_of_hash modulo h in
    self # with_client
      (self # connector_of (`Bucket bucket))
      (fun connector client ->
	 self # incr_usage connector;
	 Cache_clnt.Cache.V1.get'async
	   client
	   (h, opts)
	   (handle_rpc_error
	      client
	      (transform_reply
		 pass_reply
		 (function 
		    | `found e -> `Found e
		    | `not_found -> `Not_found
		 )
		 (self # decr_usage connector)
	      )
	   )
      )


  method delete pass_reply k ts opts =
    let h = hash_of_key k in
    let modulo = Array.length config#buckets in
    let bucket = bucket_of_hash modulo h in
    self # with_client
      (self # connector_of (`Bucket bucket))
      (fun connector client ->
	 self # incr_usage connector;
	 Cache_clnt.Cache.V1.delete'async
	   client
	   (h, ts, opts)
	   (handle_rpc_error
	      client
	      (transform_reply
		 pass_reply
		 (fun () -> `Ok)
		 (self # decr_usage connector)
	      )
	   )
      )


  method private multi_invoke : 'arg 'res . 
                                [ `Ok of (Rpc_client.connector * 'res) list
                                | `Timeout
                                ] async_reply ->
                                (Rpc_client.t -> 'arg -> 'res async_reply -> unit) ->
                                (Rpc_client.connector -> 'arg) ->
				Rpc_client.connector list ->
                                unit =
    fun pass_reply rpc_call arg_of_conn connectors ->
      (* Call [rpc_call] for every connector in [connectors]. The argument
       * is the result of [arg_of_conn]. The result is accumulated, and
       * passed back using [pass_reply] when all replies have arrived.
       *)
      let result = ref (`Ok []) in
      let n = ref 0 in
      List.iter
	(fun connector ->
	   self # with_client
	     connector
	     (fun connector client ->
		self # incr_usage connector;
		rpc_call
		  client
		  (arg_of_conn connector)
		  (fun get_reply ->
		     decr n;
		     self # decr_usage connector ();
		     ( try
			 let r = get_reply() in
			 ( match !result with
			     | `Ok l -> result := `Ok ((connector, r) :: l)
			     | _ -> ()
			 )
		       with
			 | Rpc_client.Message_timeout ->
			     ( match !result with
				 | `Ok _ -> result := `Timeout
				 | _ -> ()
			     );
			     Rpc_client.shut_down client
			 | error ->
			     result := `Error error;
			     Rpc_client.shut_down client
		     );
		     if !n = 0 then 
		       pass_reply
			 (fun () ->
			    match !result with
			      | `Error e -> raise e
			      | (`Ok _|`Timeout) as regular -> regular
			 )
		  )
	     );
	   incr n
	)
	connectors
	
  method clear pass_reply =
    self # multi_invoke 
      (transform_reply
	 pass_reply
	 (function
	    | `Ok _ -> `Ok
	    | `Timeout -> `Timeout
	 )
	 (fun () -> ())
      )
      Cache_clnt.Cache.V1.clear'async
      (fun connector -> ())
      (self # connectors)

  method clear_counters pass_reply =
    self # multi_invoke 
      (transform_reply
	 pass_reply
	 (function
	    | `Ok _ -> `Ok
	    | `Timeout -> `Timeout
	 )
	 (fun () -> ())
      )
      Cache_clnt.Cache.V1.clear_counters'async
      (fun connector -> ())
      (self # connectors)

  method get_config pass_reply =
    self # multi_invoke 
      pass_reply
      Cache_clnt.Cache.V1.get_config'async
      (fun connector -> ())
      (self # connectors)

  method get_stats pass_reply =
    self # multi_invoke 
      pass_reply
      Cache_clnt.Cache.V1.get_stats'async
      (fun connector -> ())
      (self # connectors)

  method set_config pass_reply new_configs =
    self # multi_invoke 
      (transform_reply
	 pass_reply
	 (function
	    | `Ok _ -> `Ok
	    | `Timeout -> `Timeout
	 )
	 (fun () -> ())
      )
      Cache_clnt.Cache.V1.set_config'async
      (fun connector -> List.assoc connector new_configs)
      (List.map fst new_configs)
end


let create_async_client = new async_client_impl


let synchronize esys f_async =
  let get_reply = ref (fun () -> assert false) in
  f_async (fun gr -> get_reply := gr);
  Unixqueue.run esys;
  !get_reply()


class sync_client_impl config : sync_client =
  let config =
    ( object
	method buckets = config # buckets
	method query_timeout = config # query_timeout 
	method idle_timeout = (-1.0)   (* Not supported by the sync client *)
	method is_alive = config # is_alive
      end
    ) in
  let esys = Unixqueue.create_unix_event_system() in
  let ac = new async_client_impl config esys in
object(self)
  method client_config = config

  method set k v ts opts =
    synchronize
      esys
      (fun pr -> ac # set pr k v ts opts)

  method get k opts =
    synchronize
      esys
      (fun pr -> ac # get pr k opts)

  method delete k ts opts =
    synchronize
      esys
      (fun pr -> ac # delete pr k ts opts)

  method clear () =
    synchronize esys ac#clear

  method get_config() =
    synchronize esys ac#get_config

  method set_config new_configs =
    synchronize
      esys
      (fun pr -> ac # set_config pr new_configs)

  method get_stats() =
    synchronize esys ac#get_stats

  method clear_counters() =
    synchronize esys ac#clear_counters

  method shutdown() =
    ac # shutdown()

end


let create_sync_client = new sync_client_impl

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml