Plasma GitLab Archive
Projects Blog Knowledge

(*
  Copyright 2010 Gerd Stolpmann

  This file is part of Plasma, a distributed filesystem and a
  map/reduce computation framework. Unless you have a written license
  agreement with the copyright holder (Gerd Stolpmann), the following
  terms apply:

  Plasma is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  Plasma is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with Plasma.  If not, see <http://www.gnu.org/licenses/>.

*)
(* $Id: nn_db.mli 271 2010-10-20 00:09:51Z gerd $ *)

(** Namenode access routines to database *)

(** A read-only database transaction, as produced by {!ro_transact}. *)
class type ro_transaction =
object
  method connection : Pfs_db.ro_async_connection
    (** The read-only connection of this transaction. This method fails
        if the transaction is already done.
     *)
  method event_system : Unixqueue.event_system
    (** The event system this transaction uses *)
  method return : unit -> unit
    (** Returns the transaction to the pool.
        NOTE(review): presumably the transaction counts as "done"
        afterwards, so that [connection] fails - confirm against the
        implementation.
     *)
end


(** A read-write database transaction, as produced by {!transact}. *)
class type transaction =
object
  method connection : Pfs_db.rw_async_connection
    (** The read-write connection of this transaction. This method fails
        if the transaction is already done.
     *)
  method ro_connection : Pfs_db.ro_async_connection
    (** The same connection coerced to read-only *)
  method event_system : Unixqueue.event_system
    (** The event system this transaction uses *)
  method prepare : unit -> unit Uq_engines.engine
    (** Prepares the transaction for commit *)
  method commit : unit -> unit Uq_engines.engine
    (** Commits the transaction (whether it has been prepared or not),
        and returns.
        NOTE(review): "returns" presumably means that the transaction is
        given back as by [return] - confirm.
     *)
  method rollback : unit -> unit Uq_engines.engine
    (** Rolls the transaction back, and returns.
        NOTE(review): "returns" presumably means that the transaction is
        given back as by [return] - confirm.
     *)
  method return : unit -> unit
    (** Returns the transaction to the pool, or closes it *)
end


type inodeinfo = Pfs_rpcapi_aux.inodeinfo
  (** Abbreviation for [Pfs_rpcapi_aux.inodeinfo], the inode data read and
      written by the [inode_*] functions of this module
   *)

val init : Pfs_db.db_config -> Unixqueue.event_system -> unit
  (** [init config esys]: Initializes the module with the database
      configuration [config]. The event system [esys] is the one used
      for ro transactions (see {!ro_esys}).

      NOTE(review): presumably this has to be called before any other
      function of this module - confirm.
   *)

val ro_esys : unit -> Unixqueue.event_system
  (** Returns the event system used for ro transactions.
      NOTE(review): presumably this is the event system passed to
      {!init} - confirm.
   *)

val commit_all_prepared_transactions : unit -> unit
  (** Commits all prepared transactions. This is meant to be called at
      system startup.
      NOTE(review): the result is [unit] and not an engine, so this
      presumably runs synchronously - confirm.
   *)

val transact : Unixqueue.event_system -> transaction
  (** [transact esys]: Finds an unused connection, and starts a
      (read-write) transaction on it.
      NOTE(review): [esys] is presumably the event system the returned
      transaction reports via its [event_system] method - confirm.
   *)

val ro_transact : unit -> ro_transaction Uq_engines.engine
  (** Like {!transact}, but for read-only transactions: Finds an unused
      connection, and starts a read-only transaction. Note that the
      result is an engine: If the number of connections has reached the
      limit, the engine waits until an existing connection becomes
      unused.
   *)

