
Module Uq_engines

module Uq_engines: sig .. end
An engine performs a certain task in an autonomous way. Engines are attached to a Unixqueue.event_system, and do their task by generating events for resources of the operating system, and by handling such events. Engines are in one of four states: They may be still working, they may be done, they may be aborted, or they may be in an error state. The three latter states are called final states, because they indicate that the engine has stopped operation.

It is possible to ask an engine to notify another object when it changes its state. For simplicity, notification is done by invoking a callback function, and not by issuing notification events.

Effectively, engines provide a calculus for cooperative microthreading. This calculus includes combinators for sequential execution and synchronization. Moreover, it is easy to connect it with callback-style microthreading - one can arrange callbacks when an engine is done, and one can catch callbacks and turn them into engines.



Exceptions


exception Closed_channel
Raised when a method of a closed channel object is called (only channel methods count).

This exception should be regarded as equivalent to Netchannels.Closed_channel, but need not be the same exception.

exception Broken_communication
Some engines indicate this error when they cannot continue because the other endpoint of communication signals an error.

This exception is not raised, but used as argument of the `Error state.

exception Watchdog_timeout
Used by the watchdog engine to indicate a timeout.

This exception is not raised, but used as argument of the `Error state.

exception Timeout
Used by input_engine and output_engine to indicate timeouts.
exception Addressing_method_not_supported
Raised by client_socket_connector and server_socket_acceptor to indicate that the passed address is not supported by the class.
exception Cancelled
The callback function of a multiplex_controller is invoked with this exception if the operation is cancelled.

Engine definition


type 't engine_state = [ `Aborted | `Done of 't | `Error of exn | `Working of int ] 
The type of states with result values of type 't:
  • `Working n: The engine is working. The number n counts the number of events that have been processed.
  • `Done arg: The engine has completed its task without errors. The argument arg is the result value of the engine
  • `Error exn: The engine has aborted because of an error. The argument exn describes the error as an exception.
  • `Aborted: The engine has aborted because the abort method was called

type 't final_state = [ `Aborted | `Done of 't | `Error of exn ] 
Same as engine_state without `Working. These are only the final states.
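As a small illustration of how the two state types relate, the following sketch separates final states from `Working. The two types are re-declared locally so that the fragment compiles without Ocamlnet, and final_of_state is a hypothetical helper, not a function of Uq_engines.

```ocaml
(* Local copies of the two state types, mirroring the definitions above. *)
type 't engine_state = [ `Aborted | `Done of 't | `Error of exn | `Working of int ]
type 't final_state = [ `Aborted | `Done of 't | `Error of exn ]

(* Hypothetical helper: Some s once the engine has stopped, None while it
   is still working. *)
let final_of_state (st : 't engine_state) : 't final_state option =
  match st with
  | `Working _ -> None
  | #final_state as fin -> Some fin
```

The #final_state pattern matches exactly the three final tags, so the function keeps compiling unchanged if it is later pointed at the real types.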
val string_of_state : 'a engine_state -> string
For debug purposes: Returns a string describing the state
class type ['t] engine = object .. end
This class type defines the interface an engine must support.
class ['t] delegate_engine : 't #engine -> ['t] engine
Turns an engine value into a class

Engines and callbacks


val when_state : ?is_done:('a -> unit) ->
?is_error:(exn -> unit) ->
?is_aborted:(unit -> unit) ->
?is_progressing:(int -> unit) -> 'a #engine -> unit
Watches the state of the argument engine, and arranges that one of the functions is called when the corresponding state change is done. Once a final state is reached, the engine is no longer watched. Note that when_state only observes future state changes.

If one of the functions raises an exception, this exception is propagated to the caller of Unixqueue.run.


is_done : The state transitions to `Done. The argument of is_done is the argument of the `Done state.
is_error : The state transitions to `Error. The argument of is_error is the argument of the `Error state.
is_aborted : The state transitions to `Aborted.
is_progressing : This function is called when the `Working state changes. The int argument is the new `Working arg.
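A minimal sketch of when_state, assuming the findlib package equeue (which is expected to provide Unixqueue and Uq_engines) is installed. The name run_when_state and the string results are purely illustrative. The observed engine is an epsilon_engine, so the only transition is from `Working 0 to `Done 42.

```ocaml
(* Assumed build command: ocamlfind ocamlopt -package equeue -linkpkg demo.ml *)

(* Illustrative helper: runs a trivial engine and reports what when_state saw. *)
let run_when_state () =
  let esys = Unixqueue.create_unix_event_system () in
  (* The engine starts in `Working 0 and becomes `Done 42 once the queue runs. *)
  let e = Uq_engines.epsilon_engine (`Done 42) esys in
  let seen = ref "nothing" in
  Uq_engines.when_state
    ~is_done:(fun n -> seen := "done " ^ string_of_int n)
    ~is_error:(fun exn -> seen := "error " ^ Printexc.to_string exn)
    ~is_aborted:(fun () -> seen := "aborted")
    e;
  (* The callbacks registered above are invoked from inside Unixqueue.run. *)
  Unixqueue.run esys;
  !seen
```

Because when_state only observes future state changes, the callbacks are registered before Unixqueue.run is entered.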
class ['a] signal_engine : Unixqueue.event_system -> object .. end
let se = new signal_engine esys: The engine se remains in `Working 0 until the method se # signal x is called.
val signal_engine : Unixqueue.event_system ->
'a engine * ('a final_state -> unit)
let (se, signal) = signal_engine esys: Same as function

Combinators



The following combinators serve as the control structures to connect primitive engines with each other.
class [['a, 'b]] map_engine : map_done:('a -> 'b engine_state) -> ?map_error:(exn -> 'b engine_state) -> ?map_aborted:(unit -> 'b engine_state) -> ?propagate_working:bool -> 'a #engine -> ['b] engine
The map_engine observes the argument engine, and when the state changes to `Done, `Error, or `Aborted, the corresponding mapping function is called, and the resulting state becomes the state of the mapped engine.
val map_engine : map_done:('a -> 'b engine_state) ->
?map_error:(exn -> 'b engine_state) ->
?map_aborted:(unit -> 'b engine_state) ->
?propagate_working:bool -> 'a #engine -> 'b engine
Same as function
class [['a, 'b]] fmap_engine : 'a #engine -> ('a final_state -> 'b final_state) -> ['b] engine
Similar to map_engine but different calling conventions: The mapping function is called when the argument engine reaches a final state, and this state can be mapped to another final state.
val fmap_engine : 'a #engine ->
('a final_state -> 'b final_state) ->
'b engine
Same as function

After opening Uq_engines.Operators, this is also available as operator >>, e.g.

 
         e >>
           (function
             | `Done r -> ...
             | `Error error -> ...
             | `Aborted -> ...
           )
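A mapping function for fmap_engine (or >>) is an ordinary function on final states, so it can be written and tested without an event system. The sketch below uses a hypothetical helper to_result that turns an error into a regular result value, so that a following engine can inspect it instead of being skipped. It assumes OCaml 4.03 or later for the Ok and Error constructors.

```ocaml
(* Hypothetical mapping function: `Error becomes a regular `Done value.
   Note the difference between the state tag `Error (with backtick) and
   the result constructor Error (without). *)
let to_result = function
  | `Done x -> `Done (Ok x)
  | `Error err -> `Done (Error err)
  | `Aborted -> `Aborted
```

With Uq_engines.Operators opened, this could be applied as e >> to_result.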
       

class ['a] meta_engine : 'a #engine -> ['a final_state] engine
Maps the final state s to `Done s
val meta_engine : 'a #engine -> 'a final_state engine
Same as function
class ['t] epsilon_engine : 't engine_state -> Unixqueue.event_system -> ['t] engine
This engine transitions from its initial state `Working 0 in one step ("epsilon time") to the passed constant state.
val epsilon_engine : 't engine_state -> Unixqueue.event_system -> 't engine
Same as function
class [['a, 'b]] seq_engine : 'a #engine -> ('a -> 'b #engine) -> ['b] engine
This engine runs two engines in sequential order.
val seq_engine : 'a #engine ->
('a -> 'b #engine) -> 'b engine
Same as function.

After opening Uq_engines.Operators, this is also available as operator ++, e.g.

 e1 ++ (fun r1 -> e2) 
(when e1 and e2 are engines, and r1 is the result of e1).
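A minimal sketch of a three-step sequence, assuming the findlib package equeue is installed. Each step is an eps_e engine that finishes immediately, so the chain only demonstrates how results are handed from one engine to the next. The name run_seq is illustrative.

```ocaml
open Uq_engines.Operators   (* brings ++ and eps_e into scope *)

(* Illustrative helper: chains three engines and returns the final result. *)
let run_seq () =
  let esys = Unixqueue.create_unix_event_system () in
  let e =
    eps_e (`Done 20) esys
    ++ (fun r1 -> eps_e (`Done (r1 + 1)) esys)    (* r1 is the result of step 1 *)
    ++ (fun r2 -> eps_e (`Done (2 * r2)) esys) in (* r2 is the result of step 2 *)
  let result = ref None in
  Uq_engines.when_state ~is_done:(fun n -> result := Some n) e;
  Unixqueue.run esys;
  !result
```

The operator ++ associates to the left, so the expression reads as (e1 ++ f1) ++ f2.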
class ['a] stream_seq_engine : 'a -> ('a -> 'a #engine) Stream.t -> Unixqueue.event_system -> ['a] engine
let se = new stream_seq_engine x0 s esys: The constructed engine se fetches functions f : 'a -> 'a #engine from the stream s, and runs the engines obtained by calling these functions e = f x one after the other.
val stream_seq_engine : 'a ->
('a -> 'a #engine) Stream.t ->
Unixqueue.event_system -> 'a engine
Same as function
class [['a, 'b]] sync_engine : 'a #engine -> 'b #engine -> [('a * 'b)] engine
This engine runs two engines in parallel, and waits until both are `Done (synchronization).
val sync_engine : 'a #engine -> 'b #engine -> ('a * 'b) engine
Same as function
class [['a, 'b]] msync_engine : 'a #engine list -> ('a -> 'b -> 'b) -> 'b -> Unixqueue.event_system -> ['b] engine
Multiple synchronization: let me = new msync_engine el f x0 esys - Runs the engines in el in parallel, and waits until all are `Done.
val msync_engine : 'a #engine list ->
('a -> 'b -> 'b) -> 'b -> Unixqueue.event_system -> 'b engine
Same as function
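A minimal sketch of both synchronization combinators, assuming the findlib package equeue is installed. The name run_sync is illustrative. The folding function passed to msync_engine is commutative here, so the sketch does not rely on the order in which results are combined.

```ocaml
open Uq_engines.Operators   (* for eps_e *)

(* Illustrative helper: runs one sync_engine and one msync_engine. *)
let run_sync () =
  let esys = Unixqueue.create_unix_event_system () in
  (* Two engines with different result types, joined into a pair. *)
  let pair_e =
    Uq_engines.sync_engine (eps_e (`Done 1) esys) (eps_e (`Done "one") esys) in
  (* Any number of engines of the same type, folded into one value. *)
  let sum_e =
    Uq_engines.msync_engine
      [ eps_e (`Done 1) esys; eps_e (`Done 2) esys; eps_e (`Done 3) esys ]
      (fun x acc -> x + acc)
      0
      esys in
  let pair = ref None and sum = ref None in
  Uq_engines.when_state ~is_done:(fun p -> pair := Some p) pair_e;
  Uq_engines.when_state ~is_done:(fun s -> sum := Some s) sum_e;
  Unixqueue.run esys;
  (!pair, !sum)
```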
class ['a] delay_engine : float -> (unit -> 'a #engine) -> Unixqueue.event_system -> ['a] engine
let de = delay_engine d f esys: The engine e = f() is created after d seconds, and the result of e becomes the result of de.
val delay_engine : float ->
(unit -> 'a #engine) ->
Unixqueue.event_system -> 'a engine
Same as function
class ['a] timeout_engine : float -> exn -> 'a engine -> ['a] engine
timeout_engine d x e: If the engine e finishes within d seconds, the result remains unchanged.
val timeout_engine : float -> exn -> 'a engine -> 'a engine
Same as function
class watchdog : float -> 'a #engine -> [unit] engine
A watchdog engine checks whether the argument engine makes progress, and if there is no progress for the passed number of seconds, the engine is aborted, and the watchdog state changes to `Error Watchdog_timeout.
val watchdog : float -> 'a #engine -> unit engine
Same as function
class type ['a] serializer_t = object .. end
A serializer queues up engines, and starts the next engine when the previous one finishes.
class ['a] serializer : Unixqueue.event_system -> ['a] serializer_t
Creates a serializer
val serializer : Unixqueue.event_system -> 'a serializer_t
Same as function
class type ['a] prioritizer_t = object .. end
A prioritizer allows prioritizing the execution of engines: At any time, only engines of a certain priority p can be executed.
class ['a] prioritizer : Unixqueue.event_system -> ['a] prioritizer_t
Creates a prioritizer
val prioritizer : Unixqueue.event_system -> 'a prioritizer_t
Same as function
class type ['a] cache_t = object .. end
A cache contains a mutable value that is obtained by running an engine.
class ['a] cache : (Unixqueue.event_system -> 'a engine) -> Unixqueue.event_system -> ['a] cache_t
new cache f esys: A cache that runs f esys to obtain values
val cache : (Unixqueue.event_system -> 'a engine) ->
Unixqueue.event_system -> 'a cache_t
Same as function
class ['t] engine_mixin : 't engine_state -> Unixqueue.event_system -> object .. end
A useful class fragment that implements state and request_notification.
module Operators: sig .. end
Handy operators: ++, >>, and eps_e

Basic I/O engines


class poll_engine : ?extra_match:(exn -> bool) -> (Unixqueue.operation * float) list -> Unixqueue.event_system -> object .. end
This engine waits until one of the passed operations can be carried out, or until one of the operations times out.
class ['a] input_engine : (Unix.file_descr -> 'a) -> Unix.file_descr -> float -> Unixqueue.event_system -> ['a] engine
Generic input engine for reading from a file descriptor: let e = new input_engine f fd tmo esys - Waits until the file descriptor becomes readable, and then calls let x = f fd to read from the descriptor.
class ['a] output_engine : (Unix.file_descr -> 'a) -> Unix.file_descr -> float -> Unixqueue.event_system -> ['a] engine
Generic output engine for writing to a file descriptor: let e = new output_engine f fd tmo esys - Waits until the file descriptor becomes writable, and then calls let x = f fd to write to the descriptor.
class poll_process_engine : ?period:float -> pid:int -> Unixqueue.event_system -> [Unix.process_status] engine
This class is deprecated! Use the classes in Shell_uq instead.

More I/O

The module Uq_io provides a bunch of functions to read and write data via various "devices". All these functions return engines, and are easy to use. Devices can be file descriptors, but also other data structures. In particular, there is also support for buffered I/O and for reading line-by-line from an input device.

Transfer engines



Transfer engines copy data between file descriptors. This kind of engine is likely to be declared as deprecated in the future. If possible, one should use multiplex controllers (see below), and for copying streams the generic copier Uq_io.copy_e is a better choice.

The pure types async_in_channel and async_out_channel have proven useful for bridging with Netchannels.

class type async_out_channel = object .. end
An asynchronous output channel provides methods to output data to a stream descriptor.
class type async_in_channel = object .. end
An asynchronous input channel provides methods to input data from a stream descriptor.
class pseudo_async_out_channel : #Netchannels.raw_out_channel -> async_out_channel
Takes a Netchannels.raw_out_channel as an asynchronous channel.
class pseudo_async_in_channel : #Netchannels.raw_in_channel -> async_in_channel
Takes a Netchannels.raw_in_channel as an asynchronous channel.
class receiver : src:Unix.file_descr -> dst:#async_out_channel -> ?close_src:bool -> ?close_dst:bool -> Unixqueue.event_system -> [unit] engine
This engine copies all data from the src file descriptor to the dst output channel.
class sender : src:#async_in_channel -> dst:Unix.file_descr -> ?close_src:bool -> ?close_dst:bool -> Unixqueue.event_system -> [unit] engine
This engine copies all data from the src input channel to the dst file descriptor.
class type async_out_channel_engine = object .. end
Combination of engine + async_out_channel
class type async_in_channel_engine = object .. end
Combination of engine + async_in_channel
class output_async_descr : dst:Unix.file_descr -> ?buffer_size:int -> ?close_dst:bool -> Unixqueue.event_system -> async_out_channel_engine
This engine implements an async_out_channel for the output descriptor dst.
class input_async_descr : src:Unix.file_descr -> ?buffer_size:int -> ?close_src:bool -> Unixqueue.event_system -> async_in_channel_engine
The corresponding class for asynchronous input channels.
type copy_task = [ `Bidirectional of Unix.file_descr * Unix.file_descr
| `Tridirectional of Unix.file_descr * Unix.file_descr * Unix.file_descr
| `Uni_socket of Unix.file_descr * Unix.file_descr
| `Unidirectional of Unix.file_descr * Unix.file_descr ]
Specifies the task the copier class has to do:

  • `Unidirectional(src,dst): Data from src are copied to dst. On EOF of src, both descriptors are closed.
  • `Uni_socket(src,dst): Data from src are copied to dst. On EOF of src, dst is shut down for sending; all descriptors remain open. dst must be a socket.
  • `Bidirectional(bi1,bi2): Data from bi1 are copied to bi2, and data from bi2 are copied to bi1. On EOF of one descriptor, the other descriptor is shut down for sending. When both descriptors are at EOF, both are closed. bi1 and bi2 must be sockets.
  • `Tridirectional(bi,dst,src): Data from bi are copied to dst, and data from src are copied to bi (i.e. a bidirectional descriptor is split up into two unidirectional descriptors). On EOF of bi, dst is closed. On EOF of src, bi is shut down for sending. On EOF in both directions, all descriptors are closed. bi must be a socket.

class copier : copy_task -> Unixqueue.event_system -> [unit] engine
This engine copies data between file descriptors as specified by the copy_task argument.

Socket engines



Note that Win32 named pipes are also supported by the following APIs, although they are not sockets. These pipes have a feature set comparable to Unix domain sockets.
type inetspec = [ `Sock_inet of Unix.socket_type * Unix.inet_addr * int
| `Sock_inet_byname of Unix.socket_type * string * int ]
type sockspec = [ `Sock_inet of Unix.socket_type * Unix.inet_addr * int
| `Sock_inet_byname of Unix.socket_type * string * int
| `Sock_unix of Unix.socket_type * string ]
Extended names for socket addresses. Currently, these naming schemes are supported:
  • `Sock_unix(stype,path): Names the Unix domain socket at path. The socket type stype is an auxiliary piece of information, but not a distinguishing part of the name. path = "" refers to anonymous sockets. Otherwise, the path must be an absolute path name.
  • `Sock_inet(stype,addr,port): Names the Internet socket of type stype bound to the IP address addr and the port. If stype = Unix.SOCK_STREAM, a TCP socket is meant, and if stype = Unix.SOCK_DGRAM, a UDP socket is meant. It is allowed that addr = Unix.inet_addr_any. If port = 0, the name is to be considered as incomplete.
  • `Sock_inet_byname(stype,name,port): Names the Internet socket of type stype bound to the IP address corresponding to the name, and bound to the port. It is unspecified which naming service is used to resolve name to an IP address, and how it is used. If the name cannot be resolved, no socket is meant; this is usually an error. stype is interpreted as for `Sock_inet. If port = 0, the name is to be considered as incomplete.
It is currently not possible to name IP sockets that are bound to several IP addresses but not all IP addresses of the host.
val sockspec_of_sockaddr : Unix.socket_type -> Unix.sockaddr -> sockspec
Converts a normal socket address to the extended form
val sockspec_of_socksymbol : Unix.socket_type -> Netsockaddr.socksymbol -> sockspec
Converts a Netsockaddr.socksymbol to this form

Client sockets


type connect_address = [ `Command of string * (int -> Unixqueue.event_system -> unit)
| `Socket of sockspec * connect_options
| `W32_pipe of Netsys_win32.pipe_mode * string ]
Specifies the service to connect to:

  • `Socket(addr,opts): Connect to the passed socket address
  • `Command(cmd,handler): The cmd is started with the shell, and stdin and stdout are used to transfer data to the process and from the process, respectively. Only SOCK_STREAM type is supported. Note that the passed file descriptors are normal pipes, not sockets (so the descriptors can be individually closed).

    There is not any kind of error detection, so the command should be failsafe. stderr of the command is connected with stderr of the caller process.

    No provisions are taken to wait for the process; this is the task of the caller. After the process has been started, the handler is invoked with the process ID and the event system to give the caller a chance to arrange that the process will be waited for.

  • `W32_pipe(mode,name): A Win32 named pipe

type connect_options = {
   conn_bind :sockspec option; (*Bind the connecting socket to this address (same family as the connected socket required). None: Use an anonymous port.*)
}
val default_connect_options : connect_options
Returns the default options
type connect_status = [ `Command of Unix.file_descr * int
| `Socket of Unix.file_descr * sockspec
| `W32_pipe of Unix.file_descr ]
This type corresponds with Uq_engines.connect_address: An engine connecting with an address `X will return a status of `X.

  • `Socket(fd,addr): fd is the client socket connected with the service. addr is the socket address of the client that must be used by the server to reach the client.
  • `Command(fd, pid): fd is the Unix domain socket connected with the running command. pid is the process ID.
  • `W32_pipe fd: fd is the proxy descriptor of the connected Win32 named pipe endpoint. See Netsys_win32 for how to get the w32_pipe object to access the pipe. The proxy descriptor cannot be used for I/O.

val client_endpoint : connect_status -> Unix.file_descr
Returns the client endpoint contained in the connect_status
val client_socket : connect_status -> Unix.file_descr
For backward compatibility. Deprecated name for client_endpoint
class type client_endpoint_connector = object .. end
This class type provides engines to connect to a service.
class type client_socket_connector = client_endpoint_connector
For backward compatibility.
val connector : ?proxy:#client_socket_connector ->
connect_address ->
Unixqueue.event_system -> connect_status engine
This engine connects to a socket as specified by the connect_address, optionally using the proxy, and changes to the state `Done(status) when the connection is established.

If the proxy does not support the connect_address, the class will raise Addressing_method_not_supported.

The descriptor fd (part of the connect_status) is in non-blocking mode, and the close-on-exec flag is set. It is the task of the caller to close this descriptor.

The engine attaches automatically to the event system, and detaches when it is possible to do so. This depends on the type of the connection method. For direct socket connections, the engine can often detach immediately when the connection is established. For proxy connections it is required that the engine copies data to and from the file descriptor. In this case, the engine detaches when the file descriptor is closed.

It is possible that name service queries block execution.

If name resolution fails, the engine will enter `Error(Uq_resolver.Host_not_found name). This is new since Ocamlnet-3.3 - before this version, the exception was simply Not_found.


Example of using connector: This engine e connects to the "echo" service as provided by inetd, sends a line of data to it, and awaits the response.

      open Uq_engines.Operators  (* for "++" and eps_e *)

      let e =
        Uq_engines.connector
          (`Socket(`Sock_inet_byname(Unix.SOCK_STREAM, "localhost", 7),
                   Uq_engines.default_connect_options))
          esys
        ++ (fun cs ->
              match cs with
                | `Socket(fd,_) ->
                    let mplex =
                      Uq_engines.create_multiplex_controller_for_connected_socket
                        ~supports_half_open_connection:true
                        fd esys in
                    (* unbuffered device for writing, buffered one for reading *)
                    let d_unbuf = `Multiplex mplex in
                    let d = `Buffer_in(Uq_io.create_in_buffer d_unbuf) in
                    Uq_io.output_string_e d_unbuf "This is line1\n"
                    ++ (fun () ->
                          Uq_io.input_line_e d
                          ++ (fun s ->
                                print_endline s;
                                eps_e (`Done()) esys
                             )
                       )
                | _ -> assert false
           )
    


Server sockets


type listen_address = [ `Socket of sockspec * listen_options
| `W32_pipe of Netsys_win32.pipe_mode * string * listen_options ]
Specifies the resource to listen on:

  • `Socket(addr,opts): Listen on the socket with address addr
  • `W32_pipe(mode,name,opts): Listen on the pipe server called name, which accepts pipe connections in mode.

type listen_options = {
   lstn_backlog :int; (*The length of the queue of not yet accepted connections.*)
   lstn_reuseaddr :bool; (*Whether to allow that the address can be immediately reused after the previous listener has its socket shut down. (Only for Internet sockets.)*)
}
val default_listen_options : listen_options
Returns the default options
class type server_endpoint_acceptor = object .. end
This class type is for service providers that listen for connections.
class type server_socket_acceptor = server_endpoint_acceptor
For backward compatibility.
class direct_acceptor : ?close_on_shutdown:bool -> ?preclose:(unit -> unit) -> Unix.file_descr -> Unixqueue.event_system -> server_endpoint_acceptor
An implementation of server_endpoint_acceptor for sockets and Win32 named pipes.
class direct_socket_acceptor : Unix.file_descr -> Unixqueue.event_system -> server_endpoint_acceptor
For backward compatibility.
class type server_endpoint_listener = object .. end
This class type represents factories for service providers
class type server_socket_listener = server_endpoint_listener
For backward compatibility.
val listener : ?proxy:#server_socket_listener ->
listen_address ->
Unixqueue.event_system -> server_socket_acceptor engine
This engine creates a server socket listening on the listen_address. If passed, the proxy is used to create the server socket.

On success, the engine goes to state `Done acc, where acc is the acceptor object (see above). The acceptor object can be used to accept incoming connections.


Datagrams


type datagram_type = [ `Inet6_udp | `Inet_udp | `Unix_dgram ] 
  • `Unix_dgram: Datagrams over Unix domain sockets
  • `Inet_udp: Internet v4 UDP protocol
  • `Inet6_udp: Internet v6 UDP protocol

class type wrapped_datagram_socket = object .. end
A wrapped_datagram_socket allows datagrams to be sent via proxies.
class type datagram_socket_provider = object .. end
This is a factory for wrapped_datagram_socket objects.
val datagram_provider : ?proxy:#datagram_socket_provider ->
datagram_type ->
Unixqueue.event_system ->
wrapped_datagram_socket engine
This engine creates a datagram socket as demanded by the datagram_type, optionally using proxy for sending and receiving datagrams.

The socket is unconnected.

The socket is in non-blocking mode, and the close-on-exec flag is set.


Multiplex Controllers


class type multiplex_controller = object .. end
A multiplex_controller is a quite low-level device to abstract bidirectional socket connections.
exception Mem_not_supported
May be raised by multiplex controller methods start_mem_reading and start_mem_writing if these methods are not supported for the kind of file descriptor
val create_multiplex_controller_for_connected_socket : ?close_inactive_descr:bool ->
?preclose:(unit -> unit) ->
?supports_half_open_connection:bool ->
?timeout:float * exn ->
Unix.file_descr ->
Unixqueue.unix_event_system -> multiplex_controller
Creates a multiplex controller for a bidirectional socket (e.g. a TCP socket). It is essential that the socket is in connected state. This function also supports Win32 named pipes.

Note that the file descriptor is not closed when the attached engines are terminated. One can call inactivate manually to do that.

close_inactive_descr: Whether inactivate closes the descriptor. True by default.

preclose: This function is called just before the descriptor is closed.

supports_half_open_connection: This implementation cannot find out by itself whether the socket supports half-open connections, so the caller has to state it with this boolean. Defaults to false. You can set it to true for TCP connections and for Unix-domain connections with stream semantics.

timeout: If set to (t, x), a general timeout of t is set. When an operation has been started, and there is no I/O activity within t seconds, neither by the started operation nor by another operation, the connection times out. In this case, the operation returns the exception x.

class type datagram_multiplex_controller = object .. end
Additional methods for unconnected datagram handling
val create_multiplex_controller_for_datagram_socket : ?close_inactive_descr:bool ->
?preclose:(unit -> unit) ->
?timeout:float * exn ->
Unix.file_descr ->
Unixqueue.unix_event_system -> datagram_multiplex_controller
Creates a multiplex controller for datagram sockets (e.g. UDP socket).

Note that the file descriptor is not closed when the attached engines are terminated. One can call inactivate manually to do that.

close_inactive_descr: Whether inactivate closes the descriptor. True by default.

preclose: This function is called just before the descriptor is closed.

timeout: If set to (t, x), a general timeout of t is set. When an operation has been started, and there is no I/O activity within t seconds, neither by the started operation nor by another operation, the connection times out. In this case, the operation returns the exception x.

type onshutdown_out_spec = [ `Action of
async_out_channel_engine ->
multiplex_controller -> unit engine_state -> unit
| `Ignore
| `Initiate_shutdown ]
See class output_async_mplex for explanations
type onshutdown_in_spec = [ `Action of
async_in_channel_engine ->
multiplex_controller -> unit engine_state -> unit
| `Ignore
| `Initiate_shutdown ]
See class input_async_mplex for explanations
class output_async_mplex : ?onclose:[ `Ignore | `Write_eof ] -> ?onshutdown:onshutdown_out_spec -> ?buffer_size:int -> multiplex_controller -> async_out_channel_engine
Creates an asynchronous output channel writing to the multiplex controller (see also output_async_descr for the corresponding class writing to a single descriptor).
class input_async_mplex : ?onshutdown:onshutdown_in_spec -> ?buffer_size:int -> multiplex_controller -> async_in_channel_engine
Creates an asynchronous input channel reading from the multiplex controller.

Recursion



When programming with engines, it is normal to use recursion for any kind of loop. For example, to read the lines from a file:

      open Uq_engines.Operators  (* for ">>" and "++" *)

      let fd = 
        Unix.openfile filename [Unix.O_RDONLY] 0 in
      let d = 
        `Buffer_in(Uq_io.create_in_buffer(`Polldescr(`Read_write,fd,esys))) in

      let rec read_lines acc =
        (* the parentheses are needed: "++" binds tighter than ">>" *)
        (Uq_io.input_line_e d >>
           (function                       (* catch exception End_of_file *)
             | `Done line -> `Done(Some line)
             | `Error End_of_file -> `Done None
             | `Error error -> `Error error
             | `Aborted -> `Aborted
           )
        ) ++
          (function
            | Some line ->
                read_lines (line :: acc)
            | None ->
                eps_e (`Done (List.rev acc)) esys
          ) in

      let e = read_lines []
    

There is generally the question whether this style leads to stack overflows. This depends on the mechanisms that come into play:

  • The engine mechanism passing control from one engine to the next is not tail-recursive, and thus the stack can overflow when the recursion becomes too deep
  • The event queue mechanism, however, does not have this problem. Control falls automatically back to the event queue whenever I/O needs to be done.
In this example, this means that only the engine mechanism is used as long as the data is read from the buffer. When the buffer needs to be refilled, however, control is passed back to the event queue (so the stack is cleaned), and the continuation of the execution is only managed via closures (which only allocate memory on the heap, not on the stack). Usually, this is a good compromise: The engine mechanism is a lot faster, but I/O is an indicator for using the better but slower technique.

Also note another difference: The event queue mechanism allows that other asynchronous code attached to the same event queue may run (control may be yielded to unrelated execution contexts). The pure engine mechanism does not allow that. This may be handy when exclusive access to variables is needed. (But be careful here - this is very sensitive to minimal changes of the implementation.)

Certain engines enforce using the event queue mechanism although they are unrelated to I/O. Especially Uq_engines.delay_engine is useful here: A "delay" of 0 seconds is already sufficient to go back to the event queue. If recursions sometimes lead to stack overflows, the solution is to include such a zero delay before doing the self call.
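A minimal sketch of the zero-delay pattern, assuming the findlib package equeue is installed. The functions sum_e and run_sum are hypothetical and only show where the delay goes: every self call is wrapped in delay_engine 0.0, so it is made from the event queue rather than from the caller's stack frame. The sketch makes no claim about how deep such a chain can safely get.

```ocaml
open Uq_engines.Operators   (* for eps_e *)

(* Hypothetical example: sums n + (n-1) + ... + 1 with one engine per step. *)
let rec sum_e esys acc n =
  if n = 0 then
    eps_e (`Done acc) esys
  else
    (* Zero delay: the self call below is made from the event queue,
       after the current call chain has returned. *)
    Uq_engines.delay_engine 0.0
      (fun () -> sum_e esys (acc + n) (n - 1))
      esys

(* Illustrative driver: runs the recursion to completion. *)
let run_sum n =
  let esys = Unixqueue.create_unix_event_system () in
  let result = ref None in
  Uq_engines.when_state ~is_done:(fun s -> result := Some s) (sum_e esys 0 n);
  Unixqueue.run esys;
  !result
```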

More Engines



Pointers to other modules related to engines:

  • RPC clients: The function Rpc_proxy.ManagedClient.rpc_engine allows calling an RPC via an engine. When the call is done, the engine transitions to `Done r, and r is the result of the remote call.
  • Subprograms: The class Shell_uq.call_engine allows starting an external program, and monitoring it via an engine.


Debugging


module Debug: sig .. end
This web site is published by Informatikbüro Gerd Stolpmann