Plasma GitLab Archive
Projects Blog Knowledge

Netmcore_tut


Netmulticore Tutorial

Contents

This manual gives an overview of Netmulticore, which allows it to manage subprocesses for speeding up computations on multicore CPU's. Netmulticore tries to overcome a limitation of OCaml's runtime, namely that only one thread at a time can get the CPU. Because of this, multi-threaded programs cannot make use of the additional power of multicore CPU's.

Readers are encouraged to first have a look at Netmcore_basics, which is more fundamental and doesn't use unsafe language features.

The approach of Netmulticore is to spawn subprocesses acting in the role of worker threads. Processes are separated from each other, and hence there is normally no direct way of getting into interaction. The help of the operating system is required here - a classic example of IPC (interprocess communication) are pipes, which create a data stream from one process to the other. Of course, we want here even closer interaction than this. Another, rarely used IPC mechanism is shared memory. This means that a block of RAM is allocated and mapped into all processes. When one process mutates a RAM cell in a shared block, the other processes immediately see this mutation, so that there is no system call overhead for transporting data. Actually, there is a bit of overhead in modern computers when this technique is used, but it is only occuring on the hardware level, and is very fast.

Netmulticore does not only allocate a shared block of RAM, but also manages it. Ideally, using the shared block would be as simple as using normal, process-local memory. Unfortunately, this is not possible, but the design of Netmulticore allows it to come quite close to this ideal. With Netmulticore a process can write normal Ocaml values like strings, tuples or records to the shared block, and the other processes can directly access this data as if it were in normal RAM. Unfortunately, one cannot use direct assignment to do such writing to the shared block, but has to follow special programming rules which ensure that the shared block remains in a consistent state. Large parts of this tutorial explain how to accomplish this.

The list of features supported by Netmulticore:

  • Creation of worker processes
  • Management of shared resources like file descriptors and shared memory blocks
  • Management of shared RAM pools that are mapped to the same address in all worker processes
  • Management of shared heaps, i.e. containers for Ocaml values
  • Garbage collection for shared heaps
  • Predefined data structures that live in shared heaps
  • Synchronization primitives (locks, condition variables)
  • Message passing between worker processes
  • Integration with Netplex
Before you start looking at Netmulticore, I should also give a warning. Netmulticore requires unsafe programming elements which can, if used the wrong way, lead to corrupt results or even crash the program. This is, to some degree, comparable to interfacing the Ocaml runtime on the C level. Unfortunately, when it comes to parallel programming, such unsafeness is unavoidable if you want to get real speedups of your program. Note that multi-threading suffers from the safe problem, and is in no way different.

Design

The process hierarchy

When processes start other processes, the Unix system design defines a relation between processes: The started workers are children of the process requesting the starts. An immediate effect of this is that only the requesting process can wait for the termination of the children. There are more implications than this, but it should be clear that we have to take the relations between processes into account.

When a Netmulticore program is started, the implicitly created first process has a special role. It is called the master process, and it is normally not used for doing the real work. It is merely acting as a supervisor managing the workers. The tasks of the master process include the management of the resource table, and of course watching the lifetime of the workers. The master normally starts quickly the first worker process, which in turn starts as many workers as needed for accomplishing the computation.

Now, what does it mean to start processes? Unix only defines this via the fork system call. When doing a fork the requesting process is duplicated (i.e. RAM contents and many resources are copied, with only a few exceptions), and the duplicate is registered as the new child process. Netmulticore requires that the relationships between processes remain manageable, and because of this it is always the master process which requests the start of new children. Of course, the programmer can also call Netmcore_process.start from other workers, but the Netmulticore machinery just sends this request to the master, where it is really executed.

All in all this means: When a new worker is created, it is initialized as copy of the master process, independently of from where the user code requests the creation.

Resources

Netmulticore manages not only processes but also other kinds of resources. As of now the supported types are:

  • Temporary files
  • Shared memory blocks
  • Shared memory blocks with the additional requirement that all workers map them at the same address
  • Named semaphores
  • Fork points, i.e. registered functions acting as worker body
  • Join points, i.e. the option of waiting for the completion of worker bodies
All resources have a resource ID (Netmcore.res_id) which is effectively an integer. With only the resource ID every worker can request to get the details of the resource (e.g. which name the temporary file has). The master process also keeps records which worker needs which resource. If a resource is not needed anymore by any worker, it is automatically deleted.

Since Ocamlnet-3.6, the deletion procedure has been substantially improved. Now, a list of the alive resources is not only kept in memory, but also written to a file "netplex.pmanage". This allows it to delete the resources after the program has crashed or terminated somehow abnormally. This is automatically done when the program is restarted, or by the user running the netplex-admin utility with option -unlink. The background for this is that the mentioned resources have kernel persistency, and continue to exist when the program is finished.

Memory pools

For setting up shared heaps as explained below, we especially need shared memory blocks that are mapped at the same address by all worker processes. This kind of block is needed for memory pools as implemented by Netmcore_mempool. There is, unfortunately, only one reliable method of doing so: The master process allocates the block and maps it into its own address space, and the workers get access to the block by inheriting the mapping. This just means that the fork operation leaves the mapping in place, and because all workers are created in the same way, all workers end up seeing the shared block at the same address. The details of this procedure are hidden in Netmulticore, but the user should know the limitations of this method.

A process cannot inherit a posteriori - when the process is already created, and a new shared block is set up, there is no safe way for the process to get access to the block at the right address. Because of this, there are two important programming rules:

  • A newly created shared block (with the "same address" requirement) is only visible in worker processes that are created after the block
  • This even means that the requesting process does not get access to the shared block it just created if the requesting process is a worker
The safest and easiest way to deal with this is to create the shared block in the master before starting any worker.

Another limitation of the inheritance method: A shared block cannot be enlarged later. For the programmer this means that the initially created block must be large enough for the whole lifetime of the program.

The management of the shared blocks is now done by two modules: Netmcore_mempool is responsible in the first place, and can hand out pieces of the shared block to users. Netmcore_heap is such a user, and manages the pieces it gets as a heap of Ocaml values. Heaps can be enlarged if necessary, i.e. more pieces can be obtained from the big shared block that is managed by Netmcore_mempool. Of course, it is also possible to have several heaps getting their memory pieces from a single Netmcore_mempool.

Shared heaps

What is now a heap, or better shared heap? It is just a memory area, and it is possible to put Ocaml values into it. The values are connected by pointers that reach from one value to the next. This is not very much different than what is done by the Ocaml runtime and the normal Ocaml heap, only that the values are now exposed to a multitude of processes. We'll later see how we can create such a heap, and how it can be filled with Ocaml values. At this point, let me only mention that it requires the discipline of the programmer to fill and access a shared heap in the right way. If something gets wrong, the punishment is an illegal memory access, normally leading to a segmentation fault.

If a shared heap fills up, Netmcore_heap starts a special garbage collection run over it to reclaim memory that is no longer referenced. This garbage collector works very much like the "major GC" that is built into the Ocaml runtime.

As it is a bit complicated to manage shared heaps directly, there are a number of pre-defined data structures over shared heaps that can be directly used. Among these structures are buffers, queues, arrays, and hash tables.

Camlboxes

For fast notification between processes, Netmulticore uses Camlboxes. This data structure was invented before and exists independently of Netmulticore, but is specially supported.

Camlboxes are like mail boxes where an open number of senders can send messages to a single receiver. The messages are normal Ocaml values (like strings, records, or variants), so there is no marshalling issue. Camlboxes also use shared memory as transport medium, but it is here not required that the shared block needs to be mapped at the same address. Because of this, it is also possible to connect processes with Camlboxes that are not related to each other.

Camlboxes are optimized for speed and maximum parallelism. The data model is the following: The receiver creates the box which consists of a fixed number of slots, and each slot has a fixed maximum size. Each sender can map the box into its own address space, and fill any free slot. Because there is no strict ordering of the messages, the senders can avoid to run into lock contention issues (the senders simply use different slots and avoid to step on each other's feet). The receiver can look at the messages, copy the messages outside the box, and delete messages. The messages are not strings or marshalled data, but really Ocaml values, relocated to the receiver's address.

Synchronization

Netmulticore provides these synchronization primitives:

  • Mutexes
  • Semaphores
  • Condition variables
These primitives can be used in conjuction with shared heaps, and allow the programmer to define additional synchronization requirements that are not satisfied by the data structure in the heap.

For example, consider a shared hash table (Netmcore_hashtbl). This data structure already includes everything to protect the internal representation from concurrent accesses, but not more. For example, a Netmcore_hashtbl.add adds a value to the table, and it is safe to call this operation from several processes at the same time. This is not enough, however, to protect whole read/modify/update cycles where the whole cycle needs to be run uninterrupted to avoid data corruption. The user can define these additional synchronization requests with the mentioned primitives.

As this is a quite frequent programming case, many shared data structures already contain a special area called the header where the user can put these primitives.

How to start processes

For starting processes, there are actually two API's: the slightly more generic one in Netmcore, and the strictly typed one in Netmcore_process. They are quite similar, and I'm only explaining the latter.

Before a process can be started, it needs to be defined:

let process_fork, process_join =
  Netmcore_process.def_process process_body

The definition via Netmcore_process.def_process takes a function process_body, which is simply a function of type 'a -> 'b, and returns a fork point process_fork and a join point process_join. This definition must happen in the master process. Remember that it is also always the master process that is forked when a new worker process is created. This is consistent with this programming rule - process_body will be called in the context of a fresh copy of the master, so it needs also to be defined in the context of the master.

Practically, the definition is usually done when a top-level module is initialized, and process_body is a normal top-level function.

The fork point process_fork has type 'a fork_point where 'a is the type of the argument of process_body. Imagine a fork point as a way to fork the master process, where a value of type 'a is passed down, and process_body is finally called with this value in the new child. At the other end, the joint point process_join (of type 'b join_point) is a way to wait for the completion of the process_body and for getting the result of type 'b.

Of course, after defining a process, one can start it as many times as needed. Here is how to do:

let pid = Netmcore_process.start process_fork arg

This finally does:

  • The master process is notified that a new process is needed which will run a process as defined by process_fork
  • Once the process is running, the argument arg is marshalled and sent to the new process
  • Execution returns now to the caller of start with a process ID pid
  • The new process runs process_body arg' where arg' is the restored value arg
The process ID's are Netmulticore's own ID's, and are guaranteed to be unique (other than the process ID's the operating system uses which can wrap around).

When process_body finishes, the function result is passed back to the master where it is stored until the join is done. The worker process is terminated. One can get the result value by doing:

let r_opt = Netmcore_process.join process_join pid

This waits until the result value is available, and returns it as Some r (when r is of type 'b). If no result is available because of an exception or other termination of the process, None is returned. Note that the result r is also marshalled.

If there is no interest in getting results at all, one can also do

Netmcore_process.release_join_point process_join

which prevents that results are stored in the master until they are picked up.

Before giving a complete example, let's look at how to initialize Netmulticore. When the master is set up (processes are defined etc.) one can start the first worker. At this point, the whole management machinery needs to be started, too. This is normally done by a function call like

Netmcore.startup
   ~socket_directory:"some_dir" 
   ~first_process:(fun () -> Netmcore_process.start process_fork arg)
   ()

The socket_directory is needed by the machinery to store runtime files like Unix domain sockets (inherited from Netplex). In doubt, set it to something like "/tmp/dir", but note that each running instance of the program needs its own socket directory. (There is a chance that socket_directory becomes optional in future releases.)

Now a complete example: The master starts process X which in turn starts process Y. The processes do not much, but let's see:

let c = ref 0

let body_Y k =
  k + 1 + !c

let fork_Y, join_Y =
  Netmcore_process.def_process body_Y

let body_X k =
  c := 1;
  let pid_Y = Netmcore_process.start fork_Y k in
  let j =
     match Netmcore_process.join join_Y pid_Y with
      | Some j -> j
      | None -> failwith "Error in process Y" in
  Printf.printf "Result: %d\n%!" j

let fork_X, join_X =
  Netmcore_process.def_process body_X

Netmcore.startup
   ~socket_directory:"some_dir" 
   ~first_process:(fun () -> Netmcore_process.start fork_X 1)
   ()

The result is of course 2 (and not 3), because the assignment to c does not have any effect. Remember that process X is forked from the master process where c still has the value 0.

A final word on marshalling before going on. Arguments and results of processes are transmitted as strings that have been created with the functions from the Marshal module of the standard library. There are a number of constraints one should be aware of:

  • Functions, objects, and lazy values are not supported, and will cause exceptions. For a few other types this is also the case (e.g. in_channel).
  • Unfortunately, there are also types of values that do not trigger exceptions, but do nonsense. For example, Unix.file_descr is such a case. This unfortunately also holds for many Netmulticore types such as heaps. For these types the marshalling seems to work, but the restored values are actually unusable because they have lost their connection with the underlying resources of the operating system.

How to use Camlboxes for passing messages

Camlboxes must always be created by the (single) receiver of the messages. The address of the box is then made available to the senders which can then start the transmission of messages.

The number of message slots is fixed, and cannot be changed later. Also, the maximum size of the messages must be specified in advance, in bytes. Of course, this means one cannot use Camlboxes for arbitrarily large messages, but this is not what they are designed for. Camlboxes are good for small notifications that need to be quickly sent without risking any lock contention. (If big payload data needs to be included, a good workaround is to include that data by reference to a shared heap only.)

Let's just give an example: Process X creates a box, starts process Y, and Y sends a message to the box of X.

let body_Y box_res =
  let (box_sender : string Netcamlbox.camlbox_sender) = 
     Netmcore_camlbox.lookup_camlbox_sender box_res in
  Netcamlbox.camlbox_send box_sender "Hello world"

let fork_Y, join_Y =
  Netmcore_process.def_process body_Y

let () =
  Netmcore_process.release_join_point join_Y

let body_X () =
  let ((box : string Netcamlbox.camlbox),box_res) = 
     Netmcore_camlbox.create_camlbox "example" 1 100 in
  let pid_Y = Netmcore_process.start fork_Y box_res in
  ( match Netcamlbox.camlbox_wait box with
     | [ slot ] ->
         let m = Netcamlbox.camlbox_get_copy box slot in
         Netcamlbox.camlbox_delete box slot;
         printf "Message: %s\n%!" m
     | _ ->
         (* in _this_ example not possible *)
         assert false
  )

let fork_X, join_X =
  Netmcore_process.def_process body_X

Netmcore.startup
   ~socket_directory:"some_dir" 
   ~first_process:(fun () -> Netmcore_process.start fork_X ())
   ()

Let's go through it:

  • The box is created at the beginning of body_X, by a call of Netmcore_camlbox.create_camlbox. Generally, Netmcore_camlbox contains Netmulticore-specific extensions of the Camlbox abstraction which is primarily defined in Netcamlbox. The function Netmcore_camlbox.create_camlbox does not only create the box, but also registers it as resource in the master process. The string "example" is used for naming the shared memory block backing the box. This box has a capacity of one message which may get 100 bytes long. There are two return values, box and box_res. The value box is the way to access the box for receiving messages. The value box_res is the resource ID.
  • We pass box_res when we start body_Y. Resource ID's can be marshalled (whereas camlboxes cannot).
  • The helper function Netmcore_camlbox.lookup_camlbox_sender is called at the beginning of body_Y to look up the sender interface for the box identified by box_res. The sender view of the box is box_sender.
  • Now the string "Hello world" is sent by invoking Netcamlbox.camlbox_send.
  • In the meantime, process X continued running in parallel, and by invoking Netcamlbox.camlbox_wait the execution is suspended until at least one message is in the box. The function returns the slots containing messages.
  • We look now into the single slot where we expect a message. With Netcamlbox.camlbox_get_copy we can get a copy of the message in the slot. Note that there is also Netcamlbox.camlbox_get which does not make a copy but returns a reference to the message as it is in the box. This is unsafe because this reference becomes invalid when the message is deleted.
  • Finally we delete the message in the slot with Netcamlbox.camlbox_delete. The slot is now again free, and can hold the next message.
You may wonder why I added the type annotations for box and box_sender. If you look at the signatures of the Camlbox modules, you'll see that there is no enforcement that the types of box and box_sender fit to each other. The type parameter string is lost at the moment the resource ID is used for identifying the box.

When sending a message, it is always copied into the box. For doing this a special "value copier" is used. This copier traverses deeply through the value representation and copies each piece into the box. This is a bit like marshalling, only that this does not result in a string but a copy of the orginal value (and the copy is placed into a reserved region in the shared memory block of the Camlbox). This copier has also restrictions which types of values can be handled, very much like marshalling.

How to create and use memory pools

Now back to the core of Netmulticore. For the remaining data structures the shared memory must be managed as a memory pool. The module Netmcore_mempool accomplishes this.

As explained above, there are certain restrictions on which processes have access to memory pools. For most applications it is best when they simply create the pool in the master process directly after program startup and before launching any worker process. This avoids all restrictions, but the size of the pool needs to be set at this early point of execution (remember that pools also cannot be enlarged).

It is a good question how big a pool should be. Generally, the pool should be a small factor larger than the minimum amount of RAM needed for the pool. I cannot give really good recommendations, but factors between 1.5 and 3.0 seem to be good choices. If the pool size is chosen too tight, the garbage collector will run often and slow down the program.

Another strategy for the pool size is to make it always large, e.g. 25% of the available system memory. The idea here is that when parts of the pool remain unused throughout the lifetime of the program, they actually will also not consume RAM. Operating systems typically distinguish between RAM that is reserved by a memory allocation and RAM that is actually filled with data. Only the latter type consumes real RAM, whereas the first type is only taken into account for bookkeeping. This means the pool memory is reserved for the case it is needed at a certain point, but RAM is not wasted if not.

Now, the pool is simply created with

let pool = Netmcore_mempool.create_mempool size_in_bytes

The value pool is again a resource ID, and it is no problem to marshal this ID. When creating shared heaps, and for a number of other operations the pool ID is required, so it is a good idea to passed it down to all worker processes.

Pools are backed by shared memory blocks, and these blocks have kernel persistence (usually they even appear as files, but this depends on the operating system). This means they exist until explicitly deleted (like files). To do so, just call

Netmcore.release pool

after the Netmcore.startup function returns to the caller, and before the program is finally terminated.

As pool memory is inherited by worker processes (as explained in the section about "Design"), one has to enable the inheritance for each started process, e.g.

let pid = 
  Netmcore_process.start 
    ~inherit_resources:`All
    process_fork arg

Otherwise the pool is not accessible by the started worker process. (I'm thinking about making this the default, but haven't come to a conclusion yet.)

If system memory is very tight, you will sometimes see bus errors when using pools (signal SIGBUS is sent to one of the processes, typically the signal number is 7). This happens when allocated shared memory is actually not available when it is used for the first time. (I'm still looking for ways how to get nicer reactions.)

Shared ref-type variables

Let's now look at a data structure that lives in a shared heap. Note that there is also a direct interface to shared heaps, but for the sake of explaining the concept, it is easier to first look at a concrete instance of a heap.

The module Netmcore_ref provides a mutable reference for a single value residing in the shared heap. The reference is comparable to the ref type provided by the standard library, and it is possible to dereference it, and to assign new values to it. The difference is, however, that the reference and the referenced value reside completely in shared memory, and are accessible by all workers.

The reference is created by something like

let s = Netmcore_ref.sref pool initial_value

where pool is the resource ID of a pool (see above), and initial_value is the value to assign to the reference initially. This is comparable to

let r = ref initial_value

for normal references. There is an important difference, however. The referenced value must completely reside in shared memory, and in order to achieve this, the sref function copies the initial_value to it. (The same copy mechanism is used that also puts messages into Camlboxes.)

After running sref, a new shared heap has been created which is initialized as a reference. You can get a direct handle for the heap structure by calling Netmcore_ref.heap.

Also note that you have to call sref from a worker process. It is not possible to do this from the master process. This also applies to almost any other heap-related function.

It is possible to assign a new value to s:

Netmcore_ref.assign s new_value

This also copies the new_value to the heap, and changes the reference.

You may wonder what happens when you assign new values over and over again. There is no mechanism for deleting the old values immediately. Instead, the values accumulate over time in the heap, and when a certain threshold is reached, a garbage collection run is started. This run checks for values (or value parts) that became unreachable, and reclaims the memory used for them.

This garbage collector (GC) is not the normal garbage collector that cleans the process-local heap managed by the Ocaml runtime. It is a special GC designed for shared heaps. Unfortunately, this special GC is not as automatic as the GC built into the Ocaml runtime, as we'll see.

Back to our shared references. There are three functions for getting the value pointed to by a reference:

let v1 = Netmcore_ref.deref_ro s

let v2 = Netmcore_ref.deref_c s

Netmcore_ref.deref_p s (fun v3 -> ...)

The first function, deref_ro, is the fastest but unsafest. It just returns the current value of the reference as-is, and this value is of course stored in the shared heap. The problem is, however, that further assignments to the variable can invalidate this value when a GC run occurs. If this happens, v1 becomes invalid because the memory holding the representation for v1 is overwritten with something else, and any accesses to v1 or its parts may crash the program! For this reason, the deref_ro function must only be used if it can be excluded by other means that the variable is not being assigned while v1 is being accessed (e.g. by a lock). The suffix "_ro" is meant to remind of the fact that it is safe to use in read-only contexts. (N.B. The GC could theoretically also check for values that are only alive because of references from process-local memory. Unfortunately, this would complicate everything dramatically - the local GC and the shared GC would need some synchronization, and slow or misbehaving local GC's could delay shared GC runs almost indefinitely.)

The second function, deref_c, does not have this problem because it always returns a copy of the referenced value (suffix "_c" = "copy"). It is, of course, a lot slower because of this, and the copy semantics may not always be the right design for an application.

The third function, deref_p, is a clever alternative. As you can see, deref_p does not return the value, but it runs an argument function with the current value. The trick is now that v3 is specially protected while this function is running. This protection does not prevent assignments, but it prevents that the GC deletes the value v3, if a GC run is done. This type of protection is also called pinning, and the suffix "_p" is meant to remind of this.

When using deref_p, one still should be careful and think about whether accesses to v3 (or parts of v3) can occur after the pinning protection has ended. This must be excluded by all means!

Let's look at another thing the programmer must not do. Imagine some inner part of the value is mutable, e.g.

type t = { mutable data : string }

let s = Netmcore_ref.sref pool { data = "initial string" }

The question is now: Is it allowed to assign to the data component, as in

(Netmcore_ref.deref_ro s).data <- "new string"   (* WARNING - PROBLEMATIC CODE *)

The meaning of this assignment is that the data component is overwritten with a pointer to a string residing in process-local memory. Imagine now what happens if a different worker process accesses the data component. The address of this pointer is now meaningless, because the string does not exist in context of the other process at this address. If we access nevertheless, we risk getting random data or even segmentation faults.

I'll explain below how to fix this assignment. It is possible, if done in the right way.

Finally, here is a complete example using shared references:

let body_Y (pool, c_descr, k) =
  let c = Netmcore_sref.sref_of_descr pool c_descr in
  let c_value = Netmcore_sref.deref_ro c in
  k + 1 + c_value

let fork_Y, join_Y =
  Netmcore_process.def_process body_Y

let body_X (pool,k) =
  let c = Netmcore_sref.sref pool 1 in
  let c_descr = Netmcore_sref.descr_of_sref c in
  let pid_Y = 
    Netmcore_process.start ~inherit_resources:`All fork_Y (pool, c_descr, k) in
  let j =
     match Netmcore_process.join join_Y pid_Y with
      | Some j -> j
      | None -> failwith "Error in process Y" in
  Printf.printf "Result: %d\n%!" j

let fork_X, join_X =
  Netmcore_process.def_process body_X

let () = 
  let pool = Netmcore_mempool.create_mempool (1024 * 1024) in

  Netmcore.startup
     ~socket_directory:"some_dir" 
     ~first_process:
        (fun () -> 
           Netmcore_process.start ~inherit_resources:`All fork_X (pool,1))
     ();

  Netmcore.release pool

Compare this with the example I gave when explaining how to start processes. The variable c is now a shared reference, and because of this process Y can access the contents of c. The result is now 3.

In this example already a feature is used that is first explained in the next section: descriptors. For some reason (we'll see why) it is not possible to marshal shared heaps, and thus you cannot call body_Y with c as argument. The workaround is to create a descriptor for c, use the descriptor for marshalling, and restore the orginal heap in the called function.

Descriptors

Shared heaps are very special objects - in particular, the heaps reside in shared memory at a certain address, and this address also appears in the internal data structures the heaps use to manage their memory space.

Imagine what happens when you do not respect this special nature of shared heaps, and create a copy nevertheless (e.g. by marshalling the heap, or by putting the heap into another heap). At the first glance, you will see that you can actually create the copy (it's a valid Ocaml value), but when you try to use it, the program will crash. What has happened?

The problem is that the internal data structure of the copied heap still contains addresses that are only valid for the orignal heap, but are meaningless for the copy. When these addresses are followed, invalid memory access occur, and the program crashes.

So, a very important programming rule: Never copy shared heaps! Unfortunately, this really requires discipline, as there are many mechanisms in Netmulticore where copies are automatically created (just page back and look how often I told you that a copy is created here and there), for example when starting worker processes the arguments are also copied over to the newly created process.

How to work around? First let's think about what we really want to have when we e.g. start a worker process and pass a shared heap to it. Of course, we do not want to create a copy, but rather we want to make the same shared heap accessible to the worker. We want call by reference!

Descriptors are the solution. A descriptor of a shared heap is just a marshallable reference to the heap. For each shared data structure there is a special descriptor type, and it is possible to get the descriptor for a heap, and to look up the heap by descriptor. For example, shared references define this as (in Netmcore_ref):

type 't sref_descr

val descr_of_sref : 't sref -> 't sref_descr
val sref_of_descr : Netmcore.res_id -> 't sref_descr -> 't sref

Note that the sref_of_descr function takes the resource ID of the pool as first argument, and that this function is quite slow (because it has to ask the master process where to find the shared memory object).

For the other shared data structures, the are comparable functions supporting descriptors.

How to mutate shared values

Remember this code?

(Netmcore_ref.deref_ro s).data <- "new string"   (* WARNING - PROBLEMATIC CODE *)

We now look how to fix it. The basic problem is that the new string is not residing in shared memory, and if we could achieve that this is the case, the assignment would be acceptable.

The correct way to do it is this:

Netmcore_heap.modify
  (Netmcore_ref.heap s)
  (fun mut ->
    (Netmcore_ref.deref_ro s).data <- Netmcore_heap.add mut "new string"
  )

The essence is that we now call Netmcore_heap.add before assigning the string. This function again copies the argument value, and puts the copy onto the heap. For managing the copy, this function needs an special object mut, which is actually a mutator. The only way to get a mutator is to call Netmcore_heap.modify, which - among other actions - locks the heap, and prevents that any competing write access occurs. This is required, because otherwise mutations could overlap in bad ways when they are done in parallel, and the internal representation of the heap would be corrupted.

Note that you need to call add for assigning all values that are not yet residing in the same heap. This means: Call it if the new value is in process-local memory, but also call it when the new value is already part of a different heap (because pointers from one heap to the other are not allowed - heaps must be completely self-contained).

You may wonder why we used deref_ro to get the current value of s, and not one of the other access functions. Quite easy answer: the other functions deref_c and deref_p would deadlock the program! The modify function already acquires the heap lock for the duration of the mutation, and using any access function that also plays with this lock will cause deadlocks.

Let's look at this example:

type u = { mutable data1 : string;
           mutable data2 : string;
         }

let s = Netmcore_ref.sref
           pool
           { data1 = "initial string 1"; data2 = "initial string 2" }

We want now to swap the values in the two components data1 and data2. The solution is of course:

Netmcore_heap.modify
  (Netmcore_ref.heap s)
  (fun mut ->
    let u = Netmcore_ref.deref_ro s in
    let p = u.data1 in
    u.data1 <- u.data2;
    u.data2 <- p
  )

Note that we call modify although we do not need the mutator for doing our work. The reason is that modify also write-locks the heap, and this protects the integrity of our data. We do not need to call add because the two strings are already residing in the same shared heap, and we are just swapping them.

A little variation of this let us run into a possible problem, though:

Netmcore_heap.modify
  (Netmcore_ref.heap s)
  (fun mut ->
    let u = Netmcore_ref.deref_ro s in
    let p = u.data1 in
    u.data1 <- Netmcore_heap.add mut ("Previously in data2: " ^ u.data2);
    u.data2 <- Netmcore_heap.add mut ("Previously in data1: " ^ p);
    printf "p=%s\n" p;                    (* THIS LINE IS PROBLEMATIC *)
  )

This piece of code will crash now and then (not often, so maybe difficult to run into). What is going wrong?

There is one phenomenon we haven't paid attention to yet: When we call add it is possible that the available space in the heap is not sufficient anymore to do the required memory allocations, and that a GC run is started. When this happens in the second add, the field data1 is already overwritten, and because of this the string p is now unreachable from the top-level value of the heap. The space occupied by p is reclaimed, and may even be overwritten. For the two assignments this is no problem, because p is no longer needed. When we access p after the second add, however, the value may have become invalid. Accessing it may crash the program!

How to fix? It is possible to pin additional values during mutation so that they cannot be collected by the GC:

Netmcore_heap.modify
  (Netmcore_ref.heap s)
  (fun mut ->
    let u = Netmcore_ref.deref_ro s in
    let p = u.data1 in
    u.data1 <- Netmcore_heap.add mut ("Previously in data2: " ^ u.data2);
    u.data2 <- Netmcore_heap.add mut ("Previously in data1: " ^ p);
    Netmcore_heap.pin mut p;
    printf "p=%s\n" p;
  )

This piece of code is now correct. The effect of pin lasts until the end of the modify function.

Shared data structures

Besides references, there are a number of further shared data structures:

You may wonder why these data structures exist. Isn't it possible to e.g. put a normal Queue into a shared reference in order to get a shared version? The problem is that existing types like Queue do not adhere to the programming rules we've outlined above. Queue is a mutable structure, and when elements are added to or removed from the queue, new value allocations are always done in the normal process-local heap but not in the right shared heap. This is nothing one can fix by wrapping code around these data structures.

If you look at the implementation of e.g. Netmcore_queue (which is really recommended), you'll see that it looks very much like the implementation of Queue, only that Netmcore_heap.modify is used for managing mutation.

The above listed modules also include all the necessary means to protect the data structures against the possibly disastrous effects of parallel mutation. Also, for read accesses there are always several access functions corresponding to what we've seen for references (deref_ro, deref_c, and deref_p).

Let's have a closer look at Netmcore_array to see how this is done. The signature is:

type ('e,'h) sarray
type ('e,'h) sarray_descr
val create : Netmcore.res_id -> 'e array -> 'h -> ('e,'h) sarray
val make : Netmcore.res_id -> int -> 'e -> 'h -> ('e,'h) sarray
val init : Netmcore.res_id -> int -> (int -> 'e) -> 'h -> ('e,'h) sarray
val grow : ('e,_) sarray -> int -> 'e -> unit
val set : ('e,_) sarray -> int -> 'e -> unit
val get_ro : ('e,_) sarray -> int -> 'e
val get_p : ('e,_) sarray -> int -> ('e -> 'a) -> 'a
val get_c : ('e,_) sarray -> int -> 'e
val length : (_,_) sarray -> int
val header : (_,'h) sarray -> 'h
val deref : ('e,_) sarray -> 'e array
val heap : (_,_) sarray -> Obj.t Netmcore_heap.heap
val descr_of_sarray : ('e,'h) sarray -> ('e,'h) sarray_descr
val sarray_of_descr : Netmcore.res_id -> ('e,'h) sarray_descr -> ('e,'h) sarray

As you can see, the type is not only 'e sarray (when 'e is the type of the elements), but there is a second type variable 'h, so that the type becomes ('e,'h) sarray. This is the type of the header, which is simply an extra place for storing data that exists once per shared data structure. The header can have any type, but is often a record with a few fields. If you do not need the header, just set it to () (i.e. 'h = unit).

For managing shared data structures one often needs a few extra fields that can be put into the header. For example, a requirement could be that the length of a shared queue is limited, and one needs synchronization variables to ensure that the users of the queue stick to the limit (i.e. the addition of new elements to the queue is suspended when it is full, and it is restarted when there is again space). As such extra requirements are very common, all shared data structures have such a header (well, for Netmcore_ref it was omitted for obvious reasons).

When you create an instance of the structure, you always have to pass the resource ID of the pool (here for create, make, init). Another argument is the initial value of the header (which is also copied into the shared heap). The header, after being copied to the heap, can be accessed with the header function.

The function set looks very much like the one in Array. Note that for all mutation the shared heap is write-locked, so there can actually only be one running set operation at a time. Because set copies the argument data to the shared heap, this time is not neglectable.

For accessing elements, there are the known three variants: get_ro, get_c, and get_p. The length function works as in Array.

As it would be difficult for the user to implement growable arrays on top of the basic version, it was chosen to add a grow function exactly doing that. This function avoids to copy the values again into the shared heap that are already there.

The type ('e,'h) sarray_descr is used for descriptors to shared arrays, and the functions descr_of_sarray and sarray_of_descr allow it to manage descriptors.

Synchronization

There are three kinds of synchronization devices:

Unlike the above explained data structures, these synchronization means are simply special values, and not shared heaps of their own (which would be quite costly). One consequence of their special nature is that it is not possible to copy these values around - they must exist at a fixed memory address, and cannot be copied or moved. This just means that after copying or moving the values become non-functional.

Of course, these special values must be put into shared heaps in order to be accessible by several processes. Let's just walk through an example, to see how this is done right.

Imagine you have a field x of some type, and a lock m that is going to protect concurrent accesses to x:

type t =
  { mutable x : some_type;
    mutable m : Netmcore_mutex.mutex
  }

This record is put into a shared reference:

let s = Netmcore_ref.sref { x = ...; m = ... }

Remember that sref initializes the reference with a copy of the argument value. This would mean we copy m, which is invalid as we've learned. How to solve?

Mutexes (like the other synchronization devices) are designed for this use pattern, so there is already a built-in solution. There is a special dummy value one can use during initialization:

let s = Netmcore_ref.sref { x = ...; m = Netmcore_mutex.dummy() }

Dummies are placeholders that need to be re-initialized later when the mutex is already copied to the shared heap. This is done by:

Netmcore_heap.modify
  (Netmcore_ref.heap s)
  (fun mut ->
     let r = Netmcore_ref.deref_ro mut in
     r.m <- Netmcore_mutex.create mut `Normal
  )

That's it! Note that we've set the type of the mutex to `Normal which creates a fast mutex without deadlock protection.

The mutex can now be used in statements like

Netmcore_ref.deref_p (fun r -> Netmcore_mutex.lock r)

Be careful not to use deref_c because this would create a copy of the mutex, and render it useless!

Semaphores are much like mutexes, only that the synchronization functions are not lock and unlock but wait and post.

Condition variables are a bit harder to use, unfortunately. When implementing them I ran into the issue that the fast algorithm needs to allocate special storage places, one for each process that can be suspended. (There is a slow algorithm not requiring additional storage, but this would have been a very bad deal.) In system-level implementations of condition variables the additional storage can usually be hidden from the user. This is not possible in a pure user-space implementation like this one. For this reason, the user of condition variables has to allocate these places called wait_entry. One wait_entry is needed for each process that can ever wait for a condition variable. There is also wait_set which is just a collection of wait_entry values. Let's look at an example:

type t =
  { mutable x : some_type;
    mutable m : Netmcore_mutex.mutex;
    mutable c : Netmcore_condition.condition;
    mutable w : Netmcore_condition.wait_set
  }

let s = 
  Netmcore_ref.sref
     { x = ...; 
       m = Netmcore_mutex.dummy();
       c = Netmcore_condition.dummy_condition();
       w = Netmcore_condition.dummy_wait_set()
     }

let () =
  Netmcore_heap.modify
    (Netmcore_ref.heap s)
    (fun mut ->
       let r = Netmcore_ref.deref_ro mut in
       r.m <- Netmcore_mutex.create mut `Normal;
       r.c <- Netmcore_condition.create_condition mut;
       r.w <- Netmcore_condition.create_wait_set mut
    )

