(* $Id: pkv_api.ml 489 2011-10-26 22:34:57Z gerd $ *)
open Pkv_files
open Uq_engines.Operators
open Printf
module P = Plasma_rpcapi_aux
(* Handle of a key/value database. The database [name] is stored in the
   three files [name ^ ".data"], [name ^ ".del"] and [name ^ ".idx"].
*)
type db =
{ c : Plasma_client.plasma_cluster;
(* the cluster through which the db files are accessed *)
name : string;
(* base name of the db files (without the suffix) *)
mutable is_open : bool;
(* set to true by opendb/opendb_e/reopen_e, and to false by commit and
   abort_e; checked by open_check
*)
mutable is_mutated : bool;
(* set to true by mutate_check, i.e. before the first modification *)
mutable t : Plasma_client.plasma_trans option;
(* the transaction of a db opened with `Transactional; None otherwise *)
mutable data : data_file;
(* the opened ".data" file *)
mutable del : data_file;
(* the opened ".del" file (it has the same type as the data file) *)
mutable idx : idx_file;
(* the opened ".idx" file *)
}
(* Flags understood by opendb:
   - `Create n: if opening fails with `enoent, the three db files are
     created first. The number n is passed on to idx_create (presumably
     the maximum key size, as in vacuum - TODO confirm)
   - `Transactional: a transaction is started when the db is opened, and
     it is stored in the db handle
*)
type openflag =
[ `Create of int
| `Transactional
]
(* Flag type without `Create. NOTE(review): presumably this is the flag type
   of opendb_e, which never creates the db files - confirm against the
   interface file.
*)
type openflag_e =
[ `Transactional
]
(* [with_trans_e c t filename f]: runs [f] with a transaction and returns
   the engine produced by [f]. A transaction passed in [t] is used as it
   is. Without one, [f] is run by [Plasma_client.with_trans_e], and this
   is wrapped into [Plasma_client.retry_e], which also gets [filename].
*)
let with_trans_e c t filename f =
  match t with
    | Some trans ->
        f trans
    | None ->
        let in_new_trans () = Plasma_client.with_trans_e c f in
        Plasma_client.retry_e c filename in_new_trans ()
(* [with_trans c t filename f]: synchronous counterpart of [with_trans_e].
   A transaction passed in [t] is handed to [f] directly. Without one,
   [f] is run by [Plasma_client.with_trans], wrapped into
   [Plasma_client.retry], which also gets [filename].
*)
let with_trans c t filename f =
  match t with
    | Some trans ->
        f trans
    | None ->
        let in_new_trans () = Plasma_client.with_trans c f in
        Plasma_client.retry c filename in_new_trans ()
(* Raised inside opendb_existing_e when field1 of the data file does not
   match the sequence numbers of the del and idx files. It is caught there
   again in order to retry the open.
*)
exception Inconsistent_versions
(* [opendb_existing_e c name t attempt] opens the three files of the db
   [name] and returns an engine yielding
   [(t, data_file, del_file, idx_file)].

   If [t] is [Some trans], the files are looked up and snapshotted inside
   [trans], and the versions of the files are checked against each other
   afterwards. On a mismatch [trans] is aborted, and the open is repeated
   with a newly started transaction and [attempt+1]. Once [attempt] has
   reached 10, a [Failure] is raised instead of retrying.

   If [t] is [None], every file is opened via [with_trans_e] without a
   given transaction, and neither snapshots nor the version check are done.
*)
let rec opendb_existing_e c name t attempt =
let esys = Plasma_client.event_system c in
let data_name = name ^ ".data" in
let del_name = name ^ ".del" in
let idx_name = name ^ ".idx" in
(* Looks [name] up in [trans] and yields the inode. Only when the caller
   supplied a transaction, a snapshot of the inode is taken before (with
   the given [append] flag).
*)
let lookup_snapshot_e trans name append =
Plasma_client.lookup_e trans name false
++ (fun inode ->
( if t <> None then
Plasma_client.snapshot_e ~append trans inode
else
eps_e (`Done()) esys
)
++ (fun () ->
eps_e (`Done inode) esys
)
) in
(* Open the data, del and idx files one after the other. Only the data
   file is snapshotted with append=true.
*)
let e =
with_trans_e c t data_name
(fun trans ->
lookup_snapshot_e trans data_name true
++ data_open_e trans
)
++ (fun data_file ->
with_trans_e c t del_name
(fun trans ->
lookup_snapshot_e trans del_name false
++ del_open_e trans
)
++ (fun del_file ->
with_trans_e c t idx_name
(fun trans ->
lookup_snapshot_e trans idx_name false
++ idx_open_e trans
)
++ (fun idx_file ->
eps_e (`Done(data_file,del_file,idx_file)) esys
)
)
)
++ (fun (data_file, del_file, idx_file) ->
(* seqno check: because the three snapshots are not done at the same
time, they can refer to different file versions. We protect against
this by storing the expected sequence numbers of the del and idx
files in a metadata field of the data file. If a mismatch is found,
we just try again.
*)
( match t with
| None -> eps_e (`Done()) esys
| Some trans ->
Plasma_client.get_inodeinfo_e trans data_file.data_inode
++ (fun ii_data ->
Plasma_client.get_inodeinfo_e trans del_file.data_inode
++ (fun ii_del ->
Plasma_client.get_inodeinfo_e
trans idx_file.idx_inode
++ (fun ii_idx ->
(* The same format is written by update_field1 *)
let exp_field1 =
sprintf "pkv %Ld %Ld"
ii_del.P.seqno ii_idx.P.seqno in
(* An empty field1 is accepted without comparison *)
if (ii_data.P.field1 <> "" &&
ii_data.P.field1 <> exp_field1) then (
Plasma_client.abort_e trans
++ (fun () ->
raise Inconsistent_versions
)
)
else
eps_e (`Done()) esys
)
)
)
)
++ (fun () ->
eps_e (`Done(t, data_file, del_file, idx_file)) esys
)
) in
(* The final state of [e] is inspected here, so that the error
   Inconsistent_versions can be turned into a retry. All other states are
   passed through unchanged.
*)
Uq_engines.meta_engine e
++ (function
| `Error Inconsistent_versions ->
if attempt < 10 then
Plasma_client.start_e c
++ (fun t ->
opendb_existing_e c name (Some t) (attempt+1)
)
else
failwith
"Pkv_api.opendb: the db files are inconsistent to each other"
| st ->
eps_e (st :> _ Uq_engines.engine_state) esys
)
(* Synchronous version of [opendb_existing_e]: the engine is run with
   [Plasma_client.sync], and its result is returned.
*)
let opendb_existing c name t attempt =
  let open_e = opendb_existing_e c name t in
  Plasma_client.sync open_e attempt
(* [opendb c name flags] opens the db [name] and returns the db handle.

   - With [`Transactional] in [flags] a transaction is started first, and
     the db is opened inside this transaction.
   - If opening fails with [Plasma_error `enoent] and [flags] contains
     [`Create n], the three db files are created (the first [`Create]
     flag counts), and the open is tried once more.
   - If opening fails with [`enoent] and there is no [`Create] flag, the
     [`enoent] error is re-raised.

   Exceptions raised while creating the files or during the second open
   are passed through unchanged. In particular, a [Not_found] coming from
   these steps is no longer mistaken for a missing [`Create] flag.
*)
let opendb c name flags =
  let data_name = name ^ ".data" in
  let del_name = name ^ ".del" in
  let idx_name = name ^ ".idx" in
  let t =
    if List.mem `Transactional flags then
      Some(Plasma_client.start c)
    else
      None in
  (* The argument of the first `Create flag, if any *)
  let rec find_create l =
    match l with
      | [] -> None
      | `Create n :: _ -> Some n
      | _ :: l' -> find_create l' in
  (* Open the (existing) files, and build the handle *)
  let open_db () =
    let (t, data_file, del_file, idx_file) =
      opendb_existing c name t 0 in
    { c = c;
      name = name;
      is_open = true;
      is_mutated = false;
      t = t;
      data = data_file;
      del = del_file;
      idx = idx_file;
    } in
  try
    open_db ()
  with
    | Plasma_client.Plasma_error `enoent as error ->
        ( match find_create flags with
            | None ->
                raise error
            | Some n ->
                data_create c data_name;
                del_create c del_name;
                idx_create c idx_name n;
                open_db ()
        )
(* [opendb_e c name flags]: engine-based open of an existing db. With
   [`Transactional] in [flags] a transaction is started first. The files
   are then opened with [opendb_existing_e], and the engine yields the new
   db handle. Missing files are not created here.
*)
let opendb_e c name flags =
  let esys = Plasma_client.event_system c in
  let transactional = List.mem `Transactional flags in
  let trans_opt_e =
    if transactional then
      Plasma_client.start_e c
      ++ (fun trans -> eps_e (`Done(Some trans)) esys)
    else
      eps_e (`Done None) esys in
  (* Wrap the opened files into a db handle *)
  let make_db_e (t, data_file, del_file, idx_file) =
    eps_e
      (`Done
         { c = c;
           name = name;
           is_open = true;
           is_mutated = false;
           t = t;
           data = data_file;
           del = del_file;
           idx = idx_file;
         }
      )
      esys in
  trans_opt_e
  ++ (fun t -> opendb_existing_e c name t 0 ++ make_db_e)
(* [reopen_e db] opens a closed db handle again (i.e. after commit or
   abort). If the handle was transactional before, a new transaction is
   started. The files are opened with [opendb_existing_e], and the handle
   is updated in place. The engine yields [()].

   Fails with [Failure] if the db is still open.

   The [is_mutated] flag is reset here: the reopened db has not yet been
   modified. Without the reset, [mutate_check] would skip getting the
   inode lock in the new transaction, and [commit] would rewrite field1
   even when nothing was modified after the reopen.
*)
let reopen_e db =
  if db.is_open then
    failwith "Pkv_api.reopen: db is open (commit or abort first)";
  let esys = Plasma_client.event_system db.c in
  ( match db.t with
      | Some _ ->
          Plasma_client.start_e db.c
          ++ (fun trans -> eps_e (`Done(Some trans)) esys)
      | None ->
          eps_e (`Done None) esys
  )
  ++ (fun t0 ->
        opendb_existing_e db.c db.name t0 0
        ++ (fun (t, data_file, del_file, idx_file) ->
              db.is_open <- true;
              db.is_mutated <- false;
              db.t <- t;
              db.data <- data_file;
              db.del <- del_file;
              db.idx <- idx_file;
              eps_e (`Done ()) esys
           )
     )
(* Synchronous version of [reopen_e] *)
let reopen db =
  let run = Plasma_client.sync reopen_e in
  run db
(* Fails with [Failure] unless the db handle is open *)
let open_check db =
  if db.is_open then
    ()
  else
    failwith "Pkv_api: db is closed"
(* To be called before any modification. On the first call for a
   transactional db the inodeinfo of the data file is read and written
   back unchanged, in order to get the inode lock immediately. Afterwards
   the db is marked as mutated.
*)
let mutate_check db =
  ( match db.is_mutated, db.t with
      | false, Some trans ->
          let inode = db.data.data_inode in
          let ii = Plasma_client.get_inodeinfo trans inode in
          Plasma_client.set_inodeinfo trans inode ii
      | _, _ ->
          ()
  );
  db.is_mutated <- true
(* Returns what [get_max_key_length] reports for the index file. The db
   must be open.
*)
let max_key_size db =
  let () = open_check db in
  get_max_key_length db.idx
(* [replace db key old_pos new_pos]: lets the index entry of [key] point
   to [new_pos]. Unless [old_pos] is [-1L], the pair [(key, old_pos)] is
   appended to the del file before.
*)
let replace db key old_pos new_pos =
  let has_old_entry = old_pos <> (-1L) in
  if has_old_entry then
    ignore(del_append db.del key old_pos);
  idx_insert db.idx key new_pos true
(* [insert db key value]: appends the pair to the data file, and enters
   the new position into the index. If the index already has an entry
   for [key] ([Key_exists]), the entry is replaced via [replace].
*)
let insert db key value =
  open_check db;
  mutate_check db;
  let value_len = Int64.of_int (String.length value) in
  let ch = new Netchannels.input_string value in
  let data_pos = data_append db.data key value_len ch in
  try idx_insert db.idx key data_pos false
  with Key_exists old_pos -> replace db key old_pos data_pos
(* [insert_large db key size ch]: like [insert], but [size] and the
   channel [ch] are passed to [data_append] as they are, instead of
   being derived from a string value.
*)
let insert_large db key size ch =
  open_check db;
  mutate_check db;
  let data_pos = data_append db.data key size ch in
  try idx_insert db.idx key data_pos false
  with Key_exists old_pos -> replace db key old_pos data_pos
(* [insert_channel db key size]: returns an output channel obtained from
   [data_append_ch] and lifted with [Netchannels.lift_out]. The callback
   given to [data_append_ch] gets the data position, and enters it into
   the index (replacing an existing entry for [key]).
*)
let insert_channel db key size =
  open_check db;
  mutate_check db;
  let register_pos data_pos =
    try idx_insert db.idx key data_pos false
    with Key_exists old_pos -> replace db key old_pos data_pos in
  let rec_ch = data_append_ch db.data key size register_pos in
  Netchannels.lift_out ~buffered:false (`Rec rec_ch)
(* [delete db key]: sets the index entry of [key] to [-1L], and appends
   the old position to the del file. Nothing is done if the index entry
   is already [-1L]. A [Not_found] raised by any of the steps is
   suppressed.
*)
let delete db key =
  open_check db;
  mutate_check db;
  try
    let pos = idx_lookup db.idx key in
    if pos <> (-1L) then (
      idx_insert db.idx key (-1L) true;
      ignore(del_append db.del key pos)
    )
  with
    | Not_found -> ()
(* [lookup db key]: returns the value stored for [key]. Raises
   [Not_found] if the index position of [key] is [-1L]. It is asserted
   that the key found in the data file is [key].
*)
let lookup db key =
  open_check db;
  let pos = idx_lookup db.idx key in
  if pos = (-1L) then
    raise Not_found
  else (
    let (stored_key, value, _) = data_lookup_small db.data pos in
    assert(stored_key = key);
    value
  )
(* [lookup_large db key buf f]: looks the position of [key] up, and calls
   [data_lookup] for this position with the buffer [buf]. The function
   [f] is called with the size, after asserting that the key found in
   the data file is [key]. The result of [data_lookup] is dropped.
   Raises [Not_found] if the index position of [key] is [-1L].
*)
let lookup_large db key buf f =
  open_check db;
  let pos = idx_lookup db.idx key in
  if pos = (-1L) then raise Not_found;
  let on_header k size =
    assert(k=key);
    f size in
  let on_end _ = () in
  ignore (data_lookup db.data pos buf on_header on_end)
(* [lookup_large_e db key buf f]: engine-based version of [lookup_large].
   The position comes from [idx_lookup_e], and the data is read with
   [data_lookup_dev]. [Not_found] is raised inside the engine callback if
   the position is [-1L]. The engine yields [()].
*)
let lookup_large_e db key buf f =
  open_check db;
  let esys = Plasma_client.event_system db.c in
  let unit_e () = eps_e (`Done()) esys in
  let on_header k size =
    assert(k=key);
    f size in
  let read_at pos =
    if pos = (-1L) then raise Not_found;
    data_lookup_dev db.data pos buf on_header (fun _ -> unit_e ()) in
  idx_lookup_e db.idx key
  ++ read_at
  ++ (fun _ -> unit_e ())
(* [iterate db f]: calls [f key] for every index entry whose position is
   not [-1L].
*)
let iterate db f =
  open_check db;
  let visit key pos =
    if pos = (-1L) then () else f key in
  idx_iterate db.idx visit
(* [update_field1 trans data idx del]: stores the current sequence numbers
   of the del and idx files in field1 of the inodeinfo of the data file.
   The format is the one expected by the check in [opendb_existing_e].
*)
let update_field1 trans data idx del =
  let inodeinfo inode = Plasma_client.get_inodeinfo trans inode in
  let ii_data = inodeinfo data.data_inode in
  let del_seqno = (inodeinfo del.data_inode).P.seqno in
  let idx_seqno = (inodeinfo idx.idx_inode).P.seqno in
  let field1 = sprintf "pkv %Ld %Ld" del_seqno idx_seqno in
  Plasma_client.set_inodeinfo trans data.data_inode
    { ii_data with P.field1 = field1 }
(* [commit db]: flushes the data, del and idx files, and closes the handle.
   - Non-transactional db: if the db was mutated, field1 of the data file
     is set to the empty string (in a transaction of its own), i.e. the
     sequence numbers are removed.
   - Transactional db: if the db was mutated, field1 is updated with the
     current sequence numbers. The transaction is committed in any case.
*)
let commit db =
  open_check db;
  List.iter
    (fun file -> Plasma_client.flush db.c file.data_inode 0L 0L)
    [ db.data; db.del ];
  idx_flush db.idx;
  ( match db.t, db.is_mutated with
      | None, false ->
          ()
      | None, true ->
          with_trans db.c None (db.name ^ ".data")
            (fun trans ->
               let inode = db.data.data_inode in
               let ii_data = Plasma_client.get_inodeinfo trans inode in
               Plasma_client.set_inodeinfo trans inode
                 { ii_data with P.field1 = "" }
            )
      | Some trans, mutated ->
          if mutated then
            update_field1 trans db.data db.idx db.del;
          Plasma_client.commit trans
  );
  db.is_open <- false
(* [abort_e db]: aborts the transaction of the db, if there is one. When
   the engine reaches its final state, the handle is marked as closed,
   and the state is passed on unchanged.
*)
let abort_e db =
  open_check db;
  let esys = Plasma_client.event_system db.c in
  let abort_trans_e =
    match db.t with
      | Some trans -> Plasma_client.abort_e trans
      | None -> eps_e (`Done()) esys in
  let mark_closed st =
    db.is_open <- false;
    st in
  abort_trans_e >> mark_closed
(* Synchronous version of [abort_e] *)
let abort db = Plasma_client.sync abort_e db
(* [newer_version_available_e db]: the engine yields [true] if the name
   of the data file now refers to another inode than the one opened in
   [db]. For a transactional db it additionally yields [true] if the
   sequence number of the data inode, as returned by
   [Plasma_client.get_cached_inodeinfo_e], differs from the one remembered
   in the handle.
*)
let newer_version_available_e db =
  open_check db;
  let esys = Plasma_client.event_system db.c in
  let data_name = db.name ^ ".data" in
  let done_e flag = eps_e (`Done flag) esys in
  let inode_changed inode_n = inode_n <> db.data.data_inode in
  match db.t with
    | None ->
        with_trans_e db.c None data_name
          (fun trans ->
             Plasma_client.lookup_e trans data_name false
             ++ (fun inode_n -> done_e (inode_changed inode_n))
          )
    | Some trans ->
        Plasma_client.lookup_e trans data_name false
        ++ (fun inode_n ->
              if inode_changed inode_n then
                done_e true
              else
                Plasma_client.get_cached_inodeinfo_e
                  db.c db.data.data_inode true
                ++ (fun ii_n ->
                      done_e (db.data.data_ii.P.seqno <> ii_n.P.seqno)
                   )
           )
(* Synchronous version of [newer_version_available_e] *)
let newer_version_available db =
  Plasma_client.sync newer_version_available_e db
(* [vacuum db]: copies all entries of the data file that are not listed in
   the del file into newly created files (with the suffixes ".data_next",
   ".idx_next" and ".del_next"), and finally renames the new files to the
   regular file names. The db handle is closed afterwards (by [abort]).
*)
let vacuum db =
(* We write new versions of the files, and rename when done *)
open_check db;
mutate_check db;
let n = max_key_size db in
let data_name_n = db.name ^ ".data_next" in
let idx_name_n = db.name ^ ".idx_next" in
let del_name_n = db.name ^ ".del_next" in
(* Delete "_next" files that may already exist. It is not an error
   if there are none.
*)
List.iter
(fun name ->
with_trans db.c None name
(fun trans ->
try Plasma_client.unlink trans name
with Plasma_client.Plasma_error `enoent -> ()
)
)
[ data_name_n; idx_name_n; del_name_n ];
(* Create the new files. The new index is created with the same n as
   reported by max_key_size for the old one.
*)
data_create db.c data_name_n;
idx_create db.c idx_name_n n;
del_create db.c del_name_n;
let data_n =
with_trans db.c None data_name_n
(fun trans ->
let inode_n = Plasma_client.lookup trans data_name_n false in
data_open trans inode_n
) in
let idx_n =
with_trans db.c None idx_name_n
(fun trans ->
let inode_n = Plasma_client.lookup trans idx_name_n false in
idx_open trans inode_n
) in
(* Load the contents of the del file into a hash table. Below, the table
   is queried with (key,position) pairs.
*)
let del_tab = (
let ht = Hashtbl.create 56 in
del_load db.del ht;
ht
) in
let buf = String.create 65535 in
(* Iterate over the old data file. Entries found in del_tab are skipped
   by raising Skip. For the other entries a channel appending to the new
   data file is returned; the callback enters the new position into the
   new index. NOTE(review): presumably data_iterate writes the value of
   the entry into the returned channel - confirm in Pkv_files.
*)
data_iterate db.data buf
(fun old_pos key size ->
if Hashtbl.mem del_tab (key,old_pos) then raise Skip;
Netchannels.lift_out ~buffered:false
(`Rec
(data_append_ch data_n key size
(fun new_pos ->
idx_insert idx_n key new_pos false
)
)
)
);
Plasma_client.flush db.c data_n.data_inode 0L 0L;
idx_flush idx_n;
(* Only for a transactional db: store the sequence numbers of the new del
   and idx files in field1 of the new data file. Nothing is appended to
   the new del file in this function.
*)
( match db.t with
| None -> ()
| Some _ ->
(* Just write field1 *)
with_trans db.c None data_name_n
(fun trans ->
let inode_n = Plasma_client.lookup trans del_name_n false in
let del_n = del_open trans inode_n in
update_field1 trans data_n idx_n del_n
)
);
(* This abort does not mean much. Any uncommitted modifications of the
db are now in the newly written files, so the overall effect is that of a
commit
*)
abort db;
(* Move the new files to the regular names, all in one transaction. An
   existing target file is unlinked before the rename.
*)
with_trans db.c None data_name_n
(fun trans ->
List.iter
(fun (old_name, new_name) ->
( try Plasma_client.unlink trans new_name
with Plasma_client.Plasma_error `enoent -> ()
);
Plasma_client.rename trans old_name new_name
)
[ data_name_n, (db.name ^ ".data");
idx_name_n, (db.name ^ ".idx");
del_name_n, (db.name ^ ".del");
]
)