(*
  Copyright 2010 Gerd Stolpmann

  This file is part of Plasma, a distributed filesystem and a
  map/reduce computation framework. Unless you have a written license
  agreement with the copyright holder (Gerd Stolpmann), the following
  terms apply:

  Plasma is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  Plasma is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with Plasma.  If not, see <http://www.gnu.org/licenses/>.
*)
(* $Id: pfs_db.ml 239 2010-06-23 16:49:03Z gerd $ *)

(** Connection parameters for the database. The six optional values are
    passed unchanged as the optional arguments of the same name to
    [Postgresql.connection] by the class [connect] below. *)
class type db_config =
object
  method db_host : string option
  method db_hostaddr : string option
  method db_port : string option
  method db_dbname : string option
  method db_user : string option
  method db_password : string option
  method db_ro_connections_max : int
end


(** [extract_db_config cf] reads the single [database] section found
    below the root address of the configuration object [cf] and returns
    it as a [db_config] object.

    The parameters [host], [hostaddr], [port], [dbname], [user] and
    [password] are optional: each becomes [None] when its lookup raises
    [Not_found]. [ro_connections_max] falls back to 20 in the same
    situation.

    @raise Failure if there is no [database] section, or more than one. *)
let extract_db_config cf =
  let databases = cf#resolve_section cf#root_addr "database" in
  match databases with
    | [] ->
        failwith "Missing 'database' config"
    | _ :: _ :: _ ->
        failwith "More than one 'database' config"
    | [database] ->
        (* Look up an optional parameter; a missing one yields None *)
        let param f name =
          try Some(f (cf#resolve_parameter database name))
          with Not_found -> None in
        let host = (param cf#string_param "host") in
        let hostaddr = (param cf#string_param "hostaddr") in
        let port = (param cf#string_param "port") in
        let dbname = (param cf#string_param "dbname") in
        let user = (param cf#string_param "user") in
        let password = (param cf#string_param "password") in
        let ro_connections_max =
          (try
             cf#int_param
               (cf#resolve_parameter database "ro_connections_max")
           with Not_found -> 20) in
        ( object
            method db_host = host
            method db_hostaddr = hostaddr
            method db_port = port
            method db_dbname = dbname
            method db_user = user
            method db_password = password
            method db_ro_connections_max = ro_connections_max
          end
        )


(** A [Postgresql.connection] extended with a queue of pending requests,
    so that only one SQL command is in flight per connection.

    - [continue_with f]: register [f]; see [connect] for when it runs.
    - [next ok]: hand the connection over to the next registered function.
    - [idle]: whether no SQL command is currently being processed.

    The type parameter is a phantom tag (see [read_write], [read_only]). *)
class type ['mode] async_connection =
object
  inherit Postgresql.connection
  method continue_with : (bool -> unit) -> unit
  method next : bool -> unit
  method idle : bool
end


(** Phantom tags for the [mode] parameter of [async_connection] *)
type read_write = [`R|`W]
type read_only = [`R]

type rw_async_connection = read_write async_connection
type ro_async_connection = read_only async_connection


(** [new connect db_conf] opens a connection using the parameters of
    [db_conf] and implements the bookkeeping of [async_connection]. *)
class ['mode] connect (db_conf : db_config) =
object(self)
  inherit Postgresql.connection
    ?host:db_conf#db_host
    ?hostaddr:db_conf#db_hostaddr
    ?port:db_conf#db_port
    ?dbname:db_conf#db_dbname
    ?user:db_conf#db_user
    ?password:db_conf#db_password
    ()
    as super

  (* true from a successful send_query until get_result returns None *)
  val mutable processing = false

  (* functions registered with continue_with that could not run at once *)
  val queue = Queue.create()

  (** Sends the query via the inherited method and marks the connection
      as busy.
      @raise Failure if the previous command is still being processed. *)
  method send_query ?params ?binary_params query =
    if processing then
      failwith "Db.send_query: previous SQL command is still being processed";
    super # send_query ?params ?binary_params query;
    (* only reached when the inherited send_query did not raise *)
    processing <- true

  (** Returns the next result of the running command. The result [None]
      ends the command and makes the connection idle again.
      @raise Failure if no command is running. *)
  method get_result =
    if not processing then
      failwith "Db.get_result: No running SQL command";
    let r = super # get_result in
    ( match r with
        | None -> processing <- false
        | Some _ -> ()
    );
    r

  (** Calls [f true] immediately if nothing is being processed and no
      other function is waiting; otherwise appends [f] to the queue. *)
  method continue_with f =
    if not processing && Queue.is_empty queue then
      f true
    else
      Queue.push f queue

  (** Removes the oldest queued function, if any, and calls it with [ok] *)
  method next ok =
    if not(Queue.is_empty queue) then (
      let f = Queue.take queue in
      f ok
    )

  method idle = not processing
end


(** [new async_exec conn esys query] is an engine that runs the SQL
    command [query] on [conn] and ends in [`Done r] with the result [r].

    The command is sent as soon as [conn] hands over control via
    [continue_with]. If control is handed over with [false], the engine
    is aborted without sending anything.

    - [expect]: if given, the status of the result must be a member of
      this list; otherwise the engine ends in
      [`Error (Postgresql.Error (Postgresql.Unexpected_status _))].
    - [params], [binary_params]: passed through to [send_query].
    - [copy_in_lines]: lines sent as copy data, each followed by a
      newline, when the result status is [Postgresql.Copy_in].

    When the final state is propagated, [conn # next] is called with
    [true] for a [`Done] state and [false] for any other state. *)
class async_exec (conn : _ async_connection) esys
                 ?expect ?params ?binary_params ?(copy_in_lines=[]) query =
  let fdi = conn # socket in
  let fd = Netsys_posix.file_descr_of_int fdi in
  ( object(self)
      inherit [Postgresql.result] Uq_engines.engine_mixin (`Working 0) esys

      (* whether send_query succeeded *)
      val mutable started = false

      (* the state to report; None while the command is still running *)
      val mutable final_state = None

      (* whether final_state has already been handed to set_state *)
      val mutable propagated = false

      initializer (
        conn # continue_with
          (fun ok ->
             if ok then (
               (* conn # set_nonblocking true;
                  - only affects sending queries
                *)
               Netlog.logf `Debug "Sending SQL: %s" query;
               try
                 conn # send_query ?params ?binary_params query;
                 started <- true;
                 let pe =
                   new Uq_engines.poll_engine
                     [Unixqueue.Wait_in fd, (-1.0)] esys in
                 self # pe_restart pe;
                 Netlog.logf `Debug "Polling SQL: %s" query;
               with
                 | error ->
                     Netlog.logf `Err
                       "Exception sending SQL %s: %s"
                       query (Netexn.to_string error);
                     (* Report the failure like the other error paths do.
                        Without this the engine would remain in its
                        working state forever, and the requests queued
                        on conn behind this one would never be run.
                      *)
                     final_state <- Some(`Error error);
                     self # propagate_final_state()
             )
             else
               self # abort()
          )
      )

      method event_system = esys

      (* Arm the poll engine pe (restarting it unless it is still
         working) and route its outcome to pe_is_done / pe_is_error.
       *)
      method private pe_restart pe =
        ( match pe # state with
            | `Working _ -> ()
            | _ -> pe # restart()
        );
        Uq_engines.when_state
          ~is_done:(self#pe_is_done pe)
          ~is_error:self#pe_is_error
          pe

      (* Called when the poll engine is done, i.e. the connection socket
         is readable. Reads the input and, once the connection is no
         longer busy, fetches the result and derives the final state.
         Any exception raised here ends the engine in an error state.
       *)
      method private pe_is_done pe _ =
        (* fd is readable *)
        try
          Netlog.logf `Debug "SQL: consume_input";
          conn # consume_input;
          if conn#is_busy then (
            Netlog.logf `Debug "SQL: polling for more";
            self # pe_restart pe
          )
          else (
            match conn#get_result with
              | None ->
                  Netlog.logf `Err "Missing SQL result";
                  final_state <-
                    Some(`Error (Failure "Db.async_exec: out of sync"));
                  self # propagate_final_state()
              | Some r ->
                  (* The state to report for r, honouring [expect] *)
                  let s =
                    match expect with
                      | None -> `Done r
                      | Some l ->
                          if List.mem r#status l then
                            `Done r
                          else
                            `Error(Postgresql.Error
                                     (Postgresql.Unexpected_status
                                        (r#status,r#error,l))) in
                  if r#status = Postgresql.Copy_in then (
                    (* now send copy data *)
                    Netlog.logf `Debug "SQL: Sending copy data";
                    (* NOTE(review): the results of put_copy_data and
                       put_copy_end are discarded - confirm that a
                       failure always shows up in the final result.
                     *)
                    List.iter
                      (fun line ->
                         ignore(conn#put_copy_data (line ^ "\n")))
                      copy_in_lines;
                    ignore(conn#put_copy_end());
                    (* Wait for final result *)
                    Netlog.logf `Debug "SQL: Waiting for copy status";
                    self # pe_restart pe
                  )
                  else (
                    Netlog.logf `Debug "Done with SQL: %s" query;
                    final_state <- Some s;
                    (* A second get_result must return None; this also
                       makes the connection idle again.
                     *)
                    ( match conn#get_result with
                        | None ->
                            Netlog.logf `Debug "Last SQL result";
                            self # propagate_final_state()
                        | Some _ ->
                            Netlog.logf `Err "More than one SQL result!";
                            final_state <-
                              Some (`Error
                                      (Failure "Db.async_exec: more than one SQL result"));
                            self # propagate_final_state()
                    )
                  )
          )
        with
          | error ->
              Netlog.logf `Err
                "Db.async_exec: pg exception %s" (Netexn.to_string error);
              pe # abort();
              final_state <- Some(`Error error);
              self # propagate_final_state()

      (* Called when the poll engine itself ends in an error state *)
      method private pe_is_error error =
        Netlog.logf `Err
          "Db.async_exec: pe exception %s" (Netexn.to_string error);
        final_state <- Some(`Error error);
        self # propagate_final_state()

      (** Aborts the engine; has no effect once a final state is recorded.
          If the command was not sent yet, [`Aborted] is propagated
          immediately. Otherwise cancellation is requested, and the
          final state is propagated later by [pe_is_done].

          NOTE(review): in the latter case [pe_is_done] assigns
          [final_state] again, so the [`Aborted] stored here is
          replaced by the state derived from the result - confirm that
          this is intended. *)
      method abort() =
        match final_state with
          | Some _ -> ()
          | None ->
              if started then (
                conn # request_cancel;
                final_state <- Some `Aborted;
              )
              else (
                final_state <- Some `Aborted;
                self # propagate_final_state()
              )

      (* Publish final_state, at most once: schedule set_state and pass
         the connection on to the next waiting request.
       *)
      method private propagate_final_state() =
        if not propagated then (
          match final_state with
            | Some st ->
                propagated <- true;
                (* We don't want to get exceptions here. So run the
                   [set_state] method from the event queue. [set_state]
                   invokes callbacks waiting for state change
                   notifications.
                 *)
                let g = Unixqueue.new_group esys in
                Unixqueue.once esys g 0.0 (fun () -> self # set_state st);
                let ok =
                  match st with
                    | `Done _ -> true
                    | _ -> false in
                conn # next ok
            | None ->
                ()
        )
        else
          Netlog.logf `Warning "Not propagating further final state"
    end
  )


(* Register a printer with Netexn so that Postgresql.Error exceptions
   are rendered with the text from Postgresql.string_of_error. The first
   argument is only a sample value of the exception.
 *)
let () =
  Netexn.register_printer
    (Postgresql.Error Postgresql.Binary)
    (fun e ->
       match e with
         | Postgresql.Error e' ->
             "Postgresql.Error(" ^ Postgresql.string_of_error e' ^ ")"
         | _ ->
             assert false
    )