(* Copyright 2010 Gerd Stolpmann This file is part of Plasma, a distributed filesystem and a map/reduce computation framework. Unless you have a written license agreement with the copyright holder (Gerd Stolpmann), the following terms apply: Plasma is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. Plasma is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with Foobar. If not, see <http://www.gnu.org/licenses/>. *) (* $Id: dn_store.ml 276 2010-10-25 18:51:27Z gerd $ *) class type dn_store_config = object method dn_directory : string method dn_blocksize : int end type dn_store = { directory : string; identity : string; blocksize : int; size : int64; fd : Unix.file_descr; mutable is_open : bool; } let init_store config idstr size = let dir = config#dn_directory in let blocksize = config#dn_blocksize in let blocksize64 = Int64.of_int blocksize in if Sys.file_exists(dir ^ "/config") then failwith ("init_store: config file already exists"); if Sys.file_exists(dir ^ "/data") then failwith ("init_store: data file already exists"); if blocksize <= 0 then failwith ("init_store: bad blocksize"); if size <= 0L then failwith ("init_store: bad size"); let total_size = Int64.mul blocksize64 size in if Int64.div total_size blocksize64 <> size then failwith "init_store: total size outside the range of 64 bit numbers"; let f = open_out (dir ^ "/config") in ( try output_string f (idstr ^ "\n"); output_string f (string_of_int blocksize ^ "\n"); close_out f with | error -> close_out_noerr f; raise error ); let fd = Unix.openfile (dir ^ "/data") [Unix.O_RDWR; Unix.O_CREAT] 0o666 in ( try Plasma_util.fallocate fd 0L total_size; Unix.close fd with | error -> Unix.close fd; raise error ) let open_store config = let dir = config#dn_directory in let blocksize = config#dn_blocksize in let blocksize64 = Int64.of_int blocksize in let (idstr, act_blocksize) = ( let f = open_in (dir ^ "/config") in try let idstr = input_line f in let bsizestr = input_line f in let bsize = int_of_string bsizestr in close_in f; (idstr, bsize) with | error -> close_in f; raise error ) in if act_blocksize <> blocksize then failwith "open_store: unexpected blocksize"; let fd = Unix.openfile (dir ^ "/data") [Unix.O_RDWR] 0 in let st = Unix.LargeFile.fstat fd in let size = Int64.div st.Unix.LargeFile.st_size blocksize64 in { directory = dir; identity = idstr; blocksize = blocksize; size = size; fd = fd; is_open = true } let close_store st = if not st.is_open then failwith "close_store: store not open"; Unix.close st.fd; st.is_open <- false let get_identity st = st.identity let get_blocksize st = st.blocksize let get_size st = st.size let will_read_block st block = if not st.is_open then failwith "will_read_block: store not open"; if block < 0L || block >= st.size then failwith "will_read_block: bad block number"; let blocksize64 = Int64.of_int st.blocksize in if Netsys_posix.have_fadvise() then Netsys_posix.fadvise st.fd (Int64.mul block blocksize64) blocksize64 Netsys_posix.FADV_WILLNEED let read_block st block blockpos mem mpos len = if not st.is_open then failwith "read_block: store not open"; if block < 0L || block >= st.size then failwith "read_block: bad block number"; if len < 0 then failwith "read_block: negative length"; if blockpos < 0 || blockpos > st.blocksize - len then failwith "read_block: bad block region"; let mpos_len = Bigarray.Array1.dim mem in if mpos < 0 || mpos > mpos_len - len then failwith "read_block: bad memory region"; let blocksize64 = Int64.of_int st.blocksize in ignore(Unix.LargeFile.lseek st.fd (Int64.add (Int64.mul block blocksize64) (Int64.of_int blockpos)) Unix.SEEK_SET); let n = Netsys_mem.mem_read st.fd mem mpos len in if n <> len then failwith "read_block: reached EOF" let write_block st block mem mpos = if not st.is_open then failwith "write_block: store not open"; if block < 0L || block >= st.size then failwith "write_block: bad block number"; let mpos_len = Bigarray.Array1.dim mem in if mpos < 0 || mpos > mpos_len - st.blocksize then failwith "write_block: bad memory region"; let blocksize64 = Int64.of_int st.blocksize in ignore(Unix.LargeFile.lseek st.fd (Int64.mul block blocksize64) Unix.SEEK_SET); let n = Netsys_mem.mem_write st.fd mem mpos st.blocksize in if n <> st.blocksize then failwith "write_block: reached EOF" external fdatasync : Unix.file_descr -> unit = "netsys_fdatasync" (* Was not exported in ocamlnet-3.0test2 *) let sync st = if not st.is_open then failwith "sync: store not open"; fdatasync st.fd