Plasma GitLab Archive
Projects Blog Knowledge

(* Example: pipeline between processes

   Here a Netmcore_buffer is used as a pipe buffer. A producer
   pumps data into the pipeline, and the consumer reads it.

   A Netmcore_buffer can grow to arbitrary sizes. In order to 
   prevent this, we put a few synchronization variables into
   the header of the buffer.
 *)

open Printf

type header =
    { mutable length : int;
      mutable eof : bool;
      mutable lock : Netmcore_mutex.mutex;
      mutable have_space : Netmcore_condition.condition;
      mutable have_data : Netmcore_condition.condition;
      mutable wait_set : Netmcore_condition.wait_set
    }

type buffer = header Netmcore_buffer.buffer

type buffer_descr = header Netmcore_buffer.buffer_descr

let pool_size = 1024 * 1024                   (* 1 M *)
  (* Size of the shm block *)

let max_buffer = 65536                        (* 64 K *)
  (* Size of the pipe buffer *)

let buf_increment = 16384
  (* Pipe buffer space is allocated in units of this *)

let send_size = 16 * 1024 * 1024 * 1024       (* 16 G *)
  (* This much data is sent over the pipe in total *)

let variant = `Nocopy
  (* see consumer *)

(* Define the pool. This is allowed to do in the master process.
   Note that the process calling create_mempool generally cannot use
   the pool!
 *)

let pool = Netmcore_mempool.create_mempool pool_size

let producer (bd:buffer_descr) =
  (* We send (almost) endlessly the string 0123456789... *)
  let b = Netmcore_buffer.buffer_of_descr pool bd in
  let p_len = max_buffer - (max_buffer mod 10) + 10 in
  let p = String.make p_len ' ' in
  for k = 0 to p_len-1 do
    p.[k] <- Char.chr(48 + (k mod 10))
  done;

  let h = Netmcore_buffer.header b in
  let w = 
    (* Get a wait entry for the condition variables *)
    Netmcore_heap.modify
      (Netmcore_buffer.heap b)
      (fun mut ->
	 Netmcore_condition.alloc_wait_entry mut h.wait_set
      ) in

  let n = ref 0 in
  let q = ref 0 in

  while !n < send_size do
    let l = (
      let l_ref = ref 0 in
      Netmcore_mutex.lock h.lock;
      while h.length >= max_buffer do
	Netmcore_condition.wait w h.have_space h.lock
      done;
      l_ref := h.length;
      Netmcore_mutex.unlock h.lock;
      !l_ref
    ) in

    let space = max_buffer - l in

    let to_send0 = min (send_size - !n) space in
    let to_send = ref to_send0 in
(* printf "after %d bytes, sending %d bytes\n%!" !n to_send0; *)
    while !to_send > 0 do
      let m = min !to_send (p_len - !q) in
      Netmcore_buffer.add_sub_string b p !q m;
      to_send := !to_send - m;
      q := (!q + m) mod 10;
      n := !n + m;
    done;

    Netmcore_mutex.lock h.lock;
    h.length <- h.length + to_send0;
    h.eof <- !n = send_size;
    Netmcore_mutex.unlock h.lock;
    
    Netmcore_condition.signal h.have_data
  done


let producer_fork, producer_join =
  Netmcore_process.def_process producer


let one_meg = float (1024 * 1024)

let consumer (bd:buffer_descr) =
  (* We compute a checksum, just to do something with the data *)
  let b = Netmcore_buffer.buffer_of_descr pool bd in
  let cksum = ref 0 in
  let t0 = Unix.gettimeofday() in
  let t1 = ref t0 in
  let n1 = ref 0 in

  let h = Netmcore_buffer.header b in
  let w = 
    Netmcore_heap.modify
      (Netmcore_buffer.heap b)
      (fun mut ->
	 Netmcore_condition.alloc_wait_entry mut h.wait_set
      ) in

  let continue = ref true in
  let i = ref 0 in

  while !continue do
    let l = (
      let l_ref = ref 0 in
      Netmcore_mutex.lock h.lock;
      while h.length = 0 do
	Netmcore_condition.wait w h.have_data h.lock
      done;
      l_ref := h.length;
      continue := not h.eof;
      Netmcore_mutex.unlock h.lock;
      !l_ref
    ) in

    (* We implement here two variants:
       (1) Copy data immediately, signal, then evaluate data
           (good if evaluation is expensive)
       (2) Evaluate data on the original string, then signal
           (good if evaluation is cheap)
     *)
    let process s pos len =
      for k = 0 to len-1 do
	let c = Char.code (String.unsafe_get s (pos+k)) in
	cksum := (!cksum lsl 3) + c
      done;
      i := !i + len;
      n1 := !n1 + len in

    let postprocess () =
      let t = Unix.gettimeofday() in
      if t -. !t1 >= 1.0 then (
	printf "received %d bytes, %.1f M/s\n%!"
	  !n1 ((float !n1 /. (t -. !t1)) /. one_meg);
	n1 := 0;
	t1 := t;
      ) in

    let signal len =
      Netmcore_mutex.lock h.lock;
      h.length <- h.length - len;
      Netmcore_mutex.unlock h.lock;
      Netmcore_condition.signal h.have_space in

    match variant with
      | `Copy ->
	  let s = Netmcore_buffer.sub b !i l in
	  Netmcore_buffer.delete_hd b l;
	  signal l;
	  process s 0 l;
	  postprocess()
      | `Nocopy ->
	  let r = ref 0 in
	  while !r < l do
	    Netmcore_buffer.access b !i
	      (fun s pos len ->
		 let len' = min len (l - !r) in
		 process s pos len';
		 r := !r + len';
	      )
	  done;
	  Netmcore_buffer.delete_hd b l;
	  signal l;
	  postprocess()
  done;

  let t = Unix.gettimeofday() in
  printf "Total: received %d bytes, %.1f M/s\n%!"
    !i ((float !i /. (t -. t0)) /. one_meg);
  printf "Checksum: %d\n%!" (!cksum land 0x3fff_ffff)


let consumer_fork, consumer_join =
  Netmcore_process.def_process consumer


let control() =
  (* Create the buffer. Note that synchronization variables must be
     directly created in shared memory. To do so, we first use
     dummy values (h_orig is in normal memory), and after the header
     has been copied to shm, we do the real initialization.
   *)
  let h_orig =
    { length = 0;
      eof = false;
      lock = Netmcore_mutex.dummy();
      have_space = Netmcore_condition.dummy_condition();
      have_data = Netmcore_condition.dummy_condition();
      wait_set = Netmcore_condition.dummy_wait_set()
    } in
  let b = Netmcore_buffer.create pool buf_increment h_orig in
  Netmcore_heap.modify
    (Netmcore_buffer.heap b)
    (fun mut ->
       (* N.B. h is a copy of h_orig residing in shm *)
       let h = Netmcore_buffer.header b in
       h.lock <- Netmcore_mutex.create mut `Normal;
       h.have_space <- Netmcore_condition.create_condition mut;
       h.have_data <- Netmcore_condition.create_condition mut;
       h.wait_set <- Netmcore_condition.create_wait_set mut;
    );

  (* Now start the workers for producing and consuming data *)
  let producer_pid =
    Netmcore_process.start
      ~inherit_resources:`All
      producer_fork (Netmcore_buffer.descr_of_buffer b) in
  let consumer_pid =
    Netmcore_process.start
      ~inherit_resources:`All
      consumer_fork (Netmcore_buffer.descr_of_buffer b) in
  
  (* Now wait until these processes are done *)
  ( match Netmcore_process.join producer_join producer_pid with
      | None ->
	  failwith "Error in the producer"
      | Some _ ->
	  ()
  );
  ( match Netmcore_process.join consumer_join consumer_pid with
      | None ->
	  failwith "Error in the consumer"
      | Some _ ->
	  ()
  );

  (* Cleanup: *)
  let h = Netmcore_buffer.header b in
  Netmcore_mutex.destroy h.lock;
  Netmcore_condition.destroy_condition h.have_space;
  Netmcore_condition.destroy_condition h.have_data;
  Netmcore_condition.destroy_wait_set h.wait_set;
  Netmcore_buffer.destroy b


let control_fork, control_join =
  Netmcore_process.def_process control


let () =
  (* Netmcore_mempool.Debug.enable_alloc := true; *)
  Netmcore.startup
    ~socket_directory:"run_pipeline"
    ~first_process:(fun () -> 
		      Netmcore_process.start 
			~inherit_resources:`All control_fork ())
    ()

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml