@@ -455,6 +455,7 @@ async def sync_entity_vectors_batch(
455455 entities_by_id = {
456456 entity .id : entity for entity in await self .entity_repository .find_by_ids (entity_ids )
457457 }
458+ unknown_ids = [entity_id for entity_id in entity_ids if entity_id not in entities_by_id ]
458459 opted_out_ids = [
459460 entity_id
460461 for entity_id in entity_ids
@@ -468,25 +469,57 @@ async def sync_entity_vectors_batch(
468469 * (self ._clear_entity_vectors (entity_id ) for entity_id in opted_out_ids )
469470 )
470471
472+ repository_results : list [VectorSyncBatchResult ] = []
473+ if unknown_ids :
474+ # Trigger: a caller passes entity IDs that were deleted after the batch was built.
475+ # Why: repository sync still owns stale chunk cleanup for IDs with no source rows.
476+ # Outcome: deleted entities do not silently keep orphaned vector rows forever.
477+ repository_results .append (await self .repository .sync_entity_vectors_batch (unknown_ids ))
478+
471479 eligible_entity_ids = [
472480 entity_id
473481 for entity_id in entity_ids
474482 if entity_id in entities_by_id and entity_id not in opted_out_ids
475483 ]
476- if not eligible_entity_ids :
484+ if eligible_entity_ids :
485+ repository_results .append (
486+ await self .repository .sync_entity_vectors_batch (
487+ eligible_entity_ids ,
488+ progress_callback = progress_callback ,
489+ )
490+ )
491+
492+ if not repository_results :
477493 return VectorSyncBatchResult (
478494 entities_total = len (entity_ids ),
479495 entities_synced = 0 ,
480496 entities_failed = 0 ,
481497 entities_skipped = len (opted_out_ids ),
482498 )
483499
484- batch_result = await self .repository .sync_entity_vectors_batch (
485- eligible_entity_ids ,
486- progress_callback = progress_callback ,
500+ batch_result = VectorSyncBatchResult (
501+ entities_total = len (entity_ids ),
502+ entities_synced = sum (result .entities_synced for result in repository_results ),
503+ entities_failed = sum (result .entities_failed for result in repository_results ),
504+ entities_deferred = sum (result .entities_deferred for result in repository_results ),
505+ entities_skipped = (
506+ len (opted_out_ids ) + sum (result .entities_skipped for result in repository_results )
507+ ),
508+ failed_entity_ids = [
509+ failed_entity_id
510+ for result in repository_results
511+ for failed_entity_id in result .failed_entity_ids
512+ ],
513+ chunks_total = sum (result .chunks_total for result in repository_results ),
514+ chunks_skipped = sum (result .chunks_skipped for result in repository_results ),
515+ embedding_jobs_total = sum (result .embedding_jobs_total for result in repository_results ),
516+ prepare_seconds_total = sum (result .prepare_seconds_total for result in repository_results ),
517+ queue_wait_seconds_total = sum (
518+ result .queue_wait_seconds_total for result in repository_results
519+ ),
520+ embed_seconds_total = sum (result .embed_seconds_total for result in repository_results ),
521+ write_seconds_total = sum (result .write_seconds_total for result in repository_results ),
487522 )
488- batch_result .entities_total = len (entity_ids )
489- batch_result .entities_skipped += len (opted_out_ids )
490523 return batch_result
491524
492525 async def reindex_vectors (self , progress_callback = None ) -> dict :
0 commit comments