The field w is now an empty wait_set. Note that we only need one w for all condition variables that exist in the same shared heap.

The point is now that we need to get a wait_entry for each process using s. Get it with:

let we =
  Netmcore_heap.modify
    (Netmcore_ref.heap s)
    (fun mut -> 
      Netmcore_condition.alloc_wait_entry mut (Netmcore_ref.deref_ro s).w
    )

This just needs to happen once for each process. The value we can be used for all wait calls related to all condition variable in the same shared heap.

A wait call looks then like:

Netmcore_ref.deref_p (fun r -> Netmcore_condition.wait we r.c r.m)

For signal and broadcast the value we is not needed. As you see the only additional complexity has to do with the initialization of the process - we need to create we once for each process.

Where to find examples

It is very much recommended to study complete examples before trying to develop with Netmulticore. There are a few examples in the examples/multicore directory of the distributed tarball.

The latest version can always be found in the svn repository:

Remarks on the implementation

The current implementation of shared heaps only uses a single lock for protecting every heap, even for the initial actions of read accesses. This e.g. means that a deref_c locks the reference for a short time until it has pinned the current value. The (perhaps time consuming) copy operation is then done without lock.

This might not be optimal for all use cases. An improved locking scheme would use reader/writer locks. However, this kind of lock is complicated to implement on top of just semaphores, so it was omitted for now. Also, reader/writer locks are more expensive in general, so it is not clear whether it is better at all.

