Plasma GitLab Archive
Projects Blog Knowledge

(* HTTP Pipelines and multithreading
 *
 *
 * This is the recommended solution for multi-threaded apps:
 * A designated HTTP thread handles all HTTP requests, and the other threads
 * of the program send their HTTP requests to the HTTP thread. The HTTP
 * thread can process several requests in parallel.
 *)

(* Compile with:
 * ocamlfind ocamlopt -o t -package netclient,threads -linkpkg -thread http_mt.ml
 *)


open Nethttp_client

exception HTTP_Job of http_call * (http_call -> unit)
  (* This is not an exception in the usual sense, but simply a tagged
   * pair (call, f_done). This pair is pushed onto the event queue to
   * send another HTTP request [call] to the HTTP thread. When the
   * request is processed, the function [f_done] is called. Note that
   * [f_done] is called in the context of the HTTP thread, and it must
   * arrange some synchronisation with the calling thread to return
   * the result.
   *)


let http_esys = ref None

let get_http_esys() =
  match !http_esys with
    | None -> failwith "No event system"
    | Some e -> e

let http_keep_alive_group = ref None 

let get_http_keep_alive_group() =
  match !http_keep_alive_group with
    | None -> failwith "No keep alive group"
    | Some g -> g


let http_init() =
  let esys = Unixqueue.create_unix_event_system() in
  let keep_alive_group = Unixqueue.new_group esys in
  http_esys := Some esys;
  http_keep_alive_group := Some keep_alive_group
;;


let http_thread() =
  (* Create the HTTP pipeline for a known event system: *)
  let esys = get_http_esys() in
  let pipeline = new pipeline in
  pipeline # set_event_system esys;

  (* In order to keep the event system active when there are no HTTP requests
   * to process, we add an artificial timer that never times out (-1.0).
   * The timer is bound to a Unixqueue group, and by clearing this group
   * the timer can be deleted.
   *)
  let keep_alive_group = get_http_keep_alive_group() in
  let w = Unixqueue.new_wait_id esys in
  Unixqueue.add_resource esys keep_alive_group (Unixqueue.Wait w,(-1.0));

  (* We arrange now that whenever a HTTP_Job arrives on the event queue,
   * a new HTTP call is started.
   *)
  Unixqueue.add_handler
    esys
    keep_alive_group
    (fun _ _ event ->
       match event with
	 | Unixqueue.Extra (HTTP_Job (call, f_done)) ->
	     pipeline # add_with_callback call f_done
	 | _ ->
	     raise Equeue.Reject  (* The event is not for us *)
    );

  (* Now start the event queue. It returns when all jobs are done and
   * the keep_alive_group is cleared.
   *)
  Unixqueue.run esys;
  ()
;;


let shutdown_http_thread() =
  let esys = get_http_esys() in
  let keep_alive_group = get_http_keep_alive_group() in
  Unixqueue.clear esys keep_alive_group;
  http_keep_alive_group := None;
  http_esys := None
;;


let caller_thread() =
  (* This is a thread that calls for an HTTP request *)
  let esys = get_http_esys() in
  let mutex = Mutex.create() in
  let cond = Condition.create () in
  let call = new get "http://localhost/" in
  let result = ref "" in
  let f_done call =
    (* This function is called from the scope of the HTTP thread!
     * Signal the calling thread that the call is done:
     *)
    Mutex.lock mutex;
    result := ( match call # status with
		  | `Successful ->
		      let body = call # response_body # value in
		      body
		  | _ ->
		      "some problem"
	      );
    Condition.signal cond;
    Mutex.unlock mutex
  in
  Unixqueue.add_event esys (Unixqueue.Extra (HTTP_Job(call, f_done)));
  (* Wait until we get a signal: *)
  Mutex.lock mutex;
  Condition.wait cond mutex;
  print_endline !result;
  flush stdout;
  Mutex.unlock mutex
;;


let _ =
  (* Unixqueue.set_debug_mode true; *)

  (* Initialize first: *)
  http_init();

  (* Start the HTTP thread: *)
  let http_thr = Thread.create http_thread () in
  
  (* Start a lot of caller threads: *)
  let callers = ref [] in
  for n = 1 to 100 do
    let thr = Thread.create caller_thread () in
    callers := thr :: !callers
  done;

  (* Wait until the callers return: *)
  List.iter Thread.join !callers;

  (* Shut down the HTTP thread, and wait until it is done *)
  shutdown_http_thread();
  Thread.join http_thr
;;

  

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml