Plasma GitLab Archive
Projects Blog Knowledge

(*
  Copyright 2010 Gerd Stolpmann

  This file is part of Plasma, a distributed filesystem and a
  map/reduce computation framework. Unless you have a written license
  agreement with the copyright holder (Gerd Stolpmann), the following
  terms apply:

  Plasma is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  Plasma is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with Foobar.  If not, see <http://www.gnu.org/licenses/>.

*)
(* $Id: pfs_db.ml 239 2010-06-23 16:49:03Z gerd $ *)

class type db_config =
object
  method db_host : string option
  method db_hostaddr : string option
  method db_port : string option
  method db_dbname : string option
  method db_user : string option
  method db_password : string option
  method db_ro_connections_max : int
end


let extract_db_config cf =
  let databases =
    cf#resolve_section cf#root_addr "database" in
  match databases with
    | [] ->
        failwith "Missing 'database' config"
    | _ :: _ :: _ ->
        failwith "More than one 'database' config"
    | [database] ->
	let param f name =
          try Some(f (cf#resolve_parameter database name))
          with Not_found -> None in
	let host =
	  (param cf#string_param "host") in
	let hostaddr =
	  (param cf#string_param "hostaddr") in
	let port =
	  (param cf#string_param "port") in
	let dbname =
	  (param cf#string_param "dbname") in
	let user =
	  (param cf#string_param "user") in
	let password =
	  (param cf#string_param "password") in
	let ro_connections_max =
	  (try cf#int_param
	         (cf#resolve_parameter database "ro_connections_max") 
	   with Not_found -> 20) in
	( object
	    method db_host = host
	    method db_hostaddr = hostaddr
	    method db_port = port
	    method db_dbname = dbname
	    method db_user = user
	    method db_password = password
	    method db_ro_connections_max = ro_connections_max
	  end
	)


class type ['mode] async_connection =
object
  inherit Postgresql.connection
  method continue_with : (bool -> unit) -> unit
  method next : bool -> unit
  method idle : bool
end


type read_write = [`R|`W]
type read_only = [`R]
type rw_async_connection = read_write async_connection
type ro_async_connection = read_only async_connection


class ['mode] connect (db_conf : db_config) =
object(self)
  inherit Postgresql.connection
    ?host:db_conf#db_host
    ?hostaddr:db_conf#db_hostaddr
    ?port:db_conf#db_port
    ?dbname:db_conf#db_dbname
    ?user:db_conf#db_user
    ?password:db_conf#db_password
    ()
    as super

  val mutable processing = false
  val queue = Queue.create()

  method send_query ?params ?binary_params query =
    if processing then
      failwith "Db.send_query: previous SQL command is still being processed";
    super # send_query ?params ?binary_params query;
    processing <- true

  method get_result =
    if not processing then
      failwith "Db.get_result: No running SQL command";
    let r = super # get_result in
    if r = None then
      processing <- false;
    r

  method continue_with f =
    if not processing && Queue.is_empty queue then
      f true
    else
      Queue.push f queue

  method next ok =
    if not(Queue.is_empty queue) then (
      let f = Queue.take queue in
      f ok
    )

  method idle =
    not processing

end


class async_exec (conn : _ async_connection) esys 
                 ?expect ?params ?binary_params 
		 ?(copy_in_lines=[])
		 query =
  let fdi =
    conn # socket in
  let fd =
    Netsys_posix.file_descr_of_int fdi in
  ( object(self)
      inherit [Postgresql.result] Uq_engines.engine_mixin (`Working 0) esys

      val mutable started = false
      val mutable final_state = None
      val mutable propagated = false

      initializer (
	conn # continue_with
	  (fun ok -> 
	     if ok then (
	       (* conn # set_nonblocking true; - only affects sending queries *)
	       Netlog.logf `Debug "Sending SQL: %s" query;
	       try
		 conn # send_query ?params ?binary_params query;
		 started <- true;
		 let pe =
		   new Uq_engines.poll_engine
		     [Unixqueue.Wait_in fd, (-1.0)] esys in
		 self # pe_restart pe;
		 Netlog.logf `Debug "Polling SQL: %s" query;
	       with
		 | error ->
		     Netlog.logf `Err "Exception sending SQL %s: %s"
		       query (Netexn.to_string error)
	     )
	     else
	       self # abort()
	  )
      )

      method event_system = esys

      method private pe_restart pe =
	( match pe # state with
	    | `Working _ -> ()
	    | _ -> pe # restart()
	);
	Uq_engines.when_state
	  ~is_done:(self#pe_is_done pe)
	  ~is_error:self#pe_is_error
	  pe

      method private pe_is_done pe _ =
	(* fd is readable *)
	try
	  Netlog.logf `Debug "SQL: consume_input";
	  conn # consume_input;
	  if conn#is_busy then (
	    Netlog.logf `Debug "SQL: polling for more";
	    self # pe_restart pe
	  )
	  else (
	    match conn#get_result with
	      | None ->
		  Netlog.logf `Err "Missing SQL result";
		  final_state <- 
		    Some(`Error (Failure "Db.async_exec: out of sync"));
		  self # propagate_final_state()
	      | Some r -> 
		  let s =
		    match expect with
		      | None -> `Done r
		      | Some l ->
			  if List.mem r#status l then
			    `Done r 
			  else
			    `Error(Postgresql.Error
				     (Postgresql.Unexpected_status
					(r#status,r#error,l))) in
		  if r#status = Postgresql.Copy_in then (
		    (* now send copy data *)
		    Netlog.logf `Debug "SQL: Sending copy data";
		    List.iter
		      (fun line -> ignore(conn#put_copy_data (line ^ "\n")))
		      copy_in_lines;
		    ignore(conn#put_copy_end());
		    (* Wait for final result *)
		    Netlog.logf `Debug "SQL: Waiting for copy status";
		    self # pe_restart pe
		  )
		  else (
		    Netlog.logf `Debug "Done with SQL: %s" query;
		    final_state <- Some s;
		    ( match conn#get_result with
			| None ->
			    Netlog.logf `Debug "Last SQL result";
			    self # propagate_final_state()
			| Some _ ->
			    Netlog.logf `Err "More than one SQL result!";
			    final_state <- 
			      Some
			      (`Error
				 (Failure
				    "Db.async_exec: more than one SQL result"));
			    self # propagate_final_state()
		    )
		  )
	  )
	with
	  | error ->
	      Netlog.logf `Err "Db.async_exec: pg exception %s"
		(Netexn.to_string error);
	      pe # abort();
	      final_state <- Some(`Error error);
	      self # propagate_final_state()

      method private pe_is_error error =
	Netlog.logf `Err "Db.async_exec: pe exception %s"
	  (Netexn.to_string error);
	final_state <- Some(`Error error);
	self # propagate_final_state()

      method abort() =
	if final_state = None then (
	  if started then (
	    conn # request_cancel;
	    final_state <- Some `Aborted;
	  ) else (
	    final_state <- Some `Aborted;
	    self # propagate_final_state()
	  )
	)

      method private propagate_final_state() =
	if not propagated then (
	  match final_state with
	    | Some st ->
		propagated <- true;
		(** We don't want to get exceptions here. So run the
                    [set_state] method from the event queue. [set_state]
                    invokes callbacks waiting for state change notifications.
		 *)
		let g = Unixqueue.new_group esys in
		Unixqueue.once esys g 0.0 (fun () -> self # set_state st);
		let ok =
		  match st with
		    | `Done _ -> true
		    | _ -> false in
		conn # next ok
	    | None -> 
		()
	)
	else
	  Netlog.logf `Warning
	    "Not propagating further final state"
    end
  )


let () =
  Netexn.register_printer
    (Postgresql.Error Postgresql.Binary)
    (fun e -> 
       match e with
	 | Postgresql.Error e' ->
	     "Postgresql.Error(" ^ Postgresql.string_of_error e' ^ ")"
	 | _ ->
	     assert false
    )

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml