This document tries to explain briefly what you can do with the
Equeue library. It is more superficial than Equeue_intro, and
gives some recipes for how to do things.
We talk here about a form of concurrent programming, sometimes called lightweight threading or cooperative threading. As with all concurrency mechanisms, the ultimate goal is to do several things in parallel. This type, however, focuses especially on I/O, because the points where the execution threads can be switched are usually the points where a thread needs to stop and wait for an I/O resource.
There is no preemption: While normal OCaml code is executed, there is no possibility of switching the thread. Only when control is returned to the Equeue library can a different thread be selected for execution.
There is also no parallelism: All execution happens in the context of the process or kernel thread running the code. It is only possible to use one CPU core.
Note that LWT is another popular (and younger) implementation of the same idea for OCaml. It is possible to use LWT together with Ocamlnet, but there are restrictions. We'll explain this later in this article.
You will often see the type
Unixqueue.event_system
in signatures of functions. An event system bundles all resources that are needed to run cooperative threads, like watched file descriptors, handlers for file events, and timers. It is the common anchor point for all activities that will happen concurrently:
- If you attach several activities to the same event_system,
it is possible to run them concurrently.
- It is possible to have several event_system objects in your program.
However, once you attach cooperative threads to different event
systems, they cannot run together anymore.
- The event_system object is usually passed on from one function call
to the next. There is no global event system. (NB. If you develop for
Netplex, there is a pseudo-global event system for every Netplex container.
But this just means that you can define your own global event system if
you need it for your application.)
How to create an event system:
let esys = Unixqueue.standard_event_system()
An older name for the same function is Unixqueue.create_unix_event_system.
There is also a second implementation which uses accelerated poll interfaces if provided by the operating system (e.g. epoll on Linux):
let esys = Unixqueue.performance_event_system()
This, however, is only an advantage if you have hundreds of file descriptors to observe.
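Since both constructors yield the same kind of object, a program can pick one depending on the expected load. A minimal sketch (the variable expected_fd_count is made up for illustration):
let esys =
  if expected_fd_count > 100 then
    Unixqueue.performance_event_system()
  else
    Unixqueue.standard_event_system()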
How to attach actions to event systems:
The abstraction of event systems defines an API allowing one to interact
with them. This is available in the Unixqueue
module. Normally, however,
you don't use this module directly, because it is very low-level.
So, let's look directly at high-level interfaces. For example, the
Http_client
uses event systems internally, and one can also control
this aspect of it. When creating an Http_client.pipeline
object,
just set the event system to the one you want to use. This attaches
the whole HTTP protocol interpreter implemented by this object to the
event system:
let pipeline = new Http_client.pipeline
let () = pipeline # set_event_system esys
Note that you can attach other pipelines, and even unrelated I/O actions, to the same event system. This just means, as mentioned above, that these actions are done concurrently.
The HTTP pipeline is initially empty, i.e. it does nothing. Before something can happen, you need to program it, i.e. add tasks to do. For example:
pipeline # add_with_callback
(new Http_client.get "http://caml.inria.fr/")
(fun get -> ...)
The add_with_callback method adds the HTTP task to the internal
queue. Additionally, there is a callback function which gets invoked when the
task is done.
If you enter the shown OCaml code into a toploop, you will notice that no I/O occurs so far. Adding something to the internal task queue does not yet trigger its execution. This is meant as a feature of all Equeue-based concurrency: You have the chance to set the machinery up before it starts running.
This example showed how to deal with HTTP clients. What about other network protocols? The scheme is always the same: The event system object needs to be passed down to the protocol interpreter, either at creation time, or immediately afterwards.
How to run event systems
The remaining question is now how to start the execution after everything is set up. This is normally done with
Unixqueue.run esys
This single statement starts whatever action was previously configured, and only returns when the action is completely finished. In our example this means it covers the whole HTTP GET protocol.
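Putting the pieces together, a complete program fits into a few lines. The following is only a sketch (it assumes the netclient package is linked in; response_status_code is used as an example accessor of the finished call):
let () =
  let esys = Unixqueue.standard_event_system() in
  let pipeline = new Http_client.pipeline in
  pipeline # set_event_system esys;
  pipeline # add_with_callback
    (new Http_client.get "http://caml.inria.fr/")
    (fun call ->
       (* invoked when the GET request is done *)
       Printf.printf "status: %d\n" call#response_status_code);
  (* nothing has happened yet - now start the event loop: *)
  Unixqueue.run esys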
It is allowed to modify the scene while something is already happening. For example, you could download a second HTTP file when the first is done:
pipeline # add_with_callback
(new Http_client.get "http://caml.inria.fr/")
(fun get1 ->
pipeline # add_with_callback
(new Http_client.get "http://www.camlcity.org/")
(fun get2 -> ...)
)
These "in-place" modifications of what to do are not only allowed at points like the shown where a part of the action is already complete, but at any point in time. For example, you can define a timer that starts the other action, no matter at which point of execution the running action currently is:
pipeline # add_with_callback
(new Http_client.get "http://caml.inria.fr/")
(fun get1 -> ...);
let g = Unixqueue.new_group esys in
Unixqueue.once esys g time
(fun () ->
pipeline # add_with_callback
(new Http_client.get "http://www.camlcity.org/")
(fun get2 -> ...)
)
After time seconds the second download is started. (NB. What is the
purpose of g? A Unixqueue group can be used for cancelling all actions
associated with the group - in this case, for cancelling the timer.)
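For example, to cancel the timer before it fires, clear the group (a sketch; Unixqueue.clear releases all resources attached to a group):
Unixqueue.clear esys g   (* the timer set up with Unixqueue.once is gone *)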
The HTTP client provides an API style where the completion of an action is indicated to the user via a callback. This style is easy to use for beginners, but it has a drawback: There is no uniform way to compose elementary actions into more complex actions. Such composition is possible, as shown in the example, but it is always an ad-hoc solution.
Recursion is your friend
Let's have a look at such an ad-hoc composition: Assume we have a list of URLs that we want to download with high concurrency.
Idea 1: We just add all URLs to the same pipeline, as in
let count = ref 0
List.iter
(fun url ->
pipeline # add_with_callback
(new Http_client.get url)
(fun get ->
decr count;
if !count = 0 then ... (* do some followup action here *)
);
incr count
)
list
and then run the esys. This works, but the "intelligence" of the
HTTP pipeline object is limited. If there are several files to
download from the same server, the pipeline manages to use
only a limited number of connections, and to serialize the
requests over these connections. There is, however, no built-in
mechanism that would limit the number of servers contacted at
once. If you had one million different servers in this list, the
pipeline would try to download from all servers concurrently. Of
course, this will fail (for lack of system resources).
Idea 2: We only add a limited number of URLs at a time.
let count = ref 0
let count_max = 10
let list = ref list
let rec maybe_next() =
if !count < count_max then (
match !list with
| [] -> ()
| url :: list' ->
pipeline # add_with_callback
(new Http_client.get url)
(fun get ->
decr count;
maybe_next();
if !count = 0 then ... (* followup action *)
);
incr count;
list := list';
maybe_next()
)
We use recursion here to encode the repetitive algorithm. This is the mechanism of choice, because we need to continue the loop from the callback function (an imperative loop construct could not do so).
Note that recursive calls from callback functions do not fill up the stack, so you could do this endlessly without risking a stack overflow.
Trap: do not mix synchronous and event-based APIs
Many protocol interpreters provide both styles of APIs: Conventional synchronous APIs, and event-based APIs. The question is whether one can mix them.
This is not possible, because the synchronous API is normally derived from the event-based API by setting up a one-time action in the event system and then running the event system. If you mixed the APIs, an attempt would be made to run an event system that is already running. This is forbidden, however, and causes an exception to be thrown.
So: Use the same instance of the protocol interpreter either in a synchronous way, or in an event-based way, but do not do both.
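To make the trap concrete, here is a sketch of what not to do. The run method of the pipeline is the synchronous entry point, and it starts the event system internally, so calling it from a callback (i.e. while the event system is already running) fails:
pipeline # add_with_callback
  (new Http_client.get "http://caml.inria.fr/")
  (fun _ ->
     pipeline # add (new Http_client.get "http://www.camlcity.org/");
     pipeline # run()   (* WRONG: the event system is already running *)
  );
Unixqueue.run esys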
As we have seen, callbacks are a common way to notify the caller about state changes. However, callbacks are too primitive to allow systematic composition of actions. The abstraction of engines has been developed to fill this gap. As a first approximation, imagine an engine as a wrapped callback interface: a machinery which is executing something concurrently until the result is available, with the possibility of notifying users of the result.
Continuing the HTTP example, there is also an engine-based version of adding a request to the pipeline:
let e = pipeline # add_e (new Http_client.get url)
This is the same as add_with_callback, only the delivery mechanism
is different.
It is possible to attach a callback to an engine:
Uq_engines.when_state
~is_done:(fun get -> ...)
~is_error:(fun ex -> ...)
e
The first function is called when the HTTP request could be processed
successfully, and the second one when a fatal error occurs (with ex
being the exception). Using Uq_engines.when_state
, every engine-style
interface can be turned into a callback-style interface. Of course, this
is not the primary idea of this abstraction, but this possibility means
that we can go to the lower level of callbacks whenever needed.
The three most important composition operators are available in
Uq_engines.Operators
. It is suggested that you open
this module,
and use the operators directly:
e1 ++ (fun r1 -> e2)
Here, the engine e1
is executed first, and when it has computed the result
r1
, the engine e2
is started. The result of ++
is again an engine,
so it is possible to concatenate several expressions:
e1 ++ (fun r1 -> e2) ++ (fun r2 -> e3) ++ ...
One can also set the parentheses differently if the previous results
are needed later:
e1 ++ (fun r1 -> e2 ++ (fun r2 -> e3 ++ ... ))
The ++ operator is also available as a normal function:
Uq_engines.seq_engine
e1 >> (fun st1 -> st2)
Here, one can map the final state of e1
to a different state. The state
of the engine is either the computed value, the resulting exception,
or a tag indicating that the engine was aborted:
e1 >>
(function
| `Done v -> ...
| `Error ex -> ...
| `Aborted -> ...
)
As you also have access to exceptions, this construction can be
used to catch exceptions, and to transform them into normal values:
e1 >>
(function
| `Done s -> `Done(Some s)
| `Error Not_found -> `Done None
| `Error ex -> `Error ex
| `Aborted -> `Aborted
)
To turn a value v
into an engine, use
eps_e (`Done v) esys
or more generally
eps_e st esys
to encapsulate any state. eps_e makes an engine out of a value
by pretending that the value is computed in a single step (the
epsilon step). For example:
let e =
pipeline # add_e
(new Http_client.get "http://caml.inria.fr/")
++ (fun get1 ->
pipeline # add_e
(new Http_client.get "http://www.camlcity.org/")
++ (fun get2 ->
...;
eps_e (`Done()) pipeline#event_system
)
)
Note that the final result here is just (), and it is transformed with
eps_e
into an engine-compatible shape.
Getting results out of an engine-based algorithm
As engines use event systems internally, the constructed complex
engine (like e
in the previous example) is not immediately started,
but only when the event system runs (unless the event system is
already running). So in the end you still need
Unixqueue.run esys
to fire up the prepared engines, and to wait for the result.
You may wonder how to access the result. In the previous example, the
result was just ()
, so there is no interest in knowing it. But you
could also just return what you have got, as in
let e =
pipeline # add_e
(new Http_client.get "http://caml.inria.fr/")
++ (fun get1 ->
pipeline # add_e
(new Http_client.get "http://www.camlcity.org/")
++ (fun get2 ->
eps_e (`Done(get1, get2)) pipeline#event_system
)
)
and the question is how to get the pair (get1,get2)
with the
downloaded files after e
is finished. This is indeed very simple -
after Unixqueue.run
returns, you can check for result values:
let st = e # state
Here, st
can again have the following values:
- `Done x if the engine has a final value x (here our pair)
- `Error e if the engine execution ended in an exception e
- `Aborted if the engine was artificially stopped
- `Working n if the engine is not yet done, and n is an integer
indicating the number of computation steps

The `Working state is only visible if you query the state directly with
the state method, but not in the >> operator.
Forking and joining concurrent threads built with engines
By concatenating elementary engines with ++
you basically create an
execution thread. As we are talking here about concurrent programming,
the question is how to fork a new thread off of an existing one, and
how to join again with the created thread once it is finished.
Forking is very simple: Just have several expressions, e.g.
let e1 = <expression using ++>
let e2 = <expression using ++>
Now e1
and e2
run concurrently.
For joining, use the function Uq_engines.sync_engine
:
let e_joined =
Uq_engines.sync_engine e1 e2
The engine e_joined
is only finished when both e1
and e2
are
finished. The result value of e_joined
is the pair of the results of
e1
and e2
, e.g.
e_joined ++ (fun (r1,r2) -> ...)
There is also a version of sync_engine
which can join any number of
engines: Uq_engines.msync_engine
. We use it in the engine-based version
of the download example:
let count_max = 10
let download_e list =
let list = ref list in
let rec download_thread_e() =
match !list with
| [] -> eps_e (`Done ()) esys
| url :: list' ->
list := list';
pipeline # add_e (new Http_client.get url)
++ (fun get ->
download_thread_e()
) in
let rec fork_e k =
if k < count_max then
let e = download_thread_e() in
e :: fork_e (k+1)
else
[] in
let e_list = fork_e 0 in
Uq_engines.msync_engine
e_list
(fun _ () -> ())
()
esys
The function download_thread_e
downloads documents sequentially from the
HTTP servers. The URLs are fetched from the variable list
. In order to
get concurrency, count_max
of these threads are started by the
fork_e
recursion. The result is e_list
, a list of all concurrently
running engines. Finally, msync_engine
is used to wait until all of these
threads are finished. msync_engine
works like a fold operator, and
aggregates the result values via the argument function. Here, the threads
only return ()
as results, so aggregation is pointless.
In Uq_engines:
This module contains the basic definition of engines, Uq_engines.engine,
plus a number of combinators for engines:
- Uq_engines.seq_engine, also backing the ++ operator
- Uq_engines.map_engine and Uq_engines.fmap_engine. The latter is backing the >> operator.
- Uq_engines.signal_engine. The engine stops and waits until a signal function is called.
- Uq_engines.meta_engine. Errors are lifted into the normal value space.
- Uq_engines.stream_seq_engine. Folding over a stream of values (Stream module), and evaluation of the fold function as engine.
- Uq_engines.sync_engine and Uq_engines.msync_engine join concurrent engines (see above).
- Uq_engines.delay_engine suspends the execution for n seconds.
- Uq_engines.timeout_engine gives an engine a maximum time for computations; if the time is exceeded, the engine is aborted.
- Uq_engines.watchdog monitors whether an engine makes progress, and if it is idle for too long, it is aborted.
- Uq_engines.serializer forces that an engine function is serialized - the next engine can only start when the previous one is finished. Uq_engines.prioritizer is an advanced version where the waiting engines can be prioritized.
- Uq_engines.cache obtains a lazily computed value by running an engine.

For I/O, we find:
- Uq_engines.poll_engine waits until a file descriptor is ready for an I/O operation.
- Uq_engines.input_engine and Uq_engines.output_engine can be used to define I/O engines by wrapping read and write.
- Uq_engines.copier copies data between file descriptors without looking at the data.

For sockets, we find:
- Uq_engines.connector creates a new socket and connects it to a server.
- Uq_engines.listener creates a server socket and allows it to process the accepted connections with engines.
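As a quick taste of these combinators, here is a sketch that chains a delay with a follow-up action (assuming delay_engine takes the duration in seconds plus the event system):
let e =
  Uq_engines.delay_engine 1.0 esys
  ++ (fun () ->
        print_endline "one second elapsed";
        eps_e (`Done()) esys)
let () = Unixqueue.run esys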
In Uq_io:
This module contains functions for buffered I/O:
- Uq_io.input_e reads data from a device.
- Uq_io.really_input_e reads an exact amount of data.
- Uq_io.input_line_e and Uq_io.input_lines_e read the next line (or several lines at once).
- Uq_io.output_e writes data to a device.
- Uq_io.really_output_e writes an exact amount of data. There are also variants for writing strings and buffers.
- Uq_io.write_eof_e signals the end of the data stream.
- Uq_io.copy_e copies data from an input device to an output device.

A device is normally a file descriptor (wrapped into a
Uq_engines.multiplex_controller), or an asynchronous channel.
The definition of devices is extensible.
Other basics:
- Uq_resolver: name resolution
- Uq_socks5: connections through SOCKS version 5 proxies
- Uq_mt: using an event system from several kernel threads

A number of protocol interpreters are implemented on top of engines, among them:
- Rpc_client
- Rpc_server
- Http_client
- Nethttpd_engine
- Ftp_client
- Telnet_client
We have, top-to-bottom:
- Engines, the high-level abstraction (Uq_engines and friends)
- Event systems (Unixqueue), built on generic event queues (Equeue).
- Pollsets (Netsys_pollset) are event loops, i.e. they wait for
file descriptor conditions.
- The system primitives for watching file descriptors (select, poll, epoll etc.)
Aborting engines: Unlike other types of threads, cooperative threads
can be aborted at any time. Use the abort
method:
e # abort()
The engine e
will then enter the `Aborted
state as soon as possible.
Often this will happen immediately, but there are also engines where this
takes some time.
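For example, to forcibly stop an engine e that is still running after 60 seconds, abort can be combined with the timer mechanism shown earlier (a sketch):
let g = Unixqueue.new_group esys in
Unixqueue.once esys g 60.0
  (fun () ->
     match e # state with
     | `Working _ -> e # abort()   (* still running: stop it *)
     | _ -> ()                     (* already finished: nothing to do *)
  )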
Exceptions: There are several ways to signal exceptions. First, the orderly way:
eps_e (`Error x) esys
This creates an engine that represents an exception as its final state. Because
exceptions cannot always be handled that cleanly, the basic combinators like
++ always catch exceptions, and represent these exceptions in their final
state. For example:
e1 ++ (fun r1 -> raise x)
The ++
operator catches the exception, and the state transitions to
`Error x
(just as the eps_e
example would do).
Note that this behavior relates to engines only. If you program event systems directly, there will be no automatic exception handling. For example
Unixqueue.once esys g 1.0 (fun () -> raise x)
does not catch x
in any way. The effect is that the exception falls
through to the caller, which is always Unixqueue.run
.
How to jump out of event processing: The just-mentioned way of raising an exception can be used to leave event processing. Just define
exception Esys_exit
and throw it like
Unixqueue.once esys g 0.0 (fun () -> raise Esys_exit)
and catch it like
try
Unixqueue.run esys
with
| Esys_exit -> ...
This always works, even if the call of Unixqueue.once
is inside an
engine expression.
Users who are familiar with LWT will certainly recognize many operations - although they often have another name. (You may wonder why both implementations exist - well, it is a longer story, but essentially Ocamlnet has older roots, and at a certain stage of its development it only knew event systems but not yet engines. The LWT developers saw this, found it insufficient, and developed LWT. Unfortunately, they did not base their new library on Ocamlnet, but chose to reimplement the event loop core. In the meantime, the Ocamlnet developers fixed the deficiencies in their library. Now we have two good libraries for the same range of problems.)
Compare:
- eps_e vs. Lwt.return and Lwt.fail
- Uq_engines.signal_engine vs. Lwt.wait
- Uq_engines.seq_engine vs. Lwt.bind
- Uq_engines.sync_engine vs. Lwt.join, Lwt.choose, and Lwt.pick
- Unixqueue.run vs. Lwt_main.run
There are, however, also differences:
- LWT starts threads immediately, i.e. they may already run before
Lwt_main.run. In contrast, Equeue prefers to first start at
Unixqueue.run time.
- In LWT, an already computed value can directly be represented as a
terminated thread (Lwt.return). In Equeue we don't do this - eps_e
creates an engine which will terminate as soon as possible and which
results in a constant value. Effectively, this means that eps_e is
seen as a point where threads can be switched, whereas this is not the
case for Lwt.return. This gives more opportunities for switching, but
there are also more subtle consequences, like who is the caller of
suspended functions. In Equeue it is always the core of the event
loop, whereas this is not uniform in LWT.
- LWT has only a single global event loop, whereas in Equeue you can
create as many event_system objects as needed. This is mainly a
restriction for programs using kernel threads: In Equeue, every thread
can have its own event_system, but doing the same in LWT is impossible
(incompatibility with kernel threads on this level).

How to use LWT and Equeue together: There is the module Uq_lwt helping here.
The idea is to replace the event loop built into LWT by the Ocamlnet
event loop. This is done for a given esys
by
class lwt_engine esys =
object
inherit Lwt_engine.abstract
inherit Uq_lwt.lwt_backend esys
end
Lwt_engine.set (new lwt_engine esys)
Additionally, it is required that you now always call Lwt_main.run
for
starting event-based programs, and not Unixqueue.run esys
. The latter
would not work for technical reasons. Of course, this means that the
main program will be driven by LWT.
How to call engines from LWT threads: Assume you are in some
LWT code, and want to call an engine-based function
f : 'a -> 'b engine
. This can be achieved by simply calling
the function, and then using an LWT primitive to wait on the LWT side until the
engine is done:
let call_thread f x =
let e = f x in
let waiter, condition = Lwt.wait() in
Uq_engines.when_state
~is_done:(fun r -> Lwt.wakeup condition r)
~is_error:(fun x -> Lwt.wakeup_exn condition x)
e;
waiter
How to call LWT threads from engines: The reverse for a function
f : 'a -> 'b Lwt.t
:
let call_e f x =
let thr = f x in
let waiter, signal = Uq_engines.signal_engine esys in
Lwt.on_success thr (fun r -> signal (`Done r));
Lwt.on_failure thr (fun x -> signal (`Error x));
waiter
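The resulting engine composes like any other. For instance, to wait for an LWT sleep inside an engine chain (Lwt_unix.sleep is the standard LWT timer; esys is the ambient event system as before):
let e =
  call_e Lwt_unix.sleep 1.0
  ++ (fun () -> eps_e (`Done "slept one second") esys)
Remember that this only makes progress when the combined event loop, as set up above with Uq_lwt, is actually running.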