Plasma GitLab Archive
Projects Blog Knowledge

(*
  Copyright 2012 Gerd Stolpmann

  This file is part of Plasma, a distributed filesystem and a
  map/reduce computation framework. Unless you have a written license
  agreement with the copyright holder (Gerd Stolpmann), the following
  terms apply:

  Plasma is free software: you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation, either version 3 of the License, or
  (at your option) any later version.

  Plasma is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with Plasma.  If not, see <http://www.gnu.org/licenses/>.

*)
(* $Id: mapred_toolkit.mli 581 2012-01-18 01:27:47Z gerd $ *)

(** Map/reduce toolkit *)

(** There is a tutorial: {!Plasmamr_toolkit}. *)


(** {1 Registered functions} *)

(** Functions can be registered at program initialization time, and get
    a unique ID. Later, it is possible to not only call such functions
    directly, but also remotely on task nodes. Remember that exactly
    the same executable must be running on the task nodes.

    Registered functions are used to name functions that are filled into
    the placeholders of the map/reduce algorithm scheme (such as
    [map] and [reduce], but also a few more).

    Because the registration must happen at initialization time, it is
    effectively only possible to register globally defined functions,
    and not local functions defined inside other functions. (This
    limitation cannot currently be removed; a workaround is to pass all
    data via arguments.)

    There is a camlp4 preprocessor extension that helps define registered
    functions. Use it like this:

    {[
      let my_function =
        <:rfun< larg1 larg2 ... largM @ rarg1 rarg2 ... rargN -> body >>
    ]}

    The "@" and "->" characters need to occur literally here. The function
    arguments before "@" are local arguments and can be omitted. The
    arguments after "@" are remote arguments, and at least one of these
    is mandatory. Remember that there is a local caller, and a task server
    executing the function. A local argument comes from the caller, and
    is sent to the task server (using marshalling). The remote arguments
    are, in contrast, supplied with values from the task server (e.g.
    a value previously computed in the task server). The type of
    [my_function] is something like

    {[
      my_function : L1 -> ... -> LM -> (R1 -> ... -> RN -> T) Mapred_rfun.rfun
    ]}

    (when the local arguments have types [Li] and the remote arguments have
    types [Ri]).
    
    The camlp4 extension is activated if you compile with

    {[
      ocamlfind ocamlc -syntax camlp4o -package mr_framework.toolkit ...
    ]}

    (or use the preprocessor [camlp4 pa_toolkit.cma] directly).

    If there are no local arguments, you can also define without camlp4
    as

    {[
      let my_function =
        Mapred_rfun.register name (fun rarg1 ... rargN -> body)
    ]}

    Here, [name] needs to be a unique identifier for the function. Use
    {!Mapred_rfun.apply_partially} to get the effect of local arguments.

    Registered functions can, as a consequence of the value restriction,
    only be monomorphic. (The usual workaround of eta-expanding the
    functions is not applicable here.)

 *)


open Mapred_rfun

(** [invoke f x]: applies the registered function [f] to the argument
    [x] and returns the result.

    NOTE(review): presumably this is the direct (local) call of a
    registered function mentioned in the introduction, as opposed to
    remote execution on a task node - confirm against the implementation.
 *)
val invoke : ('a -> 'b) rfun -> 'a -> 'b

(** {1 Formats} *)

(** The record format of a file, i.e. the rule for splitting the file
    into records. The formats are explained in detail in
    {!Plasmamr_file_formats}.

    - [`Auto_input]: the format is recognized automatically from the
      file name. This format only supports reading files; writing
      files with it raises an exception.
    - [`Fixed_size n]: every record is exactly [n] bytes long.
    - [`Line_structured]: every record is a line terminated by an
      LF byte.
    - [`Var_size]: a binary format allowing records of variable size.
 *)
type format =
  [ `Auto_input
  | `Fixed_size of int
  | `Line_structured
  | `Var_size
  ]


(** {1 Place} *)

module Place : sig
  (** A place is a file, or a directory containing files, in PlasmaFS.
      There are also in-memory places for notebooks
      (see [notebook_place]).
   *)

  type 'a t
  (** A place for storing records of type ['a] *)

  type 'a codec = ('a -> string) rfun * (string -> int -> int -> 'a) rfun
  (** A codec (coder/decoder) says how to convert a record to a string,
      and how to read it back. It is given as a pair [(coder,decoder)]
      of functions which must both be registered ([rfun]). The decoder
      takes a string and a range within it, given as position and
      length.
   *)

  type ext_location = 
     [ `File of string | `Flat_dir of string | `Deep_dir of string 
     ]
  (** A location in the filesystem, each variant carrying a path:
      - [`File path]: the place consists of a single file
      - [`Flat_dir path], [`Deep_dir path]: the place is a directory
        containing files

      NOTE(review): presumably [`Deep_dir] also covers files in
      subdirectories while [`Flat_dir] does not - confirm against the
      implementation.
   *)

  type location = [ ext_location | `Notebook ]
  (** Any location of a place: either a filesystem location, or
      [`Notebook] for in-memory places *)


  val create : ?prefix:string -> ?repl:int ->
               Mapred_fs.filesystem -> 'a codec -> format -> 
               Mapred_io.record_config -> ext_location -> 
                 'a t
  (** Creates a new empty place. The file or directory is created. The
      place can be used to store new files.

      - [prefix]: New files will get this prefix (ignored for [`File]
        locations)
      - [repl]: NOTE(review): presumably the replication factor for
        files created at this place - confirm
   *)

  val from : ?prefix:string -> ?repl:int ->
             Mapred_fs.filesystem -> 'a codec -> format -> 
             Mapred_io.record_config -> ext_location -> 
               'a t
  (** Picks up an existing location, and makes the files there accessible
      as a place.

      - [prefix]: New files will get this prefix (ignored for [`File]
        locations)
      - [repl]: NOTE(review): presumably the replication factor for
        files created at this place - confirm
   *)

  val notebook_place : unit -> 'a t
  (** A new in-memory place for notebooks *)

  val location : 'a t -> location
  (** The location of the place *)
  val get_codec : 'a t -> 'a codec
  (** The codec of the place *)
  val get_format : _ t -> format
  (** The record format of the place *)
  val get_fs : _ t -> Mapred_fs.filesystem
  (** The filesystem the place lives in *)
  val get_rc : _ t -> Mapred_io.record_config
  (** The record config of the place *)

  val is_file_place : _ t -> bool
    (** Whether the place can store files *)

  val files : _ t -> string list
    (** Returns the files of the place. Files starting with "_" or
        ".", and files in directories starting with these characters,
        are ignored and not returned.
     *)

  val create_file : ?filename:string -> _ t -> string
    (** [create_file pl]:

        Creates a new empty file at the place. The file name is
        determined automatically, but starts with the previously set
        [prefix]. Also, the file gets a suffix that is derived from the
        format (such as ".var" or ".fixed<n>"). The returned string is
        presumably the name of the new file.

        If the place refers to a [`File] location, it is only possible to
        call [create_file] once.

        This function fails for notebooks.

        The optional [filename] argument influences the file name:
        - If [filename] is passed and [prefix ^ filename ^ suffix] does
          not exist yet, this name is chosen.
        - If [filename] is passed but that name already exists, a
          fragment [uuid] is generated so that
          [prefix ^ filename ^ uuid ^ suffix] is the new file.
        - If [filename] is not passed, the name is
          [prefix ^ uuid ^ suffix].
     *)

  val clear : _ t -> unit
     (** Deletes all files of this place that the [files] function
         returns. Note that this excludes ignored files starting with
         "_" and ".", and files in directories starting with these
         characters. Only files are deleted; the directories are left
         in place.

         This function also works for notebook places, and then just
         deletes the notebooks.

         To delete directories entirely, use {!Mapred_io.delete_rec}.
      *)

end


(** {1 Store} *)

module Store : sig
  (** A store is a container for records. There is the in-memory form
      [notebook], but a store can also be a file.

      Stores connected with files need to be closed after use
      (see [close]).
   *)

  type 'a store
  (** A store is a container for records of type ['a] *)

  val notebook : unit -> 'a store
  (** Creates a new in-memory store. *)

  val place : 'a store -> 'a Place.t
    (** Returns the place where this store resides *)

  val read_place : 'a Place.t -> 'a store list
    (** Returns the stores at one place. This is intended for reading
        only, and the returned store objects only support reading
        records.
     *)

  val write_place : ?filename:string -> 'a Place.t -> 'a store
    (** Returns a new store at the place, with an automatically
        generated file name. This is intended for writing only, and the
        returned store object only supports writing records.

        The optional [filename] argument influences the file name:
        - If [filename] is passed and [prefix ^ filename ^ suffix] does
          not exist yet, this name is chosen.
        - If [filename] is passed but that name already exists, a
          fragment [uuid] is generated so that
          [prefix ^ filename ^ uuid ^ suffix] is the new file.
        - If [filename] is not passed, the name is
          [prefix ^ uuid ^ suffix].
     *)

  val length : 'a store -> int
  val length64 : 'a store -> int64
  (** These two functions return the number of records, as [int] or
      [int64] respectively. For notebooks and write-only stores the
      number is immediately known. For read-only stores this means a
      pass over the whole file.
   *)

  val read : 'a store -> 'a store
    (** Returns another store value reading the data from the same
        physical place. This is especially useful for reading back data
        that was appended to a write-only store.

        It is undefined whether data added to the original store later
        will be visible after invoking [read].
     *)

  val file_name : 'a store -> string
    (** Returns the absolute file name *)

  (* val move : 'a store -> 'a Place.t -> unit 
     (** Move the store from its current place to the given place *)
   *)

  val flush : _ store -> unit
    (** Flushes the internal buffers (for stores writing to files) *)

  val close : _ store -> unit
    (** Flushes unwritten data to disk (when writing to a store), and
        closes all OS resources
     *)

end

(** {1 Sequences} *)

module Seq : sig
  (** A sequence is a view on a store. It is possible to iterate over
      the elements of the sequence in normal order. One can also add new
      elements to the end of a sequence, and get a new sequence where
      the new elements are found at the end.

      A sequence is always bound to a store. The store is the entity
      which is modified by sequence operations. The sequence as such
      behaves as if it were immutable (like a list).

      All the following operations are only executed locally (no
      distribution).
   *)
  open Store

  type ('a,'b) seq
  (** A sequence [(el,mode) seq] contains elements of type [el],
      and can be accessed according to [mode]:
      - [mode=[`R]]: the sequence is read-only
      - [mode=[`W]]: the sequence is write-only
      - [mode=[`R|`W]]: the sequence is read-write
   *)


  exception End_of_sequence
  (** Raised by [hd] and [tl] when applied to an empty sequence *)

  val read : 'a store -> ('a,[`R]) seq
  (** Returns the sequence view such that the first element of the sequence
      is the first element of the store, i.e. reads the store from the
      beginning.

      This is not allowed for write-only stores!
   *)

  val extend : 'a store -> ('a,[`W]) seq
  (** Returns the sequence view on the end of the store. The returned
      sequence is always empty. Elements added to this sequence are,
      however, also added to the store.
   *)

  val reread : ('a,_) seq -> ('a,[`R]) seq
    (** [reread s]: returns a read-only sequence over the store obtained
        by calling [Store.read] on the underlying store of [s].

        NOTE(review): presumably this reads from the beginning of the
        store, like [read] - confirm.
     *)

  val store : ('a,_) seq -> 'a store
    (** The underlying store of the sequence *)

  val notebook : unit -> ('a,[`R|`W]) seq
    (** Creates an in-memory sequence (not linked to a file) *)

  val is_empty : ('a,[>`R]) seq -> bool
    (** Tests whether the sequence is empty *)

  val hd : ('a,[>`R]) seq -> 'a
    (** Returns the head element of the sequence.
        @raise End_of_sequence if the sequence is empty *)

  val tl : ('a,[>`R] as 'm) seq -> ('a,'m) seq
    (** Returns the tail of the sequence.
        @raise End_of_sequence if the sequence is empty *)


  val add : 'a -> ('a,[>`W] as 'm) seq -> ('a,'m) seq
    (** [let s2 = add x s1]: Adds the element to the underlying store,
        and returns a new sequence where the new element is visible at
        the end. This means the last element of [s2] is now [x]. The
        view provided by [s1] does not change.

        It is a requirement that the last element of [s1] is the last
        element of the underlying store. Otherwise the add fails.
     *)

  val addl : 'a list -> ('a,[>`W] as 'm) seq -> ('a,'m) seq
    (** Adds several elements, like repeated [add].
        NOTE(review): presumably in list order - confirm. *)

  val append : ('a,[>`W] as 'm) seq -> ('a,[>`R]) seq -> ('a,'m) seq
  (** [let s3 = append s1 s2]: Adds all elements visible via [s2] to the
      end of the store of [s1], and returns a new seq [s3] where the
      new elements are visible. The views provided by [s1] and [s2] do
      not change.
   *)

  val flush : (_,_) seq -> unit
    (** Flushes the underlying store *)

  val length : (_,_) seq -> int
  val length64 : (_,_) seq -> int64
  (** The number of elements of the sequence, as [int] or [int64]
      respectively *)

  val hdl : int -> ('a,[>`R]) seq -> 'a list
    (** [hdl n s]: returns the first up to [n] elements of [s] as a list *)

(* TODO: tll *)

  val iter : ('a -> unit) -> ('a,[>`R]) seq -> unit
    (** [iter f s]: iterates over the elements of [s] and calls [f x] for
        every element [x]
     *)

  val fold : ('a -> 'b -> 'a) -> 'a -> ('b,[>`R]) seq -> 'a
    (** [fold f a0 s]: if the sequence contains the elements
        [[x0; x1; ...; x(n-1)]] this function computes
        {[
          f (f (f a0 x0) x1 ... ) x(n-1)
        ]}
     *)


  val map : ('a -> 'b) -> ('a,[>`R]) seq -> ('b,[>`W] as 'm) seq -> ('b,'m) seq
  (** [let s3 = map f s1 s2]: Maps the elements of [s1] with [f] and adds
      them to [s2]. Like [add], this returns the extended sequence [s3].
   *)

  val mapl : ('a -> 'b list) -> 
               ('a,[>`R]) seq -> ('b,[>`W] as 'm) seq -> ('b,'m) seq
  (** [let s3 = mapl f s1 s2]: Maps every element of [s1] to a list with
      [f], and adds all elements of these lists to [s2]. Like [add],
      this returns the extended sequence [s3].
   *)

  val sort : 
        hash:('a -> int) ->
        cmp:('a -> 'a -> int) ->
        ('a,[>`R]) seq ->
        ('a,[>`W] as 'm) seq ->
        ('a,'m) seq
    (** [sort ~hash ~cmp s1 s2]: Sorts the elements of [s1] and appends
        the sorted result to [s2], and returns this sequence.

        It is first sorted by the value [hash x] of each element of [s1].
        If this results in the same value for two elements [x] and [y],
        these elements are directly compared with [cmp x y].

        The function [hash] must return a non-negative 30 bit wide
        integer. [cmp] can return 0 meaning equality, a positive number
        meaning that the first argument is bigger, or a negative number
        meaning that the second argument is bigger.

        It is required that [cmp x y = 0] implies [hash x = hash y].

        Currently, only in-memory sorting is implemented.
     *)

  val mapl_sort_fold :
        mapl:('a -> 'b list) ->
        hash:('b -> int) ->
        cmp:('b -> 'b -> int) ->
        initfold:(int -> 'c) ->
        fold:('c -> 'b -> 'c * 'd list) ->
        ?finfold:('c -> 'd list) ->
        partition_of:('b -> int) ->
        partitions:int ->
        'a Place.t ->
        'd Place.t ->
        ('d,[`W]) seq list
    (** [mapl_sort_fold ... p_in p_out]:

        This is a locally running version of {!DSeq.mapl_sort_fold},
        useful for testing map/reduce programs on small amounts of
        data. The following description is formal; for an easier
        to understand version look at the tutorial {!Plasmamr_toolkit}.

        The function returns a list of sequences

        {[ [s0; s1; ...; s(P-1)] ]}

        where [P=partitions]. Each [s(k)] is the result of applying
        a fold pass to [p(k)], which is defined below. The list

        {[ [p0; p1; ...; p(P-1)] ]}

        contains the sequences with the mapped and sorted input,
        split into P partitions. In particular, the partition
        [p(k)] contains the sorted elements [x] where
        [partition_of x = k]. The elements [x] are all the elements
        of the lists [mapl y], for every record [y] stored at [p_in].

        The sorting criterion is defined by [hash] and [cmp]. In order
        to decide the order of the elements [x] of [p(k)], the elements
        are first compared by [hash x]. If for two elements [x1] and
        [x2] the value [hash x1] is smaller than [hash x2], [x1] is
        sorted before [x2]. If [hash x1] is bigger than [hash x2],
        [x1] is sorted after [x2]. If the hash values are identical,
        the function [cmp x1 x2] is called, and if a negative value
        is returned, [x1] is sorted before [x2], and if a positive
        value is returned, [x1] is sorted after [x2]. The result 0
        means that [x1] and [x2] are considered equal.

        [hash] must return non-negative 30 bit integers.
        It is required that [cmp x y = 0] implies [hash x = hash y].

        The fold pass is defined as follows: The sequence [s(k)]
        is computed from [p(k)] by the following scheme. Assuming
        that

        {[ p(k) = [x0; x1; ...; x(n-1)] ]}

        we define

        {[ s(k) = out0 @ out1 @ ... @ out(n) ]}

        where the lists [out(i)] are defined by:

        {[
          let a0 = initfold k
          let (a1, out0) = fold a0 x0
          let (a2, out1) = fold a1 x1
          ...
          let (a(n), out(n-1)) = fold a(n-1) x(n-1)
          let out(n) = finfold a(n)
        ]}

        If [finfold] is not passed as argument, it is assumed to
        be [finfold = fun _ -> []].

        As a side effect, for each output sequence [s(k)] a new
        file is created in [p_out].
     *)
end


(** {1 Distributed operations on sequences} *)

module DSeq : sig
  (** The following operations are distributed over the task nodes.

      It is here required that the underlying stores are files!
      It is not possible to distribute operations on notebooks.
   *)

  type config
  (** The configuration of a distributed job (see [create_config]) *)
  val create_config :
        ?name:string ->
        ?task_files:string list ->
        ?bigblock_size:int ->
        ?map_tasks:int ->
        ?merge_limit:int ->
        ?split_limit:int ->
        ?partitions:int ->
        ?enhanced_mapping:int ->
        ?phases:Mapred_def.phases ->
        ?report:bool ->
        ?report_to:Netchannels.out_obj_channel ->
        ?keep_temp_files:bool ->
        Mapred_def.mapred_env ->
          config
  (** Creates a job configuration. The default values for the
      configuration are taken from [mapred_env], and are effectively
      taken from the [.conf] files. Each optional argument, when
      passed, overrides the corresponding default value.
   *)

  val get_rc : config -> Mapred_io.record_config
    (** Returns the record config.

        Another way for getting the record config is {!Mapred_def.get_rc}.
     *)

  type 'a result
  (** The result of a distributed operation *)

  val get_result : 'a result -> 'a
  (** Gets the computed value, or raises an exception if the
      operation failed *)

  val stats : _ result -> Mapred_stats.stats
  (** Gets the statistics of the operation *)

  val job_id : config -> string
  (** Returns the job ID *)

  (** Access to the job environment and the job configuration *)
  class type mapred_info =
  object
    method mapred_env : Mapred_def.mapred_env
    method mapred_job_config : Mapred_def.mapred_job_config
  end

  val mapl : 
       (mapred_info -> 'a -> 'b list) rfun -> 
       'a Place.t -> 
       'b Place.t -> 
       config ->
          ('b,[`W]) Seq.seq list result
    (** [mapl m pl_in pl_out conf]: Runs a map-only job. This means that
        the records from [pl_in] are piped through the function [m], and
        the result is written into new files in [pl_out]. Besides the
        record, [m] receives the job information as [mapred_info], and
        returns the list of output records for the input record.

        The created files are also returned in the output sequences.
     *)

  val mapl_sort_fold :
        mapl:(mapred_info -> 'a -> 'b list) rfun ->
        hash:(mapred_info -> 'b -> int) rfun ->
        cmp:(mapred_info -> 'b -> 'b -> int) rfun ->
        initfold:(mapred_info -> int -> 'c) rfun ->
        fold:(mapred_info -> 'c -> 'b -> 'c * 'd list) rfun ->
        ?finfold:(mapred_info -> 'c -> 'd list) rfun ->
        partition_of:(mapred_info -> 'b -> int) rfun ->
        ?initcombine:(mapred_info -> 'e) rfun ->
        ?combine:(mapred_info -> 'e -> 'b -> 'e * 'b list) rfun ->
        ?fincombine:(mapred_info -> 'e -> 'b list) rfun ->
        'a Place.t ->
        'd Place.t ->
        config ->
        'b Place.codec ->
          ('d,[`W]) Seq.seq list result
    (** [mapl_sort_fold <args> pl_in pl_out conf int_codec]: This is
        map/reduce. The records from [pl_in] are mapped/sorted/reduced
        and finally written into new files in [pl_out]. There are a
        number of named arguments defining the job; every one of these
        functions receives the job information ([mapred_info]) as its
        first argument, which is omitted below:

        - [mapl] maps the elements of the inputs
        - [hash] returns the hash integer required for sorting (see below)
        - [cmp] compares two mapped elements
        - [initfold] initializes a reducer (the [int] argument is the
          partition number)
        - [fold accu x] processes the record [x], and returns [(accu',out)]
          where [out] is a list of records to output
        - [finfold] is called at the end of a reducer
        - [partition_of] returns the partition number of a mapped record
        - [initcombine] initializes a combiner
        - [combine accu x] processes the record [x] in the combiner, and
          returns [(accu',out)] where [out] is a list of records to output.
          It is required that [initcombine] is also set if [combine] is
          used.
        - [fincombine] is called at the end of a combiner

        Note that the mapped elements are sorted by first comparing the
        integers returned by [hash], and only if such integers are equal,
        the two elements are compared in detail by calling [cmp].
        See {!Mapred_sorters} for useful definitions of [hash] and
        [cmp].

        The [int_codec] is used for representing intermediate files
        (output of the map phase, and input/output of the shuffle phases).
     *)
end


(** {1 Job definition} *)

val toolkit_job : Mapred_def.mapred_env -> Mapred_def.mapred_job
  (** This is a generic job definition that must be used together with
      the distributed algorithms in {!Mapred_toolkit.DSeq}.
   *)

class toolkit_job : Mapred_def.mapred_env -> Mapred_def.mapred_job
  (** The same generic job definition as the function [toolkit_job],
      provided as a class *)

This web site is published by Informatikbüro Gerd Stolpmann
Powered by Caml