Plasma GitLab Archive
Projects Blog Knowledge

Module Netamqp_basic


module Netamqp_basic: sig .. end
Sending and receiving messages via a queue

class type message = object .. end
val create_message : ?content_type:string ->
?content_encoding:string ->
?headers:Netamqp_rtypes.table ->
?delivery_mode:int ->
?priority:int ->
?correlation_id:string ->
?reply_to:string ->
?expiration:string ->
?message_id:string ->
?timestamp:float ->
?typ:string ->
?user_id:string ->
?app_id:string -> Xdr_mstring.mstring list -> message
Creates a new message. See the class type Netamqp_basic.message for documentation of the optional header fields.

The unnamed argument is the body.

val qos_e : channel:Netamqp_channel.channel_obj ->
?prefetch_size:int ->
?prefetch_count:int -> ?global:bool -> unit -> unit Uq_engines.engine
val qos_s : channel:Netamqp_channel.channel_obj ->
?prefetch_size:int -> ?prefetch_count:int -> ?global:bool -> unit -> unit
Sets the quality of service for the channel (or the whole connection if global=true).

  • prefetch_size: Sets the prefetch window in octets
  • prefetch_count: Sets the prefetch window in number of messages

val consume_e : channel:Netamqp_channel.channel_obj ->
queue:Netamqp_queue.queue_name ->
?consumer_tag:string ->
?no_local:bool ->
?no_ack:bool ->
?exclusive:bool ->
?no_wait:bool ->
?arguments:Netamqp_rtypes.table -> unit -> string Uq_engines.engine
val consume_s : channel:Netamqp_channel.channel_obj ->
queue:Netamqp_queue.queue_name ->
?consumer_tag:string ->
?no_local:bool ->
?no_ack:bool ->
?exclusive:bool ->
?no_wait:bool -> ?arguments:Netamqp_rtypes.table -> unit -> string
This method asks the server to start a "consumer", which is a transient request for messages from a specific queue. Consumers last as long as the channel they were declared on, or until the client cancels them.

Arguments:

  • queue: The queue to consume from
  • consumer_tag: Identifies the consumer. If empty or omitted, the server creates a unique identifier
  • no_local: Do not receive messages that were published over this connection
  • no_ack: Do not expect acknowledgements for consumed messages
  • exclusive: Request exclusive access to the queue
  • no_wait: whether not to wait for the response of the request. This is faster, but errors are not immediately reported, and automatically created consumer tags cannot be returned to the client.
  • arguments: Depends on the server implementation
Return value: The actual consumer tag
val cancel_e : channel:Netamqp_channel.channel_obj ->
consumer_tag:string -> ?no_wait:bool -> unit -> string Uq_engines.engine
val cancel_s : channel:Netamqp_channel.channel_obj ->
consumer_tag:string -> ?no_wait:bool -> unit -> string
Cancels the consumer identified by consumer_tag. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply.

Arguments:

  • consumer_tag: The consumer to cancel
  • no_wait: whether not to wait for the response of the request. This is faster, but errors are not immediately reported.
Return value: The tag of the actually cancelled consumer
val publish_e : channel:Netamqp_channel.channel_obj ->
exchange:Netamqp_exchange.exchange_name ->
routing_key:string ->
?mandatory:bool ->
?immediate:bool -> message -> unit Uq_engines.engine
val publish_s : channel:Netamqp_channel.channel_obj ->
exchange:Netamqp_exchange.exchange_name ->
routing_key:string ->
?mandatory:bool -> ?immediate:bool -> message -> unit
Publishes the passed message. Note that this is an async operation, and when this function is finished, this only means that the message has been written to the socket, but not more.

Arguments:

  • exchange: Name of exchange to send the message to
  • routing_key: The routing key for the exchange
  • mandatory: This flag tells the server how to react if the message cannot be routed to a queue. If this flag is set, the server will return an unroutable message with a Return method. If this flag is false, the server silently drops the message.
  • immediate: This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. If this flag is set, the server will return an undeliverable message with a Return method. If this flag is false, the server will queue the message, but with no guarantee that it will ever be consumed.

val on_return : channel:Netamqp_channel.channel_obj ->
cb:(reply_code:int ->
reply_text:string ->
exchange:Netamqp_exchange.exchange_name ->
routing_key:string -> message -> unit) ->
unit -> unit
Registers a handler so that cb is called back whenever a message is returned to the client. The handler remains active as long as the channel is open.

Arguments of the callback:

  • reply_code: Reason for return, as error code
  • reply_text: Reason for return, as error text
  • exchange: the exchange the message was originally published to. May be empty, meaning the default exchange.
  • routing_key: the routing key name specified when the message was published

val on_deliver : channel:Netamqp_channel.channel_obj ->
cb:(consumer_tag:string ->
delivery_tag:Rtypes.uint8 ->
redelivered:bool ->
exchange:Netamqp_exchange.exchange_name ->
routing_key:string -> message -> unit) ->
unit -> unit
Registers a handler so that cb is called back whenever a message is delivered to the client. The handler remains active as long as the channel is open.

Arguments of the callback:

  • consumer_tag: The name of the consumer to which this delivery refers to
  • delivery_tag: A unique name of this delivery. Needed for acknowledging the message
  • redelivered: indicates that the message has been previously delivered to this or another client
  • exchange: the exchange the message was originally published to. May be empty, meaning the default exchange.
  • routing_key: the routing key name specified when the message was published

type 'a get_message = out:(delivery_tag:Rtypes.uint8 ->
redelivered:bool ->
exchange:Netamqp_exchange.exchange_name ->
routing_key:string ->
message_count:Rtypes.uint4 -> message -> 'a) ->
unit -> 'a
The result of get is returned by providing this function to the caller. When this function is called with an out argument, it will immediately call out back with the result values. The return value of cb is the return value of get_message.

Arguments of out:

  • delivery_tag: A unique name of this delivery. Needed for acknowledging the message
  • redelivered: indicates that the message has been previously delivered to this or another client
  • exchange: the exchange the message was originally published to. May be empty, meaning the default exchange.
  • routing_key: the routing key name specified when the message was published
  • message_count: ?

type 'a get_result = [ `Empty | `Message of 'a get_message ] 
Responses of get can return a message, or indicate that the queue is empty
val get_e : channel:Netamqp_channel.channel_obj ->
queue:Netamqp_queue.queue_name ->
?no_ack:bool -> unit -> 'a get_result Uq_engines.engine
val get_s : channel:Netamqp_channel.channel_obj ->
queue:Netamqp_queue.queue_name ->
?no_ack:bool -> unit -> 'a get_result
Fetches a message synchronously from the queue.

Arguments:

  • queue: the name of the queue
  • no_ack: Do not expect acknowledgements for consumed messages
The result is made available via a function get_result. For example, to just get the message:

        let get_result = get_s ... () in
        match get_result with
          | `Empty -> failwith "No message on the queue"
          | `Message get_msg ->
              let message =
		  get_msg 
		    ~out:(fun ~delivery_tag ~redelivered ~exchange ~routing_key
			      ~message_count message ->
			    message
			 )
		    () in
                 ...
      

val ack_e : channel:Netamqp_channel.channel_obj ->
delivery_tag:Rtypes.uint8 -> ?multiple:bool -> unit -> unit Uq_engines.engine
val ack_s : channel:Netamqp_channel.channel_obj ->
delivery_tag:Rtypes.uint8 -> ?multiple:bool -> unit -> unit
Acknowledges the delivery of the message identified by delivery_tag. Note that this is an async operation, and when this function is finished, this only means that the request has been written to the socket, but not more.

Arguments:

  • multiple: If set, the delivery tag is treated as "up to and including", so that the client can acknowledge multiple messages with a single method.

val reject_e : channel:Netamqp_channel.channel_obj ->
delivery_tag:Rtypes.uint8 -> requeue:bool -> unit -> unit Uq_engines.engine
val reject_s : channel:Netamqp_channel.channel_obj ->
delivery_tag:Rtypes.uint8 -> requeue:bool -> unit -> unit
Rejects a message (instead of acknowledging it). Note that this is an async operation, and when this function is finished, this only means that the request has been written to the socket, but not more.

Arguments:

  • delivery_tag: identifies the message to reject
  • requeue: Requeue the message. The message is never sent again to this channel.

val recover_e : channel:Netamqp_channel.channel_obj ->
requeue:bool -> unit -> unit Uq_engines.engine
val recover_s : channel:Netamqp_channel.channel_obj -> requeue:bool -> unit -> unit
Redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered.

Arguments:

  • requeue: If this field is false, the message will be redelivered to the original recipient. If this bit is set, the server will attempt to requeue the message, potentially then delivering it to an alternative subscriber.


recover-async is deprecated and omitted here
This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml