Draft

75 commits
7ca757c
recovery files added
cassyunknown Feb 12, 2022
7e32771
removed workflows
cassyunknown Feb 12, 2022
26b55b5
added back workflows
cassyunknown Feb 12, 2022
356e4fd
removed newline from .gitignore
cassyunknown Feb 14, 2022
0503216
added descriptions for restore() and graceful_shutdown()
cassyunknown Feb 14, 2022
5ea5667
added full tempfile version
cassyunknown Feb 15, 2022
4ccf01f
changed derive traits to be in order
cassyunknown Feb 15, 2022
592a823
new function that generates hash_builder, to remove repeated code
cassyunknown Feb 15, 2022
67d5aa5
removed ccc comments
cassyunknown Feb 15, 2022
9caeec5
changed will_resore to restore. Moved hash_builder() so it is in scope
cassyunknown Feb 15, 2022
b4bed3a
changed will_resore to restore
cassyunknown Feb 15, 2022
d2966d2
took out of a Boxed struct so I can use helper function store() to c…
cassyunknown Feb 15, 2022
deaaa91
created store() and now will integrate into all demolish() functions
cassyunknown Feb 15, 2022
677200b
all of HashTable::demolish() uses store(). Will make similar changes …
cassyunknown Feb 15, 2022
3603c1b
updated store() so it also returns new offset
cassyunknown Feb 15, 2022
09a1f65
updated store() so it also returns new offset
cassyunknown Feb 15, 2022
21ba045
added helper function total_buckets() to reduce repeated code
cassyunknown Feb 15, 2022
7dc7d73
integrated store_bytes_and_update_offset() into Segments
cassyunknown Feb 15, 2022
45aa5b1
reduced number of variables from Segments::restore()
cassyunknown Feb 15, 2022
7c15eb8
added store_bytes_and_update_offset() usage to TtlBuckets::demolish()…
cassyunknown Feb 15, 2022
5d0e633
moved store_bytes_and_update_offset() to store.rs so there is only 1 …
cassyunknown Feb 15, 2022
aeb62de
changed back to only having from_builder() to build . This function d…
cassyunknown Feb 15, 2022
49c06e3
fixed ordering of derive traits for TtlBucket
cassyunknown Feb 15, 2022
1606340
removed unnecessary comments
cassyunknown Feb 15, 2022
1caaf22
removed newline
cassyunknown Feb 15, 2022
83e428a
removed newline
cassyunknown Feb 15, 2022
2df8f22
added TODOs to implement Drop trait for TtlBuckets
cassyunknown Feb 16, 2022
7be8f07
Added Brian's File::create() changes
cassyunknown Feb 16, 2022
19385a4
implemented PartialEq for HashTable
cassyunknown Feb 16, 2022
f7e9468
implemented PartialEq trait for TtlBuckets, Segments and Seg
cassyunknown Feb 16, 2022
710674d
removed _restored field from Seg and replaced it with a restored() fu…
cassyunknown Feb 16, 2022
a73aa63
Clone trait now implemented for Segments and Seg can now derive Clone…
cassyunknown Feb 16, 2022
7334b3f
changed implementation of PartialEq to be less awkward for Segments, …
cassyunknown Feb 17, 2022
9e9dd5c
ran cargo fmt
cassyunknown Feb 17, 2022
de08656
update File::create() documentation
cassyunknown Feb 17, 2022
6d77a73
update File::create() documentation
cassyunknown Feb 17, 2022
d7692ef
removed conditional derivations in Seg, Segments, HashTable and TtlBu…
cassyunknown Feb 17, 2022
8d72b48
cannot reproduce failing of test new_file_backed_cache_changed_and_re…
cassyunknown Feb 17, 2022
5b6dd2f
implemented From<Box<[u8]>> trait for Memory
cassyunknown Feb 17, 2022
f1ed123
Merge branch 'master' into recovery
cassyunknown Feb 17, 2022
bdd17b6
moved merged non-recovery tests to above recovery section
cassyunknown Feb 17, 2022
f47db51
fixed mismatched bracket introduced by merge
cassyunknown Feb 17, 2022
739365e
fixed mismatched bracket introduced by merge
cassyunknown Feb 17, 2022
186b9b4
added a Seg.flush() function
cassyunknown Feb 17, 2022
ed43a6d
completed flush() for Segments
cassyunknown Feb 17, 2022
917855e
added flush() to TtlBuckets and HashTable. Next step: changes tests.r…
cassyunknown Feb 17, 2022
995544a
replaced demolish() with flush() in tests.rs. Deleted all traces of D…
cassyunknown Feb 17, 2022
43eef14
replaced demolish() with flush() in tests.rs. Deleted all traces of D…
cassyunknown Feb 17, 2022
d62bfb0
ran cargo fmt
cassyunknown Feb 17, 2022
92e1f24
ran cargo fmt
cassyunknown Feb 17, 2022
dc85b30
deleted tests that are attempting recovery from files that don't exis…
cassyunknown Feb 24, 2022
cefb4b9
uncommented File::create() check of expected size of file as there is…
cassyunknown Feb 24, 2022
6197220
Changed code so that HashTable and TtlBuckets share same file. Now wi…
cassyunknown Feb 24, 2022
154549b
refactored code to remove now unnecessary fields from HashTable and T…
cassyunknown Feb 24, 2022
eb671d0
neatened up comments for HashTable and TtlBuckets
cassyunknown Feb 24, 2022
62c953e
all of Segments, HashTable and ttlBuckets restored from same file
cassyunknown Feb 24, 2022
8816e11
ran cargo fmt
cassyunknown Feb 24, 2022
396e704
added a quit() function to be called when a Quit request is received
cassyunknown Mar 3, 2022
ea1e114
removed quit() as this is not what we want (termination of connection…
cassyunknown Mar 3, 2022
85531a3
added stop() function to be called when the request is Stop
cassyunknown Mar 3, 2022
2396be8
added stop() function to be called when the request is Stop
cassyunknown Mar 3, 2022
c641d8f
fixed seg tests so graceful shutdown was part of configuration
cassyunknown Mar 3, 2022
b1ed33c
added test config file
cassyunknown Mar 3, 2022
466a4c1
ran cargo fmt
cassyunknown Mar 3, 2022
7d7e952
fixed bug in config file
cassyunknown Mar 3, 2022
e8c0b6e
Added the Stop command wherever FlushAll command was implemented
cassyunknown Mar 7, 2022
85ab29a
stop signal sent to admin stops cache from processing events and shou…
cassyunknown Mar 8, 2022
7dc5c48
listener, admin and multi now return upon Stop
cassyunknown Mar 9, 2022
9d46425
attempted to add admin's own QueuePair to its signal_queue so it can …
cassyunknown Mar 9, 2022
2318a47
changed back to id=0 when admin receiving signals
cassyunknown Mar 9, 2022
be9a15a
undid Stop response through non-admin port
cassyunknown Mar 15, 2022
ae713b7
removed non-admin parsing of Stop command
cassyunknown Mar 15, 2022
553b333
updated config
cassyunknown Mar 16, 2022
befab11
ran cargo fmt
cassyunknown Mar 16, 2022
19e0147
removed unnecessary adding to admin queue
cassyunknown Mar 23, 2022
33 changes: 33 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

94 changes: 94 additions & 0 deletions config/segcache_test.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
daemonize = false

[admin]
# interfaces listening on
host = "0.0.0.0"
# port listening on
port = "9999"

[server]
# interfaces listening on
host = "0.0.0.0"
# port listening on
port = "12321"
# epoll timeout in milliseconds
timeout = 100
# epoll max events returned
nevent = 1024

[worker]
# epoll timeout in milliseconds
timeout = 100
# epoll max events returned
nevent = 1024
# number of worker threads
threads = 1

# storage configuration
[seg]
# hash power adjusts how many items can be held in the hashtable
hash_power = 3
# total bytes to use for item storage - 32 segments * 1024 segment size
heap_size = 32768
# size of each segment in bytes
segment_size = 1024
# number of segments for a non-evict compaction
compact_target = 2
# number of segments to merge in one merge eviction pass
merge_target = 4
# max number of segments to merge in one pass
merge_max = 8
# use merge based eviction
eviction = "Merge"
# optionally, set a file path to back the data datapool
datapool_path = "/home/users/u6632448/file_for_data"
#datapool_path = "/mnt/pmem1.0/cassy/data"
# set a file path to back the metadata datapool
metadata_path = "/home/users/u6632448/file_for_metadata"
#metadata_path = "/mnt/pmem1.0/cassy/metadata"
# state whether cache will be restored
restore = false
# state whether cache will be flushed upon shutdown
graceful_shutdown = true

[time]
time_type = "Memcache"

[buf]

[debug]
# choose from: error, warn, info, debug, trace
log_level = "warn"
# optionally, log to the file below instead of standard out
# log_file = "segcache.log"
# backup file name for use with log rotation
log_backup = "segcache.log.old"
# trigger log rotation when the file grows beyond this size (in bytes). Set this
# option to '0' to disable log rotation.
log_max_size = 1073741824

[klog]
# optionally, log commands to the file below
# file = "segcache.cmd"
# backup file name for use with log rotation
backup = "segcache.cmd.old"
# trigger log rotation when the file grows beyond this size (in bytes). Set this
# option to '0' to disable log rotation.
max_size = 1073741824
# specify the sampling ratio, 1 in N commands will be logged. Setting to '0'
# will disable command logging.
sample = 100

[sockio]

[tcp]

[tls]
# certificate chain used to validate client certificate
# certificate_chain = "client.chain"
# server certificate
# certificate = "server.crt"
# server private key
# private_key = "server.key"
# ca certificate file used as the root of trust
# ca_file = "ca.crt"
1 change: 1 addition & 0 deletions src/rust/common/src/signal.rs
@@ -5,4 +5,5 @@
#[derive(Clone)]
pub enum Signal {
Shutdown,
Stop,
}
49 changes: 48 additions & 1 deletion src/rust/config/src/seg.rs
@@ -8,6 +8,10 @@ use serde::{Deserialize, Serialize};

const MB: usize = 1024 * 1024;

// restore and graceful shutdown options
const RESTORE: bool = false;
const GRACEFUL_SHUTDOWN: bool = false;

// defaults for hashtable
const HASH_POWER: u8 = 16;
const OVERFLOW_FACTOR: f64 = 1.0;
@@ -24,9 +28,12 @@ const COMPACT_TARGET: usize = 4;
const MERGE_TARGET: usize = 4;
const MERGE_MAX: usize = 8;

// datapool
// datapool (`Segments.data`)
const DATAPOOL_PATH: Option<&str> = None;

// hashtable
const HASHTABLE_PATH: Option<&str> = None;

#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum Eviction {
None,
@@ -39,6 +46,14 @@ pub enum Eviction {
}

// helper functions for default values
fn restore() -> bool {
RESTORE
}

fn graceful_shutdown() -> bool {
GRACEFUL_SHUTDOWN
}

fn hash_power() -> u8 {
HASH_POWER
}
@@ -75,9 +90,17 @@ fn datapool_path() -> Option<String> {
DATAPOOL_PATH.map(|v| v.to_string())
}

fn metadata_path() -> Option<String> {
HASHTABLE_PATH.map(|v| v.to_string())
}

// definitions
#[derive(Serialize, Deserialize, Debug)]
pub struct Seg {
#[serde(default = "restore")]
restore: bool,
#[serde(default = "graceful_shutdown")]
graceful_shutdown: bool,
#[serde(default = "hash_power")]
hash_power: u8,
#[serde(default = "overflow_factor")]
@@ -96,11 +119,15 @@ pub struct Seg {
compact_target: usize,
#[serde(default = "datapool_path")]
datapool_path: Option<String>,
#[serde(default = "metadata_path")]
metadata_path: Option<String>,
}

impl Default for Seg {
fn default() -> Self {
Self {
restore: restore(),
graceful_shutdown: graceful_shutdown(),
hash_power: hash_power(),
overflow_factor: overflow_factor(),
heap_size: heap_size(),
@@ -110,12 +137,28 @@ impl Default for Seg {
merge_max: merge_max(),
compact_target: compact_target(),
datapool_path: datapool_path(),
metadata_path: metadata_path(),
}
}
}

// implementation
impl Seg {
// Determines if the `Seg` will be restored. The restoration will be
// successful if `datapool_path` and `metadata_path` are valid paths.
// Otherwise, the `Seg` will be created as new.
pub fn restore(&self) -> bool {
self.restore
}

// Determines if the `Seg` will be gracefully shutdown. The graceful
// shutdown will be successful if the cache is file backed and
// `metadata_path` is a valid path to save the relevant `Seg` fields to.
// Otherwise, the relevant `Seg` fields will not be saved.
pub fn graceful_shutdown(&self) -> bool {
self.graceful_shutdown
}
pub fn hash_power(&self) -> u8 {
self.hash_power
}
@@ -151,6 +194,10 @@ impl Seg {
pub fn datapool_path(&self) -> Option<PathBuf> {
self.datapool_path.as_ref().map(|v| Path::new(v).to_owned())
}

pub fn metadata_path(&self) -> Option<PathBuf> {
self.metadata_path.as_ref().map(|v| Path::new(v).to_owned())
}
}

// trait definitions
5 changes: 5 additions & 0 deletions src/rust/core/server/src/lib.rs
@@ -114,6 +114,11 @@ pub const DEFAULT_BUFFER_SIZE: usize = 16 * 1024; // 16KB
// specific upper bounds.
const ADMIN_MAX_BUFFER_SIZE: usize = 2 * 1024 * 1024; // 2MB

// TODO(bmartin): this *should* be plenty safe, the queue should rarely ever be
// full, and a single wakeup should drain at least one message and make room for
// the response. A stat to prove that this is sufficient would be good.
const QUEUE_RETRIES: usize = 3;

const THREAD_PREFIX: &str = "pelikan";

metrics::test_no_duplicates!();
27 changes: 26 additions & 1 deletion src/rust/core/server/src/threads/admin.rs
@@ -7,6 +7,7 @@

use super::EventLoop;
use crate::poll::{Poll, LISTENER_TOKEN, WAKER_TOKEN};
use crate::QUEUE_RETRIES;
use crate::TCP_ACCEPT_EX;
use common::signal::Signal;
use common::ssl::{HandshakeError, MidHandshakeSslStream, Ssl, SslContext, SslStream};
@@ -324,9 +325,9 @@ impl Admin {
self.do_accept();
}
WAKER_TOKEN => {
#[allow(clippy::never_loop)]
// check if we have received signals from any sibling
// thread
#[allow(clippy::never_loop)]
while let Ok(signal) = self.signal_queue.recv_from(0) {
match signal {
Signal::Shutdown => {
@@ -341,6 +342,11 @@
let _ = self.log_drain.flush();
return;
}
Signal::Stop => {
warn!("received stop");
let _ = self.log_drain.flush();
return;
}
}
}
}
@@ -434,6 +440,25 @@ impl EventLoop for Admin {

match request {
AdminRequest::FlushAll => {}
AdminRequest::Stop => {
for _ in 0..QUEUE_RETRIES {
// Send Stop to all other threads
if self.signal_queue.broadcast(Signal::Stop).is_ok() {
warn!("sending stop signal to all threads");
break;
}
}
for _ in 0..QUEUE_RETRIES {
if self.signal_queue.wake_all().is_ok() {
break;
}
}

let _ = session.write(b"OK\r\n");
session.finalize_response();
ADMIN_RESPONSE_COMPOSE.increment();
}
AdminRequest::Stats => {
Self::handle_stats_request(session);
}
4 changes: 4 additions & 0 deletions src/rust/core/server/src/threads/listener.rs
@@ -224,6 +224,10 @@ impl Listener {
Signal::Shutdown => {
return;
}
Signal::Stop => {
warn!("received stop");
return;
}
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions src/rust/core/server/src/threads/worker/multi.rs
@@ -12,6 +12,7 @@ use super::*;
use crate::poll::Poll;
use crate::threads::worker::StorageWorker;
use crate::threads::worker::TokenWrapper;
use crate::QUEUE_RETRIES;
use common::signal::Signal;
use common::time::Instant;
use config::WorkerConfig;
@@ -26,11 +27,6 @@ use session::Session;
use std::io::{BufRead, Write};
use std::sync::Arc;

// TODO(bmartin): this *should* be plenty safe, the queue should rarely ever be
// full, and a single wakeup should drain at least one message and make room for
// the request. A stat to prove that this is sufficient would be good.
const QUEUE_RETRIES: usize = 3;

const WAKER_TOKEN: usize = usize::MAX;

/// A `MultiWorker` handles events on `Session`s and routes storage requests to
@@ -110,15 +106,19 @@
self.handle_new_sessions();
self.handle_storage_queue();

#[allow(clippy::never_loop)]
// check if we received any signals from the admin thread
#[allow(clippy::never_loop)]
while let Ok(signal) = self.signal_queue.recv_from(0) {
match signal {
Signal::Shutdown => {
// if we received a shutdown, we can return
// and stop processing events
return;
}
Signal::Stop => {
warn!("received stop");
return;
}
}
}
}