(* Plasma GitLab Archive
   Projects Blog Knowledge *)

(* N subprocesses solve matrix multiplications 

   Each child has a camlbox where it receives matrices (of type
   float array array) to multiply. The master also has a camlbox where
   it receives the results.

   For simplicity of the example we only support square matrices.

   Invoke like:

   ./manymult <n> <size> <n_workers>

   where:
   - <n> = number of matrices to multiply
   - <size> = size of the matrices
   - <n_workers> = number of children 
 *)

open Printf

(* A multiplication job handed to a worker.  [create_tasks] builds one
   request per task, with two [size] x [size] matrices of random floats.
 *)
type request =
    { req_id : int;                 (* index of the task in the task array *)
      size : int;                   (* dimension of both (square) matrices *)
      left : float array array;     (* first operand *)
      right : float array array     (* second operand *)
    }

(* The answer to a [request], as built by [multiply].  The master uses
   [resp_id] to find the task and [worker_id] to find the worker whose
   slot counter has to be incremented again.
 *)
type response =
    { resp_id : int;                (* the [req_id] of the answered request *)
      worker_id : int;              (* id of the worker that computed it *)
      result : float array array    (* the computed matrix *)
    }

(* Bookkeeping entry of the master: a request together with its response.
   [response_opt] starts as [None] (see [create_tasks]) and is filled in
   by [master] when the matching response arrives.
 *)
type task =
    { request : request;
      mutable response_opt : response option
    }

(* Messages sent from the master to a worker.  Both sides exchange them
   wrapped in a [ref] (type [worker_arg ref]).
   - `Multiply: compute the product for this request
   - `Terminate: leave the worker loop and exit
 *)
type worker_arg =
    [ `Multiply of request
    | `Terminate
    ]

(* Messages sent from a worker to the master.
   - `Ready: sent once when the worker has started up
   - `Response: sent after each finished multiplication
 *)
type worker_ret =
   [ `Ready of int   (* worker_id *)
   | `Response of response
   ]

(* Size argument passed to Netcamlbox.create_camlbox for the master box
   and for every worker box: 2^20 = 1M, the same for all boxes. *)
let msg_size = 1 lsl 20

(* [create_tasks n size] returns an array of [n] tasks.  Task number [k]
   carries a request with [req_id = k] and two [size] x [size] matrices
   whose entries are drawn with [Random.float 1.0]; no task has a
   response yet.
 *)
let create_tasks n size =
  let random_row _ = Array.init size (fun _ -> Random.float 1.0) in
  let random_matrix () = Array.init size random_row in
  let make_task k =
    (* Explicit bindings: the left matrix is generated before the right one *)
    let left = random_matrix () in
    let right = random_matrix () in
    let req = { req_id = k; size = size; left = left; right = right } in
    { request = req; response_opt = None } in
  Array.init n make_task

(* [multiply wid req] computes the matrix product [req.left] x [req.right]
   and returns it as a response tagged with the request id and with the
   id [wid] of the computing worker.

   Matrices are indexed row first: [m.(i).(j)] is the element in row [i],
   column [j], so that

     result.(row).(col) = sum over j of left.(row).(j) * right.(j).(col)

   Both operands are expected to have at least [req.size] rows and
   columns; otherwise the array accesses raise [Invalid_argument].
 *)
let multiply wid req =
  let n = req.size in
  let r = Array.make_matrix n n 0.0 in
  for row = 0 to n-1 do
    (* The row of the left operand is the same for all columns *)
    let left_row = req.left.(row) in
    for col = 0 to n-1 do
      let s = ref 0.0 in
      for j = 0 to n-1 do
        s := !s +. left_row.(j) *. req.right.(j).(col)
      done;
      r.(row).(col) <- !s
    done
  done;
  { resp_id = req.req_id;
    worker_id = wid;
    result = r
  }

(* Main function of a worker process; it never returns but ends the
   process with [exit 0].

   - [wid]: id of this worker, reported in `Ready and in every response
   - [wbox]: the camlbox this worker receives its requests in
   - [mname], [mfd]: name and file descriptor of the master box, used to
     create the sender for messages to the master

   The worker unlinks the name of its own box, announces itself with
   `Ready, and then processes incoming messages until it sees `Terminate.
 *)
let worker wid wbox mname mfd =
  let to_master = Netcamlbox.camlbox_sender_of_fd mname mfd in
  Netcamlbox.unlink_camlbox (Netcamlbox.camlbox_addr wbox);
  Netcamlbox.camlbox_send to_master (`Ready wid);

  (* Process the message in [slot]; the result is [false] iff the message
     was `Terminate. *)
  let handle slot =
    let (cell : worker_arg ref) = Netcamlbox.camlbox_get wbox slot in
    match !cell with
    | `Terminate ->
        false
    | `Multiply req ->
        let (resp : response) = multiply wid req in
        (* The request is no longer needed once the result is computed:
           release the slot before sending the response. *)
        Netcamlbox.camlbox_delete wbox slot;
        Netcamlbox.camlbox_send to_master (`Response resp);
        true in

  (* Every message of a batch is handled, even after a `Terminate *)
  let rec serve () =
    let slots = Netcamlbox.camlbox_wait wbox in
    let keep_going =
      List.fold_left
        (fun alive slot ->
           let continue = handle slot in
           alive && continue)
        true
        slots in
    if keep_going then serve () in
  serve ();
  exit 0

(* Number of message slots of each worker box (see the create_camlbox
   call in [prepare]).  [master] multiplies it by the number of workers
   to get the total number of requests that may be in flight. *)
let wslots = 2

(* [prepare n_workers] creates the master box and one box per worker, and
   forks the [n_workers] worker processes.

   Returns [(mbox, wboxes, pids)]:
   - [mbox]: the master box, in which the workers put their messages
   - [wboxes]: for each worker the triple [(wbox, ws, wfree)] of its box,
     a sender for that box, and a counter of the slots of the box the
     master may still fill (initially [wslots])
   - [pids]: the process ids of the forked children (in reverse order of
     creation)

   The box names are derived from the pid of the calling process.
 *)
let prepare n_workers =
  (* Create master box.  Every worker can have up to wslots requests
     pending, so at most wslots*n_workers responses are outstanding. *)
  let mname = "camlbox_" ^ string_of_int(Unix.getpid()) in
  let mbox = Netcamlbox.create_camlbox mname (wslots*n_workers) msg_size in
  let mfd = Netcamlbox.camlbox_fd mname in

  (* Create worker boxes: *)
  let wboxes =
    Array.init n_workers
      (fun k ->
         let wname = mname ^ "_" ^ string_of_int k in
         let wbox = Netcamlbox.create_camlbox wname wslots msg_size in
         let ws = Netcamlbox.camlbox_sender wname in
         (* The counter must start at the capacity of the box, so that it
            stays in sync with the total n_workers*wslots used by master *)
         (wbox, ws, ref wslots)
      ) in

  (* Fork workers: *)
  let pids = ref [] in
  Array.iteri
    (fun wid (wbox, _, _) ->
       match Unix.fork() with
         | 0 ->
             (* Child: [worker] ends with [exit 0] and never returns *)
             worker wid wbox mname mfd
         | pid ->
             pids := pid :: !pids
    )
    wboxes;

  (mbox, wboxes, !pids)


(* [wait_until_ready n_workers mbox] blocks until [n_workers] `Ready
   messages have been received in [mbox].  Every received message is
   deleted from the box.  Any other message at this stage is a protocol
   violation and triggers [assert false].
 *)
let wait_until_ready n_workers mbox =
  (* Check and remove one message; the result is the updated count *)
  let acknowledge count slot =
    let (msg : worker_ret) = Netcamlbox.camlbox_get mbox slot in
    begin match msg with
      | `Ready _ -> ()
      | `Response _ -> assert false
    end;
    Netcamlbox.camlbox_delete mbox slot;
    count + 1 in
  let rec await outstanding =
    if outstanding > 0 then begin
      let arrived = Netcamlbox.camlbox_wait mbox in
      let n_ready = List.fold_left acknowledge 0 arrived in
      await (outstanding - n_ready)
    end in
  await n_workers


(* [master tasks n_workers (mbox, wboxes, pids)] distributes all [tasks]
   over the workers, stores every response in the [response_opt] field of
   its task, then tells the workers to terminate and waits for the child
   processes [pids].

   - [tasks]: the tasks to perform; responses are written into them
   - [n_workers]: number of workers; together with [wslots] it gives the
     number of requests that may be in flight at the same time
   - [(mbox, wboxes, pids)]: the triple returned by [prepare]
 *)
