
{1 Netmulticore Tutorial}

{b Contents}

- {!Netmcore_tut.design}
- {!Netmcore_tut.start_procs}
- {!Netmcore_tut.camlboxes}
- {!Netmcore_tut.mempools}
- {!Netmcore_tut.sref}
- {!Netmcore_tut.descriptors}
- {!Netmcore_tut.mutation}
- {!Netmcore_tut.sdata}
- {!Netmcore_tut.sync}
- {!Netmcore_tut.examples}
- {!Netmcore_tut.impl}
- {!Netmcore_tut.diffs}
- {!Netmcore_tut.os}

This manual gives an overview of Netmulticore, which makes it possible to
manage subprocesses for speeding up computations on multicore
CPUs. Netmulticore tries to overcome a limitation of OCaml's runtime,
namely that only one thread at a time can get the CPU. Because of this,
multi-threaded programs cannot make use of the additional power of
multicore CPUs.

{div remark}
Readers are encouraged to first have a look at {!Netmcore_basics},
which is more fundamental and doesn't use unsafe language features.
{divend remark}


The approach of Netmulticore is to spawn subprocesses acting in the
role of worker threads. Processes are separated from each other, and
hence there is normally no direct way for them to interact.
The help of the operating system is required here. A classic example
of IPC (interprocess communication) is the pipe, which creates a data
stream from one process to the other. Of course, we want even
closer interaction than this. Another, rarely used IPC mechanism is
shared memory. This means that a block of RAM is allocated and mapped
into all processes. When one process mutates a RAM cell in a shared
block, the other processes immediately see this mutation, so that
there is no system call overhead for transporting data. Actually,
there is a bit of overhead in modern computers when this technique is
used, but it occurs only at the hardware level, and is very fast.

Netmulticore does not only allocate a shared block of RAM, but also
manages it. Ideally, using the shared block would be as simple as
using normal, process-local memory. Unfortunately, this is not
possible, but the design of Netmulticore allows it to come quite close
to this ideal. With Netmulticore a process can write normal OCaml
values like strings, tuples or records to the shared block, and the
other processes can directly access this data as if it were in normal
RAM. Unfortunately, one cannot use direct assignment to do such
writing to the shared block, but has to follow special programming
rules which ensure that the shared block remains in a consistent
state. Large parts of this tutorial explain how to accomplish this.

The list of features supported by Netmulticore:

- Creation of worker processes
- Management of shared resources like file descriptors and shared memory
  blocks
- Management of shared RAM pools that are mapped to the same address in
  all worker processes
- Management of shared heaps, i.e. containers for OCaml values
- Garbage collection for shared heaps
- Predefined data structures that live in shared heaps
- Synchronization primitives (locks, condition variables)
- Message passing between worker processes
- Integration with Netplex

Before you start looking at Netmulticore, I should also give a warning.
Netmulticore requires unsafe programming elements which can, if used
the wrong way, lead to corrupt results or even crash the program.
This is, to some degree, comparable to interfacing the OCaml runtime
on the C level. Unfortunately, when it comes to parallel programming,
such unsafeness is unavoidable if you want to get real speedups of
your program. Note that multi-threading suffers from the same problem,
and is in no way different.

{2:design Design}

{3 The process hierarchy}

When processes start other processes, the Unix system design defines a
relation between processes: The started workers are children of the
process requesting the starts. An immediate effect of this is that
only the requesting process can wait for the termination of the
children. There are more implications than this, but it should be
clear that we have to take the relations between processes into
account.

When a Netmulticore program is started, the implicitly created first
process has a special role. It is called the {i master process}, and
it is normally not used for doing the real work. It is merely acting
as a supervisor managing the workers. The tasks of the master process
include the management of the resource table, and of course watching
the lifetime of the workers. The master normally starts the
first worker process quickly, which in turn starts as many workers as needed
for accomplishing the computation.

Now, what does it mean to {i start} processes? Unix only defines this
via the [fork] system call. When doing a [fork] the requesting process
is duplicated (i.e. RAM contents and many resources are copied, with
only a few exceptions), and the duplicate is registered as the new
child process.  Netmulticore requires that the relationships between
processes remain manageable, and because of this {i it is always the
master process which requests the start of new children}. Of course,
the programmer can also call {!Netmcore_process.start} from other
workers, but the Netmulticore machinery just sends this request to the
master, where it is really executed.

All in all this means: When a new worker is created, it is initialized
as a copy of the master process, regardless of where the
user code requests the creation.

{3 Resources}

Netmulticore manages not only processes but also other kinds of
resources. As of now the supported types are:

- Temporary files
- Shared memory blocks
- Shared memory blocks with the additional requirement that all workers
  map them at the same address
- Named semaphores
- Fork points, i.e. registered functions acting as worker body
- Join points, i.e. the option of waiting for the completion of worker
  bodies

All resources have a resource ID ({!Netmcore.res_id}) which is
effectively an integer. With only the resource ID every worker can
request to get the details of the resource (e.g. which name the
temporary file has). The master process also keeps track of which
worker needs which resource. If a resource is no longer needed by
any worker, it is automatically deleted.

Since Ocamlnet-3.6, the deletion procedure has been substantially
improved. Now, a list of the live resources is not only kept in
memory, but also written to a file "netplex.pmanage". This makes it
possible to delete the resources after the program has crashed or
terminated abnormally in some other way. This is done automatically when
the program is restarted, or by the user running the [netplex-admin]
utility with option [-unlink]. The background for this is that the mentioned
resources have kernel persistence, and continue to exist when the
program is finished.

{3 Memory pools}

For setting up shared heaps as explained below, we especially need
shared memory blocks that are mapped at the same address by all worker
processes. This kind of block is needed for memory pools as
implemented by {!Netmcore_mempool}. There is, unfortunately, only one
reliable method of doing so: The master process allocates the block
and maps it into its own address space, and the workers get access to
the block by {i inheriting} the mapping. This just means that the
[fork] operation leaves the mapping in place, and because all workers
are created in the same way, all workers end up seeing the shared
block at the same address. The details of this procedure are hidden in
Netmulticore, but the user should know the limitations of this method.

A process cannot inherit a posteriori - when the process is already
created, and a new shared block is set up, there is no safe way for
the process to get access to the block at the right address. Because
of this, there are two important programming rules:

- A newly created shared block (with the "same address" requirement)
  is only visible in worker processes that are created after the block
- This even means that the requesting process does not get access
  to the shared block it just created if the requesting process is a worker

The safest and easiest way to deal with this is to create the shared
block in the master before starting any worker.

Another limitation of the inheritance method: A shared block cannot be
enlarged later. For the programmer this means that the initially
created block must be large enough for the whole lifetime of the
program.

The management of the shared blocks is now done by two modules:
{!Netmcore_mempool} is responsible in the first place, and can hand
out pieces of the shared block to users. {!Netmcore_heap} is such a
user, and manages the pieces it gets as a heap of OCaml values.
Heaps can be enlarged if necessary, i.e. more pieces can be obtained
from the big shared block that is managed by {!Netmcore_mempool}.
Of course, it is also possible to have several heaps getting their
memory pieces from a single {!Netmcore_mempool}.

{3 Shared heaps}

What, then, is a heap, or better, a {i shared heap}? It is just a memory
area, and it is possible to put OCaml values into it. The values are
connected by pointers that reach from one value to the next.  This is
not very different from what is done by the OCaml runtime and the
normal OCaml heap, except that the values are now exposed to a multitude
of processes. We'll later see how we can create such a heap, and
how it can be filled with OCaml values. At this point, let me only
mention that it requires discipline from the programmer to fill
and access a shared heap in the right way. If something goes wrong,
the punishment is an illegal memory access, normally leading to a
segmentation fault.

If a shared heap fills up, {!Netmcore_heap} starts a special garbage
collection run over it to reclaim memory that is no longer referenced.
This garbage collector works very much like the "major GC" that is
built into the OCaml runtime.

As it is a bit complicated to manage shared heaps directly, there are
a number of pre-defined data structures over shared heaps that can be
directly used. Among these structures are buffers, queues, arrays,
and hash tables.

{3 Camlboxes}

For fast notification between processes, Netmulticore uses Camlboxes.
This data structure predates Netmulticore and exists independently of
it, but is specially supported.

Camlboxes are like mailboxes where any number of senders can
send messages to a single receiver. The messages are normal OCaml
values (like strings, records, or variants), so there is no
marshalling issue. Camlboxes also use shared memory as transport
medium, but here it is not required that the shared block be
mapped at the same address. Because of this, it is also possible
to use Camlboxes to connect processes that are not related to each
other.

Camlboxes are optimized for speed and maximum parallelism. The data
model is the following: The receiver creates the box which consists of
a fixed number of slots, and each slot has a fixed maximum size.  Each
sender can map the box into its own address space, and fill any free
slot. Because there is no strict ordering of the messages, the senders
can avoid running into lock contention issues (the senders simply use
different slots and avoid stepping on each other's toes). The receiver
can look at the messages, copy the messages outside the box, and
delete messages. The messages are not strings or marshalled data,
but real OCaml values, relocated to the receiver's address.
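
To make the slot discipline concrete, here is a minimal, process-local
sketch of the data model just described. It is a toy model only: it does
not use shared memory, it is not the {!Netcamlbox} API, and the names
[create], [send], [filled], [get], and [delete] are made up for this
illustration. In particular, the toy [send] simply returns [false] when
every slot is taken, which is a simplification.

```ocaml
(* Toy model of a message box with a fixed number of slots.
   Process-local only; NOT the Netcamlbox API. *)
type 'a box = { slots : 'a option array }

(* The receiver creates the box with a fixed number of empty slots. *)
let create (n : int) : 'a box = { slots = Array.make n None }

(* A sender fills any free slot. Returns false if every slot is taken. *)
let send (box : 'a box) (msg : 'a) : bool =
  let n = Array.length box.slots in
  let rec find i =
    if i >= n then false
    else
      match box.slots.(i) with
      | None -> box.slots.(i) <- Some msg; true
      | Some _ -> find (i + 1)
  in
  find 0

(* The receiver asks which slots currently hold a message. *)
let filled (box : 'a box) : int list =
  let acc = ref [] in
  Array.iteri
    (fun i slot ->
       match slot with
       | Some _ -> acc := i :: !acc
       | None -> ())
    box.slots;
  List.rev !acc

(* The receiver reads a message and later frees the slot for reuse. *)
let get (box : 'a box) (i : int) : 'a option = box.slots.(i)
let delete (box : 'a box) (i : int) : unit = box.slots.(i) <- None
```

Note that nothing in this model orders the messages: a sender takes
whichever slot is free, and the receiver decides which filled slot to
look at first.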

{3 Synchronization}

Netmulticore provides these synchronization primitives:

- Mutexes
- Semaphores
- Condition variables

These primitives can be used in conjunction with shared heaps, and allow
the programmer to define additional synchronization requirements that
are not satisfied by the data structure in the heap.

For example, consider a shared hash table ({!Netmcore_hashtbl}). This
data structure already includes everything to protect the internal
representation from concurrent accesses, but not more. For example,
a {!Netmcore_hashtbl.add} adds a value to the table, and it is safe
to call this operation from several processes at the same time.
This is not enough, however, to protect whole read/modify/update
cycles where the whole cycle needs to run uninterrupted to avoid
data corruption. The user can define these additional synchronization
requirements with the mentioned primitives.

As this is a quite frequent programming case, many shared data
structures already contain a special area called the {i header} where
the user can put these primitives.


{2:start_procs How to start processes}

For starting processes, there are actually two APIs: the slightly
more generic one in {!Netmcore}, and the strictly typed one in
{!Netmcore_process}. They are quite similar, and I'm only explaining
the latter.

Before a process can be started, it needs to be defined:

{[
let process_fork, process_join =
  Netmcore_process.def_process process_body
]}

The definition via {!Netmcore_process.def_process} takes a function
[process_body], which is simply a function of type ['a -> 'b], and
returns a fork point [process_fork] and a join point [process_join].
This definition must happen in the master process. Remember that it is
also always the master process that is forked when a new worker
process is created. This is consistent with this programming rule:
[process_body] will be called in the context of a fresh copy of the
master, so it also needs to be defined in the context of the master.

Practically, the definition is usually done when a top-level module
is initialized, and [process_body] is a normal top-level function.

The fork point [process_fork] has type ['a fork_point] where ['a] is
the type of the argument of [process_body]. Imagine a fork point as a
way to fork the master process, where a value of type ['a] is passed
down, and [process_body] is finally called with this value in the new
child. At the other end, the join point [process_join]
(of type ['b join_point]) is a way to wait for the completion of the
[process_body] and for getting the result of type ['b].

Of course, after defining a process, one can start it as many times
as needed. Here is how to do it:

{[
let pid = Netmcore_process.start process_fork arg
]}

This does the following:

- The master process is notified that a new process is needed
  which will run a process as defined by [process_fork]
- Once the process is running, the argument [arg] is marshalled
  and sent to the new process
- Execution now returns to the caller of [start] with a process ID [pid]
- The new process runs [process_body arg'] where [arg'] is the
  restored value [arg]

The process IDs are Netmulticore's own IDs, and are guaranteed to be
unique (unlike the process IDs the operating system uses, which
can wrap around).

When [process_body] finishes, the function result is passed back to
the master where it is stored until the join is done. The worker
process is terminated. One can get the result value by doing:

{[
let r_opt = Netmcore_process.join process_join pid
]}

This waits until the result value is available, and returns it as
[Some r] (when [r] is of type ['b]). If no result is available because
of an exception or other termination of the process, [None] is
returned. Note that the result [r] is also marshalled.

If there is no interest in getting results at all, one can also do

{[
Netmcore_process.release_join_point process_join
]}

which prevents results from being stored in the master until they are
picked up.

Before giving a complete example, let's look at how to initialize
Netmulticore. When the master is set up (processes are defined etc.)
one can start the first worker. At this point, the whole management
machinery needs to be started, too. This is normally done by a
function call like

{[
Netmcore.startup
   ~socket_directory:"some_dir" 
   ~first_process:(fun () -> Netmcore_process.start process_fork arg)
   ()
]}

The [socket_directory] is needed by the machinery to store runtime
files like Unix domain sockets (inherited from Netplex). If in doubt, set
it to something like "/tmp/dir", but note that each running instance
of the program needs its own socket directory. (There is a chance that
[socket_directory] becomes optional in future releases.)

Now a complete example: The master starts process X which in turn
starts process Y. The processes do not do much, but let's see:

{[
let c = ref 0

let body_Y k =
  k + 1 + !c

let fork_Y, join_Y =
  Netmcore_process.def_process body_Y

let body_X k =
  c := 1;
  let pid_Y = Netmcore_process.start fork_Y k in
  let j =
     match Netmcore_process.join join_Y pid_Y with
      | Some j -> j
      | None -> failwith "Error in process Y" in
  Printf.printf "Result: %d\n%!" j

let fork_X, join_X =
  Netmcore_process.def_process body_X

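(* Wrapped in [let () = ...] so that the call is not parsed as further
   arguments of the preceding definition *)
let () =
  Netmcore.startup
     ~socket_directory:"some_dir"
     ~first_process:(fun () -> Netmcore_process.start fork_X 1)
     ()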
]}

The result is of course 2 (and not 3), because the assignment to [c]
in process X has no effect on process Y. Remember that process Y is
forked from the master process, where [c] still has the value 0.
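
The same calls are enough to farm out several independent computations.
The following is a sketch under assumptions, not part of the original
example: the names [square] and [body_main] and the input list are
invented for illustration, and only functions introduced above are used
({!Netmcore_process.def_process}, {!Netmcore_process.start},
{!Netmcore_process.join}, {!Netmcore_process.release_join_point}, and
{!Netmcore.startup}). It needs the Netmulticore library to build. All
workers are started first and only joined afterwards, so that they can
run in parallel.

```ocaml
(* Hypothetical worker body: one small computation per process *)
let square k = k * k

let fork_sq, join_sq =
  Netmcore_process.def_process square

let body_main () =
  (* Start all workers before joining any of them *)
  let pids =
    List.map (fun k -> Netmcore_process.start fork_sq k) [1; 2; 3; 4] in
  (* Collect the results; [None] means the worker did not deliver one *)
  let results =
    List.map
      (fun pid ->
         match Netmcore_process.join join_sq pid with
          | Some r -> r
          | None -> failwith "Error in worker process")
      pids in
  Printf.printf "Sum of squares: %d\n%!" (List.fold_left (+) 0 results)

let fork_main, join_main =
  Netmcore_process.def_process body_main

(* Nobody joins the main worker, so do not keep its result around *)
let () =
  Netmcore_process.release_join_point join_main

let () =
  Netmcore.startup
     ~socket_directory:"some_dir"
     ~first_process:(fun () -> Netmcore_process.start fork_main ())
     ()
```

Because arguments and results travel by marshalling, each worker gets
its own copy of [k], and the caller gets a copy of each result.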

A final word on marshalling before going on. Arguments and results of
processes are transmitted as strings that have been created with the
functions from the [Marshal] module of the standard library. There are
a number of constraints one should be aware of:

- Functions, objects, and lazy values are not supported, and will
  cause exceptions. For a few other types this is also the case
  (e.g. [in_channel]).
- Unfortunately, there are also types of values that do not trigger
  exceptions, but do nonsense. For example, [Unix.file_descr] is such
  a case. This unfortunately also holds for many Netmulticore types
  such as heaps. For these types the marshalling seems to work,
  but the restored values are actually unusable because they have
  lost their connection with the underlying resources of the operating
  system.
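
These constraints can be observed with the [Marshal] module alone,
without starting any process. The sketch below uses only the standard
library; the record type [point] and the helpers [roundtrip] and
[marshal_function_fails] are made up for illustration. It shows that a
marshalled value comes back as a separate copy, and that trying to
marshal a function raises [Invalid_argument].

```ocaml
type point = { mutable x : int; y : int }

(* Marshal a value to a string and restore it, as happens to process
   arguments and results. Marshal.from_string is not type-safe, so the
   result type is fixed by the annotation. *)
let roundtrip (v : 'a) : 'a =
  (Marshal.from_string (Marshal.to_string v []) 0 : 'a)

let () =
  let p = { x = 1; y = 2 } in
  let q = roundtrip p in
  (* Same contents, but a different block of memory *)
  assert (q = p && q != p);
  (* Mutating the copy does not affect the original *)
  q.x <- 10;
  assert (p.x = 1)

(* Functions are rejected when no special flags are given *)
let marshal_function_fails () : bool =
  try
    ignore (Marshal.to_string (fun k -> k + 1) []);
    false
  with Invalid_argument _ -> true

let () = assert (marshal_function_fails ())
```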


{2:camlboxes How to use Camlboxes for passing messages}

Camlboxes must always be created by the (single) receiver of the
messages. The address of the box is then made available to the senders
which can then start the transmission of messages.

The number of message slots is fixed, and cannot be changed later.
Also, the maximum size of the messages must be specified in advance,
in bytes. Of course, this means one cannot use Camlboxes for
arbitrarily large messages, but this is not what they are designed for.
Camlboxes are good for small notifications that need to be quickly
sent without risking any lock contention. (If big payload data needs
to be included, a good workaround is to include that data by reference
to a shared heap only.)

Let's just give an example: Process X creates a box, starts process Y,
and Y sends a message to the box of X.

{[
let body_Y box_res =
  let (box_sender : string Netcamlbox.camlbox_sender) = 
     Netmcore_camlbox.lookup_camlbox_sender box_res in
  Netcamlbox.camlbox_send box_sender "Hello world"

let fork_Y, join_Y =
  Netmcore_process.def_process body_Y

let () =
  Netmcore_process.release_join_point join_Y

let body_X () =
  let ((box : string Netcamlbox.camlbox),box_res) = 
     Netmcore_camlbox.create_camlbox "example" 1 100 in
  let pid_Y = Netmcore_process.start fork_Y box_res in
  ( match Netcamlbox.camlbox_wait box with
     | [ slot ] ->
         let m = Netcamlbox.camlbox_get_copy box slot in
         Netcamlbox.camlbox_delete box slot;
         Printf.printf "Message: %s\n%!" m
     | _ ->
         (* in _this_ example not possible *)
         assert false
  )

let fork_X, join_X =
  Netmcore_process.def_process body_X

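(* Wrapped in [let () = ...] so that the call is not parsed as further
   arguments of the preceding definition *)
let () =
  Netmcore.startup
     ~socket_directory:"some_dir"
     ~first_process:(fun () -> Netmcore_process.start fork_X ())
     ()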
]}

Let's go through it:

- The box is created at the beginning of [body_X], by a call of
  {!Netmcore_camlbox.create_camlbox}. Generally, {!Netmcore_camlbox}
  contains Netmulticore-specific extensions of the Camlbox abstraction
  which is primarily defined in {!Netcamlbox}.
  The function {!Netmcore_camlbox.create_camlbox} does not only create the box,
  but also registers it as resource in the master process. The
  string "example" is used for naming the shared memory block backing
  the box. This box has a capacity of one message which may be up to
  100 bytes long. There are two return values, [box] and [box_res].
  The value [box] is the way to access the box for {i receiving}
  messages. The value [box_res] is the resource ID.
- We pass [box_res] when we start [body_Y]. Resource IDs can be
  marshalled (whereas camlboxes cannot).
- The helper function {!Netmcore_camlbox.lookup_camlbox_sender} is
  called at the beginning of [body_Y] to look up the sender interface
  for the box identified by [box_res]. The sender view of the box
  is [box_sender].
- Now the string "Hello world" is sent by invoking 
  {!Netcamlbox.camlbox_send}.
- In the meantime, process X continued running in parallel, and
  by invoking {!Netcamlbox.camlbox_wait} the execution is suspended
  until at least one message is in the box. The function returns the
  slots containing messages.
- We look now into the single slot where we expect a message.
  With {!Netcamlbox.camlbox_get_copy} we can get a copy of the message
  in the slot. Note that there is also {!Netcamlbox.camlbox_get} which
  does not make a copy but returns a reference to the message as it
  is in the box. This is unsafe because this reference becomes invalid
  when the message is deleted.
- Finally we delete the message in the slot with {!Netcamlbox.camlbox_delete}.
  The slot is now again free, and can hold the next message.

You may wonder why I added the type annotations for [box] and
[box_sender].  If you look at the signatures of the Camlbox modules,
you'll see that there is no enforcement that the types of [box] and
[box_sender] match each other. The type parameter [string] is lost
at the moment the resource ID is used for identifying the box.

When sending a message, it is always copied into the box. For doing
this a special "value copier" is used. This copier traverses deeply
through the value representation and copies each piece into the box.
This is a bit like marshalling, only that this does not result in
a string but a copy of the original value (and the copy is placed into
a reserved region in the shared memory block of the Camlbox). This
copier also has restrictions on which types of values can be handled,
very much like marshalling.


{2:mempools How to create and use memory pools}

Now back to the core of Netmulticore. For the remaining data
structures the shared memory must be managed as a memory pool. The
module {!Netmcore_mempool} accomplishes this.

As explained above, there are certain restrictions on which processes
have access to memory pools. For most applications it is best when
they simply create the pool in the master process directly after
program startup and before launching any worker process. This avoids
all restrictions, but the size of the pool needs to be set at this
early point of execution (remember that pools also cannot be
enlarged).

It is a good question how big a pool should be. Generally, the pool
should be a small factor larger than the minimum amount of RAM
needed for the pool. I cannot give really good recommendations, but
factors between 1.5 and 3.0 seem to be good choices. If the pool
size is chosen too tight, the garbage collector will run often and
slow down the program.

Another strategy for the pool size is to make it always large,
e.g. 25% of the available system memory. The idea here is that when
parts of the pool remain unused throughout the lifetime of the
program, they {i actually} will also not consume RAM. Operating
systems typically distinguish between RAM that is reserved by a memory
allocation and RAM that is actually filled with data.  Only the latter
type consumes real RAM, whereas the first type is only taken into
account for bookkeeping. This means the pool memory is reserved for
the case it is needed at a certain point, but RAM is not wasted if
not.
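
As a worked example of the first strategy, the helper below multiplies
an estimate of the minimum memory need by a safety factor and rounds the
result up to a whole number of pages. This is only a sketch: the function
name [pool_size], the page size of 4096 bytes, and the page rounding
itself are assumptions made for illustration, not requirements stated by
{!Netmcore_mempool}.

```ocaml
(* Assumed page size; the real value depends on the platform *)
let page_size = 4096

(* Suggest a pool size: min_bytes scaled by factor, rounded up to
   a multiple of page_size *)
let pool_size ~(min_bytes : int) ~(factor : float) : int =
  let scaled = int_of_float (ceil (float_of_int min_bytes *. factor)) in
  (scaled + page_size - 1) / page_size * page_size

let () =
  (* 1 MiB of expected data with a factor of 2.0 *)
  Printf.printf "%d\n" (pool_size ~min_bytes:(1024 * 1024) ~factor:2.0)
```

The number computed this way is what one would pass as the size in
bytes when creating the pool.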

Now, the pool is simply created with

{[
let pool = Netmcore_mempool.create_mempool size_in_bytes
]}

The value [pool] is again a resource ID, and it is no problem to
marshal this ID. When creating shared heaps, and for a number of
other operations the [pool] ID is required, so it is a good idea
to pass it down to all worker processes.

Pools are backed by shared memory blocks, and these blocks have
kernel persistence (usually they even appear as files, but this
depends on the operating system). This means they exist until
explicitly deleted (like files). To do so, just call

{[
Netmcore.release pool
]}

after the {!Netmcore.startup} function returns to the caller, and
before the program is finally terminated.

As pool memory is inherited by worker processes (as explained in the
section about "Design"), one has to enable the inheritance for
each started process, e.g.

{[
let pid = 
  Netmcore_process.start 
    ~inherit_resources:`All
    process_fork arg
]}

Otherwise the pool is not accessible by the started worker process.
(I'm thinking about making this the default, but haven't come to a
conclusion yet.)

If system memory is very tight, you will sometimes see bus errors when
using pools (signal SIGBUS is sent to one of the processes, typically
the signal number is 7). This happens when allocated shared memory is
actually not available when it is used for the first time. (I'm still
looking for ways to get a more graceful reaction.)


{2:sref Shared [ref]-type variables}

Let's now look at a data structure that lives in a shared heap.
Note that there is also a direct interface to shared heaps, but
for the sake of explaining the concept, it is easier to first
look at a concrete instance of a heap.

The module {!Netmcore_ref} provides a mutable reference for a single
value residing in the shared heap. The reference is comparable to the
[ref] type provided by the standard library, and it is possible to
dereference it, and to assign new values to it. The difference is,
however, that the reference and the referenced value reside completely
in shared memory, and are accessible by all workers.

The reference is created by something like

{[
let s = Netmcore_ref.sref pool initial_value
]}

where [pool] is the resource ID of a pool (see above), and
[initial_value] is the value to assign to the reference initially.
This is comparable to

{[
let r = ref initial_value
]}

for normal references. There is an important difference, however.
The referenced value must completely reside in shared memory, and
in order to achieve this, the [sref] function {i copies} the 
[initial_value] to it. (The same copy mechanism is used that also
puts messages into Camlboxes.)

After running [sref], a new shared heap has been created which is
initialized as a reference. You can get a direct handle for the
heap structure by calling {!Netmcore_ref.heap}.

Also note that you have to call [sref] from a worker process. It is
not possible to do this from the master process. This also applies
to almost any other heap-related function.

It is possible to assign a new value to [s]:

{[
Netmcore_ref.assign s new_value
]}

This also {i copies} the [new_value] to the heap, and changes the
reference.

You may wonder what happens when you assign new values over and over
again. There is no mechanism for deleting the old values immediately.
Instead, the values accumulate over time in the heap, and when a
certain threshold is reached, a garbage collection run is started.
This run checks for values (or value parts) that became unreachable,
and reclaims the memory used for them.

This garbage collector (GC) is not the normal garbage collector that
cleans the process-local heap managed by the OCaml runtime. It is a
special GC designed for shared heaps. Unfortunately, this special GC
is not as automatic as the GC built into the OCaml runtime, as we'll
see.

Back to our shared references. There are three functions for getting
the value pointed to by a reference:

{[
let v1 = Netmcore_ref.deref_ro s

let v2 = Netmcore_ref.deref_c s

Netmcore_ref.deref_p s (fun v3 -> ...)
]}

The first function, [deref_ro], is the fastest but least safe. It just
returns the current value of the reference as-is, and this value is of
course stored in the shared heap. The problem is, however, that
further assignments to the variable can invalidate this value when a
GC run occurs. If this happens, [v1] becomes invalid because the
memory holding the representation for [v1] is overwritten with
something else, and any accesses to [v1] or its parts may crash the
program! For this reason, the [deref_ro] function must only be used if it
can be ensured by other means (e.g. by a lock) that the variable is not
assigned while [v1] is being accessed. The suffix "_ro" is
meant as a reminder that it is safe to use in read-only
contexts. (N.B. The GC could theoretically also check for values that
are only alive because of references from process-local
memory. Unfortunately, this would complicate everything dramatically:
the local GC and the shared GC would need some synchronization, and
slow or misbehaving local GCs could delay shared GC runs almost
indefinitely.)

The second function, [deref_c], does not have this problem because it
always returns a copy of the referenced value (suffix "_c" = "copy").
It is, of course, a lot slower because of this, and the copy semantics
may not always be the right design for an application.

The third function, [deref_p], is a clever alternative. As you can
see, [deref_p] does not {i return} the value, but it runs an
argument function with the current value. The trick is that
[v3] is specially protected while this function is running. This
protection does not prevent assignments, but it prevents the
GC from deleting the value [v3] if a GC run is done. This type of protection
is also called {i pinning}, and the suffix "_p" is meant as a reminder
of this.

When using [deref_p], one still should be careful and think about
whether accesses to [v3] (or parts of [v3]) can occur after the
pinning protection has ended. This must be excluded by all means!
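
The following sketch contrasts callbacks that respect this rule with one
that does not. The function names [shared_length], [local_copy], and
[leaked] are made up for illustration, and the sketch assumes a shared
reference holding a string; it needs the Netmulticore library to build.
The idea is that only values which do not point into the shared heap may
leave the callback.

```ocaml
(* Fine: only an integer leaves the pinned region *)
let shared_length (s : string Netmcore_ref.sref) : int =
  Netmcore_ref.deref_p s (fun v -> String.length v)

(* Fine: the string is duplicated into process-local memory while
   the shared value is still pinned *)
let local_copy (s : string Netmcore_ref.sref) : string =
  Netmcore_ref.deref_p s (fun v -> String.sub v 0 (String.length v))

(* PROBLEMATIC: the pointer into the shared heap escapes the callback,
   so the result is no safer than what deref_ro returns *)
let leaked (s : string Netmcore_ref.sref) : string =
  Netmcore_ref.deref_p s (fun v -> v)
```

The first two functions can be called without any further precautions
regarding the GC. The third one compiles just as well, which is exactly
why the rule requires discipline.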

Let's look at another thing the programmer must not do. Imagine
some inner part of the value is mutable, e.g.

{[
type t = { mutable data : string }

let s = Netmcore_ref.sref pool { data = "initial string" }
]}

The question is now: Is it allowed to assign to the [data]
component, as in

{[
(Netmcore_ref.deref_ro s).data <- "new string"   (* WARNING - PROBLEMATIC CODE *)
]}

The meaning of this assignment is that the [data] component is
overwritten with a pointer to a string residing in process-local
memory. Imagine now what happens if a different worker process
accesses the [data] component. The address of this pointer is now
meaningless, because the string does not exist in context of the other
process at this address. If we access nevertheless, we risk getting
random data or even segmentation faults.

I'll explain below how to fix this assignment. It is possible, if
done in the right way.

Finally, here is a complete example using shared references:

{[
let body_Y (pool, c_descr, k) =
  let c = Netmcore_ref.sref_of_descr pool c_descr in
  let c_value = Netmcore_ref.deref_ro c in
  k + 1 + c_value

let fork_Y, join_Y =
  Netmcore_process.def_process body_Y

let body_X (pool,k) =
  let c = Netmcore_ref.sref pool 1 in
  let c_descr = Netmcore_ref.descr_of_sref c in
  let pid_Y = 
    Netmcore_process.start ~inherit_resources:`All fork_Y (pool, c_descr, k) in
  let j =
     match Netmcore_process.join join_Y pid_Y with
      | Some j -> j
      | None -> failwith "Error in process Y" in
  Printf.printf "Result: %d\n%!" j

let fork_X, join_X =
  Netmcore_process.def_process body_X

let () = 
  let pool = Netmcore_mempool.create_mempool (1024 * 1024) in

  Netmcore.startup
     ~socket_directory:"some_dir" 
     ~first_process:
        (fun () -> 
           Netmcore_process.start ~inherit_resources:`All fork_X (pool,1))
     ();

  Netmcore.release pool
]}

Compare this with the example I gave when explaining how to start
processes. The variable [c] is now a shared reference, and because of
this process Y can access the contents of [c]. The result is now 3.

This example already uses a feature that is only explained in the
next section: descriptors. For some reason (we'll see why) it is not
possible to marshal shared heaps, and thus you cannot call [body_Y]
with [c] as argument. The workaround is to create a descriptor for
[c], use the descriptor for marshalling, and restore the original
heap in the called function.


{2:descriptors Descriptors}

Shared heaps are very special objects - in particular, the heaps
reside in shared memory at a certain address, and this address also
appears in the internal data structures the heaps use to manage their
memory space.

Imagine what happens when you do not respect this special nature of
shared heaps, and create a copy nevertheless (e.g. by marshalling the
heap, or by putting the heap into another heap). At first glance,
you will see that you can actually create the copy (it's a valid OCaml
value), but when you try to use it, the program will crash. What has
happened?

The problem is that the internal data structure of the copied heap
still contains addresses that are only valid for the original heap, but
are meaningless for the copy. When these addresses are followed,
invalid memory accesses occur, and the program crashes.

So, a very important programming rule: Never copy shared heaps!
Unfortunately, this really requires discipline, as there are many
mechanisms in Netmulticore where copies are automatically created
(just page back and look how often I told you that a copy is created
here and there). For example, when a worker process is started, the
arguments are copied over to the newly created process.

How can we work around this? First let's think about what we really want to
have when we e.g. start a worker process and pass a shared heap to it.
Of course, we do not want to create a copy, but rather we want to
make the {i same} shared heap accessible to the worker. We want
call by reference!

Descriptors are the solution. A descriptor of a shared heap is just
a marshallable reference to the heap. For each shared data structure
there is a special descriptor type, and it is possible to get the
descriptor for a heap, and to look up the heap by descriptor. For
example, shared references define this as (in {!Netmcore_ref}):

{[
type 't sref_descr

val descr_of_sref : 't sref -> 't sref_descr
val sref_of_descr : Netmcore.res_id -> 't sref_descr -> 't sref
]}

Note that the [sref_of_descr] function takes the resource ID of the
pool as first argument, and that this function is quite slow (because
it has to ask the master process where to find the shared memory
object).

For the other shared data structures, there are comparable functions
supporting descriptors.


{2:mutation How to mutate shared values}

Remember this code?

{[
(Netmcore_ref.deref_ro s).data <- "new string"   (* WARNING - PROBLEMATIC CODE *)
]}

Let's now look at how to fix it. The basic problem is that the new
string does not reside in shared memory. If we could arrange that it
does, the assignment would be acceptable.

The correct way to do it is this:

{[
Netmcore_heap.modify
  (Netmcore_ref.heap s)
  (fun mut ->
    (Netmcore_ref.deref_ro s).data <- Netmcore_heap.add mut "new string"
  )
]}

The essence is that we now call {!Netmcore_heap.add} before assigning
the string. This function again copies the argument value, and puts
the copy onto the heap. For managing the copy, this function needs
a special object [mut], which is called a {i mutator}. The only
way to get a mutator is to call {!Netmcore_heap.modify}, which - among
other actions - locks the heap, and prevents any competing write
access. This is required, because otherwise mutations could
overlap in bad ways when they are done in parallel, and the internal
representation of the heap would be corrupted.

Note that you need to call [add] for assigning all values that are not
yet residing in the same heap. This means: Call it if the new value is
in process-local memory, but also call it when the new value is
already part of a different heap (because pointers from one heap to
the other are not allowed - heaps must be completely self-contained).

You may wonder why we used [deref_ro] to get the current value of [s],
and not one of the other access functions. Quite easy answer: the
other functions [deref_c] and [deref_p] would deadlock the program!
The [modify] function already acquires the heap lock for the duration
of the mutation, and using any access function that also plays with
this lock will cause deadlocks.

Let's look at this example:

{[
type u = { mutable data1 : string;
           mutable data2 : string;
         }

let s = Netmcore_ref.sref
           pool
           { data1 = "initial string 1"; data2 = "initial string 2" }
]}

We now want to swap the values in the two components [data1] and [data2].
The solution is of course:

{[
Netmcore_heap.modify
  (Netmcore_ref.heap s)
  (fun mut ->
    let u = Netmcore_ref.deref_ro s in
    let p = u.data1 in
    u.data1 <- u.data2;
    u.data2 <- p
  )
]}

Note that we call [modify] although we do not need the mutator for
doing our work. The reason is that [modify] also write-locks the heap,
and this protects the integrity of our data. We do not need to call
[add] because the two strings are already residing in the same shared
heap, and we are just swapping them.

A little variation of this lets us run into a possible problem, though:

{[
Netmcore_heap.modify
  (Netmcore_ref.heap s)
  (fun mut ->
    let u = Netmcore_ref.deref_ro s in
    let p = u.data1 in
    u.data1 <- Netmcore_heap.add mut ("Previously in data2: " ^ u.data2);
    u.data2 <- Netmcore_heap.add mut ("Previously in data1: " ^ p);
    Printf.printf "p=%s\n" p;             (* THIS LINE IS PROBLEMATIC *)
  )
]}

This piece of code will crash now and then (not often, so maybe difficult
to run into). What is going wrong?

There is one phenomenon we haven't paid attention to yet: When we call
[add] it is possible that the available space in the heap is not
sufficient anymore to do the required memory allocations, and that a
GC run is started. When this happens in the second [add], the
field [data1] is already overwritten, and because of this the string
[p] is now unreachable from the top-level value of the heap. The
space occupied by [p] is reclaimed, and may even be overwritten. For
the two assignments this is no problem, because [p] is no longer needed.
When we access [p] after the second [add], however, the value may
have become invalid. Accessing it may crash the program!

How to fix? It is possible to pin additional values during mutation
so that they cannot be collected by the GC:

{[
Netmcore_heap.modify
  (Netmcore_ref.heap s)
  (fun mut ->
    let u = Netmcore_ref.deref_ro s in
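    let p = u.data1 in
    Netmcore_heap.pin mut p;              (* pin before any [add] can trigger a GC run *)
    u.data1 <- Netmcore_heap.add mut ("Previously in data2: " ^ u.data2);
    u.data2 <- Netmcore_heap.add mut ("Previously in data1: " ^ p);
    Printf.printf "p=%s\n" p;
  )
]}

This piece of code is now correct. Note that [pin] is called before
the [add] calls, so that [p] is already protected when a GC run is
triggered. The effect of [pin] lasts until the end of the [modify]
function.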


{2:sdata Shared data structures}

Besides references, there are a number of further shared data structures:

- {!Netmcore_array} implements shared arrays
- {!Netmcore_matrix} implements shared matrices (2-dimensional arrays)
- {!Netmcore_buffer} implements shared string buffers
- {!Netmcore_queue} implements shared queues
- {!Netmcore_hashtbl} implements shared hash tables

You may wonder why these data structures exist. Isn't it possible to
e.g. put a normal [Queue] into a shared reference in order to get a
shared version? The problem is that existing types like [Queue] do not
adhere to the programming rules we've outlined above. [Queue] is a
mutable structure, and when elements are added to or removed from the
queue, new value allocations are always done in the normal
process-local heap but not in the right shared heap. This is nothing
one can fix by wrapping code around these data structures.

If you look at the implementation of e.g. {!Netmcore_queue} (which is
really recommended), you'll see that it looks very much like the
implementation of [Queue], only that {!Netmcore_heap.modify} is used
for managing mutation.

The above listed modules also include all the necessary means to
protect the data structures against the possibly disastrous effects
of parallel mutation. Also, for read accesses there are always several
access functions corresponding to what we've seen for references
([deref_ro], [deref_c], and [deref_p]).

Let's have a closer look at {!Netmcore_array} to see how this is done.
The signature is:

{[
type ('e,'h) sarray
type ('e,'h) sarray_descr
val create : Netmcore.res_id -> 'e array -> 'h -> ('e,'h) sarray
val make : Netmcore.res_id -> int -> 'e -> 'h -> ('e,'h) sarray
val init : Netmcore.res_id -> int -> (int -> 'e) -> 'h -> ('e,'h) sarray
val grow : ('e,_) sarray -> int -> 'e -> unit
val set : ('e,_) sarray -> int -> 'e -> unit
val get_ro : ('e,_) sarray -> int -> 'e
val get_p : ('e,_) sarray -> int -> ('e -> 'a) -> 'a
val get_c : ('e,_) sarray -> int -> 'e
val length : (_,_) sarray -> int
val header : (_,'h) sarray -> 'h
val deref : ('e,_) sarray -> 'e array
val heap : (_,_) sarray -> Obj.t Netmcore_heap.heap
val descr_of_sarray : ('e,'h) sarray -> ('e,'h) sarray_descr
val sarray_of_descr : Netmcore.res_id -> ('e,'h) sarray_descr -> ('e,'h) sarray
]}

As you can see, the type is not only ['e sarray] (where ['e] is the
type of the elements), but there is a second type variable ['h], so
that the type becomes [('e,'h) sarray]. This is the type of the
{i header}, which is simply an extra place for storing data that exists
once per shared data structure. The header can have any type, but is
often a record with a few fields. If you do not need the header, just
set it to [()] (i.e. ['h = unit]).

For managing shared data structures one often needs a few extra fields
that can be put into the header. For example, a requirement could be
that the length of a shared queue is limited, and one needs
synchronization variables to ensure that the users of the queue stick
to the limit (i.e. the addition of new elements to the queue is
suspended when it is full, and it is restarted when there is again
space). As such extra requirements are very common, all shared data
structures have such a header (well, for {!Netmcore_ref} it was omitted
for obvious reasons).

When you create an instance of the structure, you always have to pass
the resource ID of the pool (here for [create], [make],
[init]). Another argument is the initial value of the header (which is
also copied into the shared heap). The header, after being copied to
the heap, can be accessed with the [header] function.

The function [set] looks very much like the one in [Array]. Note that
for all mutations the shared heap is write-locked, so there can actually
only be one running [set] operation at a time. Because [set] copies
the argument data to the shared heap, this time is not negligible.

For accessing elements, there are the known three variants: [get_ro],
[get_c], and [get_p]. The [length] function works as in [Array].

As it would be difficult for the user to implement growable arrays
on top of the basic version, a [grow] function was added that does
exactly that. This function avoids copying values again that are
already in the shared heap.

The type [('e,'h) sarray_descr] is used for descriptors to shared
arrays, and the functions [descr_of_sarray] and [sarray_of_descr]
convert between shared arrays and their descriptors.
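
As a minimal sketch of how these functions fit together, the following
fragment creates a shared array of strings without a header and reads
one element back. It only uses functions from the signature above.
It is assumed that [pool] is the resource ID of an existing memory
pool, and that the code runs inside a Netmulticore process:

{[
(* Assumption: [pool] is the resource ID of a memory pool, as in the
   previous examples *)
let sa = Netmcore_array.make pool 3 "" ()    (* 3 elements, header is () *)

let () =
  (* [set] write-locks the heap and copies the string into it *)
  Netmcore_array.set sa 0 "first";
  (* [get_c] returns a process-local copy that is safe to keep *)
  let copy = Netmcore_array.get_c sa 0 in
  (* [get_p] pins the element while the callback is running *)
  let len = Netmcore_array.get_p sa 0 String.length in
  Printf.printf "%d elements, sa[0]=%s (%d bytes)\n%!"
    (Netmcore_array.length sa) copy len
]}

The same caveats as for references apply: a value obtained with
[get_ro] or inside the callback of [get_p] must not be used after the
protection has ended.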


{2:sync Synchronization}

There are three kinds of synchronization devices:

- {!Netmcore_sem}: Semaphores
- {!Netmcore_mutex}: Mutexes
- {!Netmcore_condition}: Condition variables

Unlike the data structures explained above, these synchronization
primitives are simply special values, and not shared heaps of their own
(which would be quite costly). One consequence of their special nature
is that they must exist at a fixed memory address: after being copied
or moved, the values become non-functional.

Of course, these special values must be put into shared heaps in order
to be accessible by several processes. Let's just walk through an
example, to see how this is done right.

Imagine you have a field [x] of some type, and a lock [m] that is going
to protect concurrent accesses to [x]:

{[
type t =
  { mutable x : some_type;
    mutable m : Netmcore_mutex.mutex
  }
]}

This record is put into a shared reference:

{[
let s = Netmcore_ref.sref pool { x = ...; m = ... }
]}

Remember that [sref] initializes the reference with a {i copy} of the
argument value. This would mean we copy [m], which is invalid as we've
learned. How can this be solved?

Mutexes (like the other synchronization devices) are designed for this
use pattern, so there is already a built-in solution. There is a special
dummy value one can use during initialization:

{[
let s = Netmcore_ref.sref pool { x = ...; m = Netmcore_mutex.dummy() }
]}

Dummies are placeholders that need to be re-initialized later, once
the record has been copied to the shared heap. This is done by:

{[
Netmcore_heap.modify
  (Netmcore_ref.heap s)
  (fun mut ->
     let r = Netmcore_ref.deref_ro s in
     r.m <- Netmcore_mutex.create mut `Normal
  )
]}

That's it! Note that we've set the type of the mutex to [`Normal]
which creates a fast mutex without deadlock protection.

The mutex can now be used in statements like

{[
Netmcore_ref.deref_p s (fun r -> Netmcore_mutex.lock r.m)
]}

Be careful not to use [deref_c] because this would create a copy of the
mutex, and render it useless!
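
A locked mutex should also be unlocked when the protected code raises
an exception, so it can be convenient to wrap the two calls. The helper
[with_lock] below is hypothetical (it is not part of Netmulticore). The
sketch assumes the record type [t] from above and OCaml 4.08 or later
for [Fun.protect]:

{[
(* Hypothetical helper: run [f] on the record while holding [r.m] *)
let with_lock s f =
  Netmcore_ref.deref_p s
    (fun r ->
       Netmcore_mutex.lock r.m;
       Fun.protect
         ~finally:(fun () -> Netmcore_mutex.unlock r.m)
         (fun () -> f r)
    )
]}

The function [f] must still follow the rules for shared values: it
must not return [r] or parts of it, because the pinning protection
ends together with [deref_p].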

Semaphores are much like mutexes, only that the synchronization functions
are not [lock] and [unlock] but [wait] and [post].

Condition variables are a bit harder to use, unfortunately. When
implementing them I ran into the issue that the fast algorithm needs
to allocate special storage places, one for each process that can be
suspended. (There is a slow algorithm not requiring additional
storage, but this would have been a very bad deal.) In system-level
implementations of condition variables the additional storage can
usually be hidden from the user. This is not possible in a pure
user-space implementation like this one. For this reason, the user of
condition variables has to allocate these places called [wait_entry].
One [wait_entry] is needed for each process that can ever wait for
a condition variable. There is also [wait_set] which is just a collection
of [wait_entry] values. Let's look at an example:

{[
type t =
  { mutable x : some_type;
    mutable m : Netmcore_mutex.mutex;
    mutable c : Netmcore_condition.condition;
    mutable w : Netmcore_condition.wait_set
  }

let s = 
  Netmcore_ref.sref
     pool
     { x = ...; 
       m = Netmcore_mutex.dummy();
       c = Netmcore_condition.dummy_condition();
       w = Netmcore_condition.dummy_wait_set()
     }

let () =
  Netmcore_heap.modify
    (Netmcore_ref.heap s)
    (fun mut ->
       let r = Netmcore_ref.deref_ro s in
       r.m <- Netmcore_mutex.create mut `Normal;
       r.c <- Netmcore_condition.create_condition mut;
       r.w <- Netmcore_condition.create_wait_set mut
    )
]}

The field [w] is now an empty [wait_set]. Note that we only need one [w]
for all condition variables that exist in the same shared heap.

The point is now that we need to get a [wait_entry] for each process
using [s]. Get it with:

{[
let we =
  Netmcore_heap.modify
    (Netmcore_ref.heap s)
    (fun mut -> 
      Netmcore_condition.alloc_wait_entry mut (Netmcore_ref.deref_ro s).w
    )
]}

This just needs to happen once for each process. The value [we] can be
used for all [wait] calls related to all condition variables in the
same shared heap.

A [wait] call looks then like:

{[
Netmcore_ref.deref_p s (fun r -> Netmcore_condition.wait we r.c r.m)
]}

For [signal] and [broadcast] the value [we] is not needed. As you see
the only additional complexity has to do with the initialization of the
process - we need to create [we] once for each process.
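
The following sketch puts the pieces together. It assumes the usual
condition variable protocol: [wait] is called with the mutex locked,
the mutex is released while the process is suspended, and the
condition is checked again in a loop after wakeup. The names
[wait_until], [update_and_signal], [ready] and [update] are
hypothetical and only serve as illustration. The record type [t] is
the one defined above:

{[
(* Consumer side. [ready] is a hypothetical predicate on the record. *)
let wait_until s we ready =
  Netmcore_ref.deref_p s
    (fun r ->
       Netmcore_mutex.lock r.m;
       while not (ready r) do
         Netmcore_condition.wait we r.c r.m
       done;
       Netmcore_mutex.unlock r.m
    )

(* Producer side. [update] is a hypothetical function that changes the
   record, following the rules for mutating shared values. *)
let update_and_signal s update =
  Netmcore_ref.deref_p s
    (fun r ->
       Netmcore_mutex.lock r.m;
       update r;
       Netmcore_condition.signal r.c;
       Netmcore_mutex.unlock r.m
    )
]}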


{2:examples Where to find examples}

It is very much recommended to study complete examples before trying
to develop with Netmulticore. There are a few examples in the
[examples/multicore] directory of the distributed tarball.

The latest version can always be found in the svn repository:

- {{:https://godirepo.camlcity.org/svn/lib-ocamlnet2/trunk/code/examples/multicore/} examples/multicore}


{2:impl Remarks on the implementation}

The current implementation of shared heaps only uses a single lock for
protecting every heap, even for the initial actions of read
accesses. This e.g. means that a [deref_c] locks the reference for a
short time until it has pinned the current value. The (perhaps time
consuming) copy operation is then done without lock.

This might not be optimal for all use cases. An improved locking scheme
would use reader/writer locks. However, this kind of lock is complicated
to implement on top of just semaphores, so it was omitted for now. Also,
reader/writer locks are more expensive in general, so it is not clear
whether it is better at all.

The memory management of heaps is still in a quite experimental state.
Heaps are extended in units of 64 KB which may be way too big or way
too small for the application. Also, the implementation tries to keep
at least half of the heap memory free (i.e. the "overhead factor" is 50%).

If parts of a shared heap become completely free they are indeed
given back to the memory pool.
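
To get a feeling for these numbers, here is a back-of-the-envelope
estimate of the heap size for a given amount of payload. This is not
how Netmulticore computes the size internally. The function
[estimated_heap_size] is hypothetical, it only combines the 64 KB unit
and the 50% target mentioned above, and it ignores all bookkeeping
overhead:

{[
let unit_size = 64 * 1024     (* extension unit mentioned above *)

(* Rough estimate only: the smallest multiple of [unit_size] in which
   [payload] bytes occupy at most half of the space *)
let estimated_heap_size payload =
  let wanted = 2 * payload in
  ((wanted + unit_size - 1) / unit_size) * unit_size
]}

Under these assumptions, 100,000 bytes of payload lead to an estimate
of 4 units, i.e. 262,144 bytes.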


{2:diffs Some points where Netmulticore is different from multi-threading}

When porting multi-threaded programs to Netmulticore, you may wonder
where the differences are.

Netmulticore can only deal with values that are completely stored in
shared heaps. This requires that the value is initially copied to the
heap, and the described programming rules must be adhered to when
modifying and also reading the values. Of course, there are also
programming rules in a multi-threaded environment, so this is not
completely different.

The way the shared heaps are managed is less automatic than in the
multi-threaded case. Especially, the garbage collector of shared heaps
does not recognize values in process-local memory as roots.  This is
really unfortunate, because the user has to work around this
limitation (pinning), and this is error-prone. There is, however,
probably nothing we can do about it. Theoretically it is possible to
create a protocol between the shared GC and the process-local GC, but
it looks very complicated to implement it.

Another problem is that there is nothing that would prevent erroneous
pointers from shared heaps to process-local heaps. In multi-threaded
environments this distinction does not exist, so there is no such
problem. By changing the OCaml compiler it could be possible (without
having checked it in detail) to emit different code when a
process-local value is assigned to a shared variable, and to
automatically allocate the value in the shared heap.

The Netmulticore approach also has advantages. Especially, it is way
more scalable than a multi-threaded environment with a single heap
only. There is no global lock that could become the bottleneck of the
implementation. Each shared heap has its own lock, so there is always
the possibility to increase the "lock capacity" by using more shared
heaps together. In some sense, this is not a "multicore" solution, but
rather a "manycore" approach that will also work for hundreds of
cores.


{2:os Operating system issues}

{3 Administering POSIX shared memory}

Many of the data structures described here are actually backed by
shared memory blocks. For managing these blocks, the POSIX API
is used (i.e. [shm_open]). Note that there is also an older API
on many systems called System V API (i.e. [shmget]). This API
is not used.

Shared memory has kernel persistence, i.e. the blocks remain allocated
even after the process terminates that created them. The blocks need
to be explicitly deleted. This can be done by the right API calls
(e.g.  call {!Netmcore.release} for objects with a Netmulticore
resource ID), but from time to time a program does not terminate
normally, and this deletion is not performed. The question is how
an administrator can get rid of such leftover blocks.

The nice aspect of the POSIX API is that shared memory looks very much
like files, and indeed, in many implementations I've seen the blocks
appear somewhere in the file system. Typical locations for these files
are [/dev/shm] and [/tmp]. The files have names like
[mempool_f7e8bdaa], or generally [<prefix>_<8hexdigits>]. The prefix
depends on the data structure the block is used for, and the hex
digits make the name unique. By deleting these files, the blocks
are removed.
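
When cleaning up by hand, it can help to check file names against this
pattern first. The function [looks_like_shm_name] below is a
hypothetical helper and not part of Ocamlnet. It only tests the form
[<prefix>_<8hexdigits>], and accepting upper-case hex digits is an
assumption. A matching name is no proof that the file really belongs
to Netmulticore:

{[
(* Does [name] have the form <prefix>_<8hexdigits>? *)
let looks_like_shm_name name =
  let is_hex c =
    (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f')
    || (c >= 'A' && c <= 'F') in
  match String.rindex_opt name '_' with
  | None -> false
  | Some i ->
      let digits = String.sub name (i+1) (String.length name - i - 1) in
      let all_hex = ref true in
      String.iter (fun c -> if not (is_hex c) then all_hex := false) digits;
      (* require a non-empty prefix and exactly 8 hex digits *)
      i > 0 && String.length digits = 8 && !all_hex
]}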

Since Ocamlnet-3.6, one can also delete shared memory administratively
with the [netplex-admin] utility. See {!Netplex_admin.unlink} for
details. This method works on all operating systems. Also, old memory
is automatically unlinked when the program is restarted.

Another issue is that operating systems typically define an upper
limit for the amount of shared memory. This is e.g. 50% of the system
RAM for Linux. There are usually ways to configure this limit.

This web site is published by Informatikbüro Gerd Stolpmann