(*
Copyright 2010 Gerd Stolpmann
This file is part of Plasma, a distributed filesystem and a
map/reduce computation framework. Unless you have a written license
agreement with the copyright holder (Gerd Stolpmann), the following
terms apply:
Plasma is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Plasma is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with Plasma. If not, see <http://www.gnu.org/licenses/>.
*)
(* $Id: dn_manager.ml 456 2011-10-11 13:26:23Z gerd $ *)
(** The manager accepts user connections, and performs user requests.
There is only one process running the manager.
It is fully programmed in an asynchronous style, so it is always
responsive. All real I/O is done by helper processes that implement
the [Datanode_io] program. Communication with this program is
sped up by using an shm object (instead of including the data to
read/write into the RPC calls). There are usually a number of these
helper processes, and a TCP connection to each of these. This TCP
connection is used for RPC calls that trigger I/O operations.
As mentioned, the payload data is not sent as part of the normal
RPC serialization of request or response data, but instead it is
put into the shm object, and the RPC calls only point to the
shm object.
*)
open Printf
let dlogf = Plasma_util.dlogf
let dlogr = Plasma_util.dlogr
module Rpcapi_clnt = Pfs_rpcapi_clnt
module Rpcapi_srv = Pfs_rpcapi_srv
module Rpcapi_aux = Pfs_rpcapi_aux
module A = Pfs_rpcapi_aux
module Dio_proxy = Rpcapi_clnt.Make'Datanode_io(Rpc_proxy.ManagedClient)
type waiting_request =
[ `R of int -> unit
| `W of int -> unit
]
(** The function is called when the event happened:
- [`R f]: A slot is free for a read request. [f] is called with the
slot index as arg.
- [`W f]: A slot is free for a write request. [f] is called with the
slot index as arg.
While the manager is slow-syncing, [find_slots_and_process] keeps
[`W] requests waiting, but still starts [`R] requests.
*)
(* Statistics: *)
(* Per-cycle statistics of the manager. They are logged by [print_stats]
   every [stats_period] seconds, and [next_stats] then resets most of the
   fields for the next cycle ([t_t0] and [req_count] are carried over).
*)
type stats =
{ mutable cycle : int; (* statistics cycle number *)
mutable r_count : int; (* number of finished read requests *)
mutable r_lat : float; (* cumulated latencies of read requests *)
mutable r_peak: float; (* peak latency of read requests *)
mutable w_count : int; (* number of finished write requests *)
mutable w_lat : float; (* cumulated latencies of write requests *)
mutable w_peak: float; (* peak latency of write requests *)
mutable c_count : int; (* number of finished copy requests *)
mutable c_lat : float; (* cumulated latencies of copy requests *)
mutable c_peak: float; (* peak latency of copy requests *)
mutable t_t0 : float; (* time when throttling started *)
mutable t_t0_r : float; (* time when throttling started (w/ reset) *)
mutable t_count : int; (* number of throttle events *)
mutable t_lat : float; (* cumulated latency of throttling *)
mutable t_peak : float; (* peak latency of throttling *)
mutable q_peak : int; (* peak length of the wrequests queue *)
mutable req_count : (Rpc_server.connection_id, int) Hashtbl.t;
(* number of pending requests per connection; connections without
   pending requests have no entry (see [count_request])
*)
mutable req_t0 : float; (* time when req_count changed *)
mutable bconn_acc : float;
(* cumulated n * t where n is the number of busy connections (with
pending requests) and t is the period of time where n has been
valid
*)
}
(* The state of the manager process. The record is created in the
   [post_start_hook] of [factory] and retrieved with [current_manager].
*)
type manager =
{ shm : Dn_shm.dn_shm;
(** The shm object *)
shm_name : string;
(** The name of the shm object *)
slot_used : bool array;
(** for each shm slot this array says whether it is used (true) or
free (false)
*)
wrequests : waiting_request Queue.t;
(** The queue of requests waiting for a free slot *)
drequests : (unit -> unit) Queue.t;
(** Delayed requests because of slow flushing (writes from client shm
objects that are postponed while [slow_syncing] is set)
*)
io_mset : Rpc_proxy.ManagedSet.mset;
(** Clients to the I/O processes *)
safetrans : (int64, int64*int64) Hashtbl.t;
(** maps sf_id to (st_tmo,st_secret) *)
mutable last_sync : int64;
(** A number that is increased by 1 whenever a sync is completed
successfully
*)
mutable syncing : bool;
(** Whether a sync is in progress *)
mutable slow_syncing : bool;
(** Whether sync cannot be done in the configured period, and write
requests are throttled
*)
wsync : (bool->unit) Queue.t;
(** Functions waiting on the completion of a sync. The bool argument
says whether the sync was successful.
*)
user_shm : (Rpc_server.connection_id, string list) Hashtbl.t;
(** Manages user-requested shm objects. The table maps TCP connection
IDs to the path names of the shm objects.
*)
mutable udsocket : string option;
(** name of a Unix Domain socket *)
store : Dn_store.dn_store;
(** The datastore (file with blocks). We keep this here only for
calling {!Dn_store.will_read_block} (and for closing it at shutdown)
*)
mutable discover_server : Rpc_server.t option;
(** The UDP server for the [Datanode_discover] program, or [None] if
none was created (see [create_dn_discover_server])
*)
stats : stats; (** statistics *)
}
(* Container variables (via Netplex_cenv) of the given types.
   [Manager_var] holds the manager record of this container; [String_var]
   and [Int64_var] serve as caches for the identity and size values
   (see the [Cached] functor).
*)
module Manager_var =
Netplex_cenv.Make_var_type(struct type t = manager end)
module Int64_var =
Netplex_cenv.Make_var_type(struct type t = int64 end)
module String_var =
Netplex_cenv.Make_var_type(struct type t = string end)
(** Returns the manager record of this container. It is stored under the
    variable name current_manager by the [post_start_hook] of [factory].
    NOTE(review): when called before that, [Manager_var.get] presumably
    raises [Netplex_cenv.Container_variable_not_found] - confirm.
*)
let current_manager() =
Manager_var.get "current_manager"
(** [verify m block rd_perm wr_perm ticket] checks that [ticket] contains
    the right verifier for accessing [block]. Returns [true] if positive,
    [false] if negative.

    The ticket is rejected when its safetrans ID is unknown or timed out,
    when [block] is outside the ticket range, when read permission (if
    [rd_perm]) or write permission (if [wr_perm]) is missing, or when the
    verifier does not match the one computed from the safetrans secret.
*)
let verify m block rd_perm wr_perm ticket =
  let open Pfs_rpcapi_aux in
  let check () =
    dlogr
      (fun () ->
         sprintf "ticket id=%Ld tmo=%Ld vfy=%Ld rd=%B wr=%B range=%Ld+%Ld"
           ticket.safetrans_id ticket.safetrans_tmo ticket.safetrans_vfy
           ticket.read_perm ticket.write_perm ticket.range_start
           ticket.range_length);
    (* Raises Not_found for unknown safetrans IDs *)
    let (st_tmo, st_secret) = Hashtbl.find m.safetrans ticket.safetrans_id in
    let now = Unix.gettimeofday() in
    let in_range =
      block >= ticket.range_start &&
      Int64.sub block ticket.range_start < ticket.range_length in
    if Int64.to_float st_tmo < now then
      false  (* timed out *)
    else if not in_range then
      false
    else if rd_perm && not ticket.read_perm then
      false
    else if wr_perm && not ticket.write_perm then
      false
    else begin
      let v =
        Plasma_util.compute_verifier
          ticket.safetrans_id st_secret ticket.range_start ticket.range_length
          ticket.read_perm ticket.write_perm in
      dlogr
        (fun () ->
           sprintf
             "compute_verifier id=%Ld sec=%Ld range=%Ld+%Ld rd=%B wr=%B v=%Ld"
             ticket.safetrans_id st_secret ticket.range_start
             ticket.range_length ticket.read_perm ticket.write_perm v);
      v = ticket.safetrans_vfy
    end
  in
  try check () with Not_found -> false