let master tasks n_workers (mbox, wboxes, pids) =
  let outstanding = ref (Array.length tasks) in
  (* Indices of the tasks not yet submitted, in ascending order *)
  let queue = ref (Array.to_list (Array.mapi (fun k _ -> k) tasks)) in
  let free_slots = ref (n_workers * wslots) in

  (* Send [request] to the first worker that still has a free slot *)
  let submit request =
    let n_boxes = Array.length wboxes in
    let rec offer i =
      if i >= n_boxes then
        false
      else begin
        let (_, sender, wfree) = wboxes.(i) in
        if !wfree > 0 then begin
          let (msg : worker_arg ref) = ref (`Multiply request) in
          Netcamlbox.camlbox_send sender msg;
          decr wfree;
          decr free_slots;
          true
        end
        else
          offer (i+1)
      end in
    (* Computed outside of the assertion so that the submission happens
       even if assertions are compiled out *)
    let submitted = offer 0 in
    assert submitted in

  (* Take one message out of the master box and record the response *)
  let receive slot =
    let (msg : worker_ret) = Netcamlbox.camlbox_get_copy mbox slot in
    (* We work on a copy, so the slot can be given back immediately *)
    Netcamlbox.camlbox_delete mbox slot;
    match msg with
      | `Response resp ->
          tasks.( resp.resp_id ).response_opt <- Some resp;
          let (_, _, wfree) = wboxes.( resp.worker_id ) in
          incr wfree;
          incr free_slots;
          decr outstanding
      | `Ready _ ->
          assert false in

  (* Loop *)
  while !outstanding > 0 do
    match !queue with
      | k_task :: rest when !free_slots > 0 ->
          (* Submit new request: *)
          queue := rest;
          submit tasks.(k_task).request
      | _ ->
          (* Nothing to submit or all workers busy: wait for responses *)
          List.iter receive (Netcamlbox.camlbox_wait mbox)
  done;

  (* Check that we've got all results: *)
  for k = 0 to Array.length tasks - 1 do
    assert(tasks.(k).response_opt <> None)
  done;

  (* Ask the children to terminate: *)
  Array.iter
    (fun (_, sender, _) ->
       let (bye : worker_arg ref) = ref `Terminate in
       Netcamlbox.camlbox_send sender bye
    )
    wboxes;

  (* Collect children: *)
  List.iter (fun pid -> ignore(Unix.waitpid [] pid)) pids


(* Program entry: parses the command line

     <n> <size> <n_workers>

   forks the workers, creates [n] tasks with [size] x [size] matrices,
   has them multiplied, and prints the elapsed wall-clock time.  The
   measured interval starts before the master waits for the workers to
   become ready.

   On missing or invalid arguments a message is printed to stderr and
   the program exits with code 2.  Additional arguments are ignored.
 *)
let main() =
  let die msg =
    eprintf "%s: %s\nusage: %s <n> <size> <n_workers>\n%!"
      Sys.argv.(0) msg Sys.argv.(0);
    exit 2 in
  if Array.length Sys.argv < 4 then
    die "missing arguments";
  let int_arg pos what =
    try int_of_string Sys.argv.(pos)
    with Failure _ -> die (sprintf "<%s> must be an integer" what) in
  let n = int_arg 1 "n" in
  let size = int_arg 2 "size" in
  let n_workers = int_arg 3 "n_workers" in
  if n < 0 then
    die "<n> must not be negative";
  if size < 0 then
    die "<size> must not be negative";
  (* Without any worker the master would wait forever for responses *)
  if n_workers < 1 then
    die "<n_workers> must be at least 1";

  printf "Forking children...\n%!";
  let boxtuple = prepare n_workers in
  printf "Creating tasks...\n%!";
  let tasks = create_tasks n size in
  printf "Performing tasks...\n%!";
  let t0 = Unix.gettimeofday() in
  let (mbox,_,_) = boxtuple in
  wait_until_ready n_workers mbox;
  (* All workers have sent `Ready by now, so the name of the master box
     is unlinked here *)
  Netcamlbox.unlink_camlbox (Netcamlbox.camlbox_addr mbox);
  master tasks n_workers boxtuple;
  let t1 = Unix.gettimeofday() in
  printf "t = %f\n%!" (t1-.t0)


let () =
  main()

(* This web site is published by Informatikbüro Gerd Stolpmann
   Powered by Caml *)