From d414e9f437b4c963ad43840cad3d5e4b80dcfcd4 Mon Sep 17 00:00:00 2001 From: Tin Dang Date: Thu, 9 Apr 2026 23:37:36 +0700 Subject: [PATCH 01/11] fix(sorted_set): ZREVRANGEBYSCORE and ZREVRANGEBYLEX return empty for finite score ranges MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The rev branch in zrange_by_score/zrange_by_lex swapped min_arg and max_arg during parsing, but all callers already pass them in semantic (min, max) order. This double-swap produced min_bound > max_bound, making the filter (s >= min AND s <= max) reject everything for finite ranges like "3 1". Fix: remove the rev-specific swap — rev only affects iteration direction (entries.reverse), not bound parsing. Added ZREVRANGEBYSCORE finite range test to test-commands.sh. --- scripts/test-commands.sh | 3 +- src/command/sorted_set/mod.rs | 57 ++++++++++------------------------- 2 files changed, 17 insertions(+), 43 deletions(-) diff --git a/scripts/test-commands.sh b/scripts/test-commands.sh index c93c8100..a1770a77 100755 --- a/scripts/test-commands.sh +++ b/scripts/test-commands.sh @@ -492,9 +492,8 @@ if should_run "sorted_set"; then # ZREVRANGEBYSCORE on clean key (ZPOPMIN/ZPOPMAX above mutated z:k1) rcli ZADD z:revtest 1 alpha 2 beta 3 gamma >/dev/null 2>&1; mcli ZADD z:revtest 1 alpha 2 beta 3 gamma >/dev/null 2>&1 assert_match "ZRANGEBYSCORE 2" ZRANGEBYSCORE z:revtest 1 3 - # ZREVRANGEBYSCORE: known issue — moon returns empty for rev range queries - # TODO: investigate src/command/sorted_set.rs zrevrangebyscore implementation assert_match "ZREVRANGEBYSCORE" ZREVRANGEBYSCORE z:revtest +inf -inf + assert_match "ZREVRANGEBYSCORE 2" ZREVRANGEBYSCORE z:revtest 3 1 assert_match "ZCOUNT" ZCOUNT z:k1 2 5 assert_match "ZINCRBY" ZINCRBY z:k1 100 b assert_match "ZREM" ZREM z:k1 e diff --git a/src/command/sorted_set/mod.rs b/src/command/sorted_set/mod.rs index bc417dd3..23fd2f9f 100644 --- a/src/command/sorted_set/mod.rs +++ b/src/command/sorted_set/mod.rs @@ -263,27 +263,15 @@ pub(super) fn zrange_by_score( limit_offset: Option, limit_count: Option, ) -> Frame { - let (min_bound, max_bound) = if rev { - // With REV, max comes first in args - let max_b = match parse_score_bound(min_arg) { - Ok(b) => b, - Err(e) => return e, - }; - let min_b = match parse_score_bound(max_arg) { - Ok(b) => b, - Err(e) => return e, - }; - (min_b, max_b) - } else { - let min_b = match parse_score_bound(min_arg) { - Ok(b) => b, - Err(e) => return e, - }; - let max_b = match parse_score_bound(max_arg) { - Ok(b) => b, - Err(e) => return e, - }; - (min_b, max_b) + // All callers pass (min, max) in semantic order regardless of rev. + // The rev flag only affects iteration direction (entries.reverse below). + let min_bound = match parse_score_bound(min_arg) { + Ok(b) => b, + Err(e) => return e, + }; + let max_bound = match parse_score_bound(max_arg) { + Ok(b) => b, + Err(e) => return e, }; let _ = members; // not directly needed; scores has all data @@ -344,26 +332,13 @@ pub(super) fn zrange_by_lex( limit_offset: Option, limit_count: Option, ) -> Frame { - let (min_bound, max_bound) = if rev { - let max_b = match parse_lex_bound(min_arg) { - Ok(b) => b, - Err(e) => return e, - }; - let min_b = match parse_lex_bound(max_arg) { - Ok(b) => b, - Err(e) => return e, - }; - (min_b, max_b) - } else { - let min_b = match parse_lex_bound(min_arg) { - Ok(b) => b, - Err(e) => return e, - }; - let max_b = match parse_lex_bound(max_arg) { - Ok(b) => b, - Err(e) => return e, - }; - (min_b, max_b) + let min_bound = match parse_lex_bound(min_arg) { + Ok(b) => b, + Err(e) => return e, + }; + let max_bound = match parse_lex_bound(max_arg) { + Ok(b) => b, + Err(e) => return e, }; let mut entries: Vec<&Bytes> = Vec::new(); From 6dbfb515d59e5bd495eb0cc69b77fb8f951faaa3 Mon Sep 17 00:00:00 2001 From: Tin Dang Date: Thu, 9 Apr 2026 23:48:34 +0700 Subject: [PATCH 02/11] feat(observability): fill INFO sections and add tracing spans - INFO Clients: connected_clients from atomic counter - INFO Memory: used_memory/used_memory_human/used_memory_rss from /proc/self/status - INFO Replication: role/connected_slaves/master_replid/master_repl_offset from global ReplicationState (registered at startup via set_global_repl_state) - Added #[instrument] spans to: handle_connection (single/monoio), handle_psync_on_master (tokio/monoio), compact(), rewrite_aof() - Added get_rss_bytes() for Linux /proc parsing, connected_clients() atomic counter, get_replication_info() global accessor --- src/admin/metrics_setup.rs | 65 +++++++++++++++++++++++++++++++ src/command/connection.rs | 53 ++++++++++++++++++++----- src/main.rs | 3 ++ src/persistence/aof.rs | 1 + src/replication/master.rs | 2 + src/server/conn/handler_monoio.rs | 1 + src/server/conn/handler_single.rs | 1 + src/server/listener.rs | 3 ++ src/vector/segment/compaction.rs | 1 + 9 files changed, 121 insertions(+), 9 deletions(-) diff --git a/src/admin/metrics_setup.rs b/src/admin/metrics_setup.rs index 297948c9..fa167b0f 100644 --- a/src/admin/metrics_setup.rs +++ b/src/admin/metrics_setup.rs @@ -25,6 +25,7 @@ pub fn is_server_ready() -> bool { // (admin_port=0), so INFO always returns meaningful stats. static TOTAL_COMMANDS: AtomicU64 = AtomicU64::new(0); static TOTAL_CONNECTIONS: AtomicU64 = AtomicU64::new(0); +static CONNECTED_CLIENTS: AtomicU64 = AtomicU64::new(0); /// Initialize the Prometheus metrics exporter and admin HTTP server. /// @@ -340,6 +341,7 @@ pub fn record_command_error(cmd: &str) { #[inline] pub fn record_connection_opened() { TOTAL_CONNECTIONS.fetch_add(1, Ordering::Relaxed); + CONNECTED_CLIENTS.fetch_add(1, Ordering::Relaxed); if !METRICS_INITIALIZED.load(Ordering::Relaxed) { return; } @@ -350,12 +352,19 @@ pub fn record_connection_opened() { /// Record a client disconnection. #[inline] pub fn record_connection_closed() { + CONNECTED_CLIENTS.fetch_sub(1, Ordering::Relaxed); if !METRICS_INITIALIZED.load(Ordering::Relaxed) { return; } gauge!("moon_connected_clients").decrement(1.0); } +/// Current number of connected clients (for INFO command). +#[inline] +pub fn connected_clients() -> u64 { + CONNECTED_CLIENTS.load(Ordering::Relaxed) +} + // ── Keyspace metrics ──────────────────────────────────────────────────── /// Record keyspace hit/miss. @@ -464,6 +473,32 @@ pub fn update_rss_bytes(rss: u64) { gauge!("moon_rss_bytes").set(rss as f64); } +// ── Memory helpers ────────────────────────────────────────────────────── + +/// Read process RSS from /proc/self/status (Linux only). +/// Returns bytes, or 0 on failure / non-Linux. +#[cfg(target_os = "linux")] +pub fn get_rss_bytes() -> u64 { + if let Ok(status) = std::fs::read_to_string("/proc/self/status") { + for line in status.lines() { + if let Some(rest) = line.strip_prefix("VmRSS:") { + let trimmed = rest.trim(); + if let Some(kb_str) = trimmed.strip_suffix(" kB") { + if let Ok(kb) = kb_str.trim().parse::() { + return kb * 1024; + } + } + } + } + } + 0 +} + +#[cfg(not(target_os = "linux"))] +pub fn get_rss_bytes() -> u64 { + 0 +} + // ── INFO helpers ──────────────────────────────────────────────────────── /// Total commands processed since server start (for INFO Stats). @@ -506,6 +541,36 @@ pub fn get_cpu_usage() -> (f64, f64) { (0.0, 0.0) } +// ── Global replication state (for INFO) ──────────────────────────────── + +static GLOBAL_REPL_STATE: once_cell::sync::OnceCell< + std::sync::Arc>, +> = once_cell::sync::OnceCell::new(); + +/// Register the global replication state for INFO queries. +pub fn set_global_repl_state( + state: std::sync::Arc>, +) { + let _ = GLOBAL_REPL_STATE.set(state); +} + +/// Get replication info for INFO command: (role, connected_slaves, master_repl_offset, repl_id). +pub fn get_replication_info() -> (&'static str, usize, u64, String) { + if let Some(state) = GLOBAL_REPL_STATE.get() { + if let Ok(guard) = state.read() { + let role = match &guard.role { + crate::replication::state::ReplicationRole::Master => "master", + crate::replication::state::ReplicationRole::Replica { .. } => "slave", + }; + let slaves = guard.replicas.len(); + let offset = guard.master_repl_offset.load(Ordering::Relaxed); + let repl_id = guard.repl_id.clone(); + return (role, slaves, offset, repl_id); + } + } + ("master", 0, 0, "0".repeat(40)) +} + // ── Global SLOWLOG ───────────────────────────────────────────────────── /// Global slowlog instance accessible from any handler thread. diff --git a/src/command/connection.rs b/src/command/connection.rs index c56cad61..28a6bf19 100644 --- a/src/command/connection.rs +++ b/src/command/connection.rs @@ -130,11 +130,29 @@ pub fn readyz() -> Frame { } } +/// Format bytes as human-readable (e.g. "1.23M", "456.78K"). +fn format_memory_human(bytes: u64) -> String { + const KB: f64 = 1024.0; + const MB: f64 = 1024.0 * 1024.0; + const GB: f64 = 1024.0 * 1024.0 * 1024.0; + let b = bytes as f64; + if b >= GB { + format!("{:.2}G", b / GB) + } else if b >= MB { + format!("{:.2}M", b / MB) + } else if b >= KB { + format!("{:.2}K", b / KB) + } else { + format!("{bytes}B") + } +} + /// INFO command handler. /// -/// Returns a BulkString with minimal INFO sections. +/// Returns a BulkString with server info sections matching Redis INFO format. pub fn info(db: &Database, _args: &[Frame]) -> Frame { - let mut sections = String::new(); + use std::fmt::Write as _; + let mut sections = String::with_capacity(2048); sections.push_str("# Server\r\n"); sections.push_str("redis_version:0.1.0\r\n"); @@ -142,9 +160,24 @@ pub fn info(db: &Database, _args: &[Frame]) -> Frame { sections.push_str("\r\n"); sections.push_str("# Clients\r\n"); + let _ = write!( + sections, + "connected_clients:{}\r\n", + crate::admin::metrics_setup::connected_clients(), + ); sections.push_str("\r\n"); sections.push_str("# Memory\r\n"); + let rss = crate::admin::metrics_setup::get_rss_bytes(); + let _ = write!( + sections, + "used_memory:{rss}\r\n\ + used_memory_human:{human}\r\n\ + used_memory_rss:{rss}\r\n\ + used_memory_peak:{rss}\r\n", + rss = rss, + human = format_memory_human(rss), + ); sections.push_str("\r\n"); sections.push_str("# Persistence\r\n"); @@ -196,7 +229,6 @@ pub fn info(db: &Database, _args: &[Frame]) -> Frame { sections.push_str("\r\n"); sections.push_str("# MoonStore\r\n"); - use std::fmt::Write as _; let _ = write!( sections, "disk_offload_enabled:{}\r\n", @@ -228,13 +260,16 @@ pub fn info(db: &Database, _args: &[Frame]) -> Frame { sections.push_str("\r\n"); // # Replication - // NOTE: placeholder — always reports master with 0 replicas. - // TODO: wire to actual ReplicationState when replication is implemented. sections.push_str("# Replication\r\n"); - sections.push_str("role:master\r\n"); - sections.push_str("connected_slaves:0\r\n"); - sections.push_str("master_replid:0000000000000000000000000000000000000000\r\n"); - sections.push_str("master_repl_offset:0\r\n"); + let (role, slaves, offset, repl_id) = + crate::admin::metrics_setup::get_replication_info(); + let _ = write!( + sections, + "role:{role}\r\n\ + connected_slaves:{slaves}\r\n\ + master_replid:{repl_id}\r\n\ + master_repl_offset:{offset}\r\n", + ); sections.push_str("\r\n"); sections.push_str("# Keyspace\r\n"); diff --git a/src/main.rs b/src/main.rs index a7e5e8ae..a97fa2ef 100644 --- a/src/main.rs +++ b/src/main.rs @@ -195,6 +195,9 @@ fn main() -> anyhow::Result<()> { moon::replication::state::ReplicationState::new(num_shards, repl_id, repl_id2), )); + // Register repl_state globally for INFO command queries. + moon::admin::metrics_setup::set_global_repl_state(repl_state.clone()); + // Cluster mode initialization let cluster_state: Option>> = if config.cluster_enabled { diff --git a/src/persistence/aof.rs b/src/persistence/aof.rs index 840f1542..e6c98e70 100644 --- a/src/persistence/aof.rs +++ b/src/persistence/aof.rs @@ -1052,6 +1052,7 @@ fn reopen_aof_sync(aof_path: &Path) -> Result { /// /// Delegates to `rewrite_aof_sync` — the actual I/O is synchronous (temp write + rename). #[cfg(feature = "runtime-tokio")] +#[tracing::instrument(skip_all, level = "info")] pub async fn rewrite_aof(db: SharedDatabases, aof_path: &Path) -> Result<(), MoonError> { rewrite_aof_sync(&db, aof_path) } diff --git a/src/replication/master.rs b/src/replication/master.rs index 4009452a..196ef0a6 100644 --- a/src/replication/master.rs +++ b/src/replication/master.rs @@ -38,6 +38,7 @@ use crate::replication::state::{ReplicaInfo, ReplicationState}; /// 2. Send backlog bytes from client_offset to current offset for each shard /// 3. Register replica with all shards for live streaming #[cfg(feature = "runtime-tokio")] +#[tracing::instrument(skip_all, level = "debug", fields(repl_id = %client_repl_id, offset = client_offset))] pub async fn handle_psync_on_master( client_repl_id: &str, client_offset: i64, @@ -194,6 +195,7 @@ pub async fn handle_psync_on_master( /// Same logic as the tokio variant but uses monoio ownership I/O for all TCP writes. /// Takes a mutable reference to `monoio::net::TcpStream` instead of `OwnedWriteHalf`. #[cfg(feature = "runtime-monoio")] +#[tracing::instrument(skip_all, level = "debug", fields(repl_id = %client_repl_id, offset = client_offset))] pub async fn handle_psync_on_master( client_repl_id: &str, client_offset: i64, diff --git a/src/server/conn/handler_monoio.rs b/src/server/conn/handler_monoio.rs index 1230dcfa..7a3e331d 100644 --- a/src/server/conn/handler_monoio.rs +++ b/src/server/conn/handler_monoio.rs @@ -74,6 +74,7 @@ pub enum MonoioHandlerResult { /// read since monoio's IoBufMut is implemented for Vec, then copy into BytesMut /// for codec parsing. #[cfg(feature = "runtime-monoio")] +#[tracing::instrument(skip_all, level = "debug")] pub async fn handle_connection_sharded_monoio< S: monoio::io::AsyncReadRent + monoio::io::AsyncWriteRent, >( diff --git a/src/server/conn/handler_single.rs b/src/server/conn/handler_single.rs index 59bc308e..39d8ac96 100644 --- a/src/server/conn/handler_single.rs +++ b/src/server/conn/handler_single.rs @@ -54,6 +54,7 @@ use crate::server::codec::RespCodec; /// into a batch, executes them under a single lock acquisition, then writes all /// responses outside the lock. This reduces lock acquisitions from N per pipeline /// to 1 per batch cycle. +#[tracing::instrument(skip_all, level = "debug")] pub async fn handle_connection( stream: TcpStream, db: SharedDatabases, diff --git a/src/server/listener.rs b/src/server/listener.rs index d1a5efc0..5aaf6ad6 100644 --- a/src/server/listener.rs +++ b/src/server/listener.rs @@ -176,6 +176,9 @@ pub async fn run_with_shutdown( ), )); + // Register repl_state globally for INFO command queries. + crate::admin::metrics_setup::set_global_repl_state(repl_state.clone()); + // Build ACL table from config (load aclfile if configured, else bootstrap from requirepass) let acl_table: Arc> = { let table = crate::acl::AclTable::load_or_default(&config); diff --git a/src/vector/segment/compaction.rs b/src/vector/segment/compaction.rs index 032b12f2..fa55baf9 100644 --- a/src/vector/segment/compaction.rs +++ b/src/vector/segment/compaction.rs @@ -445,6 +445,7 @@ fn add_neighbor_to_flat( /// /// Returns `Err(CompactionError::RecallTooLow)` if recall < 0.95. /// Returns `Err(CompactionError::EmptySegment)` if all entries are deleted. +#[tracing::instrument(skip_all, level = "debug")] pub fn compact( frozen: &FrozenSegment, collection: &Arc, From cc05992c9308e6b4199d987e62d03ac5f0e4596d Mon Sep 17 00:00:00 2001 From: Tin Dang Date: Thu, 9 Apr 2026 23:50:03 +0700 Subject: [PATCH 03/11] feat(observability): wire replication lag into Prometheus metric get_replication_info() now computes max lag across all replicas and calls record_replication_lag() to update the moon_replication_lag_bytes Prometheus gauge. Previously the function existed but had zero call sites. --- src/admin/metrics_setup.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/admin/metrics_setup.rs b/src/admin/metrics_setup.rs index fa167b0f..503e6464 100644 --- a/src/admin/metrics_setup.rs +++ b/src/admin/metrics_setup.rs @@ -555,6 +555,7 @@ pub fn set_global_repl_state( } /// Get replication info for INFO command: (role, connected_slaves, master_repl_offset, repl_id). +/// Also updates the Prometheus replication lag gauge as a side-effect. pub fn get_replication_info() -> (&'static str, usize, u64, String) { if let Some(state) = GLOBAL_REPL_STATE.get() { if let Ok(guard) = state.read() { @@ -565,6 +566,23 @@ pub fn get_replication_info() -> (&'static str, usize, u64, String) { let slaves = guard.replicas.len(); let offset = guard.master_repl_offset.load(Ordering::Relaxed); let repl_id = guard.repl_id.clone(); + // Update Prometheus lag gauge: max lag across all replicas. + if !guard.replicas.is_empty() { + let max_lag_bytes = guard + .replicas + .iter() + .map(|r| { + let ack: u64 = r + .ack_offsets + .iter() + .map(|a| a.load(Ordering::Relaxed)) + .sum(); + offset.saturating_sub(ack) + }) + .max() + .unwrap_or(0); + record_replication_lag(max_lag_bytes, 0); + } return (role, slaves, offset, repl_id); } } From e489b7d80f8af9993b39d601426230c91e4e9be9 Mon Sep 17 00:00:00 2001 From: Tin Dang Date: Thu, 9 Apr 2026 23:51:12 +0700 Subject: [PATCH 04/11] feat(ci): add cargo deny/audit to CI and aarch64 Linux release build - CI: add supply-chain job running cargo deny check + cargo audit (deny.toml already existed but was not enforced in pipeline) - Release: add linux-aarch64-tokio matrix entry using cross for cross-compilation (aarch64 is the primary production target) - Release: update checksums and release artifact list --- .github/workflows/ci.yml | 14 ++++++++++++++ .github/workflows/release.yml | 20 ++++++++++++++++++-- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 46b27d3f..f27e2e7a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -79,6 +79,20 @@ jobs: exit 1 fi + supply-chain: + name: Supply Chain Security + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + - uses: dtolnay/rust-toolchain@1.94.1 + - uses: Swatinem/rust-cache@v2 + - name: Install cargo-deny and cargo-audit + run: cargo install cargo-deny cargo-audit + - name: cargo deny check + run: cargo deny check + - name: cargo audit + run: cargo audit + msrv: name: MSRV (1.94) runs-on: ubuntu-latest diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index a5f48f09..566028fc 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -27,6 +27,13 @@ jobs: features: runtime-monoio,jemalloc binary: moon-linux-monoio + - name: linux-aarch64-tokio + os: ubuntu-latest + target: aarch64-unknown-linux-gnu + features: runtime-tokio,jemalloc + binary: moon-linux-aarch64-tokio + cross: true + - name: macos-tokio os: macos-latest target: aarch64-apple-darwin @@ -42,9 +49,17 @@ jobs: - uses: Swatinem/rust-cache@v2 + - name: Install cross (aarch64) + if: matrix.cross + run: cargo install cross --locked + - name: Build run: | - cargo build --release --no-default-features --features ${{ matrix.features }} --target ${{ matrix.target }} + if [ "${{ matrix.cross }}" = "true" ]; then + cross build --release --no-default-features --features ${{ matrix.features }} --target ${{ matrix.target }} + else + cargo build --release --no-default-features --features ${{ matrix.features }} --target ${{ matrix.target }} + fi cp target/${{ matrix.target }}/release/moon ${{ matrix.binary }} - name: Upload artifact @@ -87,7 +102,7 @@ jobs: - name: Generate checksums run: | cd artifacts - sha256sum moon-linux-tokio moon-linux-monoio moon-macos-tokio moon-sbom.json moon-sbom-tokio.json moon-sbom-monoio.json > SHA256SUMS.txt + sha256sum moon-linux-tokio moon-linux-monoio moon-linux-aarch64-tokio moon-macos-tokio moon-sbom.json moon-sbom-tokio.json moon-sbom-monoio.json > SHA256SUMS.txt cat SHA256SUMS.txt - name: Install cosign @@ -111,6 +126,7 @@ jobs: --generate-notes \ artifacts/moon-linux-tokio \ artifacts/moon-linux-monoio \ + artifacts/moon-linux-aarch64-tokio \ artifacts/moon-macos-tokio \ artifacts/moon-sbom.json \ artifacts/moon-sbom-tokio.json \ From 6ca93f0b28cd12dd9b8b71a9566ffe2b1e43f6ef Mon Sep 17 00:00:00 2001 From: Tin Dang Date: Thu, 9 Apr 2026 23:52:07 +0700 Subject: [PATCH 05/11] feat(durability): add BGSAVE and BGREWRITEAOF crash matrix cells Two new crash matrix test cells: - crash_during_bgsave: SIGKILL mid-RDB snapshot, verify AOF recovery - crash_during_bgrewriteaof: SIGKILL mid-AOF compaction, verify original AOF intact for recovery Both use appendfsync=always so all 500 keys must survive (RPO=0). This brings crash matrix coverage to 6/7 cells. --- tests/durability/crash_matrix.rs | 96 ++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/tests/durability/crash_matrix.rs b/tests/durability/crash_matrix.rs index 79fbd94d..d4ed606d 100644 --- a/tests/durability/crash_matrix.rs +++ b/tests/durability/crash_matrix.rs @@ -239,6 +239,102 @@ mod tests { assert!(result.is_ok(), "{}", result.unwrap_err()); } + /// Crash during BGSAVE: write keys, trigger BGSAVE, SIGKILL mid-snapshot. + /// After restart, AOF should recover all committed data regardless of + /// whether the RDB snapshot completed. + #[test] + #[ignore] + fn crash_during_bgsave() { + let dir = tempfile::tempdir().unwrap(); + let dir_str = dir.path().to_str().unwrap(); + let addr = "127.0.0.1:16404"; + + let mut server = start_moon(&[ + "--port", "16404", "--shards", "1", + "--appendonly", "yes", "--appendfsync", "always", + "--dir", dir_str, + ]); + thread::sleep(Duration::from_millis(500)); + + // Write keys + write_keys(addr, 500); + + // Trigger BGSAVE then immediately kill + send_resp_command(addr, "BGSAVE"); + thread::sleep(Duration::from_millis(50)); + + // SAFETY: valid PID, SIGKILL is always valid + let ret = unsafe { libc::kill(server.id() as i32, libc::SIGKILL) }; + assert_eq!(ret, 0); + let _ = server.wait(); + + // Restart and verify + let mut server2 = start_moon(&[ + "--port", "16404", "--shards", "1", + "--appendonly", "yes", "--appendfsync", "always", + "--dir", dir_str, + ]); + thread::sleep(Duration::from_secs(2)); + + let after = get_dbsize(addr); + let _ = send_resp_command(addr, "SHUTDOWN NOSAVE"); + let _ = server2.kill(); + let _ = server2.wait(); + + assert!( + after >= 500, + "crash_during_bgsave: RPO violation — {} of 500 keys survived", + after, + ); + } + + /// Crash during BGREWRITEAOF: write keys, trigger rewrite, SIGKILL mid-compaction. + /// After restart, original AOF should be intact for recovery. + #[test] + #[ignore] + fn crash_during_bgrewriteaof() { + let dir = tempfile::tempdir().unwrap(); + let dir_str = dir.path().to_str().unwrap(); + let addr = "127.0.0.1:16405"; + + let mut server = start_moon(&[ + "--port", "16405", "--shards", "1", + "--appendonly", "yes", "--appendfsync", "always", + "--dir", dir_str, + ]); + thread::sleep(Duration::from_millis(500)); + + write_keys(addr, 500); + + // Trigger BGREWRITEAOF then immediately kill + send_resp_command(addr, "BGREWRITEAOF"); + thread::sleep(Duration::from_millis(50)); + + // SAFETY: valid PID, SIGKILL is always valid + let ret = unsafe { libc::kill(server.id() as i32, libc::SIGKILL) }; + assert_eq!(ret, 0); + let _ = server.wait(); + + // Restart and verify + let mut server2 = start_moon(&[ + "--port", "16405", "--shards", "1", + "--appendonly", "yes", "--appendfsync", "always", + "--dir", dir_str, + ]); + thread::sleep(Duration::from_secs(2)); + + let after = get_dbsize(addr); + let _ = send_resp_command(addr, "SHUTDOWN NOSAVE"); + let _ = server2.kill(); + let _ = server2.wait(); + + assert!( + after >= 500, + "crash_during_bgrewriteaof: RPO violation — {} of 500 keys survived", + after, + ); + } + /// G14: SIGKILL during disk-offload spill. /// /// Triggers cold-tier spill with a very low threshold, then crashes. From 1d6bd862e15b38e00cfbe0b7d278b7caa57dea7b Mon Sep 17 00:00:00 2001 From: Tin Dang Date: Thu, 9 Apr 2026 23:52:59 +0700 Subject: [PATCH 06/11] feat(compat): add stream, Lua scripting, and ACL compatibility tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New redis_compat.rs test coverage: - Streams: XADD, XLEN, XRANGE, XTRIM MAXLEN - Lua: EVAL return string, EVAL with KEYS/ARGV, EVALSHA after SCRIPT LOAD, SCRIPT EXISTS/FLUSH - ACL: WHOAMI, LIST (verify default user exists) All tests are #[ignore] — require a running Moon instance. --- tests/redis_compat.rs | 125 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 125 insertions(+) diff --git a/tests/redis_compat.rs b/tests/redis_compat.rs index 84d709db..4c5819d4 100644 --- a/tests/redis_compat.rs +++ b/tests/redis_compat.rs @@ -469,3 +469,128 @@ fn overwrite_different_type() { let v: String = c.get("ow:key").unwrap(); assert_eq!(v, "new-val"); } + +// ── Streams ──────────────────────────────────────────────────────────── + +#[test] +#[ignore] +fn xadd_and_xlen() { + let mut c = sync_conn(); + let _: () = redis::cmd("DEL").arg("compat:stream1").query(&mut c).unwrap(); + let id: String = redis::cmd("XADD") + .arg("compat:stream1").arg("*").arg("field1").arg("value1") + .query(&mut c).unwrap(); + assert!(!id.is_empty(), "XADD should return an entry ID"); + let len: i64 = redis::cmd("XLEN").arg("compat:stream1").query(&mut c).unwrap(); + assert_eq!(len, 1); +} + +#[test] +#[ignore] +fn xrange_basic() { + let mut c = sync_conn(); + let _: () = redis::cmd("DEL").arg("compat:stream2").query(&mut c).unwrap(); + for i in 0..3 { + let _: String = redis::cmd("XADD") + .arg("compat:stream2").arg("*").arg("i").arg(i.to_string()) + .query(&mut c).unwrap(); + } + let entries: Vec = redis::cmd("XRANGE") + .arg("compat:stream2").arg("-").arg("+") + .query(&mut c).unwrap(); + assert_eq!(entries.len(), 3); +} + +#[test] +#[ignore] +fn xtrim_maxlen() { + let mut c = sync_conn(); + let _: () = redis::cmd("DEL").arg("compat:stream3").query(&mut c).unwrap(); + for i in 0..10 { + let _: String = redis::cmd("XADD") + .arg("compat:stream3").arg("*").arg("i").arg(i.to_string()) + .query(&mut c).unwrap(); + } + let trimmed: i64 = redis::cmd("XTRIM") + .arg("compat:stream3").arg("MAXLEN").arg("5") + .query(&mut c).unwrap(); + assert!(trimmed >= 5, "XTRIM should remove at least 5 entries"); + let len: i64 = redis::cmd("XLEN").arg("compat:stream3").query(&mut c).unwrap(); + assert_eq!(len, 5); +} + +// ── Lua scripting ────────────────────────────────────────────────────── + +#[test] +#[ignore] +fn eval_return_string() { + let mut c = sync_conn(); + let result: String = redis::cmd("EVAL") + .arg("return 'hello'").arg(0) + .query(&mut c).unwrap(); + assert_eq!(result, "hello"); +} + +#[test] +#[ignore] +fn eval_keys_and_argv() { + let mut c = sync_conn(); + let _: () = c.set("compat:lua1", "world").unwrap(); + let result: String = redis::cmd("EVAL") + .arg("return redis.call('GET', KEYS[1])").arg(1).arg("compat:lua1") + .query(&mut c).unwrap(); + assert_eq!(result, "world"); +} + +#[test] +#[ignore] +fn evalsha_after_script_load() { + let mut c = sync_conn(); + let sha: String = redis::cmd("SCRIPT") + .arg("LOAD").arg("return 42") + .query(&mut c).unwrap(); + assert_eq!(sha.len(), 40, "SHA1 should be 40 hex chars"); + let result: i64 = redis::cmd("EVALSHA") + .arg(&sha).arg(0) + .query(&mut c).unwrap(); + assert_eq!(result, 42); +} + +#[test] +#[ignore] +fn script_exists_and_flush() { + let mut c = sync_conn(); + let sha: String = redis::cmd("SCRIPT") + .arg("LOAD").arg("return 1") + .query(&mut c).unwrap(); + let exists: Vec = redis::cmd("SCRIPT") + .arg("EXISTS").arg(&sha) + .query(&mut c).unwrap(); + assert_eq!(exists, vec![true]); + let _: () = redis::cmd("SCRIPT").arg("FLUSH").query(&mut c).unwrap(); + let exists2: Vec = redis::cmd("SCRIPT") + .arg("EXISTS").arg(&sha) + .query(&mut c).unwrap(); + assert_eq!(exists2, vec![false]); +} + +// ── ACL ──────────────────────────────────────────────────────────────── + +#[test] +#[ignore] +fn acl_whoami() { + let mut c = sync_conn(); + let user: String = redis::cmd("ACL").arg("WHOAMI").query(&mut c).unwrap(); + assert_eq!(user, "default"); +} + +#[test] +#[ignore] +fn acl_list_contains_default() { + let mut c = sync_conn(); + let users: Vec = redis::cmd("ACL").arg("LIST").query(&mut c).unwrap(); + assert!( + users.iter().any(|u| u.contains("default")), + "ACL LIST should contain 'default' user" + ); +} From daf5f4913e7367442e7dbd0623d005e6c9688187 Mon Sep 17 00:00:00 2001 From: Tin Dang Date: Thu, 9 Apr 2026 23:53:27 +0700 Subject: [PATCH 07/11] docs: CHANGELOG entry for wave 0-4 gap closure Covers: ZREVRANGEBYSCORE fix, INFO enrichment, tracing spans, repl lag metric, CI supply chain, aarch64 release build, crash matrix, and expanded compat tests. --- CHANGELOG.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 96e51f2c..9d7c2b28 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed — Wave 0-4 Gap Closure (2026-04-09) + +- **ZREVRANGEBYSCORE/ZREVRANGEBYLEX correctness bug:** Fixed double-swap of min/max bounds in `zrange_by_score` and `zrange_by_lex` that caused empty results for finite score ranges (e.g., `ZREVRANGEBYSCORE key 3 1`). Added finite-range test to `test-commands.sh`. +- **INFO command enriched:** Clients section now reports `connected_clients`, Memory section reports `used_memory`/`used_memory_human`/`used_memory_rss` (from `/proc/self/status`), Replication section wired to actual `ReplicationState` (role, connected_slaves, master_replid, master_repl_offset). +- **Tracing spans:** Added `#[instrument]` to connection handlers (single, monoio), replication master (tokio, monoio), HNSW compaction, and AOF rewrite — 6 new spans. +- **Replication lag metric wired:** `moon_replication_lag_bytes` Prometheus gauge now updated from `get_replication_info()`. +- **CI supply chain security:** `cargo deny check` + `cargo audit` added to CI pipeline (deny.toml was previously unenforced). +- **Release pipeline:** aarch64-unknown-linux-gnu build added via `cross` for primary production target. +- **Crash matrix expanded:** BGSAVE and BGREWRITEAOF crash cells added (6/7 coverage). +- **Compatibility tests expanded:** Stream (XADD/XLEN/XRANGE/XTRIM), Lua scripting (EVAL/EVALSHA/SCRIPT), and ACL (WHOAMI/LIST) tests added to `redis_compat.rs`. + ### Added — Production Contract (Phase 87, 2026-04-08) - **`docs/PRODUCTION-CONTRACT.md`** — Moon's v1.0 promises: per-command-class SLOs (provisional until Phase 97), supported platform matrix (Linux aarch64 primary, Linux x86_64 secondary contingent on `PERF-04`, macOS dev-only via OrbStack), durability mode semantics per `appendfsync` × failure-class, availability & replication guarantees, security guarantees, explicit out-of-scope list, and a machine-checkable GA Exit Criteria checklist that every v0.1.3 phase ticks off. This is the contract every downstream hardening phase (88–100) tests against. From 2f427abbb14a56934cbc34020a98bad9ab096071 Mon Sep 17 00:00:00 2001 From: Tin Dang Date: Thu, 9 Apr 2026 23:55:37 +0700 Subject: [PATCH 08/11] style: rustfmt formatting for wave 0-4 changes --- src/command/connection.rs | 3 +- tests/durability/crash_matrix.rs | 52 +++++++++++---- tests/redis_compat.rs | 106 ++++++++++++++++++++++--------- 3 files changed, 118 insertions(+), 43 deletions(-) diff --git a/src/command/connection.rs b/src/command/connection.rs index 28a6bf19..3f91e743 100644 --- a/src/command/connection.rs +++ b/src/command/connection.rs @@ -261,8 +261,7 @@ pub fn info(db: &Database, _args: &[Frame]) -> Frame { // # Replication sections.push_str("# Replication\r\n"); - let (role, slaves, offset, repl_id) = - crate::admin::metrics_setup::get_replication_info(); + let (role, slaves, offset, repl_id) = crate::admin::metrics_setup::get_replication_info(); let _ = write!( sections, "role:{role}\r\n\ diff --git a/tests/durability/crash_matrix.rs b/tests/durability/crash_matrix.rs index d4ed606d..fbc3f40c 100644 --- a/tests/durability/crash_matrix.rs +++ b/tests/durability/crash_matrix.rs @@ -250,9 +250,16 @@ mod tests { let addr = "127.0.0.1:16404"; let mut server = start_moon(&[ - "--port", "16404", "--shards", "1", - "--appendonly", "yes", "--appendfsync", "always", - "--dir", dir_str, + "--port", + "16404", + "--shards", + "1", + "--appendonly", + "yes", + "--appendfsync", + "always", + "--dir", + dir_str, ]); thread::sleep(Duration::from_millis(500)); @@ -270,9 +277,16 @@ mod tests { // Restart and verify let mut server2 = start_moon(&[ - "--port", "16404", "--shards", "1", - "--appendonly", "yes", "--appendfsync", "always", - "--dir", dir_str, + "--port", + "16404", + "--shards", + "1", + "--appendonly", + "yes", + "--appendfsync", + "always", + "--dir", + dir_str, ]); thread::sleep(Duration::from_secs(2)); @@ -298,9 +312,16 @@ mod tests { let addr = "127.0.0.1:16405"; let mut server = start_moon(&[ - "--port", "16405", "--shards", "1", - "--appendonly", "yes", "--appendfsync", "always", - "--dir", dir_str, + "--port", + "16405", + "--shards", + "1", + "--appendonly", + "yes", + "--appendfsync", + "always", + "--dir", + dir_str, ]); thread::sleep(Duration::from_millis(500)); @@ -317,9 +338,16 @@ mod tests { // Restart and verify let mut server2 = start_moon(&[ - "--port", "16405", "--shards", "1", - "--appendonly", "yes", "--appendfsync", "always", - "--dir", dir_str, + "--port", + "16405", + "--shards", + "1", + "--appendonly", + "yes", + "--appendfsync", + "always", + "--dir", + dir_str, ]); thread::sleep(Duration::from_secs(2)); diff --git a/tests/redis_compat.rs b/tests/redis_compat.rs index 4c5819d4..6c2c9090 100644 --- a/tests/redis_compat.rs +++ b/tests/redis_compat.rs @@ -476,12 +476,22 @@ fn overwrite_different_type() { #[ignore] fn xadd_and_xlen() { let mut c = sync_conn(); - let _: () = redis::cmd("DEL").arg("compat:stream1").query(&mut c).unwrap(); + let _: () = redis::cmd("DEL") + .arg("compat:stream1") + .query(&mut c) + .unwrap(); let id: String = redis::cmd("XADD") - .arg("compat:stream1").arg("*").arg("field1").arg("value1") - .query(&mut c).unwrap(); + .arg("compat:stream1") + .arg("*") + .arg("field1") + .arg("value1") + .query(&mut c) + .unwrap(); assert!(!id.is_empty(), "XADD should return an entry ID"); - let len: i64 = redis::cmd("XLEN").arg("compat:stream1").query(&mut c).unwrap(); + let len: i64 = redis::cmd("XLEN") + .arg("compat:stream1") + .query(&mut c) + .unwrap(); assert_eq!(len, 1); } @@ -489,15 +499,25 @@ fn xadd_and_xlen() { #[ignore] fn xrange_basic() { let mut c = sync_conn(); - let _: () = redis::cmd("DEL").arg("compat:stream2").query(&mut c).unwrap(); + let _: () = redis::cmd("DEL") + .arg("compat:stream2") + .query(&mut c) + .unwrap(); for i in 0..3 { let _: String = redis::cmd("XADD") - .arg("compat:stream2").arg("*").arg("i").arg(i.to_string()) - .query(&mut c).unwrap(); + .arg("compat:stream2") + .arg("*") + .arg("i") + .arg(i.to_string()) + .query(&mut c) + .unwrap(); } let entries: Vec = redis::cmd("XRANGE") - .arg("compat:stream2").arg("-").arg("+") - .query(&mut c).unwrap(); + .arg("compat:stream2") + .arg("-") + .arg("+") + .query(&mut c) + .unwrap(); assert_eq!(entries.len(), 3); } @@ -505,17 +525,30 @@ fn xrange_basic() { #[ignore] fn xtrim_maxlen() { let mut c = sync_conn(); - let _: () = redis::cmd("DEL").arg("compat:stream3").query(&mut c).unwrap(); + let _: () = redis::cmd("DEL") + .arg("compat:stream3") + .query(&mut c) + .unwrap(); for i in 0..10 { let _: String = redis::cmd("XADD") - .arg("compat:stream3").arg("*").arg("i").arg(i.to_string()) - .query(&mut c).unwrap(); + .arg("compat:stream3") + .arg("*") + .arg("i") + .arg(i.to_string()) + .query(&mut c) + .unwrap(); } let trimmed: i64 = redis::cmd("XTRIM") - .arg("compat:stream3").arg("MAXLEN").arg("5") - .query(&mut c).unwrap(); + .arg("compat:stream3") + .arg("MAXLEN") + .arg("5") + .query(&mut c) + .unwrap(); assert!(trimmed >= 5, "XTRIM should remove at least 5 entries"); - let len: i64 = redis::cmd("XLEN").arg("compat:stream3").query(&mut c).unwrap(); + let len: i64 = redis::cmd("XLEN") + .arg("compat:stream3") + .query(&mut c) + .unwrap(); assert_eq!(len, 5); } @@ -526,8 +559,10 @@ fn xtrim_maxlen() { fn eval_return_string() { let mut c = sync_conn(); let result: String = redis::cmd("EVAL") - .arg("return 'hello'").arg(0) - .query(&mut c).unwrap(); + .arg("return 'hello'") + .arg(0) + .query(&mut c) + .unwrap(); assert_eq!(result, "hello"); } @@ -537,8 +572,11 @@ fn eval_keys_and_argv() { let mut c = sync_conn(); let _: () = c.set("compat:lua1", "world").unwrap(); let result: String = redis::cmd("EVAL") - .arg("return redis.call('GET', KEYS[1])").arg(1).arg("compat:lua1") - .query(&mut c).unwrap(); + .arg("return redis.call('GET', KEYS[1])") + .arg(1) + .arg("compat:lua1") + .query(&mut c) + .unwrap(); assert_eq!(result, "world"); } @@ -547,12 +585,16 @@ fn eval_keys_and_argv() { fn evalsha_after_script_load() { let mut c = sync_conn(); let sha: String = redis::cmd("SCRIPT") - .arg("LOAD").arg("return 42") - .query(&mut c).unwrap(); + .arg("LOAD") + .arg("return 42") + .query(&mut c) + .unwrap(); assert_eq!(sha.len(), 40, "SHA1 should be 40 hex chars"); let result: i64 = redis::cmd("EVALSHA") - .arg(&sha).arg(0) - .query(&mut c).unwrap(); + .arg(&sha) + .arg(0) + .query(&mut c) + .unwrap(); assert_eq!(result, 42); } @@ -561,16 +603,22 @@ fn evalsha_after_script_load() { fn script_exists_and_flush() { let mut c = sync_conn(); let sha: String = redis::cmd("SCRIPT") - .arg("LOAD").arg("return 1") - .query(&mut c).unwrap(); + .arg("LOAD") + .arg("return 1") + .query(&mut c) + .unwrap(); let exists: Vec = redis::cmd("SCRIPT") - .arg("EXISTS").arg(&sha) - .query(&mut c).unwrap(); + .arg("EXISTS") + .arg(&sha) + .query(&mut c) + .unwrap(); assert_eq!(exists, vec![true]); let _: () = redis::cmd("SCRIPT").arg("FLUSH").query(&mut c).unwrap(); let exists2: Vec = redis::cmd("SCRIPT") - .arg("EXISTS").arg(&sha) - .query(&mut c).unwrap(); + .arg("EXISTS") + .arg(&sha) + .query(&mut c) + .unwrap(); assert_eq!(exists2, vec![false]); } From 3d5372a7ca7eaf9768016d6c19785ce2840275fd Mon Sep 17 00:00:00 2001 From: Tin Dang Date: Fri, 10 Apr 2026 00:56:26 +0700 Subject: [PATCH 09/11] fix(readyz): wire set_server_ready() call at startup READYZ command always returned "ERR server not ready" because set_server_ready() was never called. The HTTP /readyz endpoint worked via a separate readiness_flag AtomicBool, but the Redis READYZ command used is_server_ready() which checks SERVER_READY. Added set_server_ready() call after shard recovery completes, immediately before the existing readiness_flag.store(). --- src/main.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main.rs b/src/main.rs index a97fa2ef..66152d5c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -391,6 +391,7 @@ fn main() -> anyhow::Result<()> { let shard_databases = ShardDatabases::new(all_dbs); // All shards recovered — mark server as ready for /readyz. + moon::admin::metrics_setup::set_server_ready(); if let Some(ref flag) = readiness_flag { flag.store(true, std::sync::atomic::Ordering::Relaxed); tracing::info!("All shards ready — /readyz returning 200"); From 76510b4ba7494dd4294fd919fceb1049c32309ea Mon Sep 17 00:00:00 2001 From: Tin Dang Date: Fri, 10 Apr 2026 08:26:50 +0700 Subject: [PATCH 10/11] ci: re-trigger CI after stale queue flush From 81cad44970860ecfb7c10eb264acc53b11f53518 Mon Sep 17 00:00:00 2001 From: Tin Dang Date: Fri, 10 Apr 2026 09:03:36 +0700 Subject: [PATCH 11/11] ci: re-trigger CI after runner recovery --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d7c2b28..5fbbdde8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,6 @@ # Changelog All notable changes to this project will be documented in this file. - The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).