(* N subprocesses solve matrix multiplications Each child has a camlbox where it receives matrices (of type float array array) to multiply. The master also has a camlbox where it receives the results. For simplicity of the example we only support sqare matrices. Invoke like: ./manymult <n> <size> <n_workers> where: - <n> = number of matrices to multiply - <size> = size of the matrices - <n_workers> = number of children *) open Printf type request = { req_id : int; size : int; left : float array array; right : float array array } type response = { resp_id : int; worker_id : int; result : float array array } type task = { request : request; mutable response_opt : response option } type worker_arg = [ `Multiply of request | `Terminate ] type worker_ret = [ `Ready of int (* worker_id *) | `Response of response ] let msg_size = 1024 * 1024 (* 1M fixed size *) let create_tasks n size = Array.init n (fun k -> let left = Array.init size (fun _ -> Array.init size (fun _ -> Random.float 1.0 )) in let right = Array.init size (fun _ -> Array.init size (fun _ -> Random.float 1.0 )) in let req = { req_id = k; size = size; left = left; right = right; } in { request = req; response_opt = None } ) let multiply wid req = let r = Array.make_matrix req.size req.size 0.0 in for row = 0 to req.size-1 do for col = 0 to req.size-1 do let s = ref 0.0 in for j = 0 to req.size-1 do s := !s +. req.left.(j).(col) *. req.right.(row).(j) done; r.(row).(col) <- !s done done; { resp_id = req.req_id; worker_id = wid; result = r } let worker wid wbox mname mfd = let ms = Netcamlbox.camlbox_sender_of_fd mname mfd in Netcamlbox.unlink_camlbox (Netcamlbox.camlbox_addr wbox); Netcamlbox.camlbox_send ms (`Ready wid); let cont = ref true in while !cont do let new_list = Netcamlbox.camlbox_wait wbox in List.iter (fun k -> let (msg : worker_arg ref) = Netcamlbox.camlbox_get wbox k in match !msg with | `Multiply req -> let (resp : response) = multiply wid req in Netcamlbox.camlbox_delete wbox k; Netcamlbox.camlbox_send ms (`Response resp) | `Terminate -> cont := false ) new_list done; exit 0 let wslots = 2 let prepare n_workers = (* Create master box: *) let mname = "camlbox_" ^ string_of_int(Unix.getpid()) in let mbox = Netcamlbox.create_camlbox mname (2*n_workers) msg_size in let mfd = Netcamlbox.camlbox_fd mname in (* Create worker boxes: *) let wboxes = Array.init n_workers (fun k -> let wname = mname ^ "_" ^ string_of_int k in let wbox = Netcamlbox.create_camlbox wname wslots msg_size in let ws = Netcamlbox.camlbox_sender wname in (wbox, ws, ref 2) ) in (* Fork workers: *) let pids = ref [] in Array.iteri (fun wid (wbox, _, _) -> match Unix.fork() with | 0 -> worker wid wbox mname mfd | pid -> pids := pid :: !pids ) wboxes; (mbox, wboxes, !pids) let wait_until_ready n_workers mbox = let missing = ref n_workers in while !missing > 0 do let idx_list = Netcamlbox.camlbox_wait mbox in List.iter (fun idx -> let (msg : worker_ret) = Netcamlbox.camlbox_get mbox idx in ( match msg with | `Ready _ -> decr missing | _ -> assert false ); Netcamlbox.camlbox_delete mbox idx ) idx_list done let master tasks n_workers (mbox, wboxes, pids) = (* Loop *) let unresponded = ref (Array.length tasks) in let todo = ref (Array.to_list (Array.init (Array.length tasks) (fun k -> k))) in let free_slots = ref(n_workers * wslots) in while !unresponded > 0 do if !free_slots > 0 && !todo <> [] then ( (* Submit new request: *) let k_task = List.hd !todo in todo := List.tl !todo; let task = tasks.(k_task) in let submitted = ref false in Array.iter (fun (_, ws, wfree) -> if not !submitted && !wfree > 0 then ( let (msg : worker_arg ref) = ref (`Multiply task.request) in Netcamlbox.camlbox_send ws msg; decr wfree; decr free_slots; submitted := true ) ) wboxes; assert(!submitted) ) else ( (* All workers busy: wait for response *) let idx_list = Netcamlbox.camlbox_wait mbox in List.iter (fun idx -> let (msg : worker_ret) = Netcamlbox.camlbox_get_copy mbox idx in Netcamlbox.camlbox_delete mbox idx; match msg with | `Response resp -> tasks.( resp.resp_id ).response_opt <- Some resp; let (_, _, wfree) = wboxes.( resp.worker_id ) in incr wfree; incr free_slots; decr unresponded; | _ -> assert false ) idx_list ) done; (* Check that we've got all results: *) Array.iter (fun task -> assert(task.response_opt <> None) ) tasks; (* Ask the children to terminate: *) Array.iter (fun (_, ws, _) -> let (msg : worker_arg ref) = ref `Terminate in Netcamlbox.camlbox_send ws msg; ) wboxes; (* Collect children: *) List.iter (fun pid -> ignore(Unix.waitpid [] pid) ) pids let main() = let n = int_of_string Sys.argv.(1) in let size = int_of_string Sys.argv.(2) in let n_workers = int_of_string Sys.argv.(3) in printf "Forking children...\n%!"; let boxtuple = prepare n_workers in printf "Creating tasks...\n%!"; let tasks = create_tasks n size in printf "Performing tasks...\n%!"; let t0 = Unix.gettimeofday() in let (mbox,_,_) = boxtuple in wait_until_ready n_workers mbox; Netcamlbox.unlink_camlbox (Netcamlbox.camlbox_addr mbox); master tasks n_workers boxtuple; let t1 = Unix.gettimeofday() in printf "t = %f\n%!" (t1-.t0) let () = main()