module Mapred_io: sig .. end
Utility library for record-based I/O.
The record reader can be used to iterate over a whole file or over only a part of it. In the latter case, it is assumed that the file is processed bigblock by bigblock. Lines then do not necessarily end at bigblock boundaries, so it must be determined whether the reader for bigblock N or the reader for bigblock N+1 processes a line that crosses the boundary. The rules that decide this are stated in the description of read_file.
class type record_config = object .. end
class type record_reader = object .. end
class type record_writer = object .. end
val bigblock_size : Plasma_client.plasma_cluster -> int -> int
bigblock_size c suggested: Returns the real bigblock size for
the suggested value. The real size is rounded up to the
next block multiple.
The block size is retrieved from c (synchronously).
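The rounding rule can be illustrated without a cluster. The sketch below is hypothetical: round_up_to_block is not part of Mapred_io, and the block size is passed in explicitly instead of being retrieved from c. It assumes a non-negative suggested size.

```ocaml
(* Hypothetical helper, not part of Mapred_io: round [suggested] up to the
   next multiple of [blocksize]. The real bigblock_size reads the block
   size from the cluster; here it is a parameter. Assumes suggested >= 0. *)
let round_up_to_block ~blocksize suggested =
  if blocksize <= 0 then invalid_arg "round_up_to_block: blocksize must be > 0";
  (suggested + blocksize - 1) / blocksize * blocksize

let () =
  (* With an example block size of 64 KiB, 100_000 rounds up to 2 blocks. *)
  Printf.printf "%d\n" (round_up_to_block ~blocksize:65536 100_000)
  (* prints 131072 *)
```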
val read_file : ?force_bof:bool ->
?force_eof:int64 ->
?divisor:int ->
Plasma_client.plasma_cluster ->
record_config ->
string -> int64 -> int64 -> record_reader
read_file c rc name block len: Reads from name, starting at block,
ending at block+len-1 (plus the following overflow region).
Reading is done in a separate transaction.
Note that len>=1 is a requirement here.
If block=0 (or force_bof), the first line of the selected file
region is returned to the user. Otherwise (block > 0 && not force_bof),
the first line is skipped, i.e. reading starts after the first
encountered LF char.
Reading continues past the last block if the last line does not
terminate with an LF char. The maximum line length is the size
of a bigblock (including the LF), so read_file will at most
read the bigblock following the last selected block.
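The first-line and overflow rules can be modeled on an in-memory string, with byte ranges [start, stop) standing in for block ranges. This is a sketch, not the Plasma implementation: lines_of_range is a hypothetical name, force_eof and the maximum line length are not modeled, and it assumes that for start > 0 the search for the first LF begins one byte before start. That assumption makes a line beginning exactly at a boundary belong to the later reader, so adjacent ranges deliver every line exactly once.

```ocaml
(* Hypothetical model of read_file's boundary rules on a string.
   Returns the lines (without LF) a reader for bytes [start, stop)
   would deliver. Assumption: for start > 0 (and not force_bof) the
   search for the first LF starts at start - 1. *)
let lines_of_range ?(force_bof = false) data ~start ~stop =
  let n = String.length data in
  (* Position where the first delivered line begins. *)
  let first =
    if start = 0 || force_bof then start
    else if start > n then n
    else
      match String.index_from_opt data (start - 1) '\n' with
      | Some i -> i + 1 (* skip the first (partial) line *)
      | None -> n (* no LF left: nothing for this reader *)
  in
  (* Deliver every line that begins before [stop]; the last one may run
     past [stop] (the overflow region) up to its LF or the end of data. *)
  let rec loop pos acc =
    if pos >= stop || pos >= n then List.rev acc
    else
      let eol =
        match String.index_from_opt data pos '\n' with
        | Some i -> i
        | None -> n
      in
      loop (eol + 1) (String.sub data pos (eol - pos) :: acc)
  in
  loop first []

let () =
  let data = "alpha\nbeta\ngamma\ndelta\n" in
  let n = String.length data in
  (* "beta" straddles byte 8, so only the first reader delivers it. *)
  print_endline (String.concat "," (lines_of_range data ~start:0 ~stop:8));
  (* prints alpha,beta *)
  print_endline (String.concat "," (lines_of_range data ~start:8 ~stop:n))
  (* prints gamma,delta *)
```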
Reading always stops when the real EOF of the file or the position
force_eof is reached, whichever comes first.
It is not required that block or len are multiples of bigblocks,
although this is usually the case.
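In the usual case, a file is covered by one read_file call per bigblock. The hypothetical helper below computes such (block, len) pairs; it assumes the file length in blocks is already known and that per_bigblock is the bigblock size divided by the block size. Every len is at least 1, as read_file requires, and only the last range may be shorter than a bigblock.

```ocaml
(* Hypothetical helper, not part of Mapred_io: split [total_blocks]
   blocks into (block, len) ranges of [per_bigblock] blocks each. *)
let bigblock_ranges ~(per_bigblock : int64) (total_blocks : int64) =
  if per_bigblock <= 0L then invalid_arg "bigblock_ranges: per_bigblock must be > 0";
  let rec loop start acc =
    if start >= total_blocks then List.rev acc
    else
      (* The last range is truncated at the end of the file. *)
      let len = min per_bigblock (Int64.sub total_blocks start) in
      loop (Int64.add start len) ((start, len) :: acc)
  in
  loop 0L []

let () =
  List.iter
    (fun (block, len) -> Printf.printf "block=%Ld len=%Ld\n" block len)
    (bigblock_ranges ~per_bigblock:4L 10L)
```

Each pair could then be handed to a separate reader, for example one per task.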
The buffer size is taken from rc: the configured buffer size is
divided by divisor (1 by default). Setting a divisor is useful
if you have several file readers and the total buffer size should
not grow linearly with the number of readers.
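The effect of divisor is simple arithmetic. In this sketch, reader_buffer_size is a hypothetical name, and integer division is an assumption, since the text does not say how remainders are handled. Choosing the divisor equal to the number of readers keeps the total at the configured size.

```ocaml
(* Hypothetical illustration of the divisor rule: each reader gets the
   configured buffer size divided by [divisor] (integer division assumed). *)
let reader_buffer_size ?(divisor = 1) configured =
  if divisor < 1 then invalid_arg "reader_buffer_size: divisor must be >= 1";
  configured / divisor

let () =
  let configured = 64 * 1024 * 1024 in
  let readers = 4 in
  let per_reader = reader_buffer_size ~divisor:readers configured in
  (* With divisor = number of readers, the total stays at the configured size. *)
  Printf.printf "per reader: %d, total: %d\n" per_reader (per_reader * readers)
  (* prints per reader: 16777216, total: 67108864 *)
```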
The cluster c is set to aborted state when it is not in use. Note that this
also affects all transactions unrelated to read_file, so it is best
to create a separate plasma_cluster object for reading.
type sync_readers = (unit -> record_reader) list
type async_readers = (unit -> record_reader Uq_engines.engine) list
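Both reader lists hold thunks rather than readers, which suggests that each reader is only created when read_multiple reaches it while reading "one after the other". The sketch below models that sequential chaining. It uses a hypothetical simplified reader (a record whose next function raises End_of_file) instead of the real record_reader class type, whose methods are not shown here; reader, of_list, chain and to_list are illustrative names only.

```ocaml
(* Hypothetical simplified reader: [next ()] returns the next record or
   raises End_of_file. This is NOT the real record_reader class type. *)
type reader = { next : unit -> string }

(* Build a reader over an in-memory list of records. *)
let of_list records =
  let rest = ref records in
  { next =
      (fun () ->
        match !rest with
        | [] -> raise End_of_file
        | x :: tl ->
            rest := tl;
            x) }

(* Read from the readers one after the other. Each thunk is forced only
   when the previous reader is exhausted, so readers are opened lazily. *)
let chain (thunks : (unit -> reader) list) : reader =
  let pending = ref thunks in
  let current = ref None in
  let rec next () =
    match !current with
    | Some r -> (
        try r.next ()
        with End_of_file ->
          current := None;
          next ())
    | None -> (
        match !pending with
        | [] -> raise End_of_file
        | mk :: tl ->
            pending := tl;
            current := Some (mk ());
            next ())
  in
  { next }

(* Drain a reader into a list. *)
let to_list r =
  let rec loop acc =
    match r.next () with
    | x -> loop (x :: acc)
    | exception End_of_file -> List.rev acc
  in
  loop []

let () =
  let opened = ref 0 in
  let mk records () = incr opened; of_list records in
  let r = chain [ mk [ "a"; "b" ]; mk []; mk [ "c" ] ] in
  print_endline (String.concat "," (to_list r));
  (* prints a,b,c *)
  Printf.printf "readers opened: %d\n" !opened
  (* prints readers opened: 3 *)
```

Passing thunks means that, for example, a list of many read_file calls does not open all files up front.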
val read_multiple : Plasma_client.plasma_cluster ->
record_config ->
readers:[ `Async of async_readers | `Sync of sync_readers ] ->
unit -> record_reader
read_multiple c rc ~readers (): Reads from the readers one
after the other. The readers can be given in a synchronous form
or in an asynchronous form. The latter is preferable when the
reader is in asynchronous mode (i.e. when to_fd_e is
running).
val write_file : ?divisor:int ->
Plasma_client.plasma_cluster ->
record_config -> string -> record_writer
write_file c rc name: Appends records to this file (which must already
exist). Writing is done in separate transactions.
As with read_file, the cluster c is set to aborted state when it is not in use.
Note that this also affects all transactions unrelated to write_file,
so it is best to create a separate plasma_cluster object for writing.
val write_multiple : ?divisor:int ->
Plasma_client.plasma_cluster ->
record_config ->
string ->
int64 ->
create_sync:(string -> int -> string) ->
?create_async:(string -> int -> string Uq_engines.engine) ->
unit -> record_writer
write_multiple c rc prefix limit create_sync create_async ():
Writes into a sequence of files
whose names are composed of prefix followed by an integer k. The
files are created by calling create_sync prefix k. A new file is
started when the current file reaches the size limit (in bytes).
create_async: If passed, this function is used instead of create_sync
in asynchronous mode (i.e. when from_fd_e is running).
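The rotation rule can be simulated on record sizes alone. This is a sketch under stated assumptions: assign_files and name_of are hypothetical (name_of stands in for create_sync), the first file index is assumed to be 0, records are assumed never to be split across files, and the limit is assumed to be checked before each record, so a file may exceed limit by at most one record.

```ocaml
(* Hypothetical model of write_multiple's file rotation. Returns, for
   each record size, the name of the file it would be written to. *)
let assign_files ~name_of ~prefix ~(limit : int64) (sizes : int64 list) =
  let rec loop k current acc = function
    | [] -> List.rev acc
    | size :: rest ->
        (* Start a new file once the current one has reached the limit. *)
        let k, current =
          if current > 0L && current >= limit then (k + 1, 0L) else (k, current)
        in
        loop k (Int64.add current size) ((name_of prefix k, size) :: acc) rest
  in
  loop 0 0L [] sizes

let () =
  (* Names are prefix followed by the integer k, as described above. *)
  let name_of prefix k = prefix ^ string_of_int k in
  assign_files ~name_of ~prefix:"part-" ~limit:100L [ 60L; 50L; 30L; 80L; 10L ]
  |> List.iter (fun (file, size) -> Printf.printf "%s <- %Ld bytes\n" file size)
```

With a limit of 100 bytes, the first file receives 60 + 50 bytes before it has reached the limit, so the third record starts part-1.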
val create_file_e : ?repl:int -> Plasma_client.plasma_cluster -> string -> unit Uq_engines.engine
val create_file : ?repl:int -> Plasma_client.plasma_cluster -> string -> unit
create_file c name: Creates this file exclusively. repl is
the replication factor, 0 by default (i.e. use the server default).
val delete_file : Plasma_client.plasma_cluster -> string -> unit
val file_blocks : Plasma_client.plasma_cluster -> string -> int64