126 changes: 111 additions & 15 deletions lib/cuckoo/common/integrations/clamav.py
@@ -3,6 +3,8 @@
# See the file 'docs/LICENSE' for copying permission.
import logging
import os
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import suppress

from lib.cuckoo.common.config import Config
@@ -19,6 +21,98 @@
HAVE_CLAMAV = True


# Module-level cache for per-task `prefetch_clamav` results. Keyed by
# absolute file path. Each value is the same list of match-strings that
# `get_clamav` would otherwise compute. Populated by `prefetch_clamav`
# and consumed transparently by `get_clamav`. Cleared at task boundary
# via `clear_clamav_cache` to avoid leaking results across analyses on
# a long-lived worker process.
_CACHE_LOCK = threading.Lock()
_CLAMAV_CACHE = {}


def _scan_one(file_path):
"""Return the list of ClamAV match-strings for `file_path`, or [] on
error / empty file / clamav not present. Issues a single
ALLMATCHSCAN over its own clamd socket so it's safe to call from
multiple threads concurrently — clamd is multi-threaded server-side
and serves each socket independently."""
matches = []
if not HAVE_CLAMAV:
return matches
try:
if os.path.getsize(file_path) <= 0:
return matches
except OSError:
return matches
try:
cd = pyclamd.ClamdUnixSocket()
results = cd.allmatchscan(file_path)
if results and file_path in results:
for entry in results[file_path]:
if entry[0] == "FOUND" and entry[1] not in matches:
matches.append(entry[1])
except ConnectionError:
log.warning("failed to connect to clamd socket")
except Exception as e:
log.warning("failed to scan file with clamav %s: %s", file_path, e)
return matches


def prefetch_clamav(file_paths, max_workers=8):
"""Pre-scan a batch of files in parallel and populate the per-task
cache. Subsequent `get_clamav(path)` calls for any of these paths
return instantly from cache instead of opening a socket.

The bottleneck on heavy CAPE tasks is the sequential single-thread
`allmatchscan` over 10-20 dropped/extracted files (each ~3-9s of
socket-recv latency). clamd is multi-threaded server-side, so
fanning out N parallel ALLMATCHSCAN sockets cuts wall-clock to
roughly `slowest_file_scan_seconds` instead of `sum_of_all_scans`.

Workers default to 8 — beyond that you start saturating clamd's
thread pool (default `MaxThreads = 12` in clamd.conf) and gain
little. No-ops if ClamAV is not configured.
"""
if not HAVE_CLAMAV or not file_paths:
return
# Filter to paths we don't already have cached and that exist.
pending = []
with _CACHE_LOCK:
for p in file_paths:
if p in _CLAMAV_CACHE:
continue
try:
if not os.path.isfile(p) or os.path.getsize(p) <= 0:
_CLAMAV_CACHE[p] = []
continue
except OSError:
continue
pending.append(p)
if not pending:
return
workers = max(1, min(max_workers, len(pending)))
with ThreadPoolExecutor(max_workers=workers) as ex:
future_map = {ex.submit(_scan_one, p): p for p in pending}
for fut in as_completed(future_map):
p = future_map[fut]
try:
result = fut.result()
except Exception as e:
log.warning("clamav prefetch failed for %s: %s", p, e)
result = []
with _CACHE_LOCK:
_CLAMAV_CACHE[p] = result


def clear_clamav_cache():
"""Drop the per-task prefetch cache. Call at task boundaries to
avoid leaking match results across analyses on a long-lived
worker process."""
with _CACHE_LOCK:
_CLAMAV_CACHE.clear()


def get_clamav(file_path):
"""Get ClamAV signatures matches.
Enable in: processing -> [CAPE] -> clamav
@@ -30,21 +124,23 @@ def get_clamav(file_path):
systemctl start clamav-daemon
usermod -a -G cape clamav
echo "/opt/CAPEv2/storage/** r," | sudo tee -a /etc/apparmor.d/local/usr.sbin.clamd
@return: matched ClamAV signatures.
"""
matches = []

if HAVE_CLAMAV and os.path.getsize(file_path) > 0:
try:
cd = pyclamd.ClamdUnixSocket()
results = cd.allmatchscan(file_path)
if results:
for entry in results[file_path]:
if entry[0] == "FOUND" and entry[1] not in matches:
matches.append(entry[1])
except ConnectionError:
log.warning("failed to connect to clamd socket")
except Exception as e:
log.warning("failed to scan file with clamav %s", e)
Returns the cached matches when `prefetch_clamav` has been called
for this path in the current task; otherwise issues a single
sequential scan (preserving the legacy behaviour for any caller
that bypasses the prefetch path).

@return: matched ClamAV signatures.
"""
if not HAVE_CLAMAV:
return []
with _CACHE_LOCK:
cached = _CLAMAV_CACHE.get(file_path)
if cached is not None:
return list(cached)
matches = _scan_one(file_path)
# Memoise even single-shot scans so repeated lookups for the same
# path within a task don't pay the network cost twice.
with _CACHE_LOCK:
_CLAMAV_CACHE[file_path] = matches
return matches
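
Not part of the diff: a minimal usage sketch of the new prefetch path, assuming a caller that already holds the task's dropped-file paths in a list. The example paths and the end-of-task call site are illustrative assumptions, not code from this PR.

from lib.cuckoo.common.integrations.clamav import clear_clamav_cache, get_clamav, prefetch_clamav

dropped = ["/tmp/task_files/a.bin", "/tmp/task_files/b.bin"]  # hypothetical paths

prefetch_clamav(dropped)           # one parallel ALLMATCHSCAN fan-out for the whole batch
for path in dropped:
    matches = get_clamav(path)     # answered from the per-task cache, no extra socket round-trip
    print(path, matches)
clear_clamav_cache()               # task boundary: drop cached results before the next analysis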
89 changes: 66 additions & 23 deletions lib/cuckoo/common/integrations/file_extra_info.py
@@ -11,6 +11,8 @@
# from contextlib import suppress
from typing import Any, DefaultDict, List, Optional, Set

import threading

import pebble

from lib.cuckoo.common.config import Config
@@ -182,6 +184,10 @@ def static_file_info(
if "pe" not in data_dictionary:
with PortableExecutable(file_path) as pe:
data_dictionary["pe"] = pe.run(task_id)
elif not data_dictionary["pe"].get("digital_signers"):
with PortableExecutable(file_path) as pe:
data_dictionary["pe"]["digital_signers"] = pe.get_digital_signers(pe.pe)
data_dictionary["pe"]["guest_signers"] = pe.get_guest_digital_signers(task_id)
Contributor comment (severity: medium):

This check will trigger for every unsigned PE file during reprocessing, as digital_signers will be an empty list. This causes the file to be re-opened and re-parsed, which is a significant performance hit for tasks with many unsigned payloads.

Since the cryptography bug is now fixed for new analyses, consider if this 'fallback' for old records is worth the performance cost on all unsigned files. If it must be kept, it would be more efficient to check if the PE has a security directory entry (e.g., via pe.OPTIONAL_HEADER.DATA_DIRECTORY[pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_SECURITY']].Size > 0) before performing a full re-extraction.
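
A minimal sketch of the guard suggested above, assuming plain pefile; the helper name has_security_directory is hypothetical and not part of this PR.

import pefile

def has_security_directory(pe: pefile.PE) -> bool:
    # Cheap pre-check: only re-extract signers when the PE actually carries
    # an Authenticode (security directory) entry.
    try:
        idx = pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_SECURITY"]
        entry = pe.OPTIONAL_HEADER.DATA_DIRECTORY[idx]
        return entry.Size > 0 and entry.VirtualAddress > 0
    except (AttributeError, IndexError, KeyError):
        return False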


if HAVE_FLARE_CAPA and "flare_capa" not in data_dictionary:
# https://github.com/mandiant/capa/issues/2620
@@ -407,6 +413,40 @@ def _extracted_files_metadata(

from lib.cuckoo.common.integrations.utils import run_tool


# Process-wide pebble.ProcessPool reused across every call to
# `generic_file_extractors`. The previous code created and tore down a
# fresh ProcessPool per file (one `with pebble.ProcessPool(...) as pool:`
# block per call). On heavy tasks with 50+ extracted files that's
# 50+ × (subprocess fork + Python interpreter startup + module imports
# + pool teardown) of pure overhead — measured at ~1.2s per file on
# our sandbox (~85s on a 70-file task before any extractor work).
#
# Pebble's ProcessPool is designed for long-lived reuse: workers stay
# alive between tasks (setting max_tasks=1 would recycle a worker after
# every call, while the default of unlimited tasks keeps them warm) and
# a crashed worker is replaced automatically. We lazily instantiate a
# single pool on first use and keep it for the rest of the worker
# process's lifetime, handing each call a slice of the already-warm
# worker pool instead of paying startup costs every time.
_EXTRACTOR_POOL = None
_EXTRACTOR_POOL_LOCK = threading.Lock()


def _get_extractor_pool():
"""Return a process-wide shared pebble.ProcessPool, creating it
on first use. Thread-safe."""
global _EXTRACTOR_POOL
if _EXTRACTOR_POOL is not None:
return _EXTRACTOR_POOL
with _EXTRACTOR_POOL_LOCK:
if _EXTRACTOR_POOL is None:
_EXTRACTOR_POOL = pebble.ProcessPool(
max_workers=int(integration_conf.general.max_workers),
)
return _EXTRACTOR_POOL


def generic_file_extractors(
file: str,
destination_folder: str,
@@ -442,33 +482,36 @@

futures = {}
executed_tools = data_dictionary.setdefault("executed_tools", [])
with pebble.ProcessPool(max_workers=int(integration_conf.general.max_workers)) as pool:
# Prefer custom modules over the built-in ones, since only 1 is allowed
# to be the extracted_files_tool.
if extra_info_modules:
for module in extra_info_modules:
func_timeout = int(getattr(module, "timeout", 60))
funcname = module.__name__.split(".")[-1]
if funcname in executed_tools:
continue
executed_tools.append(funcname)
futures[funcname] = pool.schedule(module.extract_details, args=args, kwargs=kwargs, timeout=func_timeout)

for extraction_func in file_info_funcs:
funcname = extraction_func.__name__.split(".")[-1]
if (
not getattr(integration_conf, funcname, {}).get("enabled", False)
and getattr(extraction_func, "enabled", False) is False
):
continue

pool = _get_extractor_pool()
# Prefer custom modules over the built-in ones, since only 1 is allowed
# to be the extracted_files_tool.
if extra_info_modules:
for module in extra_info_modules:
func_timeout = int(getattr(module, "timeout", 60))
funcname = module.__name__.split(".")[-1]
if funcname in executed_tools:
continue
executed_tools.append(funcname)
futures[funcname] = pool.schedule(module.extract_details, args=args, kwargs=kwargs, timeout=func_timeout)

for extraction_func in file_info_funcs:
funcname = extraction_func.__name__.split(".")[-1]
if (
not getattr(integration_conf, funcname, {}).get("enabled", False)
and getattr(extraction_func, "enabled", False) is False
):
continue

func_timeout = int(getattr(integration_conf, funcname, {}).get("timeout", 60))
futures[funcname] = pool.schedule(extraction_func, args=args, kwargs=kwargs, timeout=func_timeout)
pool.join()
if funcname in executed_tools:
continue
executed_tools.append(funcname)

func_timeout = int(getattr(integration_conf, funcname, {}).get("timeout", 60))
futures[funcname] = pool.schedule(extraction_func, args=args, kwargs=kwargs, timeout=func_timeout)
# The shared pool stays alive across calls; we no longer call pool.join()
# here because that would shut it down. Each future's per-task timeout
# (set above via pool.schedule timeout=) bounds wall-clock per extractor,
# and the iteration below collects results per-future via .result().

for funcname, future in futures.items():
func_result = None
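
The diff truncates before the result-collection loop. Below is a sketch of one plausible shape for that loop under the shared-pool design, written as a standalone helper; the exception handling and the merge step are assumptions, not lines from this PR.

import logging
from concurrent.futures import TimeoutError as FutureTimeoutError

from pebble import ProcessExpired

log = logging.getLogger(__name__)

def collect_extractor_results(futures: dict, data_dictionary: dict) -> None:
    """Drain per-extractor futures scheduled on the shared pebble pool."""
    for funcname, future in futures.items():
        func_result = None
        try:
            func_result = future.result()  # bounded by the per-call schedule() timeout
        except FutureTimeoutError:
            log.warning("file extractor %s timed out", funcname)
        except ProcessExpired as exc:
            log.warning("file extractor %s worker died: %s", funcname, exc)
        except Exception as exc:
            log.error("file extractor %s failed: %s", funcname, exc)
        if func_result:
            # Illustrative merge; the real loop folds per-tool results into
            # the task's data_dictionary.
            data_dictionary.update(func_result)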
47 changes: 43 additions & 4 deletions lib/cuckoo/common/integrations/parse_pe.py
@@ -26,8 +26,8 @@

try:
import cryptography
from cryptography.hazmat.backends.openssl.backend import backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.serialization import pkcs7 as _pkcs7_mod

HAVE_CRYPTO = True
except ImportError:
@@ -273,11 +273,41 @@ def get_reported_checksum(self, pe: pefile.PE) -> str:

def get_actual_checksum(self, pe: pefile.PE) -> str:
"""Get calculated checksum of PE
@return: checksum or None.
@return: checksum string, or None if unavailable / not computed.

`pe.generate_checksum()` is a pure-Python loop over the entire
file in 32-bit words — typically 1-3 seconds per PE on
sandbox-sized payloads, and on heavy tasks with 15-20 dumped
payloads it dominates CAPE-module wall-clock (~40s/task in
profiling).

The recomputed value is only consumed by the
`static_pe_anomaly` signature, which only compares it against
the embedded `reported_checksum` when that field is non-zero
(`if reported and reported != actual` — see
modules/signatures/all/static_pe_anomaly.py). When the PE has
no embedded checksum (compilers commonly omit it; almost every
dropper/packer leaves it 0), the recompute result would never
be consulted — pure throwaway work.

Skip the expensive recompute in that case and return None so
the field is honestly absent rather than fabricated. The
signature already gracefully handles missing keys via the
`if reported and ...` guard, and downstream consumers can
distinguish "not computed" from "computed and zero". Saves
~40s on a 19-payload analysis with reported=0 binaries.
"""
if not pe:
return None

try:
if pe.OPTIONAL_HEADER.CheckSum == 0:
# No embedded checksum to compare against — skip the
# expensive recompute and signal absence.
return None
except Exception:
pass

try:
return f"0x{pe.generate_checksum():08x}"
except Exception:
@@ -779,7 +809,7 @@ def get_digital_signers(self, pe: pefile.PE) -> List[dict]:
signatures = bytes(signatures)

with suppress(Exception):
certs = backend.load_der_pkcs7_certificates(signatures)
certs = _pkcs7_mod.load_der_pkcs7_certificates(signatures)

except AttributeError:
log.debug("Can't get PE signatures")
@@ -942,6 +972,14 @@ def run(self, task_id: str = False) -> dict:
if not self.HAVE_PE:
return {}

# `get_actual_checksum` returns None when the embedded reported
# checksum is 0 (we skip the expensive recompute since no
# comparison will be made). In that case omit the key entirely
# rather than fabricating a "0x00000000" — the static_pe_anomaly
# signature already guards on `"actual_checksum" in pe`, and any
# other consumer can distinguish "absent → not computed" from a
# genuine "computed and zero" value.
actual_cs = self.get_actual_checksum(self.pe)
peresults = {
"guest_signers": self.get_guest_digital_signers(task_id),
"digital_signers": self.get_digital_signers(self.pe),
Expand All @@ -950,7 +988,6 @@ def run(self, task_id: str = False) -> dict:
"ep_bytes": self.get_ep_bytes(self.pe),
"peid_signatures": self.get_peid_signatures(self.pe),
"reported_checksum": self.get_reported_checksum(self.pe),
"actual_checksum": self.get_actual_checksum(self.pe),
"osversion": self.get_osversion(self.pe),
"machine_type": self.get_machine_type(self.pe),
"pdbpath": self.get_pdb_path(self.pe),
Expand All @@ -965,6 +1002,8 @@ def run(self, task_id: str = False) -> dict:
"imphash": self.get_imphash(self.pe),
"timestamp": self.get_timestamp(self.pe),
}
if actual_cs is not None:
peresults["actual_checksum"] = actual_cs
(
peresults["icon"],
peresults["icon_hash"],
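
Not part of the diff: a standalone sketch of the import swap above, assuming cryptography 3.1 or newer, where the PKCS#7 loader is exposed through the public serialization.pkcs7 module rather than the openssl backend shim. The helper name is hypothetical.

from contextlib import suppress

from cryptography.hazmat.primitives.serialization import pkcs7

def load_authenticode_certs(signature_blob: bytes) -> list:
    """Parse a DER-encoded PKCS#7 blob (as found in the PE security
    directory) into x509.Certificate objects; returns [] on failure."""
    certs = []
    with suppress(Exception):
        certs = pkcs7.load_der_pkcs7_certificates(signature_blob)
    return certs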
14 changes: 12 additions & 2 deletions lib/cuckoo/common/objects.py
@@ -436,8 +436,18 @@ def _yara_encode_string(self, yara_string):
return new

@classmethod
def init_yara(self, raise_exception: bool = False):
"""Generates index for yara signatures."""
def init_yara(cls, raise_exception: bool = False, force: bool = False):
"""Generates index for yara signatures.

Idempotent — safe to call multiple times. Compiling the full ruleset
is ~3s and the result is cached on the class for the lifetime of the
process, so subsequent calls short-circuit unless `force=True` is
passed (used after a yara-rule update). Without this guard, repeated
callers (e.g. some integration paths that didn't go through the
get_yara() wrapper) re-compiled all six categories on every call."""

if cls.yara_initialized and not force:
return

categories = ("binaries", "urls", "memory", "CAPE", "macro", "monitor")
log.debug("Initializing Yara...")
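
Not part of the diff: a minimal usage sketch of the idempotent guard, assuming the enclosing class is File in lib/cuckoo/common/objects.py and that yara_initialized is the class-level flag checked above.

from lib.cuckoo.common.objects import File

File.init_yara()            # first call: compiles all six rule categories (~3s)
File.init_yara()            # later calls return immediately; compiled rules stay cached
File.init_yara(force=True)  # after a yara-rule update: force a recompile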