PlasmaFS uses SunRPC for all TCP connections. There is nothing that is language-dependent.
There are a number of servers involved:
There is right now nothing that prevents clients from calling internal RPC programs.
Coordination: Finding the coordinator
The first step of a client is to find the coordinator. The client may know one or several namenode ports; however, only one of them is the right one to send the namenode queries to. At cluster startup, the namenodes elect a coordinator. Only the coordinator can actually respond to client queries. However, all namenode servers create the namenode socket, and RPC requests can be sent to any of them. The non-coordinators emit errors if they get a query that only the coordinator is able to answer.
The program Coordination (reachable via the main namenode port) includes functions to find the coordinator. All namenode servers can respond to these queries:
Filesystem program
The Filesystem program allows the client to:
An inode is identified by a 64 bit integer (in XDR notation this is the hyper type). The inode IDs are only used once - this is important because it means that once you have an inode ID at hand, it is a permanent identifier for the same object. It is not possible for a client accessing the filesystem in parallel to change the inode ID so that it points to a different file.
Unlike the normal Unix filesystem interface, the PlasmaFS protocol returns the inode IDs to the user, and the user can also access files by this ID. This is even the primary method of doing so.
An inode can exist without filename - but only for the lifetime of a transaction. When the transaction is committed, inodes without filename are automatically deleted. This prevents PlasmaFS space from being permanently consumed without a visible entry in the directory tree.
There are two kinds of objects connected with an inode:
As an important addition to what is traditionally stored in an inode, PlasmaFS includes a sequence number in inodeinfo. The sequence number is automatically increased when the content data is changed. We will later see that the modification of content data always implies that new data blocks are allocated, so every change of the file contents is reflected as a change in the blocklist, i.e. it is not possible to change content data without notifying the namenode. This allows the namenode to maintain the sequence number. The sequence number can be thought of as a version number of the contents, and may e.g. be useful for quick checks whether content has changed compared to some previous point in time.
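The idea of the sequence number can be illustrated with a toy in-memory model. This is only a sketch under assumptions: `InodeModel`, `allocate_block`, `free_block` and `content_changed` are hypothetical names chosen for illustration and are not part of the PlasmaFS protocol. The point is merely that every content change passes through a blocklist change, which is where the counter is bumped.

```python
from dataclasses import dataclass, field


@dataclass
class InodeModel:
    """Toy stand-in for a namenode inode (hypothetical, not the real inodeinfo)."""
    seqno: int = 0
    blocklist: dict = field(default_factory=dict)  # block index -> block number

    def allocate_block(self, index: int, block: int) -> None:
        # Content can only change via the blocklist, so bump seqno here.
        self.blocklist[index] = block
        self.seqno += 1

    def free_block(self, index: int) -> None:
        # Only count the change if a block was actually removed.
        if self.blocklist.pop(index, None) is not None:
            self.seqno += 1


def content_changed(inode: InodeModel, remembered_seqno: int) -> bool:
    """Quick check: has the content changed since seqno was remembered?"""
    return inode.seqno != remembered_seqno
```

A client would remember the sequence number when it first looks at a file and compare it later, instead of comparing block lists or data.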
All metadata and data accesses are done in a transactional way. (There are a few exceptions, but these are quite dangerous.) This means the client has to open a transaction (begin_transaction) first, and has to finish the transaction after doing the operations (either via commit_transaction or abort_transaction). A transaction immediately sees what other transactions have committed, i.e. we generally have a "read committed" isolation level. (An important exception is explained below.) (Note: While I write this I realize that we already have a "repeatable read" isolation level for inodeinfo. It is likely that this issue needs some cleanup.)
The client may open several transactions simultaneously on the same TCP connection. A transaction ID is used to identify the transactions.
When the TCP connection terminates, all pending transactions are implicitly aborted. This protects server resources.
From the client's perspective the transactions look very much like SQL transactions. There are some subtle differences, though:
If an operation runs into a lock held by another transaction, the server responds with ECONFLICT. It does not wait until the locks are free again. The rationale is that the namenode is a scarce resource, and we don't want to waste it on organizing waiting. In the future we might add better lock management, but this will most likely be done in a separate server.
Clients make best use of transactions when they only use them for short sequences of operations. The background of this is not only limited memory, but also the danger of getting ECONFLICT. The best way for clients to handle this is to abort the current transaction, and to restart it from the beginning.
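The abort-and-restart advice can be sketched as a small retry wrapper. This is an illustration only: the `Conflict` exception, the `client` object with its three methods, and the `max_attempts` limit are assumptions made for this sketch, not part of any PlasmaFS client library.

```python
class Conflict(Exception):
    """Hypothetical exception a client binding might raise for ECONFLICT."""


def run_transaction(client, body, max_attempts=5):
    """Run body(client, tid) inside a transaction, restarting on ECONFLICT."""
    for attempt in range(max_attempts):
        tid = attempt + 1  # fresh client-chosen transaction ID per attempt
        client.begin_transaction(tid)
        try:
            result = body(client, tid)
            client.commit_transaction(tid)
            return result
        except Conflict:
            # Give up all work done so far and start over from the beginning.
            client.abort_transaction(tid)
    raise Conflict(f"gave up after {max_attempts} attempts")
```

Keeping `body` short reduces both the time locks are held and the amount of work that is thrown away on a conflict.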
Note that the PlasmaFS transactions do not translate directly to PostgreSQL transactions. Actually, PlasmaFS keeps a number of PostgreSQL transactions continuously open, but these are only for reading data. During a commit, another PostgreSQL transaction is used for writing data. On PostgreSQL level, no conflicting updates can occur anymore.
PlasmaFS stores directories not as a special kind of file, but in the namenode database. This has the advantage of not having to deal with a special file format, and also the advantage of saving disk space for clusters with large block sizes.
There is a special PostgreSQL table storing the directory tree. In addition to this, each directory also has an inode. The inode is used for storing access rights. It is not possible to allocate data blocks for a directory inode.
It is possible that an inode has several file names. This works just like the hard links in Unix.
Unlike Unix, a directory does not have special entries for "." and "..". If the client wishes to handle these names, it has to implement its own lookup mechanism (i.e. it has to walk through directories). This feature may be added in the server at some time, though.
There are symbolic links, but these are only incompletely implemented. There is no symlink resolution in the PlasmaFS server yet. Other than that, symlinks work already. (The NFS bridge supports symlink resolution.)
This transaction creates a new file by allocating a new inode, and then linking this inode to a filename:
tid = 1; /* transaction ID chosen by client */
begin_transaction(tid);
inode = allocate_inode(tid, ii); /* ii: an inodeinfo struct */
link(tid, "/dir/filename", inode);
commit_transaction(tid);
There is no rename operation. However, one can easily get exactly the same effect by:
tid = 1; /* transaction ID chosen by client */
begin_transaction(tid);
inode = lookup(tid, "/dir/oldname", -1);
unlink(tid, "/dir/oldname");
link(tid, "/dir/newname", inode);
commit_transaction(tid);
There is a subtle point, though. The lookup operation acquires a lock for the argument filename, so no competing transaction can unlink it or replace it, so we can be sure the looked up inode really corresponds to the filename.
As already mentioned, the client needs to present tickets to the datanode server in order to get permission to read or write file blocks. The ticket system is controlled by the namenode. For what follows, keep in mind that the namenode can control exactly which blocks are accessible to the client.
Block allocation happens when a client extends a file, or when it modifies a file. The latter is crucial, because we cannot allow that clients directly modify a file. Instead, clients have to allocate replacement blocks for the file region they want to modify, and have to write the replacement blocks in units of whole blocks to the datanode servers. For instance, if the client wants to modify a single byte in a file, it has to read the whole block, change the byte locally, allocate a replacement block, and write the modified block to the position for the replacement.
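The local part of the single-byte example (find the block, change the byte in a private copy) can be sketched as follows. The helper names `locate` and `patch_block` are hypothetical; reading the old block, allocating the replacement block and writing it to the datanodes are left out here.

```python
def locate(offset: int, blocksize: int) -> tuple[int, int]:
    """Map a byte offset in the file to (block index, offset within block)."""
    return divmod(offset, blocksize)


def patch_block(old_block: bytes, pos_in_block: int, new_byte: int) -> bytes:
    """Return a full replacement block that differs from old_block in one byte."""
    if not 0 <= pos_in_block < len(old_block):
        raise ValueError("position outside block")
    patched = bytearray(old_block)  # local copy; the old block stays untouched
    patched[pos_in_block] = new_byte
    return bytes(patched)
```

The result always has the length of a whole block, which is what the write to the replacement position requires.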
This looks complicated, but is necessary. Why? If we allowed direct modification of the block, the data change would be immediately visible to all other transactions, even before commit. Well, one could say, then let that happen, and do not make such transactional guarantees. This is a bit short-sighted, however, because we do not implement transactions only to establish a nice logical view of the data, but also to make modifications safe.
The problem here is that we do not write the data block to only one datanode, but that it is the task of the client to write the same block to all datanodes that store replicas of the block. We have to ensure that these replicas are all identical. If we allowed direct modifications of existing blocks, we cannot be sure whether these modifications are done on all datanodes in the same way: The client could simply crash in the middle of writing the replicas. In the transactional scheme, however, a crash is no longer a problem. The client would not reach the point where the changes are committed. When the transaction is aborted, the namenode simply gives the (incompletely written) replacement blocks up, and ensures again a consistent view.
Note that we fully trust the client to write the same data to all replicas. Even worse, we fully trust the client to write the data blocks at all: after allocating blocks, the client may choose not to write data to the datanode, in which case the allocated blocks simply keep their old content (which may be data from other files). Once PlasmaFS gets access control (which is not yet implemented), this becomes an issue, because data can leak from files a user must not have access to. This will be fixed when we introduce access control to PlasmaFS.
Here, we allocate 10 blocks at block position 0 of the file. If there are already blocks, the old blocks are implicitly deallocated, i.e. we get the effect of a replacement. The returned blockinfo structs b have everything we need to write the data blocks to the datanodes. (See below.)
tid = 1;
begin_transaction(tid);
inode = lookup(tid, "/filename", -1);
bl = allocate_blocks(tid, inode, 0, 10, true, []);
for (b in bl) {
<write_data_to_datanode>(b);
};
commit_transaction(tid);
It looks trivial to read the data blocks: Just get the array of blockinfo structs that say where the data blocks are stored, and contact the datanodes for getting the data.
There is one problem, though. A read operation may conflict with the deallocation of blocks, i.e. we must prevent that a client reads blocks that are deallocated at the same time. For example, we could have two transactions doing this:
  <transaction 1>                          <transaction 2>
  begin_transaction(tid1);
  bl = get_blocks(tid1, inode, pos, len);
  <read_data_from_datanode>(bl[0]);
  <read_data_from_datanode>(bl[1]);        begin_transaction(tid2);
  <read_data_from_datanode>(bl[2]);        free_blocks(tid2, inode, pos, len);
  ...                                      commit_transaction(tid2);
  <read_data_from_datanode>(bl[k]);
  ...
  <read_data_from_datanode>(bl[n-1]);
  commit_transaction(tid1);
The second transaction deallocates the blocks of the inode while the first transaction still reads them, and even commits this modification. With the usual "read committed" isolation level, we would be faced by the problem that the blocks are no longer associated to the inode, and can even be reused by other transactions that allocate blocks.
PlasmaFS solves this by enforcing the stricter isolation level of "repeatable read" in this case. Once blocks are handed out to the client, they cannot be immediately deallocated. Instead, the blocks enter a special transitional state between "used" and "free". They are no longer in the blocklist of the inode, but they cannot be reclaimed immediately for other files. The blocks leave this special state when the last transaction finishes accessing these blocks.
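The transitional state can be pictured with a toy model that pins blocks for reading transactions. This is a sketch under assumptions, not the namenode's actual bookkeeping: the class `BlockPool`, its method names, and the three state labels are invented for illustration.

```python
class BlockPool:
    """Toy model: a block is 'used', 'transitional', or 'free'."""

    def __init__(self, blocks):
        self.state = {b: "used" for b in blocks}
        self.readers = {b: set() for b in blocks}  # block -> transaction IDs

    def get_block(self, tid, block):
        # Handing a block out pins it for the reading transaction.
        if self.state[block] != "used":
            raise KeyError("block is not in any blocklist")
        self.readers[block].add(tid)

    def free_block(self, block):
        # Committed deallocation: reclaim at once only if nobody is reading.
        self.state[block] = "transitional" if self.readers[block] else "free"

    def finish(self, tid):
        # Called when a reading transaction commits or aborts.
        for block, tids in self.readers.items():
            tids.discard(tid)
            if not tids and self.state[block] == "transitional":
                self.state[block] = "free"
```

In this model a block freed while a reader holds it only becomes reusable once `finish` has been called for the last such reader.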
As you see, the client extends files block by block. Well, not every file has a length that is a multiple of the block size. How is that solved?
There is an EOF position in the inodeinfo struct. This is simply a 64 bit number. The convention is now that clients update and respect this EOF position, i.e. when more data is appended to the file, the EOF position is moved to the position where the data ends logically, and readers discard the part of the last block that is beyond EOF.
Note that this is only a convention - it is not enforced. This means we can have files whose EOF position is unrelated to where the last block is (i.e. a position before or after this block). In such cases, clients should give EOF precedence, and treat non-existing blocks as null blocks.
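A reader following this convention could assemble file content roughly as below. The function `read_file` and the representation of blocks as a dictionary from block index to bytes are assumptions for this sketch; a real client would fetch the blocks from the datanodes.

```python
def read_file(blocks: dict, eof: int, blocksize: int) -> bytes:
    """Assemble file content from blocks (index -> bytes), giving EOF precedence."""
    out = bytearray()
    nblocks = -(-eof // blocksize)  # ceil(eof / blocksize)
    for index in range(nblocks):
        # A block missing before EOF is treated as a null block.
        out += blocks.get(index, bytes(blocksize))
    return bytes(out[:eof])  # discard everything beyond EOF
```

Blocks that lie entirely beyond EOF are never looked at, and a missing block before EOF reads as zero bytes.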
There is also some help for finding out what the last block is. In inodeinfo the field blocklimit is the "block EOF", i.e. the file has only blocks with index 0 to blocklimit-1. This field is automatically maintained with every block allocation or deallocation.
Datanode program
The Datanode program provides the operations for reading and writing blocks. The program supports several transport methods - right now an inline method (called DNCH_RPC) and a method where the block data are exchanged over POSIX shared memory (called DNCH_SHM). See the next section for how to use shared memory - let us first focus on DNCH_RPC.
The port the Datanode program listens on is returned in the field node of the blockinfo struct, so there is no additional communication overhead for finding this out.
For writing data, one has to call write as in
write(block, DNCH_RPC(data), ticket_id, ticket_verifier)
Here, block is the block number, and the ticket numbers come from the blockinfo struct (as returned by allocate_blocks). The notation DNCH_RPC(data) means that the DNCH_RPC branch of the union is selected, and data is the string field put into this branch. For writes, this string must be exactly as long as one block.
For reading data, one has to call read as in
r = read(block, DNCH_RPC, pos, len, ticket_id, ticket_verifier)
Again, block is the block number. Because read supports partial block reads, one can also pass a position and length within the block (pos and len, resp.). The ticket information comes from get_blocks. (Right now it is not evaluated - read access is always granted.)
The read operation returns a value like DNCH_RPC(data).
The Datanode program can handle multiple requests simultaneously. This means clients can send several requests in one go, without having to wait for responses. To save resources, clients should avoid creating several TCP connections to the same datanode.
The Datanode program tries to be as responsive to clients as possible. This especially means all requests sent to it are immediately interpreted, although this usually means they are only buffered up until the I/O operation can actually be done. Right now, there is not even an upper limit for this buffering. Clients should try not to send too many requests to Datanode at once, i.e. before the responses arrive. A good scheme for a client is to limit the number of outstanding requests, e.g. to 10. It is likely that the protocol will be refined at some point, and a real limit will be set. Clients can then query this limit.
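One way a client might cap the number of outstanding requests is a counting semaphore around each call. The sketch below uses Python's asyncio; `read_all` and the `read_block` coroutine passed to it are hypothetical, and the constant 10 is just the figure suggested above, not a protocol constant.

```python
import asyncio

MAX_OUTSTANDING = 10  # figure suggested in the text, not a protocol constant


async def read_all(read_block, blocks, limit=MAX_OUTSTANDING):
    """Call read_block(b) for every block with at most `limit` calls in flight."""
    gate = asyncio.Semaphore(limit)

    async def guarded(b):
        async with gate:
            return await read_block(b)

    # gather returns the results in the order of the input blocks.
    return await asyncio.gather(*(guarded(b) for b in blocks))
```

All requests still share one connection in this scheme; the semaphore only delays sending a request until an earlier response has arrived.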
Datanode with shared memory
In the interfaces of read and write it is easy to request shared memory transport. Just use DNCH_SHM instead of DNCH_RPC, and put a dn_channel_shm_obj struct into it. This struct has fields for naming the shared memory file, and for selecting the part of this file that is used for data exchange. The read call will then put the read data there, and write will expect the block to write there. Of course, the datanode server must have the permission to access the shared memory file (it always requests read/write access).
There are two difficulties, though. First, it is required that the client opens a connection to the Unix Domain socket the datanode server provides - the TCP port will not work here. Second, it may be difficult for the client to manage the lifetime of the shared memory file. For both problems, the datanode server has special support RPCs.
The function udsocket_if_local checks whether it is called from the same node as the datanode server runs on, and if so, the path of the Unix Domain socket is returned. This "same node check" is done by comparing the socket addresses of both endpoints of the incoming call - if both addresses have the same IP address, it is concluded that this is only possible on the same node (i.e. the criterion is whether getsockname and getpeername return the same IP address). Clients should call udsocket_if_local on the usual TCP port of the datanode when they want to use the shared memory transport method, and if they get the name of the Unix Domain socket, they should switch to this connection.
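The "same node check" criterion can be sketched with the standard socket API. This only illustrates the comparison described above; `same_ip` and `is_local_call` are hypothetical helper names, and how the datanode server actually implements the check is not shown here.

```python
import socket


def same_ip(local_addr, peer_addr) -> bool:
    """The criterion from the text: both endpoints carry the same IP address."""
    return local_addr[0] == peer_addr[0]


def is_local_call(conn: socket.socket) -> bool:
    """Apply the check to an accepted TCP connection."""
    return same_ip(conn.getsockname(), conn.getpeername())
```

Only the IP part of the address tuples is compared; the ports of the two endpoints naturally differ.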
The function alloc_shm_if_local can be used to allocate shared memory so that the lifetime is bound to the current connection. This means this shared memory will be automatically deallocated when the client closes the connection to the datanode server. This is very practical, as there is no easy way to bind the lifetime of shared memory to the lifetime of another system resource (say, a process, or a file descriptor).
The overhead of the shared memory transport is quite low - especially, it can be avoided that the data blocks are copied on the path to or from the disk. This allows it to do local I/O at full disk speed.
However, one should also realize that the latency of the data path is higher than when directly writing to a file. To compensate for that it is suggested to submit several requests at once to the datanode server, and to keep the server busy.
For getting maximum performance one should avoid using shared memory buffers that are not page-aligned. Also, one should avoid writing directly to a buffer. It is better to get a file descriptor for the buffer and to write to it via the Unix write system call. The kernel can then play nice tricks with the page table to fully avoid data copying.
In comparison to accessing local files, there is of course also the overhead of the central namenode. For reading or writing large files this overhead can be neglected.
All in all, the performance of the shared memory transport is good enough that Plasma MapReduce stores all data files, even local ones, in the distributed filesystem.
The tickets are created by the coordinator, but are checked by the datanodes. This means there must be some additional communication between the coordinator and the datanodes.
Before going into detail, let us first explain why we need this. Of course, at some point PlasmaFS will allow to restrict access to data, and the ticket system is a way to extend the scope of an authorization system to loosely attached subsystems like datanodes. Right now, however, there are no access restrictions - every client can read and write everything. It is nevertheless useful to already implement the ticket system, at least for write access. The ticket system helps us to enforce the integrity condition that blocks can only be written from inside transactions. Clients could otherwise e.g. allocate blocks, commit the transaction, and write the blocks later. This is dangerous, however, because at this moment there is no guarantee that the blocks are still associated to the file the client thinks they are.
In order to avoid that the coordinator has to notify the datanodes about the access permissions for each block separately, a cryptographic scheme is used to lower the overhead. At transaction start, the coordinator creates two numbers:
ticket_id: This is the user-visible ID of the ticket
ticket_secret: This is a random number
There is also a timeout for every ticket - for catching the rare and mostly hypothetical case that the datanode server is unreachable at the moment the coordinator wants to revoke a ticket, but is back up later.
The read and write calls now do not take the ticket_secret as arguments, but a ticket_verifier. The verifier is a message authentication code (MAC) for the combination of ticket_id, block number, and the permission (read-only or read-write). This means the verifier is a cryptographic hash computed from ticket_id, block, permission, and ticket_secret. Clients cannot compute it because they do not know the secret.
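To make the verifier idea concrete, here is a minimal sketch using HMAC-SHA256 from the Python standard library. The choice of HMAC-SHA256, the byte encoding of the message, and the helper names `make_verifier` and `check_verifier` are all assumptions for illustration; the text above does not specify which MAC construction or encoding PlasmaFS uses.

```python
import hashlib
import hmac
import struct


def make_verifier(ticket_secret: bytes, ticket_id: int, block: int,
                  read_write: bool) -> bytes:
    """MAC over (ticket_id, block, permission), keyed with the ticket secret."""
    # Assumed encoding: two signed 64 bit big-endian ints plus one flag byte.
    message = struct.pack(">qq?", ticket_id, block, read_write)
    return hmac.new(ticket_secret, message, hashlib.sha256).digest()


def check_verifier(ticket_secret: bytes, ticket_id: int, block: int,
                   read_write: bool, verifier: bytes) -> bool:
    """What a datanode knowing the secret could do with an incoming request."""
    expected = make_verifier(ticket_secret, ticket_id, block, read_write)
    return hmac.compare_digest(expected, verifier)
```

Because the block number and the permission are part of the MAC input, a verifier handed out for one block cannot be reused for another block or for a stronger permission.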
Especially designed for the NFS bridge, there is a way of reading data blocks outside transactions. Right now, this method does not require any additional privilege, but in the future this will be changed, because the client needs to be trusted.
It is possible to pass 0 for the ticket_id and ticket_verifier in the read call. This returns the contents of the data block outside a transaction. However, there is absolutely no guarantee that this block is still entered into the blocklist of the file when the read call responds.
The block can be used for something else when other transactions change the file, free the block, and allocate it for a different file. Such modifications can be done quite quickly - sometimes faster than reading a single data block, so this is a real issue.
Because of this, the client has to check after read whether the block still stores the data the client expects to be there. This check can be done in an expensive manner by opening a new transaction, issuing a get_blocks request, and checking whether the read block is still used at the same place in the file. If the block is no longer there, the client has to start over, or to fall back to the normal method of reading this block from inside a transaction.
There is a possible optimization making the non-transactional method attractive. The check whether the block is still allocated for the same file, and at the same position in the file, can be sped up. In order to help here, the coordinator provides a special service called the inode cache. The inode cache is provided on an extra port and is implemented by a special server program. The RPC program the client has to call is Inodecache.
The inode cache port can be determined by invoking find_inodecaches on the normal coordinator port.
As already explained earlier, the coordinator maintains a sequence number for every inode that is automatically increased when new blocks are allocated or blocks are freed. The sequence number gives us a quick criterion whether blocks have changed. Of course, it is possible that block allocations have taken place that did not affect the block we are interested in. However, we consider this a case that does not happen frequently and that we are not optimizing for. We only act on the case that the sequence number is still identical after the read, because we can then conclude that the block is valid.
The inode cache now provides a way to quickly check exactly that. It has a function is_up_to_date_seqno that in many cases immediately knows whether the sequence number is still the same. Callers should be prepared, though, that this function returns false, even sometimes when the sequence number has not changed (because this piece of information is not in the cache).
So, a client can read the blocks of a file by doing:
/* Phase 1. Get the blocks */
begin_transaction(tid);
bl = get_blocks(tid, inode, index, len);
commit_transaction(tid);
/* Phase 2. Read the blocks */
data[0] = read(bl[0]);
if ( not is_up_to_date_seqno(bl[0].seqno)) <fall_back_to_alternate_method>;
data[1] = read(bl[1]);
if ( not is_up_to_date_seqno(bl[1].seqno)) <fall_back_to_alternate_method>;
...
data[n-1] = read(bl[n-1]);
if ( not is_up_to_date_seqno(bl[n-1].seqno)) <fall_back_to_alternate_method>;
There are a few more things a client can do: First, it can cache the blocklist array for a file. This reduces the frequency of the expensive get_blocks call, especially if only one or a few blocks are read every time this routine is called. In the "<fall_back_to_alternate_method>" case, the blocklist would be deleted from the blocklist cache.
Second, there is the possibility of omitting the is_up_to_date_seqno calls for all blocks read in a sequence except the last. The last call is the important one - it confirms that all blocks are valid up to this point in time. The downside is a higher risk that blocks are read in vain, i.e. blocks are read that turn out not to be up to date.
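The variant with a single check after the last read might look like the following sketch. The function `read_run`, the representation of a blockinfo as a dictionary with a `seqno` key, and the `read` and `is_up_to_date_seqno` callables passed in are assumptions made for illustration; they stand in for the RPC calls named in the text.

```python
def read_run(blocks, read, is_up_to_date_seqno):
    """Read a run of blocks, validating only once after the last read.

    Returns the list of block contents, or None if the caller has to fall
    back to the transactional method (and drop any cached blocklist).
    """
    if not blocks:
        return []
    data = [read(b) for b in blocks]
    # One check after the final read vouches for all earlier reads as well,
    # assuming all blocks belong to the same inode and carry the same seqno.
    if is_up_to_date_seqno(blocks[-1]["seqno"]):
        return data
    return None
```

If the check fails, all blocks of the run have been read in vain, which is the trade-off described above.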
For the NFS bridge this non-transactional method of reading files turned out to be a relevant optimization. This is mainly the case because the NFS protocol does not know the concept of a transaction, and because NFS requests each block individually. There is practically no chance to do several reads in one go, and no chance of getting a natural speedup by combining several reads in a single transaction. The non-transactional read method helps because we often only need one extra is_up_to_date_seqno after every read, and is_up_to_date_seqno is very cheap.