Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ docker-compose.yml.example
.vscode/**
data/**
.venv
.git
tests
.python-version
26 changes: 23 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ ENV UV_LINK_MODE=copy
# Python optimizations
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV UV_PYTHON_DOWNLOADS=never

# Install the project's dependencies using the lockfile and settings
RUN --mount=type=cache,target=/root/.cache/uv \
Expand All @@ -26,12 +27,31 @@ RUN --mount=type=cache,target=/root/.cache/uv \
# Copy only requirements to cache them in docker layer
COPY uv.lock pyproject.toml /app/

# Sync the project
# Place executables in the environment at the front of the path
ENV PATH="/app/.venv/bin:$PATH"

# Sync the project into a virtual environment, using the frozen lockfile
COPY . /app

# Install the project itself (extras omitted for runtime image)
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --frozen --no-group dev

# Place executables in the environment at the front of the path
ENV PATH="/app/.venv/bin:$PATH"
# Pre-seed tiktoken's o200k_base cache so containers do not depend on live DNS
# resolution to openaipublic.blob.core.windows.net at first startup.
RUN python - <<'PY'
import hashlib
import pathlib
import urllib.request

# NOTE(review): tiktoken keys its on-disk "data-gym" cache by the SHA-1 hex
# digest of the blob URL and looks in /tmp/data-gym-cache by default — this
# mirrors that internal naming scheme; confirm it matches the pinned tiktoken
# version before relying on it.
url = 'https://openaipublic.blob.core.windows.net/encodings/o200k_base.tiktoken'
cache_key = hashlib.sha1(url.encode()).hexdigest()
cache_path = pathlib.Path('/tmp/data-gym-cache') / cache_key
# Ensure the cache directory exists before writing.
cache_path.parent.mkdir(parents=True, exist_ok=True)
# Skip the download if a previous build layer already seeded the file.
if not cache_path.exists():
    with urllib.request.urlopen(url) as resp:
        cache_path.write_bytes(resp.read())
print(f'cached {url} -> {cache_path}')
PY
ENV HOME=/app
ENV UV_CACHE_DIR=/tmp/uv-cache

Expand Down
36 changes: 36 additions & 0 deletions src/config.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""Application configuration loaded from environment, .env, and TOML files."""

import logging
from pathlib import Path
from typing import Annotated, Any, ClassVar, Literal, Protocol
Expand Down Expand Up @@ -53,6 +55,7 @@ class TomlConfigSettingsSource(PydanticBaseSettingsSource):
"""Custom settings source for loading from TOML file."""

def __init__(self, settings_cls: type[BaseSettings]) -> None:
"""Initialize the TOML settings source."""
super().__init__(settings_cls)

SECTION_MAP: ClassVar[dict[str, str]] = {
Expand All @@ -76,6 +79,7 @@ def __init__(self, settings_cls: type[BaseSettings]) -> None:
def get_field_value(
self, field: FieldInfo, field_name: str
) -> tuple[Any, str, bool]:
"""Retrieve a field value from the TOML config section."""
# Get the env_prefix from the model config
prefix = self.settings_cls.model_config.get("env_prefix", "")
if prefix.endswith("_"):
Expand All @@ -95,6 +99,7 @@ def get_field_value(
return field_value, field_name, False

def __call__(self) -> dict[str, Any]:
"""Return all TOML values for the current section as a dict."""
# Get the env_prefix from the model config
prefix = self.settings_cls.model_config.get("env_prefix", "")
if prefix.endswith("_"):
Expand Down Expand Up @@ -122,6 +127,7 @@ def settings_customise_sources( # pyright: ignore
dotenv_settings: DotEnvSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
) -> tuple[PydanticBaseSettingsSource, ...]:
"""Define settings source precedence: init > env > .env > toml > secrets."""
# Correct precedence: init > env > .env > toml > secrets > defaults
return (
init_settings,
Expand Down Expand Up @@ -153,6 +159,8 @@ def _validate_backup_configuration(self):


class DBSettings(HonchoSettings):
"""Database connection and pool configuration."""

model_config = SettingsConfigDict(env_prefix="DB_", extra="ignore") # pyright: ignore

CONNECTION_URI: str = (
Expand All @@ -175,19 +183,24 @@ class DBSettings(HonchoSettings):


class AuthSettings(HonchoSettings):
"""Authentication settings including JWT configuration."""

model_config = SettingsConfigDict(env_prefix="AUTH_", extra="ignore") # pyright: ignore

USE_AUTH: bool = False
JWT_SECRET: str | None = None # Must be set if USE_AUTH is true

@model_validator(mode="after") # type: ignore
def _require_jwt_secret(self) -> "AuthSettings":
"""Validate that JWT_SECRET is set when USE_AUTH is enabled."""
if self.USE_AUTH and not self.JWT_SECRET:
raise ValueError("JWT_SECRET must be set if USE_AUTH is true")
return self


class SentrySettings(HonchoSettings):
"""Sentry error tracking and performance monitoring settings."""

model_config = SettingsConfigDict(env_prefix="SENTRY_", extra="ignore") # pyright: ignore

ENABLED: bool = False
Expand All @@ -199,6 +212,8 @@ class SentrySettings(HonchoSettings):


class LLMSettings(HonchoSettings):
"""LLM provider API keys and default generation settings."""

model_config = SettingsConfigDict(env_prefix="LLM_", extra="ignore") # pyright: ignore

# API Keys for LLM providers
Expand Down Expand Up @@ -233,6 +248,8 @@ class LLMSettings(HonchoSettings):


class DeriverSettings(BackupLLMSettingsMixin, HonchoSettings):
"""Configuration for the background deriver (message processing) agent."""

model_config = SettingsConfigDict(env_prefix="DERIVER_", extra="ignore") # pyright: ignore

ENABLED: bool = True
Expand Down Expand Up @@ -278,6 +295,7 @@ class DeriverSettings(BackupLLMSettingsMixin, HonchoSettings):

@model_validator(mode="after")
def validate_batch_tokens_vs_context_limit(self):
"""Ensure batch token limit does not exceed deriver input token limit."""
if self.REPRESENTATION_BATCH_MAX_TOKENS > self.MAX_INPUT_TOKENS:
raise ValueError(
f"REPRESENTATION_BATCH_MAX_TOKENS ({self.REPRESENTATION_BATCH_MAX_TOKENS}) cannot exceed max deriver input tokens ({self.MAX_INPUT_TOKENS})"
Expand All @@ -286,6 +304,8 @@ def validate_batch_tokens_vs_context_limit(self):


class PeerCardSettings(HonchoSettings):
"""Configuration for peer card generation."""

model_config = SettingsConfigDict(env_prefix="PEER_CARD_", extra="ignore") # pyright: ignore

ENABLED: bool = True
Expand Down Expand Up @@ -350,6 +370,8 @@ def _validate_anthropic_thinking_budget(self) -> "DialecticLevelSettings":


class DialecticSettings(HonchoSettings):
"""Configuration for the dialectic (chat/QA) agent and reasoning levels."""

model_config = SettingsConfigDict( # pyright: ignore
env_prefix="DIALECTIC_", env_nested_delimiter="__", extra="ignore"
)
Expand Down Expand Up @@ -426,6 +448,8 @@ def _validate_all_levels_present(self) -> "DialecticSettings":


class SummarySettings(BackupLLMSettingsMixin, HonchoSettings):
"""Configuration for session summarization."""

model_config = SettingsConfigDict(env_prefix="SUMMARY_", extra="ignore") # pyright: ignore

ENABLED: bool = True
Expand All @@ -442,13 +466,17 @@ class SummarySettings(BackupLLMSettingsMixin, HonchoSettings):


class WebhookSettings(HonchoSettings):
"""Configuration for webhook event delivery."""

model_config = SettingsConfigDict(env_prefix="WEBHOOK_", extra="ignore") # pyright: ignore

SECRET: str | None = None # Must be set if configuring webhooks
MAX_WORKSPACE_LIMIT: int = 10


class MetricsSettings(HonchoSettings):
"""Configuration for Prometheus metrics."""

model_config = SettingsConfigDict(env_prefix="METRICS_", extra="ignore") # pyright: ignore
ENABLED: bool = False
NAMESPACE: str | None = None
Expand Down Expand Up @@ -488,6 +516,8 @@ class TelemetrySettings(HonchoSettings):


class CacheSettings(HonchoSettings):
"""Configuration for Redis cache connection."""

model_config = SettingsConfigDict(env_prefix="CACHE_", extra="ignore") # pyright: ignore

ENABLED: bool = False
Expand Down Expand Up @@ -529,6 +559,8 @@ class SurprisalSettings(BaseModel):


class DreamSettings(BackupLLMSettingsMixin, HonchoSettings):
"""Configuration for the dreamer (memory consolidation) agent."""

model_config = SettingsConfigDict( # pyright: ignore
env_prefix="DREAM_", env_nested_delimiter="__", extra="ignore"
)
Expand Down Expand Up @@ -610,6 +642,7 @@ class VectorStoreSettings(HonchoSettings):

@model_validator(mode="after")
def _require_api_key_for_turbopuffer(self) -> "VectorStoreSettings":
"""Validate that an API key is set when using Turbopuffer."""
if self.TYPE == "turbopuffer" and not self.TURBOPUFFER_API_KEY:
raise ValueError(
"VECTOR_STORE_TURBOPUFFER_API_KEY must be set when TYPE is 'turbopuffer'"
Expand All @@ -618,6 +651,8 @@ def _require_api_key_for_turbopuffer(self) -> "VectorStoreSettings":


class AppSettings(HonchoSettings):
"""Top-level application settings aggregating all nested config sections."""

# No env_prefix for app-level settings
model_config = SettingsConfigDict( # pyright: ignore
env_prefix="", env_nested_delimiter="__", extra="ignore"
Expand Down Expand Up @@ -664,6 +699,7 @@ class AppSettings(HonchoSettings):

@field_validator("LOG_LEVEL")
def validate_log_level(cls, v: str) -> str:
"""Validate and normalize the log level string."""
log_level = v.upper()
if log_level not in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]:
raise ValueError(f"Invalid log level: {v}")
Expand Down
2 changes: 2 additions & 0 deletions src/crud/deriver.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""CRUD operations for deriver queue status and related queries."""

from collections.abc import Sequence
from logging import getLogger
from typing import Any
Expand Down
23 changes: 16 additions & 7 deletions src/deriver/prompts.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,32 @@ def minimal_deriver_prompt(
"""
return c(
f"""
Analyze messages from {peer_id} to extract **explicit atomic facts** about them.
Analyze messages from {peer_id} to extract explicit atomic observations about them.

[EXPLICIT] DEFINITION: Facts about {peer_id} that can be derived directly from their messages.
- Transform statements into one or multiple conclusions
- Each conclusion must be self-contained with enough context
[EXPLICIT] DEFINITION: Observations about {peer_id} that can be derived directly from their messages.
- Transform statements into one or multiple observations
- Each observation must be self-contained with enough context
- Use absolute dates/times when possible (e.g. "June 26, 2025" not "yesterday")

OUTPUT CONTRACT:
- Return exactly one JSON object with a top-level key `explicit`.
- `explicit` must be an array of objects.
- Each object must use the key `content`.
- Do NOT use keys like `fact`, `observation`, `source`, `source_message`, or any extra keys.
- Example valid output: {{"explicit":[{{"content":"{peer_id} lives in Wisconsin"}}]}}
- If there are multiple observations, add more objects with the same `content` key.
- Do not wrap the JSON in markdown.

RULES:
- Properly attribute observations to the correct subject: if it is about {peer_id}, say so. If {peer_id} is referencing someone or something else, make that clear.
- Observations should make sense on their own. Each observation will be used in the future to better understand {peer_id}.
- Extract ALL observations from {peer_id} messages, using others as context.
- Contextualize each observation sufficiently (e.g. "Ann is nervous about the job interview at the pharmacy" not just "Ann is nervous")

EXAMPLES:
- EXPLICIT: "I just had my 25th birthday last Saturday" → "{peer_id} is 25 years old", "{peer_id}'s birthday is June 21st"
- EXPLICIT: "I took my dog for a walk in NYC" → "{peer_id} has a dog", "{peer_id} lives in NYC"
- EXPLICIT: "{peer_id} attended college" + general knowledge → "{peer_id} completed high school or equivalent"
- EXPLICIT: "I just had my 25th birthday last Saturday" → {{"content":"{peer_id} is 25 years old"}}, {{"content":"{peer_id}'s birthday is June 21st"}}
- EXPLICIT: "I took my dog for a walk in NYC" → {{"content":"{peer_id} has a dog"}}, {{"content":"{peer_id} was in NYC"}}
- EXPLICIT: "{peer_id} attended college" → {{"content":"{peer_id} attended college"}}

Messages to analyze:
<messages>
Expand Down
9 changes: 9 additions & 0 deletions src/deriver/queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ async def initialize(self) -> None:
"""Setup signal handlers, initialize client, and start the main polling loop"""
logger.debug(f"Initializing QueueManager with {self.workers} workers")

recovered_dreams = 0
try:
recovered_dreams = await self.dream_scheduler.recover_overdue_dreams()
except Exception as e:
logger.warning("Failed overdue dream recovery at startup: %s", e)

if recovered_dreams:
logger.info("Recovered %s overdue dream(s) during queue manager init", recovered_dreams)

# Set up signal handlers
loop = asyncio.get_running_loop()
signals = (signal.SIGTERM, signal.SIGINT)
Expand Down
87 changes: 86 additions & 1 deletion src/dreamer/dream_scheduler.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
import contextlib
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from logging import getLogger

import sentry_sdk
Expand Down Expand Up @@ -216,6 +216,91 @@ async def execute_dream(
session_name=session_name,
)

async def recover_overdue_dreams(self) -> int:
    """Recover overdue dreams after a deriver restart.

    Timer-based dreams live in memory until they mature. If the deriver restarts,
    those pending timers vanish. On startup we scan collections that are already
    past the dream threshold and have been idle at least the configured timeout,
    then immediately execute the missing dream.

    Returns:
        int: The number of dreams executed during recovery (0 when dreaming
        is disabled or nothing qualifies).
    """
    # Feature flag: recovery is a no-op when dreaming is globally disabled.
    if not settings.DREAM.ENABLED:
        return 0

    recovered = 0
    now = datetime.now(timezone.utc)
    # A collection only qualifies if its newest document is older than this,
    # i.e. the observer/observed pair has been idle for the configured window.
    idle_cutoff = now - timedelta(minutes=settings.DREAM.IDLE_TIMEOUT_MINUTES)

    async with tracked_db("dream_recovery_scan") as db:
        # One aggregate query over all collections: per (workspace, observer,
        # observed) triple, count documents, pull the dream bookkeeping values
        # stored in the collection's internal_metadata JSON, and find the
        # timestamp of the most recent document.
        stmt = (
            select(
                models.Collection.workspace_name,
                models.Collection.observer,
                models.Collection.observed,
                func.count(models.Document.id).label("doc_count"),
                # JSON path: internal_metadata["dream"]["last_dream_document_count"]
                models.Collection.internal_metadata["dream"]["last_dream_document_count"].astext,
                # JSON path: internal_metadata["dream"]["last_dream_at"] (ISO string)
                models.Collection.internal_metadata["dream"]["last_dream_at"].astext,
                func.max(models.Document.created_at).label("last_document_at"),
            )
            # Inner join: collections with no documents at all are excluded.
            .join(
                models.Document,
                (
                    (models.Document.workspace_name == models.Collection.workspace_name)
                    & (models.Document.observer == models.Collection.observer)
                    & (models.Document.observed == models.Collection.observed)
                ),
            )
            .group_by(
                models.Collection.workspace_name,
                models.Collection.observer,
                models.Collection.observed,
                # internal_metadata appears in SELECT, so it must be grouped too.
                models.Collection.internal_metadata,
            )
        )
        result = await db.execute(stmt)
        # Materialize rows so dream execution below does not hold the session.
        rows = result.all()

    for (
        workspace_name,
        observer,
        observed,
        doc_count,
        last_dream_document_count,
        last_dream_at,
        last_document_at,
    ) in rows:
        # Gate 1: collection must have reached the absolute document threshold.
        if doc_count < settings.DREAM.DOCUMENT_THRESHOLD:
            continue
        # Gate 2: collection must be idle — newest document older than cutoff.
        # NOTE(review): assumes Document.created_at is timezone-aware (UTC);
        # comparing against an aware cutoff would raise otherwise — confirm.
        if last_document_at is None or last_document_at > idle_cutoff:
            continue

        # Gate 3: enough NEW documents since the last dream (missing/blank
        # metadata counts as zero, i.e. never dreamed).
        last_dream_document_count_int = int(last_dream_document_count or 0)
        if doc_count - last_dream_document_count_int < settings.DREAM.DOCUMENT_THRESHOLD:
            continue

        # Gate 4: respect the minimum spacing between dreams. An unparseable
        # timestamp is treated as "unknown" and does not block recovery.
        if last_dream_at:
            try:
                last_dream_dt = datetime.fromisoformat(last_dream_at)
            except (TypeError, ValueError):
                last_dream_dt = None
            if last_dream_dt is not None:
                hours_since_last_dream = (now - last_dream_dt).total_seconds() / 3600
                if hours_since_last_dream < settings.DREAM.MIN_HOURS_BETWEEN_DREAMS:
                    continue

        # Execute every enabled dream type for this qualifying collection,
        # counting each executed dream toward the recovery total.
        for dream_type in settings.DREAM.ENABLED_TYPES:
            await self.execute_dream(
                workspace_name,
                DreamType(dream_type),
                observer=observer,
                observed=observed,
            )
            recovered += 1

    if recovered:
        logger.info("Recovered %s overdue dream(s) after startup", recovered)
    return recovered

async def shutdown(self) -> None:
"""Cancel all pending dreams during shutdown."""
if self.pending_dreams:
Expand Down
Loading