Plasma GitLab Archive
Projects Blog Knowledge

(* $Id: qserver.ml 2195 2015-01-01 12:23:39Z gerd $
 * ----------------------------------------------------------------------
 *
 *)


(* TODO:
 * - Improve log messages
 * - Keep qlist backup
 * - Authentication
 *   Rpc_server: limit message size. Find out peer IP address, port number.
 * - Auth protocol: Password authentication
 * - Put all config options into config section.
 *)

let spooldir = "/tmp/qserver";;
let max_chunksize = Int32.of_string "1048576" ;;   (* 1 MB *)



open Netnumber
open Netxdr
open Rpc
open Rpc_server

module A = Queues_aux ;;
module S1 = Queues_srv.QUEUESPROG.QUEUESVERS1 ;;

type error =
    [ `not_found
      | `picked
      | `exists
      | `not_picked
      | `chunks_too_large
      | `timeout
      | `empty
      | `queue_deleted
      | `full
      | `bad_name
      | `bad_value
      | `inactive
      | `permission_denied
      | `sys_error
    ]

exception Error of error

let log message =
  (* Output [message] to the log medium. This is currently stderr. *)
  prerr_endline message
;;

let check_name name =
  (* Checks that [name] is a proper name. Raises Error if not. *)
  for i = 0 to String.length name - 1 do
    match name.[i] with
	'A'..'Z' -> ()
      | 'a'..'z' -> ()
      | '0'..'9' -> ()
      | '_'      -> ()
      | _        -> raise(Error `bad_name)
  done
;;


let catch_error f reply =
  try (f reply : unit)
  with
      Error code ->
	(* This match...with is only to help the typechecker inferring the
	 * right type
	 *)
	(match code with
           #error as c -> reply c
	)
    | Sys_error e ->
	log ("Sys_error: " ^ e);
	reply `sys_error
    | Unix.Unix_error(code,msg,fn) ->
	log ("Unix_error: " ^ Unix.error_message code ^
 	     (if msg <> "" then ": " ^ msg else "") ^
	     (if fn <> "" then " [" ^ fn ^ "]" else ""));
	reply `sys_error
;;


let protected_call f what =
  try
    f what
  with
      err ->
	log("Exception in deferred function: " ^
	    Printexc.to_string err);
	false    (* remove from [ongrow], [onshrink] *)
;;


let with_out_file file fn =
  try
    let r = fn file in
    close_out file;
    r
  with
      err -> close_out file; raise err
;;


let with_in_file file fn =
  try
    let r = fn file in
    close_in file;
    r
  with
      err -> close_in file; raise err
;;


let directory_list name =
  let l = ref [] in
  let dir = Unix.opendir name in
  try
    while true do
      match Unix.readdir dir with
	  "." | ".." -> ()
	| name ->
	    l := name :: !l
    done; assert false
  with
      End_of_file ->
	Unix.closedir dir;
	List.rev !l
    | err ->
	Unix.closedir dir;
	raise err
;;


let t_queue = Netxdr.validate_xdr_type A.xdrt_queue;;
let t_entry = Netxdr.validate_xdr_type A.xdrt_entry;;


type managed_entry =
    { qentry : A.entry;
      mutable is_picked : bool;
      mutable pick_connection : Rpc_server.connection_id option;
      mutable is_uploading : bool;
      mutable upload_connection : Rpc_server.connection_id option;
      mutable current_downloads : string list;   (* dhandles *)
      mutable current_upload : string option;    (* uhandle *)
    }

type managed_queue =
    { mutable qentries : managed_entry list;
      mutable next_eid : int;
      mutable qinfo : A.queue;
      mutable deleted : bool;
      mutable onshrink : (float * (bool -> bool)) list;
        (* These functions are called in order when the queue shrinks.
	 * If the functions return 'false' they are removed
	 * from the [ongrow] list.
	 * The float is the absolute time when the function should be
	 * called anyway (timeout). The argument passed to the called
	 * function indicates whether the reason of the call is a timeout (true)
	 * or a shrinking queue (false).
	 *)
      mutable ongrow : (float * (bool -> bool)) list;
        (* These functions are called in order when an entry is added to
	 * a queue. If the functions return 'false' they are removed
	 * from the [ongrow] list.
	 * The float is the absolute time when the function should be
	 * called anyway (timeout). The argument passed to the called
	 * function indicates whether the reason of the call is a timeout (true)
	 * or a growing queue (false).
	 *)
    }

type managed_download =
    { mutable downfile : in_channel;
      mutable downsize : int;
      mutable downchunksize : int;
      mutable downserial : int64;
              downentry : managed_entry;
	      downconnection : connection_id;
    }

type managed_upload =
    { mutable upfile : out_channel;
      mutable upserial : int64;
              upentry : managed_entry;
	      upconnection : connection_id;
    }


let queues = (Hashtbl.create 100 : (string, managed_queue) Hashtbl.t) ;;
  (* Cached queues *)

let next_dhandle = ref 0;;
let next_uhandle = ref 0;;

let dhandles = (Hashtbl.create 100 : (string, managed_download) Hashtbl.t) ;;
let uhandles = (Hashtbl.create 100 : (string, managed_upload) Hashtbl.t) ;;

let instance_prefix
  = Digest.string (string_of_float (Unix.gettimeofday())) ;;
  (* A prefix that is prepended before the dhandle and uhandle numbers in
   * the protocol. The instance_prefix distinguishes between instances of
   * qserver.
   *)


let queue_accepts_entries mq =
  let maxl = mq.qinfo.A.qparams.A.qmaxlen in
  mq.qinfo.A.qparams.A.qaccepting &&
  ( maxl < Int32.zero ||
    let l = List.length mq.qentries in
    (Int32.of_int l) < maxl
  )
;;


let queue_delivers_entries mq =
  mq.qinfo.A.qparams.A.qdelivering &&
  ( (* At least one unpicked entry needed: *)
    List.exists
      (fun e -> not e.is_picked && not e.is_uploading)
      mq.qentries
  )
;;


let load_managed_queue qid =
  (* Reads the "qlist" file and the "e*" files containing the meta data
   * of the entries
   *)
  let queuedir = Filename.concat spooldir qid in
  let infofile = Filename.concat queuedir "qinfo" in
  let listfile = Filename.concat queuedir "qlist" in
  if not (Sys.file_exists infofile) then raise (Error `not_found);
  let qinfo =
    with_in_file (open_in_bin infofile)
      (fun f ->
	 let l = in_channel_length f in
	 let buf = String.create l in
	 really_input f buf 0 l;
	 A._to_queue (Netxdr.unpack_xdr_value ~fast:true buf t_queue [])
      )
  in
  let eid_list =
    with_in_file (open_in listfile)
      (fun f ->
	 let l = ref [] in
	 try
	   while true do
	     let line = input_line f in
	     l := (int_of_string line) :: !l;
	   done;
	   assert false
	 with
	     End_of_file -> List.rev !l
      )
  in
  let next_eid =
    (List.fold_left max 0 eid_list) + 1 in
  let mentry_list =
    List.map
      (fun eid ->
	 let qentry_file =
	   Filename.concat queuedir ("e" ^ string_of_int eid) in
	 let v =
	   with_in_file (open_in_bin qentry_file)
	     (fun f ->
		let l = in_channel_length f in
		let buf = String.create l in
		really_input f buf 0 l;
		buf
	     )
	 in
	 let e =
	   A._to_entry (Netxdr.unpack_xdr_value ~fast:true v t_entry []) in
	 { qentry = e;
	   is_picked = false;
	   pick_connection = None;
	   is_uploading = false;
	   upload_connection = None;
	   current_downloads = [];
	   current_upload = None
	 }
      )
      eid_list
  in
  let mqueue =
    { qentries = mentry_list;
      next_eid = next_eid;
      deleted = false;
      qinfo = qinfo;
      onshrink = [];
      ongrow = [];
    }
  in
  mqueue
;;


let store_managed_queue qid mq =
  (* Writes the "qlist" file. Does not write anything else. *)
  let queuedir = Filename.concat spooldir qid in
  let infofile = Filename.concat queuedir "qinfo" in
  let listfile = Filename.concat queuedir "qlist" in
  if not (Sys.file_exists infofile) then raise (Error `not_found);
  (* Update qmodified: *)
  mq.qinfo.A.qmodified <- Int64.of_float (Unix.time());
  with_out_file (open_out_bin infofile)
    (fun f ->
       let v = A._of_queue mq.qinfo in
       Netxdr.pack_xdr_value v t_queue [] (output_string f);
    );
  with_out_file (open_out listfile)
    (fun f ->
       List.iter
	 (fun me ->
	    (* Write now the "qlist" file. Don't include entries currently
	     * being uploaded because these are considered as non-persistent
	     * until the upload finishes.
	     * Picked entries are considered as still persistent, so include
	     * them.
	     *)
	    if not me.is_uploading then begin
	      output_string f me.qentry.A.eid;
	      output_string f "\n";
	    end
	 )
	 mq.qentries;
    )
;;


let lookup_managed_queue qid =
  (* Checks whether the queue is cached. If yes, the cached queue record
   * is returned. Otherwise, the queue is loaded and added to the cache.
   *)
  try
    Hashtbl.find queues qid
  with
      Not_found ->
	let mq = load_managed_queue qid in
	Hashtbl.add queues qid mq;
	(* Note: There are not any picked and uploaded entries, so reset: *)
	mq.qinfo.A.qpicked <- Int32.zero;
	mq.qinfo.A.quploads <- Int32.zero;
	(* Check the length: *)
	let l = Int32.of_int (List.length mq.qentries) in
	if l <> mq.qinfo.A.qlength then begin
	  log "Load error: The qlength field is not correct. (fixed)";
	  mq.qinfo.A.qlength <- l;
	end;
	mq
;;


let proc_ping session () reply =
  reply()
;;

let proc_create_queue session queuename =
  catch_error
    (fun reply ->
       let user = Rpc_server.get_user session in
       (* Sanity check: queuename must only consist of alphanumeric characters,
	* including underscore
	*)
       check_name queuename;
       let queuedir = Filename.concat spooldir queuename in
       let infofile = Filename.concat queuedir "qinfo" in
       let listfile = Filename.concat queuedir "qlist" in
       (* Check if the queue already exists: *)
       if Sys.file_exists infofile then raise(Error `exists);
       (* Create the directory [queuedir] if necessary: *)
       if not (Sys.file_exists queuedir) then
	 Unix.mkdir queuedir 0o777;
       (* Create qlist: *)
       with_out_file (open_out listfile)
	 (fun f -> () );
       (* Create qinfo: *)
       let now = Int64.of_float (Unix.time()) in
       let qinfo = { A.qid = queuename;
		     A.qname = queuename;
		     A.qowner = user;
		     A.qcreation = now;
		     A.qmodified = now;
		     A.qlength = Int32.zero;
		     A.qpicked = Int32.zero;
		     A.quploads = Int32.zero;
		     A.qparams = { A.qmaxlen = Int32.minus_one;
				   A.qactive = false;
				   A.qaccepting = true;
				   A.qdelivering = true;
				 }
		   } in
       with_out_file (open_out_bin infofile)
	 (fun f ->
	    let v = A._of_queue qinfo in
	    Netxdr.pack_xdr_value v t_queue [] (output_string f);
	 );
       reply `successful
    )
;;


let proc_delete_queue session queuename =
  catch_error
    (fun reply ->
       let user = Rpc_server.get_user session in
       (* Sanity check: queuename must only consist of alphanumeric characters,
	* including underscore
	*)
       check_name queuename;
       let queuedir = Filename.concat spooldir queuename in
       (* Remove the files: *)
       let files = directory_list queuedir in
       List.iter
	 (fun f -> let path = Filename.concat queuedir f in Sys.remove path)
	 files;
       (* Remove the directory. It is ok if this fails (no permission) *)
       ( try Unix.rmdir queuedir with _ -> ());
       (* Is there a managed_queue? *)
       begin try
	 let mq = Hashtbl.find queues queuename in  (* or Not_found *)
	 if mq.qinfo.A.qowner <> user then raise(Error `permission_denied);
	 Hashtbl.remove queues queuename;
	 mq.deleted <- true;
	 (* Remove all upload/download handles: *)
	 List.iter
	   (fun me ->
	      List.iter
		(Hashtbl.remove dhandles)
		me.current_downloads;
	      ( match me.current_upload with
		    None -> ()
		  | Some uh -> Hashtbl.remove uhandles uh
	      );
	   )
	   mq.qentries;
	 (* Tell the [ongrow] and [onshrink] handlers that something
	  * errorneous happened
	  *)
	 List.iter
	   (fun (_,f) -> ignore(protected_call f true)) mq.ongrow;
	 List.iter
	   (fun (_,f) -> ignore(protected_call f true)) mq.onshrink;
	 mq.ongrow <- [];
	 mq.onshrink <- [];
       with
	   Not_found -> ()   (* no managed_queue *)
       end;
       reply `successful
    )
;;


let proc_list_queues session () =
  Printf.eprintf "LIST_QUEUES\n%!";
  catch_error
    (fun reply ->
       (* Get all files in the spooldir: *)
       let l = directory_list spooldir in
       (* Keep only files that are actually directories containing "qinfo": *)
       let l' =
	 List.filter
	   (fun name ->
	      Sys.file_exists
		(Filename.concat
		   (Filename.concat spooldir name)
		   "qinfo")
	   )
	   l in
       (* Read in the "qinfo" files, and decode them: *)
       let qlist =
	 List.map
	   (fun name ->
	      let mq = lookup_managed_queue name in
	      mq.qinfo
	   )
	   l'
       in
       (* Pass the result back: *)
       reply (`successful (Array.of_list qlist))
    )