The memory management of heaps is still in a quite experimental state. Heaps are extended in units of 64 KB which may be way too big or way too small for the application. Also, it is tried to achieve that at least half of the heap memory is free (i.e. the "overhead factor" is 50%).

If parts of a shared heap become completely free they are in deed given back to the memory pool.

Some points where Netmulticore is different from multi-threading

When porting multi-threaded programs to Netmulticore, you may wonder where the differences are.

Netmulticore can only deal with values that are completely stored in shared heaps. This requires that the value is initially copied to the heap, and the described programming rules must be adhered to when modifiying and also reading the values. Of course, there are also programming rules in a multi-threaded environment, so this is not completely different.

The way the shared heaps are managed is less automatic than in the multi-threaded case. Especially, the garbage collector of shared heaps does not recognize values in process-local memory as roots. This is really unfortunate, because the user has to work around this limitation (pinning), and this is error-prone. There is, however, probably nothing we can do about it. Theoretically it is possible to create a protocol between the shared GC and the process-local GC, but it looks very complicated to implement it.

Another problem is that there is nothing that would prevent erroneous pointers from shared heaps to process-local heaps. In multi-threaded environments this distinction does not exist, so there is no such problem. By changing the Ocaml compiler it could be possible (without having checked it in detail) to emit different code when a process-local value is assigned to a shared variable, and to automatically allocate the value in the shared heap.

