(* $Id: pkv_api.ml 489 2011-10-26 22:34:57Z gerd $ *)

open Pkv_files
open Uq_engines.Operators
open Printf

module P = Plasma_rpcapi_aux

(** An open key/value database. It is stored as three PlasmaFS files
    named [name.data] (the records), [name.del] (entries for records that
    were replaced or deleted) and [name.idx] (the key index). *)
type db =
  { c : Plasma_client.plasma_cluster;
    name : string;                  (* base name of the three files *)
    mutable is_open : bool;         (* set by open/reopen, cleared by commit/abort *)
    mutable is_mutated : bool;      (* set by [mutate_check] on the first write *)
    mutable t : Plasma_client.plasma_trans option;
      (* [Some trans] for transactional handles, [None] otherwise *)
    mutable data : data_file;
    mutable del : data_file;
    mutable idx : idx_file;
  }

(** Flags for [opendb]. [`Create n] creates missing files; [n] is passed to
    [idx_create] (vacuum passes [max_key_size] there, so presumably it is
    the maximum key length). [`Transactional] keeps the handle inside one
    Plasma transaction until commit/abort. *)
type openflag = [ `Create of int | `Transactional ]

(** Flags accepted by [opendb_e], which never creates files. *)
type openflag_e = [ `Transactional ]

(** [with_trans_e c t filename f]: runs [f] with transaction [t] when one is
    given. Otherwise [f] runs in a fresh transaction from
    [Plasma_client.with_trans_e], and the attempt is wrapped in
    [Plasma_client.retry_e] keyed on [filename]. *)
let with_trans_e c t filename f =
  match t with
    | None ->
        Plasma_client.retry_e c filename
          (fun () ->
             Plasma_client.with_trans_e c
               (fun trans -> f trans)
          )
          ()
    | Some trans ->
        f trans

(** Synchronous counterpart of [with_trans_e]. *)
let with_trans c t filename f =
  match t with
    | None ->
        Plasma_client.retry c filename
          (fun () ->
             Plasma_client.with_trans c
               (fun trans -> f trans)
          )
          ()
    | Some trans ->
        f trans

(** Raised internally when the data, del and idx snapshots do not belong
    together (see [opendb_existing_e]). *)
exception Inconsistent_versions

(** [opendb_existing_e c name t attempt] opens the existing files of db
    [name] and yields [(t', data, del, idx)].

    With a transaction ([t = Some _]), each file is snapshotted (the data
    file with [~append:true]). Field1 of the data inode is then compared
    with the current del/idx sequence numbers. On a mismatch the
    transaction is aborted and the open is retried in a fresh transaction,
    so [t'] may differ from [t]. After [attempt] reaches 10 the engine
    fails with [Failure]. *)
let rec opendb_existing_e c name t attempt =
  let esys = Plasma_client.event_system c in
  let data_name = name ^ ".data" in
  let del_name = name ^ ".del" in
  let idx_name = name ^ ".idx" in
  (* Look up [name] and, in transactional mode only, snapshot its inode. *)
  let lookup_snapshot_e trans name append =
    Plasma_client.lookup_e trans name false
    ++ (fun inode ->
          ( if t <> None then
              Plasma_client.snapshot_e ~append trans inode
            else
              eps_e (`Done()) esys
          )
          ++ (fun () -> eps_e (`Done inode) esys)
       ) in
  let e =
    with_trans_e c t data_name
      (fun trans ->
         lookup_snapshot_e trans data_name true
         ++ data_open_e trans
      )
    ++ (fun data_file ->
          with_trans_e c t del_name
            (fun trans ->
               lookup_snapshot_e trans del_name false
               ++ del_open_e trans
            )
          ++ (fun del_file ->
                with_trans_e c t idx_name
                  (fun trans ->
                     lookup_snapshot_e trans idx_name false
                     ++ idx_open_e trans
                  )
                ++ (fun idx_file ->
                      eps_e (`Done(data_file,del_file,idx_file)) esys
                   )
             )
       )
    ++ (fun (data_file, del_file, idx_file) ->
          (* seqno check: the three snapshots are not taken at the same
             time, so they can refer to different file versions. To guard
             against this, commit stores the expected sequence numbers of
             the del and idx files in field1 of the data file's inode. If a
             mismatch is found, we simply try again. An empty field1 (as
             written by a non-transactional commit) disables the check. *)
          ( match t with
              | None -> eps_e (`Done()) esys
              | Some trans ->
                  Plasma_client.get_inodeinfo_e trans data_file.data_inode
                  ++ (fun ii_data ->
                        Plasma_client.get_inodeinfo_e trans del_file.data_inode
                        ++ (fun ii_del ->
                              Plasma_client.get_inodeinfo_e trans idx_file.idx_inode
                              ++ (fun ii_idx ->
                                    let exp_field1 =
                                      sprintf "pkv %Ld %Ld"
                                        ii_del.P.seqno ii_idx.P.seqno in
                                    if (ii_data.P.field1 <> ""
                                        && ii_data.P.field1 <> exp_field1) then (
                                      Plasma_client.abort_e trans
                                      ++ (fun () -> raise Inconsistent_versions)
                                    )
                                    else
                                      eps_e (`Done()) esys
                                 )
                           )
                     )
          )
          ++ (fun () ->
                eps_e (`Done(t, data_file, del_file, idx_file)) esys
             )
       ) in
  Uq_engines.meta_engine e
  ++ (function
        | `Error Inconsistent_versions ->
            if attempt < 10 then
              Plasma_client.start_e c
              ++ (fun t -> opendb_existing_e c name (Some t) (attempt+1))
            else
              failwith "Pkv_api.opendb: the db files are inconsistent to each other"
        | st ->
            eps_e (st :> _ Uq_engines.engine_state) esys
     )

(** Synchronous counterpart of [opendb_existing_e]. *)
let opendb_existing c name t attempt =
  Plasma_client.sync (opendb_existing_e c name t) attempt

(** [opendb c name flags] opens db [name]. If a file is missing
    ([Plasma_error `enoent]) and [`Create n] is among [flags], the three
    files are created and opened. Without [`Create] the [enoent] error is
    re-raised.
    NOTE(review): the transaction started here for [`Transactional] is not
    aborted when opening fails. *)
let opendb c name flags =
  let data_name = name ^ ".data" in
  let del_name = name ^ ".del" in
  let idx_name = name ^ ".idx" in
  let t =
    if List.mem `Transactional flags then
      Some(Plasma_client.start c)
    else
      None in
  try
    let (t, data_file, del_file, idx_file) = opendb_existing c name t 0 in
    { c = c; name = name; is_open = true; is_mutated = false; t = t;
      data = data_file; del = del_file; idx = idx_file;
    }
  with
    | Plasma_client.Plasma_error `enoent as error ->
        (* Only the flag search may turn into the original enoent error;
           a Not_found from creating/opening the files must not be
           disguised as a missing-file error. *)
        let n =
          try
            ( match List.find (function `Create _ -> true | _ -> false) flags with
                | `Create n -> n
                | _ -> assert false
            )
          with Not_found -> raise error in
        data_create c data_name;
        del_create c del_name;
        idx_create c idx_name n;
        let (t, data_file, del_file, idx_file) = opendb_existing c name t 0 in
        { c = c; name = name; is_open = true; is_mutated = false; t = t;
          data = data_file; del = del_file; idx = idx_file;
        }

(** Asynchronous [opendb] for existing databases (no [`Create]). *)
let opendb_e c name flags =
  let esys = Plasma_client.event_system c in
  ( if List.mem `Transactional flags then
      Plasma_client.start_e c
      ++ (fun trans -> eps_e (`Done(Some trans)) esys)
    else
      eps_e (`Done None) esys
  )
  ++ (fun t ->
        opendb_existing_e c name t 0
        ++ (fun (t, data_file, del_file, idx_file) ->
              let db =
                { c = c; name = name; is_open = true; is_mutated = false; t = t;
                  data = data_file; del = del_file; idx = idx_file;
                } in
              eps_e (`Done db) esys
           )
     )

(** Reopens a handle closed by commit/abort, in a new transaction if the
    handle was transactional. The mutation flag is reset (as in [opendb])
    so that the first write of the new session takes the inode lock again
    and a read-only session does not rewrite field1 on commit.
    @raise Failure if [db] is still open. *)
let reopen_e db =
  if db.is_open then
    failwith "Pkv_api.reopen: db is open (commit or abort first)";
  let esys = Plasma_client.event_system db.c in
  ( if db.t <> None then
      Plasma_client.start_e db.c
      ++ (fun trans -> eps_e (`Done(Some trans)) esys)
    else
      eps_e (`Done None) esys
  )
  ++ (fun t0 ->
        opendb_existing_e db.c db.name t0 0
        ++ (fun (t, data_file, del_file, idx_file) ->
              db.is_open <- true;
              db.is_mutated <- false;
              db.t <- t;
              db.data <- data_file;
              db.del <- del_file;
              db.idx <- idx_file;
              eps_e (`Done ()) esys
           )
     )

(** Synchronous counterpart of [reopen_e]. *)
let reopen db = Plasma_client.sync reopen_e db

(** @raise Failure if [db] is closed. *)
let open_check db =
  if not db.is_open then failwith "Pkv_api: db is closed"

(** Marks [db] as mutated. On the first call per session in transactional
    mode, it rewrites the data inode's info unchanged to take the inode
    lock right away. *)
let mutate_check db =
  if not db.is_mutated then (
    (* Get the inode lock immediately *)
    match db.t with
      | None -> ()
      | Some trans ->
          let ii = Plasma_client.get_inodeinfo trans db.data.data_inode in
          Plasma_client.set_inodeinfo trans db.data.data_inode ii
  );
  db.is_mutated <- true

(** Maximum key length recorded in the index. *)
let max_key_size db =
  open_check db;
  get_max_key_length db.idx

(* Points [key] to [new_pos]. A previous live position ([old_pos <> -1])
   is recorded in the del file. *)
let replace db key old_pos new_pos =
  if old_pos <> (-1L) then
    ignore(del_append db.del key old_pos);
  idx_insert db.idx key new_pos true

(** Stores [value] under [key], replacing an existing entry. *)
let insert db key value =
  open_check db;
  mutate_check db;
  let ch = new Netchannels.input_string value in
  let data_pos =
    data_append db.data key (Int64.of_int(String.length value)) ch in
  try idx_insert db.idx key data_pos false
  with Key_exists old_pos -> replace db key old_pos data_pos

(** Like [insert], but reads [size] bytes of the value from channel [ch]. *)
let insert_large db key size ch =
  open_check db;
  mutate_check db;
  let data_pos = data_append db.data key size ch in
  try idx_insert db.idx key data_pos false
  with Key_exists old_pos -> replace db key old_pos data_pos

(** Returns an output channel for writing a value of [size] bytes for
    [key]. The index is updated via the callback of [data_append_ch]. *)
let insert_channel db key size =
  open_check db;
  mutate_check db;
  let ch =
    data_append_ch db.data key size
      (fun data_pos ->
         try idx_insert db.idx key data_pos false
         with Key_exists old_pos -> replace db key old_pos data_pos
      ) in
  Netchannels.lift_out ~buffered:false (`Rec ch)

(** Deletes [key]. Missing or already-deleted keys are ignored. Only the
    index lookup is guarded against [Not_found]; errors from the index
    and del-file writes propagate. *)
let delete db key =
  open_check db;
  mutate_check db;
  let pos =
    try idx_lookup db.idx key with Not_found -> (-1L) in
  if pos <> (-1L) then (
    idx_insert db.idx key (-1L) true;
    ignore(del_append db.del key pos)
  )

(** Returns the value stored under [key].
    @raise Not_found if the key is absent or deleted. *)
let lookup db key =
  open_check db;
  let pos = idx_lookup db.idx key in
  if pos = (-1L) then raise Not_found;
  let (k, v, _) = data_lookup_small db.data pos in
  assert(k = key);
  v

(** Reads the value of [key] via [buf]. [f] is called with the value size.
    @raise Not_found if the key is absent or deleted. *)
let lookup_large db key buf f =
  open_check db;
  let pos = idx_lookup db.idx key in
  if pos = (-1L) then raise Not_found;
  ignore
    (data_lookup db.data pos buf
       (fun k size -> assert(k=key); f size)
       (fun _ -> ())
    )

(** Asynchronous [lookup_large]. A deleted key makes the engine fail with
    [Not_found]. *)
let lookup_large_e db key buf f =
  open_check db;
  let esys = Plasma_client.event_system db.c in
  idx_lookup_e db.idx key
  ++ (fun pos ->
        if pos = (-1L) then raise Not_found;
        data_lookup_dev db.data pos buf
          (fun k size -> assert(k=key); f size)
          (fun _ -> eps_e (`Done()) esys)
     )
  ++ (fun _ -> eps_e (`Done()) esys)

(** Calls [f key] for every key that is not marked deleted. *)
let iterate db f =
  open_check db;
  idx_iterate db.idx (fun key pos -> if pos <> (-1L) then f key)

(* Puts the sequence numbers of the del and idx files into field1 of the
   data file (checked by [opendb_existing_e]). *)
let update_field1 trans data idx del =
  let ii_data = Plasma_client.get_inodeinfo trans data.data_inode in
  let ii_del = Plasma_client.get_inodeinfo trans del.data_inode in
  let ii_idx = Plasma_client.get_inodeinfo trans idx.idx_inode in
  let ii_data' =
    { ii_data with
        P.field1 = sprintf "pkv %Ld %Ld" ii_del.P.seqno ii_idx.P.seqno
    } in
  Plasma_client.set_inodeinfo trans data.data_inode ii_data'

(** Flushes all three files and closes the handle. If the db was mutated,
    a non-transactional commit clears field1, and a transactional commit
    writes the del/idx sequence numbers into it before committing. *)
let commit db =
  open_check db;
  Plasma_client.flush db.c db.data.data_inode 0L 0L;
  Plasma_client.flush db.c db.del.data_inode 0L 0L;
  idx_flush db.idx;
  ( match db.t with
      | None ->
          if db.is_mutated then
            (* Remove the sequence numbers, if any *)
            with_trans db.c db.t (db.name ^ ".data")
              (fun trans ->
                 let ii_data =
                   Plasma_client.get_inodeinfo trans db.data.data_inode in
                 let ii_data' = { ii_data with P.field1 = "" } in
                 Plasma_client.set_inodeinfo trans db.data.data_inode ii_data'
              )
      | Some trans ->
          if db.is_mutated then
            update_field1 trans db.data db.idx db.del;
          Plasma_client.commit trans
  );
  db.is_open <- false

(** Aborts the transaction, if any, and closes the handle. *)
let abort_e db =
  open_check db;
  let esys = Plasma_client.event_system db.c in
  ( match db.t with
      | None -> eps_e (`Done()) esys
      | Some trans -> Plasma_client.abort_e trans
  )
  >> (fun st ->
        db.is_open <- false;
        st
     )

(** Synchronous counterpart of [abort_e]. *)
let abort = Plasma_client.sync abort_e

(** Checks whether the data file was replaced by a different inode or, in
    transactional mode, whether its cached seqno differs from the one seen
    at open time. *)
let newer_version_available_e db =
  open_check db;
  let esys = Plasma_client.event_system db.c in
  let data_name = db.name ^ ".data" in
  match db.t with
    | None ->
        with_trans_e db.c None data_name
          (fun trans ->
             Plasma_client.lookup_e trans data_name false
             ++ (fun inode_n ->
                   eps_e (`Done(inode_n <> db.data.data_inode)) esys
                )
          )
    | Some trans ->
        Plasma_client.lookup_e trans data_name false
        ++ (fun inode_n ->
              if inode_n <> db.data.data_inode then
                eps_e (`Done true) esys
              else
                Plasma_client.get_cached_inodeinfo_e db.c db.data.data_inode true
                ++ (fun ii_n ->
                      eps_e (`Done(db.data.data_ii.P.seqno <> ii_n.P.seqno)) esys
                   )
           )

(** Synchronous counterpart of [newer_version_available_e]. *)
let newer_version_available = Plasma_client.sync newer_version_available_e

(** Rewrites the db without the records listed in the del file. New
    [*_next] files are written, the handle is aborted, and the new files
    are renamed over the old ones in one transaction. The handle is closed
    afterwards. *)
let vacuum db =
  (* We write new versions of the files, and rename when done *)
  open_check db;
  mutate_check db;
  let n = max_key_size db in
  let data_name_n = db.name ^ ".data_next" in
  let idx_name_n = db.name ^ ".idx_next" in
  let del_name_n = db.name ^ ".del_next" in
  (* Remove leftovers of an earlier interrupted vacuum *)
  List.iter
    (fun name ->
       with_trans db.c None name
         (fun trans ->
            try Plasma_client.unlink trans name
            with Plasma_client.Plasma_error `enoent -> ()
         )
    )
    [ data_name_n; idx_name_n; del_name_n ];
  data_create db.c data_name_n;
  idx_create db.c idx_name_n n;
  del_create db.c del_name_n;
  let data_n =
    with_trans db.c None data_name_n
      (fun trans ->
         let inode_n = Plasma_client.lookup trans data_name_n false in
         data_open trans inode_n
      ) in
  let idx_n =
    with_trans db.c None idx_name_n
      (fun trans ->
         let inode_n = Plasma_client.lookup trans idx_name_n false in
         idx_open trans inode_n
      ) in
  let del_tab =
    ( let ht = Hashtbl.create 56 in
      del_load db.del ht;
      ht
    ) in
  let buf = String.create 65535 in
  (* Copy every record not listed in the del file into the new files *)
  data_iterate db.data buf
    (fun old_pos key size ->
       if Hashtbl.mem del_tab (key,old_pos) then raise Skip;
       Netchannels.lift_out ~buffered:false
         (`Rec (data_append_ch data_n key size
                  (fun new_pos -> idx_insert idx_n key new_pos false)
               )
         )
    );
  Plasma_client.flush db.c data_n.data_inode 0L 0L;
  idx_flush idx_n;
  ( match db.t with
      | None -> ()
      | Some _ ->
          (* Just write field1 *)
          with_trans db.c None data_name_n
            (fun trans ->
               let inode_n = Plasma_client.lookup trans del_name_n false in
               let del_n = del_open trans inode_n in
               update_field1 trans data_n idx_n del_n
            )
  );
  (* This abort does not mean much. Any uncommitted modifications of the db
     are now in the newly written files, so the overall effect is that of
     a commit *)
  abort db;
  with_trans db.c None data_name_n
    (fun trans ->
       List.iter
         (fun (old_name, new_name) ->
            ( try Plasma_client.unlink trans new_name
              with Plasma_client.Plasma_error `enoent -> ()
            );
            Plasma_client.rename trans old_name new_name
         )
         [ data_name_n, (db.name ^ ".data");
           idx_name_n, (db.name ^ ".idx");
           del_name_n, (db.name ^ ".del");
         ]
    )