(** Record the current length of the [wrequests] queue as the new peak
    length in the statistics if it exceeds the previous peak. *)
let check_length m =
  let len = Queue.length m.wrequests in
  if len > m.stats.q_peak then
    m.stats.q_peak <- len
(** Run over the [wrequests] queue, and find slots for the requests.
    If a free slot [s] is found for request function [f], the slot is
    marked as used and [f s] is called (minimally delayed via the event
    system). Processing stops when [wrequests] becomes empty, or when
    there are no more free slots.

    Flow control: if we are syncing slowly ([m.slow_syncing]), further
    write requests ([`W]) are not started, as this would make everything
    worse. They remain in [wrequests] in their original order, so this
    function has to be called again when the sync is done (as
    [sync_cycle] does).

    If not syncing slowly, all delayed requests in [drequests] are
    scheduled, too.
*)
let find_slots_and_process m =
  let cont = Netplex_cenv.self_cont() in
  let esys = cont#event_system in
  (* Run [f ()] by pushing it onto the event queue. We do not run it
     immediately because the event queue has some reasonable exception
     handling, so we do not have to do this here. *)
  let schedule f =
    let g = Unixqueue.new_group esys in
    Unixqueue.once esys g 0.0 f in
  (* Write requests skipped because of flow control, in queue order *)
  let delayed = Queue.create() in
  let is_write =
    function
    | `R _ -> false
    | `W _ -> true in
  (* Take the next request that may be started now.
     Raises [Queue.Empty] when there is none. *)
  let rec find() =
    let req = Queue.take m.wrequests in
    if is_write req && m.slow_syncing then (
      Queue.add req delayed;
      find()
    )
    else
      match req with
      | `R on_slot -> on_slot
      | `W on_slot -> on_slot in
  ( try
      for k = 0 to Array.length m.slot_used - 1 do
        if not m.slot_used.(k) then (
          let on_slot = find() in (* or Queue.Empty *)
          m.slot_used.(k) <- true;
          schedule (fun () -> on_slot k)
        )
      done
    with
    | Queue.Empty -> ()
  );
  (* Also run all drequests if possible *)
  if not m.slow_syncing then (
    Queue.iter schedule m.drequests;
    Queue.clear m.drequests
  );
  (* Put the skipped write requests back in front of the requests not
     looked at. Appending them at the end instead would let later
     requests (including later writes) overtake them. *)
  Queue.transfer m.wrequests delayed;
  Queue.transfer delayed m.wrequests
(** Mark [slot] as free again, then re-scan the [wrequests] queue, since a
    waiting request may now be able to use the freed slot. *)
let release_slot m slot =
  Array.set m.slot_used slot false;
  find_slots_and_process m
(** Wake up all functions registered in [wsync] that wait on a sync, and
    pass [ok] to them ([true] when the sync was successful). The [wsync]
    queue is emptied first. Each function is pushed onto the event queue
    instead of being called directly, so the exception handling of the
    event system applies to it.
*)
let sync_notify m ok =
  let esys = (Netplex_cenv.self_cont()) # event_system in
  let waiters = Queue.create() in
  Queue.transfer m.wsync waiters;
  Queue.iter
    (fun waiter ->
       let grp = Unixqueue.new_group esys in
       Unixqueue.once esys grp 0.0 (fun () -> waiter ok))
    waiters
(** Called periodically by a timer to start a sync.

    - If the previous sync is still in progress, the manager enters
      slow-syncing mode (write requests are throttled, see
      [find_slots_and_process]), and the start time of throttling is
      recorded.
    - Otherwise a [sync] call is sent to one of the [Datanode_io]
      processes. When it completes, the throttling statistics are updated,
      slow-syncing mode is left, [last_sync] is increased (on success
      only), waiting requests are restarted, and the functions waiting on
      sync completion are notified.
*)
let sync_cycle m =
  if m.syncing then (
    if not m.slow_syncing then (
      let t0 = Unix.gettimeofday() in
      m.stats.t_t0 <- t0;
      m.stats.t_t0_r <- t0;
      m.slow_syncing <- true
    )
  )
  else (
    (* Set the flag before issuing the call: if the callback happened to
       run before [sync'async] returns, setting it afterwards would
       overwrite the reset done by the callback, and we would never sync
       again. *)
    m.syncing <- true;
    try
      let (mc,_) = Rpc_proxy.ManagedSet.mset_pick m.io_mset in
      Dio_proxy.V1.sync'async
        mc ()
        (fun get_result ->
           if m.slow_syncing then (
             m.stats.t_count <- m.stats.t_count + 1;
             let t1 = Unix.gettimeofday() in
             let lat1 = t1 -. m.stats.t_t0_r in
             m.stats.t_lat <- m.stats.t_lat +. lat1;
             let lat2 = t1 -. m.stats.t_t0 in
             m.stats.t_peak <- max m.stats.t_peak lat2
           );
           m.syncing <- false;
           m.slow_syncing <- false;
           let ok =
             try get_result(); true
             with error ->
               Netlog.logf `Err "proc_sync exception (2): %s"
                 (Netexn.to_string error);
               false in
           if ok then
             m.last_sync <- Int64.succ m.last_sync;
           find_slots_and_process m;
           sync_notify m ok
        )
    with error ->
      Netlog.logf `Err "proc_sync exception (1): %s" (Netexn.to_string error);
      m.syncing <- false
  )
(** [ignore_lost_connection reply arg] calls [reply arg] and swallows
    [Rpc_server.Connection_lost]. This is required when the reply is sent
    after some delay, and not immediately after receiving the request.
    Any other exception is passed through.
*)
let ignore_lost_connection reply arg =
  try
    reply arg
  with
  | Rpc_server.Connection_lost ->
      ()
(** Adjust the number of pending requests of the connection of [sess] by
    [delta]. A connection whose count drops to zero is removed from
    [req_count]. Whenever the number of busy connections changes,
    [bconn_acc] is increased by (previous number of busy connections)
    times (time elapsed since [req_t0]), and [req_t0] is reset to now.
*)
let count_request m sess delta =
  let st = m.stats in
  let busy_before = Hashtbl.length st.req_count in
  let conn_id = Rpc_server.get_connection_id sess in
  let previous =
    try Hashtbl.find st.req_count conn_id with Not_found -> 0 in
  let pending = previous + delta in
  if pending <> 0 then
    Hashtbl.replace st.req_count conn_id pending
  else
    Hashtbl.remove st.req_count conn_id;
  let busy_after = Hashtbl.length st.req_count in
  if busy_after <> busy_before then begin
    let now = Unix.gettimeofday() in
    st.bconn_acc <- st.bconn_acc +. float busy_before *. (now -. st.req_t0);
    st.req_t0 <- now
  end
(** Implementation of the [read] RPC. When we are done, we have to call
    [emit], or reply with [System_err] on errors.

    The request is rejected when [ticket] does not grant read access to
    [block], or when [len] is negative or larger than the block size.
    Otherwise:
    - [`dnch_rpc]: a [Datanode_io] process reads the data into a free slot
      of the shm object. The data is then copied from there into the RPC
      response. The request waits in [wrequests] until a slot is free.
    - [`dnch_shm shm_obj]: a [Datanode_io] process reads the data directly
      into the shm object [shm_obj] given by the client.
*)
let proc_read conf sess (req, block, pos, len, ticket) emit =
  let m = current_manager() in
  let reply_syserr () =
    ignore_lost_connection (Rpc_server.reply_error sess) Rpc.System_err in
  (* Account a finished read that started at [t0] and ended at [t1] *)
  let record_stats t0 t1 =
    let lat = t1 -. t0 in
    m.stats.r_count <- m.stats.r_count + 1;
    m.stats.r_lat <- m.stats.r_lat +. lat;
    m.stats.r_peak <- max m.stats.r_peak lat in
  (* Evaluate [get_result] of an RPC callback. On error, log it with the
     number [n] and return [false]. *)
  let call_ok n get_result =
    try get_result(); true
    with error ->
      Netlog.logf `Err "proc_read exception (%d): %s"
        n (Netexn.to_string error);
      false in
  if not (verify m block true false ticket) then (
    Netlog.logf `Err
      "Got unauthorized read request for block %Ld (ticket %Ld)"
      block ticket.Rpcapi_aux.safetrans_id;
    reply_syserr()
  )
  else if len < 0 || len > conf#dn_blocksize then (
    (* [len] comes from the client. A bad value would make [String.create]
       fail, or the blit read past the end of the shm slot. *)
    Netlog.logf `Err
      "Got read request for block %Ld with invalid length %d"
      block len;
    reply_syserr()
  )
  else (
    match req with
    | `dnch_rpc ->
        (* Callback invoked when a slot in the shm object is free *)
        let on_slot slot =
          let (mem,mpos) = Dn_shm.slot m.shm slot in
          try
            let t0 = Unix.gettimeofday() in
            (* Fadvise that we will need the block. *)
            Dn_store.will_read_block m.store block;
            (* Pick an RPC client and call the [Datanode_io] program to
               start the read. *)
            let (mc,_) = Rpc_proxy.ManagedSet.mset_pick m.io_mset in
            Dio_proxy.V1.read'async
              mc
              (slot,block,pos,len)
              (fun get_result ->
                 (* Called back when the read is done. The data is now in
                    the shm slot and can be forwarded from there. *)
                 let t1 = Unix.gettimeofday() in
                 count_request m sess (-1);
                 if call_ok 2 get_result then (
                   let data = String.create len in
                   Netsys_mem.blit_memory_to_string mem mpos data 0 len;
                   ignore_lost_connection emit (`dnch_rpc data)
                 )
                 else
                   reply_syserr();
                 release_slot m slot;
                 record_stats t0 t1
              );
            count_request m sess 1
          with error ->
            Netlog.logf `Err "proc_read exception (1): %s"
              (Netexn.to_string error);
            reply_syserr();
            release_slot m slot
        in
        (* Push [on_slot] onto the queue of waiting requests, and check
           immediately whether there is a free slot. *)
        Queue.push (`R on_slot) m.wrequests;
        check_length m;
        find_slots_and_process m
    | `dnch_shm shm_obj ->
        ( try
            let t0 = Unix.gettimeofday() in
            let (mc,_) = Rpc_proxy.ManagedSet.mset_pick m.io_mset in
            Dio_proxy.V1.read_shm'async
              mc
              (shm_obj,block,pos,len)
              (fun get_result ->
                 let t1 = Unix.gettimeofday() in
                 count_request m sess (-1);
                 if call_ok 3 get_result then
                   ignore_lost_connection emit `dnch_shm
                 else
                   reply_syserr();
                 record_stats t0 t1
              );
            count_request m sess 1
          with error ->
            (* Reply instead of letting the exception escape, as in the
               other request paths *)
            Netlog.logf `Err "proc_read exception (4): %s"
              (Netexn.to_string error);
            reply_syserr()
        )
  )
(** Debug helper: log the load array of [io_mset] (as returned by
    [Rpc_proxy.ManagedSet.mset_load]) as a comma-separated list, prefixed
    by [prefix]. *)
let dump_load m prefix =
  let loads = Array.to_list (Rpc_proxy.ManagedSet.mset_load m.io_mset) in
  let text = String.concat "," (List.map string_of_int loads) in
  dlogf "%s: mset_load=%s" prefix text
(** Common implementation of the [write] and [zero] RPCs. [rpc_name] is
    only used in log messages.

    The ticket must grant write access to [block]. The request then waits
    in [wrequests] as a [`W] request until a shm slot is free. After that,
    [init_shm mem pos] fills the slot (starting at [pos] in [mem]), and the
    [Datanode_io] program is called to write the slot to [block]. When
    done, [emit ()] is called, or [System_err] is replied on errors.
