Skip to content

Commit 90e3c0e

Browse files
committed
ingester: cache list of files, speedup on large queue
Before: Every 5-second cycle calls os.scandir() on the 2M-entry directory. Each scan enumerates all entries via readdir(), which is extremely slow on a flat directory that large; a single scan might take 20-30 seconds. After: 1. scandir() runs once, caching all .json entries. 2. Each cycle pops a chunk of up to INGEST_CYCLE_BATCH_SIZE (default 50,000) files from the cache and processes them. 3. A re-scan only happens when the cache is fully drained. 4. On a scandir error, the cache is cleared, which forces a re-scan on the next cycle. With 2M files: one scan instead of ~40 scans (2M / 50K per chunk = 40 cycles of scan-free processing). If each scandir of 2M entries takes ~30 seconds, that saves ~20 minutes of pure directory enumeration overhead. INGEST_CYCLE_BATCH_SIZE (50k default) is configurable via an env var if you want to tune the chunk size. Also note that this fixes a latent bug in the old code where json_files would retain stale data from the previous iteration if scandir threw an exception (the except: pass didn't reset it). Signed-off-by: Denys Fedoryshchenko <denys.f@collabora.com>
1 parent df9a5f7 commit 90e3c0e

File tree

5 files changed

+118
-75
lines changed

5 files changed

+118
-75
lines changed

backend/kernelCI_app/constants/ingester.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,14 @@
4343
INGEST_FILES_BATCH_SIZE = int(os.environ.get("INGEST_FILES_BATCH_SIZE", 100))
4444
"""Size of the batch of files to be queued. Default: 100"""
4545

46+
try:
    INGEST_CYCLE_BATCH_SIZE = int(os.environ.get("INGEST_CYCLE_BATCH_SIZE", "50000"))
    # A non-positive chunk size would make the monitor loop slice an empty
    # (or backwards) batch, so it would never advance through its cached file
    # list. Treat such values like any other invalid setting.
    if INGEST_CYCLE_BATCH_SIZE < 1:
        raise ValueError("INGEST_CYCLE_BATCH_SIZE must be a positive integer")
except (ValueError, TypeError):
    logger.warning("Invalid INGEST_CYCLE_BATCH_SIZE, using default 50000")
    INGEST_CYCLE_BATCH_SIZE = 50000
"""Max files to process per cycle from cached scandir results.
Avoids re-scanning huge directories between cycles. Must be a positive
integer; invalid or non-positive values fall back to the default.
Default: 50000"""
53+
4654
try:
4755
INGESTER_METRICS_PORT = int(os.environ.get("INGESTER_METRICS_PORT", 8002))
4856
except (ValueError, TypeError):

backend/kernelCI_app/management/commands/helpers/kcidbng_ingester.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import multiprocessing
22
from multiprocessing.sharedctypes import Synchronized
33
from multiprocessing.synchronize import Lock as ProcessLock
4-
from os import DirEntry
54
import json
65
import logging
76
import os
@@ -487,7 +486,7 @@ def print_ingest_progress(
487486

488487

489488
def ingest_submissions_parallel( # noqa: C901 - orchestrator with IO + multiprocessing
490-
json_files: list[DirEntry[str]],
489+
json_files: list[str],
491490
tree_names: dict[str, str],
492491
dirs: dict[INGESTER_DIRS, str],
493492
max_workers: int = 5,
@@ -504,17 +503,18 @@ def ingest_submissions_parallel( # noqa: C901 - orchestrator with IO + multipro
504503
)
505504

506505
batch = []
507-
for file in json_files:
506+
for file_path in json_files:
508507
try:
509-
total_bytes += file.stat().st_size
510-
except Exception:
511-
pass
508+
file_size = os.path.getsize(file_path)
509+
except OSError:
510+
file_size = 0
512511

512+
total_bytes += file_size
513513
batch.append(
514514
SubmissionFileMetadata(
515-
path=file.path,
516-
name=file.name,
517-
size=file.stat().st_size,
515+
path=file_path,
516+
name=os.path.basename(file_path),
517+
size=file_size,
518518
)
519519
)
520520

backend/kernelCI_app/management/commands/monitor_submissions.py

Lines changed: 69 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
ingest_submissions_parallel,
1111
)
1212
from kernelCI_app.constants.ingester import (
13+
INGEST_CYCLE_BATCH_SIZE,
1314
INGESTER_GRAFANA_LABEL,
1415
INGESTER_METRICS_PORT,
1516
PROMETHEUS_MULTIPROC_DIR,
@@ -50,6 +51,50 @@ def signal_handler(self, signum, frame):
5051
logger.info(f"Received signal {signum}, initiating graceful shutdown...")
5152
self.running = False
5253

54+
def _setup_prometheus(self):
    """Start the Prometheus metrics HTTP server for the ingester.

    When ``PROMETHEUS_MULTIPROC_DIR`` is unset, only a warning is logged and
    nothing is started. Otherwise the directory is removed if it exists,
    recreated empty, and a multiprocess collector registry is served on
    ``INGESTER_METRICS_PORT``.
    """
    if not PROMETHEUS_MULTIPROC_DIR:
        logger.warning(
            "PROMETHEUS_MULTIPROC_DIR is not set, skipping Prometheus metrics"
        )
        return

    # Start from an empty metrics directory: drop whatever is already there.
    if os.path.exists(PROMETHEUS_MULTIPROC_DIR):
        shutil.rmtree(PROMETHEUS_MULTIPROC_DIR)
    os.makedirs(PROMETHEUS_MULTIPROC_DIR, exist_ok=True)

    metrics_registry = CollectorRegistry()
    multiprocess.MultiProcessCollector(metrics_registry)
    start_http_server(INGESTER_METRICS_PORT, registry=metrics_registry)
67+
68+
def _scan_spool_dir(self, spool_dir: str) -> list[str]:
69+
"""Scan spool directory, returning only path strings to keep memory bounded."""
70+
try:
71+
with os.scandir(spool_dir) as it:
72+
cached_paths = [
73+
entry.path
74+
for entry in it
75+
if entry.is_file() and entry.name.endswith(".json")
76+
]
77+
ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
78+
self.stdout.write(
79+
f"[{ts}] Spool scan: {len(cached_paths)}" " .json files pending"
80+
)
81+
return cached_paths
82+
except PermissionError:
83+
logger.error(
84+
"Permission denied scanning spool directory: %s",
85+
spool_dir,
86+
exc_info=True,
87+
)
88+
self.running = False
89+
return []
90+
except OSError:
91+
logger.warning(
92+
"Transient OS error scanning spool directory: %s",
93+
spool_dir,
94+
exc_info=True,
95+
)
96+
return []
97+
5398
def add_arguments(self, parser):
5499
# TODO: add a way to set the folder by env var instead of by argument
55100
parser.add_argument(
@@ -89,18 +134,7 @@ def handle(
89134
signal.signal(signal.SIGTERM, self.signal_handler)
90135
signal.signal(signal.SIGINT, self.signal_handler)
91136

92-
if PROMETHEUS_MULTIPROC_DIR:
93-
if os.path.exists(PROMETHEUS_MULTIPROC_DIR):
94-
shutil.rmtree(PROMETHEUS_MULTIPROC_DIR)
95-
96-
os.makedirs(PROMETHEUS_MULTIPROC_DIR, exist_ok=True)
97-
registry = CollectorRegistry()
98-
multiprocess.MultiProcessCollector(registry)
99-
start_http_server(INGESTER_METRICS_PORT, registry=registry)
100-
else:
101-
logger.warning(
102-
"PROMETHEUS_MULTIPROC_DIR is not set, skipping Prometheus metrics"
103-
)
137+
self._setup_prometheus()
104138

105139
dirs: dict[INGESTER_DIRS, str] = {
106140
"archive": os.path.join(spool_dir, "archive"),
@@ -120,32 +154,37 @@ def handle(
120154

121155
self.stdout.write("Starting file monitoring... (Press Ctrl+C to stop)")
122156

157+
cached_files: list[str] = []
158+
cache_pos = 0
159+
123160
try:
124161
while self.running:
125162
# TODO: retry failed files every x cycles
126-
try:
127-
with os.scandir(spool_dir) as it:
128-
json_files = [
129-
entry
130-
for entry in it
131-
if entry.is_file() and entry.name.endswith(".json")
132-
]
133-
ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
134-
self.stdout.write(
135-
f"[{ts}] Spool has {len(json_files)} .json files pending"
136-
)
137-
except Exception:
138-
pass
139-
140-
QUEUE_SIZE_GAUGE.labels(INGESTER_GRAFANA_LABEL).set(len(json_files))
141-
142-
if len(json_files) > 0:
163+
164+
# Only re-scan directory when cache is depleted
165+
if cache_pos >= len(cached_files):
166+
cached_files = self._scan_spool_dir(spool_dir)
167+
cache_pos = 0
168+
169+
remaining = len(cached_files) - cache_pos
170+
QUEUE_SIZE_GAUGE.labels(INGESTER_GRAFANA_LABEL).set(remaining)
171+
172+
if remaining > 0:
173+
end = min(cache_pos + INGEST_CYCLE_BATCH_SIZE, len(cached_files))
174+
batch = cached_files[cache_pos:end]
175+
cache_pos = end
176+
self.stdout.write(
177+
f"Processing {len(batch)} files"
178+
f" ({len(cached_files) - cache_pos}"
179+
" remaining in cache)"
180+
)
143181
ingest_submissions_parallel(
144-
json_files,
182+
batch,
145183
tree_names,
146184
dirs,
147185
max_workers,
148186
)
187+
149188
cache_logs_maintenance()
150189

151190
time.sleep(interval)

backend/kernelCI_app/tests/performanceTests/test_ingest_perf.py

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@
3434
FILE_SUBSETS = [100, 300, 500]
3535

3636

37-
def _load_submission_files(dir_path: str) -> list[os.DirEntry[str]]:
37+
def _load_submission_files(dir_path: str) -> list[str]:
3838
with os.scandir(dir_path) as it:
3939
return [
40-
entry for entry in it if entry.is_file() and entry.name.endswith(".json")
40+
entry.path
41+
for entry in it
42+
if entry.is_file() and entry.name.endswith(".json")
4143
]
4244

4345

@@ -102,9 +104,7 @@ def cleanup_submission_files():
102104
_delete_json_files()
103105

104106

105-
def _get_file_subset(
106-
files: list[os.DirEntry[str]], subset: int
107-
) -> list[os.DirEntry[str]]:
107+
def _get_file_subset(files: list[str], subset: int) -> list[str]:
108108
"""Get a subset of files based on the subset type."""
109109
return files[:subset] if len(files) >= subset else files
110110

@@ -212,14 +212,12 @@ def test_prepare_file_data(
212212

213213
assert len(files) > 0, "No submissions found"
214214

215-
def prepare_files(
216-
files: list[os.DirEntry[str]], trees_names: dict[str, str]
217-
) -> None:
218-
for file in files:
215+
def prepare_files(files: list[str], trees_names: dict[str, str]) -> None:
216+
for file_path in files:
219217
file_metadata: SubmissionFileMetadata = {
220-
"path": file.path,
221-
"name": file.name,
222-
"size": file.stat().st_size,
218+
"path": file_path,
219+
"name": os.path.basename(file_path),
220+
"size": os.path.getsize(file_path),
223221
}
224222

225223
prepare_file_data(file_metadata, trees_names)
@@ -242,7 +240,7 @@ def prepare_files(
242240

243241

244242
def _prepare_buffers(
245-
files: list[os.DirEntry[str]], trees_names: dict[str, str]
243+
files: list[str], trees_names: dict[str, str]
246244
) -> tuple[list, dict[str, list[Any]]]:
247245
"""
248246
Prepare object buffers from submission files.
@@ -260,11 +258,11 @@ def _prepare_buffers(
260258
}
261259
buffer_files: set[tuple[str, str]] = set()
262260

263-
for file in files:
261+
for file_path in files:
264262
file_metadata: SubmissionFileMetadata = {
265-
"path": file.path,
266-
"name": file.name,
267-
"size": file.stat().st_size,
263+
"path": file_path,
264+
"name": os.path.basename(file_path),
265+
"size": os.path.getsize(file_path),
268266
}
269267

270268
data, metadata = prepare_file_data(file_metadata, trees_names)
@@ -279,7 +277,7 @@ def _prepare_buffers(
279277
objects_buffers["builds"].extend(instances["builds"])
280278
objects_buffers["tests"].extend(instances["tests"])
281279
objects_buffers["incidents"].extend(instances["incidents"])
282-
buffer_files.add((file.name, file.path))
280+
buffer_files.add((os.path.basename(file_path), file_path))
283281

284282
return [], {
285283
"issues_buf": objects_buffers["issues"],
@@ -365,11 +363,11 @@ def test_build_instances_perf(benchmark, cleanup_submission_files, file_subset):
365363
assert len(files) > 0, "No submissions found"
366364

367365
data_list = []
368-
for file in files:
366+
for file_path in files:
369367
file_metadata: SubmissionFileMetadata = {
370-
"path": file.path,
371-
"name": file.name,
372-
"size": file.stat().st_size,
368+
"path": file_path,
369+
"name": os.path.basename(file_path),
370+
"size": os.path.getsize(file_path),
373371
}
374372

375373
data, _ = prepare_file_data(file_metadata, trees_names)

backend/kernelCI_app/tests/unitTests/commands/monitorSubmissions/kcidbng_ingester_test.py

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -790,18 +790,19 @@ class TestIngestSubmissionsParallel:
790790
# Test cases:
791791
# - successful ingestion
792792

793-
mock_file1 = MagicMock()
794-
mock_file1.name = SUBMISSION_FILENAME_MOCK
795-
mock_file1.stat.return_value.st_size = 1000
793+
FILE1_SIZE = 1000
794+
FILE2_SIZE = 2000
796795

797796
@patch("kernelCI_app.management.commands.helpers.kcidbng_ingester.out")
798797
@patch("multiprocessing.Process")
799798
@patch("multiprocessing.Queue")
800799
@patch("multiprocessing.Value")
801800
@patch("time.sleep")
802801
@patch("time.time", side_effect=TIME_MOCK)
802+
@patch("os.path.getsize")
803803
def test_ingest_submissions_parallel_success(
804804
self,
805+
mock_getsize,
805806
mock_time,
806807
mock_sleep,
807808
mock_value,
@@ -810,19 +811,19 @@ def test_ingest_submissions_parallel_success(
810811
mock_out,
811812
):
812813
"""Test successful parallel ingestion."""
814+
file1_path = SUBMISSION_FILEPATH_MOCK + SUBMISSION_FILENAME_MOCK
815+
file2_path = SUBMISSION_FILEPATH_MOCK + "file2.json"
816+
817+
file_sizes = {file1_path: self.FILE1_SIZE, file2_path: self.FILE2_SIZE}
818+
mock_getsize.side_effect = lambda p: file_sizes[p]
819+
813820
mock_queue = MagicMock()
814821
mock_queue_cls.return_value = mock_queue
815822

816823
mock_queue.empty.return_value = True
817824
mock_queue.qsize.return_value = 0
818825

819-
self.mock_file1.path = SUBMISSION_FILEPATH_MOCK + SUBMISSION_FILENAME_MOCK
820-
mock_file2 = MagicMock()
821-
mock_file2.name = "file2.json"
822-
mock_file2.path = SUBMISSION_FILEPATH_MOCK + "file2.json"
823-
mock_file2.stat.return_value.st_size = 2000
824-
825-
json_files = [self.mock_file1, mock_file2]
826+
json_files = [file1_path, file2_path]
826827

827828
mock_ok = MagicMock()
828829
mock_ok.value = 1
@@ -852,10 +853,7 @@ def test_ingest_submissions_parallel_success(
852853
stat_fail = 1
853854
total_elapsed = 1
854855

855-
total_bytes = (
856-
self.mock_file1.stat.return_value.st_size
857-
+ mock_file2.stat.return_value.st_size
858-
)
856+
total_bytes = self.FILE1_SIZE + self.FILE2_SIZE
859857
mb = total_bytes / (1024 * 1024)
860858

861859
# We verify only the final call

0 commit comments

Comments
 (0)