module Mapred_io : sig .. end

Utility library for record-based I/O.
The record reader can be used to iterate over a whole file, or only a part. For the latter, it is assumed that the file is processed bigblock by bigblock. Of course, it is then possible that the records do not end at bigblock boundaries. However, it must be determined whether the block reader for bigblock N or the block reader for bigblock N+1 processes such records. The following rules do that:
class type record_config = object .. end

class type record_reader = object .. end

class type record_writer = object .. end
type read_flag = [ `Bof_block of int64 | `Eof_pos of int64 ]

- `Bof_block n: Assume that the first record to read is at position n of the
  file. The position should not be after the first block to read.
- `Eof_pos n: Assume that the EOF position is at n (in bytes).

`Bof_block and `Eof_pos can be used to read from only a part of the file.

class type record_reader_factory = object .. end
class type record_writer_factory = object .. end

class type record_rw_factory = object .. end
val line_structured_format : unit -> record_rw_factory
Records are lines separated by LF bytes. Otherwise, a line can include every byte (including NUL). We do not assume any character set: a line is simply a sequence of bytes.
For line-structured files, the first record (line) to process in
a bigblock is the line following the first LF byte. The only
exception is the first bigblock, where the first line starts at
the beginning of the file.
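The rule can be illustrated with a small hedged sketch (this is not the library's implementation; first_record_offset is a hypothetical helper that receives the raw bytes of one bigblock and its index):

  let first_record_offset ~bigblock_index (block : string) : int option =
    if bigblock_index = 0 then
      Some 0                                (* first bigblock: start at the beginning of the file *)
    else
      match String.index_opt block '\n' with
      | Some i -> Some (i + 1)              (* the line following the first LF byte *)
      | None -> None                        (* no LF in this bigblock: no record starts here *)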
val fixed_size_format : int -> record_rw_factory
For files with fixed-size records, the first record of block k of the file starts at byte (k * blocksize) mod recordsize.
This format cannot be used with streaming mode!
val var_size_format : unit -> record_rw_factory
The file consists of chunks (where a whole number of chunks must fit into a bigblock). Every chunk of N bytes has a header of 32 bytes, followed by a data region until the end of the chunk:

chunk<k> = header<k> ^ data<k>

Here, k is the index identifying the chunk, k = 0..K-1.
The length of a chunk is currently always 64K, with no option for the user to change it. Note that the last chunk of the file may be incompletely stored: The data part may be cut off when the real end of the data stream is reached.
If no compression is done, the concatenated data regions contain the sequence of records. A record may start in one region and be continued in the next:
data<0> ^ ... ^ data<K-1> = record<0> ^ ... ^ record<R-1>
If a compression algorithm is applied, every chunk is compressed individually. So:
U(data<0>) ^ ... ^ U(data<K-1>) = record<0> ^ ... ^ record<R-1>
where U is the uncompression function.
The header consists of:

header = chunksize ^ datasize ^ recstart ^ flags ^ checksum

where:

- chunksize is the number N of bytes every chunk consists of. It is
  represented as a 64 bit number in big endian order.
- datasize is the number of bytes of the data area. Normally, this should be
  chunksize - 32, but for compressed chunks it is possible that some bytes
  must remain free. Again a 64 bit number in big endian order.
- recstart is the offset of the start of the first record in the data area
  (if compression is done, this refers to the data in uncompressed form).
  The offset is relative to the start of the data area, e.g. 0 means that the
  data area starts immediately with a record. If -1, no record begins in this
  chunk. Again a 64 bit number in big endian order.
- flags are 32 bits that can be used for flags (also big endian). Bit 0 says
  whether GZIP compression is enabled. Other bits are not yet assigned.
- checksum consists of the first four bytes of
  MD5(chunksize ^ datasize ^ recstart ^ flags ^ dec(k)), where dec(k) is the
  decimal representation of k, the chunk index.
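As a hedged illustration of this layout, the following OCaml sketch decodes a 32-byte header. The record type and helper functions are local to the example (they are not part of Mapred_io), and checksum verification is omitted:

  type chunk_header = {
    chunksize : int64;     (* bytes 0..7:   N, big endian *)
    datasize  : int64;     (* bytes 8..15:  size of the data area *)
    recstart  : int64;     (* bytes 16..23: offset of the first record, or -1 *)
    flags     : int32;     (* bytes 24..27: bit 0 = GZIP compression *)
    checksum  : string;    (* bytes 28..31: first 4 bytes of the MD5 value *)
  }

  let get_int64_be s pos =
    let v = ref 0L in
    for i = 0 to 7 do
      v := Int64.logor (Int64.shift_left !v 8)
                       (Int64.of_int (Char.code s.[pos + i]))
    done;
    !v

  let get_int32_be s pos =
    let v = ref 0l in
    for i = 0 to 3 do
      v := Int32.logor (Int32.shift_left !v 8)
                       (Int32.of_int (Char.code s.[pos + i]))
    done;
    !v

  let parse_chunk_header (s : string) : chunk_header =
    assert (String.length s >= 32);
    { chunksize = get_int64_be s 0;
      datasize  = get_int64_be s 8;
      recstart  = get_int64_be s 16;
      flags     = get_int32_be s 24;
      checksum  = String.sub s 28 4;
    }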
A record is stored as:

record = length ^ record_data

If record_data is up to 254 bytes long, the length is just this length as a single byte. If record_data is longer, the length is represented as a single byte 0xFF followed by a 64 bit number in big endian order.
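A hedged sketch of this length encoding (encode_record is not part of Mapred_io; it only illustrates the rule stated above):

  let encode_record (record_data : string) : string =
    let len = String.length record_data in
    let buf = Buffer.create (len + 9) in
    if len <= 254 then
      Buffer.add_char buf (Char.chr len)            (* single length byte *)
    else begin
      Buffer.add_char buf '\xff';                   (* escape byte *)
      for i = 7 downto 0 do                         (* 64 bit length, big endian *)
        Buffer.add_char buf (Char.chr ((len lsr (8 * i)) land 0xff))
      done
    end;
    Buffer.add_string buf record_data;
    Buffer.contents buf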
This format cannot be used with streaming mode!
val auto_input_format : unit -> record_rw_factory
Writing files is not supported!
val bigblock_size : Mapred_fs.filesystem -> string -> int -> int
bigblock_size fs path suggested: Returns the real bigblock size for the suggested value. The real size is rounded up to the next block multiple. The blocksize is retrieved from fs (synchronously).
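A minimal usage sketch, assuming fs is an open Mapred_fs.filesystem; the path and the 16 MiB suggestion are only examples:

  let choose_bigblock fs =
    Mapred_io.bigblock_size fs "/input/data" (16 * 1024 * 1024)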
type sync_readers = (unit -> record_reader) list

type async_readers = (unit -> record_reader Uq_engines.engine) list
val read_multiple : Mapred_fs.filesystem ->
record_config ->
readers:[ `Async of async_readers | `Sync of sync_readers ] ->
unit -> record_reader
read_multiple fs rc ~readers (): Creates a record_reader that reads from the given readers one after the other. The readers can be given in a synchronous form, or in an asynchronous form. The latter is preferable when the reader is in asynchronous mode (i.e. when to_fd_e is running).
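A hedged sketch of the synchronous form; open_reader stands for application code that opens one file and returns its record_reader:

  let combined_reader fs rc ~open_reader =
    Mapred_io.read_multiple fs rc
      ~readers:(`Sync [ (fun () -> open_reader "part0");
                        (fun () -> open_reader "part1") ])
      ()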
val write_multiple : Mapred_fs.filesystem ->
Plasma_shm.shm_manager ->
record_config ->
string ->
int64 ->
int ->
create_sync:(string -> int -> string) ->
create_async:(string -> int -> string Uq_engines.engine) option ->
record_writer_factory -> record_writer
write_multiple fs shm rc prefix size_limit lines_limit create_sync create_async rwf: Writes into a sequence of files whose names are composed of prefix followed by an integer k. The files are created by calling create_sync prefix k. A new file is started when the current file reaches the size size_limit (in bytes), or when the current file has lines_limit lines.
Note that the size limit is checked without taking the LF line separator into account.
create_async: If passed, this function is used instead of create_sync in asynchronous mode (i.e. when from_fd_e is running).
Restriction: from_dev_e is not implemented.
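Below is a hedged sketch of a call for line-structured output. It assumes that fs, shm and rc already exist, that create_sync is supposed to return the name of the file it created, and that a record_rw_factory can be coerced to a record_writer_factory; none of this is spelled out in this excerpt:

  let make_part_writer fs shm rc =
    let create_sync prefix k =
      let name = prefix ^ string_of_int k in
      Mapred_io.create_file fs name;             (* create the part file exclusively *)
      name                                       (* assumed: return the file name *)
    in
    Mapred_io.write_multiple
      fs shm rc
      "/output/part-"                            (* prefix of the generated files *)
      (Int64.of_int (64 * 1024 * 1024))          (* size_limit: new file after 64 MiB *)
      1_000_000                                  (* lines_limit: or after 1M lines *)
      ~create_sync
      ~create_async:None                         (* synchronous mode only *)
      ((Mapred_io.line_structured_format ())
         :> Mapred_io.record_writer_factory)     (* assumed coercion to a writer factory *)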
val divide : record_config -> int -> record_config
divide rc n: Returns a record_config with buffer_size and buffer_size_tight divided by this number.

The following functions provide simplified file access (see also Plasma_netfs). On the lower level you can of course also use Plasma_client directly, but the interface is slightly more complicated.
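For example (rc being an existing record_config):

  let quarter_buffers rc = Mapred_io.divide rc 4    (* buffers reduced to a quarter *)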
val file_exists : Mapred_fs.filesystem -> string -> bool
file_exists fs name: whether this file, directory or symbolic link exists.

val is_directory : Mapred_fs.filesystem -> string -> bool

is_directory fs name: whether this directory exists.

val create_file : ?repl:int ->
?pref_nodes:string list -> Mapred_fs.filesystem -> string -> unit
create_file fs name: Creates this file exclusively. repl is the replication factor, 0 by default (i.e. use the server default). pref_nodes can be set to the list of preferred datanode identities (actually, this only configures the cluster).

val create_dir : Mapred_fs.filesystem -> string -> unit
create_dir fs name: Creates a directory exclusively.

val delete_file : Mapred_fs.filesystem -> string -> unit
val delete_rec : Mapred_fs.filesystem -> string -> unit
val file_blocks : Mapred_fs.filesystem -> string -> int64
type fileinfo = [ `Directory | `Other | `Regular of int64 ]

`Regular n where n is the length, or `Directory, or `Other. We follow symlinks in this module. `Other is also returned for dead symlinks.

val scan_file : Mapred_fs.filesystem -> string -> fileinfo
val scan_dir : Mapred_fs.filesystem -> string -> bool -> (string * fileinfo) list
scan_dir fs dir deeply: Scans the directory dir for files, and returns the files (w/o directory prefix) as pairs (name, info). If deeply, subdirectories are recursively scanned, otherwise they are ignored.
Files and directories starting with "." and "_" are ignored.
Symbolic links are followed (and the max scanning depth is restricted).
The returned data refers to the files pointed to by the symlinks.
Dead symlinks are skipped.
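As a hedged usage sketch, the following sums the sizes of all regular files found below a directory:

  let total_size fs dir =
    List.fold_left
      (fun acc (_name, info) ->
         match info with
         | `Regular n -> Int64.add acc n        (* n is the file length *)
         | `Directory | `Other -> acc)
      0L
      (Mapred_io.scan_dir fs dir true)          (* deeply = true: recurse *)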