module Mapred_io: sig .. end
Utility library for record-based I/O
The record reader can be used to iterate over a whole file, or only a part. For the latter, it is assumed that the file is processed bigblock by bigblock. Of course, it is then possible that the lines do not end at bigblock boundaries. However, it must be determined whether the block reader for bigblock N or the block reader for bigblock N+1 processes such lines. The following rules do that:
class type record_config = object .. end
class type record_reader = object .. end
class type record_writer = object .. end
val bigblock_size : Plasma_client.plasma_cluster -> int -> int
bigblock_size c suggested: Returns the real bigblock size for the suggested value. The real size is rounded up to the next block multiple. The blocksize is retrieved from c (synchronously).
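The rounding described for bigblock_size can be illustrated without a cluster. The following is only a sketch: the helper name round_up_to_block_multiple is hypothetical, the block size of 65536 is an arbitrary stand-in for the value that would be retrieved from c, and it is assumed that both sizes are counted in bytes.

```ocaml
(* Round [suggested] up to the next multiple of [blocksize].
   Hypothetical helper, not part of Mapred_io. *)
let round_up_to_block_multiple ~blocksize suggested =
  if blocksize <= 0 then invalid_arg "blocksize must be positive";
  ((suggested + blocksize - 1) / blocksize) * blocksize

let () =
  (* Assumed block size; the real one comes from the cluster. *)
  let blocksize = 65536 in
  (* 100_000 is not a block multiple, so it is rounded up to 131072. *)
  Printf.printf "%d\n" (round_up_to_block_multiple ~blocksize 100_000);
  (* 131_072 is already a multiple and stays unchanged. *)
  Printf.printf "%d\n" (round_up_to_block_multiple ~blocksize 131_072)
```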
val read_file : ?force_bof:bool ->
?force_eof:int64 ->
?divisor:int ->
Plasma_client.plasma_cluster ->
record_config ->
string -> int64 -> int64 -> record_reader
read_file c rc name block len: Reads from name, starting at block and ending at block+len-1 (plus the following overflow region). Reading is done in a separate transaction. Note that len>=1 is a requirement here.
If block=0 (or force_bof is set), the first line of the selected file region is returned to the user. Otherwise (block > 0 && not force_bof), the first line is skipped, i.e. reading starts after the first encountered LF char.
Reading continues past the last block if the last line does not terminate with an LF char. The maximum line length is the size of a bigblock (including the LF), so read_file will at most read the bigblock following the last selected block.
Reading always stops when the real EOF of the file is reached or at the position force_eof (whichever comes first).
It is not required that block or len are multiples of bigblocks, although this is usually the case.
The buffer size is taken from rc: the configured buffer size is divided by divisor (by default 1). Setting a divisor is reasonable if you have several file readers but the total buffer size should not grow linearly with the number of readers.
The cluster c is set to aborted state when not used. Note that this also affects all transactions unrelated to read_file, so it is best to create a separate plasma_cluster object for reading.
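The line-ownership rules for read_file can be tried out on an in-memory string. The sketch below is not the library's implementation: lines_for_region is a hypothetical helper, regions are given as byte positions [start, stop) rather than block numbers, and force_eof and the maximum line length are ignored. The text does not say what happens when an LF is the very last byte of a region; the sketch assumes that the reader then also returns the line starting exactly at stop, because the next reader skips its first line.

```ocaml
(* Lines (without the trailing LF) that a reader for the byte region
   [start, stop) would return. [data] stands in for the file contents. *)
let lines_for_region ?(force_bof = false) data ~start ~stop =
  let n = String.length data in
  (* Position just after the first LF at or after [from]; [n] if none. *)
  let after_next_lf from =
    match String.index_from_opt data from '\n' with
    | Some i -> i + 1
    | None -> n
  in
  (* Rule 1: only the reader at the beginning of the file (or with
     force_bof) keeps its first line; all others skip past the first LF. *)
  let first =
    if start = 0 || force_bof then start else after_next_lf (min start n)
  in
  (* Rule 2: a line that starts inside the region is read to its end,
     even if that end lies beyond [stop]. Assumption: a line starting
     exactly at [stop] is included as well. *)
  let rec loop pos acc =
    if pos >= n || pos > stop then List.rev acc
    else begin
      let next = after_next_lf pos in
      let line_end = if data.[next - 1] = '\n' then next - 1 else next in
      loop next (String.sub data pos (line_end - pos) :: acc)
    end
  in
  loop first []

let () =
  let data = "aa\nbbbb\ncc\ndddd\n" in
  (* Four regions of 4 bytes each; every line is returned exactly once. *)
  List.iter
    (fun start ->
       let lines = lines_for_region data ~start ~stop:(start + 4) in
       Printf.printf "region %d: [%s]\n" start (String.concat "; " lines))
    [0; 4; 8; 12]
```

With this data the first region returns "aa" and "bbbb" (the latter crosses the region end), the second returns "cc", the third returns "dddd", and the fourth returns nothing.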
type sync_readers = (unit -> record_reader) list
type async_readers = (unit -> record_reader Uq_engines.engine) list
val read_multiple : Plasma_client.plasma_cluster ->
record_config ->
readers:[ `Async of async_readers | `Sync of sync_readers ] ->
unit -> record_reader
Returns a record reader that reads from readers one after the other. The readers can be given in a synchronous form, or in an asynchronous form. The latter is preferable when the reader is in asynchronous mode (i.e. when to_fd_e is running).
val write_file : ?divisor:int ->
Plasma_client.plasma_cluster ->
record_config -> string -> record_writer
write_file c rc name: Appends records to this file (which must already exist). Writing is done in separate transactions.
As with read_file, the cluster c is set to aborted state when not used. Note that this also affects all transactions unrelated to write_file, so it is best to create a separate plasma_cluster object for writing.
val write_multiple : ?divisor:int ->
Plasma_client.plasma_cluster ->
record_config ->
string ->
int64 ->
create_sync:(string -> int -> string) ->
?create_async:(string -> int -> string Uq_engines.engine) ->
unit -> record_writer
write_multiple c rc prefix limit create_sync create_async (): Writes into a sequence of files whose names are composed of prefix followed by an integer k. The files are created by calling create_sync prefix k. A new file is started when the current file reaches the size limit (in bytes).
create_async: If passed, this function is used instead of create_sync in asynchronous mode (i.e. when from_fd_e is running).
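The file rotation performed by write_multiple can be sketched with in-memory buffers. Everything here is an illustration under assumptions: write_rotating is a hypothetical helper, limit is a plain int instead of int64, numbering is assumed to start at k = 0, the create callback is assumed to return the name of the new file, and each record is assumed to be terminated by an LF.

```ocaml
(* Distribute [records] over a sequence of in-memory "files".
   Returns (name, contents) pairs in the order of creation. *)
let write_rotating ~prefix ~limit ~create records =
  let files = ref [] in      (* finished files, newest first *)
  let k = ref 0 in           (* index of the next file to create *)
  let current = ref None in  (* (name, buffer) of the file being written *)
  let close_current () =
    match !current with
    | Some (name, buf) ->
      files := (name, Buffer.contents buf) :: !files;
      current := None
    | None -> ()
  in
  List.iter
    (fun record ->
       (* Start a new file if none is open or the open one reached the limit. *)
       (match !current with
        | Some (_, buf) when Buffer.length buf < limit -> ()
        | _ ->
          close_current ();
          current := Some (create prefix !k, Buffer.create 64);
          incr k);
       match !current with
       | Some (_, buf) ->
         Buffer.add_string buf record;
         Buffer.add_char buf '\n'
       | None -> assert false)
    records;
  close_current ();
  List.rev !files

let () =
  (* Stand-in for create_sync: only computes the file name. *)
  let create prefix k = prefix ^ string_of_int k in
  write_rotating ~prefix:"part_" ~limit:8 ~create ["aaa"; "bbb"; "ccc"; "ddd"; "eee"]
  |> List.iter (fun (name, contents) ->
      Printf.printf "%s: %d bytes\n" name (String.length contents))
```

With a limit of 8 bytes and records of 4 bytes each (including the LF), the sketch produces part_0 and part_1 with two records each and part_2 with the remaining record.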
val create_file_e : ?repl:int -> Plasma_client.plasma_cluster -> string -> unit Uq_engines.engine
val create_file : ?repl:int -> Plasma_client.plasma_cluster -> string -> unit
create_file c name: Creates this file exclusively. repl is the replication factor, 0 by default (i.e. use server default).
val delete_file : Plasma_client.plasma_cluster -> string -> unit
val file_blocks : Plasma_client.plasma_cluster -> string -> int64