(* Copyright 2011 Gerd Stolpmann

   This file is part of Plasma, a distributed filesystem and a
   map/reduce computation framework. Unless you have a written license
   agreement with the copyright holder (Gerd Stolpmann), the following
   terms apply:

   Plasma is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation, either version 3 of the License, or
   (at your option) any later version.

   Plasma is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with Foobar. If not, see <http://www.gnu.org/licenses/>.
*)

(* $Id: nn_blockmap_box.ml 431 2011-10-05 20:27:11Z gerd $ *)

(* Camlbox-based RPC between blockmap clients and the blockmap server.
   A client sends [request_msg] values into the server's camlbox; the
   server executes them against its [blockmap] objects and answers with a
   [response_msg] sent to the response camlbox named in the request. *)

open Nn_blockmap
open Printf

let dlog = Plasma_util.dlog
let dlogr = Plasma_util.dlogr

(* Client-side handle on the blockmap server (see [client]). *)
class type blockmap_client_t =
object
  method access : id:int -> identity:string -> size:int64 -> bool * blockmap_t
    (* Returns a proxy for blockmap [id]. The server creates the blockmap
       if it does not exist yet; the flag tells whether that happened. *)
  method inactivate : id:int -> unit
    (* Inactivates blockmap [id] on the server and drops it there. *)
  method close : unit -> unit
    (* Releases this client's reference to the server's request box. *)
  method shutdown : unit -> unit
    (* Asks the server to leave its request loop. *)
end

(* Size of one message slot in the request and response camlboxes.
   Normally 128K per message. How much do we need:
   - reserve: the response list has a max length of bmaprowsize (1024).
     Each list element counts as 12 words -> ~100 bytes on 64bit platforms
   - free, pin: the arg list has usually only a length of 1
   - get_changes: we only transfer 10 elements per req/resp cycle (~20K)
*)
let box_size =
  max (128 * 1024) (128 * Nn_config.bmaprowsize)

(* Requests a client can send to the server. *)
type request =
  [ `Req_access
  | `Req_active
  | `Req_filled
  | `Req_stats
  | `Req_fill_blockmap of int64 * string
  | `Req_fill_from_db of int64 * int
  | `Req_fill_done
  | `Req_has_free_blocks
  | `Req_reserve of int * owner * reserve_info
  | `Req_free of triv_range list * owner
  | `Req_pin of triv_range list * owner
  | `Req_commit of owner
  | `Req_rollback of owner
  | `Req_release of owner
  | `Req_get_changes_start of owner
  | `Req_get_changes_cont
  | `Req_inactivate
  | `Req_shutdown
  ]

(* Short rendering of a request for debug logging. *)
let string_of_request = function
  | `Req_access -> "access"
  | `Req_active -> "active"
  | `Req_filled -> "filled"
  | `Req_stats -> "stats"
  | `Req_fill_blockmap (idx, _) -> sprintf "fill_blockmap(idx=%Ld)" idx
  | `Req_fill_from_db (idx, n) -> sprintf "fill_from_db(idx=%Ld,n=%d)" idx n
  | `Req_fill_done -> "fill_done"
  | `Req_has_free_blocks -> "has_free_blocks"
  | `Req_reserve (n, owner, _) ->
      sprintf "reserve(n=%d,owner=%s)" n (string_of_owner owner)
  | `Req_free (l, owner) ->
      sprintf "free(%s,owner=%s)" (string_of_trl l) (string_of_owner owner)
  | `Req_pin (l, owner) ->
      sprintf "pin(%s,owner=%s)" (string_of_trl l) (string_of_owner owner)
  | `Req_commit owner ->
      sprintf "commit(owner=%s)" (string_of_owner owner)
  | `Req_rollback owner ->
      sprintf "rollback(owner=%s)" (string_of_owner owner)
  | `Req_release owner ->
      sprintf "release(owner=%s)" (string_of_owner owner)
  | `Req_get_changes_start owner ->
      sprintf "get_changes_start(owner=%s)" (string_of_owner owner)
  | `Req_get_changes_cont -> "get_changes_cont"
  | `Req_inactivate -> "inactivate"
  | `Req_shutdown -> "shutdown"

(* Identifies the blockmap a request refers to. *)
type bmap =
  { blockmap_id : int;
    blockmap_identity : string;
    blockmap_size : int64;
  }

