
Module Plasma_client


module Plasma_client: sig .. end
Client access to the Plasma Filesystem


This is a client library providing full access to the Plasma filesystem. The interface should be largely self-explanatory; if questions arise, please consult the page Plasmafs_protocol, which explains the background concepts of the PlasmaFS protocol.

Many of the following functions return so-called engines. These functions have the suffix _e. Each of them also has a "normal", i.e. synchronous, variant that returns the result directly instead of an engine computing it. The engines make it possible to send queries asynchronously. For more information about engines, see the module Uq_engines of Ocamlnet. It is generally not possible to use the client in a synchronous way while an engine is still running.
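
As a minimal sketch of how the two variants relate, the following functor obtains the block size through the engine-returning function and sync. The module type CLIENT and the type constructor engine are assumptions made for illustration, so that the code compiles without the library; in real code 'a engine would be 'a Uq_engines.engine and the functions would come from Plasma_client.

```ocaml
(* Hypothetical subset of the documented signatures. [engine] stands in
   for Uq_engines.engine so the sketch compiles without Ocamlnet. *)
module type CLIENT = sig
  type plasma_cluster
  type 'a engine
  val blocksize_e : plasma_cluster -> int engine
  val sync : ('a -> 'b engine) -> 'a -> 'b
end

module Blocksize (C : CLIENT) = struct
  (* Run the engine-returning variant to completion and return its
     result, which is what the synchronous variant delivers directly. *)
  let blocksize_via_engine c = C.sync C.blocksize_e c
end
```

The same pattern should apply to any function f_e taking a single argument; functions with more arguments can be partially applied first.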

Some of the following types are defined in Plasma_rpcapi_aux.
type plasma_cluster 
an open Plasma cluster
type plasma_trans 
plasma transaction
type inode = int64 
inodes are int64 numbers
type errno = [ `eaccess
| `ebadpath
| `econflict
| `ecoord
| `eexist
| `efailed
| `efailedcommit
| `efbig
| `efhier
| `einval
| `eio
| `eisdir
| `elongtrans
| `eloop
| `enametoolong
| `enoent
| `enonode
| `enospc
| `enotdir
| `enotempty
| `enotrans
| `eperm
| `erofs
| `estale
| `etbusy ]
see errno_code for documentation
type topology = [ `Chain | `Star ] 
see copy_in
type copy_in_flags = [ `Late_datasync | `No_datasync ] 
see copy_in
type copy_out_flags = [ `No_truncate ] 
see copy_out
exception Plasma_error of errno
Error reported by the server
exception Cluster_down of string
No access to the cluster possible

Open cluster


val open_cluster : string ->
(string * int) list -> Unixqueue.event_system -> plasma_cluster
open_cluster name namenodes esys: Opens the cluster with these namenodes (given as (hostname,port) pairs), using the event system esys. The client automatically determines which is the coordinator.
val open_cluster_cc : Plasma_client_config.client_config ->
Unixqueue.event_system -> plasma_cluster
Same, but takes a Plasma_client_config.client_config object which can in turn be obtained via Plasma_client_config.get_config.
val event_system : plasma_cluster -> Unixqueue.event_system
Returns the event system
val sync : ('a -> 'b Uq_engines.engine) -> 'a -> 'b
Waits until the event system is done and returns the result of the engine
val dump_buffers : plasma_cluster -> unit
debug report
val close_cluster : plasma_cluster -> unit
Closes all file descriptors permanently
val abort_cluster : plasma_cluster -> unit
Closes the descriptors to remote services as far as possible, but does not permanently shut down the client functionality. The descriptors are automatically opened again when needed. The effect is not only that resources are given back temporarily, but also that the pending transactions are aborted.
val cluster_name : plasma_cluster -> string
Returns the cluster name
val cluster_namenodes : plasma_cluster -> (string * int) list
Returns the namenodes passed to open_cluster
val configure_buffer : plasma_cluster -> int -> unit
configure_buffer c n: Configures the client to use n buffers. Each buffer is one block. These buffers are only used for buffered I/O, i.e. for Plasma_client.read and Plasma_client.write, but not for Plasma_client.copy_in and Plasma_client.copy_out.
val configure_pref_nodes : plasma_cluster -> string list -> unit
Configures the data nodes with the given identities as the preferred nodes for the allocation of new blocks. This configuration is active until changed again. Useful for configuring local identities (see local_identities below), i.e. for enforcing that blocks are allocated on the same machine, as far as possible.
val configure_shm_manager : plasma_cluster -> Plasma_shm.shm_manager -> unit
Configures a shared memory manager. This is an optional feature. The manager must be configured before the cluster is used.
val shm_manager : plasma_cluster -> Plasma_shm.shm_manager
Returns the current manager
val blocksize_e : plasma_cluster -> int Uq_engines.engine
val blocksize : plasma_cluster -> int
Returns the blocksize
val fsstat_e : plasma_cluster -> Plasma_rpcapi_aux.fsstat Uq_engines.engine
val fsstat : plasma_cluster -> Plasma_rpcapi_aux.fsstat
Return statistics
val local_identities_e : plasma_cluster -> string list Uq_engines.engine
val local_identities : plasma_cluster -> string list
Return the identities of the data nodes running on this machine (for configure_pref_nodes)

Transactions



All functions requiring a plasma_trans value as argument must be run inside a transaction. This means one has to first call start to open the transaction, then call the functions covered by the transaction, and finally either commit or abort.

It is allowed to open several transactions simultaneously.

If you use the engine-based interface, it is important to ensure that the next function in a transaction is called only after the current function has delivered its result. This restriction applies only within the same transaction; other transactions are totally independent in this respect.

val start_e : plasma_cluster -> plasma_trans Uq_engines.engine
val start : plasma_cluster -> plasma_trans
Starts a transaction
val commit_e : plasma_trans -> unit Uq_engines.engine
val commit : plasma_trans -> unit
Commits a transaction, and makes the changes of the transaction permanent.
val abort_e : plasma_trans -> unit Uq_engines.engine
val abort : plasma_trans -> unit
Aborts a transaction, and abandons the changes of the transaction
val cluster : plasma_trans -> plasma_cluster
the cluster to which a transaction belongs
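
The start/commit/abort protocol can be sketched as follows. The module type TRANS lists only the documented signatures it needs and is an assumption made so that the code compiles without the library; the helper name in_transaction is hypothetical. The function with_trans in the Utilities section is documented to behave this way, so the sketch only illustrates the call order.

```ocaml
(* Hypothetical subset of the documented signatures. *)
module type TRANS = sig
  type plasma_cluster
  type plasma_trans
  val start : plasma_cluster -> plasma_trans
  val commit : plasma_trans -> unit
  val abort : plasma_trans -> unit
end

module Run (C : TRANS) = struct
  (* Open a transaction, run [f] inside it, then commit.
     If [f] raises, abort the transaction and re-raise. *)
  let in_transaction c f =
    let t = C.start c in
    match f t with
    | result -> C.commit t; result
    | exception e -> C.abort t; raise e
end
```

Note that an exception raised by commit itself is not followed by abort here; whether that is the right policy depends on the application.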

File creation/access over the inode interface


val create_inode_e : plasma_trans ->
Plasma_rpcapi_aux.inodeinfo -> inode Uq_engines.engine
val create_inode : plasma_trans ->
Plasma_rpcapi_aux.inodeinfo -> inode
Create a new inode. The inode initially has no name.

Inodes that do not have a name at the end of the transaction are deleted automatically. Use link_e to assign names (below).

See also Plasma_client.create_file below, which immediately links the inode to a name. See also Plasma_client.regular_ii, Plasma_client.dir_ii, and Plasma_client.symlink_ii for how to create inodeinfo values.

val delete_inode_e : plasma_trans -> inode -> unit Uq_engines.engine
val delete_inode : plasma_trans -> inode -> unit
Delete the inode
val get_inodeinfo_e : plasma_trans ->
inode -> Plasma_rpcapi_aux.inodeinfo Uq_engines.engine
val get_inodeinfo : plasma_trans ->
inode -> Plasma_rpcapi_aux.inodeinfo
Get info about inode. This returns the inodeinfo record from the perspective of this transaction.
val get_cached_inodeinfo_e : plasma_cluster ->
inode -> bool -> Plasma_rpcapi_aux.inodeinfo Uq_engines.engine
val get_cached_inodeinfo : plasma_cluster ->
inode -> bool -> Plasma_rpcapi_aux.inodeinfo
Get info about inode. This function returns the inodeinfo record from the cache. The cache can only contain committed versions of the inodeinfo record, and the client tries to keep only recent versions in the cache. If the cache does not contain the data, or if the data is out of date, a new transaction is started to get the newest committed version.

The bool argument can be set to true to enforce that the newest version is retrieved. However, there is no guarantee that the returned version is still the newest one when this function returns.

Note that get_inodeinfo also implicitly refreshes the cache when the transaction is (still) only used for read accesses.

The returned inodeinfo does not include modifications caused by block writes that were not yet flushed to disk.

val set_inodeinfo_e : plasma_trans ->
inode -> Plasma_rpcapi_aux.inodeinfo -> unit Uq_engines.engine
val set_inodeinfo : plasma_trans ->
inode -> Plasma_rpcapi_aux.inodeinfo -> unit
Set info about inode. Note that setting EOF neither increases nor reduces the number of allocated blocks.
val truncate_e : plasma_trans ->
inode -> int64 -> unit Uq_engines.engine
val truncate : plasma_trans -> inode -> int64 -> unit
Sets EOF value, and all blocks beyond EOF are deallocated.

Fast sequential data access



The function copy_in writes a local file to the cluster. copy_out reads a file from the cluster and copies it into a local file.

Note in particular that copy_in works only in units of whole blocks. The function never reads a block from the filesystem, modifies it, and writes it back. Instead, it writes the block with the data it has, and if there is still space to fill, it pads the block with zero bytes. If you need support for updating only parts of a block, use the buffered access below.

val copy_in_e : ?flags:copy_in_flags list ->
plasma_cluster ->
inode ->
int64 ->
Unix.file_descr -> int64 -> topology -> int64 Uq_engines.engine
val copy_in : ?flags:copy_in_flags list ->
plasma_cluster ->
inode ->
int64 -> Unix.file_descr -> int64 -> topology -> int64
copy_in_e c inode pos fd len topology: Copies the data from the file descriptor fd to the file given by inode. The data is taken from the current position of the descriptor. Up to len bytes are copied. The data is written to position pos of the file referenced by the inode. If data is written past the EOF position of the destination file, the EOF position is advanced. The function returns the number of copied bytes.

For seekable descriptors, len specifies the exact number of bytes to copy. If the input file is shorter, null bytes are appended to the file until len is reached.

For non-seekable descriptors, an additional buffer needs to be allocated. Also, len is ignored for non-seekable descriptors - data is always copied until EOF is seen. (However, in the future this might be changed. It is better to pass Int64.max_int as len if unlimited copying is required.)

topology says how to transfer data from the client to the data nodes. `Star means the client organizes the writes to the data nodes as independent streams. `Chain means that the data is first written to one of the data nodes, and the replicas are transferred from there to the next data node.

flags:

  • `No_datasync: Data blocks are not synchronized to disk
  • `Late_datasync: Only the last block is synchronized to disk. This also includes all preceding blocks. If an error occurs, though, nothing is guaranteed.
The default is to write synchronously: at the end of each transaction that copy_in commits, all blocks are guaranteed to be on disk.

Limitation: pos must be a multiple of the blocksize. The file is written in units of the blocksize (i.e. blocks are never partially updated).
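
The alignment rule and the zero padding can be checked on the client side before calling copy_in. The following helpers are a sketch using only the standard library; their names are hypothetical, and the block size is assumed to have been obtained from blocksize.

```ocaml
(* True if [pos] is a multiple of [blocksize], as copy_in requires. *)
let is_block_aligned ~blocksize pos =
  Int64.rem pos (Int64.of_int blocksize) = 0L

(* Number of whole blocks needed to hold [len] bytes. *)
let blocks_needed ~blocksize len =
  let bs = Int64.of_int blocksize in
  Int64.div (Int64.add len (Int64.pred bs)) bs

(* Number of zero bytes that fill up the last block when [len] bytes
   are written in units of whole blocks. *)
let padding ~blocksize len =
  let bs = Int64.of_int blocksize in
  let r = Int64.rem len bs in
  if r = 0L then 0L else Int64.sub bs r
```

For example, with a block size of 4096, a copy of 4097 bytes occupies two blocks, and the second block carries 4095 bytes of padding.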

copy_in always performs its operations in separate transactions.

val copy_in_from_buf_e : ?flags:copy_in_flags list ->
plasma_cluster ->
inode ->
int64 ->
Netsys_mem.memory -> int -> topology -> int Uq_engines.engine
val copy_in_from_buf : ?flags:copy_in_flags list ->
plasma_cluster ->
inode ->
int64 -> Netsys_mem.memory -> int -> topology -> int
copy_in_from_buf c inode pos buf len topology: Copies the data from buf to the file denoted by inode. The data is taken from the beginning of buf, and the length is given by len. The data is written to position pos of inode.

copy_in_from_buf works much in the same way as copy_in, only that the data is taken from a buffer and not from a file descriptor.

val copy_out_e : ?flags:copy_out_flags list ->
plasma_cluster ->
inode ->
int64 -> Unix.file_descr -> int64 -> int64 Uq_engines.engine
val copy_out : ?flags:copy_out_flags list ->
plasma_cluster ->
inode -> int64 -> Unix.file_descr -> int64 -> int64
copy_out_e c inode pos fd len: Copies the data from the file referenced by inode to file descriptor fd. The data is taken from position pos to pos+len-1 of the file, and it is written to the current position of fd. The number of copied bytes is returned.

Seekable output files may only be extended, but are never truncated.

For non-seekable descriptors, an additional buffer needs to be allocated.

If there are holes in the input file, the corresponding byte region is filled with zero bytes in the output. Reading past EOF is not prevented; it is handled as if the region past EOF were a file hole.

Limitation: pos must be a multiple of the blocksize.

copy_out always performs its operations in separate transactions.

Flags:

  • `No_truncate: The descriptor fd is not truncated to the real file size

val copy_out_to_buf_e : ?flags:copy_out_flags list ->
plasma_cluster ->
inode ->
int64 -> Netsys_mem.memory -> int -> int Uq_engines.engine
val copy_out_to_buf : ?flags:copy_out_flags list ->
plasma_cluster ->
inode -> int64 -> Netsys_mem.memory -> int -> int
copy_out_to_buf_e c inode pos buf len: Copies the data from the file denoted by inode to the buffer buf. The data is taken from position pos to pos+len-1 of the file, and it is written to the beginning of buf.

Buffered data access



To get good performance from buffered access, you should configure the size of the buffer via Plasma_client.configure_buffer.
type strmem = [ `Memory of Netsys_mem.memory | `String of string ] 
The buffer for read and write can be given as string or as bigarray (memory). The latter is advantageous, because there are some optimizations that are only applicable to bigarrays.
val read_e : plasma_cluster ->
inode ->
int64 ->
strmem ->
int -> int -> (int * bool * Plasma_rpcapi_aux.inodeinfo) Uq_engines.engine
val read : plasma_cluster ->
inode ->
int64 ->
strmem ->
int -> int -> int * bool * Plasma_rpcapi_aux.inodeinfo
read_e c inode pos s spos len: Reads data from inode, and returns (n,eof,ii) where n is the number of read bytes, and eof the indicator that EOF was reached. This number n may be less than len only if EOF is reached. ii is the current inodeinfo.

Before a read is served from a clean buffer, the client checks whether the buffer is still up to date.

The read function uses cached metadata; in particular, the EOF position of the file is cached. Before starting a sequence of reads it is recommended to refresh the cache. This can be done with get_cached_inodeinfo c inode true, i.e. the refresh flag must be set in this call.
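
A sequence of reads that first refreshes the cache might look like the following sketch. The module type READER is an assumption made so that the code compiles without the library: inodeinfo stands for Plasma_rpcapi_aux.inodeinfo, strmem is left abstract, and the names Read_all and iter_chunks are hypothetical.

```ocaml
(* Hypothetical subset of the documented signatures. *)
module type READER = sig
  type plasma_cluster
  type inode = int64
  type inodeinfo
  type strmem
  val get_cached_inodeinfo : plasma_cluster -> inode -> bool -> inodeinfo
  val read :
    plasma_cluster -> inode -> int64 -> strmem -> int -> int ->
    int * bool * inodeinfo
end

module Read_all (C : READER) = struct
  (* Read the file from position 0 in chunks of up to [buf_len] bytes
     into [buf], calling [consume n] after each chunk of [n] bytes. *)
  let iter_chunks c inode buf buf_len consume =
    if buf_len <= 0 then invalid_arg "iter_chunks: buf_len must be positive";
    (* Refresh the cached metadata (including EOF) before reading. *)
    ignore (C.get_cached_inodeinfo c inode true);
    let rec loop pos =
      let n, eof, _ii = C.read c inode pos buf 0 buf_len in
      if n > 0 then consume n;
      (* n is less than buf_len only at EOF, so pos always advances. *)
      if not eof then loop (Int64.add pos (Int64.of_int n))
    in
    loop 0L
end
```

The loop relies on the documented guarantee that read returns fewer than len bytes only when EOF is reached.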

val write_e : plasma_cluster ->
inode ->
int64 -> strmem -> int -> int -> int Uq_engines.engine
val write : plasma_cluster ->
inode -> int64 -> strmem -> int -> int -> int
write_e c inode pos s spos len: Writes data to inode and returns the number of written bytes. This number n may be less than len for arbitrary reasons (unlike read - to be fixed).

A write that is not aligned to a block implies that the old version of the block is read first (if not available in a buffer). This is a big performance penalty, and best avoided.
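
To see which blocks of a planned write would be only partly covered, and hence may have to be read first, one can compute them from pos, len and the block size. This is a standard-library sketch with a hypothetical function name; it ignores the EOF position and the buffer state, which also influence whether a read actually happens.

```ocaml
(* Block indices that a write of [len] bytes at [pos] covers only
   partially. An empty list means the write is fully block-aligned. *)
let partial_blocks ~blocksize pos len =
  if len <= 0 then []
  else begin
    let bs = Int64.of_int blocksize in
    let last = Int64.add pos (Int64.of_int (len - 1)) in
    let first_blk = Int64.div pos bs in
    let last_blk = Int64.div last bs in
    (* The write starts inside a block / ends before a block boundary. *)
    let head_partial = Int64.rem pos bs <> 0L in
    let tail_partial = Int64.rem (Int64.succ last) bs <> 0L in
    let head = if head_partial then [first_blk] else [] in
    let tail =
      if tail_partial && (last_blk <> first_blk || not head_partial)
      then [last_blk]
      else []
    in
    head @ tail
  end
```

With a block size of 4096, writing 200 bytes at position 4000 touches blocks 0 and 1 partially, whereas writing 8192 bytes at position 0 touches no block partially.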

It is not ensured that the write is completed when the return value becomes available. The write is actually done in the background, and can be explicitly triggered with the flush_e operation. Also, note that the write happens in a separate transaction. (With "background" we do not mean a separate kernel thread, but an execution thread modeled with engines.)

Writing also ensures that the EOF position is set to at least the position after the last written byte. However, this only happens when the blocks are flushed in the background. (Use get_write_eof to get this value immediately, before flushing.)

As writing happens in the background, special attention has to be paid to the way errors are reported. At the first error the write thread stops, and an error code is set. This code is reported at the next write or flush. After being reported, the code is cleared again. Writing is not automatically resumed - only further write and flush invocations will restart the writing thread. Also, the data buffers are kept intact after errors, so the client will try to write everything again (which may run into the same error). The function drop_inode can be invoked to drop all dirty buffers of the inode in the near future.
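
One possible way to handle a reported write error is to flush explicitly and give up on the dirty buffers if the server reports an error. The sketch below assumes this policy; the module type WRITER lists only the documented signatures it needs so that the code compiles without the library, and the names Flush and flush_or_drop are hypothetical.

```ocaml
(* Hypothetical subset of the documented signatures. *)
module type WRITER = sig
  type plasma_cluster
  type inode = int64
  type errno
  exception Plasma_error of errno
  val flush : plasma_cluster -> inode -> int64 -> int64 -> unit
  val drop_inode : plasma_cluster -> inode -> unit
end

module Flush (C : WRITER) = struct
  (* Flush all buffered data of [inode] (pos=0, len=0 means "to the end
     of the file"). On a server error, drop the dirty buffers so that
     they are not written again, and return the error code. *)
  let flush_or_drop c inode =
    try C.flush c inode 0L 0L; Ok ()
    with C.Plasma_error e -> C.drop_inode c inode; Error e
end
```

Dropping the buffers discards the unwritten data; an application that can wait might instead call flush again later, since the buffers are kept intact after an error.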

val get_write_eof : plasma_cluster -> inode -> int64
Returns the designated new EOF value of pending writes. Raises Not_found if nothing is known.
val get_write_mtime : plasma_cluster -> inode -> Plasma_rpcapi_aux.time
Returns the designated new mtime value of pending writes. Raises Not_found if nothing is known.
val flush_e : plasma_cluster ->
inode -> int64 -> int64 -> unit Uq_engines.engine
val flush : plasma_cluster -> inode -> int64 -> int64 -> unit
flush_e c inode pos len: Flushes all buffered data of inode from pos to pos+len-1, or to the end of the file if len=0. This ensures that data is really written.
val drop_inode : plasma_cluster -> inode -> unit
Drops all dirty buffers of this inode. This prevents further attempts to write them, and it frees up buffer space.
val flush_all_e : plasma_cluster -> unit Uq_engines.engine
val flush_all : plasma_cluster -> unit
Flushes all buffers. (No error reporting, though.)

Filename interface


val lookup_e : plasma_trans ->
string -> bool -> inode Uq_engines.engine
val lookup : plasma_trans -> string -> bool -> inode
Looks up the absolute filename and returns the inode number.

The bool says whether to keep the last component of the path as symbolic link (lstat semantics).

val dir_lookup_e : plasma_trans ->
inode ->
string -> bool -> inode Uq_engines.engine
val dir_lookup : plasma_trans ->
inode -> string -> bool -> inode
Looks the filename up relative to a directory (given as inode) and returns the inode number. The filename can also be given as relative path.

If the filename is absolute the inode number is ignored.

The bool says whether to keep the last component of the path as symbolic link (lstat semantics).

dir_lookup trans inode "" _ is legal and just returns inode.

val rev_lookup_e : plasma_trans ->
inode -> string list Uq_engines.engine
val rev_lookup : plasma_trans -> inode -> string list
Returns the filenames linked with this inode number.
val rev_lookup_dir_e : plasma_trans -> inode -> string Uq_engines.engine
val rev_lookup_dir : plasma_trans -> inode -> string
Returns the absolute filename linked with this inode number, which must be a directory. The filename is read-locked (i.e. cannot be renamed or deleted by a competing transaction).

It is possible to get an `econflict error when the lock requirement cannot be satisfied.

val namelock_e : plasma_trans ->
inode -> string -> unit Uq_engines.engine
val namelock : plasma_trans -> inode -> string -> unit
namelock trans dir name: Acquires an existence lock on the member name of directory dir. name must not contain slashes.

A namelock prevents the entry name of the directory dir from being moved or deleted. This protection lasts until the end of the transaction. If a concurrent transaction tries to move or delete the file, it will get an `econflict error.

An entry that does not yet exist cannot be locked.

The lock does not prevent the directory dir itself from being moved, and thus it is possible that the absolute path of the protected file changes.

val link_count_e : plasma_trans -> inode -> int Uq_engines.engine
val link_count : plasma_trans -> inode -> int
Returns the number of links (also no locks)
val link_e : plasma_trans ->
string -> inode -> unit Uq_engines.engine
val link : plasma_trans -> string -> inode -> unit
Links a name with an inode

For directories there is the restriction that at most one name may be linked with the inode.

val link_at_e : plasma_trans ->
inode ->
string -> inode -> unit Uq_engines.engine
val link_at : plasma_trans ->
inode -> string -> inode -> unit
link_at trans dir_inode name inode: Adds the entry name into the directory dir_inode and connects the entry with inode. name must not contain slashes.
val unlink_e : plasma_trans -> string -> unit Uq_engines.engine
val unlink : plasma_trans -> string -> unit
Unlinks the name. If the count of links drops to 0 this also removes the inode.

This also works for directories! (They must be empty, of course.)

val unlink_at_e : plasma_trans ->
inode -> string -> unit Uq_engines.engine
val unlink_at : plasma_trans -> inode -> string -> unit
unlink_at trans dir_inode name: Removes the entry name from the directory dir_inode. name must not contain slashes.
val rename_e : plasma_trans -> string -> string -> unit Uq_engines.engine
val rename : plasma_trans -> string -> string -> unit
rename trans old_path new_path: Renames/moves the file or directory identified by old_path to the location identified by new_path. There must not be a file at new_path (i.e. you cannot move into a directory).
val rename_at_e : plasma_trans ->
inode ->
string -> inode -> string -> unit Uq_engines.engine
val rename_at : plasma_trans ->
inode -> string -> inode -> string -> unit
rename_at trans old_dir_inode old_name new_dir_inode new_name: Moves the file old_name in old_dir_inode to the new location which is given by new_name in new_dir_inode. Neither old_name nor new_name may contain slashes.
val list_inode_e : plasma_trans ->
inode -> (string * inode) list Uq_engines.engine
val list_inode : plasma_trans ->
inode -> (string * inode) list
Lists the contents of the directory, given by inode
val list_e : plasma_trans ->
string -> (string * inode) list Uq_engines.engine
val list : plasma_trans -> string -> (string * inode) list
Lists the contents of the directory, given by filename
val create_file_e : plasma_trans ->
string ->
Plasma_rpcapi_aux.inodeinfo -> inode Uq_engines.engine
val create_file : plasma_trans ->
string -> Plasma_rpcapi_aux.inodeinfo -> inode
Creates a regular file (inode plus name) or a symlink. The file type must be `ftype_regular or `ftype_symlink.
val mkdir_e : plasma_trans ->
string ->
Plasma_rpcapi_aux.inodeinfo -> inode Uq_engines.engine
val mkdir : plasma_trans ->
string -> Plasma_rpcapi_aux.inodeinfo -> inode
Creates a directory
val regular_ii : plasma_cluster -> int -> Plasma_rpcapi_aux.inodeinfo
regular_ii c mode: Creates an inodeinfo record for a new empty regular file, where the mode field is set to mode modulo the current mask
val symlink_ii : plasma_cluster -> string -> Plasma_rpcapi_aux.inodeinfo
symlink_ii c target: Creates an inodeinfo record for a symlink pointing to target
val dir_ii : plasma_cluster -> int -> Plasma_rpcapi_aux.inodeinfo
dir_ii c mode: Creates an inodeinfo record for a new directory, where the mode field is set to mode modulo the current mask

Low-level functions


val get_blocklist_e : plasma_trans ->
inode ->
int64 -> int64 -> bool -> Plasma_rpcapi_aux.blockinfo list Uq_engines.engine
val get_blocklist : plasma_trans ->
inode ->
int64 -> int64 -> bool -> Plasma_rpcapi_aux.blockinfo list
get_blocklist_e t inode block n keep_flag: Returns the list of blocks for blocks block to block+n-1. This is useful for analyzing where the blocks are actually physically stored.

If keep_flag is set, the blocks are protected for the duration of the transaction.


Utilities


val with_trans_e : plasma_cluster ->
(plasma_trans -> 'a Uq_engines.engine) -> 'a Uq_engines.engine
val with_trans : plasma_cluster -> (plasma_trans -> 'a) -> 'a
with_trans c f: Starts a new transaction t and runs f t. The transaction is committed if f returns normally, and aborted if f raises an exception.
val retry_e : plasma_cluster ->
string -> ('a -> 'b Uq_engines.engine) -> 'a -> 'b Uq_engines.engine
val retry : plasma_cluster -> string -> ('a -> 'b) -> 'a -> 'b
retry c name f arg: Executes f arg and returns the result. If an `econflict error or a timeout occurs, the execution is repeated until a general timeout is reached.

Errors are logged (Netlog). name is used in log output.

It is common to combine retry and with_trans, e.g.

         retry c "create_file"
           (fun filename ->
              with_trans c
                (fun trans ->
                   create_file trans filename (regular_ii c 0o666)
                )
           )
      

implementing the general convention that retry means to retry whole transactions.

This web site is published by Informatikbüro Gerd Stolpmann