 * $Id: 1412 2010-02-15 16:20:27Z gerd $

open Printf

exception Closed_channel
exception Broken_communication
exception Addressing_method_not_supported
exception Watchdog_timeout
exception Cancelled

class type async_out_channel = object
  method output : string -> int -> int -> int
  method close_out : unit -> unit
  method pos_out : int
  method flush : unit -> unit
  method can_output : bool
  method request_notification : (unit -> bool) -> unit

class type async_in_channel = object
  method input : string -> int -> int -> int
  method close_in : unit -> unit
  method pos_in : int
  method can_input : bool
  method request_notification : (unit -> bool) -> unit

type 't engine_state =
  [ `Working of int
  | `Done of 't
  | `Error of exn
  | `Aborted

class type [ 't ] engine = object
  method state : 't engine_state
  method abort : unit -> unit
  method request_notification : (unit -> bool) -> unit
  method event_system : Unixqueue.event_system

class type async_out_channel_engine = object
  inherit [ unit ] engine
  inherit async_out_channel

class type async_in_channel_engine = object
  inherit [ unit ] engine
  inherit async_in_channel

module Debug = struct
  let enable = ref false

let dlog = Netlog.Debug.mk_dlog "Uq_engines" Debug.enable
let dlogr = Netlog.Debug.mk_dlogr "Uq_engines" Debug.enable

let () =
  Netlog.Debug.register_module "Uq_engines" Debug.enable

let is_active state =
  match state with
      `Working _ -> true
    | _          -> false

class [ 't ] engine_mixin (init_state : 't engine_state) =
  val mutable notify_list = []
  val mutable notify_list_new = []
  val mutable state = init_state

  method state = state

  method request_notification f =
    notify_list_new <- f :: notify_list_new
  method private set_state s =
    if s <> state then (
      state <- s;
      self # notify();

  method private notify() =
    notify_list <- notify_list @ notify_list_new;
    notify_list_new <- [];
    notify_list <- List.filter (fun f -> f()) notify_list
end ;;

let when_state ?(is_done = fun _ -> ())
               ?(is_error = fun _ -> ())
	       ?(is_aborted = fun _ -> ())
	       (eng : 'a #engine) =
  (* Execute is_done when the state of eng goes to `Done,
   * execute is_error when the state goes to `Error, and
   * execute is_aborted when the state goes to `Aborted.
   * The argument of the callback function is the argument
   * of the state value.
  eng # request_notification
    (fun () ->
       match eng#state with
	   `Done v    -> is_done v; false
	 | `Error x   -> is_error x; false
	 | `Aborted   -> is_aborted(); false
	 | `Working _ -> true

class ['a,'b] map_engine ~map_done ?map_error ?map_aborted 
              (eng : 'a #engine) =
  inherit ['b] engine_mixin (`Working 0)

    state <- self # map_state eng#state;
    eng # request_notification self#forward_notification;

  method private forward_notification() =
    (* This method is called when [eng] changes its state. We compute our
     * mapped state, and notify our own listeners.
    let eng_state = eng#state in
    let state' = self # map_state eng_state in
    self # set_state state';
    let cont =
      match state' with
	  (`Working _) -> true
	| (`Done _)
	| (`Error _)
	| `Aborted ->     false in

  method private map_state eng_state =
    match eng_state with
	(`Working _ as wrk_state) -> 
      | `Done x -> 
	    map_done x
      | (`Error x as err_state) ->
	  ( match map_error with
		Some f -> f x
	      | None   -> err_state
      | `Aborted ->
	  ( match map_aborted with
		Some f -> f ()
		| None   -> `Aborted

  method event_system = eng#event_system

  method abort = eng#abort

class ['a,'b] seq_engine (eng_a : 'a #engine) (make_b : 'a -> 'b #engine) =

  val mutable eng_a_state = eng_a # state

  val mutable eng_b = None
  val mutable eng_b_state = `Working 0

  inherit ['b] engine_mixin (`Working 0)

    eng_a # request_notification self#update_a

  method private update_a() =
    (* eng_a is running, eng_b not yet existing *)
    let s = eng_a # state in
    match s with
	`Working n ->
	  if s <> eng_a_state then self # count();
	  eng_a_state <- s;
      | `Done arg ->
	  (* Create eng_b *)
	  let e = make_b arg in
	  eng_b <- Some e;
	  let s' = e # state in
	  eng_b_state <- s';
	  self # count();
	  e # request_notification self#update_b;
      | `Error arg ->
	  self # set_state (`Error arg);
      | `Aborted ->
	  self # set_state `Aborted;

  method private update_b() =
    (* eng_a is `Done, eng_b is running *)
    let e = match eng_b with Some e -> e | None -> assert false in
    let s = e # state in
    match s with
	`Working n ->
	  if s <> eng_b_state then self # count();
	  eng_b_state <- s;
      | `Done arg ->
	  self # set_state s;
      | `Error arg ->
	  self # set_state s;
      | `Aborted ->
	  self # set_state s;

  method private count() =
    match state with
	`Working n ->
	  self # set_state (`Working (n+1))
      | _ ->
	  assert false

  method event_system = 
    eng_a # event_system

  method abort() =
    eng_a # abort();
    ( match eng_b with
	  Some e -> e # abort()
	| None -> ()

let abort_if_working eng =
  match eng#state with
      `Working _ ->
	eng # abort()
    | _ ->

class ['a,'b] sync_engine (eng_a : 'a #engine) (eng_b : 'b #engine) =

  val mutable eng_a_state = eng_a # state

  val mutable eng_b_state = eng_b # state

  inherit ['a * 'b] engine_mixin (`Working 0)

    eng_a # request_notification self#update_a;
    eng_b # request_notification self#update_b

  method private update_a() =
    let s = eng_a # state in
    match s with
	`Working n ->
	  if s <> eng_a_state then self # transition();
	  eng_a_state <- s;
      | _ ->
	  eng_a_state <- s;
	  self # transition();
	  abort_if_working eng_b;

  method private update_b() =
    let s = eng_b # state in
    match s with
	`Working n ->
	  if s <> eng_b_state then self # transition();
	  eng_b_state <- s;
      | _ ->
	  eng_b_state <- s;
	  self # transition();
	  abort_if_working eng_a;

  method private transition() =
    (* Compute new state from eng_a_state and eng_b_state: *)
    let state' =
      match state with
	  `Working n ->
	    ( match (eng_a_state, eng_b_state) with
		  (`Working _, `Working _) ->
		    `Working (n+1)
		| (`Working _, `Done _) ->
		    `Working (n+1)
		| (`Done _, `Working _) ->
		    `Working (n+1)
		| (`Done a, `Done b) ->
		    `Done (a,b)
		| (`Error x, _) ->
		    `Error x
		| (_, `Error x) ->
		    `Error x
		| (`Aborted, _) ->
		| (_, `Aborted) ->
	| _ ->
	    (* The state will never change again! *)
    self # set_state state'

  method event_system = 
    eng_a # event_system

  method abort() =
    eng_a # abort();
    eng_b # abort();

class poll_engine ?(extra_match = fun _ -> false) oplist ues =

  inherit [Unixqueue.event] engine_mixin (`Working 0)

  val mutable group = Unixqueue.new_group ues

    self # restart()

  method group = group

  method restart() =
    group <- Unixqueue.new_group ues;
    state <- (`Working 0 : Unixqueue.event engine_state);
    (* Define the event handler: *)
    Unixqueue.add_handler ues group (fun _ _ -> self # handle_event);
    (* Define the abort (exception) handler: *)
    Unixqueue.add_abort_action ues group (fun _ -> self # handle_exception);
    (* Add the resources: *)
    List.iter (Unixqueue.add_resource ues group) oplist;

  method private handle_event ev =
    match ev with
	Unixqueue.Input_arrived(g,fd) when g = group ->
	  self # accept_event ev
      | Unixqueue.Output_readiness(g,fd) when g = group ->
	  self # accept_event ev
      | Unixqueue.Out_of_band(g,fd) when g = group ->
	  self # accept_event ev
      | Unixqueue.Timeout(g,op) when g = group ->
	  self # accept_event ev
      | Unixqueue.Extra x ->
	  if extra_match x then
	    self # accept_event ev
	    raise Equeue.Reject
      | _ ->
	  raise Equeue.Reject

  method private accept_event ev =
    Unixqueue.clear ues group;
    self # set_state (`Done ev);

  method private handle_exception x =
    self # set_state (`Error x)

  method abort() =
    match state with
	`Working _ ->
	  Unixqueue.clear ues group;
	  self # set_state `Aborted;
      | _ ->

  method event_system = ues

end ;;

class poll_process_engine ?(period = 0.1) ~pid ues =

  inherit [Unix.process_status] engine_mixin (`Working 0)

  val group = Unixqueue.new_group ues
  val wait_id = Unixqueue.new_wait_id ues

    (* Define the event handler: *)
    Unixqueue.add_handler ues group (fun _ _ -> self # handle_event);
    (* Define the abort (exception) handler: *)
    Unixqueue.add_abort_action ues group (fun _ -> self # handle_exception);
    (* Add the resources: *)
    Unixqueue.add_resource ues group (Unixqueue.Wait wait_id, period);

  method private handle_event ev =
    match ev with
	Unixqueue.Timeout(g, Unixqueue.Wait wid) 
	                                   when g = group && wid = wait_id ->
	  self # check_process()
      | Unixqueue.Signal ->
	  self # check_process();
	  raise Equeue.Reject    (* Signal must not be accepted! *)
      | _ ->
	  raise Equeue.Reject

  method private check_process () =
      let (w_pid, w_status) = Unix.waitpid [ Unix.WNOHANG ] pid in
      if w_pid > 0 then (
	Unixqueue.clear ues group;
	self # set_state (`Done w_status);
	error ->

  method private handle_exception x =
    self # set_state (`Error x)

  method abort() =
    match state with
	`Working _ ->
	  Unixqueue.clear ues group;
	  self # set_state `Aborted;
      | _ ->

  method event_system = ues

end ;;

class watchdog period eng =
  let ues = eng#event_system in
  let wid = Unixqueue.new_wait_id ues in
object (self)
  inherit [unit] engine_mixin (`Working 0)

  val mutable last_eng_state = eng # state
  val timer_eng = new poll_engine [ Unixqueue.Wait wid, 0.1 *. period ] ues
  val mutable aborted = false
  val mutable inactivity = 0
			     (* Counts to 10 *)

    let rec watch() =
	~is_done:(fun _ ->
		    let eng_state = eng # state in
		    if eng_state = last_eng_state then (
		      inactivity <- inactivity + 1;
		      if inactivity >= 10 then (
			aborted <- true;
			self # set_state (`Error Watchdog_timeout)
		      else (
			timer_eng # restart();
		    else (
		      last_eng_state <- eng_state;
		      inactivity <- 0;
		      timer_eng # restart();


      ~is_done:(fun _ -> if not aborted then self # set_state (`Done()))
      ~is_error:(fun _ -> if not aborted then self # set_state (`Done()))
      ~is_aborted:(fun _ -> if not aborted then self # set_state (`Done()))

  method abort() =
    match state with
	`Working _ ->
	  aborted <- true;
	  timer_eng # abort();
	  self # set_state `Aborted;
      | _ ->

  method event_system = 

end ;;

class ['t] epsilon_engine target_state ues =
  (* Simply create a poll engine, and add a timeout event for its group.
   * The poll engine accepts all events of that group and switches to
   * `Done.
  let eng =
    new poll_engine [] ues in
  let g =
    eng # group in
  let wid =
    Unixqueue.new_wait_id ues in
  let () =
    Unixqueue.add_event ues (Unixqueue.Timeout(g, Unixqueue.Wait wid)) in
  [Unixqueue.event, 't] map_engine 
    ~map_done:(fun _ -> target_state)
    (eng :> Unixqueue.event engine)

(* TODO: Avoid the usage of Extra events here. Extra events are more
 * expensive than other events because all handlers see them.
 * Can be substituted with Timeout events.

exception Receiver_attn of ;;
let receiver_attn g = Unixqueue.Extra(Receiver_attn g);;

exception Sender_attn of ;;
let sender_attn g = Unixqueue.Extra(Sender_attn g);;

let buf_max_size = 4096;;

class receiver ~src ~(dst : #async_out_channel) ?(close_src=true) 
               ?(close_dst=true) ues 
      : [ unit ] engine = 
  (* The receiver has to copy data if (1) the src file descriptor is
   * readable, and (2) the dst channel accepts output. There is also
   * an internal buffer that stored read data that cannot yet be 
   * written into the dst channel.
   * We implement the following logic:
   * - The src file descriptor is polled when there is space in the
   *   internal buffer. Every time new data is added to the buffer,
   *   the event Receiver_attn is generated
   * - When the dst state changes, the event Receiver_attn is generated
   * - The event handler catches Receiver_attn, and checks whether
   *   the output channel is ready. If so, data of the internal
   *   buffer is written to the output channel, and a new Receiver_attn
   *   event is generated. If the output channel is not ready, nothing
   *   will happen.

  inherit [unit] engine_mixin (`Working 0 : unit engine_state)

  val group = Unixqueue.new_group ues

  val buf = String.create buf_max_size
  val mutable buf_size = 0

  val mutable in_eof = false
  val mutable in_polling = false

  val mutable out_eof = false

  val mutable deferred_exn = None

    (* Arrange that Receiver_attn is generated when the dst state changes: *)
    dst # request_notification
      (fun () ->
	 (* Note: With MT, we do not know which thread calls this function.
	  * Fortunately, add_event is thread-safe.
	 if is_active state then (
	   Unixqueue.add_event ues (receiver_attn group);
	     (* Continue notifications *)
	     (* The engine is no longer active: disable any further
	      * notification
    (* Define the event handler: *)
    Unixqueue.add_handler ues group (fun _ _ -> self # handle_event);
    (* Define the abort (exception) handler: *)
    Unixqueue.add_abort_action ues group (fun _ -> self # handle_exception);
    (* Because the internal buffer is empty initially, we can poll
     * src: 
    Unixqueue.add_resource ues group (Unixqueue.Wait_in src, -1.0);
    in_polling <- true   (* Remember add_resource *)

  method abort() =
    match state with
	`Working _ ->
	  if not in_eof && close_src then Unix.close src;
	  in_eof <- true;
	  if not out_eof && close_dst then dst # close_out();
	  out_eof <- true;
	  self # set_state `Aborted;
	  Unixqueue.clear ues group
      | _ ->

  method event_system = ues

  method private count() =
    match state with
	`Working n -> 
	  self # set_state (`Working (n+1))
      | _ ->

  method private handle_event ev =
    match ev with
	Unixqueue.Input_arrived(g,_) when g = group ->
	  self # handle_input();
	  self # check_input_polling();
      | Unixqueue.Extra (Receiver_attn g) when g = group ->
	  self # handle_output();
	  if out_eof then (
	    Unixqueue.clear ues group;    (* Delete the whole group *)
	    raise Equeue.Terminate        (* Deactivate this handler *)
      | _ ->
	  raise Equeue.Reject

  method private handle_exception exn =
    (* Unixqueue already ensures that the whole group will be deleted,
     * so we need not to do it here
    if not in_eof && close_src then (
      try Unix.close src
      with error ->
	Netlog.logf `Err
	  "Uq_engines.receiver#handle_exception: %s" 
	  (Netexn.to_string error) );
    in_eof <- true;
    if not out_eof && close_dst then (
      try dst # close_out()
      with error ->
	Netlog.logf `Err
	  "Uq_engines.receiver#handle_exception: %s" 
	  (Netexn.to_string error) );
    out_eof <- true;
    self # set_state (`Error exn)

  method private handle_input() =
    if not in_eof && buf_size < buf_max_size then
	let n = src buf buf_size (buf_max_size - buf_size) in
	buf_size <- buf_size + n;
	in_eof <- (n = 0);
	if in_eof && close_src then Unix.close src;
	Unixqueue.add_event ues (receiver_attn group);
	self # count();
	| Unix.Unix_error(Unix.EWOULDBLOCK,_,_)
	| Unix.Unix_error(Unix.EINTR,_,_) ->
	    (* These exceptions are expected, and can be ignored *)
	| error ->
	    (* Any other exception stops the engine. But first it is tried to
	     * process the buffer contents:
	    in_eof <- true;
	    deferred_exn <- Some error;
	    if in_eof && close_src then Unix.close src;
	    Unixqueue.add_event ues (receiver_attn group);
	    self # count();

  method private check_input_polling() =
    let need_polling = not in_eof && buf_size < buf_max_size in
    ( if need_polling && not in_polling then
	Unixqueue.add_resource ues group (Unixqueue.Wait_in src, -1.0)
	if not need_polling && in_polling then
	  Unixqueue.remove_resource ues group (Unixqueue.Wait_in src);
    in_polling <- need_polling

  method private handle_output() =
    (* If this method is called when out_eof, we assume that this is
     * an event coming too late. Just ignore.
    if not out_eof then (
      (* First check the state of dst: If [pos_out] raises an exception,
       * we assume that the output channel is broken.
      ( try ignore(dst#pos_out)
	    _ ->
	      (* dst is in an error state, or somebody has closed it *)

      (* It is possible that dst#can_output is false, because we get
       * Reciever_attn events for many conditions, not just that
       * output is again accepted. Ignore this case.
	if dst#can_output then (
	  if buf_size > 0 then (
	    let n = dst # output buf 0 buf_size in
	    if n > 0 then (
	      String.blit buf n buf 0 (buf_size - n);
	      buf_size <- buf_size - n;
	      if (buf_size > 0 && dst#can_output) || in_eof then
		Unixqueue.add_event ues (receiver_attn group);
	      self # check_input_polling();
	      self # count();
	  else if in_eof then (
	    (* Note: we do not close dst. out_eof just means that copying
	     * is done
	    if close_dst then dst # close_out();
	    out_eof <- true;
	    ( match deferred_exn with
		| None -> self # set_state (`Done());
		| Some err -> self # set_state (`Error err);
	  error ->
	    (* In most cases coming from dst#output *)


class sender ~(src : #async_in_channel) ~dst ?(close_src=true) 
               ?(close_dst=true) ues 
      : [ unit ] engine = 
  (* The sender has to copy data if (1) the src channel is
   * readable, and (2) the dst descriptor accepts output. There is also
   * an internal buffer that stored read data that cannot yet be 
   * written into the dst descriptor.
   * We implement the following logic:
   * - The dst file descriptor is polled when there is data in the
   *   internal buffer. Every time new data is added to the buffer,
   *   the event Sender_attn is generated
   * - When the src state changes, the event Sender_attn is generated
   * - The event handler catches Sender_attn, and checks whether
   *   the input channel has data. If so, the data is appended to the internal
   *   buffer, and a new Sender_attn
   *   event is generated.

  inherit [unit] engine_mixin (`Working 0 : unit engine_state)

  val group = Unixqueue.new_group ues

  val buf = String.create buf_max_size
  val mutable buf_size = 0

  val mutable in_eof = false

  val mutable out_eof = false
  val mutable out_polling = false

    (* Arrange that Sender_attn is generated when the src state changes: *)
    src # request_notification
      (fun () ->
	 (* Note: With MT, we do not know which thread calls this function.
	  * Fortunately, add_event is thread-safe.
	 if is_active state then (
	   Unixqueue.add_event ues (sender_attn group);
	     (* Continue notifications *)
	     (* The engine is no longer active: disable any further
	      * notification
    (* Define the event handler: *)
    Unixqueue.add_handler ues group (fun _ _ -> self # handle_event);
    (* Define the abort (exception) handler: *)
    Unixqueue.add_abort_action ues group (fun _ -> self # handle_exception);
    (* Because the internal buffer is empty initially, we cannot poll
     * dst. 
    out_polling <- false;
    (* Immediately check for input: *)
    Unixqueue.add_event ues (sender_attn group);

  method abort() =
    match state with
	`Working _ ->
	  if not in_eof && close_src then src # close_in();
	  in_eof <- true;
	  if not out_eof && close_dst then Unix.close dst;
	  out_eof <- true;
	  self # set_state `Aborted;
	  Unixqueue.clear ues group
      | _ ->

  method event_system = ues

  method private count() =
    match state with
	`Working n -> 
	  self # set_state (`Working (n+1))
      | _ ->

  method private handle_event ev =
    match ev with
	Unixqueue.Extra (Sender_attn g) when g = group ->
	  self # handle_input();
      | Unixqueue.Output_readiness(g,_) when g = group ->
	  self # handle_output();
	  self # check_output_polling();
	  if out_eof then (
	    Unixqueue.clear ues group;    (* Delete the whole group *)
	    raise Equeue.Terminate        (* Deactivate this handler *)
      | _ ->
	  raise Equeue.Reject

  method private handle_exception exn =
    (* Unixqueue already ensures that the whole group will be deleted,
     * so we need not to do it here
    if not in_eof && close_src then (
      try src # close_in();
      with error ->
	Netlog.logf `Err
	  "Uq_engines.sender#handle_exception: %s" 
	  (Netexn.to_string error) );
    in_eof <- true;
    if not out_eof && close_dst then (
      try Unix.close dst
      with error ->
	Netlog.logf `Err
	  "Uq_engines.sender#handle_exception: %s" 
	  (Netexn.to_string error) );
    out_eof <- true;
    self # set_state (`Error exn)

  method private handle_output() =
    if not out_eof then
	let n = Unix.single_write dst buf 0 buf_size in
	String.blit buf n buf 0 (buf_size - n);
	buf_size <- buf_size - n;
	if buf_size = 0 && in_eof then (
	  out_eof <- true;
	  if close_dst then Unix.close dst;
	  self # set_state (`Done());
	else (
	  self # count();
	  if n > 0 && not in_eof && src#can_input then
	    Unixqueue.add_event ues (sender_attn group);
	  (* if not src#can_input, we will be notified when input is 
	   * again possible.
	| Unix.Unix_error(Unix.EWOULDBLOCK,_,_)
	| Unix.Unix_error(Unix.EINTR,_,_) ->
	    (* These exceptions are expected, and can be ignored *)
	| error ->
	    (* Any other exception stops the engine *)

  method private check_output_polling() =
    let need_polling = not out_eof && (buf_size > 0 || in_eof) in
    ( if need_polling && not out_polling then
	Unixqueue.add_resource ues group (Unixqueue.Wait_out dst, -1.0)
	if not need_polling && out_polling then
	  Unixqueue.remove_resource ues group (Unixqueue.Wait_out dst);
    out_polling <- need_polling

  method private handle_input() =
    (* If this method is called when in_eof, we assume that this is
     * an event coming too late. Just ignore.
    if not in_eof then (
      (* First check the state of src: If [pos_in] raises an exception,
       * we assume that the input channel is broken.
      ( try ignore(src#pos_in)
	    _ ->
	      (* src is in an error state, or somebody has closed it *)

      (* It is possible that src#can_input is false, because we get
       * Sender_attn events for many conditions, not just that
       * input data is again available. Ignore this case.
	if src#can_input then (
	  let l = String.length buf in
	  if buf_size < l then (
	      let n = src # input buf buf_size (l-buf_size) in
	      if n > 0 then (
		buf_size <- buf_size + n;
		(* Check for more input data immediately: *)
		if buf_size < l then
		  Unixqueue.add_event ues (sender_attn group);
		self # check_output_polling();
		self # count();
		End_of_file ->
		  (* We do see EOF for the first time! *)
		  if close_src then src # close_in();
		  in_eof <- true;
		  self # check_output_polling();
		  self # count();
	  error ->
	    (* In most cases coming from src#input *)


exception Mem_not_supported

class type multiplex_controller =
  method alive : bool
  method mem_supported : bool
  method event_system : Unixqueue.event_system
  method reading : bool
  method start_reading : 
    ?peek:(unit -> unit) ->
    when_done:(exn option -> int -> unit) -> string -> int -> int -> unit
  method start_mem_reading : 
    ?peek:(unit -> unit) ->
    when_done:(exn option -> int -> unit) -> Netsys_mem.memory -> int -> int ->
  method cancel_reading : unit -> unit
  method writing : bool
  method start_writing :
    when_done:(exn option -> int -> unit) -> string -> int -> int -> unit
  method start_mem_writing : 
    when_done:(exn option -> int -> unit) -> Netsys_mem.memory -> int -> int ->
  method supports_half_open_connection : bool
  method start_writing_eof :
    when_done:(exn option -> unit) -> unit -> unit
  method cancel_writing : unit -> unit
  method read_eof : bool
  method wrote_eof : bool
  method shutting_down : bool
  method start_shutting_down :
    ?linger : float ->
    when_done:(exn option -> unit) -> unit -> unit
  method cancel_shutting_down : unit -> unit
  method inactivate : unit -> unit

class type datagram_multiplex_controller =
  inherit multiplex_controller
  method received_from : Unix.sockaddr
  method send_to : Unix.sockaddr -> unit

type onshutdown_out_spec =
    [ `Ignore
    | `Initiate_shutdown
    | `Action of async_out_channel_engine -> multiplex_controller -> 
                   unit engine_state -> unit

type onshutdown_in_spec =
    [ `Ignore
    | `Initiate_shutdown
    | `Action of async_in_channel_engine -> multiplex_controller -> 
                   unit engine_state -> unit

type onclose_spec = [ `Ignore | `Write_eof ]

class output_async_mplex ?(onclose = (`Ignore : onclose_spec) )
                         ?(onshutdown = (`Ignore : onshutdown_out_spec) )
                         (mplex : multiplex_controller)
                         : async_out_channel_engine =
object (self)

  inherit [unit] engine_mixin (`Working 0 : unit engine_state)

  val data_queue = Queue.create()
		     (* The queue of strings to output *)

  val mutable data_top_pos = 0
		     (* How many bytes of the first string of data_queue
		      * have already been copied to buf.

  val mutable data_queue_length = 0
		     (* The sum of all strings in data_queue, not counting
		      * data_top_pos

  val buf = String.create buf_max_size
	      (* The output buffer. The strings from data_queue are
	       * appended to this buffer to reduce the number of
	       * Unix.write syscalls

  val mutable buf_size = 0
	     (* The number of bytes used at the beginning of [buf]. *)

  val mutable pos_out = 0
	     (* The position of the channel *)

  (* Note that the object buffers the strings in data_queue plus the
   * string in buf, and buffer_size is the limit for 
   * data_queue_length + buf_size

  val mutable in_eof = false
  val mutable shutdown_done = false

  method output s p l =
    if p < 0 || l < 0 || p > String.length s || p+l > String.length s then
      invalid_arg "Uq.engines.output_async_mplex#output";

    if in_eof then raise Closed_channel;

    let l' =
      match buffer_size with
	  None ->
	    (* Unrestricted buffers *)
	    if l > 0 then Queue.add (String.sub s p l) data_queue;
	| Some max_size ->
	    let size = data_queue_length + buf_size in
	    let n = min l (max_size - size) in
	    if n > 0 then Queue.add (String.sub s p n) data_queue;

    pos_out <- pos_out + l';
    data_queue_length <- data_queue_length + l';
    assert(data_queue_length >= 0);  (* must never overflow *)

    if not mplex#writing && l' > 0 then 
      self # check_for_output();

    if l' > 0 then self # count();
    (* If l' = 0, there was no space in the buffer. No need for notification *)


  method close_out () =
    if not in_eof then (
      in_eof <- true;
      if not mplex#writing then
	self # check_for_output();

  method pos_out =
    if in_eof then raise Closed_channel;

  method flush () = 
    if in_eof then raise Closed_channel;

  method abort() =
    match state with
	`Working _ ->
	  mplex # cancel_writing();
	  self # shutdown `Aborted;
      | _ ->

  method event_system = mplex # event_system

  method private count() =
    match state with
	`Working n -> 
	  self # set_state (`Working (n+1))
      | _ ->

  method can_output =
    not in_eof &&
    match buffer_size with
	None ->
	  (* Unrestricted buffers *)
      | Some max_size ->
	  let size = data_queue_length + buf_size in
	  size < max_size

  method private handle_exception exn =
    mplex # cancel_writing();
    self # shutdown (`Error exn)

  method private check_for_output() =
    assert(not mplex#writing);
    if not mplex#wrote_eof then (
      (* Refill buf: *)
      while buf_size < buf_max_size && not (Queue.is_empty data_queue) do
	let s0 = data_queue in
	let m = String.length s0 - data_top_pos in
	let space = buf_max_size - buf_size in
	let n = min space m in
	String.blit s0 data_top_pos buf buf_size n;
	buf_size <- buf_size + n;
	data_top_pos <- data_top_pos + n;
	data_queue_length <- data_queue_length - n;
	if data_top_pos >= String.length s0 then (
	  ignore(Queue.take data_queue);
	  data_top_pos <- 0
	assert(data_queue_length >= 0);  (* must never overflow *)
	assert(data_top_pos >= 0);       (* must never overflow *)
      (* Have something to write? *)
      if buf_size > 0 then (
	let cur_buf_size = buf_size in
	mplex # start_writing 
	  ~when_done:(fun exn_opt n ->
			match exn_opt with
			  | None ->
			      assert(buf_size = cur_buf_size);
			      String.blit buf n buf 0 (buf_size - n);
			      buf_size <- buf_size - n;
			      self # check_for_output();
			      if n > 0 then self # count();
			      (* Note: this also implies notification because
                               * [can_output] returns true
			  | Some Cancelled ->
			      (* Called from [abort], so ignore any data *)
			  | Some error ->
			      self # handle_exception error
	  buf 0 cur_buf_size;
	if in_eof then (
	  match onclose with
	    | `Write_eof ->
		mplex # start_writing_eof
		  ~when_done:(fun exn_opt ->
				match exn_opt with
				  | None ->
				      self # shutdown (`Done());
				  | Some Cancelled ->
				  | Some error ->
				      self # handle_exception error
	    | `Ignore ->
		self # shutdown (`Done());

  method private shutdown next_state =
    (* See also input_async_mplex # shutdown *)
    if not shutdown_done then (
      shutdown_done <- true;
      in_eof <- true;
      Queue.clear data_queue;
      data_queue_length <- 0;
      data_top_pos <- 0;
      ( match onshutdown with
	  | `Ignore -> ()
	  | `Initiate_shutdown ->
	      mplex # start_shutting_down ~when_done:(fun _ -> ()) ()
		(* CHECK: What to do if shutdown not possible? E.g. because
                 * there is also a reader?
	  | `Action f ->
	      ( try
		    (self : #async_out_channel_engine :> async_out_channel_engine)
		with error ->
		  (* CHECK: We could map that also to state Error *)
		  Netlog.logf `Err
		    "Uq_engines.output_async_mplex#shutdown: %s" 
		    (Netexn.to_string error)
      self # set_state next_state


class input_async_mplex ?(onshutdown = (`Ignore : onshutdown_in_spec) )
                        (mplex : multiplex_controller)
                        : async_in_channel_engine =
object (self)

  inherit [unit] engine_mixin (`Working 0 : unit engine_state)

  val data_queue = Queue.create()
		     (* The queue of the read strings *)

  val mutable data_top_pos = 0
		     (* How many bytes of the first string of data_queue
		      * have already been copied to the reading user.

  val mutable data_queue_length = 0
		     (* The sum of all strings in data_queue, not counting
		      * data_top_pos

  val buf = String.create buf_max_size
	      (* The input buffer *)

  val mutable pos_in = 0

  val mutable in_eof = false
  val mutable shutdown_done = false

    self # check_for_input()

  method input s p l =
    if p < 0 || l < 0 || p > String.length s || p+l > String.length s then
      invalid_arg "Uq.engines.input_async_mplex#input";

    if in_eof then raise Closed_channel;

    let l' = min l data_queue_length in
    let l_todo = ref l' in
    let s_pos = ref p in

    while !l_todo > 0 do
      let u = try Queue.peek data_queue with Queue.Empty -> assert false in
      let n = min !l_todo (String.length u - data_top_pos) in
      String.blit u data_top_pos s !s_pos n;
      s_pos := !s_pos + n;
      data_top_pos <- data_top_pos + n;
      l_todo := !l_todo - n;
      if data_top_pos = String.length u then (
	let _ = Queue.take data_queue in
	data_top_pos <- 0

    pos_in <- pos_in + l';
    data_queue_length <- data_queue_length - l';
    assert(data_queue_length >= 0);  (* must never overflow *)

    if not mplex#reading then 
      self # check_for_input();

    if l' > 0 then self # count();
    (* If l' = 0, there were no data in the buffer. No need for notification *)

    if l' = 0 && mplex # read_eof then
      raise End_of_file

  method close_in () =
    if not in_eof then (
      mplex # cancel_reading();
      self # shutdown (`Done())

  method pos_in =
    if in_eof then raise Closed_channel;

  method abort() =
    match state with
	`Working _ ->
	  mplex # cancel_reading();
	  self # shutdown `Aborted;
      | _ ->

  method event_system = mplex # event_system

  method private count() =
    match state with
	`Working n -> 
	  self # set_state (`Working (n+1))
      | _ ->

  method can_input =
    not in_eof && (data_queue_length > 0 || mplex#read_eof)

  method private handle_exception exn =
    mplex # cancel_reading();
    self # shutdown (`Error exn)

  method private check_for_input() =
    assert(not mplex#reading);
    if not mplex#read_eof then (
      (* Space to read something? *)
      let space =
	match buffer_size with
	  | None -> String.length buf
	  | Some m -> min (String.length buf) (m - data_queue_length) in
      if space > 0 then (
	mplex # start_reading 
	  ~when_done:(fun exn_opt n ->
			match exn_opt with
			  | None ->
			      if n > 0 then (
				let s = String.sub buf 0 n in
				Queue.add s data_queue;
				data_queue_length <- data_queue_length + n;
				assert(data_queue_length >= 0);
				      (* must never overflow *)
			      self # check_for_input();
			      if n > 0 then self # count()
			  | Some End_of_file ->
			      self # count()
			  | Some Cancelled ->
			      (* Called from [abort], so ignore any data *)
			  | Some error ->
			      self # handle_exception error
	  buf 0 space

  method private shutdown next_state =
    (* See also output_async_mplex # shutdown *)
    if not shutdown_done then (
      shutdown_done <- true;
      in_eof <- true;
      Queue.clear data_queue;
      data_top_pos <- 0;
      data_queue_length <- 0;
      ( match onshutdown with
	  | `Ignore -> ()
	  | `Initiate_shutdown ->
	      mplex # start_shutting_down ~when_done:(fun _ -> ()) ()
	  | `Action f ->
	      ( try
		    (self : #async_in_channel_engine :> async_in_channel_engine)
		with error ->
		  Netlog.logf `Err
		    "Uq_engines.input_async_mplex#shutdown: %s" 
		    (Netexn.to_string error)
      self # set_state next_state


let anyway ~finally f arg =
    let r = f arg in
    | error ->
	raise error

class socket_multiplex_controller
         ?(close_inactive_descr = true)
         ?(preclose = fun () -> ())
         ?(supports_half_open_connection = false)
         fd esys : datagram_multiplex_controller =

  let fd_style = Netsys.get_fd_style fd in

  let get_ph() = Netsys_win32.lookup_pipe fd in
    (* To be used only when fd_style = `Pipe *)

  let supports_half_open_connection =
    match fd_style with
      | `W32_pipe -> false
      | _ -> supports_half_open_connection in

  let mem_supported =
    match fd_style with
      | `Read_write -> true
      | `Recv_send _ -> true
      | `Recv_send_implied -> true
      | _ -> false in

  let () =
    prerr_endline ("fd style: " ^ Netsys.string_of_fd_style fd_style) in

  val mutable alive = true
  val mutable read_eof = false
  val mutable wrote_eof = false
  val mutable reading = `None
  val mutable writing = `None
  val mutable writing_eof = None
  val mutable shutting_down = None
  val mutable disconnecting = None
  val mutable need_linger = false
  val mutable have_handler = false

  val mutable rcvd_from = None
  val mutable send_to = None

  val group = Unixqueue.new_group esys

  method alive = alive
  method mem_supported = mem_supported
  method reading = reading <> `None
  method writing = writing <> `None || writing_eof <> None
  method shutting_down = shutting_down <> None
  method read_eof = read_eof
  method wrote_eof = wrote_eof

  method supports_half_open_connection = supports_half_open_connection

  method received_from =
    match rcvd_from with
      | None -> 
	  failwith "#received_from: Nothing received yet, or unknown address"
      | Some a -> a

  method send_to a =
    send_to <- Some a

    ( match fd_style with
	| `W32_pipe -> ()
	| `W32_event | `W32_pipe_server | `W32_input_thread 
	| `W32_output_thread | `W32_process ->
	    invalid_arg "Uq_engines.socket_multiplex_controller: \
                       invalid type of file descriptor"
	| _ -> 
	    Unix.set_nonblock fd
    dlogr (fun () ->
	       "new socket_multiplex_controller mplex=%d fd=%Ld"
	       ( self) (Netsys.int64_of_file_descr fd))

  method start_reading ?(peek = fun ()->()) ~when_done s pos len =
    if pos < 0 || len < 0 || pos > String.length s - len then
      invalid_arg "#start_reading";
    if reading <> `None then
      failwith "#start_reading: already reading";
    if shutting_down <> None then
      failwith "#start_reading: already shutting down";
    if not alive then
      failwith "#start_reading: inactive connection";
    self # check_for_connect();
    Unixqueue.add_resource esys group (Unixqueue.Wait_in fd, -1.0);
    reading <- `String(when_done, peek, s, pos, len);
    disconnecting <- None;
    dlogr (fun () ->
	       "start_reading socket_multiplex_controller mplex=%d fd=%Ld"
	       ( self) (Netsys.int64_of_file_descr fd))

  method start_mem_reading ?(peek = fun ()->()) ~when_done m pos len =
    if not mem_supported then raise Mem_not_supported;
    if pos < 0 || len < 0 || pos > Bigarray.Array1.dim m - len then
      invalid_arg "#start_mem_reading";
    if reading <> `None then
      failwith "#start_mem_reading: already reading";
    if shutting_down <> None then
      failwith "#start_mem_reading: already shutting down";
    if not alive then
      failwith "#start_mem_reading: inactive connection";
    self # check_for_connect();
    Unixqueue.add_resource esys group (Unixqueue.Wait_in fd, -1.0);
    reading <- `Mem(when_done, peek, m, pos, len);
    disconnecting <- None;
    dlogr (fun () ->
	       "start_reading socket_multiplex_controller mplex=%d fd=%Ld"
	       ( self) (Netsys.int64_of_file_descr fd))

  method cancel_reading () =
    match reading with
      | `None ->
      | `String(f_when_done, _, _, _, _) ->
	  self # really_cancel_reading();
	    (f_when_done (Some Cancelled)) 0
      | `Mem(f_when_done, _, _, _, _) ->
	  self # really_cancel_reading();
	    (f_when_done (Some Cancelled)) 0

  method private really_cancel_reading() =
    if reading <> `None then (
      Unixqueue.remove_resource esys group (Unixqueue.Wait_in fd);
      reading <- `None;
      dlogr (fun () ->
		 "cancel_reading socket_multiplex_controller mplex=%d fd=%Ld"
		 ( self) (Netsys.int64_of_file_descr fd))

  method start_writing ~when_done s pos len =
    if pos < 0 || len < 0 || pos > String.length s - len then
      invalid_arg "#start_writing";
    if writing <> `None || writing_eof <> None then
      failwith "#start_writing: already writing";
    if shutting_down <> None then
      failwith "#start_writing: already shutting down";
    if wrote_eof then
      failwith "#start_writing: already past EOF";
    if not alive then
      failwith "#start_writing: inactive connection";
    self # check_for_connect();
    Unixqueue.add_resource esys group (Unixqueue.Wait_out fd, -1.0);
    writing <- `String(when_done, s, pos, len);
    disconnecting <- None;
    dlogr (fun () ->
	       "start_writing socket_multiplex_controller mplex=%d fd=%Ld"
	       ( self) (Netsys.int64_of_file_descr fd))

  method start_mem_writing ~when_done m pos len =
    if not mem_supported then raise Mem_not_supported;
    if pos < 0 || len < 0 || pos > Bigarray.Array1.dim m - len then
      invalid_arg "#start_mem_writing";
    if writing <> `None || writing_eof <> None then
      failwith "#start_mem_writing: already writing";
    if shutting_down <> None then
      failwith "#start_mem_writing: already shutting down";
    if wrote_eof then
      failwith "#start_mem_writing: already past EOF";
    if not alive then
      failwith "#start_mem_writing: inactive connection";
    self # check_for_connect();
    Unixqueue.add_resource esys group (Unixqueue.Wait_out fd, -1.0);
    writing <- `Mem(when_done, m, pos, len);
    disconnecting <- None;
    dlogr (fun () ->
	       "start_writing socket_multiplex_controller mplex=%d fd=%Ld"
	       ( self) (Netsys.int64_of_file_descr fd))

  method start_writing_eof ~when_done () =
    if not supports_half_open_connection then
      failwith "#start_writing_eof: operation not supported";
    (* From here on we know fd is not a named pipe *)
    if writing <> `None || writing_eof <> None then
      failwith "#start_writing_eof: already writing";
    if shutting_down <> None then
      failwith "#start_writing_eof: already shutting down";
    if wrote_eof then
      failwith "#start_writing_eof: already past EOF";
    if not alive then
      failwith "#start_writing_eof: inactive connection";
    self # check_for_connect();
    Unixqueue.add_resource esys group (Unixqueue.Wait_out fd, -1.0);
    writing_eof <- Some when_done;
    disconnecting <- None;
    dlogr (fun () ->
	       "start_writing_eof socket_multiplex_controller mplex=%d fd=%Ld"
	       ( self) (Netsys.int64_of_file_descr fd))

  method cancel_writing () =
    match writing, writing_eof with
      | `None, None ->
      | (`String(f_when_done, _, _, _) | `Mem(f_when_done, _, _, _)), None ->
	  self # really_cancel_writing();
	    (f_when_done (Some Cancelled)) 0
      | `None, Some f_when_done ->
	  self # really_cancel_writing();
	    f_when_done (Some Cancelled)
      | _ ->
	  assert false

  method private really_cancel_writing() =
    if writing <> `None || writing_eof <> None then (
      Unixqueue.remove_resource esys group (Unixqueue.Wait_out fd);
      writing <- `None;
      writing_eof <- None;
      dlogr (fun () ->
		 "cancel_writing socket_multiplex_controller mplex=%d fd=%Ld"
		 ( self) (Netsys.int64_of_file_descr fd))


  method start_shutting_down ?(linger = 60.0) ~when_done () =
    if reading <> `None || writing <> `None || writing_eof <> None then
      failwith "#start_shutting_down: still reading or writing";
    if shutting_down <> None then
      failwith "#start_shutting_down: already shutting down";
    if not alive then
      failwith "#start_shutting_down: inactive connection";
    self # check_for_connect();
    let linger_timeout = 
      if need_linger then linger else 0.0 in
    let wid = Unixqueue.new_wait_id esys in
    let (op, tmo) =
      if linger_timeout = 0.0 then
	(Unixqueue.Wait wid, 0.0)
	(Unixqueue.Wait_in fd, linger_timeout) in
    Unixqueue.add_resource esys group (op,tmo);
    shutting_down <- Some(when_done, op);
    disconnecting <- None;
    dlogr (fun () ->
	       "start_shutting_down socket_multiplex_controller mplex=%d fd=%Ld"
	       ( self) (Netsys.int64_of_file_descr fd))

  method cancel_shutting_down () =
    match shutting_down with
      | None ->
      | Some (f_when_done, _) ->
	  self # really_cancel_shutting_down ();
	    f_when_done (Some Cancelled)

  method private really_cancel_shutting_down () =
    match shutting_down with
      | None -> 
      | Some (_, op) ->
	  Unixqueue.remove_resource esys group op;
	  shutting_down <- None;
	  dlogr (fun () ->
		     "cancel_shutting_down \
                        socket_multiplex_controller mplex=%d fd=%Ld"
		     ( self) (Netsys.int64_of_file_descr fd))

  method private check_for_connect() =
    if not have_handler then (
      Unixqueue.add_handler esys group (fun _ _ -> self # handle_event);
      have_handler <- true
    disconnecting <- None

  method private check_for_disconnect() =
    if reading = `None && writing = `None && writing_eof = None && 
         shutting_down = None && disconnecting = None then
	     let wid = Unixqueue.new_wait_id esys in
	     let disconnector = Unixqueue.Wait wid in
	     Unixqueue.add_event esys (Unixqueue.Timeout(group,disconnector));
	     disconnecting <- Some disconnector

  method private notify_rd f_when_done exn_opt n =
    self # really_cancel_reading();
    dlogr (fun () ->
	       "input_done \
                socket_multiplex_controller mplex=%d fd=%Ld"
	       ( self) (Netsys.int64_of_file_descr fd));
      (f_when_done exn_opt) n

  method private notify_wr f_when_done exn_opt n =
    self # really_cancel_writing();
    dlogr (fun () ->
	       "output_done \
                socket_multiplex_controller mplex=%d fd=%Ld"
	       ( self) (Netsys.int64_of_file_descr fd));
      (f_when_done exn_opt) n

  method private handle_event ev =
    match ev with
      | Unixqueue.Input_arrived(g, _) when g = group ->
	  dlogr (fun () ->
		     "input_event \
                        socket_multiplex_controller mplex=%d fd=%Ld"
		     ( self) (Netsys.int64_of_file_descr fd));
	  ( match reading with
	      | `None -> ()
	      | `String (f_when_done, peek, _, _, _)
	      | `Mem    (f_when_done, peek, _, _, _) -> (
		    rcvd_from <- None;
		    let n = 
		      match reading with
			| `None -> assert false
			| `String(_,_, s, pos, len) -> (
			    match fd_style with
			      | `Recv_send(_,a) ->
				  let n = Unix.recv fd s pos len [] in
				  rcvd_from <- Some a;
			      | `Recvfrom_sendto ->
				  let (n, a) = Unix.recvfrom fd s pos len [] in
				  rcvd_from <- Some a;
			      | _ ->
				  Netsys.gread fd_style fd s pos len
			| `Mem(_,_, m, pos, len) -> (
			    match fd_style with
			      | `Recv_send(_,a) ->
				  let n = 
				    Netsys_mem.mem_recv fd m pos len [] in
				  rcvd_from <- Some a;
			      | `Recv_send_implied ->
				  Netsys_mem.mem_recv fd m pos len []
			      | `Read_write ->
				  Netsys_mem.mem_read fd m pos len
			      | _ ->
				  assert false
			  )  in
		    if n = 0 then (
		      read_eof <- true;
		      need_linger <- false;
		      self # notify_rd f_when_done (Some End_of_file) 0
		      self # notify_rd f_when_done None n
		    | Unix.Unix_error(Unix.EAGAIN,_,_)
		    | Unix.Unix_error(Unix.EWOULDBLOCK,_,_)
		    | Unix.Unix_error(Unix.EINTR,_,_) ->
		    | error ->
			self # notify_rd f_when_done (Some error) 0
	  ( match shutting_down with
	      | Some (f_when_done, op) when op = Unixqueue.Wait_in fd ->
		  let exn_opt, notify =
		      Netsys.gshutdown fd_style fd Unix.SHUTDOWN_ALL;
		      (None, true)
		      | Unix.Unix_error(Unix.EAGAIN,_,_)
		      | Unix.Unix_error(Unix.EWOULDBLOCK,_,_)
		      | Unix.Unix_error(Unix.EINTR,_,_) ->
			  (None, false)
		      | Unix.Unix_error(Unix.ENOTCONN,_,_) ->
			  (None, true)
		      | Unix.Unix_error(Unix.EPERM,_,_) ->
			  (None, true)
		      | error ->
			  (Some error, true)
		  if notify then (
		    self # really_cancel_shutting_down();
		    read_eof <- true;
		    wrote_eof <- true;
		    dlogr (fun () ->
			       "shutdown_done \
                                  socket_multiplex_controller mplex=%d fd=%Ld"
			       ( self) (Netsys.int64_of_file_descr fd));
		      f_when_done exn_opt
	      | _ -> ()

      | Unixqueue.Output_readiness(g, _) when g = group ->
	  dlogr (fun () ->
		     "output_event \
                        socket_multiplex_controller mplex=%d fd=%Ld"
		     ( self) (Netsys.int64_of_file_descr fd));
	  ( match writing with
	      | `None -> ()
	      | `String (f_when_done, _, _, _)
	      | `Mem    (f_when_done, _, _, _) -> (
		    let n = 
		      match writing with
			| `None -> assert false
			| `String(_, s, pos, len) -> (
			    match fd_style with
			      | `Recvfrom_sendto ->
				  ( match send_to with
				      | None ->
					  failwith "socket_multiplex_controller: Unknown receiver of message to send"
				      | Some a ->
					  Unix.sendto fd s pos len [] a
			      | _ ->
				  Netsys.gwrite fd_style fd s pos len 
			| `Mem(_, m, pos, len) -> (
			    match fd_style with
			      | `Recv_send _ ->
				  Netsys_mem.mem_send fd m pos len []
			      | `Read_write ->
				  Netsys_mem.mem_write fd m pos len
			      | _ ->
				  assert false
			  ) in
		    self # notify_wr f_when_done None n
		    | Unix.Unix_error(Unix.EAGAIN,_,_)
		    | Unix.Unix_error(Unix.EWOULDBLOCK,_,_)
		    | Unix.Unix_error(Unix.EINTR,_,_) ->
		    | error ->
			self # notify_wr f_when_done (Some error) 0
	  ( match writing_eof with
	      | None -> ()
	      | Some f_when_done ->
		  let exn_opt, notify =
		      if not wrote_eof then (
			Netsys.gshutdown fd_style fd Unix.SHUTDOWN_SEND;
			if not read_eof then need_linger <- true;
			wrote_eof <- true
		      (None, true)
		      | Unix.Unix_error(Unix.EAGAIN,_,_)
		      | Unix.Unix_error(Unix.EWOULDBLOCK,_,_)
		      | Unix.Unix_error(Unix.EINTR,_,_) ->
			  (None, false)
		      | Netsys.Shutdown_not_supported ->
			  (None, true)
		  if notify then (
		    self # really_cancel_writing();
		    dlogr (fun () ->
			       "output_eof_done \
                                  socket_multiplex_controller mplex=%d fd=%Ld"
			       ( self) (Netsys.int64_of_file_descr fd));
		      f_when_done exn_opt

      | Unixqueue.Timeout (g, op) when g = group ->
	  (* Note: The following is incompatible with [once] because we
           * always accept timeout events!
	  dlogr (fun () ->
		     "other_event \
                        socket_multiplex_controller mplex=%d fd=%Ld"
		     ( self) (Netsys.int64_of_file_descr fd));
	  ( match shutting_down with
	      | Some (f_when_done, op') when op = op' ->
		  let exn_opt, notify =
		      match fd_style with
			| `W32_pipe ->
			    let ph = get_ph() in
			    Netsys_win32.pipe_shutdown ph;
			    (None, true)
			| _ ->
			    Unix.shutdown fd 
			      (if wrote_eof then Unix.SHUTDOWN_RECEIVE else
			    (None, true)
		      | Unix.Unix_error(Unix.EAGAIN,_,_)
		      | Unix.Unix_error(Unix.EWOULDBLOCK,_,_)
		      | Unix.Unix_error(Unix.EINTR,_,_) ->
			  assert false  (* Not documented in man pages *)
		      | Unix.Unix_error(Unix.ENOTCONN,_,_) ->
			  (None, true)
		      | error ->
			  (Some error, true)
		  if notify then (
		    self # really_cancel_shutting_down();
		    read_eof <- true;
		    wrote_eof <- true;
		    dlogr (fun () ->
			       "shutdown_done \
                                  socket_multiplex_controller mplex=%d fd=%Ld"
			       ( self) (Netsys.int64_of_file_descr fd));
		      f_when_done exn_opt
	      | _ -> ()
	  ( match disconnecting with
	      | Some op' when op = op' ->
		  disconnecting <- None;
		  have_handler <- false;
		  raise Equeue.Terminate

	      | _ -> ()

      | _ ->
	  raise Equeue.Reject

  method inactivate() =
    dlogr (fun () ->
	       "inactivate \
                  socket_multiplex_controller mplex=%d fd=%Ld alive=%b \
	       ( self) (Netsys.int64_of_file_descr fd) alive
    if alive then (
      alive <- false;
      self # really_cancel_reading();
      self # really_cancel_writing();
      self # really_cancel_shutting_down();
      disconnecting <- None;
      have_handler <- false;
      Unixqueue.clear esys group;
      if close_inactive_descr then (
	Netsys.gclose fd_style fd
 	(* It is important that Unix.close (or substitute) is the very
           last action. From here on, a thread running in parallel can
           allocate this descriptor again, so it is essential that there
           are no references anymore to it when the old descriptor is closed.

  method event_system = esys


let create_multiplex_controller_for_connected_socket 
       ?close_inactive_descr ?preclose ?supports_half_open_connection fd esys =
  let mplex = 
    new socket_multiplex_controller
      ?close_inactive_descr ?preclose ?supports_half_open_connection fd esys in
  (mplex :> multiplex_controller)

let create_multiplex_controller_for_datagram_socket 
       ?close_inactive_descr ?preclose fd esys =
  let mplex = 
    new socket_multiplex_controller
      ?close_inactive_descr ?preclose ~supports_half_open_connection:false 
      fd esys in
  (mplex :> datagram_multiplex_controller)

class output_async_descr ~dst ?buffer_size ?(close_dst=true) esys =
  (* Map to output_async_mplex. Be careful not to depend on socket
   * functionaliy (esp. shutdown).
  let mplex = create_multiplex_controller_for_connected_socket dst esys in
  let shutdown ach mplex _ =
    if close_dst then Unix.close dst
    ~onshutdown:(`Action shutdown)

class input_async_descr ~src ?buffer_size ?(close_src=true) esys =
  (* Map to input_async_mplex. Be careful not to depend on socket
   * functionaliy (esp. shutdown).
  let mplex = create_multiplex_controller_for_connected_socket src esys in
  let shutdown ach mplex _ =
    if close_src then Unix.close src
    ~onshutdown:(`Action shutdown)

type copy_task =
    [ `Unidirectional of (Unix.file_descr * Unix.file_descr)
    | `Uni_socket of (Unix.file_descr * Unix.file_descr)
    | `Bidirectional of (Unix.file_descr * Unix.file_descr)
    | `Tridirectional of (Unix.file_descr * Unix.file_descr * Unix.file_descr)

class copier (copy_task : copy_task) ues : [unit] engine =
  val mutable engines = []
  val mutable last_eng_states = []
  val mutable last_count = 0

    ( match copy_task with
	  `Unidirectional(fd1, fd2) ->
	    self # init_unidirectional fd1 fd2

	| `Uni_socket(fd1, fd2) ->
	    self # init_uni_socket fd1 fd2

	| `Bidirectional(fd1, fd2) ->
	    self # init_tridirectional true fd1 fd2 fd2
	| `Tridirectional(fd1, fd2, fd3) ->
	    self # init_tridirectional false fd1 fd2 fd3
	| _ ->
	    assert false
    last_eng_states <- (fun eng -> eng # state) engines;

  method private init_unidirectional fd1 fd2 =
    (* This is quite simple. fd2_ch is an output channel
     * writing data to fd2. fd1_rcv is a receiver transferring
     * data from fd1 to fd2_ch. If fd1_rcv is at EOF, it will
     * close fd1, and close fd2_ch. fd2_ch closes fd2 after
     * it has written all buffered data.
    let fd2_ch = new output_async_descr 
		   ues in
    let fd1_rcv = new receiver
		    ~dst:(fd2_ch :> async_out_channel)
		    ues in
    engines <- [ fd1_rcv;
		 (fd2_ch :> unit engine);

  method private init_uni_socket fd1 fd2 =
    (* Here, we have to modify the EOF behaviour. First,
     * fd1_rcv must not close src. Of course, it must
     * close the output channel fd2_ch, otherwise the
     * channel would not know that it is at the end of 
     * the data stream. However, fd2_ch must not close
     * dst; instead we catch the EOF situation, and
     * shutdown the socket.
    let fd2_ch = new output_async_descr 
		   ues in
    let fd1_rcv = new receiver
		    ~dst:(fd2_ch :> async_out_channel)
		    ues in
    when_state ~is_done:(fun () -> 
			   Unix.shutdown fd2 Unix.SHUTDOWN_SEND)
    engines <- [ fd1_rcv;
		 (fd2_ch :> unit engine);

  method private init_tridirectional bi_case fd1 fd2 fd3 =
    (* Basically, we have two `Uni_socket copiers where one copier
     * transfers data into the reverse direction as the other
     * copier. Additionally, we have to close the descriptors
     * when work is done, either successfully or with error.
    (* bi_case: fd2 = fd3 is assumed, and fd2 must be a socket

    (* Copy fd1 to fd2: *)
    let fd2_ch = new output_async_descr 
		   ues in
    let fd1_rcv = new receiver
		    ~dst:(fd2_ch :> async_out_channel)
		    ues in
    (* Copy fd3 to fd1: *)
    let fd1_ch = new output_async_descr 
		   ues in
    let fd3_rcv = new receiver
		    ~dst:(fd1_ch :> async_out_channel)
		    ues in
    (* Check state: *)
    let fd1_eof = ref false in  (* whether output to fd1 @ eof *)
    let fd1_closed = ref false in
    let fd2_eof = ref false in  (* whether output to fd2 @ eof *)
    let fd2_closed = ref false in
    let fd3_closed = ref false in
    let full_close _ =
      if not !fd1_closed then
	Unix.close fd1;
      fd1_eof := true;
      fd1_closed := true;

      if not !fd2_closed then (
	Unix.close fd2;
	if bi_case then fd3_closed := true;
      fd2_eof := true;
      fd2_closed := true;

      if not !fd3_closed then 
	Unix.close fd3;
      fd3_closed := true;
    let half_close_fd1() =
      if !fd2_eof then (
      ) else (
        if not !fd1_eof then
	  Unix.shutdown fd1 Unix.SHUTDOWN_SEND;
	fd1_eof := true
      ) in
    let half_close_fd2() =
      if !fd1_eof then (
      ) else (
        if not !fd2_eof then (
	  if bi_case then
	    Unix.shutdown fd2 Unix.SHUTDOWN_SEND
	  else (
	    Unix.close fd2;
	    fd2_closed := true;
	fd2_eof := true
      ) in

    when_state ~is_done:half_close_fd2
               (fd2_ch :> 'a engine);
    when_state ~is_done:half_close_fd1

    engines <- [ fd1_rcv;
		 (fd2_ch :> unit engine);
		 (fd1_ch :> unit engine);

  method state =
    (* We inspect the states of all engines. If there is an engine
     * in error state, this state will be returned (Broken_communication
     * has lower priority than other errors). Otherwise: If there is
     * an aborted engine, we return that the copier is aborted.
     * Otherwise: If there is at least one working engine, we return
     * working state. The last case is that all engines are done, and
     * we return done.
     * Note that the progress meter for `Working is emulated, and
     * the more often [state] is invoked, the more frequent the progress
     * meter is increased. But it is only increased if at least one
     * engine has made some progress.
     * CHECK: This seems to be a generalization of the sync_engine above.
     * Maybe we want it as basic engine construct?

    let eng_states = (fun eng -> eng # state) engines in

    let our_state = ref(`Done()) in

      (fun st ->
	 match st with
	     `Done _ -> ()
	   | `Working _ ->
	       ( match !our_state with
		     `Done _ -> our_state := `Working 0
		   | _       -> ()
	   | `Aborted ->
	       ( match !our_state with
		     `Done _ 
		   | `Working _ -> our_state := `Aborted
		   | _          -> ()
	   | `Error err ->
	       ( match !our_state with
		     `Done _
		   | `Working _ 
		   | `Aborted
		   | `Error Broken_communication ->
		       our_state := st
		   | `Error _ ->

    ( match !our_state with
	  `Working _ ->
	    if eng_states <> last_eng_states then 
	      last_count <- last_count + 1;
	    our_state := `Working last_count
	| _ ->
    last_eng_states <- eng_states;


  method abort () =
    (* Simply abort all engines *)
      (fun eng -> eng # abort())
      (* CHECK: Hopefully, no engine goes to an error state because the
       * other engine aborts...

  method request_notification f =
    (* Simply forward the request to all engines *)

    let enabled = ref true in
    (* After the first notification has disabled further notifications, 
     * it must be ensured that no more notifications will happen.
     * [enabled] is [true] as long as notifications are enabled.

    let f'() = 
      !enabled && 
      ( let enabled' = f() in
	enabled := !enabled && enabled';

      (fun eng -> eng # request_notification f')

  method event_system =  ues


type inetspec =
  [ `Sock_inet of (Unix.socket_type * Unix.inet_addr * int)
  | `Sock_inet_byname of (Unix.socket_type * string * int)

type sockspec =
  [ `Sock_unix of (Unix.socket_type * string)
  | inetspec

type connect_address =
    [ `Socket of sockspec * connect_options
    | `Command of string * (int -> Unixqueue.event_system -> unit)
    | `W32_pipe of Netsys_win32.pipe_mode * string

and connect_options =
    { conn_bind : sockspec option;


let default_connect_options = { conn_bind = None } ;;

type connect_status =
    [ `Socket of Unix.file_descr * sockspec
    | `Command of Unix.file_descr * int
    | `W32_pipe of Unix.file_descr

let client_endpoint =
      `Socket(fd,_) -> fd
    | `Command(fd,_) -> fd
    | `W32_pipe fd -> fd

let client_socket = client_endpoint

class type client_endpoint_connector = object
  method connect : connect_address -> 
                   Unixqueue.event_system ->
		     connect_status engine
end ;;

class type client_socket_connector = client_endpoint_connector

let addr_of_name name =
  try Unix.inet_addr_of_string name
      Failure _ ->
	let entry = Unix.gethostbyname name in (* may fail *)

let getsockspec stype s =
  match Unix.getsockname s with
      Unix.ADDR_UNIX path ->
	`Sock_unix(stype, path)
    | Unix.ADDR_INET(addr, port) ->
	`Sock_inet(stype, addr, port)

let getpeerspec stype s =
  match Netsys.getpeername s with
      Unix.ADDR_UNIX path ->
	`Sock_unix(stype, path)
    | Unix.ADDR_INET(addr, port) ->
	`Sock_inet(stype, addr, port)

let getinetpeerspec stype s =
  match Netsys.getpeername s with
      Unix.ADDR_UNIX path ->
    | Unix.ADDR_INET(addr, port) ->
	Some(`Sock_inet(stype, addr, port))

let getconnerror s = (* Call this after getpeerspec raised ENOTCONN *)
    let b = String.make 1 ' ' in
    let _ = Unix.recv s b 0 1 [] in
    assert false
    | error -> error

type xdom =
    [ `Socket of socket_domain
    | `Pipe

(* - new impl, not yet ready *)
class direct_socket_connector() : client_socket_connector =
object (self)
  method connect connaddr ues =

    let const_eng v =
      new epsilon_engine(`Done v) ues in

    let sock_prim_data_eng sockspec =
      match sockspec with
	| `Sock_unix(stype, path) ->
	    let addr = Unix.ADDR_UNIX path in
	    const_eng(`Socket Unix.PF_UNIX, stype, addr)
	| `Sock_inet(stype, ip, port) ->
	    let dom = Netsys.domain_of_inet_addr ip in
	    let addr = Unix.ADDR_INET(ip,port) in
	    const_eng(`Socket dom, stype, addr)
	| `Sock_inet_by_name(stype, name, port) ->
	    let ip_opt = XXX in
	    ( match ip_opt with
		| Some ip ->
		    let dom = Netsys.domain_of_inet_addr ip in
		    let addr = Unix.ADDR_INET(ip, port) in
		    const_eng(`Socket dom, stype, addr)
		| None ->
		    (* Now need a real host lookup *)
		    let r = Uq_resolver.current_resolver() in
		    let eng = r # host_by_name name ues in
		    new map_engine
		      ~map_done:(fun he ->
				   let dom = he.Unix.h_addrtype in
				   let ip = he.Unix.h_addr_list.(0) in
				   let addr = Unix.ADDR_INET(ip, port) in
				   `Done(`Socket dom, stype, addr)

	| `Pipe XXX ->

    let sock_data_eng sockspec opts =
      (* Creates an engine that finds out the relevant data to create
         the socket
      match opts.conn_bind with
	| None ->
	    new map_engine
	      ~map_done:(fun (dom, stype, addr) ->
			   `Done(dom, stype, addr, None))
	      (sock_prim_data_eng sockspec)
	| Some bind_sockspec ->
	    (* Run two resolver engines in parallel *)
	    let d1_eng = sock_prim_data_eng sockspec in
	    let d2_eng = sock_prim_data_eng bind_sockspec in
	    new map_engine
	      ~map_done:(fun ((dom1, stype1, addr1), (dom2, _, addr2)) ->
			   if dom1 <> dom2 then
			     `Error(Failure("direct_socket_connector: Socket domain mismatch"))
			     `Done(dom1, stype1, addr1, Some addr2)
	      (new sync_engine d1_eng d2_eng)

    let sock_data_check sockspec opts =
      (* Check whether params are valid *) 
      match (sockspec, opts.conn_bind) with
	| `Sock_unix(stype,_), None -> ()
	| `Sock_unix(stype,_), Some(`Sock_unix(stype, _)) -> ()
	| `Sock_inet(stype,_,_), None -> ()
	| `Sock_inet_byname(stype,_,_), None -> ()
	| `Sock_inet(stype,_,_), Some(`Sock_inet(stype,_,_)) -> ()
	| `Sock_inet(stype,_,_), Some(`Sock_inet_byname(stype,_,_)) -> ()
	| `Sock_inet_byname(stype,_,_), Some(`Sock_inet(stype,_,_)) -> ()

	| `Sock_inet_byname(stype,_,_), Some(`Sock_inet_byname(stype,_,_)) -> ()
	| `Pipe(_,_), None -> ()
	| `Pipe(_,_), Some(`Pipe(_,_)) -> ()
	| _ ->
	    invalid_arg "direct_socket_connector: socket type mismatch"

    let create_eng sockspec (dom, stype, addr, bind_addr_opt) ->
      (* Create and connect the socket s, and return the connect engine
      match dom with
	| `Socket sockdom ->
	    let connect_tried = ref false in
	    let s = Unix.socket sockdom 0 in
	    ( try
		Netsys.set_close_on_exec s;
		Unix.set_nonblock s;
		( match bind_addr_opt with
		    | None -> ()
		    | Some bind_addr ->
			Unix.bind s bind_addr
		connect_tried := true;
		Unix.connect s addr;
		Netsys.connect_check s;
		let fake_conn_eng =
		  const_eng(`Socket(s, getsockspec stype s)) in
		  ~is_aborted:(fun _ -> Unix.close s)
		| Unix.Unix_error((Unix.EINPROGRESS|Unix.EWOULDBLOCK),_,_) 
		    when !connect_tried -> 
		    (* Note: Win32 returns EWOULDBLOCK instead of EINPROGRESS *)
		    (* Wait until the socket is writeable. Win32 reports connect
                       errors by signaling that out-of-band data can be received
                       (funny, right?), so we wait for that condition, too.
		    let poll_eng = 
		      new poll_engine [ Unixqueue.Wait_out s, (-1.0);
					Unixqueue.Wait_oob s, (-1.0)
				      ] ues in
		    let conn_eng =
		      new map_engine
			~map_done:(fun _ ->
				       Netsys.connect_check s;
				       `Done(getsockspec stype s)
				       | error -> 
					   Unix.close s; `Error error
			~map_error:(fun e ->
				      Unix.close s; `Error e)
			~map_aborted:(fun _ ->
					Unix.close s; `Aborted)
			(poll_eng :> Unixqueue.event engine) in
		| e ->
		    Unix.close s; raise e

	| `Pipe ->
	    ( match sockspec with
		| `Pipe(mode,name) ->
		    let ph = Netsys_win32.pipe_connect name mode in
		    let s = Netsys_win32.pipe_descr ph in
		    (s, None)
		      (* CHECK: do we need
		         Netsys_win32.pipe_shutdown ph
		| _ ->
		    assert false

    match connaddr with
      | `Socket(sockspec,opts) ->
	  (* Check on wrong arguments: *)
	  sock_data_check sockspec opts;
	  (* Create and use the engines: *)
	  let data_eng = sock_data_eng sockspec opts in
	  new seq_engine
	    (fun sock_data ->
	       create_eng sockspec sock_data)
      | _ ->
	  raise Addressing_method_not_supported

(* Old impl with sync name lookup *)
  class direct_connector() : client_endpoint_connector =
  object (self)
    method connect connaddr ues =

      let setup_socket s stype dest_addr opts =
	  Netsys.set_close_on_exec s;
	  Unix.set_nonblock s;
	  ( match opts.conn_bind with
	      | Some bind_spec ->
		  ( match bind_spec with
		      | `Sock_unix(stype', path) ->
			  if stype <> stype' then 
			    invalid_arg "Socket type mismatch";
			  Unix.bind s (Unix.ADDR_UNIX path)
		      | `Sock_inet(stype', addr, port) ->
			  if stype <> stype' then 
			    invalid_arg "Socket type mismatch";
			  Unix.bind s (Unix.ADDR_INET(addr,port))
		      | `Sock_inet_byname(stype', name, port) ->
			  if stype <> stype' then 
			    invalid_arg "Socket type mismatch";
			  let addr = addr_of_name name in
			  Unix.bind s (Unix.ADDR_INET(addr,port))
	      | None -> ()
	  Unix.connect s dest_addr;
	  (s, stype, true)
	    Unix.Unix_error((Unix.EINPROGRESS|Unix.EWOULDBLOCK),_,_) -> 
		(* Note: Win32 returns EWOULDBLOCK instead of EINPROGRESS *)
	  | error ->
	      (* Remarks:
               * We can get here EAGAIN. Unfortunately, this is a kind of
               * "catch-all" error for Unix.connect, e.g. you can get it when
               * you are run out of local ports, or if the backlog limit is
               * exceeded. It is totally unclear what to do in this case,
               * so we do not handle it here. The user is supposed to connect
               * later again.
	      Unix.close s; raise error

      match connaddr with
	  `Socket(sockspec,opts) ->
	    let (s, stype, is_connected) = 
	      match sockspec with
		| `Sock_unix(stype, path) ->
		    let s = Unix.socket Unix.PF_UNIX stype 0 in
		    setup_socket s stype (Unix.ADDR_UNIX path) opts;
		| `Sock_inet(stype, addr, port) ->
		    let dom = Netsys.domain_of_inet_addr addr in
		    let s = Unix.socket dom stype 0 in
		    setup_socket s stype (Unix.ADDR_INET(addr,port)) opts;
		| `Sock_inet_byname(stype, name, port) ->
		    let addr = addr_of_name name in
		    let dom = Netsys.domain_of_inet_addr addr in
		    let s = Unix.socket dom stype 0 in
		    setup_socket s stype (Unix.ADDR_INET(addr,port)) opts;
	    let conn_eng =
	      if is_connected then (
		let status =
		    Netsys.connect_check s;
		    `Done(`Socket(s, getsockspec stype s))
		    | error -> 
			`Error error in
		new epsilon_engine status ues
	      else (
		(* Now wait until the socket is writeable. Win32 reports connect
                   errors by signaling that out-of-band data can be received
                   (funny, right?), so we wait for that condition, too.
		let e = new poll_engine [ Unixqueue.Wait_out s, (-1.0);
					  Unixqueue.Wait_oob s, (-1.0)
					] ues in
		new map_engine
		  ~map_done:(fun _ ->
				 Netsys.connect_check s;
				 `Done(`Socket(s, getsockspec stype s))
				 | error -> 
				     `Error error
		  (e :> Unixqueue.event engine)
	      ) in
	    (* It is possible that somebody aborts conn_eng. In this case,
             * the socket must be closed. Same when we enter an error state.
	      ~is_aborted:(fun () -> Unix.close s)
	      ~is_error:(fun _ -> Unix.close s)
	    (* conn_eng is what the user sees: *)

	| `W32_pipe(mode,name) ->
	    let ph = Netsys_win32.pipe_connect name mode in
	    let s = Netsys_win32.pipe_descr ph in
	    let status = `Done(`W32_pipe s) in
	    let conn_eng = new epsilon_engine status ues in
	    (* It is possible that somebody aborts conn_eng. In this case,
             * the descr must be closed. The error state cannot be reached.
	    let close() =
	      Netsys_win32.pipe_shutdown ph;
	      Unix.close s
	      ~is_aborted:(fun () -> close())
	    (* conn_eng is what the user sees: *)

	| _ ->
	    raise Addressing_method_not_supported
  end ;;

(* TODO: Close u, v on abort/error *)
(* TODO: port to Win32 *)
class command_connector () : client_endpoint_connector =
  method connect connaddr ues =
    match connaddr with
	`Command (cmdstr,cmdcb) ->
          let (u,v) = Unix.socketpair Unix.PF_UNIX Unix.SOCK_STREAM 0 in

	  Unix.set_nonblock u;
	  Unix.set_nonblock v;
          Netsys.set_close_on_exec u;
          Netsys.set_close_on_exec v;
          let (s_in_sub, s_in) = Unix.pipe() in
          let (s_out, s_out_sub) = Unix.pipe() in
          let _e1 = new copier (`Tridirectional(v, s_in, s_out)) ues in
	  Netsys.set_close_on_exec s_in;
          Netsys.set_close_on_exec s_out;

	  let e2 = new epsilon_engine (`Done ()) ues in
	  new map_engine
	    ~map_done:(fun _ ->
			 let pid =
			     [| "/bin/sh"; "-c"; cmdstr |]
			     s_in_sub s_out_sub Unix.stderr in
			 (* CHECK: Are the other descriptors closed? *)
			 Unix.close s_in_sub;
			 Unix.close s_out_sub;
			 cmdcb pid ues;
			 `Done (`Command(u, pid)))

      | _ ->
	  raise Addressing_method_not_supported

	    (* CHECK: Maybe we want a combinator for engines that:
	     * - Is only `Done when all engines are done
	     * - Goes to `Error when one engine is `Error
	     * - Goes to `Aborted when one engine is `Aborted
	     * See copier. It implements a similar idea.

let connector ?proxy connaddr ues =
  let eff_proxy =
    match proxy with
	Some p -> ( p :> client_socket_connector )
      | None   -> 
	  ( match connaddr with
	      | `Socket _ 
	      | `W32_pipe _ ->
		  new direct_connector()
	      | `Command _ ->
		  new command_connector()
  eff_proxy # connect connaddr ues

type listen_address =
    [ `Socket of sockspec * listen_options
    | `W32_pipe of Netsys_win32.pipe_mode * string * listen_options

and listen_options =
    { lstn_backlog : int;
      lstn_reuseaddr : bool;

let default_listen_options =
  { lstn_backlog = 20;
    lstn_reuseaddr = false;

class type server_endpoint_acceptor = object
  method server_address : connect_address
  method multiple_connections : bool
  method accept : unit -> (Unix.file_descr * inetspec option) engine
  method shut_down : unit -> unit

class type server_socket_acceptor = server_endpoint_acceptor

class type server_endpoint_listener = object
  method listen : listen_address ->
                  Unixqueue.event_system ->
		    server_endpoint_acceptor engine

class type server_socket_listener = server_endpoint_listener

class direct_acceptor ?(close_on_shutdown=true) ?(preclose=fun()->())
                      fd ues : server_socket_acceptor =
  let fd_style = Netsys.get_fd_style fd in
  let pipe_objects = lazy (
    let psrv = Netsys_win32.lookup_pipe_server fd in
    let cn_ev = Netsys_win32.pipe_connect_event psrv in
    let cn_ev_descr = Netsys_win32.event_descr cn_ev in
    (psrv, cn_ev, cn_ev_descr)
  ) in
  let () =
    match fd_style with
      | `Read_write | `W32_pipe | `W32_event ->
	  failwith "Uq_engines.direct_acceptor: endpoint not supported"
      | `W32_pipe_server ->
	  ignore(Lazy.force pipe_objects)
      | _ -> () in
  val mutable acc_engine = None
			     (* The engine currently accepting connections *)

  method server_address = 
    match fd_style with
      | `W32_pipe_server ->
	  let (psrv, _, _) = Lazy.force pipe_objects in
	  let name = Netsys_win32.pipe_server_name psrv in
	  let mode = Netsys_win32.pipe_server_mode psrv in
	  let mode' = Netsys_win32.rev_mode mode in
      | `Recv_send _ | `Recvfrom_sendto | `Recv_send_implied ->
	  `Socket(getsockspec Unix.SOCK_STREAM fd,
      | _ ->
	  assert false

  method multiple_connections = true

  method accept () =
    (* Poll until the socket becomes readable, then accept it *)
    if acc_engine <> None then
      failwith "Uq_engines.direct_acceptor: Already waiting for connection";

    let socket_accept_eng() =
      let eng = new poll_engine [ Unixqueue.Wait_in fd, (-1.0) ] ues in
      new map_engine
	~map_done:(fun _ ->
		       let (fd',_) = Unix.accept fd in
		       Unix.set_nonblock fd';
		       (* There seem to be buggy kernels out there where
                        * [accept] does not always return a connected socket.
                        * We get ENOTCONN for [getpeername] then.
                        * Who does that? The super hackers at SW-Soft with
                        * their Virtuozzo shit.
		       let ps =
			 try getinetpeerspec Unix.SOCK_STREAM fd'
			   | Unix.Unix_error(Unix.ENOTCONN,_,_) as e ->
			       Unix.close fd';
			       raise e in
		       acc_engine <- None;
		       `Done(fd', ps)
		       | Unix.Unix_error( (Unix.EAGAIN | Unix.EINTR | 
					       Unix.ENOTCONN), _, _) ->
			   eng # restart();
			   `Working 0
		       | error ->
			   `Error error

    let w32_pipe_accept_eng() =
      let (psrv, cn_ev, cn_ev_descr) = Lazy.force pipe_objects in
      let eng = 
	new poll_engine [ Unixqueue.Wait_in cn_ev_descr, (-1.0) ] ues in
      new map_engine
	~map_done:(fun _ ->
		       let pipe = Netsys_win32.pipe_accept psrv in
		       let pipe_fd = Netsys_win32.pipe_descr pipe in
		       acc_engine <- None;
		       `Done(pipe_fd, None)
		       | Unix.Unix_error( (Unix.EAGAIN | Unix.EINTR | 
					       Unix.ENOTCONN), _, _) ->
			   eng # restart();
			   `Working 0
		       | error ->
			   `Error error

    let acc_eng = 
      match fd_style with
	| `Recv_send _ | `Recvfrom_sendto | `Recv_send_implied ->
	| `W32_pipe_server ->
	| _ ->
	    assert false in
      ~is_error:(fun x -> acc_engine <- None)
      ~is_aborted:(fun () -> acc_engine <- None)

    acc_engine <- Some acc_eng;

  method shut_down() =
    if close_on_shutdown then (
      ( match fd_style with
	  | `Recv_send _ | `Recvfrom_sendto | `Recv_send_implied ->
	      Unix.close fd
	  | `W32_pipe_server ->
	      let (psrv, _, cn_ev_descr) = Lazy.force pipe_objects in
	      Unix.close cn_ev_descr;
	      Netsys_win32.pipe_shutdown_server psrv;
	      Unix.close fd
	  | _ ->
	      assert false
    (* else: if not close_on_shutdown, there is no portable way of
       achieving that further connection attempts are refused by the
       kernel. listen(fd,0) works on some systems, but not on all.
    match acc_engine with
	None -> 
      | Some acc -> 
	  acc # abort()


class direct_socket_acceptor fd esys = direct_acceptor fd esys

class direct_listener () : server_socket_listener =
  method listen lstnaddr ues =
    let accept fd =
      let acc = new direct_acceptor fd ues in
      let eng = new epsilon_engine (`Done acc) ues in
	~is_aborted:(fun () -> acc # shut_down())
	~is_error:(fun _ -> acc # shut_down())

    match lstnaddr with
	`Socket (sockspec, opts) ->
	  ( match sockspec with
		`Sock_unix(stype, path) ->
		  let s = Unix.socket Unix.PF_UNIX stype 0 in
		  Unix.set_nonblock s;
		  if opts.lstn_reuseaddr then 
		    Unix.setsockopt s Unix.SO_REUSEADDR true;
		  Unix.bind s (Unix.ADDR_UNIX path);
		  Unix.listen s opts.lstn_backlog;
		  accept s
	      | `Sock_inet(stype, addr, port) ->
		  let dom = Netsys.domain_of_inet_addr addr in
		  let s = Unix.socket dom stype 0 in
		  Unix.set_nonblock s;
		  if opts.lstn_reuseaddr then 
		    Unix.setsockopt s Unix.SO_REUSEADDR true;
		  Unix.bind s (Unix.ADDR_INET(addr,port));
		  Unix.listen s opts.lstn_backlog;
		  accept s
	      | `Sock_inet_byname(stype, name, port) ->
		  let addr = addr_of_name name in
		  let dom = Netsys.domain_of_inet_addr addr in
		  let s = Unix.socket dom stype 0 in
		  Unix.set_nonblock s;
		  if opts.lstn_reuseaddr then 
		    Unix.setsockopt s Unix.SO_REUSEADDR true;
		  Unix.bind s (Unix.ADDR_INET(addr,port));
		  Unix.listen s opts.lstn_backlog;
		  accept s
      | `W32_pipe (mode, name, opts) ->
	  let backlog = opts.lstn_backlog in
	  let psrv = Netsys_win32.create_local_pipe_server name mode max_int in
	  Netsys_win32.pipe_listen psrv backlog;
	  let fd = Netsys_win32.pipe_server_descr psrv in
	  accept fd
      | _ ->
	  raise Addressing_method_not_supported

let listener ?proxy lstnaddr ues =
  let eff_proxy =
    match proxy with
	Some p -> ( p :> server_socket_listener )
      | None   -> 
	  ( match lstnaddr with
	      | `Socket _ | `W32_pipe _ ->
		  new direct_listener()
  eff_proxy # listen lstnaddr ues

type datagram_type =
    [ `Unix_dgram
    | `Inet_udp
    | `Inet6_udp

class type wrapped_datagram_socket =
  method descriptor : Unix.file_descr
  method sendto : 
    string -> int -> int -> Unix.msg_flag list -> sockspec -> int
  method recvfrom : 
    string -> int -> int -> Unix.msg_flag list -> (int * sockspec)
  method shut_down : unit -> unit
  method datagram_type : datagram_type
  method socket_domain : Unix.socket_domain
  method socket_type : Unix.socket_type
  method socket_protocol : int

class type datagram_socket_provider =
  method create_datagram_socket : datagram_type ->
                                  Unixqueue.event_system ->
                                    wrapped_datagram_socket engine
end ;;

class direct_datagram_socket dgtype (sdom,stype,sproto) 
      : wrapped_datagram_socket =
  let sock = Unix.socket sdom stype sproto in
  let _ = 
    Unix.set_nonblock sock;
    Netsys.set_close_on_exec sock in
  method descriptor = sock
  method sendto s p n flags spec = 
    let sockaddr =
      match spec with
	  `Sock_unix(stype', path) ->
	    if stype <> stype' then invalid_arg "Socket type mismatch";
	    Unix.ADDR_UNIX path
	| `Sock_inet(stype', addr, port) ->
	    if stype <> stype' then invalid_arg "Socket type mismatch";
	| `Sock_inet_byname(stype', name, port) ->
	    if stype <> stype' then invalid_arg "Socket type mismatch";
	    let addr = addr_of_name name in
    Unix.sendto sock s p n flags sockaddr

  method recvfrom s p n flags =
    let (n, sockaddr) = Unix.recvfrom sock s p n flags in
    let sockspec = 
      match sockaddr with
	  Unix.ADDR_UNIX path ->
	    `Sock_unix(stype, path)
	| Unix.ADDR_INET(addr,port) ->
	    `Sock_inet(stype, addr, port)

  method shut_down() =
    Unix.close sock

  method datagram_type = dgtype
  method socket_domain = sdom
  method socket_type = stype
  method socket_protocol = sproto
end ;;

let datagram_provider ?proxy dgtype ues = 
  match proxy with
      Some p ->
	( p :> datagram_socket_provider ) # create_datagram_socket dgtype ues
    | None   -> 
	let (sdom,stype,sproto) =
	  match dgtype with
	      `Unix_dgram -> (Unix.PF_UNIX, Unix.SOCK_DGRAM, 0)
	    | `Inet_udp   -> (Unix.PF_INET, Unix.SOCK_DGRAM, 0)
	    | `Inet6_udp  -> (Unix.PF_INET6, Unix.SOCK_DGRAM, 0)
	let wsock =
	  new direct_datagram_socket dgtype (sdom,stype,sproto) in
	let eng = new epsilon_engine (`Done wsock) ues in

	  ~is_aborted:(fun () -> wsock # shut_down())
	  ~is_error:(fun _ -> wsock # shut_down())


