diff --git a/.planning b/.planning index e2d94d9a..1d058287 160000 --- a/.planning +++ b/.planning @@ -1 +1 @@ -Subproject commit e2d94d9a55afe9b54259880d744fb803a9410ce6 +Subproject commit 1d058287006c1b976e7957983e87c2d71832c743 diff --git a/src/admin/metrics_setup.rs b/src/admin/metrics_setup.rs index 6071adaf..c7e062fe 100644 --- a/src/admin/metrics_setup.rs +++ b/src/admin/metrics_setup.rs @@ -35,6 +35,21 @@ static CONNECTED_CLIENTS: AtomicU64 = AtomicU64::new(0); static WAL_AGGRESSIVE_RECYCLE_SEGMENTS_TOTAL: AtomicU64 = AtomicU64::new(0); static WAL_AGGRESSIVE_RECYCLE_BYTES_TOTAL: AtomicU64 = AtomicU64::new(0); +// ── T1.4: Cross-shard dispatch counters (read by INFO stats) ──────────── +// The Prometheus `counter!()` facade is not directly readable from the INFO +// path — it only surfaces via /metrics. These dedicated AtomicU64 counters +// are incremented in parallel inside the existing record_dispatch_* helpers +// so INFO always returns meaningful stats even with admin_port=0. +// Relaxed ordering is correct: these are monotonic event counters for +// observability, not synchronisation primitives. +// +// Note: a separate write-SPSC counter does not exist in the codebase — +// the existing `record_dispatch_cross_spsc` covers both reads routed via +// SPSC (when fast-path is off) and writes. INFO exposes the unified total +// as `total_dispatch_cross_spsc`. +static DISPATCH_CROSS_READ_FASTPATH_TOTAL: AtomicU64 = AtomicU64::new(0); +static DISPATCH_CROSS_READ_SPSC_TOTAL: AtomicU64 = AtomicU64::new(0); + /// Initialize the Prometheus metrics exporter and admin HTTP server. /// /// Must be called once before any metrics recording. Spawns a custom admin @@ -707,6 +722,8 @@ pub fn record_dispatch_local_batch(count: u64) { /// (RwLock read on the target shard's database, no SPSC message). #[inline] pub fn record_dispatch_cross_read_fastpath() { + // Always increment the INFO-visible atomic (works even with admin_port=0). + DISPATCH_CROSS_READ_FASTPATH_TOTAL.fetch_add(1, Ordering::Relaxed); if !METRICS_INITIALIZED.load(Ordering::Relaxed) { return; } @@ -716,7 +733,12 @@ pub fn record_dispatch_cross_read_fastpath() { /// Batched variant of `record_dispatch_cross_read_fastpath`. #[inline] pub fn record_dispatch_cross_read_fastpath_batch(count: u64) { - if count == 0 || !METRICS_INITIALIZED.load(Ordering::Relaxed) { + if count == 0 { + return; + } + // Always increment the INFO-visible atomic (works even with admin_port=0). + DISPATCH_CROSS_READ_FASTPATH_TOTAL.fetch_add(count, Ordering::Relaxed); + if !METRICS_INITIALIZED.load(Ordering::Relaxed) { return; } counter!("moon_dispatch_path_total", "path" => "cross_read_fast").increment(count); @@ -771,8 +793,13 @@ pub fn record_dispatch_cross_read_fastpath_timed(target_shard: usize, lock_acqui /// Command deferred to cross-shard SPSC dispatch (the slow path). /// Recorded when a command is enqueued into a `remote_groups` bucket that /// will be flushed as a `PipelineBatchSlotted` message. +/// +/// Note: covers both read and write commands routed via SPSC (no split +/// counter exists — all non-fast-path cross-shard traffic goes here). #[inline] pub fn record_dispatch_cross_spsc() { + // Always increment the INFO-visible atomic (works even with admin_port=0). + DISPATCH_CROSS_READ_SPSC_TOTAL.fetch_add(1, Ordering::Relaxed); if !METRICS_INITIALIZED.load(Ordering::Relaxed) { return; } @@ -782,7 +809,12 @@ pub fn record_dispatch_cross_spsc() { /// Batched variant of `record_dispatch_cross_spsc`. #[inline] pub fn record_dispatch_cross_spsc_batch(count: u64) { - if count == 0 || !METRICS_INITIALIZED.load(Ordering::Relaxed) { + if count == 0 { + return; + } + // Always increment the INFO-visible atomic (works even with admin_port=0). + DISPATCH_CROSS_READ_SPSC_TOTAL.fetch_add(count, Ordering::Relaxed); + if !METRICS_INITIALIZED.load(Ordering::Relaxed) { return; } counter!("moon_dispatch_path_total", "path" => "cross_spsc").increment(count); @@ -1057,6 +1089,22 @@ pub fn total_connections_received() -> u64 { TOTAL_CONNECTIONS.load(Ordering::Relaxed) } +/// Total cross-shard reads served via the shared-read fast path +/// (RwLock read on the target shard's database, no SPSC message). +/// Always accurate — does not require Prometheus to be initialised. +#[inline] +pub fn total_dispatch_cross_read_fastpath() -> u64 { + DISPATCH_CROSS_READ_FASTPATH_TOTAL.load(Ordering::Relaxed) +} + +/// Total cross-shard commands dispatched via the SPSC slow path. +/// Covers both read and write commands that bypass the fast path. +/// Always accurate — does not require Prometheus to be initialised. +#[inline] +pub fn total_dispatch_cross_spsc() -> u64 { + DISPATCH_CROSS_READ_SPSC_TOTAL.load(Ordering::Relaxed) +} + /// Read process CPU usage via `getrusage(RUSAGE_SELF)`. /// /// Returns `(used_cpu_sys, used_cpu_user)` in seconds (f64). @@ -1386,4 +1434,82 @@ mod tests { record_command_error_cached("set", &mut cache); // Must not panic, must not churn the cache on the hot path. } + + // ── T1.4: cross-shard dispatch atomics ─────────────────────────────── + // These tests share process-wide static AtomicU64 counters. Because + // the test runner is multi-threaded, other tests may increment the same + // static concurrently. We therefore only assert on monotone lower + // bounds (after >= before + N) for positive-increment tests, which is + // still sufficient to prove the counter was incremented. + // The zero-is-noop tests read the counter twice with no intervening + // increment; they assert `after >= before` (monotone), which is the + // strongest correct claim under parallel execution. + + #[test] + fn cross_read_fastpath_atomic_increments() { + let before = total_dispatch_cross_read_fastpath(); + record_dispatch_cross_read_fastpath(); + let after = total_dispatch_cross_read_fastpath(); + assert!( + after >= before + 1, + "counter must have increased by at least 1; before={before} after={after}" + ); + } + + #[test] + fn cross_read_fastpath_batch_atomic_increments() { + let before = total_dispatch_cross_read_fastpath(); + record_dispatch_cross_read_fastpath_batch(7); + let after = total_dispatch_cross_read_fastpath(); + assert!( + after >= before + 7, + "counter must have increased by at least 7; before={before} after={after}" + ); + } + + #[test] + fn cross_read_fastpath_batch_zero_is_noop() { + // batch(0) must not call fetch_add — the counter must not move + // backward; it may move forward due to concurrent tests. + let before = total_dispatch_cross_read_fastpath(); + record_dispatch_cross_read_fastpath_batch(0); + let after = total_dispatch_cross_read_fastpath(); + assert!( + after >= before, + "counter must be monotone; before={before} after={after}" + ); + } + + #[test] + fn cross_spsc_atomic_increments() { + let before = total_dispatch_cross_spsc(); + record_dispatch_cross_spsc(); + let after = total_dispatch_cross_spsc(); + assert!( + after >= before + 1, + "counter must have increased by at least 1; before={before} after={after}" + ); + } + + #[test] + fn cross_spsc_batch_atomic_increments() { + let before = total_dispatch_cross_spsc(); + record_dispatch_cross_spsc_batch(3); + let after = total_dispatch_cross_spsc(); + assert!( + after >= before + 3, + "counter must have increased by at least 3; before={before} after={after}" + ); + } + + #[test] + fn cross_spsc_batch_zero_is_noop() { + let before = total_dispatch_cross_spsc(); + record_dispatch_cross_spsc_batch(0); + let after = total_dispatch_cross_spsc(); + assert!( + after >= before, + "counter must be monotone; before={before} after={after}" + ); + } } diff --git a/src/command/connection.rs b/src/command/connection.rs index ddfd8fec..9ae20948 100644 --- a/src/command/connection.rs +++ b/src/command/connection.rs @@ -245,9 +245,13 @@ pub fn info(db: &Database, _args: &[Frame]) -> Frame { let _ = write!( sections, "total_commands_processed:{}\r\n\ - total_connections_received:{}\r\n", + total_connections_received:{}\r\n\ + total_dispatch_cross_read_fastpath:{}\r\n\ + total_dispatch_cross_spsc:{}\r\n", crate::admin::metrics_setup::total_commands_processed(), crate::admin::metrics_setup::total_connections_received(), + crate::admin::metrics_setup::total_dispatch_cross_read_fastpath(), + crate::admin::metrics_setup::total_dispatch_cross_spsc(), ); sections.push_str("\r\n"); diff --git a/src/command/key_extra.rs b/src/command/key_extra.rs index 3e1f53fb..7c6252ea 100644 --- a/src/command/key_extra.rs +++ b/src/command/key_extra.rs @@ -39,9 +39,8 @@ pub fn copy(db: &mut Database, args: &[Frame]) -> Frame { if arg.eq_ignore_ascii_case(b"REPLACE") { replace = true; } else if arg.eq_ignore_ascii_case(b"DB") { - // Cross-DB copy requires shard_databases context not available here return Frame::Error(Bytes::from_static( - b"ERR COPY with DB option is not supported yet", + b"ERR COPY DB option not implemented (tracked in T2.3)", )); } else { return Frame::Error(Bytes::from_static(b"ERR syntax error")); @@ -460,10 +459,24 @@ mod tests { } #[test] - fn test_copy_db_option_errors() { + fn test_copy_db_option_not_implemented() { + // T1.3: COPY ... DB n returns the "not implemented (tracked in T2.3)" error. let mut db = setup_db_with_key(b"src", b"hello"); let result = copy(&mut db, &[bs(b"src"), bs(b"dst"), bs(b"DB")]); - assert!(matches!(result, Frame::Error(_))); + match result { + Frame::Error(msg) => { + let text = std::str::from_utf8(&msg).unwrap(); + assert!( + text.contains("T2.3"), + "expected T2.3 tracking ref in error, got: {text}" + ); + assert!( + text.contains("not implemented"), + "expected 'not implemented' in error, got: {text}" + ); + } + other => panic!("expected Error frame, got {other:?}"), + } } // --- SORT tests --- diff --git a/src/command/metadata.rs b/src/command/metadata.rs index 029f2408..37617b39 100644 --- a/src/command/metadata.rs +++ b/src/command/metadata.rs @@ -359,6 +359,8 @@ pub static COMMAND_META: phf::Map<&'static str, CommandMeta> = phf_map! { "FLUSHDB" => CommandMeta { name: "FLUSHDB", arity: -1, flags: W, first_key: 0, last_key: 0, step: 0, acl_categories: DNG }, "FLUSHALL" => CommandMeta { name: "FLUSHALL", arity: -1, flags: W, first_key: 0, last_key: 0, step: 0, acl_categories: DNG }, "SWAPDB" => CommandMeta { name: "SWAPDB", arity: 3, flags: W, first_key: 0, last_key: 0, step: 0, acl_categories: DNG }, + // MOVE is not yet implemented; stub returns ERR until T2.2 lands. + "MOVE" => CommandMeta { name: "MOVE", arity: 3, flags: W, first_key: 1, last_key: 1, step: 1, acl_categories: GEN }, "SHUTDOWN" => CommandMeta { name: "SHUTDOWN", arity: -1, flags: A, first_key: 0, last_key: 0, step: 0, acl_categories: DNG }, "TIME" => CommandMeta { name: "TIME", arity: 1, flags: RF, first_key: 0, last_key: 0, step: 0, acl_categories: SRV }, "LOLWUT" => CommandMeta { name: "LOLWUT", arity: -1, flags: R, first_key: 0, last_key: 0, step: 0, acl_categories: SRV }, diff --git a/src/command/mod.rs b/src/command/mod.rs index 105aac48..bad9ae88 100644 --- a/src/command/mod.rs +++ b/src/command/mod.rs @@ -178,13 +178,18 @@ fn dispatch_inner( } } (4, b'm') => { - // MGET MSET + // MGET MSET MOVE if cmd.eq_ignore_ascii_case(b"MGET") { return resp(string::mget(db, args)); } if cmd.eq_ignore_ascii_case(b"MSET") { return resp(string::mset(db, args)); } + if cmd.eq_ignore_ascii_case(b"MOVE") { + return resp(Frame::Error(Bytes::from_static( + b"ERR MOVE not implemented (tracked in T2.2)", + ))); + } } (4, b'p') => { // PING PTTL @@ -536,9 +541,8 @@ fn dispatch_inner( } b'w' => { if cmd.eq_ignore_ascii_case(b"SWAPDB") { - // SWAPDB requires cross-database access not available in dispatch return resp(Frame::Error(Bytes::from_static( - b"ERR SWAPDB is not supported in sharded mode", + b"ERR SWAPDB not implemented (tracked in T2.1)", ))); } } @@ -1781,4 +1785,46 @@ mod tests { let _result = dispatch_read(&db, cmd, &[], now_ms, &mut selected, 16); } } + + // ── T1.3: reframed "not implemented" errors ─────────────────────────── + + fn dispatch_resp(cmd: &[u8], args: &[&[u8]]) -> Frame { + let mut db = Database::new(); + let mut selected = 0usize; + let frame_args = make_args(args); + match dispatch(&mut db, cmd, &frame_args, &mut selected, 1) { + DispatchResult::Response(f) => f, + DispatchResult::Quit(_) => panic!("unexpected Quit result"), + } + } + + #[test] + fn swapdb_error_is_not_implemented() { + let frame = dispatch_resp(b"SWAPDB", &[b"0", b"1"]); + match frame { + Frame::Error(msg) => { + let text = std::str::from_utf8(&msg).unwrap(); + assert!(text.contains("not implemented"), "got: {text}"); + assert!(text.contains("T2.1"), "expected T2.1 ref, got: {text}"); + assert!( + !text.contains("sharded mode"), + "old text must be gone, got: {text}" + ); + } + other => panic!("expected Error frame, got {other:?}"), + } + } + + #[test] + fn move_error_is_not_implemented() { + let frame = dispatch_resp(b"MOVE", &[b"mykey", b"1"]); + match frame { + Frame::Error(msg) => { + let text = std::str::from_utf8(&msg).unwrap(); + assert!(text.contains("not implemented"), "got: {text}"); + assert!(text.contains("T2.2"), "expected T2.2 ref, got: {text}"); + } + other => panic!("expected Error frame, got {other:?}"), + } + } } diff --git a/src/main.rs b/src/main.rs index f33e6656..8bff34ac 100644 --- a/src/main.rs +++ b/src/main.rs @@ -242,16 +242,42 @@ fn main() -> anyhow::Result<()> { moon::vector::distance::init(); // Determine number of shards + // T1.2: when --shards 0 (auto-detect), optionally cap at the empirical + // sweet-spot of min(2, vCPU) via MOON_AUTO_SHARDS_CONSERVATIVE=1. + // Default behaviour is unchanged: full available_parallelism(). let num_shards = if config.shards == 0 { - std::thread::available_parallelism() + let parallelism = std::thread::available_parallelism() .map(|n| n.get()) - .unwrap_or(4) + .unwrap_or(4); + let conservative = std::env::var_os("MOON_AUTO_SHARDS_CONSERVATIVE").is_some(); + let resolved = compute_auto_shards(parallelism, conservative); + if conservative { + info!( + "auto-detected shards={resolved} \ + (capped at 2 via MOON_AUTO_SHARDS_CONSERVATIVE; \ + unset to use full vCPU count of {parallelism})" + ); + } else { + info!( + "auto-detected shards={resolved} \ + (set MOON_AUTO_SHARDS_CONSERVATIVE=1 to cap at 2)" + ); + } + resolved } else { config.shards }; info!("Starting with {} shards", num_shards); + // T1.1: warn when maxclients < 25 × shards (undersubscription footgun). + // Suppressed by MOON_NO_UNDERSUBSCRIPTION_WARN=1. + if let Some(msg) = should_warn_undersubscription(config.maxclients, num_shards) + && std::env::var_os("MOON_NO_UNDERSUBSCRIPTION_WARN").is_none() + { + tracing::warn!("{msg}"); + } + // Create channel mesh for inter-shard communication let mut mesh = ChannelMesh::new(num_shards, CHANNEL_BUFFER_SIZE); @@ -935,7 +961,116 @@ fn maybe_respawn_with_arena_override() -> anyhow::Result<()> { )) } +/// Resolve the automatic shard count, optionally capped to the empirical +/// knee of 2 when `MOON_AUTO_SHARDS_CONSERVATIVE=1` is set. +/// +/// * `parallelism` — value from `available_parallelism()` (or fallback). +/// * `conservative` — when `true`, clamps the result to `min(parallelism, 2)`. +/// +/// The cap is intentionally opt-IN: the default `--shards 0` continues to +/// resolve to the full CPU count. Operators on high-core hosts who observe +/// sub-linear multi-shard scaling can set the env var to stay in the +/// `s≤2` sweet spot without changing the startup flag. +pub fn compute_auto_shards(parallelism: usize, conservative: bool) -> usize { + if conservative { + parallelism.min(2) + } else { + parallelism + } +} + +/// Returns a warning message when the server is configured with too few +/// client slots for the number of shards, or `None` when no warning is +/// needed. +/// +/// # Arguments +/// * `maxclients` — configured `--maxclients` value (0 = unlimited; no +/// warning is emitted in that case since there is no per-shard ceiling). +/// * `num_shards` — resolved shard count after auto-detect. +/// +/// The empirical threshold is **25 clients per shard**: below this the +/// per-shard SPSC channels are chronically under-subscribed and throughput +/// collapses (documented in `benchmark_scaling_concurrency_2026_04_26`). +/// +/// Suppressed entirely when `num_shards == 1` (single-shard has no +/// cross-shard dispatch) or when `maxclients == 0` (unlimited). +pub fn should_warn_undersubscription(maxclients: usize, num_shards: usize) -> Option { + if num_shards <= 1 || maxclients == 0 { + return None; + } + let threshold = num_shards.saturating_mul(25); + if maxclients < threshold { + Some(format!( + "multi-shard mode with shards={num_shards} expects \ + \u{2265}{threshold} concurrent clients; current \ + maxclients={maxclients} may cause throughput collapse — \ + see CLAUDE.md Gotchas or set MOON_NO_UNDERSUBSCRIPTION_WARN=1 \ + to suppress this warning" + )) + } else { + None + } +} + #[cfg(not(all(feature = "jemalloc", unix)))] fn maybe_respawn_with_arena_override() -> anyhow::Result<()> { Ok(()) } + +#[cfg(test)] +mod tests { + use super::should_warn_undersubscription; + + #[test] + fn no_warn_single_shard() { + assert!(should_warn_undersubscription(100, 1).is_none()); + } + + #[test] + fn no_warn_unlimited_clients() { + assert!(should_warn_undersubscription(0, 8).is_none()); + } + + #[test] + fn no_warn_sufficient_clients() { + // 8 shards × 25 = 200 → exactly 200 is sufficient + assert!(should_warn_undersubscription(200, 8).is_none()); + } + + #[test] + fn warns_below_threshold() { + // 8 shards × 25 = 200 → 199 is insufficient + let msg = should_warn_undersubscription(199, 8).expect("should warn"); + assert!(msg.contains("shards=8"), "got: {msg}"); + assert!(msg.contains("maxclients=199"), "got: {msg}"); + } + + #[test] + fn warns_default_maxclients_low_shards() { + // 4 shards × 25 = 100 → default maxclients=10000 is fine + assert!(should_warn_undersubscription(10000, 4).is_none()); + } + + #[test] + fn warns_high_shard_count() { + // 32 shards × 25 = 800 → 50 is way below threshold + let msg = should_warn_undersubscription(50, 32).expect("should warn"); + assert!(msg.contains("shards=32"), "got: {msg}"); + } + + #[test] + fn threshold_is_inclusive() { + // exactly at threshold: no warn + assert!(should_warn_undersubscription(25, 1).is_none()); // single shard + assert!(should_warn_undersubscription(50, 2).is_none()); // 2×25 = 50 + } + + #[test] + fn auto_shards_conservative_cap() { + // verify compute_auto_shards pure function + assert_eq!(super::compute_auto_shards(16, true), 2); + assert_eq!(super::compute_auto_shards(16, false), 16); + assert_eq!(super::compute_auto_shards(1, true), 1); + assert_eq!(super::compute_auto_shards(4, true), 2); + } +}