*)
let call_write rpc_name conf sess (block,ticket) init_shm emit =
  let m = current_manager() in
  let reply_syserr () =
    ignore_lost_connection (Rpc_server.reply_error sess) Rpc.System_err in
  let authorized = verify m block false true ticket in
  if authorized then begin
    (* Invoked when a slot in the shm object is free *)
    let on_slot slot =
      let t0 = Unix.gettimeofday() in
      let (mem,mpos) = Dn_shm.slot m.shm slot in
      try
        init_shm mem mpos;
        let (mc,_) = Rpc_proxy.ManagedSet.mset_pick m.io_mset in
        Dio_proxy.V1.write'async mc (slot,block)
          (fun get_result ->
             (* Called back when the write is done *)
             let t1 = Unix.gettimeofday() in
             count_request m sess (-1);
             let success =
               try get_result(); true
               with error ->
                 Netlog.logf `Err "%s exception (2): %s"
                   rpc_name (Netexn.to_string error);
                 false in
             if success then ignore_lost_connection emit () else reply_syserr();
             release_slot m slot;
             let lat = t1 -. t0 in
             m.stats.w_count <- m.stats.w_count + 1;
             m.stats.w_lat <- m.stats.w_lat +. lat;
             m.stats.w_peak <- max m.stats.w_peak lat);
        count_request m sess 1
      with error ->
        Netlog.logf `Err
          "%s exception (1): %s" rpc_name (Netexn.to_string error);
        reply_syserr();
        release_slot m slot
    in
    Queue.push (`W on_slot) m.wrequests;
    check_length m;
    find_slots_and_process m
  end
  else begin
    Netlog.logf `Err
      "Got unauthorized write request for block %Ld (ticket %Ld)"
      block ticket.Rpcapi_aux.safetrans_id;
    reply_syserr()
  end
(** Implementation of the [write] RPC. In both variants, [ticket] must
    grant write access to [block].
    - [`dnch_rpc data]: the data came with the RPC message. It is copied
      into a free shm slot and written from there (see [call_write]).
    - [`dnch_shm shm_obj]: the data is in the client-supplied shm object
      [shm_obj], and [Datanode_io] writes it from there. While the manager
      is slow-syncing, the write is put into [drequests] and started later
      by [find_slots_and_process].
*)
let proc_write conf sess (block,wr_req,ticket) emit =
  match wr_req with
  | `dnch_rpc data ->
      let init_shm mem mpos =
        data # blit_to_memory 0 mem mpos conf#dn_blocksize in
      call_write "proc_write" conf sess (block,ticket) init_shm emit
  | `dnch_shm shm_obj ->
      let m = current_manager() in
      let reply_syserr () =
        ignore_lost_connection (Rpc_server.reply_error sess) Rpc.System_err in
      if not (verify m block false true ticket) then (
        (* This is the same check [call_write] does for the other variant.
           Without it, the shm path would accept writes without a valid
           ticket. *)
        Netlog.logf `Err
          "Got unauthorized write request for block %Ld (ticket %Ld)"
          block ticket.Rpcapi_aux.safetrans_id;
        reply_syserr()
      )
      else (
        let t0 = Unix.gettimeofday() in
        let do_write() =
          try
            let (mc,_) = Rpc_proxy.ManagedSet.mset_pick m.io_mset in
            Dio_proxy.V1.write_shm'async
              mc
              (shm_obj,block)
              (fun get_result ->
                 let t1 = Unix.gettimeofday() in
                 count_request m sess (-1);
                 let ok =
                   try get_result(); true
                   with error ->
                     Netlog.logf `Err "%s exception (3): %s"
                       "proc_write" (Netexn.to_string error);
                     false in
                 if ok then
                   ignore_lost_connection emit ()
                 else
                   reply_syserr();
                 m.stats.w_count <- m.stats.w_count + 1;
                 let lat = t1 -. t0 in
                 m.stats.w_lat <- m.stats.w_lat +. lat;
                 m.stats.w_peak <- max m.stats.w_peak lat;
              );
            count_request m sess 1
          with error ->
            (* [do_write] may run delayed from the event queue; reply to
               the client instead of letting the exception escape *)
            Netlog.logf `Err "%s exception (4): %s"
              "proc_write" (Netexn.to_string error);
            reply_syserr()
        in
        if m.slow_syncing then
          (* Do the write later *)
          Queue.add do_write m.drequests
        else
          do_write()
      )
(** Implementation of the [zero] RPC: a write of a block that consists
    only of null bytes. The shm slot is filled with zeros over the block
    size, then [call_write] proceeds as for a normal write. *)
let proc_zero conf sess (block,ticket) emit =
  let fill_zero mem mpos =
    let slot_mem = Bigarray.Array1.sub mem mpos conf#dn_blocksize in
    Bigarray.Array1.fill slot_mem '\000' in
  call_write "proc_zero" conf sess (block,ticket) fill_zero emit
(** Implementation of the [copy] RPC. This is more or less directly
    forwarded to [Dn_io]. The implicit [read] and [write] operations must
    be done by [Dn_io].

    There is, however, another implementation option for the network
    write in the remote copy case: the [read] could also put the data
    block into shm, and the network write is done here. That would allow
    us to better control the number of network connections to remote
    systems. (Of course, we could also simply count the number of remote
    copy invocations here, and limit only that.)

    [ticket] must grant read access to [block]. For a local copy
    ([dest_ident] is our own identity), [dest_ticket] must also grant write
    access to [dest_block]. For remote copies, [dest_ticket] is only passed
    on to [Datanode_io]. NOTE(review): it is presumably checked by the
    remote datanode; confirm.
*)
let proc_copy conf sess (block,dest_node,dest_ident,dest_block,
                         ticket,dest_ticket)
              emit =
  (* FIXME: local copies are not throttled! (the request is queued as
     [`R], so it is not held back while slow-syncing) *)
  let m = current_manager() in
  let reply_syserr () =
    ignore_lost_connection (Rpc_server.reply_error sess) Rpc.System_err in
  (* Call [copy] of the [Datanode_io] program to start copying. The slot
     is used as buffer. *)
  let on_slot slot =
    try
      let (mc,_) = Rpc_proxy.ManagedSet.mset_pick m.io_mset in
      let t0 = Unix.gettimeofday() in
      Dio_proxy.V1.copy'async
        mc
        (block,dest_node,dest_ident,dest_block,
         dest_ticket,slot)
        (fun get_result ->
           let t1 = Unix.gettimeofday() in
           count_request m sess (-1);
           let ok =
             try get_result(); true
             with error ->
               Netlog.logf `Err "proc_copy exception (2): %s"
                 (Netexn.to_string error);
               false in
           if ok then
             ignore_lost_connection emit ()
           else
             reply_syserr();
           release_slot m slot;
           m.stats.c_count <- m.stats.c_count + 1;
           let lat = t1 -. t0 in
           m.stats.c_lat <- m.stats.c_lat +. lat;
           m.stats.c_peak <- max m.stats.c_peak lat
        );
      count_request m sess 1
    with error ->
      Netlog.logf `Err
        "proc_copy exception (1): %s" (Netexn.to_string error);
      reply_syserr();
      (* The slot was marked as used by [find_slots_and_process]; give it
         back, otherwise it is lost for good (as done in [proc_read]). *)
      release_slot m slot
  in
  (* First check whether [ticket] permits us to do the read: *)
  if not (verify m block true false ticket) then (
    Netlog.logf `Err
      "Got unauthorized read request for block %Ld (ticket %Ld)"
      block ticket.Rpcapi_aux.safetrans_id;
    reply_syserr()
  )
  else (
    (* Check [dest_ticket] immediately in case of a local copy *)
    let identity = Dn_store.get_identity m.store in
    if identity = dest_ident &&
       not (verify m dest_block false true dest_ticket)
    then (
      Netlog.logf `Err
        "Got unauthorized copy request for block %Ld to block %Ld (ticket %Ld)"
        block dest_block dest_ticket.Rpcapi_aux.safetrans_id;
      reply_syserr()
    )
    else (
      (* Push [on_slot] onto the queue of waiting requests, and check
         immediately whether there is a free slot. *)
      Queue.push (`R on_slot) m.wrequests;
      check_length m;
      find_slots_and_process m
    )
  )
(** Implementation of the [sync] RPC: reply once all data written before
    this call has been synced to disk.

    [wait_sync] is the sync cycle number that must have been reached. If
    a sync is currently running, we have to wait for that cycle plus the
    following one; otherwise just the following one. The waiting function
    is registered in [wsync]. It is called after every sync cycle, and
    either replies or registers itself again. A failed sync makes it reply
    with [System_err].
*)
let proc_sync conf sess () emit =
  let m = current_manager() in
  let cycles = if m.syncing then 2L else 1L in
  let wait_sync = Int64.add m.last_sync cycles in
  let rec on_sync_done is_ok =
    if not is_ok then
      (* Any error interrupts waiting, and we return the error *)
      ignore_lost_connection (Rpc_server.reply_error sess) Rpc.System_err
    else if m.last_sync < wait_sync then
      (* Another round of waiting *)
      Queue.push on_sync_done m.wsync
    else
      (* Return success *)
      ignore_lost_connection emit ()
  in
  Queue.push on_sync_done m.wsync
module Cached(V:Netplex_cenv.VAR_TYPE) = struct
  (** [cached f name emit emit_syserr]: get a value from [Datanode_io]
      by calling the async RPC wrapper [f] the first time, and cache it
      in the container variable [name]. Later calls take the value from
      the cache. The value is passed to [emit]. If getting it fails, the
      error is logged and [emit_syserr ()] is called instead.
  *)
  let cached f name emit emit_syserr =
    let cached_value =
      try Some (V.get name)
      with Netplex_cenv.Container_variable_not_found _ -> None in
    match cached_value with
    | Some value ->
        emit value
    | None ->
        (* first time *)
        let report_error n error =
          Netlog.logf `Err "proc_get_%s exception (%d): %s"
            name n (Netexn.to_string error);
          emit_syserr() in
        ( try
            let m = current_manager() in
            let (mc,_) = Rpc_proxy.ManagedSet.mset_pick m.io_mset in
            f
              mc
              ()
              (fun get_result ->
                 (* Called back when the RPC is done. Only obtaining and
                    caching the value is guarded: an exception raised by
                    [emit] must not lead to a second reply through
                    [emit_syserr]. *)
                 let result =
                   try
                     let value = get_result() in
                     V.set name value;
                     `Ok value
                   with error -> `Error error in
                 match result with
                 | `Ok value -> emit value
                 | `Error error -> report_error 1 error
              )
          with error ->
            (* e.g. no connection to [Datanode_io] could be picked *)
            report_error 2 error
        )
end
(** Implementation of the [identity] RPC. The caller passes the cluster
    name it expects. If that differs from the configured cluster name, the
    request is rejected with [System_err]. Otherwise the call is forwarded
    to [Datanode_io] the first time the identity is requested. The
    identity does not change, so the value is cached after that.
*)
let proc_identity conf sess exp_clustername emit =
  let reply_syserr () =
    ignore_lost_connection (Rpc_server.reply_error sess) Rpc.System_err in
  if conf#dn_clustername <> exp_clustername then (
    Netlog.logf `Err "proc_identity: bad clustername";
    (* Reply only once: do not fall through to the identity lookup, which
       would emit a second response for the same call *)
    reply_syserr()
  )
  else (
    let module C = Cached(String_var) in
    C.cached
      Dio_proxy.V1.identity'async
      "identity"
      (fun x -> ignore_lost_connection emit x)
      reply_syserr
  )
(** Like [proc_identity], but for the UDP discovery service. A request
    with a non-matching cluster name is not answered at all, and errors
    while getting the identity are ignored (no reply). *)
let proc_identity_discover conf sess exp_clustername emit =
  if conf#dn_clustername <> exp_clustername then
    ()
  else begin
    let module C = Cached(String_var) in
    let on_value v = ignore_lost_connection emit v in
    let on_error () = () in
    C.cached Dio_proxy.V1.identity'async "identity" on_value on_error
  end
(** Implementation of the [size] RPC. The size is requested from
    [Datanode_io] the first time, and cached afterwards since it does not
    change. Errors are replied as [System_err]. *)
let proc_size conf sess () emit =
  let module Size_cache = Cached(Int64_var) in
  let on_error () =
    ignore_lost_connection (Rpc_server.reply_error sess) Rpc.System_err in
  Size_cache.cached Dio_proxy.V1.size'async "size"
    (fun size -> ignore_lost_connection emit size)
    on_error
(** Implementation of the [blocksize] RPC: returns the configured block
    size. *)
let proc_blocksize conf _sess () emit =
  let bs = conf#dn_blocksize in
  emit bs
(** Implementation of the [clustername] RPC: returns the configured
    cluster name. *)
let proc_clustername conf _sess () emit =
  let name = conf#dn_clustername in
  emit name
(** Remember the shm object names [shm_names] in the file [dir/shm_name],
    one name per line. The list is first written to [dir/shm_name.new],
    which is then renamed to [dir/shm_name]. On error the channel is
    closed and the exception is re-raised.
*)
let remember_shm dir shm_names =
  let tmp_path = dir ^ "/shm_name.new" in
  let oc = open_out tmp_path in
  try
    let rec write_all = function
      | [] -> ()
      | name :: rest ->
          output_string oc (name ^ "\n");
          write_all rest in
    write_all shm_names;
    close_out oc;
    Unix.rename tmp_path (dir ^ "/shm_name")
  with
  | error ->
      close_out_noerr oc;
      raise error
(** Rewrite the list of remembered shm names (see [remember_shm]) in the
    socket directory of the controller. The list holds the manager's own
    shm object plus all user-requested shm objects from [user_shm]. *)
let remember_all_shm() =
  let cont = Netplex_cenv.self_cont() in
  let ctrl_cfg =
    cont # socket_service # socket_service_config # controller_config in
  let dir = ctrl_cfg # socket_directory in
  let m = current_manager() in
  let user_names =
    Hashtbl.fold (fun _ names acc -> names @ acc) m.user_shm [] in
  remember_shm dir (m.shm_name :: user_names)
(** Look for shm objects remembered in [dir/shm_name] (see
    [remember_shm]) and unlink them. This drops objects left behind by a
    process that terminated abnormally. A failure to unlink a single
    object is logged as a warning. An error while reading the file closes
    the channel and is re-raised.
*)
let drop_old_shm dir =
  let path = dir ^ "/shm_name" in
  if Sys.file_exists path then (
    let f = open_in path in
    try
      while true do
        let n = input_line f in
        try
          Dn_shm.unlink n
        with
        | error ->
            Netlog.logf `Warning "Cannot unlink old shm %s: %s"
              n (Netexn.to_string error)
      done
    with
    | End_of_file ->
        close_in f
    | error ->
        (* e.g. an I/O error while reading: do not leak the channel *)
        close_in_noerr f;
        raise error
  )
(** Whether the client of [sess] is on the local host. This is true for
    Unix domain sockets, and for Internet sockets whose peer IP address
    equals the local IP address of the socket. Any exception (and an
    Internet socket with a non-Internet peer) yields [false].
*)
let sess_is_local sess =
  try
    match Rpc_server.get_socket_name sess with
    | Unix.ADDR_UNIX _ ->
        true
    | Unix.ADDR_INET(local_ip,_) ->
        ( match Rpc_server.get_peer_name sess with
          | Unix.ADDR_INET(peer_ip,_) -> local_ip = peer_ip
          | Unix.ADDR_UNIX _ -> false
        )
  with
  | _ -> false
(** Implementation of [alloc_shm_if_local]. For local clients, a new shm
    object is created and registered for the TCP connection in [user_shm]
    (so [on_tcp_close] unlinks it later). The remembered shm names are
    updated, and [Some name] is returned. Non-local clients, and failures
    (logged as warnings), get [None].
*)
let proc_alloc_shm_if_local sess () emit =
  let m = current_manager() in
  if not (sess_is_local sess) then
    ignore_lost_connection emit None
  else
    try
      (* Only the name is passed on; the descriptor is closed at once *)
      let (fd, name) = Plasma_util.get_shm_fd "plasma" 0 in
      Unix.close fd;
      let conn = Rpc_server.get_connection_id sess in
      let known =
        if Hashtbl.mem m.user_shm conn then Hashtbl.find m.user_shm conn
        else [] in
      Hashtbl.replace m.user_shm conn (name :: known);
      remember_all_shm();
      ignore_lost_connection emit (Some name)
    with
    | error ->
        Netlog.logf `Warning "Cannot allocate shm: %s"
          (Netexn.to_string error);
        ignore_lost_connection emit None
(** Implementation of [udsocket_if_local]: local clients get the name of
    the Unix domain socket of the manager (if any), other clients get
    [None]. *)
let proc_udsocket_if_local sess () emit =
  let answer =
    if sess_is_local sess then (current_manager()).udsocket else None in
  ignore_lost_connection emit answer
(** Called when a TCP connection is closed. All shm objects that were
    allocated for it via [alloc_shm_if_local] are unlinked (failures are
    logged as warnings) and forgotten, and the remembered shm names are
    updated.
*)
let on_tcp_close conn_id =
  let m = current_manager() in
  let names =
    if Hashtbl.mem m.user_shm conn_id then Hashtbl.find m.user_shm conn_id
    else [] in
  let unlink_quietly name =
    try Netsys_posix.shm_unlink name
    with error ->
      Netlog.logf `Warning "Cannot unlink shm %s: %s"
        name (Netexn.to_string error) in
  List.iter unlink_quietly names;
  Hashtbl.remove m.user_shm conn_id;
  remember_all_shm()
(** Create the set of RPC clients to the [Datanode_io] processes, using
    the [Rpc_proxy] abstraction for automatically managed connections. The
    set connects to the Unix domain socket of the Dn_io service with up to
    [conf#dn_io_processes] connections, and distributes calls by load
    ([`Balance_load]). The calls are processed by the container event
    system.
    @raise Failure if no RPC socket of the Dn_io service can be found.
*)
let create_io_mset conf =
  let esys = (Netplex_cenv.self_cont()) # event_system in
  let dn_io_path =
    match Netplex_cenv.lookup "Dn_io" "RPC" with
    | Some path -> path
    | None ->
        failwith("Socket not found for service Dn_io, protocol RPC") in
  let connector = Rpc_client.Unix dn_io_path in
  let max_conns = conf#dn_io_processes in
  let client_cfg =
    Rpc_proxy.ManagedClient.create_mclient_config
      ~programs:[ Rpcapi_clnt.Datanode_io.V1._program ]
      () in
  let set_cfg =
    Rpc_proxy.ManagedSet.create_mset_config
      ~mclient_config:client_cfg
      ~policy:`Balance_load
      () in
  Rpc_proxy.ManagedSet.create_mset set_cfg [| connector, max_conns |] esys
(** [Datanode_ctrl.reset_all_safetrans]: forget all safetrans secrets. *)
let proc_reset_all_safetrans _conf _sess () emit =
  let table = (current_manager()).safetrans in
  Hashtbl.clear table;
  emit ()
(** [Datanode_ctrl.cancel_safetrans]: forget the safetrans with the IDs in
    [st_id_a]. *)
let proc_cancel_safetrans _conf _sess st_id_a emit =
  let m = current_manager() in
  for i = 0 to Array.length st_id_a - 1 do
    Hashtbl.remove m.safetrans st_id_a.(i)
  done;
  emit ()
(** [Datanode_ctrl.safetrans]: register (or replace) the timeout and the
    secret of every safetrans in [ticket_a]. *)
let proc_safetrans _conf _sess ticket_a emit =
  let m = current_manager() in
  let register t =
    Hashtbl.replace m.safetrans t.A.st_id (t.A.st_tmo, t.A.st_secret) in
  Array.iter register ticket_a;
  emit ()
(** Cancel all safetrans whose timeout [st_tmo] (compared as seconds with
    [Unix.gettimeofday]) lies in the past. The expired IDs are collected
    first and removed afterwards, so the table is not modified while it is
    being iterated. *)
let maybe_timeout_safetrans m =
  let now = Unix.gettimeofday() in
  let expired = ref [] in
  Hashtbl.iter
    (fun st_id (st_tmo,_) ->
       if now > Int64.to_float st_tmo then expired := st_id :: !expired)
    m.safetrans;
  List.iter (Hashtbl.remove m.safetrans) !expired
(* Mstring factories for the RPC server, installed by [setup] via
   [Rpc_server.set_mstring_factories]. The name writedata is mapped to
   [Xdr_mstring.string_based_mstrings]. NOTE(review): presumably this is
   the XDR type carrying write payloads; confirm against the RPC spec. *)
let mfac = Hashtbl.create 5
let () =
Hashtbl.add mfac "writedata" Xdr_mstring.string_based_mstrings
(** Configure the RPC server [srv] of the manager. It binds the procedures
    of the [Datanode] and [Datanode_ctrl] programs, installs the mstring
    factories [mfac], and registers [on_tcp_close] to clean up the shm
    objects of closed connections. It also applies the access
    configuration [access] with [Pfs_auth.configure_rpc_server].
    [cf], [cfaddr] and [shm_name] are not used here.
*)
let setup srv (cf,cfaddr,conf,shm_name,access) =
Rpcapi_srv.Datanode.V1.bind_async
~proc_null:(fun _ _ emit -> emit ())
~proc_identity:(proc_identity conf)
~proc_size:(proc_size conf)
~proc_blocksize:(proc_blocksize conf)
~proc_clustername:(proc_clustername conf)
~proc_read:(proc_read conf)
~proc_write:(proc_write conf)
~proc_copy:(proc_copy conf)
~proc_zero:(proc_zero conf)
~proc_sync:(proc_sync conf)
~proc_alloc_shm_if_local
~proc_udsocket_if_local
srv;
Rpcapi_srv.Datanode_ctrl.V1.bind_async
~proc_null:(fun _ _ emit -> emit ())
~proc_reset_all_safetrans:(proc_reset_all_safetrans conf)
~proc_cancel_safetrans:(proc_cancel_safetrans conf)
~proc_safetrans:(proc_safetrans conf)
srv;
Rpc_server.set_mstring_factories srv mfac;
(* Per-connection cleanup of user-requested shm objects *)
Rpc_server.set_onclose_action srv on_tcp_close;
Pfs_auth.configure_rpc_server srv access
(** A fresh statistics record: cycle 0, all counters and latencies zero,
    no pending requests, and [req_t0] set to the current time. *)
let new_stats() =
  let now = Unix.gettimeofday() in
  { cycle = 0;
    r_count = 0; r_lat = 0.0; r_peak = 0.0;
    w_count = 0; w_lat = 0.0; w_peak = 0.0;
    c_count = 0; c_lat = 0.0; c_peak = 0.0;
    t_t0 = 0.0; t_t0_r = 0.0;
    t_count = 0; t_lat = 0.0; t_peak = 0.0;
    q_peak = 0;
    req_count = Hashtbl.create 7;
    req_t0 = now;
    bconn_acc = 0.0;
  }
(** Start a new statistics cycle. The cycle number is increased and the
    per-cycle counters are reset. [t_t0] (start of the current throttling
    period, used for [t_peak]) and [req_count] (requests still pending)
    are carried over. *)
let next_stats m =
  let s = m.stats in
  let now = Unix.gettimeofday() in
  s.cycle <- succ s.cycle;
  (* reads, writes, copies *)
  s.r_count <- 0; s.w_count <- 0; s.c_count <- 0;
  s.r_lat <- 0.0; s.w_lat <- 0.0; s.c_lat <- 0.0;
  s.r_peak <- 0.0; s.w_peak <- 0.0; s.c_peak <- 0.0;
  (* throttling: latencies of this cycle are measured from now on *)
  s.t_t0_r <- now;
  s.t_count <- 0; s.t_lat <- 0.0; s.t_peak <- 0.0;
  (* the queue peak starts at the current queue length *)
  s.q_peak <- Queue.length m.wrequests;
  (* busy connection accounting restarts now *)
  s.req_t0 <- now;
  s.bconn_acc <- 0.0
(** Length of a statistics cycle in seconds. [print_stats] is run by a
    timer with this period. *)
let stats_period = 60.0
(** Log the statistics of the current cycle to the statistics log
    subchannel, then start a new cycle with [next_stats]. Average
    latencies of a category with no events in this cycle are shown as 0.0
    (instead of the nan that 0/0 would give). The average number of
    parallel requests is the time-weighted number of busy connections,
    divided by the nominal [stats_period].
*)
let print_stats m =
  let s = m.stats in
  let t0 = Unix.gettimeofday() in
  let l = Hashtbl.length s.req_count in
  (* account for the busy connections since the last change *)
  s.bconn_acc <- s.bconn_acc +. (float l) *. (t0 -. s.req_t0);
  let avg sum n =
    if n = 0 then 0.0 else sum /. float n in
  let msg =
    sprintf "cycle=%d \
             [reads: n=%d lat=%.1f avg=%.1f peak=%.1f] \
             [writes: n=%d lat=%.1f avg=%.1f peak=%.1f] \
             [copies: n=%d lat=%.1f avg=%.1f peak=%.1f] \
             [throttling: n=%d lat=%.1f avg=%.1f peak=%.1f] \
             [queue: peak_length=%d] \
             [parallel requests: avg=%.1f]"
      s.cycle
      s.r_count s.r_lat (avg s.r_lat s.r_count) s.r_peak
      s.w_count s.w_lat (avg s.w_lat s.w_count) s.w_peak
      s.c_count s.c_lat (avg s.c_lat s.c_count) s.c_peak
      s.t_count s.t_lat (avg s.t_lat s.t_count) s.t_peak
      s.q_peak
      (s.bconn_acc /. stats_period) in
  let cont =
    Netplex_cenv.self_cont() in
  cont # log_subch "statistics" `Info msg;
  next_stats m
(** Start the UDP server for the [Datanode_discover] program. It listens
    on the host/port of the last Internet address found among all
    protocols of the socket service of [container], with address reuse
    enabled. Returns [None] if there is no Internet address.
*)
let create_dn_discover_server conf container =
  (* This is a UDP server, and hence special *)
  let inet_addr = ref None in
  List.iter
    (fun proto ->
       Array.iter
         (function
           | `Socket(Unix.ADDR_INET(host,port)) ->
               inet_addr := Some (host,port)
           | _ -> ())
         proto # addresses)
    container # socket_service # socket_service_config # protocols;
  match !inet_addr with
  | None -> None
  | Some(host,port) ->
      let esys = container # event_system in
      let socket_config =
        object
          inherit Rpc_server.default_socket_config
          method listen_options =
            { Uq_engines.default_listen_options with
              Uq_engines.lstn_reuseaddr = true
            }
        end in
      let rpc =
        Rpc_server.create2
          (`Socket(Rpc.Udp, Rpc_server.Internet(host,port), socket_config))
          esys in
      Rpcapi_srv.Datanode_discover.V1.bind_async
        ~proc_null:(fun _ _ emit -> emit ())
        ~proc_identity:(proc_identity_discover conf)
        rpc;
      Some rpc
(** The Netplex processor factory for the dn_manager service.
    - [configure] extracts the datanode configuration, creates the shm
      object, and extracts the access configuration. All of these are
      passed on as a tuple to [hooks] and [setup].
    - The hooks set up and tear down the manager (see the comments at the
      individual methods).
    - [setup] binds the RPC procedures.
*)
let factory() =
Rpc_netplex.rpc_factory
~configure:(fun cf cfaddr ->
let conf = Dn_config.extract_dn_config cf in
(** Create the shm object - optionally, drop the old
object. Shm objects have kernel persistence, so they
are not automatically dropped when the process
terminates. We drop explicitly at [pre_finish_hook]
time, but for handling abnormally terminated processes
we also remember the name of the shm object and
drop it at next startup time. This is done at
[pre_start_hook] time below.
*)
let shm_name =
Dn_shm.create (conf :> Dn_shm.dn_shm_config) in
let access =
Pfs_auth.extract_access_config cf cf#root_addr in
(cf, cfaddr, conf, shm_name, access)
)
~hooks:(fun (cf, cfaddr, conf, shm_name, access) ->
( object
inherit Netplex_kit.empty_processor_hooks()
(* Adds the Netplex_sharedvar plugin, which [post_start_hook] uses to
   publish the shm name *)
method post_add_hook _ ctrl =
ctrl # add_plugin Netplex_sharedvar.plugin;
(** Also enable the [Dn_io] service: *)
Dn_io.add_service conf cf cfaddr ctrl
(* Drops the shm objects remembered from a previous run, then remembers
   the newly created one *)
method pre_start_hook _ ctrl _ =
drop_old_shm
ctrl#controller_config#socket_directory;
remember_shm
ctrl#controller_config#socket_directory
[shm_name]
(* Opens the store and the shm object, starts the discovery server,
   creates and registers the manager record, publishes the shm name,
   and installs the timers for sync, safetrans timeouts and statistics *)
method post_start_hook cont =
let store =
Dn_store.open_store (conf :> Dn_store.dn_store_config) in
let shm =
Dn_shm.openshm
(conf :> Dn_shm.dn_shm_config)
shm_name in
let udsocket =
Netplex_cenv.lookup "Dn_manager" "RPC" in
(* Start the special server for [Dn_discover]: *)
let dsrv = create_dn_discover_server conf cont in
let m =
{ shm = shm;
shm_name = shm_name;
slot_used = Array.make conf#dn_shm_queue_length false;
wrequests = Queue.create();
drequests = Queue.create();
io_mset = create_io_mset conf;
safetrans = Hashtbl.create 17;
last_sync = 0L;
syncing = false;
slow_syncing = false;
wsync = Queue.create();
user_shm = Hashtbl.create 5;
udsocket = udsocket;
store = store;
discover_server = dsrv;
stats = new_stats()
} in
Manager_var.set "current_manager" m;
(** Publish the name of the shm object: *)
ignore(Netplex_sharedvar.create_var
"plasma.datanode.shm.name");
ignore(Netplex_sharedvar.set_value
"plasma.datanode.shm.name" shm_name);
(** Establish the timer calling [sync] periodically: *)
let _tm =
Netplex_cenv.create_timer
(fun tm -> sync_cycle m; true)
conf#dn_sync_period in
(** Establish a timer for [maybe_timeout_safetrans] *)
let _tm =
Netplex_cenv.create_timer
(fun tm -> maybe_timeout_safetrans m; true)
60.0 in
(** Establish a timer for printing statistics *)
let _tm =
Netplex_cenv.create_timer
(fun tm -> print_stats m; true)
stats_period in
();
(* Stops the UDP discovery server, if one was created *)
method system_shutdown() =
let m = current_manager() in
( match m.discover_server with
| None -> ()
| Some rpc ->
Netlog.logf `Info "Stopping discovery server";
Rpc_server.stop_server rpc
)
(* Unlinks the manager's shm object (errors are only logged) and closes
   the store. NOTE(review): user-requested shm objects in [user_shm] are
   not unlinked here, only in [on_tcp_close]; confirm this is enough. *)
method pre_finish_hook cont =
( try
Dn_shm.unlink shm_name
with
| error ->
Netlog.logf `Err
"Cannot unlink shm object: %s"
(Netexn.to_string error)
);
let m = current_manager() in
Dn_store.close_store m.store;
end
)
)
~name:"dn_manager"
~setup
()