(* A request together with the resource id of the camlbox the response
   must be sent to. *)
type request_msg =
  { req_bmap : bmap;
    req_response_box : Netmcore.res_id;
    req : request
  }

(* Exceptions transported back to the client; [call] re-raises them as
   [Inactive], [Failure] and [Invalid_argument] respectively. *)
type respexn =
  [ `Exn_inactive
  | `Exn_failure of string
  | `Exn_invalid_argument of string
  ]

let string_of_respexn = function
  | `Exn_inactive -> "Exn_inactive"
  | `Exn_failure msg -> "Exn_failure(" ^ msg ^ ")"
  | `Exn_invalid_argument msg -> "Exn_invalid_arg(" ^ msg ^ ")"

type response =
  [ `Resp_access of bool
      (* flag: whether the blockmap has been implicitly created *)
  | `Resp_active of bool
  | `Resp_filled of bool
  | `Resp_stats of int64 * int64 * int64
  | `Resp_fill_blockmap
  | `Resp_fill_from_db
  | `Resp_fill_done
  | `Resp_has_free_blocks of bool
  | `Resp_reserve of triv_range list
  | `Resp_free
  | `Resp_pin
  | `Resp_commit
  | `Resp_rollback
  | `Resp_release
  | `Resp_get_changes of bool * (int64 * string) list
  | `Resp_inactivate
  | `Resp_shutdown
  | respexn
  ]

(* Short rendering of a response for debug logging. *)
let string_of_response = function
  | `Resp_access flag -> sprintf "access(created=%B)" flag
  | `Resp_active flag -> sprintf "active(active=%B)" flag
  | `Resp_filled flag -> sprintf "filled(filled=%B)" flag
  | `Resp_stats (u, t, f) ->
      sprintf "stats(used=%Ld,trans=%Ld,free=%Ld)" u t f
  | `Resp_fill_blockmap -> "fill_blockmap"
  | `Resp_fill_from_db -> "fill_from_db"
  | `Resp_fill_done -> "fill_done"
  | `Resp_has_free_blocks flag -> sprintf "has_free_blocks(has=%B)" flag
  | `Resp_reserve l -> sprintf "reserve(%s)" (string_of_trl l)
  | `Resp_free -> "free"
  | `Resp_pin -> "pin"
  | `Resp_commit -> "commit"
  | `Resp_rollback -> "rollback"
  | `Resp_release -> "release"
  | `Resp_get_changes (more, l) ->
      sprintf "get_changes(more=%B,rows=%s)"
        more
        (String.concat "," (List.map (fun (idx, _) -> Int64.to_string idx) l))
  | `Resp_inactivate -> "inactivate"
  | `Resp_shutdown -> "shutdown"
  | #respexn as e -> string_of_respexn e

type response_msg =
  { resp : response }

(* Shared variable through which the server publishes the resource id of
   its request camlbox. *)
module Res_var =
  Netplex_sharedvar.Make_var_type(struct type t = Netmcore.res_id end)

let blockmap_res_id_name = "plasma.namenode.blockmap_res_id"

(* Creates the shared variable [blockmap_res_id_name] (encapsulated). *)
let init () =
  let flag = Netplex_sharedvar.create_var ~enc:true blockmap_res_id_name in
  dlogr (fun () -> sprintf "Nn_blockmap_box: create_var -> %B" flag)

(* Sends [req] for blockmap [bmap] to the server box [sbox], asking for the
   response to be delivered to the camlbox with resource id [rbox_id]. *)
let send rbox_id sbox bmap req =
  dlogr
    (fun () -> sprintf "Nn_blockmap_box.send: req=%s" (string_of_request req));
  let req_msg = { req_bmap = bmap; req_response_box = rbox_id; req = req } in
  Netcamlbox.camlbox_send sbox req_msg

(* Blocks on [box] until at least one message is available, then returns
   copies of all available messages and deletes them from the box. *)
let rec receive_all box =
  let slots = Netcamlbox.camlbox_wait box in
  let msgs = List.map (Netcamlbox.camlbox_get_copy box) slots in
  List.iter (Netcamlbox.camlbox_delete box) slots;
  match msgs with
  | [] -> receive_all box
  | _ -> msgs

(* Synchronous RPC: sends [req], waits for exactly one response in [rbox]
   and passes it to [decode]. Transported exceptions are re-raised:
   [`Exn_inactive] as [Inactive], [`Exn_failure] as [Failure] and
   [`Exn_invalid_argument] as [Invalid_argument] (messages get the suffix
   " (via camlbox)"). Receiving more than one message is a protocol
   violation ([assert false]). *)
let call rbox rbox_id sbox bmap req decode =
  send rbox_id sbox bmap req;
  match receive_all rbox with
  | [ resp_msg ] ->
      dlogr
        (fun () ->
          sprintf "Nn_blockmap_box.call resp=%s"
            (string_of_response resp_msg.resp));
      (match resp_msg.resp with
       | `Exn_inactive -> raise Inactive
       | `Exn_failure s -> failwith (s ^ " (via camlbox)")
       | `Exn_invalid_argument s -> invalid_arg (s ^ " (via camlbox)")
       | other -> decode other)
  | _ -> assert false

(* Decoder for responses without payload: fails unless [y] equals [x]. *)
let expect x y =
  if x <> y then failwith "Nn_blockmap_box.expect"

(* Proxy object implementing [blockmap_t] by forwarding every method to the
   server via [call]. [id], [identity] and [size] are answered locally;
   [dump] is not supported and fails. *)
let bm_client id identity size rbox rbox_id sbox bmap : blockmap_t =
  let (_ : response_msg Netcamlbox.camlbox) = rbox in
  let (_ : request_msg Netcamlbox.camlbox_sender) = sbox in
  let rpc req decode = call rbox rbox_id sbox bmap req decode in
  object
    method active =
      rpc `Req_active (function `Resp_active b -> b | _ -> assert false)
    method filled =
      rpc `Req_filled (function `Resp_filled b -> b | _ -> assert false)
    method id = id
    method identity = identity
    method size = size
    method stats =
      rpc `Req_stats
        (function `Resp_stats (nu, nt, nf) -> (nu, nt, nf) | _ -> assert false)
    method fill_blockmap index data =
      rpc (`Req_fill_blockmap (index, data)) (expect `Resp_fill_blockmap)
    method fill_from_db index n =
      rpc (`Req_fill_from_db (index, n)) (expect `Resp_fill_from_db)
    method fill_done () =
      rpc `Req_fill_done (expect `Resp_fill_done)
    method has_free_blocks =
      rpc `Req_has_free_blocks
        (function `Resp_has_free_blocks b -> b | _ -> assert false)
    method reserve n owner ri =
      rpc (`Req_reserve (n, owner, ri))
        (function `Resp_reserve l -> l | _ -> assert false)
    method free l owner =
      rpc (`Req_free (l, owner)) (expect `Resp_free)
    method pin l owner =
      rpc (`Req_pin (l, owner)) (expect `Resp_pin)
    method commit owner =
      rpc (`Req_commit owner) (expect `Resp_commit)
    method rollback owner =
      rpc (`Req_rollback owner) (expect `Resp_rollback)
    method release owner =
      rpc (`Req_release owner) (expect `Resp_release)
    method get_changes owner =
      (* The server returns the rows in chunks; keep asking for the next
         chunk while it reports that more are pending. Chunks are collected
         in reverse and flipped once at the end. *)
      let rev_rows = ref [] in
      let more = ref false in
      let absorb = function
        | `Resp_get_changes (m, part) ->
            rev_rows := List.rev_append part !rev_rows;
            more := m
        | _ -> assert false
      in
      rpc (`Req_get_changes_start owner) absorb;
      while !more do
        rpc `Req_get_changes_cont absorb
      done;
      List.rev !rev_rows
    method inactivate () =
      rpc `Req_inactivate (expect `Resp_inactivate)
    method dump =
      failwith "Nn_blockmap_box.dump: not implemented in client"
  end

(* Connects to the blockmap server: creates a private response camlbox
   (2 messages of [box_size]), waits until the server has published its
   request box in [blockmap_res_id_name], and looks that box up. *)
let client () : blockmap_client_t =
  init ();
  let rbox, rbox_id =
    Netmcore_camlbox.create_camlbox "plasma_nn_bmapc" 2 box_size in
  Pfs_pmanage.register_shm (Netmcore.get_shm rbox_id);
  dlog "Nn_blockmap_box.client: waiting for server";
  ignore (Netplex_sharedvar.wait_for_enc_value blockmap_res_id_name);
  dlog "Nn_blockmap_box.client: server ready";
  let sbox_id = Res_var.get blockmap_res_id_name in
  let (sbox : request_msg Netcamlbox.camlbox_sender) =
    Netmcore_camlbox.lookup_camlbox_sender sbox_id in
  dlog "Nn_blockmap_box.client: ready";
  object
    method access ~id ~identity ~size =
      let bmap =
        { blockmap_id = id; blockmap_identity = identity; blockmap_size = size }
      in
      call rbox rbox_id sbox bmap `Req_access
        (function
          | `Resp_access create_flag ->
              (create_flag, bm_client id identity size rbox rbox_id sbox bmap)
          | _ -> assert false)
    method inactivate ~id =
      let bmap =
        { blockmap_id = id; blockmap_identity = ""; blockmap_size = 0L } in
      (bm_client id "" 0L rbox rbox_id sbox bmap)#inactivate ()
    method shutdown () =
      let bmap =
        { blockmap_id = 0; blockmap_identity = ""; blockmap_size = 0L } in
      call rbox rbox_id sbox bmap `Req_shutdown (expect `Resp_shutdown)
    method close () =
      Netmcore.release sbox_id
  end

(* Server-side receive: same as [receive_all]. *)
let s_fetch rbox = receive_all rbox

(* [nsplit n l] splits [l] into its first [n] elements (all of [l] if it is
   shorter) and the remainder. *)
let rec nsplit n l =
  if n = 0 then ([], l)
  else
    match l with
    | x :: r ->
        let l1, l2 = nsplit (n - 1) r in
        (x :: l1, l2)
    | [] -> ([], [])

(* Executes [req] on the server-side blockmap [bmap]. [id] is the response
   box of the requesting client; it keys [getch_map], which holds the
   get_changes rows not yet transferred to that client. Access, inactivate
   and shutdown are handled by [serve] and must not reach this function. *)
let s_exec getch_map bmap req id =
  (* Hands out the next chunk of at most 10 rows and remembers the rest,
     dropping any leftover entry once everything has been sent. *)
  let next_chunk rows =
    let chunk, rest = nsplit 10 rows in
    let more = rest <> [] in
    if more then Hashtbl.replace getch_map id rest
    else Hashtbl.remove getch_map id;
    `Resp_get_changes (more, chunk)
  in
  match req with
  | `Req_access | `Req_inactivate | `Req_shutdown -> assert false
  | `Req_active -> `Resp_active (bmap#active)
  | `Req_filled -> `Resp_filled (bmap#filled)
  | `Req_stats -> `Resp_stats (bmap#stats)
  | `Req_has_free_blocks -> `Resp_has_free_blocks (bmap#has_free_blocks)
  | `Req_fill_blockmap (index, row) ->
      bmap#fill_blockmap index row;
      `Resp_fill_blockmap
  | `Req_fill_from_db (index, n) ->
      bmap#fill_from_db index n;
      `Resp_fill_from_db
  | `Req_fill_done ->
      bmap#fill_done ();
      `Resp_fill_done
  | `Req_reserve (n, owner, ri) ->
      `Resp_reserve (bmap#reserve n owner ri)
  | `Req_free (l, owner) ->
      bmap#free l owner;
      `Resp_free
  | `Req_pin (l, owner) ->
      bmap#pin l owner;
      `Resp_pin
  | `Req_commit owner ->
      bmap#commit owner;
      `Resp_commit
  | `Req_rollback owner ->
      bmap#rollback owner;
      `Resp_rollback
  | `Req_release owner ->
      bmap#release owner;
      `Resp_release
  | `Req_get_changes_start owner ->
      next_chunk (bmap#get_changes owner)
  | `Req_get_changes_cont ->
      let rows =
        try Hashtbl.find getch_map id
        with Not_found -> failwith "Req_get_changes_cont: not found" in
      next_chunk rows

(* Runs the blockmap server: creates and publishes the request camlbox
   (8 messages of [box_size]), then processes requests until a
   [`Req_shutdown] has been handled. *)
let serve () =
  Netlog.logf `Info "Blockmap server starting up";
  init ();
  let rbox, rbox_id =
    Netmcore_camlbox.create_camlbox "plasma_nn_bmaps" 8 box_size in
  Pfs_pmanage.register_shm (Netmcore.get_shm rbox_id);
  Netlog.logf `Info "Blockmap server publishing camlbox";
  Res_var.set blockmap_res_id_name rbox_id;
  Netlog.logf `Info "Blockmap server camlbox published";
  let bmaps = Hashtbl.create 5 in
  let getch_map = Hashtbl.create 5 in
  let client_boxes = Hashtbl.create 5 in
  let client_used = Hashtbl.create 5 in
  let t_hk = ref (Unix.time ()) in
  let up = ref true in

  (* Sender for the response box [id], cached across requests. Marks [id]
     as used so the next housekeeping pass keeps it. *)
  let lookup_client id =
    let box =
      try Hashtbl.find client_boxes id
      with Not_found ->
        dlog "Looking up camlbox";
        let box = Netmcore_camlbox.lookup_camlbox_sender id in
        dlog "Got camlbox";
        Hashtbl.add client_boxes id box;
        box
    in
    Hashtbl.replace client_used id ();
    box
  in

  (* Computes the response for [req]; exceptions are mapped by [process]. *)
  let handle req =
    let bmap_id = req.req_bmap.blockmap_id in
    (* Some requests work even if the bmap does not yet exist.
       Check for these requests first *)
    match req.req with
    | `Req_access ->
        let create_flag = not (Hashtbl.mem bmaps bmap_id) in
        if create_flag then begin
          dlog "about to create blockmap";
          let bmap =
            new blockmap
              ~id:bmap_id
              ~identity:req.req_bmap.blockmap_identity
              ~size:req.req_bmap.blockmap_size in
          Hashtbl.add bmaps bmap_id bmap;
          dlog "created blockmap"
        end;
        `Resp_access create_flag
    | `Req_inactivate ->
        (try (Hashtbl.find bmaps bmap_id)#inactivate ()
         with Not_found -> ());
        Hashtbl.remove bmaps bmap_id;
        `Resp_inactivate
    | `Req_shutdown ->
        up := false;
        `Resp_shutdown
    | other ->
        let bmap =
          try Hashtbl.find bmaps bmap_id with Not_found -> raise Inactive in
        s_exec getch_map bmap other req.req_response_box
  in

  (* Handles one request and sends the response to the client's box. Every
     exception raised while handling is turned into an error response, so
     a failing request is answered instead of terminating the server loop
     (the client blocks in [call] until it receives a response). *)
  let process req =
    dlogr (fun () -> sprintf "Got request: %s" (string_of_request req.req));
    let resp =
      try handle req with
      | Inactive -> `Exn_inactive
      | Failure msg -> `Exn_failure msg
      | Invalid_argument msg -> `Exn_invalid_argument msg
      | error ->
          let msg = Netexn.to_string error in
          Netlog.logf `Crit "Exception while processing %s: %s"
            (string_of_request req.req) msg;
          `Exn_failure msg
    in
    let resp_msg = { resp = resp } in
    try
      let sbox = lookup_client req.req_response_box in
      dlogr
        (fun () -> sprintf "Responding: %s" (string_of_response resp_msg.resp));
      Netcamlbox.camlbox_send sbox resp_msg
    with
    | Netmcore.No_resource (`Resource id) ->
        (* lookup failed *)
        Netlog.logf `Err "Cannot find response box: rbox_id=%d" id
    | error ->
        Netlog.logf `Crit "Exception %s" (Netexn.to_string error)
  in

  (* Releases every cached response box not used since the previous pass
     (or all of them once shutdown has been requested), together with any
     pending get_changes rows for that box. *)
  let housekeeping now =
    let stale =
      Hashtbl.fold
        (fun id _ acc ->
          if not !up || not (Hashtbl.mem client_used id) then id :: acc
          else acc)
        client_boxes [] in
    List.iter
      (fun id ->
        Netmcore.release id;
        Hashtbl.remove client_boxes id;
        Hashtbl.remove getch_map id)
      stale;
    Hashtbl.clear client_used;
    t_hk := now
  in

  while !up do
    List.iter process (s_fetch rbox);
    let now = Unix.time () in
    if not !up || now -. 60.0 > !t_hk then housekeeping now
  done