Plasma GitLab Archive
Projects Blog Knowledge

(* $Id: netamqp_connection.ml 53444 2011-03-10 14:08:13Z gerd $ *)

open Netamqp_types
open Netamqp_endpoint
open Uq_engines.Operators
open Printf

exception Not_open
exception Error of string


type auth_method = string * string * string
    (* mech, username, password *)

type locale_preference =
    [ `Pref of string
    | `Select of string list -> string
    ]

type state =
    [ `Closed
    | `Start
    | `Start_ok
    | `Secure_ok
    | `Tune_ok
    | `Opened
    | `Close_requested
    ]

type connection =
    { ep : endpoint;
      mutable lp : locale_preference;
      mutable vhost : string;
      mutable state : state;
      mutable auth_l : auth_method list;
      mutable auth_s : string;
    }

let create ep =
  { ep = ep;
    lp = `Pref "";
    vhost = "";
    state = `Closed;
    auth_l = [];
    auth_s = ""
  }

let when_down ep f =
  let rec watch() =
    let e = state_change_e ep in
    Uq_engines.when_state
      ~is_done:(fun st ->
		  match st with
		    | `Disconnected | `Off | `Error _ ->
			f()
		    | _ ->
			watch()
	       )
      e in
  watch()


let null_uint4 =
  Rtypes.uint4_of_int 0

