(* $Id: netplex_kit.ml 2195 2015-01-01 12:23:39Z gerd $ *)
open Netplex_types
class type virtual v_processor =
object
inherit processor_hooks
method virtual process :
when_done:(unit -> unit) ->
container -> Unix.file_descr -> string -> unit
method virtual supported_ptypes : parallelization_type list
end
class empty_processor_hooks() : processor_hooks =
object
method post_add_hook _ _ = ()
method post_rm_hook _ _ = ()
method pre_start_hook _ _ _ = ()
method post_start_hook _ = ()
method pre_finish_hook _ = ()
method post_finish_hook _ _ _ = ()
method workload_hook _ _ _ = ()
method receive_message _ _ _ = ()
method receive_admin_message _ _ _ = ()
method shutdown () = ()
method system_shutdown () = ()
method global_exception_handler _ = true
(* i.e. we continue running by default *)
method container_event_system() =
Unixqueue.standard_event_system()
method container_run esys =
esys#run()
end
class processor_hooks_delegation (hooks : processor_hooks) : processor_hooks =
object(self)
method post_add_hook socksrv ctrl =
hooks # post_add_hook socksrv ctrl
method post_rm_hook socksrv ctrl =
hooks # post_rm_hook socksrv ctrl
method pre_start_hook socksrv ctrl cont =
hooks # pre_start_hook socksrv ctrl cont
method post_start_hook cont =
hooks # post_start_hook cont
method pre_finish_hook cont =
hooks # pre_finish_hook cont
method post_finish_hook socksrv ctrl cont =
hooks # post_finish_hook socksrv ctrl cont
method workload_hook cont busier n =
hooks # workload_hook cont busier n
method receive_message cont cmd cmdargs =
hooks # receive_message cont cmd cmdargs
method receive_admin_message cont cmd cmdargs =
hooks # receive_admin_message cont cmd cmdargs
method shutdown () =
hooks # shutdown()
method system_shutdown () =
hooks # system_shutdown()
method global_exception_handler e =
hooks # global_exception_handler e
method container_event_system() =
hooks # container_event_system()
method container_run esys =
hooks # container_run esys
end
class virtual processor_base (hooks : processor_hooks) : v_processor =
object(self)
inherit processor_hooks_delegation hooks
method virtual process :
when_done:(unit -> unit) ->
container -> Unix.file_descr -> string -> unit
method virtual supported_ptypes : parallelization_type list
end
module PTYPE = struct
type t = parallelization_type
let compare (x:t) (y:t) = Pervasives.compare x y
end
module PSet = Set.Make(PTYPE)
let to_set l =
List.fold_left
(fun acc x -> PSet.add x acc) PSet.empty l
let of_set s =
PSet.fold
(fun x acc -> x :: acc) s []
class protocol_switch_processor
(merge_list : (string * processor) list) : processor =
let r_merge_list = List.rev merge_list in
let supported_ptypes =
match merge_list with
| (_,hook1) :: mtl ->
of_set
(List.fold_left
(fun acc (_,hooks) ->
PSet.inter (to_set hooks#supported_ptypes) acc)
(to_set hook1#supported_ptypes)
mtl)
| [] ->
failwith "Netplex_kit.protocol_switch: list is empty" in
object(self)
method process ~when_done container fd proto_name =
let p_opt =
try Some(List.assoc proto_name merge_list) with Not_found -> None in
match p_opt with
| Some p ->
p # process ~when_done container fd proto_name
| None ->
failwith("Netplex_kit.protocol_switch: Unknown protocol: " ^
proto_name)
method supported_ptypes =
supported_ptypes
method post_add_hook socksrv ctrl =
List.iter (fun (_,hooks) -> hooks # post_add_hook socksrv ctrl) merge_list
method post_rm_hook socksrv ctrl =
List.iter (fun (_,hooks) -> hooks # post_rm_hook socksrv ctrl) r_merge_list
method pre_start_hook socksrv ctrl cont =
List.iter
(fun (_,hooks) -> hooks # pre_start_hook socksrv ctrl cont) merge_list
method post_start_hook cont =
List.iter (fun (_,hooks) -> hooks # post_start_hook cont) merge_list
method pre_finish_hook cont =
List.iter (fun (_,hooks) -> hooks # pre_finish_hook cont) r_merge_list
method post_finish_hook socksrv ctrl cont =
List.iter
(fun (_,hooks) -> hooks # post_finish_hook socksrv ctrl cont)
r_merge_list
method workload_hook cont busier n =
List.iter
(fun (_,hooks) -> hooks # workload_hook cont busier n)
r_merge_list
method receive_message cont cmd cmdargs =
List.iter
(fun (_,hooks) -> hooks # receive_message cont cmd cmdargs) merge_list
method receive_admin_message cont cmd cmdargs =
List.iter
(fun (_,hooks) -> hooks # receive_admin_message cont cmd cmdargs)
merge_list
method shutdown () =
List.iter (fun (_,hooks) -> hooks # shutdown()) merge_list
method system_shutdown () =
List.iter (fun (_,hooks) -> hooks # system_shutdown()) merge_list
method global_exception_handler e =
List.for_all
(fun (_,hooks) -> hooks # global_exception_handler e) merge_list
method container_event_system() =
(snd(List.hd merge_list)) # container_event_system()
method container_run esys =
(snd(List.hd merge_list)) # container_run esys
end
class protocol_switch_factory
name (merge_list : (string*processor_factory) list)
: processor_factory =
object(self)
method name = name
method create_processor ctrl_cfg cfg addr =
let named_processors =
List.map
(fun (proto_name, fac) ->
let sub_addrs =
cfg # resolve_section addr proto_name in
match sub_addrs with
| [ sub_addr ] ->
let p = fac # create_processor ctrl_cfg cfg sub_addr in
(proto_name, p)
| [] ->
failwith("Missing section: " ^ cfg#print addr ^ "." ^
proto_name)
| _ ->
failwith("Section must only occur once: " ^
cfg#print addr ^ "." ^ proto_name)
)
merge_list in
new protocol_switch_processor named_processors
end
let add_helper_service ctrl name hooks =
let helper_sockserv_cfg =
( object
method name = name
method protocols = []
method change_user_to = None
method startup_timeout = (-1.0)
method conn_limit = None
method gc_when_idle = false
method controller_config = ctrl#controller_config
end
) in
let helper_hooks =
( object
inherit processor_hooks_delegation hooks as super
method post_start_hook cont =
(* FIXME: This usually does not prevent that the process is kept
in the "starting" state. This state is left at the first time
[poll] is called, i.e. slightly later than 0.0 seconds in the
future.
*)
let g = Unixqueue.new_group cont#event_system in
Unixqueue.once cont#event_system g 0.0
(fun () -> super#post_start_hook cont)
end
) in
let helper_processor =
( object
inherit processor_base helper_hooks
method process ~when_done _ _ _ = assert false (* never called *)
method supported_ptypes = [ `Multi_processing; `Multi_threading ]
end
) in
let helper_service =
Netplex_sockserv.create_socket_service
helper_processor helper_sockserv_cfg in
let helper_wload_mng =
Netplex_workload.create_constant_workload_manager ~restart:false 1 in
ctrl # add_service helper_service helper_wload_mng
let create_protocol ?(lstn_backlog=20)
?(lstn_reuseaddr=true)
?(so_keepalive=true)
?(tcp_nodelay=false)
?local_chmod
?local_chown
?(configure_slave_socket=fun _ -> ())
name addrs : protocol =
( object
method name = name
method addresses = addrs
method lstn_backlog = lstn_backlog
method lstn_reuseaddr = lstn_reuseaddr
method so_keepalive = so_keepalive
method tcp_nodelay = tcp_nodelay
method local_chmod = local_chmod
method local_chown = local_chown
method configure_slave_socket = configure_slave_socket
end
)
let create_socket_service_config ?(startup_timeout = 60.0)
?change_user_to
?(gc_when_idle = false)
?conn_limit
name protos ctrl_cfg =
( object
method name = name
method protocols = protos
method change_user_to = change_user_to
method startup_timeout = startup_timeout
method conn_limit = conn_limit
method gc_when_idle = gc_when_idle
method controller_config = ctrl_cfg
end : socket_service_config
)