@@ -41,29 +41,6 @@
 _SQLITE_MAX_PREPARE_WINDOW = 8
 
 
-# Cache instruments so the per-entity hot path in _log_vector_sync_complete
-# doesn't re-enter the OTel MeterProvider lookup on every sample.
-_METRIC_INSTRUMENTS: dict[tuple[str, str, str], Any] = {}
-
-
-def _metric_histogram(name: str, unit: str = "") -> Any:
-    key = ("histogram", name, unit)
-    instrument = _METRIC_INSTRUMENTS.get(key)
-    if instrument is None:
-        instrument = logfire.metric_histogram(name, unit=unit)
-        _METRIC_INSTRUMENTS[key] = instrument
-    return instrument
-
-
-def _metric_counter(name: str) -> Any:
-    key = ("counter", name, "")
-    instrument = _METRIC_INSTRUMENTS.get(key)
-    if instrument is None:
-        instrument = logfire.metric_counter(name)
-        _METRIC_INSTRUMENTS[key] = instrument
-    return instrument
-
-
 @dataclass
 class VectorSyncBatchResult:
     """Aggregate result for batched semantic vector sync runs."""
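
The removed helpers memoized each instrument in `_METRIC_INSTRUMENTS` so the per-entity hot path would not repeat the OTel MeterProvider lookup on every sample. With recording moved to the batch level (see the third hunk below), the inline `logfire.metric_histogram()` / `logfire.metric_counter()` calls now run once per batch, so the cache buys little. A minimal sketch of the other common way to avoid the lookup, with hypothetical instrument and function names; only the two `logfire` factory calls come from the diff:

```python
import logfire

# Hypothetical module-level instruments: created once at import time, so
# there is no per-call MeterProvider lookup and no memoization dict to keep.
_BATCH_SECONDS = logfire.metric_histogram("vector_sync_batch_total_seconds", unit="s")
_ENTITIES_TOTAL = logfire.metric_counter("vector_sync_entities_total")


def record_batch(total_seconds: float, entities: int, attrs: dict[str, object]) -> None:
    # Recording reuses the already-created instrument objects directly.
    _BATCH_SECONDS.record(total_seconds, attributes=attrs)
    _ENTITIES_TOTAL.add(entities, attributes=attrs)
```
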
@@ -1092,57 +1069,42 @@ def emit_progress(entity_id: int) -> None: |
         write_seconds_total=result.write_seconds_total,
     )
     batch_total_seconds = time.perf_counter() - batch_start
-    _metric_histogram(
-        "vector_sync_batch_total_seconds",
-        unit="s",
-    ).record(
-        batch_total_seconds,
-        attributes={
-            "backend": backend_name,
-            "skip_only_batch": result.embedding_jobs_total == 0,
-        },
+    batch_attrs = {
+        "backend": backend_name,
+        "skip_only_batch": result.embedding_jobs_total == 0,
+    }
+    logfire.metric_histogram("vector_sync_batch_total_seconds", unit="s").record(
+        batch_total_seconds, attributes=batch_attrs
     )
-    _metric_counter("vector_sync_entities_total").add(
-        result.entities_total,
-        attributes={
-            "backend": backend_name,
-            "skip_only_batch": result.embedding_jobs_total == 0,
-        },
+    logfire.metric_histogram("vector_sync_prepare_seconds", unit="s").record(
+        result.prepare_seconds_total, attributes=batch_attrs
     )
-    _metric_counter("vector_sync_entities_skipped").add(
-        result.entities_skipped,
-        attributes={
-            "backend": backend_name,
-            "skip_only_batch": result.embedding_jobs_total == 0,
-        },
+    logfire.metric_histogram("vector_sync_queue_wait_seconds", unit="s").record(
+        result.queue_wait_seconds_total, attributes=batch_attrs
     )
-    _metric_counter("vector_sync_entities_deferred").add(
-        result.entities_deferred,
-        attributes={
-            "backend": backend_name,
-            "skip_only_batch": result.embedding_jobs_total == 0,
-        },
+    logfire.metric_histogram("vector_sync_embed_seconds", unit="s").record(
+        result.embed_seconds_total, attributes=batch_attrs
     )
-    _metric_counter("vector_sync_embedding_jobs_total").add(
-        result.embedding_jobs_total,
-        attributes={
-            "backend": backend_name,
-            "skip_only_batch": result.embedding_jobs_total == 0,
-        },
+    logfire.metric_histogram("vector_sync_write_seconds", unit="s").record(
+        result.write_seconds_total, attributes=batch_attrs
     )
-    _metric_counter("vector_sync_chunks_total").add(
-        result.chunks_total,
-        attributes={
-            "backend": backend_name,
-            "skip_only_batch": result.embedding_jobs_total == 0,
-        },
+    logfire.metric_counter("vector_sync_entities_total").add(
+        result.entities_total, attributes=batch_attrs
     )
-    _metric_counter("vector_sync_chunks_skipped").add(
-        result.chunks_skipped,
-        attributes={
-            "backend": backend_name,
-            "skip_only_batch": result.embedding_jobs_total == 0,
-        },
+    logfire.metric_counter("vector_sync_entities_skipped").add(
+        result.entities_skipped, attributes=batch_attrs
+    )
+    logfire.metric_counter("vector_sync_entities_deferred").add(
+        result.entities_deferred, attributes=batch_attrs
+    )
+    logfire.metric_counter("vector_sync_embedding_jobs_total").add(
+        result.embedding_jobs_total, attributes=batch_attrs
+    )
+    logfire.metric_counter("vector_sync_chunks_total").add(
+        result.chunks_total, attributes=batch_attrs
+    )
+    logfire.metric_counter("vector_sync_chunks_skipped").add(
+        result.chunks_skipped, attributes=batch_attrs
     )
     if batch_span is not None:
         batch_span.set_attributes(
@@ -1715,48 +1677,12 @@ def _log_vector_sync_complete( |
         shard_count: int,
         remaining_jobs_after_shard: int,
     ) -> None:
-        """Log completion and slow-entity warnings with a consistent format."""
-        backend_name = type(self).__name__.removesuffix("SearchRepository").lower()
-        _metric_histogram(
-            "vector_sync_prepare_seconds",
-            unit="s",
-        ).record(
-            prepare_seconds,
-            attributes={
-                "backend": backend_name,
-                "skip_only_entity": entity_skipped and embedding_jobs_count == 0,
-            },
-        )
-        _metric_histogram(
-            "vector_sync_queue_wait_seconds",
-            unit="s",
-        ).record(
-            queue_wait_seconds,
-            attributes={
-                "backend": backend_name,
-                "skip_only_entity": entity_skipped and embedding_jobs_count == 0,
-            },
-        )
-        _metric_histogram(
-            "vector_sync_embed_seconds",
-            unit="s",
-        ).record(
-            embed_seconds,
-            attributes={
-                "backend": backend_name,
-                "skip_only_entity": entity_skipped and embedding_jobs_count == 0,
-            },
-        )
-        _metric_histogram(
-            "vector_sync_write_seconds",
-            unit="s",
-        ).record(
-            write_seconds,
-            attributes={
-                "backend": backend_name,
-                "skip_only_entity": entity_skipped and embedding_jobs_count == 0,
-            },
-        )
+        """Log completion and slow-entity warnings with a consistent format.
+
+        Per-entity timings are aggregated into `VectorSyncBatchResult` and
+        recorded as batch-level histograms once the batch completes; this
+        function stays on the per-entity hot path, so it only emits logs.
+        """
         if total_seconds > 10:
             logger.warning(
                 "Vector sync slow entity: project_id={project_id} entity_id={entity_id} "
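
The new docstring describes the aggregation that replaces the per-entity histograms. A minimal sketch of that pattern; the `*_total` field names come from the diff, while `add_entity` is a hypothetical helper:

```python
from dataclasses import dataclass


@dataclass
class VectorSyncBatchResult:
    """Aggregate result for batched semantic vector sync runs."""

    entities_total: int = 0
    entities_skipped: int = 0
    embedding_jobs_total: int = 0
    prepare_seconds_total: float = 0.0
    queue_wait_seconds_total: float = 0.0
    embed_seconds_total: float = 0.0
    write_seconds_total: float = 0.0

    def add_entity(
        self,
        *,
        skipped: bool,
        embedding_jobs: int,
        prepare_seconds: float,
        queue_wait_seconds: float,
        embed_seconds: float,
        write_seconds: float,
    ) -> None:
        # Sums accumulate on the per-entity hot path; the batch-level code
        # above records them as histograms/counters once per batch.
        self.entities_total += 1
        self.entities_skipped += int(skipped)
        self.embedding_jobs_total += embedding_jobs
        self.prepare_seconds_total += prepare_seconds
        self.queue_wait_seconds_total += queue_wait_seconds
        self.embed_seconds_total += embed_seconds
        self.write_seconds_total += write_seconds
```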