The Netmulticore approach also has advantages. Especially, it is way more scalable than a multi-threaded environment with a single heap only. There is no global lock that could become the bottleneck of the implementation. Each shared heap has its own lock, so there is always the possibility to increase the "lock capacity" by using more shared heaps together. In some sense, this is not a "multicore" solution, but rather a "manycore" approach that will also work for hundreds of cores.

Operating system issues

Administering POSIX shared memory

Many of the data structures described here are actually backed by shared memory blocks. For managing these blocks, the POSIX API is used (i.e. shm_open). Note that there is also an older API on many systems called System V API (i.e. shmget). This API is not used.

Shared memory has kernel persistence, i.e. the blocks remain allocated even after the process terminates that created them. The blocks need to be explicitly deleted. This can be done by the right API calls (e.g. call Netmcore.release for objects with a Netmulticore resource ID), but from time to time a program does not terminate normally, and this deletion is not performed. The question is how to get administratively rid of the blocks.

The nice aspect of the POSIX API is that shared memory looks very much like files, and in deed, in many implementations I've seen the blocks appear somewhere in the file system. Typical locations for these files are /dev/shm and /tmp. The files have names like mempool_f7e8bdaa, or generally <prefix>_<8hexdigits>. The prefix depends on the data structure the block is used for, and the hex digits make the name unique. By deleting these files, the blocks are removed.

Since Ocamlnet-3.6, one can also delete shared memory administratively with the netplex-admin utility. See Deleting persistent kernel objects for details. This method works for all OS. Also, an unlink of old memory is automatically done when the program is restarted.

Another issue is that OS typically define an upper limit for the amount of shared memory. This is e.g. 50% of the system RAM for Linux. There are usually ways to configure this limit.

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml