(* HTTP Pipelines and multithreading
*
*
* This is the recommended solution for multi-threaded apps:
* A designated HTTP thread handles all HTTP requests, and the other threads
* of the program send their HTTP requests to the HTTP thread. The HTTP
* thread can process several requests in parallel.
*)
(* Compile with:
* ocamlfind ocamlopt -o t -package netclient,threads -linkpkg -thread http_mt.ml
*)
open Http_client
exception HTTP_Job of http_call * (http_call -> unit)
(* This is not an exception in the usual sense, but simply a tagged
* pair (call, f_done). This pair is pushed onto the event queue to
* send another HTTP request [call] to the HTTP thread. When the
* request is processed, the function [f_done] is called. Note that
* [f_done] is called in the context of the HTTP thread, and it must
* arrange some synchronisation with the calling thread to return
* the result.
*)
let http_esys = ref None
let get_http_esys() =
match !http_esys with
| None -> failwith "No event system"
| Some e -> e
let http_keep_alive_group = ref None
let get_http_keep_alive_group() =
match !http_keep_alive_group with
| None -> failwith "No keep alive group"
| Some g -> g
let http_init() =
let esys = Unixqueue.create_unix_event_system() in
let keep_alive_group = Unixqueue.new_group esys in
http_esys := Some esys;
http_keep_alive_group := Some keep_alive_group
;;
let http_thread() =
(* Create the HTTP pipeline for a known event system: *)
let esys = get_http_esys() in
let pipeline = new pipeline in
pipeline # set_event_system esys;
(* In order to keep the event system active when there are no HTTP requests
* to process, we add an artificial timer that never times out (-1.0).
* The timer is bound to a Unixqueue group, and by clearing this group
* the timer can be deleted.
*)
let keep_alive_group = get_http_keep_alive_group() in
let w = Unixqueue.new_wait_id esys in
Unixqueue.add_resource esys keep_alive_group (Unixqueue.Wait w,(-1.0));
(* We arrange now that whenever a HTTP_Job arrives on the event queue,
* a new HTTP call is started.
*)
Unixqueue.add_handler
esys
keep_alive_group
(fun _ _ event ->
match event with
| Unixqueue.Extra (HTTP_Job (call, f_done)) ->
pipeline # add_with_callback call f_done
| _ ->
raise Equeue.Reject (* The event is not for us *)
);
(* Now start the event queue. It returns when all jobs are done and
* the keep_alive_group is cleared.
*)
Unixqueue.run esys;
()
;;
let shutdown_http_thread() =
let esys = get_http_esys() in
let keep_alive_group = get_http_keep_alive_group() in
Unixqueue.clear esys keep_alive_group;
http_keep_alive_group := None;
http_esys := None
;;
let caller_thread() =
(* This is a thread that calls for an HTTP request *)
let esys = get_http_esys() in
let mutex = Mutex.create() in
let cond = Condition.create () in
let call = new get "http://localhost/" in
let result = ref "" in
let f_done call =
(* This function is called from the scope of the HTTP thread!
* Signal the calling thread that the call is done:
*)
Mutex.lock mutex;
result := ( match call # status with
| `Successful ->
let body = call # response_body # value in
body
| _ ->
"some problem"
);
Condition.signal cond;
Mutex.unlock mutex
in
Unixqueue.add_event esys (Unixqueue.Extra (HTTP_Job(call, f_done)));
(* Wait until we get a signal: *)
Mutex.lock mutex;
Condition.wait cond mutex;
print_endline !result;
flush stdout;
Mutex.unlock mutex
;;
let _ =
(* Unixqueue.set_debug_mode true; *)
(* Initialize first: *)
http_init();
(* Start the HTTP thread: *)
let http_thr = Thread.create http_thread () in
(* Start a lot of caller threads: *)
let callers = ref [] in
for n = 1 to 100 do
let thr = Thread.create caller_thread () in
callers := thr :: !callers
done;
(* Wait until the callers return: *)
List.iter Thread.join !callers;
(* Shut down the HTTP thread, and wait until it is done *)
shutdown_http_thread();
Thread.join http_thr
;;