;;


let proc_get_queue session queuename =
  catch_error
    (fun reply ->
       (* Sanity check: queuename must only consist of alphanumeric characters,
	* including underscore
	*)
       check_name queuename;
       let mq = lookup_managed_queue queuename in
       reply (`successful mq.qinfo);
    )
;;


let proc_set_queue session (queuename,params) =
  catch_error
    (fun reply ->
       let user = Rpc_server.get_user session in
       (* Sanity check: queuename must only consist of alphanumeric characters,
	* including underscore
	*)
       check_name queuename;
       let mq = lookup_managed_queue queuename in
       if mq.qinfo.A.qowner <> user then raise(Error `permission_denied);
       if params.A.qactive then begin
	 (* Activate the queue/change limits: *)
	 let old_accepting = queue_accepts_entries mq in
	 let old_delivering = queue_delivers_entries mq in
	 mq.qinfo.A.qparams <- params;
	 let new_accepting = queue_accepts_entries mq in
	 let new_delivering = queue_delivers_entries mq in
	 store_managed_queue queuename mq;
	 (* NOTE: If the queue has previously been inactive, the [ongrow] and
	  * [onshrink] lists are empty. So the following statements do
	  * nothing.
	  *)
	 (* Maybe there is now space for growth: *)
	 if not old_delivering && new_delivering then
	   mq.ongrow <-
             List.filter (fun (_,f) -> protected_call f false) mq.ongrow;
	 (* Maybe there is now an entry to pick: *)
	 if not old_accepting && new_accepting then
	   mq.onshrink <-
             List.filter (fun (_,f) -> protected_call f false) mq.onshrink;
       end
       else begin
	 (* Deactivate the queue: *)
	 mq.qinfo.A.qparams <- params;
	 store_managed_queue queuename mq;
	 (* Make that everybody notices the inactive queue: *)
	 mq.ongrow <-
           List.filter (fun (_,f) -> protected_call f true) mq.ongrow;
	 mq.onshrink <-
           List.filter (fun (_,f) -> protected_call f true) mq.onshrink;
	 (* Make these lists empty: *)
	 if mq.ongrow <> [] then
	   log "Warning: After deactivation the ongrow list is not empty";
	 if mq.onshrink <> [] then
	   log "Warning: After deactivation the onshrink list is not empty";
	 mq.ongrow <- [];
	 mq.onshrink <- [];
       end;
       reply `successful
    )
;;


let proc_list_queue_entries session qid =
  catch_error
    (fun reply ->
       let user = Rpc_server.get_user session in
       (* Sanity check: qid must only consist of alphanumeric characters,
	* including underscore
	*)
       check_name qid;
       let mq = lookup_managed_queue qid in
       if mq.qinfo.A.qowner <> user then raise(Error `permission_denied);
       let visible =
	 List.filter
	   (fun qe ->
	      not qe.is_picked && not qe.is_uploading
	   )
	   mq.qentries
       in
       let visible' = List.map (fun qe -> qe.qentry) visible in
       reply (`successful (Array.of_list visible'))
    )
;;


let timeout_pick mq =  (* raise the right exception *)
  if mq.deleted then
    raise (Error `queue_deleted)
  else
    if not mq.qinfo.A.qparams.A.qactive then
      raise (Error `inactive)
  else
    if mq.qinfo.A.qparams.A.qdelivering then
      raise (Error `empty)
    else
      raise (Error `timeout)
;;


let try_pick session mq reply0 timed_out =
  let keep = ref true in
  let reply r =
    (* If [reply] is called, the function [try_pick] must return [false]
     * to indicate the it is to be removed from the [ongrow] list
     *)
    keep := false; reply0 r
  in
  catch_error
    (fun reply ->
       if timed_out then timeout_pick mq;
       if not mq.qinfo.A.qparams.A.qactive then raise(Error `inactive);
       try
	 if not mq.qinfo.A.qparams.A.qdelivering then raise Not_found;
	 let first =
	   List.find
	     (fun qe -> not qe.is_picked && not qe.is_uploading)
	     mq.qentries
	 in (* or Not_found *)
	 reply (`successful first.qentry);  (* may raise Connection_lost *)
	 first.is_picked <- true;
	 first.pick_connection <- Some(Rpc_server.get_connection_id session);
	 mq.qinfo.A.qpicked <- Int32.succ mq.qinfo.A.qpicked;
	 mq.qinfo.A.qlength <- Int32.pred mq.qinfo.A.qlength;
       with
	   Not_found -> ()
	       (* Try again *)
    )
    reply;
  !keep




let proc_pick_queue_entry session (qid,timeout) =
  catch_error
    (fun reply ->
       let user = Rpc_server.get_user session in
       (* Sanity check: qid must only consist of alphanumeric characters,
	* including underscore
	*)
       check_name qid;
       let mq = lookup_managed_queue qid in
       if mq.qinfo.A.qowner <> user then raise(Error `permission_denied);
       if try_pick session mq reply false then begin
	 (* [try_pick] was not successful. Add [try_pick] to the [ongrow]
	  * list
	  *)
	 if timeout = Int32.zero then timeout_pick mq;
	 let max_time =
	   if timeout >= Int32.zero then
	     Unix.time() +. Int32.to_float timeout
           else
	     (-1.0)
	 in
	 mq.ongrow <- mq.ongrow @ [ max_time, try_pick session mq reply ];
       end
    )
