Plasma GitLab Archive
Projects Blog Knowledge

(* This test creates a number of concurrent connections to PlasmaFS.
   Every connection creates, and modifies a file, and finally deletes it.
   It is checked whether
   the parallel requests are handled in the right way. In particular,
   the effectiveness of the commit monitor is tested (because the commits here
   will be serialized - all targeting the same directory).
 *)

open Uq_engines.Operators
open Printf


let delay = 0.0

let submit_e cc esys j n =
  printf "Start\n%!";

  let c = Plasma_client.open_cluster_cc cc esys in
  Plasma_client.configure_auth_daemon c;
  let ii = Plasma_client.regular_ii c 0o666 in

  let n_ok = ref 0 in
  let name = sprintf "/testfile_%d" j in

  let rec submit_loop_e k =
    if k > 0 then
      Plasma_client.start_e c
      ++ (fun trans ->
	    create_e trans name
	    ++ (fun () ->
		  Plasma_client.commit_e trans
	       )
	 )
      ++ (fun () -> 
	    Plasma_client.start_e c)
      ++ (fun trans ->
	    delete_e trans name
	    ++ (fun () ->
		  Plasma_client.commit_e trans
	       )
	 )
      ++ (fun () ->
	    incr n_ok;
	    submit_loop_e (k-1)
	 )
    else
      eps_e (`Done()) esys

  and create_e trans name =
    Plasma_client.create_inode_e trans ii
    ++ (fun inode ->
	  (* eps_e (`Done()) esys *)
	  Plasma_client.link_e trans name inode
       )
  and delete_e trans name =
    Plasma_client.unlink_e trans name
  in

  submit_loop_e n
  >> (fun st ->
	Plasma_client.close_cluster c;
	( match st with
	    | `Done () ->
		printf "Client results: n_ok=%d\n%!" !n_ok
	    | `Error err ->
		printf "Exception: %s\n%!" (Netexn.to_string err);
		printf "(Before error: Client results: n_ok=%d)\n%!" !n_ok
	    | `Aborted ->
		printf "Aborted.\n%!"
	);
	`Done ()
     )


let main () =
  let cluster = ref None in
  let namenodes = ref [] in

  let n = ref 100 in
  let p = ref 4 in
  let fork = ref false in
  Arg.parse
    [ "-cluster", Arg.String (fun s -> cluster := Some s),
      "<name>    Set the cluster name";
      
      "-namenode", Arg.String (fun n -> namenodes := n :: !namenodes),
      "<host>:<port>   Also use this namenode (can be given several times)";

      "-n", Arg.Set_int n,
      "<num>   Number of file operations per client";

      "-p", Arg.Set_int p,
      "<num>   Number of parallel clients";

      "-fork", Arg.Set fork,
      "    Use several processes for testing";
    ]
    (fun arg ->
       raise(Arg.Bad("Unexpected arg: " ^ arg))
    )
    "usage: ps_link_unlink";

  let nn_nodes =
    if !namenodes = [] then None else Some !namenodes in
  let cc = Plasma_client_config.get_config
    ?clustername:!cluster
    ?nn_nodes () in
  (* let inode = prepare_file cc in *)

  let esys = Unixqueue.create_unix_event_system() in
  let pids = ref [] in
  for j = 1 to !p do
    if !fork then
      match Unix.fork() with
	| 0 ->
	    let _e = submit_e cc esys j !n in
	    Unixqueue.run esys;
	    exit 0
	| pid -> pids := pid :: !pids
    else
      let _e = submit_e cc esys j !n in
      ()
  done;
  if !fork then
    List.iter
      (fun pid ->
	 ignore(Unix.waitpid [] pid)
      )
      !pids
  else
    Unixqueue.run esys;

  printf "Done\n%!"


let () = main()

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml