(* Implementation of worker processes *)
open Sort1_proto_aux
let kept_data = Hashtbl.create 10
(* Maps partition ID to sortdata. Entries are added by [execute_op] (for the
   `keep continuation) and by [proc_merge_partition]; [merge] removes the two
   entries it consumes. *)
let delayed_actions = ref []
(* List of tuples (part1_id, part2_id, f): once both partitions are present
   in [kept_data], [check_for_delayed_actions] schedules f and drops the
   tuple from this list. New tuples are pushed by [proc_merge_partition].
*)
(* Cache of RPC clients talking to other workers, keyed by endpoint name. *)
let worker_clients = Hashtbl.create 10

(* [get_worker_client endpoint] returns the worker RPC client cached for
   [endpoint]. On a cache miss a new client is created on the event system
   of the current Netplex container, stored in [worker_clients], and
   returned. *)
let get_worker_client endpoint =
  if Hashtbl.mem worker_clients endpoint then
    Hashtbl.find worker_clients endpoint
  else begin
    let esys = (Netplex_cenv.self_cont ())#event_system in
    let conn = Netplex_sockserv.any_file_client_connector endpoint in
    let mode = `Socket (Rpc.Tcp, conn, Rpc_client.default_socket_config) in
    let rpc_client =
      Sort1_proto_clnt.Worker.V1.create_client2 ~esys mode in
    Hashtbl.replace worker_clients endpoint rpc_client;
    rpc_client
  end
(* Cache of RPC clients talking to the controller, keyed by endpoint name.
   Should only ever hold one entry. *)
let controller_clients = Hashtbl.create 10

(* [get_controller_client endpoint] returns the controller RPC client cached
   for [endpoint]. On a cache miss a new client is created on the event
   system of the current Netplex container, stored in [controller_clients],
   and returned. *)
let get_controller_client endpoint =
  if Hashtbl.mem controller_clients endpoint then
    Hashtbl.find controller_clients endpoint
  else begin
    let esys = (Netplex_cenv.self_cont ())#event_system in
    let conn = Netplex_sockserv.any_file_client_connector endpoint in
    let mode = `Socket (Rpc.Tcp, conn, Rpc_client.default_socket_config) in
    let rpc_client =
      Sort1_proto_clnt.Controller.V1.create_client2 ~esys mode in
    Hashtbl.replace controller_clients endpoint rpc_client;
    rpc_client
  end
(* [check_for_errors name get_reply] is the reply callback used for the
   asynchronous RPC calls in this file. It invokes [get_reply]; if that
   raises, the exception is logged at level `Err together with [name] and
   is not propagated further. *)
let check_for_errors name get_reply =
  try
    (get_reply () : unit)
  with
  | exn ->
      let msg = Netexn.to_string exn in
      Netplex_cenv.logf `Err "Got exception when calling %s: %s" name msg
(* Scans [delayed_actions] and schedules every action whose two partitions
   are both present in [kept_data]. Scheduled actions are removed from the
   list; the others stay, in their original order. *)
let check_for_delayed_actions () =
  let is_ready (p1, p2, _) =
    Hashtbl.mem kept_data p1 && Hashtbl.mem kept_data p2 in
  let ready, pending = List.partition is_ready !delayed_actions in
  List.iter
    (fun (_, _, action) ->
       (* We run this within "once" so exceptions can be easily handled *)
       let esys = (Netplex_cenv.self_cont ())#event_system in
       let group = Unixqueue.new_group esys in
       Unixqueue.once esys group 0.0 action)
    ready;
  delayed_actions := pending
(* [execute_op part_id data cont] carries out the continuation [cont] for the
   partition [part_id] whose contents are [data]:
   - `keep: store the data in [kept_data] and re-check the delayed actions;
   - `forward: send the data to another worker via [merge_partition];
   - `return: send the data to the controller via [return_result].
   Both RPC calls are asynchronous; failures are logged by
   [check_for_errors]. *)
let execute_op part_id data cont =
  match cont with
  | `keep ->
      Hashtbl.replace kept_data part_id data;
      check_for_delayed_actions ()
  | `forward fwd ->
      let worker = get_worker_client fwd.destination in
      let request =
        ( fwd.merge_with_partition_id,
          part_id,
          data,
          fwd.new_partition_id,
          fwd.continuation ) in
      let on_reply = check_for_errors "merge_partition" in
      Sort1_proto_clnt.Worker.V1.merge_partition'async worker request on_reply
  | `return ep ->
      let controller = get_controller_client ep in
      let on_reply = check_for_errors "return_result" in
      Sort1_proto_clnt.Controller.V1.return_result'async
        controller data on_reply
(* RPC procedure [sort_partition]: sorts the received array in place with
   [String.compare], acknowledges the call via [emit], and only then runs
   the continuation through [execute_op]. The session argument is unused. *)
let proc_sort_partition _session request emit =
  let (part_id, data, cont) = request in
  let () = Array.sort String.compare data in
  emit ();
  execute_op part_id data cont
(* [merge part1_id part2_id partr_id cont ()] merges the two sorted string
   arrays stored in [kept_data] under [part1_id] and [part2_id] into one
   sorted array and passes it to [execute_op] as partition [partr_id] with
   continuation [cont].

   Both input partitions are removed from [kept_data]. The function must only
   be called when both are present (this is what
   [check_for_delayed_actions] guarantees); otherwise the assertion fails.

   The trailing unit argument lets a partial application serve as a delayed
   action. *)
let merge part1_id part2_id partr_id cont () =
  let data1 =
    try Hashtbl.find kept_data part1_id
    with Not_found -> assert false in
  let data2 =
    try Hashtbl.find kept_data part2_id
    with Not_found -> assert false in
  Hashtbl.remove kept_data part1_id;
  Hashtbl.remove kept_data part2_id;
  (* NOTE(review): presumably meant to free memory before the result array is
     allocated; data1 and data2 themselves are still referenced here. *)
  Gc.major();
  let l1 = Array.length data1 in
  let l2 = Array.length data2 in
  (* Array.make replaces Array.create, which was deprecated and later removed
     from the standard library. *)
  let datar = Array.make (l1 + l2) "" in
  let k1 = ref 0 in
  let k2 = ref 0 in
  let kr = ref 0 in
  (* Standard two-way merge. The comparison uses String.compare, the same
     ordering the partitions were sorted with. On equal elements the one from
     data2 is taken first. *)
  while !k1 < l1 && !k2 < l2 do
    if String.compare data1.( !k1 ) data2.( !k2 ) < 0 then (
      datar.( !kr ) <- data1.( !k1 );
      incr k1;
      incr kr
    )
    else (
      datar.( !kr ) <- data2.( !k2 );
      incr k2;
      incr kr
    )
  done;
  (* At most one input still has elements left; copy that tail over. *)
  if !k1 < l1 then
    Array.blit data1 !k1 datar !kr (l1 - !k1);
  if !k2 < l2 then
    Array.blit data2 !k2 datar !kr (l2 - !k2);
  execute_op partr_id datar cont
(* RPC procedure [merge_partition]: stores the received partition [part2_id]
   in [kept_data], registers a delayed merge of [part1_id] and [part2_id]
   into [partr_id], acknowledges the call via [emit], and then checks whether
   any delayed action can run. The session argument is unused. *)
let proc_merge_partition _session request emit =
  let (part1_id, part2_id, data2, partr_id, cont) = request in
  Hashtbl.replace kept_data part2_id data2;
  let action = merge part1_id part2_id partr_id cont in
  delayed_actions := (part1_id, part2_id, action) :: !delayed_actions;
  emit ();
  check_for_delayed_actions ()
(* Configuration callback handed to the RPC factory: ignores both arguments
   and returns unit. *)
let configure _cf _addr = ()
(* Setup callback handed to the RPC factory: binds the asynchronous Worker.V1
   procedures to the RPC server [srv]. The null procedure replies
   immediately. *)
let setup srv () =
  let proc_null _session _arg emit = emit () in
  Sort1_proto_srv.Worker.V1.bind_async
    ~proc_null
    ~proc_sort_partition:proc_sort_partition
    ~proc_merge_partition:proc_merge_partition
    srv
(* Builds the Netplex processor factory named "sort_worker". Its
   post-start hook installs a timer with a 1.0 argument whose callback runs
   a major GC and returns [true]. *)
let worker_factory () =
  let hooks _ =
    object
      inherit Netplex_kit.empty_processor_hooks ()

      method post_start_hook _ =
        let collect _ =
          Gc.major ();
          true in
        (* The timer handle is not needed afterwards. *)
        ignore (Netplex_cenv.create_timer collect 1.0)
    end in
  Rpc_netplex.rpc_factory
    ~name:"sort_worker"
    ~configure
    ~setup
    ~hooks
    ()