(* $Id$ *)
open Netamqp_types
open Netamqp_endpoint
open Uq_engines.Operators
open Printf
(* Raised by [close_e] when the connection is not in state [`Opened]. *)
exception Not_open
(* Handshake failure detected by this module; the string describes the
   problem (e.g. unexpected state, no usable authentication mechanism).
   Inside [open_e] it is caught and reported through the handshake engine. *)
exception Error of string
(* Credentials for one SASL mechanism, see [plain_auth] for a constructor *)
type auth_method = string * string * string
(* (mechanism name, username, password) - the mechanism name is matched
   against the mechanism list sent by the server in connection-start *)
(* How [open_e] picks the locale that is sent back in connection-start-ok:
   - [`Pref l]: use [l] if the server offers it, otherwise fall back
   - [`Select f]: call [f] with the list of offered locales and use the result
 *)
type locale_preference =
[ `Pref of string
| `Select of string list -> string
]
(* Progress of the connection handshake as tracked in [connection.state] *)
type state =
[ `Closed  (* initial state; also set after a close from either side *)
| `Start  (* [open_e] was called, waiting for connection-start *)
| `Start_ok  (* connection-start was answered with start-ok *)
| `Secure_ok  (* not assigned anywhere in the visible part of this file *)
| `Tune_ok  (* connection-tune was answered with tune-ok *)
| `Opened  (* connection-open-ok was received *)
| `Close_requested  (* [close_e] was called, waiting for close-ok *)
]
(* Connection-level state layered on top of an endpoint *)
type connection =
{ ep : endpoint;  (* the underlying endpoint *)
mutable lp : locale_preference;  (* locale preference passed to [open_e] *)
mutable vhost : string;  (* virtual host sent in connection-open *)
mutable state : state;  (* current handshake state *)
mutable auth_l : auth_method list;  (* candidates passed to [open_e] *)
mutable auth_s : string;  (* name of the mechanism chosen in the handshake *)
}
(* Wraps the endpoint [ep] into a fresh connection record. The connection
   starts in state [`Closed] with empty vhost, no authentication data and
   the locale preference [`Pref ""]; [open_e] fills in the real values. *)
let create ep =
  { ep;
    state = `Closed;
    vhost = "";
    lp = `Pref "";
    auth_l = [];
    auth_s = "";
  }
(* [when_down ep f] calls [f ()] once the endpoint [ep] reports one of the
   states [`Disconnected], [`Off] or [`Error _]. Every other state change
   simply re-arms the watcher for the next change. *)
let when_down ep f =
  let rec wait_for_down () =
    Uq_engines.when_state
      ~is_done:(function
                 | `Disconnected | `Off | `Error _ -> f ()
                 | _ -> wait_for_down ()
               )
      (state_change_e ep)
  in
  wait_for_down ()
(* Separator between list items: a non-empty run of space, TAB, CR or LF *)
let split_re = Netstring_str.regexp "[ \t\r\n]+"

(* [split s] cuts [s] into the items delimited by [split_re] *)
let split s =
  s |> Netstring_str.split split_re
(* The uint4 number 0. The tune handler of [open_e] compares the frame_max
   sent by the server against it. *)
let null_uint4 = 0 |> Netnumber.uint4_of_int
(* [open_e c auth_l lp vhost] connects the endpoint of [c] and runs the
   connection handshake (start / start-ok, tune / tune-ok, open / open-ok).

   Arguments:
   - [auth_l]: candidate [(mechanism, user, password)] triples. The first
     triple whose mechanism name occurs in the server's mechanism list is
     used. The response is always built in the PLAIN format.
   - [lp]: how to choose among the locales offered by the server
   - [vhost]: virtual host sent in connection-open

   Result: an engine that reaches [`Done ()] when the endpoint announcement
   and the handshake have both finished. It reaches [`Error Timeout] when
   this takes longer than [get_timeout c.ep], and [`Error _] when the
   handshake fails or the server closes the connection.

   Raises [Failure] immediately if [c] is not in state [`Closed].
 *)
let open_e c auth_l lp vhost =
  if c.state <> `Closed then
    failwith "Netamqp_connection.open_e: can only open closed connections";
  c.lp <- lp;
  c.auth_l <- auth_l;
  c.vhost <- vhost;
  c.state <- `Start;
  connect c.ep;
  let esys = event_system c.ep in
  let tmo_g = Unixqueue.new_group esys in
  (* handshake_e is finished "by hand" via signal_handshake: *)
  let handshake_e, signal_handshake = Uq_engines.signal_engine esys in
  (* Runs f. An [Error] exception is forwarded to the handshake engine,
     and [None] is returned so that no reply is sent to the server. *)
  let catch_error f =
    try Some(f())
    with
      | Error _ as err ->
          signal_handshake (`Error err); None in
  (* Register the handler for the "start" method: *)
  register_sync_s2c
    c.ep (`AMQP_0_9 `Connection_start) 0
    (fun start_meth ->
       match start_meth with
         | `AMQP_0_9
             (`Connection_start(_v_maj, _v_min, _props, mechs_s, locales)) ->
             catch_error
               (fun () ->
                  if c.state <> `Start then
                    raise(Error "Not in `Start state");
                  Netlog.logf `Info
                    "AMQP: connection-start mechs=%s locales=%s"
                    mechs_s locales;
                  let mechs_l = split mechs_s in
                  let (auth_s,user,pw) =
                    try
                      List.find
                        (fun (n, _, _) -> List.mem n mechs_l)
                        auth_l
                    with
                      | Not_found ->
                          raise(Error "No suitable authentication methods") in
                  c.auth_s <- auth_s;
                  let loc_l = split locales in
                  let loc =
                    match lp with
                      | `Pref l ->
                          if List.mem l loc_l then l else
                            (* The preferred locale is not offered. Take the
                               first locale of the server. Only if the server
                               offers nothing at all fall back to "C"
                               (List.hd would fail on the empty list).
                             *)
                            ( match loc_l with
                                | [] -> "C"
                                | first :: _ -> first
                            )
                      | `Select f ->
                          f loc_l in
                  (* This is for PLAIN auth - we don't have anything else: *)
                  let resp = "\000" ^ user ^ "\000" ^ pw in
                  c.state <- `Start_ok;
                  let client_props = [] in
                  `AMQP_0_9(`Connection_start_ok
                              (client_props, auth_s, resp, loc))
               )
         | _ ->
             assert false
    )
    (fun () ->
       ()
    );
  (* Register the handler for the "tune" method: *)
  register_sync_s2c
    c.ep (`AMQP_0_9 `Connection_tune) 0
    (fun tune_meth ->
       match tune_meth with
         | `AMQP_0_9(`Connection_tune(ch_max, frame_max, heartbeat)) ->
             catch_error
               (fun () ->
                  if c.state <> `Start_ok then
                    raise(Error "Not in `Start_ok state");
                  Netlog.logf `Info
                    "AMQP: connection-tune \
                     ch_max=%d frame_max=%Ld heartbeat=%d"
                    ch_max (Netnumber.int64_of_uint4 frame_max) heartbeat;
                  c.state <- `Tune_ok;
                  (* Maybe we have to lower the max frame size: *)
                  let mplex_eff_frame_max =
                    Netnumber.uint4_of_int (eff_max_frame_size c.ep) in
                  (* A server value of 0, or one above our own limit, is
                     replaced by our own limit: *)
                  let eff_frame_max =
                    if frame_max = null_uint4 ||
                       (Netnumber.gt_uint4 frame_max mplex_eff_frame_max)
                    then
                      mplex_eff_frame_max
                    else
                      frame_max in
                  set_max_frame_size c.ep (Netnumber.int_of_uint4 eff_frame_max);
                  Netlog.logf `Info
                    "AMQP: connection-tune-ok frame_max=%Ld"
                    (Netnumber.int64_of_uint4 eff_frame_max);
                  `AMQP_0_9(`Connection_tune_ok(ch_max, eff_frame_max,
                                                heartbeat))
               )
         | _ ->
             assert false
    )
    (fun () ->
       (* After tune, we have to open: *)
       let open_e =
         sync_c2s_e
           c.ep (`AMQP_0_9 (`Connection_open(c.vhost, "", false)))
           None 0 300.0 in
       Uq_engines.when_state
         ~is_done:(fun (resp_m, _) ->
                     match resp_m with
                       | `AMQP_0_9(`Connection_open_ok _) ->
                           c.state <- `Opened;
                           signal_handshake (`Done ())
                       | _ ->
                           assert false
                  )
         ~is_error:(fun error -> signal_handshake(`Error error))
         ~is_aborted:(fun _ -> signal_handshake(`Error(Error "Aborted")))
         open_e
    );
  (* Handler for "close" *)
  register_sync_s2c
    c.ep (`AMQP_0_9 `Connection_close) 0
    (fun close_meth ->
       match close_meth with
         | `AMQP_0_9(`Connection_close(reply_code, reply_text,
                                       class_id, meth_id)) ->
             let msg =
               sprintf "connection-close from server: \
                        code=%d text=%s class-id=%d meth-id=%d"
                 reply_code reply_text class_id meth_id in
             Netlog.logf `Err "AMQP: %s" msg;
             (* We arrange now that the close-ok response can still be sent,
                and that the socket is closed after that. Any other activity
                is suppressed. Also, the error is first reported to the user
                when the socket is down.
              *)
             drop_frames c.ep;    (* Drop all incoming frames *)
             clear_output c.ep;   (* Drop all pending output frames *)
             expect_eof c.ep;     (* Permit EOF from server *)
             c.state <- `Closed;
             when_down c.ep
               (fun () ->   (* called back when the socket is down *)
                  let proto = protocol c.ep in
                  let e =
                    create_method_exception
                      proto ~class_id ~meth_id ~reply_code ~reply_text in
                  signal_handshake (`Error e);
                  abort_and_propagate_error c.ep e;
               );
             (* Reply this: *)
             Some(`AMQP_0_9 `Connection_close_ok)
         | _ ->
             assert false
    )
    (fun () ->
       (* Trigger now the disconnect, following directly after sending
          close-ok
        *)
       quick_disconnect c.ep;
    );
  (* Watch out for errors: *)
  listen_for_errors
    c.ep None
    (fun e ->
       Netlog.logf `Err
         "AMQP: exception: %s" (Netexn.to_string e);
       signal_handshake (`Error e);  (* Maybe we are still handshaking *)
       true
    );
  let ann_e =
    announce_e c.ep in
  (* NOTE(review): the timer group is only cleared on the success path
     below. After a failed handshake the timer apparently stays registered
     until it fires - confirm that this is intended. *)
  let all_e =
    Uq_engines.sync_engine ann_e handshake_e
    ++ (fun ((),()) ->
          Unixqueue.clear esys tmo_g;
          eps_e (`Done ()) esys) in
  let timeout = ref false in
  Unixqueue.once esys tmo_g (get_timeout c.ep)
    (fun () ->
       timeout := true;
       all_e # abort();
    );
  (* An abort caused by our own timer is reported as Timeout error: *)
  all_e
  >> (function
        | `Aborted when !timeout -> `Error Timeout
        | other -> other
     )
(* Synchronous variant of [open_e]: the engine is created for the given
   arguments and driven to completion with [sync]. *)
let open_s c auth_l lp vhost =
  let start_handshake = open_e c auth_l lp in
  sync start_handshake vhost
(* [is_open c] is [true] exactly when [c] is in state [`Opened] *)
let is_open c =
  match c.state with
    | `Opened -> true
    | `Closed | `Start | `Start_ok | `Secure_ok | `Tune_ok
    | `Close_requested -> false
(* [close_e c] requests the shutdown of an opened connection: it submits
   connection-close (code 200, text "OK") and returns an engine that
   reaches [`Done ()] after connection-close-ok has arrived, at which point
   the state becomes [`Closed]. EOF is permitted and the disconnect of the
   endpoint is initiated before the engine is returned.

   Raises [Not_open] if [c] is not in state [`Opened].
 *)
let close_e c =
  ( match c.state with
      | `Opened -> ()
      | `Closed | `Start | `Start_ok | `Secure_ok | `Tune_ok
      | `Close_requested -> raise Not_open
  );
  c.state <- `Close_requested;
  let esys = event_system c.ep in
  (* Continuation run on the server's reply to our close request *)
  let on_reply = function
    | (`AMQP_0_9 `Connection_close_ok, _) ->
        c.state <- `Closed;
        eps_e (`Done ()) esys
    | _ ->
        assert false in
  let close_eng =
    sync_c2s_e c.ep
      (`AMQP_0_9 (`Connection_close(200, "OK", 0, 0)))
      None
      0
      (get_timeout c.ep)
    ++ on_reply in
  expect_eof c.ep;
  disconnect c.ep;
  close_eng
(* Synchronous variant of [close_e], driven to completion with [sync] *)
let close_s c =
  c |> sync close_e
(* [plain_auth user pw] is the authentication triple for the "PLAIN"
   mechanism with the given user name and password *)
let plain_auth user pw =
  let mechanism = "PLAIN" in
  (mechanism, user, pw)
(* The endpoint the connection was created for *)
let endpoint { ep; _ } = ep
(* The event system of the endpoint underlying the connection *)
let event_system { ep; _ } = Netamqp_endpoint.event_system ep