Plasma GitLab Archive
Projects Blog Knowledge

(*
  Copyright 2010 Gerd Stolpmann

  This file is part of Plasma, a distributed filesystem and a
  map/reduce computation framework. Unless you have a written license
  agreement with the copyright holder (Gerd Stolpmann), the following
  terms apply:

  Plasma is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  Plasma is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with Foobar.  If not, see <http://www.gnu.org/licenses/>.

*)
(* $Id: dn_store.ml 276 2010-10-25 18:51:27Z gerd $ *)

class type dn_store_config =
object
  method dn_directory : string
  method dn_blocksize : int
end


type dn_store =
    { directory : string;
      identity : string;
      blocksize : int;
      size : int64;
      fd : Unix.file_descr;
      mutable is_open : bool;
    }

let init_store config idstr size =
  let dir = config#dn_directory in
  let blocksize = config#dn_blocksize in
  let blocksize64 = Int64.of_int blocksize in
  if Sys.file_exists(dir ^ "/config") then
    failwith ("init_store: config file already exists");
  if Sys.file_exists(dir ^ "/data") then
    failwith ("init_store: data file already exists");
  if blocksize <= 0 then
    failwith ("init_store: bad blocksize");
  if size <= 0L then
    failwith ("init_store: bad size");
  let total_size = Int64.mul blocksize64 size in
  if Int64.div total_size blocksize64 <> size then
    failwith "init_store: total size outside the range of 64 bit numbers";
  let f = open_out (dir ^ "/config") in
  ( try
      output_string f (idstr ^ "\n");
      output_string f (string_of_int blocksize ^ "\n");
      close_out f
    with
      | error ->
	  close_out_noerr f;
	  raise error
  );
  let fd = Unix.openfile (dir ^ "/data") [Unix.O_RDWR; Unix.O_CREAT] 0o666 in
  ( try
      Plasma_util.fallocate fd 0L total_size;
      Unix.close fd
    with
      | error ->
	  Unix.close fd;
	  raise error
  )

  
let open_store config =
  let dir = config#dn_directory in
  let blocksize = config#dn_blocksize in
  let blocksize64 = Int64.of_int blocksize in
  let (idstr, act_blocksize) = (
    let f = open_in (dir ^ "/config") in
    try
      let idstr = input_line f in
      let bsizestr = input_line f in
      let bsize = int_of_string bsizestr in
      close_in f;
      (idstr, bsize)
    with
      | error ->
	  close_in f;
	  raise error
  ) in
  if act_blocksize <> blocksize then
    failwith "open_store: unexpected blocksize";
  let fd = Unix.openfile (dir ^ "/data") [Unix.O_RDWR] 0 in
  let st = Unix.LargeFile.fstat fd in
  let size = Int64.div st.Unix.LargeFile.st_size blocksize64 in
  { directory = dir;
    identity = idstr;
    blocksize = blocksize;
    size = size;
    fd = fd;
    is_open = true
  }


let close_store st =
  if not st.is_open then
    failwith "close_store: store not open";
  Unix.close st.fd;
  st.is_open <- false


let get_identity st = st.identity
let get_blocksize st = st.blocksize
let get_size st = st.size


let will_read_block st block =
  if not st.is_open then
    failwith "will_read_block: store not open";
  if block < 0L || block >= st.size then
    failwith "will_read_block: bad block number";
  let blocksize64 = Int64.of_int st.blocksize in
  if Netsys_posix.have_fadvise() then
    Netsys_posix.fadvise 
      st.fd (Int64.mul block blocksize64) blocksize64 
      Netsys_posix.FADV_WILLNEED
  

let read_block st block blockpos mem mpos len =
  if not st.is_open then
    failwith "read_block: store not open";
  if block < 0L || block >= st.size then
    failwith "read_block: bad block number";
  if len < 0 then
    failwith "read_block: negative length";
  if blockpos < 0 || blockpos > st.blocksize - len then
    failwith "read_block: bad block region";
  let mpos_len = Bigarray.Array1.dim mem in
  if mpos < 0 || mpos > mpos_len - len then
    failwith "read_block: bad memory region";
  let blocksize64 = Int64.of_int st.blocksize in
  ignore(Unix.LargeFile.lseek
	   st.fd 
	   (Int64.add (Int64.mul block blocksize64) (Int64.of_int blockpos))
	   Unix.SEEK_SET);
  let n = Netsys_mem.mem_read st.fd mem mpos len in
  if n <> len then
    failwith "read_block: reached EOF"


let write_block st block mem mpos =
  if not st.is_open then
    failwith "write_block: store not open";
  if block < 0L || block >= st.size then
    failwith "write_block: bad block number";
  let mpos_len = Bigarray.Array1.dim mem in
  if mpos < 0 || mpos > mpos_len - st.blocksize then
    failwith "write_block: bad memory region";
  let blocksize64 = Int64.of_int st.blocksize in
  ignore(Unix.LargeFile.lseek
	   st.fd 
	   (Int64.mul block blocksize64)
	   Unix.SEEK_SET);
  let n = Netsys_mem.mem_write st.fd mem mpos st.blocksize in
  if n <> st.blocksize then
    failwith "write_block: reached EOF"



external fdatasync : Unix.file_descr -> unit = "netsys_fdatasync"
  (* Was not exported in ocamlnet-3.0test2 *)

let sync st =
  if not st.is_open then
    failwith "sync: store not open";
  fdatasync st.fd


This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml