aggregate memory health metrics without introducing new storage.
"""
88
9- from datetime import datetime , timezone
109from typing import Any , Dict , List , Optional
1110
12- from openviking .retrieve .memory_lifecycle import hotness_score
1311from openviking .server .identity import RequestContext
14- from openviking .storage .expr import Eq
12+ from openviking .storage .expr import And , Eq , PathScope
1513from openviking_cli .utils import get_logger
1614
1715logger = get_logger (__name__ )
2826 "skills" ,
2927]
3028
31- # Hotness buckets
32- COLD_THRESHOLD = 0.2
33- HOT_THRESHOLD = 0.6
34-
35-
3629class StatsAggregator :
3730 """Aggregates memory health statistics from VikingDB.
3831
@@ -58,62 +51,36 @@ async def get_memory_stats(
5851 Dictionary with total counts, category breakdown,
5952 hotness distribution, and staleness metrics.
6053 """
61- now = datetime .now (timezone .utc )
62-
6354 # Build category list to query
6455 categories = [category ] if category else MEMORY_CATEGORIES
6556
66- by_category : Dict [str , int ] = {}
57+ by_category : Dict [str , int ] = {cat : 0 for cat in categories }
6758 hotness_dist = {"cold" : 0 , "warm" : 0 , "hot" : 0 }
6859 staleness = {
6960 "not_accessed_7d" : 0 ,
7061 "not_accessed_30d" : 0 ,
7162 "oldest_memory_age_days" : 0 ,
7263 }
7364
74- # Fetch all memories once and group by category in Python
75- all_records = await self ._query_all_memories (ctx )
76- grouped : Dict [str , List [Dict [str , Any ]]] = {cat : [] for cat in categories }
77- for record in all_records :
78- uri = record .get ("uri" , "" )
79- for cat in categories :
80- if f"/{ cat } /" in uri :
81- grouped [cat ].append (record )
82- break
65+ # Use count() (aggregate) instead of query() (vector search).
66+ # query() with no query_vector falls through to search_by_random
67+ # in the local HNSW backend, which can miss filtered results.
68+ # count() uses a scalar aggregate that reliably returns correct
69+ # results.
70+ user_id = ctx .user .user_id
71+ memory_base = f"viking://user/{ user_id } /memories"
8372
8473 for cat in categories :
85- records = grouped [cat ]
86- by_category [cat ] = len (records )
87-
88- for record in records :
89- active_count = record .get ("active_count" , 0 )
90- updated_at_raw = record .get ("updated_at" )
91- updated_at = _parse_datetime (updated_at_raw )
92- created_at_raw = record .get ("created_at" )
93- created_at = _parse_datetime (created_at_raw )
94-
95- # Hotness distribution
96- score = hotness_score (active_count , updated_at , now = now )
97- if score < COLD_THRESHOLD :
98- hotness_dist ["cold" ] += 1
99- elif score > HOT_THRESHOLD :
100- hotness_dist ["hot" ] += 1
101- else :
102- hotness_dist ["warm" ] += 1
103-
104- # Staleness: use updated_at for access tracking
105- if updated_at :
106- age_days = (now - updated_at ).total_seconds () / 86400.0
107- if age_days > 7 :
108- staleness ["not_accessed_7d" ] += 1
109- if age_days > 30 :
110- staleness ["not_accessed_30d" ] += 1
111-
112- # Track oldest memory by created_at
113- if created_at :
114- age = (now - created_at ).total_seconds () / 86400.0
115- if age > staleness ["oldest_memory_age_days" ]:
116- staleness ["oldest_memory_age_days" ] = round (age , 1 )
74+ try :
75+ by_category [cat ] = await self ._vikingdb .count (
76+ filter = And ([
77+ Eq ("context_type" , "memory" ),
78+ PathScope ("uri" , f"{ memory_base } /{ cat } " , depth = 2 ),
79+ ]),
80+ ctx = ctx ,
81+ )
82+ except Exception as e :
83+ logger .error ("Error counting memories for %s: %s" , cat , e )
11784
11885 total_memories = sum (by_category .values ())
11986
@@ -152,47 +119,3 @@ async def get_session_extraction_stats(
152119 "skills_used" : stats .skills_used ,
153120 }
154121
155- async def _query_all_memories (
156- self ,
157- ctx : RequestContext ,
158- ) -> List [Dict [str , Any ]]:
159- """Query all memory records in a single DB round-trip.
160-
161- Uses the context_type="memory" filter. Callers group by category
162- in Python to avoid N+1 queries.
163- """
164- try :
165- return await self ._vikingdb .query (
166- filter = Eq ("context_type" , "memory" ),
167- limit = 10000 ,
168- output_fields = [
169- "uri" ,
170- "active_count" ,
171- "updated_at" ,
172- "created_at" ,
173- "context_type" ,
174- ],
175- ctx = ctx ,
176- )
177- except Exception as e :
178- logger .error ("Error querying memories: %s" , e )
179- return []
180-
181-
182- def _parse_datetime (value ) -> Optional [datetime ]:
183- """Parse a datetime value from a VikingDB record."""
184- if value is None :
185- return None
186- if isinstance (value , datetime ):
187- if value .tzinfo is None :
188- return value .replace (tzinfo = timezone .utc )
189- return value
190- if isinstance (value , str ):
191- try :
192- dt = datetime .fromisoformat (value .replace ("Z" , "+00:00" ))
193- if dt .tzinfo is None :
194- dt = dt .replace (tzinfo = timezone .utc )
195- return dt
196- except (ValueError , TypeError ):
197- return None
198- return None
0 commit comments