Plasma GitLab Archive
Projects Blog Knowledge

(* $Id: netamqp_channel.ml 53444 2011-03-10 14:08:13Z gerd $ *)

open Netamqp_types
open Netamqp_endpoint
open Uq_engines.Operators
open Printf

exception Not_open
exception Error of string

type channel = int

type channel_obj =
    { c : Netamqp_connection.connection;
      ep : Netamqp_endpoint.endpoint;
      ch : int;
      mutable tmo : float;
    }

let default_tmo = 60.0

let open_e c ch =
  let ep = Netamqp_connection.endpoint c in
  if not (Netamqp_connection.is_open c) then
    raise(Netamqp_connection.Not_open);

  (*
  if is_channel_enabled ep ch then
    failwith "Netamqp_channel.open_e: Channel is already open";
   *)
  
  let ch_obj =
    { c = c; ep = ep; ch = ch; tmo = default_tmo } in
  let esys =
    event_system ep in

  enable_channel ep ch;

  register_sync_s2c ep
    (`AMQP_0_9 `Channel_close)
    ch
    (fun m ->
       match m with
	 | `AMQP_0_9(`Channel_close(reply_code, reply_text,
				    class_id, meth_id)) ->
	     let msg =
	       sprintf "channel-close from server: \
                          code=%d text=%s class-id=%d meth-id=%d"
		 reply_code reply_text class_id meth_id in
	     Netlog.logf `Err "AMQP: %s" msg;
	     let proto = protocol ep in
	     let e =
	       create_method_exception
		 proto ~class_id ~meth_id ~reply_code ~reply_text in
	     propagate_error ep e (Some ch);
	     Some(`AMQP_0_9 `Channel_close_ok)
	 | _ ->
	     assert false
    )
    (fun () -> 
       disable_channel ep ch;
       (* This will not prevent the close-ok from being delivered because
	  sync_s2c puts the output messages onto the priority queue
	*)
    );

  register_sync_s2c ep
    (`AMQP_0_9 `Channel_flow)
    ch
    (fun m ->
       match m with
	 | `AMQP_0_9(`Channel_flow active) ->
	     flow_control ep ch active;
	     Some(`AMQP_0_9 (`Channel_flow active))
	 | _ ->
	     assert false
    )
    (fun () -> ());

  sync_c2s_e ep
    (`AMQP_0_9 (`Channel_open ""))
    None
    ch
    ch_obj.tmo
  ++ (fun (m,_) ->
	match m with
	  | `AMQP_0_9(`Channel_open_ok "") ->
	      eps_e (`Done ch_obj) esys
	  | _ ->
	      assert false
     )

let open_s c ch =
  sync (open_e c) ch
  

let open_next_e c =
  let ep = Netamqp_connection.endpoint c in
  open_e c (suggest_channel ep)


let open_next_s c =
  sync open_next_e c

let close_e co =
  let c = co.c in
  let ep = co.ep in
  if not (Netamqp_connection.is_open c) then
    raise(Netamqp_connection.Not_open);

  if not (is_channel_enabled ep co.ch) then raise Not_open;

  let esys = event_system ep in
  let e =
    sync_c2s_e ep
      ~on_timeout:(fun () ->
		     disable_channel ep co.ch
		  )
      (`AMQP_0_9 (`Channel_close(200, "OK", 0, 0)))
      None
      co.ch
      co.tmo
    ++ (fun (m, _) ->
	  match m with
	    | `AMQP_0_9(`Channel_close_ok) ->
		disable_channel ep co.ch;
		eps_e (`Done ()) esys
	    | _ ->
		assert false
       ) in
  e

let close_s co =
  sync close_e co

let is_open co =
  let c = co.c in
  let ep = co.ep in
  Netamqp_connection.is_open c && is_channel_enabled ep co.ch


let configure_sync_timeout co tmo =
  co.tmo <- tmo

let sync_c2s_e ?no_wait co meth d_opt =
  Netamqp_endpoint.sync_c2s_e
    ~on_timeout:(fun() ->
		   (* trigger channel closure *)
		   ignore(close_e co)
		)
    ?no_wait co.ep meth d_opt co.ch co.tmo

let sync_c2s_s ?no_wait co meth d_opt =
  sync (sync_c2s_e ?no_wait co meth) d_opt


let connection co = co.c
let endpoint co = co.ep
let event_system co = Netamqp_connection.event_system co.c
let number co = co.ch

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml