Plasma GitLab Archive
Projects Blog Knowledge

(* $Id: unixqueue_util.ml 1262 2009-08-31 18:14:21Z gerd $ *)

(** Definitions common to {!Unixqueue and {!Unixqueue2} *)

(** These are internals of Ocamlnet! *)

open Printf

module Debug = struct
  let enable = ref false
  let target = ref `Any
  let set_debug_mode flag = enable := flag
  let set_debug_target t = target := t
end

let dlog0 = Netlog.Debug.mk_dlog "Unixqueue" Debug.enable
let dlogr0 = Netlog.Debug.mk_dlogr "Unixqueue" Debug.enable

let dlog m =
  if Equeue.Debug.test_debug_target !Debug.target then dlog0 m

let dlogr gm =
  if Equeue.Debug.test_debug_target !Debug.target then dlogr0 gm

let () =
  Netlog.Debug.register_module "Unixqueue" Debug.enable



(* [group] and [wait_id] are now objects. The structural equality
 * ( = ) compares object IDs if applied to objects, so that this
 * is exactly what we need. It is no longer necessary to manage
 * the IDs ourselves, because the language already manages object IDs.
 *
 * This has also the advantage that groups can now have additional
 * properties.
 *)

class group_object =
object(self)
  val mutable terminating = false
      (* Whether the group is terminating *)
  method is_terminating = terminating
  method terminate() = 
    dlogr (fun () -> (sprintf "group_terminate <group %d>" (Oo.id self)));
    (* eprintf "group_terminate <group %d>\n%!" (Oo.id self); *)
    terminating <- true
end

type group = group_object

class wait_object =
object
end

type wait_id = wait_object

type operation =
    Wait_in  of Unix.file_descr
  | Wait_out of Unix.file_descr
  | Wait_oob of Unix.file_descr
  | Wait of wait_id

type event =
    Input_arrived of (group * Unix.file_descr)
  | Output_readiness of (group * Unix.file_descr)
  | Out_of_band of (group * Unix.file_descr)
  | Timeout of (group * operation)
  | Signal
  | Extra of exn

type resource_prop =
    group * float * float ref
    (* group, timeout value, time of last event *)


type event_system_t = 
    < new_group : unit -> group;
      new_wait_id : unit -> wait_id;
      exists_resource : operation -> bool;
      add_resource : group -> (operation * float) -> unit;
      add_weak_resource : group -> (operation * float) -> unit;
      add_close_action : group -> (Unix.file_descr * (Unix.file_descr -> unit)) -> unit;
      add_abort_action : group -> (group -> exn -> unit) -> unit;
      remove_resource : group -> operation -> unit;
      add_handler : group -> (event_system_t -> event Equeue.t -> event -> unit) -> unit;
      add_event : event -> unit;
      clear : group -> unit;
      run : unit -> unit;
      is_running : bool;
    >

class type event_system =
object
  method new_group : unit -> group
  method new_wait_id : unit -> wait_id
  method exists_resource : operation -> bool
  method add_resource : group -> (operation * float) -> unit
  method add_weak_resource : group -> (operation * float) -> unit
  method add_close_action : group -> (Unix.file_descr * (Unix.file_descr -> unit)) -> unit
  method add_abort_action : group -> (group -> exn -> unit) -> unit
  method remove_resource : group -> operation -> unit
  method add_handler : group -> (event_system_t -> event Equeue.t -> event -> unit) -> unit
  method add_event : event -> unit
  method clear : group -> unit
  method run : unit -> unit
  method is_running : bool
end



type handler =
    event_system_t -> event Equeue.t -> event -> unit


exception Abort of (group * exn)


let () =
  Netexn.register_printer
    (Abort(new group_object, Not_found))
    (fun e ->
       match e with
	 | Abort(g,e') ->
	     "Unixqueue.Abort(" ^ string_of_int(Oo.id g) ^ 
	       ", " ^ Netexn.to_string e' ^ ")"
	 | _ -> assert false
    )


let string_of_fd fd =
  Int64.to_string (Netsys.int64_of_file_descr fd)
;;


let string_of_op =
  function
      Wait_in fd   -> sprintf "Wait_in(%s)" (string_of_fd fd)
    | Wait_out fd  -> sprintf "Wait_out(%s)" (string_of_fd fd)
    | Wait_oob fd  -> sprintf "Wait_oob(%s)" (string_of_fd fd)
    | Wait id      -> sprintf "Wait(wait_id %d)" (Oo.id id)
;;


let string_of_event ev =
  match ev with
    Input_arrived (g,fd) ->
      sprintf "Input(group %d, fd %s)" (Oo.id g) (string_of_fd fd)
  | Output_readiness (g, fd) ->
      sprintf "Output(group %d, fd %s)" (Oo.id g) (string_of_fd fd)
  | Out_of_band (g, fd) ->
      sprintf "Out_of_band(group %d, fd %s)" (Oo.id g) (string_of_fd fd)
  | Timeout (g, op) ->
      sprintf "Timeout(group %d, %s)" (Oo.id g) (string_of_op op)
  | Signal ->
      "Signal"
  | Extra x ->
      sprintf "Extra(%s)" (Netexn.to_string x)
;;

let once_int is_weak (esys:event_system) g duration f =
  let id = esys#new_wait_id () in
  let op = Wait id in
  let called_back = ref false in

  let handler _ ev e =
    if !called_back then (
      dlogr
	(fun () ->
	   (sprintf
	      "once handler <unexpected terminate group %d>" (Oo.id g)));
      raise Equeue.Terminate
    )
    else
      let e_ref = Timeout(g,op) in
      if e = e_ref then begin
	dlogr
	  (fun () ->
	     (sprintf
		"once handler <regular timeout group %d>" (Oo.id g)));
        esys#remove_resource g op;  (* delete the resource *)
        called_back := true;
        let () = f() in             (* invoke f (callback) *)
        raise Equeue.Terminate      (* delete the handler *)
      end
      else (
	dlogr
	  (fun () ->
	     (sprintf
		"once handler <rejected timeout group %d, got %s but expected %s >"
		(Oo.id g) (string_of_event e) (string_of_event e_ref)));
        raise Equeue.Reject
      )
  in

  if duration >= 0.0 then begin
    if is_weak then
      esys#add_weak_resource g (op, duration)
    else
      esys#add_resource g (op, duration);
    esys#add_handler g handler
  end;
  ()

let once = once_int false
let weak_once = once_int true


(*
let debug_log esys ?label msg =
  if Equeue.test_debug_target !debug_mode then
    prerr_endline("Unixqueue debug log: " ^
                    ( match label with
                          Some l -> l
                        | None -> "anonymous" ) ^
                    " <" ^ msg ^ ">")

let exn_log esys ?(suppressed = false) ?(to_string = Netexn.to_string)
                 ?label e =
  if Equeue.test_debug_target !debug_mode then
    let msg =
      if suppressed then
        "Suppressed exn " ^ to_string e
      else
        "Exn " ^ to_string e in
    debug_log esys ?label msg
 *)


let () =
  Netsys_signal.init()

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml