(* Plasma GitLab Archive
   Projects Blog Knowledge *)

(* $Id: pkv_api.ml 489 2011-10-26 22:34:57Z gerd $ *)

open Pkv_files
open Uq_engines.Operators
open Printf

module P = Plasma_rpcapi_aux

(* Handle of a key/value database. A database called [name] is backed by
   the three files [name ^ ".data"], [name ^ ".del"] and [name ^ ".idx"].
   The handle is mutable: [commit] and [abort] mark it as closed, and
   [reopen] replaces the transaction and the three file handles.
 *)
type db =
    { c : Plasma_client.plasma_cluster;
        (* the cluster the db files are looked up in *)
      name : string;
        (* base name of the db; the file suffixes are appended to it *)
      mutable is_open : bool;
        (* true after opening; reset to false by commit and abort *)
      mutable is_mutated : bool;
        (* set by [mutate_check] before the first modifying operation *)
      mutable t : Plasma_client.plasma_trans option;
        (* [Some trans] if the db is accessed in transactional mode *)
      mutable data : data_file;
        (* handle of the ".data" file *)
      mutable del : data_file;
        (* handle of the ".del" file (has the same handle type as [data]) *)
      mutable idx : idx_file;
        (* handle of the ".idx" file *)
    }

(* Flags accepted by [opendb]:
   - [`Create n]: if the db files do not exist, create them. [n] is handed
     to [idx_create]; [vacuum] passes [max_key_size db] in the same
     position, so [n] is presumably the maximum key length - TODO confirm.
   - [`Transactional]: start a transaction and keep it in the db handle.
 *)
type openflag =
    [ `Create of int
    | `Transactional
    ]

(* Flag set without [`Create]. NOTE(review): presumably the flag type of
   [opendb_e], which only checks for [`Transactional] and never creates
   files - confirm against the interface file.
 *)
type openflag_e =
    [ `Transactional
    ]

(* [with_trans_e c t filename f] runs the engine-returning function [f]
   with a transaction.

   - [t = Some trans]: [f] is applied directly to the transaction that is
     already running.
   - [t = None]: a transaction is provided by [Plasma_client.with_trans_e],
     and this is wrapped into [Plasma_client.retry_e], which gets
     [filename] as its second argument.
 *)
let with_trans_e c t filename f =
  let in_fresh_trans () =
    Plasma_client.with_trans_e c (fun trans -> f trans) in
  match t with
    | Some trans ->
        f trans
    | None ->
        Plasma_client.retry_e c filename in_fresh_trans ()


(* [with_trans c t filename f] is the synchronous counterpart of
   [with_trans_e]: it runs [f] with a transaction.

   - [t = Some trans]: [f] is applied directly to the transaction that is
     already running.
   - [t = None]: a transaction is provided by [Plasma_client.with_trans],
     and this is wrapped into [Plasma_client.retry], which gets
     [filename] as its second argument.
 *)
let with_trans c t filename f =
  let in_fresh_trans () =
    Plasma_client.with_trans c (fun trans -> f trans) in
  match t with
    | Some trans ->
        f trans
    | None ->
        Plasma_client.retry c filename in_fresh_trans ()


(* Raised while opening a db in transactional mode when field1 of the data
   file is non-empty and differs from the sequence numbers found for the
   del and idx files. [opendb_existing_e] catches it and retries.
 *)
exception Inconsistent_versions

(* [opendb_existing_e c name t attempt] opens the three files of an
   already existing db ([name ^ ".data"], [name ^ ".del"], and
   [name ^ ".idx"]). The engine yields [(t, data_file, del_file, idx_file)].

   If [t] is [Some trans], all files are looked up and snapshotted inside
   this transaction. If [t] is [None], every file is opened via
   [with_trans_e] in a transaction of its own, and no snapshot is taken.

   [attempt] is the number of retries already done after an
   [Inconsistent_versions] error. While it is below 10, a new transaction
   is started and the open is repeated (so the returned transaction may
   differ from the passed one); otherwise [failwith] is called.
 *)
let rec opendb_existing_e c name t attempt =
  let esys = Plasma_client.event_system c in
  let data_name = name ^ ".data" in
  let del_name = name ^ ".del" in
  let idx_name = name ^ ".idx" in

  (* Look up the inode of [name]. Only in transactional mode a snapshot
     of the inode is taken ([append] is passed on to snapshot_e). The
     result of the engine is the inode.
   *)
  let lookup_snapshot_e trans name append =
    Plasma_client.lookup_e trans name false
    ++ (fun inode ->
	  ( if t <> None then
	      Plasma_client.snapshot_e ~append trans inode
	    else
	      eps_e (`Done()) esys
	  )
	  ++ (fun () ->
		eps_e (`Done inode) esys
	     )
       ) in

  (* Open the files one after the other: data, then del, then idx. Only
     the data file is snapshotted with append=true.
   *)
  let e =
    with_trans_e c t data_name
      (fun trans ->
	 lookup_snapshot_e trans data_name true
	 ++ data_open_e trans
      )
    ++ (fun data_file ->
	  with_trans_e c t del_name
	    (fun trans -> 
	       lookup_snapshot_e trans del_name false
	       ++ del_open_e trans
	    )
	  ++ (fun del_file ->
		with_trans_e c t idx_name
		  (fun trans -> 
		     lookup_snapshot_e trans idx_name false
		     ++ idx_open_e trans
		  )
		++ (fun idx_file ->
		      eps_e (`Done(data_file,del_file,idx_file)) esys
		   )
	     )
       )
    ++ (fun (data_file, del_file, idx_file) ->
	  (* seqno check: because the three snapshots are not done at the same
	     time, they can refer to different file versions. We protect against
	     this by storing the expected sequence numbers of the del and idx
	     files in a metadata field of the data file. If a mismatch is found,
	     we just try again.
	   *)
	  ( match t with
	      | None -> eps_e (`Done()) esys
	      | Some trans ->
		  Plasma_client.get_inodeinfo_e trans data_file.data_inode
		  ++ (fun ii_data ->
			(* the del file handle is a data_file, hence
			   data_inode is the right field here *)
			Plasma_client.get_inodeinfo_e trans del_file.data_inode
			++ (fun ii_del ->
			      Plasma_client.get_inodeinfo_e
				trans idx_file.idx_inode
			      ++ (fun ii_idx ->
				    (* same format as written by update_field1 *)
				    let exp_field1 =
				      sprintf "pkv %Ld %Ld" 
					ii_del.P.seqno ii_idx.P.seqno in
				    (* An empty field1 carries no sequence
				       numbers (a non-transactional commit
				       clears it), so nothing can be checked
				     *)
				    if (ii_data.P.field1 <> "" && 
					  ii_data.P.field1 <> exp_field1) then (
				      (* give up this transaction before
					 signalling the mismatch *)
				      Plasma_client.abort_e trans
				      ++ (fun () ->
					    raise Inconsistent_versions
					 )
				    )
				    else
				      eps_e (`Done()) esys
				 )
			   )
		     )
	  )
	  ++ (fun () ->
		eps_e (`Done(t, data_file, del_file, idx_file)) esys
	     )
       ) in
  (* meta_engine makes the final state of [e] available as a value, so
     that the Inconsistent_versions error can be matched below
   *)
  Uq_engines.meta_engine e
  ++ (function
	| `Error Inconsistent_versions ->
	    if attempt < 10 then
	      (* The old transaction has already been aborted above; retry
		 with a fresh one
	       *)
	      Plasma_client.start_e c
	      ++ (fun t ->
		    opendb_existing_e c name (Some t) (attempt+1)
		 )
	    else
	      failwith
		"Pkv_api.opendb: the db files are inconsistent to each other"
	| st ->
	    (* any other final state is passed through unchanged *)
	    eps_e (st :> _ Uq_engines.engine_state) esys
     )


(* Synchronous version of [opendb_existing_e], run via Plasma_client.sync *)
let opendb_existing c name t attempt =
  Plasma_client.sync (opendb_existing_e c name t) attempt
  

(* [opendb c name flags] opens the db [name] and returns its handle.

   - If [flags] contains [`Transactional], a transaction is started and
     stored in the handle.
   - If opening fails with [Plasma_client.Plasma_error `enoent] and
     [flags] contains [`Create n], the three db files are created ([n] is
     passed to [idx_create]) and the open is tried again. Without a
     [`Create] flag the [`enoent] error is re-raised.

   Only the search for the [`Create] flag is guarded against [Not_found].
   A [Not_found] coming out of the file creation or of the second open is
   no longer turned into the [`enoent] error; it propagates unchanged.
 *)
let opendb c name flags =
  let data_name = name ^ ".data" in
  let del_name = name ^ ".del" in
  let idx_name = name ^ ".idx" in
  let t =
    if List.mem `Transactional flags then
      Some(Plasma_client.start c)
    else
      None in
  (* Open the existing files and wrap them into a fresh handle *)
  let open_files () =
    let (t, data_file, del_file, idx_file) =
      opendb_existing c name t 0 in
    { c = c;
      name = name;
      is_open = true;
      is_mutated = false;
      t = t;
      data = data_file;
      del = del_file;
      idx = idx_file;
    } in
  (* The argument of the first `Create flag, if any *)
  let create_arg =
    try
      ( match
          List.find
            (function
               | `Create _ -> true
               | _ -> false
            )
            flags
        with
          | `Create n -> Some n
          | _ -> assert false
      )
    with
      | Not_found -> None in
  try
    open_files()
  with
    | Plasma_client.Plasma_error `enoent as error ->
        ( match create_arg with
            | None ->
                raise error
            | Some n ->
                data_create c data_name;
                del_create c del_name;
                idx_create c idx_name n;
                open_files()
        )


(* [opendb_e c name flags] is the asynchronous way of opening an existing
   db. If [flags] contains [`Transactional], a transaction is started
   first. The engine yields the db handle. Unlike [opendb], missing files
   are never created here.
 *)
let opendb_e c name flags =
  let esys = Plasma_client.event_system c in
  let return x = eps_e (`Done x) esys in
  let make_db (t, data_file, del_file, idx_file) =
    { c = c;
      name = name;
      is_open = true;
      is_mutated = false;
      t = t;
      data = data_file;
      del = del_file;
      idx = idx_file;
    } in
  (* Engine computing the optional transaction *)
  let trans_e =
    if List.mem `Transactional flags then
      Plasma_client.start_e c
      ++ (fun trans -> return (Some trans))
    else
      return None in
  trans_e
  ++ (fun t ->
        opendb_existing_e c name t 0
        ++ (fun files -> return (make_db files))
     )


(* [reopen_e db] opens a closed db handle again (i.e. after [commit] or
   [abort]). Calls [failwith] if the handle is still open.

   If the handle was in transactional mode before ([db.t <> None]), a new
   transaction is started. The files are opened again with
   [opendb_existing_e], and the handle is updated in place.

   The [is_mutated] flag is reset, so that the reopened handle behaves
   like a freshly opened one: [mutate_check] performs its inode update
   again in the new transaction before the first modification, and
   [commit] does not touch field1 when nothing was changed after the
   reopen.
 *)
let reopen_e db =
  if db.is_open then
    failwith "Pkv_api.reopen: db is open (commit or abort first)";
  let esys = Plasma_client.event_system db.c in

  ( if db.t <> None then
      Plasma_client.start_e db.c
      ++ (fun trans -> eps_e (`Done(Some trans)) esys)
    else
      eps_e (`Done None) esys
  )
  ++ (fun t0 ->
        opendb_existing_e db.c db.name t0 0
        ++ (fun (t, data_file, del_file, idx_file) ->
              db.is_open <- true;
              (* nothing has been modified yet via the new file handles *)
              db.is_mutated <- false;
              db.t <- t;
              db.data <- data_file;
              db.del <- del_file;
              db.idx <- idx_file;
              eps_e (`Done ()) esys
           )
     )

(* Synchronous version of [reopen_e] *)
let reopen db = Plasma_client.sync reopen_e db


(* Fails with [Failure] unless the db handle is currently open *)
let open_check db =
  if db.is_open then
    ()
  else
    failwith "Pkv_api: db is closed"


(* To be called before every modifying operation. On the first call in
   transactional mode, the inodeinfo of the data file is read and written
   back unchanged in order to get the inode lock immediately. Afterwards
   the handle is marked as mutated.
 *)
let mutate_check db =
  ( match (db.is_mutated, db.t) with
      | (true, _) ->
          ()
      | (false, None) ->
          ()
      | (false, Some trans) ->
          let ii = Plasma_client.get_inodeinfo trans db.data.data_inode in
          Plasma_client.set_inodeinfo trans db.data.data_inode ii
  );
  db.is_mutated <- true


(* Returns what [get_max_key_length] reports for the index file of the
   (open) db
 *)
let max_key_size db =
  let () = open_check db in
  get_max_key_length db.idx


(* Lets the index entry of [key] point to [new_pos] (the last argument
   [true] of idx_insert is used for overwriting an existing entry here).
   If [old_pos] is a real position, i.e. not -1, the old entry is first
   recorded in the del file.
 *)
let replace db key old_pos new_pos =
  let () =
    if old_pos = (-1L) then
      ()
    else
      ignore(del_append db.del key old_pos) in
  idx_insert db.idx key new_pos true


(* [insert db key value] appends the string [value] to the data file and
   enters the position into the index. If [key] is already in the index
   ([Key_exists]), the old entry is replaced.
 *)
let insert db key value =
  open_check db;
  mutate_check db;
  let size = Int64.of_int(String.length value) in
  let ch = new Netchannels.input_string value in
  let data_pos = data_append db.data key size ch in
  (* first try a plain insert; fall back to replacing the old entry *)
  try
    idx_insert db.idx key data_pos false
  with
    | Key_exists old_pos ->
        replace db key old_pos data_pos


(* [insert_large db key size ch] is like [insert], but the value is taken
   from the channel [ch], and [size] is passed on to [data_append] as
   given by the caller.
 *)
let insert_large db key size ch =
  open_check db;
  mutate_check db;
  let index_entry pos =
    try
      idx_insert db.idx key pos false
    with
      | Key_exists old_pos ->
          replace db key old_pos pos in
  index_entry (data_append db.data key size ch)
	

(* [insert_channel db key size] returns an output channel for writing the
   value of [key]. The channel comes from [data_append_ch]; the callback
   passed to it receives a data position and enters it into the index,
   replacing an existing entry for [key] if there is one.
 *)
let insert_channel db key size =
  open_check db;
  mutate_check db;
  let index_entry data_pos =
    try
      idx_insert db.idx key data_pos false
    with
      | Key_exists old_pos ->
          replace db key old_pos data_pos in
  let rec_ch = data_append_ch db.data key size index_entry in
  Netchannels.lift_out ~buffered:false (`Rec rec_ch)
	

(* [delete db key] removes [key] from the db: the index entry is
   overwritten with the position -1, and the old position is recorded in
   the del file. Nothing happens if the key is not found, or if its index
   entry is already -1. Any [Not_found] raised during these steps is
   swallowed.
 *)
let delete db key =
  open_check db;
  mutate_check db;
  try
    let pos = idx_lookup db.idx key in
    if pos <> (-1L) then (
      idx_insert db.idx key (-1L) true;
      ignore(del_append db.del key pos)
    )
  with
    | Not_found -> ()


(* [lookup db key] returns the value stored for [key]. Raises [Not_found]
   if the index entry of [key] is -1 (deleted).
   NOTE(review): for keys without any index entry the behaviour is that
   of [idx_lookup] - presumably it raises [Not_found], too.
 *)
let lookup db key =
  open_check db;
  let pos = idx_lookup db.idx key in
  if pos = (-1L) then
    raise Not_found
  else (
    let (stored_key, value, _) = data_lookup_small db.data pos in
    (* the data entry must belong to the key we searched for *)
    assert(stored_key = key);
    value
  )


(* [lookup_large db key buf f] looks up the position of [key] and hands
   the reading of the value over to [data_lookup], together with [buf]
   and two callbacks: the first one checks the stored key and calls
   [f size], the second one does nothing. The result of [data_lookup] is
   ignored. Raises [Not_found] if the index entry of [key] is -1.
 *)
let lookup_large db key buf f =
  open_check db;
  let pos = idx_lookup db.idx key in
  if pos = (-1L) then raise Not_found;
  let on_header k size =
    assert(k=key);
    f size in
  let on_done _ = () in
  ignore(data_lookup db.data pos buf on_header on_done)


(* Asynchronous version of [lookup_large]: the position is looked up with
   [idx_lookup_e], and the value is read with [data_lookup_dev]. The
   engine yields [()]. [Not_found] is raised inside the engine chain if
   the index entry of [key] is -1.
 *)
let lookup_large_e db key buf f =
  open_check db;
  let esys = Plasma_client.event_system db.c in
  let unit_e () = eps_e (`Done()) esys in
  let on_header k size =
    assert(k=key);
    f size in
  let read_entry pos =
    if pos = (-1L) then raise Not_found;
    data_lookup_dev db.data pos buf on_header (fun _ -> unit_e ()) in
  idx_lookup_e db.idx key
  ++ read_entry
  ++ (fun _ -> unit_e ())


(* [iterate db f] calls [f key] for every index entry whose position is
   not -1, i.e. deleted entries are skipped
 *)
let iterate db f =
  open_check db;
  let visit key pos =
    if pos = (-1L) then () else f key in
  idx_iterate db.idx visit

(* [update_field1 trans data idx del] stores the current sequence numbers
   of the del and idx files in field1 of the data file's inodeinfo, in the
   format "pkv <del seqno> <idx seqno>". This is the value the seqno check
   in opendb_existing_e compares against.
 *)
let update_field1 trans data idx del =
  let inodeinfo inode = Plasma_client.get_inodeinfo trans inode in
  let ii_data = inodeinfo data.data_inode in
  let ii_del = inodeinfo del.data_inode in
  let ii_idx = inodeinfo idx.idx_inode in
  let seqnos = sprintf "pkv %Ld %Ld" ii_del.P.seqno ii_idx.P.seqno in
  Plasma_client.set_inodeinfo
    trans data.data_inode { ii_data with P.field1 = seqnos }



(* [commit db] flushes the data, del and idx files and finishes the
   access to the db. The handle is closed afterwards.

   - Transactional mode: if the db was mutated, field1 of the data file is
     updated with the new sequence numbers; then the transaction is
     committed.
   - Non-transactional mode: if the db was mutated, field1 of the data
     file is cleared in a separate transaction.
 *)
let commit db =
  open_check db;
  Plasma_client.flush db.c db.data.data_inode 0L 0L;
  Plasma_client.flush db.c db.del.data_inode 0L 0L;
  idx_flush db.idx;
  ( match db.t with
      | Some trans ->
          if db.is_mutated then
            update_field1 trans db.data db.idx db.del;
          Plasma_client.commit trans
      | None ->
          if db.is_mutated then (
            (* Remove the sequence numbers, if any *)
            let clear_field1 trans =
              let ii_data =
                Plasma_client.get_inodeinfo trans db.data.data_inode in
              Plasma_client.set_inodeinfo
                trans db.data.data_inode { ii_data with P.field1 = "" } in
            with_trans db.c None (db.name ^ ".data") clear_field1
          )
  );
  db.is_open <- false


(* [abort_e db] aborts the transaction of the db, if there is one. When
   the final state of the abort engine is passed on, the handle is marked
   as closed.
 *)
let abort_e db =
  open_check db;
  let esys = Plasma_client.event_system db.c in
  let abort_trans_e =
    match db.t with
      | Some trans -> Plasma_client.abort_e trans
      | None -> eps_e (`Done()) esys in
  abort_trans_e
  >> (fun st ->
        db.is_open <- false;
        st
     )

(* Synchronous version of [abort_e] *)
let abort = Plasma_client.sync abort_e


(* [newer_version_available_e db] yields [true] if the data file was
   changed since the db was opened:

   - In both modes, the result is [true] if the name of the data file now
     refers to a different inode than the opened one.
   - In transactional mode, the sequence number of the opened inode is
     additionally compared with the one from
     [Plasma_client.get_cached_inodeinfo_e].
 *)
let newer_version_available_e db =
  open_check db;
  let esys = Plasma_client.event_system db.c in
  let data_name = db.name ^ ".data" in
  let return_bool b = eps_e (`Done b) esys in
  let inode_changed inode_n = inode_n <> db.data.data_inode in
  match db.t with
    | Some trans ->
        Plasma_client.lookup_e trans data_name false
        ++ (fun inode_n ->
              if inode_changed inode_n then
                return_bool true
              else
                Plasma_client.get_cached_inodeinfo_e
                  db.c db.data.data_inode true
                ++ (fun ii_n ->
                      return_bool (db.data.data_ii.P.seqno <> ii_n.P.seqno)
                   )
           )
    | None ->
        with_trans_e db.c None data_name
          (fun trans ->
             Plasma_client.lookup_e trans data_name false
             ++ (fun inode_n -> return_bool (inode_changed inode_n))
          )

(* Synchronous version of [newer_version_available_e] *)
let newer_version_available =
  Plasma_client.sync newer_version_available_e


(* [vacuum db] rewrites the db without the deleted entries. All entries
   of the data file that are not listed in the del file are copied into
   new files [name ^ ".data_next"], [name ^ ".idx_next"] and
   [name ^ ".del_next"]; finally these are renamed to the regular file
   names. The db handle is closed afterwards (because of the [abort]
   call), so [reopen] is needed for further accesses.
 *)
let vacuum db =
  (* We write new versions of the files, and rename when done *)
  open_check db;
  mutate_check db;
  (* the new index is created with the same parameter as the old one *)
  let n = max_key_size db in
  let data_name_n = db.name ^ ".data_next" in
  let idx_name_n = db.name ^ ".idx_next" in
  let del_name_n = db.name ^ ".del_next" in

  (* Remove leftover "_next" files, each in its own transaction. It is
     not an error if they do not exist.
   *)
  List.iter
    (fun name ->
       with_trans db.c None name
	 (fun trans ->
	    try Plasma_client.unlink trans name
	    with Plasma_client.Plasma_error `enoent -> ()
	 )
    )
    [ data_name_n; idx_name_n; del_name_n ];

  (* Create the new, empty files *)
  data_create db.c data_name_n;
  idx_create db.c idx_name_n n;
  del_create db.c del_name_n;
  
  (* Open the new data and index files (the new del file is only opened
     below, and only in transactional mode)
   *)
  let data_n =
    with_trans db.c None data_name_n
      (fun trans ->
	 let inode_n = Plasma_client.lookup trans data_name_n false in
	 data_open trans inode_n
      ) in

  let idx_n =
    with_trans db.c None idx_name_n
      (fun trans -> 
	 let inode_n = Plasma_client.lookup trans idx_name_n false in
	 idx_open trans inode_n
      ) in

  (* Load the del file into a hash table; it is queried below with
     (key,position) pairs
   *)
  let del_tab = (
    let ht = Hashtbl.create 56 in
    del_load db.del ht;
    ht
  ) in


  (* buffer passed to data_iterate - presumably used for copying the
     values; TODO confirm *)
  let buf = String.create 65535 in

  (* Copy the entries: entries found in del_tab are skipped by raising
     Skip. For all other entries an output channel appending to the new
     data file is returned, and the position reported by data_append_ch is
     entered into the new index.
   *)
  data_iterate db.data buf
    (fun old_pos key size ->
       if Hashtbl.mem del_tab (key,old_pos) then raise Skip;
       Netchannels.lift_out ~buffered:false
	 (`Rec
	    (data_append_ch data_n key size
	       (fun new_pos ->
		  idx_insert idx_n key new_pos false
	       )
	    )
	 )
    );
  Plasma_client.flush db.c data_n.data_inode 0L 0L;
  idx_flush idx_n;

  (* In transactional mode the new data file gets the sequence numbers of
     the new del and idx files, so that the seqno check done when opening
     the db can succeed after the rename
   *)
  ( match db.t with
      | None -> ()
      | Some _ ->
	  (* Just write field1 *)
	  with_trans db.c None data_name_n
	    (fun trans -> 
	       let inode_n = Plasma_client.lookup trans del_name_n false in
	       let del_n = del_open trans inode_n in
	       update_field1 trans data_n idx_n del_n
	    )
  );
  
  (* This abort does not mean much. Any uncommitted modifications of the 
     db are now in the newly written files, so the overall effect is that of a
     commit
   *)
  abort db;

  (* Replace the regular files by the new ones. All three renames are
     done in the same transaction. The target is unlinked first (ignoring
     enoent).
   *)
  with_trans db.c None data_name_n
    (fun trans -> 
       List.iter
	 (fun (old_name, new_name) ->
	    ( try Plasma_client.unlink trans new_name
	      with Plasma_client.Plasma_error `enoent -> ()
	    );
	    Plasma_client.rename trans old_name new_name
	 )
	 [ data_name_n, (db.name ^ ".data"); 
	   idx_name_n, (db.name ^ ".idx"); 
	   del_name_n, (db.name ^ ".del"); 
	 ]
    )

(* This web site is published by Informatikbüro Gerd Stolpmann
   Powered by Caml *)