Plasma GitLab Archive
Projects Blog Knowledge

(* This test creates a number of concurrent connections to PlasmaFS.
   Every connection creates and modifies a file. It is checked whether
   the parallel requests are handled in the right way. In particular,
   the effectiveness of the inode serializer is tested (for generating
   unique IDs).

   There were reproducible hangs for Plasma-0.5 (fixed in 0.5.1):
   ./ps_concurrent_writes -n 100 -p 100 -commit

   (Setup: Installed a single namenode on a quad-core machine with
   20 processes. Also, it is needed to scale authnode a bit up.)

 *)

open Uq_engines.Operators
open Printf


let delay = 0.0

let submit_e cc esys n commit_flag =
  printf "Start\n%!";

  let c = Plasma_client.open_cluster_cc cc esys in
  Plasma_client.configure_auth_daemon c;
  let ii = Plasma_client.regular_ii c 0o666 in

  let n_conflict = ref 0 in
  let n_ok = ref 0 in
  let n_exist = ref 0 in
  let n_error = ref 0 in

  let rec submit_loop_e k =
    if k > 0 then
      Plasma_client.start_e c
      ++ (fun trans ->
	    create_loop_e trans
	    ++ (fun () ->
		  if commit_flag then
		    Plasma_client.commit_e trans
		  else
		    Plasma_client.abort_e trans
	       )
	 )
      ++ (fun () ->
	    submit_loop_e (k-1)
	 )
    else
      eps_e (`Done()) esys

  and create_loop_e trans =
    Plasma_client.create_inode_e trans ii
    ++ (fun inode ->
	  (* eps_e (`Done()) esys *)
	  Uq_engines.meta_engine
	    (Plasma_client.link_e trans "/target" inode)
	  ++ (fun st ->
		( match st with
		    | `Done() ->
			incr n_ok;
			Uq_engines.delay_engine delay
			  (fun () -> eps_e (`Done()) esys)
			  esys
		    | `Error (Plasma_client.Plasma_error `eexist) ->
			incr n_exist;
			Uq_engines.delay_engine delay
			  (fun () -> eps_e (`Done()) esys)
			  esys
		    | `Error (Plasma_client.Plasma_error `econflict) ->
			incr n_conflict;
			create_loop_e trans
		    | `Error other ->
			eprintf "Error: %s\n%!"
			  (Netexn.to_string other);
			incr n_error;
			eps_e (`Done()) esys
		    | `Aborted ->
			assert false
		);
	     )
       )
  in

  submit_loop_e n
  ++ (fun () ->
	printf "Client results: n_ok=%d n_exist=%d n_conflict=%d n_error=%d\n%!"
	  !n_ok !n_exist !n_conflict !n_error;
	Plasma_client.close_cluster c;
	eps_e (`Done()) esys
     )


(*
let submit_e cc esys n commit_flag =
  printf "Start\n%!";

  let c = Plasma_client.open_cluster_cc cc esys in
  Plasma_client.configure_auth_daemon c;
  let ii = Plasma_client.regular_ii c 0o666 in

  let n_conflict = ref 0 in
  let n_ok = ref 0 in
  let n_exist = ref 0 in
  let n_error = ref 0 in

  let rec submit_loop_e trans k =
    if k > 0 then
      Plasma_client.create_inode_e trans ii
      ++ (fun _inode ->
	    submit_loop_e trans (k-1)
	 )
    else
      eps_e (`Done()) esys
  in

  Plasma_client.start_e c
  ++ (fun trans ->
	submit_loop_e trans n
	++ (fun () ->
	      printf "Client results: n_ok=%d n_exist=%d n_conflict=%d n_error=%d\n%!"
		!n_ok !n_exist !n_conflict !n_error;
	      Plasma_client.close_cluster c;
	      eps_e (`Done()) esys
	   )
     )
 *)

(*
let prepare_file cc =
  let esys = Unixqueue.create_unix_event_system() in
  let c = Plasma_client.open_cluster_cc cc esys in
  Plasma_client.configure_auth_daemon c;

  let trans = Plasma_client.start c in
  let ii = Plasma_client.regular_ii c 0o666 in
  let inode =
    try
      Plasma_client.create_file trans "/target" ii
    with
      | Plasma_client.Plasma_error `eexist ->
	  Plasma_client.lookup trans "/target" false in
  Plasma_client.commit trans;
  Plasma_client.close_cluster c;
  inode
 *)


let main () =
  let cluster = ref None in
  let namenodes = ref [] in

  let n = ref 100 in
  let p = ref 4 in
  let fork = ref false in
  let commit = ref false in
  Arg.parse
    [ "-cluster", Arg.String (fun s -> cluster := Some s),
      "<name>    Set the cluster name";
      
      "-namenode", Arg.String (fun n -> namenodes := n :: !namenodes),
      "<host>:<port>   Also use this namenode (can be given several times)";

      "-n", Arg.Set_int n,
      "<num>   Number of file operations per client";

      "-p", Arg.Set_int p,
      "<num>   Number of parallel clients";

      "-fork", Arg.Set fork,
      "    Use several processes for testing";

      "-commit", Arg.Set commit,
      "    Commit transactions";
    ]
    (fun arg ->
       raise(Arg.Bad("Unexpected arg: " ^ arg))
    )
    "usage: ps_concurrent_writes";

  let nn_nodes =
    if !namenodes = [] then None else Some !namenodes in
  let cc = Plasma_client_config.get_config
    ?clustername:!cluster
    ?nn_nodes () in
  (* let inode = prepare_file cc in *)

  let esys = Unixqueue.create_unix_event_system() in
  let pids = ref [] in
  for k = 1 to !p do
    if !fork then
      match Unix.fork() with
	| 0 ->
	    let _e = submit_e cc esys !n !commit in
	    Unixqueue.run esys;
	    exit 0
	| pid -> pids := pid :: !pids
    else
      let _e = submit_e cc esys !n !commit in
      ()
  done;
  if !fork then
    List.iter
      (fun pid ->
	 ignore(Unix.waitpid [] pid)
      )
      !pids
  else
    Unixqueue.run esys;

  printf "Done\n%!"


let () = main()

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml