module Uq_engines: sig .. end
Engines are attached to a Unixqueue.event_system, and do their task by
generating events for resources of the operating system, and
by handling such events. Engines are in one of four states: They
may be still working, they may be done, they may be
aborted, or they may be in an error state. The three latter
states are called final states, because they indicate that the
engine has stopped operation.
It is possible to ask an engine to notify another object when it changes its state. For simplicity, notification is done by invoking a callback function, and not by issuing notification events.
Effectively, engines provide a calculus for cooperative microthreading.
This calculus includes combinators for sequential execution and
synchronization. Moreover, it is easy to connect it with callback-style
microthreading - one can arrange callbacks when an engine is done, and
one can catch callbacks and turn them into engines.
exception Closed_channel
This exception should be regarded as equivalent to Netchannels.Closed_channel, but need not be the same exception.
exception Broken_communication
This exception is not raised, but used as argument of the `Error state.
exception Watchdog_timeout
This exception is not raised, but used as argument of the `Error state.
exception Timeout
Used by input_engine and output_engine to indicate timeouts.
exception Addressing_method_not_supported
Raised by client_socket_connector and server_socket_acceptor to indicate that the passed address is not supported by the class.
exception Cancelled
The callback function of a multiplex_controller is invoked with this exception if the operation is cancelled.
type 't engine_state = [ `Aborted | `Done of 't | `Error of exn | `Working of int ]
The type of states with result values of type 't:
`Working n: The engine is working. The number n counts the number of events that have been processed.
`Done arg: The engine has completed its task without errors. The argument arg is the result value of the engine.
`Error exn: The engine has aborted because of an error. The argument exn describes the error as an exception.
`Aborted: The engine has aborted because the abort method was called.
type 't final_state = [ `Aborted | `Done of 't | `Error of exn ]
Same as engine_state without `Working. These are only the final states.
val string_of_state : 'a engine_state -> string
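The relationship between engine_state and final_state can be illustrated with a small stand-alone model. This is only a sketch: the two types are redeclared locally so that the code runs without Ocamlnet, and the helper names is_final, to_final and describe are hypothetical (describe is merely similar in spirit to string_of_state, whose exact output format is not specified here).

```ocaml
(* Local copies of the state types, so this runs without Ocamlnet. *)
type 't engine_state =
  [ `Working of int | `Done of 't | `Error of exn | `Aborted ]

type 't final_state =
  [ `Done of 't | `Error of exn | `Aborted ]

(* A state is final when it is anything but `Working. *)
let is_final (st : 't engine_state) : bool =
  match st with
  | `Working _ -> false
  | `Done _ | `Error _ | `Aborted -> true

(* Narrow an engine_state to a final_state; None while still working. *)
let to_final (st : 't engine_state) : 't final_state option =
  match st with
  | `Working _ -> None
  | `Done x -> Some (`Done x)
  | `Error e -> Some (`Error e)
  | `Aborted -> Some `Aborted

(* Hypothetical printer; the format is an assumption of this sketch. *)
let describe (st : 'a engine_state) : string =
  match st with
  | `Working n -> Printf.sprintf "Working(%d)" n
  | `Done _ -> "Done"
  | `Error e -> "Error(" ^ Printexc.to_string e ^ ")"
  | `Aborted -> "Aborted"
```

A caller would typically test is_final before extracting a result, since only the three final states indicate that the engine has stopped operation.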
class type ['t] engine = object .. end
class ['t] delegate_engine : 't #engine -> ['t] engine
val when_state : ?is_done:('a -> unit) ->
?is_error:(exn -> unit) ->
?is_aborted:(unit -> unit) ->
?is_progressing:(int -> unit) -> 'a #engine -> unit
when_state only observes future state changes.
If one of the functions raises an exception, this exception is propagated to the caller of Unixqueue.run.
is_done: The state transitions to `Done. The argument of is_done is the argument of the `Done state.
is_error: The state transitions to `Error. The argument of is_error is the argument of the `Error state.
is_aborted: The state transitions to `Aborted.
is_progressing: This function is called when the `Working state changes. The int argument is the new `Working arg.
class ['a] signal_engine : Unixqueue.event_system -> object .. end
let se = new signal_engine esys: The engine se remains in `Working 0 until the method se # signal x is called.
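The behaviour described for signal_engine, together with the callback-based notification mentioned in the introduction, can be mimicked by a toy cell. This is a sketch under stated assumptions, not the Ocamlnet class: the names cell, create, on_change and signal are hypothetical, and ignoring a second signal is a choice made for this model only.

```ocaml
type 'a final = [ `Done of 'a | `Error of exn | `Aborted ]
type 'a state = [ `Working of int | `Done of 'a | `Error of exn | `Aborted ]

(* Toy stand-in for an engine: a state plus registered callbacks. *)
type 'a cell = {
  mutable st : 'a state;
  mutable callbacks : ('a final -> unit) list;
}

(* A fresh cell starts in `Working 0, like the signal engine. *)
let create () : 'a cell = { st = `Working 0; callbacks = [] }

(* Register a callback to be invoked when the final state is reached. *)
let on_change (c : 'a cell) (f : 'a final -> unit) : unit =
  c.callbacks <- f :: c.callbacks

(* Move to the given final state and notify; assumption of this model:
   signals arriving after a final state has been reached are ignored. *)
let signal (c : 'a cell) (final : 'a final) : unit =
  match c.st with
  | `Working _ ->
      c.st <- (final :> 'a state);
      List.iter (fun f -> f final) (List.rev c.callbacks)
  | `Done _ | `Error _ | `Aborted -> ()
```

The cell stays in `Working 0 until signal is called, and the callbacks play the role that when_state plays for real engines.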
val signal_engine : Unixqueue.event_system ->
'a engine * ('a final_state -> unit)
let (se, signal) = signal_engine esys: Same as the class above, but as a function.
class ['a, 'b] map_engine : map_done:('a -> 'b engine_state) -> ?map_error:(exn -> 'b engine_state) -> ?map_aborted:(unit -> 'b engine_state) -> ?propagate_working:bool -> 'a #engine -> ['b] engine
map_engine observes the argument engine, and when the state changes to `Done, `Error, or `Aborted, the corresponding mapping function is called, and the resulting state becomes the state of the mapped engine.
val map_engine : map_done:('a -> 'b engine_state) ->
?map_error:(exn -> 'b engine_state) ->
?map_aborted:(unit -> 'b engine_state) ->
?propagate_working:bool -> 'a #engine -> 'b engine
class ['a, 'b] fmap_engine : 'a #engine -> ('a final_state -> 'b final_state) -> ['b] engine
Similar to map_engine, but with different calling conventions: The mapping function is called when the argument engine reaches a final state, and this state can be mapped to another final state.
val fmap_engine : 'a #engine ->
('a final_state -> 'b final_state) ->
'b engine
After opening Uq_engines.Operators, this is also available as operator >>, e.g.
e >>
(function
| `Done r -> ...
| `Error error -> ...
| `Aborted -> ...
)
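Mapping functions of type 'a final_state -> 'b final_state are ordinary pure functions, so they can be written and tested without an event system. The following sketch redeclares final_state locally; the helpers map_done and recover are hypothetical names introduced for illustration and are not part of Uq_engines.

```ocaml
type 'a final_state = [ `Done of 'a | `Error of exn | `Aborted ]

(* Apply f to a successful result; pass errors and aborts through. *)
let map_done (f : 'a -> 'b) (st : 'a final_state) : 'b final_state =
  match st with
  | `Done x -> `Done (f x)
  | `Error e -> `Error e
  | `Aborted -> `Aborted

(* Replace any error by a default result; leave other states alone. *)
let recover (default : 'a) (st : 'a final_state) : 'a final_state =
  match st with
  | `Error _ -> `Done default
  | `Done x -> `Done x
  | `Aborted -> `Aborted
```

Given the signature of fmap_engine, such a function would presumably be used as e >> map_done String.length for an engine e producing strings.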
class ['a] meta_engine : 'a #engine -> ['a final_state] engine
Maps the final state s to `Done s.
val meta_engine : 'a #engine -> 'a final_state engine
class ['t] epsilon_engine : 't engine_state -> Unixqueue.event_system -> ['t] engine
This engine transitions from its initial state `Working 0 in one step ("epsilon time") to the passed constant state.
val epsilon_engine : 't engine_state -> Unixqueue.event_system -> 't engine
class ['a, 'b] seq_engine : 'a #engine -> ('a -> 'b #engine) -> ['b] engine
val seq_engine : 'a #engine ->
('a -> 'b #engine) -> 'b engine
After opening Uq_engines.Operators, this is also available as operator ++, e.g.
e1 ++ (fun r1 -> e2)
(when e1 and e2 are engines, and r1 is the result of e1).
class ['a, 'b] qseq_engine : 'a #engine -> ('a -> 'b #engine) -> ['b] engine
val qseq_engine : 'a #engine ->
('a -> 'b #engine) -> 'b engine
Like seq_engine, but this version does not propagate working state (i.e. no progress reporting).
qseq_engine should be preferred for recursive chains of engines.
class ['a] stream_seq_engine : 'a -> ('a -> 'a #engine) Stream.t -> Unixqueue.event_system -> ['a] engine
let se = new stream_seq_engine x0 s esys: The constructed engine se fetches functions f : 'a -> 'a #engine from the stream s, and runs the engines obtained by calling these functions e = f x one after the other.
val stream_seq_engine : 'a ->
('a -> 'a #engine) Stream.t ->
Unixqueue.event_system -> 'a engine
class ['a, 'b] sync_engine : 'a #engine -> 'b #engine -> [('a * 'b)] engine
Runs two engines in parallel, and waits until both are `Done (synchronization).
val sync_engine : 'a #engine -> 'b #engine -> ('a * 'b) engine
class ['a, 'b] msync_engine : 'a #engine list -> ('a -> 'b -> 'b) -> 'b -> Unixqueue.event_system -> ['b] engine
let me = new msync_engine el f x0 esys - Runs the engines in el in parallel, and waits until all are `Done.
val msync_engine : 'a #engine list ->
('a -> 'b -> 'b) -> 'b -> Unixqueue.event_system -> 'b engine
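The description above does not say how f and x0 are used. The signature ('a -> 'b -> 'b) -> 'b suggests that the results of the engines are folded into an accumulator starting at x0; the sketch below models only that folding step, synchronously, with each engine represented by its final result. The name fold_results and the left-to-right folding order are assumptions of this sketch.

```ocaml
(* Synchronous stand-in: every "engine" is represented by the value it
   would deliver in its `Done state. *)
let fold_results (f : 'a -> 'b -> 'b) (x0 : 'b) (results : 'a list) : 'b =
  (* f takes the engine result first and the accumulator second,
     matching the argument order in the msync_engine signature. *)
  List.fold_left (fun acc r -> f r acc) x0 results
```

For example, summing the results of three engines corresponds to fold_results (+) 0 [1; 2; 3] in this model.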
class ['a] delay_engine : float -> (unit -> 'a #engine) -> Unixqueue.event_system -> ['a] engine
let de = delay_engine d f esys: The engine e = f() is created after d seconds, and the result of e becomes the result of de.
val delay_engine : float ->
(unit -> 'a #engine) ->
Unixqueue.event_system -> 'a engine
class ['a] timeout_engine : float -> exn -> 'a engine -> ['a] engine
timeout_engine d x e: If the engine e finishes within d seconds, the result remains unchanged.
val timeout_engine : float -> exn -> 'a engine -> 'a engine
class watchdog : float -> 'a #engine -> [unit] engine
On timeout, the watchdog state changes to `Error Watchdog_timeout.
val watchdog : float -> 'a #engine -> unit engine
class type ['a] serializer_t = object .. end
class ['a] serializer : Unixqueue.event_system -> ['a] serializer_t
val serializer : Unixqueue.event_system -> 'a serializer_t
class type ['a] prioritizer_t = object .. end
p can be executed.
class ['a] prioritizer : Unixqueue.event_system -> ['a] prioritizer_t
val prioritizer : Unixqueue.event_system -> 'a prioritizer_t
class type ['a] cache_t = object .. end
class ['a] cache : (Unixqueue.event_system -> 'a engine) -> Unixqueue.event_system -> ['a] cache_t
new cache f esys: A cache that runs f esys to obtain values
val cache : (Unixqueue.event_system -> 'a engine) ->
Unixqueue.event_system -> 'a cache_t
class ['t] engine_mixin : 't engine_state -> Unixqueue.event_system -> object .. end
Implements state and request_notification.
module Operators: sig .. end
Provides ++, >>, and eps_e.
class poll_engine : ?extra_match:(exn -> bool) -> (Unixqueue.operation * float) list -> Unixqueue.event_system -> object .. end
class ['a] input_engine : (Unix.file_descr -> 'a) -> Unix.file_descr -> float -> Unixqueue.event_system -> ['a] engine
let e = new input_engine f fd tmo esys - Waits until the file descriptor becomes readable, and then calls let x = f fd to read from the descriptor.
class ['a] output_engine : (Unix.file_descr -> 'a) -> Unix.file_descr -> float -> Unixqueue.event_system -> ['a] engine
let e = new output_engine f fd tmo esys - Waits until the file descriptor becomes writable, and then calls let x = f fd to write to the descriptor.
class poll_process_engine : ?period:float -> pid:int -> Unixqueue.event_system -> [Unix.process_status] engine
Use Shell_uq instead.
The module Uq_io provides a number of functions to read and write data via various "devices". All these functions return engines, and are easy to use. Devices can be file descriptors, but also other data structures. In particular, there is also support for buffered I/O and for reading line-by-line from an input device.
Uq_io.copy_e is a better choice.
The pure types async_in_channel and async_out_channel have proven useful for bridging with Netchannels.
class type async_out_channel = object .. end
class type async_in_channel = object .. end
class pseudo_async_out_channel : #Netchannels.raw_out_channel -> async_out_channel
Wraps a Netchannels.raw_out_channel as an asynchronous channel.
class pseudo_async_in_channel : #Netchannels.raw_in_channel -> async_in_channel
Wraps a Netchannels.raw_in_channel as an asynchronous channel.
class receiver : src:Unix.file_descr -> dst:#async_out_channel -> ?close_src:bool -> ?close_dst:bool -> Unixqueue.event_system -> [unit] engine
Copies data from the src file descriptor to the dst output channel.
class sender : src:#async_in_channel -> dst:Unix.file_descr -> ?close_src:bool -> ?close_dst:bool -> Unixqueue.event_system -> [unit] engine
Copies data from the src input channel to the dst file descriptor.
class type async_out_channel_engine = object .. end
class type async_in_channel_engine = object .. end
class output_async_descr : dst:Unix.file_descr -> ?buffer_size:int -> ?close_dst:bool -> Unixqueue.event_system -> async_out_channel_engine
Implements an async_out_channel for the output descriptor dst.
class input_async_descr : src:Unix.file_descr -> ?buffer_size:int -> ?close_src:bool -> Unixqueue.event_system -> async_in_channel_engine
type copy_task =
[ `Bidirectional of Unix.file_descr * Unix.file_descr
| `Tridirectional of Unix.file_descr * Unix.file_descr * Unix.file_descr
| `Uni_socket of Unix.file_descr * Unix.file_descr
| `Unidirectional of Unix.file_descr * Unix.file_descr ]
Specifies the task the copier class has to do:
`Unidirectional(src,dst): Data from src are copied to dst. EOF of src causes both descriptors to be closed.
`Uni_socket(src,dst): Data from src are copied to dst. EOF of src causes dst to be shut down for sending; all descriptors remain open. It is required that dst is a socket.
`Bidirectional(bi1,bi2): Data from bi1 are copied to bi2, and data from bi2 are copied to bi1. EOF of one descriptor causes the other descriptor to be shut down for sending. When both descriptors are at EOF, both are closed. It is required that bi1 and bi2 are sockets.
`Tridirectional(bi,dst,src): Data from bi are copied to dst, and data from src are copied to bi (i.e. a bidirectional descriptor is split up into two unidirectional descriptors). EOF of bi causes dst to be closed. EOF of src causes bi to be shut down for sending. EOF in both directions causes all descriptors to be closed. It is required that bi is a socket.
class copier : copy_task -> Unixqueue.event_system -> [unit] engine
Copies data between file descriptors as specified by the copy_task argument.
type inetspec =
[ `Sock_inet of Unix.socket_type * Unix.inet_addr * int
| `Sock_inet_byname of Unix.socket_type * string * int ]
type sockspec =
[ `Sock_inet of Unix.socket_type * Unix.inet_addr * int
| `Sock_inet_byname of Unix.socket_type * string * int
| `Sock_unix of Unix.socket_type * string ]
`Sock_unix(stype,path): Names the Unix domain socket at path. The socket type stype is an auxiliary piece of information, but not a distinguishing part of the name. path = "" refers to anonymous sockets. Otherwise, the path must be an absolute path name.
`Sock_inet(stype,addr,port): Names the Internet socket of type stype bound to the IP address addr and the port. If stype = Unix.SOCK_STREAM, a TCP socket is meant, and if stype = Unix.SOCK_DGRAM, a UDP socket is meant. It is allowed that addr = Unix.inet_addr_any. If port = 0, the name is to be considered as incomplete.
`Sock_inet_byname(stype,name,port): Names the Internet socket of type stype bound to the IP address corresponding to the name, and bound to the port. It is unspecified which naming service is used to resolve name to an IP address, and how it is used. If the name cannot be resolved, no socket is meant; this is usually an error. stype is interpreted as for `Sock_inet. If port = 0, the name is to be considered as incomplete.
val sockspec_of_sockaddr : Unix.socket_type -> Unix.sockaddr -> sockspec
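The two rules stated for sockspec values (port = 0 marks an incomplete name; a Unix domain path is either empty or absolute) can be expressed as small checks. This is a sketch only: is_incomplete and valid_unix_path are hypothetical helpers, the socket type and address components are left polymorphic so that the code needs neither Ocamlnet nor the Unix library, and treating `Sock_unix as never incomplete is an assumption.

```ocaml
(* Works on any value shaped like a sockspec. *)
let is_incomplete = function
  | `Sock_inet (_, _, port) | `Sock_inet_byname (_, _, port) -> port = 0
  | `Sock_unix (_, _) -> false  (* assumption: no port, never incomplete *)

(* "" refers to anonymous sockets; otherwise the path must be absolute. *)
let valid_unix_path (path : string) : bool =
  path = "" || not (Filename.is_relative path)
```

With the real types, the first component would be a Unix.socket_type such as Unix.SOCK_STREAM rather than the placeholder values used when trying the sketch out.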
val sockspec_of_socksymbol : Unix.socket_type -> Netsockaddr.socksymbol -> sockspec
Converts a Netsockaddr.socksymbol to this form
type connect_address =
[ `Command of string * (int -> Unixqueue.event_system -> unit)
| `Socket of sockspec * connect_options
| `W32_pipe of Netsys_win32.pipe_mode * string ]
`Socket(addr,opts): Connect to the passed socket address.
`Command(cmd,handler): The cmd is started with the shell, and stdin and stdout are used to transfer data to the process and from the process, respectively. Only SOCK_STREAM type is supported. Note that the passed file descriptors are normal pipes, not sockets (so the descriptors can be individually closed).
There is not any kind of error detection, so the command should be failsafe. stderr of the command is connected with stderr of the caller process.
No provisions are taken to wait for the process; this is the task of the caller. After the process has been started, the handler is invoked with the process ID and the event system to give the caller a chance to arrange that the process will be waited for.
`W32_pipe(mode,name): A Win32 named pipe.
type connect_options = {
  conn_bind : (* Bind the connecting socket to this address (same family as the connected socket required). None : Use an anonymous port. *)
}
val default_connect_options : connect_options
type connect_status =
[ `Command of Unix.file_descr * int
| `Socket of Unix.file_descr * sockspec
| `W32_pipe of Unix.file_descr ]
This type corresponds with Uq_engines.connect_address: An engine connecting with an address `X will return a status of `X.
`Socket(fd,addr): fd is the client socket connected with the service. addr is the socket address of the client that must be used by the server to reach the client.
`Command(fd, pid): fd is the Unix domain socket connected with the running command. pid is the process ID.
`W32_pipe fd: fd is the proxy descriptor of the connected Win32 named pipe endpoint. See Netsys_win32 for how to get the w32_pipe object to access the pipe. The proxy descriptor cannot be used for I/O.
val client_endpoint : connect_status -> Unix.file_descr
Returns the descriptor contained in the connect_status.
val client_socket : connect_status -> Unix.file_descr
Alias for client_endpoint.
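Since the proxy descriptor of a Win32 named pipe cannot be used for I/O, code that dispatches on a connect_status may want to separate the usable descriptors from the proxy case. The following sketch shows one way to do that; io_descriptor is a hypothetical helper, and the descriptor type is left polymorphic so that the example runs without the Unix library.

```ocaml
(* Returns the descriptor only when it can be used for I/O directly. *)
let io_descriptor = function
  | `Socket (fd, _addr) -> Some fd
  | `Command (fd, _pid) -> Some fd
  | `W32_pipe _proxy -> None  (* proxy descriptor: not usable for I/O *)
```

For the `W32_pipe case a caller would instead obtain the w32_pipe object via Netsys_win32, as noted in the description of connect_status.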
class type client_endpoint_connector = object .. end
class type client_socket_connector = client_endpoint_connector
val connector : ?proxy:#client_socket_connector ->
connect_address ->
Unixqueue.event_system -> connect_status engine
Connects as specified by the connect_address, optionally using the proxy, and changes to the state `Done(status) when the connection is established.
If the proxy does not support the connect_address, the class will raise Addressing_method_not_supported.
The descriptor fd (part of the connect_status) is in non-blocking mode, and the close-on-exec flag is set. It is the task of the caller to close this descriptor.
The engine attaches automatically to the event system, and detaches when it is possible to do so. This depends on the type of the connection method. For direct socket connections, the engine can often detach immediately when the connection is established. For proxy connections it is required that the engine copies data to and from the file descriptor. In this case, the engine detaches when the file descriptor is closed.
It is possible that name service queries block execution.
If name resolution fails, the engine will enter `Error(Uq_resolver.Host_not_found name). This is new since Ocamlnet-3.3 - before this version, the exception was simply Not_found.
Example of using connector: This engine e connects to the "echo" service as provided by inetd, sends a line of data to it, and awaits the response.
let e =
Uq_engines.connector
(`Socket(`Sock_inet_byname(Unix.SOCK_STREAM, "localhost", 7),
Uq_engines.default_connect_options))
esys
++ (fun cs ->
match cs with
| `Socket(fd,_) ->
let mplex =
Uq_engines.create_multiplex_controller_for_connected_socket
~supports_half_open_connection:true
fd esys in
let d_unbuf = `Multiplex mplex in
let d = `Buffer_in(Uq_io.create_in_buffer d_unbuf) in
Uq_io.output_string_e d_unbuf "This is line1\n"
++ (fun () ->
Uq_io.input_line_e d
++ (fun s ->
print_endline s;
eps_e (`Done()) esys
)
)
| _ -> assert false
)
type listen_address =
[ `Socket of sockspec * listen_options
| `W32_pipe of Netsys_win32.pipe_mode * string * listen_options ]
`Socket(addr,opts): Listens on a socket with address addr.
`W32_pipe(mode,name,opts): Listens on a pipe server named name, accepting pipe connections in mode.
type listen_options = {
  lstn_backlog : (* The length of the queue of not yet accepted connections. *)
  lstn_reuseaddr : (* Whether to allow that the address can be immediately reused after the previous listener has its socket shut down. (Only for Internet sockets.) *)
}
val default_listen_options : listen_options
class type server_endpoint_acceptor = object .. end
class type server_socket_acceptor = server_endpoint_acceptor
class direct_acceptor : ?close_on_shutdown:bool -> ?preclose:(unit -> unit) -> Unix.file_descr -> Unixqueue.event_system -> server_endpoint_acceptor
Implements server_endpoint_acceptor for sockets and Win32 named pipes.
class direct_socket_acceptor : Unix.file_descr -> Unixqueue.event_system -> server_endpoint_acceptor
class type server_endpoint_listener = object .. end
class type server_socket_listener = server_endpoint_listener
val listener : ?proxy:#server_socket_listener ->
listen_address ->
Unixqueue.event_system -> server_socket_acceptor engine
Creates a server socket listening on the listen_address. If passed, the proxy is used to create the server socket.
On success, the engine goes to state `Done acc, where acc is the acceptor object (see above). The acceptor object can be used to accept incoming connections.
type datagram_type =
[ `Inet6_udp | `Inet_udp | `Unix_dgram ]
`Unix_dgram: Datagrams over Unix domain sockets
`Inet_udp: Internet v4 UDP protocol
`Inet6_udp: Internet v6 UDP protocol
class type wrapped_datagram_socket = object .. end
A wrapped_datagram_socket allows datagrams to be sent via proxies.
class type datagram_socket_provider = object .. end
Provides wrapped_datagram_socket objects.
val datagram_provider : ?proxy:#datagram_socket_provider ->
datagram_type ->
Unixqueue.event_system ->
wrapped_datagram_socket engine
Creates a datagram socket as specified by the datagram_type, optionally using proxy for sending and receiving datagrams.
The socket is unconnected.
The socket is in non-blocking mode, and the close-on-exec flag is set.
class type multiplex_controller = object .. end
A multiplex_controller is a quite low-level device to abstract bidirectional socket connections.
exception Mem_not_supported
May be raised by start_mem_reading and start_mem_writing if these methods are not supported for the kind of file descriptor.
val create_multiplex_controller_for_connected_socket : ?close_inactive_descr:bool ->
?preclose:(unit -> unit) ->
?supports_half_open_connection:bool ->
?timeout:float * exn ->
Unix.file_descr ->
Unixqueue.unix_event_system -> multiplex_controller
Note that the file descriptor is not closed when the attached engines are terminated. One can call inactivate manually to do that.
close_inactive_descr: Whether inactivate closes the descriptor. True by default.
preclose: This function is called just before the descriptor is closed.
supports_half_open_connection: This implementation cannot find out by itself whether the socket supports half-open connections, so the caller sets this boolean explicitly. Defaults to false. You can set it to true for TCP connections and for Unix-domain connections with stream semantics.
timeout: If set to (t, x), a general timeout of t is set. When an operation has been started, and there is no I/O activity within t seconds, neither by the started operation nor by another operation, the connection times out. In this case, the operation returns the exception x.
class type datagram_multiplex_controller = object .. end
val create_multiplex_controller_for_datagram_socket : ?close_inactive_descr:bool ->
?preclose:(unit -> unit) ->
?timeout:float * exn ->
Unix.file_descr ->
Unixqueue.unix_event_system -> datagram_multiplex_controller
Note that the file descriptor is not closed when the attached engines are terminated. One can call inactivate manually to do that.
close_inactive_descr: Whether inactivate closes the descriptor. True by default.
preclose: This function is called just before the descriptor is closed.
timeout: If set to (t, x), a general timeout of t is set. When an operation has been started, and there is no I/O activity within t seconds, neither by the started operation nor by another operation, the connection times out. In this case, the operation returns the exception x.
type onshutdown_out_spec =
[ `Action of
async_out_channel_engine ->
multiplex_controller -> unit engine_state -> unit
| `Ignore
| `Initiate_shutdown ]
See output_async_mplex for explanations.
type onshutdown_in_spec =
[ `Action of
async_in_channel_engine ->
multiplex_controller -> unit engine_state -> unit
| `Ignore
| `Initiate_shutdown ]
See input_async_mplex for explanations.
class output_async_mplex : ?onclose:[ `Ignore | `Write_eof ] -> ?onshutdown:onshutdown_out_spec -> ?buffer_size:int -> multiplex_controller -> async_out_channel_engine
(See also output_async_descr for the corresponding class writing to a single descriptor.)
class input_async_mplex : ?onshutdown:onshutdown_in_spec -> ?buffer_size:int -> multiplex_controller -> async_in_channel_engine
open Uq_engines.Operators (* for ">>" and "++" *)
let fd =
Unix.openfile filename [Unix.O_RDONLY] 0 in
let d =
`Buffer_in(Uq_io.create_in_buffer(`Polldescr(`Read_write,fd,esys))) in
let rec read_lines acc =
Uq_io.input_line_e d >>
(function (* catch exception End_of_file *)
| `Done line -> `Done(Some line)
| `Error End_of_file -> `Done None
| `Error error -> `Error error
| `Aborted -> `Aborted
) ++
(function
| Some line ->
read_lines (line :: acc)
| None ->
eps_e (`Done (List.rev acc)) esys
) in
let e = read_lines []
There is generally the question whether this style leads to stack overflows. This depends on the mechanisms that come into play:
Also note another difference: The event queue mechanism allows other asynchronous code attached to the same event queue to run (control may be yielded to unrelated execution contexts). The pure engine mechanism does not allow that. This may be handy when exclusive access to variables is needed. (But be careful here - this is very sensitive to minimal changes of the implementation.)
Certain engines enforce using the event queue mechanisms although they are unrelated to I/O. Especially Uq_engines.delay_engine is useful here: A "delay" of 0 seconds is already sufficient to go back to the event queue. If recursions sometimes lead to stack overflows, the solution is to include such a zero delay before doing the self call.
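Why going back to the event queue bounds the stack depth can be seen in a toy model. The sketch below does not use Ocamlnet: the queue, defer, run and count_to names are hypothetical stand-ins, where defer plays the role of a zero-second delay and run plays the role of running the event system.

```ocaml
(* Toy event queue: a FIFO of pending thunks, drained by a flat loop. *)
let queue : (unit -> unit) Queue.t = Queue.create ()

(* Stand-in for a zero-second delay: hand f over to the queue. *)
let defer (f : unit -> unit) : unit = Queue.add f queue

(* Stand-in for running the event system until no work is left. *)
let run () : unit =
  while not (Queue.is_empty queue) do
    (Queue.take queue) ()
  done

(* Count up to limit. Every self call goes through the queue, so each
   step returns before the next one starts and the stack stays flat. *)
let count_to (limit : int) (on_done : int -> unit) : unit =
  let rec step n =
    if n >= limit then on_done n
    else defer (fun () -> step (n + 1))
  in
  step 0
```

In terms of the signatures shown in this module, the analogous change to a recursive engine chain would presumably be to wrap the self call as Uq_engines.delay_engine 0.0 (fun () -> ...) esys.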
Rpc_proxy.ManagedClient.rpc_engine allows calling an RPC via an engine. When the call is done, the engine transitions to `Done r, and r is the result of the remote call.
Shell_uq.call_engine allows starting an external program, and monitoring it via an engine.
module Debug: sig .. end