Contents
The approach of Netmulticore is to spawn subprocesses acting in the role of worker threads. Processes are separated from each other, and hence there is normally no direct way of getting into interaction. The help of the operating system is required here - a classic example of IPC (interprocess communication) are pipes, which create a data stream from one process to the other. Of course, we want here even closer interaction than this. Another, rarely used IPC mechanism is shared memory. This means that a block of RAM is allocated and mapped into all processes. When one process mutates a RAM cell in a shared block, the other processes immediately see this mutation, so that there is no system call overhead for transporting data. Actually, there is a bit of overhead in modern computers when this technique is used, but it is only occuring on the hardware level, and is very fast.
Netmulticore does not only allocate a shared block of RAM, but also manages it. Ideally, using the shared block would be as simple as using normal, process-local memory. Unfortunately, this is not possible, but the design of Netmulticore allows it to come quite close to this ideal. With Netmulticore a process can write normal Ocaml values like strings, tuples or records to the shared block, and the other processes can directly access this data as if it were in normal RAM. Unfortunately, one cannot use direct assignment to do such writing to the shared block, but has to follow special programming rules which ensure that the shared block remains in a consistent state. Large parts of this tutorial explain how to accomplish this.
The list of features supported by Netmulticore:
Design
The process hierarchy
When processes start other processes, the Unix system design defines a relation between processes: The started workers are children of the process requesting the starts. An immediate effect of this is that only the requesting process can wait for the termination of the children. There are more implications than this, but it should be clear that we have to take the relations between processes into account.
When a Netmulticore program is started, the implicitly created first process has a special role. It is called the master process, and it is normally not used for doing the real work. It is merely acting as a supervisor managing the workers. The tasks of the master process include the management of the resource table, and of course watching the lifetime of the workers. The master normally starts quickly the first worker process, which in turn starts as many workers as needed for accomplishing the computation.
Now, what does it mean to start processes? Unix only defines this
via the fork
system call. When doing a fork
the requesting process
is duplicated (i.e. RAM contents and many resources are copied, with
only a few exceptions), and the duplicate is registered as the new
child process. Netmulticore requires that the relationships between
processes remain manageable, and because of this it is always the
master process which requests the start of new children. Of course,
the programmer can also call Netmcore_process.start
from other
workers, but the Netmulticore machinery just sends this request to the
master, where it is really executed.
All in all this means: When a new worker is created, it is initialized as copy of the master process, independently of from where the user code requests the creation.
Resources
Netmulticore manages not only processes but also other kinds of resources. As of now the supported types are:
Netmcore.res_id
) which is
effectively an integer. With only the resource ID every worker can
request to get the details of the resource (e.g. which name the
temporary file has). The master process also keeps records which
worker needs which resource. If a resource is not needed anymore by
any worker, it is automatically deleted.
Since Ocamlnet-3.6, the deletion procedure has been substantially
improved. Now, a list of the alive resources is not only kept in
memory, but also written to a file "netplex.pmanage". This allows it
to delete the resources after the program has crashed or terminated
somehow abnormally. This is automatically done when the program is
restarted, or by the user running the netplex-admin
utility with
option -unlink
. The background for this is that the mentioned
resources have kernel persistency, and continue to exist when the
program is finished.
Memory pools
For setting up shared heaps as explained below, we especially need
shared memory blocks that are mapped at the same address by all worker
processes. This kind of block is needed for memory pools as
implemented by Netmcore_mempool
. There is, unfortunately, only one
reliable method of doing so: The master process allocates the block
and maps it into its own address space, and the workers get access to
the block by inheriting the mapping. This just means that the
fork
operation leaves the mapping in place, and because all workers
are created in the same way, all workers end up seeing the shared
block at the same address. The details of this procedure are hidden in
Netmulticore, but the user should know the limitations of this method.
A process cannot inherit a posteriori - when the process is already created, and a new shared block is set up, there is no safe way for the process to get access to the block at the right address. Because of this, there are two important programming rules:
Another limitation of the inheritance method: A shared block cannot be enlarged later. For the programmer this means that the initially created block must be large enough for the whole lifetime of the program.
The management of the shared blocks is now done by two modules:
Netmcore_mempool
is responsible in the first place, and can hand
out pieces of the shared block to users. Netmcore_heap
is such a
user, and manages the pieces it gets as a heap of Ocaml values.
Heaps can be enlarged if necessary, i.e. more pieces can be obtained
from the big shared block that is managed by Netmcore_mempool
.
Of course, it is also possible to have several heaps getting their
memory pieces from a single Netmcore_mempool
.
What is now a heap, or better shared heap? It is just a memory area, and it is possible to put Ocaml values into it. The values are connected by pointers that reach from one value to the next. This is not very much different than what is done by the Ocaml runtime and the normal Ocaml heap, only that the values are now exposed to a multitude of processes. We'll later see how we can create such a heap, and how it can be filled with Ocaml values. At this point, let me only mention that it requires the discipline of the programmer to fill and access a shared heap in the right way. If something gets wrong, the punishment is an illegal memory access, normally leading to a segmentation fault.
If a shared heap fills up, Netmcore_heap
starts a special garbage
collection run over it to reclaim memory that is no longer referenced.
This garbage collector works very much like the "major GC" that is
built into the Ocaml runtime.
As it is a bit complicated to manage shared heaps directly, there are a number of pre-defined data structures over shared heaps that can be directly used. Among these structures are buffers, queues, arrays, and hash tables.
Camlboxes
For fast notification between processes, Netmulticore uses Camlboxes. This data structure was invented before and exists independently of Netmulticore, but is specially supported.
Camlboxes are like mail boxes where an open number of senders can send messages to a single receiver. The messages are normal Ocaml values (like strings, records, or variants), so there is no marshalling issue. Camlboxes also use shared memory as transport medium, but it is here not required that the shared block needs to be mapped at the same address. Because of this, it is also possible to connect processes with Camlboxes that are not related to each other.
Camlboxes are optimized for speed and maximum parallelism. The data model is the following: The receiver creates the box which consists of a fixed number of slots, and each slot has a fixed maximum size. Each sender can map the box into its own address space, and fill any free slot. Because there is no strict ordering of the messages, the senders can avoid to run into lock contention issues (the senders simply use different slots and avoid to step on each other's feet). The receiver can look at the messages, copy the messages outside the box, and delete messages. The messages are not strings or marshalled data, but really Ocaml values, relocated to the receiver's address.
Synchronization
Netmulticore provides these synchronization primitives:
For example, consider a shared hash table (Netmcore_hashtbl
). This
data structure already includes everything to protect the internal
representation from concurrent accesses, but not more. For example,
a Netmcore_hashtbl.add
adds a value to the table, and it is safe
to call this operation from several processes at the same time.
This is not enough, however, to protect whole read/modify/update
cycles where the whole cycle needs to be run uninterrupted to avoid
data corruption. The user can define these additional synchronization
requests with the mentioned primitives.
As this is a quite frequent programming case, many shared data structures already contain a special area called the header where the user can put these primitives.
How to start processes
For starting processes, there are actually two API's: the slightly
more generic one in Netmcore
, and the strictly typed one in
Netmcore_process
. They are quite similar, and I'm only explaining
the latter.
Before a process can be started, it needs to be defined:
let process_fork, process_join =
Netmcore_process.def_process process_body
The definition via Netmcore_process.def_process
takes a function
process_body
, which is simply a function of type 'a -> 'b
, and
returns a fork point process_fork
and a join point process_join
.
This definition must happen in the master process. Remember that it is
also always the master process that is forked when a new worker
process is created. This is consistent with this programming rule -
process_body
will be called in the context of a fresh copy of the
master, so it needs also to be defined in the context of the master.
Practically, the definition is usually done when a top-level module
is initialized, and process_body
is a normal top-level function.
The fork point process_fork
has type 'a fork_point
where 'a
is
the type of the argument of process_body
. Imagine a fork point as a
way to fork the master process, where a value of type 'a
is passed
down, and process_body
is finally called with this value in the new
child. At the other end, the joint point process_join
(of type 'b join_point
) is a way to wait for the completion of the
process_body
and for getting the result of type 'b
.
Of course, after defining a process, one can start it as many times as needed. Here is how to do:
let pid = Netmcore_process.start process_fork arg
This finally does:
process_fork
arg
is marshalled
and sent to the new processstart
with a process ID pid
process_body arg'
where arg'
is the
restored value arg
When process_body
finishes, the function result is passed back to
the master where it is stored until the join is done. The worker
process is terminated. One can get the result value by doing:
let r_opt = Netmcore_process.join process_join pid
This waits until the result value is available, and returns it as
Some r
(when r
is of type 'b
). If no result is available because
of an exception or other termination of the process, None
is
returned. Note that the result r
is also marshalled.
If there is no interest in getting results at all, one can also do
Netmcore_process.release_join_point process_join
which prevents that results are stored in the master until they are picked up.
Before giving a complete example, let's look at how to initialize Netmulticore. When the master is set up (processes are defined etc.) one can start the first worker. At this point, the whole management machinery needs to be started, too. This is normally done by a function call like
Netmcore.startup
~socket_directory:"some_dir"
~first_process:(fun () -> Netmcore_process.start process_fork arg)
()
The socket_directory
is needed by the machinery to store runtime
files like Unix domain sockets (inherited from Netplex). In doubt, set
it to something like "/tmp/dir", but note that each running instance
of the program needs its own socket directory. (There is a chance that
socket_directory
becomes optional in future releases.)
Now a complete example: The master starts process X which in turn starts process Y. The processes do not much, but let's see:
let c = ref 0
let body_Y k =
k + 1 + !c
let fork_Y, join_Y =
Netmcore_process.def_process body_Y
let body_X k =
c := 1;
let pid_Y = Netmcore_process.start fork_Y k in
let j =
match Netmcore_process.join join_Y pid_Y with
| Some j -> j
| None -> failwith "Error in process Y" in
Printf.printf "Result: %d\n%!" j
let fork_X, join_X =
Netmcore_process.def_process body_X
Netmcore.startup
~socket_directory:"some_dir"
~first_process:(fun () -> Netmcore_process.start fork_X 1)
()
The result is of course 2 (and not 3), because the assignment to c
does not have any effect. Remember that process X
is forked from
the master process where c
still has the value 0.
A final word on marshalling before going on. Arguments and results of
processes are transmitted as strings that have been created with the
functions from the Marshal
module of the standard library. There are
a number of constraints one should be aware of:
in_channel
).Unix.file_descr
is such
a case. This unfortunately also holds for many Netmulticore types
such as heaps. For these types the marshalling seems to work,
but the restored values are actually unusable because they have
lost their connection with the underlying resources of the operating
system.Camlboxes must always be created by the (single) receiver of the messages. The address of the box is then made available to the senders which can then start the transmission of messages.
The number of message slots is fixed, and cannot be changed later. Also, the maximum size of the messages must be specified in advance, in bytes. Of course, this means one cannot use Camlboxes for arbitrarily large messages, but this is not what they are designed for. Camlboxes are good for small notifications that need to be quickly sent without risking any lock contention. (If big payload data needs to be included, a good workaround is to include that data by reference to a shared heap only.)
Let's just give an example: Process X creates a box, starts process Y, and Y sends a message to the box of X.
let body_Y box_res =
let (box_sender : string Netcamlbox.camlbox_sender) =
Netmcore_camlbox.lookup_camlbox_sender box_res in
Netcamlbox.camlbox_send box_sender "Hello world"
let fork_Y, join_Y =
Netmcore_process.def_process body_Y
let () =
Netmcore_process.release_join_point join_Y
let body_X () =
let ((box : string Netcamlbox.camlbox),box_res) =
Netmcore_camlbox.create_camlbox "example" 1 100 in
let pid_Y = Netmcore_process.start fork_Y box_res in
( match Netcamlbox.camlbox_wait box with
| [ slot ] ->
let m = Netcamlbox.camlbox_get_copy box slot in
Netcamlbox.camlbox_delete box slot;
printf "Message: %s\n%!" m
| _ ->
(* in _this_ example not possible *)
assert false
)
let fork_X, join_X =
Netmcore_process.def_process body_X
Netmcore.startup
~socket_directory:"some_dir"
~first_process:(fun () -> Netmcore_process.start fork_X ())
()
Let's go through it:
body_X
, by a call of
Netmcore_camlbox.create_camlbox
. Generally, Netmcore_camlbox
contains Netmulticore-specific extensions of the Camlbox abstraction
which is primarily defined in Netcamlbox
.
The function Netmcore_camlbox.create_camlbox
does not only create the box,
but also registers it as resource in the master process. The
string "example" is used for naming the shared memory block backing
the box. This box has a capacity of one message which may get
100 bytes long. There are two return values, box
and box_res
.
The value box
is the way to access the box for receiving
messages. The value box_res
is the resource ID.box_res
when we start body_Y
. Resource ID's can be
marshalled (whereas camlboxes cannot).Netmcore_camlbox.lookup_camlbox_sender
is
called at the beginning of body_Y
to look up the sender interface
for the box identified by box_res
. The sender view of the box
is box_sender
.Netcamlbox.camlbox_send
.Netcamlbox.camlbox_wait
the execution is suspended
until at least one message is in the box. The function returns the
slots containing messages.Netcamlbox.camlbox_get_copy
we can get a copy of the message
in the slot. Note that there is also Netcamlbox.camlbox_get
which
does not make a copy but returns a reference to the message as it
is in the box. This is unsafe because this reference becomes invalid
when the message is deleted.Netcamlbox.camlbox_delete
.
The slot is now again free, and can hold the next message.box
and
box_sender
. If you look at the signatures of the Camlbox modules,
you'll see that there is no enforcement that the types of box
and
box_sender
fit to each other. The type parameter string
is lost
at the moment the resource ID is used for identifying the box.
When sending a message, it is always copied into the box. For doing this a special "value copier" is used. This copier traverses deeply through the value representation and copies each piece into the box. This is a bit like marshalling, only that this does not result in a string but a copy of the orginal value (and the copy is placed into a reserved region in the shared memory block of the Camlbox). This copier has also restrictions which types of values can be handled, very much like marshalling.
How to create and use memory pools
Now back to the core of Netmulticore. For the remaining data
structures the shared memory must be managed as a memory pool. The
module Netmcore_mempool
accomplishes this.
As explained above, there are certain restrictions on which processes have access to memory pools. For most applications it is best when they simply create the pool in the master process directly after program startup and before launching any worker process. This avoids all restrictions, but the size of the pool needs to be set at this early point of execution (remember that pools also cannot be enlarged).
It is a good question how big a pool should be. Generally, the pool should be a small factor larger than the minimum amount of RAM needed for the pool. I cannot give really good recommendations, but factors between 1.5 and 3.0 seem to be good choices. If the pool size is chosen too tight, the garbage collector will run often and slow down the program.
Another strategy for the pool size is to make it always large, e.g. 25% of the available system memory. The idea here is that when parts of the pool remain unused throughout the lifetime of the program, they actually will also not consume RAM. Operating systems typically distinguish between RAM that is reserved by a memory allocation and RAM that is actually filled with data. Only the latter type consumes real RAM, whereas the first type is only taken into account for bookkeeping. This means the pool memory is reserved for the case it is needed at a certain point, but RAM is not wasted if not.
Now, the pool is simply created with
let pool = Netmcore_mempool.create_mempool size_in_bytes
The value pool
is again a resource ID, and it is no problem to
marshal this ID. When creating shared heaps, and for a number of
other operations the pool
ID is required, so it is a good idea
to passed it down to all worker processes.
Pools are backed by shared memory blocks, and these blocks have kernel persistence (usually they even appear as files, but this depends on the operating system). This means they exist until explicitly deleted (like files). To do so, just call
Netmcore.release pool
after the Netmcore.startup
function returns to the caller, and
before the program is finally terminated.
As pool memory is inherited by worker processes (as explained in the section about "Design"), one has to enable the inheritance for each started process, e.g.
let pid =
Netmcore_process.start
~inherit_resources:`All
process_fork arg
Otherwise the pool is not accessible by the started worker process. (I'm thinking about making this the default, but haven't come to a conclusion yet.)
If system memory is very tight, you will sometimes see bus errors when using pools (signal SIGBUS is sent to one of the processes, typically the signal number is 7). This happens when allocated shared memory is actually not available when it is used for the first time. (I'm still looking for ways how to get nicer reactions.)
Shared
ref
-type variables
Let's now look at a data structure that lives in a shared heap. Note that there is also a direct interface to shared heaps, but for the sake of explaining the concept, it is easier to first look at a concrete instance of a heap.
The module Netmcore_ref
provides a mutable reference for a single
value residing in the shared heap. The reference is comparable to the
ref
type provided by the standard library, and it is possible to
dereference it, and to assign new values to it. The difference is,
however, that the reference and the referenced value reside completely
in shared memory, and are accessible by all workers.
The reference is created by something like
let s = Netmcore_ref.sref pool initial_value
where pool
is the resource ID of a pool (see above), and
initial_value
is the value to assign to the reference initially.
This is comparable to
let r = ref initial_value
for normal references. There is an important difference, however.
The referenced value must completely reside in shared memory, and
in order to achieve this, the sref
function copies the
initial_value
to it. (The same copy mechanism is used that also
puts messages into Camlboxes.)
After running sref
, a new shared heap has been created which is
initialized as a reference. You can get a direct handle for the
heap structure by calling Netmcore_ref.heap
.
Also note that you have to call sref
from a worker process. It is
not possible to do this from the master process. This also applies
to almost any other heap-related function.
It is possible to assign a new value to s
:
Netmcore_ref.assign s new_value
This also copies the new_value
to the heap, and changes the
reference.
You may wonder what happens when you assign new values over and over again. There is no mechanism for deleting the old values immediately. Instead, the values accumulate over time in the heap, and when a certain threshold is reached, a garbage collection run is started. This run checks for values (or value parts) that became unreachable, and reclaims the memory used for them.
This garbage collector (GC) is not the normal garbage collector that cleans the process-local heap managed by the Ocaml runtime. It is a special GC designed for shared heaps. Unfortunately, this special GC is not as automatic as the GC built into the Ocaml runtime, as we'll see.
Back to our shared references. There are three functions for getting the value pointed to by a reference:
let v1 = Netmcore_ref.deref_ro s
let v2 = Netmcore_ref.deref_c s
Netmcore_ref.deref_p s (fun v3 -> ...)
The first function, deref_ro
, is the fastest but unsafest. It just
returns the current value of the reference as-is, and this value is of
course stored in the shared heap. The problem is, however, that
further assignments to the variable can invalidate this value when a
GC run occurs. If this happens, v1
becomes invalid because the
memory holding the representation for v1
is overwritten with
something else, and any accesses to v1
or its parts may crash the
program! For this reason, the deref_ro
function must only be used if it
can be excluded by other means that the variable is not being assigned
while v1
is being accessed (e.g. by a lock). The suffix "_ro" is
meant to remind of the fact that it is safe to use in read-only
contexts. (N.B. The GC could theoretically also check for values that
are only alive because of references from process-local
memory. Unfortunately, this would complicate everything dramatically -
the local GC and the shared GC would need some synchronization, and
slow or misbehaving local GC's could delay shared GC runs almost
indefinitely.)
The second function, deref_c
, does not have this problem because it
always returns a copy of the referenced value (suffix "_c" = "copy").
It is, of course, a lot slower because of this, and the copy semantics
may not always be the right design for an application.
The third function, deref_p
, is a clever alternative. As you can
see, deref_p
does not return the value, but it runs an
argument function with the current value. The trick is now that
v3
is specially protected while this function is running. This
protection does not prevent assignments, but it prevents that the
GC deletes the value v3
, if a GC run is done. This type of protection
is also called pinning, and the suffix "_p" is meant to remind
of this.
When using deref_p
, one still should be careful and think about
whether accesses to v3
(or parts of v3
) can occur after the
pinning protection has ended. This must be excluded by all means!
Let's look at another thing the programmer must not do. Imagine some inner part of the value is mutable, e.g.
type t = { mutable data : string }
let s = Netmcore_ref.sref pool { data = "initial string" }
The question is now: Is it allowed to assign to the data
component, as in
(Netmcore_ref.deref_ro s).data <- "new string" (* WARNING - PROBLEMATIC CODE *)
The meaning of this assignment is that the data
component is
overwritten with a pointer to a string residing in process-local
memory. Imagine now what happens if a different worker process
accesses the data
component. The address of this pointer is now
meaningless, because the string does not exist in context of the other
process at this address. If we access nevertheless, we risk getting
random data or even segmentation faults.
I'll explain below how to fix this assignment. It is possible, if done in the right way.
Finally, here is a complete example using shared references:
let body_Y (pool, c_descr, k) =
let c = Netmcore_sref.sref_of_descr pool c_descr in
let c_value = Netmcore_sref.deref_ro c in
k + 1 + c_value
let fork_Y, join_Y =
Netmcore_process.def_process body_Y
let body_X (pool,k) =
let c = Netmcore_sref.sref pool 1 in
let c_descr = Netmcore_sref.descr_of_sref c in
let pid_Y =
Netmcore_process.start ~inherit_resources:`All fork_Y (pool, c_descr, k) in
let j =
match Netmcore_process.join join_Y pid_Y with
| Some j -> j
| None -> failwith "Error in process Y" in
Printf.printf "Result: %d\n%!" j
let fork_X, join_X =
Netmcore_process.def_process body_X
let () =
let pool = Netmcore_mempool.create_mempool (1024 * 1024) in
Netmcore.startup
~socket_directory:"some_dir"
~first_process:
(fun () ->
Netmcore_process.start ~inherit_resources:`All fork_X (pool,1))
();
Netmcore.release pool
Compare this with the example I gave when explaining how to start
processes. The variable c
is now a shared reference, and because of
this process Y can access the contents of c
. The result is now 3.
In this example already a feature is used that is first explained in
the next section: descriptors. For some reason (we'll see why) it is not
possible to marshal shared heaps, and thus you cannot call body_Y
with c
as argument. The workaround is to create a descriptor for
c
, use the descriptor for marshalling, and restore the orginal
heap in the called function.
Descriptors
Shared heaps are very special objects - in particular, the heaps reside in shared memory at a certain address, and this address also appears in the internal data structures the heaps use to manage their memory space.
Imagine what happens when you do not respect this special nature of shared heaps, and create a copy nevertheless (e.g. by marshalling the heap, or by putting the heap into another heap). At the first glance, you will see that you can actually create the copy (it's a valid Ocaml value), but when you try to use it, the program will crash. What has happened?
The problem is that the internal data structure of the copied heap still contains addresses that are only valid for the orignal heap, but are meaningless for the copy. When these addresses are followed, invalid memory access occur, and the program crashes.
So, a very important programming rule: Never copy shared heaps! Unfortunately, this really requires discipline, as there are many mechanisms in Netmulticore where copies are automatically created (just page back and look how often I told you that a copy is created here and there), for example when starting worker processes the arguments are also copied over to the newly created process.
How to work around? First let's think about what we really want to have when we e.g. start a worker process and pass a shared heap to it. Of course, we do not want to create a copy, but rather we want to make the same shared heap accessible to the worker. We want call by reference!
Descriptors are the solution. A descriptor of a shared heap is just
a marshallable reference to the heap. For each shared data structure
there is a special descriptor type, and it is possible to get the
descriptor for a heap, and to look up the heap by descriptor. For
example, shared references define this as (in Netmcore_ref
):
type 't sref_descr
val descr_of_sref : 't sref -> 't sref_descr
val sref_of_descr : Netmcore.res_id -> 't sref_descr -> 't sref
Note that the sref_of_descr
function takes the resource ID of the
pool as first argument, and that this function is quite slow (because
it has to ask the master process where to find the shared memory
object).
For the other shared data structures, the are comparable functions supporting descriptors.
How to mutate shared values
Remember this code?
(Netmcore_ref.deref_ro s).data <- "new string" (* WARNING - PROBLEMATIC CODE *)
We now look how to fix it. The basic problem is that the new string is not residing in shared memory, and if we could achieve that this is the case, the assignment would be acceptable.
The correct way to do it is this:
Netmcore_heap.modify
(Netmcore_ref.heap s)
(fun mut ->
(Netmcore_ref.deref_ro s).data <- Netmcore_heap.add mut "new string"
)
The essence is that we now call Netmcore_heap.add
before assigning
the string. This function again copies the argument value, and puts
the copy onto the heap. For managing the copy, this function needs
an special object mut
, which is actually a mutator. The only
way to get a mutator is to call Netmcore_heap.modify
, which - among
other actions - locks the heap, and prevents that any competing write
access occurs. This is required, because otherwise mutations could
overlap in bad ways when they are done in parallel, and the internal
representation of the heap would be corrupted.
Note that you need to call add
for assigning all values that are not
yet residing in the same heap. This means: Call it if the new value is
in process-local memory, but also call it when the new value is
already part of a different heap (because pointers from one heap to
the other are not allowed - heaps must be completely self-contained).
You may wonder why we used deref_ro
to get the current value of s
,
and not one of the other access functions. Quite easy answer: the
other functions deref_c
and deref_p
would deadlock the program!
The modify
function already acquires the heap lock for the duration
of the mutation, and using any access function that also plays with
this lock will cause deadlocks.
Let's look at this example:
type u = { mutable data1 : string;
mutable data2 : string;
}
let s = Netmcore_ref.sref
pool
{ data1 = "initial string 1"; data2 = "initial string 2" }
We want now to swap the values in the two components data1
and data2
.
The solution is of course:
Netmcore_heap.modify
(Netmcore_ref.heap s)
(fun mut ->
let u = Netmcore_ref.deref_ro s in
let p = u.data1 in
u.data1 <- u.data2;
u.data2 <- p
)
Note that we call modify
although we do not need the mutator for
doing our work. The reason is that modify
also write-locks the heap,
and this protects the integrity of our data. We do not need to call
add
because the two strings are already residing in the same shared
heap, and we are just swapping them.
A little variation of this let us run into a possible problem, though:
Netmcore_heap.modify
(Netmcore_ref.heap s)
(fun mut ->
let u = Netmcore_ref.deref_ro s in
let p = u.data1 in
u.data1 <- Netmcore_heap.add mut ("Previously in data2: " ^ u.data2);
u.data2 <- Netmcore_heap.add mut ("Previously in data1: " ^ p);
printf "p=%s\n" p; (* THIS LINE IS PROBLEMATIC *)
)
This piece of code will crash now and then (not often, so maybe difficult to run into). What is going wrong?
There is one phenomenon we haven't paid attention to yet: When we call
add
it is possible that the available space in the heap is not
sufficient anymore to do the required memory allocations, and that a
GC run is started. When this happens in the second add
, the
field data1
is already overwritten, and because of this the string
p
is now unreachable from the top-level value of the heap. The
space occupied by p
is reclaimed, and may even be overwritten. For
the two assignments this is no problem, because p
is no longer needed.
When we access p
after the second add
, however, the value may
have become invalid. Accessing it may crash the program!
How to fix? It is possible to pin additional values during mutation so that they cannot be collected by the GC:
Netmcore_heap.modify
(Netmcore_ref.heap s)
(fun mut ->
let u = Netmcore_ref.deref_ro s in
let p = u.data1 in
u.data1 <- Netmcore_heap.add mut ("Previously in data2: " ^ u.data2);
u.data2 <- Netmcore_heap.add mut ("Previously in data1: " ^ p);
Netmcore_heap.pin mut p;
printf "p=%s\n" p;
)
This piece of code is now correct. The effect of pin
lasts until
the end of the modify
function.
Shared data structures
Besides references, there are a number of further shared data structures:
Netmcore_array
implements shared arraysNetmcore_matrix
implements shared matrices (2-dimensional arrays)Netmcore_buffer
implements shared string buffersNetmcore_queue
implements shared queuesNetmcore_hashtbl
implements shared hash tablesQueue
into a shared reference in order to get a
shared version? The problem is that existing types like Queue
do not
adhere to the programming rules we've outlined above. Queue
is a
mutable structure, and when elements are added to or removed from the
queue, new value allocations are always done in the normal
process-local heap but not in the right shared heap. This is nothing
one can fix by wrapping code around these data structures.
If you look at the implementation of e.g. Netmcore_queue
(which is
really recommended), you'll see that it looks very much like the
implementation of Queue
, only that Netmcore_heap.modify
is used
for managing mutation.
The above listed modules also include all the necessary means to
protect the data structures against the possibly disastrous effects
of parallel mutation. Also, for read accesses there are always several
access functions corresponding to what we've seen for references
(deref_ro
, deref_c
, and deref_p
).
Let's have a closer look at Netmcore_array
to see how this is done.
The signature is:
type ('e,'h) sarray
type ('e,'h) sarray_descr
val create : Netmcore.res_id -> 'e array -> 'h -> ('e,'h) sarray
val make : Netmcore.res_id -> int -> 'e -> 'h -> ('e,'h) sarray
val init : Netmcore.res_id -> int -> (int -> 'e) -> 'h -> ('e,'h) sarray
val grow : ('e,_) sarray -> int -> 'e -> unit
val set : ('e,_) sarray -> int -> 'e -> unit
val get_ro : ('e,_) sarray -> int -> 'e
val get_p : ('e,_) sarray -> int -> ('e -> 'a) -> 'a
val get_c : ('e,_) sarray -> int -> 'e
val length : (_,_) sarray -> int
val header : (_,'h) sarray -> 'h
val deref : ('e,_) sarray -> 'e array
val heap : (_,_) sarray -> Obj.t Netmcore_heap.heap
val descr_of_sarray : ('e,'h) sarray -> ('e,'h) sarray_descr
val sarray_of_descr : Netmcore.res_id -> ('e,'h) sarray_descr -> ('e,'h) sarray
As you can see, the type is not only 'e sarray
(when 'e
is the
type of the elements), but there is a second type variable 'h
, so
that the type becomes ('e,'h) sarray
. This is the type of the
header, which is simply an extra place for storing data that exists
once per shared data structure. The header can have any type, but is
often a record with a few fields. If you do not need the header, just
set it to ()
(i.e. 'h = unit
).
For managing shared data structures one often needs a few extra fields
that can be put into the header. For example, a requirement could be
that the length of a shared queue is limited, and one needs
synchronization variables to ensure that the users of the queue stick
to the limit (i.e. the addition of new elements to the queue is
suspended when it is full, and it is restarted when there is again
space). As such extra requirements are very common, all shared data
structures have such a header (well, for Netmcore_ref
it was omitted
for obvious reasons).
When you create an instance of the structure, you always have to pass
the resource ID of the pool (here for create
, make
,
init
). Another argument is the initial value of the header (which is
also copied into the shared heap). The header, after being copied to
the heap, can be accessed with the header
function.
The function set
looks very much like the one in Array
. Note that
for all mutation the shared heap is write-locked, so there can actually
only be one running set
operation at a time. Because set
copies
the argument data to the shared heap, this time is not neglectable.
For accessing elements, there are the known three variants: get_ro
,
get_c
, and get_p
. The length
function works as in Array
.
As it would be difficult for the user to implement growable arrays
on top of the basic version, it was chosen to add a grow
function
exactly doing that. This function avoids to copy the values again
into the shared heap that are already there.
The type ('e,'h) sarray_descr
is used for descriptors to shared
arrays, and the functions descr_of_sarray
and sarray_of_descr
allow it to manage descriptors.
Synchronization
There are three kinds of synchronization devices:
Netmcore_sem
: SempahoresNetmcore_mutex
: MutexesNetmcore_condition
: Condition variablesOf course, these special values must be put into shared heaps in order to be accessible by several processes. Let's just walk through an example, to see how this is done right.
Imagine you have a field x
of some type, and a lock m
that is going
to protect concurrent accesses to x
:
type t =
{ mutable x : some_type;
mutable m : Netmcore_mutex.mutex
}
This record is put into a shared reference:
let s = Netmcore_ref.sref { x = ...; m = ... }
Remember that sref
initializes the reference with a copy of the
argument value. This would mean we copy m
, which is invalid as we've
learned. How to solve?
Mutexes (like the other synchronization devices) are designed for this use pattern, so there is already a built-in solution. There is a special dummy value one can use during initialization:
let s = Netmcore_ref.sref { x = ...; m = Netmcore_mutex.dummy() }
Dummies are placeholders that need to be re-initialized later when the mutex is already copied to the shared heap. This is done by:
Netmcore_heap.modify
(Netmcore_ref.heap s)
(fun mut ->
let r = Netmcore_ref.deref_ro mut in
r.m <- Netmcore_mutex.create mut `Normal
)
That's it! Note that we've set the type of the mutex to `Normal
which creates a fast mutex without deadlock protection.
The mutex can now be used in statements like
Netmcore_ref.deref_p (fun r -> Netmcore_mutex.lock r)
Be careful not to use deref_c
because this would create a copy of the
mutex, and render it useless!
Semaphores are much like mutexes, only that the synchronization functions
are not lock
and unlock
but wait
and post
.
Condition variables are a bit harder to use, unfortunately. When
implementing them I ran into the issue that the fast algorithm needs
to allocate special storage places, one for each process that can be
suspended. (There is a slow algorithm not requiring additional
storage, but this would have been a very bad deal.) In system-level
implementations of condition variables the additional storage can
usually be hidden from the user. This is not possible in a pure
user-space implementation like this one. For this reason, the user of
condition variables has to allocate these places called wait_entry
.
One wait_entry
is needed for each process that can ever wait for
a condition variable. There is also wait_set
which is just a collection
of wait_entry
values. Let's look at an example:
type t =
{ mutable x : some_type;
mutable m : Netmcore_mutex.mutex;
mutable c : Netmcore_condition.condition;
mutable w : Netmcore_condition.wait_set
}
let s =
Netmcore_ref.sref
{ x = ...;
m = Netmcore_mutex.dummy();
c = Netmcore_condition.dummy_condition();
w = Netmcore_condition.dummy_wait_set()
}
let () =
Netmcore_heap.modify
(Netmcore_ref.heap s)
(fun mut ->
let r = Netmcore_ref.deref_ro mut in
r.m <- Netmcore_mutex.create mut `Normal;
r.c <- Netmcore_condition.create_condition mut;
r.w <- Netmcore_condition.create_wait_set mut
)
The field w
is now an empty wait_set
. Note that we only need one w
for all condition variables that exist in the same shared heap.
The point is now that we need to get a wait_entry
for each process
using s
. Get it with:
let we =
Netmcore_heap.modify
(Netmcore_ref.heap s)
(fun mut ->
Netmcore_condition.alloc_wait_entry mut (Netmcore_ref.deref_ro s).w
)
This just needs to happen once for each process. The value we
can be
used for all wait
calls related to all condition variable in the
same shared heap.
A wait
call looks then like:
Netmcore_ref.deref_p (fun r -> Netmcore_condition.wait we r.c r.m)
For signal
and broadcast
the value we
is not needed. As you see
the only additional complexity has to do with the initialization of the
process - we need to create we
once for each process.
Where to find examples
It is very much recommended to study complete examples before trying
to develop with Netmulticore. There are a few examples in the
examples/multicore
directory of the distributed tarball.
The latest version can always be found in the svn repository:
The current implementation of shared heaps only uses a single lock for
protecting every heap, even for the initial actions of read
accesses. This e.g. means that a deref_c
locks the reference for a
short time until it has pinned the current value. The (perhaps time
consuming) copy operation is then done without lock.
This might not be optimal for all use cases. An improved locking scheme would use reader/writer locks. However, this kind of lock is complicated to implement on top of just semaphores, so it was omitted for now. Also, reader/writer locks are more expensive in general, so it is not clear whether it is better at all.
The memory management of heaps is still in a quite experimental state. Heaps are extended in units of 64 KB which may be way too big or way too small for the application. Also, it is tried to achieve that at least half of the heap memory is free (i.e. the "overhead factor" is 50%).
If parts of a shared heap become completely free they are in deed given back to the memory pool.
Some points where Netmulticore is different from multi-threading
When porting multi-threaded programs to Netmulticore, you may wonder where the differences are.
Netmulticore can only deal with values that are completely stored in shared heaps. This requires that the value is initially copied to the heap, and the described programming rules must be adhered to when modifiying and also reading the values. Of course, there are also programming rules in a multi-threaded environment, so this is not completely different.
The way the shared heaps are managed is less automatic than in the multi-threaded case. Especially, the garbage collector of shared heaps does not recognize values in process-local memory as roots. This is really unfortunate, because the user has to work around this limitation (pinning), and this is error-prone. There is, however, probably nothing we can do about it. Theoretically it is possible to create a protocol between the shared GC and the process-local GC, but it looks very complicated to implement it.
Another problem is that there is nothing that would prevent erroneous pointers from shared heaps to process-local heaps. In multi-threaded environments this distinction does not exist, so there is no such problem. By changing the Ocaml compiler it could be possible (without having checked it in detail) to emit different code when a process-local value is assigned to a shared variable, and to automatically allocate the value in the shared heap.
The Netmulticore approach also has advantages. Especially, it is way more scalable than a multi-threaded environment with a single heap only. There is no global lock that could become the bottleneck of the implementation. Each shared heap has its own lock, so there is always the possibility to increase the "lock capacity" by using more shared heaps together. In some sense, this is not a "multicore" solution, but rather a "manycore" approach that will also work for hundreds of cores.
Operating system issues
Many of the data structures described here are actually backed by
shared memory blocks. For managing these blocks, the POSIX API
is used (i.e. shm_open
). Note that there is also an older API
on many systems called System V API (i.e. shmget
). This API
is not used.
Shared memory has kernel persistence, i.e. the blocks remain allocated
even after the process terminates that created them. The blocks need
to be explicitly deleted. This can be done by the right API calls
(e.g. call Netmcore.release
for objects with a Netmulticore
resource ID), but from time to time a program does not terminate
normally, and this deletion is not performed. The question is how to
get administratively rid of the blocks.
The nice aspect of the POSIX API is that shared memory looks very much
like files, and in deed, in many implementations I've seen the blocks
appear somewhere in the file system. Typical locations for these files
are /dev/shm
and /tmp
. The files have names like
mempool_f7e8bdaa
, or generally <prefix>_<8hexdigits>
. The prefix
depends on the data structure the block is used for, and the hex
digits make the name unique. By deleting these files, the blocks
are removed.
Since Ocamlnet-3.6, one can also delete shared memory administratively
with the netplex-admin
utility. See Deleting persistent kernel objects for
details. This method works for all OS. Also, an unlink of old memory
is automatically done when the program is restarted.
Another issue is that OS typically define an upper limit for the
amount of shared memory. This is e.g. 50% of the system RAM for
Linux. There are usually ways to configure this limit.