Skip to content
74 changes: 71 additions & 3 deletions src/basic_memory/cli/commands/db.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Database management commands."""

from dataclasses import dataclass
from pathlib import Path

import typer
Expand All @@ -12,13 +13,47 @@
from basic_memory.cli.app import app
from basic_memory.cli.commands.command_utils import run_with_cleanup
from basic_memory.config import ConfigManager, ProjectMode
from basic_memory.indexing import IndexProgress
from basic_memory.repository import ProjectRepository
from basic_memory.services.initialization import reconcile_projects_with_config
from basic_memory.sync.sync_service import get_sync_service

console = Console()


@dataclass(slots=True)
class EmbeddingProgress:
    """Typed CLI progress payload for embedding backfills.

    Bundles the positional ``(entity_id, index, total)`` arguments delivered
    by the reindex progress callback into a single typed value so the CLI
    progress bar can consume them without positional-argument mistakes.
    """

    # Database id of the entity whose embedding was just processed.
    entity_id: int
    # Position of this entity within the current backfill run; fed to the
    # Rich progress bar as ``completed`` — presumably 1-based, TODO confirm
    # against the callback producer.
    index: int
    # Total number of entities scheduled for embedding in this run.
    total: int


def _format_eta(seconds: float | None) -> str:
"""Render a compact ETA string for CLI progress descriptions."""
if seconds is None:
return "--:--"

whole_seconds = max(int(seconds), 0)
minutes, remaining_seconds = divmod(whole_seconds, 60)
hours, remaining_minutes = divmod(minutes, 60)
if hours:
return f"{hours:d}:{remaining_minutes:02d}:{remaining_seconds:02d}"
return f"{remaining_minutes:02d}:{remaining_seconds:02d}"


def _format_index_progress(progress: IndexProgress) -> str:
    """Render typed index progress as a compact Rich task description.

    Produces a single-line summary of files, batches, throughput and ETA
    suitable for a Rich progress task description.
    """
    # A falsy rate (None or 0.0) is displayed as 0/min.
    rate = int(progress.files_per_minute) if progress.files_per_minute else 0
    segments = [
        f"{progress.files_processed}/{progress.files_total} files",
        f"{progress.batches_completed}/{progress.batches_total} batches",
        f"{rate}/min",
        f"ETA {_format_eta(progress.eta_seconds)}",
    ]
    return " Indexing files... " + " | ".join(segments)


async def _reindex_projects(app_config):
"""Reindex all projects in a single async context.

Expand Down Expand Up @@ -185,10 +220,34 @@ async def _reindex(app_config, search: bool, embeddings: bool, project: str | No
console.print(f"\n[bold]Project: [cyan]{proj.name}[/cyan][/bold]")

if search:
console.print(" Rebuilding full-text search index...")
sync_service = await get_sync_service(proj)
sync_dir = Path(proj.path)
await sync_service.sync(sync_dir, project_name=proj.name)
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
console=console,
) as progress:
task = progress.add_task(" Indexing files... scanning changes", total=1)

async def on_index_progress(update: IndexProgress) -> None:
total = update.files_total or 1
completed = update.files_processed if update.files_total else 1
progress.update(
task,
description=_format_index_progress(update),
total=total,
completed=min(completed, total),
)

await sync_service.sync(
sync_dir,
project_name=proj.name,
progress_callback=on_index_progress,
)
progress.update(task, completed=progress.tasks[task].total or 1)

console.print(" [green]✓[/green] Full-text search index rebuilt")

if embeddings:
Expand All @@ -213,7 +272,16 @@ async def _reindex(app_config, search: bool, embeddings: bool, project: str | No
task = progress.add_task(" Embedding entities...", total=None)

def on_progress(entity_id, index, total):
progress.update(task, total=total, completed=index)
embedding_progress = EmbeddingProgress(
entity_id=entity_id,
index=index,
total=total,
)
progress.update(
task,
total=embedding_progress.total,
completed=embedding_progress.index,
)

stats = await search_service.reindex_vectors(progress_callback=on_progress)
progress.update(task, completed=stats["total_entities"])
Expand Down
30 changes: 30 additions & 0 deletions src/basic_memory/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ class BasicMemoryConfig(BaseSettings):
description="Batch size for embedding generation.",
gt=0,
)
semantic_embedding_request_concurrency: int = Field(
default=4,
description="Maximum number of concurrent provider requests for batched embedding generation when the active provider supports request-level concurrency.",
gt=0,
)
semantic_embedding_sync_batch_size: int = Field(
default=64,
description="Batch size for vector sync orchestration flushes.",
Expand Down Expand Up @@ -286,6 +291,31 @@ class BasicMemoryConfig(BaseSettings):
description="Maximum number of files to process concurrently during sync. Limits memory usage on large projects (2000+ files). Lower values reduce memory consumption.",
gt=0,
)
index_batch_size: int = Field(
default=32,
description="Maximum number of changed files to load into one indexing batch.",
gt=0,
)
index_batch_max_bytes: int = Field(
default=8 * 1024 * 1024,
description="Maximum total bytes to load into one indexing batch. Large files still run as single-file batches.",
gt=0,
)
index_parse_max_concurrent: int = Field(
default=8,
description="Maximum number of markdown parse tasks to run concurrently inside one indexing batch.",
gt=0,
)
index_entity_max_concurrent: int = Field(
default=4,
description="Maximum number of entity create/update tasks to run concurrently inside one indexing batch.",
gt=0,
)
index_metadata_update_max_concurrent: int = Field(
default=4,
description="Maximum number of metadata/search refresh tasks to run concurrently inside one indexing batch.",
gt=0,
)

kebab_filenames: bool = Field(
default=False,
Expand Down
103 changes: 0 additions & 103 deletions src/basic_memory/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,101 +44,6 @@
_session_maker: Optional[async_sessionmaker[AsyncSession]] = None


async def _needs_semantic_embedding_backfill(
    app_config: BasicMemoryConfig,
    session_maker: async_sessionmaker[AsyncSession],
) -> bool:
    """Check if entities exist but vector embeddings are empty.

    This is the reliable way to detect that embeddings need to be generated,
    regardless of how migrations were applied (fresh DB, upgrade, reset, etc.).
    """
    if not app_config.semantic_search_enabled:
        return False

    try:
        async with scoped_session(session_maker) as session:
            total_entities = (
                await session.execute(text("SELECT COUNT(*) FROM entity"))
            ).scalar() or 0
            if not total_entities:
                # Nothing to embed — no backfill required.
                return False

            # Entities exist: an empty chunk table means vectors were never built.
            total_chunks = (
                await session.execute(text("SELECT COUNT(*) FROM search_vector_chunks"))
            ).scalar() or 0
            return total_chunks == 0
    except Exception as exc:
        # Table might not exist yet (pre-migration)
        logger.debug(f"Could not check embedding status: {exc}")
        return False


async def _run_semantic_embedding_backfill(
    app_config: BasicMemoryConfig,
    session_maker: async_sessionmaker[AsyncSession],
) -> None:
    """Backfill semantic embeddings for all active projects/entities.

    Skips entirely when semantic search is disabled or no active projects
    exist. Otherwise iterates every active project, collects its entity ids,
    and asks the backend-appropriate search repository to (re)build vector
    embeddings in batch. Failures are logged per project but do not abort
    the remaining projects.
    """
    if not app_config.semantic_search_enabled:
        logger.info("Skipping automatic semantic embedding backfill: semantic search is disabled.")
        return

    # Gather the active projects up front; each project is processed in its
    # own session further below.
    async with scoped_session(session_maker) as session:
        project_result = await session.execute(
            text("SELECT id, name FROM project WHERE is_active = :is_active ORDER BY id"),
            {"is_active": True},
        )
        projects = [(int(row[0]), str(row[1])) for row in project_result.fetchall()]

    if not projects:
        logger.info("Skipping automatic semantic embedding backfill: no active projects found.")
        return

    # Choose the repository implementation matching the configured backend.
    repository_class = (
        PostgresSearchRepository
        if app_config.database_backend == DatabaseBackend.POSTGRES
        else SQLiteSearchRepository
    )

    total_entities = 0
    for project_id, project_name in projects:
        # Short-lived session per project: fetch only the entity ids.
        async with scoped_session(session_maker) as session:
            entity_result = await session.execute(
                text("SELECT id FROM entity WHERE project_id = :project_id ORDER BY id"),
                {"project_id": project_id},
            )
            entity_ids = [int(row[0]) for row in entity_result.fetchall()]

        if not entity_ids:
            continue

        total_entities += len(entity_ids)
        logger.info(
            "Automatic semantic embedding backfill: "
            f"project={project_name}, entities={len(entity_ids)}"
        )

        # The repository manages its own sessions via session_maker.
        search_repository = repository_class(
            session_maker,
            project_id=project_id,
            app_config=app_config,
        )
        batch_result = await search_repository.sync_entity_vectors_batch(entity_ids)
        if batch_result.entities_failed > 0:
            # Best-effort: log failures and keep going with other projects.
            logger.warning(
                "Automatic semantic embedding backfill encountered entity failures: "
                f"project={project_name}, failed={batch_result.entities_failed}, "
                f"failed_entity_ids={batch_result.failed_entity_ids}"
            )

    logger.info(
        "Automatic semantic embedding backfill complete: "
        f"projects={len(projects)}, entities={total_entities}"
    )


class DatabaseType(Enum):
"""Types of supported databases."""

Expand Down Expand Up @@ -521,14 +426,6 @@ async def run_migrations(
else:
await SQLiteSearchRepository(session_maker, 1).init_search_index()

# Check if backfill is needed — actual backfill runs in background
# from the MCP server lifespan to avoid blocking startup.
if await _needs_semantic_embedding_backfill(app_config, session_maker):
logger.info(
"Semantic embeddings missing — backfill will run in background after startup"
)
else:
logger.info("Semantic embeddings: up to date")
except Exception as e: # pragma: no cover
logger.error(f"Error running migrations: {e}")
raise
Expand Down
29 changes: 29 additions & 0 deletions src/basic_memory/indexing/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""Reusable indexing primitives shared by local sync and future remote callers."""

from basic_memory.indexing.batch_indexer import BatchIndexer
from basic_memory.indexing.batching import build_index_batches
from basic_memory.indexing.models import (
IndexedEntity,
IndexBatch,
IndexFileMetadata,
IndexFileWriter,
IndexFrontmatterUpdate,
IndexFrontmatterWriteResult,
IndexingBatchResult,
IndexInputFile,
IndexProgress,
)

__all__ = [
"BatchIndexer",
"IndexedEntity",
"IndexBatch",
"IndexFileMetadata",
"IndexFileWriter",
"IndexFrontmatterUpdate",
"IndexFrontmatterWriteResult",
"IndexingBatchResult",
"IndexInputFile",
"IndexProgress",
"build_index_batches",
]
Loading
Loading