Plasma GitLab Archive

Module Mapred_io


module Mapred_io: sig .. end
Utility library for record-based I/O


Here, a file consists of lines. Each line is terminated by LF and is treated as one record.

The record reader can be used to iterate over a whole file, or over only part of it. In the latter case, the file is assumed to be processed bigblock by bigblock. Lines will in general not end at bigblock boundaries, so for a line that straddles the boundary between bigblock N and bigblock N+1 it must be decided which of the two block readers processes it. The following rules settle this:

  • The first bigblock does not have this problem. Its first line is always processed by the block reader for block 0.
  • For the other bigblocks we define that the first processed line is the line starting after the first LF character in the bigblock.
  • For all bigblocks we define that the last processed line is the line starting after the last LF character in the bigblock. This line may be stored partly, or even entirely, in the next bigblock, so the block reader also has to read the next bigblock.
  • A bigblock must contain at least one LF character (and hence have at least one line to process).

For best efficiency, use the block reader on contiguous ranges of bigblocks rather than on individual bigblocks.
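The boundary rules can be checked on a small in-memory example. The following OCaml sketch is not part of Mapred_io: the names first_lf, last_lf and lines_of_bigblock are hypothetical, the whole file is assumed to fit in a string, and a bigblock is modelled as a plain byte range of bbsize bytes. It returns the lines that the reader for bigblock n would process under the four rules.

```ocaml
(* Hypothetical helper, not part of Mapred_io: offset of the first LF
   in [s] within [from, upto), if any *)
let first_lf s from upto =
  let rec go i =
    if i >= upto then None
    else if s.[i] = '\n' then Some i
    else go (i + 1)
  in
  go from

(* Hypothetical helper: offset of the last LF in [s] within [from, upto) *)
let last_lf s from upto =
  let rec go i =
    if i < from then None
    else if s.[i] = '\n' then Some i
    else go (i - 1)
  in
  go (upto - 1)

(* Lines processed by the reader for bigblock [n]; [file] stands in for
   the whole file contents (an assumption made for illustration). *)
let lines_of_bigblock (file : string) ~(bbsize : int) (n : int) : string list =
  let len = String.length file in
  let b_start = n * bbsize in
  let b_end = min len (b_start + bbsize) in
  if n < 0 || b_start >= len then invalid_arg "lines_of_bigblock";
  (* Rule 4: every bigblock must contain at least one LF *)
  let last =
    match last_lf file b_start b_end with
    | Some p -> p
    | None -> failwith "lines_of_bigblock: bigblock without LF"
  in
  (* Rules 1 and 2: block 0 starts at offset 0, every other block
     starts right after its first LF *)
  let lo =
    if n = 0 then 0
    else
      match first_lf file b_start b_end with
      | Some p -> p + 1
      | None -> assert false (* ruled out by the LF check above *)
  in
  (* Rule 3: the last line is the one starting after the last LF; it
     may extend into, or lie entirely within, the next bigblock *)
  let hi = last + 1 in
  (* Collect every line whose start offset lies in [lo, hi] *)
  let rec collect pos acc =
    if pos > hi || pos >= len then List.rev acc
    else
      let stop =
        match first_lf file pos len with Some p -> p | None -> len
      in
      collect (stop + 1) (String.sub file pos (stop - pos) :: acc)
  in
  collect lo []
```

For the file "aaa\nbb\ncccc\nd\n" with bbsize = 5, the three bigblocks yield ["aaa"; "bb"], ["cccc"] and ["d"]. The line "bb" starts in bigblock 0 and runs into bigblock 1, and "cccc" is handled by bigblock 1 although it ends in bigblock 2, so every line is processed exactly once.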
class type record_config = object .. end
class type record_reader = object .. end
class type record_writer = object .. end
val bigblock_size : Plasma_client.plasma_cluster -> int -> int
bigblock_size c suggested: Returns the real bigblock size for the suggested value. The real size is rounded up to the next block multiple.

The block size is retrieved from c (synchronously).
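The rounding performed by bigblock_size amounts to an integer ceiling division. A minimal sketch, assuming the block size is passed in directly (the real function queries it from c) and that a suggested size that is already a block multiple is returned unchanged; round_to_bigblock is a hypothetical name.

```ocaml
(* Hypothetical sketch of the rounding described for bigblock_size.
   Assumption: [blocksize] is known up front instead of being fetched
   from a cluster, and [suggested] is a positive byte count. *)
let round_to_bigblock ~(blocksize : int) (suggested : int) : int =
  if blocksize <= 0 then invalid_arg "round_to_bigblock: blocksize";
  (* Ceiling division gives the number of blocks; scale back to bytes *)
  ((suggested + blocksize - 1) / blocksize) * blocksize
```

With 64 KiB blocks, a suggestion of 100000 bytes would become 131072 bytes (two blocks), while 65536 stays as it is.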

val read_file : ?force_bof:bool ->
?force_eof:int64 ->
?divisor:int ->
Plasma_client.plasma_cluster ->
record_config ->
string -> int64 -> int64 -> record_reader
read_file c rc name block len: Reads from name, starting at block and ending at block+len-1 (plus the overflow region that follows, i.e. the remainder of the last line). Reading is done in a separate transaction.

Note that len must be at least 1.

If block=0 (or force_bof is set), the first line of the selected file region is returned to the user. Otherwise (block > 0 and not force_bof), the first line is skipped, i.e. reading starts after the first LF char encountered.

Reading continues past the last block if the last line is not terminated by an LF char within the selected blocks. The maximum line length is the size of a bigblock (including the LF), so read_file reads at most the one bigblock following the last selected block.

Reading always stops when the real EOF of the file or the position force_eof is reached, whichever comes first.

It is not required that block or len are multiples of bigblocks, although this is usually the case.

The buffer size is taken from rc: the configured buffer size is divided by divisor (1 by default). Setting a divisor is useful when there are several file readers and the total buffer size should not grow linearly with the number of readers.

The cluster c is set to aborted state when not used. Note that this also affects all transactions unrelated to read_file, so it is best to create a separate plasma_cluster object for reading.
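Since readers work best on contiguous ranges of bigblocks, a job will typically split the file into such ranges before calling read_file. The sketch below is a hypothetical helper (plan_ranges is not part of Mapred_io); it assumes the file length in blocks is known, for example from file_blocks, and that bb_blocks is the bigblock size divided by the block size.

```ocaml
(* Hypothetical helper, not part of Mapred_io: split a file of
   [file_blocks] blocks into at most [readers] contiguous ranges whose
   boundaries fall on bigblock multiples ([bb_blocks] blocks per
   bigblock). Every (block, len) pair has len >= 1, as read_file needs. *)
let plan_ranges ~(file_blocks : int64) ~(bb_blocks : int64) ~(readers : int)
    : (int64 * int64) list =
  if bb_blocks <= 0L || readers <= 0 then invalid_arg "plan_ranges";
  (* Number of bigblocks, counting a partial last bigblock as one *)
  let n_bb = Int64.div (Int64.add file_blocks (Int64.pred bb_blocks)) bb_blocks in
  let r = Int64.of_int readers in
  (* The first [extra] readers get one bigblock more than the rest *)
  let base = Int64.div n_bb r and extra = Int64.rem n_bb r in
  let rec go i start acc =
    if i >= r then List.rev acc
    else
      let count = if i < extra then Int64.succ base else base in
      if count = 0L then List.rev acc (* no bigblocks left to hand out *)
      else
        let block = Int64.mul start bb_blocks in
        (* Clip the last range at the real end of the file *)
        let stop =
          min file_blocks (Int64.mul (Int64.add start count) bb_blocks)
        in
        go (Int64.succ i) (Int64.add start count)
          ((block, Int64.sub stop block) :: acc)
  in
  go 0L 0L []
```

Each resulting (block, len) pair could then be passed as Mapred_io.read_file c rc name block len. Only the first range starts at block 0, so only its reader returns the first line of the file; the others skip up to their first LF. If all these readers run at the same time, setting ~divisor to their number keeps the total buffer size near the configured value.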

type sync_readers = (unit -> record_reader) list 
type async_readers = (unit -> record_reader Uq_engines.engine) list 
val read_multiple : Plasma_client.plasma_cluster ->
record_config ->
readers:[ `Async of async_readers | `Sync of sync_readers ] ->
unit -> record_reader
Constructs a record reader that reads from the input readers one after the other. The readers can be given in synchronous or in asynchronous form. The latter is preferable when the reader is in asynchronous mode (i.e. when to_fd_e is running).
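The readers are passed as thunks, which suggests that each one is created only when it is needed. As an analogy only, the same sequencing can be sketched with the standard library's Seq module, treating each reader as a hypothetical unit -> string Seq.t thunk instead of a record_reader; read_multiple_seq is not a Mapred_io function.

```ocaml
(* Analogy, not Mapred_io code: each source is a thunk that is opened
   lazily, and sources are drained one after the other, in the spirit
   of read_multiple with a sync_readers list. *)
let read_multiple_seq (readers : (unit -> string Seq.t) list) : string Seq.t =
  let rec next_reader rs () =
    match rs with
    | [] -> Seq.Nil
    | open_reader :: rest -> drain (open_reader ()) rest ()
  and drain current rest () =
    match current () with
    | Seq.Nil -> next_reader rest ()   (* current source exhausted *)
    | Seq.Cons (record, tail) -> Seq.Cons (record, drain tail rest)
  in
  next_reader readers
```

No thunk is called until the first record is requested, and an empty source simply passes control to the next one.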
val write_file : ?divisor:int ->
Plasma_client.plasma_cluster ->
record_config -> string -> record_writer
write_file c rc name: Appends records to this file (which must already exist). Writing is done in separate transactions.

As with read_file, the cluster c is set to aborted state when not used. Note that this also affects all transactions unrelated to write_file, so it is best to create a separate plasma_cluster object for writing.

val write_multiple : ?divisor:int ->
Plasma_client.plasma_cluster ->
record_config ->
string ->
int64 ->
create_sync:(string -> int -> string) ->
?create_async:(string -> int -> string Uq_engines.engine) ->
unit -> record_writer
write_multiple c rc prefix limit create_sync create_async (): Writes into a sequence of files whose names are composed of prefix followed by an integer k. The files are created by calling create_sync prefix k. A new file is started when the current file reaches the size limit (in bytes).

create_async: If passed, this function is used instead of create_sync in asynchronous mode (i.e. when from_fd_e is running).
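The file rotation of write_multiple can be simulated in memory. This is a sketch under stated assumptions, not the library's logic: write_multiple_sim is a hypothetical name, create stands in for create_sync and returns the name of the k-th file, numbering starts at 0, a record is never split across files, and the size check happens before each record is written, so a file may end up somewhat larger than limit.

```ocaml
(* In-memory simulation, not Mapred_io code. Assumptions: [create]
   plays the role of create_sync, k starts at 0, records are not split,
   and a new file is begun once the current one holds >= [limit] bytes. *)
let write_multiple_sim ~(prefix : string) ~(limit : int64)
    ~(create : string -> int -> string) (records : string list)
    : (string * string) list =
  let finished = ref [] in
  let k = ref 0 in
  let name = ref (create prefix 0) in
  let buf = Buffer.create 256 in
  List.iter
    (fun r ->
      (* Rotate before writing if the current file has reached the limit *)
      if Int64.of_int (Buffer.length buf) >= limit then begin
        finished := (!name, Buffer.contents buf) :: !finished;
        Buffer.clear buf;
        incr k;
        name := create prefix !k
      end;
      (* Each record is written as a line terminated by LF *)
      Buffer.add_string buf r;
      Buffer.add_char buf '\n')
    records;
  List.rev ((!name, Buffer.contents buf) :: !finished)
```

With prefix "/out/part-", a limit of 4 bytes and the records "ab", "cd", "e", the first file receives "ab\ncd\n" (6 bytes, since the check only runs before a record) and the second receives "e\n".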

val create_file_e : ?repl:int -> Plasma_client.plasma_cluster -> string -> unit Uq_engines.engine
val create_file : ?repl:int -> Plasma_client.plasma_cluster -> string -> unit
create_file c name: Creates this file exclusively. repl is the replication factor; it defaults to 0, meaning that the server default is used. create_file_e is the engine-based (asynchronous) variant.
val delete_file : Plasma_client.plasma_cluster -> string -> unit
Deletes this file.
val file_blocks : Plasma_client.plasma_cluster -> string -> int64
Returns the length of the file in blocks.
This web site is published by Informatikbüro Gerd Stolpmann