This document tries to explain briefly what you can do with the
Equeue library. It is more superficial than Equeue_intro, and
gives some recipes for how to do things.
We talk here about a form of concurrent programming, sometimes called lightweight threading or cooperative threading. As for all concurrency mechanisms, the ultimate goal is to do several things at once. This type, however, focuses especially on I/O, because the points where the execution threads can be switched are usually the points where a thread needs to stop and wait for an I/O resource.
There is no preemption: While normal OCaml code is executed, there is no possibility to switch the thread. Only when control is returned to the Equeue library can a different thread be selected for execution.
There is also no parallelism: All execution happens in the context of the process or kernel thread running the code. It is only possible to use one CPU core.
Note that LWT is another popular (and younger) implementation of the same idea for OCaml. It is possible to use LWT together with Ocamlnet, but there are restrictions. We'll explain this later in this article.
You will often see the type Unixqueue.event_system in signatures of functions. An event system bundles all resources that are needed to run cooperative threads, like watched file descriptors, handlers for file events, and timers. It is the common anchor point for all activities that will happen concurrently:
If cooperative threads are attached to the same event_system, it is possible to run them concurrently.
You can have as many event_system objects in your program as you like. However, once you attach cooperative threads to different event systems, they cannot run together anymore.
The event_system object is usually passed on from one function call
to the next. There is no global event system. (NB. If you develop for
Netplex, there is a pseudo-global event system for every Netplex container.
But this just means that you can define your own global event system if
you need it for your application.)
How to create an event system:
let esys = Unixqueue.standard_event_system()
An older name for the same function is Unixqueue.create_unix_event_system.
There is also a second implementation which uses accelerated poll interfaces if provided by the operating system (e.g. epoll on Linux):
let esys = Unixqueue.performance_event_system()
This, however, is only an advantage if you have hundreds of file descriptors to observe.
How to attach actions to event systems:
The abstraction of event systems defines an API for interacting
with it. This is available in the Unixqueue module. Normally, however,
you don't use this module directly, because it is very low-level.
So, let's look directly at high-level interfaces. For example,
Nethttp_client uses event systems internally, and one can also control
this aspect of it. When creating a Nethttp_client.pipeline object,
just set the event system to the one you want to use. This attaches
the whole HTTP protocol interpreter implemented by this object to the
event system:
let pipeline = new Nethttp_client.pipeline
let () = pipeline # set_event_system esys
Note that you can attach other pipelines and even unrelated, other I/O actions to the same event system. This just means, as mentioned above, that these actions are done concurrently.
The HTTP pipeline is initially empty, i.e. it does nothing. Before something can happen, you need to program it, i.e. add tasks to do. For example:
pipeline # add_with_callback
  (new Nethttp_client.get "http://caml.inria.fr/")
  (fun get -> ...)
The add_with_callback method adds the HTTP task to run to the internal
queue. Also, there is a callback function which gets invoked when the
task is done.
If you enter the OCaml code shown above into a toploop, you will notice that no I/O occurs so far. Adding something to the internal task queue does not yet trigger its execution. This is meant as a feature of all Equeue-based concurrency: You have the chance to set the machinery up before it starts running.
This example showed how to deal with HTTP clients. What about other network protocols? The scheme is always the same: The event system object needs to be passed down to the protocol interpreter, either directly at creation time, or directly after that.
How to run event systems
The remaining question is now how to start the execution after everything is set up. This is normally done with
Unixqueue.run esys
This single statement starts whatever action was previously configured, and returns only when the action is completely finished. In our example this means it covers the whole HTTP GET protocol.
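The split between setting up and running can be illustrated with a toy model. The following is only a sketch using the OCaml standard library; the names toy_esys, add and run are hypothetical and are not the Unixqueue API. It shows that registering an action does nothing by itself, that running processes everything that was registered, and that actions may register further actions while the loop is running.

```ocaml
(* Toy model of an event system: a FIFO queue of pending actions.
   This is NOT how Unixqueue is implemented, only an illustration. *)
type toy_esys = { pending : (unit -> unit) Queue.t }

let create () = { pending = Queue.create () }

(* Registering an action only stores it; nothing is executed yet. *)
let add esys action = Queue.add action esys.pending

(* Run actions until the queue is empty. Actions may add further
   actions while the loop is running. *)
let run esys =
  while not (Queue.is_empty esys.pending) do
    let action = Queue.pop esys.pending in
    action ()
  done

(* A log of what has been executed, most recent entry first *)
let log : string list ref = ref []
let note s = log := s :: !log

let esys = create ()

let () =
  add esys (fun () ->
    note "first";
    (* modify the scene while the loop is already running *)
    add esys (fun () -> note "third"));
  add esys (fun () -> note "second")

(* Nothing has been executed so far: the log is still empty *)
let before_run = List.rev !log

let () = run esys

(* Now all three actions have been executed *)
let after_run = List.rev !log
```

In this model, before_run is the empty list, while after_run contains all three entries, with the action added during the run executed last.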
It is allowed to modify the scene while something is already happening. For example, you could download a second HTTP file when the first is done:
pipeline # add_with_callback
  (new Nethttp_client.get "http://caml.inria.fr/")
  (fun get1 ->
    pipeline # add_with_callback
      (new Nethttp_client.get "http://www.camlcity.org/")
      (fun get2 -> ...)
  )
These "in-place" modifications of what to do are not only allowed at points like the one shown, where a part of the action is already complete, but at any point in time. For example, you can define a timer that starts the other action, no matter at which point of execution the running action currently is:
pipeline # add_with_callback
  (new Nethttp_client.get "http://caml.inria.fr/")
  (fun get1 -> ...);

let g = Unixqueue.new_group esys in
Unixqueue.once esys g time
  (fun () ->
    pipeline # add_with_callback
      (new Nethttp_client.get "http://www.camlcity.org/")
      (fun get2 -> ...)
  )
After time seconds the second download is started. (NB. What is the
group g? A Unixqueue group can be used for cancelling all actions
associated with the group. In this case for cancelling the timer.)
The HTTP client provides an API style where the completion of an action is indicated to the user via a callback. This style is easy to use for beginners, but it has a drawback: There is no uniform way to compose more elementary actions into more complex ones. Such composition is possible as shown in the example, but it is always an ad-hoc solution.
Recursion is your friend
Let's have a look at such an ad-hoc composition: Assume we have a list of URLs, and we want to download them with high concurrency.
Idea 1: We just add all URLs to the same pipeline, as in
let count = ref 0

List.iter
  (fun url ->
    pipeline # add_with_callback
      (new Nethttp_client.get url)
      (fun get ->
        decr count;
        if !count = 0 then ... (* do some followup action here *)
      );
    incr count
  )
  list
and then run the
esys. This works, but the "intelligence" of the
HTTP pipeline object is only limited. If there are several files to
download from the same server, the pipeline is able to manage to use
only a limited number of connections to do this, and to serialize the
requests over these connections. There is, however, no built-in
mechanism that would limit the number of servers to contact at
once. If you had one million different servers in this list, the
pipeline would try to download from all servers concurrently. Of
course, this will fail (lacking system resources).
Idea 2: We only add a limited number of URLs at a time.
let count = ref 0
let count_max = 10
let list = ref list

let rec maybe_next() =
  if !count < count_max then (
    match !list with
    | [] -> ()
    | url :: list' ->
        pipeline # add_with_callback
          (new Nethttp_client.get url)
          (fun get ->
            decr count;
            maybe_next();
            if !count = 0 then ... (* followup action *)
          );
        incr count;
        list := list';
        maybe_next()
  )
Here we use recursion to encode the repetitive algorithm. This is the mechanism of choice, because we need to continue the loop from the callback function (an imperative construct could not do so).
Note that recursive calls from callback functions do not fill up the stack, so you could do this endlessly without risking a stack overflow.
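To see the limiting effect of this recursion in isolation, here is a minimal sketch that replaces the HTTP pipeline by a toy queue from the standard library. The functions add_with_callback and run below are hypothetical stand-ins (they perform no I/O and are not the Nethttp_client API), and the limit is lowered to 3 so that the effect is visible with seven made-up URLs.

```ocaml
(* Toy stand-in for the pipeline: a "request" is queued, and its
   callback is invoked later by [run]. All names are hypothetical. *)
let queue : (unit -> unit) Queue.t = Queue.create ()
let in_flight = ref 0                 (* requests added but not completed *)
let peak = ref 0                      (* highest value of in_flight seen *)
let finished : string list ref = ref []   (* completed URLs, latest first *)

let add_with_callback url callback =
  incr in_flight;
  peak := max !peak !in_flight;
  Queue.add
    (fun () ->
       decr in_flight;
       finished := url :: !finished;
       callback url)
    queue

let run () =
  while not (Queue.is_empty queue) do
    (Queue.pop queue) ()
  done

(* Same shape as maybe_next in the text, with a limit of 3 *)
let count = ref 0
let count_max = 3
let todo = ref ["a"; "b"; "c"; "d"; "e"; "f"; "g"]
let all_done = ref false

let rec maybe_next () =
  if !count < count_max then
    match !todo with
    | [] -> ()
    | url :: rest ->
        add_with_callback url
          (fun _ ->
             decr count;
             maybe_next ();
             (* followup action once nothing is pending anymore *)
             if !count = 0 then all_done := true);
        incr count;
        todo := rest;
        maybe_next ()

let () =
  maybe_next ();   (* set up the first count_max requests *)
  run ()           (* callbacks refill the queue as requests complete *)
```

In this model at most count_max requests are pending at any time, every URL is eventually processed, and the followup action runs exactly when the last callback has been invoked.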
Trap: do not mix synchronous and event-based APIs
Many protocol interpreters provide both styles of APIs: Conventional synchronous APIs, and event-based APIs. The question is whether one can mix them.
This is not possible, because the synchronous API is normally derived from the event-based API by setting up a one-time action in the event system and then running the event system. If you mixed the APIs, an event system that is already running would be run again. This is forbidden, and causes an exception to be raised.
So: Use the same instance of the protocol interpreter either in a synchronous way, or in an event-based way, but do not do both.
As we have seen, callbacks are a common way to notify the caller about state changes. However, callbacks are too primitive to allow systematic composition of actions. The abstraction of engines has been developed to fill this gap. As a first approximation, imagine an engine as a wrapped callback interface: a machinery which is executing something concurrently until the result is available, with the possibility of notifying users of the result.
Continuing the HTTP example, there is also an engine-based version of adding a request to the pipeline:
let e = pipeline # add_e (new Nethttp_client.get url)
This is the same as
add_with_callback, except that the delivery mechanism is different.
It is possible to attach a callback to an engine:
Uq_engines.when_state
  ~is_done:(fun () -> ...)
  ~is_error:(fun ex -> ...)
  e
The first function is called when the HTTP request could be processed
successfully, and the second one when a fatal error occurs (with
ex being the exception). Using
Uq_engines.when_state, every engine-style
interface can be turned into a callback-style interface. Of course, this
is not the primary idea of this abstraction, but this possibility means
that we can go to the lower level of callbacks whenever needed.
The three most important composition operators are available in
Uq_engines.Operators. It is suggested that you open this module,
and use the operators directly:
With
e1 ++ (fun r1 -> e2)
the engine e1 is executed first, and when it has computed the result r1, the engine e2 is started. The result of ++ is again an engine, so it is possible to concatenate several expressions:
e1 ++ (fun r1 -> e2) ++ (fun r2 -> e3) ++ ...
One can also set the parentheses differently if the previous results are needed later:
e1 ++ (fun r1 -> e2 ++ (fun r2 -> e3 ++ ... ))
The ++ operator is also available as a normal function: Uq_engines.seq_engine
With
e1 >> (fun st1 -> st2)
one can map the final state of e1 to a different state. The state of the engine is either the computed value, the resulting exception, or the tag that the engine was aborted:
e1 >> (function
        | `Done v -> ...
        | `Error ex -> ...
        | `Aborted -> ...
      )
As you also have access to exceptions, this construction can be used to catch exceptions, and to transform them into normal values:
e1 >> (function
        | `Done s -> `Done(Some s)
        | `Error Not_found -> `Done None
        | `Error ex -> `Error ex
        | `Aborted -> `Aborted
      )
To turn a value v into an engine, use
eps_e (`Done v) esys
or more generally
eps_e st esys
to encapsulate any state. The function eps_e makes an engine out of a value by pretending that the value is computed in a single step (the epsilon step).
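The behaviour of the three operators can be mimicked with a small model built on the standard library only. This is a sketch under simplifying assumptions, not the Uq_engines implementation: the toy engine has no `Working state and no abort method, there is a single global queue instead of an event_system argument (so eps_e takes no esys here), and the helpers finish, when_final and lookup are hypothetical.

```ocaml
type 'a final_state = [ `Done of 'a | `Error of exn | `Aborted ]

(* Toy event loop: one global queue of pending steps *)
let pending : (unit -> unit) Queue.t = Queue.create ()
let run () =
  while not (Queue.is_empty pending) do (Queue.pop pending) () done

(* Toy engine: the final state (None while still working) plus the
   callbacks to notify when the final state is reached *)
type 'a engine = {
  mutable state : 'a final_state option;
  mutable listeners : ('a final_state -> unit) list;
}

let finish e st =
  e.state <- Some st;
  List.iter (fun f -> f st) e.listeners;
  e.listeners <- []

let when_final e f =
  match e.state with
  | Some st -> f st
  | None -> e.listeners <- f :: e.listeners

(* eps_e: reach the given state in a single step of the loop *)
let eps_e st =
  let e = { state = None; listeners = [] } in
  Queue.add (fun () -> finish e st) pending;
  e

(* Mapping: transform the final state of e1 *)
let ( >> ) e1 f =
  let e = { state = None; listeners = [] } in
  when_final e1 (fun st -> finish e (try f st with ex -> `Error ex));
  e

(* Sequence: when e1 is `Done r, start the engine f r. Exceptions
   raised by f end up in the final state. *)
let ( ++ ) e1 f =
  let e = { state = None; listeners = [] } in
  when_final e1 (function
    | `Done r ->
        (match f r with
         | e2 -> when_final e2 (finish e)
         | exception ex -> finish e (`Error ex))
    | `Error ex -> finish e (`Error ex)
    | `Aborted -> finish e `Aborted);
  e

(* Three steps in sequence: 20, then 20 + 1, then 21 * 2 *)
let e =
  eps_e (`Done 20)
  ++ (fun a -> eps_e (`Done (a + 1)))
  ++ (fun b -> eps_e (`Done (b * 2)))

(* Hypothetical lookup that fails for the key "missing" *)
let lookup (key : string) : string engine =
  if key = "missing" then raise Not_found else eps_e (`Done key)

(* Catch Not_found with >> and turn it into a normal value *)
let failing =
  eps_e (`Done "missing") ++ lookup
  >> (function
      | `Done s -> `Done (Some s)
      | `Error Not_found -> `Done None
      | `Error ex -> `Error ex
      | `Aborted -> `Aborted)

(* Nothing is computed until the loop runs *)
let () = run ()
```

After the loop has run, e has reached the state `Done 42 in this model, and failing has reached `Done None, because the exception raised inside ++ was recorded as `Error Not_found and then mapped by >>.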
Using this, the above example of downloading two files, one after the other, looks like:
let e =
  pipeline # add_e (new Nethttp_client.get "http://caml.inria.fr/")
  ++ (fun get1 ->
        pipeline # add_e (new Nethttp_client.get "http://www.camlcity.org/")
        ++ (fun get2 ->
              ...;
              eps_e (`Done()) pipeline#event_system
           )
     )
Note that the final result is here just (), and it is transformed with
eps_e into an engine-compatible shape.
Getting results out of an engine-based algorithm
As engines use event systems internally, the constructed complex
engine (i.e. e in the previous example) is not immediately started,
but only when the event system runs (unless the event system is
already running). So you still finally need
Unixqueue.run esys
to fire up the prepared engines, and to wait for the result.
You may wonder how to access the result. In the previous example, the
result was just (), so there is no interest in knowing it. But you
could also just return what you have got, as in
let e =
  pipeline # add_e (new Nethttp_client.get "http://caml.inria.fr/")
  ++ (fun get1 ->
        pipeline # add_e (new Nethttp_client.get "http://www.camlcity.org/")
        ++ (fun get2 ->
              eps_e (`Done(get1, get2)) pipeline#event_system
           )
     )
and the question is how to get the pair (get1,get2) with the
downloaded files after e is finished. This is indeed very simple:
after Unixqueue.run returns, you can check for result values:
let st = e # state
Here, st can again have the values
`Done x if the engine has a final value x (here our pair)
`Error e if the engine execution ended in an exception
`Aborted if the engine was artificially stopped
`Working n if the engine is not yet done, and n is an integer indicating the number of computation steps
The `Working state is only visible if you query the state directly with
state but not in the >> operator.
Forking and joining concurrent threads built with engines
By concatenating elementary engines with
++ you basically create an
execution thread. As we are talking here about concurrent programming,
the question is how to fork a new thread off of an existing one, and
how to join again with the created thread once it is finished.
Forking is very simple: Just have several expressions, e.g.
let e1 = <expression using ++>
let e2 = <expression using ++>
Now e1 and e2 run concurrently.
For joining, use the function Uq_engines.sync_engine:
let e_joined = Uq_engines.sync_engine e1 e2
The engine e_joined is finished only when both e1 and e2 are
finished. The result value of e_joined is the pair of the results of
e1 and e2, e.g.
e_joined ++ (fun (r1,r2) -> ...)
There is also a version of sync_engine which can join any number of
engines: Uq_engines.msync_engine. We use it in the engine-based version
of the download example:
let count_max = 10

let download_e list =
  let list = ref list in

  let rec download_thread_e() =
    match !list with
    | [] -> eps_e (`Done ()) esys
    | url :: list' ->
        list := list';
        pipeline # add_e (new Nethttp_client.get url)
        ++ (fun get -> download_thread_e()) in

  let rec fork_e k =
    if k < count_max then
      let e = download_thread_e() in
      e :: fork_e (k+1)
    else
      [] in

  let e_list = fork_e 0 in

  Uq_engines.msync_engine e_list (fun _ () -> ()) () esys
Here, a single download_thread_e downloads documents sequentially from the
HTTP servers. The URLs are fetched from the variable list. In order to
get concurrency, count_max of these threads are started by the
fork_e recursion. The result is e_list, a list of all concurrently
running engines. Finally, msync_engine is used to wait until all of these
threads are finished. msync_engine works like a fold operator, and
aggregates the result values via the argument function. Here, the threads
only return () as results, so aggregation is pointless.
This module contains the basic definition of engines,
plus a number of combinators for engines:
Uq_engines.seq_engine, also backing the ++ operator
Uq_engines.fmap_engine, which backs the >> operator
Uq_engines.signal_engine. The engine stops and waits until a signal function is called.
Uq_engines.meta_engine. Errors are lifted into the normal value space.
Uq_engines.stream_seq_engine. Folding over a stream of values (Stream module), and evaluation of the fold function as engine.
Uq_engines.delay_engine suspends the execution for n seconds.
Uq_engines.timeout_engine gives an engine a maximum time for computations, and if the time is exceeded the engine is aborted.
Uq_engines.serializer forces an engine function to be serialized: the next engine can only start when the previous one is finished.
Uq_engines.prioritizer is an advanced version where the waiting engines can be prioritized.
Uq_engines.cache obtains a lazily computed value by running an engine.
Also, the module defines basic I/O engines:
Uq_engines.poll_engine waits until a file descriptor is ready for an I/O operation
Uq_engines.output_engine can be used to define I/O engines by wrapping
Uq_client.connect_e creates a new socket and connects it to a server
Uq_server.listener creates a server socket and allows it to process the accepted connections with engines.
This module contains functions for buffered I/O:
Uq_io.input_e reads data from a device
Uq_io.output_e writes data to a device
Uq_io.really_output_e. There are also variants for writing strings and buffers.
A "device" is a file descriptor, a multiplex controller (see
Uq_engines.multiplex_controller), or an asynchronous channel.
The definition of devices is extensible.
Uq_transfer.copier copies data between file descriptors without looking at the data
We have, top-to-bottom:
Pollsets (Netsys_pollset) are event loops, i.e. they wait for file descriptor conditions.
Aborting engines: Unlike other types of threads, cooperative threads
can be aborted at any time. Use the abort method:
e # abort()
The engine e will then enter the
`Aborted state as soon as possible.
Often this will happen immediately, but there are also engines where this
takes some time.
Exceptions: There are several ways to signal exceptions. First, the orderly way:
eps_e (`Error x) esys
This creates an engine that represents an exception as final state. Because
exceptions cannot always be handled that cleanly, the basic combinators like
++ always catch exceptions, and represent these exceptions in their final
state. For example:
e1 ++ (fun r1 -> raise x)
The ++ operator catches the exception, and the state transitions to
`Error x (just as the eps_e example would do).
Note that this behavior relates to engines only. If you program event systems directly, there will be no automatic exception handling. For example
Unixqueue.once esys g 1.0 (fun () -> raise x)
does not catch
x in any way. The effect is that the exception falls
through to the caller, which is always Unixqueue.run.
How to jump out of event processing: The just mentioned way of raising an exception can be used to leave event processing. Just define
exception Esys_exit
and throw it like
Unixqueue.once esys g 0.0 (fun () -> raise Esys_exit)
and catch it like
try
  Unixqueue.run esys
with
  | Esys_exit -> ...
This always works, even if the call of
Unixqueue.once is inside an engine expression.
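The control flow of this exit technique can be sketched without Ocamlnet. In the following toy model (standard library only), the queue pending and the function run are hypothetical stand-ins for the event system and its run function; the point is only that an exception raised by a handler propagates through the loop to whoever started it.

```ocaml
exception Esys_exit

(* Toy event loop; it does not catch exceptions raised by handlers *)
let pending : (unit -> unit) Queue.t = Queue.create ()
let run () =
  while not (Queue.is_empty pending) do (Queue.pop pending) () done

(* Number of ordinary handlers that have been executed *)
let executed = ref 0

let () =
  Queue.add (fun () -> incr executed) pending;
  Queue.add (fun () -> raise Esys_exit) pending;
  Queue.add (fun () -> incr executed) pending   (* not reached *)

(* The exception falls through [run] to its caller *)
let left_early =
  try run (); false
  with Esys_exit -> true
```

In this model the loop is left after the first handler, and the third handler is still queued when the exception is caught.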
Users who are familiar with LWT will certainly recognize many operations, although they often have another name. (You may wonder why both implementations exist. It is a longer story, but essentially Ocamlnet has older roots, and at a certain stage of its development it only knew event systems but not yet engines. The LWT developers saw this, found it insufficient, and developed LWT. Unfortunately, they did not base their new library on Ocamlnet, but chose to reimplement the event loop core. In the meantime, the Ocamlnet development fixed the deficiencies in their library. Now we have two good libraries for the same range of problems.)
You should, however, be aware that there are some differences between the libraries:
Lwt_main.run. In contrast, Equeue prefers to first start at
Lwt.return). In Equeue we don't do this: eps_e creates an engine which will terminate as soon as possible and which results in a constant value. Effectively, this means that eps_e is seen as a point where threads can be switched, whereas this is not the case for Lwt.return. This gives more opportunities for switching, but there are also more subtle consequences, like who is the caller of suspended functions. In Equeue it is always the core of the event loop, whereas this is not uniform in LWT.
event_system objects as needed. This is mainly a restriction for programs using kernel threads: In Equeue, every thread can have its own event_system, but doing the same in LWT is impossible (incompatibility with kernel threads on this level).
The remaining question is how to use both facilities in the same program
(provided it is not multi-threaded, which rules LWT out). There is now
Uq_lwt helping here.
The idea is to replace the event loop built into LWT by the Ocamlnet
event loop. This is done for a given esys by
class lwt_engine esys =
object
  inherit Lwt_engine.abstract
  inherit Uq_lwt.lwt_backend esys
end

Lwt_engine.set (new lwt_engine esys)
Additionally, it is required that you now always call Lwt_main.run for
starting event-based programs, and not
Unixqueue.run esys. The latter
would not work for technical reasons. Of course, this means that the
main program will be LWT.
How to call engines from LWT threads: Assume you are in some
LWT code, and want to call an engine-based function
f : 'a -> 'b engine. This can be achieved by first simply calling
the function, and then using an LWT primitive to LWT-wait until the
engine is done:
let call_thread f x =
  let e = f x in
  let waiter, condition = Lwt.wait() in
  Uq_engines.when_state
    ~is_done:(fun r -> Lwt.wakeup condition r)
    ~is_error:(fun x -> Lwt.wakeup_exn condition x)
    e;
  waiter
How to call LWT threads from engines: The reverse for a function
f : 'a -> 'b Lwt.thread:
let call_e f x =
  let thr = f x in
  (* signal_engine takes the event system; esys is assumed to be in scope *)
  let waiter, signal = Uq_engines.signal_engine esys in
  Lwt.on_success thr (fun r -> signal (`Done r));
  Lwt.on_failure thr (fun x -> signal (`Error x));
  waiter