(* $Id: netamqp_endpoint.ml 53446 2011-03-10 14:45:03Z gerd $ *) (* TODO: - heartbeats (also: heartbeat frame length) - Move all version-specific stuff to Netamqp_methods FIXME: - there is no way to unregister handlers other than to close channels or connections! - unclear what to do if we get incomplete content messages. Right now we close the connection. *) open Netamqp_types open Uq_engines.Operators open Printf type connector = [ `Sockaddr of Unix.sockaddr | `Inet of string * int | `Implied ] type transport_layer = [ `TCP of connector | `Custom of (unit -> Netamqp_transport.amqp_multiplex_controller Uq_engines.engine) ] type state = [ `Off | `Connected of bool | `Disconnecting of bool | `Disconnected | `Error of exn ] type proto_revision_0_9 = [ `One ] type protocol = [ `AMQP_0_9 of proto_revision_0_9 ] type sync_client_to_server_method_t = [ `AMQP_0_9 of Netamqp_methods_0_9.sync_client_to_server_method_t ] type sync_server_to_client_method_t = [ `AMQP_0_9 of Netamqp_methods_0_9.sync_server_to_client_method_t ] type sync_client_initiated_method_t = [ `AMQP_0_9 of Netamqp_methods_0_9.sync_client_initiated_method_t ] type sync_server_initiated_method_t = [ `AMQP_0_9 of Netamqp_methods_0_9.sync_server_initiated_method_t ] type sync_server_initiated_method_type_t = [ `AMQP_0_9 of Netamqp_methods_0_9.sync_server_initiated_method_type_t ] type async_client_to_server_method_t = [ `AMQP_0_9 of Netamqp_methods_0_9.async_client_to_server_method_t ] type async_server_to_client_method_t = [ `AMQP_0_9 of Netamqp_methods_0_9.async_server_to_client_method_t ] type async_server_to_client_method_type_t = [ `AMQP_0_9 of Netamqp_methods_0_9.async_server_to_client_method_type_t ] type props_t = [ `AMQP_0_9 of Netamqp_methods_0_9.props_t ] type any_server_to_client_method_type_0_9_t = [ Netamqp_methods_0_9.sync_server_to_client_method_type_t | Netamqp_methods_0_9.async_server_to_client_method_type_t ] type any_server_to_client_method_type_t = [ `AMQP_0_9 of any_server_to_client_method_type_0_9_t ] type any_server_to_client_method_0_9_t = [ Netamqp_methods_0_9.sync_server_to_client_method_t | Netamqp_methods_0_9.async_server_to_client_method_t ] type any_server_to_client_method_t = [ `AMQP_0_9 of any_server_to_client_method_0_9_t ] type method_t = [ `AMQP_0_9 of Netamqp_methods_0_9.method_t ] type method_type_t = [ `AMQP_0_9 of Netamqp_methods_0_9.method_type_t ] type data = props_t * Xdr_mstring.mstring list type reg = < reg_callback : any_server_to_client_method_t -> data option -> unit; reg_error : exn -> unit; reg_tmo_group : Unixqueue.group option; reg_once : bool; (* inactivate reg after calling back once *) reg_sync : bool; (* part of sync call -> receives errors *) > (* registered handler *) type build = { meth : any_server_to_client_method_t; mutable props : props_t option; mutable exp_size : int64; mutable cur_size : int64; data : Xdr_mstring.mstring Queue.t } (* build [data] from frame pieces *) type err_listener = < err_callback : exn -> bool > type endpoint = { esys : Unixqueue.event_system; transport_layer : transport_layer; protocol : protocol; mutable state : state; mutable state_notifications : (unit -> unit) Queue.t; mutable timeout : float; mutable conn_eng : Netamqp_transport.amqp_multiplex_controller Uq_engines.engine option; mutable announced : bool; mutable announce_notification : (unit Uq_engines.final_state -> unit) option; mutable conn_tmo_group : Unixqueue.group option; mutable channels : (channel, bool) Hashtbl.t; (* if a channel is present in the hashtbl it is enabled. The bool arg is the flow control flag *) mutable suggest_cnt : int; mutable out_q : (channel, (Netamqp_types.frame * (exn option->unit)) Queue.t) Hashtbl.t; (* The unit->unit function is called when the frame is sent *) mutable out_prio_q : Netamqp_types.frame Queue.t; mutable out_next_ch : channel Queue.t; mutable in_build : (channel, build) Hashtbl.t; mutable in_tab : (channel * any_server_to_client_method_type_t, reg list) Hashtbl.t; mutable err_tab : (channel option, err_listener list) Hashtbl.t; mutable out_active : bool; mutable drop_frames : bool; mutable expect_eof : bool; mutable quick_disconnect : bool; } (**********************************************************************) module Debug = struct let enable = ref false end let dlog = Netlog.Debug.mk_dlog "Netamqp_endpoint" Debug.enable let dlogr = Netlog.Debug.mk_dlogr "Netamqp_endpoint" Debug.enable let () = Netlog.Debug.register_module "Netamqp_endpoint" Debug.enable (**********************************************************************) let mplex_opt ep = match ep.conn_eng with | Some e -> ( match e#state with | `Done mplex -> Some mplex | _ -> None ) | None -> None let mplex ep = match mplex_opt ep with | None -> failwith "Netamqp_endpoint.mplex" | Some mplex -> mplex let string_of_state = function | `Off -> "off" | `Connected flag -> sprintf "connected(%B)" flag | `Disconnecting flag -> sprintf "disconnecting(%B)" flag | `Disconnected -> "disconnected" | `Error e -> sprintf "error(%s)" (Netexn.to_string e) let protocol ep = ep.protocol let update_state ep new_state = dlogr (fun () -> sprintf "update_state: %s" (string_of_state new_state) ); ep.state <- new_state; (* We need to ensure that users of this notification service have the chance to get all state changes. So we have to call [f] immediately back, and not from a [once] handler. Also, the users may want to add new elements to state_notifications for setting up the next round of notifications. *) let g = Unixqueue.new_group ep.esys in let q = Queue.create() in Queue.transfer ep.state_notifications q; Queue.iter (fun f -> try f() with e -> Unixqueue.once ep.esys g 0.0 (fun () -> raise e) ) q let fire_regs ep ch mt f = let regs = try Hashtbl.find ep.in_tab (ch,mt) with Not_found -> [] in let regs' = List.filter (fun reg -> not reg#reg_once) regs in if regs' = [] then Hashtbl.remove ep.in_tab (ch, mt) else Hashtbl.replace ep.in_tab (ch, mt) regs'; List.iter (fun reg -> f reg) regs let invoke_regs ep ch mt m d_opt = let invoke_reg reg = ( match reg # reg_tmo_group with | None -> () | Some g -> Unixqueue.clear ep.esys g ); let g = Unixqueue.new_group ep.esys in try reg # reg_callback m d_opt with error -> Unixqueue.once ep.esys g 0.0 (fun () -> raise error) in fire_regs ep ch mt invoke_reg let error_regs ep ch mt err = let error_reg reg = ( match reg # reg_tmo_group with | None -> () | Some g -> Unixqueue.clear ep.esys g ); let g = Unixqueue.new_group ep.esys in try reg # reg_error err with error -> Unixqueue.once ep.esys g 0.0 (fun () -> raise error) in fire_regs ep ch mt error_reg let error_regs_to_channel ep chan_opt err = let keys = Hashtbl.fold (fun (ch,mtype) _ acc -> if chan_opt = None || chan_opt = Some ch then (ch,mtype)::acc else acc ) ep.in_tab [] in List.iter (fun (ch,mtype) -> error_regs ep ch mtype err) keys let rm_channel_from_in_tab ep chan = let keys = Hashtbl.fold (fun (ch,mtype) _ acc -> if ch = chan then (ch,mtype)::acc else acc) ep.in_tab [] in List.iter (fun key -> Hashtbl.remove ep.in_tab key) keys let tell_listeners ep ch_opt err = let g = Unixqueue.new_group ep.esys in let listeners = try Hashtbl.find ep.err_tab ch_opt with Not_found -> [] in let listeners' = List.filter (fun listener -> try listener # err_callback err with error -> Unixqueue.once ep.esys g 0.0 (fun () -> raise error); false ) listeners in if listeners' = [] then Hashtbl.remove ep.err_tab ch_opt else Hashtbl.replace ep.err_tab ch_opt listeners' let abort ep = dlog "abort"; ( match ep.conn_eng with | None -> () | Some eng -> ( match eng#state with | `Done mplex -> mplex # inactivate() | `Working _ -> eng#abort() | _ -> () ); ep.conn_eng <- None ); ( match ep.conn_tmo_group with | None -> () | Some g -> Unixqueue.clear ep.esys g; ep.conn_tmo_group <- None ) let propagate_error ep err chan_opt = dlogr (fun () -> sprintf "propagate_error chan=%s err=%s" (match chan_opt with | None -> "all" | Some ch -> string_of_int ch ) (Netexn.to_string err) ); ( match ep.announce_notification with | None -> () | Some notify -> notify (`Error err); ep.announce_notification <- None ); error_regs_to_channel ep chan_opt err; ( match chan_opt with | None -> let keys = Hashtbl.fold (fun ch_opt _ acc -> ch_opt :: acc) ep.err_tab [] in List.iter (fun ch_opt -> tell_listeners ep ch_opt err) keys | Some chan -> List.iter (fun ch_opt -> tell_listeners ep ch_opt err ) [ Some chan; None ] ) let abort_and_propagate_error ep err = abort ep; propagate_error ep err None; update_state ep (`Error err) let enable_channel_i ep ch = if not (Hashtbl.mem ep.channels ch) then ( dlogr (fun () -> sprintf "enable_channel %d" ch); Hashtbl.add ep.channels ch true; Hashtbl.add ep.out_q ch (Queue.create()); Queue.add ch ep.out_next_ch ) let enable_channel ep ch = if ch < 1 || ch > 65535 then invalid_arg "Netamqp_endpoint.enable_channel"; enable_channel_i ep ch let disable_channel ep ch = if ch < 1 || ch > 65535 then invalid_arg "Netamqp_endpoint.disable_channel"; if Hashtbl.mem ep.channels ch then ( dlogr (fun () -> sprintf "disable_channel %d" ch); Hashtbl.remove ep.channels ch; Hashtbl.remove ep.out_q ch; Hashtbl.remove ep.in_build ch; error_regs_to_channel ep (Some ch) Method_dropped; rm_channel_from_in_tab ep ch; Hashtbl.remove ep.err_tab (Some ch); let q = Queue.create() in Queue.iter (fun ch' -> if ch <> ch' then Queue.add ch' q) ep.out_next_ch; Queue.clear ep.out_next_ch; Queue.transfer q ep.out_next_ch; ) let is_channel_enabled ep ch = Hashtbl.mem ep.channels ch let suggest_channel ep = let rec loop () = let ch = ep.suggest_cnt in ep.suggest_cnt <- (ch+1) mod 65536; if Hashtbl.mem ep.channels ch then loop() else ch in if Hashtbl.length ep.channels >= 65536 then failwith "Netamqp_endpoint.suggest_channel: all channels are used"; loop() let flow_control ep ch flow_on = if not (Hashtbl.mem ep.channels ch) then failwith "Netamqp_endpoint.flow_control: this channel is disabled"; dlogr (fun () -> sprintf "flow_control ch=%d flag=%B" ch flow_on); Hashtbl.replace ep.channels ch flow_on let is_clean_for_disconnect ep = (* Can we finish the disconnect? Are all requested tasks done? *) ep.announced && ( Queue.is_empty ep.out_prio_q ) && ( let p = ref true in Hashtbl.iter (fun _ q -> p := !p && Queue.is_empty q) ep.out_q; !p ) && ( ep.quick_disconnect || ( let p = ref true in Hashtbl.iter (fun _ rl -> List.iter (fun r -> p := !p && not (r#reg_sync)) rl ) ep.in_tab; !p ) ) let maybe_disconnect ep = match ep.state with | `Disconnecting _ -> if is_clean_for_disconnect ep then ( dlog "disconnect"; abort ep; update_state ep `Disconnected ) | _ -> () let abort_for_eof ep = if ep.expect_eof then ( dlog "expected eof"; abort ep; update_state ep `Disconnected ) else abort_and_propagate_error ep Unexpected_eof (**********************************************************************) (* Protocol versioning *) (**********************************************************************) let method_has_content_0_9 m = let mtype = Netamqp_methods_0_9.type_of_method m in List.mem mtype Netamqp_methods_0_9.content_method_types let type_of_method (m : method_t) : method_type_t = match m with | `AMQP_0_9 m' -> `AMQP_0_9 (Netamqp_methods_0_9.type_of_method m') let type_of_any_server_to_client_method (m : any_server_to_client_method_t) : any_server_to_client_method_type_t = let mt1 = type_of_method (m :> method_t) in match mt1 with | `AMQP_0_9 mt2 -> ( match mt2 with | #any_server_to_client_method_type_0_9_t as mt3 -> `AMQP_0_9 mt3 | _ -> assert false ) let response_types_of_method (m : method_t) : any_server_to_client_method_type_t list = match m with | `AMQP_0_9 m' -> let mtype = Netamqp_methods_0_9.type_of_method m' in List.map (fun mt -> match mt with | #any_server_to_client_method_type_0_9_t as mt' -> `AMQP_0_9 mt' | _ -> assert false ) (Netamqp_methods_0_9.responses_of_method mtype) let string_of_method_type (m : method_type_t) = match m with | `AMQP_0_9 m' -> Netamqp_methods_0_9.string_of_method_type m' let string_of_method (m : method_t) = string_of_method_type (type_of_method m) ^ "(...)" let proto_version_string = function | `AMQP_0_9 `One -> "\000\009\001" let coerce_to_sync_server_to_client_method_t (m1 : method_t) : sync_server_to_client_method_t = match m1 with | `AMQP_0_9 m2 -> ( match m2 with | #Netamqp_methods_0_9.sync_server_to_client_method_t as m3 -> `AMQP_0_9 m3 | _ -> assert false (* coercion failed *) ) let coerce_to_async_server_to_client_method_t (m1 : method_t) : async_server_to_client_method_t = match m1 with | `AMQP_0_9 m2 -> ( match m2 with | #Netamqp_methods_0_9.async_server_to_client_method_t as m3 -> `AMQP_0_9 m3 | _ -> assert false (* coercion failed *) ) let coerce_to_sync_server_initiated_method_t (m1 : method_t) : sync_server_initiated_method_t = match m1 with | `AMQP_0_9 m2 -> ( match m2 with | #Netamqp_methods_0_9.sync_server_initiated_method_t as m3 -> `AMQP_0_9 m3 | _ -> assert false (* coercion failed *) ) (**********************************************************************) (* Input side *) (**********************************************************************) let dispatch_method ep (m : any_server_to_client_method_t) d_opt ch = let mt = type_of_any_server_to_client_method m in let regs = try Hashtbl.find ep.in_tab (ch, mt) with Not_found -> [] in if regs = [] then ( (* let mt = type_of_method (m :> method_t) in propagate_error ep (Method_cannot_be_dispatched(ch, string_of_method_type mt)) (Some ch) *) (* FIXME: Unclear what to do now. We just log the incident *) Netlog.logf `Err "Netamqp: Method cannot be dispatched channel=%d method=%s" ch (string_of_method (m :> method_t)) ) else ( dlogr (fun () -> sprintf "dispatch_method ch=%d data=%B meth=%s" ch (d_opt <> None) (string_of_method (m :> method_t)) ); invoke_regs ep ch mt m d_opt; maybe_disconnect ep ) let handle_frame_0_9 ep frame = (* AMQP-0-9-specific version *) dlogr (fun () -> let n = Xdr_mstring.length_mstrings frame.frame_payload in let s = Xdr_mstring.prefix_mstrings frame.frame_payload (min n 200) in sprintf "decode_message type=%s ch=%d payload=%s" ( match frame.frame_type with | `Method -> "method" | `Header -> "header" | `Body -> "body" | `Heartbeat -> "heartbeat" | `Proto_header -> "proto_header" ) frame.frame_channel (Rpc_util.hex_dump_s s 0 (String.length s) ^ if n > String.length s then "..." else "") ); let msg = Netamqp_methods_0_9.decode_message frame in ( match msg with | `Method (#any_server_to_client_method_0_9_t as m) -> (* First check whether we are already building a data item *) if Hashtbl.mem ep.in_build frame.frame_channel then ( dlog "content not fully transmitted - dropping method"; abort_and_propagate_error ep (Unexpected_frame frame); Hashtbl.remove ep.in_build frame.frame_channel ); (* If the method is not followed by data we can immediately dispatch on it. Otherwise we need to create a builder *) if method_has_content_0_9 m then ( dlog "method with content"; let b = { meth = `AMQP_0_9 m; props = None; exp_size = 0L; cur_size = 0L; data = Queue.create() } in Hashtbl.add ep.in_build frame.frame_channel b ) else dispatch_method ep (`AMQP_0_9 m) None frame.frame_channel; if not ep.announced then ( match ep.announce_notification with | None -> () | Some f -> f (`Done()); ep.announced <- true ) | `Method _ -> dlog "client cannot receive this type of method"; abort_and_propagate_error ep (Unexpected_frame frame) | `Header(props,size) -> (* Check whether we have a builder in the right state *) ( try let b = Hashtbl.find ep.in_build frame.frame_channel in if b.props <> None then raise Not_found; (* bad *) b.props <- Some (`AMQP_0_9 props); b.exp_size <- size; dlog "good content header"; with | Not_found -> dlog "unexpected content header"; abort_and_propagate_error ep (Unexpected_frame frame) ) | `Body mstrings -> (* Check whether we have a builder in the right state *) ( try let b = Hashtbl.find ep.in_build frame.frame_channel in if b.props = None then raise Not_found; (* bad *) let props = match b.props with Some p -> p | None -> assert false in let l = Xdr_mstring.length_mstrings mstrings in b.cur_size <- Int64.add b.cur_size (Int64.of_int l); if b.cur_size > b.exp_size then raise Not_found; dlog "good content body frame"; List.iter (fun ms -> Queue.add ms b.data) mstrings; if b.cur_size = b.exp_size then ( Hashtbl.remove ep.in_build frame.frame_channel; let q = Queue.fold (fun acc ms -> ms :: acc) [] b.data in let d = (props, q) in dispatch_method ep b.meth (Some d) frame.frame_channel ) with | Not_found -> dlog "unexpected content body, or size mismatch"; abort_and_propagate_error ep (Unexpected_frame frame) ) | `Heartbeat -> () | `Proto_header p -> ( match ep.announce_notification with | None -> dlog "unexpexted proto header"; abort_and_propagate_error ep (Unexpected_frame frame) | Some f -> dlog "protocol not supported"; f (`Error Protocol_is_not_supported) ) ) let handle_frame ep frame = (* We received this frame. Do something with it. *) match ep.protocol with | `AMQP_0_9 _ -> handle_frame_0_9 ep frame (* The input_thread is started once we are connected. The function continuously calls itself to accept new frames. When we get disconnected, the recursion silently dies. *) let rec input_thread ep = match mplex_opt ep with | None -> () | Some mplex -> dlog "input_thread"; mplex # start_reading ~when_done:(fun result -> match result with | `Ok frame -> dlog "got frame"; let cont = try ep.drop_frames || (handle_frame ep frame; true) with | e -> abort_and_propagate_error ep e; false in if cont then input_thread ep | `Error e -> abort_and_propagate_error ep e | `End_of_file -> abort_for_eof ep ) () (**********************************************************************) (* Output side *) (**********************************************************************) let rec output_thread ep = if not ep.out_active then output_start ep and output_start ep = match mplex_opt ep with | None -> () | Some mplex -> ep.out_active <- true; dlog "checking for output"; let can_output = ref false in (* Check whether we can do something. *) if not(Queue.is_empty ep.out_prio_q) then ( can_output := true; output_next ep mplex ((Queue.take ep.out_prio_q), (fun _ -> ())) ) else ( let n = Queue.length ep.out_next_ch in let k = ref 0 in while !k < n do incr k; let ch = Queue.take ep.out_next_ch in Queue.add ch ep.out_next_ch; let ch_is_on = try Hashtbl.find ep.channels ch with Not_found -> false in if ch_is_on then ( let ch_q = try Hashtbl.find ep.out_q ch with Not_found -> Queue.create() in if not (Queue.is_empty ch_q) then ( k := n; can_output := true; output_next ep mplex (Queue.take ch_q) ) ) done; if not !can_output then ( (* If we reach this point, nothing can be output. Stop the output thread. Also, we have to check whether the disconnect procedure can be continued. *) ep.out_active <- false; dlog "nothing to output"; maybe_disconnect ep ) ) and output_next ep mplex (frame,is_sent) = dlog "output_next"; mplex # start_writing ~when_done:(fun r -> ep.out_active <- false; match r with | `Ok() -> (* Success. Try to write the next frame: *) is_sent None; output_start ep | `Error e -> (* Error *) is_sent (Some e); abort_and_propagate_error ep e ) frame let shared_sub_mstring (ms : Xdr_mstring.mstring) sub_pos sub_len : Xdr_mstring.mstring = (* Returns an mstring that accesses the substring of ms at sub_pos with length sub_len. The returned mstring shares the representation with ms *) let ms_len = ms#length in if sub_len < 0 || sub_pos < 0 || sub_pos > ms_len - sub_len then invalid_arg "Netamqp_endpoint.shared_sub_mstring"; ( object(self) method length = sub_len method blit_to_string mpos s spos len = ms#blit_to_string (sub_pos+mpos) s spos len method blit_to_memory mpos mem mempos len = ms#blit_to_memory (sub_pos+mpos) mem mempos len method as_string = let (s,pos) = ms#as_string in (s,pos+sub_pos) method as_memory = let (m,pos) = ms#as_memory in (m,pos+sub_pos) method preferred = ms#preferred end ) let split_mstrings l max_len = (* Transform the list of mstrings l into another list where the mstrings are not longer than max_len *) let rec transform ms pos rem = let len = min rem max_len in let ms0 = shared_sub_mstring ms pos len in let rem' = rem - len in if rem' > 0 then ms0 :: transform ms (pos+len) rem' else [ms0] in List.flatten (List.map (fun ms -> transform ms 0 ms#length) l) let mk_frames_0_9 ep (m : Netamqp_methods_0_9.method_t) d_opt ch = let m_frame = Netamqp_methods_0_9.encode_message (`Method m) ch in if method_has_content_0_9 m then ( match d_opt with | None -> failwith "Netamqp_endpoint: This method needs payload data!" | Some(prop,body) -> ( match prop with | `AMQP_0_9 prop' -> let size = Int64.of_int (Xdr_mstring.length_mstrings body) in let h_frame = Netamqp_methods_0_9.encode_message (`Header(prop', size)) ch in (* There is a limit on the maximum frame length. The body frames must not be longer than that! There are 8 bytes for the frame itself. *) let body_max = match mplex_opt ep with | None -> (* strange *) 4096 - 8 | Some mplex -> mplex#eff_max_frame_size - 8 in dlogr (fun () -> sprintf "body_max=%d" body_max); let split_bodies = split_mstrings body body_max in dlogr (fun () -> sprintf "orig_length=%Ld bodies=%d" size (List.length split_bodies) ); let body_frames = List.map (fun b -> Netamqp_methods_0_9.encode_message (`Body [b]) ch ) split_bodies in m_frame :: h_frame :: body_frames ) ) else [ m_frame ] let mk_frames ep m d_opt ch = match m with | `AMQP_0_9 m' -> mk_frames_0_9 ep m' d_opt ch (**********************************************************************) (* connect logic *) (**********************************************************************) let connect ep = let do_it = match ep.state with | `Off -> true | `Connected _ -> false | `Disconnecting _ | `Disconnected | `Error _ -> failwith "Netamqp_endpoint.connect: already disconnecting or disconnected" in if do_it then ( dlog "connect"; let conn_eng = match ep.transport_layer with | `TCP conn -> let spec, host_opt = match conn with | `Sockaddr (Unix.ADDR_INET(ip,port)) -> `Sock_inet(Unix.SOCK_STREAM, ip, port), None | `Sockaddr (Unix.ADDR_UNIX path) -> `Sock_unix(Unix.SOCK_STREAM, path), None | `Inet(name,port) -> `Sock_inet_byname(Unix.SOCK_STREAM, name, port), (Some name) | `Implied -> failwith "Netamqp_endpoint.connect: `Implied is not a \ valid address for TCP" in Uq_engines.connector (`Socket(spec, Uq_engines.default_connect_options)) ep.esys ++ (fun st -> match st with | `Socket(fd, _) -> dlog "socket connection established"; let mplex = Netamqp_transport.tcp_amqp_multiplex_controller ~close_inactive_descr:true fd ep.esys in ( match ep.conn_tmo_group with | None -> () | Some g -> Unixqueue.clear ep.esys g ); ep.conn_tmo_group <- None; eps_e (`Done mplex) ep.esys | _ -> assert false ) >> (function | `Done mplex -> `Done mplex | `Error Not_found when host_opt <> None -> (* Current ocamlnet-3.2 still raises Not_found when the host is not found. This will be changed to the following exception in one of the next releases. *) `Error (Uq_resolver.Host_not_found (match host_opt with | Some h -> h | None -> assert false )) | `Error e -> `Error e | `Aborted -> `Aborted ) | `Custom f -> f() in ep.conn_eng <- Some conn_eng; if ep.timeout >= 0.0 then ( let g = Unixqueue.new_group ep.esys in Unixqueue.once ep.esys g ep.timeout (fun () -> dlog "connect times out"; abort_and_propagate_error ep Timeout ); ep.conn_tmo_group <- Some g; ); update_state ep (`Connected false); Uq_engines.when_state ~is_done:(fun _ -> update_state ep (`Connected true); input_thread ep; output_thread ep; ) conn_eng; ) let disconnect ep = match ep.state with | `Off -> () (* ignored *) | `Connected is_online -> dlog "disconnecting"; update_state ep (`Disconnecting is_online); maybe_disconnect ep (* We don't handle the case specially when is_online is false. This means we are still connecting to the server. It is possible that the user has already requested something, so we should execute these requests first. *) | `Disconnecting _ -> () | `Disconnected -> () | `Error _ -> () let quick_disconnect ep = ep.quick_disconnect <- true; disconnect ep (**********************************************************************) (* create/reset, API stuff *) (**********************************************************************) let create trans protocol esys = let ep = { esys = esys; transport_layer = trans; protocol = protocol; state = `Off; state_notifications = Queue.create(); timeout = 300.0; conn_eng = None; announced = false; announce_notification = None; conn_tmo_group = None; channels = Hashtbl.create 7; suggest_cnt = 1; out_q = Hashtbl.create 7; out_prio_q = Queue.create(); out_next_ch = Queue.create(); in_build = Hashtbl.create 7; in_tab = Hashtbl.create 7; err_tab = Hashtbl.create 7; out_active = false; drop_frames = false; expect_eof = false; quick_disconnect = false; } in enable_channel_i ep 0; ep let reset ep = abort ep; ep.state <- `Off; ep.state_notifications <- Queue.create(); ep.timeout <- 300.0; ep.conn_eng <- None; ep.announced <- false; ep.announce_notification <- None; ep.conn_tmo_group <- None; ep.channels <- Hashtbl.create 7; ep.out_q <- Hashtbl.create 7; ep.out_prio_q <- Queue.create(); ep.out_next_ch <- Queue.create(); ep.in_build <- Hashtbl.create 7; ep.in_tab <- Hashtbl.create 7; ep.err_tab <- Hashtbl.create 7; ep.out_active <- false; ep.drop_frames <- false; ep.expect_eof <- false; ep.quick_disconnect <- false; enable_channel_i ep 0 let configure_timeout ep tmo = ep.timeout <- tmo let get_timeout ep = ep.timeout let default_port = 5672 let event_system ep = ep.esys let state ep = ep.state let state_change_e ep = let eng, notify = Uq_engines.signal_engine ep.esys in Queue.add (fun () -> notify (`Done ep.state)) ep.state_notifications; eng let set_max_frame_size ep s = match mplex_opt ep with | None -> failwith "Netamqp_endpoint.set_max_frame_size: not connected" | Some mplex -> mplex#set_max_frame_size s let eff_max_frame_size ep = match mplex_opt ep with | None -> failwith "Netamqp_endpoint.eff_max_frame_size: not connected" | Some mplex -> mplex#eff_max_frame_size let getsockname ep = match mplex_opt ep with | None -> failwith "Netamqp_endpoint.getsockname: not connected" | Some mplex -> mplex#getsockname let getpeername ep = match mplex_opt ep with | None -> failwith "Netamqp_endpoint.getpeername: not connected" | Some mplex -> mplex#getpeername let drop_frames ep = ep.drop_frames <- true let clear_output ep = Queue.clear ep.out_prio_q; Hashtbl.iter (fun ch q -> Queue.clear q) ep.out_q let expect_eof ep = ep.expect_eof <- true let listen_for_errors ep ch_opt f = ( match ch_opt with | None -> () | Some ch -> if ch < 0 || ch > 65535 then invalid_arg "Netamqp_endpoint.listen_for_errors: bad channel"; if not (is_channel_enabled ep ch) then failwith "Netamqp_endpoint.listen_for_errors: this channel is \ disabled"; ); let listener = ( object method err_callback = f end ) in let l = try Hashtbl.find ep.err_tab ch_opt with Not_found -> [] in Hashtbl.replace ep.err_tab ch_opt (listener :: l) let create_method_exception proto ~class_id ~meth_id ~reply_code ~reply_text = let mname = try match proto with | `AMQP_0_9 `One -> Netamqp_methods_0_9.string_of_method_id ~class_id ~meth_id with | Not_found -> sprintf "%d:%d" class_id meth_id in Method_exception(mname, reply_code, reply_text) let check_up ?(off_ok=false) ep = match ep.state with | `Off -> if not off_ok then failwith "Netamqp_endpoint: state=`Off - connect first!" | `Connected _ -> () | `Disconnecting _ -> failwith "Netamqp_endpoint: state=`Disconnecting - not accepting \ requests anymore" | `Disconnected -> failwith "Netamqp_endpoint: state=`Disconnected - reset&connect first!" | `Error _ -> failwith "Netamqp_endpoint: state=`Error - reset&connect first!" (**********************************************************************) (* API for queue interaction *) (**********************************************************************) exception Exit_sync let sync f arg = let e = f arg in if Unixqueue.is_running e#event_system then failwith "Event queue is already processing events"; let exit_sync _ = let g = Unixqueue.new_group e#event_system in Unixqueue.once e#event_system g 0.0 (fun () -> raise Exit_sync) in Uq_engines.when_state ~is_done:exit_sync ~is_error:exit_sync ~is_aborted:exit_sync e; ( try Unixqueue.run e#event_system with Exit_sync -> () ); match e#state with | `Done v -> v | `Error e -> raise e | `Aborted -> failwith "Aborted" | `Working _ -> assert false let announce_e ep = if ep.announce_notification <> None then failwith "Netamqp_endpoint.announce_e: has already been called"; check_up ep; dlog "announce"; let data = proto_version_string ep.protocol in let frame = { frame_type = `Proto_header; frame_channel = 0; frame_payload = [ Netamqp_rtypes.mk_mstring data ] } in Queue.add frame ep.out_prio_q; let eng, notify = Uq_engines.signal_engine ep.esys in ep.announce_notification <- Some notify; eng let announce_s ep = sync announce_e ep let async_c2s_e ep (m : async_client_to_server_method_t) d_opt ch = if ch < 0 || ch > 65535 then invalid_arg "Netamqp_endpoint.async_c2s_e: bad channel"; check_up ep; let q = try Hashtbl.find ep.out_q ch with Not_found -> failwith "Netamqp_endpoint.async_c2s_e: this channel is disabled" in dlogr (fun() -> sprintf "async_c2s_e ch=%d data=%B meth=%s" ch (d_opt <> None) (string_of_method (m :> method_t)) ); let (eng, signal) = Uq_engines.signal_engine ep.esys in let sg = function | None -> signal (`Done ()) | Some e -> signal (`Error e) in let frames = mk_frames ep (m :> method_t) d_opt ch in List.iter (fun fr -> Queue.add (fr,sg) q) frames; output_thread ep; eng let async_c2s_s ep m d_opt ch = sync (async_c2s_e ep m d_opt) ch let async_c2s ep m d_opt ch = ignore (async_c2s_e ep m d_opt ch) let sync_c2s_e ?no_wait ?(on_timeout = fun () -> ()) ep (m : sync_client_initiated_method_t) (d_opt : data option) ch tmo = if ch < 0 || ch > 65535 then invalid_arg "Netamqp_endpoint.sync_c2s: bad channel"; check_up ep; dlogr (fun() -> sprintf "sync_c2s request ch=%d data=%B meth=%s" ch (d_opt <> None) (string_of_method (m :> method_t)) ); let q = try Hashtbl.find ep.out_q ch with Not_found -> failwith "Netamqp_endpoint.sync_c2s: this channel is disabled" in let eng, notify = Uq_engines.signal_engine ep.esys in let sg = match no_wait with | None -> (fun _ -> ()) | Some nw -> (function | None -> notify (`Done (nw,None)) | Some e -> notify (`Error e) ) in let frames = mk_frames ep (m :> method_t) d_opt ch in List.iter (fun fr -> Queue.add (fr,sg) q) frames; let resp_mtypes = response_types_of_method (m :> method_t) in ( match no_wait with | None -> let tmo_g = Unixqueue.new_group ep.esys in let (reg : reg) = ( object method reg_callback (m : any_server_to_client_method_t) d_opt = dlogr (fun() -> sprintf "sync_c2s response ch=%d data=%B meth=%s" ch (d_opt <> None) (string_of_method (m :> method_t)) ); let m' = coerce_to_sync_server_to_client_method_t (m :> method_t) in notify(`Done(m',d_opt)) method reg_error err = dlogr (fun() -> sprintf "sync_c2s error ch=%d err=%s" ch (Netexn.to_string err) ); notify(`Error err) method reg_tmo_group = Some tmo_g method reg_once = true method reg_sync = true end ) in List.iter (fun resp_mtype -> let l = try Hashtbl.find ep.in_tab (ch,resp_mtype) with Not_found -> [] in Hashtbl.replace ep.in_tab (ch,resp_mtype) (reg :: l) ) resp_mtypes; if tmo >= 0.0 then ( Unixqueue.once ep.esys tmo_g tmo (fun () -> on_timeout(); error_regs_to_channel ep (Some ch) Timeout ) ) | Some nw -> () ); output_thread ep; eng let sync_c2s_s ?no_wait ?on_timeout ep m d_opt ch tmo = sync (sync_c2s_e ?no_wait ?on_timeout ep m d_opt ch) tmo let register_async_s2c ep (mtype : async_server_to_client_method_type_t) ch cb = if ch < 0 || ch > 65535 then invalid_arg "Netamqp_endpoint.register_async_s2c: bad channel"; check_up ~off_ok:true ep; if not(is_channel_enabled ep ch) then failwith "Netamqp_endpoint.register_async_s2c: this channel is disabled"; let reg = ( object method reg_callback m d_opt = dlogr (fun() -> sprintf "async_s2c ch=%d data=%B meth=%s" ch (d_opt <> None) (string_of_method (m :> method_t)) ); let m' = coerce_to_async_server_to_client_method_t (m :> method_t) in cb m' d_opt method reg_error _ = () method reg_tmo_group = None method reg_once = false method reg_sync = false end ) in let mtype' = (mtype :> any_server_to_client_method_type_t) in let l = try Hashtbl.find ep.in_tab (ch,mtype') with Not_found -> [] in Hashtbl.replace ep.in_tab (ch,mtype') (reg :: l) let register_sync_s2c ep (mtype : sync_server_initiated_method_type_t) ch cb post_action = if ch < 0 || ch > 65535 then invalid_arg "Netamqp_endpoint.register_sync_s2c: bad channel"; check_up ~off_ok:true ep; if not(is_channel_enabled ep ch) then failwith "Netamqp_endpoint.register_sync_s2c: this channel is disabled"; let reg = ( object method reg_callback m d_opt = dlogr (fun() -> sprintf "sync_s2c request ch=%d data=%B meth=%s" ch (d_opt <> None) (string_of_method (m :> method_t)) ); let m' = coerce_to_sync_server_initiated_method_t (m :> method_t) in let resp_opt = (cb m' : sync_client_to_server_method_t option) in match resp_opt with | None -> dlogr (fun () -> sprintf "sync_s2c no response ch=%d" ch ) | Some resp -> dlogr (fun() -> sprintf "sync_s2c response ch=%d meth=%s" ch (string_of_method (resp :> method_t)) ); let frames = mk_frames ep (resp :> method_t) None ch in List.iter (fun fr -> Queue.add fr ep.out_prio_q) frames; post_action(); output_thread ep method reg_error _ = () method reg_tmo_group = None method reg_once = false method reg_sync = false end ) in let mtype' = (mtype :> any_server_to_client_method_type_t) in let l = try Hashtbl.find ep.in_tab (ch,mtype') with Not_found -> [] in Hashtbl.replace ep.in_tab (ch,mtype') (reg :: l)