;;


let stop_downloads mq entry =
  List.iter
    (fun dhandle ->
       let dl =
	 try Hashtbl.find dhandles dhandle
	 with Not_found -> assert false
       in
       close_in dl.downfile;
       Hashtbl.remove dhandles dhandle
    )
    entry.current_downloads;
  mq.qinfo.A.qpicked <- Int32.pred mq.qinfo.A.qpicked;
  mq.qinfo.A.qlength <- Int32.succ mq.qinfo.A.qlength;
  entry.current_downloads <- []
;;


let proc_return_picked_queue_entry session (qid,eid) =
  catch_error
    (fun reply ->
       let user = Rpc_server.get_user session in
       (* Sanity check: qid must only consist of alphanumeric characters,
	* including underscore
	*)
       check_name qid;
       let mq = lookup_managed_queue qid in
       if mq.qinfo.A.qowner <> user then raise(Error `permission_denied);
       let entry =
	 try
	   List.find
	     (fun qe -> qe.qentry.A.eid = eid)
	     mq.qentries
	 with Not_found -> raise (Error `not_found)
       in

       if not entry.is_picked then raise (Error `not_picked );
       if entry.pick_connection <> Some(Rpc_server.get_connection_id session)
       then raise(Error `not_found);

       entry.is_picked <- false;
       (* Stop all current downloads: *)
       stop_downloads mq entry;
       reply `successful
    )
;;


let proc_remove_picked_queue_entry session (qid,eid) =
  catch_error
    (fun reply ->
       let user = Rpc_server.get_user session in
       (* Sanity check: qid must only consist of alphanumeric characters,
	* including underscore
	*)
       check_name qid;
       let mq = lookup_managed_queue qid in
       if mq.qinfo.A.qowner <> user then raise(Error `permission_denied);
       let entry =
	 try
	   List.find
	     (fun qe -> qe.qentry.A.eid = eid)
	     mq.qentries
	 with Not_found -> raise (Error `not_found)
       in

       if not entry.is_picked then raise (Error `not_picked );
       if entry.pick_connection <> Some(Rpc_server.get_connection_id session)
       then raise(Error `not_found);

       (* Stop all current downloads: *)
       stop_downloads mq entry;
       (* Remove this entry from the list, and store the list: *)
       mq.qentries <- List.filter
	                (fun qe -> qe.qentry.A.eid <> eid)
	                mq.qentries;
       mq.qinfo.A.qlength <- Int32.pred mq.qinfo.A.qlength;
       store_managed_queue qid mq;
       (* Remove the "e*" and "d*" files: *)
       let queuedir = Filename.concat spooldir qid in
       let qentry_file = Filename.concat queuedir ("e" ^ eid) in
       let qdata_file = Filename.concat queuedir ("d" ^ eid) in
       Sys.remove qentry_file;
       Sys.remove qdata_file;
       reply `successful;
       (* Finally, consider the [onshrink] activities: *)
       mq.onshrink <- List.filter
	                (fun (_,f) -> protected_call f false) mq.onshrink
    )
