
Module Uq_engines

module Uq_engines: sig .. end

An engine performs a certain task in an autonomous way. Engines are attached to a Unixqueue.event_system, and do their task by generating events for resources of the operating system, and by handling such events. Engines are in one of four states: They may be still working, they may be done, they may be aborted, or they may be in an error state. The latter three states are called final states, because they indicate that the engine has stopped operation.

It is possible to ask an engine to notify another object when it changes its state. For simplicity, notification is done by invoking a callback function, and not by issuing notification events.

Effectively, engines provide a calculus for cooperative microthreading. This calculus includes combinators for sequential execution and synchronization. Moreover, it is easy to connect it with callback-style microthreading - one can arrange callbacks when an engine is done, and one can catch callbacks and turn them into engines.


exception Closed_channel

Raised when a method of a closed channel object is called (only channel methods count).

This exception should be regarded as equivalent to Netchannels.Closed_channel, but need not be the same exception.

exception Broken_communication

Some engines indicate this error when they cannot continue because the other endpoint of communication signals an error.

This exception is not raised, but used as argument of the `Error state.

exception Watchdog_timeout

Used by the watchdog engine to indicate a timeout.

This exception is not raised, but used as argument of the `Error state.

exception Timeout

Used by input_engine and output_engine to indicate timeouts.

exception Addressing_method_not_supported

Raised by client_endpoint_connector and server_endpoint_acceptor to indicate that the passed address is not supported by the class.

exception Cancelled

The callback function of a multiplex_controller is invoked with this exception if the operation is cancelled.

Engine definition

type 't engine_state = [ `Aborted | `Done of 't | `Error of exn | `Working of int ] 

The type of states with result values of type 't:

  • `Working n: The engine is working. The number n counts the number of events that have been processed.
  • `Done arg: The engine has completed its task without errors. The argument arg is the result value of the engine
  • `Error exn: The engine has aborted because of an error. The argument exn describes the error as an exception.
  • `Aborted: The engine has aborted because the abort method was called
type 't final_state = [ `Aborted | `Done of 't | `Error of exn ] 

Same as engine_state without `Working. These are only the final states.
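
The relationship between the two types can be sketched in plain OCaml. The sketch below redeclares both types locally (copied from the signatures above) so that it compiles without OCamlnet; the helper name final_of_state is hypothetical and not part of Uq_engines.

```ocaml
(* Local copies of the two types shown above, so that this sketch
   compiles on its own. In real code, use Uq_engines.engine_state
   and Uq_engines.final_state instead. *)
type 't engine_state =
  [ `Working of int | `Done of 't | `Error of exn | `Aborted ]

type 't final_state =
  [ `Done of 't | `Error of exn | `Aborted ]

(* Hypothetical helper: returns Some final state once the engine has
   stopped, and None while it is still in `Working. *)
let final_of_state (st : 't engine_state) : 't final_state option =
  match st with
  | `Working _ -> None
  | `Done x -> Some (`Done x)
  | `Error e -> Some (`Error e)
  | `Aborted -> Some `Aborted
```

A helper of this shape is convenient when polling the state of an engine by hand, since the caller only has to distinguish "not finished yet" from the three final states.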

val string_of_state : 'a engine_state -> string

For debug purposes: Returns a string describing the state

class type ['t] engine = object .. end

This class type defines the interface an engine must support.

class ['t] delegate_engine : 't #engine -> ['t] engine

Turns an engine value into a class

Engines and callbacks

val when_state : ?is_done:('a -> unit) ->
?is_error:(exn -> unit) ->
?is_aborted:(unit -> unit) ->
?is_progressing:(int -> unit) -> 'a #engine -> unit

Watches the state of the argument engine, and arranges that one of the functions is called when the corresponding state change is done. Once a final state is reached, the engine is no longer watched. Note that when_state only observes future state changes.

If one of the functions raises an exception, this exception is propagated to the caller of Unixqueue.run.

is_done : The state transitions to `Done. The argument of is_done is the argument of the `Done state.
is_error : The state transitions to `Error. The argument of is_error is the argument of the `Error state.
is_aborted : The state transitions to `Aborted.
is_progressing : This function is called when the `Working state changes. The int argument is the new `Working arg.
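
A minimal sketch of when_state, assuming the program is linked against OCamlnet's equeue library and that Unixqueue.standard_event_system and Unixqueue.run are available for creating and running the event system (neither is described on this page). The callbacks are registered before the event loop runs, because only future state changes are observed. The name log is chosen for illustration.

```ocaml
let esys = Unixqueue.standard_event_system ()

(* An engine that reaches `Done 42 after one step of the event loop. *)
let e = Uq_engines.epsilon_engine (`Done 42) esys

(* Collects one message per observed final state. *)
let log : string list ref = ref []

let () =
  Uq_engines.when_state
    ~is_done:(fun n -> log := Printf.sprintf "done %d" n :: !log)
    ~is_error:(fun err -> log := ("error " ^ Printexc.to_string err) :: !log)
    ~is_aborted:(fun () -> log := "aborted" :: !log)
    e;
  (* Nothing happens until the event system is run. *)
  Unixqueue.run esys
```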
class ['a] signal_engine : Unixqueue.event_system -> object .. end

let se = new signal_engine esys: The engine se remains in `Working 0 until the method se # signal x is called.

val signal_engine : Unixqueue.event_system ->
'a engine * ('a final_state -> unit)

let (se, signal) = signal_engine esys: Same as function
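
The functional form might be used as follows. This is a sketch under two assumptions: that Unixqueue.standard_event_system exists for creating the event system, and that the signalled state becomes visible through se # state as soon as signal returns.

```ocaml
let esys = Unixqueue.standard_event_system ()

(* The type annotation only fixes the result type of the engine. *)
let (se, signal) =
  (Uq_engines.signal_engine esys
   : string Uq_engines.engine * (string Uq_engines.final_state -> unit))

(* Per the description above, the engine starts in `Working 0. *)
let before = se # state

(* Push the engine into a final state from ordinary callback code. *)
let () = signal (`Done "ready")

let after = se # state
```

This is the usual bridge from callback-style code to engines: the callback calls signal, and everything downstream can treat se like any other engine.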


The following combinators serve as the control structures to connect primitive engines with each other.

class [['a, 'b]] map_engine : map_done:('a -> 'b engine_state) -> ?map_error:(exn -> 'b engine_state) -> ?map_aborted:(unit -> 'b engine_state) -> ?propagate_working:bool -> 'a #engine -> ['b] engine

The map_engine observes the argument engine, and when the state changes to `Done, `Error, or `Aborted, the corresponding mapping function is called, and the resulting state becomes the state of the mapped engine.

val map_engine : map_done:('a -> 'b engine_state) ->
?map_error:(exn -> 'b engine_state) ->
?map_aborted:(unit -> 'b engine_state) ->
?propagate_working:bool -> 'a #engine -> 'b engine

Same as function

class [['a, 'b]] fmap_engine : 'a #engine -> ('a final_state -> 'b final_state) -> ['b] engine

Similar to map_engine but different calling conventions: The mapping function is called when the argument engine reaches a final state, and this state can be mapped to another final state.

val fmap_engine : 'a #engine ->
('a final_state -> 'b final_state) ->
'b engine

Same as function
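
As an illustration, the following sketch uses fmap_engine to turn one particular error into a regular result. It assumes Unixqueue.standard_event_system and Unixqueue.run for driving the event system; the names failing and recovered are made up for the example.

```ocaml
let esys = Unixqueue.standard_event_system ()

(* An engine that ends in `Error Not_found after one step. *)
let failing : int Uq_engines.engine =
  Uq_engines.epsilon_engine (`Error Not_found) esys

(* Map the final state: treat Not_found as "no result" rather than
   as an error, and pass all other final states through. *)
let recovered : int option Uq_engines.engine =
  Uq_engines.fmap_engine failing
    (function
      | `Done x -> `Done (Some x)
      | `Error Not_found -> `Done None
      | `Error err -> `Error err
      | `Aborted -> `Aborted)

let () = Unixqueue.run esys
```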

After opening Uq_engines.Operators, this is also available as operator >>, e.g.

         e >>
           (function
             | `Done r -> ...
             | `Error error -> ...
             | `Aborted -> ...
           )
class ['a] meta_engine : 'a #engine -> ['a final_state] engine

maps the final state s to `Done s

val meta_engine : 'a #engine -> 'a final_state engine

Same as function

class ['t] epsilon_engine : 't engine_state -> Unixqueue.event_system -> ['t] engine

This engine transitions from its initial state `Working 0 in one step ("epsilon time") to the passed constant state.

val epsilon_engine : 't engine_state -> Unixqueue.event_system -> 't engine

Same as function

class [['a, 'b]] seq_engine : 'a #engine -> ('a -> 'b #engine) -> ['b] engine

This engine runs two engines in sequential order.

val seq_engine : 'a #engine ->
('a -> 'b #engine) -> 'b engine

Same as function.

After opening Uq_engines.Operators, this is also available as operator ++, e.g.

 e1 ++ (fun r1 -> e2) 

(when e1 and e2 are engines, and r1 is the result of e1).
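
A small sketch of a three-step chain, assuming Unixqueue.standard_event_system and Unixqueue.run for the event system. Each step is an epsilon engine created with eps_e from Uq_engines.Operators, so the chain does no real I/O and only shows how results flow from one engine to the next.

```ocaml
open Uq_engines.Operators   (* for "++" and eps_e *)

let esys = Unixqueue.standard_event_system ()

(* "++" is left-associative, so this reads as (e1 ++ f) ++ g. *)
let e : string Uq_engines.engine =
  eps_e (`Done 20) esys
  ++ (fun a -> eps_e (`Done (a + 1)) esys)
  ++ (fun b -> eps_e (`Done (string_of_int (2 * b))) esys)

let () = Unixqueue.run esys
```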

class [['a, 'b]] qseq_engine : 'a #engine -> ('a -> 'b #engine) -> ['b] engine
val qseq_engine : 'a #engine ->
('a -> 'b #engine) -> 'b engine

Almost the same as seq_engine, but this version does not propagate working state (i.e. no progress reporting).

qseq_engine should be preferred for recursive chains of engines.

class ['a] stream_seq_engine : 'a -> ('a -> 'a #engine) Stdlib.Stream.t -> Unixqueue.event_system -> ['a] engine

let se = new stream_seq_engine x0 s esys: The constructed engine se fetches functions f : 'a -> 'a #engine from the stream s, and runs the engines obtained by calling these functions e = f x one after the other.

val stream_seq_engine : 'a ->
('a -> 'a #engine) Stdlib.Stream.t ->
Unixqueue.event_system -> 'a engine

Same as function

class [['a, 'b]] sync_engine : 'a #engine -> 'b #engine -> [('a * 'b)] engine

This engine runs two engines in parallel, and waits until both are `Done (synchronization).

val sync_engine : 'a #engine -> 'b #engine -> ('a * 'b) engine

Same as function

class [['a, 'b]] msync_engine : 'a #engine list -> ('a -> 'b -> 'b) -> 'b -> Unixqueue.event_system -> ['b] engine

Multiple synchronization: let me = new msync_engine el f x0 esys - Runs the engines in el in parallel, and waits until all are `Done.

val msync_engine : 'a #engine list ->
('a -> 'b -> 'b) -> 'b -> Unixqueue.event_system -> 'b engine

Same as function
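
A sketch of multiple synchronization. It assumes that the results of the engines in el are combined by folding them with f, starting from x0 (this is suggested by the signature, but not spelled out above), and that Unixqueue.standard_event_system and Unixqueue.run are available. Addition is used so that the fold order does not matter.

```ocaml
let esys = Unixqueue.standard_event_system ()

(* Three independent engines, each finishing with an int. *)
let parts =
  List.map (fun n -> Uq_engines.epsilon_engine (`Done n) esys) [1; 2; 3]

(* Wait for all of them and add up their results. *)
let total : int Uq_engines.engine =
  Uq_engines.msync_engine parts (fun x acc -> x + acc) 0 esys

let () = Unixqueue.run esys
```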

class ['a] delay_engine : float -> (unit -> 'a #engine) -> Unixqueue.event_system -> ['a] engine

let de = delay_engine d f esys: The engine e = f() is created after d seconds, and the result of e becomes the result of de.

val delay_engine : float ->
(unit -> 'a #engine) ->
Unixqueue.event_system -> 'a engine

Same as function
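
A minimal sketch of delay_engine, assuming Unixqueue.standard_event_system and Unixqueue.run. The inner engine is only created when the delay has elapsed, which is why it is passed as a function.

```ocaml
let esys = Unixqueue.standard_event_system ()

let t0 = Unix.gettimeofday ()

(* After 0.2 seconds, create an engine that immediately yields "late". *)
let de : string Uq_engines.engine =
  Uq_engines.delay_engine 0.2
    (fun () -> Uq_engines.epsilon_engine (`Done "late") esys)
    esys

let () = Unixqueue.run esys

(* Wall-clock time spent in the event loop. *)
let elapsed = Unix.gettimeofday () -. t0
```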

class ['a] timeout_engine : float -> exn -> 'a engine -> ['a] engine

timeout_engine d x e: If the engine e finishes within d seconds, the result remains unchanged.

val timeout_engine : float -> exn -> 'a engine -> 'a engine

Same as function
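
The sketch below guards an engine that never finishes on its own. It relies on an assumption that the text above does not state: when the deadline passes, the guarded engine is expected to enter `Error x, with x being the exception passed to timeout_engine. Unixqueue.standard_event_system and Unixqueue.run are likewise assumed.

```ocaml
let esys = Unixqueue.standard_event_system ()

(* A signal engine that is never signalled stays in `Working. *)
let (never, _signal) =
  (Uq_engines.signal_engine esys
   : int Uq_engines.engine * (int Uq_engines.final_state -> unit))

(* Give it 0.1 seconds; Failure "too slow" is an arbitrary choice
   of exception for this example. *)
let guarded : int Uq_engines.engine =
  Uq_engines.timeout_engine 0.1 (Failure "too slow") never

let () = Unixqueue.run esys
```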

class watchdog : float -> 'a #engine -> [unit] engine

A watchdog engine checks whether the argument engine makes progress, and if there is no progress for the passed number of seconds, the engine is aborted, and the watchdog state changes to `Error Watchdog_timeout.

val watchdog : float -> 'a #engine -> unit engine

Same as function

class type ['a] serializer_t = object .. end

A serializer queues up engines, and starts the next engine when the previous one finishes.

class ['a] serializer : Unixqueue.event_system -> ['a] serializer_t

Creates a serializer

val serializer : Unixqueue.event_system -> 'a serializer_t

Same as function

class type ['a] prioritizer_t = object .. end

A prioritizer makes it possible to prioritize the execution of engines: At any time, only engines of a certain priority p can be executed.

class ['a] prioritizer : Unixqueue.event_system -> ['a] prioritizer_t

Creates a prioritizer

val prioritizer : Unixqueue.event_system -> 'a prioritizer_t

Same as function

class type ['a] cache_t = object .. end

A cache contains a mutable value that is obtained by running an engine.

class ['a] cache : (Unixqueue.event_system -> 'a engine) -> Unixqueue.event_system -> ['a] cache_t

new cache f esys: A cache that runs f esys to obtain values

val cache : (Unixqueue.event_system -> 'a engine) ->
Unixqueue.event_system -> 'a cache_t

Same as function

class ['t] engine_mixin : 't engine_state -> Unixqueue.event_system -> object .. end

A useful class fragment that implements state and request_notification.

module Operators: sig .. end

Handy operators: ++, >>, and eps_e

Basic I/O engines

class poll_engine : ?extra_match:(exn -> bool) -> (Unixqueue.operation * float) list -> Unixqueue.event_system -> object .. end

This engine waits until one of the passed operations can be carried out, or until one of the operations times out.

class ['a] input_engine : (Unix.file_descr -> 'a) -> Unix.file_descr -> float -> Unixqueue.event_system -> ['a] engine

Generic input engine for reading from a file descriptor: let e = new input_engine f fd tmo esys - Waits until the file descriptor becomes readable, and then calls let x = f fd to read from the descriptor.
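
A sketch of an input engine reading from a pipe. It assumes that the value returned by f becomes the result of the engine, that the engine does not close the descriptor itself, and that Unixqueue.standard_event_system and Unixqueue.run are available. The buffer size and the 1.0 second timeout are arbitrary.

```ocaml
let esys = Unixqueue.standard_event_system ()

(* Make some data available on the read side of a pipe. *)
let (rd, wr) = Unix.pipe ()
let () =
  ignore (Unix.write_substring wr "ping" 0 4);
  Unix.close wr

(* Once rd is readable, read up to 16 bytes and return them. *)
let reader : string Uq_engines.engine =
  new Uq_engines.input_engine
    (fun fd ->
       let buf = Bytes.create 16 in
       let n = Unix.read fd buf 0 16 in
       Bytes.sub_string buf 0 n)
    rd 1.0 esys

let () =
  Unixqueue.run esys;
  Unix.close rd
```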

class ['a] output_engine : (Unix.file_descr -> 'a) -> Unix.file_descr -> float -> Unixqueue.event_system -> ['a] engine

Generic output engine for writing to a file descriptor: let e = new output_engine f fd tmo esys - Waits until the file descriptor becomes writable, and then calls let x = f fd to write to the descriptor.

class poll_process_engine : ?period:float -> pid:int -> Unixqueue.event_system -> [Unix.process_status] engine

This class is deprecated! Use the classes in Shell_uq instead.

More I/O

The module Uq_io provides a bunch of functions to read and write data via various "devices". All these functions return engines, and are easy to use. Devices can be file descriptors, but also other data structures. In particular, there is also support for buffered I/O and for reading line-by-line from an input device.


When programming with engines, it is normal to use recursion for any kind of loops. For example, to read the lines from a file:

      open Uq_engines.Operators  (* for ">>" and "++" *)

      let fd = 
        Unix.openfile filename [Unix.O_RDONLY] 0 in
      let d = 
        `Buffer_in(Uq_io.create_in_buffer(`Polldescr(`Read_write,fd,esys))) in

      let rec read_lines acc =
        Uq_io.input_line_e d >>
          (function                       (* catch exception End_of_file *)
            | `Done line -> `Done(Some line)
            | `Error End_of_file -> `Done None
            | `Error error -> `Error error
            | `Aborted -> `Aborted
          ) ++
          (function
            | Some line ->
                read_lines (line :: acc)
            | None ->
                eps_e (`Done (List.rev acc)) esys
          ) in

      let e = read_lines []

There is generally the question whether this style leads to stack overflows. This depends on the mechanisms that come into play:

  • The engine mechanism passing control from one engine to the next is not tail-recursive, and thus the stack can overflow when the recursion becomes too deep.
  • The event queue mechanism, however, does not have this problem. Control falls automatically back to the event queue whenever I/O needs to be done.

In this example, this means that only the engine mechanism is used as long as the data is read from the buffer. When the buffer needs to be refilled, however, control is passed back to the event queue (so the stack is cleaned), and the continuation of the execution is only managed via closures (which only allocate memory on the heap, not on the stack). Usually, this is a good compromise: The engine mechanism is a lot faster, but I/O is an indicator for using the better but slower technique.

Also note another difference: The event queue mechanism allows other asynchronous code attached to the same event queue to run (control may be yielded to unrelated execution contexts). The pure engine mechanism does not allow that. This may be handy when exclusive access to variables is needed. (But be careful here - this is very sensitive to minimal changes of the implementation.)

Certain engines enforce using the event queue mechanism although they are unrelated to I/O. Especially Uq_engines.delay_engine is useful here: A "delay" of 0 seconds is already sufficient to go back to the event queue. If recursions sometimes lead to stack overflows, the solution is to include such a zero delay before doing the self call.
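
The zero-delay technique can be sketched as follows, assuming Unixqueue.standard_event_system and Unixqueue.run. The function sum_to is a made-up example that does no I/O at all, so without the delay every iteration would be handled by the engine mechanism alone.

```ocaml
let esys = Unixqueue.standard_event_system ()

(* Adds up n + (n-1) + ... + 1 with one engine per iteration. The
   zero delay hands control back to the event queue before each
   self call, so the recursion does not build up on the stack. *)
let rec sum_to n acc : int Uq_engines.engine =
  if n = 0 then
    Uq_engines.epsilon_engine (`Done acc) esys
  else
    Uq_engines.delay_engine 0.0
      (fun () -> sum_to (n - 1) (acc + n))
      esys

let e = sum_to 1000 0

let () = Unixqueue.run esys
```

The price is one trip through the event queue per iteration, so in practice the delay is often inserted only every so many iterations.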

More Engines

Pointers to other modules related to engines:


OCamlnet-4.0 moves a number of definitions to the modules

For convenience, the types are still also exported here, but functions and classes are now defined in these modules. See also the module Uq_engines_compat.

class type async_out_channel = object .. end
class type async_in_channel = object .. end
class type async_out_channel_engine = object .. end
type copy_task = [ `Bidirectional of Unix.file_descr * Unix.file_descr
| `Tridirectional of Unix.file_descr * Unix.file_descr * Unix.file_descr
| `Uni_socket of Unix.file_descr * Unix.file_descr
| `Unidirectional of Unix.file_descr * Unix.file_descr ]
class type async_in_channel_engine = object .. end
class type multiplex_controller = object .. end

This definition has now been moved to Uq_multiplex.multiplex_controller

exception Mem_not_supported
class type datagram_multiplex_controller = object .. end
type onshutdown_out_spec = [ `Action of
async_out_channel_engine ->
multiplex_controller -> unit engine_state -> unit
| `Ignore
| `Initiate_shutdown ]
type onshutdown_in_spec = [ `Action of
async_in_channel_engine ->
multiplex_controller -> unit engine_state -> unit
| `Ignore
| `Initiate_shutdown ]
type inetspec = [ `Sock_inet of Unix.socket_type * Unix.inet_addr * int
| `Sock_inet_byname of Unix.socket_type * string * int ]
type sockspec = [ `Sock_inet of Unix.socket_type * Unix.inet_addr * int
| `Sock_inet_byname of Unix.socket_type * string * int
| `Sock_unix of Unix.socket_type * string ]
type connect_address = [ `Command of string * (int -> Unixqueue.event_system -> unit)
| `Socket of sockspec * connect_options
| `W32_pipe of Netsys_win32.pipe_mode * string ]
type connect_options = {
   conn_bind : sockspec option; (* Bind the connecting socket to this address (same family as the connected socket required). None: Use an anonymous port. *)
}

type connect_status = [ `Command of Unix.file_descr * int
| `Socket of Unix.file_descr * sockspec
| `W32_pipe of Unix.file_descr ]
class type client_endpoint_connector = object .. end
type listen_address = [ `Socket of sockspec * listen_options
| `W32_pipe of Netsys_win32.pipe_mode * string * listen_options ]
type listen_options = {
   lstn_backlog : int;
   lstn_reuseaddr : bool;
}
class type server_endpoint_acceptor = object .. end
class type server_endpoint_listener = object .. end

Moved to Uq_server.server_endpoint_listener

type datagram_type = [ `Inet6_udp | `Inet_udp | `Unix_dgram ] 
class type wrapped_datagram_socket = object .. end
class type datagram_socket_provider = object .. end


module Debug: sig .. end
This web site is published by Informatikbüro Gerd Stolpmann