(* Implementation of the controller/multiplier *)

open Mm_proto_aux

(* The state is kept in global variables for simplicity. This means
   we can only do one multiplication at a time. *)

(** The pair [(left_matrix, right_matrix)] of the multiplication in
    progress, or [None] when no multiplication is running. *)
let current_matrices = ref None

(** Remaining work, encoded as [Some (lc, lc_max, rr, rr_max)] (see
    [incr_queue]), or [None] when there is nothing left to hand out. *)
let current_job_queue = ref None

(** The matrix into which workers store their results, or [None] when no
    multiplication is running. *)
let current_result = ref None

(** [with_matrix f] applies [f] to the current left and right matrices.
    @raise Failure if no multiplication is in progress. *)
let with_matrix f =
  match !current_matrices with
  | None -> failwith "No matrix"
  | Some (l, r) -> f l r

(** [proc_get_dim which] returns the dimensions of the selected input
    matrix: the left one if [which = left], the right one otherwise.
    A matrix without rows is reported as having zero columns.
    @raise Failure if no multiplication is in progress. *)
let proc_get_dim which =
  with_matrix (fun l r ->
    let m = if which = left then l else r in
    let rows = Array.length m in
    (* Only look at the first row if there is one. *)
    let columns = if rows > 0 then Array.length m.(0) else 0 in
    { rows; columns })

(** [proc_get_row (which, row)] returns row number [row] of the selected
    input matrix (left if [which = left], right otherwise).
    @raise Failure if no multiplication is in progress.
    @raise Invalid_argument if [row] is out of bounds. *)
let proc_get_row (which, row) =
  with_matrix (fun l r ->
    let m = if which = left then l else r in
    m.(row))

(** [incr_queue (lc, lc_max, rr, rr_max)] advances the job counters by one
    step. [lc] is incremented first; when it reaches [lc_max] it wraps to 0
    and [rr] is incremented. Returns [None] once [rr] reaches [rr_max],
    i.e. when the job just handed out was the last one. *)
let incr_queue (lc, lc_max, rr, rr_max) =
  let next_lc = lc + 1 in
  if next_lc <> lc_max then Some (next_lc, lc_max, rr, rr_max)
  else begin
    let next_rr = rr + 1 in
    if next_rr = rr_max then None else Some (0, lc_max, next_rr, rr_max)
  end

(** [pull n queue_opt] takes up to [n] jobs from the queue state
    [queue_opt] and returns them in the order they were taken. After each
    job taken, [current_job_queue] is updated to the remaining queue.
    Returns the empty list if the queue is exhausted or [n <= 0].

    The loop is tail-recursive so that a large [n] requested by a worker
    cannot exhaust the stack. *)
let pull n queue_opt =
  let rec loop remaining queue acc =
    match queue with
    | None -> List.rev acc
    | Some (lc, lc_max, rr, rr_max) ->
      (* The "queue" is actually represented as two counters lc and rr *)
      if remaining <= 0 then List.rev acc
      else begin
        let job = { left_col = lc; right_row = rr } in
        let rest = incr_queue (lc, lc_max, rr, rr_max) in
        current_job_queue := rest;
        loop (remaining - 1) rest (job :: acc)
      end
  in
  loop n queue_opt []

(** [proc_pull_jobs n] hands out up to [n] jobs from the global job queue
    as an array (empty when no work is left). *)
let proc_pull_jobs n =
  Array.of_list (pull n !current_job_queue)

(** [proc_put_results results] stores every result value into the current
    result matrix at position [(right_row, left_col)] of its job. Results
    are silently dropped when no multiplication is in progress.
    @raise Invalid_argument if a job index is out of bounds. *)
let proc_put_results results =
  match !current_result with
  | None -> ()
  | Some m ->
    Array.iter
      (fun r ->
         m.(r.res_job.right_row).(r.res_job.left_col) <- r.res_val)
      results

(** [fill m rows cols] overwrites the upper-left [rows] x [cols] part of
    [m] with random floats drawn by [Random.float 1.0], row by row. *)
let fill m rows cols =
  for row = 0 to rows - 1 do
    for col = 0 to cols - 1 do
      m.(row).(col) <- Random.float 1.0
    done
  done

