diff --git a/.dockerignore b/.dockerignore index 591f6a3c2..dd61a2581 100644 --- a/.dockerignore +++ b/.dockerignore @@ -12,3 +12,6 @@ docker-compose.yml.example .vscode/** data/** .venv +.git +tests +.python-version diff --git a/Dockerfile b/Dockerfile index 4a68d6171..3fe26290d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,6 +16,7 @@ ENV UV_LINK_MODE=copy # Python optimizations ENV PYTHONDONTWRITEBYTECODE=1 ENV PYTHONUNBUFFERED=1 +ENV UV_PYTHON_DOWNLOADS=never # Install the project's dependencies using the lockfile and settings RUN --mount=type=cache,target=/root/.cache/uv \ @@ -26,12 +27,31 @@ RUN --mount=type=cache,target=/root/.cache/uv \ # Copy only requirements to cache them in docker layer COPY uv.lock pyproject.toml /app/ -# Sync the project +# Place executables in the environment at the front of the path +ENV PATH="/app/.venv/bin:$PATH" + +# Copy the full project source into the image +COPY . /app + +# Install the project itself (extras omitted for runtime image) RUN --mount=type=cache,target=/root/.cache/uv \ uv sync --frozen --no-group dev -# Place executables in the environment at the front of the path -ENV PATH="/app/.venv/bin:$PATH" +# Pre-seed tiktoken's o200k_base cache so containers do not depend on live DNS +# resolution to openaipublic.blob.core.windows.net at first startup.
+RUN python - <<'PY' +import hashlib +import pathlib +import urllib.request +url = 'https://openaipublic.blob.core.windows.net/encodings/o200k_base.tiktoken' +cache_key = hashlib.sha1(url.encode()).hexdigest() +cache_path = pathlib.Path('/tmp/data-gym-cache') / cache_key +cache_path.parent.mkdir(parents=True, exist_ok=True) +if not cache_path.exists(): + with urllib.request.urlopen(url) as resp: + cache_path.write_bytes(resp.read()) +print(f'cached {url} -> {cache_path}') +PY ENV HOME=/app ENV UV_CACHE_DIR=/tmp/uv-cache diff --git a/src/config.py b/src/config.py index f01c451b1..c63b596f5 100644 --- a/src/config.py +++ b/src/config.py @@ -1,3 +1,5 @@ +"""Application configuration loaded from environment, .env, and TOML files.""" + import logging from pathlib import Path from typing import Annotated, Any, ClassVar, Literal, Protocol @@ -53,6 +55,7 @@ class TomlConfigSettingsSource(PydanticBaseSettingsSource): """Custom settings source for loading from TOML file.""" def __init__(self, settings_cls: type[BaseSettings]) -> None: + """Initialize the TOML settings source.""" super().__init__(settings_cls) SECTION_MAP: ClassVar[dict[str, str]] = { @@ -76,6 +79,7 @@ def __init__(self, settings_cls: type[BaseSettings]) -> None: def get_field_value( self, field: FieldInfo, field_name: str ) -> tuple[Any, str, bool]: + """Retrieve a field value from the TOML config section.""" # Get the env_prefix from the model config prefix = self.settings_cls.model_config.get("env_prefix", "") if prefix.endswith("_"): @@ -95,6 +99,7 @@ def get_field_value( return field_value, field_name, False def __call__(self) -> dict[str, Any]: + """Return all TOML values for the current section as a dict.""" # Get the env_prefix from the model config prefix = self.settings_cls.model_config.get("env_prefix", "") if prefix.endswith("_"): @@ -122,6 +127,7 @@ def settings_customise_sources( # pyright: ignore dotenv_settings: DotEnvSettingsSource, file_secret_settings: PydanticBaseSettingsSource, ) -> 
tuple[PydanticBaseSettingsSource, ...]: + """Define settings source precedence: init > env > .env > toml > secrets.""" # Correct precedence: init > env > .env > toml > secrets > defaults return ( init_settings, @@ -153,6 +159,8 @@ def _validate_backup_configuration(self): class DBSettings(HonchoSettings): + """Database connection and pool configuration.""" + model_config = SettingsConfigDict(env_prefix="DB_", extra="ignore") # pyright: ignore CONNECTION_URI: str = ( @@ -175,6 +183,8 @@ class DBSettings(HonchoSettings): class AuthSettings(HonchoSettings): + """Authentication settings including JWT configuration.""" + model_config = SettingsConfigDict(env_prefix="AUTH_", extra="ignore") # pyright: ignore USE_AUTH: bool = False @@ -182,12 +192,15 @@ class AuthSettings(HonchoSettings): @model_validator(mode="after") # type: ignore def _require_jwt_secret(self) -> "AuthSettings": + """Validate that JWT_SECRET is set when USE_AUTH is enabled.""" if self.USE_AUTH and not self.JWT_SECRET: raise ValueError("JWT_SECRET must be set if USE_AUTH is true") return self class SentrySettings(HonchoSettings): + """Sentry error tracking and performance monitoring settings.""" + model_config = SettingsConfigDict(env_prefix="SENTRY_", extra="ignore") # pyright: ignore ENABLED: bool = False @@ -199,6 +212,8 @@ class SentrySettings(HonchoSettings): class LLMSettings(HonchoSettings): + """LLM provider API keys and default generation settings.""" + model_config = SettingsConfigDict(env_prefix="LLM_", extra="ignore") # pyright: ignore # API Keys for LLM providers @@ -233,6 +248,8 @@ class LLMSettings(HonchoSettings): class DeriverSettings(BackupLLMSettingsMixin, HonchoSettings): + """Configuration for the background deriver (message processing) agent.""" + model_config = SettingsConfigDict(env_prefix="DERIVER_", extra="ignore") # pyright: ignore ENABLED: bool = True @@ -278,6 +295,7 @@ class DeriverSettings(BackupLLMSettingsMixin, HonchoSettings): @model_validator(mode="after") def 
validate_batch_tokens_vs_context_limit(self): + """Ensure batch token limit does not exceed deriver input token limit.""" if self.REPRESENTATION_BATCH_MAX_TOKENS > self.MAX_INPUT_TOKENS: raise ValueError( f"REPRESENTATION_BATCH_MAX_TOKENS ({self.REPRESENTATION_BATCH_MAX_TOKENS}) cannot exceed max deriver input tokens ({self.MAX_INPUT_TOKENS})" @@ -286,6 +304,8 @@ def validate_batch_tokens_vs_context_limit(self): class PeerCardSettings(HonchoSettings): + """Configuration for peer card generation.""" + model_config = SettingsConfigDict(env_prefix="PEER_CARD_", extra="ignore") # pyright: ignore ENABLED: bool = True @@ -350,6 +370,8 @@ def _validate_anthropic_thinking_budget(self) -> "DialecticLevelSettings": class DialecticSettings(HonchoSettings): + """Configuration for the dialectic (chat/QA) agent and reasoning levels.""" + model_config = SettingsConfigDict( # pyright: ignore env_prefix="DIALECTIC_", env_nested_delimiter="__", extra="ignore" ) @@ -426,6 +448,8 @@ def _validate_all_levels_present(self) -> "DialecticSettings": class SummarySettings(BackupLLMSettingsMixin, HonchoSettings): + """Configuration for session summarization.""" + model_config = SettingsConfigDict(env_prefix="SUMMARY_", extra="ignore") # pyright: ignore ENABLED: bool = True @@ -442,6 +466,8 @@ class SummarySettings(BackupLLMSettingsMixin, HonchoSettings): class WebhookSettings(HonchoSettings): + """Configuration for webhook event delivery.""" + model_config = SettingsConfigDict(env_prefix="WEBHOOK_", extra="ignore") # pyright: ignore SECRET: str | None = None # Must be set if configuring webhooks @@ -449,6 +475,8 @@ class WebhookSettings(HonchoSettings): class MetricsSettings(HonchoSettings): + """Configuration for Prometheus metrics.""" + model_config = SettingsConfigDict(env_prefix="METRICS_", extra="ignore") # pyright: ignore ENABLED: bool = False NAMESPACE: str | None = None @@ -488,6 +516,8 @@ class TelemetrySettings(HonchoSettings): class CacheSettings(HonchoSettings): + 
"""Configuration for Redis cache connection.""" + model_config = SettingsConfigDict(env_prefix="CACHE_", extra="ignore") # pyright: ignore ENABLED: bool = False @@ -529,6 +559,8 @@ class SurprisalSettings(BaseModel): class DreamSettings(BackupLLMSettingsMixin, HonchoSettings): + """Configuration for the dreamer (memory consolidation) agent.""" + model_config = SettingsConfigDict( # pyright: ignore env_prefix="DREAM_", env_nested_delimiter="__", extra="ignore" ) @@ -610,6 +642,7 @@ class VectorStoreSettings(HonchoSettings): @model_validator(mode="after") def _require_api_key_for_turbopuffer(self) -> "VectorStoreSettings": + """Validate that an API key is set when using Turbopuffer.""" if self.TYPE == "turbopuffer" and not self.TURBOPUFFER_API_KEY: raise ValueError( "VECTOR_STORE_TURBOPUFFER_API_KEY must be set when TYPE is 'turbopuffer'" @@ -618,6 +651,8 @@ def _require_api_key_for_turbopuffer(self) -> "VectorStoreSettings": class AppSettings(HonchoSettings): + """Top-level application settings aggregating all nested config sections.""" + # No env_prefix for app-level settings model_config = SettingsConfigDict( # pyright: ignore env_prefix="", env_nested_delimiter="__", extra="ignore" @@ -664,6 +699,7 @@ class AppSettings(HonchoSettings): @field_validator("LOG_LEVEL") def validate_log_level(cls, v: str) -> str: + """Validate and normalize the log level string.""" log_level = v.upper() if log_level not in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]: raise ValueError(f"Invalid log level: {v}") diff --git a/src/crud/deriver.py b/src/crud/deriver.py index 0852a4792..bd95f4615 100644 --- a/src/crud/deriver.py +++ b/src/crud/deriver.py @@ -1,3 +1,5 @@ +"""CRUD operations for deriver queue status and related queries.""" + from collections.abc import Sequence from logging import getLogger from typing import Any diff --git a/src/deriver/prompts.py b/src/deriver/prompts.py index 98ab501ce..b9e117bfd 100644 --- a/src/deriver/prompts.py +++ b/src/deriver/prompts.py @@ 
-27,13 +27,22 @@ def minimal_deriver_prompt( """ return c( f""" -Analyze messages from {peer_id} to extract **explicit atomic facts** about them. +Analyze messages from {peer_id} to extract explicit atomic observations about them. -[EXPLICIT] DEFINITION: Facts about {peer_id} that can be derived directly from their messages. - - Transform statements into one or multiple conclusions - - Each conclusion must be self-contained with enough context +[EXPLICIT] DEFINITION: Observations about {peer_id} that can be derived directly from their messages. + - Transform statements into one or multiple observations + - Each observation must be self-contained with enough context - Use absolute dates/times when possible (e.g. "June 26, 2025" not "yesterday") +OUTPUT CONTRACT: +- Return exactly one JSON object with a top-level key `explicit`. +- `explicit` must be an array of objects. +- Each object must use the key `content`. +- Do NOT use keys like `fact`, `observation`, `source`, `source_message`, or any extra keys. +- Example valid output: {{"explicit":[{{"content":"{peer_id} lives in Wisconsin"}}]}} +- If there are multiple observations, add more objects with the same `content` key. +- Do not wrap the JSON in markdown. + RULES: - Properly attribute observations to the correct subject: if it is about {peer_id}, say so. If {peer_id} is referencing someone or something else, make that clear. - Observations should make sense on their own. Each observation will be used in the future to better understand {peer_id}. @@ -41,9 +50,9 @@ def minimal_deriver_prompt( - Contextualize each observation sufficiently (e.g. 
"Ann is nervous about the job interview at the pharmacy" not just "Ann is nervous") EXAMPLES: -- EXPLICIT: "I just had my 25th birthday last Saturday" → "{peer_id} is 25 years old", "{peer_id}'s birthday is June 21st" -- EXPLICIT: "I took my dog for a walk in NYC" → "{peer_id} has a dog", "{peer_id} lives in NYC" -- EXPLICIT: "{peer_id} attended college" + general knowledge → "{peer_id} completed high school or equivalent" +- EXPLICIT: "I just had my 25th birthday last Saturday" → {{"content":"{peer_id} is 25 years old"}}, {{"content":"{peer_id}'s birthday is June 21st"}} +- EXPLICIT: "I took my dog for a walk in NYC" → {{"content":"{peer_id} has a dog"}}, {{"content":"{peer_id} was in NYC"}} +- EXPLICIT: "{peer_id} attended college" → {{"content":"{peer_id} attended college"}} Messages to analyze: diff --git a/src/deriver/queue_manager.py b/src/deriver/queue_manager.py index 5e885255e..900452a9e 100644 --- a/src/deriver/queue_manager.py +++ b/src/deriver/queue_manager.py @@ -116,6 +116,15 @@ async def initialize(self) -> None: """Setup signal handlers, initialize client, and start the main polling loop""" logger.debug(f"Initializing QueueManager with {self.workers} workers") + recovered_dreams = 0 + try: + recovered_dreams = await self.dream_scheduler.recover_overdue_dreams() + except Exception as e: + logger.warning("Failed overdue dream recovery at startup: %s", e) + + if recovered_dreams: + logger.info("Recovered %s overdue dream(s) during queue manager init", recovered_dreams) + # Set up signal handlers loop = asyncio.get_running_loop() signals = (signal.SIGTERM, signal.SIGINT) diff --git a/src/dreamer/dream_scheduler.py b/src/dreamer/dream_scheduler.py index 1a339d21c..cc7dee010 100644 --- a/src/dreamer/dream_scheduler.py +++ b/src/dreamer/dream_scheduler.py @@ -1,6 +1,6 @@ import asyncio import contextlib -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from logging import getLogger import sentry_sdk @@ -216,6 
+216,91 @@ async def execute_dream( session_name=session_name, ) + async def recover_overdue_dreams(self) -> int: + """Recover overdue dreams after a deriver restart. + + Timer-based dreams live in memory until they mature. If the deriver restarts, + those pending timers vanish. On startup we scan collections that are already + past the dream threshold and have been idle at least the configured timeout, + then immediately execute the missing dream. + """ + if not settings.DREAM.ENABLED: + return 0 + + recovered = 0 + now = datetime.now(timezone.utc) + idle_cutoff = now - timedelta(minutes=settings.DREAM.IDLE_TIMEOUT_MINUTES) + + async with tracked_db("dream_recovery_scan") as db: + stmt = ( + select( + models.Collection.workspace_name, + models.Collection.observer, + models.Collection.observed, + func.count(models.Document.id).label("doc_count"), + models.Collection.internal_metadata["dream"]["last_dream_document_count"].astext, + models.Collection.internal_metadata["dream"]["last_dream_at"].astext, + func.max(models.Document.created_at).label("last_document_at"), + ) + .join( + models.Document, + ( + (models.Document.workspace_name == models.Collection.workspace_name) + & (models.Document.observer == models.Collection.observer) + & (models.Document.observed == models.Collection.observed) + ), + ) + .group_by( + models.Collection.workspace_name, + models.Collection.observer, + models.Collection.observed, + models.Collection.internal_metadata, + ) + ) + result = await db.execute(stmt) + rows = result.all() + + for ( + workspace_name, + observer, + observed, + doc_count, + last_dream_document_count, + last_dream_at, + last_document_at, + ) in rows: + if doc_count < settings.DREAM.DOCUMENT_THRESHOLD: + continue + if last_document_at is None or last_document_at > idle_cutoff: + continue + + last_dream_document_count_int = int(last_dream_document_count or 0) + if doc_count - last_dream_document_count_int < settings.DREAM.DOCUMENT_THRESHOLD: + continue + + if 
last_dream_at: + try: + last_dream_dt = datetime.fromisoformat(last_dream_at) + except (TypeError, ValueError): + last_dream_dt = None + if last_dream_dt is not None: + hours_since_last_dream = (now - last_dream_dt).total_seconds() / 3600 + if hours_since_last_dream < settings.DREAM.MIN_HOURS_BETWEEN_DREAMS: + continue + + for dream_type in settings.DREAM.ENABLED_TYPES: + await self.execute_dream( + workspace_name, + DreamType(dream_type), + observer=observer, + observed=observed, + ) + recovered += 1 + + if recovered: + logger.info("Recovered %s overdue dream(s) after startup", recovered) + return recovered + async def shutdown(self) -> None: """Cancel all pending dreams during shutdown.""" if self.pending_dreams: diff --git a/src/dreamer/specialists.py b/src/dreamer/specialists.py index 608db3b0e..e06e675be 100644 --- a/src/dreamer/specialists.py +++ b/src/dreamer/specialists.py @@ -218,6 +218,7 @@ def iteration_callback(data: Any) -> None: max_tool_iterations=self.get_max_iterations(), messages=messages, track_name=f"Dreamer/{self.name}", + trace_name=f"dreamer_{self.name}", iteration_callback=iteration_callback, ) @@ -312,7 +313,7 @@ def get_max_tokens(self) -> int: return 8192 def get_max_iterations(self) -> int: - return 12 + return 6 def build_system_prompt( self, observed: str, *, peer_card_enabled: bool = True @@ -324,21 +325,42 @@ def build_system_prompt( ## PEER CARD (REQUIRED) The peer card is a summary of stable biographical facts. You MUST update it when you learn: -- Name, age, location, occupation +- Name, email, location, occupation/role - Family members and relationships - Standing instructions ("call me X", "don't mention Y") - Core preferences and traits +Priority order for peer-card entries: +1. Canonical identity facts explicitly stated by the user (name, email, title, location) +2. Standing instructions and routing preferences +3. Long-lived preferences and personality traits +4. Stable role/occupation facts +5. 
Only then, a few high-value project/work context facts if they materially help identification + +Avoid low-value or generic entries such as: +- "Works on projects" +- "Evaluates systems" +- "Capable of reviewing conversation history" +- temporary tasks, one-off questions, or broad capability statements + Never add temporary event summaries, one-off conclusions, reasoning traces, or contradiction notes. Format entries as: -- Plain facts: "Name: Alice", "Works at Google", "Lives in NYC" +- Plain facts: "Name: Alice", "Email: alice@example.com", "Lives in NYC", "Role: Software developer" - `INSTRUCTION: ...` for standing instructions - `PREFERENCE: ...` for preferences - `TRAIT: ...` for personality traits -Call `update_peer_card` with the complete updated list when you have new biographical info. -Keep it concise (max 40 entries), deduplicated, and current.""" +Good examples: +- "Name: Alice Johnson" +- "Email: alice@example.com" +- "Location: NYC" +- "INSTRUCTION: Always ask before scheduling meetings" +- "PREFERENCE: Prefers concise, direct answers" + +Call `update_peer_card` with the complete updated list in the `content` field when you have new biographical info. +Do not use argument names like `entries` or `peer_card_updates`. +Keep it concise (max 12 entries unless there is a strong reason for more), deduplicated, and current.""" return f"""You are a deductive reasoning agent analyzing observations about {observed}. @@ -346,14 +368,28 @@ def build_system_prompt( Create deductive observations by finding logical implications in what's already known. Think like a detective connecting evidence. -## PHASE 1: DISCOVERY +## EXECUTION DISCIPLINE -Explore what's actually in memory. 
Use these tools freely: -- `get_recent_observations` - See what's been learned recently -- `search_memory` - Search for specific topics -- `search_messages` - See actual conversation content +- Call `extract_preferences` FIRST to pull standing instructions and stable preferences from conversation history. +- After that, do at most 2 additional discovery calls (`get_recent_observations`, `search_memory`, or `search_messages`). +- Stop discovery after at most 3 tool calls total before acting. +- Once you have enough evidence, you MUST do one of these before the loop ends: + 1. Call `update_peer_card` with the complete updated card in the `content` argument, or + 2. Call `finish_consolidation` with a short reason that no durable card update is warranted. +- Do not keep searching once you already have name/location/preferences/standing-instruction evidence. +- If you find durable profile facts, prefer updating the peer card over creating more search queries. -Spend a few tool calls understanding the landscape before creating anything. +## PHASE 1: DISCOVERY + +Use a short discovery pass only: +- `extract_preferences` - first call, for preferences and standing instructions +- `get_recent_observations` - recent facts +- `search_memory` - use targeted identity/routing queries such as: + - `full name email address` + - `current location residence` + - `standing instructions routing preferences` + - `role occupation profession` +- `search_messages` - one targeted follow-up if needed ## PHASE 2: ACTION @@ -394,7 +430,8 @@ def build_system_prompt( 2. Create observations based on what you ACTUALLY FIND, not what you expect 3. Always include source_ids linking to the observations you're synthesizing 4. Delete outdated observations - don't leave duplicates -5. Quality over quantity - fewer good deductions beat many weak ones +5. Quality over quantity - fewer good deductions beat many weak ones +6. 
Before your final step, either call `update_peer_card` or call `finish_consolidation` explicitly""" def build_user_prompt( self, @@ -405,22 +442,23 @@ def build_user_prompt( if hints: hints_str = "\n".join(f"- {q}" for q in hints[:5]) - return f"""{peer_card_context}Start by exploring recent observations and messages. These topics may be worth investigating: - -{hints_str} + return f"""{peer_card_context}Start with `extract_preferences`, then do a very short discovery pass using at most two of `get_recent_observations`, `search_memory`, or `search_messages`. -But follow the evidence - if you find something more interesting, pursue that instead. +These topics may be worth investigating: -Begin with `get_recent_observations` to see what's there.""" +{hints_str} - return f"""{peer_card_context}Explore the observation space and create deductive observations. +Once you have enough evidence, stop searching and either update the peer card or finish consolidation explicitly.""" -Start with `get_recent_observations` to see what's been learned recently, then investigate whatever seems most promising. + return f"""{peer_card_context}Start with `extract_preferences`, then do a very short discovery pass using at most two of `get_recent_observations`, `search_memory`, or `search_messages`. Look for: 1. Knowledge updates (same fact, different values over time) 2. Logical implications that haven't been made explicit 3. Contradictions that need flagging +4. Canonical identity/routing facts first: exact name, email, location, and standing routing preferences + +Once you have enough evidence, stop searching and either update the peer card or finish consolidation explicitly. 
Go.""" @@ -455,7 +493,7 @@ def get_max_tokens(self) -> int: return 8192 def get_max_iterations(self) -> int: - return 10 + return 6 def build_system_prompt( self, observed: str, *, peer_card_enabled: bool = True @@ -466,14 +504,24 @@ def build_system_prompt( ## PEER CARD (REQUIRED) -After identifying patterns, only update the peer card for durable profile-level traits/preferences: -- `TRAIT: Analytical thinker` -- `TRAIT: Tends to reschedule when stressed` -- `PREFERENCE: Prefers detailed explanations` +After identifying patterns, only update the peer card for durable profile-level traits/preferences and missing canonical identity facts. + +Priority order for peer-card entries: +1. Missing canonical identity or routing facts the deduction phase failed to add +2. Standing instructions and strong routing preferences +3. Durable long-lived preferences and personality traits +4. Exclude generic project chatter unless it is essential to identifying the user -Do NOT add temporary patterns, episode-specific conclusions, or reasoning summaries. -Call `update_peer_card` with the complete deduplicated list only when a durable profile update is warranted. -Keep it concise (max 40 entries).""" +Prefer entries like: +- `INSTRUCTION: Always ask before scheduling meetings` +- `PREFERENCE: Prefers a single point of contact` +- `TRAIT: Methodical and process-oriented` +- `TRAIT: Detail-conscious in verification and debugging` + +Do NOT add temporary patterns, episode-specific conclusions, reasoning summaries, or generic capability statements. +Call `update_peer_card` with the complete deduplicated list in the `content` field only when a durable profile update is warranted. +Do not use argument names like `entries` or `peer_card_updates`. +Keep it concise (max 12 entries unless there is a strong reason for more).""" return f"""You are an inductive reasoning agent identifying patterns about {observed}. 
@@ -481,18 +529,30 @@ def build_system_prompt( Create inductive observations by finding patterns across multiple observations. Think like a psychologist identifying behavioral tendencies. +## EXECUTION DISCIPLINE + +- Call `extract_preferences` FIRST to pull standing instructions and stable preferences from conversation history. +- After that, do at most 2 additional discovery calls (`get_recent_observations`, `search_memory`, or `search_messages`). +- Stop discovery after at most 3 tool calls total before acting. +- Once you have enough evidence, you MUST do one of these before the loop ends: + 1. Call `update_peer_card` with the complete updated card in the `content` argument, or + 2. Call `finish_consolidation` with a short reason that no durable card update is warranted. +- Do not keep searching once you already have enough evidence for durable traits/preferences. +- If you identify a durable preference or trait, prefer updating the peer card over more search queries. + ## PHASE 1: DISCOVERY -Explore broadly to find patterns. Use these tools: -- `get_recent_observations` - Recent learnings -- `search_memory` - Topic-specific search -- `search_messages` - Actual conversation content +Use a short discovery pass only: +- `extract_preferences` - first call, for preferences and standing instructions +- `get_recent_observations` - recent learnings +- `search_memory` - one targeted follow-up if needed +- `search_messages` - one targeted follow-up if needed Look at BOTH explicit observations AND deductive ones. Patterns often emerge from synthesizing across both levels. 
## PHASE 2: ACTION -Create inductive observations when you see patterns: +Create inductive observations only when you have enough evidence: ### Behavioral Patterns - "Tends to reschedule meetings when stressed" @@ -504,14 +564,14 @@ def build_system_prompt( - "Likes detailed technical explanations" ### Personality Traits -- "Generally optimistic about outcomes" -- "Detail-oriented in planning" - -### Temporal Patterns -- "Career goals have remained consistent" -- "Living situation changes frequently" +- "Analytical thinker" +- "Risk-averse with finances" +- "Values family time over overtime" {peer_card_section} +If you do NOT have at least 2 concrete source observation IDs for a proposed inductive pattern, do NOT call `create_observations`. +In that case, update the peer card if warranted or call `finish_consolidation`. + ## CREATING OBSERVATIONS ```json @@ -519,21 +579,24 @@ def build_system_prompt( "observations": [{{ "content": "The pattern or generalization", "level": "inductive", - "source_ids": ["id1", "id2", "id3"], - "sources": ["evidence 1", "evidence 2"], - "pattern_type": "tendency", // preference|behavior|personality|tendency|correlation - "confidence": "medium" // low (2 sources), medium (3-4), high (5+) + "source_ids": ["id1", "id2", "id3"] }}] }} ``` ## RULES -1. Minimum 2 source observations required - patterns need evidence -2. Don't just restate a single fact as a pattern -3. Confidence based on evidence count: 2=low, 3-4=medium, 5+=high -4. Look for HOW things change over time, not just static facts -5. Include source_ids - always link back to evidence""" +1. Patterns need multiple examples - don't overgeneralize from one instance +2. Create observations based on what you ACTUALLY FIND, not stereotypes +3. Include source_ids for all observations supporting the pattern +4. Focus on useful patterns that help understand future behavior +5. Quality over quantity - fewer strong patterns beat many weak ones +6. 
Before your final step, either call `update_peer_card` or call `finish_consolidation` explicitly +7. Minimum 2 source observations required - patterns need evidence +8. Don't just restate a single fact as a pattern +9. Look for HOW things change over time, not just static facts +10. When calling `create_observations`, use only `content`, `level`, and `source_ids` for inductive observations +11. Do NOT send extra keys like `pattern_type`, `confidence`, or `sources`""" def build_user_prompt( self, @@ -544,17 +607,19 @@ def build_user_prompt( if hints: hints_str = "\n".join(f"- {q}" for q in hints[:5]) - return f"""{peer_card_context}Explore and find patterns. These areas may be worth investigating: + return f"""{peer_card_context}Start with `extract_preferences`, then do a very short discovery pass using at most two of `get_recent_observations`, `search_memory`, or `search_messages`. + +These areas may be worth investigating: {hints_str} -But follow the evidence - if you find patterns elsewhere, pursue those. +Once you have enough evidence, stop searching and either update the peer card or finish consolidation explicitly.""" -Start with `get_recent_observations`.""" + return f"""{peer_card_context}Start with `extract_preferences`, then do a very short discovery pass using at most two of `get_recent_observations`, `search_memory`, or `search_messages`. - return f"""{peer_card_context}Explore the observation space and identify patterns. +Look for repeated behaviors, preferences, and durable traits. Remember: patterns need 2+ sources. -Remember: patterns need 2+ sources. Look for tendencies, preferences, and behavioral regularities. +Once you have enough evidence, stop searching and either update the peer card or finish consolidation explicitly. 
Go.""" diff --git a/src/models.py b/src/models.py index c2aa84dd2..5a421423b 100644 --- a/src/models.py +++ b/src/models.py @@ -1,3 +1,5 @@ +"""SQLAlchemy ORM models for the Honcho database.""" + import datetime from logging import getLogger from typing import Any, final @@ -27,6 +29,7 @@ from src.utils.types import DocumentLevel, TaskType, VectorSyncState +from .config import settings from .db import Base load_dotenv(override=True) @@ -92,6 +95,8 @@ @final class Workspace(Base): + """SQLAlchemy model representing a workspace (root organizational unit).""" + __tablename__: str = "workspaces" id: Mapped[str] = mapped_column(TEXT, default=generate_nanoid, primary_key=True) name: Mapped[str] = mapped_column(TEXT, unique=True) @@ -125,6 +130,8 @@ class Workspace(Base): @final class Peer(Base): + """SQLAlchemy model representing a peer (user or agent participant).""" + __tablename__: str = "peers" id: Mapped[str] = mapped_column(TEXT, default=generate_nanoid, primary_key=True) name: Mapped[str] = mapped_column(TEXT, nullable=False) @@ -157,11 +164,14 @@ class Peer(Base): ) def __repr__(self) -> str: + """Return a string representation of the Peer.""" return f"Peer(id={self.id}, name={self.name}, workspace_name={self.workspace_name}, created_at={self.created_at}, h_metadata={self.h_metadata}, configuration={self.configuration})" @final class Session(Base): + """SQLAlchemy model representing a conversation session.""" + __tablename__: str = "sessions" id: Mapped[str] = mapped_column(TEXT, primary_key=True, default=generate_nanoid) name: Mapped[str] = mapped_column(TEXT) @@ -196,11 +206,14 @@ class Session(Base): ) def __repr__(self) -> str: + """Return a string representation of the Session.""" return f"Session(id={self.id}, name={self.name}, workspace_name={self.workspace_name}, is_active={self.is_active}, created_at={self.created_at}, h_metadata={self.h_metadata})" @final class Message(Base): + """SQLAlchemy model representing a message within a session.""" + 
__tablename__: str = "messages" id: Mapped[int] = mapped_column( BigInteger, Identity(), primary_key=True, autoincrement=True @@ -267,18 +280,21 @@ class Message(Base): @override def __repr__(self) -> str: + """Return a string representation of the Message.""" return f"Message(id={self.id}, session_name={self.session_name}, peer_name={self.peer_name}, content={self.content})" @final class MessageEmbedding(Base): + """SQLAlchemy model for storing message embeddings with vector sync state.""" + __tablename__: str = "message_embeddings" id: Mapped[int] = mapped_column( BigInteger, Identity(), primary_key=True, autoincrement=True ) content: Mapped[str] = mapped_column(TEXT) - embedding: MappedColumn[Any] = mapped_column(Vector(1536), nullable=True) + embedding: MappedColumn[Any] = mapped_column(Vector(settings.VECTOR_STORE.DIMENSIONS), nullable=True) message_id: Mapped[str] = mapped_column( ForeignKey("messages.public_id", ondelete="CASCADE"), nullable=False, index=True ) @@ -330,6 +346,8 @@ class MessageEmbedding(Base): @final class Collection(Base): + """SQLAlchemy model representing a document collection for an observer-observed pair.""" + __tablename__: str = "collections" id: Mapped[str] = mapped_column(TEXT, default=generate_nanoid, primary_key=True) @@ -374,6 +392,8 @@ class Collection(Base): @final class Document(Base): + """SQLAlchemy model representing a stored observation document with embeddings.""" + __tablename__: str = "documents" id: Mapped[str] = mapped_column(TEXT, default=generate_nanoid, primary_key=True) internal_metadata: Mapped[dict[str, Any]] = mapped_column( @@ -386,7 +406,7 @@ class Document(Base): times_derived: Mapped[int] = mapped_column( Integer, nullable=False, server_default=text("1") ) - embedding: MappedColumn[Any] = mapped_column(Vector(1536), nullable=True) + embedding: MappedColumn[Any] = mapped_column(Vector(settings.VECTOR_STORE.DIMENSIONS), nullable=True) source_ids: Mapped[list[str] | None] = mapped_column( JSONB, nullable=True, 
server_default=text("NULL") ) @@ -472,6 +492,8 @@ class Document(Base): @final class QueueItem(Base): + """SQLAlchemy model representing a background processing queue item.""" + __tablename__: str = "queue" id: Mapped[int] = mapped_column( BigInteger, Identity(), primary_key=True, autoincrement=True @@ -526,11 +548,14 @@ class QueueItem(Base): ) def __repr__(self) -> str: + """Return a string representation of the QueueItem.""" return f"QueueItem(id={self.id}, session_id={self.session_id}, work_unit_key={self.work_unit_key}, task_type={self.task_type}, payload={self.payload}, processed={self.processed}, workspace_name={self.workspace_name}, message_id={self.message_id})" @final class ActiveQueueSession(Base): + """SQLAlchemy model tracking actively processed queue work units.""" + __tablename__: str = "active_queue_sessions" id: Mapped[str] = mapped_column(TEXT, default=generate_nanoid, primary_key=True) @@ -544,6 +569,8 @@ class ActiveQueueSession(Base): @final class WebhookEndpoint(Base): + """SQLAlchemy model representing a webhook endpoint for event delivery.""" + __tablename__: str = "webhook_endpoints" id: Mapped[str] = mapped_column(TEXT, default=generate_nanoid, primary_key=True) workspace_name: Mapped[str] = mapped_column( @@ -559,11 +586,14 @@ class WebhookEndpoint(Base): __table_args__ = (CheckConstraint("length(url) <= 2048", name="url_length"),) def __repr__(self) -> str: + """Return a string representation of the WebhookEndpoint.""" return f"WebhookEndpoint(id={self.id}, workspace_name={self.workspace_name}, url={self.url})" @final class SessionPeer(Base): + """SQLAlchemy model representing the many-to-many relationship between sessions and peers.""" + __table__: Table = session_peers_table # Type annotations for the columns diff --git a/src/schemas/api.py b/src/schemas/api.py index 307b12e03..69aca38d0 100644 --- a/src/schemas/api.py +++ b/src/schemas/api.py @@ -91,10 +91,11 @@ def _validate_metadata(v: Any) -> Any: class WorkspaceBase(BaseModel): - 
pass + """Base schema for workspace operations.""" class WorkspaceCreate(WorkspaceBase): + """Schema for creating a new workspace.""" name: Annotated[ str, Field(alias="id", min_length=1, max_length=100, pattern=RESOURCE_NAME_PATTERN), @@ -108,15 +109,20 @@ class WorkspaceCreate(WorkspaceBase): class WorkspaceGet(WorkspaceBase): + """Schema for querying workspaces with optional filters.""" + filters: dict[str, Any] | None = None class WorkspaceUpdate(WorkspaceBase): + """Schema for updating an existing workspace.""" + metadata: _SanitizedMetadata | None = None configuration: WorkspaceConfiguration | None = None class Workspace(WorkspaceBase): + """Schema representing a workspace in API responses.""" name: str = Field(serialization_alias="id") h_metadata: dict[str, Any] = Field( default_factory=dict, serialization_alias="metadata" @@ -135,10 +141,11 @@ class Workspace(WorkspaceBase): class PeerBase(BaseModel): - pass + """Base schema for peer operations.""" class PeerCreate(PeerBase): + """Schema for creating a new peer.""" name: Annotated[ str, Field(alias="id", min_length=1, max_length=100, pattern=RESOURCE_NAME_PATTERN), @@ -150,15 +157,20 @@ class PeerCreate(PeerBase): class PeerGet(PeerBase): + """Schema for querying peers with optional filters.""" + filters: dict[str, Any] | None = None class PeerUpdate(PeerBase): + """Schema for updating an existing peer.""" + metadata: _SanitizedMetadata | None = None configuration: dict[str, Any] | None = None class Peer(PeerBase): + """Schema representing a peer in API responses.""" name: str = Field(serialization_alias="id") workspace_name: str = Field(serialization_alias="workspace_id") created_at: datetime.datetime @@ -173,6 +185,7 @@ class Peer(PeerBase): class PeerRepresentationGet(BaseModel): + """Schema for requesting a peer's representation with optional filters.""" session_id: str | None = Field( None, description="Optional session ID within which to scope the representation" ) @@ -209,21 +222,28 @@ class 
PeerRepresentationGet(BaseModel): class RepresentationResponse(BaseModel): + """Schema for the peer representation response.""" + representation: str class PeerCardResponse(BaseModel): + """Schema for the peer card response.""" + peer_card: list[str] | None = Field( None, description="The peer card content, or None if not found" ) class PeerCardSet(BaseModel): + """Schema for setting a peer's card content.""" + peer_card: list[str] = Field(..., description="The peer card content to set") @field_validator("peer_card", mode="before") @classmethod def sanitize_peer_card(cls, v: Any) -> Any: + """Strip NUL bytes from peer card strings.""" if isinstance(v, list): return [ item.replace("\x00", "") if isinstance(item, str) else item @@ -238,10 +258,11 @@ def sanitize_peer_card(cls, v: Any) -> Any: class MessageBase(BaseModel): - pass + """Base schema for message operations.""" class MessageCreate(MessageBase): + """Schema for creating a new message.""" content: Annotated[str, Field(min_length=0, max_length=settings.MAX_MESSAGE_SIZE)] peer_name: str = Field(alias="peer_id") metadata: _SanitizedMetadata | None = None @@ -253,14 +274,17 @@ class MessageCreate(MessageBase): @field_validator("content", mode="after") @classmethod def sanitize_content(cls, v: str) -> str: + """Strip NUL bytes from message content.""" return v.replace("\x00", "") @property def encoded_message(self) -> list[int]: + """Return the tokenized message as a list of token IDs.""" return self._encoded_message @model_validator(mode="after") def validate_and_set_token_count(self) -> Self: + """Tokenize the message content using o200k_base encoding.""" encoding = tiktoken.get_encoding("o200k_base") encoded_message = encoding.encode(self.content) @@ -269,14 +293,19 @@ def validate_and_set_token_count(self) -> Self: class MessageGet(MessageBase): + """Schema for querying messages with optional filters.""" + filters: dict[str, Any] | None = None class MessageUpdate(MessageBase): + """Schema for updating an 
existing message.""" + metadata: _SanitizedMetadata | None = None class Message(MessageBase): + """Schema representing a message in API responses.""" public_id: str = Field(serialization_alias="id") content: str peer_name: str = Field(serialization_alias="peer_id") @@ -316,10 +345,11 @@ class MessageUploadCreate(BaseModel): class SessionBase(BaseModel): - pass + """Base schema for session operations.""" class SessionCreate(SessionBase): + """Schema for creating a new session.""" name: Annotated[ str, Field(alias="id", min_length=1, max_length=100, pattern=RESOURCE_NAME_PATTERN), @@ -332,15 +362,20 @@ class SessionCreate(SessionBase): class SessionGet(SessionBase): + """Schema for querying sessions with optional filters.""" + filters: dict[str, Any] | None = None class SessionUpdate(SessionBase): + """Schema for updating an existing session.""" + metadata: _SanitizedMetadata | None = None configuration: SessionConfiguration | None = None class Session(SessionBase): + """Schema representing a session in API responses.""" name: str = Field(serialization_alias="id") is_active: bool workspace_name: str = Field(serialization_alias="workspace_id") @@ -356,6 +391,7 @@ class Session(SessionBase): class Summary(BaseModel): + """Schema representing a session summary.""" content: str = Field(description="The summary text") message_id: int = Field( description="The internal ID of the message that this summary covers up to", @@ -373,6 +409,7 @@ class Summary(BaseModel): class SessionContext(SessionBase): + """Schema representing a session's context with messages, summary, and peer data.""" name: str = Field(serialization_alias="id") messages: list[Message] summary: Summary | None = Field( @@ -408,6 +445,7 @@ class PeerContext(BaseModel): class SessionSummaries(SessionBase): + """Schema representing a session's short and long summaries.""" name: str = Field(serialization_alias="id") short_summary: Summary | None = Field( default=None, description="The short summary if available" 
@@ -492,6 +530,7 @@ class ConclusionCreate(BaseModel): @field_validator("content", mode="after") @classmethod def sanitize_content(cls, v: str) -> str: + """Strip NUL bytes from conclusion content.""" return v.replace("\x00", "") @model_validator(mode="after") @@ -526,6 +565,7 @@ class ConclusionBatchCreate(BaseModel): class MessageSearchOptions(BaseModel): + """Schema for message search query parameters.""" query: Annotated[str, Field(..., description="Search query")] filters: dict[str, Any] | None = Field( default=None, description="Filters to scope the search" @@ -540,6 +580,7 @@ class MessageSearchOptions(BaseModel): @field_validator("query", mode="after") @classmethod def sanitize_query(cls, v: str) -> str: + """Strip NUL bytes from search query.""" return v.replace("\x00", "") @@ -549,6 +590,7 @@ def sanitize_query(cls, v: str) -> str: class DialecticOptions(BaseModel): + """Schema for dialectic chat request parameters.""" session_id: str | None = Field( None, description="ID of the session to scope the representation to" ) @@ -568,10 +610,13 @@ class DialecticOptions(BaseModel): @field_validator("query", mode="after") @classmethod def sanitize_query(cls, v: str) -> str: + """Strip NUL bytes from dialectic query.""" return v.replace("\x00", "") class DialecticResponse(BaseModel): + """Schema for non-streaming dialectic responses.""" + content: str | None @@ -642,6 +687,7 @@ class QueueStatus(BaseModel): class ScheduleDreamRequest(BaseModel): + """Schema for scheduling a dream consolidation task.""" observer: str = Field(..., description="Observer peer name") observed: str | None = Field( None, description="Observed peer name (defaults to observer if not specified)" @@ -658,15 +704,18 @@ class ScheduleDreamRequest(BaseModel): class WebhookEndpointBase(BaseModel): - pass + """Base schema for webhook endpoint operations.""" class WebhookEndpointCreate(WebhookEndpointBase): + """Schema for creating a new webhook endpoint.""" + url: str @field_validator("url") 
@classmethod def validate_webhook_url(cls, v: str) -> str: + """Validate webhook URL format, scheme, and block private IPs.""" parsed = urlparse(v) if not all([parsed.scheme, parsed.netloc]): @@ -689,6 +738,7 @@ def validate_webhook_url(cls, v: str) -> str: class WebhookEndpoint(WebhookEndpointBase): + """Schema representing a webhook endpoint in API responses.""" id: str workspace_name: str | None = Field(serialization_alias="workspace_id") url: str diff --git a/src/telemetry/prometheus/metrics.py b/src/telemetry/prometheus/metrics.py index f64399b7b..c5700ff2b 100644 --- a/src/telemetry/prometheus/metrics.py +++ b/src/telemetry/prometheus/metrics.py @@ -24,22 +24,31 @@ class NamespacedCounter(Counter): + """Prometheus counter that automatically injects the configured namespace label.""" + def labels(self, **kwargs: str) -> NamespacedCounter: + """Return a counter instance with namespace and other labels applied.""" kwargs["namespace"] = cast(str, settings.METRICS.NAMESPACE) return super().labels(**kwargs) # type: ignore[return-value] class TokenTypes(Enum): + """Enum for distinguishing input vs output tokens.""" + INPUT = "input" OUTPUT = "output" class DeriverTaskTypes(Enum): + """Enum for deriver task categories.""" + INGESTION = "ingestion" SUMMARY = "summary" class DeriverComponents(Enum): + """Enum for deriver token accounting components.""" + PROMPT = "prompt" MESSAGES = "messages" PREVIOUS_SUMMARY = "previous_summary" @@ -47,6 +56,8 @@ class DeriverComponents(Enum): class DialecticComponents(Enum): + """Enum for dialectic token accounting components.""" + TOTAL = "total" @@ -95,14 +106,18 @@ class DialecticComponents(Enum): @final class PrometheusMetrics: + """Singleton that wraps all Prometheus counters with error handling.""" + _instance: PrometheusMetrics | None = None def __new__(cls) -> PrometheusMetrics: + """Return the singleton instance, creating it on first call.""" if cls._instance is None: cls._instance = super().__new__(cls) return 
cls._instance def _handle_metric_error(self, method_name: str, error: Exception) -> None: + """Report a metric recording failure to Sentry and log a warning.""" import sentry_sdk sentry_sdk.capture_exception(error) @@ -117,6 +132,7 @@ def record_api_request( endpoint: str, status_code: str, ) -> None: + """Record an API request counter increment.""" try: api_requests_counter.labels( method=method, @@ -132,6 +148,7 @@ def record_messages_created( count: int, workspace_name: str, ) -> None: + """Record the creation of messages in a workspace.""" try: messages_created_counter.labels( workspace_name=workspace_name, @@ -145,6 +162,7 @@ def record_dialectic_call( workspace_name: str, reasoning_level: str, ) -> None: + """Record a dialectic API call.""" try: dialectic_calls_counter.labels( workspace_name=workspace_name, @@ -160,6 +178,7 @@ def record_deriver_queue_item( workspace_name: str, task_type: str, ) -> None: + """Record processed deriver queue items.""" try: deriver_queue_items_processed_counter.labels( workspace_name=workspace_name, @@ -176,6 +195,7 @@ def record_deriver_tokens( token_type: str, component: str, ) -> None: + """Record tokens consumed by the deriver.""" try: deriver_tokens_processed_counter.labels( task_type=task_type, @@ -193,6 +213,7 @@ def record_dialectic_tokens( component: str, reasoning_level: str, ) -> None: + """Record tokens consumed by the dialectic agent.""" try: dialectic_tokens_processed_counter.labels( token_type=token_type, @@ -209,6 +230,7 @@ def record_dreamer_tokens( specialist_name: str, token_type: str, ) -> None: + """Record tokens consumed by the dreamer agent.""" try: dreamer_tokens_processed_counter.labels( specialist_name=specialist_name, @@ -222,6 +244,7 @@ def record_dreamer_tokens( async def metrics_endpoint(_request: Request) -> Response: + """Serve Prometheus metrics or return 404 if metrics are disabled.""" if not settings.METRICS.ENABLED: return Response("Metrics are disabled", status_code=404) try: diff --git 
a/src/utils/agent_tools.py b/src/utils/agent_tools.py index a6c3009df..6004d640a 100644 --- a/src/utils/agent_tools.py +++ b/src/utils/agent_tools.py @@ -269,7 +269,10 @@ def _extract_pattern_snippet( "description": ( "Update the peer card with durable profile facts about the observed peer. " + "Only include stable biographical facts, standing instructions, and long-lived preferences/traits. " - + "Do not include one-off conclusions, temporary events, or duplicate entries." + + "Prefer canonical identity facts first (name, email, location, role), then standing instructions/routing preferences, then durable traits/preferences. " + + "Avoid generic project chatter, broad capability statements, one-off conclusions, temporary events, or duplicate entries. " + + "IMPORTANT: the argument name must be exactly `content` and its value must be the complete deduplicated list. " + + "Do not use keys like `entries`, `peer_card_updates`, or any other alias." ), "input_schema": { "type": "object", @@ -591,13 +594,13 @@ def _extract_pattern_snippet( # Note: get_peer_card is not included - peer card is injected into the prompt directly DEDUCTION_SPECIALIST_TOOLS: list[dict[str, Any]] = [ # Discovery tools + TOOLS["extract_preferences"], TOOLS["get_recent_observations"], - TOOLS["search_memory"], - TOOLS["search_messages"], # Action tools TOOLS["create_observations"], TOOLS["delete_observations"], TOOLS["update_peer_card"], + TOOLS["finish_consolidation"], ] # Tools for the induction specialist (dreamer phase 2) @@ -606,12 +609,12 @@ def _extract_pattern_snippet( # Note: get_peer_card is not included - peer card is injected into the prompt directly INDUCTION_SPECIALIST_TOOLS: list[dict[str, Any]] = [ # Discovery tools + TOOLS["extract_preferences"], TOOLS["get_recent_observations"], - TOOLS["search_memory"], - TOOLS["search_messages"], # Action tools TOOLS["create_observations"], TOOLS["update_peer_card"], + TOOLS["finish_consolidation"], ] @@ -1888,6 +1891,47 @@ async def 
execute_tool(tool_name: str, tool_input: dict[str, Any]) -> str: """ logger.info("[tool call] %s %s", tool_name, tool_input) + discovery_tools = { + "extract_preferences", + "get_recent_observations", + "get_most_derived_observations", + "search_memory", + "search_messages", + } + non_extract_discovery_tools = discovery_tools - {"extract_preferences"} + if parent_category == "dream": + if not hasattr(execute_tool, "_dream_discovery_count"): + execute_tool._dream_discovery_count = 0 # type: ignore[attr-defined] + execute_tool._dream_extract_called = False # type: ignore[attr-defined] + if tool_name in discovery_tools: + if tool_name in non_extract_discovery_tools and not execute_tool._dream_extract_called: # type: ignore[attr-defined] + logger.warning( + "Dream discovery rule violation: auto-running extract_preferences before %s", + tool_name, + ) + handler = _TOOL_HANDLERS.get("extract_preferences") + if handler: + execute_tool._dream_discovery_count += 1 # type: ignore[attr-defined] + execute_tool._dream_extract_called = True # type: ignore[attr-defined] + result = await handler(ctx, {}) + logger.info("[tool result] %s %s", "extract_preferences", result) + return result + if execute_tool._dream_discovery_count >= 3: # type: ignore[attr-defined] + msg = ( + "Dream discovery budget exhausted after extract_preferences plus two follow-up searches. " + "Auto-finishing consolidation to prevent tool-loop wandering." 
+ ) + logger.warning(msg) + finish_handler = _TOOL_HANDLERS.get("finish_consolidation") + if finish_handler: + result = await finish_handler(ctx, {"summary": msg}) + logger.info("[tool result] %s %s", "finish_consolidation", result) + return result + return msg + execute_tool._dream_discovery_count += 1 # type: ignore[attr-defined] + if tool_name == "extract_preferences": + execute_tool._dream_extract_called = True # type: ignore[attr-defined] + try: handler = _TOOL_HANDLERS.get(tool_name) if handler: diff --git a/src/utils/files.py b/src/utils/files.py index cdbac5145..668c7092d 100644 --- a/src/utils/files.py +++ b/src/utils/files.py @@ -1,3 +1,5 @@ +"""File processing utilities for uploaded documents and text extraction.""" + import datetime import logging from io import BytesIO @@ -21,15 +23,47 @@ class FileProcessor(Protocol): - async def extract_text(self, content: bytes) -> str: ... - def supports_file_type(self, content_type: str) -> bool: ... + """Protocol defining the interface for file content processors.""" + + async def extract_text(self, content: bytes) -> str: + """Extract text from raw file content. + + Args: + content: Raw bytes of the file. + + Returns: + Extracted text string. + """ + ... + + def supports_file_type(self, content_type: str) -> bool: + """Check if this processor handles the given content type. + + Args: + content_type: MIME type of the file. + + Returns: + True if this processor supports the content type. + """ + ... class PDFProcessor: + """Processor for extracting text from PDF files.""" + def supports_file_type(self, content_type: str) -> bool: + """Return True if content_type is application/pdf.""" return content_type == "application/pdf" async def extract_text(self, content: bytes) -> str: + """Extract text from PDF content, labeling each page. + + Args: + content: Raw PDF bytes. + + Returns: + Extracted text with page labels. 
+ """ import pdfplumber with pdfplumber.open(BytesIO(content)) as pdf_reader: @@ -42,10 +76,24 @@ async def extract_text(self, content: bytes) -> str: class TextProcessor: + """Processor for extracting text from plain text files.""" + def supports_file_type(self, content_type: str) -> bool: + """Return True if content_type starts with text/.""" return content_type.startswith("text/") async def extract_text(self, content: bytes) -> str: + """Decode text content trying multiple encodings. + + Args: + content: Raw text bytes. + + Returns: + Decoded text string. + + Raises: + ValueError: If no encoding succeeds. + """ # Try different encodings for encoding in ["utf-8", "utf-16", "latin-1"]: try: @@ -56,10 +104,24 @@ async def extract_text(self, content: bytes) -> str: class JSONProcessor: + """Processor for extracting text from JSON files.""" + def supports_file_type(self, content_type: str) -> bool: + """Return True if content_type is application/json.""" return content_type == "application/json" async def extract_text(self, content: bytes) -> str: + """Parse JSON content and return it as a formatted string. + + Args: + content: Raw JSON bytes. + + Returns: + Formatted JSON string. + + Raises: + ValidationException: If content is not valid UTF-8 or valid JSON. + """ import json try: @@ -80,7 +142,10 @@ async def extract_text(self, content: bytes) -> str: class FileProcessingService: + """Service that routes uploaded files to the appropriate text processor.""" + def __init__(self): + """Initialize with the list of available file processors.""" self.processors: list[FileProcessor] = [ PDFProcessor(), TextProcessor(), @@ -104,6 +169,14 @@ async def extract_text_from_upload(self, file: UploadFile) -> str: return await processor.extract_text(content) def _get_processor(self, content_type: str) -> FileProcessor | None: + """Find the first processor that supports the given content type. + + Args: + content_type: MIME type to match. 
+ + Returns: + Matching FileProcessor or None. + """ for processor in self.processors: if processor.supports_file_type(content_type): return processor diff --git a/tests/deriver/test_prompts.py b/tests/deriver/test_prompts.py new file mode 100644 index 000000000..cbde3d68c --- /dev/null +++ b/tests/deriver/test_prompts.py @@ -0,0 +1,22 @@ +from src.deriver.prompts import minimal_deriver_prompt + + +def test_minimal_deriver_prompt_explicitly_requires_content_key() -> None: + prompt = minimal_deriver_prompt( + peer_id="Aubrey", + messages="[2026-04-14 04:00:00] Aubrey: I live in Wisconsin.", + ) + + assert "Each object must use the key `content`" in prompt + assert "Do NOT use keys like `fact`" in prompt + assert '{"explicit":[{"content":"Aubrey lives in Wisconsin"}]}' in prompt + + +def test_minimal_deriver_prompt_examples_keep_explicit_observations_literal() -> None: + prompt = minimal_deriver_prompt( + peer_id="Aubrey", + messages="[2026-04-14 04:00:00] Aubrey: I walked my dog in NYC.", + ) + + assert '"Aubrey lives in NYC"' not in prompt + assert '"Aubrey completed high school or equivalent"' not in prompt diff --git a/tests/dreamer/test_dream_scheduler.py b/tests/dreamer/test_dream_scheduler.py index 06f904c1c..b263dc7eb 100644 --- a/tests/dreamer/test_dream_scheduler.py +++ b/tests/dreamer/test_dream_scheduler.py @@ -460,3 +460,61 @@ async def test_enqueue_cancels_peer_to_peer_dreams( # All three dreams should be cancelled assert len(cancelled) == 3 assert len(dream_scheduler.pending_dreams) == 0 + + +class TestDreamRecoveryAfterRestart: + """Overdue dreams should be recovered after deriver restart instead of vanishing.""" + + @pytest.mark.asyncio + async def test_recover_overdue_dreams_executes_for_idle_collections( + self, dream_scheduler: DreamScheduler + ): + from contextlib import asynccontextmanager + from datetime import datetime, timedelta, timezone + from unittest.mock import MagicMock + + now = datetime.now(timezone.utc) + overdue_row = ( + "hermes", 
+ "hermes", + "user-default-hermes-agent", + 75, + None, + None, + now - timedelta(minutes=95), + ) + fresh_row = ( + "hermes", + "hermes", + "fresh-user", + 75, + None, + None, + now - timedelta(minutes=10), + ) + + mock_db = MagicMock() + mock_result = MagicMock() + mock_result.all.return_value = [overdue_row, fresh_row] + mock_db.execute = AsyncMock(return_value=mock_result) + + @asynccontextmanager + async def mock_tracked_db(_: str | None = None): + yield mock_db + + with ( + patch("src.dreamer.dream_scheduler.tracked_db", mock_tracked_db), + patch.object(dream_scheduler, "execute_dream", new_callable=AsyncMock) as execute_dream, + patch("src.dreamer.dream_scheduler.settings.DREAM.DOCUMENT_THRESHOLD", 50), + patch("src.dreamer.dream_scheduler.settings.DREAM.IDLE_TIMEOUT_MINUTES", 60), + patch("src.dreamer.dream_scheduler.settings.DREAM.MIN_HOURS_BETWEEN_DREAMS", 0), + ): + recovered = await dream_scheduler.recover_overdue_dreams() + + assert recovered == 1 + execute_dream.assert_awaited_once_with( + "hermes", + DreamType.OMNI, + observer="hermes", + observed="user-default-hermes-agent", + ) diff --git a/tests/dreamer/test_specialists.py b/tests/dreamer/test_specialists.py new file mode 100644 index 000000000..31d6b9f66 --- /dev/null +++ b/tests/dreamer/test_specialists.py @@ -0,0 +1,35 @@ +from src.dreamer.specialists import DeductionSpecialist, InductionSpecialist + + +class TestDeductionSpecialistControl: + def test_tool_list_includes_finish_consolidation(self): + specialist = DeductionSpecialist() + tool_names = [tool["name"] for tool in specialist.get_tools(peer_card_enabled=True)] + + assert "finish_consolidation" in tool_names + assert "update_peer_card" in tool_names + + def test_prompt_forces_early_commit_or_finish(self): + specialist = DeductionSpecialist() + prompt = specialist.build_system_prompt("user-default-hermes-agent", peer_card_enabled=True) + + assert "Stop discovery after at most 3 tool calls" in prompt + assert "Call `update_peer_card`" in 
prompt + assert "call `finish_consolidation`" in prompt + + +class TestInductionSpecialistControl: + def test_tool_list_includes_finish_consolidation(self): + specialist = InductionSpecialist() + tool_names = [tool["name"] for tool in specialist.get_tools(peer_card_enabled=True)] + + assert "finish_consolidation" in tool_names + assert "update_peer_card" in tool_names + + def test_prompt_forces_early_commit_or_finish(self): + specialist = InductionSpecialist() + prompt = specialist.build_system_prompt("user-default-hermes-agent", peer_card_enabled=True) + + assert "Stop discovery after at most 3 tool calls" in prompt + assert "Call `update_peer_card`" in prompt + assert "call `finish_consolidation`" in prompt