(* $Id: uq_ssl.ml 1916 2013-10-01 14:24:54Z gerd $ *)
module Debug = struct
let enable = ref false
let dump_data = ref false
end
let dlog = Netlog.Debug.mk_dlog "Uq_ssl" Debug.enable
let dlogr = Netlog.Debug.mk_dlogr "Uq_ssl" Debug.enable
let () =
Netlog.Debug.register_module "Uq_ssl" Debug.enable
open Printf
exception Ssl_error of Ssl.ssl_error
type ssl_socket_state = [ `Unset | `Client | `Server | `Unclean | `Clean ]
let string_of_ssl_error e =
match e with
| Ssl.Error_none ->
"Ssl.Error_none"
| Ssl.Error_ssl ->
"Ssl.Error_ssl"
| Ssl.Error_want_read ->
"Ssl.Error_want_read"
| Ssl.Error_want_write ->
"Ssl.Error_want_write"
| Ssl.Error_want_x509_lookup ->
"Ssl.Error_want_x509_lookup"
| Ssl.Error_syscall ->
"Ssl.Error_syscall"
| Ssl.Error_zero_return ->
"Ssl.Error_zero_return"
| Ssl.Error_want_connect ->
"Ssl.Error_want_connect"
| Ssl.Error_want_accept ->
"Ssl.Error_want_accept"
let string_of_verify_error e =
match e with
| Ssl.Error_v_unable_to_get_issuer_cert ->
"Ssl.Error_v_unable_to_get_issuer_cert"
| Ssl.Error_v_unable_to_get_ctl ->
"Ssl.Error_v_unable_to_get_ctl"
| Ssl.Error_v_unable_to_decrypt_cert_signature ->
"Ssl.Error_v_unable_to_decrypt_cert_signature"
| Ssl.Error_v_unable_to_decrypt_CRL_signature ->
"Ssl.Error_v_unable_to_decrypt_CRL_signature"
| Ssl.Error_v_unable_to_decode_issuer_public_key ->
"Ssl.Error_v_unable_to_decode_issuer_public_key"
| Ssl.Error_v_cert_signature_failure ->
"Ssl.Error_v_cert_signature_failure"
| Ssl.Error_v_CRL_signature_failure ->
"Ssl.Error_v_CRL_signature_failure"
| Ssl.Error_v_cert_not_yet_valid ->
"Ssl.Error_v_cert_not_yet_valid"
| Ssl.Error_v_cert_has_expired ->
"Ssl.Error_v_cert_has_expired"
| Ssl.Error_v_CRL_not_yet_valid ->
"Ssl.Error_v_CRL_not_yet_valid"
| Ssl.Error_v_CRL_has_expired ->
"Ssl.Error_v_CRL_has_expired"
| Ssl.Error_v_error_in_cert_not_before_field ->
"Ssl.Error_v_error_in_cert_not_before_field"
| Ssl.Error_v_error_in_cert_not_after_field ->
"Ssl.Error_v_error_in_cert_not_after_field"
| Ssl.Error_v_error_in_CRL_last_update_field ->
"Ssl.Error_v_error_in_CRL_last_update_field"
| Ssl.Error_v_error_in_CRL_next_update_field ->
"Ssl.Error_v_error_in_CRL_next_update_field"
| Ssl.Error_v_out_of_mem ->
"Ssl.Error_v_out_of_mem"
| Ssl.Error_v_depth_zero_self_signed_cert ->
"Ssl.Error_v_depth_zero_self_signed_cert"
| Ssl.Error_v_self_signed_cert_in_chain ->
"Ssl.Error_v_self_signed_cert_in_chain"
| Ssl.Error_v_unable_to_get_issuer_cert_locally ->
"Ssl.Error_v_unable_to_get_issuer_cert_locally"
| Ssl.Error_v_unable_to_verify_leaf_signature ->
"Ssl.Error_v_unable_to_verify_leaf_signature"
| Ssl.Error_v_cert_chain_too_long ->
"Ssl.Error_v_cert_chain_too_long"
| Ssl.Error_v_cert_revoked ->
"Ssl.Error_v_cert_revoked"
| Ssl.Error_v_invalid_CA ->
"Ssl.Error_v_invalid_CA"
| Ssl.Error_v_path_length_exceeded ->
"Ssl.Error_v_path_length_exceeded"
| Ssl.Error_v_invalid_purpose ->
"Ssl.Error_v_invalid_purpose"
| Ssl.Error_v_cert_untrusted ->
"Ssl.Error_v_cert_untrusted"
| Ssl.Error_v_cert_rejected ->
"Ssl.Error_v_cert_rejected"
| Ssl.Error_v_subject_issuer_mismatch ->
"Ssl.Error_v_subject_issuer_mismatch"
| Ssl.Error_v_akid_skid_mismatch ->
"Ssl.Error_v_akid_skid_mismatch"
| Ssl.Error_v_akid_issuer_serial_mismatch ->
"Ssl.Error_v_akid_issuer_serial_mismatch"
| Ssl.Error_v_keyusage_no_certsign ->
"Ssl.Error_v_keyusage_no_certsign"
| Ssl.Error_v_application_verification ->
"Ssl.Error_v_application_verification"
let () =
Netexn.register_printer
(Ssl_error Ssl.Error_none)
(fun x ->
match x with
| Ssl_error e -> "Uq_ssl.Ssl_error(" ^ string_of_ssl_error e ^ ")"
| _ -> assert false
);
Netexn.register_printer
(Ssl.Connection_error Ssl.Error_none)
(fun x ->
match x with
| Ssl.Connection_error e ->
"Ssl.Connection_error(" ^ string_of_ssl_error e ^ ")"
| _ -> assert false
);
Netexn.register_printer
(Ssl.Accept_error Ssl.Error_none)
(fun x ->
match x with
| Ssl.Accept_error e ->
"Ssl.Accept_error(" ^ string_of_ssl_error e ^ ")"
| _ -> assert false
);
Netexn.register_printer
(Ssl.Read_error Ssl.Error_none)
(fun x ->
match x with
| Ssl.Read_error e -> "Ssl.Read_error(" ^ string_of_ssl_error e ^ ")"
| _ -> assert false
);
Netexn.register_printer
(Ssl.Write_error Ssl.Error_none)
(fun x ->
match x with
| Ssl.Write_error e -> "Ssl.Write_error(" ^ string_of_ssl_error e ^ ")"
| _ -> assert false
);
Netexn.register_printer
(Ssl.Verify_error Ssl.Error_v_out_of_mem)
(fun x ->
match x with
| Ssl.Verify_error e ->
"Ssl.Verify_error(" ^ string_of_verify_error e ^ ")"
| _ -> assert false
)
let string_of_socket_state =
function
| `Unset -> "Unset"
| `Client -> "Client"
| `Server -> "Server"
| `Unclean -> "Unclean"
| `Clean -> "Clean"
class type ssl_multiplex_controller =
object
inherit Uq_engines.multiplex_controller
method ssl_socket : Ssl.socket
method ssl_socket_state : ssl_socket_state
method ssl_connecting : bool
method ssl_accepting : bool
method start_ssl_connecting :
when_done:(exn option -> unit) -> unit -> unit
method start_ssl_accepting :
when_done:(exn option -> unit) -> unit -> unit
method inactivate_no_close : unit -> unit
end
let string_of_tag =
function
| `Connecting -> "Connecting"
| `Accepting -> "Accepting"
| `Reading -> "Reading"
| `Writing -> "Writing"
| `Writing_eof -> "Writing_eof"
| `Shutting_down -> "Shutting_down"
let string_of_exn_opt =
function
| None -> "successful"
| Some x -> Netexn.to_string x
class ssl_mplex_ctrl ?(close_inactive_descr=true)
?(preclose = fun () -> ())
?(initial_state = `Unset)
?timeout
fd ssl_sock esys : ssl_multiplex_controller =
let () = Unix.set_nonblock fd in
let fdi = Netsys.int64_of_file_descr fd in
object(self)
val mutable alive = true (* if false => state in { `Clean, `Unclean } *)
val mutable closed = false
val mutable read_eof = false
val mutable wrote_eof = false
val mutable state = (initial_state : ssl_socket_state)
val mutable connecting = false (* true only in state `Unset *)
val mutable accepting = false (* true only in state `Unset *)
val mutable reading = None (* <> None only in states `Client/`Server *)
val mutable writing = None (* <> None only in states `Client/`Server *)
val mutable writing_eof = None (* <> None only in states `Client/`Server *)
val mutable shutting_down = None (* <> None only in states `Client/`Server *)
val mutable disconnecting = None
val mutable have_handler = false
val mutable pending = []
(* list of pending socket operations *)
val mutable expecting_input = false
val mutable expecting_output = false
val mutable timers = Hashtbl.create 7
val group = Unixqueue.new_group esys
method alive = alive
method ssl_socket = ssl_sock
method ssl_socket_state = state
method ssl_connecting = connecting
method ssl_accepting = accepting
method reading = reading <> None
method writing = writing <> None
method shutting_down = shutting_down <> None
method read_eof = read_eof
method wrote_eof = wrote_eof
method supports_half_open_connection = true
method mem_supported = false
method start_ssl_connecting ~when_done () =
if state <> `Unset then
failwith "#start_connecting: no longer possible in this state";
if connecting || accepting then
failwith "#start_connecting: handshake already in progress";
dlogr
(fun () ->
sprintf "FD %Ld: start_ssl_connecting" fdi);
let when_done exn_opt =
dlogr
(fun () ->
sprintf "FD %Ld: done start_ssl_connecting: %s"
fdi (string_of_exn_opt exn_opt)
);
when_done exn_opt in
self # nonblock_operation
(ref false)
`Connecting
(fun () ->
try
Ssl.connect ssl_sock;
state <- `Client;
connecting <- false;
(false, false, fun () -> when_done None)
with
| Ssl.Connection_error Ssl.Error_want_read ->
(true, false, fun () -> ())
| Ssl.Connection_error Ssl.Error_want_write ->
(false, true, fun () -> ())
| Ssl.Connection_error ssl_err ->
state <- `Unclean;
connecting <- false;
(false, false,
fun () ->
self # inactivate_no_close();
when_done (Some (Ssl_error ssl_err))
)
| err ->
state <- `Unclean;
connecting <- false;
(false, false,
fun () ->
self # inactivate_no_close();
when_done (Some err)
)
)
(fun x ->
self # inactivate_no_close();
when_done (Some x));
connecting <- true
method start_ssl_accepting ~when_done () =
if state <> `Unset then
failwith "#start_accepting: no longer possible in this state";
if connecting || accepting then
failwith "#start_accepting: handshake already in progress";
dlogr
(fun () ->
sprintf "FD %Ld: start_ssl_accepting" fdi);
let when_done exn_opt =
dlogr
(fun () ->
sprintf "FD %Ld: done start_ssl_accepting: %s"
fdi (string_of_exn_opt exn_opt)
);
when_done exn_opt in
self # nonblock_operation
(ref false)
`Accepting
(fun () ->
try
Ssl.accept ssl_sock;
state <- `Server;
accepting <- false;
(false, false, fun () -> when_done None)
with
| Ssl.Accept_error Ssl.Error_want_read ->
(true, false, fun () -> ())
| Ssl.Accept_error Ssl.Error_want_write ->
(false, true, fun () -> ())
| Ssl.Accept_error ssl_err ->
state <- `Unclean;
accepting <- false;
(false, false,
fun () ->
self # inactivate_no_close();
when_done (Some (Ssl_error ssl_err))
)
| err ->
state <- `Unclean;
accepting <- false;
(false, false,
fun () ->
self # inactivate_no_close();
when_done (Some err)
)
)
(fun x ->
self # inactivate_no_close();
when_done (Some x));
accepting <- true;
method start_reading ?(peek = fun() -> ()) ~when_done s pos len =
if pos < 0 || len < 0 || pos + len > String.length s then
invalid_arg "#start_reading";
if state <> `Client && state <> `Server then
failwith "#start_reading: bad state";
if reading <> None then
failwith "#start_reading: already reading";
if shutting_down <> None then
failwith "#start_reading: already shutting down";
dlogr
(fun () ->
sprintf "FD %Ld: start_reading" fdi);
let when_done exn_opt =
dlogr
(fun () ->
sprintf "FD %Ld: done start_reading: %s"
fdi (string_of_exn_opt exn_opt)
);
when_done exn_opt in
let cancel_flag = ref false in
self # nonblock_operation
cancel_flag
`Reading
(fun () ->
try
(* peek(); *)
(* [peek] is used by auth-local. It does not work for SSL. *)
let n = Ssl_exts.single_read ssl_sock s pos len in
reading <- None;
assert(n > 0);
if !Debug.dump_data then
dlogr
(fun () ->
sprintf "FD %Ld: read %S"
fdi (String.sub s pos n)
);
(false, false, fun () -> when_done None n)
with
| Ssl.Read_error Ssl.Error_zero_return ->
(* Read EOF *)
read_eof <- true;
(* Note: read_eof should be consistent with Ssl.read *)
(false, false, fun () -> when_done (Some End_of_file) 0)
| Ssl.Read_error Ssl.Error_want_read ->
(true, false, fun () -> ())
| Ssl.Read_error Ssl.Error_want_write ->
(false, true, fun () -> ())
| Ssl.Read_error ssl_err ->
state <- `Unclean;
reading <- None;
(false, false, fun () -> when_done (Some (Ssl_error ssl_err)) 0)
| err ->
state <- `Unclean;
reading <- None;
(false, false, fun () -> when_done (Some err) 0)
)
(fun x -> self # cancel_reading_with x);
reading <- Some (when_done, cancel_flag)
method start_mem_reading ?(peek = fun() -> ()) ~when_done m pos len =
raise Uq_engines.Mem_not_supported
method cancel_reading () =
self # cancel_reading_with Uq_engines.Cancelled
method private cancel_reading_with x =
dlogr
(fun () ->
sprintf "FD %Ld: cancel_reading" fdi);
match reading with
| None ->
()
| Some (f_when_done, cancel_flag) ->
assert(not !cancel_flag);
self # cancel_operation `Reading;
cancel_flag := true;
reading <- None;
f_when_done (Some x) 0
method start_writing ~when_done s pos len =
if pos < 0 || len < 0 || pos + len > String.length s then
invalid_arg "#start_writing";
if state <> `Client && state <> `Server then
failwith "#start_writing: bad state";
if writing <> None || writing_eof <> None then
failwith "#start_writing: already writing";
if shutting_down <> None then
failwith "#start_writing: already shutting down";
if wrote_eof then
failwith "#start_writing: already past EOF";
dlogr
(fun () ->
sprintf "FD %Ld: start_writing" fdi);
let when_done exn_opt =
dlogr
(fun () ->
sprintf "FD %Ld: done start_writing: %s"
fdi (string_of_exn_opt exn_opt)
);
when_done exn_opt in
let cancel_flag = ref false in
self # nonblock_operation
cancel_flag
`Writing
(fun () ->
try
let n = Ssl_exts.single_write ssl_sock s pos len in
writing <- None;
if !Debug.dump_data then
dlogr
(fun () ->
sprintf "FD %Ld: write %S"
fdi (String.sub s pos n)
);
(false, false, fun () -> when_done None n)
with
| Ssl.Write_error Ssl.Error_zero_return ->
(false, true, fun () -> ())
| Ssl.Write_error Ssl.Error_want_read ->
(true, false, fun () -> ())
| Ssl.Write_error Ssl.Error_want_write ->
(false, true, fun () -> ())
| Ssl.Write_error ssl_err ->
state <- `Unclean;
writing <- None;
(false, false, fun () -> when_done (Some (Ssl_error ssl_err)) 0)
| err ->
state <- `Unclean;
writing <- None;
(false, false, fun () -> when_done (Some err) 0)
)
(fun x -> self # cancel_writing_with x);
writing <- Some (when_done, cancel_flag)
method start_mem_writing ~when_done m pos len =
raise Uq_engines.Mem_not_supported
method start_writing_eof ~when_done () =
if state <> `Client && state <> `Server && state <> `Unclean then
failwith "#start_writing_eof: bad state";
(* N.B. We accept here Unclean because there is still a chance that we
can at least close the tunnel
*)
if writing <> None then
failwith "#start_writing_eof: already writing";
if shutting_down <> None then
failwith "#start_writing_eof: already shutting down";
if wrote_eof then
Unixqueue.once esys group 0.0 (fun () -> when_done None)
else (
dlogr
(fun () ->
sprintf "FD %Ld: start_writing_eof" fdi);
let when_done exn_opt =
dlogr
(fun () ->
sprintf "FD %Ld: done start_writing_eof: %s"
fdi (string_of_exn_opt exn_opt)
);
when_done exn_opt in
let n = ref 0 in
let cancel_flag = ref false in
self # nonblock_operation
cancel_flag
`Writing_eof
(fun () ->
try
let (_, sent_shutdown_0) =
Ssl_exts.get_shutdown ssl_sock in
if not sent_shutdown_0 then
Ssl_exts.single_shutdown ssl_sock;
let (rcvd_shutdown, sent_shutdown) =
Ssl_exts.get_shutdown ssl_sock in
if rcvd_shutdown then
read_eof <- true;
if sent_shutdown then
wrote_eof <- true;
if !n=1 && not sent_shutdown then (
(* Unclean crash *)
writing_eof <- None;
state <- `Unclean;
(false, false,
fun () -> when_done(Some(Failure "Unclean SSL shutdown")))
)
else
match sent_shutdown with
| false ->
(* strange *)
(false, true, fun () -> ())
| true ->
writing_eof <- None;
(* The following is unnecessary according to the SSL
specs, but actually required by buggy implementations.
*)
( try Unix.shutdown fd Unix.SHUTDOWN_SEND with _ -> ());
(false, false, fun () -> when_done None)
with
| Ssl_exts.Shutdown_error Ssl.Error_want_read ->
(true, false, fun () -> ())
| Ssl_exts.Shutdown_error Ssl.Error_want_write ->
(false, true, fun () -> ())
| Ssl_exts.Shutdown_error ssl_err ->
state <- `Unclean;
shutting_down <- None;
(false, false, fun () -> when_done (Some (Ssl_error ssl_err)))
| err ->
state <- `Unclean;
shutting_down <- None;
(false, false, fun () -> when_done (Some err))
)
(fun x -> self # cancel_writing_with x);
writing_eof <- Some(when_done, cancel_flag)
)
method cancel_writing () =
self # cancel_writing_with Uq_engines.Cancelled
method private cancel_writing_with x =
dlogr
(fun () ->
sprintf "FD %Ld: cancel_writing" fdi);
match writing with
| Some (f_when_done, cancel_flag) ->
assert(not !cancel_flag);
self # cancel_operation `Writing;
cancel_flag := true;
writing <- None;
f_when_done (Some x) 0
| None ->
( match writing_eof with
| Some(f_when_done, cancel_flag) ->
assert(not !cancel_flag);
self # cancel_operation `Writing_eof;
cancel_flag := true;
writing_eof <- None;
f_when_done (Some x)
| None ->
()
)
method start_shutting_down ?(linger = 60.0) ~when_done () =
if state <> `Client && state <> `Server && state <> `Unclean then
failwith "#start_shutting_down: bad state";
if reading <> None || writing <> None then
failwith "#start_shutting_down: still reading or writing";
if shutting_down <> None then
failwith "#start_shutting_down: already shutting down";
dlogr
(fun () ->
sprintf "FD %Ld: start_shutting_down" fdi);
let when_done exn_opt =
dlogr
(fun () ->
sprintf "FD %Ld: done start_shutting_down: %s"
fdi (string_of_exn_opt exn_opt)
);
when_done exn_opt in
let n = ref 0 in
let cancel_flag = ref false in
self # nonblock_operation
cancel_flag
`Shutting_down
(fun () ->
try
Ssl_exts.single_shutdown ssl_sock;
incr n;
let (rcvd_shutdown, sent_shutdown) =
Ssl_exts.get_shutdown ssl_sock in
if rcvd_shutdown then
read_eof <- true;
if sent_shutdown then
wrote_eof <- true;
if !n=2 && not (rcvd_shutdown && sent_shutdown) then (
(* Unclean crash *)
shutting_down <- None;
state <- `Unclean;
if rcvd_shutdown || sent_shutdown then
(false, false, fun () -> when_done None)
else
(false, false,
fun () -> when_done(Some(Failure "Unclean SSL shutdown")))
)
else
match (rcvd_shutdown, sent_shutdown) with
| (false, false) ->
(* strange *)
(false, true, fun () -> ())
| (true, false) ->
(false, true, fun () -> ())
| (false, true) ->
(* The following is unnecessary according to the SSL
specs, but actually required by buggy implementations.
*)
( try Unix.shutdown fd Unix.SHUTDOWN_SEND with _ -> ());
(true, false, fun () -> ())
| (true, true) ->
shutting_down <- None;
state <- `Clean;
( try Unix.shutdown fd Unix.SHUTDOWN_ALL with _ -> ());
(false, false, fun () -> when_done None)
with
| Ssl_exts.Shutdown_error Ssl.Error_want_read ->
(true, false, fun () -> ())
| Ssl_exts.Shutdown_error Ssl.Error_want_write ->
(false, true, fun () -> ())
| Ssl_exts.Shutdown_error ssl_err ->
state <- `Unclean;
shutting_down <- None;
(false, false, fun () -> when_done (Some (Ssl_error ssl_err)))
| err ->
state <- `Unclean;
shutting_down <- None;
(false, false, fun () -> when_done (Some err))
)
(fun x -> self # cancel_shutting_down_with x);
shutting_down <- Some(when_done, cancel_flag)
method cancel_shutting_down () =
self # cancel_shutting_down_with Uq_engines.Cancelled
method private cancel_shutting_down_with x =
dlogr
(fun () ->
sprintf "FD %Ld: cancel_shutting_down" fdi);
match shutting_down with
| None ->
()
| Some (f_when_done, cancel_flag) ->
assert(not !cancel_flag);
self # cancel_operation `Shutting_down;
cancel_flag := true;
shutting_down <- None;
f_when_done (Some x)
method private start_timer tag f_tmo =
(* Call f_tmo when operation for tag times out *)
match timeout with
| None ->
()
| Some (tmo, x) ->
let tmo_g = Unixqueue.new_group esys in
Hashtbl.replace timers tag (tmo_g, f_tmo);
Unixqueue.once esys tmo_g tmo
(fun () ->
Hashtbl.remove timers tag;
f_tmo x
)
method private stop_timer tag =
try
let tmo_g, _ = Hashtbl.find timers tag in
Unixqueue.clear esys tmo_g;
Hashtbl.remove timers tag
with Not_found -> ()
method private restart_all_timers () =
let ht = Hashtbl.copy timers in
Hashtbl.clear timers;
Hashtbl.iter
(fun tag (tmo_g, f_tmo) ->
Unixqueue.clear esys tmo_g;
self # start_timer tag f_tmo
)
ht
method private stop_all_timers() =
Hashtbl.iter
(fun tag (tmo_g, f_tmo) ->
Unixqueue.clear esys tmo_g;
)
timers;
Hashtbl.clear timers;
method private nonblock_operation cancel_flag tag f f_tmo =
(* We use here min_float instead of 0.0 because the latter is handled
in an optimized way in Unixqueue - and this gets here in the way.
The optimization implies that it is normally not checked whether
there are other socket events. However, we exactly want this here -
so other events can be processed while we are doing our sequence
of operations.
*)
Unixqueue.once
esys
group
(* 0.0 *) min_float
(fun () ->
if not !cancel_flag then (
dlogr
(fun () ->
sprintf "FD %Ld: operation: %s" fdi (string_of_tag tag));
let (want_rd, want_wr, action) = f() in
dlogr
(fun () ->
sprintf "FD %Ld: returning from %s - want_rd=%b want_wr=%b %s"
fdi (string_of_tag tag) want_rd want_wr
(if want_rd || want_wr then "- queuing op and retrying later"
else ""));
if want_rd || want_wr then (
self # start_timer tag f_tmo;
pending <- (tag, want_rd, want_wr, f) :: pending;
)
else
self # restart_all_timers();
( try
action();
self # setup_queue();
with
| error ->
self # setup_queue(); raise error
)
)
)
method private cancel_operation tag =
self # stop_timer tag;
pending <-
List.filter (fun (t, _, _, _) -> t <> tag) pending;
self # setup_queue()
method private retry_nonblock_operations can_read can_write =
dlogr
(fun () ->
sprintf "FD %Ld: retry_nonblock_operations" fdi);
let cur_pending = pending in
pending <- []; (* maybe new operations are added! *)
let actions = ref [] in
let pending' =
List.flatten
(List.map
(fun (tag, want_rd, want_wr, f) ->
if (want_rd && can_read) || (want_wr && can_write) then (
dlogr
(fun () ->
sprintf "FD %Ld: retried operation: %s"
fdi (string_of_tag tag));
let (want_rd', want_wr', action) = f() in (* must not fail! *)
dlogr
(fun () ->
sprintf "FD %Ld: returning from %s - \
want_rd=%b want_wr=%b %s"
fdi (string_of_tag tag) want_rd' want_wr'
(if want_rd' || want_wr' then
"- queuing op and retrying later"
else ""));
actions := action :: !actions;
if want_rd' || want_wr' then
[ tag, want_rd', want_wr', f ] (* try again later *)
else (
self # stop_timer tag;
[]
)
)
else
[ tag, want_rd, want_wr, f ] (* just keep *)
)
cur_pending
) in
pending <- pending @ pending';
self # restart_all_timers();
(* Be careful: We can only return the first error *)
let first_error = ref None in
List.iter
(fun f ->
try f()
with
| e ->
( match !first_error with
| None -> first_error := Some e
| Some _ ->
Netlog.logf `Crit
"Uq_ssl hidden exception: %s"
(Netexn.to_string e)
)
)
(List.rev !actions);
self # setup_queue();
( match !first_error with
| None -> ()
| Some e -> raise e
)
method private setup_queue() =
if alive then (
let expecting_input' =
List.exists (fun (_, want_rd, _, _) -> want_rd) pending in
let expecting_output' =
List.exists (fun (_, _, want_wr, _) -> want_wr) pending in
if expecting_input' || expecting_output' then (
if not have_handler then (
Unixqueue.add_handler esys group (fun _ _ -> self # handle_event);
have_handler <- true;
);
disconnecting <- None;
)
else
if have_handler && disconnecting = None then (
(* It makes only sense to disconnect if all callbacks are cancelled *)
if not(accepting || connecting || reading <> None ||
writing <> None || shutting_down <> None) then (
let wid = Unixqueue.new_wait_id esys in
let disconnector = Unixqueue.Wait wid in
Unixqueue.add_event esys (Unixqueue.Timeout(group,disconnector));
disconnecting <- Some disconnector
)
);
( match expecting_input, expecting_input' with
| (false, true) ->
Unixqueue.add_resource esys group (Unixqueue.Wait_in fd, (-1.0))
| (true, false) ->
Unixqueue.remove_resource esys group (Unixqueue.Wait_in fd)
| _ ->
()
);
( match expecting_output, expecting_output' with
| (false, true) ->
Unixqueue.add_resource esys group (Unixqueue.Wait_out fd, (-1.0))
| (true, false) ->
Unixqueue.remove_resource esys group (Unixqueue.Wait_out fd)
| _ ->
()
);
expecting_input <- expecting_input';
expecting_output <- expecting_output';
)
method private handle_event ev =
match ev with
| Unixqueue.Input_arrived(g, _) when g = group ->
self # retry_nonblock_operations true false
| Unixqueue.Output_readiness(g, _) when g = group ->
self # retry_nonblock_operations false true
| Unixqueue.Timeout (g, op) when g = group ->
( match disconnecting with
| Some op' when op = op' ->
disconnecting <- None;
have_handler <- false;
raise Equeue.Terminate
| _ -> raise Equeue.Reject
(* Can also be a timeout event from a "once" handler *)
)
| _ ->
raise Equeue.Reject
method inactivate() =
self # inactivate_no_close();
if not closed then (
closed <- true;
if close_inactive_descr then (
preclose();
Unix.close fd
)
)
method inactivate_no_close() =
pending <- [];
disconnecting <- None;
have_handler <- false;
if alive then Unixqueue.clear esys group;
self # stop_all_timers();
alive <- false
method event_system = esys
end
;;
let create_ssl_multiplex_controller
?close_inactive_descr ?preclose ?initial_state ?timeout ?ssl_socket
fd ctx esys =
let () = Unix.set_nonblock fd in
let s =
match ssl_socket with
| Some s -> s
| None ->
let s = Ssl.embed_socket fd ctx in
let m = Ssl_exts.get_mode s in
let () = Ssl_exts.set_mode s
{ m with
Ssl_exts.enable_partial_write = true;
accept_moving_write_buffer = true } in
s in
new ssl_mplex_ctrl ?close_inactive_descr ?preclose ?initial_state ?timeout
fd s esys
;;
class ssl_connect_engine (mplex : ssl_multiplex_controller) =
object(self)
inherit [ unit ] Uq_engines.engine_mixin (`Working 0) mplex#event_system
initializer
mplex # start_ssl_connecting
~when_done:(fun exn_opt ->
match exn_opt with
| None ->
self # set_state (`Done())
| Some err ->
self # set_state (`Error err)
)
()
method event_system = mplex # event_system
method abort() =
match self#state with
| `Working _ ->
mplex # inactivate_no_close();
self # set_state `Aborted
| _ ->
()
end
let ssl_connect_engine = new ssl_connect_engine
class ssl_accept_engine (mplex : ssl_multiplex_controller) =
object(self)
inherit [ unit ] Uq_engines.engine_mixin (`Working 0) mplex#event_system
initializer
mplex # start_ssl_accepting
~when_done:(fun exn_opt ->
match exn_opt with
| None ->
self # set_state (`Done())
| Some err ->
self # set_state (`Error err)
)
()
method event_system = mplex # event_system
method abort() =
match self#state with
| `Working _ ->
mplex # inactivate_no_close();
self # set_state `Aborted
| _ ->
()
end
let ssl_accept_engine = new ssl_accept_engine