val with_ro_trans : (Pfs_db.ro_async_connection -> 
                     Unixqueue.event_system -> 'a Uq_engines.engine) ->
                        'a Uq_engines.engine
  (** [with_ro_trans f]: Runs [e = f conn esys] for a shared read-only
      transaction and returns [e]. The function [f] gets the read-only
      connection [conn] and the event system [esys] as arguments.

      NOTE(review): presumably the shared transaction is given back
      when [e] is finished - confirm against the implementation.
   *)

val datastore_list_e : Pfs_db.ro_async_connection -> 
                       Unixqueue.event_system ->
                       Nn_datastores.datastore list Uq_engines.engine
  (** [datastore_list_e conn esys]: Gets the list of all datastores,
      using the connection [conn] and the event system [esys]. The list
      is the result of the returned engine.
   *)


val datastore_list_ro_e : unit -> 
                            Nn_datastores.datastore list Uq_engines.engine
  (** Same as {!datastore_list_e}, but using an ro transaction instead of
      a connection and an event system passed by the caller
   *)

val datastore_find_e : identity:string ->
                       Pfs_db.ro_async_connection -> 
                       Unixqueue.event_system ->
                       Nn_datastores.datastore option Uq_engines.engine
  (** [datastore_find_e ~identity conn esys]: Finds a datastore by its
      [identity] string.
      NOTE(review): the result is presumably [None] if there is no
      datastore with this identity - confirm.
   *)

val datastore_find_ro_e : identity:string ->
                            Nn_datastores.datastore option Uq_engines.engine
  (** Same as {!datastore_find_e}, but using an ro transaction instead of
      a connection and an event system passed by the caller
   *)

val datastore_upd_e : id:int ->
                      identity:string ->
                      size:int64 ->
                      enabled:bool ->
                      Pfs_db.rw_async_connection -> 
                      Unixqueue.event_system ->
                      unit Uq_engines.engine
  (** [datastore_upd_e ~id ~identity ~size ~enabled conn esys]:
      Updates the datastore table. If the record is new, it is added.

      The blockalloc table is updated, too: For new stores, the
      rows are added. If the size of an existing store is increased,
      further rows are added.

      It is an error to decrease the size.
      NOTE(review): how this error is reported (exception or error state
      of the engine) and the unit of [size] are not visible in this
      interface - check the implementation.
   *)

val datastore_del_e : id:int ->
                      Pfs_db.rw_async_connection -> 
                      Unixqueue.event_system ->
                      unit Uq_engines.engine
  (** [datastore_del_e ~id conn esys]: Deletes the datastore with the ID
      [id], and all rows referencing it
   *)

val revision_get_e : Pfs_db.ro_async_connection -> 
                     Unixqueue.event_system ->
                      string Uq_engines.engine
  (** [revision_get_e conn esys]: Gets the revision string. The string
      is the result of the returned engine.
   *)

val revision_get_ro_e : unit -> string Uq_engines.engine
  (** Same as {!revision_get_e}, but using an ro transaction instead of
      a connection and an event system passed by the caller
   *)

val revision_upd_e : revstr:string ->
                      Pfs_db.rw_async_connection -> 
                      Unixqueue.event_system ->
                      unit Uq_engines.engine
  (** [revision_upd_e ~revstr conn esys]: Updates the revision string.
      The new value is [revstr].
   *)

val blockalloc_list_e : datastore:int ->
                        Pfs_db.ro_async_connection -> 
                        Unixqueue.event_system ->
                        (int64 * string) list Uq_engines.engine
  (** [blockalloc_list_e ~datastore conn esys]: Reads the blockalloc
      table for the datastore with the ID [datastore]. Returns the table
      as a list of pairs [(blkidx,blkmap)].
   *)

val blockalloc_list_ro_e : datastore:int ->
                              (int64 * string) list Uq_engines.engine
  (** Same as {!blockalloc_list_e}, but using an ro transaction instead of
      a connection and an event system passed by the caller
   *)

val blockalloc_upd_e : datastore:int ->
                       blkidx:int64 ->
                       blkmap:string ->
                       Pfs_db.rw_async_connection -> 
                       Unixqueue.event_system ->
                       unit Uq_engines.engine
  (** [blockalloc_upd_e ~datastore ~blkidx ~blkmap conn esys]: Updates
      the [blockalloc] table.
      NOTE(review): presumably this sets the map of the row [blkidx] of
      the datastore [datastore] to [blkmap], i.e. the arguments mean the
      same as the pairs returned by {!blockalloc_list_e} - confirm.
   *)

val inode_listall_e : Pfs_db.ro_async_connection -> 
                      Unixqueue.event_system ->
                          int64 list Uq_engines.engine
  (** [inode_listall_e conn esys]: Lists all inode numbers *)

val inode_get_e : id:int64 ->
                  Pfs_db.ro_async_connection -> 
                  Unixqueue.event_system ->
                    inodeinfo option Uq_engines.engine
  (** [inode_get_e ~id conn esys]: Loads the data of the inode [id] if
      the inode exists (otherwise the result is presumably [None] -
      confirm)
   *)

val inode_get_ro_e : id:int64 ->
                        inodeinfo option Uq_engines.engine
  (** Same as {!inode_get_e}, but using an ro transaction instead of
      a connection and an event system passed by the caller
   *)

val inode_maxid_e : Pfs_db.ro_async_connection -> 
                    Unixqueue.event_system ->
                      int64 Uq_engines.engine
  (** [inode_maxid_e conn esys]: Returns the maximum inode ID.
      NOTE(review): the result for the case that there are no inodes at
      all is not visible in this interface.
   *)

val inode_maxid_ro_e : unit -> int64 Uq_engines.engine
  (** Same as {!inode_maxid_e}, but using an ro transaction instead of
      a connection and an event system passed by the caller
   *)

val inode_ins_e : id:int64 ->
                  inodeinfo ->
                  Pfs_db.rw_async_connection -> 
                  Unixqueue.event_system ->
                    unit Uq_engines.engine
  (** [inode_ins_e ~id info conn esys]: Inserts a new row into [inode]
      for the inode number [id], with the data [info]
   *)

val inode_upd_e : id:int64 ->
                  inodeinfo ->
                  Pfs_db.rw_async_connection -> 
                  Unixqueue.event_system ->
                    unit Uq_engines.engine
  (** [inode_upd_e ~id info conn esys]: Updates the row for [id] in
      [inode] with the data [info]. The [filetype] cannot be updated;
      a changed [filetype] is silently ignored.
   *)

val inode_upd_time_e : id:int64 ->
                       mtime:Pfs_rpcapi_aux.time option ->
                       ctime:Pfs_rpcapi_aux.time option ->
                       Pfs_db.rw_async_connection -> 
                       Unixqueue.event_system ->
                         unit Uq_engines.engine
  (** [inode_upd_time_e ~id ~mtime ~ctime conn esys]: Sets the mtime
      and/or the ctime of the inode [id]. It is not an error if the
      inode does not exist anymore.
      NOTE(review): presumably a timestamp passed as [None] is left
      unchanged - confirm.
   *)
  

val inode_del_e : id:int64 ->
                  Pfs_db.rw_async_connection -> 
                  Unixqueue.event_system ->
                    unit Uq_engines.engine
  (** [inode_del_e ~id conn esys]: Deletes the row for [id] from
      [inode]
   *)

val inodeblocks_get_e : inode:int64 ->
                        blkidx:int64 ->
                        len:int64 ->
                        Pfs_db.ro_async_connection -> 
                        Unixqueue.event_system ->
                          Pfs_rpcapi_aux.blockinfo list Uq_engines.engine
  (** [inodeblocks_get_e ~inode ~blkidx ~len conn esys]: Gets the block
      information for [inode] in the range from index [blkidx] to
      [blkidx+len-1] (i.e. [len] is the number of block indexes in the
      range).
   *)

val inodeblocks_get_ro_e : inode:int64 ->
                           blkidx:int64 ->
                           len:int64 ->
                             Pfs_rpcapi_aux.blockinfo list Uq_engines.engine
  (** Same as {!inodeblocks_get_e}, but using an ro transaction instead of
      a connection and an event system passed by the caller
   *)

val inodeblocks_getall_e : inode:int64 ->
                           dsid:int ->
                           Pfs_db.ro_async_connection -> 
                           Unixqueue.event_system ->
                             (int64 * int64) list Uq_engines.engine
  (** [inodeblocks_getall_e ~inode ~dsid conn esys]: Returns the list of
      pairs [(blkidx,block)] for this [inode] and the datastore with the
      ID [dsid]. This function exists for fsck.
   *)

val inodeblocks_del_e : inode:int64 -> 
                        blkidx:int64 ->
                        len:int64 ->
                        Pfs_db.rw_async_connection -> 
                        Unixqueue.event_system ->
                          unit Uq_engines.engine
  (** [inodeblocks_del_e ~inode ~blkidx ~len conn esys]: Deletes the
      blocks for [inode] in the range from index [blkidx] to
      [blkidx+len-1] (i.e. [len] is the number of block indexes in the
      range).
   *)

val inodeblocks_ins_e : inode:int64 -> 
                        Pfs_rpcapi_aux.blockinfo list -> 
                        Pfs_db.rw_async_connection -> 
                        Unixqueue.event_system ->
                          unit Uq_engines.engine
  (** [inodeblocks_ins_e ~inode blocks conn esys]: Inserts the passed
      [blocks] for [inode]
   *)

val names_get_e : dir_inode:int64 ->
                  name:string ->
                  Pfs_db.ro_async_connection -> 
                  Unixqueue.event_system ->
                    int64 option Uq_engines.engine
  (** [names_get_e ~dir_inode ~name conn esys]: Looks up the inode of
      the member [name] of the directory [dir_inode].
      NOTE(review): the result is presumably [None] if the directory has
      no such member - confirm.
   *)

val names_get_ro_e : 
      dir_inode:int64 -> name:string -> int64 option Uq_engines.engine
  (** Same as {!names_get_e}, but using an ro transaction instead of
      a connection and an event system passed by the caller
   *)

val names_rev_get_e : inode:int64 ->
                      Pfs_db.ro_async_connection -> 
                      Unixqueue.event_system ->
                        (int64 * string) list Uq_engines.engine
  (** [names_rev_get_e ~inode conn esys]: Looks up the names of [inode].
      Each returned pair consists of the inode of the directory and the
      member name in this directory. (No complete paths are returned.)
   *)

val names_rev_get_ro_e : inode:int64 -> (int64 * string) list Uq_engines.engine
  (** Same as {!names_rev_get_e}, but using an ro transaction instead of
      a connection and an event system passed by the caller
   *)

val names_get_parent_e : inode:int64 ->
                         Pfs_db.ro_async_connection -> 
                         Unixqueue.event_system ->
                           int64 option Uq_engines.engine
  (** [names_get_parent_e ~inode conn esys]: Looks up the parent inode
      of [inode]. This works reliably only for directories - file inodes
      can have multiple parents, and in this case [None] is returned.
   *)

val names_get_parent_ro_e : inode:int64 -> int64 option Uq_engines.engine
  (** Same as {!names_get_parent_e}, but using an ro transaction instead
      of a connection and an event system passed by the caller
   *)

val names_count_e : inode:int64 ->
                    Pfs_db.ro_async_connection -> 
                    Unixqueue.event_system ->
                      int Uq_engines.engine
  (** [names_count_e ~inode conn esys]: Counts the names of [inode] *)

val names_count_ro_e : inode:int64 -> int Uq_engines.engine
  (** Same as {!names_count_e}, but using an ro transaction instead of
      a connection and an event system passed by the caller
   *)

val names_list_e : dir_inode:int64 ->
                   Pfs_db.ro_async_connection -> 
                   Unixqueue.event_system ->
                     (string * int64) list Uq_engines.engine
  (** [names_list_e ~dir_inode conn esys]: Lists the contents of the
      directory identified by [dir_inode]. The contents are returned as
      pairs of the member name and the inode of the member.
   *)

val names_list_ro_e : dir_inode:int64 -> (string * int64) list Uq_engines.engine
  (** Same as {!names_list_e}, but using an ro transaction instead of
      a connection and an event system passed by the caller
   *)

val names_ins_e : dir_inode:int64 -> name:string -> inode:int64 ->
                  Pfs_db.rw_async_connection -> 
                  Unixqueue.event_system ->
                    unit Uq_engines.engine
  (** [names_ins_e ~dir_inode ~name ~inode conn esys]: Inserts the
      mapping from [name] (in the directory [dir_inode]) to [inode]
   *)

val names_del_e : dir_inode:int64 -> name:string -> 
                  Pfs_db.rw_async_connection -> 
                  Unixqueue.event_system ->
                    unit Uq_engines.engine
  (** [names_del_e ~dir_inode ~name conn esys]: Deletes the name [name]
      from the directory [dir_inode]
   *)



(** {2 Two-phase commit support} *)

type modification =
    [ `Datastore_upd of int * string * int64 * bool
    | `Datastore_del of int
    | `Revision_upd of string
    | `Blockalloc_upd of int * int64 * string
    | `Inode_ins of int64 * inodeinfo
    | `Inode_upd of int64 * inodeinfo
    | `Inode_upd_time of int64 * Pfs_rpcapi_aux.time option * Pfs_rpcapi_aux.time option
    | `Inode_del of int64
    | `Inodeblocks_ins of int64 * Pfs_rpcapi_aux.blockinfo list
    | `Inodeblocks_del of int64 * int64 * int64
    | `Names_ins of int64 * string * int64
    | `Names_del of int64 * string
    ]
  (** Database modification operations that are covered by the 2-phase
      commit protocol. The names of the operations correspond to those
      in {!Nn_db} and {!Nn_slave}.

      The component types of each tag are the same, in the same order,
      as the argument types of the function of this module with the
      corresponding name (without the connection and the event system),
      e.g. [`Names_ins] carries [int64 * string * int64] like the
      arguments [dir_inode], [name], and [inode] of {!names_ins_e}.
      NOTE(review): presumably the components also have the same
      meaning as these arguments - confirm.
   *)

val exec_e : modification -> 
             Pfs_db.rw_async_connection -> 
             Unixqueue.event_system ->
               unit Uq_engines.engine
  (** [exec_e m conn esys]: Performs the passed modification [m], using
      the read-write connection [conn] and the event system [esys].
      NOTE(review): presumably this dispatches to the function of this
      module that corresponds to the tag of [m] - confirm.
   *)

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml