(* Plasma GitLab Archive *)
(* Projects Blog Knowledge *)

(*
  Copyright 2011 Gerd Stolpmann

  This file is part of Plasma, a distributed filesystem and a
  map/reduce computation framework. Unless you have a written license
  agreement with the copyright holder (Gerd Stolpmann), the following
  terms apply:

  Plasma is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  Plasma is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with Plasma.  If not, see <http://www.gnu.org/licenses/>.

*)
(* $Id: nn_blockmap_box.ml 431 2011-10-05 20:27:11Z gerd $ *)

open Nn_blockmap
open Printf

(* Debug logging shortcuts taken from Plasma_util. In this file [dlog] is
   applied to a ready-made string, and [dlogr] to a function
   [unit -> string] that builds the message.
 *)
let dlog = Plasma_util.dlog
let dlogr = Plasma_util.dlogr


(* Interface of the client handle returned by [client()]:
   - [access] sends `Req_access for the blockmap [id] and returns a flag
     saying whether the server created the blockmap for this request,
     together with a proxy object implementing [blockmap_t]
   - [inactivate] asks the server to inactivate and forget the blockmap [id]
   - [close] releases the resource ID of the server's request box
   - [shutdown] asks the server to leave its request loop
 *)
class type blockmap_client_t =
object
  method access : id:int -> identity:string -> size:int64 -> bool * blockmap_t
  method inactivate : id:int -> unit
  method close : unit -> unit
  method shutdown : unit -> unit
end


let box_size = max (128 * 1024) (128 * Nn_config.bmaprowsize)
  (* Message size passed to Netmcore_camlbox.create_camlbox by both
     [client] and [serve]. Normally 128K per message; it grows with
     Nn_config.bmaprowsize (128 bytes are budgeted per row element).

     How much do we need:
     - reserve: the response list has a max length of bmaprowsize (1024).
       Each list element counts as 12 words -> ~100 bytes on 64bit
       platforms
     - free, pin: the arg list usually has only a length of 1
     - get_changes: we only transfer 10 elements per req/resp cycle
       (~20K)
   *)


(* Requests a client can send to the blockmap server. `Req_access,
   `Req_inactivate and `Req_shutdown are handled directly in the loop of
   [serve]; all other requests are executed by [s_exec] on an existing
   blockmap object.
 *)
type request =
    [ `Req_access
    | `Req_active
    | `Req_filled
    | `Req_stats
    | `Req_fill_blockmap of int64 * string
	(* row index, row data *)
    | `Req_fill_from_db of int64 * int
	(* row index, number n *)
    | `Req_fill_done
    | `Req_has_free_blocks
    | `Req_reserve of int * owner * reserve_info
	(* number n, owner, reserve info *)
    | `Req_free of triv_range list * owner
    | `Req_pin of triv_range list * owner
    | `Req_commit of owner
    | `Req_rollback of owner
    | `Req_release of owner
    | `Req_get_changes_start of owner
	(* starts a paged get_changes; answered with the first page *)
    | `Req_get_changes_cont
	(* asks for the next page of the get_changes started before *)
    | `Req_inactivate
    | `Req_shutdown
    ]


(* Renders a request as a short string for debug logging. The row data of
   `Req_fill_blockmap and the reserve info of `Req_reserve are not
   included in the output.
 *)
let string_of_request req =
  match req with
    (* requests without arguments *)
    | `Req_access -> "access"
    | `Req_active -> "active"
    | `Req_filled -> "filled"
    | `Req_stats -> "stats"
    | `Req_fill_done -> "fill_done"
    | `Req_has_free_blocks -> "has_free_blocks"
    | `Req_get_changes_cont -> "get_changes_cont"
    | `Req_inactivate -> "inactivate"
    | `Req_shutdown -> "shutdown"
    (* fill requests *)
    | `Req_fill_blockmap(row_index,_) ->
        sprintf "fill_blockmap(idx=%Ld)" row_index
    | `Req_fill_from_db(row_index,count) ->
        sprintf "fill_from_db(idx=%Ld,n=%d)" row_index count
    (* requests carrying an owner *)
    | `Req_reserve(count,who,_) ->
        let who_s = string_of_owner who in
        sprintf "reserve(n=%d,owner=%s)" count who_s
    | `Req_free(ranges,who) ->
        let who_s = string_of_owner who in
        let ranges_s = string_of_trl ranges in
        sprintf "free(%s,owner=%s)" ranges_s who_s
    | `Req_pin(ranges,who) ->
        let who_s = string_of_owner who in
        let ranges_s = string_of_trl ranges in
        sprintf "pin(%s,owner=%s)" ranges_s who_s
    | `Req_commit who ->
        let who_s = string_of_owner who in
        sprintf "commit(owner=%s)" who_s
    | `Req_rollback who ->
        let who_s = string_of_owner who in
        sprintf "rollback(owner=%s)" who_s
    | `Req_release who ->
        let who_s = string_of_owner who in
        sprintf "release(owner=%s)" who_s
    | `Req_get_changes_start who ->
        let who_s = string_of_owner who in
        sprintf "get_changes_start(owner=%s)" who_s

(* Descriptor of the blockmap a request refers to. The server uses
   [blockmap_id] as lookup key; all three fields are passed to
   [new blockmap] when the blockmap is created for a `Req_access request.
   For `Req_inactivate and `Req_shutdown the client fills identity and
   size with dummy values.
 *)
type bmap =
    { blockmap_id : int;
      blockmap_identity : string;
      blockmap_size : int64;
    }


(* The message put into the server's camlbox: the request itself, the
   blockmap it is about, and the resource ID of the client's box the
   server has to send the response to.
 *)
type request_msg =
    { req_bmap : bmap;
      req_response_box : Netmcore.res_id;
      req : request
    }

(* Error responses. [serve] produces them for the exceptions Inactive,
   Failure and Invalid_argument raised while executing a request, and
   [call] turns them back into these exceptions on the client side.
 *)
type respexn =
    [ `Exn_inactive
    | `Exn_failure of string
    | `Exn_invalid_argument of string
    ]

(* Renders an error response for debug logging; the message of the
   transported exception is put in parentheses after the tag. *)
let string_of_respexn e =
  match e with
    | `Exn_inactive ->
        "Exn_inactive"
    | `Exn_failure text ->
        String.concat "" [ "Exn_failure("; text; ")" ]
    | `Exn_invalid_argument text ->
        String.concat "" [ "Exn_invalid_arg("; text; ")" ]

(* Responses sent from the server back to a client. There is one `Resp_*
   constructor per kind of request (both get_changes requests are answered
   with `Resp_get_changes), plus the error cases of [respexn].
 *)
type response =
    [ `Resp_access of bool
	(* flag: whether the blockmap has been implicitly created *)
    | `Resp_active of bool
    | `Resp_filled of bool
    | `Resp_stats of int64 * int64 * int64
	(* labelled used, trans, free by string_of_response *)
    | `Resp_fill_blockmap
    | `Resp_fill_from_db
    | `Resp_fill_done
    | `Resp_has_free_blocks of bool
    | `Resp_reserve of triv_range list
    | `Resp_free
    | `Resp_pin
    | `Resp_commit
    | `Resp_rollback
    | `Resp_release
    | `Resp_get_changes of bool * (int64 * string) list
	(* flag: whether more pages follow; list: the rows of this page *)
    | `Resp_inactivate
    | `Resp_shutdown
    | respexn
    ]

(* Renders a response as a short string for debug logging. Responses
   without payload are printed as the bare operation name. For
   `Resp_get_changes only the row indexes are printed, not the row data.
   Error responses are delegated to [string_of_respexn].
 *)
let string_of_response =
  function
    | `Resp_access flag -> sprintf "access(created=%B)" flag
    | `Resp_active flag -> sprintf "active(active=%B)" flag
    | `Resp_filled flag -> sprintf "filled(filled=%B)" flag
    | `Resp_stats(u,t,f) -> sprintf "stats(used=%Ld,trans=%Ld,free=%Ld)" u t f
    | `Resp_fill_blockmap -> "fill_blockmap"
    | `Resp_fill_from_db -> "fill_from_db"
    | `Resp_fill_done -> "fill_done"
    | `Resp_has_free_blocks flag -> sprintf "has_free_blocks(has=%B)" flag
    | `Resp_reserve l -> sprintf "reserve(%s)" (string_of_trl l)
    (* was "free2", which did not match the naming of the other cases *)
    | `Resp_free -> "free"
    | `Resp_pin -> "pin"
    | `Resp_commit -> "commit"
    | `Resp_rollback -> "rollback"
    | `Resp_release -> "release"
    | `Resp_get_changes(more,l) ->
        sprintf "get_changes(more=%B,rows=%s)" more
          (String.concat "," (List.map (fun (idx,_) -> Int64.to_string idx) l))
    | `Resp_inactivate -> "inactivate"
    | `Resp_shutdown -> "shutdown"
    | #respexn as e -> string_of_respexn e

(* The message put into a client's response camlbox; it only wraps the
   response. *)
type response_msg =
    { resp : response
    }


(* Typed access to a Netplex shared variable holding a Netmcore.res_id.
   [serve] uses [Res_var.set] to publish the ID of its request box, and
   [client] reads it with [Res_var.get].
 *)
module Res_var =
  Netplex_sharedvar.Make_var_type(struct type t = Netmcore.res_id end)

(* Name of the shared variable through which the server announces the
   resource ID of its request camlbox to the clients. *)
let blockmap_res_id_name =
  "plasma.namenode.blockmap_res_id"

(* Creates the shared variable [blockmap_res_id_name] (with ~enc:true) and
   writes the boolean result of the creation to the debug log. Called by
   both [client] and [serve] before the variable is used.
 *)
let init() =
  let log_result was_created =
    dlogr
      (fun () -> sprintf "Nn_blockmap_box: create_var -> %B" was_created) in
  log_result
    (Netplex_sharedvar.create_var ~enc:true blockmap_res_id_name)


(* Logs the request and sends it to the server without waiting for an
   answer.
   - [rbox_id]: resource ID of the box the response is to be sent to
   - [sbox]: sender for the server's request box
   - [bmap]: descriptor of the blockmap the request is about
   - [req]: the request
 *)
let send rbox_id sbox bmap req =
  let describe () =
    sprintf "Nn_blockmap_box.send: req=%s" (string_of_request req) in
  dlogr describe;
  Netcamlbox.camlbox_send
    sbox
    { req_bmap = bmap; req_response_box = rbox_id; req = req }


(* Synchronous request/response cycle: sends [req] with [send], then waits
   on [rbox] until messages arrive, copies them out of the box and deletes
   them there.

   Exactly one message is expected per wakeup (anything else is an
   assertion failure). Error responses are turned into exceptions:
   `Exn_inactive raises Inactive, `Exn_failure raises Failure, and
   `Exn_invalid_argument raises Invalid_argument (the latter two with
   " (via camlbox)" appended to the message). Any other response is passed
   to [decode], whose result is returned.
 *)
let call rbox rbox_id sbox bmap req decode =
  send rbox_id sbox bmap req;
  (* block until the wait reports at least one filled slot *)
  let rec receive () =
    match Netcamlbox.camlbox_wait rbox with
      | [] ->
          receive ()
      | slots ->
          let msgs = List.map (Netcamlbox.camlbox_get_copy rbox) slots in
          List.iter (Netcamlbox.camlbox_delete rbox) slots;
          msgs in
  match receive () with
    | [ { resp = answer } ] ->
        dlogr
          (fun () ->
             sprintf "Nn_blockmap_box.call resp=%s"
               (string_of_response answer));
        ( match answer with
            | `Exn_inactive ->
                raise Inactive
            | `Exn_failure msg ->
                failwith (msg ^ " (via camlbox)")
            | `Exn_invalid_argument msg ->
                invalid_arg (msg ^ " (via camlbox)")
            | other ->
                decode other
        )
    | _ ->
        assert false


(* [expect x y] does nothing if [x] and [y] are structurally equal, and
   raises Failure "Nn_blockmap_box.expect" otherwise. Partially applied,
   it serves as decoder for responses that carry no payload. *)
let expect x y =
  if x = y then
    ()
  else
    failwith "Nn_blockmap_box.expect"



(* Builds a proxy object implementing [blockmap_t] by forwarding each
   operation as a request message to the blockmap server and decoding the
   matching response.

   - [id], [identity], [size]: returned as-is by the methods of the same
     name (no server round trip)
   - [rbox], [rbox_id]: the box the responses are received in, and its
     resource ID (sent along with every request)
   - [sbox]: sender for the server's request box
   - [bmap]: blockmap descriptor put into every request

   The forwarding methods raise what [call] raises (Inactive, Failure,
   Invalid_argument) when the server reports an error. An unexpected
   response constructor ends in an assertion failure or in the Failure
   raised by [expect]. [dump] is not forwarded and always fails.
 *)
let bm_client id identity size
              (rbox : response_msg Netcamlbox.camlbox)
              rbox_id
              (sbox : request_msg Netcamlbox.camlbox_sender)
              bmap : blockmap_t =
  (* one request/response round trip for this blockmap *)
  let rpc req decode =
    call rbox rbox_id sbox bmap req decode in
  (* fetches one page of a paged get_changes transfer *)
  let changes_page req =
    rpc req
      (function
         | `Resp_get_changes(more,part) -> (more,part)
         | _ -> assert false
      ) in
  (* collects pages (in reversed order in [acc]) until the server says
     that no more pages follow *)
  let rec collect_changes acc req =
    let more, part = changes_page req in
    let acc = List.rev_append part acc in
    if more then
      collect_changes acc `Req_get_changes_cont
    else
      List.rev acc in
object
  (* locally known properties *)
  method id = id
  method identity = identity
  method size = size

  (* queries returning a value *)
  method active =
    rpc `Req_active
      (function
         | `Resp_active flag -> flag
         | _ -> assert false
      )

  method filled =
    rpc `Req_filled
      (function
         | `Resp_filled flag -> flag
         | _ -> assert false
      )

  method stats =
    rpc `Req_stats
      (function
         | `Resp_stats (n_used,n_trans,n_free) -> (n_used,n_trans,n_free)
         | _ -> assert false
      )

  method has_free_blocks =
    rpc `Req_has_free_blocks
      (function
         | `Resp_has_free_blocks flag -> flag
         | _ -> assert false
      )

  method reserve n owner ri =
    rpc (`Req_reserve(n,owner,ri))
      (function
         | `Resp_reserve ranges -> ranges
         | _ -> assert false
      )

  method get_changes owner =
    collect_changes [] (`Req_get_changes_start owner)

  (* operations that are only acknowledged *)
  method fill_blockmap index data =
    rpc (`Req_fill_blockmap(index,data)) (expect `Resp_fill_blockmap)

  method fill_from_db index n =
    rpc (`Req_fill_from_db(index,n)) (expect `Resp_fill_from_db)

  method fill_done () =
    rpc `Req_fill_done (expect `Resp_fill_done)

  method free l owner =
    rpc (`Req_free(l,owner)) (expect `Resp_free)

  method pin l owner =
    rpc (`Req_pin(l,owner)) (expect `Resp_pin)

  method commit owner =
    rpc (`Req_commit owner) (expect `Resp_commit)

  method rollback owner =
    rpc (`Req_rollback owner) (expect `Resp_rollback)

  method release owner =
    rpc (`Req_release owner) (expect `Resp_release)

  method inactivate() =
    rpc `Req_inactivate (expect `Resp_inactivate)

  method dump =
    failwith "Nn_blockmap_box.dump: not implemented in client"
end


(* Creates a client handle for talking to the blockmap server.

   Setup, in this order: the shared variable is initialized with [init];
   a response camlbox "plasma_nn_bmapc" with 2 slots of [box_size] is
   created and its shared memory is registered with Pfs_pmanage; then the
   function waits until the server has published the resource ID of its
   request box, and looks up a sender for that box.

   NOTE(review): [close] releases only the resource ID of the server's
   box; the response box created here is not released by this object -
   confirm that this is intended.
 *)
let client() : blockmap_client_t =
  init();
  let rbox, rbox_id =
    Netmcore_camlbox.create_camlbox "plasma_nn_bmapc" 2 box_size in
  Pfs_pmanage.register_shm (Netmcore.get_shm rbox_id);
  dlog "Nn_blockmap_box.client: waiting for server";
  ignore (Netplex_sharedvar.wait_for_enc_value blockmap_res_id_name);
  dlog "Nn_blockmap_box.client: server ready";
  let sbox_id =
    Res_var.get blockmap_res_id_name in
  let sbox =
    (Netmcore_camlbox.lookup_camlbox_sender sbox_id
       : request_msg Netcamlbox.camlbox_sender) in
  dlog "Nn_blockmap_box.client: ready";
  (* builds the blockmap descriptor that goes into a request *)
  let descriptor id identity size =
    { blockmap_id = id;
      blockmap_identity = identity;
      blockmap_size = size
    } in
object
  (* Announces the blockmap to the server and returns the creation flag
     plus a proxy for the blockmap *)
  method access ~id ~identity ~size =
    let bmap = descriptor id identity size in
    let on_response =
      function
        | `Resp_access create_flag ->
            (create_flag, bm_client id identity size rbox rbox_id sbox bmap)
        | _ ->
            assert false in
    call rbox rbox_id sbox bmap `Req_access on_response

  (* Only the ID matters for inactivation; identity and size are dummies *)
  method inactivate ~id =
    let proxy =
      bm_client id "" 0L rbox rbox_id sbox (descriptor id "" 0L) in
    proxy # inactivate()

  (* The shutdown request is not about a blockmap; a dummy descriptor is
     sent *)
  method shutdown() =
    call rbox rbox_id sbox (descriptor 0 "" 0L)
      `Req_shutdown (expect `Resp_shutdown)

  method close() =
    Netmcore.release sbox_id

end


(* Waits on [rbox] until at least one message has arrived. The messages
   are copied out of the box, their slots are deleted, and the copies are
   returned as a non-empty list. *)
let rec s_fetch rbox =
  match Netcamlbox.camlbox_wait rbox with
    | [] ->
        (* nothing arrived: wait again *)
        s_fetch rbox
    | slots ->
        let msgs = List.map (Netcamlbox.camlbox_get_copy rbox) slots in
        List.iter (Netcamlbox.camlbox_delete rbox) slots;
        msgs


(* [nsplit n l] splits [l] into its first [n] elements and the remainder.
   If [l] has fewer than [n] elements, the whole list is returned as first
   component and the remainder is empty. *)
let rec nsplit n l =
  match n, l with
    | 0, _ ->
        ([], l)
    | _, [] ->
        ([], [])
    | _, x :: rest ->
        let taken, remainder = nsplit (n-1) rest in
        (x :: taken, remainder)


(* Executes one request on the blockmap object [bmap] and returns the
   response to send back.

   - [getch_map]: hash table with the not yet transferred rows of paged
     get_changes transfers, keyed by [id]
   - [bmap]: the blockmap object the request is executed on
   - [req]: the request. `Req_access, `Req_inactivate and `Req_shutdown
     must not be passed (assertion failure); they are handled by the
     caller
   - [id]: key identifying the requester in [getch_map] ([serve] passes
     the resource ID of the client's response box)

   get_changes results are sent in pages of at most 10 rows. The rows
   remaining after a page are kept in [getch_map] until the client asks
   for them with `Req_get_changes_cont. Once a page is the last one, the
   entry for [id] is removed - also for `Req_get_changes_start, so that
   the leftovers of an earlier, unfinished transfer do not linger in the
   table.

   Raises Failure if `Req_get_changes_cont arrives without pending rows
   for [id]. Exceptions of the [bmap] methods are not caught here.
 *)
let s_exec getch_map bmap req id =
  (* Builds the response for the next page of [rows], and updates the
     continuation state of the requester *)
  let paged_changes rows =
    let page, rest = nsplit 10 rows in
    let more = rest <> [] in
    if more then
      Hashtbl.replace getch_map id rest
    else
      Hashtbl.remove getch_map id;
    `Resp_get_changes(more, page) in
  match req with
    | `Req_access -> assert false
    | `Req_active -> `Resp_active(bmap # active)
    | `Req_filled -> `Resp_filled(bmap # filled)
    | `Req_stats -> `Resp_stats(bmap # stats)
    | `Req_has_free_blocks -> `Resp_has_free_blocks(bmap # has_free_blocks)
    | `Req_fill_blockmap(index,row) ->
        bmap # fill_blockmap index row;
        `Resp_fill_blockmap
    | `Req_fill_from_db(index,n) ->
        bmap # fill_from_db index n;
        `Resp_fill_from_db
    | `Req_fill_done ->
        bmap # fill_done();
        `Resp_fill_done
    | `Req_reserve(n,owner,ri) ->
        let l = bmap # reserve n owner ri in
        `Resp_reserve l
    | `Req_free(l,owner) ->
        bmap # free l owner;
        `Resp_free
    | `Req_pin(l,owner) ->
        bmap # pin l owner;
        `Resp_pin
    | `Req_commit owner ->
        bmap # commit owner;
        `Resp_commit
    | `Req_rollback owner ->
        bmap # rollback owner;
        `Resp_rollback
    | `Req_release owner ->
        bmap # release owner;
        `Resp_release
    | `Req_get_changes_start owner ->
        paged_changes (bmap # get_changes owner)
    | `Req_get_changes_cont ->
        let pending =
          try Hashtbl.find getch_map id
          with Not_found -> failwith "Req_get_changes_cont: not found" in
        paged_changes pending
    | `Req_inactivate ->
        assert false
    | `Req_shutdown ->
        assert false


(* Main loop of the blockmap server. Does not return before a
   `Req_shutdown request has been processed.

   Startup: initializes the shared variable, creates the request camlbox
   "plasma_nn_bmaps" with 8 slots of [box_size], registers its shared
   memory with Pfs_pmanage, and publishes the resource ID of the box in
   the shared variable [blockmap_res_id_name].

   Loop: fetches the arrived requests with [s_fetch], computes a response
   for each, and sends it to the response box named in the request.
   Failures while looking up the response box or sending are only logged.

   NOTE(review): while a request is executed, only Inactive, Failure and
   Invalid_argument are turned into error responses. Any other exception
   raised there is not caught in this function.
 *)
let serve () =
  let () =
    Netlog.logf `Info "Blockmap server starting up" in
  let () =
    init() in
  let rbox, rbox_id =
    Netmcore_camlbox.create_camlbox "plasma_nn_bmaps" 8 box_size in
  let () =
    Pfs_pmanage.register_shm (Netmcore.get_shm rbox_id) in
  let () =
    Netlog.logf `Info "Blockmap server publishing camlbox" in
  let () =
    Res_var.set blockmap_res_id_name rbox_id in
  let () =
    Netlog.logf `Info "Blockmap server camlbox published" in
  (* blockmap ID -> blockmap object *)
  let bmaps =
    Hashtbl.create 5 in
  (* response box ID -> rows of a paged get_changes not yet transferred *)
  let getch_map =
    Hashtbl.create 5 in
  (* response box ID -> sender for this box (cache of lookups) *)
  let client_boxes =
    Hashtbl.create 5 in
  (* response box IDs used since the last housekeeping *)
  let client_used =
    Hashtbl.create 5 in
  (* time of the last housekeeping *)
  let t_hk =
    ref (Unix.time()) in
  (* Returns the sender for the response box [id], looking it up on first
     use, and marks the box as recently used *)
  let lookup_client id =
    let box =
      try Hashtbl.find client_boxes id
      with Not_found ->
	dlog "Looking up camlbox";
	let box =
	  Netmcore_camlbox.lookup_camlbox_sender id in
	dlog "Got camlbox";
	Hashtbl.add client_boxes id box;
	box in
    Hashtbl.replace client_used id ();
    box in
  (* set to false by `Req_shutdown; ends the loop after the current batch
     of requests and the final housekeeping *)
  let up = ref true in
  while !up do
    let reqs =
      s_fetch rbox in
    List.iter
      (fun req ->
	 dlogr (fun () ->
		  sprintf "Got request: %s" (string_of_request req.req));
	 let bmap_id = req.req_bmap.blockmap_id in
	 let sbox_id = req.req_response_box in
	 (* Some requests work even if the bmap does not yet exist. Check
	    for these requests first
	  *)
	 let resp_msg =
	   match req.req with
	     | `Req_access ->
		 (* create the blockmap on first access, and report whether
		    this happened *)
		 let create_flag =
		   not (Hashtbl.mem bmaps bmap_id) in
		 if create_flag then (
		   dlog "about to create blockmap";
		   let bmap = 
		     new blockmap 
		       ~id:bmap_id
		       ~identity:req.req_bmap.blockmap_identity
		       ~size:req.req_bmap.blockmap_size in
		   Hashtbl.add bmaps bmap_id bmap;
		   dlog "created blockmap";
		 );
		 { resp = `Resp_access create_flag }
	     | `Req_inactivate ->
		 (* inactivating an unknown blockmap is not an error *)
		 ( try
		     let bmap = Hashtbl.find bmaps bmap_id in
		     bmap # inactivate()
		   with Not_found -> ()
		 );
		 Hashtbl.remove bmaps bmap_id;
		 { resp = `Resp_inactivate }
	     | `Req_shutdown ->
		 up := false;
		 { resp = `Resp_shutdown }
	     | _ ->
		 (* all other requests need an existing blockmap; a missing
		    one is reported to the client as `Exn_inactive *)
		 ( try
		     let bmap =
		       try Hashtbl.find bmaps bmap_id
		       with Not_found -> raise Inactive in
		     let resp = s_exec getch_map bmap req.req sbox_id in
		     { resp = resp }
		   with
		     | Inactive ->
			 { resp = `Exn_inactive }
		     | Failure msg ->
			 { resp = `Exn_failure msg }
		     | Invalid_argument msg ->
			 { resp = `Exn_invalid_argument msg }
		 ) in
	 (* send the response; errors are logged, and the loop goes on *)
	 try
	   let sbox = lookup_client sbox_id in
	   dlogr
	     (fun () ->
		sprintf "Responding: %s" (string_of_response resp_msg.resp));
	   Netcamlbox.camlbox_send sbox resp_msg	   
	 with
	   | Netmcore.No_resource(`Resource id) ->  (* lookup failed *)
	       Netlog.logf `Err "Cannot find response box: rbox_id=%d" id
	   | error ->
	       Netlog.logf `Crit "Exception %s" (Netexn.to_string error)
      )
      reqs;
    (* Housekeeping is done on shutdown, and otherwise when more than 60
       seconds have passed since the last time. This is only checked
       after a batch of requests has been processed.
     *)
    let t = Unix.time() in
    if not !up || t -. 60.0 > !t_hk then (
      (* housekeeping: remove/release all boxes that were not accessed in
	 the last round of processing
       *)
      let l_remove = ref [] in
      (* on shutdown all boxes are removed, otherwise only the unused ones *)
      Hashtbl.iter
	(fun id _ ->
	   if not !up || not (Hashtbl.mem client_used id) then
	     l_remove := id :: !l_remove
	)
	client_boxes;
      (* also drop pending get_changes rows of the removed clients *)
      List.iter
	(fun id ->
	   Netmcore.release id;
	   Hashtbl.remove client_boxes id;
	   Hashtbl.remove getch_map id
	)
	!l_remove;
      Hashtbl.clear client_used;
      t_hk := t
    )
  done

(* This web site is published by Informatikbüro Gerd Stolpmann *)
(* Powered by Caml *)