Plasma GitLab Archive
Projects Blog Knowledge

(* $Id: netstream.ml 1412 2010-02-15 16:20:27Z gerd $
 * ----------------------------------------------------------------------
 *
 *)


open Netchannels;;


class type in_obj_stream =
object
  inherit Netchannels.in_obj_channel
  method block_size : int
  method window : Netbuffer.t
  method want : int -> unit
  method want_another_block : unit -> unit
  method window_length : int
  method window_at_eof : bool
  method skip : int -> unit
end


class virtual input_methods init_s_netbuf =
object(self)
  val mutable s_pos = 0
  val mutable s_at_eof = false
  val s_netbuf = init_s_netbuf
  val mutable s_closed = false


  method virtual want : int -> unit

  method virtual want_another_block : unit -> unit

  method virtual window_length : int

  method virtual input : string -> int -> int -> int


  (* The following input methods base all on [input] *)

  method really_input buf pos len =
    if s_closed then raise Netchannels.Closed_channel;
    let rec read p =
      let l = self # input buf (pos+p) (len-p) in
      let p' = p + l in
      if p' = len then
	()
      else (
	if l=0 then raise Sys_blocked_io;
	read p'
      )
    in
    self # want len;  (* may raise Buffer_underrun *)
    read 0


  method input_char () =
    let s = String.create 1 in
    self # really_input s 0 1;
    s.[0]


  method input_byte () =
    let s = String.create 1 in
    self # really_input s 0 1;
    int_of_char s.[0]


  method input_line () =
    (* CHECK: Are the different end of line conventions important here? *)
    let rec find_eol() =
      try
	Netbuffer.index_from s_netbuf 0 '\n'    (* or Not_found *)
      with
	  Not_found ->
	    if not s_at_eof then begin
	      self # want_another_block();  (* may raise Buffer_underrun *)
	      find_eol()
	    end
	    else self#window_length
    in
    if s_closed then raise Netchannels.Closed_channel;
    let n = find_eol() in
    if n >= self#window_length then begin
      if n = 0 then raise End_of_file;
      let s = String.create n in
      self#really_input s 0 n;
      s
    end
    else begin
      let s = String.create n in
      self#really_input s 0 n;
      ignore(self#input_char());    (* '\n' *)
      s
    end
    

  method pos_in =
    if s_closed then raise Netchannels.Closed_channel;
    s_pos


end


class input_stream ?len ?(block_size = 4096) in_ch : in_obj_stream =
object (self)
  val s_channel = (in_ch : in_obj_channel)
  val s_maxlength = len
  val s_blocksize = block_size
  val mutable s_underrun = false

  inherit input_methods (Netbuffer.create block_size)

  (* Note: This implementation must even work if [in_ch] is a pipe,
   * and raises Buffer_underrun from time to time. This may happen
   * at inconvenient situations. In this case the flag s_underrun stores 
   * whether an underrun happened, and should be reported later.
   *)

  initializer 
    try
      self # want_minimum()   (* may raise Buffer_underrun *)
    with
	Buffer_underrun ->
	  s_underrun <- true

  method private debug msg =
    prerr_endline (msg ^ ": s_pos=" ^ string_of_int s_pos ^ 
		   " s_at_eof=" ^ string_of_bool s_at_eof ^ 
		   " buflen=" ^ string_of_int (Netbuffer.length s_netbuf) ^
		   " s_closed=" ^ string_of_bool s_closed);

  method block_size = s_blocksize
    (* The block size is a static property, so never raise Closed_channel *)

  method window = 
    if s_closed then raise Netchannels.Closed_channel;
    s_netbuf

  method window_length = 
    if s_closed then raise Netchannels.Closed_channel;
    Netbuffer.length s_netbuf

  method window_at_eof = 
    if s_closed then raise Netchannels.Closed_channel;
    s_at_eof

  method want_another_block() =
    if s_closed then raise Netchannels.Closed_channel;
    if not s_at_eof then begin
      (* How much are we allowed to read? *)
      let m =
	match s_maxlength with
	    None   -> s_blocksize
	  | Some l -> min (l - s_pos - Netbuffer.length s_netbuf) s_blocksize
      in
      assert(m >= 0);
      (* Try to read m bytes: *)
      let rec read_block k =
	if k < m then
	  let n = 
	    Netbuffer.add_inplace ~len:(m-k) 
	      s_netbuf 
	      (s_channel # input)    
	        (* may raise End_of_file, Buffer_underrun *)
	  in
	  ( if n > 0 then
	      read_block (k+n)
	    else
	      raise Sys_blocked_io
	  )
	else
	  ()
      in
      try
	if m=0 then
	  (* Artificial EOF because len is reached *)
	  s_at_eof <- true
	else
	  read_block 0
      with
	  End_of_file ->
	    s_at_eof <- true
    end;
    (* self # debug "after stream#want_another_block"; *)
    (* Unix.sleep 1; *)


  method want n =
    if s_closed then raise Netchannels.Closed_channel;
    while not s_at_eof && Netbuffer.length s_netbuf < n do
      self # want_another_block()
    done


  method private want_minimum() =
    self # want s_blocksize


  method skip len =
    if s_closed then raise Netchannels.Closed_channel;
    let rec read len =
      if len > 0 then begin
	let k = min (Netbuffer.length s_netbuf) len in
	Netbuffer.delete s_netbuf 0 k;
	s_pos <- s_pos + k;
	self # want_minimum();    (* may raise Buffer_underrun *)
	if k > 0 then read (len - k)
      end
    in
    read len


  method input buf pos len =
    if s_closed then raise Netchannels.Closed_channel;
    if s_underrun then (
      self # want_minimum();  (* may raise Buffer_underrun *)
      s_underrun <- false;
    );
    (* Assertion: Either window length >= minimum, or eof *)
    let len' = min len (Netbuffer.length s_netbuf) in
    Netbuffer.blit s_netbuf 0 buf pos len';
    Netbuffer.delete s_netbuf 0 len';
    s_pos <- s_pos + len';
    ( try 
	self # want_minimum();  (* may raise Buffer_underrun *)
      with
	  Buffer_underrun ->
	    s_underrun <- true
    );
    if len'=0 && len>0 then raise End_of_file;
    len'


  method close_in () =
    if not s_closed then (
      s_channel # close_in();
      s_closed <- true;
    )


end


(*
let find_prefix s1 pos len s2 =
  (* Checks where a non-empty prefix of [s2] occurs at the end of the substring
   * of [s1] beginning at [pos] with length [len]. The function returns 
   * the position [p] of the prefix in [s1]. 
   * The function raises Not_found if it does not find a prefix.
   * POSTCONDITION:
   * - s1[p..p+n-1] = s2[0..n-1] for some biggest n, n <= String.length s2
   *   "The string s1 contains the prefix of s2 at position p, and the
   *   prefix has the maximum length n."
   * - n < String.length s2 ==> p+n = String.length s1
   *   "If the prefix is a proper prefix, it occurs at the end of s1"
   *)
  assert(String.length s2 > 0);
  let l1 = min (String.length s1) (pos+len) in
  let l2 = String.length s2 in
  let s2c0 = s2.[0] in
  let rec check_rec p k =
    k >= l2 || p+k >= l1 || (s1.[p+k] = s2.[k] && check_rec p (k+1)) in
  let rec search_rec p =
    if p >= l1 then raise Not_found;
    let p' = String.index_from s1 p s2c0 in  (* or Not_found *)
    if p' >= l1 then raise Not_found;
    if check_rec p' 0 then
      p'
    else
      search_rec (p'+1)
  in
  search_rec pos
;;
*)


class sub_stream ?len ?delimiter in_stream : in_obj_stream =
object(self)
  val s = (in_stream : in_obj_stream)
  val mutable s_winlen = 0
  val mutable s_del = None    (* initialized below *)
  val s_len = len
  val mutable s_underrun = false

  inherit input_methods (in_stream # window)

  initializer
    (match delimiter with
	 Some "" -> invalid_arg "new Netstream.sub_stream";
       | Some d  -> s_del <- Some(d, Netaux.KMP.make_pattern d)
       | None    -> s_del <- None
    );
    (match s_len with 
	 Some l -> if l<0 then invalid_arg "new Netstream.sub_stream";
       | None   -> ()
    );
    try
      self # want_minimum()
    with
	Buffer_underrun ->
	  s_underrun <- true

  method block_size = s # block_size

  method window = 
    if s_closed then raise Netchannels.Closed_channel;
    s_netbuf

  method window_length = 
    if s_closed then raise Netchannels.Closed_channel;
    s_winlen

  method window_at_eof = 
    if s_closed then raise Netchannels.Closed_channel;
    s_at_eof


  method private compute_winlen() =
    (* sets [s_winlen], [s_at_eof], and returns whether the current window
     * is "ambigous" (it is not clear if the stream does end or does not
     * end)
     *)
    let ambigous = ref false in
    let w = s#window in
    let wlen = s#window_length in
    let weof = s#window_at_eof in
    begin match s_del with
	None -> 
	  s_winlen <- wlen;
	  s_at_eof <- weof;
      | Some(d,pat) ->
	  let p = Netaux.KMP.find_pattern
		    pat ~len:wlen (Netbuffer.unsafe_buffer w) in
	  if p >= wlen then begin
	    (* Delimiter d does not occur in the buffer *)
	    s_winlen <- wlen;
	    s_at_eof <- weof;
	  end
	  else 
	    if (p + String.length d) > wlen then begin
	      (* Case: prefix is a proper prefix *)
	      ambigous := not weof;
	      s_winlen <- wlen;
	      s_at_eof <- weof;
	    end
	    else begin
	      (* Case: [d] occurs in the window *)
	      s_winlen <- p;
	      s_at_eof <- true;
	    end
    end;
    begin match s_len with
	None ->
	  ()
      | Some l ->
	  if l - s_pos < s_winlen then begin
	    ambigous := false;
	    s_winlen <- l - s_pos;
	    s_at_eof <- true;
	  end
    end;
    !ambigous


  method want_another_block() =
    if s_closed then raise Netchannels.Closed_channel;
    s # want_another_block();   (* may raise Buffer_underrun *)
    while self # compute_winlen() do
      s # want_another_block(); (* may raise Buffer_underrun *)
    done

  method want n =
    if s_closed then raise Netchannels.Closed_channel;
    while not s_at_eof && s_winlen < n do
      self # want_another_block()   (* may raise Buffer_underrun *)
    done

  method private want_minimum() =
    if self # compute_winlen() then
      self # want_another_block();   (* may raise Buffer_underrun *)
    self # want s#block_size


  method skip len =
    if s_closed then raise Netchannels.Closed_channel;
    let rec read len =
      if len > 0 then begin
	let k = min s_winlen len in
	s # skip k;              (* may raise Buffer_underrun *)
	s_pos <- s_pos + k;
	self # want_minimum();   (* may raise Buffer_underrun *)
	if k > 0 then read (len - k)
      end
    in
    read len


  method input buf pos len =
    if s_closed then raise Netchannels.Closed_channel;
    if s_underrun then (
      self # want_minimum();  (* may raise Buffer_underrun *)
      s_underrun <- false;
    );
    (* Assertion: Either window length >= minimum, or eof *)
    let len' = min len s_winlen in
    Netbuffer.blit s_netbuf 0 buf pos len';
    s # skip len';              (* never raises Buffer_underrun *)
    s_pos <- s_pos + len';
    ( try
	self # want_minimum();
      with
	  Buffer_underrun ->
	    s_underrun <- true
    );
    if len'=0 && len>0 then raise End_of_file;
    len'


  method close_in () =
    if not s_closed then (
      s # close_in();
      s_closed <- true;
    )

end


let print_in_obj_stream fmt s =
  Format.fprintf fmt
    "<NETSTREAM pos_in:%d window_length:%d eof=%b>"
    s#pos_in
    s#window_length
    s#window_at_eof
;;

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml