Skip to content
Merged
2 changes: 1 addition & 1 deletion .planning
130 changes: 128 additions & 2 deletions src/admin/metrics_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,21 @@ static CONNECTED_CLIENTS: AtomicU64 = AtomicU64::new(0);
static WAL_AGGRESSIVE_RECYCLE_SEGMENTS_TOTAL: AtomicU64 = AtomicU64::new(0);
static WAL_AGGRESSIVE_RECYCLE_BYTES_TOTAL: AtomicU64 = AtomicU64::new(0);

// ── T1.4: Cross-shard dispatch counters (read by INFO stats) ────────────
// The Prometheus `counter!()` facade is not directly readable from the INFO
// path — it only surfaces via /metrics. These dedicated AtomicU64 counters
// are incremented in parallel inside the existing record_dispatch_* helpers
// so INFO always returns meaningful stats even with admin_port=0.
// Relaxed ordering is correct: these are monotonic event counters for
// observability, not synchronisation primitives.
//
// Note: a separate write-SPSC counter does not exist in the codebase —
// the existing `record_dispatch_cross_spsc` covers both reads routed via
// SPSC (when fast-path is off) and writes. INFO exposes the unified total
// as `total_dispatch_cross_spsc`.
static DISPATCH_CROSS_READ_FASTPATH_TOTAL: AtomicU64 = AtomicU64::new(0);
static DISPATCH_CROSS_READ_SPSC_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Initialize the Prometheus metrics exporter and admin HTTP server.
///
/// Must be called once before any metrics recording. Spawns a custom admin
Expand Down Expand Up @@ -707,6 +722,8 @@ pub fn record_dispatch_local_batch(count: u64) {
/// (RwLock read on the target shard's database, no SPSC message).
#[inline]
pub fn record_dispatch_cross_read_fastpath() {
// Always increment the INFO-visible atomic (works even with admin_port=0).
DISPATCH_CROSS_READ_FASTPATH_TOTAL.fetch_add(1, Ordering::Relaxed);
if !METRICS_INITIALIZED.load(Ordering::Relaxed) {
return;
}
Expand All @@ -716,7 +733,12 @@ pub fn record_dispatch_cross_read_fastpath() {
/// Batched variant of `record_dispatch_cross_read_fastpath`.
#[inline]
pub fn record_dispatch_cross_read_fastpath_batch(count: u64) {
if count == 0 || !METRICS_INITIALIZED.load(Ordering::Relaxed) {
if count == 0 {
return;
}
// Always increment the INFO-visible atomic (works even with admin_port=0).
DISPATCH_CROSS_READ_FASTPATH_TOTAL.fetch_add(count, Ordering::Relaxed);
if !METRICS_INITIALIZED.load(Ordering::Relaxed) {
return;
}
counter!("moon_dispatch_path_total", "path" => "cross_read_fast").increment(count);
Expand Down Expand Up @@ -771,8 +793,13 @@ pub fn record_dispatch_cross_read_fastpath_timed(target_shard: usize, lock_acqui
/// Command deferred to cross-shard SPSC dispatch (the slow path).
/// Recorded when a command is enqueued into a `remote_groups` bucket that
/// will be flushed as a `PipelineBatchSlotted` message.
///
/// Note: covers both read and write commands routed via SPSC (no split
/// counter exists — all non-fast-path cross-shard traffic goes here).
#[inline]
pub fn record_dispatch_cross_spsc() {
// Always increment the INFO-visible atomic (works even with admin_port=0).
DISPATCH_CROSS_READ_SPSC_TOTAL.fetch_add(1, Ordering::Relaxed);
if !METRICS_INITIALIZED.load(Ordering::Relaxed) {
return;
}
Expand All @@ -782,7 +809,12 @@ pub fn record_dispatch_cross_spsc() {
/// Batched variant of `record_dispatch_cross_spsc`.
#[inline]
pub fn record_dispatch_cross_spsc_batch(count: u64) {
if count == 0 || !METRICS_INITIALIZED.load(Ordering::Relaxed) {
if count == 0 {
return;
}
// Always increment the INFO-visible atomic (works even with admin_port=0).
DISPATCH_CROSS_READ_SPSC_TOTAL.fetch_add(count, Ordering::Relaxed);
if !METRICS_INITIALIZED.load(Ordering::Relaxed) {
return;
}
counter!("moon_dispatch_path_total", "path" => "cross_spsc").increment(count);
Expand Down Expand Up @@ -1057,6 +1089,22 @@ pub fn total_connections_received() -> u64 {
TOTAL_CONNECTIONS.load(Ordering::Relaxed)
}

/// Total cross-shard reads served via the shared-read fast path
/// (RwLock read on the target shard's database, no SPSC message).
/// Always accurate — does not require Prometheus to be initialised.
#[inline]
pub fn total_dispatch_cross_read_fastpath() -> u64 {
DISPATCH_CROSS_READ_FASTPATH_TOTAL.load(Ordering::Relaxed)
}

/// Total cross-shard commands dispatched via the SPSC slow path.
/// Covers both read and write commands that bypass the fast path.
/// Always accurate — does not require Prometheus to be initialised.
#[inline]
pub fn total_dispatch_cross_spsc() -> u64 {
DISPATCH_CROSS_READ_SPSC_TOTAL.load(Ordering::Relaxed)
}

/// Read process CPU usage via `getrusage(RUSAGE_SELF)`.
///
/// Returns `(used_cpu_sys, used_cpu_user)` in seconds (f64).
Expand Down Expand Up @@ -1386,4 +1434,82 @@ mod tests {
record_command_error_cached("set", &mut cache);
// Must not panic, must not churn the cache on the hot path.
}

// ── T1.4: cross-shard dispatch atomics ───────────────────────────────
// These tests share process-wide static AtomicU64 counters. Because
// the test runner is multi-threaded, other tests may increment the same
// static concurrently. We therefore only assert on monotone lower
// bounds (after >= before + N) for positive-increment tests, which is
// still sufficient to prove the counter was incremented.
// The zero-is-noop tests read the counter twice with no intervening
// increment; they assert `after >= before` (monotone), which is the
// strongest correct claim under parallel execution.

#[test]
fn cross_read_fastpath_atomic_increments() {
let before = total_dispatch_cross_read_fastpath();
record_dispatch_cross_read_fastpath();
let after = total_dispatch_cross_read_fastpath();
assert!(
after >= before + 1,
"counter must have increased by at least 1; before={before} after={after}"
);
}

#[test]
fn cross_read_fastpath_batch_atomic_increments() {
let before = total_dispatch_cross_read_fastpath();
record_dispatch_cross_read_fastpath_batch(7);
let after = total_dispatch_cross_read_fastpath();
assert!(
after >= before + 7,
"counter must have increased by at least 7; before={before} after={after}"
);
}

#[test]
fn cross_read_fastpath_batch_zero_is_noop() {
// batch(0) must not call fetch_add — the counter must not move
// backward; it may move forward due to concurrent tests.
let before = total_dispatch_cross_read_fastpath();
record_dispatch_cross_read_fastpath_batch(0);
let after = total_dispatch_cross_read_fastpath();
assert!(
after >= before,
"counter must be monotone; before={before} after={after}"
);
Comment on lines +1471 to +1480
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Zero-count batch tests are currently non-detecting for regressions.

Both tests only assert monotonicity, so they still pass if *_batch(0) accidentally increments. Please serialize the dispatch-counter tests and assert equality for zero-count cases.

Suggested hardening
+    static DISPATCH_COUNTER_TEST_GUARD: once_cell::sync::Lazy<parking_lot::Mutex<()>> =
+        once_cell::sync::Lazy::new(|| parking_lot::Mutex::new(()));

     #[test]
     fn cross_read_fastpath_batch_zero_is_noop() {
+        let _guard = DISPATCH_COUNTER_TEST_GUARD.lock();
         let before = total_dispatch_cross_read_fastpath();
         record_dispatch_cross_read_fastpath_batch(0);
         let after = total_dispatch_cross_read_fastpath();
-        assert!(
-            after >= before,
-            "counter must be monotone; before={before} after={after}"
-        );
+        assert_eq!(after, before, "batch(0) must not increment");
     }

     #[test]
     fn cross_spsc_batch_zero_is_noop() {
+        let _guard = DISPATCH_COUNTER_TEST_GUARD.lock();
         let before = total_dispatch_cross_spsc();
         record_dispatch_cross_spsc_batch(0);
         let after = total_dispatch_cross_spsc();
-        assert!(
-            after >= before,
-            "counter must be monotone; before={before} after={after}"
-        );
+        assert_eq!(after, before, "batch(0) must not increment");
     }

Also applies to: 1506-1513

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/admin/metrics_setup.rs` around lines 1471 - 1480, The zero-count batch
test cross_read_fastpath_batch_zero_is_noop currently only checks monotonicity
and can miss regressions where record_dispatch_cross_read_fastpath_batch(0)
increments the counter; change the test to serialize access (avoid concurrent
interference) by running it single-threaded and assert equality instead of >=:
capture before = total_dispatch_cross_read_fastpath(), call
record_dispatch_cross_read_fastpath_batch(0), then assert after == before; apply
the same change to the other zero-count dispatch tests (the equivalent test
around lines 1506-1513) using their corresponding helper functions (e.g.,
total_dispatch_... and record_dispatch_..._batch) so zero-count calls are
verified to be no-ops.

}

#[test]
fn cross_spsc_atomic_increments() {
let before = total_dispatch_cross_spsc();
record_dispatch_cross_spsc();
let after = total_dispatch_cross_spsc();
assert!(
after >= before + 1,
"counter must have increased by at least 1; before={before} after={after}"
);
}

#[test]
fn cross_spsc_batch_atomic_increments() {
let before = total_dispatch_cross_spsc();
record_dispatch_cross_spsc_batch(3);
let after = total_dispatch_cross_spsc();
assert!(
after >= before + 3,
"counter must have increased by at least 3; before={before} after={after}"
);
}

#[test]
fn cross_spsc_batch_zero_is_noop() {
let before = total_dispatch_cross_spsc();
record_dispatch_cross_spsc_batch(0);
let after = total_dispatch_cross_spsc();
assert!(
after >= before,
"counter must be monotone; before={before} after={after}"
);
}
}
6 changes: 5 additions & 1 deletion src/command/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,13 @@ pub fn info(db: &Database, _args: &[Frame]) -> Frame {
let _ = write!(
sections,
"total_commands_processed:{}\r\n\
total_connections_received:{}\r\n",
total_connections_received:{}\r\n\
total_dispatch_cross_read_fastpath:{}\r\n\
total_dispatch_cross_spsc:{}\r\n",
crate::admin::metrics_setup::total_commands_processed(),
crate::admin::metrics_setup::total_connections_received(),
crate::admin::metrics_setup::total_dispatch_cross_read_fastpath(),
crate::admin::metrics_setup::total_dispatch_cross_spsc(),
);
sections.push_str("\r\n");

Expand Down
21 changes: 17 additions & 4 deletions src/command/key_extra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ pub fn copy(db: &mut Database, args: &[Frame]) -> Frame {
if arg.eq_ignore_ascii_case(b"REPLACE") {
replace = true;
} else if arg.eq_ignore_ascii_case(b"DB") {
// Cross-DB copy requires shard_databases context not available here
return Frame::Error(Bytes::from_static(
b"ERR COPY with DB option is not supported yet",
b"ERR COPY DB option not implemented (tracked in T2.3)",
));
} else {
return Frame::Error(Bytes::from_static(b"ERR syntax error"));
Expand Down Expand Up @@ -460,10 +459,24 @@ mod tests {
}

#[test]
fn test_copy_db_option_errors() {
fn test_copy_db_option_not_implemented() {
// T1.3: COPY ... DB n returns the "not implemented (tracked in T2.3)" error.
let mut db = setup_db_with_key(b"src", b"hello");
let result = copy(&mut db, &[bs(b"src"), bs(b"dst"), bs(b"DB")]);
assert!(matches!(result, Frame::Error(_)));
match result {
Frame::Error(msg) => {
let text = std::str::from_utf8(&msg).unwrap();
assert!(
text.contains("T2.3"),
"expected T2.3 tracking ref in error, got: {text}"
);
assert!(
text.contains("not implemented"),
"expected 'not implemented' in error, got: {text}"
);
}
other => panic!("expected Error frame, got {other:?}"),
}
}

// --- SORT tests ---
Expand Down
2 changes: 2 additions & 0 deletions src/command/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,8 @@ pub static COMMAND_META: phf::Map<&'static str, CommandMeta> = phf_map! {
"FLUSHDB" => CommandMeta { name: "FLUSHDB", arity: -1, flags: W, first_key: 0, last_key: 0, step: 0, acl_categories: DNG },
"FLUSHALL" => CommandMeta { name: "FLUSHALL", arity: -1, flags: W, first_key: 0, last_key: 0, step: 0, acl_categories: DNG },
"SWAPDB" => CommandMeta { name: "SWAPDB", arity: 3, flags: W, first_key: 0, last_key: 0, step: 0, acl_categories: DNG },
// MOVE is not yet implemented; stub returns ERR until T2.2 lands.
"MOVE" => CommandMeta { name: "MOVE", arity: 3, flags: W, first_key: 1, last_key: 1, step: 1, acl_categories: GEN },
"SHUTDOWN" => CommandMeta { name: "SHUTDOWN", arity: -1, flags: A, first_key: 0, last_key: 0, step: 0, acl_categories: DNG },
"TIME" => CommandMeta { name: "TIME", arity: 1, flags: RF, first_key: 0, last_key: 0, step: 0, acl_categories: SRV },
"LOLWUT" => CommandMeta { name: "LOLWUT", arity: -1, flags: R, first_key: 0, last_key: 0, step: 0, acl_categories: SRV },
Expand Down
52 changes: 49 additions & 3 deletions src/command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,18 @@ fn dispatch_inner(
}
}
(4, b'm') => {
// MGET MSET
// MGET MSET MOVE
if cmd.eq_ignore_ascii_case(b"MGET") {
return resp(string::mget(db, args));
}
if cmd.eq_ignore_ascii_case(b"MSET") {
return resp(string::mset(db, args));
}
if cmd.eq_ignore_ascii_case(b"MOVE") {
return resp(Frame::Error(Bytes::from_static(
b"ERR MOVE not implemented (tracked in T2.2)",
)));
}
}
(4, b'p') => {
// PING PTTL
Expand Down Expand Up @@ -536,9 +541,8 @@ fn dispatch_inner(
}
b'w' => {
if cmd.eq_ignore_ascii_case(b"SWAPDB") {
// SWAPDB requires cross-database access not available in dispatch
return resp(Frame::Error(Bytes::from_static(
b"ERR SWAPDB is not supported in sharded mode",
b"ERR SWAPDB not implemented (tracked in T2.1)",
)));
}
}
Expand Down Expand Up @@ -1781,4 +1785,46 @@ mod tests {
let _result = dispatch_read(&db, cmd, &[], now_ms, &mut selected, 16);
}
}

// ── T1.3: reframed "not implemented" errors ───────────────────────────

fn dispatch_resp(cmd: &[u8], args: &[&[u8]]) -> Frame {
let mut db = Database::new();
let mut selected = 0usize;
let frame_args = make_args(args);
match dispatch(&mut db, cmd, &frame_args, &mut selected, 1) {
DispatchResult::Response(f) => f,
DispatchResult::Quit(_) => panic!("unexpected Quit result"),
}
}

#[test]
fn swapdb_error_is_not_implemented() {
let frame = dispatch_resp(b"SWAPDB", &[b"0", b"1"]);
match frame {
Frame::Error(msg) => {
let text = std::str::from_utf8(&msg).unwrap();
assert!(text.contains("not implemented"), "got: {text}");
assert!(text.contains("T2.1"), "expected T2.1 ref, got: {text}");
assert!(
!text.contains("sharded mode"),
"old text must be gone, got: {text}"
);
}
other => panic!("expected Error frame, got {other:?}"),
}
}

#[test]
fn move_error_is_not_implemented() {
let frame = dispatch_resp(b"MOVE", &[b"mykey", b"1"]);
match frame {
Frame::Error(msg) => {
let text = std::str::from_utf8(&msg).unwrap();
assert!(text.contains("not implemented"), "got: {text}");
assert!(text.contains("T2.2"), "expected T2.2 ref, got: {text}");
}
other => panic!("expected Error frame, got {other:?}"),
}
}
}
Loading
Loading