(** [proc_test_multiply workers session (lrows, rcols, rrows) emit] sets up
    two random matrices (left: [lrows] x [rrows], right: [rrows] x [rcols]),
    publishes them together with a fresh result matrix and job queue in the
    global state, and tells every worker in [workers] (a list of
    [(host, port)] pairs) to run.

    This is an asynchronous RPC implementation: we do not have to reply
    immediately. Instead we get an [emit] function, and we call it at some
    time in the future to pass the result value back to the caller of the
    RPC. Here [emit ()] is called once all workers have reported back.

    If [workers] is empty, nothing is computed: an error is logged and
    [emit ()] is called right away so that the caller does not wait
    forever for a reply. *)
let proc_test_multiply workers _ (lrows, rcols, rrows) emit =
  match workers with
  | [] ->
    (* Without workers no completion callback would ever fire, so reply
       now instead of leaving the RPC unanswered. *)
    Netplex_cenv.logf `Err
      "test_multiply: no workers configured, nothing was computed";
    emit ()
  | _ :: _ ->
    let lcols = rrows in
    let lmatrix = Array.make_matrix lrows lcols 0.0 in
    let rmatrix = Array.make_matrix rrows rcols 0.0 in
    fill lmatrix lrows lcols;
    fill rmatrix rrows rcols;
    (* With a zero bound the counters in [incr_queue] would step past the
       bound and never terminate, so start with an empty queue then. *)
    let initial_queue =
      if lcols > 0 && rrows > 0 then Some (0, lcols, 0, rrows) else None
    in
    current_matrices := Some (lmatrix, rmatrix);
    current_result := Some (Array.make_matrix rrows lcols 0.0);
    current_job_queue := initial_queue;
    (* Now start the computations by telling all workers to go: *)
    let pending = ref 0 in
    let esys = (Netplex_cenv.self_cont ()) # event_system in
    let worker_clients = ref [] in
    (* This function is called back when a worker passes a result back
       for "run". *)
    let on_worker_done get_result =
      decr pending;
      (* [get_result] re-raises any error of the call; we only log it. *)
      (try get_result ()
       with error ->
         Netplex_cenv.logf `Err "Error from worker: %s"
           (Printexc.to_string error));
      if !pending = 0 then begin
        (* All workers done! *)
        (* NOTE(review): if workers failed before draining the queue this
           assertion does not hold and [emit] is never reached - confirm
           whether that is the intended failure mode. *)
        assert (!current_job_queue = None);
        (* Delete the result: *)
        current_matrices := None;
        current_result := None;
        emit ();
        List.iter Rpc_client.shut_down !worker_clients
      end
    in
    List.iter
      (fun (host, port) ->
         let worker =
           Mm_proto_clnt.Worker.V1.create_client2
             ~esys
             (`Socket (Rpc.Tcp,
                       Rpc_client.Inet (host, port),
                       Rpc_client.default_socket_config))
         in
         worker_clients := worker :: !worker_clients;
         Mm_proto_clnt.Worker.V1.run'async worker () on_worker_done;
         (* Counted after the call is queued; callbacks only run later
            from the event loop, when all workers have been counted. *)
         incr pending)
      workers

(** [configure cf addr] reads all "worker" subsections of the section
    [addr] in the config file [cf] and returns their [(host, port)] pairs.
    @raise Failure if a worker section lacks the host or port parameter. *)
let configure cf addr =
  let worker_sections = cf # resolve_section addr "worker" in
  List.map
    (fun w_addr ->
       let host =
         try cf # string_param (cf # resolve_parameter w_addr "host")
         with Not_found -> failwith "Required param host is missing"
       in
       let port =
         try cf # int_param (cf # resolve_parameter w_addr "port")
         with Not_found -> failwith "Required param port is missing"
       in
       (host, port))
    worker_sections

(** [setup srv workers] binds both RPC programs to the server [srv]: the
    asynchronous Multiplier program (ping and test_multiply) and the
    synchronous Controller program that the workers call back into. *)
let setup srv workers =
  Mm_proto_srv.Multiplier.V1.bind_async
    ~proc_ping:(fun _ () emit -> emit ())
    ~proc_test_multiply:(proc_test_multiply workers)
    srv;
  Mm_proto_srv.Controller.V1.bind
    ~proc_ping:(fun () -> ())
    ~proc_get_dim
    ~proc_get_row
    ~proc_pull_jobs
    ~proc_put_results
    srv

(** [controller_factory ()] creates the Netplex processor factory named
    "mm_controller". Its post-start hook seeds the random number generator
    in every started container. *)
let controller_factory () =
  Rpc_netplex.rpc_factory
    ~configure
    ~name:"mm_controller"
    ~setup
    ~hooks:(fun _ ->
      ( object (self)
          inherit Netplex_kit.empty_processor_hooks ()
          method post_start_hook _ = Random.self_init ()
        end ))
    ()