diff --git a/Cargo.lock b/Cargo.lock index 17e2a4201..bd96ade61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -408,6 +408,15 @@ dependencies = [ "libc", ] +[[package]] +name = "fastrand" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3fcf0cee53519c866c09b5de1f6c56ff9d647101f81c1964fa632e148896cdf" +dependencies = [ + "instant", +] + [[package]] name = "foreign-types" version = "0.5.0" @@ -1000,6 +1009,15 @@ version = "0.6.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" +[[package]] +name = "remove_dir_all" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "rtrb" version = "0.1.4" @@ -1130,6 +1148,7 @@ dependencies = [ "rand_chacha", "rand_xoshiro", "storage-types", + "tempfile", "thiserror", ] @@ -1314,6 +1333,20 @@ dependencies = [ "winapi 0.2.8", ] +[[package]] +name = "tempfile" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4" +dependencies = [ + "cfg-if", + "fastrand", + "libc", + "redox_syscall", + "remove_dir_all", + "winapi 0.3.9", +] + [[package]] name = "termcolor" version = "1.1.2" diff --git a/config/segcache_test.toml b/config/segcache_test.toml new file mode 100644 index 000000000..406ad8fed --- /dev/null +++ b/config/segcache_test.toml @@ -0,0 +1,94 @@ +daemonize = false + +[admin] +# interfaces listening on +host = "0.0.0.0" +# port listening on +port = "9999" + +[server] +# interfaces listening on +host = "0.0.0.0" +# port listening on +port = "12321" +# epoll timeout in milliseconds +timeout = 100 +# epoll max events returned +nevent = 1024 + +[worker] +# epoll timeout in milliseconds +timeout = 100 +# epoll max events returned +nevent = 1024 +# number of worker threads +threads = 1 + +# storage configuration +[seg] +# hash power adjusts how many items can be held in the hashtable +hash_power = 3 +# total bytes to use for item storage - 32 segments * 1024 segment size +heap_size = 32768 +# size of each segment in bytes +segment_size = 1024 +# number of segments for a non-evict compaction +compact_target = 2 +# number of segments to merge in one merge eviction pass +merge_target = 4 +# max number of segments to merge in one pass +merge_max = 8 +# use merge based eviction +eviction = "Merge" +# optionally, set a file path to back the data datapool +datapool_path = "/home/users/u6632448/file_for_data" +#datapool_path = "/mnt/pmem1.0/cassy/data" +# set a file path to back the metadata datapool +metadata_path = "/home/users/u6632448/file_for_metadata" +#metadata_path = "/mnt/pmem1.0/cassy/metadata" +# state whether cache will be restored +restore = false +# state whether cache will be flushed upon shutdown +graceful_shutdown = true + +[time] +time_type = "Memcache" + +[buf] + +[debug] +# choose from: error, warn, info, debug, trace +log_level = "warn" +# optionally, log to the file below instead of standard out +# log_file = "segcache.log" +# backup file name for use with log rotation +log_backup = "segcache.log.old" +# trigger log rotation when the file grows beyond this size (in bytes). Set this +# option to '0' to disable log rotation. 
+log_max_size = 1073741824 + +[klog] +# optionally, log commands to the file below +# file = "segcache.cmd" +# backup file name for use with log rotation +backup = "segcache.cmd.old" +# trigger log rotation when the file grows beyond this size (in bytes). Set this +# option to '0' to disable log rotation. +max_size = 1073741824 +# specify the sampling ratio, 1 in N commands will be logged. Setting to '0' +# will disable command logging. +sample = 100 + +[sockio] + +[tcp] + +[tls] +# certificate chain used to validate client certificate +# certificate_chain = "client.chain" +# server certificate +# certificate = "server.crt" +# server private key +# private_key = "server.key" +# ca certificate file used as the root of trust +# ca_file = "ca.crt" diff --git a/src/rust/common/src/signal.rs b/src/rust/common/src/signal.rs index f4f89a342..bbeff1f0a 100644 --- a/src/rust/common/src/signal.rs +++ b/src/rust/common/src/signal.rs @@ -5,4 +5,5 @@ #[derive(Clone)] pub enum Signal { Shutdown, + Stop, } diff --git a/src/rust/config/src/seg.rs b/src/rust/config/src/seg.rs index 3a84b8331..330b1db01 100644 --- a/src/rust/config/src/seg.rs +++ b/src/rust/config/src/seg.rs @@ -8,6 +8,10 @@ use serde::{Deserialize, Serialize}; const MB: usize = 1024 * 1024; +// restore and graceful shutdown options +const RESTORE: bool = false; +const GRACEFUL_SHUTDOWN: bool = false; + // defaults for hashtable const HASH_POWER: u8 = 16; const OVERFLOW_FACTOR: f64 = 1.0; @@ -24,9 +28,12 @@ const COMPACT_TARGET: usize = 2; const MERGE_TARGET: usize = 4; const MERGE_MAX: usize = 8; -// datapool +// datapool (`Segments.data`) const DATAPOOL_PATH: Option<&str> = None; +// hashtable +const HASHTABLE_PATH: Option<&str> = None; + #[derive(Copy, Clone, Debug, Serialize, Deserialize)] pub enum Eviction { None, @@ -39,6 +46,14 @@ pub enum Eviction { } // helper functions for default values +fn restore() -> bool { + RESTORE +} + +fn graceful_shutdown() -> bool { + GRACEFUL_SHUTDOWN +} + fn hash_power() -> u8 { HASH_POWER } @@ -75,9 +90,17 @@ fn datapool_path() -> Option { DATAPOOL_PATH.map(|v| v.to_string()) } +fn metadata_path() -> Option { + HASHTABLE_PATH.map(|v| v.to_string()) +} + // definitions #[derive(Serialize, Deserialize, Debug)] pub struct Seg { + #[serde(default = "restore")] + restore: bool, + #[serde(default = "graceful_shutdown")] + graceful_shutdown: bool, #[serde(default = "hash_power")] hash_power: u8, #[serde(default = "overflow_factor")] @@ -96,11 +119,15 @@ pub struct Seg { compact_target: usize, #[serde(default = "datapool_path")] datapool_path: Option, + #[serde(default = "metadata_path")] + metadata_path: Option, } impl Default for Seg { fn default() -> Self { Self { + restore: restore(), + graceful_shutdown: graceful_shutdown(), hash_power: hash_power(), overflow_factor: overflow_factor(), heap_size: heap_size(), @@ -110,12 +137,28 @@ impl Default for Seg { merge_max: merge_max(), compact_target: compact_target(), datapool_path: datapool_path(), + metadata_path: metadata_path(), } } } // implementation impl Seg { + // Determines if the `Seg` will be restored. The restoration will be + // successful if `datapool_path` and `metadata_path` are valid paths. + // Otherwise, the `Seg` will be created as + //new. + pub fn restore(&self) -> bool { + self.restore + } + + // Determines if the `Seg` will be gracefully shutdown. The graceful + // shutdown will be successful if the cache is file backed and + // metadata_path` is a valid path to save the relevant `Seg` fields to. 
+ // Otherwise, the relevant `Seg` fields will not be saved. + pub fn graceful_shutdown(&self) -> bool { + self.graceful_shutdown + } pub fn hash_power(&self) -> u8 { self.hash_power } @@ -151,6 +194,10 @@ impl Seg { pub fn datapool_path(&self) -> Option { self.datapool_path.as_ref().map(|v| Path::new(v).to_owned()) } + + pub fn metadata_path(&self) -> Option { + self.metadata_path.as_ref().map(|v| Path::new(v).to_owned()) + } } // trait definitions diff --git a/src/rust/core/server/src/lib.rs b/src/rust/core/server/src/lib.rs index 3b9198c2f..a0b215ef7 100644 --- a/src/rust/core/server/src/lib.rs +++ b/src/rust/core/server/src/lib.rs @@ -114,6 +114,11 @@ pub const DEFAULT_BUFFER_SIZE: usize = 16 * 1024; // 16KB // specific upper bounds. const ADMIN_MAX_BUFFER_SIZE: usize = 2 * 1024 * 1024; // 1MB +// TODO(bmartin): this *should* be plenty safe, the queue should rarely ever be +// full, and a single wakeup should drain at least one message and make room for +// the response. A stat to prove that this is sufficient would be good. +const QUEUE_RETRIES: usize = 3; + const THREAD_PREFIX: &str = "pelikan"; metrics::test_no_duplicates!(); diff --git a/src/rust/core/server/src/threads/admin.rs b/src/rust/core/server/src/threads/admin.rs index 1f15f4d63..abb2c24ac 100644 --- a/src/rust/core/server/src/threads/admin.rs +++ b/src/rust/core/server/src/threads/admin.rs @@ -7,6 +7,7 @@ use super::EventLoop; use crate::poll::{Poll, LISTENER_TOKEN, WAKER_TOKEN}; +use crate::QUEUE_RETRIES; use crate::TCP_ACCEPT_EX; use common::signal::Signal; use common::ssl::{HandshakeError, MidHandshakeSslStream, Ssl, SslContext, SslStream}; @@ -324,9 +325,9 @@ impl Admin { self.do_accept(); } WAKER_TOKEN => { - #[allow(clippy::never_loop)] // check if we have received signals from any sibling // thread + #[allow(clippy::never_loop)] while let Ok(signal) = self.signal_queue.recv_from(0) { match signal { Signal::Shutdown => { @@ -341,6 +342,11 @@ impl Admin { let _ = self.log_drain.flush(); return; } + Signal::Stop => { + warn!("received stop"); + let _ = self.log_drain.flush(); + return; + } } } } @@ -434,6 +440,25 @@ impl EventLoop for Admin { match request { AdminRequest::FlushAll => {} + AdminRequest::Stop => { + + for _ in 0..QUEUE_RETRIES { + // Send Stop to all other threads + if self.signal_queue.broadcast(Signal::Stop).is_ok() { + warn!("sending stop signal to all threads"); + break; + } + } + for _ in 0..QUEUE_RETRIES { + if self.signal_queue.wake_all().is_ok() { + break; + } + } + + let _ = session.write(b"OK\r\n"); + session.finalize_response(); + ADMIN_RESPONSE_COMPOSE.increment(); + } AdminRequest::Stats => { Self::handle_stats_request(session); } diff --git a/src/rust/core/server/src/threads/listener.rs b/src/rust/core/server/src/threads/listener.rs index 48bb5498b..dd9d7acfc 100644 --- a/src/rust/core/server/src/threads/listener.rs +++ b/src/rust/core/server/src/threads/listener.rs @@ -224,6 +224,10 @@ impl Listener { Signal::Shutdown => { return; } + Signal::Stop => { + warn!("received stop"); + return; + } } } } diff --git a/src/rust/core/server/src/threads/worker/multi.rs b/src/rust/core/server/src/threads/worker/multi.rs index 9fbfce5d3..ef4a1edaa 100644 --- a/src/rust/core/server/src/threads/worker/multi.rs +++ b/src/rust/core/server/src/threads/worker/multi.rs @@ -12,6 +12,7 @@ use super::*; use crate::poll::Poll; use crate::threads::worker::StorageWorker; use crate::threads::worker::TokenWrapper; +use crate::QUEUE_RETRIES; use common::signal::Signal; use common::time::Instant; use 
config::WorkerConfig; @@ -26,11 +27,6 @@ use session::Session; use std::io::{BufRead, Write}; use std::sync::Arc; -// TODO(bmartin): this *should* be plenty safe, the queue should rarely ever be -// full, and a single wakeup should drain at least one message and make room for -// the request. A stat to prove that this is sufficient would be good. -const QUEUE_RETRIES: usize = 3; - const WAKER_TOKEN: usize = usize::MAX; /// A `MultiWorker` handles events on `Session`s and routes storage requests to @@ -110,8 +106,8 @@ where self.handle_new_sessions(); self.handle_storage_queue(); - #[allow(clippy::never_loop)] // check if we received any signals from the admin thread + #[allow(clippy::never_loop)] while let Ok(signal) = self.signal_queue.recv_from(0) { match signal { Signal::Shutdown => { @@ -119,6 +115,10 @@ where // and stop processing events return; } + Signal::Stop => { + warn!("received stop"); + return; + } } } } diff --git a/src/rust/core/server/src/threads/worker/single.rs b/src/rust/core/server/src/threads/worker/single.rs index 4e4b726e9..ca4c4eeb5 100644 --- a/src/rust/core/server/src/threads/worker/single.rs +++ b/src/rust/core/server/src/threads/worker/single.rs @@ -97,8 +97,8 @@ where WAKER_TOKEN => { self.handle_new_sessions(); - #[allow(clippy::never_loop)] // check if we received any signals from the admin thread + #[allow(clippy::never_loop)] while let Ok(signal) = self.signal_queue.recv_from(0) { match signal { Signal::Shutdown => { @@ -106,6 +106,11 @@ where // and stop processing events return; } + Signal::Stop => { + warn!("received stop"); + self.storage.flush(); + return; + } } } } diff --git a/src/rust/core/server/src/threads/worker/storage.rs b/src/rust/core/server/src/threads/worker/storage.rs index 41b957b3a..0cfd89896 100644 --- a/src/rust/core/server/src/threads/worker/storage.rs +++ b/src/rust/core/server/src/threads/worker/storage.rs @@ -4,6 +4,7 @@ use super::*; use crate::threads::worker::TokenWrapper; +use crate::QUEUE_RETRIES; use common::signal::Signal; use common::time::Instant; use config::WorkerConfig; @@ -17,11 +18,6 @@ use protocol::{Compose, Execute}; use queues::{QueueError, QueuePair, QueuePairs}; use std::sync::Arc; -// TODO(bmartin): this *should* be plenty safe, the queue should rarely ever be -// full, and a single wakeup should drain at least one message and make room for -// the response. A stat to prove that this is sufficient would be good. -const QUEUE_RETRIES: usize = 3; - const WAKER_TOKEN: usize = usize::MAX; /// A `Storage` thread is used in a multi-worker configuration. It owns the @@ -145,8 +141,8 @@ where } } - #[allow(clippy::never_loop)] // check if we received any signals from the admin thread + #[allow(clippy::never_loop)] while let Ok(s) = self.signal_queue.recv_from(0) { match s { Signal::Shutdown => { @@ -158,6 +154,11 @@ where return; } + Signal::Stop => { + warn!("received stop"); + self.storage.flush(); + return; + } } } } diff --git a/src/rust/entrystore/src/lib.rs b/src/rust/entrystore/src/lib.rs index feac89e40..725860507 100644 --- a/src/rust/entrystore/src/lib.rs +++ b/src/rust/entrystore/src/lib.rs @@ -21,4 +21,7 @@ pub trait EntryStore { /// implementation is a no-op. Types which can efficiently implement eager /// expiration should implement their own handling logic for this function. fn expire(&mut self) {} + + /// Flush all values from the entry store to persistent storage. 
+ fn flush(&mut self); } diff --git a/src/rust/entrystore/src/noop/mod.rs b/src/rust/entrystore/src/noop/mod.rs index c47159367..b2c5b78df 100644 --- a/src/rust/entrystore/src/noop/mod.rs +++ b/src/rust/entrystore/src/noop/mod.rs @@ -22,4 +22,5 @@ impl Noop { impl EntryStore for Noop { fn expire(&mut self) {} + fn flush(&mut self) {} } diff --git a/src/rust/entrystore/src/seg/mod.rs b/src/rust/entrystore/src/seg/mod.rs index 193c1aecc..70b7b48fb 100644 --- a/src/rust/entrystore/src/seg/mod.rs +++ b/src/rust/entrystore/src/seg/mod.rs @@ -43,12 +43,15 @@ impl Seg { // build the datastructure from the config let data = ::seg::Seg::builder() + .restore(config.restore()) .hash_power(config.hash_power()) .overflow_factor(config.overflow_factor()) .heap_size(config.heap_size()) .segment_size(config.segment_size()) .eviction(eviction) .datapool_path(config.datapool_path()) + .metadata_path(config.metadata_path()) + .graceful_shutdown(config.graceful_shutdown()) .build(); Self { data } @@ -59,4 +62,10 @@ impl EntryStore for Seg { fn expire(&mut self) { self.data.expire(); } + + /// Flush (gracefully shutdown) the `Seg` cache + fn flush(&mut self) { + // TODO: check if successfully shutdown and record result + self.data.flush(); + } } diff --git a/src/rust/protocol/src/admin.rs b/src/rust/protocol/src/admin.rs index 0222ea490..430fef4b8 100644 --- a/src/rust/protocol/src/admin.rs +++ b/src/rust/protocol/src/admin.rs @@ -18,6 +18,7 @@ pub enum AdminRequest { Stats, Version, Quit, + Stop, } #[derive(Default, Copy, Clone)] @@ -62,6 +63,10 @@ impl Parse for AdminRequestParser { message: AdminRequest::Version, consumed: command_end + CRLF.len(), }), + b"stop" => Ok(ParseOk { + message: AdminRequest::Stop, + consumed: command_end + CRLF.len(), + }), _ => Err(ParseError::UnknownCommand), } } diff --git a/src/rust/protocol/src/memcache/wire/mod.rs b/src/rust/protocol/src/memcache/wire/mod.rs index 6c5ab318e..4f9787f0e 100644 --- a/src/rust/protocol/src/memcache/wire/mod.rs +++ b/src/rust/protocol/src/memcache/wire/mod.rs @@ -229,6 +229,9 @@ where MemcacheRequest::FlushAll => { return None; } + MemcacheRequest::Stop => { + return None; + } }; Some(MemcacheResponse { request, result }) diff --git a/src/rust/protocol/src/memcache/wire/request/command.rs b/src/rust/protocol/src/memcache/wire/request/command.rs index bd31eaea4..035ce7586 100644 --- a/src/rust/protocol/src/memcache/wire/request/command.rs +++ b/src/rust/protocol/src/memcache/wire/request/command.rs @@ -23,6 +23,7 @@ pub enum MemcacheCommand { Cas, Quit, FlushAll, + Stop, } impl TryFrom<&[u8]> for MemcacheCommand { @@ -43,6 +44,7 @@ impl TryFrom<&[u8]> for MemcacheCommand { b"decr" => MemcacheCommand::Decr, b"quit" => MemcacheCommand::Quit, b"flush_all" => MemcacheCommand::FlushAll, + b"stop" => MemcacheCommand::Stop, _ => { return Err(ParseError::UnknownCommand); } @@ -67,6 +69,7 @@ impl std::fmt::Display for MemcacheCommand { Self::Decr => "decr", Self::Quit => "quit", Self::FlushAll => "flush_all", + Self::Stop => "stop", }; write!(f, "{}", name) } diff --git a/src/rust/protocol/src/memcache/wire/request/mod.rs b/src/rust/protocol/src/memcache/wire/request/mod.rs index 8a56ab78c..bd1d6525a 100644 --- a/src/rust/protocol/src/memcache/wire/request/mod.rs +++ b/src/rust/protocol/src/memcache/wire/request/mod.rs @@ -34,6 +34,7 @@ pub enum MemcacheRequest { Decr { key: Key, value: u64, noreply: bool }, Cas { entry: MemcacheEntry, noreply: bool }, FlushAll, + Stop, } impl MemcacheRequest { @@ -101,6 +102,7 @@ impl MemcacheRequest { Self::Decr { .. 
} => MemcacheCommand::Decr, Self::Cas { .. } => MemcacheCommand::Cas, Self::FlushAll => MemcacheCommand::FlushAll, + Self::Stop => MemcacheCommand::Stop, } } } diff --git a/src/rust/protocol/src/memcache/wire/request/parse.rs b/src/rust/protocol/src/memcache/wire/request/parse.rs index f4da4cfef..20891a67b 100644 --- a/src/rust/protocol/src/memcache/wire/request/parse.rs +++ b/src/rust/protocol/src/memcache/wire/request/parse.rs @@ -66,6 +66,7 @@ impl Parse for MemcacheRequestParser { Err(ParseError::Invalid) } MemcacheCommand::FlushAll => parse_flush_all(buffer), + MemcacheCommand::Stop => parse_stop(buffer), } } } @@ -637,3 +638,22 @@ fn parse_flush_all(buffer: &[u8]) -> Result, ParseError consumed, }) } + +#[allow(clippy::unnecessary_unwrap)] +fn parse_stop(buffer: &[u8]) -> Result, ParseError> { + let mut parse_state = ParseState::new(buffer); + + // this was already checked for when determining the command + let (whitespace, _cmd_end) = parse_state.next_sequence().unwrap(); + + if whitespace != Sequence::Crlf && whitespace != Sequence::SpaceCrlf { + return Err(ParseError::Invalid); + } + + let consumed = parse_state.position(); + + Ok(ParseOk { + message: MemcacheRequest::Stop, + consumed, + }) +} diff --git a/src/rust/protocol/src/memcache/wire/response/mod.rs b/src/rust/protocol/src/memcache/wire/response/mod.rs index 657d03b44..df188130d 100644 --- a/src/rust/protocol/src/memcache/wire/response/mod.rs +++ b/src/rust/protocol/src/memcache/wire/response/mod.rs @@ -245,6 +245,8 @@ impl Compose for MemcacheResponse { CAS.increment(); } MemcacheRequest::FlushAll => {} + // TODO: if needed, add to this statement + MemcacheRequest::Stop => {} } if let MemcacheResult::Values { ref entries, cas } = self.result { let mut hits = 0; diff --git a/src/rust/server/segcache/src/lib.rs b/src/rust/server/segcache/src/lib.rs index 4679f2bab..3f9e6df8f 100644 --- a/src/rust/server/segcache/src/lib.rs +++ b/src/rust/server/segcache/src/lib.rs @@ -71,6 +71,7 @@ impl Segcache { /// fully terminated. This is more likely to be used for running integration /// tests or other automated testing. pub fn shutdown(self) { + // TODO: flush the cache self.process.shutdown() } } diff --git a/src/rust/storage/seg/Cargo.toml b/src/rust/storage/seg/Cargo.toml index 775448ebb..632baf8e9 100644 --- a/src/rust/storage/seg/Cargo.toml +++ b/src/rust/storage/seg/Cargo.toml @@ -35,6 +35,7 @@ rand_chacha = { version = "0.3.0" } rand_xoshiro = { version = "0.6.0" } storage-types = { path = "../types" } thiserror = "1.0.24" +tempfile = "3.3.0" [dev-dependencies] criterion = "0.3.4" diff --git a/src/rust/storage/seg/src/builder.rs b/src/rust/storage/seg/src/builder.rs index 0c41a90e9..abb908c32 100644 --- a/src/rust/storage/seg/src/builder.rs +++ b/src/rust/storage/seg/src/builder.rs @@ -4,28 +4,44 @@ //! A builder for configuring a new [`Seg`] instance. +use crate::datapool::*; use crate::*; use std::path::Path; +use std::path::PathBuf; /// A builder that is used to construct a new [`Seg`] instance. pub struct Builder { + restore: bool, hash_power: u8, overflow_factor: f64, segments_builder: SegmentsBuilder, + metadata_path: Option, + graceful_shutdown: bool, } // Defines the default parameters impl Default for Builder { fn default() -> Self { Self { + restore: false, hash_power: 16, overflow_factor: 0.0, segments_builder: SegmentsBuilder::default(), + metadata_path: None, + graceful_shutdown: false, } } } impl Builder { + /// Specify to `Builder` and `SegmentsBuilder` whether the cache will be restored. 
+ /// Otherwise, the cache will be created and treated as new. + pub fn restore(mut self, restore: bool) -> Self { + self.restore = restore; + self.segments_builder = self.segments_builder.restore(restore); + self + } + /// Specify the hash power, which limits the size of the hashtable to 2^N /// entries. 1/8th of these are used for metadata storage, meaning that the /// total number of items which can be held in the cache is limited to @@ -135,17 +151,32 @@ impl Builder { self } - /// Specify a backing file to be used for segment storage. - /// - /// # Panics - /// - /// This will panic if the file already exists + /// Specify a backing file to be used for `Segments.data` storage. pub fn datapool_path>(mut self, path: Option) -> Self { self.segments_builder = self.segments_builder.datapool_path(path); self } + /// Specify a backing file to be used for metadata storage. + pub fn metadata_path>(mut self, path: Option) -> Self { + self.metadata_path = path.map(|p| p.as_ref().to_owned()); + self + } + + /// Specify whether the cache will be gracefully shutdown. If `true`, then + /// when the cache is flushed, the relevant parts will be stored to the file + /// with path `metadata_path` + pub fn graceful_shutdown(mut self, graceful_shutdown: bool) -> Self { + self.graceful_shutdown = graceful_shutdown; + self + } + /// Consumes the builder and returns a fully-allocated `Seg` instance. + /// If `restore`, the cache `Segments.data` is file backed by an existing + /// file and a valid file for the `metadata` is given, `Seg` will be + /// restored. Otherwise, create a new `Seg` instance. The path for the + /// `metadata` file will be saved with the `Seg` instance to be used to save + // the structures to upon graceful shutdown. /// /// ``` /// use seg::{Policy, Seg}; @@ -159,14 +190,58 @@ impl Builder { /// .eviction(Policy::Random).build(); /// ``` pub fn build(self) -> Seg { + // If `restore` and there is a path for the metadata file to + // restore from, restore the cache + if self.restore && self.metadata_path.is_some() { + // Check if the metadata file exists and with what size + if let Ok(file_size) = + std::fs::metadata(self.metadata_path.as_ref().unwrap()).map(|m| m.len()) + { + // TODO: implement a non-messy way to calculate expected file size, rather than just taking actual size + let file_size = file_size as usize; + + // Mmap file + let mut pool = File::create(self.metadata_path.clone().unwrap(), file_size, true) + .expect("failed to allocate file backed storage"); + let metadata = pool.as_mut_slice(); + + let hashtable = HashTable::restore(metadata, self.hash_power, self.overflow_factor); + + let mut offset = hashtable.recover_size(); + let ttl_buckets = TtlBuckets::restore(&metadata[offset..]); + + offset += ttl_buckets.recover_size(); + + let segments = self + .segments_builder + .clone() + .build(Some(&mut metadata[offset..])); + + // Check that `Segments` was copied back, it will fail if the + // file for the file backed `Segments.data` did not exist + if segments.fields_copied_back { + return Seg { + hashtable, + segments, + ttl_buckets, + metadata_path: self.metadata_path, + graceful_shutdown: self.graceful_shutdown, + }; + } + } + } + + // Otherwise, create a new cache + let segments = self.segments_builder.build(None); let hashtable = HashTable::new(self.hash_power, self.overflow_factor); - let segments = self.segments_builder.build(); - let ttl_buckets = TtlBuckets::default(); + let ttl_buckets = TtlBuckets::new(); Seg { hashtable, segments, ttl_buckets, + metadata_path: 
self.metadata_path, + graceful_shutdown: self.graceful_shutdown, } } } diff --git a/src/rust/storage/seg/src/datapool/file.rs b/src/rust/storage/seg/src/datapool/file.rs index 1bd68c7fd..46d5bfc08 100644 --- a/src/rust/storage/seg/src/datapool/file.rs +++ b/src/rust/storage/seg/src/datapool/file.rs @@ -21,29 +21,58 @@ pub struct File { impl File { /// Create a new `File` datapool at the given path and with the specified - /// size (in bytes). Returns an error if the file already exists, could not - /// be created, couldn't be extended to the requested size, or couldn't be - /// mmap'd + /// size (in bytes). If a file already exists at the given path, check it is + /// the right size and open it. Otherwise, open a new file at the given path + ///and with the specified size. Returns an error if could not be created, + /// size of file is not the right size (opening), couldn't be extended to + /// the requested size (creating), or couldn't be mmap'd. pub fn create>( path: T, size: usize, prefault: bool, ) -> Result { - let file = OpenOptions::new() - .create_new(true) - .read(true) - .write(true) - .open(path)?; - file.set_len(size as u64)?; - let mut mmap = unsafe { MmapOptions::new().populate().map_mut(&file)? }; - if prefault { - let mut offset = 0; - while offset < size { - mmap[offset] = 0; - offset += PAGE_SIZE; + // TODO: uncomment below code once there is a better way to determine expected `size` of the existing file + // check if the file exists and is the right size + // let exists = if let Ok(current_size) = std::fs::metadata(&path).map(|m| m.len()) { + // if current_size != size as u64 { + // return Err(std::io::Error::new( + // std::io::ErrorKind::Other, + // "existing file has wrong size", + // )); + // } + // true + // } else { + // false + // }; + + let exists = std::fs::metadata(&path).is_ok(); + + let mmap = if exists { + let f = OpenOptions::new().read(true).write(true).open(path)?; + + unsafe { MmapOptions::new().populate().map_mut(&f)? } + } else { + let f = OpenOptions::new() + .create_new(true) + .read(true) + .write(true) + .open(path)?; + f.set_len(size as u64)?; + + let mut mmap = unsafe { MmapOptions::new().populate().map_mut(&f)? }; + + if prefault { + let mut offset = 0; + while offset < size { + mmap[offset] = 0; + offset += PAGE_SIZE; + } + mmap.flush()?; } - mmap.flush()?; - } + + mmap + }; + Ok(Self { mmap, size }) } } diff --git a/src/rust/storage/seg/src/datapool/memory.rs b/src/rust/storage/seg/src/datapool/memory.rs index f9aff9f88..d905d61a0 100644 --- a/src/rust/storage/seg/src/datapool/memory.rs +++ b/src/rust/storage/seg/src/datapool/memory.rs @@ -8,6 +8,7 @@ use crate::datapool::Datapool; /// A contiguous allocation of bytes in main memory +#[derive(Clone)] pub struct Memory { data: Box<[u8]>, } @@ -49,3 +50,10 @@ impl Datapool for Memory { Ok(()) } } + +// Used only in Segments::clone() in order to clone `Segments.data` +impl From> for Memory { + fn from(data: Box<[u8]>) -> Memory { + Memory { data } + } +} diff --git a/src/rust/storage/seg/src/eviction/mod.rs b/src/rust/storage/seg/src/eviction/mod.rs index f5a31111c..184944014 100644 --- a/src/rust/storage/seg/src/eviction/mod.rs +++ b/src/rust/storage/seg/src/eviction/mod.rs @@ -21,6 +21,7 @@ pub use policy::Policy; /// The `Eviction` struct is used to rank and return segments for eviction. It /// implements eviction strategies corresponding to the `Policy`. 
+#[derive(Clone, PartialEq)] pub struct Eviction { policy: Policy, last_update_time: Instant, diff --git a/src/rust/storage/seg/src/hashtable/hash_bucket.rs b/src/rust/storage/seg/src/hashtable/hash_bucket.rs index d29f47c41..e9c3bc194 100644 --- a/src/rust/storage/seg/src/hashtable/hash_bucket.rs +++ b/src/rust/storage/seg/src/hashtable/hash_bucket.rs @@ -72,7 +72,7 @@ pub(crate) const CLEAR_FREQ_SMOOTH_MASK: u64 = 0xFFF7_FFFF_FFFF_FFFF; /// Mask to get the lower 16 bits from a timestamp pub(crate) const PROC_TS_MASK: u64 = 0x0000_0000_0000_FFFF; -#[derive(Copy, Clone)] +#[derive(Clone, Copy, Debug, PartialEq)] pub(crate) struct HashBucket { pub(super) data: [u64; N_BUCKET_SLOT], } diff --git a/src/rust/storage/seg/src/hashtable/mod.rs b/src/rust/storage/seg/src/hashtable/mod.rs index 230b720c8..2b73ed1f0 100644 --- a/src/rust/storage/seg/src/hashtable/mod.rs +++ b/src/rust/storage/seg/src/hashtable/mod.rs @@ -98,6 +98,7 @@ static_metrics! { /// Main structure for performing item lookup. Contains a contiguous allocation /// of [`HashBucket`]s which are used to store item info and metadata. +#[derive(Clone)] #[repr(C)] pub(crate) struct HashTable { hash_builder: Box, @@ -107,6 +108,10 @@ pub(crate) struct HashTable { rng: Box, started: Instant, next_to_chain: u64, + /// Is `HashTable` copied back from a file? + pub(crate) _table_copied_back: bool, + /// Used in graceful shutdown + overflow_factor: f64, } impl HashTable { @@ -130,19 +135,16 @@ impl HashTable { let total_buckets = (buckets as f64 * (1.0 + overflow_factor)).ceil() as usize; let mut data = Vec::with_capacity(0); + // set number of elements in `data` to be `total_buckets` data.reserve_exact(total_buckets as usize); + // fill all elements with `HashBucket::new()` data.resize(total_buckets as usize, HashBucket::new()); debug!( "hashtable has: {} primary slots across {} primary buckets and {} total buckets", slots, buckets, total_buckets, ); - let hash_builder = RandomState::with_seeds( - 0xbb8c484891ec6c86, - 0x0522a25ae9c769f9, - 0xeed2797b9571bc75, - 0x4feb29c1fbbd59d0, - ); + let hash_builder = hash_builder(); Self { hash_builder: Box::new(hash_builder), @@ -152,9 +154,140 @@ impl HashTable { rng: Box::new(rng()), started: Instant::recent(), next_to_chain: buckets as u64, + _table_copied_back: false, + overflow_factor, + } + } + + // Returns a restored `HashTable` using recovery data (`metadata`) + pub fn restore(metadata: &[u8], cfg_power: u8, overflow_factor: f64) -> Self { + // restore() assumes no changes in `power`. + // I.e. 
config specifies same `power` as `HashTable` we are + // restoring from + // TODO: Detect a change of `power` and adjust `HashTable` accordingly + let total_buckets = total_buckets(cfg_power.into(), overflow_factor); + let bucket_size = ::std::mem::size_of::(); + let u64_size = ::std::mem::size_of::(); + let started_size = ::std::mem::size_of::(); + // Size of all components of `HashTable` that are being restored + let hashtable_size = u64_size * 3 // `power`, `mask`, `next_to_chain` + + total_buckets * bucket_size // `data` + + started_size; + + // create blank bytes to copy data into + let mut bytes = vec![0; hashtable_size]; + // retrieve bytes from mmapped file + bytes.copy_from_slice(&metadata[0..hashtable_size]); + + // ----- Re-initialise `hash_builder` ----- + + let hash_builder = hash_builder(); + + let mut offset = 0; + // ----- Retrieve `power` --------- + let mut end = u64_size; + + let power = unsafe { *(bytes[offset..end].as_mut_ptr() as *mut u64) }; + // TODO: compare `cfg_power` and `power` + + offset += u64_size; + // ----- Retrieve `mask` --------- + end += u64_size; + + let mask = unsafe { *(bytes[offset..end].as_mut_ptr() as *mut u64) }; + + offset += u64_size; + // ----- Retrieve `data` --------- + + let mut data = Vec::with_capacity(0); + data.reserve_exact(total_buckets as usize); + + // Get each `HashBucket` from the raw bytes + for _ in 0..total_buckets { + end += bucket_size; + + // cast bytes to `HashBucket` + let bucket = unsafe { *(bytes[offset..end].as_mut_ptr() as *mut HashBucket) }; + data.push(bucket); + + offset += bucket_size; + } + + // ----- Retrieve `started` --------- + end += started_size; + + let started = unsafe { *(bytes[offset..end].as_mut_ptr() as *mut Instant) }; + + offset += started_size; + // ----- Retrieve `next_to_chain` --------- + end += u64_size; + + let next_to_chain = unsafe { *(bytes[offset..end].as_mut_ptr() as *mut u64) }; + + Self { + hash_builder: Box::new(hash_builder), + power, + mask, + data: data.into_boxed_slice(), + rng: Box::new(rng()), + started, + next_to_chain, + _table_copied_back: true, + overflow_factor, } } + /// Flushes the `HashTable` by copying it to `metadata` + pub fn flush(&self, metadata: &mut [u8]) { + let total_buckets = total_buckets(self.power, self.overflow_factor); + let bucket_size = ::std::mem::size_of::(); + let u64_size = ::std::mem::size_of::(); + let started_size = ::std::mem::size_of::(); + + let mut offset = 0; + // --------------------- Store `power` ----------------- + + // cast `power` to byte pointer + let byte_ptr = (&self.power as *const u64) as *const u8; + + // store `power` back to mmapped file + offset = store::store_bytes_and_update_offset(byte_ptr, offset, u64_size, metadata); + + // --------------------- Store `mask` ----------------- + + // cast `mask` to byte pointer + let byte_ptr = (&self.mask as *const u64) as *const u8; + + // store `mask` back to mmapped file + offset = store::store_bytes_and_update_offset(byte_ptr, offset, u64_size, metadata); + // --------------------- Store `data` ----------------- + + // for every `HashBucket` + for id in 0..total_buckets { + // cast `HashBucket` to byte pointer + let byte_ptr = (&self.data[id] as *const HashBucket) as *const u8; + + // store `HashBucket` back to mmapped file + offset = store::store_bytes_and_update_offset(byte_ptr, offset, bucket_size, metadata); + } + + // --------------------- Store `started` ----------------- + + // cast `started` to byte pointer + let byte_ptr = (&self.started as *const Instant) as *const u8; + + // 
store `started` back to mmapped file + offset = store::store_bytes_and_update_offset(byte_ptr, offset, started_size, metadata); + // --------------------- Store `next_to_chain` ----------------- + + // cast `next_to_chain` to byte pointer + let byte_ptr = (&self.next_to_chain as *const u64) as *const u8; + + // store `next_to_chain` back to mmapped file + store::store_bytes_and_update_offset(byte_ptr, offset, u64_size, metadata); + // ------------------------------------------------------------- + } + /// Lookup an item by key and return it pub fn get(&mut self, key: &[u8], segments: &mut Segments) -> Option { let hash = self.hash(key); @@ -721,4 +854,46 @@ impl HashTable { hasher.write(key); hasher.finish() } + + /// TODO: this code is repeated in restore() and flush(), can it be reduced? + /// Function used by `Builder` to calculate the number of bytes of the `HashTable` + /// that are stored/restored + pub fn recover_size(&self) -> usize { + let total_buckets = total_buckets(self.power, self.overflow_factor); + let bucket_size = ::std::mem::size_of::(); + let u64_size = ::std::mem::size_of::(); + let started_size = ::std::mem::size_of::(); + // Size of all components of `HashTable` that are being restored + u64_size * 3 // `power`, `mask`, `next_to_chain` + + total_buckets * bucket_size // `data` + + started_size + } +} + +impl PartialEq for HashTable { + // Checks if `HashTable` are equivalent + fn eq(&self, other: &Self) -> bool { + self.data == other.data + && self.power == other.power + && self.mask == other.mask + && self.started == other.started + && self.next_to_chain == other.next_to_chain + } +} + +/// Internal function used to calculate the total number of buckets +fn total_buckets(power: u64, overflow_factor: f64) -> usize { + let slots = 1_u64 << power; + let buckets = slots / 8; + (buckets as f64 * (1.0 + overflow_factor)).ceil() as usize +} + +/// Internal function used to generate a new `hash_builder` +fn hash_builder() -> RandomState { + RandomState::with_seeds( + 0xbb8c484891ec6c86, + 0x0522a25ae9c769f9, + 0xeed2797b9571bc75, + 0x4feb29c1fbbd59d0, + ) } diff --git a/src/rust/storage/seg/src/lib.rs b/src/rust/storage/seg/src/lib.rs index 4f573cbd3..cf0b3fd1f 100644 --- a/src/rust/storage/seg/src/lib.rs +++ b/src/rust/storage/seg/src/lib.rs @@ -43,6 +43,7 @@ mod item; mod rand; mod seg; mod segments; +mod store; mod ttl_buckets; // tests diff --git a/src/rust/storage/seg/src/seg.rs b/src/rust/storage/seg/src/seg.rs index c9581953e..eff802a63 100644 --- a/src/rust/storage/seg/src/seg.rs +++ b/src/rust/storage/seg/src/seg.rs @@ -4,9 +4,11 @@ //! Core datastructure +use crate::datapool::*; use crate::Value; use crate::*; use std::cmp::min; +use std::path::PathBuf; use metrics::{static_metrics, Counter}; @@ -22,10 +24,15 @@ static_metrics! { /// segment-structured design that stores data in fixed-size segments, grouping /// objects with nearby expiration time into the same segment, and lifting most /// per-object metadata into the shared segment header. +#[derive(Clone)] pub struct Seg { pub(crate) hashtable: HashTable, pub(crate) segments: Segments, pub(crate) ttl_buckets: TtlBuckets, + // Path to metadata datapool + pub(crate) metadata_path: Option, + // Will the cache be gracefully shutdown? 
+ pub(crate) graceful_shutdown: bool, } impl Seg { @@ -48,6 +55,40 @@ impl Seg { Builder::default() } + /// If `graceful_shutdown`, flushe cache by storing all the relevant fields + /// of `Segments`, `HashTable` and `TtlBuckets` to the `metadata` file + /// (if it exists) and flushing `Segments.data` (if it is file backed) + pub fn flush(&self) -> std::io::Result<()> { + if self.graceful_shutdown { + if let Some(file) = &self.metadata_path { + let file_size = self.hashtable.recover_size() + + self.ttl_buckets.recover_size() + + self.segments.recover_size(); + + // Mmap file + let mut pool = File::create(file, file_size, true) + .expect("failed to allocate file backed storage"); + let metadata = pool.as_mut_slice(); + + self.hashtable.flush(metadata); + let mut offset = self.hashtable.recover_size(); + self.ttl_buckets.flush(&mut metadata[offset..]); + offset += self.ttl_buckets.recover_size(); + self.segments.flush(&mut metadata[offset..])?; + + // TODO: check if this flushes the CPU caches + pool.flush()?; + return Ok(()); + } + } + + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Path to datapool to is None, cannot gracefully + shutdown cache", + )) + } + /// Gets a count of items in the `Seg` instance. This is an expensive /// operation and is only enabled for tests and builds with the `debug` /// feature enabled. @@ -146,6 +187,11 @@ impl Seg { return Err(SegError::ItemOversized { size }); } Err(TtlBucketsError::NoFreeSegments) => { + if retries == RESERVE_RETRIES { + // first attempt to acquire a free segment, increment + // the stats + SEGMENT_REQUEST.increment(); + } if self .segments .evict(&mut self.ttl_buckets, &mut self.hashtable) @@ -307,6 +353,14 @@ impl Seg { } } + // Indicated if `Seg` has been restored + #[cfg(test)] + pub(crate) fn restored(&self) -> bool { + self.segments.fields_copied_back + && self.ttl_buckets._buckets_copied_back + && self.hashtable._table_copied_back + } + /// Perform a wrapping addition on the value stored at the supplied key. /// Returns an error if the key is invalid, the item is not found, or the /// stored value is not a numeric type. @@ -331,3 +385,12 @@ impl Seg { Ok(item) } } + +impl PartialEq for Seg { + // Checks if `Segments` are equivalent + fn eq(&self, other: &Self) -> bool { + self.segments == other.segments + && self.hashtable == other.hashtable + && self.ttl_buckets == other.ttl_buckets + } +} diff --git a/src/rust/storage/seg/src/segments/builder.rs b/src/rust/storage/seg/src/segments/builder.rs index 578bbb915..87e59c9f5 100644 --- a/src/rust/storage/seg/src/segments/builder.rs +++ b/src/rust/storage/seg/src/segments/builder.rs @@ -11,7 +11,9 @@ use crate::segments::*; use std::path::{Path, PathBuf}; /// The `SegmentsBuilder` allows for the configuration of the segment storage. +#[derive(Clone)] pub(crate) struct SegmentsBuilder { + pub(super) restore: bool, pub(super) heap_size: usize, pub(super) segment_size: i32, pub(super) evict_policy: Policy, @@ -21,6 +23,7 @@ pub(crate) struct SegmentsBuilder { impl Default for SegmentsBuilder { fn default() -> Self { Self { + restore: false, segment_size: 1024 * 1024, heap_size: 64 * 1024 * 1024, evict_policy: Policy::Random, @@ -30,6 +33,14 @@ impl Default for SegmentsBuilder { } impl<'a> SegmentsBuilder { + /// Specify whether the `Segments` fields' will be restored + /// from the `metadata`. Otherwise, the cache will be created and treated as + // new. + pub fn restore(mut self, restore: bool) -> Self { + self.restore = restore; + self + } + /// Set the segment size in bytes. 
/// /// # Panics @@ -62,7 +73,7 @@ impl<'a> SegmentsBuilder { self } - /// Specify a backing file to be used for the segment storage. If provided, + /// Specify a backing file to be used for the `Segments.data` storage. If provided, /// a file will be created at the corresponding path and used for segment /// storage. pub fn datapool_path>(mut self, path: Option) -> Self { @@ -71,7 +82,7 @@ impl<'a> SegmentsBuilder { } /// Construct the [`Segments`] from the builder - pub fn build(self) -> Segments { - Segments::from_builder(self) + pub fn build(self, option_metadata: Option<&mut [u8]>) -> Segments { + Segments::from_builder(self, option_metadata) } } diff --git a/src/rust/storage/seg/src/segments/header.rs b/src/rust/storage/seg/src/segments/header.rs index 5bbed133a..67bf3a85e 100644 --- a/src/rust/storage/seg/src/segments/header.rs +++ b/src/rust/storage/seg/src/segments/header.rs @@ -35,7 +35,7 @@ use crate::*; // TODO(bmartin): this should be parameterized. const SEG_MATURE_TIME: Duration = Duration::from_secs(20); -#[derive(Debug)] +#[derive(Debug, Copy, Clone, PartialEq)] #[repr(C)] pub struct SegmentHeader { /// The id for this segment diff --git a/src/rust/storage/seg/src/segments/mod.rs b/src/rust/storage/seg/src/segments/mod.rs index dbc89e376..ab7e3a664 100644 --- a/src/rust/storage/seg/src/segments/mod.rs +++ b/src/rust/storage/seg/src/segments/mod.rs @@ -29,7 +29,7 @@ mod test { fn free_q() { let mut segments = SegmentsBuilder::default() .heap_size(16 * 1024 * 1024) - .build(); + .build(None); let mut used = Vec::new(); for _i in 0..16 { let id = segments.pop_free().unwrap(); diff --git a/src/rust/storage/seg/src/segments/segments.rs b/src/rust/storage/seg/src/segments/segments.rs index 3c7645d5b..51cc4413c 100644 --- a/src/rust/storage/seg/src/segments/segments.rs +++ b/src/rust/storage/seg/src/segments/segments.rs @@ -41,42 +41,50 @@ pub(crate) struct Segments { flush_at: Instant, /// Eviction configuration and state evict: Box, + /// Is `data` file backed? + data_file_backed: bool, + /// Are `headers` copied back from a file? + pub(crate) fields_copied_back: bool, } impl Segments { /// Private function which allocates and initializes the `Segments` by - /// taking ownership of the builder - pub(super) fn from_builder(builder: SegmentsBuilder) -> Self { - let segment_size = builder.segment_size; - let segments = builder.heap_size / (builder.segment_size as usize); + /// taking ownership of the builder. `Segments` is restored if there is + /// recovery `metadata`, otherwise a new `Segments` is created. 
+ pub(super) fn from_builder( + builder: SegmentsBuilder, + option_metadata: Option<&mut [u8]>, + ) -> Self { + let cfg_segment_size = builder.segment_size; + let cfg_segments = builder.heap_size / (builder.segment_size as usize); debug!( "heap size: {} seg size: {} segments: {}", - builder.heap_size, segment_size, segments + builder.heap_size, cfg_segment_size, cfg_segments ); assert!( - segments < (1 << 24), // we use just 24 bits to store the seg id + cfg_segments < (1 << 24), // we use just 24 bits to store the seg id "heap size requires too many segments, reduce heap size or increase segment size" ); + // initialise `evict` let evict_policy = builder.evict_policy; + let evict = Eviction::new(cfg_segments, evict_policy); debug!("eviction policy: {:?}", evict_policy); let mut headers = Vec::with_capacity(0); - headers.reserve_exact(segments); - for id in 0..segments { - // safety: we start iterating from 1 and seg id is constrained to < 2^24 - let header = SegmentHeader::new(unsafe { NonZeroU32::new_unchecked(id as u32 + 1) }); - headers.push(header); - } - let mut headers = headers.into_boxed_slice(); + headers.reserve_exact(cfg_segments); - let heap_size = segments * segment_size as usize; + let heap_size = cfg_segments * cfg_segment_size as usize; + let mut data_file_backed = false; + let mut data_on_existing_file = false; // TODO(bmartin): we always prefault, this should be configurable let mut data: Box = if let Some(file) = builder.datapool_path { + data_file_backed = true; + data_on_existing_file = std::fs::metadata(&file).is_ok(); let pool = File::create(file, heap_size, true) .expect("failed to allocate file backed storage"); Box::new(pool) @@ -84,42 +92,221 @@ impl Segments { Box::new(Memory::create(heap_size, true)) }; - for idx in 0..segments { - let begin = segment_size as usize * idx; - let end = begin + segment_size as usize; + // If `builder.restore` `Segments.data` is file backed with an existing + // file and metadata` to restore the `Segments` with, restore relevant + // `Segments` fields. Otherwise create a new `Segments`. + if builder.restore && data_on_existing_file { + if let Some(metadata) = option_metadata { + // TODO: like with the HashTable fields, we assume that the configuration + // options for `Segments` hasn't changed upon recovery. We need a way to + // detect the change in fields as well as decided how to + // deal with such changes. 
+ + let header_size: usize = ::std::mem::size_of::(); + let i32_size = ::std::mem::size_of::(); + let u32_size = ::std::mem::size_of::(); + let free_q_size = ::std::mem::size_of::>(); + let flush_at_size = ::std::mem::size_of::(); + // Size of all components of `Segments` that are being restored + let fields_size = cfg_segments * header_size // `headers` + + i32_size // `segment_size` + + u32_size * 2 // `free` and `cap` + + free_q_size + + flush_at_size; + + // create blank bytes to copy data into + let mut bytes = vec![0; fields_size]; + // retrieve bytes from mmapped file + bytes.copy_from_slice(&metadata[0..fields_size]); + + let mut offset = 0; + let mut end = 0; + // ----- Retrieve `headers` ----- + + // retrieve each `SegmentHeader` from the raw bytes + for _ in 0..cfg_segments { + end += header_size; + + // cast bytes to `SegmentHeader` + let header = + unsafe { *(bytes[offset..end].as_mut_ptr() as *mut SegmentHeader) }; + headers.push(header); + + offset += header_size; + } + + // ----- Retrieve `segment_size` ----- + end += i32_size; + + let segment_size = unsafe { *(bytes[offset..end].as_mut_ptr() as *mut i32) }; + // TODO: compare `cfg_segment_size` and `segment_size` + + offset += i32_size; + // ----- Retrieve `free` ----- + end += u32_size; + + let free = unsafe { *(bytes[offset..end].as_mut_ptr() as *mut u32) }; + + offset += u32_size; + // ----- Retrieve `cap` ----- + end += u32_size; + + let cap = unsafe { *(bytes[offset..end].as_mut_ptr() as *mut u32) }; + + offset += u32_size; + // ----- Retrieve `free_q` ----- + end += free_q_size; + + let free_q = + unsafe { *(bytes[offset..end].as_mut_ptr() as *mut Option) }; + + offset += free_q_size; + // ----- Retrieve `flush_at` ----- + end += flush_at_size; + + let flush_at = unsafe { *(bytes[offset..end].as_mut_ptr() as *mut Instant) }; + + SEGMENT_CURRENT.set(cap as _); + SEGMENT_FREE.set(free as _); + + return Self { + headers: headers.into_boxed_slice(), + data, + segment_size, + free, + cap, + free_q, + flush_at, + evict: Box::new(evict), + data_file_backed: true, + fields_copied_back: true, + }; + } + } + + // Create new `Segments` + for id in 0..cfg_segments { + // safety: we start iterating from 1 and seg id is constrained to < 2^24 + let header = SegmentHeader::new(unsafe { NonZeroU32::new_unchecked(id as u32 + 1) }); + headers.push(header); + } + + let mut headers = headers.into_boxed_slice(); + + for idx in 0..cfg_segments { + let begin = cfg_segment_size as usize * idx; + let end = begin + cfg_segment_size as usize; let mut segment = Segment::from_raw_parts(&mut headers[idx], &mut data.as_mut_slice()[begin..end]); segment.init(); - let id = idx as u32 + 1; // we index segments from 1 + let id = idx as u32 + 1; // we index cfg_segments from 1 segment.set_prev_seg(NonZeroU32::new(id - 1)); - if id < segments as u32 { + if id < cfg_segments as u32 { segment.set_next_seg(NonZeroU32::new(id + 1)); } } - SEGMENT_CURRENT.set(segments as _); - SEGMENT_FREE.set(segments as _); + SEGMENT_CURRENT.set(cfg_segments as _); + SEGMENT_FREE.set(cfg_segments as _); Self { headers, - segment_size, - cap: segments as u32, - free: segments as u32, + segment_size: cfg_segment_size, + cap: cfg_segments as u32, + free: cfg_segments as u32, free_q: NonZeroU32::new(1), data, flush_at: Instant::recent(), - evict: Box::new(Eviction::new(segments, evict_policy)), + evict: Box::new(evict), + data_file_backed, + fields_copied_back: false, } } + /// Flushes the `Segments` by flushing the `Segments.data` (if filed backed) + /// and copying the other 
`Segments` fields' by copying it to `metadata` + pub fn flush(&self, metadata: &mut [u8]) -> std::io::Result<()> { + // if `Segments.data` is file backed, flush it to file + if self.data_file_backed { + self.data.flush()?; + } + + let header_size: usize = ::std::mem::size_of::(); + let i32_size = ::std::mem::size_of::(); + let u32_size = ::std::mem::size_of::(); + let free_q_size = ::std::mem::size_of::>(); + let flush_at_size = ::std::mem::size_of::(); + + let mut offset = 0; + // ----- Store `headers` ----- + + // for every `SegmentHeader` + for id in 0..(self.cap as usize) { + // cast `SegmentHeader` to byte pointer + let byte_ptr = (&self.headers[id] as *const SegmentHeader) as *const u8; + + // store `SegmentHeader` back to mmapped file + offset = store::store_bytes_and_update_offset(byte_ptr, offset, header_size, metadata); + } + + // ----- Store `segment_size` ----- + + // cast `segment_size` to byte pointer + let byte_ptr = (&self.segment_size as *const i32) as *const u8; + + // store `segment_size` back to mmapped file + offset = store::store_bytes_and_update_offset(byte_ptr, offset, i32_size, metadata); + + // ----- Store `free` ----- + + // cast `free` to byte pointer + let byte_ptr = (&self.free as *const u32) as *const u8; + + // store `free` back to mmapped file + offset = store::store_bytes_and_update_offset(byte_ptr, offset, u32_size, metadata); + + // ----- Store `cap` ----- + + // cast `cap` to byte pointer + let byte_ptr = (&self.cap as *const u32) as *const u8; + + // store `cap` back to mmapped file + offset = store::store_bytes_and_update_offset(byte_ptr, offset, u32_size, metadata); + + // ----- Store `free_q` ----- + + // cast `free_q` to byte pointer + let byte_ptr = (&self.free_q as *const Option) as *const u8; + + // store `free_q` back to mmapped file + offset = store::store_bytes_and_update_offset(byte_ptr, offset, free_q_size, metadata); + + // ----- Store `flush_at` ----- + + // cast `flush_at` to byte pointer + let byte_ptr = (&self.flush_at as *const Instant) as *const u8; + + // store `flush_at` back to mmapped file + store::store_bytes_and_update_offset(byte_ptr, offset, flush_at_size, metadata); + + // ----------------------------- + Ok(()) + } + /// Return the size of each segment in bytes #[inline] pub fn segment_size(&self) -> i32 { self.segment_size } + /// Returns if `data` is file backed + #[cfg(test)] + pub fn data_file_backed(&self) -> bool { + self.data_file_backed + } + /// Returns the number of free segments #[cfg(test)] pub fn free(&self) -> usize { @@ -571,7 +758,7 @@ impl Segments { // reduces CPU load under heavy rewrite/delete workloads at the // cost of letting more dead items remain in the segements, // reducing the hitrate - // if self.headers[seg_id as usize].merge_at() + CoarseDuration::from_secs(30) > CoarseInstant::recent() { + // if self.headers[seg_id as usize].merge_at() + CoarseDuration::from_secs(30) > Instant::recent() { // return Ok(()); // } @@ -920,10 +1107,66 @@ impl Segments { Ok(next_id) } + + /// TODO: this code is repeated in restore() and flush(), can it be reduced? 
+ /// Function used by `Builder` to calculate the number of bytes of the `Segments` + /// that are stored/restored + pub fn recover_size(&self) -> usize { + let header_size: usize = ::std::mem::size_of::(); + let i32_size = ::std::mem::size_of::(); + let u32_size = ::std::mem::size_of::(); + let free_q_size = ::std::mem::size_of::>(); + let flush_at_size = ::std::mem::size_of::(); + // Size of all components of `Segments` that are being restored + (self.cap as usize) * header_size // `headers` + + i32_size // `segment_size` + + u32_size * 2 // `free` and `cap` + + free_q_size + + flush_at_size + } } impl Default for Segments { fn default() -> Self { - Self::from_builder(Default::default()) + Self::from_builder(Default::default(), None) + } +} + +impl PartialEq for Segments { + // Checks if `Segments` are equivalent + fn eq(&self, other: &Self) -> bool { + self.headers == other.headers + && self.data.as_slice() == other.data.as_slice() + && self.segment_size == other.segment_size + && self.free == other.free + && self.cap == other.cap + && self.free_q == other.free_q + && self.flush_at == other.flush_at + } +} + +impl Clone for Segments { + // Used in testing to clone a `Segments` to compare equivalency with + fn clone(&self) -> Self { + // clone `data` + let heap_size = self.segment_size as usize * self.cap as usize; + let mut data = vec![0; heap_size]; + data.clone_from_slice(self.data.as_slice()); + let segment_data = Memory::from(data.into_boxed_slice()); + //let segment_data = Memory::memory_from_data(data.into_boxed_slice()); + + // Return a `Segments` where everything relevant is cloned + Self { + headers: self.headers.clone(), + data: Box::new(segment_data), + segment_size: self.segment_size, + free: self.free, + cap: self.cap, + free_q: self.free_q, + flush_at: self.flush_at, + evict: self.evict.clone(), // not relevant + data_file_backed: self.data_file_backed, // not relevant + fields_copied_back: self.fields_copied_back, // not relevant + } } } diff --git a/src/rust/storage/seg/src/store.rs b/src/rust/storage/seg/src/store.rs new file mode 100644 index 000000000..1e3e275f3 --- /dev/null +++ b/src/rust/storage/seg/src/store.rs @@ -0,0 +1,19 @@ +/// Copies `size` bytes at `byte_ptr` to the `offset` of `data` +/// Returns the next `offset`, that is, the next byte of `data` to be copied into +pub fn store_bytes_and_update_offset( + byte_ptr: *const u8, + offset: usize, + size: usize, + data: &mut [u8], +) -> usize { + // get corresponding bytes from byte pointer + let bytes = unsafe { ::std::slice::from_raw_parts(byte_ptr, size) }; + + let end = offset + size; + + // store `bytes` to `data` + data[offset..end].copy_from_slice(bytes); + + // next `offset` + end +} diff --git a/src/rust/storage/seg/src/tests.rs b/src/rust/storage/seg/src/tests.rs index f54e2a7d6..b578be017 100644 --- a/src/rust/storage/seg/src/tests.rs +++ b/src/rust/storage/seg/src/tests.rs @@ -6,8 +6,10 @@ use super::*; use crate::hashtable::HashBucket; use crate::item::ITEM_HDR_SIZE; use core::num::NonZeroU32; - +use std::collections::HashSet; +use std::path::PathBuf; use std::time::Duration; +use tempfile::TempDir; #[test] fn sizes() { @@ -21,7 +23,7 @@ fn sizes() { assert_eq!(std::mem::size_of::(), 64); assert_eq!(std::mem::size_of::(), 64); - assert_eq!(std::mem::size_of::(), 64); + assert_eq!(std::mem::size_of::(), 80); assert_eq!(std::mem::size_of::(), 64); assert_eq!(std::mem::size_of::(), 24); @@ -468,3 +470,413 @@ fn saturating_sub() { .expect("failed to increment"); assert_eq!(item.value(), 0, "item is: 
{:?}", item); } + +// ----------- TESTS FOR RECOVERY ------------- +// Configuration Options: +// +// New cache, not file backed +// ---- Cache is created new in main memory. +// New cache, file backed +// ---- Cache is created new and is file backed. +// ---- In other words, file is used as an extension of DRAM. +// ---- Note: The cache cannot be gracefully shutdown by if it isn't file backed. +// ---- That is, if there is no path used to file back the cache, there is no +// ---- path to copy the cache data to on shutdown +// Not gracefully shutdown +// ---- Nothing is saved on shutdown. +// Gracefully shutdown +// ---- `Segments.data` is flushed if it is file backed +// ---- Rest of `Seg` instance saved on shutdown if the `metadata_path` is valid. +// ---- That is, all of `Seg.hashtable`, `Seg.ttl_buckets` and the relevant +// ---- `Seg.Segments` fields are saved +// Restored cache +// ---- `Segments.data` must be file backed +// ---- Rest of `Seg` copied back from the files they were saved to and +// ---- If any of the file paths are not valid, then the cache is created new + +// ------------- Set up / Helper Functions for below tests ------------ + +// path to tmp directory used for temp files +const TMP_DIR: &str = "target/debug/tmp"; + +const SEGMENTS: usize = 64; + +// Creates a temporary directory for temporary test files +fn tmp_dir() -> TempDir { + // Create parent directory for the temporary directory + std::fs::create_dir_all(TMP_DIR).expect("failed to create parent tmp directory"); + + // Create the temporary directory + TempDir::new_in(TMP_DIR).unwrap() +} + +// Returns a `Seg` instance. Cache is restored only if `restore` and +// `metadata_path` and `datapool_path` are not `None`. Otherwise, new `Seg` +// instance is returned. Cache is file backed if `datapool_path` is not `None`. 
+fn make_cache(
+    restore: bool,
+    datapool_path: Option<PathBuf>,
+    metadata_path: Option<PathBuf>,
+    graceful_shutdown: bool,
+) -> Seg {
+    let segment_size = 4096;
+    let segments = SEGMENTS;
+    let heap_size = segments * segment_size as usize;
+
+    Seg::builder()
+        .restore(restore)
+        .segment_size(segment_size as i32)
+        .heap_size(heap_size)
+        .datapool_path(datapool_path) // set path
+        .metadata_path(metadata_path) // set path
+        .graceful_shutdown(graceful_shutdown)
+        .build()
+}
+
+// ------------------- Set Paths Correctly Tests --------------------------
+
+// Check that a new, file backed cache is file backed and that the `Seg` (and thus
+// the `Segments` fields, `HashTable` and `TtlBuckets`) is new (and not restored)
+#[test]
+fn new_cache_file_backed() {
+    // Create parent directory for temporary test files
+    let dir = tmp_dir();
+    // Create tempfile for datapool
+    let datapool_path: Option<PathBuf> = Some(dir.path().join("datapool"));
+
+    // create new, file backed cache
+    let restore = false;
+    let graceful_shutdown = false;
+    let cache = make_cache(restore, datapool_path, None, graceful_shutdown);
+
+    // the `Segments.data` should be file backed
+    assert!(cache.segments.data_file_backed());
+    // the `Seg` should not be restored
+    assert!(!cache.restored());
+}
+
+// Check that a new, not file backed cache is not file backed and the `Seg` is
+// new (and not restored)
+#[test]
+fn new_cache_not_file_backed() {
+    // create new, not file backed cache
+    let restore = false;
+    let graceful_shutdown = false;
+    let cache = make_cache(restore, None, None, graceful_shutdown);
+
+    // the `Segments.data` should not be file backed
+    assert!(!cache.segments.data_file_backed());
+    // the `Seg` should not be restored
+    assert!(!cache.restored());
+}
+
+// Edge Case: Check that an attempt to restore a cache without specifying any
+// paths leads to `Segments.data` not being file backed and
+// none of the other structures being restored
+#[test]
+fn restored_cache_no_paths_set() {
+    let restore = true;
+    let graceful_shutdown = false;
+    let cache = make_cache(restore, None, None, graceful_shutdown);
+
+    // the `Segments.data` should not be file backed
+    assert!(!cache.segments.data_file_backed());
+    // the `Seg` should not be restored
+    assert!(!cache.restored());
+}
+
+// Check that if paths are specified, then the cache is gracefully
+// shut down
+#[test]
+fn cache_gracefully_shutdown() {
+    // Create a temporary directory
+    let dir = tmp_dir();
+    // Create tempfile for datapool
+    let datapool_path: Option<PathBuf> = Some(dir.path().join("datapool"));
+    // Create tempfile for `HashTable`
+    let metadata_path: Option<PathBuf> = Some(dir.path().join("hashtable"));
+
+    // create new, file backed cache
+    let restore = false;
+    let graceful_shutdown = true;
+    let cache = make_cache(restore, datapool_path, metadata_path, graceful_shutdown);
+
+    // Flush cache
+    assert!(cache.flush().is_ok());
+}
+
+// Check that if paths are not specified, then the cache is not gracefully
+// shut down
+#[test]
+fn cache_not_gracefully_shutdown() {
+    // Create a temporary directory
+    let dir = tmp_dir();
+    // Create tempfile for datapool
+    let datapool_path: Option<PathBuf> = Some(dir.path().join("datapool"));
+
+    // create new, file backed cache
+    let restore = false;
+    let graceful_shutdown = true;
+    let cache = make_cache(
+        restore,
+        datapool_path,
+        None, // Don't set a `HashTable` path
+        graceful_shutdown,
+    );
+
+    // Flushing the cache should fail
+    assert!(cache.flush().is_err());
+}
+
+// --------------------- Data Copied Back Tests ----------------------------
+
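+// The tests below all follow the same round-trip pattern: build a cache with
+// `make_cache`, optionally insert items, call `flush()` to persist it, then
+// build a second cache with `restore = true` against the same paths and
+// compare the two caches using the `PartialEq` implementations added above.
+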
+// Creates a new cache, stores an item, gracefully shuts down the cache, and restores
+// the cache. Check the item is still there and the caches are equivalent
+#[test]
+fn new_file_backed_cache_changed_and_restored() {
+    // Create a temporary directory
+    let dir = tmp_dir();
+    // Create tempfile for datapool
+    let datapool_path: Option<PathBuf> = Some(dir.path().join("datapool"));
+    // Create tempfile for `HashTable`
+    let metadata_path: Option<PathBuf> = Some(dir.path().join("hashtable"));
+
+    // create new, file backed cache
+    let mut restore = false;
+    let mut graceful_shutdown = true;
+    let mut cache = make_cache(restore, datapool_path, metadata_path, graceful_shutdown);
+
+    assert!(!cache.restored());
+    assert_eq!(cache.items(), 0);
+    assert_eq!(cache.segments.free(), SEGMENTS);
+
+    // "latte" should not be in a new, empty cache
+    assert!(cache.get(b"latte").is_none());
+    // insert "latte" into cache
+    assert!(cache
+        .insert(b"latte", b"", None, Duration::from_secs(5))
+        .is_ok());
+    // "latte" should now be in cache
+    assert!(cache.get(b"latte").is_some());
+
+    assert_eq!(cache.items(), 1);
+    assert_eq!(cache.segments.free(), SEGMENTS - 1);
+
+    // Get a copy of the cache to be compared later
+    let old_cache = cache.clone();
+
+    // Flush cache
+    assert!(cache.flush().is_ok());
+
+    // Create the same tempfile paths again (the earlier `PathBuf`s were moved
+    // when passed to `make_cache`)
+    let datapool_path: Option<PathBuf> = Some(dir.path().join("datapool"));
+    let metadata_path: Option<PathBuf> = Some(dir.path().join("hashtable"));
+
+    // restore cache
+    // This cache is file backed by the same file the above cache
+    // saved `Segments.data` to and the `Seg` is restored
+    restore = true;
+    graceful_shutdown = false;
+    let mut new_cache = make_cache(restore, datapool_path, metadata_path, graceful_shutdown);
+
+    assert!(new_cache.restored());
+    // "latte" should be in the restored cache
+    assert!(new_cache.get(b"latte").is_some());
+    assert_eq!(new_cache.items(), 1);
+    assert_eq!(new_cache.segments.free(), SEGMENTS - 1);
+
+    // the restored cache should be equivalent to the old cache
+    assert!(new_cache == old_cache);
+}
+
+// Creates a new cache, gracefully shuts down the cache, and restores the cache. Check
+// that the caches are equivalent
+#[test]
+fn new_file_backed_cache_not_changed_and_restored() {
+    // Create a temporary directory
+    let dir = tmp_dir();
+    // Create tempfile for datapool
+    let datapool_path: Option<PathBuf> = Some(dir.path().join("datapool"));
+    // Create tempfile for `HashTable`
+    let metadata_path: Option<PathBuf> = Some(dir.path().join("hashtable"));
+
+    // create new, file backed cache
+    let mut restore = false;
+    let mut graceful_shutdown = true;
+
+    let cache = make_cache(restore, datapool_path, metadata_path, graceful_shutdown);
+
+    assert!(!cache.restored());
+
+    // Get a copy of the cache to be compared later
+    let old_cache = cache.clone();
+
+    // Flush cache
+    assert!(cache.flush().is_ok());
+
+    // Create the same tempfile paths again (they were moved when first passed in)
+    let datapool_path: Option<PathBuf> = Some(dir.path().join("datapool"));
+    let metadata_path: Option<PathBuf> = Some(dir.path().join("hashtable"));
+
+    // restore cache
+    // This cache is file backed by the same file the above cache
+    // saved `Segments.data` to and the `Seg` is restored
+    restore = true;
+    graceful_shutdown = false;
+    let new_cache = make_cache(restore, datapool_path, metadata_path, graceful_shutdown);
+
+    assert!(new_cache.restored());
+
+    // the restored cache should be equivalent to the old cache
+    assert!(new_cache == old_cache);
+}
+
+// Creates a new cache, stores an item, gracefully shuts down the cache, and spawns a
+// new cache. Check the item is not in the new cache and the caches are not equivalent
+#[test]
+fn new_cache_changed_and_not_restored() {
+    // Create a temporary directory
+    let dir = tmp_dir();
+    // Create tempfile for datapool
+    let datapool_path: Option<PathBuf> = Some(dir.path().join("datapool"));
+    // Create tempfile for `HashTable`
+    let metadata_path: Option<PathBuf> = Some(dir.path().join("hashtable"));
+
+    // create new, file backed cache
+    let mut restore = false;
+    let graceful_shutdown = true;
+    let mut cache = make_cache(restore, datapool_path, metadata_path, graceful_shutdown);
+
+    assert!(!cache.restored());
+    assert_eq!(cache.items(), 0);
+    assert_eq!(cache.segments.free(), SEGMENTS);
+
+    // "latte" should not be in a new, empty cache
+    assert!(cache.get(b"latte").is_none());
+    // insert "latte" into cache
+    assert!(cache
+        .insert(b"latte", b"", None, Duration::from_secs(5))
+        .is_ok());
+    // "latte" should now be in cache
+    assert!(cache.get(b"latte").is_some());
+
+    assert_eq!(cache.items(), 1);
+    assert_eq!(cache.segments.free(), SEGMENTS - 1);
+
+    // Get a copy of the cache to be compared later
+    let old_cache = cache.clone();
+
+    // Flush cache
+    assert!(cache.flush().is_ok());
+
+    // Create the same tempfile path again (it was moved when first passed in)
+    let datapool_path: Option<PathBuf> = Some(dir.path().join("datapool"));
+
+    // create new, file backed cache.
+    // This new cache is file backed by the same file the above cache
+    // saved `Segments.data` to, but this cache is treated as new
+    restore = false;
+    let mut new_cache = make_cache(restore, datapool_path, None, graceful_shutdown);
+
+    assert!(!new_cache.restored());
+    assert_eq!(new_cache.items(), 0);
+    assert_eq!(new_cache.segments.free(), SEGMENTS);
+
+    // "latte" should not be in the new cache
+    assert!(new_cache.get(b"latte").is_none());
+
+    // the new cache should not be equivalent to the old cache
+    assert!(new_cache != old_cache);
+}
+
+// Create a new cache and fill it with items. Gracefully shut down this cache.
+// Restore the cache and check that every key from the original cache exists in the
+// restored cache. Check the caches are equivalent
+#[test]
+fn full_cache_recovery_long() {
+    // Create a temporary directory
+    let dir = tmp_dir();
+    // Create tempfile for datapool
+    let datapool_path: Option<PathBuf> = Some(dir.path().join("datapool"));
+    // Create tempfile for `HashTable`
+    let metadata_path: Option<PathBuf> = Some(dir.path().join("hashtable"));
+
+    let ttl = Duration::ZERO;
+    let value_size = 512;
+    let key_size = 1;
+    let iters = 1_000_000;
+
+    // create new, file backed cache
+    let mut restore = false;
+    let mut graceful_shutdown = true;
+    let mut cache = make_cache(restore, datapool_path, metadata_path, graceful_shutdown);
+
+    assert!(!cache.restored());
+    assert_eq!(cache.items(), 0);
+    assert_eq!(cache.segments.free(), SEGMENTS);
+
+    let mut rng = rand::rng();
+
+    let mut key = vec![0; key_size];
+    let mut value = vec![0; value_size];
+
+    // record all of the unique keys
+    let mut unique_keys = HashSet::new();
+
+    // fill cache
+    for _ in 0..iters {
+        rng.fill_bytes(&mut key);
+        rng.fill_bytes(&mut value);
+
+        let save_key = key.clone();
+        unique_keys.insert(save_key);
+
+        assert!(cache.insert(&key, &value, None, ttl).is_ok());
+    }
+
+    // record all active keys in cache
+    // (this could be less than the number of unique keys if eviction has occurred)
+    let mut unique_active_keys = Vec::new();
+    for key in &unique_keys {
+        // if this key exists, save it!
+        if cache.get(&key).is_some() {
+            unique_active_keys.push(key);
+        }
+    }
+
+    // check that the number of active items in the cache equals the number
+    // of active keys
+    assert_eq!(cache.items(), unique_active_keys.len());
+
+    // Get a copy of the cache to be compared later
+    let old_cache = cache.clone();
+
+    // Flush cache
+    assert!(cache.flush().is_ok());
+
+    // Create the same tempfile paths again (they were moved when first passed in)
+    let datapool_path: Option<PathBuf> = Some(dir.path().join("datapool"));
+    let metadata_path: Option<PathBuf> = Some(dir.path().join("hashtable"));
+
+    // restore cache
+    // This new cache is file backed by the same file the above cache
+    // saved `Segments.data` to and the `Seg` is restored
+    restore = true;
+    graceful_shutdown = false;
+    let mut new_cache = make_cache(restore, datapool_path, metadata_path, graceful_shutdown);
+
+    assert!(new_cache.restored());
+
+    // the restored cache should be equivalent to the old cache
+    assert!(new_cache == old_cache);
+
+    // check that the number of active items in the restored cache
+    // equals the number of active keys in the original cache
+    assert_eq!(new_cache.items(), unique_active_keys.len());
+
+    // check that every active key from the original cache is in
+    // the restored cache
+    while let Some(key) = unique_active_keys.pop() {
+        assert!(new_cache.get(&key).is_some());
+    }
+}
diff --git a/src/rust/storage/seg/src/ttl_buckets/ttl_bucket.rs b/src/rust/storage/seg/src/ttl_buckets/ttl_bucket.rs
index aae90e230..3f8bae2b0 100644
--- a/src/rust/storage/seg/src/ttl_buckets/ttl_bucket.rs
+++ b/src/rust/storage/seg/src/ttl_buckets/ttl_bucket.rs
@@ -34,6 +34,8 @@ use core::num::NonZeroU32;
 /// in an ordered fashion. The first segment to expire will be the head of the
 /// segment chain. This allows us to efficiently scan across the [`TtlBuckets`]
 /// and expire segments in an eager fashion.
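+// `repr(C)` gives the bucket a defined memory layout so it can be written to and
+// read back from the metadata file as raw bytes; `Copy` lets `TtlBuckets::restore`
+// rebuild each bucket by value, and `PartialEq` lets tests compare restored buckets.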
+#[derive(Clone, Copy, Debug, PartialEq)]
+#[repr(C)]
 pub struct TtlBucket {
     head: Option<NonZeroU32>,
     tail: Option<NonZeroU32>,
diff --git a/src/rust/storage/seg/src/ttl_buckets/ttl_buckets.rs b/src/rust/storage/seg/src/ttl_buckets/ttl_buckets.rs
index 590b63326..ca66b2459 100644
--- a/src/rust/storage/seg/src/ttl_buckets/ttl_buckets.rs
+++ b/src/rust/storage/seg/src/ttl_buckets/ttl_buckets.rs
@@ -42,16 +42,19 @@ const TTL_BOUNDARY_3: i32 = 1 << (TTL_BUCKET_INTERVAL_N_BIT_3 + N_BUCKET_PER_STE
 const MAX_N_TTL_BUCKET: usize = N_BUCKET_PER_STEP * 4;
 const MAX_TTL_BUCKET_IDX: usize = MAX_N_TTL_BUCKET - 1;
 
-
+#[derive(Clone)]
 pub struct TtlBuckets {
     pub(crate) buckets: Box<[TtlBucket]>,
     pub(crate) last_expired: Instant,
+    /// Are `TtlBuckets` copied back from a file?
+    pub(crate) _buckets_copied_back: bool,
 }
 
 impl TtlBuckets {
     /// Create a new set of `TtlBuckets` which cover the full range of TTLs. See
     /// the module-level documentation for how the range of TTLs are stored.
     pub fn new() -> Self {
+        // TODO: add path as argument
         let intervals = [
             TTL_BUCKET_INTERVAL_1,
             TTL_BUCKET_INTERVAL_2,
@@ -76,10 +79,83 @@ impl TtlBuckets {
         Self {
             buckets,
             last_expired,
+            _buckets_copied_back: false,
+        }
+    }
+
+    // Returns a restored `TtlBuckets` using recovery data (`metadata`)
+    pub fn restore(metadata: &[u8]) -> Self {
+        let bucket_size = ::std::mem::size_of::<TtlBucket>();
+        let last_expired_size = ::std::mem::size_of::<Instant>();
+        let ttl_buckets_struct_size = MAX_N_TTL_BUCKET * bucket_size // `buckets`
+            + last_expired_size;
+
+        // create blank bytes to copy data into
+        let mut bytes = vec![0; ttl_buckets_struct_size];
+        // retrieve bytes from the mmapped file
+        bytes.copy_from_slice(&metadata[0..ttl_buckets_struct_size]);
+
+        let mut offset = 0;
+        // ----- Retrieve `last_expired` -----
+        let mut end = last_expired_size;
+        let last_expired =
+            unsafe { *(bytes[offset..last_expired_size].as_mut_ptr() as *mut Instant) };
+
+        offset += last_expired_size;
+        // ----- Retrieve `buckets` -----
+
+        let mut buckets = Vec::with_capacity(0);
+        buckets.reserve_exact(MAX_N_TTL_BUCKET);
+
+        // Get each `TtlBucket` from the raw bytes
+        for _ in 0..MAX_N_TTL_BUCKET {
+            end += bucket_size;
+
+            // cast bytes to `TtlBucket`
+            let bucket = unsafe { *(bytes[offset..end].as_mut_ptr() as *mut TtlBucket) };
+            buckets.push(bucket);
+
+            offset += bucket_size;
+        }
+
+        let buckets = buckets.into_boxed_slice();
+
+        Self {
+            buckets,
+            last_expired,
+            _buckets_copied_back: true,
         }
     }
 
-    /// Get the index of the `TtlBucket` for the given TTL.
+    /// Flushes the `TtlBuckets` by copying it to `metadata`
+    pub fn flush(&self, metadata: &mut [u8]) {
+        let bucket_size = ::std::mem::size_of::<TtlBucket>();
+        let last_expired_size = ::std::mem::size_of::<Instant>();
+
+        let mut offset = 0;
+        // --------------------- Store `last_expired` -----------------
+
+        // cast `last_expired` to a byte pointer
+        let byte_ptr = (&self.last_expired as *const Instant) as *const u8;
+
+        // store `last_expired` back to the mmapped file
+        offset =
+            store::store_bytes_and_update_offset(byte_ptr, offset, last_expired_size, metadata);
+
+        // --------------------- Store `buckets` -----------------
+
+        // for every `TtlBucket`
+        for id in 0..MAX_N_TTL_BUCKET {
+            // cast `TtlBucket` to a byte pointer
+            let byte_ptr = (&self.buckets[id] as *const TtlBucket) as *const u8;
+
+            // store `TtlBucket` back to the mmapped file
+            offset = store::store_bytes_and_update_offset(byte_ptr, offset, bucket_size, metadata);
+        }
+
+        // --------------------------------------------------
+    }
+
     pub(crate) fn get_bucket_index(&self, ttl: Duration) -> usize {
         let ttl = ttl.as_secs() as i32;
         if ttl <= 0 {
@@ -142,6 +218,16 @@ impl TtlBuckets {
         CLEAR_TIME.add(duration.as_nanos() as _);
         cleared
     }
+
+    /// TODO: this code is repeated in restore() and flush(), can it be reduced?
+    /// Function used by `Builder` to calculate the number of bytes of the `TtlBuckets`
+    /// that are stored/restored
+    pub fn recover_size(&self) -> usize {
+        let bucket_size = ::std::mem::size_of::<TtlBucket>();
+        let last_expired_size = ::std::mem::size_of::<Instant>();
+        MAX_N_TTL_BUCKET * bucket_size // `buckets`
+            + last_expired_size // `last_expired`
+    }
 }
 
 impl Default for TtlBuckets {
@@ -149,3 +235,10 @@ impl Default for TtlBuckets {
         Self::new()
     }
 }
+
+impl PartialEq for TtlBuckets {
+    // Checks if `TtlBuckets` are equivalent
+    fn eq(&self, other: &Self) -> bool {
+        self.buckets == other.buckets && self.last_expired == other.last_expired
+    }
+}
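Taken together, the new builder options give a shutdown/restore round trip. A minimal sketch of how a caller might use them, assuming only the builder methods exercised by the tests above (`restore`, `datapool_path`, `metadata_path`, `graceful_shutdown`); the file paths are illustrative, not part of the change:

    use std::path::PathBuf;

    // first run: new, file-backed cache that will be flushed on shutdown
    let cache = Seg::builder()
        .segment_size(4096)
        .heap_size(64 * 4096)
        .datapool_path(Some(PathBuf::from("/tmp/seg/datapool"))) // hypothetical path
        .metadata_path(Some(PathBuf::from("/tmp/seg/metadata"))) // hypothetical path
        .graceful_shutdown(true)
        .build();
    // ... serve traffic ...
    cache.flush().expect("graceful shutdown failed");

    // next run: point the builder at the same files and ask for a restore
    let cache = Seg::builder()
        .restore(true)
        .segment_size(4096)
        .heap_size(64 * 4096)
        .datapool_path(Some(PathBuf::from("/tmp/seg/datapool")))
        .metadata_path(Some(PathBuf::from("/tmp/seg/metadata")))
        .build();
    assert!(cache.restored());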