let open_e c auth_l lp vhost =
  if c.state <> `Closed then
    failwith "Netamqp_connection.open_e: can only open closed connections";
  c.lp <- lp;
  c.auth_l <- auth_l;
  c.vhost <- vhost;
  c.state <- `Start;

  connect c.ep;

  let esys = event_system c.ep in
  let tmo_g = Unixqueue.new_group esys in

  let handshake_e, signal_handshake = Uq_engines.signal_engine esys in

  let catch_error f =
    try Some(f())
    with
      | Error _ as err ->
	  signal_handshake (`Error err); None in

  (* Register the handler for the "start" method: *)
  register_sync_s2c
    c.ep (`AMQP_0_9 `Connection_start) 0 
    (fun start_meth ->
       match start_meth with
	 | `AMQP_0_9
	     (`Connection_start(v_maj, v_min, props, mechs_s, locales)) ->
	     catch_error
	       (fun () ->
		  if c.state <> `Start then
		    raise(Error "Not in `Start state");
		  Netlog.logf `Info
		    "AMQP: connection-start mechs=%s locales=%s"
		    mechs_s locales;
		  let mechs_l = Pcre.split mechs_s in
		  let (auth_s,user,pw) =
		    try
		      List.find
			(fun (n, _, _) -> List.mem n mechs_l)
			auth_l
		    with
		      | Not_found ->
			  raise(Error "No suitable authentication methods") in
		  c.auth_s <- auth_s;
		  let loc_l = Pcre.split locales in
		  let loc =
		    match lp with
		      | `Pref l ->
			  if List.mem l loc_l then l else
			    if l = "" then "C" else List.hd loc_l
		      | `Select f ->
			  f loc_l in
		  (* This is for PLAIN auth - we don't have anything else: *)
		  let resp = "\000" ^ user ^ "\000" ^ pw in
		  c.state <- `Start_ok;
		  let client_props = [] in
		  `AMQP_0_9(`Connection_start_ok
			      (client_props, auth_s, resp, loc))
	       )
	 | _ ->
	     assert false
    )
    (fun () ->
       ()
    );

  (* Register the handler for the "tune" method: *)
  register_sync_s2c
    c.ep (`AMQP_0_9 `Connection_tune) 0 
    (fun tune_meth ->
       match tune_meth with
	 | `AMQP_0_9(`Connection_tune(ch_max, frame_max, heartbeat)) ->
	     catch_error
	       (fun () ->
		  if c.state <> `Start_ok then
		    raise(Error "Not in `Start_ok state");
		  Netlog.logf `Info
		    "AMQP: connection-tune \
                     ch_max=%d frame_max=%Ld heartbeat=%d"
		    ch_max (Rtypes.int64_of_uint4 frame_max) heartbeat;
		  c.state <- `Tune_ok;
		  (* Maybe we have to lower the max frame size: *)
		  let mplex_eff_frame_max =
		    Rtypes.uint4_of_int (eff_max_frame_size c.ep) in
		  let eff_frame_max =
		    if frame_max = null_uint4 || 
		       (Rtypes.gt_uint4 frame_max mplex_eff_frame_max)
		    then
		      mplex_eff_frame_max
		    else
		      frame_max in
		  set_max_frame_size c.ep (Rtypes.int_of_uint4 eff_frame_max);
		  Netlog.logf `Info
		    "AMQP: connection-tune-ok frame_max=%Ld"
		    (Rtypes.int64_of_uint4 eff_frame_max);
		  `AMQP_0_9(`Connection_tune_ok(ch_max, eff_frame_max, 
						heartbeat))
	       )
	 | _ ->
	     assert false
    )
    (fun () ->
       (* After tune, we have to open: *)
       let open_e =
	 sync_c2s_e
	   c.ep (`AMQP_0_9 (`Connection_open(c.vhost, "", false))) 
	   None 0 300.0 in
       Uq_engines.when_state
	 ~is_done:(fun (resp_m, _) ->
		     match resp_m with
		       | `AMQP_0_9(`Connection_open_ok _) ->
			   c.state <- `Opened;
			   signal_handshake (`Done ())
		       | _ ->
			   assert false
		  )
	 ~is_error:(fun error -> signal_handshake(`Error error))
	 ~is_aborted:(fun _ -> signal_handshake(`Error(Error "Aborted")))
	 open_e
    );

  (* Handler for "close" *)
  register_sync_s2c
    c.ep (`AMQP_0_9 `Connection_close) 0
    (fun close_meth ->
       match close_meth with
	 | `AMQP_0_9(`Connection_close(reply_code, reply_text,
				       class_id, meth_id)) ->
	     let msg =
	       sprintf "connection-close from server: \
                        code=%d text=%s class-id=%d meth-id=%d"
		 reply_code reply_text class_id meth_id in
	     Netlog.logf `Err "AMQP: %s" msg;
	     (* We arrange now that the close-ok response can still be sent,
		and that the socket is closed after that. Any other activity
		is suppressed. Also, the error is first reported to the user
		when the socket is down.
	      *)
	     drop_frames c.ep;   (* Drop all incoming frames *)
	     clear_output c.ep;  (* Drop all pending output frames *)
	     expect_eof c.ep;    (* Permit EOF from server *)
	     c.state <- `Closed;
	     when_down c.ep
	       (fun () -> (* called back when the socket is down *)
		  let proto = protocol c.ep in
		  let e =
		    create_method_exception
		      proto ~class_id ~meth_id ~reply_code ~reply_text in
		  signal_handshake (`Error e);
		  abort_and_propagate_error c.ep e;
	       );
	     (* Reply this: *)
	     Some(`AMQP_0_9 `Connection_close_ok)
	 | _ ->
	     assert false
    )
    (fun () -> 
       (* Trigger now the disconnect, following directly after sending
	  close-ok
	*)
       quick_disconnect c.ep; 
    );

  (* Watch out for errors: *)
  listen_for_errors
    c.ep None
    (fun e ->
       Netlog.logf `Err
	 "AMQP: exception: %s" (Netexn.to_string e);
       signal_handshake (`Error e);  (* Maybe we are still handshaking *)
       true
    );

  let ann_e =
    announce_e c.ep in

  let all_e =
    Uq_engines.sync_engine ann_e handshake_e
    ++ (fun ((),()) -> 
	  Unixqueue.clear esys tmo_g;
	  eps_e (`Done ()) esys) in

  let timeout = ref false in
  Unixqueue.once esys tmo_g (get_timeout c.ep)
    (fun () ->
       timeout := true;
       all_e # abort();
    );

  all_e
  >> (function
	| `Aborted when !timeout -> `Error Timeout
	| other -> other
     )

let open_s c auth_l lp vhost =
  sync (open_e c auth_l lp) vhost
  

let is_open c =
  c.state = `Opened


let close_e c =
  if c.state <> `Opened then
    raise Not_open;
  
  c.state <- `Close_requested;

  let esys = event_system c.ep in
  let e =
    sync_c2s_e c.ep
      (`AMQP_0_9 (`Connection_close(200, "OK", 0, 0)))
      None
      0
      (get_timeout c.ep)
    ++ (fun (m, _) ->
	  match m with
	    | `AMQP_0_9(`Connection_close_ok) ->
		c.state <- `Closed;
		eps_e (`Done ()) esys
	    | _ ->
		assert false
       ) in

  expect_eof c.ep;
  disconnect c.ep;

  e


let close_s c =
  sync close_e c

let plain_auth user pw =
  ("PLAIN", user, pw)


let endpoint c = c.ep
let event_system c = Netamqp_endpoint.event_system c.ep

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml