(* Implementation of the worker *)

open Mm_proto_aux

(* Maximum number of jobs requested from the controller per [pull_jobs]
   call. *)
let jobs_at_once = 100

(* [fetch_matrix controller which nrows] retrieves rows [0 .. nrows-1] of
   the matrix designated by [which] from the controller, issuing one
   [get_row] call per row in ascending row order. The result has exactly
   [nrows] entries. *)
let fetch_matrix controller which nrows =
  Array.init nrows
    (fun j -> Mm_proto_clnt.Controller.V1.get_row controller (which, j))

(* [job_value lmatrix rmatrix n job] computes the value for one job:
   the sum, for j from 0 to n-1, of
     lmatrix.(j).(job.left_col) *. rmatrix.(job.right_row).(j)
   Terms are added in ascending order of j, starting from 0.0. *)
let job_value lmatrix rmatrix n job =
  let lcol = job.left_col in
  let rrow = job.right_row in
  let acc = ref 0.0 in
  for j = 0 to n - 1 do
    acc := !acc +. lmatrix.(j).(lcol) *. rmatrix.(rrow).(j)
  done;
  !acc

(* [proc_run (controller_host, controller_port) ()] implements the [run]
   procedure of the worker:
   - connects to the controller over TCP,
   - downloads both matrices,
   - repeatedly pulls up to [jobs_at_once] jobs, computes them and sends
     the results back, until the controller returns an empty job array.

   The client connection is shut down before returning, also when an
   exception escapes (the exception is then re-raised).

   Raises [Assert_failure] if the number of rows of the left matrix
   differs from the number of columns of the right matrix. Exceptions of
   the RPC calls are propagated unchanged. *)
let proc_run (controller_host, controller_port) () =
  let controller =
    Mm_proto_clnt.Controller.V1.create_client2
      (`Socket(Rpc.Tcp,
               Rpc_client.Inet(controller_host, controller_port),
               Rpc_client.default_socket_config)) in

  let run () =
    (* Get the dimension of the matrices, and retrieve them from the
       controller. *)
    let ldim = Mm_proto_clnt.Controller.V1.get_dim controller left in
    let rdim = Mm_proto_clnt.Controller.V1.get_dim controller right in
    (* The summation index of [job_value] runs over the rows of the left
       matrix and the columns of the right matrix, so both must agree. *)
    assert (ldim.rows = rdim.columns);
    let lmatrix = fetch_matrix controller left ldim.rows in
    (* The right matrix has [rdim.rows] rows; every one of them must be
       fetched, independently of the row count of the left matrix. *)
    let rmatrix = fetch_matrix controller right rdim.rows in

    (* Get jobs until there are no more jobs. *)
    let cont = ref true in
    while !cont do
      let jobs =
        Mm_proto_clnt.Controller.V1.pull_jobs controller jobs_at_once in
      cont := Array.length jobs > 0;
      (* Results are consed onto the accumulator, so they are sent in
         reverse job order; each result carries its job in [res_job].
         [put_results] is also called for the final, empty batch. *)
      let results =
        Array.fold_left
          (fun acc job ->
             let v = job_value lmatrix rmatrix ldim.rows job in
             { res_job = job; res_val = v } :: acc)
          []
          jobs in
      Mm_proto_clnt.Controller.V1.put_results
        controller (Array.of_list results)
    done
  in

  (* Release the connection on the error path as well, then re-raise. *)
  (try run ()
   with e ->
     Rpc_client.shut_down controller;
     raise e);
  Rpc_client.shut_down controller;

  (* Done: return "()" to caller *)
  ()

(* [configure cf addr] reads the parameters [controller_host] (string)
   and [controller_port] (int), resolved relative to [addr] in the
   configuration [cf], and returns them as a pair.

   Raises [Failure] if looking up either parameter raises
   [Not_found]. *)
let configure cf addr =
  let controller_host =
    try
      cf # string_param (cf # resolve_parameter addr "controller_host")
    with
      | Not_found ->
          failwith "Required param controller_host is missing" in
  let controller_port =
    try
      cf # int_param (cf # resolve_parameter addr "controller_port")
    with
      | Not_found ->
          failwith "Required param controller_port is missing" in
  (controller_host, controller_port)

(* [setup srv (controller_host, controller_port)] binds the procedures of
   the worker program to the RPC server [srv]: [ping] does nothing, and
   [run] is [proc_run] applied to the configured controller address. *)
let setup srv (controller_host, controller_port) =
  Mm_proto_srv.Worker.V1.bind
    ~proc_ping:(fun () -> ())
    ~proc_run:(proc_run (controller_host, controller_port))
    srv

(* [worker_factory ()] creates the RPC processor factory named
   "mm_worker", using [configure] and [setup] from this module. *)
let worker_factory () =
  Rpc_netplex.rpc_factory
    ~configure
    ~name:"mm_worker"
    ~setup
    ()