Plasma GitLab Archive
Projects Blog Knowledge

Netmcore_basics

Netmulticore Basics

Contents

  • Netmcore_basics.processes
  • Netmcore_basics.startstop
  • Netmcore_basics.primitive
  • Netmcore_basics.ipc
  • Netmcore_basics.msgpassing
  • Netmcore_basics.goon

The intention of this chapter is to give an overview over the basic mechanisms of Netmulticore. The focus is here on the safe mechanisms, i.e. those that cannot crash the program if used the wrong way. Generally, however, Netmulticore also provides unsafe programming elements, mostly to maximize performance. The unsafe elements are well explained in Netmcore_tut.

Processes

Netmulticore uses subprocesses as workers. This has the advantage that the workers are really independent from each other. Especially, every worker does its own memory management (e.g. a worker won't stop another worker when it does a garbage collection run). The downside is, of course, that it is harder to exchange data between the workers (especially compared with multithreading).

Remember that multithreading in OCaml is restricted so that at most only one CPU core can be utilized. Implementing the workers with full processes is so far the only way to run algorithms on more than one core.

There is a fixed process hierarchy. When Netmulticore starts up, the current process becomes the master process, and the workers will be children of this master:

master
  |
  +-- worker1
  |
  +-- worker2
  |
  +-- worker3

It is possible to create new workers at any time. A worker can create another worker, and Netmulticore creates then a new child as fork of the master (really, this works, and is implemented by forwarding the creation request from the worker to the master). Netmulticore never forks a worker from a worker directly - this would create deep process hierarchies, and these tend to become unmanageable.

In the same way, it is also possible to wait for the termination of a worker (join). Any worker can join any other worker.

All "payload work" must be really done by the workers, and not by the master. The master remains mostly idle for the time of the Netmulticore job, and only provides auxiliary services for things that must be done in the master (e.g. organizing a fork). Ideally, the master process is kept as lean as possible, i.e. nothing is stored there, because the new workers are copies of the master image, and copying is cheapest when the image is as small as possible.

Starting up, shutting down

At startup time, Netmulticore starts one special worker, the first process. This is the only worker that is not started by another worker, but directly by the process playing the role of the master.

When all workers have terminated, the Netmulticore job is done. At this point, the master process is alone again. It is now possible to query for the result of other worker processes, especially of the first process.

Let's look at a simple example (the "hello world" of Netmulticore):

let computation (x,y) =
  x +. y

let computation_fork, computation_join =
  Netmcore_process.def_process computation

let first_process() =
  Netmcore_process.start computation_fork (3.0,4.0)

let extract_result _ pid =
  Netmcore_process.join_nowait computation_join pid

let () =
  let sum_opt =
    Netmcore.run
      ~socket_directory:"/tmp/netmcore"
      ~first_process
      ~extract_result
      () in
  match sum_opt with
    | None -> printf "Error\n"
    | Some sum -> printf "Sum: %f\n" sum

The function to be run in a worker process is here computation. With Netmcore_process.def_process we define this function as designated process. Note that this needs to happen before the first worker is started, and because of this, it is normally done in global context (i.e. not from a function body, but directly at module level). The master needs to know all functions that can be used as worker processes in advance (remember that all forks are managed by the master). def_process returns a pair of "fork point" and "join point". These are abstract descriptors needed for forking and joining, respectively.

With Netmcore_process.start a new worker is started. The arguments are the "fork point" (which implicitly names the worker function), and the argument passed to the worker. The process identifier is returned. In this example, we have only one worker, and that takes over the role of the first process. (N.B. The pid is not directly the identifier used by the operating system, but an internally managed identifier.)

As mentioned, the Netmulticore job is done when all workers have finished their tasks. Normally, you call Netmcore_process.join to get the result of a worker. However, join also blocks the execution (i.e. it waits until the worker is done). In the master process, blocking is generally not allowed, and hence we have to use here the variant of join that does not wait, Netmcore_process.join_nowait. As we already know that the workers are finished, and we only want to get the result value, this is no problem here.

For joining, we pass the so-called "join point" to this function - basically, this tells join_nowait which result to retrieve.

The callback extract_result is invoked when the workers are already done, but before Netmulticore officially finishes its work. This is the last moment when workers can be joined. The result of extract_result is the result of Netmcore.run.

History: The function Netmcore.run is new in OCamlnet-3.6.3. In older versions there was only Netmcore.startup without providing any way to pass results back to the caller. Note that Netmcore.join_nowait was also added in this release. If you find code in the Internet, it often contains workarounds for these limitations present in older Ocamlnet versions.

A few more remarks:

  • The socket_directory is used for Unix domain sockets, and for temporary files Netmulticore may need. Typical locations are /tmp or /var/run. The path name of this directory must not be too iong (there is a limit of around 100 chars in total). The name can also be generated. You can delete the directory after use.
  • Worker parameters (here the float pair (3.0,4.0) and the sum) are passed by serializing the values with the Marshal module. So far, functional values, objects, lazy values, and exceptions cannot be marshalled.
  • Essentially, Netmulticore uses the functionality provided by Netplex to start and stop processes, and also for the communication between worker and master. Because of this, the Netplex infrastructure is fully available. For instance, you can use the functions of Netplex_cenv, e.g. for logging, or manipulating the Netplex container. The netplex-admin utility (see Netplex_admin.admin) can be used to query the process state from the command line. It is not only possible to use Netplex from Netmulticore, but also the other way round: A network server implemented with Netplex can start Netmulticore workers (just call Netmcore_process.start when you need one) - the only requirement and initialization for this is that Netmcore.add_plugins must have been called at Netplex startup time. We'll look at the possible interactions with Netplex closer below.

Primitive parallelization

In the most simple scenario, a few workers are started at the same time, and compute the result in parallel. When the workers are done, it is expected that every worker has computed some part result, and it is only required to retrieve it, and to combine it with the other part results. Note that Netmcore.run can only start one worker, so we need to start the real workers on our own from the single first process:

let n = <number of workers>

let computation (i,arg) =
  <compute something from arg and return it>

let computation_fork, computation_join =
  Netmcore_process.def_process computation

let manage arg =
  let pids =
    List.map
      (fun i -> Netmcore_process.start computation_fork (i,arg))
      (Array.to_list (Array.initialize n (fun i -> i))) in
  let results =
    List.map
      (fun pid ->
         match Netmcore_process.join computation_join pid with
           | None -> failwith "No result after error"
           | Some r -> r
      )
      pids in
  <reduce the results to a single one>

let manage_fork, manage_join =
  Netmcore_process.def_process manage

let first_process arg =
  Netmcore_process.start manage_fork arg

let extract_result _ pid =
  Netmcore_process.join_nowait manage_join pid

<call Netmcore.run as above>

Here, manage takes over the role of the first process that starts the real workers, and waits until the workers are done. Note that we use Netmcore_process.join here, and no longer join_nowait, because it is essential to wait for the termination of the workers.

Some remarks:

  • Starting and terminating processes are relatively expensive operations. This scheme is only well-suited if the parallelized computation takes really long (e.g. several seconds at least)
  • The workers do not communicate directly with each other in this scheme. The workers just get an argument to process, and deliver some part result. In between, there is no opportunity for data exchange.
  • The workers are created at the time they receive their arguments. There is no option for doing some per-worker initialization before the invocation. Of course, one can do some global initialization in the master, because the worker processes are created as copies of the master process, and inherit any global data prepared there.
  • Every worker gets an individual copy of the arguments. Copying is not a free operation, although quite cheap. For certain algorithms, copying per worker is already relatively expensive, and decreases the performance noticeably (e.g. this can be observed for sorting algorithms).

Using Netplex IPC mechanisms

As noted, the Netplex library is the basis on which Netmulticore provides more advanced features. Let's have a quick glance at the mechanisms Netplex defines:

Generally, the Netplex mechanisms are implemented on top of RPC messaging with Unix domain sockets. The master process serves as the controlling instance of the primitives, i.e. the worker sends a message to the master with a request like "lock the mutex", and the master implements the logic, eventually notifying the worker that the lock has been acquired. This type of IPC primitives is relatively slow, but also robust ("uncrashable"), and does not need special prerequisites like shared memory. (Note that there are also very fast IPC primitives in Netmulticore that use shared memory for communication, and which are described in Netmcore_tut. These are, however, a lot more complicated to use than the simple ones defined here, and not well suited as starting point for exploring parallelization options in OCaml.)

Initialization

The Netplex mechanisms need to be initialized at two times:

  • They need to be added as plugins to the Netplex controller. Such a plugin attaches some new behavior to the Netplex routines running in the master process. This type of initialization is done once per Netplex controller, and as Netmulticore creates a new controller per Netmcore.run call, this is required for every such call. The plugin is defined in the respective module (i.e. Netplex_mutex.plugin, Netplex_sharedvar.plugin, and Netplex_semaphore.plugin). You just need to add it to the controller with code like
      Netmcore.run
        ...
        ~init_ctrl:(fun ctrl -> ctrl # add_plugin Netplex_mutex.plugin)
        ...
        
    Adding a plugin several times is a no-op.
  • Of course, you can manage several objects per mechanism (i.e. several mutexes/variables/semaphores). Each object needs to be created. Note that you cannot do this in the scope of the master process! A good point in time to do this is at the beginning of the first worker process, before any further worker is launched. (Alternatively, it is also possible to create the objects when they are used first. This is a bit more complicated, and not covered by this tutorial.) The objects are normally identified by strings. For example, a semaphore could be created as
        ignore(Netplex_semaphore.create "my_sem" 5L)
        
    The initial value would be 5. The semaphore is now available for all workers in the same Netmcore.run session under this name. The return value of Netplex_semaphore.create says whether the semaphore was created (true). Otherwise, the semaphore existed already.

There is normally no need to delete the objects when you are done with them. The objects are bound to the lifetime of the Netplex controller, and this ends anyway when Netmcore.run returns.

Object initialization for semaphores

As mentioned, the function to call is Netplex_semaphore.create:

let success =
  Netplex_semaphore.create name initial_value

Remember that semaphores are counters with non-negative values, and hence initial_value is the initial counter value.

Object initialization for mutexes

There is no need to create mutexes - these are implicitly created (in unlocked state) when they are used for the first time, i.e. when doing

let mutex_handle =
  Netplex_mutex.access name

(Now call Netplex_mutex.lock or Netplex_mutex.unlock to work with the mutex.)

Object initialization for shared variables

For Netplex_sharedvar variables, the creation looks like

let success =
  Netplex_sharedvar.create_var ~enc:true "my_var"

We pass here enc:true which is required when we want to use the Netplex_sharedvar.Make_var_type functor for getting easy and safe access. This works like this: Define

module Var_foo =
  Netplex_sharedvar.Make_var_type(struct type t = foo end)

in global context to get the well-typed accessor functions

let value = Var_foo.get "my_var"

and

Var_foo.set "my_var" new_value

As noted, this works only when setting enc:true at creation time.

Operation

Generally, the access to the Netplex synchronization objects is restricted to the lifetime of the Netplex controller (i.e. the duration of Netmcore.run), and the objects can only be accessed from worker processes (or better, from any Netplex container, as workers are implemented by containers). It is not possible to interact with the objects from the master process (although there are a few exceptions from this rule, e.g. you can read (but not write) the value of a shared variable also from the master, and the last opportunity is even in the extract_result callback of Netmcore.run).

Every operation is isolated from concurrent operations of the same type. For example, when two workers set the same shared variable with Var_foo.set "my_var", there is no risk that the two calls interact in a bad way and cause a crash. Netplex implicitly serializes such calls, and one of the two calls is executed before the other.

For this reason, it is normally not necessary to proctect a single shared variable with a mutex. You need mutexes first when you need to synchronize several variables, or a variable and a semaphore.

Overview of the operations (see linked pages for details):

Message passing

Message passing means that a worker installs a message box, and waits for the arrival of messages from other workers. Messages can be arbitrary OCaml values provided these can be marshalled. The message boxes implemented by Netplex_mbox have only space for one message at a time, so the message senders will have to wait until the box is free.

There are other implementations of message boxes in OCamlnet: Netcamlbox provides very fast boxes that store the messages in shared memory. The caveat is that the size of the messages is limited. Another option is Netmcore_queue which is a shared value queue which can be easily extended to support full message box functionality. Both alternatives do not run on every operating system, though (but Linux and OS X are supported).

Preparations: First, the required plugin needs to be installed in the Netplex controller. Again, use code like

  Netmcore.run
    ...
    ~init_ctrl:(fun ctrl -> ctrl # add_plugin Netplex_mbox.plugin)
    ...

Second, create the mailbox module. This is very similar to Netplex_sharedvar, e.g.:

module Mbox_foo =
  Netplex_mbox.Make_mbox_type(struct type t = foo end)

Remember that this needs to happen in global context (i.e. don't do it in a local module).

Now create the mailbox (in a worker):

let mbox = Mbox_foo.create "mybox"

If the box already exists, it is just opened, so you can use create to get a handle for the message box in all workers accessing it. Sending a message msg of type foo is as easy as

Mbox_foo.send mbox msg

and receiving one is possible with

let msg = Mbox_foo.receive mbox

which also waits for the arrival of the message. Remember that all three functions, create, send, and receive can only be called from worker context.

Example: Task queue

In this example, we want to parallelize a list of tasks which can be independently run on any worker. The idea is that every worker provides a message box where a special process, the supervisor, sends the task descriptions to. If all tasks are done, the supervisor sends a termination request instead:

type worker_msg =
  | Task of task
  | Term_request

The supervisor has no idea by itself which worker is busy and which one would be free for another task. Because of this, the supervisor installs another message box, and the worker sends a message when it is idle and requests another task:

type supervisor_msg =
  | Task_request of int

The integer argument is the index of the requesting worker.

This arrangement will result in a "ping pong game": When a worker is free it sends Task_request to the supervisor, which in turn will send the next Task to the requesting worker, or Term_request if the list is already empty. The interesting property is that no process actively monitors another process - instead, all processes just wait for messages and react on these.

The definitions of the mailbox modules:

module Mbox_worker =
  Netplex_mbox.Make_mbox_type(struct type t = worker_msg end)

module Mbox_supervisor =
  Netplex_mbox.Make_mbox_type(struct type t = supervisor_msg end)

The implementation of the workers

The workers wait for the arrival of messages from the supervisor in a loop, and react on incoming tasks. The loop is left when the termination request arrives.

let worker_main w =
  (* where w is the index of the worker, 0..n-1 *)
  let wrk_mbox_name = sprintf "Worker_%d" w in
  let wrk_mbox = Mbox_worker.create wrk_mbox_name in
  let op_mbox = Mbox_supervisor.create "Supervisor" in
  let cont = ref true in
  while !cont do
    (* request a new task *)
    Mbox_supervisor.send op_mbox (Task_request w);
    (* wait for task *)
    match Mbox_worker.receive wrk_mbox with
      | Task t ->
           (* do here the task *)
           ...
      | Term_request ->
           cont := false
  done;
  ()

let worker_main_fork, worker_main_join =
  Netmcore_process.def_process worker_main

The implementation of the supervisor

The supervisor starts the worker processes, and also joins them at the end.

There is a queue of messages to send to the workers, q. When a worker requests another task, the next prepared message is sent. At the end of q there are as many Term_request messages as needed to ensure that all workers will terminate.

Note that this version does not collect any results from the workers. There could be extra Task_result messages for this purpose (emitted by the workers and interpreted by the supervisor).

let supervisor_main arg =
  let ((num_workers : int), (tasks : task list)) = arg in
  let op_mbox = Mbox_supervisor.create "Supervisor" in
  let q = Queue.create() in
  List.iter (fun t -> Queue.add q (Task t)) tasks;
  let workers =
    Array.init
      num_workers
      (fun i -> Netmcore_process.start worker_main_fork i) in
  Array.iteri
    (fun i _ -> Queue.add q Term_request)
    workers;
  let wrk_mbox_names = 
    Array.mapi
      (fun i _ -> sprintf "Worker_%d" i)
      workers in
  let wrk_mboxes =
    Array.map
      (fun name -> Mbox_worker.create name)
      wrk_mbox_names in
  while not(Queue.is_empty q) do
    (* wait for request *)
    match Mbox_supervisor.receive op_mbox with
      | Task_request r ->
          let msg = Queue.take q in
          let wrk_mbox = wrk_mboxes.(r) in
          Mbox_worker.send wrk_mbox msg;
  done;
  Array.iter
    (fun pid ->
       Netmcore_process.join worker_main_join pid
    )
    workers

let supervisor_main_fork, supervisor_main_join =
  Netmcore_process.def_process supervisor_main

A main program

The main program just starts the supervisor, and waits for its termination:

let main tasks =
  let sum_opt =
    Netmcore.run
      ~socket_directory:"/tmp/netmcore"
      ~first_process:(fun () ->
                        Netmcore_process.start supervisor_main_fork tasks
                     )
      ~extract_result:(fun _ pid ->
                        Netmcore_process.join_nowait supervisor_main_join pid
                      )
      () in
  match sum_opt with
    | None -> printf "Error\n"
    | Some () -> printf "Tasks completed\n"

Where to go on from here

If you want to write an Internet server, and you need Netmulticore for managing some workload processes, you should next try to understand Netplex in more detail. Netplex is a generic process manager with special support for server processes. As pointed out before, Netmulticore is just an extension of Netplex, so both libraries can be easily used together. Read more about Netplex in: Netplex_intro, Netplex_advanced, Netplex_admin.

If your focus is on the acceleration of your multicore program, the next page to read is Netmcore_tut. This page explains the parts of Netmulticore that use shared memory. In particular, the worker processes are enabled to access shared heaps containing OCaml values. The heaps are read/write, which is so far unique in the OCaml world. This allows you to represent shared data, e.g. as queues, hashtables, or arrays. The downside of these mechanisms is that unsafe and low-level OCaml features are used, comparable to writing a wrapper for a C function.

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml