diff --git a/src/storage/seg/src/error.rs b/src/storage/seg/src/error.rs index 0c92e99c5..fc695cfdc 100644 --- a/src/storage/seg/src/error.rs +++ b/src/storage/seg/src/error.rs @@ -9,6 +9,8 @@ use thiserror::Error; #[derive(Error, Debug, PartialEq, Eq, Copy, Clone)] /// Possible errors returned by the top-level API pub enum SegError { + #[error("key size too large")] + KeySizeTooLargeEx, #[error("hashtable insert exception")] HashTableInsertEx, #[error("eviction exception")] diff --git a/src/storage/seg/src/hashtable/mod.rs b/src/storage/seg/src/hashtable/mod.rs index c2bd6b896..8d4d4a010 100644 --- a/src/storage/seg/src/hashtable/mod.rs +++ b/src/storage/seg/src/hashtable/mod.rs @@ -308,8 +308,10 @@ impl HashTable { *item_info = (*item_info & !FREQ_MASK) | freq; } + let age = segments.get_age(*item_info).unwrap(); let item = Item::new( current_item, + age, get_cas(self.data[(hash & self.mask) as usize].data[0]), ); item.check_magic(); @@ -322,6 +324,101 @@ impl HashTable { None } + /// Lookup an item by key and return its age as reported by `segments.get_age` + pub fn get_age(&mut self, key: &[u8], segments: &mut Segments) -> Option<u32> { + let hash = self.hash(key); + let tag = tag_from_hash(hash); + + let iter = IterMut::new(self, hash); + + for item_info in iter { + if get_tag(*item_info) == tag { + let current_item = segments.get_item(*item_info).unwrap(); + if current_item.key() != key { + HASH_TAG_COLLISION.increment(); + } else { + return segments.get_age(*item_info); + } + } + } + + None + } + + /// Lookup an item by key and return it. + /// Compared to `get`, this is designed to support multiple readers and a single writer. + /// Because eviction always removes the hash table entry first, + /// if an object is evicted, its hash table entry must have been removed; + /// as a result, we can verify the hash table entry after reading/copying the value. + /// + /// Therefore, we can leverage opportunistic concurrency control to support + /// multiple readers and a single writer.
+ /// We check the hash table after a reader reads the data; + /// if the data is evicted, then its hash table entry must have been removed. + /// + pub fn get_with_item_info( + &mut self, + key: &[u8], + time: Instant, + segments: &mut Segments, + ) -> Option<RichItem> { + let hash = self.hash(key); + let tag = tag_from_hash(hash); + let bucket_id = hash & self.mask; + + let bucket_info = self.data[bucket_id as usize].data[0]; + + let curr_ts = (time - self.started).as_secs() & PROC_TS_MASK; + + if curr_ts != get_ts(bucket_info) as u32 { + self.data[bucket_id as usize].data[0] = (bucket_info & !TS_MASK) | (curr_ts as u64); + + let iter = IterMut::new(self, hash); + for item_info in iter { + *item_info &= CLEAR_FREQ_SMOOTH_MASK; + } + } + + let cas = get_cas(self.data[bucket_id as usize].data[0]); + let iter = IterMut::new(self, hash); + + for item_info in iter { + let item_info_val = *item_info; + if get_tag(item_info_val) == tag { + let current_item = segments.get_item(*item_info).unwrap(); + if current_item.key() != key { + HASH_TAG_COLLISION.increment(); + } else { + // update item frequency + let mut freq = get_freq(*item_info); + if freq < 127 { + let rand = thread_rng().gen::<u64>(); + if freq <= 16 || rand % freq == 0 { + freq = ((freq + 1) | 0x80) << FREQ_BIT_SHIFT; + } else { + freq = (freq | 0x80) << FREQ_BIT_SHIFT; + } + *item_info = (*item_info & !FREQ_MASK) | freq; + } + + let age = segments.get_age(item_info_val).unwrap(); + let item = RichItem::new( + current_item, + age, + cas, + item_info_val & !FREQ_MASK, + item_info, + ); + item.check_magic(); + + return Some(item); + } + } + } + + None + } + /// Lookup an item by key and return it without incrementing the item /// frequency. This may be used to compose higher-level functions which do /// not want a successful item lookup to count as a hit for that item.
@@ -338,8 +435,10 @@ impl HashTable { if current_item.key() != key { HASH_TAG_COLLISION.increment(); } else { + let age = segments.get_age(*item_info).unwrap(); let item = Item::new( current_item, + age, get_cas(self.data[(hash & self.mask) as usize].data[0]), ); item.check_magic(); diff --git a/src/storage/seg/src/item/mod.rs b/src/storage/seg/src/item/mod.rs index abf944421..e35a24a13 100644 --- a/src/storage/seg/src/item/mod.rs +++ b/src/storage/seg/src/item/mod.rs @@ -11,6 +11,7 @@ mod reserved; #[cfg(any(feature = "magic", feature = "debug"))] pub(crate) use header::ITEM_MAGIC_SIZE; +use crate::hashtable::FREQ_MASK; use crate::SegError; use crate::Value; @@ -21,13 +22,14 @@ pub(crate) use reserved::ReservedItem; /// Items are the base unit of data stored within the cache. pub struct Item { cas: u32, + age: u32, raw: RawItem, } impl Item { /// Creates a new `Item` from its parts - pub(crate) fn new(raw: RawItem, cas: u32) -> Self { - Item { cas, raw } + pub(crate) fn new(raw: RawItem, age: u32, cas: u32) -> Self { + Item { cas, age, raw } } /// If the `magic` or `debug` features are enabled, this allows for checking @@ -56,6 +58,10 @@ impl Item { self.cas } + pub fn age(&self) -> u32 { + self.age + } + /// Borrow the optional data pub fn optional(&self) -> Option<&[u8]> { self.raw.optional() @@ -83,6 +89,104 @@ impl std::fmt::Debug for Item { } } +/// An `Item` bundled with an `item_info` value and a raw pointer to an `item_info` word. +pub struct RichItem { + item: Item, + item_info: u64, + item_info_ptr: *const u64, +} + +impl RichItem { + /// Creates a new `RichItem` from its parts + pub(crate) fn new( + raw: RawItem, + age: u32, + cas: u32, + item_info: u64, + item_info_ptr: *const u64, + ) -> Self { + let item = Item::new(raw, age, cas); + RichItem { + item, + item_info, + item_info_ptr, + } + } + + /// If the `magic` or `debug` features are enabled, this allows for checking + /// that the magic bytes at the start of an item match the expected value.
+ /// + /// # Panics + /// + /// Panics if the magic bytes are incorrect, indicating that the data has + /// become corrupted or the item was loaded from the wrong offset. + pub(crate) fn check_magic(&self) { + self.item.raw.check_magic() + } + + /// Borrow the item key + pub fn key(&self) -> &[u8] { + self.item.raw.key() + } + + /// Borrow the item value + pub fn value(&self) -> Value { + self.item.raw.value() + } + + /// CAS value for the item + pub fn cas(&self) -> u32 { + self.item.cas + } + + pub fn age(&self) -> u32 { + self.item.age + } + + pub fn item(&self) -> &Item { + &self.item + } + + pub fn item_mut(&mut self) -> &mut Item { + &mut self.item + } + + /// Supports multiple readers and a single writer: returns `true` if the word + /// behind `item_info_ptr` (with `FREQ_MASK` bits cleared) still equals the stored + /// `item_info`, i.e. it is unchanged. NOTE(review): assumes the pointer is still valid. + pub fn is_not_changed(&self) -> bool { + unsafe { self.item_info == (*self.item_info_ptr & !FREQ_MASK) } + } + + /// Borrow the optional data + pub fn optional(&self) -> Option<&[u8]> { + self.item.raw.optional() + } + + /// Perform a wrapping addition on the value. Returns an error if the item + /// is not a numeric type.
+ pub fn saturating_sub(&mut self, rhs: u64) -> Result<(), SegError> { + self.item.raw.saturating_sub(rhs) + } +} + +impl std::fmt::Debug for RichItem { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { + f.debug_struct("RichItem") + .field("cas", &self.cas()) + .field("raw", &self.item.raw) + .field("item_info", &self.item_info) + .field("item_info_ptr", &self.item_info_ptr) + .finish() + } +} + pub fn size_of(value: &Value) -> usize { match value { Value::Bytes(v) => v.len(), diff --git a/src/storage/seg/src/lib.rs b/src/storage/seg/src/lib.rs index 34f64a300..c797cc042 100644 --- a/src/storage/seg/src/lib.rs +++ b/src/storage/seg/src/lib.rs @@ -58,7 +58,7 @@ pub use crate::seg::Seg; pub use builder::Builder; pub use error::SegError; pub use eviction::Policy; -pub use item::Item; +pub use item::{Item, RichItem}; // publicly exported items from external crates pub use storage_types::Value; diff --git a/src/storage/seg/src/seg.rs b/src/storage/seg/src/seg.rs index 4ec4f686b..b757a8d65 100644 --- a/src/storage/seg/src/seg.rs +++ b/src/storage/seg/src/seg.rs @@ -84,6 +84,50 @@ impl Seg { self.hashtable.get(key, self.time, &mut self.segments) } + /// Get the age of the item in the `Seg` with the provided key + /// + /// ``` + /// use seg::{Policy, Seg}; + /// use std::time::Duration; + /// + /// let mut cache = Seg::builder().build().expect("failed to create cache"); + /// assert!(cache.get(b"coffee").is_none()); + /// + /// cache.insert(b"coffee", b"strong", None, Duration::ZERO); + /// let age = cache.get_age(b"coffee").expect("didn't get item back"); + /// assert_eq!(age, 0); + /// ``` + pub fn get_age(&mut self, key: &[u8]) -> Option<u32> { + self.hashtable.get_age(key, &mut self.segments) + } + + /// Get the item in the `Seg` with the provided key. + /// This differs from `get` by returning information about the hash table entry, + /// which allows opportunistic concurrency control and enables + /// multiple readers and a single writer.
+ /// To use it, one simply checks that the hash table entry has not changed after + /// copying/using the item value; if it has changed, it means the item + /// was evicted or updated by another thread and we need to roll back. + /// + /// ``` + /// use seg::{Policy, Seg}; + /// use std::time::Duration; + /// + /// let mut cache = Seg::builder().build().expect("failed to create cache"); + /// assert!(cache.get(b"coffee").is_none()); + /// + /// cache.insert(b"coffee", b"strong", None, Duration::ZERO); + /// let item = cache.get_with_item_info(b"coffee").expect("didn't get item back"); + /// assert_eq!(item.value(), b"strong"); + /// assert!(item.is_not_changed()); + /// cache.insert(b"coffee", b"notStrong", None, Duration::ZERO); + /// assert!(!item.is_not_changed()); + /// ``` + pub fn get_with_item_info(&mut self, key: &[u8]) -> Option<RichItem> { + self.hashtable + .get_with_item_info(key, self.time, &mut self.segments) + } + /// Get the item in the `Seg` with the provided key without /// increasing the item frequency - useful for combined operations that /// check for presence - eg replace is a get + set @@ -123,6 +167,10 @@ impl Seg { ) -> Result<(), SegError> { let value: Value = value.into(); + if key.len() > 255 { + return Err(SegError::KeySizeTooLargeEx); + } + // default optional data is empty let optional = optional.unwrap_or(&[]); @@ -133,6 +181,8 @@ impl Seg { // try to get a `ReservedItem` let mut retries = RESERVE_RETRIES; + let mut has_removed_expired = false; + let reserved; loop { match self @@ -149,6 +199,11 @@ impl Seg { return Err(SegError::ItemOversized { size }); } Err(TtlBucketsError::NoFreeSegments) => { + if !has_removed_expired { + self.expire(); + has_removed_expired = true; + continue; + } if self .segments .evict(&mut self.ttl_buckets, &mut self.hashtable) diff --git a/src/storage/seg/src/segments/header.rs b/src/storage/seg/src/segments/header.rs index 5bbed133a..bb5db7b3b 100644 --- a/src/storage/seg/src/segments/header.rs +++
b/src/storage/seg/src/segments/header.rs @@ -33,7 +33,7 @@ use crate::*; // the minimum age of a segment before it is eligible for eviction // TODO(bmartin): this should be parameterized. -const SEG_MATURE_TIME: Duration = Duration::from_secs(20); +const SEG_MATURE_TIME: Duration = Duration::from_secs(0); #[derive(Debug)] #[repr(C)] diff --git a/src/storage/seg/src/segments/segments.rs b/src/storage/seg/src/segments/segments.rs index cde24ce8a..2781a660b 100644 --- a/src/storage/seg/src/segments/segments.rs +++ b/src/storage/seg/src/segments/segments.rs @@ -147,6 +147,16 @@ impl Segments { self.get_item_at(seg_id, offset) } + pub(crate) fn get_age(&self, item_info: u64) -> Option<u32> { + let seg_id = get_seg_id(item_info).map(|v| v.get())?; + Some( + self.headers[seg_id as usize - 1] + .create_at() + .elapsed() + .as_secs(), + ) + } + /// Retrieve a `RawItem` from a specific segment id at the given offset // TODO(bmartin): consider changing the return type here and removing asserts? pub(crate) fn get_item_at(