diff --git a/lib/cuckoo/common/integrations/clamav.py b/lib/cuckoo/common/integrations/clamav.py index ed8065312c3..46db2211130 100644 --- a/lib/cuckoo/common/integrations/clamav.py +++ b/lib/cuckoo/common/integrations/clamav.py @@ -3,6 +3,8 @@ # See the file 'docs/LICENSE' for copying permission. import logging import os +import threading +from concurrent.futures import ThreadPoolExecutor, as_completed from contextlib import suppress from lib.cuckoo.common.config import Config @@ -19,6 +21,98 @@ HAVE_CLAMAV = True +# Module-level cache for per-task `prefetch_clamav` results. Keyed by +# absolute file path. Each value is the same list of match-strings that +# `get_clamav` would otherwise compute. Populated by `prefetch_clamav` +# and consumed transparently by `get_clamav`. Cleared at task boundary +# via `clear_clamav_cache` to avoid leaking results across analyses on +# a long-lived worker process. +_CACHE_LOCK = threading.Lock() +_CLAMAV_CACHE = {} + + +def _scan_one(file_path): + """Return the list of ClamAV match-strings for `file_path`, or [] on + error / empty file / clamav not present. Issues a single + ALLMATCHSCAN over its own clamd socket so it's safe to call from + multiple threads concurrently — clamd is multi-threaded server-side + and serves each socket independently.""" + matches = [] + if not HAVE_CLAMAV: + return matches + try: + if os.path.getsize(file_path) <= 0: + return matches + except OSError: + return matches + try: + cd = pyclamd.ClamdUnixSocket() + results = cd.allmatchscan(file_path) + if results and file_path in results: + for entry in results[file_path]: + if entry[0] == "FOUND" and entry[1] not in matches: + matches.append(entry[1]) + except ConnectionError: + log.warning("failed to connect to clamd socket") + except Exception as e: + log.warning("failed to scan file with clamav %s: %s", file_path, e) + return matches + + +def prefetch_clamav(file_paths, max_workers=8): + """Pre-scan a batch of files in parallel and populate the per-task + cache. Subsequent `get_clamav(path)` calls for any of these paths + return instantly from cache instead of opening a socket. + + The bottleneck on heavy CAPE tasks is the sequential single-thread + `allmatchscan` over 10-20 dropped/extracted files (each ~3-9s of + socket-recv latency). clamd is multi-threaded server-side, so + fanning out N parallel ALLMATCHSCAN sockets cuts wall-clock to + roughly `slowest_file_scan_seconds` instead of `sum_of_all_scans`. + + Workers default to 8 — beyond that you start saturating clamd's + thread pool (default `MaxThreads = 12` in clamd.conf) and gain + little. No-ops if ClamAV is not configured. + """ + if not HAVE_CLAMAV or not file_paths: + return + # Filter to paths we don't already have cached and that exist. + pending = [] + with _CACHE_LOCK: + for p in file_paths: + if p in _CLAMAV_CACHE: + continue + try: + if not os.path.isfile(p) or os.path.getsize(p) <= 0: + _CLAMAV_CACHE[p] = [] + continue + except OSError: + continue + pending.append(p) + if not pending: + return + workers = max(1, min(max_workers, len(pending))) + with ThreadPoolExecutor(max_workers=workers) as ex: + future_map = {ex.submit(_scan_one, p): p for p in pending} + for fut in as_completed(future_map): + p = future_map[fut] + try: + result = fut.result() + except Exception as e: + log.warning("clamav prefetch failed for %s: %s", p, e) + result = [] + with _CACHE_LOCK: + _CLAMAV_CACHE[p] = result + + +def clear_clamav_cache(): + """Drop the per-task prefetch cache. Call at task boundaries to + avoid leaking match results across analyses on a long-lived + worker process.""" + with _CACHE_LOCK: + _CLAMAV_CACHE.clear() + + def get_clamav(file_path): """Get ClamAV signatures matches. Enable in: processing -> [CAPE] -> clamav @@ -30,21 +124,23 @@ def get_clamav(file_path): systemctl start clamav-daemon usermod -a -G cape clamav echo "/opt/CAPEv2/storage/** r," | sudo tee -a /etc/apparmor.d/local/usr.sbin.clamd - @return: matched ClamAV signatures. - """ - matches = [] - if HAVE_CLAMAV and os.path.getsize(file_path) > 0: - try: - cd = pyclamd.ClamdUnixSocket() - results = cd.allmatchscan(file_path) - if results: - for entry in results[file_path]: - if entry[0] == "FOUND" and entry[1] not in matches: - matches.append(entry[1]) - except ConnectionError: - log.warning("failed to connect to clamd socket") - except Exception as e: - log.warning("failed to scan file with clamav %s", e) + Returns the cached matches when `prefetch_clamav` has been called + for this path in the current task; otherwise issues a single + sequential scan (preserving the legacy behaviour for any caller + that bypasses the prefetch path). + @return: matched ClamAV signatures. + """ + if not HAVE_CLAMAV: + return [] + with _CACHE_LOCK: + cached = _CLAMAV_CACHE.get(file_path) + if cached is not None: + return list(cached) + matches = _scan_one(file_path) + # Memoise even single-shot scans so repeated lookups for the same + # path within a task don't pay the network cost twice. + with _CACHE_LOCK: + _CLAMAV_CACHE[file_path] = matches return matches diff --git a/lib/cuckoo/common/integrations/file_extra_info.py b/lib/cuckoo/common/integrations/file_extra_info.py index c9ace038050..8ee073cf2e3 100644 --- a/lib/cuckoo/common/integrations/file_extra_info.py +++ b/lib/cuckoo/common/integrations/file_extra_info.py @@ -11,6 +11,8 @@ # from contextlib import suppress from typing import Any, DefaultDict, List, Optional, Set +import threading + import pebble from lib.cuckoo.common.config import Config @@ -182,6 +184,14 @@ def static_file_info( if "pe" not in data_dictionary: with PortableExecutable(file_path) as pe: data_dictionary["pe"] = pe.run(task_id) + elif not data_dictionary["pe"].get("digital_signers") and data_dictionary["pe"].get("guest_signers", {}).get("aux_signers"): + # Only re-run cert extraction when DigiSig.json confirms the file is signed + # (aux_signers non-empty) but digital_signers is empty — avoids re-parsing + # every unsigned PE on reprocess. + with PortableExecutable(file_path) as pe: + data_dictionary["pe"]["digital_signers"] = pe.get_digital_signers(pe.pe) + if not data_dictionary["pe"]["guest_signers"].get("aux_sha1"): + data_dictionary["pe"]["guest_signers"] = pe.get_guest_digital_signers(task_id) if HAVE_FLARE_CAPA and "flare_capa" not in data_dictionary: # https://github.com/mandiant/capa/issues/2620 @@ -407,6 +417,40 @@ def _extracted_files_metadata( from lib.cuckoo.common.integrations.utils import run_tool + +# Process-wide pebble.ProcessPool reused across every call to +# `generic_file_extractors`. The previous code created and tore down a +# fresh ProcessPool per file (one `with pebble.ProcessPool(...) as pool:` +# block per call). On heavy tasks with 50+ extracted files that's +# 50+ × (subprocess fork + Python interpreter startup + module imports +# + pool teardown) of pure overhead — measured at ~1.2s per file on +# our sandbox (~85s on a 70-file task before any extractor work). +# +# Pebble's ProcessPool is explicitly designed for long-lived reuse: +# its workers respawn after task completion (max_tasks=1 by default +# would tear them down per call, but the default unlimited keeps them +# warm) and a crashed worker is replaced automatically. We lazily +# instantiate a single pool on first use and keep it for the rest of +# the worker process's lifetime — handing each call a slice of the +# already-warm worker pool instead of paying startup costs every time. +_EXTRACTOR_POOL = None +_EXTRACTOR_POOL_LOCK = threading.Lock() + + +def _get_extractor_pool(): + """Return a process-wide shared pebble.ProcessPool, creating it + on first use. Thread-safe.""" + global _EXTRACTOR_POOL + if _EXTRACTOR_POOL is not None: + return _EXTRACTOR_POOL + with _EXTRACTOR_POOL_LOCK: + if _EXTRACTOR_POOL is None: + _EXTRACTOR_POOL = pebble.ProcessPool( + max_workers=int(integration_conf.general.max_workers), + ) + return _EXTRACTOR_POOL + + def generic_file_extractors( file: str, destination_folder: str, @@ -442,33 +486,36 @@ def generic_file_extractors( futures = {} executed_tools = data_dictionary.setdefault("executed_tools", []) - with pebble.ProcessPool(max_workers=int(integration_conf.general.max_workers)) as pool: - # Prefer custom modules over the built-in ones, since only 1 is allowed - # to be the extracted_files_tool. - if extra_info_modules: - for module in extra_info_modules: - func_timeout = int(getattr(module, "timeout", 60)) - funcname = module.__name__.split(".")[-1] - if funcname in executed_tools: - continue - executed_tools.append(funcname) - futures[funcname] = pool.schedule(module.extract_details, args=args, kwargs=kwargs, timeout=func_timeout) - - for extraction_func in file_info_funcs: - funcname = extraction_func.__name__.split(".")[-1] - if ( - not getattr(integration_conf, funcname, {}).get("enabled", False) - and getattr(extraction_func, "enabled", False) is False - ): - continue - + pool = _get_extractor_pool() + # Prefer custom modules over the built-in ones, since only 1 is allowed + # to be the extracted_files_tool. + if extra_info_modules: + for module in extra_info_modules: + func_timeout = int(getattr(module, "timeout", 60)) + funcname = module.__name__.split(".")[-1] if funcname in executed_tools: continue executed_tools.append(funcname) + futures[funcname] = pool.schedule(module.extract_details, args=args, kwargs=kwargs, timeout=func_timeout) + + for extraction_func in file_info_funcs: + funcname = extraction_func.__name__.split(".")[-1] + if ( + not getattr(integration_conf, funcname, {}).get("enabled", False) + and getattr(extraction_func, "enabled", False) is False + ): + continue - func_timeout = int(getattr(integration_conf, funcname, {}).get("timeout", 60)) - futures[funcname] = pool.schedule(extraction_func, args=args, kwargs=kwargs, timeout=func_timeout) - pool.join() + if funcname in executed_tools: + continue + executed_tools.append(funcname) + + func_timeout = int(getattr(integration_conf, funcname, {}).get("timeout", 60)) + futures[funcname] = pool.schedule(extraction_func, args=args, kwargs=kwargs, timeout=func_timeout) + # The shared pool stays alive across calls; we no longer call pool.join() + # here because that would shut it down. Each future's per-task timeout + # (set above via pool.schedule timeout=) bounds wall-clock per extractor, + # and the iteration below collects results per-future via .result(). for funcname, future in futures.items(): func_result = None diff --git a/lib/cuckoo/common/integrations/parse_pe.py b/lib/cuckoo/common/integrations/parse_pe.py index b75169bce16..74f8021257f 100644 --- a/lib/cuckoo/common/integrations/parse_pe.py +++ b/lib/cuckoo/common/integrations/parse_pe.py @@ -26,8 +26,8 @@ try: import cryptography - from cryptography.hazmat.backends.openssl.backend import backend from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.serialization import pkcs7 as _pkcs7_mod HAVE_CRYPTO = True except ImportError: @@ -273,11 +273,41 @@ def get_reported_checksum(self, pe: pefile.PE) -> str: def get_actual_checksum(self, pe: pefile.PE) -> str: """Get calculated checksum of PE - @return: checksum or None. + @return: checksum string, or None if unavailable / not computed. + + `pe.generate_checksum()` is a pure-Python loop over the entire + file in 32-bit words — typically 1-3 seconds per PE on + sandbox-sized payloads, and on heavy tasks with 15-20 dumped + payloads it dominates CAPE-module wall-clock (~40s/task in + profiling). + + The recomputed value is only consumed by the + `static_pe_anomaly` signature, which only compares it against + the embedded `reported_checksum` when that field is non-zero + (`if reported and reported != actual` — see + modules/signatures/all/static_pe_anomaly.py). When the PE has + no embedded checksum (compilers commonly omit it; almost every + dropper/packer leaves it 0), the recompute result would never + be consulted — pure throwaway work. + + Skip the expensive recompute in that case and return None so + the field is honestly absent rather than fabricated. The + signature already gracefully handles missing keys via the + `if reported and ...` guard, and downstream consumers can + distinguish "not computed" from "computed and zero". Saves + ~40s on a 19-payload analysis with reported=0 binaries. """ if not pe: return None + try: + if pe.OPTIONAL_HEADER.CheckSum == 0: + # No embedded checksum to compare against — skip the + # expensive recompute and signal absence. + return None + except Exception: + pass + try: return f"0x{pe.generate_checksum():08x}" except Exception: @@ -779,7 +809,7 @@ def get_digital_signers(self, pe: pefile.PE) -> List[dict]: signatures = bytes(signatures) with suppress(Exception): - certs = backend.load_der_pkcs7_certificates(signatures) + certs = _pkcs7_mod.load_der_pkcs7_certificates(signatures) except AttributeError: log.debug("Can't get PE signatures") @@ -942,6 +972,14 @@ def run(self, task_id: str = False) -> dict: if not self.HAVE_PE: return {} + # `get_actual_checksum` returns None when the embedded reported + # checksum is 0 (we skip the expensive recompute since no + # comparison will be made). In that case omit the key entirely + # rather than fabricating a "0x00000000" — the static_pe_anomaly + # signature already guards on `"actual_checksum" in pe`, and any + # other consumer can distinguish "absent → not computed" from a + # genuine "computed and zero" value. + actual_cs = self.get_actual_checksum(self.pe) peresults = { "guest_signers": self.get_guest_digital_signers(task_id), "digital_signers": self.get_digital_signers(self.pe), @@ -950,7 +988,6 @@ def run(self, task_id: str = False) -> dict: "ep_bytes": self.get_ep_bytes(self.pe), "peid_signatures": self.get_peid_signatures(self.pe), "reported_checksum": self.get_reported_checksum(self.pe), - "actual_checksum": self.get_actual_checksum(self.pe), "osversion": self.get_osversion(self.pe), "machine_type": self.get_machine_type(self.pe), "pdbpath": self.get_pdb_path(self.pe), @@ -965,6 +1002,8 @@ def run(self, task_id: str = False) -> dict: "imphash": self.get_imphash(self.pe), "timestamp": self.get_timestamp(self.pe), } + if actual_cs is not None: + peresults["actual_checksum"] = actual_cs ( peresults["icon"], peresults["icon_hash"], diff --git a/lib/cuckoo/common/objects.py b/lib/cuckoo/common/objects.py index 9c689efb01b..d9b4ce33e48 100644 --- a/lib/cuckoo/common/objects.py +++ b/lib/cuckoo/common/objects.py @@ -436,8 +436,18 @@ def _yara_encode_string(self, yara_string): return new @classmethod - def init_yara(self, raise_exception: bool = False): - """Generates index for yara signatures.""" + def init_yara(cls, raise_exception: bool = False, force: bool = False): + """Generates index for yara signatures. + + Idempotent — safe to call multiple times. Compiling the full ruleset + is ~3s and the result is cached on the class for the lifetime of the + process, so subsequent calls short-circuit unless `force=True` is + passed (used after a yara-rule update). Without this guard, repeated + callers (e.g. some integration paths that didn't go through the + get_yara() wrapper) re-compiled all six categories on every call.""" + + if cls.yara_initialized and not force: + return categories = ("binaries", "urls", "memory", "CAPE", "macro", "monitor") log.debug("Initializing Yara...") @@ -509,7 +519,6 @@ def init_yara(self, raise_exception: bool = False): elif HAVE_YARA: try: File.yara_rules[category] = yara.compile(filepaths=rules, externals=externals) - File.yara_initialized = True break except yara.SyntaxError as e: bad_rule = f"{str(e).split('.yar', 1)[0]}.yar" @@ -562,6 +571,7 @@ def init_yara(self, raise_exception: bool = False): else: log.debug("\t |-- %s %s", category, entry) File.yara_rules_hash = hasher.hexdigest() + File.yara_initialized = True def get_yara(self, category="binaries", externals=None): """Get Yara signatures matches. diff --git a/modules/processing/CAPE.py b/modules/processing/CAPE.py index 7f791a38a41..a3396f34e61 100644 --- a/modules/processing/CAPE.py +++ b/modules/processing/CAPE.py @@ -291,10 +291,6 @@ def process_file(self, file_path, append_file, metadata: dict, *, category: str, "category": category, "file": file_info, } - - if not os.path.exists(self.task["target"]): - log.error("Target file doesn't exist anymore. That will prevent data to be shown on webgui") - elif processing_conf.CAPE.dropped and category in ("dropped", "package"): if category == "dropped": file_info.update(metadata.get(file_info["path"][0], {})) @@ -439,33 +435,37 @@ def run(self): "metadata": entry.get("metadata", {}), } + # Pre-scan ClamAV in parallel for every file we're about to process. + # The sequential single-thread `allmatchscan` over 10-20 dropped / + # extracted files is the dominant cost in CAPE.run() on heavy tasks + # (~58% of total in profiling). clamd is multi-threaded server-side, + # so fanning out N parallel scans cuts that wall-clock from + # `sum_of_per_file_scans` to roughly `slowest_single_scan`. + prefetch_paths = [] + if self.task["category"] in ("file", "static") and self.file_path: + prefetch_paths.append(self.file_path) + for folder in ("CAPE_path", "procdump_path", "dropped_path", "package_files"): + if hasattr(self, folder): + for dir_name, _, file_names in os.walk(getattr(self, folder)): + for file_name in file_names: + prefetch_paths.append(os.path.join(dir_name, file_name)) + if prefetch_paths: + try: + from lib.cuckoo.common.integrations.clamav import ( + clear_clamav_cache, prefetch_clamav, + ) + clear_clamav_cache() + prefetch_clamav(prefetch_paths) + except Exception: + # Don't let a clamav prefetch error block analysis — the + # legacy serial fallback inside get_clamav() still works. + log.debug("clamav prefetch failed", exc_info=True) + # Static processing of submitted file if self.task["category"] in ("file", "static"): self.process_file( self.file_path, False, meta.get(self.file_path, {}), category=self.task["category"], duplicated=duplicated ) - if "target" not in self.results: - target_restored = False - try: - db_analysis = mongo_find_one("analysis", {"info.id": int(self.task["id"])}, {"target": 1, "_id": 0}) - if db_analysis and "target" in db_analysis: - self.results["target"] = db_analysis["target"] - target_restored = True - log.info("Restored missing target info from MongoDB analysis collection") - except Exception as e: - log.error("Failed to restore target info from MongoDB: %s", e) - - if not target_restored: - json_path = os.environ.get("CAPE_REPORT") or os.path.join(self.reports_path, "report.json") - if path_exists(json_path): - try: - with open(json_path, "r", encoding="utf-8") as f: - report_data = json.load(f) - if "target" in report_data: - self.results["target"] = report_data["target"] - log.info("Restored missing target info from existing report.json") - except Exception as e: - log.error("Failed to restore target info from existing report: %s", e) for folder in ("CAPE_path", "procdump_path", "dropped_path", "package_files"): category = folder.replace("_path", "").replace("_files", "") diff --git a/utils/process.py b/utils/process.py index 695c39888ad..0c7d6eaf67d 100644 --- a/utils/process.py +++ b/utils/process.py @@ -197,29 +197,40 @@ def init_worker(): # Avoid fork deadlock: use direct list ops instead of # handler.close()/removeHandler()/addHandler() which acquire locks. - # Inherited FDs are intentionally leaked: closing them via os.close() - # frees the fd number, but the old Python stream still references it; - # when GC finalizes that stream it may close a new handler's fd. - # Workers are short-lived (max_tasks) so the leak is harmless. - log.handlers.clear() + # Inherited FDs are intentionally leaked after fork. + log.handlers[:] = [] + # Restore Console Handler (no lock-acquiring addHandler) ch = ConsoleHandler() ch.setFormatter(FORMATTER) log.handlers.append(ch) + # Eagerly compile the YARA ruleset once per worker so the first task + # this worker picks up doesn't pay the ~3s compile cost. The result + # is cached on the File class for the worker's lifetime; init_yara() + # is idempotent (no-op when already initialized) so this is safe to + # call here even if some downstream code path also calls it. + try: + from lib.cuckoo.common.objects import File + File.init_yara() + except Exception: + log.debug("worker init: yara pre-compile skipped", exc_info=True) + + # Restore Syslog Handler if enabled if logconf.logger.syslog_process: try: slh = logging.handlers.SysLogHandler(address=logconf.logger.syslog_dev) slh.setFormatter(FORMATTER) - log.handlers.append(slh) + log.addHandler(slh) except Exception as e: log.warning("Failed to restore Syslog handler in worker: %s", e) + # Restore File Handler using WatchedFileHandler to support rotation try: path = os.path.join(CUCKOO_ROOT, "log", "process.log") fh = logging.handlers.WatchedFileHandler(path) fh.setFormatter(FORMATTER) - log.handlers.append(fh) + log.addHandler(fh) except PermissionError as e: log.warning("Failed to restore File handler in worker due to permissions: %s", e)