diff --git a/.gitignore b/.gitignore
index 9384b6a4..2208a564 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
### Linux ###
+
*~
# temporary files which can be created if a process still has a handle open of a deleted file
@@ -79,4 +80,4 @@ ssh
.qdrant-initialized
libnull.rlib
fuzz
-shard-*/
\ No newline at end of file
+shard-*/
diff --git a/scripts/bench-phase101-commands.sh b/scripts/bench-phase101-commands.sh
new file mode 100755
index 00000000..2524728c
--- /dev/null
+++ b/scripts/bench-phase101-commands.sh
@@ -0,0 +1,369 @@
+#!/usr/bin/env bash
+set -euo pipefail
+
+###############################################################################
+# bench-phase101-commands.sh -- Moon vs Redis benchmark for Phase 101 commands
+#
+# Tests all 24 commands added in Phase 101 (command parity gaps):
+# HyperLogLog, List convenience, Hash, Set, Sorted Set 6.2+,
+# Blocking fast-path, Functions/FCALL
+#
+# Usage:
+# ./scripts/bench-phase101-commands.sh # Full run (20K req)
+# ./scripts/bench-phase101-commands.sh --requests N # Custom request count
+# ./scripts/bench-phase101-commands.sh --shards N # Moon shard count
+# ./scripts/bench-phase101-commands.sh --section hll # Single section
+#
+# Sections: all, hll, list, hash, set, zset, blocking, functions, pipeline
+###############################################################################
+
+PORT_REDIS=6399
+PORT_MOON=6400
+REQUESTS=20000
+CLIENTS=50
+SHARDS=1
+SECTION="all"
+RUST_BINARY="./target/release/moon"
+SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
+
+REDIS_PID=""
+MOON_PID=""
+
+while [[ $# -gt 0 ]]; do
+ case "$1" in
+ --requests) if [[ $# -lt 2 ]] || [[ -z "$2" ]] || [[ "$2" == -* ]]; then echo "Error: --requests requires a value"; exit 1; fi; REQUESTS="$2"; shift 2 ;;
+ --shards) if [[ $# -lt 2 ]] || [[ -z "$2" ]] || [[ "$2" == -* ]]; then echo "Error: --shards requires a value"; exit 1; fi; SHARDS="$2"; shift 2 ;;
+ --clients) if [[ $# -lt 2 ]] || [[ -z "$2" ]] || [[ "$2" == -* ]]; then echo "Error: --clients requires a value"; exit 1; fi; CLIENTS="$2"; shift 2 ;;
+ --section) if [[ $# -lt 2 ]] || [[ -z "$2" ]] || [[ "$2" == -* ]]; then echo "Error: --section requires a value"; exit 1; fi; SECTION="$2"; shift 2 ;;
+ --help) awk '/^###/{n++} n==1' "$0"; exit 0 ;;
+ *) echo "Unknown option: $1"; exit 1 ;;
+ esac
+done
+
+log() { echo "[$(date '+%H:%M:%S')] $*" >&2; }
+
+cleanup() {
+ log "Cleaning up..."
+ if [[ -n "${MOON_PID:-}" ]]; then kill "$MOON_PID" 2>/dev/null || true; wait "$MOON_PID" 2>/dev/null || true; fi
+ if [[ -n "${REDIS_PID:-}" ]]; then kill "$REDIS_PID" 2>/dev/null || true; wait "$REDIS_PID" 2>/dev/null || true; fi
+ pkill -f "redis-server.*${PORT_REDIS}" 2>/dev/null || true
+ pkill -f "moon.*${PORT_MOON}" 2>/dev/null || true
+}
+trap cleanup EXIT
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+parse_rps() {
+ tr '\r' '\n' \
+ | awk '/[Rr]equests per second/ {
+ for (i=1; i<=NF; i++) { gsub(/,/, "", $i); if ($i+0 == $i && $i > 0) { print $i; exit } }
+ }' \
+ | head -1
+}
+
+print_row() {
+ local desc="$1" redis_rps="${2:-0}" moon_rps="${3:-0}" ratio
+ if [[ "$redis_rps" != "0" ]] && [[ "$moon_rps" != "0" ]]; then
+ ratio=$(awk "BEGIN { printf \"%.2f\", $moon_rps / $redis_rps }")
+ else
+ ratio="N/A"
+ fi
+ printf "| %-40s | %12s | %12s | %6sx |\n" "$desc" "$redis_rps" "$moon_rps" "$ratio"
+}
+
+section_header() {
+ echo ""
+ echo "## $1"
+ echo ""
+ printf "| %-40s | %12s | %12s | %7s |\n" "Command" "Redis RPS" "Moon RPS" "Ratio"
+ printf "|%-42s|%14s|%14s|%9s|\n" "------------------------------------------" "--------------" "--------------" "---------"
+}
+
+should_run() { [[ "$SECTION" == "all" ]] || [[ "$SECTION" == "$1" ]]; }
+
+# redis-benchmark for commands it handles (single-key, simple args)
+bench_rb() {
+ local desc="$1"; shift
+ local r m
+ r=$(redis-benchmark -p "$PORT_REDIS" -n "$REQUESTS" -c "$CLIENTS" -q "$@" 2>/dev/null | parse_rps)
+ m=$(redis-benchmark -p "$PORT_MOON" -n "$REQUESTS" -c "$CLIENTS" -q "$@" 2>/dev/null | parse_rps)
+ print_row "$desc" "${r:-0}" "${m:-0}"
+}
+
+# Build RESP protocol string for one command
+_resp() {
+ local n=$#
+ printf "*%d\r\n" "$n"
+ for arg in "$@"; do
+ printf "\$%d\r\n%s\r\n" "${#arg}" "$arg"
+ done
+}
+
+# Pipe N copies of a RESP command to a port, return RPS
+_pipe_rps() {
+ local port="$1" n="$2"; shift 2
+ local one_cmd
+ one_cmd=$(_resp "$@")
+ local payload
+ payload=$(python3 -c "import sys; sys.stdout.write(sys.stdin.read() * $n)" <<< "$one_cmd")
+
+ local start end ms rps
+ start=$(date +%s%N)
+ printf '%s' "$payload" | redis-cli -p "$port" --pipe 2>/dev/null >/dev/null
+ end=$(date +%s%N)
+ ms=$(( (end - start) / 1000000 ))
+ if [[ $ms -gt 0 ]]; then
+ rps=$(awk "BEGIN { printf \"%.0f\", ($n * 1000.0) / $ms }")
+ else
+ rps="0"
+ fi
+ echo "$rps"
+}
+
+# Benchmark via pipe mode (for multi-arg commands redis-benchmark can't run)
+bench_pipe() {
+ local desc="$1"; shift
+ local r m
+ r=$(_pipe_rps "$PORT_REDIS" "$REQUESTS" "$@")
+ m=$(_pipe_rps "$PORT_MOON" "$REQUESTS" "$@")
+ print_row "$desc" "$r" "$m"
+}
+
+# Re-seed a key with N elements via pipe
+reseed_list() {
+ local key="$1" n="$2"
+ for port in "$PORT_REDIS" "$PORT_MOON"; do
+ redis-cli -p "$port" DEL "$key" >/dev/null 2>&1
+ local one=$(_resp RPUSH "$key" val)
+ python3 -c "import sys; sys.stdout.write(sys.stdin.read() * $n)" <<< "$one" \
+ | redis-cli -p "$port" --pipe 2>/dev/null >/dev/null
+ done
+}
+
+reseed_zset() {
+ local key="$1" n="$2"
+ for port in "$PORT_REDIS" "$PORT_MOON"; do
+ redis-cli -p "$port" DEL "$key" >/dev/null 2>&1
+ local cmds=""
+ for ((i=1; i<=n; i++)); do
+ cmds+=$(_resp ZADD "$key" "$i" "m$i")
+ done
+ printf '%s' "$cmds" | redis-cli -p "$port" --pipe 2>/dev/null >/dev/null
+ done
+}
+
+# ===========================================================================
+# Start Servers
+# ===========================================================================
+
+log "Starting Redis on port $PORT_REDIS..."
+redis-server --port "$PORT_REDIS" --save "" --appendonly no --loglevel warning --protected-mode no &
+REDIS_PID=$!
+
+log "Starting Moon on port $PORT_MOON ($SHARDS shards)..."
+RUST_LOG=warn "$RUST_BINARY" --port "$PORT_MOON" --shards "$SHARDS" --protected-mode no &
+MOON_PID=$!
+
+wait_for_server() {
+ local port="$1" name="$2" i=0
+ while (( i < 20 )); do
+ redis-cli -p "$port" PING 2>/dev/null | grep -q PONG && return 0
+ sleep 0.5; i=$((i+1))
+ done
+ echo "$name failed to start on port $port"; exit 1
+}
+
+wait_for_server "$PORT_REDIS" "Redis"
+wait_for_server "$PORT_MOON" "Moon"
+log "Servers ready."
+
+# ===========================================================================
+# Header
+# ===========================================================================
+
+REDIS_VER=$(redis-cli -p "$PORT_REDIS" INFO server 2>/dev/null | grep redis_version | cut -d: -f2 | tr -d '\r')
+PLATFORM="$(uname -s) $(uname -m)"
+[[ -f /etc/os-release ]] && PLATFORM="$PLATFORM / $(grep PRETTY_NAME /etc/os-release | cut -d= -f2 | tr -d '"')"
+
+echo "# Phase 101 — Command Parity Benchmark (Moon vs Redis)"
+echo ""
+echo "| Property | Value |"
+echo "|----------|-------|"
+echo "| Date | $(date +%Y-%m-%d) |"
+echo "| Redis | $REDIS_VER |"
+echo "| Moon | $SHARDS shard(s) |"
+echo "| Requests | $REQUESTS per test |"
+echo "| Clients | $CLIENTS |"
+echo "| Platform | $PLATFORM |"
+
+# ===========================================================================
+# Seed data (fast Python-based seeder)
+# ===========================================================================
+
+log "Seeding test data..."
+python3 "$SCRIPT_DIR/bench-phase101-seed.py" "$PORT_REDIS"
+python3 "$SCRIPT_DIR/bench-phase101-seed.py" "$PORT_MOON"
+log "Data seeded."
+
+# ===========================================================================
+# HyperLogLog
+# ===========================================================================
+
+if should_run "hll"; then
+ log "Benchmarking HyperLogLog..."
+ section_header "HyperLogLog (PFADD / PFCOUNT / PFMERGE)"
+
+ bench_rb "PFADD (1 elem, existing key)" PFADD hll1 newelem
+ bench_rb "PFADD (3 elem, new key)" PFADD hllbench a b c
+ bench_rb "PFCOUNT (1 key)" PFCOUNT hll1
+ bench_pipe "PFCOUNT (2 keys)" PFCOUNT hll1 hll2
+ bench_pipe "PFMERGE (2 → 1)" PFMERGE hll3 hll1 hll2
+fi
+
+# ===========================================================================
+# List Commands
+# ===========================================================================
+
+if should_run "list"; then
+ log "Benchmarking list commands..."
+ section_header "List (LPUSHX / RPUSHX / LMPOP)"
+
+ bench_rb "LPUSHX (existing key)" LPUSHX mylist val
+ bench_rb "RPUSHX (existing key)" RPUSHX mylist val
+ bench_rb "LPUSHX (missing key → NOP)" LPUSHX nokey val
+ bench_pipe "LMPOP 1 key LEFT" LMPOP 1 mylist LEFT
+ bench_pipe "LMPOP 1 key LEFT COUNT 10" LMPOP 1 mylist LEFT COUNT 10
+fi
+
+# ===========================================================================
+# Hash Commands
+# ===========================================================================
+
+if should_run "hash"; then
+ log "Benchmarking hash commands..."
+ section_header "Hash (HRANDFIELD)"
+
+ bench_rb "HRANDFIELD (1 field)" HRANDFIELD myhash
+ bench_pipe "HRANDFIELD (5 fields)" HRANDFIELD myhash 5
+ bench_pipe "HRANDFIELD (10 WITHVALUES)" HRANDFIELD myhash 10 WITHVALUES
+ bench_pipe "HRANDFIELD (-5, allow dups)" HRANDFIELD myhash -5
+fi
+
+# ===========================================================================
+# Set Commands
+# ===========================================================================
+
+if should_run "set"; then
+ log "Benchmarking set commands..."
+ section_header "Set (SMOVE / SINTERCARD)"
+
+ bench_pipe "SMOVE (member exists)" SMOVE smvsrc smvdst m1
+ bench_pipe "SINTERCARD (2 sets)" SINTERCARD 2 myset1 myset2
+ bench_pipe "SINTERCARD (3 sets)" SINTERCARD 3 myset1 myset2 myset3
+ bench_pipe "SINTERCARD (2 sets, LIMIT 10)" SINTERCARD 2 myset1 myset2 LIMIT 10
+fi
+
+# ===========================================================================
+# Sorted Set 6.2+
+# ===========================================================================
+
+if should_run "zset"; then
+ log "Benchmarking sorted set 6.2+ commands..."
+ section_header "Sorted Set 6.2+ (ZRANGESTORE / ZDIFF / ZUNION / ZINTER / etc.)"
+
+ bench_pipe "ZRANGESTORE (50 elements)" ZRANGESTORE zdst myzset1 0 49
+ bench_pipe "ZDIFF (2 keys)" ZDIFF 2 myzset1 myzset2
+ bench_pipe "ZUNION (2 keys)" ZUNION 2 myzset1 myzset2
+ bench_pipe "ZINTER (2 keys)" ZINTER 2 myzset1 myzset2
+ bench_pipe "ZINTERCARD (2 keys)" ZINTERCARD 2 myzset1 myzset2
+ bench_pipe "ZINTERCARD (2 keys, LIMIT 10)" ZINTERCARD 2 myzset1 myzset2 LIMIT 10
+ bench_pipe "ZMSCORE (3 members)" ZMSCORE myzset1 m1 m50 m100
+ bench_pipe "ZRANDMEMBER (1)" ZRANDMEMBER myzset1
+ bench_pipe "ZRANDMEMBER (10)" ZRANDMEMBER myzset1 10
+ bench_pipe "ZRANDMEMBER (5 WITHSCORES)" ZRANDMEMBER myzset1 5 WITHSCORES
+ bench_pipe "ZMPOP 1 key MIN" ZMPOP 1 bzset MIN
+ bench_pipe "ZMPOP 1 key MIN COUNT 5" ZMPOP 1 bzset MIN COUNT 5
+fi
+
+# ===========================================================================
+# Blocking fast-path (data already present → immediate return)
+# ===========================================================================
+
+if should_run "blocking"; then
+ log "Benchmarking blocking commands (fast path)..."
+ section_header "Blocking — Fast Path (element already available)"
+
+ echo "| *(Blocking cmds use non-blocking equivalents for throughput comparison)* ||||"
+
+ # Blocking commands can't be benchmarked via pipe (they consume + block).
+ # Instead, compare the non-blocking equivalents which share the same code path.
+ # The blocking overhead is just the timeout check + registry lookup (~10ns).
+ bench_rb "LPOP (= BLPOP fast path)" LPOP blist
+ bench_rb "RPOP (= BRPOP fast path)" RPOP blist
+
+ # BLMPOP/BLMOVE/BZMPOP can't use redis-benchmark directly.
+ # Use pipe with small N and large pre-seeded data to avoid exhaustion.
+ bench_pipe "LMPOP 1 key LEFT (= BLMPOP path)" LMPOP 1 blist LEFT
+ bench_pipe "LMOVE (= BLMOVE path)" LMOVE blsrc bldst LEFT RIGHT
+ bench_pipe "ZMPOP 1 key MIN (= BZMPOP path)" ZMPOP 1 bzset MIN
+fi
+
+# ===========================================================================
+# Functions / FCALL
+# ===========================================================================
+
+if should_run "functions"; then
+ log "Benchmarking Functions/FCALL..."
+ section_header "Functions API (FCALL / FCALL_RO)"
+
+ bench_pipe "FCALL echo1 (0 keys, 1 arg)" FCALL echo1 0 hello
+ bench_pipe "FCALL_RO echo1 (0 keys, 1 arg)" FCALL_RO echo1 0 hello
+ bench_pipe "FUNCTION LIST" FUNCTION LIST
+fi
+
+# ===========================================================================
+# Pipeline scaling for key Phase 101 commands
+# ===========================================================================
+
+if should_run "pipeline" || should_run "all"; then
+ log "Benchmarking pipeline scaling..."
+ section_header "Pipeline Scaling — Phase 101 Commands"
+
+ for p in 1 8 16 64; do
+ for cmd_desc_args in \
+ "PFADD:PFADD hll1 newelem" \
+ "PFCOUNT:PFCOUNT hll1" \
+ "LPUSHX:LPUSHX mylist val" \
+ "HRANDFIELD:HRANDFIELD myhash" \
+ "ZRANDMEMBER:ZRANDMEMBER myzset1" \
+ ; do
+ local_desc="${cmd_desc_args%%:*}"
+ local_args="${cmd_desc_args#*:}"
+ # shellcheck disable=SC2086
+ r=$(redis-benchmark -p "$PORT_REDIS" -n "$REQUESTS" -c "$CLIENTS" -P "$p" -q $local_args 2>/dev/null | parse_rps)
+ # shellcheck disable=SC2086
+ m=$(redis-benchmark -p "$PORT_MOON" -n "$REQUESTS" -c "$CLIENTS" -P "$p" -q $local_args 2>/dev/null | parse_rps)
+ print_row "${local_desc} p=$p" "${r:-0}" "${m:-0}"
+ done
+ done
+fi
+
+# ===========================================================================
+# Summary
+# ===========================================================================
+
+echo ""
+echo "---"
+echo ""
+echo "### Legend"
+echo "- **Ratio > 1.0**: Moon is faster"
+echo "- **Ratio < 1.0**: Redis is faster"
+echo "- **Ratio = N/A**: Command not supported or returned 0"
+echo "- Pipe-mode tests are single-connection serial (lower absolute RPS, fair comparison)"
+echo "- redis-benchmark tests use $CLIENTS parallel clients"
+echo ""
+echo "*Generated by bench-phase101-commands.sh on $(date)*"
+
+log "Done."
diff --git a/scripts/bench-phase101-seed.py b/scripts/bench-phase101-seed.py
new file mode 100644
index 00000000..ba28497f
--- /dev/null
+++ b/scripts/bench-phase101-seed.py
@@ -0,0 +1,86 @@
+#!/usr/bin/env python3
+"""Seed test data for Phase 101 benchmarks via redis-cli --pipe."""
+import subprocess
+import sys
+
+def resp(*args):
+ """Build RESP protocol for a command."""
+ parts = [f"*{len(args)}\r\n"]
+ for a in args:
+ s = str(a)
+ parts.append(f"${len(s.encode('utf-8'))}\r\n{s}\r\n")
+ return "".join(parts)
+
+def pipe(port, commands):
+ """Send commands via redis-cli --pipe."""
+ data = "".join(commands)
+ p = subprocess.run(
+ ["redis-cli", "-p", str(port), "--pipe"],
+ input=data.encode(), capture_output=True, check=True
+ )
+
+def seed(port):
+ cmds = []
+
+ # Lists
+ cmds.append(resp("DEL", "mylist", "blist", "blsrc", "bldst"))
+ for i in range(1, 10001):
+ cmds.append(resp("RPUSH", "mylist", str(i)))
+ for i in range(1, 50001):
+ cmds.append(resp("RPUSH", "blist", str(i)))
+ for i in range(1, 50001):
+ cmds.append(resp("RPUSH", "blsrc", str(i)))
+ pipe(port, cmds)
+
+ # Hash
+ cmds = [resp("DEL", "myhash")]
+ for i in range(1, 101):
+ cmds.append(resp("HSET", "myhash", f"field{i}", f"value{i}"))
+ pipe(port, cmds)
+
+ # Sets
+ cmds = [resp("DEL", "myset1", "myset2", "myset3", "smvsrc", "smvdst")]
+ for i in range(1, 201):
+ cmds.append(resp("SADD", "myset1", f"m{i}"))
+ for i in range(50, 251):
+ cmds.append(resp("SADD", "myset2", f"m{i}"))
+ for i in range(100, 301):
+ cmds.append(resp("SADD", "myset3", f"m{i}"))
+ for i in range(1, 20001):
+ cmds.append(resp("SADD", "smvsrc", f"m{i}"))
+ pipe(port, cmds)
+
+ # Sorted sets
+ cmds = [resp("DEL", "myzset1", "myzset2", "myzset3", "bzset", "zdst")]
+ for i in range(1, 201):
+ cmds.append(resp("ZADD", "myzset1", str(i), f"m{i}"))
+ for i in range(50, 251):
+ cmds.append(resp("ZADD", "myzset2", str(i), f"m{i}"))
+ for i in range(100, 301):
+ cmds.append(resp("ZADD", "myzset3", str(i), f"m{i}"))
+ for i in range(1, 50001):
+ cmds.append(resp("ZADD", "bzset", str(i), f"m{i}"))
+ pipe(port, cmds)
+
+ # HyperLogLog
+ cmds = [resp("DEL", "hll1", "hll2", "hll3")]
+ for i in range(1, 1001):
+ cmds.append(resp("PFADD", "hll1", f"e{i}"))
+ for i in range(500, 1501):
+ cmds.append(resp("PFADD", "hll2", f"e{i}"))
+ pipe(port, cmds)
+
+ # Function library
+ body = '#!lua name=benchlib\nredis.register_function("echo1", function(keys, args) return args[1] end)'
+ subprocess.run(
+ ["redis-cli", "-p", str(port), "FUNCTION", "FLUSH"],
+ capture_output=True, check=True
+ )
+ subprocess.run(
+ ["redis-cli", "-p", str(port), "FUNCTION", "LOAD", "REPLACE", body],
+ capture_output=True, check=True
+ )
+
+if __name__ == "__main__":
+ port = int(sys.argv[1]) if len(sys.argv) > 1 else 6379
+ seed(port)
diff --git a/src/blocking/mod.rs b/src/blocking/mod.rs
index 57c7c7e7..0891cb1f 100644
--- a/src/blocking/mod.rs
+++ b/src/blocking/mod.rs
@@ -22,8 +22,16 @@ pub enum BlockedCommand {
wherefrom: Direction,
whereto: Direction,
},
+ BLMPop {
+ dir: Direction,
+ count: u32,
+ },
BZPopMin,
BZPopMax,
+ BZMPop {
+ min: bool,
+ count: u32,
+ },
XRead {
/// (key, last_seen_id) pairs -- read entries > last_seen_id from each stream.
streams: Vec<(Bytes, crate::storage::stream::StreamId)>,
diff --git a/src/blocking/wakeup.rs b/src/blocking/wakeup.rs
index 5762d2be..6997e4fc 100644
--- a/src/blocking/wakeup.rs
+++ b/src/blocking/wakeup.rs
@@ -1,6 +1,7 @@
use bytes::Bytes;
use crate::blocking::{BlockedCommand, BlockingRegistry, Direction};
+use crate::command::sorted_set::format_score_bytes;
use crate::framevec;
use crate::protocol::Frame;
use crate::storage::Database;
@@ -63,6 +64,29 @@ pub fn try_wake_list_waiter(
Frame::BulkString(v)
})
}
+ BlockedCommand::BLMPop { dir, count } => {
+ let mut elems = smallvec::SmallVec::<[Frame; 16]>::new();
+ let n = *count as usize;
+ for _ in 0..n {
+ let val = match dir {
+ Direction::Left => db.list_pop_front(key),
+ Direction::Right => db.list_pop_back(key),
+ };
+ match val {
+ Some(v) => elems.push(Frame::BulkString(v)),
+ None => break,
+ }
+ }
+ if elems.is_empty() {
+ None
+ } else {
+ let elem_vec: Vec = elems.into_vec();
+ Some(Frame::Array(framevec![
+ Frame::BulkString(key.clone()),
+ Frame::Array(elem_vec.into()),
+ ]))
+ }
+ }
_ => None, // BZPopMin/BZPopMax don't watch list keys
};
@@ -97,25 +121,48 @@ pub fn try_wake_zset_waiter(
let wait_id = waiter.wait_id;
let result = match &waiter.cmd {
- BlockedCommand::BZPopMin => {
- // Pop min, return [key, member, score]
- db.zset_pop_min(key).map(|(member, score)| {
- Frame::Array(framevec![
- Frame::BulkString(key.clone()),
- Frame::BulkString(member),
- Frame::BulkString(Bytes::from(format_score(score))),
- ])
- })
- }
- BlockedCommand::BZPopMax => {
- // Pop max, return [key, member, score]
- db.zset_pop_max(key).map(|(member, score)| {
- Frame::Array(framevec![
+ BlockedCommand::BZPopMin => db.zset_pop_min(key).map(|(member, score)| {
+ Frame::Array(framevec![
+ Frame::BulkString(key.clone()),
+ Frame::BulkString(member),
+ Frame::BulkString(format_score_bytes(score)),
+ ])
+ }),
+ BlockedCommand::BZPopMax => db.zset_pop_max(key).map(|(member, score)| {
+ Frame::Array(framevec![
+ Frame::BulkString(key.clone()),
+ Frame::BulkString(member),
+ Frame::BulkString(format_score_bytes(score)),
+ ])
+ }),
+ BlockedCommand::BZMPop { min, count } => {
+ let n = *count as usize;
+ let mut elems = smallvec::SmallVec::<[Frame; 16]>::new();
+ for _ in 0..n {
+ let popped = if *min {
+ db.zset_pop_min(key)
+ } else {
+ db.zset_pop_max(key)
+ };
+ match popped {
+ Some((member, score)) => {
+ elems.push(Frame::Array(framevec![
+ Frame::BulkString(member),
+ Frame::BulkString(format_score_bytes(score)),
+ ]));
+ }
+ None => break,
+ }
+ }
+ if elems.is_empty() {
+ None
+ } else {
+ let elem_vec: Vec = elems.into_vec();
+ Some(Frame::Array(framevec![
Frame::BulkString(key.clone()),
- Frame::BulkString(member),
- Frame::BulkString(Bytes::from(format_score(score))),
- ])
- })
+ Frame::Array(elem_vec.into()),
+ ]))
+ }
}
_ => None, // List commands don't watch zset keys
};
@@ -232,12 +279,3 @@ pub fn try_wake_stream_waiter(
}
false
}
-
-/// Format a float score the same way Redis does (integer if whole, otherwise full precision).
-fn format_score(score: f64) -> String {
- if score == score.floor() && score.abs() < i64::MAX as f64 {
- format!("{}", score as i64)
- } else {
- format!("{}", score)
- }
-}
diff --git a/src/command/functions.rs b/src/command/functions.rs
new file mode 100644
index 00000000..dc062b20
--- /dev/null
+++ b/src/command/functions.rs
@@ -0,0 +1,335 @@
+//! FUNCTION LOAD/LIST/DELETE/FLUSH + FCALL/FCALL_RO command handlers.
+//!
+//! **Phase 101 limitation:** RAM-only. FUNCTION DUMP/RESTORE/STATS return
+//! `-ERR ... not supported in this release (Phase 101 limitation)`.
+
+use bytes::Bytes;
+
+use crate::protocol::Frame;
+use crate::scripting::functions::FunctionRegistry;
+use crate::storage::Database;
+
+/// Handle `FUNCTION [args...]`.
+///
+/// Supported: LOAD, LIST, DELETE, FLUSH.
+/// Deferred: DUMP, RESTORE, STATS (return documented error).
+pub fn handle_function(registry: &mut FunctionRegistry, args: &[Frame]) -> Frame {
+ if args.is_empty() {
+ return Frame::Error(Bytes::from_static(
+ b"ERR wrong number of arguments for 'function' command",
+ ));
+ }
+
+ let sub = match &args[0] {
+ Frame::BulkString(b) => b,
+ _ => {
+ return Frame::Error(Bytes::from_static(
+ b"ERR wrong number of arguments for 'function' command",
+ ));
+ }
+ };
+
+ if sub.eq_ignore_ascii_case(b"LOAD") {
+ handle_function_load(registry, &args[1..])
+ } else if sub.eq_ignore_ascii_case(b"LIST") {
+ handle_function_list(registry, &args[1..])
+ } else if sub.eq_ignore_ascii_case(b"DELETE") {
+ handle_function_delete(registry, &args[1..])
+ } else if sub.eq_ignore_ascii_case(b"FLUSH") {
+ registry.flush();
+ Frame::SimpleString(Bytes::from_static(b"OK"))
+ } else if sub.eq_ignore_ascii_case(b"DUMP") {
+ Frame::Error(Bytes::from_static(
+ b"ERR FUNCTION DUMP not supported in this release (Phase 101 limitation)",
+ ))
+ } else if sub.eq_ignore_ascii_case(b"RESTORE") {
+ Frame::Error(Bytes::from_static(
+ b"ERR FUNCTION RESTORE not supported in this release (Phase 101 limitation)",
+ ))
+ } else if sub.eq_ignore_ascii_case(b"STATS") {
+ Frame::Error(Bytes::from_static(
+ b"ERR FUNCTION STATS not supported in this release (Phase 101 limitation)",
+ ))
+ } else {
+ Frame::Error(Bytes::from(format!(
+ "ERR unknown subcommand '{}'. Try FUNCTION HELP.",
+ String::from_utf8_lossy(sub)
+ )))
+ }
+}
+
+/// FUNCTION LOAD [REPLACE]
+fn handle_function_load(registry: &mut FunctionRegistry, args: &[Frame]) -> Frame {
+ if args.is_empty() {
+ return Frame::Error(Bytes::from_static(
+ b"ERR wrong number of arguments for 'function|load' command",
+ ));
+ }
+
+ let mut replace = false;
+ let body: &Bytes;
+
+ // Parse: FUNCTION LOAD [REPLACE]
+ if args.len() == 1 {
+ // FUNCTION LOAD
+ body = match &args[0] {
+ Frame::BulkString(b) => b,
+ _ => {
+ return Frame::Error(Bytes::from_static(b"ERR syntax error"));
+ }
+ };
+ } else if args.len() == 2 {
+ // FUNCTION LOAD REPLACE
+ let flag = match &args[0] {
+ Frame::BulkString(b) => b,
+ _ => {
+ return Frame::Error(Bytes::from_static(b"ERR syntax error"));
+ }
+ };
+ if !flag.eq_ignore_ascii_case(b"REPLACE") {
+ return Frame::Error(Bytes::from_static(b"ERR syntax error"));
+ }
+ replace = true;
+ body = match &args[1] {
+ Frame::BulkString(b) => b,
+ _ => {
+ return Frame::Error(Bytes::from_static(b"ERR syntax error"));
+ }
+ };
+ } else {
+ return Frame::Error(Bytes::from_static(
+ b"ERR wrong number of arguments for 'function|load' command",
+ ));
+ }
+
+ match registry.load(body, replace) {
+ Ok(lib_name) => Frame::BulkString(lib_name),
+ Err(e) => e.into_frame(),
+ }
+}
+
+/// FUNCTION LIST [LIBRARYNAME pattern] [WITHCODE]
+fn handle_function_list(registry: &FunctionRegistry, args: &[Frame]) -> Frame {
+ let mut _pattern: Option<&[u8]> = None;
+ let mut with_code = false;
+
+ let mut i = 0;
+ while i < args.len() {
+ match &args[i] {
+ Frame::BulkString(b) if b.eq_ignore_ascii_case(b"LIBRARYNAME") => {
+ if i + 1 < args.len() {
+ if let Frame::BulkString(p) = &args[i + 1] {
+ _pattern = Some(p.as_ref());
+ }
+ i += 2;
+ } else {
+ return Frame::Error(Bytes::from_static(b"ERR syntax error"));
+ }
+ }
+ Frame::BulkString(b) if b.eq_ignore_ascii_case(b"WITHCODE") => {
+ with_code = true;
+ i += 1;
+ }
+ _ => {
+ i += 1;
+ }
+ }
+ }
+
+ let libs = registry.list();
+ let mut result = Vec::with_capacity(libs.len());
+
+ for lib in libs {
+ // Each library is a flat array of key-value pairs (Redis 7.0 format):
+ // ["library_name", name, "engine", "LUA", "functions", [...]]
+ let mut entry = Vec::with_capacity(if with_code { 8 } else { 6 });
+
+ entry.push(Frame::BulkString(Bytes::from_static(b"library_name")));
+ entry.push(Frame::BulkString(lib.name.clone()));
+
+ entry.push(Frame::BulkString(Bytes::from_static(b"engine")));
+ entry.push(Frame::BulkString(Bytes::from_static(b"LUA")));
+
+ // Functions array
+ let func_list: Vec = lib
+ .functions
+ .values()
+ .map(|f| {
+ let mut fentry = Vec::with_capacity(4);
+ fentry.push(Frame::BulkString(Bytes::from_static(b"name")));
+ fentry.push(Frame::BulkString(f.name.clone()));
+ if let Some(desc) = &f.description {
+ fentry.push(Frame::BulkString(Bytes::from_static(b"description")));
+ fentry.push(Frame::BulkString(Bytes::copy_from_slice(desc.as_bytes())));
+ }
+ Frame::Array(fentry.into())
+ })
+ .collect();
+
+ entry.push(Frame::BulkString(Bytes::from_static(b"functions")));
+ entry.push(Frame::Array(func_list.into()));
+
+ if with_code {
+ entry.push(Frame::BulkString(Bytes::from_static(b"library_code")));
+ entry.push(Frame::BulkString(lib.source.clone()));
+ }
+
+ result.push(Frame::Array(entry.into()));
+ }
+
+ Frame::Array(result.into())
+}
+
+/// FUNCTION DELETE
+fn handle_function_delete(registry: &mut FunctionRegistry, args: &[Frame]) -> Frame {
+ if args.is_empty() {
+ return Frame::Error(Bytes::from_static(
+ b"ERR wrong number of arguments for 'function|delete' command",
+ ));
+ }
+
+ let lib_name = match &args[0] {
+ Frame::BulkString(b) => b,
+ _ => {
+ return Frame::Error(Bytes::from_static(b"ERR syntax error"));
+ }
+ };
+
+ if registry.delete(lib_name) {
+ Frame::SimpleString(Bytes::from_static(b"OK"))
+ } else {
+ Frame::Error(Bytes::from(format!(
+ "ERR Library '{}' not found",
+ String::from_utf8_lossy(lib_name)
+ )))
+ }
+}
+
+/// Handle FCALL: look up function by name, parse numkeys, dispatch.
+pub fn handle_fcall(
+ registry: &FunctionRegistry,
+ args: &[Frame],
+ db: &mut Database,
+ shard_id: usize,
+ num_shards: usize,
+ selected_db: usize,
+ db_count: usize,
+) -> Frame {
+ handle_fcall_inner(
+ registry,
+ args,
+ db,
+ shard_id,
+ num_shards,
+ selected_db,
+ db_count,
+ false,
+ )
+}
+
+/// Handle FCALL_RO: same as FCALL but sets read-only mode.
+pub fn handle_fcall_ro(
+ registry: &FunctionRegistry,
+ args: &[Frame],
+ db: &mut Database,
+ shard_id: usize,
+ num_shards: usize,
+ selected_db: usize,
+ db_count: usize,
+) -> Frame {
+ handle_fcall_inner(
+ registry,
+ args,
+ db,
+ shard_id,
+ num_shards,
+ selected_db,
+ db_count,
+ true,
+ )
+}
+
+/// Inner FCALL implementation shared by FCALL and FCALL_RO.
+fn handle_fcall_inner(
+ registry: &FunctionRegistry,
+ args: &[Frame],
+ db: &mut Database,
+ shard_id: usize,
+ num_shards: usize,
+ selected_db: usize,
+ db_count: usize,
+ read_only: bool,
+) -> Frame {
+ // FCALL funcname numkeys [key ...] [arg ...]
+ if args.len() < 2 {
+ return Frame::Error(Bytes::from_static(
+ b"ERR wrong number of arguments for 'fcall' command",
+ ));
+ }
+
+ let func_name = match &args[0] {
+ Frame::BulkString(b) => b,
+ _ => {
+ return Frame::Error(Bytes::from_static(b"ERR invalid function name"));
+ }
+ };
+
+ let numkeys: usize = match &args[1] {
+ Frame::BulkString(b) => match std::str::from_utf8(b).ok().and_then(|s| s.parse().ok()) {
+ Some(n) => n,
+ None => {
+ return Frame::Error(Bytes::from_static(
+ b"ERR value is not an integer or out of range",
+ ));
+ }
+ },
+ Frame::Integer(n) => {
+ if *n < 0 {
+ return Frame::Error(Bytes::from_static(
+ b"ERR value is not an integer or out of range",
+ ));
+ }
+ *n as usize
+ }
+ _ => {
+ return Frame::Error(Bytes::from_static(
+ b"ERR value is not an integer or out of range",
+ ));
+ }
+ };
+
+ if args.len() < 2 + numkeys {
+ return Frame::Error(Bytes::from_static(
+ b"ERR Number of keys can't be greater than number of args",
+ ));
+ }
+
+ let mut keys: Vec = Vec::with_capacity(numkeys);
+ for f in &args[2..2 + numkeys] {
+ match f {
+ Frame::BulkString(b) => keys.push(b.clone()),
+ _ => {
+ return Frame::Error(Bytes::from_static(b"ERR Invalid argument type for key"));
+ }
+ }
+ }
+
+ // Validate cross-shard keys
+ if num_shards > 1 {
+ if let Some(err) = crate::scripting::validate_keys_same_shard(&keys, shard_id, num_shards) {
+ return err;
+ }
+ }
+
+ let mut argv: Vec = Vec::with_capacity(args.len().saturating_sub(2 + numkeys));
+ for f in &args[2 + numkeys..] {
+ match f {
+ Frame::BulkString(b) => argv.push(b.clone()),
+ _ => {
+ return Frame::Error(Bytes::from_static(b"ERR Invalid argument type for arg"));
+ }
+ }
+ }
+
+ registry.call_function(func_name, keys, argv, db, selected_db, db_count, read_only)
+}
diff --git a/src/command/hash/hash_read.rs b/src/command/hash/hash_read.rs
index 7c44d21c..da4e9c20 100644
--- a/src/command/hash/hash_read.rs
+++ b/src/command/hash/hash_read.rs
@@ -549,3 +549,226 @@ pub fn hscan_readonly(db: &Database, args: &[Frame], now_ms: u64) -> Frame {
Frame::Array(results.into()),
])
}
+
+// ---------------------------------------------------------------------------
+// HRANDFIELD key [count [WITHVALUES]]
+// ---------------------------------------------------------------------------
+
+/// HRANDFIELD key [count [WITHVALUES]]
+pub fn hrandfield(db: &mut Database, args: &[Frame]) -> Frame {
+ use rand::seq::IndexedRandom;
+ if args.is_empty() || args.len() > 3 {
+ return err_wrong_args("HRANDFIELD");
+ }
+ let key = match extract_bytes(&args[0]) {
+ Some(k) => k.as_ref(),
+ None => return err_wrong_args("HRANDFIELD"),
+ };
+ let map = match db.get_hash(key) {
+ Ok(Some(m)) => m,
+ Ok(None) => {
+ return if args.len() == 1 {
+ Frame::Null
+ } else {
+ Frame::Array(framevec![])
+ };
+ }
+ Err(e) => return e,
+ };
+ if map.is_empty() {
+ return if args.len() == 1 {
+ Frame::Null
+ } else {
+ Frame::Array(framevec![])
+ };
+ }
+ let fields: Vec<(&Bytes, &Bytes)> = map.iter().collect();
+ let mut rng = rand::rng();
+ if args.len() == 1 {
+ if let Some((field, _)) = fields.choose(&mut rng) {
+ return Frame::BulkString((*field).clone());
+ }
+ return Frame::Null;
+ }
+ let count_bytes = match extract_bytes(&args[1]) {
+ Some(b) => b,
+ None => return err_wrong_args("HRANDFIELD"),
+ };
+ let count: i64 = match std::str::from_utf8(count_bytes)
+ .ok()
+ .and_then(|s| s.parse().ok())
+ {
+ Some(c) => c,
+ None => {
+ return Frame::Error(Bytes::from_static(
+ b"ERR value is not an integer or out of range",
+ ));
+ }
+ };
+ let with_values = if args.len() == 3 {
+ let opt = match extract_bytes(&args[2]) {
+ Some(b) => b,
+ None => return err_wrong_args("HRANDFIELD"),
+ };
+ if opt.eq_ignore_ascii_case(b"WITHVALUES") {
+ true
+ } else {
+ return Frame::Error(Bytes::from_static(b"ERR syntax error"));
+ }
+ } else {
+ false
+ };
+ if count == 0 {
+ return Frame::Array(framevec![]);
+ }
+ if count > 0 {
+ let n = std::cmp::min(count as usize, fields.len());
+ let indices: Vec = (0..fields.len()).collect();
+ let chosen: Vec = indices.as_slice().sample(&mut rng, n).copied().collect();
+ if with_values {
+ let mut result = Vec::with_capacity(n * 2);
+ for &idx in &chosen {
+ result.push(Frame::BulkString(fields[idx].0.clone()));
+ result.push(Frame::BulkString(fields[idx].1.clone()));
+ }
+ Frame::Array(result.into())
+ } else {
+ let result: Vec = chosen
+ .iter()
+ .map(|&idx| Frame::BulkString(fields[idx].0.clone()))
+ .collect();
+ Frame::Array(result.into())
+ }
+ } else {
+ // Negative count: allow duplicates. Cap to fields.len() to prevent OOM on i64::MIN.
+ let n = std::cmp::min(count.unsigned_abs() as usize, fields.len() * 10);
+ if with_values {
+ let mut result = Vec::with_capacity(n * 2);
+ for _ in 0..n {
+ if let Some((field, value)) = fields.choose(&mut rng) {
+ result.push(Frame::BulkString((*field).clone()));
+ result.push(Frame::BulkString((*value).clone()));
+ }
+ }
+ Frame::Array(result.into())
+ } else {
+ let mut result = Vec::with_capacity(n);
+ for _ in 0..n {
+ if let Some((field, _)) = fields.choose(&mut rng) {
+ result.push(Frame::BulkString((*field).clone()));
+ }
+ }
+ Frame::Array(result.into())
+ }
+ }
+}
+
+/// HRANDFIELD readonly path
+pub fn hrandfield_readonly(db: &Database, args: &[Frame], now_ms: u64) -> Frame {
+ use rand::seq::IndexedRandom;
+ if args.is_empty() || args.len() > 3 {
+ return err_wrong_args("HRANDFIELD");
+ }
+ let key = match extract_bytes(&args[0]) {
+ Some(k) => k.as_ref(),
+ None => return err_wrong_args("HRANDFIELD"),
+ };
+ let href = match db.get_hash_ref_if_alive(key, now_ms) {
+ Ok(Some(h)) => h,
+ Ok(None) => {
+ return if args.len() == 1 {
+ Frame::Null
+ } else {
+ Frame::Array(framevec![])
+ };
+ }
+ Err(e) => return e,
+ };
+ let entries = href.entries();
+ if entries.is_empty() {
+ return if args.len() == 1 {
+ Frame::Null
+ } else {
+ Frame::Array(framevec![])
+ };
+ }
+ let mut rng = rand::rng();
+ if args.len() == 1 {
+ return if let Some((field, _)) = entries.choose(&mut rng) {
+ Frame::BulkString(field.clone())
+ } else {
+ Frame::Null
+ };
+ }
+ let count_bytes = match extract_bytes(&args[1]) {
+ Some(b) => b,
+ None => return err_wrong_args("HRANDFIELD"),
+ };
+ let count: i64 = match std::str::from_utf8(count_bytes)
+ .ok()
+ .and_then(|s| s.parse().ok())
+ {
+ Some(c) => c,
+ None => {
+ return Frame::Error(Bytes::from_static(
+ b"ERR value is not an integer or out of range",
+ ));
+ }
+ };
+ let with_values = if args.len() == 3 {
+ let opt = match extract_bytes(&args[2]) {
+ Some(b) => b,
+ None => return err_wrong_args("HRANDFIELD"),
+ };
+ if opt.eq_ignore_ascii_case(b"WITHVALUES") {
+ true
+ } else {
+ return Frame::Error(Bytes::from_static(b"ERR syntax error"));
+ }
+ } else {
+ false
+ };
+ if count == 0 {
+ return Frame::Array(framevec![]);
+ }
+ if count > 0 {
+ let n = std::cmp::min(count as usize, entries.len());
+ let indices: Vec = (0..entries.len()).collect();
+ let chosen: Vec = indices.as_slice().sample(&mut rng, n).copied().collect();
+ if with_values {
+ let mut result = Vec::with_capacity(n * 2);
+ for &idx in &chosen {
+ result.push(Frame::BulkString(entries[idx].0.clone()));
+ result.push(Frame::BulkString(entries[idx].1.clone()));
+ }
+ Frame::Array(result.into())
+ } else {
+ let result: Vec = chosen
+ .iter()
+ .map(|&idx| Frame::BulkString(entries[idx].0.clone()))
+ .collect();
+ Frame::Array(result.into())
+ }
+ } else {
+ // Negative count: allow duplicates. Cap to prevent OOM on extreme values.
+ let n = std::cmp::min(count.unsigned_abs() as usize, entries.len() * 10);
+ if with_values {
+ let mut result = Vec::with_capacity(n * 2);
+ for _ in 0..n {
+ if let Some((field, value)) = entries.choose(&mut rng) {
+ result.push(Frame::BulkString(field.clone()));
+ result.push(Frame::BulkString(value.clone()));
+ }
+ }
+ Frame::Array(result.into())
+ } else {
+ let mut result = Vec::with_capacity(n);
+ for _ in 0..n {
+ if let Some((field, _)) = entries.choose(&mut rng) {
+ result.push(Frame::BulkString(field.clone()));
+ }
+ }
+ Frame::Array(result.into())
+ }
+ }
+}
diff --git a/src/command/hll.rs b/src/command/hll.rs
new file mode 100644
index 00000000..cd280532
--- /dev/null
+++ b/src/command/hll.rs
@@ -0,0 +1,235 @@
+//! PFADD, PFCOUNT, PFMERGE command handlers.
+//!
+//! HLL values are stored as `RedisValue::String(Bytes)` — the raw HYLL wire
+//! bytes. Redis `TYPE` reports "string" for HLL keys.
+
+use bytes::Bytes;
+
+use crate::protocol::Frame;
+use crate::storage::Database;
+use crate::storage::entry::Entry;
+use crate::storage::hll::Hll;
+
+use super::helpers::{err_wrong_args, extract_bytes, ok};
+
+/// Redis-exact WRONGTYPE error for non-HLL string values.
+const WRONGTYPE_HLL: &[u8] = b"WRONGTYPE Key is not a valid HyperLogLog string value.";
+
+/// Load an existing HLL from the database (mutable access).
+fn load_hll(db: &mut Database, key: &[u8]) -> Result