;;


let proc_remove_queue_entry session (qid,eid) =
  catch_error
    (fun reply ->
       let user = Rpc_server.get_user session in
       (* Sanity check: qid must only consist of alphanumeric characters,
	* including underscore
	*)
       check_name qid;
       let mq = lookup_managed_queue qid in
       if mq.qinfo.A.qowner <> user then raise(Error `permission_denied);
       let entry =
	 try
	   List.find
	     (fun qe -> qe.qentry.A.eid = eid)
	     mq.qentries
	 with Not_found -> raise (Error `not_found)
       in

       if entry.is_picked &&
	  entry.pick_connection <> Some(Rpc_server.get_connection_id session)
       then raise(Error `not_found);
       if entry.is_picked then raise (Error `picked );
       if entry.is_uploading then raise (Error `sys_error);  (* strange *)

       (* Remove this entry from the list, and store the list: *)
       mq.qentries <- List.filter
	                (fun qe -> qe.qentry.A.eid <> eid)
	                mq.qentries;
       mq.qinfo.A.qlength <- Int32.pred mq.qinfo.A.qlength;
       store_managed_queue qid mq;
       (* Remove the "e*" and "d*" files: *)
       let queuedir = Filename.concat spooldir qid in
       let qentry_file = Filename.concat queuedir ("e" ^ eid) in
       let qdata_file = Filename.concat queuedir ("d" ^ eid) in
       Sys.remove qentry_file;
       Sys.remove qdata_file;
       reply `successful;
       (* Finally, consider the [onshrink] activities: *)
       mq.onshrink <- List.filter
	                (fun (_,f) -> protected_call f false) mq.onshrink
    )
;;


let proc_download_entry session (qid,eid,chunksize) =
  catch_error
    (fun reply ->
       let user = Rpc_server.get_user session in
       (* Sanity check: qid must only consist of alphanumeric characters,
	* including underscore
	*)
       check_name qid;
       if chunksize > max_chunksize then raise (Error `chunks_too_large);
       if chunksize <= Int32.zero then raise (Error `bad_value);
       let ichunksize = Int32.to_int chunksize in
       let mq = lookup_managed_queue qid in
       if mq.qinfo.A.qowner <> user then raise(Error `permission_denied);
       let entry =
	 try
	   List.find
	     (fun qe -> qe.qentry.A.eid = eid)
	     mq.qentries
	 with Not_found -> raise (Error `not_found)
       in

       if not entry.is_picked then raise (Error `not_picked );
       if entry.pick_connection <> Some(Rpc_server.get_connection_id session)
       then raise(Error `not_found);

       (* Create new dhandle: *)
       let n = !next_dhandle in
       incr next_dhandle;
       let dhandle =
	 instance_prefix ^ Netnumber.BE.int4_as_string (int4_of_int n) in
       (* Create new download record *)
       let queuedir = Filename.concat spooldir qid in
       let qdata_file = Filename.concat queuedir ("d" ^ eid) in
       let file = open_in_bin qdata_file in
       begin try
	 let dl =
	   { downfile = file;
	     downsize = in_channel_length file;
	     downchunksize = ichunksize;
	     downserial = Int64.zero;
	     downentry = entry;
	     downconnection = Rpc_server.get_connection_id session;
	   }
	 in
	 Hashtbl.add dhandles dhandle dl;
	 entry.current_downloads <- dhandle :: entry.current_downloads;
       with
	   err -> close_in file; raise err
       end;
       (* Note: [file] remains open. It will be closed once the download
	* is finished, or the download stops for some reason.
	*)
       (* Return the dhandle: *)
       reply (`successful dhandle)
    )
;;


let proc_download_chunk session dhandle =
  catch_error
    (fun reply ->
       let dl =
	 try Hashtbl.find dhandles dhandle
	 with Not_found -> raise (Error `not_found)
       in
       if dl.downconnection <> Rpc_server.get_connection_id session
       then raise(Error `not_found);

       let buf = String.create (dl.downchunksize) in
       let n = input dl.downfile buf 0 dl.downchunksize in
       let chunk =
	 { A.serial = dl.downserial;
	   A.last = pos_in dl.downfile >= dl.downsize;
	   A.data =
	     if n < dl.downchunksize then
	       String.sub buf 0 n
	     else
	       buf;
	 }
       in
       dl.downserial <- Int64.succ dl.downserial;
       reply (`successful chunk)
    )
;;


let timeout_upload mq =  (* raise the right exception *)
  if mq.deleted then
    raise (Error `queue_deleted)
  else
    if not mq.qinfo.A.qparams.A.qactive then
      raise (Error `inactive)
  else
    if mq.qinfo.A.qparams.A.qaccepting then
      raise (Error `full)
    else
      raise (Error `timeout)
;;


let stop_upload mq entry =
  match entry.current_upload with
      None -> ()
    | Some uhandle ->
	let ul =
	  try Hashtbl.find uhandles uhandle
	  with Not_found -> assert false
	in
	close_out ul.upfile;
	let queuedir = Filename.concat spooldir entry.qentry.A.eqid in
	let qdata_file = Filename.concat queuedir ("d" ^ entry.qentry.A.eid) in
	Sys.remove qdata_file;
	Hashtbl.remove uhandles uhandle;
	mq.qinfo.A.quploads <- Int32.pred mq.qinfo.A.quploads;
	entry.current_upload <- None
;;


let start_upload session mq props reply0 timed_out =
  let keep = ref true in
  let reply r =
    (* If [reply] is called, the function [start_upload] must return [false]
     * to indicate the it is to be removed from the [onshrink] list
     *)
    keep := false; reply0 r
  in
  catch_error
    (fun reply ->
       if timed_out then timeout_upload mq;
       if not mq.qinfo.A.qparams.A.qactive then raise (Error `inactive);
       (* Is there space in the queue? *)
       let qid = mq.qinfo.A.qid in
       if queue_accepts_entries mq then begin
	 (* Create new uhandle: *)
	 let n = !next_uhandle in
	 incr next_uhandle;
	 let uhandle =
	   instance_prefix ^ Netnumber.BE.int4_as_string (int4_of_int n) in
	 (* Create new entry record in memory: *)
	 let eid = string_of_int (mq.next_eid) in
	 mq.next_eid <- mq.next_eid + 1;
	 let qe =
	   { A.eid = eid;
	     A.eqid = qid;
	     A.ecreation = Int64.of_float (Unix.time());
	     A.esize = Int64.zero;
	     A.eprops = props;
	   }
	 in
	 let entry =
	   { qentry = qe;
	     is_picked = false;
	     pick_connection = None;
	     is_uploading = true;
	     upload_connection = Some (Rpc_server.get_connection_id session);
	     current_downloads = [];
	     current_upload = Some uhandle;
	   }
	 in
	 (* Create new data file: *)
	 let queuedir = Filename.concat spooldir qid in
	 let qdata_file = Filename.concat queuedir ("d" ^ eid) in
	 let file = open_out_bin qdata_file in
	 (* Note: the entry file will be created when the upload is finished *)
	 begin try
	   (* Create new upload record: *)
	   let ul =
	     { upfile = file;
	       upserial = Int64.zero;
	       upentry = entry;
	       upconnection = Rpc_server.get_connection_id session;
	     }
	   in
	   (* Register [entry]: *)
	   mq.qentries <- mq.qentries @ [ entry ];
	   mq.qinfo.A.quploads <- Int32.succ mq.qinfo.A.quploads;
	   (* Register [ul]: *)
	   Hashtbl.add uhandles uhandle ul;
	 with
	     err -> close_out file; raise err
	 end;
	 (* The upload is accepted: *)
	 try
	   reply (`successful uhandle);
	 with
	     Rpc_server.Connection_lost ->
	       stop_upload mq entry
       end
    )
    reply;
  !keep
;;


let proc_upload_entry session (qid,props,timeout) =
  catch_error
    (fun reply ->
       let user = Rpc_server.get_user session in
       (* Sanity check: qid must only consist of alphanumeric characters,
	* including underscore
	*)
       check_name qid;
       (* If the queue is short enough, begin the upload immediately;
	* otherwise wait
	*)
       let mq = lookup_managed_queue qid in
       if mq.qinfo.A.qowner <> user then raise(Error `permission_denied);
       if start_upload session mq props reply false then begin
	 (* Upload is not possible now. Check if should do it later. *)
	 if timeout = Int32.zero then raise (Error `timeout);
	 let max_time = if timeout >= Int32.zero then
	                   Unix.time() +. Int32.to_float timeout
	                else
			  -1.0
	 in
	 mq.onshrink <- mq.onshrink @
	                [ max_time, start_upload session mq props reply ];
       end
    )
;;


let proc_upload_chunk session (uhandle,chunk) =
  catch_error
    (fun reply ->
       let ul =
	 try Hashtbl.find uhandles uhandle
	 with Not_found -> raise (Error `not_found)
       in
       if ul.upconnection <> Rpc_server.get_connection_id session
       then raise(Error `not_found);

       if chunk.A.serial <> ul.upserial then raise (Error `bad_value);
       output_string ul.upfile chunk.A.data;
       let l = String.length chunk.A.data in
       ul.upentry.qentry.A.esize <-
         Int64.add ul.upentry.qentry.A.esize (Int64.of_int l);
       if chunk.A.last then begin
	 let entry = ul.upentry in
	 (* Close the data file *)
	 close_out ul.upfile;
	 (* Write the entry file *)
	 let qid = entry.qentry.A.eqid in
	 let eid = entry.qentry.A.eid in
	 let queuedir = Filename.concat spooldir qid in
	 let qentry_file = Filename.concat queuedir ("e" ^ eid) in
	 with_out_file (open_out_bin qentry_file)
	   (fun file ->
	      let v = A._of_entry entry.qentry in
	      Netxdr.pack_xdr_value v t_entry [] (output_string file);
	   );
	 (* Deallocate uhandle: *)
	 Hashtbl.remove uhandles uhandle;
	 entry.current_upload <- None;
	 (* Make the new entry visible: *)
	 let mq = lookup_managed_queue qid in
	 entry.is_uploading <- false;
	 mq.qinfo.A.qlength <- Int32.succ mq.qinfo.A.qlength;
	 mq.qinfo.A.quploads <- Int32.pred mq.qinfo.A.quploads;
	 store_managed_queue qid mq;
	 (* Tell waiters that there is a new entry: *)
	 mq.ongrow <- List.filter
	                (fun (_,f) -> protected_call f false) mq.ongrow;
	 reply `successful
       end
       else begin
	 ul.upserial <- Int64.succ ul.upserial;
	 reply `successful
       end
    )
;;


let onclose conn_id =
  (* This function is called when a TCP connection is closed. We have to
   * iterate over the whole cache of queues and to:
   * - return all picked entries (by this connection)
   * - stop all downloads (by this connection)
   * - stop and remove all uploads (by this connection)
   *)
  Hashtbl.iter
    (fun qid mq ->
       let canceled_uploads = ref false in
       List.iter
	 (fun entry ->
	    if entry.is_picked && entry.pick_connection = Some conn_id then
	      begin
		entry.is_picked <- false;
		stop_downloads mq entry;
	      end;

	    if entry.is_uploading && entry.upload_connection = Some conn_id then
	      begin
		entry.is_uploading <- false;
		stop_upload mq entry;
		canceled_uploads := true;
	      end;
	 )
	 mq.qentries;
       (* If there are cancaled uploads, execute [onshrink]: *)
       if !canceled_uploads then
	 mq.onshrink <- List.filter
	                  (fun (_,f) -> protected_call f false) mq.onshrink;
    )
    queues
;;


let check_timeouts ev =
  (* This function is called every minute. We have to
   * iterate over the whole cache of queues and to:
   * - return all timed out picked entries and stop all downloads
   * - stop and remove all timed out uploads
   *)
  ( match ev with
	Unixqueue.Timeout(_,_) -> ()
      | _ -> raise Equeue.Reject
  );
  let now = Unix.time() in
  Hashtbl.iter
    (fun qid mq ->
       (* Are there timed out events in [onshrink]? *)
       mq.onshrink <-
         List.filter
	   (fun (t,f) ->
	      if t >= 0.0 && t <= now then
		protected_call f true
	      else
		true
	   )
	   mq.onshrink;
       (* Are there timed out events in [ongrow]? *)
       mq.ongrow <-
         List.filter
	   (fun (t,f) ->
	      if t >= 0.0 && t <= now then
		protected_call f true
	      else
		true
	   )
	   mq.ongrow;
    )
    queues
;;

let pluggable_auth_module = 
  ref ("<None>", 
       (`Socket(Tcp, Rpc_server.Portmapped, Rpc_server.default_socket_config)),
       (fun _ -> 
	  failwith "No auth module linked, startup not possible"; () )) ;;


let main() =
  Printexc.record_backtrace true;
  let esys = Unixqueue.create_unix_event_system() in

  let (auth_name, srv_mode, f_srv_config) = !pluggable_auth_module in
  prerr_endline ("Starting queues server with auth module: " ^ auth_name);

(*
  Rpc_client.verbose true;
  Rpc_server.verbose true;

  Rpc_client.Debug.enable_ptrace := true;
 *)

  let server = Rpc_server.create2 srv_mode esys in

  S1.bind_async
    ~proc_ping
    ~proc_create_queue
    ~proc_delete_queue
    ~proc_list_queues
    ~proc_get_queue
    ~proc_set_queue
    ~proc_list_queue_entries
    ~proc_pick_queue_entry
    ~proc_return_picked_queue_entry
    ~proc_remove_picked_queue_entry
    ~proc_remove_queue_entry
    ~proc_download_entry
    ~proc_download_chunk
    ~proc_upload_entry
    ~proc_upload_chunk
    server;

  Rpc_server.set_onclose_action server onclose;

  f_srv_config server;

  (* Arrange that [check_timeouts] is called every second: *)
  let w = Unixqueue.new_wait_id esys in
  let g = Unixqueue.new_group esys in
  Unixqueue.add_handler esys g (fun _ _ -> check_timeouts);
  Unixqueue.add_resource esys g (Unixqueue.Wait w, 1.0);

  List.iter
    (fun signal ->
       Netsys_signal.register_handler
	 ~signal
	 ~name:"Qserver"
	 ~callback:(fun _ ->
		      Unixqueue.remove_resource esys g (Unixqueue.Wait w);
		      Rpc_server.stop_server server;
		   )
	 ()
    )
    [ Sys.sighup; Sys.sigint; Sys.sigquit; Sys.sigterm ];

  Sys.set_signal
    Sys.sigpipe
    Sys.Signal_ignore;

  let rec auto_restart f arg =
    try f arg
    with err ->
      prerr_endline ("Server: Uncaught exception: " ^ Printexc.to_string err);
      auto_restart f arg
  in

  (* Fork *)
  match Unix.fork() with
      0 ->
        (* Child *)
	Sys.chdir "/";
	ignore(Unix.setsid());
        auto_restart Unixqueue.run esys;
        exit 99
    | n when n > 0 ->
        (* Parent *)
	()
    | _ ->
        assert false

;;

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml