The Equeue, Unixqueue, and Engines HOWTO

This document explains briefly what you can do with the Equeue library. It is more superficial than Equeue_intro, and gives some recipes for how to do things.

What is this about?

We talk here about a form of concurrent programming, sometimes called lightweight threading or cooperative threading. As with all concurrency mechanisms, the ultimate goal is to do several things in parallel. This type, however, focuses especially on I/O, because the points where execution threads can be switched are usually the points where a thread needs to stop to wait for an I/O resource.

There is no preemption: while normal OCaml code is executed, there is no possibility of switching threads. Only when control is returned to the Equeue library can a different thread be selected for execution.

There is also no parallelism: All execution happens in the context of the process or kernel thread running the code. It is only possible to use one CPU core.

Note that LWT is another popular (and younger) implementation of the same idea for OCaml. It is possible to use LWT together with Ocamlnet, but there are restrictions. We'll explain this later in this article.

What are event systems?

You will often see the type

 Unixqueue.event_system 

in signatures of functions. An event system bundles all resources that are needed to run cooperative threads, like watched file descriptors, handlers for file events, and timers. It is the common anchor point for all activities that will happen concurrently:

  • If you define several cooperative threads for the same event_system, it is possible to run them concurrently.
  • You can have any number of event_system objects in your program. However, once you attach cooperative threads to different event systems, they cannot run together anymore.
  • Having several event systems nevertheless makes sense in a number of scenarios. For example, you could write a library function that performs a number of I/O actions concurrently, but returns normally when all I/O is finished (stopping any concurrent execution). In this case you would use a local event system that exists only for the lifetime of this function; see the sketch below.
  • A more extreme model is to use only one event system for the whole program. This, however, means that the whole program must follow a programming style that is compatible with events.
The event_system object is usually passed on from one function call to the next. There is no global event system. (NB. If you develop for Netplex, there is a pseudo-global event system for every Netplex container. But this just means that you can define your own global event system if you need it for your application.)
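
To make the "local event system" scenario from the list above concrete, here is a minimal sketch. It uses the constructor and the Nethttp_client API introduced in the following sections; download_all is a hypothetical name chosen for this example:

(* Hypothetical helper: downloads all given URLs concurrently over a
   local event system that exists only for the duration of the call. *)
let download_all urls =
  let esys = Unixqueue.standard_event_system() in
  let pipeline = new Nethttp_client.pipeline in
  pipeline # set_event_system esys;
  List.iter (fun url -> pipeline # add (new Nethttp_client.get url)) urls;
  Unixqueue.run esys    (* returns when all downloads are finished *)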

How to create an event system:

let esys = Unixqueue.standard_event_system()

An older name for the same function is Unixqueue.create_unix_event_system.

There is also a second implementation which uses accelerated poll interfaces if provided by the operating system (e.g. epoll on Linux):

let esys = Unixqueue.performance_event_system()

This, however, is only an advantage if you have hundreds of file descriptors to observe.
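
To illustrate the life cycle as a whole, here is a minimal sketch that creates an event system, schedules a one-shot timer on it, and runs it (Unixqueue.new_group, Unixqueue.once, and Unixqueue.run are explained in the following sections):

let esys = Unixqueue.standard_event_system()
let g = Unixqueue.new_group esys
let () =
  Unixqueue.once esys g 1.0 (fun () -> print_endline "tick");
  Unixqueue.run esys   (* prints "tick" after one second, then returns *)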

How to attach actions to event systems:

The event system abstraction defines an API for interacting with it, available in the Unixqueue module. Normally, however, you don't use this module directly, because it is very low-level.

So, let's look directly at high-level interfaces. For example, Nethttp_client uses event systems internally, and one can control this aspect of it. When creating a Nethttp_client.pipeline object, just set the event system to the one you want to use. This attaches the whole HTTP protocol interpreter implemented by this object to the event system:

let pipeline = new Nethttp_client.pipeline
let () = pipeline # set_event_system esys

Note that you can attach other pipelines, and even unrelated I/O actions, to the same event system. As mentioned above, this just means that these actions are done concurrently.

The HTTP pipeline is initially empty, i.e. it does nothing. Before something can happen, you need to program it, i.e. add tasks to do. For example:

pipeline # add_with_callback
  (new Nethttp_client.get "http://caml.inria.fr/")
  (fun get -> ...)

The add_with_callback method adds the HTTP task to the pipeline's internal queue. The callback function is invoked when the task is done.

If you enter the shown OCaml code into a toploop, you will notice that no I/O occurs so far. Adding something to the internal task queue does not yet cause it to be executed. This is a deliberate feature of all Equeue-based concurrency: you have the chance to set up the machinery before it starts running.

This example showed how to deal with HTTP clients. What about other network protocols? The scheme is always the same: the event system object is passed down to the protocol interpreter, either directly at creation time, or immediately afterwards.

How to run event systems

The remaining question is now how to start the execution after everything is set up. This is normally done with

Unixqueue.run esys

This single statement starts whatever action was previously configured, and returns only when the action is completely finished. In our example this means it covers the whole HTTP GET protocol.
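
Putting the pieces together, a complete minimal download program could look like this (a sketch; error handling is omitted, and response_body#value is used to access the downloaded document as a string):

let esys = Unixqueue.standard_event_system()
let pipeline = new Nethttp_client.pipeline
let () = pipeline # set_event_system esys

let () =
  pipeline # add_with_callback
    (new Nethttp_client.get "http://caml.inria.fr/")
    (fun get -> print_endline (get # response_body # value));
  Unixqueue.run esys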

It is allowed to modify the scene while something is already happening. For example, you could download a second HTTP file when the first is done:

pipeline # add_with_callback
  (new Nethttp_client.get "http://caml.inria.fr/")
  (fun get1 -> 
    pipeline # add_with_callback
      (new Nethttp_client.get "http://www.camlcity.org/")
      (fun get2 -> ...)
  )

These "in-place" modifications of what to do are not only allowed at points like the shown where a part of the action is already complete, but at any point in time. For example, you can define a timer that starts the other action, no matter at which point of execution the running action currently is:

pipeline # add_with_callback
  (new Nethttp_client.get "http://caml.inria.fr/")
  (fun get1 -> ...);

let g = Unixqueue.new_group esys
Unixqueue.once esys g time
  (fun () ->
    pipeline # add_with_callback
      (new Nethttp_client.get "http://www.camlcity.org/")
      (fun get2 -> ...)
  )

After time seconds, the second download is started. (NB. What is the purpose of g? A Unixqueue group can be used for cancelling all actions associated with the group; in this case, for cancelling the timer.)
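
For completeness, cancelling everything associated with the group, including the pending timer, could look like this (a sketch; Unixqueue.clear removes all resources belonging to a group):

(* Cancel the timer before it fires: *)
Unixqueue.clear esys g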

The HTTP client provides an API style where the completion of an action is signalled to the user via a callback. This style is easy for beginners to use, but it has a drawback: there is no uniform way to compose elementary actions into more complex actions. Such composition is possible, as shown in the example, but it is always an ad-hoc solution.

Recursion is your friend

Let's have a look at such an ad-hoc composition: assume we have a list of URLs that we want to download with high concurrency.

Idea 1: We just add all URLs to the same pipeline, as in

let count = ref 0
List.iter
  (fun url ->
    pipeline # add_with_callback
      (new Nethttp_client.get url)
      (fun get ->
        decr count;
        if !count = 0 then ... (* do some followup action here *)
      );
    incr count
  )
  list

and then run the esys. This works, but the "intelligence" of the HTTP pipeline object is limited. If there are several files to download from the same server, the pipeline manages to use only a limited number of connections, and serializes the requests over these connections. There is, however, no built-in mechanism that limits the number of servers contacted at once. If you had one million different servers in this list, the pipeline would try to download from all of them concurrently. Of course, this would fail for lack of system resources.

Idea 2: We only add a limited number of URLs at a time.

let count = ref 0
let count_max = 10
let list = ref list

let rec maybe_next() =
  if !count < count_max then (
    match !list with
    | [] -> ()
    | url :: list' ->
        pipeline # add_with_callback
          (new Nethttp_client.get url)
          (fun get ->
             decr count;
             maybe_next();
             if !count = 0 then ...  (* followup action *)
          );
        incr count;
        list := list';
        maybe_next()
  )

We use recursion here to encode the repetitive algorithm. This is the mechanism of choice, because we need to continue the loop from the callback function (an imperative loop construct could not do so).

Note that recursive calls from callback functions do not fill up the stack, so you could do this endlessly without risking a stack overflow.
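
Nothing happens until the loop is entered once and the event system is started, roughly like this:

let () =
  maybe_next();         (* enqueue the first batch of downloads *)
  Unixqueue.run esys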

Trap: do not mix synchronous and event-based APIs

Many protocol interpreters provide both styles of APIs: Conventional synchronous APIs, and event-based APIs. The question is whether one can mix them.

This is not possible, because the synchronous API is normally derived from the event-based API by setting up a one-time action in the event system and then running the event system. If you mixed the APIs, an already running event system would be run a second time. This is forbidden, and causes an exception to be raised.

So: Use the same instance of the protocol interpreter either in a synchronous way, or in an event-based way, but do not do both.
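
As an illustration (a sketch of what not to do): the pipeline also has a synchronous run method, and calling it from inside a callback would mean running the already running event system a second time:

(* WRONG: mixes the synchronous and the event-based API. *)
pipeline # add_with_callback
  (new Nethttp_client.get "http://caml.inria.fr/")
  (fun _ ->
     pipeline # run()     (* the event system is already running here *)
  );
Unixqueue.run esys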

What are engines?

As we have seen, callbacks are a common way to notify the caller about state changes. However, callbacks are too primitive to allow systematic composition of actions. The abstraction of engines has been developed to fill this gap. As a first approximation, imagine an engine as a wrapped callback interface: a machinery which executes something concurrently until the result is available, with the possibility of notifying users of the result.

Continuing the HTTP example, there is also an engine-based version of adding a request to the pipeline:

let e = pipeline # add_e (new Nethttp_client.get url)

This is the same as add_with_callback, only the delivery mechanism for the result is different.

It is possible to attach a callback to an engine:

Uq_engines.when_state
  ~is_done:(fun get -> ...)
  ~is_error:(fun ex -> ...)
  e

The first function is called when the HTTP request could be processed successfully (with get being the completed call), and the second one when a fatal error occurs (with ex being the exception). Using Uq_engines.when_state, every engine-style interface can be turned into a callback-style interface. Of course, this is not the primary idea of this abstraction, but this possibility means that we can drop to the lower level of callbacks whenever needed.

The three most important composition operators are available in Uq_engines.Operators. It is suggested that you open this module, and use the operators directly:

  • Sequence: With
     e1 ++ (fun r1 -> e2) 
    the engine e1 is executed first, and when it has computed the result r1, the engine e2 is started. The result of ++ is again an engine, so it is possible to concatenate several expressions:
     e1 ++ (fun r1 -> e2) ++ (fun r2 -> e3) ++ ... 
    One can also set the parentheses differently if the previous results are needed later:
     e1 ++ (fun r1 -> e2 ++ (fun r2 -> e3 ++ ... )) 
    The ++ operator is also available as a normal function: Uq_engines.seq_engine
  • Mapping: With
     e1 >> (fun st1 -> st2) 
    one can map the final state of e1 to a different state. The state of an engine is either the computed value, the resulting exception, or a tag indicating that the engine was aborted:
         e1 >>
           (function
             | `Done v -> ...
             | `Error ex -> ...
             | `Aborted -> ...
           )
       
    Since you also have access to exceptions, this construction can be used to catch exceptions and to transform them into normal values:
         e1 >>
           (function
             | `Done s          -> `Done(Some s)
             | `Error Not_found -> `Done None
             | `Error ex        -> `Error ex
             | `Aborted         -> `Aborted
           )
       
  • Values as engines: If you just want to encapsulate an already existing value v into an engine, use
     eps_e (`Done v) esys 
    or more generally
     eps_e st esys 
    to encapsulate any state. The eps_e makes an engine out of a value by pretending that the value is computed in a single step (the epsilon step).
Using this, the above example of downloading two files, one after the other, looks like:

let e =
  pipeline # add_e
    (new Nethttp_client.get "http://caml.inria.fr/")
  ++ (fun get1 -> 
        pipeline # add_e
          (new Nethttp_client.get "http://www.camlcity.org/")
        ++ (fun get2 ->
              ...;
              eps_e (`Done()) pipeline#event_system
           )
     )

Note that the final result here is just (), and it is transformed with eps_e into an engine-compatible shape.

Getting results out of an engine-based algorithm

As engines use event systems internally, the constructed complex engine (like e in the previous example) is not started immediately, but only when the event system runs (unless the event system is already running). So you still need

Unixqueue.run esys

to fire up the prepared engines, and to wait for the result.

You may wonder how to access the result. In the previous example, the result was just (), so there is no interest in knowing it. But you could also just return what you have got, as in

let e =
  pipeline # add_e
    (new Nethttp_client.get "http://caml.inria.fr/")
  ++ (fun get1 -> 
        pipeline # add_e
          (new Nethttp_client.get "http://www.camlcity.org/")
        ++ (fun get2 ->
              eps_e (`Done(get1, get2)) pipeline#event_system
           )
     )

and the question is how to get the pair (get1,get2) with the downloaded files after e is finished. This is indeed very simple: after Unixqueue.run returns, you can check for result values:

let st = e # state

Here, st can again have the values

  • `Done x if the engine has a final value x (here our pair)
  • `Error e if the engine execution ended in an exception e
  • `Aborted if the engine was artificially stopped
but also

  • `Working n if the engine is not yet done, and n is an integer indicating the number of computation steps
The `Working state is only visible if you query the state directly with the state method; the >> operator never sees it.
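
So, continuing the example, retrieving the downloaded documents could look like this (a sketch):

let () =
  Unixqueue.run esys;
  match e # state with
  | `Done (get1, get2) -> ...        (* both downloads are available *)
  | `Error ex -> raise ex
  | `Aborted | `Working _ -> assert false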

Forking and joining concurrent threads built with engines

By concatenating elementary engines with ++ you basically create an execution thread. As we are talking here about concurrent programming, the question is how to fork a new thread off of an existing one, and how to join again with the created thread once it is finished.

Forking is very simple: Just have several expressions, e.g.

let e1 = <expression using ++>
let e2 = <expression using ++>

Now e1 and e2 run concurrently.
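
For example, two downloads forked as independent threads:

let e1 = pipeline # add_e (new Nethttp_client.get "http://caml.inria.fr/")
let e2 = pipeline # add_e (new Nethttp_client.get "http://www.camlcity.org/")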

For joining, use the function Uq_engines.sync_engine:

let e_joined =
  Uq_engines.sync_engine e1 e2

The engine e_joined finishes only when both e1 and e2 are finished. The result value of e_joined is the pair of the results of e1 and e2, e.g.

e_joined ++ (fun (r1,r2) -> ...)

There is also a version of sync_engine which can join any number of engines: Uq_engines.msync_engine. We use it in the engine-based version of the download example:

let count_max = 10

let download_e list =
  let list = ref list in

  let rec download_thread_e() =
    match !list with
    | [] -> eps_e (`Done ()) esys
    | url :: list' ->
        list := list';
        pipeline # add_e (new Nethttp_client.get url)
        ++ (fun get ->
              download_thread_e()
           ) in

  let rec fork_e k =
    if k < count_max then
      let e = download_thread_e() in
      e :: fork_e (k+1)
    else
      [] in

  let e_list = fork_e 0 in

  Uq_engines.msync_engine
    e_list
    (fun _ () -> ())
    ()
    esys

The function download_thread_e downloads documents sequentially from the HTTP servers; the URLs are fetched from the variable list. In order to get concurrency, count_max of these threads are started by the fork_e recursion. The result is e_list, a list of all concurrently running engines. Finally, msync_engine is used to wait until all of these threads are finished. msync_engine works like a fold operator, aggregating the result values via the argument function. Here the threads only return () as results, so the aggregation is trivial.
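
As always, the returned engine only describes the work to do; here is a sketch of actually driving download_e (urls stands for the URL list):

let () =
  let e = download_e urls in
  Uq_engines.when_state
    ~is_done:(fun () -> print_endline "all downloads finished")
    ~is_error:(fun ex -> prerr_endline (Printexc.to_string ex))
    e;
  Unixqueue.run esys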

Overview of library functions for engines

In Uq_engines: This module contains the basic definition of engines, Uq_engines.engine, plus a number of combinators for engines. It also defines basic I/O engines.

In Uq_client: Engines for establishing client connections.

In Uq_server: Uq_server.listener creates a server socket and makes it possible to process the accepted connections with engines.

In Uq_io: This module contains functions for buffered I/O. A "device" is a file descriptor, a multiplex controller (see Uq_engines.multiplex_controller), or an asynchronous channel. The definition of devices is extensible.

In Uq_transfer: Uq_transfer.copier copies data between file descriptors without looking at the data.

Beyond these, the protocol interpreters (such as Nethttp_client) provide engine-based APIs of their own.

Definition hierarchy

We have, top-to-bottom:

  • Engines are the top-level abstraction. Essentially, they "only" provide a notification mechanism for operations over event systems, but exactly this makes them the easily composable units that are most useful for constructing algorithms.
  • Event systems are simply a uniform interface for event loops, with events queued up in user space (Equeue).
  • Pollsets (Netsys_pollset) are event loops, i.e. they wait for file descriptor conditions.
  • Kernel interfaces (select, poll, epoll, etc.)

Various tricks

Aborting engines: Unlike other types of threads, cooperative threads can be aborted at any time. Use the abort method:

 e # abort() 

The engine e will then enter the `Aborted state as soon as possible. Often this will happen immediately, but there are also engines where this takes some time.
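
A typical use is a timeout that aborts a long-running engine, e.g. combined with the Unixqueue.once timer shown earlier (a sketch):

(* Abort e if it is not finished within 60 seconds: *)
let g = Unixqueue.new_group esys
let () = Unixqueue.once esys g 60.0 (fun () -> e # abort())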

Exceptions: There are several ways to signal exceptions. First, the orderly way:

 eps_e (`Error x) esys 

This creates an engine that represents an exception as its final state. Because exceptions cannot always be handled that cleanly, the basic combinators like ++ always catch exceptions, and represent them in their final state. For example:

e1 ++ (fun r1 -> raise x)

The ++ operator catches the exception, and the state transitions to `Error x (just as the eps_e example would do).

Note that this behavior relates to engines only. If you program event systems directly, there will be no automatic exception handling. For example

Unixqueue.once esys g 1.0 (fun () -> raise x)

does not catch x in any way. The effect is that the exception falls through to the caller, which is always Unixqueue.run.

How to jump out of event processing: The just mentioned way of raising an exception can be used to leave event processing. Just define

 exception Esys_exit 

and throw it like

Unixqueue.once esys g 0.0 (fun () -> raise Esys_exit)

and catch it like

try
  Unixqueue.run esys
with
  | Esys_exit -> ...

This always works, even if the call of Unixqueue.once is inside an engine expression.

Engines and LWT

Users who are familiar with LWT will certainly recognize many operations, although they often have different names. (You may wonder why both implementations exist. It's a longer story; essentially, Ocamlnet has older roots, and at a certain stage of its development it only knew event systems but not yet engines. The LWT developers saw this, found it insufficient, and developed LWT. Unfortunately, they did not base their new library on Ocamlnet, but chose to reimplement the event loop core. In the meantime, the Ocamlnet development has fixed the deficiencies in its library. Now we have two good libraries for the same range of problems.)

Compare: the ++ operator corresponds to Lwt.bind, eps_e (`Done v) corresponds to Lwt.return v, and >> can play the role of Lwt.catch.

You should, however, be aware that there are some differences between the libraries:

  • Delayed vs. immediate execution: LWT threads are started immediately, and run until they sleep. Only at that point do you need to call Lwt_main.run. In contrast, Equeue prefers to start execution only at Unixqueue.run time.
  • LWT allows one to create Lwt threads which are already terminated (Lwt.return). In Equeue we don't: eps_e creates an engine which will terminate as soon as possible and which results in a constant value. Effectively, this means that eps_e is seen as a point where threads can be switched, whereas this is not the case for Lwt.return. This gives more opportunities for switching, but there are also more subtle consequences, such as who the caller of suspended functions is. In Equeue it is always the core of the event loop, whereas this is not uniform in LWT.
  • In Equeue all engines can be aborted, whereas in LWT a special abstraction needs to be used.
  • In LWT there can be only one active event loop at a time, whereas in Equeue there can be as many event_system objects as needed. This is mainly a restriction for programs using kernel threads: In Equeue, every thread can have its own event_system, but doing the same in LWT is impossible (incompatibility with kernel threads on this level).
The remaining question is how to use both facilities in the same program (provided it is not multi-threaded, which would rule LWT out). The library Uq_lwt helps here.

The idea is to replace the event loop built into LWT by the Ocamlnet event loop. This is done for a given esys by

class lwt_engine esys =
object
  inherit Lwt_engine.abstract
  inherit Uq_lwt.lwt_backend esys
end

Lwt_engine.set (new lwt_engine esys)

Additionally, you are now required to always start event-based programs with Lwt_main.run, and not with Unixqueue.run esys. The latter would not work for technical reasons. Of course, this means that the main program will be LWT-based.

How to call engines from LWT threads: Assume you are in some LWT code, and want to call an engine-based function f : 'a -> 'b engine. This can be achieved by simply calling the function, and then using an LWT primitive to wait (on the LWT side) until the engine is done:

let call_thread f x =
  let e = f x in
  let waiter, wakener = Lwt.wait() in
  Uq_engines.when_state
    ~is_done:(fun r -> Lwt.wakeup wakener r)
    ~is_error:(fun x -> Lwt.wakeup_exn wakener x)
    e;
  waiter
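
For example, if fetch_e : string -> string Uq_engines.engine is some engine-based function (a hypothetical name for illustration), LWT code can await it like this:

let body_t : string Lwt.t = call_thread fetch_e "http://caml.inria.fr/"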

How to call LWT threads from engines: The reverse, for a function f : 'a -> 'b Lwt.t:

let call_e f x =
  let thr = f x in
  (* signal_engine needs the event system to deliver the final state *)
  let waiter, signal = Uq_engines.signal_engine esys in
  Lwt.on_success thr (fun r -> signal (`Done r));
  Lwt.on_failure thr (fun x -> signal (`Error x));
  waiter
