(* Plasma GitLab Archive
   Projects Blog Knowledge *)

(* Implementation of the worker *)

open Mm_proto_aux


(* Batch size: the count handed to the controller's pull_jobs procedure on
   every round of proc_run's work loop. *)
let jobs_at_once = 100


(* [proc_run (controller_host, controller_port) ()] implements the worker's
   "run" procedure:

   - connects to the controller via TCP at the given host and port,
   - downloads both matrices row by row ([left] and [right] are the matrix
     selectors; they are not defined in this block and presumably come
     from [Mm_proto_aux]),
   - repeatedly pulls batches of up to [jobs_at_once] jobs, computes one
     sum of products per job, and pushes the results back,
   - stops after the first empty batch and returns [()].

   The connection to the controller is always shut down before returning,
   also when an exception escapes (the exception is then re-raised).

   Raises [Assert_failure] if the row count of the left matrix differs from
   the column count of the right matrix. *)
let proc_run (controller_host, controller_port) () =
  let controller =
    Mm_proto_clnt.Controller.V1.create_client2
      (`Socket(Rpc.Tcp,
               Rpc_client.Inet(controller_host,controller_port),
               Rpc_client.default_socket_config)) in

  (* Download rows 0 .. n_rows-1 of the matrix selected by [which]. *)
  let fetch_matrix which n_rows =
    let m = Array.make n_rows [| |] in
    for j = 0 to n_rows-1 do
      m.(j) <- Mm_proto_clnt.Controller.V1.get_row controller (which,j)
    done;
    m in

  let work () =
    (* Get the dimension of the matrices, and retrieve them from the ctrl: *)
    let ldim =
      Mm_proto_clnt.Controller.V1.get_dim controller left in
    let rdim =
      Mm_proto_clnt.Controller.V1.get_dim controller right in
    assert(ldim.rows = rdim.columns);

    let lmatrix = fetch_matrix left ldim.rows in
    (* The right matrix has [rdim.rows] rows. Bounding this fetch by
       [ldim.rows] (which only equals [rdim.columns]) breaks for non-square
       right matrices. *)
    let rmatrix = fetch_matrix right rdim.rows in

    (* Sum over j of lmatrix[j][left_col] * rmatrix[right_row][j]. The
       summation runs in ascending j so the floating-point result is stable. *)
    let compute job =
      let lcol = job.left_col in
      let rrow = job.right_row in
      let s = ref 0.0 in
      for j = 0 to ldim.rows-1 do
        s := !s +. lmatrix.(j).(lcol) *. rmatrix.(rrow).(j)
      done;
      !s in

    (* Get jobs until there are no more jobs. *)
    let cont = ref true in
    while !cont do
      let jobs =
        Mm_proto_clnt.Controller.V1.pull_jobs controller jobs_at_once in
      cont := Array.length jobs > 0;

      (* Consing while walking [jobs] front to back yields the results in
         reverse job order; each result carries its job, so the pairing is
         kept. *)
      let results =
        Array.fold_left
          (fun acc job -> { res_job = job; res_val = compute job } :: acc)
          []
          jobs in

      (* Results are also reported for the final, empty batch. *)
      Mm_proto_clnt.Controller.V1.put_results controller
        (Array.of_list results)
    done in

  (* Run the work, then release the connection whatever the outcome. *)
  let failure = try work (); None with e -> Some e in
  Rpc_client.shut_down controller;
  match failure with
  | None -> ()          (* Done: return "()" to caller *)
  | Some e -> raise e


(* [configure cf addr] reads the two mandatory parameters "controller_host"
   (string) and "controller_port" (int) found at [addr] in the configuration
   [cf], and returns them as the pair [(host, port)].

   Raises [Failure] with a "Required param ... is missing" message when the
   lookup of a parameter raises [Not_found]. *)
let configure cf addr =
  (* Resolve [name] below [addr] and decode it with [read]; [Not_found] from
     either step is turned into [Failure missing_msg]. *)
  let required read name missing_msg =
    try read (cf # resolve_parameter addr name)
    with Not_found -> failwith missing_msg in
  let host =
    required
      (fun p -> cf # string_param p)
      "controller_host"
      "Required param controller_host is missing" in
  let port =
    required
      (fun p -> cf # int_param p)
      "controller_port"
      "Required param controller_port is missing" in
  (host, port)
  

(* [setup srv controller_addr] binds the Worker V1 procedures on [srv]:
   "ping" does nothing, and "run" is [proc_run] applied to the controller's
   (host, port) pair. *)
let setup srv controller_addr =
  let ping () = () in
  let run = proc_run controller_addr in
  Mm_proto_srv.Worker.V1.bind ~proc_ping:ping ~proc_run:run srv

(* [worker_factory ()] builds the factory named "mm_worker" with
   [Rpc_netplex.rpc_factory], using [configure] to read the controller
   address and [setup] to bind the worker procedures. *)
let worker_factory () =
  Rpc_netplex.rpc_factory ~name:"mm_worker" ~configure ~setup ()

(* This web site is published by Informatikbüro Gerd Stolpmann
   Powered by Caml *)