From 6ab57f558c3fd7a8948d2a99164dbdf9e7e432fc Mon Sep 17 00:00:00 2001 From: Yang Zhi Date: Tue, 7 Apr 2026 04:32:18 +0800 Subject: [PATCH 1/9] Fix stats endpoint returning zero by using count() instead of query() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The /api/v1/stats/memories endpoint always returns zeros because _query_all_memories() calls query() with a filter but no query_vector. In the local backend, this falls through to search_by_random which does an HNSW graph traversal with a random vector, completely missing filtered records in small collections. Use count() (aggregate_data — a proper scalar scan) with PathScope URI filters for per-category counts. Keep query() as best-effort for hotness/staleness metrics that need individual record data. Tested on a live instance: preferences=2, entities=2, events=2 (was all zeros). All 12 unit tests pass. Fixes #1255 --- openviking/storage/stats_aggregator.py | 105 ++++++++++++---------- tests/unit/stats/test_stats_aggregator.py | 25 +++++- 2 files changed, 81 insertions(+), 49 deletions(-) diff --git a/openviking/storage/stats_aggregator.py b/openviking/storage/stats_aggregator.py index da320e6a4..f32a41f48 100644 --- a/openviking/storage/stats_aggregator.py +++ b/openviking/storage/stats_aggregator.py @@ -11,7 +11,7 @@ from openviking.retrieve.memory_lifecycle import hotness_score from openviking.server.identity import RequestContext -from openviking.storage.expr import Eq +from openviking.storage.expr import And, Eq, PathScope from openviking_cli.utils import get_logger logger = get_logger(__name__) @@ -63,7 +63,7 @@ async def get_memory_stats( # Build category list to query categories = [category] if category else MEMORY_CATEGORIES - by_category: Dict[str, int] = {} + by_category: Dict[str, int] = {cat: 0 for cat in categories} hotness_dist = {"cold": 0, "warm": 0, "hot": 0} staleness = { "not_accessed_7d": 0, @@ -71,52 +71,63 @@ async def get_memory_stats( "oldest_memory_age_days": 0, } - # Fetch all memories once and group by category in Python - all_records = await self._query_all_memories(ctx) - grouped: Dict[str, List[Dict[str, Any]]] = {cat: [] for cat in categories} - for record in all_records: - uri = record.get("uri", "") - for cat in categories: - if f"/{cat}/" in uri: - grouped[cat].append(record) - break + # Use count() (aggregate) for per-category counts instead of query() + # (vector search). query() with no query_vector falls through to + # search_by_random in the local HNSW backend, which can miss filtered + # results. count() uses a scalar aggregate that reliably returns + # correct results. + user_id = ctx.user.user_id + memory_base = f"viking://user/{user_id}/memories" for cat in categories: - records = grouped[cat] - by_category[cat] = len(records) - - for record in records: - active_count = record.get("active_count", 0) - updated_at_raw = record.get("updated_at") - updated_at = _parse_datetime(updated_at_raw) - created_at_raw = record.get("created_at") - created_at = _parse_datetime(created_at_raw) - - # Hotness distribution - score = hotness_score(active_count, updated_at, now=now) - if score < COLD_THRESHOLD: - hotness_dist["cold"] += 1 - elif score > HOT_THRESHOLD: - hotness_dist["hot"] += 1 - else: - hotness_dist["warm"] += 1 - - # Staleness: use updated_at for access tracking - if updated_at: - age_days = (now - updated_at).total_seconds() / 86400.0 - if age_days > 7: - staleness["not_accessed_7d"] += 1 - if age_days > 30: - staleness["not_accessed_30d"] += 1 - - # Track oldest memory by created_at - if created_at: - age = (now - created_at).total_seconds() / 86400.0 - if age > staleness["oldest_memory_age_days"]: - staleness["oldest_memory_age_days"] = round(age, 1) + try: + by_category[cat] = await self._vikingdb.count( + filter=And([ + Eq("context_type", "memory"), + PathScope("uri", f"{memory_base}/{cat}", depth=2), + ]), + ctx=ctx, + ) + except Exception as e: + logger.error("Error counting memories for %s: %s", cat, e) total_memories = sum(by_category.values()) + # Fetch individual records for hotness/staleness metrics (best-effort). + # On the local HNSW backend, query() may return incomplete results + # when no query vector is provided; category counts above are + # authoritative regardless. + all_records = await self._query_all_memories(ctx) + for record in all_records: + active_count = record.get("active_count", 0) + updated_at_raw = record.get("updated_at") + updated_at = _parse_datetime(updated_at_raw) + created_at_raw = record.get("created_at") + created_at = _parse_datetime(created_at_raw) + + # Hotness distribution + score = hotness_score(active_count, updated_at, now=now) + if score < COLD_THRESHOLD: + hotness_dist["cold"] += 1 + elif score > HOT_THRESHOLD: + hotness_dist["hot"] += 1 + else: + hotness_dist["warm"] += 1 + + # Staleness: use updated_at for access tracking + if updated_at: + age_days = (now - updated_at).total_seconds() / 86400.0 + if age_days > 7: + staleness["not_accessed_7d"] += 1 + if age_days > 30: + staleness["not_accessed_30d"] += 1 + + # Track oldest memory by created_at + if created_at: + age = (now - created_at).total_seconds() / 86400.0 + if age > staleness["oldest_memory_age_days"]: + staleness["oldest_memory_age_days"] = round(age, 1) + return { "total_memories": total_memories, "by_category": by_category, @@ -156,10 +167,11 @@ async def _query_all_memories( self, ctx: RequestContext, ) -> List[Dict[str, Any]]: - """Query all memory records in a single DB round-trip. + """Query all memory records for hotness/staleness metrics. - Uses the context_type="memory" filter. Callers group by category - in Python to avoid N+1 queries. + Note: This uses query() which relies on vector search internally. + On the local HNSW backend it may return incomplete results when no + query vector is provided. Category counts use count() instead. """ try: return await self._vikingdb.query( @@ -172,6 +184,7 @@ async def _query_all_memories( "created_at", "context_type", ], + order_by="created_at", ctx=ctx, ) except Exception as e: diff --git a/tests/unit/stats/test_stats_aggregator.py b/tests/unit/stats/test_stats_aggregator.py index 5f0e26473..5a378ec5d 100644 --- a/tests/unit/stats/test_stats_aggregator.py +++ b/tests/unit/stats/test_stats_aggregator.py @@ -19,7 +19,9 @@ def mock_vikingdb(): @pytest.fixture def mock_ctx(): """Create a mock request context.""" - return MagicMock() + ctx = MagicMock() + ctx.user.user_id = "default" + return ctx @pytest.fixture @@ -48,6 +50,7 @@ class TestStatsAggregator: @pytest.mark.asyncio async def test_empty_store(self, aggregator, mock_vikingdb, mock_ctx): """Stats for an empty memory store should return zeros.""" + mock_vikingdb.count = AsyncMock(return_value=0) mock_vikingdb.query = AsyncMock(return_value=[]) result = await aggregator.get_memory_stats(mock_ctx) @@ -65,6 +68,18 @@ async def test_counts_by_category(self, aggregator, mock_vikingdb, mock_ctx): _make_memory_record("cases", active_count=3, updated_at=now), _make_memory_record("tools", active_count=1, updated_at=now), ] + + async def _count_by_category(**kwargs): + filt = kwargs.get("filter") + # Return counts based on the PathScope URI in the filter + filt_str = str(filt) + if "/cases" in filt_str: + return 2 + if "/tools" in filt_str: + return 1 + return 0 + + mock_vikingdb.count = AsyncMock(side_effect=_count_by_category) mock_vikingdb.query = AsyncMock(return_value=records) result = await aggregator.get_memory_stats(mock_ctx) @@ -80,6 +95,7 @@ async def test_category_filter(self, aggregator, mock_vikingdb, mock_ctx): records = [ _make_memory_record("patterns", active_count=2, updated_at=now), ] + mock_vikingdb.count = AsyncMock(return_value=1) mock_vikingdb.query = AsyncMock(return_value=records) result = await aggregator.get_memory_stats(mock_ctx, category="patterns") @@ -97,6 +113,7 @@ async def test_hotness_buckets(self, aggregator, mock_vikingdb, mock_ctx): cold_record = _make_memory_record( "cases", active_count=0, updated_at=now - timedelta(days=60) ) + mock_vikingdb.count = AsyncMock(return_value=2) mock_vikingdb.query = AsyncMock(return_value=[hot_record, cold_record]) result = await aggregator.get_memory_stats(mock_ctx, category="cases") @@ -115,6 +132,7 @@ async def test_staleness_metrics(self, aggregator, mock_vikingdb, mock_ctx): updated_at=now - timedelta(days=40), created_at=now - timedelta(days=50), ) + mock_vikingdb.count = AsyncMock(return_value=1) mock_vikingdb.query = AsyncMock(return_value=[old_record]) result = await aggregator.get_memory_stats(mock_ctx, category="events") @@ -124,8 +142,9 @@ async def test_staleness_metrics(self, aggregator, mock_vikingdb, mock_ctx): assert result["staleness"]["oldest_memory_age_days"] >= 49 @pytest.mark.asyncio - async def test_query_error_returns_empty(self, aggregator, mock_vikingdb, mock_ctx): - """If VikingDB query fails, the category should show 0 records.""" + async def test_count_error_returns_empty(self, aggregator, mock_vikingdb, mock_ctx): + """If VikingDB count fails, the category should show 0 records.""" + mock_vikingdb.count = AsyncMock(side_effect=Exception("connection error")) mock_vikingdb.query = AsyncMock(side_effect=Exception("connection error")) result = await aggregator.get_memory_stats(mock_ctx, category="cases") From df2ea6da151d086955a379781089a7799cb9816d Mon Sep 17 00:00:00 2001 From: Yang Zhi Date: Tue, 7 Apr 2026 05:10:06 +0800 Subject: [PATCH 2/9] Fix stats endpoint: use filesystem counting instead of vector index MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The stats endpoint was undercounting memories because it queried the vector index, but the semantic_processor only vectorizes directory-level .abstract.md/.overview.md files — not individual memory .md files. Switch to filesystem-based counting (viking_fs.ls + exists) as the source of truth for category counts. The vector index query is kept solely for best-effort hotness/staleness metrics. Also fixes: - profile.md now counted (was missed because it's a file, not a dir) - .abstract.md/.overview.md now excluded from counts - Missing directories gracefully return 0 instead of erroring Fixes #1255 --- openviking/storage/stats_aggregator.py | 87 +++++++++++--- tests/unit/stats/test_stats_aggregator.py | 139 +++++++++++++++++----- 2 files changed, 180 insertions(+), 46 deletions(-) diff --git a/openviking/storage/stats_aggregator.py b/openviking/storage/stats_aggregator.py index f32a41f48..cf2ff6b62 100644 --- a/openviking/storage/stats_aggregator.py +++ b/openviking/storage/stats_aggregator.py @@ -11,7 +11,8 @@ from openviking.retrieve.memory_lifecycle import hotness_score from openviking.server.identity import RequestContext -from openviking.storage.expr import And, Eq, PathScope +from openviking.storage.expr import Eq +from openviking.storage.viking_fs import get_viking_fs from openviking_cli.utils import get_logger logger = get_logger(__name__) @@ -28,6 +29,20 @@ "skills", ] +# Categories that are directories (contain individual memory files) +_DIRECTORY_CATEGORIES = [ + "preferences", + "entities", + "events", + "cases", + "patterns", + "tools", + "skills", +] + +# Categories that are single files at the memories root +_FILE_CATEGORIES = ["profile"] + # Hotness buckets COLD_THRESHOLD = 0.2 HOT_THRESHOLD = 0.6 @@ -43,6 +58,53 @@ class StatsAggregator: def __init__(self, vikingdb_manager) -> None: self._vikingdb = vikingdb_manager + async def _count_memories_on_fs( + self, + memory_base: str, + ctx: RequestContext, + ) -> Dict[str, int]: + """Count memory files directly from the filesystem. + + This is the authoritative count — it reflects what actually + exists on disk, regardless of whether individual files have + been vectorized. The semantic_processor only vectorizes + directory-level .abstract.md / .overview.md files, not + individual memory .md files, so the vector index is always + an undercount. + + Returns a dict mapping category name → file count. + """ + viking_fs = get_viking_fs() + counts: Dict[str, int] = {cat: 0 for cat in MEMORY_CATEGORIES} + + # Count profile.md (single file, not a directory) + try: + if await viking_fs.exists(f"{memory_base}/profile.md", ctx=ctx): + counts["profile"] = 1 + except Exception as e: + logger.debug("Error checking profile.md existence: %s", e) + + # Count files in each directory category + for cat in _DIRECTORY_CATEGORIES: + dir_uri = f"{memory_base}/{cat}" + try: + entries = await viking_fs.ls(dir_uri, ctx=ctx) + except Exception: + # Directory doesn't exist — count stays 0 + continue + + for entry in entries: + name = entry.get("name", "") + is_dir = entry.get("isDir", False) + # Skip dotfiles (.abstract.md, .overview.md), dotdirs (.) + if name.startswith(".") or not name or is_dir: + continue + # Only count .md files (memory files) + if name.endswith(".md"): + counts[cat] += 1 + + return counts + async def get_memory_stats( self, ctx: RequestContext, @@ -71,25 +133,16 @@ async def get_memory_stats( "oldest_memory_age_days": 0, } - # Use count() (aggregate) for per-category counts instead of query() - # (vector search). query() with no query_vector falls through to - # search_by_random in the local HNSW backend, which can miss filtered - # results. count() uses a scalar aggregate that reliably returns - # correct results. + # Primary count: use the filesystem (source of truth). + # The vector index is incomplete because the semantic_processor + # only vectorizes directory-level abstract/overview files, not + # individual memory .md files created during session commit. user_id = ctx.user.user_id memory_base = f"viking://user/{user_id}/memories" + fs_counts = await self._count_memories_on_fs(memory_base, ctx) for cat in categories: - try: - by_category[cat] = await self._vikingdb.count( - filter=And([ - Eq("context_type", "memory"), - PathScope("uri", f"{memory_base}/{cat}", depth=2), - ]), - ctx=ctx, - ) - except Exception as e: - logger.error("Error counting memories for %s: %s", cat, e) + by_category[cat] = fs_counts.get(cat, 0) total_memories = sum(by_category.values()) @@ -171,7 +224,7 @@ async def _query_all_memories( Note: This uses query() which relies on vector search internally. On the local HNSW backend it may return incomplete results when no - query vector is provided. Category counts use count() instead. + query vector is provided. Category counts use filesystem counting instead. """ try: return await self._vikingdb.query( diff --git a/tests/unit/stats/test_stats_aggregator.py b/tests/unit/stats/test_stats_aggregator.py index 5a378ec5d..a7d872f3d 100644 --- a/tests/unit/stats/test_stats_aggregator.py +++ b/tests/unit/stats/test_stats_aggregator.py @@ -3,7 +3,7 @@ """Tests for StatsAggregator.""" from datetime import datetime, timedelta, timezone -from unittest.mock import AsyncMock, MagicMock +from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -29,6 +29,16 @@ def aggregator(mock_vikingdb): return StatsAggregator(mock_vikingdb) +@pytest.fixture +def mock_viking_fs(): + """Create a mock VikingFS that returns empty directory listings.""" + fs = AsyncMock() + # Default: profile.md doesn't exist, all dirs are empty + fs.exists = AsyncMock(return_value=False) + fs.ls = AsyncMock(return_value=[]) + return fs + + def _make_memory_record( category: str, active_count: int = 1, @@ -48,9 +58,14 @@ def _make_memory_record( class TestStatsAggregator: @pytest.mark.asyncio - async def test_empty_store(self, aggregator, mock_vikingdb, mock_ctx): + @patch("openviking.storage.stats_aggregator.get_viking_fs") + async def test_empty_store(self, mock_get_fs, aggregator, mock_vikingdb, mock_ctx): """Stats for an empty memory store should return zeros.""" - mock_vikingdb.count = AsyncMock(return_value=0) + fs = AsyncMock() + fs.exists = AsyncMock(return_value=False) + fs.ls = AsyncMock(return_value=[]) + mock_get_fs.return_value = fs + mock_vikingdb.query = AsyncMock(return_value=[]) result = await aggregator.get_memory_stats(mock_ctx) @@ -60,8 +75,9 @@ async def test_empty_store(self, aggregator, mock_vikingdb, mock_ctx): assert result["hotness_distribution"] == {"cold": 0, "warm": 0, "hot": 0} @pytest.mark.asyncio - async def test_counts_by_category(self, aggregator, mock_vikingdb, mock_ctx): - """Records should be bucketed into the correct category.""" + @patch("openviking.storage.stats_aggregator.get_viking_fs") + async def test_counts_by_category(self, mock_get_fs, aggregator, mock_vikingdb, mock_ctx): + """Records should be bucketed into the correct category based on filesystem.""" now = datetime.now(timezone.utc) records = [ _make_memory_record("cases", active_count=5, updated_at=now), @@ -69,33 +85,48 @@ async def test_counts_by_category(self, aggregator, mock_vikingdb, mock_ctx): _make_memory_record("tools", active_count=1, updated_at=now), ] - async def _count_by_category(**kwargs): - filt = kwargs.get("filter") - # Return counts based on the PathScope URI in the filter - filt_str = str(filt) - if "/cases" in filt_str: - return 2 - if "/tools" in filt_str: - return 1 - return 0 - - mock_vikingdb.count = AsyncMock(side_effect=_count_by_category) + fs = AsyncMock() + fs.exists = AsyncMock(return_value=False) + + async def _ls(uri, **kwargs): + if "/cases" in uri: + return [ + {"name": "mem_abc.md", "isDir": False}, + {"name": "mem_def.md", "isDir": False}, + {"name": ".abstract.md", "isDir": False}, + ] + if "/tools" in uri: + return [ + {"name": "mem_ghi.md", "isDir": False}, + ] + return [] + + fs.ls = AsyncMock(side_effect=_ls) + mock_get_fs.return_value = fs mock_vikingdb.query = AsyncMock(return_value=records) result = await aggregator.get_memory_stats(mock_ctx) + # .abstract.md should be excluded, only .md files counted assert result["by_category"]["cases"] == 2 assert result["by_category"]["tools"] == 1 assert result["total_memories"] == 3 @pytest.mark.asyncio - async def test_category_filter(self, aggregator, mock_vikingdb, mock_ctx): - """Passing a category filter should only query that category.""" + @patch("openviking.storage.stats_aggregator.get_viking_fs") + async def test_category_filter(self, mock_get_fs, aggregator, mock_vikingdb, mock_ctx): + """Passing a category filter should only count that category.""" now = datetime.now(timezone.utc) records = [ _make_memory_record("patterns", active_count=2, updated_at=now), ] - mock_vikingdb.count = AsyncMock(return_value=1) + + fs = AsyncMock() + fs.exists = AsyncMock(return_value=False) + fs.ls = AsyncMock(return_value=[ + {"name": "mem_xyz.md", "isDir": False}, + ]) + mock_get_fs.return_value = fs mock_vikingdb.query = AsyncMock(return_value=records) result = await aggregator.get_memory_stats(mock_ctx, category="patterns") @@ -104,16 +135,54 @@ async def test_category_filter(self, aggregator, mock_vikingdb, mock_ctx): assert len(result["by_category"]) == 1 @pytest.mark.asyncio - async def test_hotness_buckets(self, aggregator, mock_vikingdb, mock_ctx): + @patch("openviking.storage.stats_aggregator.get_viking_fs") + async def test_profile_counted(self, mock_get_fs, aggregator, mock_vikingdb, mock_ctx): + """profile.md should be counted as 1 when it exists.""" + fs = AsyncMock() + fs.exists = AsyncMock(return_value=True) + fs.ls = AsyncMock(return_value=[]) + mock_get_fs.return_value = fs + mock_vikingdb.query = AsyncMock(return_value=[]) + + result = await aggregator.get_memory_stats(mock_ctx) + + assert result["by_category"]["profile"] == 1 + + @pytest.mark.asyncio + @patch("openviking.storage.stats_aggregator.get_viking_fs") + async def test_dotfiles_excluded(self, mock_get_fs, aggregator, mock_vikingdb, mock_ctx): + """.abstract.md and .overview.md should not be counted as memories.""" + fs = AsyncMock() + fs.exists = AsyncMock(return_value=False) + fs.ls = AsyncMock(return_value=[ + {"name": ".abstract.md", "isDir": False}, + {"name": ".overview.md", "isDir": False}, + {"name": "mem_real.md", "isDir": False}, + ]) + mock_get_fs.return_value = fs + mock_vikingdb.query = AsyncMock(return_value=[]) + + result = await aggregator.get_memory_stats(mock_ctx, category="entities") + + assert result["by_category"]["entities"] == 1 + + @pytest.mark.asyncio + @patch("openviking.storage.stats_aggregator.get_viking_fs") + async def test_hotness_buckets(self, mock_get_fs, aggregator, mock_vikingdb, mock_ctx): """Records should be classified into cold/warm/hot based on score.""" now = datetime.now(timezone.utc) - # Recent + high access -> hot hot_record = _make_memory_record("cases", active_count=50, updated_at=now) - # Old + no access -> cold cold_record = _make_memory_record( "cases", active_count=0, updated_at=now - timedelta(days=60) ) - mock_vikingdb.count = AsyncMock(return_value=2) + + fs = AsyncMock() + fs.exists = AsyncMock(return_value=False) + fs.ls = AsyncMock(return_value=[ + {"name": "mem_a.md", "isDir": False}, + {"name": "mem_b.md", "isDir": False}, + ]) + mock_get_fs.return_value = fs mock_vikingdb.query = AsyncMock(return_value=[hot_record, cold_record]) result = await aggregator.get_memory_stats(mock_ctx, category="cases") @@ -123,7 +192,8 @@ async def test_hotness_buckets(self, aggregator, mock_vikingdb, mock_ctx): assert dist["cold"] >= 1 @pytest.mark.asyncio - async def test_staleness_metrics(self, aggregator, mock_vikingdb, mock_ctx): + @patch("openviking.storage.stats_aggregator.get_viking_fs") + async def test_staleness_metrics(self, mock_get_fs, aggregator, mock_vikingdb, mock_ctx): """Staleness should detect records not accessed in 7 and 30 days.""" now = datetime.now(timezone.utc) old_record = _make_memory_record( @@ -132,7 +202,13 @@ async def test_staleness_metrics(self, aggregator, mock_vikingdb, mock_ctx): updated_at=now - timedelta(days=40), created_at=now - timedelta(days=50), ) - mock_vikingdb.count = AsyncMock(return_value=1) + + fs = AsyncMock() + fs.exists = AsyncMock(return_value=False) + fs.ls = AsyncMock(return_value=[ + {"name": "mem_old.md", "isDir": False}, + ]) + mock_get_fs.return_value = fs mock_vikingdb.query = AsyncMock(return_value=[old_record]) result = await aggregator.get_memory_stats(mock_ctx, category="events") @@ -142,10 +218,15 @@ async def test_staleness_metrics(self, aggregator, mock_vikingdb, mock_ctx): assert result["staleness"]["oldest_memory_age_days"] >= 49 @pytest.mark.asyncio - async def test_count_error_returns_empty(self, aggregator, mock_vikingdb, mock_ctx): - """If VikingDB count fails, the category should show 0 records.""" - mock_vikingdb.count = AsyncMock(side_effect=Exception("connection error")) - mock_vikingdb.query = AsyncMock(side_effect=Exception("connection error")) + @patch("openviking.storage.stats_aggregator.get_viking_fs") + async def test_missing_directory_returns_zero(self, mock_get_fs, aggregator, mock_vikingdb, mock_ctx): + """If a directory doesn't exist on filesystem, the category should show 0.""" + fs = AsyncMock() + fs.exists = AsyncMock(return_value=False) + # ls raises for nonexistent dirs + fs.ls = AsyncMock(side_effect=Exception("directory not found")) + mock_get_fs.return_value = fs + mock_vikingdb.query = AsyncMock(return_value=[]) result = await aggregator.get_memory_stats(mock_ctx, category="cases") From 13a83b09e2af531bc24d108b373ca7bcf8bb8e92 Mon Sep 17 00:00:00 2001 From: Yang Zhi Date: Tue, 7 Apr 2026 14:45:14 +0800 Subject: [PATCH 3/9] fix(stats): filter _query_all_memories by level=2 to exclude directory metadata --- openviking/storage/stats_aggregator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/openviking/storage/stats_aggregator.py b/openviking/storage/stats_aggregator.py index cf2ff6b62..cffaa36b1 100644 --- a/openviking/storage/stats_aggregator.py +++ b/openviking/storage/stats_aggregator.py @@ -11,7 +11,7 @@ from openviking.retrieve.memory_lifecycle import hotness_score from openviking.server.identity import RequestContext -from openviking.storage.expr import Eq +from openviking.storage.expr import And, Eq from openviking.storage.viking_fs import get_viking_fs from openviking_cli.utils import get_logger @@ -228,7 +228,7 @@ async def _query_all_memories( """ try: return await self._vikingdb.query( - filter=Eq("context_type", "memory"), + filter=And([Eq("context_type", "memory"), Eq("level", 2)]), limit=10000, output_fields=[ "uri", From f378833a1fdd665de6217c3a0dbe29d25c21410c Mon Sep 17 00:00:00 2001 From: Yang Zhi Date: Tue, 7 Apr 2026 14:57:45 +0800 Subject: [PATCH 4/9] refactor(stats): derive all memory stats from vector index level=2 query Removes the filesystem-counting workaround and unifies counting, hotness, and staleness around a single vector-store query filtered to level=2 memory records. --- openviking/storage/stats_aggregator.py | 110 ++++++------------------- 1 file changed, 27 insertions(+), 83 deletions(-) diff --git a/openviking/storage/stats_aggregator.py b/openviking/storage/stats_aggregator.py index cffaa36b1..bf7503346 100644 --- a/openviking/storage/stats_aggregator.py +++ b/openviking/storage/stats_aggregator.py @@ -12,7 +12,6 @@ from openviking.retrieve.memory_lifecycle import hotness_score from openviking.server.identity import RequestContext from openviking.storage.expr import And, Eq -from openviking.storage.viking_fs import get_viking_fs from openviking_cli.utils import get_logger logger = get_logger(__name__) @@ -29,25 +28,26 @@ "skills", ] -# Categories that are directories (contain individual memory files) -_DIRECTORY_CATEGORIES = [ - "preferences", - "entities", - "events", - "cases", - "patterns", - "tools", - "skills", -] - -# Categories that are single files at the memories root -_FILE_CATEGORIES = ["profile"] - # Hotness buckets COLD_THRESHOLD = 0.2 HOT_THRESHOLD = 0.6 +def _category_from_uri(uri: str) -> Optional[str]: + """Determine memory category from its Viking URI. + + Handles both directory-based memories (``/preferences/mem_*.md``) + and the single-file ``profile.md`` at the memories root. + """ + for cat in MEMORY_CATEGORIES: + if cat == "profile": + if uri.endswith("/profile.md"): + return cat + elif f"/{cat}/" in uri: + return cat + return None + + class StatsAggregator: """Aggregates memory health statistics from VikingDB. @@ -58,53 +58,6 @@ class StatsAggregator: def __init__(self, vikingdb_manager) -> None: self._vikingdb = vikingdb_manager - async def _count_memories_on_fs( - self, - memory_base: str, - ctx: RequestContext, - ) -> Dict[str, int]: - """Count memory files directly from the filesystem. - - This is the authoritative count — it reflects what actually - exists on disk, regardless of whether individual files have - been vectorized. The semantic_processor only vectorizes - directory-level .abstract.md / .overview.md files, not - individual memory .md files, so the vector index is always - an undercount. - - Returns a dict mapping category name → file count. - """ - viking_fs = get_viking_fs() - counts: Dict[str, int] = {cat: 0 for cat in MEMORY_CATEGORIES} - - # Count profile.md (single file, not a directory) - try: - if await viking_fs.exists(f"{memory_base}/profile.md", ctx=ctx): - counts["profile"] = 1 - except Exception as e: - logger.debug("Error checking profile.md existence: %s", e) - - # Count files in each directory category - for cat in _DIRECTORY_CATEGORIES: - dir_uri = f"{memory_base}/{cat}" - try: - entries = await viking_fs.ls(dir_uri, ctx=ctx) - except Exception: - # Directory doesn't exist — count stays 0 - continue - - for entry in entries: - name = entry.get("name", "") - is_dir = entry.get("isDir", False) - # Skip dotfiles (.abstract.md, .overview.md), dotdirs (.) - if name.startswith(".") or not name or is_dir: - continue - # Only count .md files (memory files) - if name.endswith(".md"): - counts[cat] += 1 - - return counts - async def get_memory_stats( self, ctx: RequestContext, @@ -133,25 +86,15 @@ async def get_memory_stats( "oldest_memory_age_days": 0, } - # Primary count: use the filesystem (source of truth). - # The vector index is incomplete because the semantic_processor - # only vectorizes directory-level abstract/overview files, not - # individual memory .md files created during session commit. - user_id = ctx.user.user_id - memory_base = f"viking://user/{user_id}/memories" - - fs_counts = await self._count_memories_on_fs(memory_base, ctx) - for cat in categories: - by_category[cat] = fs_counts.get(cat, 0) - - total_memories = sum(by_category.values()) - - # Fetch individual records for hotness/staleness metrics (best-effort). - # On the local HNSW backend, query() may return incomplete results - # when no query vector is provided; category counts above are - # authoritative regardless. all_records = await self._query_all_memories(ctx) for record in all_records: + uri = record.get("uri", "") + record_cat = _category_from_uri(uri) + + # Count by category + if record_cat and record_cat in by_category: + by_category[record_cat] += 1 + active_count = record.get("active_count", 0) updated_at_raw = record.get("updated_at") updated_at = _parse_datetime(updated_at_raw) @@ -181,6 +124,8 @@ async def get_memory_stats( if age > staleness["oldest_memory_age_days"]: staleness["oldest_memory_age_days"] = round(age, 1) + total_memories = sum(by_category.values()) + return { "total_memories": total_memories, "by_category": by_category, @@ -220,11 +165,10 @@ async def _query_all_memories( self, ctx: RequestContext, ) -> List[Dict[str, Any]]: - """Query all memory records for hotness/staleness metrics. + """Query all memory records from the vector index. - Note: This uses query() which relies on vector search internally. - On the local HNSW backend it may return incomplete results when no - query vector is provided. Category counts use filesystem counting instead. + Filters to ``level=2`` so only actual memory records are returned, + excluding directory-level abstract/overview metadata. """ try: return await self._vikingdb.query( From ccd76fc3a3c8895a5c27ace2acd4b38bbcc5cc2f Mon Sep 17 00:00:00 2001 From: Yang Zhi Date: Tue, 7 Apr 2026 15:03:00 +0800 Subject: [PATCH 5/9] test(stats): update unit tests for vector-only stats aggregator Removes all get_viking_fs mocks and filesystem-counting assertions. Tests now verify category bucketing from URIs, profile.md counting, and graceful handling of query errors. --- tests/unit/stats/test_stats_aggregator.py | 140 +++++++--------------- 1 file changed, 42 insertions(+), 98 deletions(-) diff --git a/tests/unit/stats/test_stats_aggregator.py b/tests/unit/stats/test_stats_aggregator.py index a7d872f3d..b8a79f04e 100644 --- a/tests/unit/stats/test_stats_aggregator.py +++ b/tests/unit/stats/test_stats_aggregator.py @@ -3,7 +3,7 @@ """Tests for StatsAggregator.""" from datetime import datetime, timedelta, timezone -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import AsyncMock import pytest @@ -29,26 +29,20 @@ def aggregator(mock_vikingdb): return StatsAggregator(mock_vikingdb) -@pytest.fixture -def mock_viking_fs(): - """Create a mock VikingFS that returns empty directory listings.""" - fs = AsyncMock() - # Default: profile.md doesn't exist, all dirs are empty - fs.exists = AsyncMock(return_value=False) - fs.ls = AsyncMock(return_value=[]) - return fs - - def _make_memory_record( category: str, active_count: int = 1, updated_at: datetime = None, created_at: datetime = None, ): - """Helper to build a mock memory record.""" + """Helper to build a mock memory record with a realistic URI.""" now = datetime.now(timezone.utc) + if category == "profile": + uri = "viking://user/default/memories/profile.md" + else: + uri = f"viking://user/default/memories/{category}/test-item" return { - "uri": f"viking://memories/{category}/test-item", + "uri": uri, "context_type": "memory", "active_count": active_count, "updated_at": (updated_at or now).isoformat(), @@ -58,14 +52,8 @@ def _make_memory_record( class TestStatsAggregator: @pytest.mark.asyncio - @patch("openviking.storage.stats_aggregator.get_viking_fs") - async def test_empty_store(self, mock_get_fs, aggregator, mock_vikingdb, mock_ctx): + async def test_empty_store(self, aggregator, mock_vikingdb, mock_ctx): """Stats for an empty memory store should return zeros.""" - fs = AsyncMock() - fs.exists = AsyncMock(return_value=False) - fs.ls = AsyncMock(return_value=[]) - mock_get_fs.return_value = fs - mock_vikingdb.query = AsyncMock(return_value=[]) result = await aggregator.get_memory_stats(mock_ctx) @@ -75,9 +63,8 @@ async def test_empty_store(self, mock_get_fs, aggregator, mock_vikingdb, mock_ct assert result["hotness_distribution"] == {"cold": 0, "warm": 0, "hot": 0} @pytest.mark.asyncio - @patch("openviking.storage.stats_aggregator.get_viking_fs") - async def test_counts_by_category(self, mock_get_fs, aggregator, mock_vikingdb, mock_ctx): - """Records should be bucketed into the correct category based on filesystem.""" + async def test_counts_by_category(self, aggregator, mock_vikingdb, mock_ctx): + """Records should be bucketed into the correct category from their URI.""" now = datetime.now(timezone.utc) records = [ _make_memory_record("cases", active_count=5, updated_at=now), @@ -85,90 +72,66 @@ async def test_counts_by_category(self, mock_get_fs, aggregator, mock_vikingdb, _make_memory_record("tools", active_count=1, updated_at=now), ] - fs = AsyncMock() - fs.exists = AsyncMock(return_value=False) - - async def _ls(uri, **kwargs): - if "/cases" in uri: - return [ - {"name": "mem_abc.md", "isDir": False}, - {"name": "mem_def.md", "isDir": False}, - {"name": ".abstract.md", "isDir": False}, - ] - if "/tools" in uri: - return [ - {"name": "mem_ghi.md", "isDir": False}, - ] - return [] - - fs.ls = AsyncMock(side_effect=_ls) - mock_get_fs.return_value = fs mock_vikingdb.query = AsyncMock(return_value=records) result = await aggregator.get_memory_stats(mock_ctx) - # .abstract.md should be excluded, only .md files counted assert result["by_category"]["cases"] == 2 assert result["by_category"]["tools"] == 1 assert result["total_memories"] == 3 @pytest.mark.asyncio - @patch("openviking.storage.stats_aggregator.get_viking_fs") - async def test_category_filter(self, mock_get_fs, aggregator, mock_vikingdb, mock_ctx): + async def test_category_filter(self, aggregator, mock_vikingdb, mock_ctx): """Passing a category filter should only count that category.""" now = datetime.now(timezone.utc) records = [ _make_memory_record("patterns", active_count=2, updated_at=now), ] - fs = AsyncMock() - fs.exists = AsyncMock(return_value=False) - fs.ls = AsyncMock(return_value=[ - {"name": "mem_xyz.md", "isDir": False}, - ]) - mock_get_fs.return_value = fs mock_vikingdb.query = AsyncMock(return_value=records) result = await aggregator.get_memory_stats(mock_ctx, category="patterns") assert "patterns" in result["by_category"] assert len(result["by_category"]) == 1 + assert result["total_memories"] == 1 @pytest.mark.asyncio - @patch("openviking.storage.stats_aggregator.get_viking_fs") - async def test_profile_counted(self, mock_get_fs, aggregator, mock_vikingdb, mock_ctx): - """profile.md should be counted as 1 when it exists.""" - fs = AsyncMock() - fs.exists = AsyncMock(return_value=True) - fs.ls = AsyncMock(return_value=[]) - mock_get_fs.return_value = fs - mock_vikingdb.query = AsyncMock(return_value=[]) + async def test_profile_counted(self, aggregator, mock_vikingdb, mock_ctx): + """profile.md should be counted as 1 when present in query results.""" + records = [ + _make_memory_record("profile", active_count=0), + ] + mock_vikingdb.query = AsyncMock(return_value=records) result = await aggregator.get_memory_stats(mock_ctx) assert result["by_category"]["profile"] == 1 + assert result["total_memories"] == 1 @pytest.mark.asyncio - @patch("openviking.storage.stats_aggregator.get_viking_fs") - async def test_dotfiles_excluded(self, mock_get_fs, aggregator, mock_vikingdb, mock_ctx): - """.abstract.md and .overview.md should not be counted as memories.""" - fs = AsyncMock() - fs.exists = AsyncMock(return_value=False) - fs.ls = AsyncMock(return_value=[ - {"name": ".abstract.md", "isDir": False}, - {"name": ".overview.md", "isDir": False}, - {"name": "mem_real.md", "isDir": False}, - ]) - mock_get_fs.return_value = fs - mock_vikingdb.query = AsyncMock(return_value=[]) + async def test_unrecognized_uri_ignored(self, aggregator, mock_vikingdb, mock_ctx): + """Records with unrecognized URIs should not be counted in any category.""" + now = datetime.now(timezone.utc) + records = [ + { + "uri": "viking://some/random/path", + "context_type": "memory", + "active_count": 1, + "updated_at": now.isoformat(), + "created_at": now.isoformat(), + } + ] + mock_vikingdb.query = AsyncMock(return_value=records) - result = await aggregator.get_memory_stats(mock_ctx, category="entities") + result = await aggregator.get_memory_stats(mock_ctx) - assert result["by_category"]["entities"] == 1 + assert result["total_memories"] == 0 + for cat in result["by_category"]: + assert result["by_category"][cat] == 0 @pytest.mark.asyncio - @patch("openviking.storage.stats_aggregator.get_viking_fs") - async def test_hotness_buckets(self, mock_get_fs, aggregator, mock_vikingdb, mock_ctx): + async def test_hotness_buckets(self, aggregator, mock_vikingdb, mock_ctx): """Records should be classified into cold/warm/hot based on score.""" now = datetime.now(timezone.utc) hot_record = _make_memory_record("cases", active_count=50, updated_at=now) @@ -176,13 +139,6 @@ async def test_hotness_buckets(self, mock_get_fs, aggregator, mock_vikingdb, moc "cases", active_count=0, updated_at=now - timedelta(days=60) ) - fs = AsyncMock() - fs.exists = AsyncMock(return_value=False) - fs.ls = AsyncMock(return_value=[ - {"name": "mem_a.md", "isDir": False}, - {"name": "mem_b.md", "isDir": False}, - ]) - mock_get_fs.return_value = fs mock_vikingdb.query = AsyncMock(return_value=[hot_record, cold_record]) result = await aggregator.get_memory_stats(mock_ctx, category="cases") @@ -192,8 +148,7 @@ async def test_hotness_buckets(self, mock_get_fs, aggregator, mock_vikingdb, moc assert dist["cold"] >= 1 @pytest.mark.asyncio - @patch("openviking.storage.stats_aggregator.get_viking_fs") - async def test_staleness_metrics(self, mock_get_fs, aggregator, mock_vikingdb, mock_ctx): + async def test_staleness_metrics(self, aggregator, mock_vikingdb, mock_ctx): """Staleness should detect records not accessed in 7 and 30 days.""" now = datetime.now(timezone.utc) old_record = _make_memory_record( @@ -203,12 +158,6 @@ async def test_staleness_metrics(self, mock_get_fs, aggregator, mock_vikingdb, m created_at=now - timedelta(days=50), ) - fs = AsyncMock() - fs.exists = AsyncMock(return_value=False) - fs.ls = AsyncMock(return_value=[ - {"name": "mem_old.md", "isDir": False}, - ]) - mock_get_fs.return_value = fs mock_vikingdb.query = AsyncMock(return_value=[old_record]) result = await aggregator.get_memory_stats(mock_ctx, category="events") @@ -218,20 +167,15 @@ async def test_staleness_metrics(self, mock_get_fs, aggregator, mock_vikingdb, m assert result["staleness"]["oldest_memory_age_days"] >= 49 @pytest.mark.asyncio - @patch("openviking.storage.stats_aggregator.get_viking_fs") - async def test_missing_directory_returns_zero(self, mock_get_fs, aggregator, mock_vikingdb, mock_ctx): - """If a directory doesn't exist on filesystem, the category should show 0.""" - fs = AsyncMock() - fs.exists = AsyncMock(return_value=False) - # ls raises for nonexistent dirs - fs.ls = AsyncMock(side_effect=Exception("directory not found")) - mock_get_fs.return_value = fs - mock_vikingdb.query = AsyncMock(return_value=[]) + async def test_query_error_returns_zeros(self, aggregator, mock_vikingdb, mock_ctx): + """If the vector query fails, stats should gracefully return zeros.""" + mock_vikingdb.query = AsyncMock(side_effect=Exception("db down")) result = await aggregator.get_memory_stats(mock_ctx, category="cases") assert result["by_category"]["cases"] == 0 assert result["total_memories"] == 0 + assert result["hotness_distribution"] == {"cold": 0, "warm": 0, "hot": 0} class TestParseDatetime: From d4eb5cc1877707cdd9a331d1e470fe39f1f84c91 Mon Sep 17 00:00:00 2001 From: Yang Zhi Date: Tue, 7 Apr 2026 15:07:39 +0800 Subject: [PATCH 6/9] fix(tests): restore MagicMock import for mock_ctx fixture --- tests/unit/stats/test_stats_aggregator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/stats/test_stats_aggregator.py b/tests/unit/stats/test_stats_aggregator.py index b8a79f04e..c43924eed 100644 --- a/tests/unit/stats/test_stats_aggregator.py +++ b/tests/unit/stats/test_stats_aggregator.py @@ -3,7 +3,7 @@ """Tests for StatsAggregator.""" from datetime import datetime, timedelta, timezone -from unittest.mock import AsyncMock +from unittest.mock import AsyncMock, MagicMock import pytest From d8de168e764807ca36c645a24260e4f1c48ff7cd Mon Sep 17 00:00:00 2001 From: Yang Zhi Date: Tue, 7 Apr 2026 15:13:21 +0800 Subject: [PATCH 7/9] fix(stats): remove order_by from level=2 query to avoid HNSW backend error The local HNSW backend throws 'must be real number, not str' when order_by is combined with And+Eq filters. Stats aggregation does not require ordering, so removing it makes the query work correctly. --- openviking/storage/stats_aggregator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/openviking/storage/stats_aggregator.py b/openviking/storage/stats_aggregator.py index bf7503346..c45bd091e 100644 --- a/openviking/storage/stats_aggregator.py +++ b/openviking/storage/stats_aggregator.py @@ -181,7 +181,6 @@ async def _query_all_memories( "created_at", "context_type", ], - order_by="created_at", ctx=ctx, ) except Exception as e: From bb883e8bb67343740f7e09918e789ba2cfd3283c Mon Sep 17 00:00:00 2001 From: Yang Zhi Date: Tue, 7 Apr 2026 16:31:48 +0800 Subject: [PATCH 8/9] fix(stats): skip non-matching records when category filter is applied Ensure hotness_distribution and staleness metrics only count records that match the requested category filter. Previously these global metrics were computed over all records even when a specific category was requested. Suggested-by: github-actions[bot] --- openviking/storage/stats_aggregator.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/openviking/storage/stats_aggregator.py b/openviking/storage/stats_aggregator.py index c45bd091e..56ebd0f8e 100644 --- a/openviking/storage/stats_aggregator.py +++ b/openviking/storage/stats_aggregator.py @@ -91,9 +91,11 @@ async def get_memory_stats( uri = record.get("uri", "") record_cat = _category_from_uri(uri) - # Count by category - if record_cat and record_cat in by_category: - by_category[record_cat] += 1 + # Skip records not in the requested categories + if not (record_cat and record_cat in by_category): + continue + + by_category[record_cat] += 1 active_count = record.get("active_count", 0) updated_at_raw = record.get("updated_at") From b293f00c4e2e8a40cb57d396216b65de678a0e9a Mon Sep 17 00:00:00 2001 From: Yang Zhi Date: Tue, 7 Apr 2026 16:39:30 +0800 Subject: [PATCH 9/9] test(stats): add regression test for category-filtered metrics Verify that hotness_distribution and staleness only count records matching the requested category when a filter is applied. --- tests/unit/stats/test_stats_aggregator.py | 28 +++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/tests/unit/stats/test_stats_aggregator.py b/tests/unit/stats/test_stats_aggregator.py index c43924eed..94d7e3e7b 100644 --- a/tests/unit/stats/test_stats_aggregator.py +++ b/tests/unit/stats/test_stats_aggregator.py @@ -166,6 +166,34 @@ async def test_staleness_metrics(self, aggregator, mock_vikingdb, mock_ctx): assert result["staleness"]["not_accessed_30d"] >= 1 assert result["staleness"]["oldest_memory_age_days"] >= 49 + @pytest.mark.asyncio + async def test_category_filter_excludes_other_records_from_metrics( + self, aggregator, mock_vikingdb, mock_ctx + ): + """When a category filter is applied, hotness/staleness should only + count records that match the filter, even if the query returns + records from other categories. + """ + now = datetime.now(timezone.utc) + records = [ + _make_memory_record("cases", active_count=50, updated_at=now), + _make_memory_record( + "tools", active_count=0, updated_at=now - timedelta(days=60) + ), + ] + mock_vikingdb.query = AsyncMock(return_value=records) + + result = await aggregator.get_memory_stats(mock_ctx, category="cases") + + assert result["by_category"]["cases"] == 1 + assert result["total_memories"] == 1 + # Only the "cases" record should contribute to hotness + assert result["hotness_distribution"]["hot"] == 1 + assert result["hotness_distribution"]["cold"] == 0 + # Only the "cases" record should contribute to staleness + assert result["staleness"]["not_accessed_7d"] == 0 + assert result["staleness"]["not_accessed_30d"] == 0 + @pytest.mark.asyncio async def test_query_error_returns_zeros(self, aggregator, mock_vikingdb, mock_ctx): """If the vector query fails, stats should gracefully return zeros."""