diff --git a/Cargo.lock b/Cargo.lock index cc2d73f845..41694a2bd2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -187,16 +187,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] -name = "bit-vec" -version = "0.6.3" +name = "bincode" +version = "1.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] [[package]] -name = "bitflags" -version = "0.9.1" +name = "bit-vec" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4efd02e230a02e18f92fc2735f44597385ed02ad8f831e7c1c1156ee5e1ab3a5" +checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" [[package]] name = "bitflags" @@ -206,9 +209,12 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.6.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" +dependencies = [ + "serde_core", +] [[package]] name = "blake2-rfc" @@ -273,10 +279,11 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.16" +version = "1.2.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be714c154be609ec7f5dad223a33bf1482fff90472de28f7362806e6d4832b8c" +checksum = "43c5703da9466b66a946814e1adf53ea2c90f10063b86290cc9eb67ce3478a20" dependencies = [ + "find-msvc-tools", "jobserver", "libc", "shlex", @@ -422,6 +429,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.20" @@ -626,6 +642,15 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "doxygen-rs" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "415b6ec780d34dcf624666747194393603d0373b7141eef01d12ee58881507d9" +dependencies = [ + "phf", +] + [[package]] name = "easy-jsonrpc-mw" version = "0.5.4" @@ -748,6 +773,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "find-msvc-tools" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" + [[package]] name = "flate2" version = "1.0.32" @@ -884,12 +915,6 @@ dependencies = [ "slab", ] -[[package]] -name = "gcc" -version = "0.3.55" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2" - [[package]] name = "generic-array" version = "0.14.7" @@ -923,7 +948,7 @@ version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2deb07a133b1520dc1a5690e9bd08950108873d7ed5de38dcc74d3b5ebffa110" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.11.1", "libc", "libgit2-sys", "log", @@ -1054,7 +1079,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", - "siphasher", + "siphasher 0.3.11", "thiserror", "zeroize", ] @@ -1154,7 +1179,6 @@ dependencies = [ "http", "hyper", "hyper-rustls", - "lmdb-zero", "log", "rand 0.6.5", "rustls", @@ -1177,8 +1201,8 @@ dependencies = [ "filetime", "grin_core", "grin_util", + "heed", "libc", - "lmdb-zero", "log", "memmap", "rand 0.6.5", @@ -1243,6 +1267,44 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "heed" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad82d6598ccf1dac15c8b758a1bd282b755b6776be600429176757190a1b0202" +dependencies = [ + "bitflags 2.11.1", + "byteorder", + "heed-traits", + "heed-types", + "libc", + "lmdb-master-sys", + "once_cell", + "page_size", + "serde", + "synchronoise", + "url", +] + +[[package]] +name = "heed-traits" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb3130048d404c57ce5a1ac61a903696e8fcde7e8c2991e9fcfc1f27c3ef74ff" + +[[package]] +name = "heed-types" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c255bdf46e07fb840d120a36dcc81f385140d7191c76a7391672675c01a55d" +dependencies = [ + "bincode", + "byteorder", + "heed-traits", + "serde", + "serde_json", +] + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1560,9 +1622,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.158" +version = "0.2.185" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" +checksum = "52ff2c0fe9bc6cb6b14a0592c2ff4fa9ceb83eea9db979b0487cd054946a2b8f" [[package]] name = "libgit2-sys" @@ -1576,23 +1638,13 @@ dependencies = [ "pkg-config", ] -[[package]] -name = "liblmdb-sys" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "feed38a3a580f60bf61aaa067b0ff4123395966839adeaf67258a9e50c4d2e49" -dependencies = [ - "gcc", - "libc", -] - [[package]] name = "libredox" version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.11.1", "libc", "redox_syscall 0.5.3", ] @@ -1628,15 +1680,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" [[package]] -name = "lmdb-zero" -version = "0.4.4" +name = "lmdb-master-sys" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13416eee745b087c22934f35f1f24da22da41ba2a5ce197143d168ce055cc58d" +checksum = "aaeb9bd22e73bd1babffff614994b341e9b2008de7bb73bf1f7e9154f1978f8b" dependencies = [ - "bitflags 0.9.1", + "cc", + "doxygen-rs", "libc", - "liblmdb-sys", - "supercow", ] [[package]] @@ -1780,7 +1831,7 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.11.1", "cfg-if 1.0.0", "cfg_aliases", "libc", @@ -1935,9 +1986,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.19.0" +version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" [[package]] name = "opaque-debug" @@ -1969,6 +2020,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "page_size" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30d5b2194ed13191c1999ae0704b7839fb18384fa22e49b57eeaa97d79ce40da" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "pancurses" version = "0.17.0" @@ -2069,6 +2130,48 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "phf" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd6780a80ae0c52cc120a26a1a42c1ae51b247a253e4e06113d23d2c2edd078" +dependencies = [ + "phf_macros", + "phf_shared", +] + +[[package]] +name = "phf_generator" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d" +dependencies = [ + "phf_shared", + "rand 0.8.5", +] + +[[package]] +name = "phf_macros" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f84ac04429c13a7ff43785d75ad27569f2951ce0ffd30a3321230db2fc727216" +dependencies = [ + "phf_generator", + "phf_shared", + "proc-macro2 1.0.86", + "quote 1.0.36", + "syn 2.0.87", +] + +[[package]] +name = "phf_shared" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67eabc2ef2a60eb7faa00097bd1ffdb5bd28e62bf39990626a582201b7a754e5" +dependencies = [ + "siphasher 1.0.2", +] + [[package]] name = "pin-project-lite" version = "0.2.14" @@ -2335,7 +2438,7 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a908a6e00f1fdd0dfd9c0eb08ce85126f6d8bbda50017e74bc4a4b7d4a926a4" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.11.1", ] [[package]] @@ -2430,7 +2533,7 @@ version = "0.38.34" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.11.1", "errno", "libc", "linux-raw-sys", @@ -2522,7 +2625,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.11.1", "core-foundation", "core-foundation-sys", "libc", @@ -2541,10 +2644,11 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.208" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cff085d2cb684faa248efb494c39b68e522822ac0de72ccf08109abde717cfb2" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" dependencies = [ + "serde_core", "serde_derive", ] @@ -2558,11 +2662,20 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + [[package]] name = "serde_derive" -version = "1.0.208" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24008e81ff7613ed8e5ba0cfaf24e2c2f1e5b8a0495711e44fcd4882fca62bcf" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2 1.0.86", "quote 1.0.36", @@ -2571,14 +2684,15 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.125" +version = "1.0.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83c8e735a073ccf5be70aa8066aa984eaf2fa000db6c8d0100ae605b366d31ed" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" dependencies = [ "itoa", "memchr", - "ryu", "serde", + "serde_core", + "zmij", ] [[package]] @@ -2638,6 +2752,12 @@ version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" +[[package]] +name = "siphasher" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e" + [[package]] name = "slab" version = "0.4.9" @@ -2693,12 +2813,6 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" -[[package]] -name = "supercow" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "171758edb47aa306a78dfa4ab9aeb5167405bd4e3dc2b64e88f6a84bbe98bd63" - [[package]] name = "syn" version = "0.15.44" @@ -2732,6 +2846,15 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "synchronoise" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dbc01390fc626ce8d1cffe3376ded2b72a11bb70e1c75f404a210e4daa4def2" +dependencies = [ + "crossbeam-queue", +] + [[package]] name = "synstructure" version = "0.13.2" @@ -3439,3 +3562,9 @@ dependencies = [ "crc32fast", "thiserror", ] + +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/chain/src/chain.rs b/chain/src/chain.rs index e566d1bc0b..11057398dd 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -150,8 +150,8 @@ pub struct Chain { store: Arc, adapter: Arc, orphans: Arc, - txhashset: Arc>, - header_pmmr: Arc>>, + txhashset: Arc>, + header_pmmr: Arc>>, pibd_segmenter: Arc>>, pibd_desegmenter: Arc>>, // POW verification function @@ -189,9 +189,9 @@ impl Chain { // Initialize the output_pos index based on UTXO set // and NRD kernel_pos index based recent kernel history. { - let batch = store.batch()?; - txhashset.init_output_pos_index(&header_pmmr, &batch)?; - txhashset.init_recent_kernel_pos_index(&header_pmmr, &batch)?; + let mut batch = store.batch()?; + txhashset.init_output_pos_index(&header_pmmr, &mut batch)?; + txhashset.init_recent_kernel_pos_index(&header_pmmr, &mut batch)?; batch.commit()?; } @@ -275,7 +275,7 @@ impl Chain { pub fn reset_chain_head_to_genesis(&self) -> Result<(), Error> { let mut header_pmmr = self.header_pmmr.write(); let mut txhashset = self.txhashset.write(); - let batch = self.store.batch()?; + let mut batch = self.store.batch()?; // Change head back to genesis { @@ -314,7 +314,7 @@ impl Chain { /// Reset PIBD head pub fn reset_pibd_head(&self) -> Result<(), Error> { - let batch = self.store.batch()?; + let mut batch = self.store.batch()?; batch.save_pibd_head(&self.genesis().into())?; Ok(()) } @@ -530,9 +530,9 @@ impl Chain { pub fn new_ctx<'a>( &self, opts: Options, - batch: store::Batch<'a>, - header_pmmr: &'a mut txhashset::PMMRHandle, - txhashset: &'a mut txhashset::TxHashSet, + batch: Batch<'a>, + header_pmmr: &'a mut PMMRHandle, + txhashset: &'a mut TxHashSet, ) -> Result, Error> { let denylist = self.denylist.read().clone(); Ok(pipe::BlockContext { @@ -832,7 +832,7 @@ impl Chain { &self, header: &BlockHeader, ext: &mut ExtensionPair, - batch: &Batch, + batch: &mut Batch, ) -> Result { let denylist = self.denylist.read().clone(); pipe::rewind_and_apply_fork(header, ext, batch, &|header| { @@ -846,7 +846,7 @@ impl Chain { &self, header: &BlockHeader, ext: &mut HeaderExtension, - batch: &Batch, + batch: &mut Batch, ) -> Result<(), Error> { let denylist = self.denylist.read().clone(); pipe::rewind_and_apply_header_fork(header, ext, batch, &|header| { @@ -1015,7 +1015,7 @@ impl Chain { fn validate_kernel_history( &self, header: &BlockHeader, - txhashset: &txhashset::TxHashSet, + txhashset: &TxHashSet, ) -> Result<(), Error> { debug!("validate_kernel_history: rewinding and validating kernel history (readonly)"); @@ -1151,11 +1151,11 @@ impl Chain { self.validate_kernel_history(&header, &txhashset)?; let header_pmmr = self.header_pmmr.read(); - let batch = self.store.batch()?; + let mut batch = self.store.batch()?; txhashset.verify_kernel_pos_index( &self.genesis.header, &header_pmmr, - &batch, + &mut batch, None, None, )?; @@ -1213,10 +1213,10 @@ impl Chain { } // Rebuild our output_pos index in the db based on fresh UTXO set. - txhashset.init_output_pos_index(&header_pmmr, &batch)?; + txhashset.init_output_pos_index(&header_pmmr, &mut batch)?; // Rebuild our NRD kernel_pos index based on recent kernel history. - txhashset.init_recent_kernel_pos_index(&header_pmmr, &batch)?; + txhashset.init_recent_kernel_pos_index(&header_pmmr, &mut batch)?; // Commit all the changes to the db. batch.commit()?; @@ -1257,9 +1257,9 @@ impl Chain { /// *Only* runs if we are not in archive mode. fn remove_historical_blocks( &self, - header_pmmr: &txhashset::PMMRHandle, + header_pmmr: &PMMRHandle, archive_header: BlockHeader, - batch: &store::Batch<'_>, + batch: &mut Batch<'_>, ) -> Result<(), Error> { if self.archive_mode() { return Ok(()); @@ -1293,17 +1293,24 @@ impl Chain { return Ok(()); } - let mut count = 0; let tail_hash = header_pmmr.get_header_hash_by_height(head.height - horizon)?; let tail = batch.get_block_header(&tail_hash)?; - // Remove old blocks (including short lived fork blocks) which height < tail.height - for block in batch.blocks_iter()? { - if block.header.height < tail.height { - let _ = batch.delete_block(&block.hash()); - count += 1; + // Remove old blocks (including short-lived fork blocks) which height < tail.height + let mut blocks_to_delete = vec![]; + let iter = batch.blocks_iter()?; + for block in iter { + if let Ok(block) = block { + if block.header.height < tail.height { + blocks_to_delete.push(block.hash()); + } } } + let mut count = 0; + for bh in blocks_to_delete { + let _ = batch.delete_block(&bh); + count += 1; + } batch.save_body_tail(&Tip::from_header(&tail))?; @@ -1345,7 +1352,7 @@ impl Chain { // Take a write lock on the txhashet and start a new writeable db batch. let header_pmmr = self.header_pmmr.read(); let mut txhashset = self.txhashset.write(); - let batch = self.store.batch()?; + let mut batch = self.store.batch()?; // Compact the txhashset itself (rewriting the pruned backend files). { @@ -1361,15 +1368,15 @@ impl Chain { // If we are not in archival mode remove historical blocks from the db. if !self.archive_mode() { - self.remove_historical_blocks(&header_pmmr, archive_header, &batch)?; + self.remove_historical_blocks(&header_pmmr, archive_header, &mut batch)?; } // Make sure our output_pos index is consistent with the UTXO set. - txhashset.init_output_pos_index(&header_pmmr, &batch)?; + txhashset.init_output_pos_index(&header_pmmr, &mut batch)?; // TODO - Why is this part of chain compaction? // Rebuild our NRD kernel_pos index based on recent kernel history. - txhashset.init_recent_kernel_pos_index(&header_pmmr, &batch)?; + txhashset.init_recent_kernel_pos_index(&header_pmmr, &mut batch)?; // Commit all the above db changes. batch.commit()?; diff --git a/chain/src/linked_list.rs b/chain/src/linked_list.rs index 14039b6626..40b9b36b38 100644 --- a/chain/src/linked_list.rs +++ b/chain/src/linked_list.rs @@ -18,10 +18,11 @@ use crate::core::ser::{self, Readable, Reader, Writeable, Writer}; use crate::store::Batch; use crate::types::CommitPos; use crate::util::secp::pedersen::Commitment; +use byteorder::{BigEndian, WriteBytesExt}; use enum_primitive::FromPrimitive; use grin_store as store; use std::marker::PhantomData; -use store::{to_key, to_key_u64, Error}; +use store::Error; enum_from_primitive! { #[derive(Copy, Clone, Debug, PartialEq)] @@ -74,28 +75,24 @@ pub trait ListIndex { /// List entry type type Entry: ListIndexEntry; - /// Construct a key for the list. - fn list_key(&self, commit: Commitment) -> Vec; - /// Construct a key for an individual entry in the list. - fn entry_key(&self, commit: Commitment, pos: u64) -> Vec; + fn entry_key(&self, commit: Commitment, pos: u64) -> (Option, Vec); /// Returns either a "Single" with embedded "pos" or a "list" with "head" and "tail". - /// Key is "prefix|commit". - /// Note the key for an individual entry in the list is "prefix|commit|pos". - fn get_list(&self, batch: &Batch<'_>, commit: Commitment) -> Result, Error> { - batch.db.get_ser(&self.list_key(commit), None) - } + /// Key is "commit". + /// Note the key for an individual entry in the list is "commit|pos". + fn get_list(&self, batch: &Batch<'_>, commit: Commitment) -> Result, Error>; /// Returns one of "head", "tail" or "middle" entry variants. - /// Key is "prefix|commit|pos". + /// Key is "commit|pos". fn get_entry( &self, batch: &Batch<'_>, commit: Commitment, pos: u64, ) -> Result, Error> { - batch.db.get_ser(&self.entry_key(commit, pos), None) + let (db_key, key) = self.entry_key(commit, pos); + batch.db.get_ser(db_key, &key, None) } /// Peek the head of the list for the specified commitment. @@ -108,7 +105,7 @@ pub trait ListIndex { /// Push a pos onto the list for the specified commitment. fn push_pos( &self, - batch: &Batch<'_>, + batch: &mut Batch<'_>, commit: Commitment, new_pos: ::Pos, ) -> Result<(), Error>; @@ -116,7 +113,7 @@ pub trait ListIndex { /// Pop a pos off the list for the specified commitment. fn pop_pos( &self, - batch: &Batch<'_>, + batch: &mut Batch<'_>, commit: Commitment, ) -> Result::Pos>, Error>; } @@ -124,7 +121,12 @@ pub trait ListIndex { /// Supports "rewind" given the provided commit and a pos to rewind back to. pub trait RewindableListIndex { /// Rewind the index for the given commitment to the specified position. - fn rewind(&self, batch: &Batch<'_>, commit: Commitment, rewind_pos: u64) -> Result<(), Error>; + fn rewind( + &self, + batch: &mut Batch<'_>, + commit: Commitment, + rewind_pos: u64, + ) -> Result<(), Error>; } /// A pruneable list index supports pruning of old data from the index lists. @@ -133,15 +135,20 @@ pub trait RewindableListIndex { pub trait PruneableListIndex: ListIndex { /// Clear all data from the index. /// Used when rebuilding the index. - fn clear(&self, batch: &Batch<'_>) -> Result<(), Error>; + fn clear(&self, batch: &mut Batch<'_>) -> Result<(), Error>; /// Prune old data. - fn prune(&self, batch: &Batch<'_>, commit: Commitment, cutoff_pos: u64) -> Result<(), Error>; + fn prune( + &self, + batch: &mut Batch<'_>, + commit: Commitment, + cutoff_pos: u64, + ) -> Result<(), Error>; /// Pop a pos off the back of the list (used for pruning old data). fn pop_pos_back( &self, - batch: &Batch<'_>, + batch: &mut Batch<'_>, commit: Commitment, ) -> Result::Pos>, Error>; } @@ -233,12 +240,17 @@ where type List = ListWrapper; type Entry = ListEntry; - fn list_key(&self, commit: Commitment) -> Vec { - to_key(self.list_prefix, &mut commit.as_ref().to_vec()) + fn entry_key(&self, commit: Commitment, pos: u64) -> (Option, Vec) { + let mut key = commit.as_ref().to_vec(); + key.write_u64::(pos).unwrap(); + (Some(self.entry_prefix), key) } - fn entry_key(&self, commit: Commitment, pos: u64) -> Vec { - to_key_u64(self.entry_prefix, &mut commit.as_ref().to_vec(), pos) + fn get_list(&self, batch: &Batch<'_>, commit: Commitment) -> Result, Error> { + let list_key = (Some(self.list_prefix), commit.as_ref()); + batch + .db + .get_ser::>(list_key.0, list_key.1, None) } fn peek_pos(&self, batch: &Batch<'_>, commit: Commitment) -> Result, Error> { @@ -255,11 +267,12 @@ where } } - fn push_pos(&self, batch: &Batch<'_>, commit: Commitment, new_pos: T) -> Result<(), Error> { + fn push_pos(&self, batch: &mut Batch<'_>, commit: Commitment, new_pos: T) -> Result<(), Error> { + let list_key = (Some(self.list_prefix), commit.as_ref()); match self.get_list(batch, commit)? { None => { let list = ListWrapper::Single { pos: new_pos }; - batch.db.put_ser(&self.list_key(commit), &list)?; + batch.db.put_ser(list_key.0, list_key.1, &list)?; } Some(ListWrapper::Single { pos: current_pos }) => { if new_pos.pos() <= current_pos.pos() { @@ -278,13 +291,11 @@ where head: new_pos.pos(), tail: current_pos.pos(), }; - batch - .db - .put_ser(&self.entry_key(commit, new_pos.pos()), &head)?; - batch - .db - .put_ser(&self.entry_key(commit, current_pos.pos()), &tail)?; - batch.db.put_ser(&self.list_key(commit), &list)?; + let (new_pos_db_key, new_pos_key) = self.entry_key(commit, new_pos.pos()); + batch.db.put_ser(new_pos_db_key, &new_pos_key, &head)?; + let (cur_pos_db_key, cur_pos_key) = self.entry_key(commit, current_pos.pos()); + batch.db.put_ser(cur_pos_db_key, &cur_pos_key, &tail)?; + batch.db.put_ser(list_key.0, list_key.1, &list)?; } Some(ListWrapper::Multi { head, tail }) => { if new_pos.pos() <= head { @@ -309,13 +320,11 @@ where head: new_pos.pos(), tail, }; - batch - .db - .put_ser(&self.entry_key(commit, new_pos.pos()), &head)?; - batch - .db - .put_ser(&self.entry_key(commit, current_pos.pos()), &middle)?; - batch.db.put_ser(&self.list_key(commit), &list)?; + let (new_pos_db_key, new_pos_key) = self.entry_key(commit, new_pos.pos()); + batch.db.put_ser(new_pos_db_key, &new_pos_key, &head)?; + let (cur_pos_db_key, cur_pos_key) = self.entry_key(commit, current_pos.pos()); + batch.db.put_ser(cur_pos_db_key, &cur_pos_key, &middle)?; + batch.db.put_ser(list_key.0, list_key.1, &list)?; } else { return Err(Error::OtherErr("expected head to be head variant".into())); } @@ -327,11 +336,12 @@ where /// Pop the head of the list. /// Returns the output_pos. /// Returns None if list was empty. - fn pop_pos(&self, batch: &Batch<'_>, commit: Commitment) -> Result, Error> { + fn pop_pos(&self, batch: &mut Batch<'_>, commit: Commitment) -> Result, Error> { + let list_key = (Some(self.list_prefix), commit.as_ref()); match self.get_list(batch, commit)? { None => Ok(None), Some(ListWrapper::Single { pos }) => { - batch.delete(&self.list_key(commit))?; + batch.delete(list_key.0, list_key.1)?; Ok(Some(pos)) } Some(ListWrapper::Multi { head, tail }) => { @@ -347,17 +357,20 @@ where head: pos.pos(), tail, }; - batch.delete(&self.entry_key(commit, current_pos.pos()))?; - batch - .db - .put_ser(&self.entry_key(commit, pos.pos()), &head)?; - batch.db.put_ser(&self.list_key(commit), &list)?; + let (cur_pos_db_key, cur_pos_key) = + self.entry_key(commit, current_pos.pos()); + batch.delete(cur_pos_db_key, &cur_pos_key)?; + let (pos_db_key, pos_key) = self.entry_key(commit, pos.pos()); + batch.db.put_ser(pos_db_key, &pos_key, &head)?; + batch.db.put_ser(list_key.0, list_key.1, &list)?; Ok(Some(current_pos)) } Some(ListEntry::Tail { pos, .. }) => { let list = ListWrapper::Single { pos }; - batch.delete(&self.entry_key(commit, current_pos.pos()))?; - batch.db.put_ser(&self.list_key(commit), &list)?; + let (cur_pos_db_key, cur_pos_key) = + self.entry_key(commit, current_pos.pos()); + batch.delete(cur_pos_db_key, &cur_pos_key)?; + batch.db.put_ser(list_key.0, list_key.1, &list)?; Ok(Some(current_pos)) } Some(_) => Err(Error::OtherErr("next was unexpected".into())), @@ -373,7 +386,12 @@ where /// List index that supports rewind. impl RewindableListIndex for MultiIndex { - fn rewind(&self, batch: &Batch<'_>, commit: Commitment, rewind_pos: u64) -> Result<(), Error> { + fn rewind( + &self, + batch: &mut Batch<'_>, + commit: Commitment, + rewind_pos: u64, + ) -> Result<(), Error> { while self .peek_pos(batch, commit)? .map(|x| x.pos() > rewind_pos) @@ -386,19 +404,37 @@ impl RewindableListIndex for MultiIndex { } impl PruneableListIndex for MultiIndex { - fn clear(&self, batch: &Batch<'_>) -> Result<(), Error> { + fn clear(&self, batch: &mut Batch<'_>) -> Result<(), Error> { + let mut lists_to_delete = vec![]; + let list_db_key = Some(self.list_prefix); + for key in batch.db.iter(list_db_key, |k, _| Ok(k.to_vec()))? { + if let Ok(key) = key { + lists_to_delete.push(key); + } + } let mut list_count = 0; - let mut entry_count = 0; - let prefix = to_key(self.list_prefix, ""); - for key in batch.db.iter(&prefix, |k, _| Ok(k.to_vec()))? { - let _ = batch.delete(&key); - list_count += 1; + for l in lists_to_delete { + match batch.delete(list_db_key, &l) { + Ok(_) => list_count += 1, + Err(_) => {} + } } - let prefix = to_key(self.entry_prefix, ""); - for key in batch.db.iter(&prefix, |k, _| Ok(k.to_vec()))? { - let _ = batch.delete(&key); - entry_count += 1; + + let mut entries_to_delete = vec![]; + let entry_db_key = Some(self.entry_prefix); + for key in batch.db.iter(entry_db_key, |k, _| Ok(k.to_vec()))? { + if let Ok(key) = key { + entries_to_delete.push(key); + } + } + let mut entry_count = 0; + for e in entries_to_delete { + match batch.delete(entry_db_key, &e) { + Ok(_) => entry_count += 1, + Err(_) => {} + } } + debug!( "clear: lists deleted: {}, entries deleted: {}", list_count, entry_count @@ -409,7 +445,7 @@ impl PruneableListIndex for MultiIndex { /// Pruning will be more performant than full rebuild but not yet necessary. fn prune( &self, - _batch: &Batch<'_>, + _batch: &mut Batch<'_>, _commit: Commitment, _cutoff_pos: u64, ) -> Result<(), Error> { @@ -420,11 +456,12 @@ impl PruneableListIndex for MultiIndex { /// Pop off the back/tail of the linked list. /// Used when pruning old data. - fn pop_pos_back(&self, batch: &Batch<'_>, commit: Commitment) -> Result, Error> { + fn pop_pos_back(&self, batch: &mut Batch<'_>, commit: Commitment) -> Result, Error> { + let list_key = (Some(self.list_prefix), commit.as_ref()); match self.get_list(batch, commit)? { None => Ok(None), Some(ListWrapper::Single { pos }) => { - batch.delete(&self.list_key(commit))?; + batch.delete(list_key.0, list_key.1)?; Ok(Some(pos)) } Some(ListWrapper::Multi { head, tail }) => { @@ -440,17 +477,19 @@ impl PruneableListIndex for MultiIndex { head, tail: pos.pos(), }; - batch.delete(&self.entry_key(commit, current_pos.pos()))?; - batch - .db - .put_ser(&self.entry_key(commit, pos.pos()), &tail)?; - batch.db.put_ser(&self.list_key(commit), &list)?; + let (cur_pos_db_key, cur_pos_key) = + self.entry_key(commit, current_pos.pos()); + batch.delete(cur_pos_db_key, &cur_pos_key)?; + let (pos_db_key, pos_key) = self.entry_key(commit, pos.pos()); + batch.db.put_ser(pos_db_key, &pos_key, &tail)?; + batch.db.put_ser(list_key.0, list_key.1, &list)?; Ok(Some(current_pos)) } Some(ListEntry::Head { pos, .. }) => { let list = ListWrapper::Single { pos }; - batch.delete(&self.entry_key(commit, current_pos.pos()))?; - batch.db.put_ser(&self.list_key(commit), &list)?; + let (pos_db_key, pos_key) = self.entry_key(commit, current_pos.pos()); + batch.delete(pos_db_key, &pos_key)?; + batch.db.put_ser(list_key.0, list_key.1, &list)?; Ok(Some(current_pos)) } Some(_) => Err(Error::OtherErr("prev was unexpected".into())), diff --git a/chain/src/pipe.rs b/chain/src/pipe.rs index e49ea57924..820b3a07c4 100644 --- a/chain/src/pipe.rs +++ b/chain/src/pipe.rs @@ -161,11 +161,11 @@ pub fn process_block( // Note we do this in the outer batch, not the child batch from the extension // as we only commit the child batch if the extension increases total work. // We want to save the block to the db regardless. - add_block(b, &ctx.batch)?; + add_block(b, &mut ctx.batch)?; // If we have no "tail" then set it now. if ctx.batch.tail().is_err() { - update_body_tail(&b.header, &ctx.batch)?; + update_body_tail(&b.header, &mut ctx.batch)?; } if has_more_work(&b.header, &head) { @@ -198,13 +198,13 @@ pub fn process_block_headers( // Note: This batch may later be committed even if the MMR itself is rollbacked. for header in headers { validate_header(header, ctx)?; - add_block_header(header, &ctx.batch)?; + add_block_header(header, &mut ctx.batch)?; } let ctx_specific_validation = &ctx.header_allowed; // Now apply this entire chunk of headers to the header MMR. - txhashset::header_extending(&mut ctx.header_pmmr, &mut ctx.batch, |ext, batch| { + txhashset::header_extending(&mut ctx.header_pmmr, &mut ctx.batch, |ext, mut batch| { rewind_and_apply_header_fork(&last_header, ext, batch, ctx_specific_validation)?; // If previous sync_head is not on the "current" chain then @@ -216,7 +216,7 @@ pub fn process_block_headers( // Note the outer batch may still be committed to db assuming no errors occur in the extension. if has_more_work(last_header, &head) { let header_head = last_header.into(); - update_header_head(&header_head, &batch)?; + update_header_head(&header_head, &mut batch)?; } else { ext.force_rollback(); }; @@ -275,7 +275,7 @@ pub fn process_block_header(header: &BlockHeader, ctx: &mut BlockContext<'_>) -> })?; // Add this new block header to the db. - add_block_header(header, &ctx.batch)?; + add_block_header(header, &mut ctx.batch)?; if has_more_work(header, &header_head) { update_header_head(&Tip::from_header(header), &mut ctx.batch)?; @@ -478,7 +478,7 @@ fn verify_coinbase_maturity( /// Verify kernel sums across the full utxo and kernel sets based on block_sums /// of previous block accounting for the inputs|outputs|kernels of the new block. /// Saves the new block_sums to the db via the current batch if successful. -fn verify_block_sums(b: &Block, batch: &store::Batch<'_>) -> Result<(), Error> { +fn verify_block_sums(b: &Block, batch: &mut store::Batch<'_>) -> Result<(), Error> { // Retrieve the block_sums for the previous block. let block_sums = batch.get_block_sums(&b.header.prev_hash)?; @@ -509,7 +509,7 @@ fn verify_block_sums(b: &Block, batch: &store::Batch<'_>) -> Result<(), Error> { fn apply_block_to_txhashset( block: &Block, ext: &mut txhashset::ExtensionPair<'_>, - batch: &store::Batch<'_>, + batch: &mut store::Batch<'_>, ) -> Result<(), Error> { ext.extension .apply_block(block, ext.header_extension, batch)?; @@ -520,13 +520,13 @@ fn apply_block_to_txhashset( /// Officially adds the block to our chain (possibly on a losing fork). /// Header must be added separately (assume this has been done previously). -fn add_block(b: &Block, batch: &store::Batch<'_>) -> Result<(), Error> { +fn add_block(b: &Block, batch: &mut store::Batch<'_>) -> Result<(), Error> { batch.save_block(b)?; Ok(()) } /// Update the block chain tail so we can know the exact tail of full blocks in this node -fn update_body_tail(bh: &BlockHeader, batch: &store::Batch<'_>) -> Result<(), Error> { +fn update_body_tail(bh: &BlockHeader, batch: &mut store::Batch<'_>) -> Result<(), Error> { let tip = Tip::from_header(bh); batch .save_body_tail(&tip) @@ -536,27 +536,28 @@ fn update_body_tail(bh: &BlockHeader, batch: &store::Batch<'_>) -> Result<(), Er } /// Officially adds the block header to our header chain. -fn add_block_header(bh: &BlockHeader, batch: &store::Batch<'_>) -> Result<(), Error> { +fn add_block_header(bh: &BlockHeader, batch: &mut store::Batch<'_>) -> Result<(), Error> { batch .save_block_header(bh) .map_err(|e| Error::StoreErr(e, "pipe save header".to_owned()))?; Ok(()) } -fn update_header_head(head: &Tip, batch: &store::Batch<'_>) -> Result<(), Error> { +fn update_header_head(head: &Tip, batch: &mut store::Batch<'_>) -> Result<(), Error> { batch .save_header_head(&head) .map_err(|e| Error::StoreErr(e, "pipe save header head".to_owned()))?; - debug!( + trace!( "header head updated to {} at {}", - head.last_block_h, head.height + head.last_block_h, + head.height ); Ok(()) } -fn update_head(head: &Tip, batch: &store::Batch<'_>) -> Result<(), Error> { +fn update_head(head: &Tip, batch: &mut store::Batch<'_>) -> Result<(), Error> { batch .save_body_head(&head) .map_err(|e| Error::StoreErr(e, "pipe save body".to_owned()))?; @@ -575,7 +576,7 @@ fn has_more_work(header: &BlockHeader, head: &Tip) -> bool { pub fn rewind_and_apply_header_fork( header: &BlockHeader, ext: &mut txhashset::HeaderExtension<'_>, - batch: &store::Batch<'_>, + batch: &mut store::Batch<'_>, ctx_specific_validation: &dyn Fn(&BlockHeader) -> Result<(), Error>, ) -> Result<(), Error> { let mut fork_hashes = vec![]; @@ -616,7 +617,7 @@ pub fn rewind_and_apply_header_fork( pub fn rewind_and_apply_fork( header: &BlockHeader, ext: &mut txhashset::ExtensionPair<'_>, - batch: &store::Batch<'_>, + batch: &mut store::Batch<'_>, ctx_specific_validation: &dyn Fn(&BlockHeader) -> Result<(), Error>, ) -> Result { let extension = &mut ext.extension; diff --git a/chain/src/store.rs b/chain/src/store.rs index e11d1f4863..d8df65f2c2 100644 --- a/chain/src/store.rs +++ b/chain/src/store.rs @@ -26,7 +26,7 @@ use crate::util::secp::pedersen::Commitment; use croaring::Bitmap; use grin_core::ser; use grin_store as store; -use grin_store::{option_to_not_found, to_key, Error}; +use grin_store::{option_to_not_found, Error}; use std::convert::TryInto; use std::sync::Arc; @@ -38,7 +38,8 @@ const HEAD_PREFIX: u8 = b'H'; const TAIL_PREFIX: u8 = b'T'; const PIBD_HEAD_PREFIX: u8 = b'I'; const HEADER_HEAD_PREFIX: u8 = b'G'; -const OUTPUT_POS_PREFIX: u8 = b'p'; +/// Prefix for output pos index. +pub const OUTPUT_POS_PREFIX: u8 = b'p'; /// Prefix for NRD kernel pos index lists. pub const NRD_KERNEL_LIST_PREFIX: u8 = b'K'; @@ -48,6 +49,17 @@ pub const NRD_KERNEL_ENTRY_PREFIX: u8 = b'k'; const BLOCK_SUMS_PREFIX: u8 = b'M'; const BLOCK_SPENT_PREFIX: u8 = b'S'; +/// All database prefixes. +const DB_PREFIXES: [u8; 7] = [ + BLOCK_HEADER_PREFIX, + BLOCK_PREFIX, + OUTPUT_POS_PREFIX, + NRD_KERNEL_LIST_PREFIX, + NRD_KERNEL_ENTRY_PREFIX, + BLOCK_SUMS_PREFIX, + BLOCK_SPENT_PREFIX, +]; + /// All chain-related database operations pub struct ChainStore { db: store::Store, @@ -56,30 +68,40 @@ pub struct ChainStore { impl ChainStore { /// Create new chain store pub fn new(db_root: &str) -> Result { - let db = store::Store::new(db_root, None, Some(STORE_SUBPATH), None)?; + let db = store::Store::new( + db_root, + None, + Some(STORE_SUBPATH), + DB_PREFIXES.to_vec(), + None, + )?; Ok(ChainStore { db }) } /// The current chain head. pub fn head(&self) -> Result { - option_to_not_found(self.db.get_ser(&[HEAD_PREFIX], None), || "HEAD".to_owned()) + option_to_not_found(self.db.get_ser(None, &[HEAD_PREFIX], None), || { + "HEAD".to_owned() + }) } /// The current header head (may differ from chain head). pub fn header_head(&self) -> Result { - option_to_not_found(self.db.get_ser(&[HEADER_HEAD_PREFIX], None), || { + option_to_not_found(self.db.get_ser(None, &[HEADER_HEAD_PREFIX], None), || { "HEADER_HEAD".to_owned() }) } /// The current chain "tail" (earliest block in the store). pub fn tail(&self) -> Result { - option_to_not_found(self.db.get_ser(&[TAIL_PREFIX], None), || "TAIL".to_owned()) + option_to_not_found(self.db.get_ser(None, &[TAIL_PREFIX], None), || { + "TAIL".to_owned() + }) } /// The current PIBD head (will differ from the other heads. Return genesis block if PIBD head doesn't exist). pub fn pibd_head(&self) -> Result { - let res = option_to_not_found(self.db.get_ser(&[PIBD_HEAD_PREFIX], None), || { + let res = option_to_not_found(self.db.get_ser(None, &[PIBD_HEAD_PREFIX], None), || { "PIBD_HEAD".to_owned() }); @@ -96,21 +118,23 @@ impl ChainStore { /// Get full block. pub fn get_block(&self, h: &Hash) -> Result { - option_to_not_found(self.db.get_ser(&to_key(BLOCK_PREFIX, h), None), || { - format!("BLOCK: {}", h) - }) + option_to_not_found( + self.db.get_ser(Some(BLOCK_PREFIX), h.as_ref(), None), + || format!("BLOCK: {}", h), + ) } /// Does this full block exist? pub fn block_exists(&self, h: &Hash) -> Result { - self.db.exists(&to_key(BLOCK_PREFIX, h)) + self.db.exists(Some(BLOCK_PREFIX), h.as_ref()) } /// Get block_sums for the block hash. pub fn get_block_sums(&self, h: &Hash) -> Result { - option_to_not_found(self.db.get_ser(&to_key(BLOCK_SUMS_PREFIX, h), None), || { - format!("Block sums for block: {}", h) - }) + option_to_not_found( + self.db.get_ser(Some(BLOCK_SUMS_PREFIX), h.as_ref(), None), + || format!("Block sums for block: {}", h), + ) } /// Get previous header. @@ -129,7 +153,7 @@ impl ChainStore { /// Get block header. pub fn get_block_header(&self, h: &Hash) -> Result { option_to_not_found( - self.db.get_ser(&to_key(BLOCK_HEADER_PREFIX, h), None), + self.db.get_ser(Some(BLOCK_HEADER_PREFIX), h.as_ref(), None), || format!("BLOCK HEADER: {}", h), ) } @@ -139,8 +163,9 @@ impl ChainStore { pub fn get_block_header_skip_proof(&self, h: &Hash) -> Result { option_to_not_found( self.db.get_ser( - &to_key(BLOCK_HEADER_PREFIX, h), - Some(ser::DeserializationMode::SkipPow), + Some(BLOCK_HEADER_PREFIX), + h.as_ref(), + Some(DeserializationMode::SkipPow), ), || format!("BLOCK HEADER: {}", h), ) @@ -159,7 +184,8 @@ impl ChainStore { /// Get PMMR pos and block height for the given output commitment. pub fn get_output_pos_height(&self, commit: &Commitment) -> Result, Error> { - self.db.get_ser(&to_key(OUTPUT_POS_PREFIX, commit), None) + self.db + .get_ser(Some(OUTPUT_POS_PREFIX), commit.as_ref(), None) } /// Builds a new batch to be used with this store. @@ -180,17 +206,21 @@ pub struct Batch<'a> { impl<'a> Batch<'a> { /// The head. pub fn head(&self) -> Result { - option_to_not_found(self.db.get_ser(&[HEAD_PREFIX], None), || "HEAD".to_owned()) + option_to_not_found(self.db.get_ser(None, &[HEAD_PREFIX], None), || { + "HEAD".to_owned() + }) } /// The tail. pub fn tail(&self) -> Result { - option_to_not_found(self.db.get_ser(&[TAIL_PREFIX], None), || "TAIL".to_owned()) + option_to_not_found(self.db.get_ser(None, &[TAIL_PREFIX], None), || { + "TAIL".to_owned() + }) } /// The current header head (may differ from chain head). pub fn header_head(&self) -> Result { - option_to_not_found(self.db.get_ser(&[HEADER_HEAD_PREFIX], None), || { + option_to_not_found(self.db.get_ser(None, &[HEADER_HEAD_PREFIX], None), || { "HEADER_HEAD".to_owned() }) } @@ -201,40 +231,41 @@ impl<'a> Batch<'a> { } /// Save body head to db. - pub fn save_body_head(&self, t: &Tip) -> Result<(), Error> { - self.db.put_ser(&[HEAD_PREFIX], t) + pub fn save_body_head(&mut self, t: &Tip) -> Result<(), Error> { + self.db.put_ser(None, &[HEAD_PREFIX], t) } /// Save body "tail" to db. - pub fn save_body_tail(&self, t: &Tip) -> Result<(), Error> { - self.db.put_ser(&[TAIL_PREFIX], t) + pub fn save_body_tail(&mut self, t: &Tip) -> Result<(), Error> { + self.db.put_ser(None, &[TAIL_PREFIX], t) } /// Save header head to db. - pub fn save_header_head(&self, t: &Tip) -> Result<(), Error> { - self.db.put_ser(&[HEADER_HEAD_PREFIX], t) + pub fn save_header_head(&mut self, t: &Tip) -> Result<(), Error> { + self.db.put_ser(None, &[HEADER_HEAD_PREFIX], t) } /// Save PIBD head to db. - pub fn save_pibd_head(&self, t: &Tip) -> Result<(), Error> { - self.db.put_ser(&[PIBD_HEAD_PREFIX], t) + pub fn save_pibd_head(&mut self, t: &Tip) -> Result<(), Error> { + self.db.put_ser(None, &[PIBD_HEAD_PREFIX], t) } /// get block pub fn get_block(&self, h: &Hash) -> Result { - option_to_not_found(self.db.get_ser(&to_key(BLOCK_PREFIX, h), None), || { - format!("Block with hash: {}", h) - }) + option_to_not_found( + self.db.get_ser(Some(BLOCK_PREFIX), h.as_ref(), None), + || format!("Block with hash: {}", h), + ) } /// Does the block exist? pub fn block_exists(&self, h: &Hash) -> Result { - self.db.exists(&to_key(BLOCK_PREFIX, h)) + self.db.exists(Some(BLOCK_PREFIX), h.as_ref()) } /// Save the block to the db. /// Note: the block header is not saved to the db here, assumes this has already been done. - pub fn save_block(&self, b: &Block) -> Result<(), Error> { + pub fn save_block(&mut self, b: &Block) -> Result<(), Error> { debug!( "save_block: {} at {} ({} -> v{})", b.header.hash(), @@ -242,27 +273,27 @@ impl<'a> Batch<'a> { b.inputs().version_str(), self.db.protocol_version(), ); - self.db.put_ser(&to_key(BLOCK_PREFIX, b.hash())[..], b)?; + self.db.put_ser(Some(BLOCK_PREFIX), b.hash().as_ref(), b)?; Ok(()) } /// We maintain a "spent" index for each full block to allow the output_pos /// to be easily reverted during rewind. - pub fn save_spent_index(&self, h: &Hash, spent: &[CommitPos]) -> Result<(), Error> { + pub fn save_spent_index(&mut self, h: &Hash, spent: &[CommitPos]) -> Result<(), Error> { self.db - .put_ser(&to_key(BLOCK_SPENT_PREFIX, h)[..], &spent.to_vec())?; + .put_ser(Some(BLOCK_SPENT_PREFIX), h.as_ref(), &spent.to_vec())?; Ok(()) } /// Low level function to delete directly by raw key. - pub fn delete(&self, key: &[u8]) -> Result<(), Error> { - self.db.delete(key) + pub fn delete(&mut self, db_key: Option, key: &[u8]) -> Result<(), Error> { + self.db.delete(db_key, key) } /// Delete a full block. Does not delete any record associated with a block /// header. - pub fn delete_block(&self, bh: &Hash) -> Result<(), Error> { - self.db.delete(&to_key(BLOCK_PREFIX, bh)[..])?; + pub fn delete_block(&mut self, bh: &Hash) -> Result<(), Error> { + self.db.delete(Some(BLOCK_PREFIX), bh.as_ref())?; // Best effort at deleting associated data for this block. // Not an error if these fail. @@ -275,40 +306,44 @@ impl<'a> Batch<'a> { } /// Save block header to db. - pub fn save_block_header(&self, header: &BlockHeader) -> Result<(), Error> { + pub fn save_block_header(&mut self, header: &BlockHeader) -> Result<(), Error> { let hash = header.hash(); // Store the header itself indexed by hash. self.db - .put_ser(&to_key(BLOCK_HEADER_PREFIX, hash)[..], header)?; + .put_ser(Some(BLOCK_HEADER_PREFIX), hash.as_ref(), header)?; Ok(()) } /// Save output_pos and block height to index. - pub fn save_output_pos_height(&self, commit: &Commitment, pos: CommitPos) -> Result<(), Error> { + pub fn save_output_pos_height( + &mut self, + commit: &Commitment, + pos: CommitPos, + ) -> Result<(), Error> { self.db - .put_ser(&to_key(OUTPUT_POS_PREFIX, commit)[..], &pos) + .put_ser(Some(OUTPUT_POS_PREFIX), commit.as_ref(), &pos) } /// Delete the output_pos index entry for a spent output. - pub fn delete_output_pos_height(&self, commit: &Commitment) -> Result<(), Error> { - self.db.delete(&to_key(OUTPUT_POS_PREFIX, commit)) + pub fn delete_output_pos_height(&mut self, commit: &Commitment) -> Result<(), Error> { + self.db.delete(Some(OUTPUT_POS_PREFIX), commit.as_ref()) } /// When using the output_pos iterator we have access to the index keys but not the /// original commitment that the key is constructed from. So we need a way of comparing /// a key with another commitment without reconstructing the commitment from the key bytes. pub fn is_match_output_pos_key(&self, key: &[u8], commit: &Commitment) -> bool { - let commit_key = to_key(OUTPUT_POS_PREFIX, commit); - commit_key == key + commit.as_ref() == key } /// Iterator over the output_pos index. - pub fn output_pos_iter(&self) -> Result, CommitPos)>, Error> { - let key = to_key(OUTPUT_POS_PREFIX, ""); + pub fn output_pos_iter( + &'a self, + ) -> Result, CommitPos), Error>> + 'a, Error> { let protocol_version = self.db.protocol_version(); - self.db.iter(&key, move |k, mut v| { + self.db.iter(Some(OUTPUT_POS_PREFIX), move |k, mut v| { ser::deserialize(&mut v, protocol_version, DeserializationMode::default()) .map(|pos| (k.to_vec(), pos)) .map_err(From::from) @@ -328,7 +363,8 @@ impl<'a> Batch<'a> { /// Get output_pos and block height from index. pub fn get_output_pos_height(&self, commit: &Commitment) -> Result, Error> { - self.db.get_ser(&to_key(OUTPUT_POS_PREFIX, commit), None) + self.db + .get_ser(Some(OUTPUT_POS_PREFIX), commit.as_ref(), None) } /// Get the previous header. @@ -348,7 +384,7 @@ impl<'a> Batch<'a> { /// Get block header. pub fn get_block_header(&self, h: &Hash) -> Result { option_to_not_found( - self.db.get_ser(&to_key(BLOCK_HEADER_PREFIX, h), None), + self.db.get_ser(Some(BLOCK_HEADER_PREFIX), h.as_ref(), None), || format!("BLOCK HEADER: {}", h), ) } @@ -358,33 +394,35 @@ impl<'a> Batch<'a> { pub fn get_block_header_skip_proof(&self, h: &Hash) -> Result { option_to_not_found( self.db.get_ser( - &to_key(BLOCK_HEADER_PREFIX, h), - Some(ser::DeserializationMode::SkipPow), + Some(BLOCK_HEADER_PREFIX), + h.as_ref(), + Some(DeserializationMode::SkipPow), ), || format!("BLOCK HEADER: {}", h), ) } /// Delete the block spent index. - fn delete_spent_index(&self, bh: &Hash) -> Result<(), Error> { - self.db.delete(&to_key(BLOCK_SPENT_PREFIX, bh)) + fn delete_spent_index(&mut self, bh: &Hash) -> Result<(), Error> { + self.db.delete(Some(BLOCK_SPENT_PREFIX), bh.as_ref()) } /// Save block_sums for the block. - pub fn save_block_sums(&self, h: &Hash, sums: BlockSums) -> Result<(), Error> { - self.db.put_ser(&to_key(BLOCK_SUMS_PREFIX, h)[..], &sums) + pub fn save_block_sums(&mut self, h: &Hash, sums: BlockSums) -> Result<(), Error> { + self.db.put_ser(Some(BLOCK_SUMS_PREFIX), h.as_ref(), &sums) } /// Get block_sums for the block. pub fn get_block_sums(&self, h: &Hash) -> Result { - option_to_not_found(self.db.get_ser(&to_key(BLOCK_SUMS_PREFIX, h), None), || { - format!("Block sums for block: {}", h) - }) + option_to_not_found( + self.db.get_ser(Some(BLOCK_SUMS_PREFIX), h.as_ref(), None), + || format!("Block sums for block: {}", h), + ) } /// Delete the block_sums for the block. - fn delete_block_sums(&self, bh: &Hash) -> Result<(), Error> { - self.db.delete(&to_key(BLOCK_SUMS_PREFIX, bh)) + fn delete_block_sums(&mut self, bh: &Hash) -> Result<(), Error> { + self.db.delete(Some(BLOCK_SUMS_PREFIX), bh.as_ref()) } /// Get the block input bitmap based on our spent index. @@ -402,7 +440,7 @@ impl<'a> Batch<'a> { /// If we need to rewind a block then we use this to "unspend" the spent outputs. pub fn get_spent_index(&self, bh: &Hash) -> Result, Error> { option_to_not_found( - self.db.get_ser(&to_key(BLOCK_SPENT_PREFIX, bh), None), + self.db.get_ser(Some(BLOCK_SPENT_PREFIX), bh.as_ref(), None), || format!("spent index: {}", bh), ) } @@ -423,10 +461,9 @@ impl<'a> Batch<'a> { /// Iterator over all full blocks in the db. /// Uses default db serialization strategy via db protocol version. - pub fn blocks_iter(&self) -> Result, Error> { - let key = to_key(BLOCK_PREFIX, ""); + pub fn blocks_iter(&'a self) -> Result> + 'a, Error> { let protocol_version = self.db.protocol_version(); - self.db.iter(&key, move |_, mut v| { + self.db.iter(Some(BLOCK_PREFIX), move |_, mut v| { ser::deserialize(&mut v, protocol_version, DeserializationMode::default()) .map_err(From::from) }) @@ -434,9 +471,11 @@ impl<'a> Batch<'a> { /// Iterator over raw data for full blocks in the db. /// Used during block migration (we need flexibility around deserialization). - pub fn blocks_raw_iter(&self) -> Result, Vec)>, Error> { - let key = to_key(BLOCK_PREFIX, ""); - self.db.iter(&key, |k, v| Ok((k.to_vec(), v.to_vec()))) + pub fn blocks_raw_iter( + &'a self, + ) -> Result, Vec), Error>> + 'a, Error> { + self.db + .iter(Some(BLOCK_PREFIX), |k, v| Ok((k.to_vec(), v.to_vec()))) } /// Protocol version of our underlying db. diff --git a/chain/src/txhashset/desegmenter.rs b/chain/src/txhashset/desegmenter.rs index ca391397a8..2efbfee3c2 100644 --- a/chain/src/txhashset/desegmenter.rs +++ b/chain/src/txhashset/desegmenter.rs @@ -189,7 +189,7 @@ impl Desegmenter { // TODO: Unwraps let tip = Tip::from_header(&h); - let batch = self.store.batch()?; + let mut batch = self.store.batch()?; batch.save_pibd_head(&tip)?; batch.commit()?; @@ -283,11 +283,11 @@ impl Desegmenter { { let header_pmmr = self.header_pmmr.read(); let txhashset = self.txhashset.read(); - let batch = self.store.batch()?; + let mut batch = self.store.batch()?; txhashset.verify_kernel_pos_index( &self.genesis, &header_pmmr, - &batch, + &mut batch, Some(status.clone()), Some(stop_state.clone()), )?; @@ -308,9 +308,9 @@ impl Desegmenter { &mut header_pmmr, &mut txhashset, &mut batch, - |ext, batch| { + |ext, mut batch| { let extension = &mut ext.extension; - extension.rewind(&self.archive_header, batch)?; + extension.rewind(&self.archive_header, &mut batch)?; // Validate the extension, generating the utxo_sum and kernel_sum. // Full validation, including rangeproofs and kernel signature verification. @@ -359,10 +359,10 @@ impl Desegmenter { } // Rebuild our output_pos index in the db based on fresh UTXO set. - txhashset.init_output_pos_index(&header_pmmr, &batch)?; + txhashset.init_output_pos_index(&header_pmmr, &mut batch)?; // Rebuild our NRD kernel_pos index based on recent kernel history. - txhashset.init_recent_kernel_pos_index(&header_pmmr, &batch)?; + txhashset.init_recent_kernel_pos_index(&header_pmmr, &mut batch)?; // Commit all the changes to the db. batch.commit()?; diff --git a/chain/src/txhashset/txhashset.rs b/chain/src/txhashset/txhashset.rs index a0d6bc64ca..5d88cd290d 100644 --- a/chain/src/txhashset/txhashset.rs +++ b/chain/src/txhashset/txhashset.rs @@ -538,7 +538,7 @@ impl TxHashSet { pub fn init_recent_kernel_pos_index( &self, header_pmmr: &PMMRHandle, - batch: &Batch<'_>, + batch: &mut Batch<'_>, ) -> Result<(), Error> { let head = batch.head()?; let cutoff = head.height.saturating_sub(WEEK_HEIGHT * 2); @@ -552,7 +552,7 @@ impl TxHashSet { &self, from_header: &BlockHeader, header_pmmr: &PMMRHandle, - batch: &Batch<'_>, + batch: &mut Batch<'_>, status: Option>, stop_state: Option>, ) -> Result<(), Error> { @@ -635,7 +635,7 @@ impl TxHashSet { pub fn init_output_pos_index( &self, header_pmmr: &PMMRHandle, - batch: &Batch<'_>, + batch: &mut Batch<'_>, ) -> Result<(), Error> { let now = Instant::now(); @@ -643,21 +643,27 @@ impl TxHashSet { // Iterate over the current output_pos index, removing any entries that // do not point to to the expected output. - let mut removed_count = 0; - for (key, pos1) in batch.output_pos_iter()? { - let pos0 = pos1.pos - 1; - if let Some(out) = output_pmmr.get_data(pos0) { - if let Ok(pos0_via_mmr) = batch.get_output_pos(&out.commitment()) { - // If the pos matches and the index key matches the commitment - // then keep the entry, other we want to clean it up. - if pos0 == pos0_via_mmr - && batch.is_match_output_pos_key(&key, &out.commitment()) - { - continue; + let mut pos_to_delete = vec![]; + for kp in batch.output_pos_iter()? { + if let Ok((key, pos1)) = kp { + let pos0 = pos1.pos - 1; + if let Some(out) = output_pmmr.get_data(pos0) { + if let Ok(pos0_via_mmr) = batch.get_output_pos(&out.commitment()) { + // If the pos matches and the index key matches the commitment + // then keep the entry, other we want to clean it up. + if pos0 == pos0_via_mmr + && batch.is_match_output_pos_key(&key, &out.commitment()) + { + continue; + } } } + pos_to_delete.push(key); } - batch.delete(&key)?; + } + let mut removed_count = 0; + for p in pos_to_delete { + batch.delete(Some(store::OUTPUT_POS_PREFIX), &p)?; removed_count += 1; } debug!( @@ -733,10 +739,10 @@ pub fn extending_readonly( inner: F, ) -> Result where - F: FnOnce(&mut ExtensionPair<'_>, &Batch<'_>) -> Result, + F: FnOnce(&mut ExtensionPair<'_>, &mut Batch<'_>) -> Result, { let commit_index = trees.commit_index.clone(); - let batch = commit_index.batch()?; + let mut batch = commit_index.batch()?; trace!("Starting new txhashset (readonly) extension."); @@ -751,7 +757,7 @@ where header_extension: &mut header_extension, extension: &mut extension, }; - inner(&mut extension_pair, &batch) + inner(&mut extension_pair, &mut batch) }; trace!("Rollbacking txhashset (readonly) extension."); @@ -830,7 +836,7 @@ pub fn extending<'a, F, T>( inner: F, ) -> Result where - F: FnOnce(&mut ExtensionPair<'_>, &Batch<'_>) -> Result, + F: FnOnce(&mut ExtensionPair<'_>, &mut Batch<'_>) -> Result, { let sizes: (u64, u64, u64); let res: Result; @@ -842,7 +848,7 @@ where // create a child transaction so if the state is rolled back by itself, all // index saving can be undone - let child_batch = batch.child()?; + let mut child_batch = batch.child()?; { trace!("Starting new txhashset extension."); @@ -853,7 +859,7 @@ where header_extension: &mut header_extension, extension: &mut extension, }; - res = inner(&mut extension_pair, &child_batch); + res = inner(&mut extension_pair, &mut child_batch); rollback = extension_pair.extension.rollback; sizes = extension_pair.extension.sizes(); @@ -901,15 +907,15 @@ where /// Start a new readonly header MMR extension. /// This MMR can be extended individually beyond the other (output, rangeproof and kernel) MMRs /// to allow headers to be validated before we receive the full block data. -pub fn header_extending_readonly<'a, F, T>( - handle: &'a mut PMMRHandle, +pub fn header_extending_readonly( + handle: &mut PMMRHandle, store: &ChainStore, inner: F, ) -> Result where - F: FnOnce(&mut HeaderExtension<'_>, &Batch<'_>) -> Result, + F: FnOnce(&mut HeaderExtension<'_>, &mut Batch<'_>) -> Result, { - let batch = store.batch()?; + let mut batch = store.batch()?; let head = match handle.head_hash() { Ok(hash) => { @@ -921,7 +927,7 @@ where let pmmr = PMMR::at(&mut handle.backend, handle.size); let mut extension = HeaderExtension::new(pmmr, head); - let res = inner(&mut extension, &batch); + let res = inner(&mut extension, &mut batch); handle.backend.discard(); @@ -937,7 +943,7 @@ pub fn header_extending<'a, F, T>( inner: F, ) -> Result where - F: FnOnce(&mut HeaderExtension<'_>, &Batch<'_>) -> Result, + F: FnOnce(&mut HeaderExtension<'_>, &mut Batch<'_>) -> Result, { let size: u64; let res: Result; @@ -945,7 +951,7 @@ where // create a child transaction so if the state is rolled back by itself, all // index saving can be undone - let child_batch = batch.child()?; + let mut child_batch = batch.child()?; let head = match handle.head_hash() { Ok(hash) => { @@ -958,7 +964,7 @@ where { let pmmr = PMMR::at(&mut handle.backend, handle.size); let mut extension = HeaderExtension::new(pmmr, head); - res = inner(&mut extension, &child_batch); + res = inner(&mut extension, &mut child_batch); rollback = extension.rollback; size = extension.size(); @@ -1233,7 +1239,7 @@ impl<'a> Extension<'a> { &mut self, b: &Block, header_ext: &HeaderExtension<'_>, - batch: &Batch<'_>, + batch: &mut Batch<'_>, ) -> Result<(), Error> { let mut affected_pos = vec![]; @@ -1499,7 +1505,7 @@ impl<'a> Extension<'a> { &mut self, kernels: &[TxKernel], height: u64, - batch: &Batch<'_>, + batch: &mut Batch<'_>, ) -> Result<(), Error> { for kernel in kernels { let pos = self.apply_kernel(kernel)?; @@ -1579,7 +1585,7 @@ impl<'a> Extension<'a> { /// Rewinds the MMRs to the provided block, rewinding to the last output pos /// and last kernel pos of that block. If `updated_bitmap` is supplied, the /// bitmap accumulator will be replaced with its contents - pub fn rewind(&mut self, header: &BlockHeader, batch: &Batch<'_>) -> Result<(), Error> { + pub fn rewind(&mut self, header: &BlockHeader, batch: &mut Batch) -> Result<(), Error> { debug!( "Rewind extension to {} at {} from {} at {}", header.hash(), @@ -1622,7 +1628,11 @@ impl<'a> Extension<'a> { // Rewind the MMRs and the output_pos index. // Returns a vec of "affected_pos" so we can apply the necessary updates to the bitmap // accumulator in a single pass for all rewound blocks. - fn rewind_single_block(&mut self, block: &Block, batch: &Batch<'_>) -> Result, Error> { + fn rewind_single_block( + &mut self, + block: &Block, + batch: &mut Batch<'_>, + ) -> Result, Error> { let header = &block.header; let prev_header = batch.get_previous_header(&header)?; @@ -2206,7 +2216,11 @@ fn input_pos_to_rewind( } /// If NRD enabled then enforce NRD relative height rules. -fn apply_kernel_rules(kernel: &TxKernel, pos: CommitPos, batch: &Batch<'_>) -> Result<(), Error> { +fn apply_kernel_rules( + kernel: &TxKernel, + pos: CommitPos, + batch: &mut Batch<'_>, +) -> Result<(), Error> { if !global::is_nrd_enabled() { return Ok(()); } diff --git a/chain/tests/store_indices.rs b/chain/tests/store_indices.rs index 5cc193565e..c011ed6c64 100644 --- a/chain/tests/store_indices.rs +++ b/chain/tests/store_indices.rs @@ -50,7 +50,7 @@ fn test_store_indices() { { // Start a new batch and delete the block. let store = chain.store(); - let batch = store.batch().unwrap(); + let mut batch = store.batch().unwrap(); assert!(batch.delete_block(&block_hash).is_ok()); // Block is deleted within this batch. diff --git a/chain/tests/store_kernel_pos_index.rs b/chain/tests/store_kernel_pos_index.rs index 664e2818c4..d7cf6d30d0 100644 --- a/chain/tests/store_kernel_pos_index.rs +++ b/chain/tests/store_kernel_pos_index.rs @@ -39,14 +39,14 @@ fn test_store_kernel_idx() { let commit = Commitment::from_vec(vec![]); let store = ChainStore::new(chain_dir).unwrap(); - let batch = store.batch().unwrap(); + let mut batch = store.batch().unwrap(); let index = store::nrd_recent_kernel_index(); assert_eq!(index.peek_pos(&batch, commit), Ok(None)); assert_eq!(index.get_list(&batch, commit), Ok(None)); assert_eq!( - index.push_pos(&batch, commit, CommitPos { pos: 1, height: 1 }), + index.push_pos(&mut batch, commit, CommitPos { pos: 1, height: 1 }), Ok(()), ); @@ -63,7 +63,7 @@ fn test_store_kernel_idx() { ); assert_eq!( - index.push_pos(&batch, commit, CommitPos { pos: 2, height: 2 }), + index.push_pos(&mut batch, commit, CommitPos { pos: 2, height: 2 }), Ok(()), ); @@ -79,17 +79,17 @@ fn test_store_kernel_idx() { // Pos must always increase. assert_eq!( - index.push_pos(&batch, commit, CommitPos { pos: 1, height: 1 }), + index.push_pos(&mut batch, commit, CommitPos { pos: 1, height: 1 }), Err(Error::OtherErr("pos must be increasing".into())), ); assert_eq!( - index.push_pos(&batch, commit, CommitPos { pos: 2, height: 2 }), + index.push_pos(&mut batch, commit, CommitPos { pos: 2, height: 2 }), Err(Error::OtherErr("pos must be increasing".into())), ); assert_eq!( - index.push_pos(&batch, commit, CommitPos { pos: 3, height: 3 }), + index.push_pos(&mut batch, commit, CommitPos { pos: 3, height: 3 }), Ok(()), ); @@ -104,7 +104,7 @@ fn test_store_kernel_idx() { ); assert_eq!( - index.pop_pos(&batch, commit), + index.pop_pos(&mut batch, commit), Ok(Some(CommitPos { pos: 3, height: 3 })), ); @@ -119,7 +119,7 @@ fn test_store_kernel_idx() { ); assert_eq!( - index.push_pos(&batch, commit, CommitPos { pos: 3, height: 3 }), + index.push_pos(&mut batch, commit, CommitPos { pos: 3, height: 3 }), Ok(()), ); @@ -134,7 +134,7 @@ fn test_store_kernel_idx() { ); assert_eq!( - index.pop_pos(&batch, commit), + index.pop_pos(&mut batch, commit), Ok(Some(CommitPos { pos: 3, height: 3 })), ); @@ -149,7 +149,7 @@ fn test_store_kernel_idx() { ); assert_eq!( - index.pop_pos(&batch, commit), + index.pop_pos(&mut batch, commit), Ok(Some(CommitPos { pos: 2, height: 2 })), ); @@ -166,7 +166,7 @@ fn test_store_kernel_idx() { ); assert_eq!( - index.pop_pos(&batch, commit), + index.pop_pos(&mut batch, commit), Ok(Some(CommitPos { pos: 1, height: 1 })), ); @@ -186,14 +186,14 @@ fn test_store_kernel_idx_pop_back() { let commit = Commitment::from_vec(vec![]); let store = ChainStore::new(chain_dir).unwrap(); - let batch = store.batch().unwrap(); + let mut batch = store.batch().unwrap(); let index = store::nrd_recent_kernel_index(); assert_eq!(index.peek_pos(&batch, commit), Ok(None)); assert_eq!(index.get_list(&batch, commit), Ok(None)); assert_eq!( - index.push_pos(&batch, commit, CommitPos { pos: 1, height: 1 }), + index.push_pos(&mut batch, commit, CommitPos { pos: 1, height: 1 }), Ok(()), ); @@ -210,7 +210,7 @@ fn test_store_kernel_idx_pop_back() { ); assert_eq!( - index.pop_pos_back(&batch, commit), + index.pop_pos_back(&mut batch, commit), Ok(Some(CommitPos { pos: 1, height: 1 })), ); @@ -218,32 +218,32 @@ fn test_store_kernel_idx_pop_back() { assert_eq!(index.get_list(&batch, commit), Ok(None)); assert_eq!( - index.push_pos(&batch, commit, CommitPos { pos: 1, height: 1 }), + index.push_pos(&mut batch, commit, CommitPos { pos: 1, height: 1 }), Ok(()), ); assert_eq!( - index.push_pos(&batch, commit, CommitPos { pos: 2, height: 2 }), + index.push_pos(&mut batch, commit, CommitPos { pos: 2, height: 2 }), Ok(()), ); assert_eq!( - index.push_pos(&batch, commit, CommitPos { pos: 3, height: 3 }), + index.push_pos(&mut batch, commit, CommitPos { pos: 3, height: 3 }), Ok(()), ); assert_eq!( - index.peek_pos(&batch, commit), + index.peek_pos(&mut batch, commit), Ok(Some(CommitPos { pos: 3, height: 3 })), ); assert_eq!( - index.get_list(&batch, commit), + index.get_list(&mut batch, commit), Ok(Some(ListWrapper::Multi { head: 3, tail: 1 })), ); assert_eq!( - index.pop_pos_back(&batch, commit), + index.pop_pos_back(&mut batch, commit), Ok(Some(CommitPos { pos: 1, height: 1 })), ); @@ -258,7 +258,7 @@ fn test_store_kernel_idx_pop_back() { ); assert_eq!( - index.pop_pos_back(&batch, commit), + index.pop_pos_back(&mut batch, commit), Ok(Some(CommitPos { pos: 2, height: 2 })), ); @@ -275,7 +275,7 @@ fn test_store_kernel_idx_pop_back() { ); assert_eq!( - index.pop_pos_back(&batch, commit), + index.pop_pos_back(&mut batch, commit), Ok(Some(CommitPos { pos: 3, height: 3 })), ); @@ -294,21 +294,21 @@ fn test_store_kernel_idx_rewind() { let commit = Commitment::from_vec(vec![]); let store = ChainStore::new(chain_dir).unwrap(); - let batch = store.batch().unwrap(); + let mut batch = store.batch().unwrap(); let index = store::nrd_recent_kernel_index(); assert_eq!( - index.push_pos(&batch, commit, CommitPos { pos: 1, height: 1 }), + index.push_pos(&mut batch, commit, CommitPos { pos: 1, height: 1 }), Ok(()), ); assert_eq!( - index.push_pos(&batch, commit, CommitPos { pos: 2, height: 2 }), + index.push_pos(&mut batch, commit, CommitPos { pos: 2, height: 2 }), Ok(()), ); assert_eq!( - index.push_pos(&batch, commit, CommitPos { pos: 3, height: 3 }), + index.push_pos(&mut batch, commit, CommitPos { pos: 3, height: 3 }), Ok(()), ); @@ -317,7 +317,7 @@ fn test_store_kernel_idx_rewind() { Ok(Some(ListWrapper::Multi { head: 3, tail: 1 })), ); - assert_eq!(index.rewind(&batch, commit, 1), Ok(()),); + assert_eq!(index.rewind(&mut batch, commit, 1), Ok(()),); assert_eq!( index.get_list(&batch, commit), @@ -327,7 +327,7 @@ fn test_store_kernel_idx_rewind() { ); // Check we can safely noop rewind. - assert_eq!(index.rewind(&batch, commit, 2), Ok(()),); + assert_eq!(index.rewind(&mut batch, commit, 2), Ok(()),); assert_eq!( index.get_list(&batch, commit), @@ -336,7 +336,7 @@ fn test_store_kernel_idx_rewind() { })), ); - assert_eq!(index.rewind(&batch, commit, 1), Ok(()),); + assert_eq!(index.rewind(&mut batch, commit, 1), Ok(()),); assert_eq!( index.get_list(&batch, commit), @@ -346,42 +346,42 @@ fn test_store_kernel_idx_rewind() { ); // Check we can rewind back to 0. - assert_eq!(index.rewind(&batch, commit, 0), Ok(()),); + assert_eq!(index.rewind(&mut batch, commit, 0), Ok(()),); assert_eq!(index.get_list(&batch, commit), Ok(None),); - assert_eq!(index.rewind(&batch, commit, 0), Ok(()),); + assert_eq!(index.rewind(&mut batch, commit, 0), Ok(()),); // Now check we can rewind past the end of a list safely. assert_eq!( - index.push_pos(&batch, commit, CommitPos { pos: 1, height: 1 }), + index.push_pos(&mut batch, commit, CommitPos { pos: 1, height: 1 }), Ok(()), ); assert_eq!( - index.push_pos(&batch, commit, CommitPos { pos: 2, height: 2 }), + index.push_pos(&mut batch, commit, CommitPos { pos: 2, height: 2 }), Ok(()), ); assert_eq!( - index.push_pos(&batch, commit, CommitPos { pos: 3, height: 3 }), + index.push_pos(&mut batch, commit, CommitPos { pos: 3, height: 3 }), Ok(()), ); assert_eq!( - index.pop_pos_back(&batch, commit), + index.pop_pos_back(&mut batch, commit), Ok(Some(CommitPos { pos: 1, height: 1 })), ); assert_eq!( - index.get_list(&batch, commit), + index.get_list(&mut batch, commit), Ok(Some(ListWrapper::Multi { head: 3, tail: 2 })), ); - assert_eq!(index.rewind(&batch, commit, 1), Ok(()),); + assert_eq!(index.rewind(&mut batch, commit, 1), Ok(()),); - assert_eq!(index.get_list(&batch, commit), Ok(None),); + assert_eq!(index.get_list(&mut batch, commit), Ok(None),); clean_output_dir(chain_dir); } @@ -396,14 +396,14 @@ fn test_store_kernel_idx_multiple_commits() { let commit2 = Commitment::from_vec(vec![1]); let store = ChainStore::new(chain_dir).unwrap(); - let batch = store.batch().unwrap(); + let mut batch = store.batch().unwrap(); let index = store::nrd_recent_kernel_index(); assert_eq!(index.get_list(&batch, commit), Ok(None)); assert_eq!(index.get_list(&batch, commit2), Ok(None)); assert_eq!( - index.push_pos(&batch, commit, CommitPos { pos: 1, height: 1 }), + index.push_pos(&mut batch, commit, CommitPos { pos: 1, height: 1 }), Ok(()), ); @@ -417,7 +417,7 @@ fn test_store_kernel_idx_multiple_commits() { assert_eq!(index.get_list(&batch, commit2), Ok(None)); assert_eq!( - index.push_pos(&batch, commit2, CommitPos { pos: 2, height: 2 }), + index.push_pos(&mut batch, commit2, CommitPos { pos: 2, height: 2 }), Ok(()), ); @@ -436,7 +436,7 @@ fn test_store_kernel_idx_multiple_commits() { ); assert_eq!( - index.push_pos(&batch, commit, CommitPos { pos: 3, height: 3 }), + index.push_pos(&mut batch, commit, CommitPos { pos: 3, height: 3 }), Ok(()), ); @@ -453,7 +453,7 @@ fn test_store_kernel_idx_multiple_commits() { ); assert_eq!( - index.pop_pos(&batch, commit), + index.pop_pos(&mut batch, commit), Ok(Some(CommitPos { pos: 3, height: 3 })), ); @@ -488,18 +488,18 @@ fn test_store_kernel_idx_clear() -> Result<(), Error> { // Add a couple of single entries to the index and commit the batch. { - let batch = store.batch()?; + let mut batch = store.batch()?; assert_eq!(index.peek_pos(&batch, commit), Ok(None)); assert_eq!(index.get_list(&batch, commit), Ok(None)); assert_eq!( - index.push_pos(&batch, commit, CommitPos { pos: 1, height: 1 }), + index.push_pos(&mut batch, commit, CommitPos { pos: 1, height: 1 }), Ok(()), ); assert_eq!( index.push_pos( - &batch, + &mut batch, commit2, CommitPos { pos: 10, @@ -544,8 +544,8 @@ fn test_store_kernel_idx_clear() -> Result<(), Error> { // Clear the index and confirm everything was deleted as expected. { - let batch = store.batch()?; - assert_eq!(index.clear(&batch), Ok(())); + let mut batch = store.batch()?; + assert_eq!(index.clear(&mut batch), Ok(())); assert_eq!(index.peek_pos(&batch, commit), Ok(None)); assert_eq!(index.get_list(&batch, commit), Ok(None)); assert_eq!(index.peek_pos(&batch, commit2), Ok(None)); @@ -555,13 +555,13 @@ fn test_store_kernel_idx_clear() -> Result<(), Error> { // Add multiple entries to the index, commit the batch. { - let batch = store.batch()?; + let mut batch = store.batch()?; assert_eq!( - index.push_pos(&batch, commit, CommitPos { pos: 1, height: 1 }), + index.push_pos(&mut batch, commit, CommitPos { pos: 1, height: 1 }), Ok(()), ); assert_eq!( - index.push_pos(&batch, commit, CommitPos { pos: 2, height: 2 }), + index.push_pos(&mut batch, commit, CommitPos { pos: 2, height: 2 }), Ok(()), ); assert_eq!( @@ -577,8 +577,8 @@ fn test_store_kernel_idx_clear() -> Result<(), Error> { // Clear the index and confirm everything was deleted as expected. { - let batch = store.batch()?; - assert_eq!(index.clear(&batch), Ok(())); + let mut batch = store.batch()?; + assert_eq!(index.clear(&mut batch), Ok(())); assert_eq!(index.peek_pos(&batch, commit), Ok(None)); assert_eq!(index.get_list(&batch, commit), Ok(None)); batch.commit()?; diff --git a/chain/tests/test_block_known.rs b/chain/tests/test_block_known.rs index bdab3a818f..d5c0694aec 100644 --- a/chain/tests/test_block_known.rs +++ b/chain/tests/test_block_known.rs @@ -61,7 +61,7 @@ fn check_known() { { let chain = init_chain(chain_dir, genesis.clone()); let store = chain.store(); - let batch = store.batch().unwrap(); + let mut batch = store.batch().unwrap(); let head_header = chain.head_header().unwrap(); let prev = batch.get_previous_header(&head_header).unwrap(); batch.save_body_head(&Tip::from_header(&prev)).unwrap(); diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 0f10a84eca..23746477c2 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -275,22 +275,33 @@ impl Peers { } } - /// Iterator over all peers we know about (stored in our db). - pub fn peer_data_iter(&self) -> Result, Error> { - self.store.peers_iter().map_err(From::from) - } - /// Convenience for reading all peer data from the db. pub fn all_peer_data(&self) -> Vec { - self.peer_data_iter() - .map(|peers| peers.collect()) - .unwrap_or(vec![]) + match self.store.iter_batch() { + Ok(batch) => match batch.peers_iter() { + Ok(iter) => iter + .filter(|p| p.is_ok()) + .map(|p| p.ok().unwrap()) + .collect(), + Err(e) => { + error!("failed to get all peer data: {:?}", e); + vec![] + } + }, + Err(e) => { + error!("failed to get all peer data: {:?}", e); + vec![] + } + } } /// Find peers in store (not necessarily connected) and return their data pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec { - match self.store.find_peers(state, cap, count) { - Ok(peers) => peers, + match self.store.iter_batch() { + Ok(batch) => batch.find_peers(state, cap, count).unwrap_or_else(|e| { + error!("failed to find peers: {:?}", e); + vec![] + }), Err(e) => { error!("failed to find peers: {:?}", e); vec![] @@ -445,24 +456,26 @@ impl Peers { let now = Utc::now(); // Delete defunct peers from storage - let _ = self.store.delete_peers(|peer| { - let diff = now - Utc.timestamp_opt(peer.last_connected, 0).unwrap(); - - let should_remove = peer.flags == State::Defunct - && diff > Duration::seconds(global::PEER_EXPIRATION_REMOVE_TIME); - - if should_remove { - debug!( - "removing peer {:?}: last connected {} days {} hours {} minutes ago.", - peer.addr, - diff.num_days(), - diff.num_hours(), - diff.num_minutes() - ); - } + if let Ok(batch) = self.store.iter_batch() { + let _ = batch.delete_peers(|peer| { + let diff = now - Utc.timestamp_opt(peer.last_connected, 0).unwrap(); + + let should_remove = peer.flags == State::Defunct + && diff > Duration::seconds(global::PEER_EXPIRATION_REMOVE_TIME); + + if should_remove { + debug!( + "removing peer {:?}: last connected {} days {} hours {} minutes ago.", + peer.addr, + diff.num_days(), + diff.num_hours(), + diff.num_minutes() + ); + } - should_remove - }); + should_remove + }); + } } } diff --git a/p2p/src/store.rs b/p2p/src/store.rs index 4b4dde9fac..39018ad536 100644 --- a/p2p/src/store.rs +++ b/p2p/src/store.rs @@ -20,7 +20,7 @@ use rand::prelude::*; use crate::core::ser::{self, DeserializationMode, Readable, Reader, Writeable, Writer}; use crate::types::{Capabilities, PeerAddr, ReasonForBan}; -use grin_store::{self, option_to_not_found, to_key, Error}; +use grin_store::{self, option_to_not_found, Error}; const DB_NAME: &str = "peer"; const STORE_SUBPATH: &str = "peers"; @@ -118,45 +118,93 @@ pub struct PeerStore { impl PeerStore { /// Instantiates a new peer store under the provided root path. pub fn new(db_root: &str) -> Result { - let db = grin_store::Store::new(db_root, Some(DB_NAME), Some(STORE_SUBPATH), None)?; - Ok(PeerStore { db: db }) + let db = grin_store::Store::new( + db_root, + Some(DB_NAME), + Some(STORE_SUBPATH), + vec![PEER_PREFIX], + None, + )?; + Ok(PeerStore { db }) } pub fn save_peer(&self, p: &PeerData) -> Result<(), Error> { debug!("save_peer: {:?} marked {:?}", p.addr, p.flags); - let batch = self.db.batch()?; - batch.put_ser(&peer_key(p.addr)[..], p)?; + let mut batch = self.db.batch()?; + let key = p.addr.as_key(); + batch.put_ser(Some(PEER_PREFIX), key.as_bytes(), p)?; batch.commit() } pub fn save_peers(&self, p: Vec) -> Result<(), Error> { - let batch = self.db.batch()?; + let mut batch = self.db.batch()?; for pd in p { debug!("save_peers: {:?} marked {:?}", pd.addr, pd.flags); - batch.put_ser(&peer_key(pd.addr)[..], &pd)?; + let key = pd.addr.as_key(); + batch.put_ser(Some(PEER_PREFIX), key.as_bytes(), &pd)?; } batch.commit() } pub fn get_peer(&self, peer_addr: PeerAddr) -> Result { - option_to_not_found(self.db.get_ser(&peer_key(peer_addr)[..], None), || { - format!("Peer at address: {}", peer_addr) - }) + let key = peer_addr.as_key(); + option_to_not_found( + self.db.get_ser(Some(PEER_PREFIX), key.as_bytes(), None), + || format!("Peer at address: {}", peer_addr), + ) } pub fn exists_peer(&self, peer_addr: PeerAddr) -> Result { - self.db.exists(&peer_key(peer_addr)[..]) + let key = peer_addr.as_key(); + self.db.exists(Some(PEER_PREFIX), key.as_bytes()) } - /// TODO - allow below added to avoid github issue reports - #[allow(dead_code)] - pub fn delete_peer(&self, peer_addr: PeerAddr) -> Result<(), Error> { - let batch = self.db.batch()?; - batch.delete(&peer_key(peer_addr)[..])?; + /// Convenience method to load a peer data, update its status and save it + /// back. If new state is Banned its last banned time will be updated too. + /// If new state is Defunct last connection attempt will be updated too. + pub fn update_state(&self, peer_addr: PeerAddr, new_state: State) -> Result<(), Error> { + let mut batch = self.db.batch()?; + let key = peer_addr.as_key(); + let mut peer = option_to_not_found( + batch.get_ser::(Some(PEER_PREFIX), key.as_bytes(), None), + || format!("Peer at address: {}", peer_addr), + )?; + peer.flags = new_state; + if new_state == State::Banned { + peer.last_banned = Utc::now().timestamp(); + } else { + peer.last_attempt = Utc::now().timestamp(); + } + + batch.put_ser(Some(PEER_PREFIX), key.as_bytes(), &peer)?; batch.commit() } + /// Builds a new iterator batch to be used with this store. + pub fn iter_batch(&self) -> Result, Error> { + Ok(PeersIterBatch { + db: self.db.batch()?, + }) + } +} + +pub struct PeersIterBatch<'a> { + db: grin_store::Batch<'a>, +} + +impl<'a> PeersIterBatch<'a> { + /// Iterator over all known peers. + pub fn peers_iter( + &'a self, + ) -> Result> + 'a, Error> { + let protocol_version = self.db.protocol_version(); + self.db.iter(Some(PEER_PREFIX), move |_, mut v| { + ser::deserialize(&mut v, protocol_version, DeserializationMode::default()) + .map_err(From::from) + }) + } + /// Find some peers in our local db. pub fn find_peers( &self, @@ -166,78 +214,48 @@ impl PeerStore { ) -> Result, Error> { let peers = self .peers_iter()? + .filter(|p| p.is_ok()) + .map(|p| p.ok().unwrap()) .filter(|p| p.flags == state && p.capabilities.contains(cap)) .choose_multiple(&mut thread_rng(), count); Ok(peers) } - /// Iterator over all known peers. - pub fn peers_iter(&self) -> Result, Error> { - let key = to_key(PEER_PREFIX, ""); - let protocol_version = self.db.protocol_version(); - self.db.iter(&key, move |_, mut v| { - ser::deserialize(&mut v, protocol_version, DeserializationMode::default()) - .map_err(From::from) - }) - } - /// List all known peers /// Used for /v1/peers/all api endpoint pub fn all_peers(&self) -> Result, Error> { - let peers: Vec = self.peers_iter()?.collect(); + let peers: Vec = self + .peers_iter()? + .filter(|p| p.is_ok()) + .map(|p| p.ok().unwrap()) + .collect(); Ok(peers) } - /// Convenience method to load a peer data, update its status and save it - /// back. If new state is Banned its last banned time will be updated too. - /// If new state is Defunct last connection attempt will be updated too. - pub fn update_state(&self, peer_addr: PeerAddr, new_state: State) -> Result<(), Error> { - let batch = self.db.batch()?; - - let mut peer = option_to_not_found( - batch.get_ser::(&peer_key(peer_addr)[..], None), - || format!("Peer at address: {}", peer_addr), - )?; - peer.flags = new_state; - if new_state == State::Banned { - peer.last_banned = Utc::now().timestamp(); - } else { - peer.last_attempt = Utc::now().timestamp(); - } - - batch.put_ser(&peer_key(peer_addr)[..], &peer)?; - batch.commit() - } - /// Deletes peers from the storage that satisfy some condition `predicate` - pub fn delete_peers(&self, predicate: F) -> Result<(), Error> + pub fn delete_peers(mut self, predicate: F) -> Result<(), Error> where F: Fn(&PeerData) -> bool, { let mut to_remove = vec![]; for x in self.peers_iter()? { - if predicate(&x) { - to_remove.push(x) + if let Ok(x) = x { + if predicate(&x) { + to_remove.push(x) + } } } // Delete peers in single batch if !to_remove.is_empty() { - let batch = self.db.batch()?; - for peer in to_remove { - batch.delete(&peer_key(peer.addr)[..])?; + let key = peer.addr.as_key(); + self.db.delete(Some(PEER_PREFIX), key.as_bytes())?; } - - batch.commit()?; + self.db.commit()?; } Ok(()) } } - -// Ignore the port unless ip is loopback address. -fn peer_key(peer_addr: PeerAddr) -> Vec { - to_key(PEER_PREFIX, &peer_addr.as_key()) -} diff --git a/servers/Cargo.toml b/servers/Cargo.toml index 16d8f775b1..9a795051b9 100644 --- a/servers/Cargo.toml +++ b/servers/Cargo.toml @@ -15,7 +15,6 @@ hyper-rustls = "0.23" fs2 = "0.4" futures = "0.3" http = "0.2" -lmdb-zero = "0.4.4" rand = "0.6" serde = "1" log = "0.4" diff --git a/store/Cargo.toml b/store/Cargo.toml index 9974a4c3df..f3568224ee 100644 --- a/store/Cargo.toml +++ b/store/Cargo.toml @@ -13,7 +13,7 @@ edition = "2021" byteorder = "1" croaring = "1.0.1" libc = "0.2" -lmdb-zero = "0.4.4" +heed = "0.22.1" memmap = "0.7" tempfile = "3.1" serde = "1" diff --git a/store/src/lib.rs b/store/src/lib.rs index 215b43f70f..b2408ca55e 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -26,8 +26,6 @@ extern crate log; extern crate grin_core; extern crate grin_util as util; -//use grin_core as core; - pub mod leaf_set; pub mod lmdb; pub mod pmmr; @@ -38,8 +36,6 @@ const SEP: u8 = b':'; use byteorder::{BigEndian, WriteBytesExt}; -pub use crate::lmdb::*; - /// Build a db key from a prefix and a byte vector identifier. pub fn to_key>(prefix: u8, k: K) -> Vec { let k = k.as_ref(); @@ -69,19 +65,17 @@ pub fn u64_to_key(prefix: u8, val: u64) -> Vec { res } +pub use crate::lmdb::*; + use std::ffi::OsStr; use std::fs::{remove_file, rename, File}; use std::path::Path; /// Creates temporary file with name created by adding `temp_suffix` to `path`. /// Applies writer function to it and renames temporary file into original specified by `path`. -pub fn save_via_temp_file( - path: P, - temp_suffix: E, - mut writer: F, -) -> Result<(), std::io::Error> +pub fn save_via_temp_file(path: P, temp_suffix: E, mut writer: F) -> Result<(), io::Error> where - F: FnMut(&mut File) -> Result<(), std::io::Error>, + F: FnMut(&mut File) -> Result<(), io::Error>, P: AsRef, E: AsRef, { diff --git a/store/src/lmdb.rs b/store/src/lmdb.rs index b1173fbcde..9e8fd4cc79 100644 --- a/store/src/lmdb.rs +++ b/store/src/lmdb.rs @@ -14,12 +14,14 @@ //! Storage of core types using LMDB. -use std::fs; -use std::sync::Arc; - -use lmdb_zero as lmdb; -use lmdb_zero::traits::CreateCursor; -use lmdb_zero::LmdbResultExt; +use heed::types::Bytes; +use heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, WithoutTls}; +use std::collections::HashMap; +use std::path::Path; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use std::sync::{Arc, OnceLock}; +use std::time::Duration; +use std::{fs, thread}; use crate::grin_core::global; use crate::grin_core::ser::{self, DeserializationMode, ProtocolVersion}; @@ -29,6 +31,7 @@ use crate::util::RwLock; pub const ALLOC_CHUNK_SIZE_DEFAULT: usize = 134_217_728; //128 MB /// And for test mode, to avoid too much disk allocation on windows pub const ALLOC_CHUNK_SIZE_DEFAULT_TEST: usize = 1_048_576; //1 MB +/// Minimal percent of used space when resizing must be performed. const RESIZE_PERCENT: f32 = 0.9; /// Want to ensure that each resize gives us at least this % /// of total space free @@ -42,7 +45,7 @@ pub enum Error { NotFoundErr(String), /// Wraps an error originating from LMDB #[error("LMDB error: {0}")] - LmdbErr(lmdb::error::Error), + LmdbErr(String), /// Wraps a serialization error for Writeable or Readable #[error("Serialization Error: {0}")] SerErr(ser::Error), @@ -54,9 +57,9 @@ pub enum Error { OtherErr(String), } -impl From for Error { - fn from(e: lmdb::error::Error) -> Error { - Error::LmdbErr(e) +impl From for Error { + fn from(e: heed::Error) -> Error { + Error::LmdbErr(e.to_string()) } } @@ -80,278 +83,605 @@ where const DEFAULT_DB_VERSION: ProtocolVersion = ProtocolVersion(3); +/// Default environment. +pub const DEFAULT_ENV_NAME: &'static str = "lmdb"; +/// Default multi-database environment without prefixes. +const DEFAULT_MULTI_DB_ENV_NAME: &'static str = "multi_lmdb"; +/// Prefix key separator. +pub const PREFIX_KEY_SEPARATOR: u8 = b':'; + +/// Mapping of database path to environment state. +static ENV_MAP: OnceLock>> = OnceLock::new(); + +/// State of active database environment. +struct EnvState { + env: Env, + open_txs_count: AtomicU32, + resizing: AtomicBool, + resize_checking: AtomicBool, + stores_count: AtomicU32, +} + /// LMDB-backed store facilitating data access and serialization. All writes /// are done through a Batch abstraction providing atomicity. pub struct Store { - env: Arc, - db: Arc>>>>, - name: String, + env: Env, + env_path: String, + pre_dbs: Arc>>, + def_db: Database, version: ProtocolVersion, alloc_chunk_size: usize, } +impl Drop for Store { + fn drop(&mut self) { + { + let mut w_map = ENV_MAP.get().unwrap().write(); + let stores_count = w_map + .get(&self.env_path) + .unwrap() + .stores_count + .load(Ordering::Relaxed); + w_map + .get_mut(&self.env_path) + .unwrap() + .stores_count + .store(stores_count - 1, Ordering::Relaxed); + } + let no_stores = { + ENV_MAP + .get() + .unwrap() + .read() + .get(&self.env_path) + .unwrap() + .stores_count + .load(Ordering::Relaxed) + == 0 + }; + if no_stores { + let mut w_map = ENV_MAP.get().unwrap().write(); + w_map.remove(&self.env_path); + } + } +} + impl Store { /// Create a new LMDB env under the provided directory. - /// By default creates an environment named "lmdb". + /// Creates default environment named "multi_lmdb". /// Be aware of transactional semantics in lmdb /// (transactions are per environment, not per database). + /// Data from non-default `env_name` and prefixes will be + /// migrated into default multi db env file if needed. pub fn new( root_path: &str, env_name: Option<&str>, db_name: Option<&str>, + prefixes: Vec, max_readers: Option, ) -> Result { - let name = match env_name { - Some(n) => n.to_owned(), - None => "lmdb".to_owned(), - }; - let db_name = match db_name { - Some(n) => n.to_owned(), - None => "lmdb".to_owned(), - }; - let full_path = [root_path.to_owned(), name].join("/"); + let full_path = Path::new(root_path) + .join(DEFAULT_MULTI_DB_ENV_NAME) + .to_str() + .unwrap() + .to_string(); fs::create_dir_all(&full_path).map_err(|e| { Error::FileErr(format!( - "Unable to create directory 'db_root' to store chain_data: {:?}", - e + "Unable to create {:?} to store data: {:?}", + full_path, e )) })?; - let mut env_builder = lmdb::EnvBuilder::new()?; - env_builder.set_maxdbs(8)?; - - if let Some(max_readers) = max_readers { - env_builder.set_maxreaders(max_readers)?; - } - let alloc_chunk_size = match global::is_production_mode() { true => ALLOC_CHUNK_SIZE_DEFAULT, false => ALLOC_CHUNK_SIZE_DEFAULT_TEST, }; - let env = unsafe { env_builder.open(&full_path, lmdb::open::NOTLS, 0o600)? }; + // Environment setup. + let env_map = ENV_MAP.get_or_init(|| RwLock::new(HashMap::new())); + let has_env = { + let r_env_map = env_map.read(); + r_env_map.contains_key(&full_path) + }; + if !has_env { + let env = unsafe { + let mut options = EnvOpenOptions::new().read_txn_without_tls(); + let mut env_options = options.map_size(alloc_chunk_size).max_dbs(24); + if let Some(max_readers) = max_readers { + env_options = env_options.max_readers(max_readers); + } + env_options.open(&full_path)? + }; + let (resize, new_size) = needs_resize(&env, alloc_chunk_size); + if resize { + unsafe { + env.resize(new_size)?; + }; + } + debug!("DB Mapsize is {}", env.info().map_size); + let mut w_env_map = env_map.write(); + w_env_map.insert( + full_path.clone(), + EnvState { + env, + open_txs_count: AtomicU32::new(0), + resizing: AtomicBool::new(false), + resize_checking: AtomicBool::new(false), + stores_count: AtomicU32::new(1), + }, + ); + } else { + let mut w_env_map = env_map.write(); + let stores_count = w_env_map + .get(&full_path) + .unwrap() + .stores_count + .load(Ordering::Relaxed); + w_env_map + .get_mut(&full_path) + .unwrap() + .stores_count + .store(stores_count + 1, Ordering::Relaxed); + } + + // Database setup. + let r_env_map = env_map.read(); + let env = r_env_map.get(&full_path).unwrap().env.clone(); + let mut write = env.write_txn()?; + let def_name = db_name.unwrap_or(DEFAULT_ENV_NAME); + let def_db = env.create_database(&mut write, Some(def_name))?; + let mut dbs_map = HashMap::>::new(); + for p in prefixes { + let db = env.create_database(&mut write, Some(p.to_string().as_str()))?; + dbs_map.insert(p, db); + } + write.commit()?; - debug!("DB Mapsize for {} is {}", full_path, env.info()?.mapsize); - let res = Store { - env: Arc::new(env), - db: Arc::new(RwLock::new(None)), - name: db_name, + let s = Store { + env: env.clone(), + env_path: full_path.clone(), + pre_dbs: Arc::new(dbs_map), + def_db, version: DEFAULT_DB_VERSION, alloc_chunk_size, }; - { - let mut w = res.db.write(); - *w = Some(Arc::new(lmdb::Database::open( - res.env.clone(), - Some(&res.name), - &lmdb::DatabaseOptions::new(lmdb::db::CREATE), - )?)); + // Migrate to default environment if needed. + let env_name = env_name.unwrap_or(DEFAULT_ENV_NAME); + if env_name != DEFAULT_MULTI_DB_ENV_NAME { + let migrate_from = Path::new(root_path).join(env_name); + if migrate_from.exists() { + match s.migrate_to_default_env(db_name, &migrate_from) { + Ok(_) => match fs::remove_dir_all(&migrate_from) { + Ok(_) => {} + Err(e) => { + return Err(Error::FileErr(format!( + "Can not remove old DB file: {:?}", + e + ))); + } + }, + Err(e) => { + error!("DB {} migration error: {:?}", env_name, e); + match s.clear() { + Ok(_) => {} + Err(e) => { + error!("Can not clear new DB after unsuccessful migration: {:?}", e) + } + } + return Err(e); + } + } + } } - Ok(res) + + Ok(s) } - /// Construct a new store using a specific protocol version. - /// Permits access to the db with legacy protocol versions for db migrations. - pub fn with_version(&self, version: ProtocolVersion) -> Store { - let alloc_chunk_size = match global::is_production_mode() { - true => ALLOC_CHUNK_SIZE_DEFAULT, - false => ALLOC_CHUNK_SIZE_DEFAULT_TEST, + /// Migrate database from provided path to default environment. + fn migrate_to_default_env( + &self, + from_name: Option<&str>, + from_path: &Path, + ) -> Result<(), Error> { + info!("Migrating DB {:?}, please wait...", from_path); + let from_env = unsafe { + let mut options = EnvOpenOptions::new().read_txn_without_tls(); + let env_options = options.map_size(self.alloc_chunk_size).max_dbs(24); + env_options.open(from_path)? }; - Store { - env: self.env.clone(), - db: self.db.clone(), - name: self.name.clone(), - version, - alloc_chunk_size, + let (resize, new_size) = needs_resize(&from_env, self.alloc_chunk_size); + if resize { + // We are sure there are no active txs, cause migration is called on database creation. + unsafe { + from_env.resize(new_size)?; + self.env.resize(new_size)?; + } } + let db_from = { + let mut write = from_env.write_txn()?; + let db: Database = from_env.create_database(&mut write, from_name)?; + write.commit()?; + db + }; + let mut write_to = self.env.write_txn()?; + let read_from = from_env.read_txn()?; + let mut count = 0; + for kv in db_from.iter(&read_from)? { + let (k, v) = kv?; + if k.len() > 1 && k[1] == PREFIX_KEY_SEPARATOR { + let db_name = k.split_at(1).0; + if let Some(db) = self.pre_dbs.get(&db_name[0]) { + let key = k.split_at(2).1; + db.put(&mut write_to, key, &v)?; + count += 1; + } else { + warn!("Migration: unknown DB key: {}", db_name[0]); + } + } else { + self.def_db.put(&mut write_to, k, &v)?; + count += 1; + } + } + write_to.commit()?; + info!("Migrated {} records from {:?}", count, from_path); + Ok(()) } - /// Protocol version for the store. - pub fn protocol_version(&self) -> ProtocolVersion { - self.version + /// Get number of active environment transactions. + fn open_txs_count(&self) -> u32 { + ENV_MAP + .get() + .unwrap() + .read() + .get(&self.env_path) + .unwrap() + .open_txs_count + .load(Ordering::Relaxed) } - /// Opens the database environment - pub fn open(&self) -> Result<(), Error> { - let mut w = self.db.write(); - *w = Some(Arc::new(lmdb::Database::open( - self.env.clone(), - Some(&self.name), - &lmdb::DatabaseOptions::new(lmdb::db::CREATE), - )?)); - Ok(()) + /// Check if requirement for environment resize is checking. + fn resize_checking(&self) -> bool { + ENV_MAP + .get() + .unwrap() + .read() + .get(&self.env_path) + .unwrap() + .resize_checking + .load(Ordering::Relaxed) } - /// Determines whether the environment needs a resize based on a simple percentage threshold - pub fn needs_resize(&self) -> Result { - let env_info = self.env.info()?; - let stat = self.env.stat()?; - - let size_used = stat.psize as usize * env_info.last_pgno; - trace!("DB map size: {}", env_info.mapsize); - trace!("Space used: {}", size_used); - trace!("Space remaining: {}", env_info.mapsize - size_used); - let resize_percent = RESIZE_PERCENT; - trace!( - "Percent used: {:.*} Percent threshold: {:.*}", - 4, - size_used as f64 / env_info.mapsize as f64, - 4, - resize_percent - ); - - if size_used as f32 / env_info.mapsize as f32 > resize_percent - || env_info.mapsize < self.alloc_chunk_size - { - trace!("Resize threshold met (percent-based)"); - Ok(true) - } else { - trace!("Resize threshold not met (percent-based)"); - Ok(false) + /// Set flag if requirement for environment resize is checking. + fn set_resize_checking(&self, resize_checking: bool) { + ENV_MAP + .get() + .unwrap() + .write() + .get_mut(&self.env_path) + .unwrap() + .resize_checking + .store(resize_checking, Ordering::Relaxed); + } + + /// Wait while database is resizing. + fn wait_for_resize(&self) { + loop { + if !ENV_MAP + .get() + .unwrap() + .read() + .get(&self.env_path) + .unwrap() + .resizing + .load(Ordering::Relaxed) + { + break; + } + trace!("Wait on resizing DB {}", self.env_path); + thread::sleep(Duration::from_millis(100)); } } - /// Increments the database size by as many ALLOC_CHUNK_SIZES - /// to give a minimum threshold of free space - pub fn do_resize(&self) -> Result<(), Error> { - let env_info = self.env.info()?; - let stat = self.env.stat()?; - let size_used = stat.psize as usize * env_info.last_pgno; + /// Resize database environment if needed. + fn maybe_resize(&self) { + self.wait_for_resize(); - let new_mapsize = if env_info.mapsize < self.alloc_chunk_size { - self.alloc_chunk_size + // Check only one resize requirement per time to avoid multiple resizes. + if self.resize_checking() { + return; } else { - let mut tot = env_info.mapsize; - while size_used as f32 / tot as f32 > RESIZE_MIN_TARGET_PERCENT { - tot += self.alloc_chunk_size; + self.set_resize_checking(true); + } + + let (resize, new_size) = needs_resize(&self.env, self.alloc_chunk_size); + if resize { + let env_path = self.env_path.clone(); + let env = self.env.clone(); + + // Resize immediately or at another thread to not interrupt current + // transaction waiting all open transactions to be closed. + if self.open_txs_count() != 0 { + thread::spawn(move || { + loop { + let txs_count = ENV_MAP + .get() + .unwrap() + .read() + .get(&env_path) + .unwrap() + .open_txs_count + .load(Ordering::Relaxed); + if txs_count == 0 { + debug!("Start resizing DB {}", env_path); + ENV_MAP + .get() + .unwrap() + .write() + .get_mut(&env_path) + .unwrap() + .resizing + .store(true, Ordering::Relaxed); + // Wait to make sure there are no more active txs left. + thread::sleep(Duration::from_millis(1000)); + break; + } + thread::sleep(Duration::from_millis(10)); + } + + unsafe { + match env.resize(new_size) { + Ok(_) => debug!("End resizing DB {}", env_path), + Err(e) => error!("Resize DB {} error: {:?}", env_path, e), + } + } + + let mut w_env_map = ENV_MAP.get().unwrap().write(); + let env_state = w_env_map.get_mut(&env_path).unwrap(); + env_state.resizing.store(false, Ordering::Relaxed); + env_state.resize_checking.store(false, Ordering::Relaxed); + }); + return; + } else { + let mut w_env_map = ENV_MAP.get().unwrap().write(); + let env_state = w_env_map.get_mut(&env_path).unwrap(); + + debug!("Start immediate resizing DB {}", env_path); + env_state.resizing.store(true, Ordering::Relaxed); + unsafe { + match env.resize(new_size) { + Ok(_) => debug!("End resizing DB {}", env_path), + Err(e) => error!("Resize DB {} error: {:?}", env_path, e), + } + } + env_state.resizing.store(false, Ordering::Relaxed); } - tot - }; + } - // close - let mut w = self.db.write(); - *w = None; + self.set_resize_checking(false); + } - unsafe { - self.env.set_mapsize(new_mapsize)?; + /// Clear all data from database environment. + fn clear(&self) -> Result<(), Error> { + let mut w = self.env.write_txn()?; + self.def_db.clear(&mut w)?; + for db in self.pre_dbs.values() { + db.clear(&mut w)?; } + w.commit()?; + Ok(()) + } - *w = Some(Arc::new(lmdb::Database::open( - self.env.clone(), - Some(&self.name), - &lmdb::DatabaseOptions::new(lmdb::db::CREATE), - )?)); + /// Protocol version for the store. + pub fn protocol_version(&self) -> ProtocolVersion { + self.version + } - info!( - "Resized database from {} to {}", - env_info.mapsize, new_mapsize - ); - Ok(()) + /// Get database from provided key or return default. + fn get_db(&self, db_key: Option) -> Result<&Database, Error> { + match db_key { + Some(db) => { + if let Some(db) = self.pre_dbs.get(&db) { + Ok(db) + } else { + Err(Error::OtherErr("db for provided key not found".to_string())) + } + } + None => Ok(&self.def_db), + } } - /// Gets a value from the db, provided its key. + /// Gets a value from the database, provided its key. /// Deserializes the retrieved data using the provided function. - pub fn get_with( + fn get_with( &self, + db_key: Option, key: &[u8], - access: &lmdb::ConstAccessor<'_>, - db: &lmdb::Database<'_>, + read: &RoTxn, deserialize: F, ) -> Result, Error> where F: Fn(&[u8], &[u8]) -> Result, { - let res: Option<&[u8]> = access.get(db, key).to_opt()?; + let db = self.get_db(db_key)?; + let res: Option<&[u8]> = db.get(read, key)?; match res { None => Ok(None), Some(res) => deserialize(key, res).map(Some), } } - /// Gets a `Readable` value from the db, provided its key. + /// Gets a `Readable` value from the database, provided its key. /// Note: Creates a new read transaction so will *not* see any uncommitted data. pub fn get_ser( &self, + db_key: Option, key: &[u8], deser_mode: Option, ) -> Result, Error> { - let lock = self.db.read(); - let db = lock - .as_ref() - .ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?; - let txn = lmdb::ReadTransaction::new(self.env.clone())?; - let access = txn.access(); - let d = match deser_mode { - Some(d) => d, - _ => DeserializationMode::default(), + self.wait_for_resize(); + + TxCounter::on_change_tx_count(&self.env_path, true); + let res = { + let d = match deser_mode { + Some(d) => d, + _ => DeserializationMode::default(), + }; + match self.env.read_txn() { + Ok(read) => self.get_with(db_key, key, &read, |_, mut data| { + ser::deserialize(&mut data, self.protocol_version(), d).map_err(From::from) + }), + Err(e) => Err(Error::from(e)), + } }; - self.get_with(key, &access, &db, |_, mut data| { - ser::deserialize(&mut data, self.protocol_version(), d).map_err(From::from) - }) + TxCounter::on_change_tx_count(&self.env_path, false); + res } - /// Whether the provided key exists - pub fn exists(&self, key: &[u8]) -> Result { - let lock = self.db.read(); - let db = lock - .as_ref() - .ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?; - let txn = lmdb::ReadTransaction::new(self.env.clone())?; - let access = txn.access(); - - let res: Option<&lmdb::Ignore> = access.get(db, key).to_opt()?; - Ok(res.is_some()) + /// Whether the key exists at the provided database key. + pub fn exists(&self, db_key: Option, key: &[u8]) -> Result { + self.wait_for_resize(); + + TxCounter::on_change_tx_count(&self.env_path, true); + let res = { + match self.env.read_txn() { + Ok(read) => { + let db_res = self.get_db(db_key); + match db_res { + Ok(db) => { + let res = db.get(&read, key); + match res { + Ok(r) => Ok(r.is_some()), + Err(e) => Err(Error::from(e)), + } + } + Err(e) => Err(Error::from(e)), + } + } + Err(e) => Err(Error::from(e)), + } + }; + TxCounter::on_change_tx_count(&self.env_path, false); + res } - /// Produces an iterator from the provided key prefix. - pub fn iter(&self, prefix: &[u8], deserialize: F) -> Result, Error> + /// Produces an iterator from the provided database key. + pub fn iter<'a, F, T>( + &self, + db_key: Option, + deserialize: F, + ) -> Result, Error> where F: Fn(&[u8], &[u8]) -> Result, { - let lock = self.db.read(); - let db = lock - .as_ref() - .ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?; - let tx = Arc::new(lmdb::ReadTransaction::new(self.env.clone())?); - let cursor = Arc::new(tx.cursor(db.clone())?); - Ok(PrefixIterator::new(tx, cursor, prefix, deserialize)) + self.wait_for_resize(); + + TxCounter::on_change_tx_count(&self.env_path, true); + match self.env.clone().static_read_txn() { + Ok(read) => { + let db_res = self.get_db(db_key); + match db_res { + Ok(db) => Ok(DatabaseIterator::new( + self, + Arc::new(db.clone()), + read, + deserialize, + )), + Err(e) => { + TxCounter::on_change_tx_count(&self.env_path, false); + Err(Error::from(e)) + } + } + } + Err(e) => { + TxCounter::on_change_tx_count(&self.env_path, false); + Err(Error::from(e)) + } + } } /// Builds a new batch to be used with this store. pub fn batch(&self) -> Result, Error> { - // check if the db needs resizing before returning the batch - if self.needs_resize()? { - self.do_resize()?; + self.maybe_resize(); + + TxCounter::on_change_tx_count(&self.env_path, true); + match Batch::new(self) { + Ok(batch) => Ok(batch), + Err(e) => { + TxCounter::on_change_tx_count(&self.env_path, false); + Err(e) + } } - let tx = lmdb::WriteTransaction::new(self.env.clone())?; - Ok(Batch { store: self, tx }) } } -/// Batch to write multiple Writeables to db in an atomic manner. +/// Environment transactions counter, allows to decrement value on drop. +struct TxCounter { + env_path: String, +} + +impl Drop for TxCounter { + fn drop(&mut self) { + Self::on_change_tx_count(&self.env_path, false); + } +} + +impl TxCounter { + /// Increment or decrement active transactions count for current environment. + fn on_change_tx_count(env_path: &String, inc: bool) { + let mut w_env_map = ENV_MAP.get().unwrap().write(); + let env_state = w_env_map.get_mut(env_path).unwrap(); + let open_txs_count = env_state.open_txs_count.load(Ordering::Relaxed); + if inc { + env_state + .open_txs_count + .store(open_txs_count + 1, Ordering::Relaxed); + } else { + env_state + .open_txs_count + .store(open_txs_count - 1, Ordering::Relaxed); + } + } +} + +/// Batch to write multiple Writeables to the database in an atomic manner. pub struct Batch<'a> { store: &'a Store, - tx: lmdb::WriteTransaction<'a>, + write: RwTxn<'a>, + #[allow(dead_code)] + tx_counter: TxCounter, } impl<'a> Batch<'a> { - /// Writes a single key/value pair to the db - pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> { - let lock = self.store.db.read(); - let db = lock - .as_ref() - .ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?; - self.tx - .access() - .put(db, key, value, lmdb::put::Flags::empty())?; + /// Creates a new batch for provided store. + pub fn new(store: &'a Store) -> Result, Error> { + let write = store.env.write_txn()?; + Ok(Batch { + store, + write, + tx_counter: TxCounter { + env_path: store.env_path.clone(), + }, + }) + } + + /// Writes a single key/value pair to the provided database key. + pub fn put(&mut self, db_key: Option, key: &[u8], value: &[u8]) -> Result<(), Error> { + let db = self.store.get_db(db_key)?; + let w = &mut self.write; + db.put(w, key, value)?; Ok(()) } - /// Writes a single key and its `Writeable` value to the db. + /// Writes a single key and its `Writeable` value to the provided database key. /// Encapsulates serialization using the (default) version configured on the store instance. - pub fn put_ser(&self, key: &[u8], value: &W) -> Result<(), Error> { - self.put_ser_with_version(key, value, self.store.protocol_version()) + pub fn put_ser( + &mut self, + db_key: Option, + key: &[u8], + value: &W, + ) -> Result<(), Error> { + self.put_ser_with_version(db_key, key, value, self.store.protocol_version()) } /// Protocol version used by this batch. @@ -359,59 +689,86 @@ impl<'a> Batch<'a> { self.store.protocol_version() } - /// Writes a single key and its `Writeable` value to the db. + /// Writes a single key and its `Writeable` value to the provided database key. /// Encapsulates serialization using the specified protocol version. pub fn put_ser_with_version( - &self, + &mut self, + db_key: Option, key: &[u8], value: &W, version: ProtocolVersion, ) -> Result<(), Error> { let ser_value = ser::ser_vec(value, version); match ser_value { - Ok(data) => self.put(key, &data), + Ok(data) => self.put(db_key, key, &data), Err(err) => Err(err.into()), } } /// Low-level access for retrieving data by key. /// Takes a function for flexible deserialization. - pub fn get_with(&self, key: &[u8], deserialize: F) -> Result, Error> + fn get_with( + &self, + db_key: Option, + key: &[u8], + deserialize: F, + ) -> Result, Error> where F: Fn(&[u8], &[u8]) -> Result, { - let access = self.tx.access(); - let lock = self.store.db.read(); - let db = lock - .as_ref() - .ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?; - - self.store.get_with(key, &access, &db, deserialize) + let read = self.write.nested_read_txn()?; + self.store.get_with(db_key, key, &read, deserialize) } /// Whether the provided key exists. /// This is in the context of the current write transaction. - pub fn exists(&self, key: &[u8]) -> Result { - let access = self.tx.access(); - let lock = self.store.db.read(); - let db = lock - .as_ref() - .ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?; - let res: Option<&lmdb::Ignore> = access.get(db, key).to_opt()?; + pub fn exists(&self, db_key: Option, key: &[u8]) -> Result { + let read = self.write.nested_read_txn()?; + let db = self.store.get_db(db_key)?; + let res = db.get(&read, key)?; Ok(res.is_some()) } - /// Produces an iterator from the provided key prefix. - pub fn iter(&self, prefix: &[u8], deserialize: F) -> Result, Error> + /// Produces an iterator from the provided database key. + pub fn iter( + &'a self, + db_key: Option, + deserialize: F, + ) -> Result, Error> where F: Fn(&[u8], &[u8]) -> Result, { - self.store.iter(prefix, deserialize) + self.store.wait_for_resize(); + + TxCounter::on_change_tx_count(&self.store.env_path, true); + let read = self.write.nested_read_txn(); + match read { + Ok(read) => { + let db_res = self.store.get_db(db_key); + match db_res { + Ok(db) => Ok(DatabaseIterator::new( + self.store, + Arc::new(db.clone()), + read, + deserialize, + )), + Err(e) => { + TxCounter::on_change_tx_count(&self.store.env_path, false); + Err(Error::from(e)) + } + } + } + Err(e) => { + TxCounter::on_change_tx_count(&self.store.env_path, false); + Err(Error::from(e)) + } + } } - /// Gets a `Readable` value from the db by provided key and provided deserialization strategy. + /// Gets a `Readable` value from the database by provided key and deserialization strategy. pub fn get_ser( &self, + db_key: Option, key: &[u8], deser_mode: Option, ) -> Result, Error> { @@ -419,7 +776,7 @@ impl<'a> Batch<'a> { Some(d) => d, _ => DeserializationMode::default(), }; - self.get_with(key, |_, mut data| { + self.get_with(db_key, key, |_, mut data| { match ser::deserialize(&mut data, self.protocol_version(), d) { Ok(res) => Ok(res), Err(e) => Err(From::from(e)), @@ -427,87 +784,189 @@ impl<'a> Batch<'a> { }) } - /// Deletes a key/value pair from the db - pub fn delete(&self, key: &[u8]) -> Result<(), Error> { - let lock = self.store.db.read(); - let db = lock - .as_ref() - .ok_or_else(|| Error::NotFoundErr("chain db is None".to_string()))?; - self.tx.access().del_key(db, key)?; + /// Deletes a key/value pair from the database. + pub fn delete(&mut self, db_key: Option, key: &[u8]) -> Result<(), Error> { + let db = self.store.get_db(db_key)?; + db.delete(&mut self.write, key)?; Ok(()) } - /// Writes the batch to db + /// Writes the batch to database. pub fn commit(self) -> Result<(), Error> { - self.tx.commit()?; + self.write.commit()?; Ok(()) } /// Creates a child of this batch. It will be merged with its parent on /// commit, abandoned otherwise. pub fn child(&mut self) -> Result, Error> { - Ok(Batch { - store: self.store, - tx: self.tx.child_tx()?, - }) + TxCounter::on_change_tx_count(&self.store.env_path, true); + match self.store.env.nested_write_txn(&mut self.write) { + Ok(write) => Ok(Batch { + store: self.store, + write, + tx_counter: TxCounter { + env_path: self.store.env_path.clone(), + }, + }), + Err(e) => { + TxCounter::on_change_tx_count(&self.store.env_path, false); + Err(Error::from(e)) + } + } } } -/// An iterator based on key prefix. +/// An iterator based on database key. /// Caller is responsible for deserialization of the data. -pub struct PrefixIterator +pub struct DatabaseIterator<'a, F, T> where F: Fn(&[u8], &[u8]) -> Result, { - tx: Arc>, - cursor: Arc>, - seek: bool, - prefix: Vec, + db: Arc>, + read: Arc>, + keys: Vec>, + total_keys: usize, + skip_cur: usize, + skip_total: usize, deserialize: F, + #[allow(dead_code)] + tx_counter: TxCounter, } -impl Iterator for PrefixIterator +impl Iterator for DatabaseIterator<'_, F, T> where F: Fn(&[u8], &[u8]) -> Result, { - type Item = T; + type Item = Result; fn next(&mut self) -> Option { - let access = self.tx.access(); - let cursor = Arc::get_mut(&mut self.cursor).expect("failed to get cursor"); - let kv: Result<(&[u8], &[u8]), _> = if self.seek { - cursor.next(&access) - } else { - self.seek = true; - cursor.seek_range_k(&access, &self.prefix[..]) - }; - kv.ok() - .filter(|(k, _)| k.starts_with(self.prefix.as_slice())) - .map(|(k, v)| match (self.deserialize)(k, v) { - Ok(v) => Some(v), - Err(_) => None, - }) - .flatten() + if let Some(k) = self.keys.iter().skip(self.skip_cur).next() { + self.skip_total += 1; + self.skip_cur += 1; + match self.db.get(&self.read, k) { + Ok(v) => { + if let Some(v) = v { + return match (self.deserialize)(k, v) { + Ok(v) => Some(Ok(v)), + Err(e) => { + error!("db iter: error deserializing: {}", e); + Some(Err(Error::from(e))) + } + }; + } + } + Err(e) => { + return { + error!("db iter: error read value: {}", e); + Some(Err(Error::from(e))) + } + } + } + } else if self.total_keys > self.skip_total { + let keys = if let Ok(iter) = self.db.iter(&self.read) { + iter.move_between_keys() + .skip(self.skip_total) + .take(10000) + .filter(|kv| kv.is_ok()) + .map(|kv| kv.unwrap().0.to_vec()) + .collect::>>() + } else { + vec![] + }; + self.skip_cur = 0; + self.keys = keys; + return self.next(); + } + None } } -impl PrefixIterator +impl<'a, F, T> DatabaseIterator<'a, F, T> where F: Fn(&[u8], &[u8]) -> Result, { /// Initialize a new prefix iterator. pub fn new( - tx: Arc>, - cursor: Arc>, - prefix: &[u8], + store: &Store, + db: Arc>, + read: RoTxn<'a, WithoutTls>, deserialize: F, - ) -> PrefixIterator { - PrefixIterator { - tx, - cursor, - seek: false, - prefix: prefix.to_vec(), + ) -> DatabaseIterator<'a, F, T> { + let (keys, total_keys) = if let Ok(iter) = db.iter(&read) { + let total = iter.move_between_keys().count(); + let keys = if let Ok(iter) = db.iter(&read) { + iter.move_between_keys() + .take(10000) + .filter(|kv| kv.is_ok()) + .map(|kv| kv.unwrap().0.to_vec()) + .collect::>>() + } else { + vec![] + }; + (keys, total) + } else { + (vec![], 0) + }; + DatabaseIterator { + db, + read: Arc::new(read), + keys, + total_keys, + skip_cur: 0, + skip_total: 0, deserialize, + tx_counter: TxCounter { + env_path: store.env_path.clone(), + }, } } } + +/// Determines whether the environment needs a resize based on a simple percentage threshold. +pub fn needs_resize(env: &Env, alloc_chunk_size: usize) -> (bool, usize) { + let env_info = env.info(); + let stat = env.stat(); + let size_used = stat.page_size as usize * env_info.last_page_number; + trace!("DB map size: {}", env_info.map_size); + trace!("Space used: {}", size_used); + trace!("Space remaining: {}", env_info.map_size - size_used); + let resize_percent = RESIZE_PERCENT; + trace!( + "Percent used: {:.*} Percent threshold: {:.*}", + 4, + size_used as f64 / env_info.map_size as f64, + 4, + resize_percent + ); + + let resize = if size_used as f32 / env_info.map_size as f32 > resize_percent + || env_info.map_size < alloc_chunk_size + { + trace!("Resize threshold met (percent-based)"); + true + } else { + trace!("Resize threshold not met (percent-based)"); + false + }; + + let new_size = if resize { + if env_info.map_size < alloc_chunk_size { + alloc_chunk_size + } else { + let mut tot = env_info.map_size - (env_info.map_size % alloc_chunk_size); + while size_used as f32 / tot as f32 > RESIZE_MIN_TARGET_PERCENT { + tot += alloc_chunk_size; + } + tot + } + } else { + env_info.map_size + }; + + if resize { + debug!("Resizing DB to {} from {}", new_size, env_info.map_size); + } + + (resize, new_size) +} diff --git a/store/tests/lmdb.rs b/store/tests/lmdb.rs index dc3d2329fd..7aebb52809 100644 --- a/store/tests/lmdb.rs +++ b/store/tests/lmdb.rs @@ -16,9 +16,17 @@ use grin_core as core; use grin_store as store; use grin_util as util; -use crate::core::global; -use crate::core::ser::{self, Readable, Reader, Writeable, Writer}; +use core::global; +use core::ser::{self, Readable, Reader, Writeable, Writer}; +use store::{ + needs_resize, to_key, to_key_u64, Store, ALLOC_CHUNK_SIZE_DEFAULT_TEST, DEFAULT_ENV_NAME, +}; + +use byteorder::WriteBytesExt; +use heed::types::Bytes; +use heed::{Database, Env, EnvOpenOptions, WithoutTls}; use std::fs; +use std::path::Path; const WRITE_CHUNK_SIZE: usize = 20; const TEST_ALLOC_SIZE: usize = store::lmdb::ALLOC_CHUNK_SIZE_DEFAULT / 8 / WRITE_CHUNK_SIZE; @@ -70,25 +78,26 @@ fn test_exists() -> Result<(), store::Error> { let test_dir = "target/test_exists"; setup(test_dir); - let store = store::Store::new(test_dir, Some("test1"), None, None)?; + let prefix = b'P'; + let store = store::Store::new(test_dir, Some("test1"), None, vec![prefix], None)?; let key = [0, 0, 0, 1]; let value = [1, 1, 1, 1]; // Start new batch and insert a new key/value entry. - let batch = store.batch()?; - batch.put(&key, &value)?; + let mut batch = store.batch()?; + batch.put(Some(prefix), &key, &value)?; // Check we can see the new entry in uncommitted batch. - assert!(batch.exists(&key)?); + assert!(batch.exists(Some(prefix), &key)?); // Check we cannot see the new entry yet outside of the uncommitted batch. - assert!(!store.exists(&key)?); + assert!(!store.exists(Some(prefix), &key)?); batch.commit()?; // Check we can see the new entry after committing the batch. - assert!(store.exists(&key)?); + assert!(store.exists(Some(prefix), &key)?); clean_output_dir(test_dir); Ok(()) @@ -99,32 +108,32 @@ fn test_iter() -> Result<(), store::Error> { let test_dir = "target/test_iter"; setup(test_dir); - let store = store::Store::new(test_dir, Some("test1"), None, None)?; + let prefix = b'P'; + let store = store::Store::new(test_dir, Some("test1"), None, vec![prefix], None)?; let key = [0, 0, 0, 1]; let value = [1, 1, 1, 1]; // Start new batch and insert a new key/value entry. - let batch = store.batch()?; - batch.put(&key, &value)?; + let mut batch = store.batch()?; + batch.put(Some(prefix), &key, &value)?; - // TODO - This is not currently possible (and we need to be aware of this). - // Currently our SerIterator is limited to using a ReadTransaction only. - // // Check we can see the new entry via an iterator using the uncommitted batch. - // let mut iter: SerIterator> = batch.iter(&[0])?; - // assert_eq!(iter.next(), Some((key.to_vec(), value.to_vec()))); - // assert_eq!(iter.next(), None); + { + let mut iter = batch.iter(Some(prefix), |_, v| Ok(v.to_vec()))?; + assert_eq!(iter.next(), Some(Ok(value.to_vec()))); + assert_eq!(iter.next(), None); + } // Check we can not yet see the new entry via an iterator outside the uncommitted batch. - let mut iter = store.iter(&[0], |_, v| Ok(v.to_vec()))?; + let mut iter = store.iter(Some(prefix), |_, v| Ok(v.to_vec()))?; assert_eq!(iter.next(), None); batch.commit()?; // Check we can see the new entry via an iterator after committing the batch. - let mut iter = store.iter(&[0], |_, v| Ok(v.to_vec()))?; - assert_eq!(iter.next(), Some(value.to_vec())); + let mut iter = store.iter(Some(prefix), |_, v| Ok(v.to_vec()))?; + assert_eq!(iter.next(), Some(Ok(value.to_vec()))); assert_eq!(iter.next(), None); clean_output_dir(test_dir); @@ -135,18 +144,18 @@ fn test_iter() -> Result<(), store::Error> { fn lmdb_allocate() -> Result<(), store::Error> { let test_dir = "target/lmdb_allocate"; setup(test_dir); + let prefix = b'P'; // Allocate more than the initial chunk, ensuring // the DB resizes underneath { - let store = store::Store::new(test_dir, Some("test1"), None, None)?; + let store = store::Store::new(test_dir, Some("test1"), None, vec![prefix], None)?; for i in 0..WRITE_CHUNK_SIZE * 2 { println!("Allocating chunk: {}", i); let chunk = PhatChunkStruct::new(); let key_val = format!("phat_chunk_set_1_{}", i); - let batch = store.batch()?; - let key = store::to_key(b'P', &key_val); - batch.put_ser(&key, &chunk)?; + let mut batch = store.batch()?; + batch.put_ser(Some(prefix), key_val.as_bytes(), &chunk)?; batch.commit()?; } } @@ -155,17 +164,121 @@ fn lmdb_allocate() -> Result<(), store::Error> { println!("***********************************"); // Open env again and keep adding { - let store = store::Store::new(test_dir, Some("test1"), None, None)?; + let store = store::Store::new(test_dir, Some("test1"), None, vec![prefix], None)?; for i in 0..WRITE_CHUNK_SIZE * 2 { println!("Allocating chunk: {}", i); let chunk = PhatChunkStruct::new(); let key_val = format!("phat_chunk_set_2_{}", i); - let batch = store.batch()?; - let key = store::to_key(b'P', &key_val); - batch.put_ser(&key, &chunk)?; + let mut batch = store.batch()?; + batch.put_ser(Some(prefix), key_val.as_bytes(), &chunk)?; batch.commit()?; } } + clean_output_dir(test_dir); + Ok(()) +} + +fn create_old_db( + test_dir: &str, +) -> Result<(Database, Env), store::Error> { + let env_name = DEFAULT_ENV_NAME; + let alloc_chunk_size = ALLOC_CHUNK_SIZE_DEFAULT_TEST; + let full_path = Path::new(test_dir) + .join(env_name) + .to_str() + .unwrap() + .to_string(); + let _ = fs::create_dir_all(&full_path); + + let env = unsafe { + let mut options = EnvOpenOptions::new().read_txn_without_tls(); + let env_options = options.map_size(alloc_chunk_size).max_dbs(1); + env_options.open(&full_path)? + }; + let (resize, new_size) = needs_resize(&env, alloc_chunk_size); + if resize { + unsafe { + env.resize(new_size)?; + }; + } + + let mut write = env.write_txn()?; + let db: Database = env.create_database(&mut write, Some(env_name))?; + write.commit()?; + + Ok((db, env)) +} + +#[test] +fn test_migration() -> Result<(), store::Error> { + let test_dir = "target/test_migration"; + setup(test_dir); + + let test_prefix_1 = b'H'; + let test_key_1 = [0, 1, 2, 4]; + let test_data_1 = [1, 2, 3, 4]; + + let test_prefix_2 = b'G'; + let test_key_2 = [3, 4, 5, 6]; + let test_key_64_2 = 65480464; + let test_data_2 = [4, 5, 6, 7]; + + let test_key_3 = [6, 7, 8, 9]; + let test_data_3 = [7, 8, 9, 10]; + + // Create old db and fill the data. + { + let (old_db, old_env) = create_old_db(test_dir)?; + let mut w = old_env.write_txn()?; + + // Create old format key value. + let key_1 = to_key(test_prefix_1, test_key_1); + old_db.put(&mut w, key_1.as_slice(), test_data_1.as_slice())?; + + // Create old format 64 key value. + let key_2 = to_key_u64(test_prefix_2, test_key_2, test_key_64_2); + old_db.put(&mut w, key_2.as_slice(), test_data_2.as_slice())?; + + // Create key value without prefix. + old_db.put(&mut w, test_key_3.as_slice(), test_data_3.as_slice())?; + + w.commit()?; + } + + // Create new store to migrate data. + let store = Store::new( + test_dir, + None, + Some(DEFAULT_ENV_NAME), + vec![test_prefix_1, test_prefix_2], + None, + )?; + + // Check we can see key value. + { + assert!(store.exists(Some(test_prefix_1), &test_key_1)?); + let data = store.get_ser::>(Some(test_prefix_1), &test_key_1, None)?; + assert_eq!(data, Some(test_data_1.to_vec())); + } + + // Check we can see key 64 value. + { + let mut key = test_key_2.to_vec(); + key.write_u64::(test_key_64_2) + .unwrap(); + assert!(store.exists(Some(test_prefix_2), &key)?); + let data = store.get_ser::>(Some(test_prefix_2), &key, None)?; + assert_eq!(data, Some(test_data_2.to_vec())); + } + + // Check we can see key value without prefix. + { + assert!(store.exists(None, &test_key_3)?); + let data = store.get_ser::>(None, &test_key_3, None)?; + assert_eq!(data, Some(test_data_3.to_vec())); + } + + clean_output_dir(test_dir); Ok(()) }