Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 148 additions & 15 deletions capa/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import logging
import datetime
import contextlib
import threading
import signal
from typing import Optional
from pathlib import Path

Expand Down Expand Up @@ -50,6 +52,7 @@
from capa.capabilities.common import Capabilities
from capa.features.extractors.base_extractor import (
SampleHashes,
FunctionFilter,
FeatureExtractor,
StaticFeatureExtractor,
DynamicFeatureExtractor,
Expand All @@ -74,6 +77,10 @@ class CorruptFile(ValueError):
pass


class _AnalysisTimeoutError(RuntimeError):
pass


def is_supported_format(sample: Path) -> bool:
"""
Return if this is a supported file based on magic header values
Expand Down Expand Up @@ -177,6 +184,83 @@ def _is_probably_corrupt_pe(path: Path) -> bool:
return False


def _get_elf_analysis_timeout_seconds() -> int:
"""
Return timeout for viv ELF analysis in seconds.
0 disables timeout.
"""
value = os.environ.get("CAPA_ELF_ANALYSIS_TIMEOUT_SECONDS", "120").strip()
try:
return max(0, int(value))
except ValueError:
logger.warning("invalid CAPA_ELF_ANALYSIS_TIMEOUT_SECONDS=%r, using default 120", value)
return 120


def _get_elf_max_functions() -> int:
"""
Return max number of ELF functions to analyze with viv.
0 disables capping.
"""
value = os.environ.get("CAPA_ELF_MAX_FUNCTIONS", "1000").strip()
try:
return max(0, int(value))
except ValueError:
logger.warning("invalid CAPA_ELF_MAX_FUNCTIONS=%r, using default 1000", value)
return 1000


@contextlib.contextmanager
def _timebox(seconds: int):
"""
Timebox a block using SIGALRM on platforms that support it.
"""
if (
seconds <= 0
or not hasattr(signal, "SIGALRM")
or threading.current_thread() is not threading.main_thread()
):
yield
return

def _handle_timeout(signum, frame):
raise _AnalysisTimeoutError(f"analysis exceeded {seconds}s")

previous_handler = signal.getsignal(signal.SIGALRM)
signal.signal(signal.SIGALRM, _handle_timeout)
signal.setitimer(signal.ITIMER_REAL, float(seconds))
try:
yield
finally:
signal.setitimer(signal.ITIMER_REAL, 0.0)
signal.signal(signal.SIGALRM, previous_handler)


@contextlib.contextmanager
def _temporarily_disable_viv_elf_section_symbols():
"""
Disable viv's ELF section-symbol parsing while loading a workspace.

The parser reads large .symtab/.strtab sections very inefficiently and can
cause severe slowdowns on large real-world ELF binaries.
"""
import Elf

original = getattr(Elf.Elf, "_parseSectionSymbols", None)
if original is None:
yield
return

def _skip_section_symbols(self):
logger.debug("skipping viv ELF section-symbol parsing")

Elf.Elf._parseSectionSymbols = _skip_section_symbols
try:
yield
finally:
Elf.Elf._parseSectionSymbols = original


def get_workspace(path: Path, input_format: str, sigpaths: list[Path]):
"""
load the program at the given path into a vivisect workspace using the given format.
Expand Down Expand Up @@ -206,15 +290,24 @@ def get_workspace(path: Path, input_format: str, sigpaths: list[Path]):
+ " - skipping analysis to avoid excessive resource usage."
)

is_elf_input = False
if input_format == FORMAT_ELF:
is_elf_input = True
elif input_format == FORMAT_AUTO:
with path.open("rb") as f:
is_elf_input = f.read(4).startswith(capa.features.extractors.common.MATCH_ELF)

try:
if input_format == FORMAT_AUTO:
if not is_supported_format(path):
raise UnsupportedFormatError()

# don't analyze, so that we can add our Flirt function analyzer first.
vw = viv_utils.getWorkspace(str(path), analyze=False, should_save=False)
with _temporarily_disable_viv_elf_section_symbols() if is_elf_input else contextlib.nullcontext():
vw = viv_utils.getWorkspace(str(path), analyze=False, should_save=False)
elif input_format in {FORMAT_PE, FORMAT_ELF}:
vw = viv_utils.getWorkspace(str(path), analyze=False, should_save=False)
with _temporarily_disable_viv_elf_section_symbols() if is_elf_input else contextlib.nullcontext():
vw = viv_utils.getWorkspace(str(path), analyze=False, should_save=False)
elif input_format == FORMAT_SC32:
# these are not analyzed nor saved.
vw = viv_utils.getShellcodeWorkspaceFromFile(str(path), arch="i386", analyze=False)
Expand All @@ -224,6 +317,12 @@ def get_workspace(path: Path, input_format: str, sigpaths: list[Path]):
raise ValueError("unexpected format: " + input_format)
except envi.exc.SegmentationViolation as e:
raise CorruptFile(f"Invalid memory access during binary parsing: {e}") from e
except ModuleNotFoundError as e:
# viv may fail while loading architecture-specific impapi modules.
# treat this as unsupported architecture instead of crashing.
if e.name and e.name.startswith("vivisect.impapi.posix."):
raise UnsupportedArchError() from e
raise
except Exception as e:
# vivisect raises raw Exception instances, and we don't want
# to do a subclass check via isinstance.
Expand All @@ -240,19 +339,36 @@ def get_workspace(path: Path, input_format: str, sigpaths: list[Path]):

viv_utils.flirt.register_flirt_signature_analyzers(vw, [str(s) for s in sigpaths])

with contextlib.suppress(Exception):
# unfortuately viv raises a raw Exception (not any subclass).
# This happens when the module isn't found, such as with a viv upgrade.
#
# Remove the symbolic switch case solver.
# This is only enabled for ELF files, not PE files.
# During the following performance investigation, this analysis module
# had some terrible worst-case behavior.
# We can put up with slightly worse CFG reconstruction in order to avoid this.
# https://github.com/mandiant/capa/issues/1989#issuecomment-1948022767
vw.delFuncAnalysisModule("vivisect.analysis.generic.symswitchcase")
if is_elf_input:
for module in (
# During performance investigations we've observed pathological
# behavior in several viv ELF function-analysis passes. prefer
# slightly reduced CFG reconstruction over indefinite analysis.
"vivisect.analysis.generic.symswitchcase",
"vivisect.analysis.elf.elfplt",
"vivisect.analysis.amd64.emulation",
"vivisect.analysis.generic.emucode",
"vivisect.analysis.generic.noret",
):
with contextlib.suppress(Exception):
# unfortunately viv raises raw Exception (not any subclass)
# when a module isn't found (e.g. after viv upgrades).
vw.delFuncAnalysisModule(module)

vw.analyze()
try:
timeout_s = _get_elf_analysis_timeout_seconds() if is_elf_input else 0
with _timebox(timeout_s):
vw.analyze()
except _AnalysisTimeoutError as e:
raise CorruptFile(
f"analysis timed out after {timeout_s}s while processing ELF sample; refusing to hang indefinitely"
) from e
except ModuleNotFoundError as e:
# viv may fail late when it cannot load an architecture-specific impapi module.
# treat this as an unsupported architecture instead of crashing with a traceback.
if e.name and e.name.startswith("vivisect.impapi.posix."):
raise UnsupportedArchError() from e
raise

logger.debug("%s", get_meta_str(vw))
return vw
Expand Down Expand Up @@ -364,7 +480,24 @@ def get_extractor(
else:
logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace")

return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, input_path, os_)
extractor: FeatureExtractor = capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, input_path, os_)
if input_format == FORMAT_ELF:
max_functions = _get_elf_max_functions()
if max_functions > 0:
selected = []
functions = extractor.get_functions()
for i, f in enumerate(functions):
if i >= max_functions:
logger.warning(
"ELF function count exceeds CAPA_ELF_MAX_FUNCTIONS=%d, limiting analysis scope",
max_functions,
)
break
selected.append(f.address)
if selected:
extractor = FunctionFilter(extractor, set(selected))

return extractor

elif backend == BACKEND_FREEZE:
return frz.load(input_path.read_bytes())
Expand Down
53 changes: 52 additions & 1 deletion capa/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
import os
import sys
import time
import signal
import logging
import argparse
import threading
import textwrap
import contextlib
from types import TracebackType
Expand Down Expand Up @@ -140,6 +142,10 @@
logger = logging.getLogger("capa")


class _AnalysisTimeoutError(RuntimeError):
pass


class FilterConfig(TypedDict, total=False):
processes: set[int]
functions: set[int]
Expand All @@ -153,6 +159,42 @@ def timing(msg: str):
logger.debug("perf: %s: %0.2fs", msg, t1 - t0)


def _get_elf_total_analysis_timeout_seconds() -> int:
"""
Return timeout for ELF capability matching in seconds.
0 disables timeout.
"""
value = os.environ.get("CAPA_ELF_TOTAL_ANALYSIS_TIMEOUT_SECONDS", "120").strip()
try:
return max(0, int(value))
except ValueError:
logger.warning("invalid CAPA_ELF_TOTAL_ANALYSIS_TIMEOUT_SECONDS=%r, using default 120", value)
return 120


@contextlib.contextmanager
def _timebox(seconds: int):
if (
seconds <= 0
or not hasattr(signal, "SIGALRM")
or threading.current_thread() is not threading.main_thread()
):
yield
return

def _handle_timeout(signum, frame):
raise _AnalysisTimeoutError(f"analysis exceeded {seconds}s")

previous_handler = signal.getsignal(signal.SIGALRM)
signal.signal(signal.SIGALRM, _handle_timeout)
signal.setitimer(signal.ITIMER_REAL, float(seconds))
try:
yield
finally:
signal.setitimer(signal.ITIMER_REAL, 0.0)
signal.signal(signal.SIGALRM, previous_handler)


def set_vivisect_log_level(level):
logging.getLogger("vivisect").setLevel(level)
logging.getLogger("vivisect.base").setLevel(level)
Expand Down Expand Up @@ -1039,7 +1081,16 @@ def main(argv: Optional[list[str]] = None):
except ShouldExitError as e:
return e.status_code

capabilities: Capabilities = find_capabilities(rules, extractor, disable_progress=args.quiet)
try:
timeout_s = _get_elf_total_analysis_timeout_seconds() if (input_format == FORMAT_ELF and backend == BACKEND_VIV) else 0
with _timebox(timeout_s):
capabilities = find_capabilities(rules, extractor, disable_progress=args.quiet)
except _AnalysisTimeoutError:
logger.error(
"analysis timed out after %ds while matching capabilities for ELF sample; refusing to hang indefinitely",
timeout_s,
)
return E_FILE_LIMITATION

meta: rdoc.Metadata = capa.loader.collect_metadata(
argv, args.input_file, input_format, os_, args.rules, extractor, capabilities
Expand Down
Loading
Loading