From 9dcd615376aa7f0bd2e22edb2d9b46d38936d989 Mon Sep 17 00:00:00 2001 From: devs6186 Date: Mon, 6 Apr 2026 22:22:48 +0530 Subject: [PATCH 1/7] rules: pre-filter string rules whose patterns are absent from the binary MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add RuleSet.prepare_for_file(file_strings) which accepts the set of all string values extracted from a binary file and marks any string rule whose required Substring/Regex patterns cannot match any of those strings as impossible. _match() then skips those rules entirely, avoiding repeated Regex.evaluate() / Substring.evaluate() calls. The filter is applied in find_static_capabilities() before the per- function analysis loop and cleared afterwards to keep the ruleset clean for potential reuse. The one-time O(rules × file_strings) pre-check replaces an O(functions × string_rules × file_strings) repeated evaluation. This directly addresses the long-standing hot-path described in the issue: HTTP User-Agent rules and similar families with many regex branches in a single `or:` block are skipped entirely for binaries that don't contain any matching strings. Closes #2126 --- CHANGELOG.md | 2 ++ capa/capabilities/static.py | 15 +++++++++ capa/rules/__init__.py | 61 +++++++++++++++++++++++++++++++++++++ 3 files changed, 78 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e1cc6d236..d128a70af 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ ### New Features +- rules: pre-filter string rules whose Substring/Regex patterns are absent from the binary file, reducing redundant regex evaluation during per-function matching #2126 + ### Breaking Changes ### New Rules (0) diff --git a/capa/capabilities/static.py b/capa/capabilities/static.py index 893887f77..215334afc 100644 --- a/capa/capabilities/static.py +++ b/capa/capabilities/static.py @@ -25,6 +25,7 @@ import capa.render.result_document as rdoc from capa.rules import Scope, RuleSet from capa.engine import FeatureSet, MatchResults +from capa.features.common import String from capa.capabilities.common import Capabilities, find_file_capabilities from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor @@ -163,6 +164,17 @@ def find_static_capabilities( library_functions_list: list[rdoc.LibraryFunction] = [] assert isinstance(extractor, StaticFeatureExtractor) + + # Pre-filter string rules based on strings found in the binary. + # Collect all string values from the file's feature set and inform the ruleset + # so that rules whose required patterns are provably absent are skipped during + # per-function matching. This avoids repeated Regex.evaluate() calls that can + # never succeed. See: https://github.com/mandiant/capa/issues/2126 + file_strings: frozenset[str] = frozenset( + feature.value for feature, _ in extractor.extract_file_features() if isinstance(feature, String) + ) + ruleset.prepare_for_file(file_strings) + functions: list[FunctionHandle] = list(extractor.get_functions()) n_funcs: int = len(functions) n_libs: int = 0 @@ -239,6 +251,9 @@ def find_static_capabilities( functions=tuple(function_feature_counts), ) + # Clear the string pre-filter so the ruleset is clean for potential reuse. + ruleset.prepare_for_file(frozenset()) + matches: MatchResults = dict( itertools.chain( # each rule exists in exactly one scope, diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index 1eca88042..af13e4a18 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -1463,6 +1463,11 @@ def __init__( scope: {rule.name: i for i, rule in enumerate(self.rules_by_scope[scope])} for scope in scopes } + # Set of string-rule names whose required patterns are absent from the current binary. + # Populated by prepare_for_file(); empty means no pre-filtering is active. + # See: https://github.com/mandiant/capa/issues/2126 + self._impossible_string_rule_names: set[str] = set() + @property def file_rules(self): return self.rules_by_scope[Scope.FILE] @@ -1948,6 +1953,56 @@ def _sort_rules_by_index(rule_index_by_rule_name: dict[str, int], rules: list[Ru """ rules.sort(key=lambda r: rule_index_by_rule_name[r.name]) + def prepare_for_file(self, file_strings: frozenset[str]) -> None: + """ + Pre-filter string rules based on strings extracted from the binary file. + + Rules whose required Substring/Regex patterns cannot match any string in + file_strings will be skipped during subsequent _match() calls. This + saves repeated Regex.evaluate() / Substring.evaluate() work for patterns + that are provably absent from the binary. + + Call this before analyzing functions for a binary. + Pass an empty frozenset to clear the filter between binaries. + + See: https://github.com/mandiant/capa/issues/2126 + """ + if not file_strings: + self._impossible_string_rule_names = set() + return + + impossible: set[str] = set() + total = 0 + + for feature_index in self._feature_indexes_by_scopes.values(): + for rule_name, wanted_strings in feature_index.string_rules.items(): + total += 1 + can_match = False + for feat in wanted_strings: + if isinstance(feat, capa.features.common.Substring): + if any(feat.value in s for s in file_strings): + can_match = True + break + elif isinstance(feat, capa.features.common.Regex): + if any(feat.re.search(s) for s in file_strings): + can_match = True + break + else: + # unknown feature type: keep to be safe + can_match = True + break + if not can_match: + impossible.add(rule_name) + + if impossible: + logger.debug( + "pre-filter: %d/%d string rules skipped (patterns absent from binary)", + len(impossible), + total, + ) + + self._impossible_string_rule_names = impossible + def _match(self, scope: Scope, features: FeatureSet, addr: Address) -> tuple[FeatureSet, ceng.MatchResults]: """ Match rules from this ruleset at the given scope against the given features. @@ -2027,6 +2082,12 @@ def _match(self, scope: Scope, features: FeatureSet, addr: Address) -> tuple[Fea if string_features: for rule_name, wanted_strings in feature_index.string_rules.items(): + # Skip rules whose patterns are provably absent from the binary. + # prepare_for_file() pre-checks all file strings once and populates + # _impossible_string_rule_names to avoid repeated Regex.evaluate() work. + # See: https://github.com/mandiant/capa/issues/2126 + if rule_name in self._impossible_string_rule_names: + continue for wanted_string in wanted_strings: if wanted_string.evaluate(string_features): candidate_rule_names.add(rule_name) From 1a36f9e8a21ca8cba8dd4444aab98be3a965079e Mon Sep 17 00:00:00 2001 From: devs6186 Date: Tue, 7 Apr 2026 00:01:01 +0530 Subject: [PATCH 2/7] rules: avoid stack-string false negatives in string prefilter --- capa/rules/__init__.py | 17 +++++++++++++---- tests/test_match.py | 31 ++++++++++++++++++++++++++++++- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index af13e4a18..62f4713c3 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -1972,11 +1972,14 @@ def prepare_for_file(self, file_strings: frozenset[str]) -> None: return impossible: set[str] = set() - total = 0 + all_string_rule_names: set[str] = set() for feature_index in self._feature_indexes_by_scopes.values(): for rule_name, wanted_strings in feature_index.string_rules.items(): - total += 1 + if rule_name in all_string_rule_names: + continue + all_string_rule_names.add(rule_name) + can_match = False for feat in wanted_strings: if isinstance(feat, capa.features.common.Substring): @@ -1998,7 +2001,7 @@ def prepare_for_file(self, file_strings: frozenset[str]) -> None: logger.debug( "pre-filter: %d/%d string rules skipped (patterns absent from binary)", len(impossible), - total, + len(all_string_rule_names), ) self._impossible_string_rule_names = impossible @@ -2081,12 +2084,18 @@ def _match(self, scope: Scope, features: FeatureSet, addr: Address) -> tuple[Fea string_features[feature] = locations if string_features: + # Some extractors may synthesize stack strings that do not exist as contiguous + # file bytes. In that case, avoid file-level pre-filtering for this scope. + has_stack_string_characteristic = any( + isinstance(feature, capa.features.common.Characteristic) and feature.value == "stack string" + for feature in features + ) for rule_name, wanted_strings in feature_index.string_rules.items(): # Skip rules whose patterns are provably absent from the binary. # prepare_for_file() pre-checks all file strings once and populates # _impossible_string_rule_names to avoid repeated Regex.evaluate() work. # See: https://github.com/mandiant/capa/issues/2126 - if rule_name in self._impossible_string_rule_names: + if not has_stack_string_characteristic and rule_name in self._impossible_string_rule_names: continue for wanted_string in wanted_strings: if wanted_string.evaluate(string_features): diff --git a/tests/test_match.py b/tests/test_match.py index 139e2434a..687d05d5b 100644 --- a/tests/test_match.py +++ b/tests/test_match.py @@ -21,7 +21,7 @@ import capa.features.insn import capa.features.common from capa.rules import Scope -from capa.features.common import OS, OS_ANY, OS_WINDOWS, String, MatchedRule +from capa.features.common import OS, OS_ANY, OS_WINDOWS, String, MatchedRule, Characteristic def match(rules, features, va, scope=Scope.FUNCTION): @@ -818,6 +818,35 @@ def test_index_features_nested_unstable(): assert not index.bytes_prefix_index +def test_string_prefilter_stack_string_fallback(): + rule_text = textwrap.dedent(""" + rule: + meta: + name: test string prefilter stack string fallback + scopes: + static: function + dynamic: process + features: + - string: /powershell/ + """) + rule = capa.rules.Rule.from_yaml(rule_text) + ruleset = capa.rules.RuleSet([rule]) + + # Mark the regex rule as impossible based on file-level strings. + ruleset.prepare_for_file(frozenset({"hello", "world"})) + + _, matches = ruleset.match(Scope.FUNCTION, {String("powershell"): {0x0}}, 0x0) + assert "test string prefilter stack string fallback" not in matches + + # If a stack string is present in this scope, don't trust file-level pre-filtering. + _, matches = ruleset.match( + Scope.FUNCTION, + {String("powershell"): {0x0}, Characteristic("stack string"): {0x0}}, + 0x0, + ) + assert "test string prefilter stack string fallback" in matches + + def test_bytes_prefix_index_correctness(): """Verify that the bytes prefix pre-filter preserves match behavior.""" rule_text = textwrap.dedent(""" From bdc6dcd0f7f12cccbab508001ae9a07d3c0fa959 Mon Sep 17 00:00:00 2001 From: devs6186 Date: Tue, 7 Apr 2026 03:35:50 +0530 Subject: [PATCH 3/7] rules: speed up prepare_for_file with concat-string scan and min-function guard MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two improvements to the string pre-filter introduced in #2126: 1. Concat-string optimization in prepare_for_file(): Instead of calling re.search() against each of the N file strings per rule, join all file strings with a \x01 separator and search the concat once. If the concat search finds no match the rule is provably impossible (a per- string match would also appear in the concat). If it does match, run the per-string fallback to confirm and avoid false positives from DOTALL patterns that could span the \x01 boundary. Measured overhead: ~41-53 ms (1147-1225 strings) vs ~200-270 ms before. 2. Minimum-function guard in find_static_capabilities(): Only activate prepare_for_file() when the binary has >= 10 functions. For very small binaries the one-time scan overhead can exceed the per- function savings; the guard avoids a net regression there. Benchmark results (vivisect backend, 1385 rules, 83 string-dependent): Binary Funcs File strs Baseline With filter Net gain Skipped Lab 01-02.exe_ 2 47 0.02 s 0.02 s +4 ms 83/83 0a30182f…exe_ 130 1 225 0.67 s 0.53 s +93 ms 83/83 7fbc17a0…exe_ 562 1 147 1.86 s 1.67 s +143 ms 81/83 321338…exe_ 2 466 3 363 11.83 s 11.46 s +280 ms 82/83 Net gain = (baseline - filtered) - prepare_for_file overhead. All positive; the filter pays for itself across the tested corpus. --- capa/capabilities/static.py | 25 +-- capa/rules/__init__.py | 39 ++++- scripts/benchmark_string_prefilter.py | 227 ++++++++++++++++++++++++++ 3 files changed, 276 insertions(+), 15 deletions(-) create mode 100644 scripts/benchmark_string_prefilter.py diff --git a/capa/capabilities/static.py b/capa/capabilities/static.py index 215334afc..94eae459d 100644 --- a/capa/capabilities/static.py +++ b/capa/capabilities/static.py @@ -165,17 +165,22 @@ def find_static_capabilities( assert isinstance(extractor, StaticFeatureExtractor) - # Pre-filter string rules based on strings found in the binary. - # Collect all string values from the file's feature set and inform the ruleset - # so that rules whose required patterns are provably absent are skipped during - # per-function matching. This avoids repeated Regex.evaluate() calls that can - # never succeed. See: https://github.com/mandiant/capa/issues/2126 - file_strings: frozenset[str] = frozenset( - feature.value for feature, _ in extractor.extract_file_features() if isinstance(feature, String) - ) - ruleset.prepare_for_file(file_strings) - functions: list[FunctionHandle] = list(extractor.get_functions()) + + # Pre-filter string rules based on strings found in the binary. + # For each rule whose required Substring/Regex patterns are provably absent + # from the binary's file-level strings, mark it as skippable in _match(). + # This replaces repeated Regex.evaluate() calls (once per function × per rule) + # with a single file-level scan. See: https://github.com/mandiant/capa/issues/2126 + # + # The upfront scan cost is O(|string_rules| × |file_strings|). For small + # binaries this overhead can exceed the savings, so we only activate the + # pre-filter when there are enough functions to justify it. + if len(functions) >= 10: + file_strings: frozenset[str] = frozenset( + feature.value for feature, _ in extractor.extract_file_features() if isinstance(feature, String) + ) + ruleset.prepare_for_file(file_strings) n_funcs: int = len(functions) n_libs: int = 0 percentage: float = 0 diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index 62f4713c3..a748c001e 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -1966,11 +1966,31 @@ def prepare_for_file(self, file_strings: frozenset[str]) -> None: Pass an empty frozenset to clear the filter between binaries. See: https://github.com/mandiant/capa/issues/2126 + + Performance note: this method scans file_strings once per unique string-rule. + Cost is O(|string_rules| * |file_strings|) in the worst case, but typically + much faster because most rules' patterns are absent and `re.search` on a + concatenated string (see below) does the work in a single pass. """ if not file_strings: self._impossible_string_rule_names = set() return + # Build a single concatenated string from all file strings separated by \x01. + # \x01 is not present in capa rule patterns nor in file strings (which are + # printable ASCII sequences from the binary). + # Using this concat lets us do ONE re.search per rule (fast C-level scan) + # instead of iterating over every file string. + # + # If the concat-level scan finds no match, the rule is provably impossible + # (a match on an individual string would also appear in the concat). + # + # If it does find a match, we confirm per-string to avoid false positives: + # a pattern compiled with re.DOTALL treats `.` as matching any character + # including \x01, so `SELECT.*FROM.*WHERE` could match across the boundary + # of two unrelated strings. The per-string confirmation resolves this. + concat_strings: str = "\x01".join(file_strings) + impossible: set[str] = set() all_string_rule_names: set[str] = set() @@ -1983,15 +2003,24 @@ def prepare_for_file(self, file_strings: frozenset[str]) -> None: can_match = False for feat in wanted_strings: if isinstance(feat, capa.features.common.Substring): - if any(feat.value in s for s in file_strings): + # Fast: single C-level scan of the concatenated string. + # No false-positive risk for Substring because feat.value + # cannot span a \x01 boundary (the pattern is a literal string + # and \x01 is never present in rule patterns). + if feat.value in concat_strings: can_match = True break elif isinstance(feat, capa.features.common.Regex): - if any(feat.re.search(s) for s in file_strings): - can_match = True - break + # Phase 1: check the concatenated string first. + # This is usually a definitive NO (impossible rule) in one call. + # When it returns a match, run per-string to confirm and avoid + # false positives from patterns that accidentally span \x01. + if feat.re.search(concat_strings): + if any(feat.re.search(s) for s in file_strings): + can_match = True + break else: - # unknown feature type: keep to be safe + # Unknown feature type: keep to be safe. can_match = True break if not can_match: diff --git a/scripts/benchmark_string_prefilter.py b/scripts/benchmark_string_prefilter.py new file mode 100644 index 000000000..690c24b5f --- /dev/null +++ b/scripts/benchmark_string_prefilter.py @@ -0,0 +1,227 @@ +#!/usr/bin/env python3 +""" +Benchmark: string-rule pre-filter speedup (#2126) + +Measures wall-clock time for find_static_capabilities() with and without +the string pre-filter (prepare_for_file), so we can quantify the speedup +on real binaries with a full rule set. + +Usage: + python scripts/benchmark_string_prefilter.py [--runs N] [binary ...] + +If no binary paths are given the script picks a small representative set +from tests/data/. Each binary is analysed RUNS times in each mode; the +median is reported. The script uses the vivisect back-end, which needs no +external tools. + +Example: + python scripts/benchmark_string_prefilter.py --runs 3 +""" + +import argparse +import logging +import pathlib +import statistics +import sys +import time + +# Silence capa progress output during benchmarking. +logging.disable(logging.WARNING) + +import capa.main +import capa.rules +import capa.rules.cache +import capa.capabilities.static +from capa.features.common import String + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _load_ruleset() -> capa.rules.RuleSet: + rules_path = pathlib.Path(__file__).parent.parent / "rules" + if not rules_path.is_dir(): + print(f"[!] rules/ directory not found at {rules_path}", file=sys.stderr) + sys.exit(1) + return capa.rules.get_rules([rules_path], enable_cache=True) + + +def _make_extractor(binary_path: pathlib.Path): + """Return a vivisect StaticFeatureExtractor for *binary_path*, or None.""" + try: + import capa.loader + + extractor = capa.loader.get_extractor( + binary_path, + input_format="auto", + os_="auto", + backend=capa.main.BACKEND_VIV, + sigpaths=[], + should_save_workspace=False, + disable_progress=True, + ) + return extractor + except Exception as exc: + print(f" [!] could not load {binary_path.name}: {exc}", file=sys.stderr) + return None + + +def _measure_prefilter(ruleset: capa.rules.RuleSet, extractor) -> tuple[int, int, float]: + """ + Run prepare_for_file() once and return + (n_file_strings, n_skipped_rules, overhead_seconds). + Does not disturb the ruleset state. + """ + file_strings: frozenset[str] = frozenset( + feat.value for feat, _ in extractor.extract_file_features() if isinstance(feat, String) + ) + t0 = time.perf_counter() + ruleset.prepare_for_file(file_strings) + t1 = time.perf_counter() + n_skipped = len(ruleset._impossible_string_rule_names) + ruleset.prepare_for_file(frozenset()) # restore + return len(file_strings), n_skipped, (t1 - t0) + + +def _time_find_capabilities( + ruleset: capa.rules.RuleSet, + extractor, + *, + prefilter: bool, + n_runs: int, +) -> tuple[float, int]: + """ + Run find_static_capabilities() n_runs times and return + (median_seconds, n_functions). + """ + durations: list[float] = [] + n_funcs = 0 + + original_prepare = capa.rules.RuleSet.prepare_for_file + + if not prefilter: + # Monkey-patch prepare_for_file to be a no-op so the pre-filter never + # activates, giving us a clean "before" baseline. + def _noop(self, file_strings): # type: ignore[misc] + self._impossible_string_rule_names = set() + + capa.rules.RuleSet.prepare_for_file = _noop # type: ignore[method-assign] + + try: + for _ in range(n_runs): + t0 = time.perf_counter() + caps = capa.capabilities.static.find_static_capabilities( + ruleset, extractor, disable_progress=True + ) + t1 = time.perf_counter() + durations.append(t1 - t0) + + if n_funcs == 0: + n_funcs = len(caps.feature_counts.functions) + finally: + capa.rules.RuleSet.prepare_for_file = original_prepare # type: ignore[method-assign] + + return statistics.median(durations), n_funcs + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +_DEFAULT_SAMPLES = [ + # small – packed/minimal strings + "tests/data/Practical Malware Analysis Lab 01-02.exe_", + # medium – typical malware + "tests/data/0a30182ff3a6b67beb0f2cda9d0de678.exe_", + "tests/data/7fbc17a09cf5320c515fc1c5ba42c8b3.exe_", + # larger – more functions + "tests/data/321338196a46b600ea330fc5d98d0699.exe_", +] + + +def main(): + parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--runs", type=int, default=3, help="median over this many runs (default: 3)") + parser.add_argument("binaries", nargs="*", metavar="BINARY", help="binary paths to benchmark") + args = parser.parse_args() + + root = pathlib.Path(__file__).parent.parent + if args.binaries: + samples = [pathlib.Path(b) for b in args.binaries] + else: + samples = [root / s for s in _DEFAULT_SAMPLES] + samples = [s for s in samples if s.exists()] + + if not samples: + print("[!] no sample files found; pass binary paths explicitly", file=sys.stderr) + sys.exit(1) + + print(f"Loading rules …", end="", flush=True) + ruleset = _load_ruleset() + + # Count unique string-dependent rules across all scopes. + seen: set[str] = set() + for fi in ruleset._feature_indexes_by_scopes.values(): + seen.update(fi.string_rules.keys()) + n_string_rules = len(seen) + print(f" {len(ruleset.rules)} rules total, {n_string_rules} string-dependent") + print() + + col_w = 44 + hdr = ( + f"{'Binary':<{col_w}} {'Funcs':>6} {'Strs':>7} " + f"{'w/o filter':>10} {'w/ filter':>10} " + f"{'Speedup':>7} {'Overhead':>8} {'Net gain':>8} {'Skipped':>12}" + ) + print(hdr) + print("-" * len(hdr)) + + for sample in samples: + name = sample.name + if len(name) > col_w - 1: + name = "…" + name[-(col_w - 2):] + + extractor = _make_extractor(sample) + if extractor is None: + continue + + # Measure prepare_for_file overhead and skipped rule count. + n_file_strings, n_skipped, t_overhead = _measure_prefilter(ruleset, extractor) + + print(f" {name:<{col_w - 2}} ", end="", flush=True) + + # "Before": no prefilter + t_before, n_funcs = _time_find_capabilities( + ruleset, extractor, prefilter=False, n_runs=args.runs + ) + + # "After": with prefilter + t_after, _ = _time_find_capabilities( + ruleset, extractor, prefilter=True, n_runs=args.runs + ) + + saved = t_before - t_after + speedup = t_before / t_after if t_after > 0 else float("inf") + pct_skipped = 100.0 * n_skipped / n_string_rules if n_string_rules else 0.0 + # Net gain = saved matching time minus upfront overhead + net = saved - t_overhead + + print( + f"{n_funcs:>6} {n_file_strings:>7} {t_before:>9.2f}s {t_after:>9.2f}s " + f"{speedup:>6.2f}x {t_overhead*1000:>6.0f}ms {net*1000:>+7.0f}ms " + f"{n_skipped:>4}/{n_string_rules} ({pct_skipped:.0f}%)" + ) + + print() + print("Notes:") + print(f" Times are median over {args.runs} run(s); perf_counter precision.") + print(" 'w/o filter' patches prepare_for_file() to a no-op (clean baseline).") + print(" 'Overhead' = wall time of prepare_for_file() alone (one-time cost per binary).") + print(" 'Net gain' = (w/o filter - w/ filter) - Overhead; positive = faster overall.") + print(" 'Skipped' = string rules pruned because patterns are absent from the binary.") + print(" 'Strs' = distinct String values found in the binary at file scope.") + + +if __name__ == "__main__": + main() From ebd9a62e56eb7ee6a85af069ee262b6693cfef2d Mon Sep 17 00:00:00 2001 From: devs6186 Date: Tue, 7 Apr 2026 13:31:56 +0530 Subject: [PATCH 4/7] rules: fix anchored-regex false-negative in prepare_for_file The concat-string optimisation in prepare_for_file() was unsafe for anchored regex patterns (^ / $). re.search("^foo", "bar\x01foo") returns no match because ^ binds to the start of the whole concatenated string, not the start of each individual file string. 12 of the 83 default string-dependent rules carry such anchors (e.g. /^docker.*/, /^Go buildinf:/, /^BXPC/). The optimisation would mark those rules impossible even when the matching string was present in the binary, producing false negatives. Fix: revert Regex patterns to per-string scanning; keep the concat optimisation only for Substring patterns, where it is unconditionally safe (literal values cannot span a \x01 boundary). Also fix benchmark script (scripts/benchmark_string_prefilter.py) lint issues: import order, f-string without placeholders (F541). Add regression test: test_string_prefilter_anchored_regex_correctness verifies that /^foo$/ is never marked impossible when "foo" appears in file_strings, and is correctly marked impossible when it does not. --- capa/rules/__init__.py | 50 +++-- scripts/benchmark_string_prefilter.py | 24 +-- tests/test_match.py | 266 +++++++++++++++++++------- 3 files changed, 230 insertions(+), 110 deletions(-) diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index a748c001e..2ed6d8d5d 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -1967,10 +1967,10 @@ def prepare_for_file(self, file_strings: frozenset[str]) -> None: See: https://github.com/mandiant/capa/issues/2126 - Performance note: this method scans file_strings once per unique string-rule. - Cost is O(|string_rules| * |file_strings|) in the worst case, but typically - much faster because most rules' patterns are absent and `re.search` on a - concatenated string (see below) does the work in a single pass. + Performance note: Substring patterns are checked via a single scan of a + concatenated string (O(1) calls, fast C-level `in`). Regex patterns require + a per-string scan (O(|file_strings|) calls) because ^ / $ anchors would bind + to the boundaries of the whole concat rather than each individual string. """ if not file_strings: self._impossible_string_rule_names = set() @@ -1978,17 +1978,12 @@ def prepare_for_file(self, file_strings: frozenset[str]) -> None: # Build a single concatenated string from all file strings separated by \x01. # \x01 is not present in capa rule patterns nor in file strings (which are - # printable ASCII sequences from the binary). - # Using this concat lets us do ONE re.search per rule (fast C-level scan) - # instead of iterating over every file string. - # - # If the concat-level scan finds no match, the rule is provably impossible - # (a match on an individual string would also appear in the concat). - # - # If it does find a match, we confirm per-string to avoid false positives: - # a pattern compiled with re.DOTALL treats `.` as matching any character - # including \x01, so `SELECT.*FROM.*WHERE` could match across the boundary - # of two unrelated strings. The per-string confirmation resolves this. + # printable-ASCII sequences extracted from the binary). Joining lets us check + # Substring patterns with a single C-level `in` scan instead of one per string. + # Note: this concat is used ONLY for Substring patterns; Regex patterns require + # per-string scanning because ^ / $ anchors bind to the start/end of the whole + # concat rather than each individual string (12 of the 83 default string rules + # use such anchors). concat_strings: str = "\x01".join(file_strings) impossible: set[str] = set() @@ -2003,22 +1998,23 @@ def prepare_for_file(self, file_strings: frozenset[str]) -> None: can_match = False for feat in wanted_strings: if isinstance(feat, capa.features.common.Substring): - # Fast: single C-level scan of the concatenated string. - # No false-positive risk for Substring because feat.value - # cannot span a \x01 boundary (the pattern is a literal string - # and \x01 is never present in rule patterns). + # Fast path: scan the concatenated string once (O(1) calls). + # Safe because feat.value is a printable-ASCII literal and + # \x01 never appears in rule patterns, so there are no false + # positives or negatives from the \x01 boundary. if feat.value in concat_strings: can_match = True break elif isinstance(feat, capa.features.common.Regex): - # Phase 1: check the concatenated string first. - # This is usually a definitive NO (impossible rule) in one call. - # When it returns a match, run per-string to confirm and avoid - # false positives from patterns that accidentally span \x01. - if feat.re.search(concat_strings): - if any(feat.re.search(s) for s in file_strings): - can_match = True - break + # Must scan each file string individually. + # Searching the concatenated string is unsafe for anchored + # patterns (^ / $): `re.search("^foo", "bar\x01foo")` fails + # because ^ anchors to the start of the whole concat, not the + # start of each individual string. 12 of the 83 string rules + # in the default rule set use such anchors. + if any(feat.re.search(s) for s in file_strings): + can_match = True + break else: # Unknown feature type: keep to be safe. can_match = True diff --git a/scripts/benchmark_string_prefilter.py b/scripts/benchmark_string_prefilter.py index 690c24b5f..0cc722853 100644 --- a/scripts/benchmark_string_prefilter.py +++ b/scripts/benchmark_string_prefilter.py @@ -18,12 +18,12 @@ python scripts/benchmark_string_prefilter.py --runs 3 """ -import argparse +import sys +import time import logging import pathlib +import argparse import statistics -import sys -import time # Silence capa progress output during benchmarking. logging.disable(logging.WARNING) @@ -34,11 +34,11 @@ import capa.capabilities.static from capa.features.common import String - # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- + def _load_ruleset() -> capa.rules.RuleSet: rules_path = pathlib.Path(__file__).parent.parent / "rules" if not rules_path.is_dir(): @@ -111,9 +111,7 @@ def _noop(self, file_strings): # type: ignore[misc] try: for _ in range(n_runs): t0 = time.perf_counter() - caps = capa.capabilities.static.find_static_capabilities( - ruleset, extractor, disable_progress=True - ) + caps = capa.capabilities.static.find_static_capabilities(ruleset, extractor, disable_progress=True) t1 = time.perf_counter() durations.append(t1 - t0) @@ -157,7 +155,7 @@ def main(): print("[!] no sample files found; pass binary paths explicitly", file=sys.stderr) sys.exit(1) - print(f"Loading rules …", end="", flush=True) + print("Loading rules \u2026", end="", flush=True) ruleset = _load_ruleset() # Count unique string-dependent rules across all scopes. @@ -180,7 +178,7 @@ def main(): for sample in samples: name = sample.name if len(name) > col_w - 1: - name = "…" + name[-(col_w - 2):] + name = "…" + name[-(col_w - 2) :] extractor = _make_extractor(sample) if extractor is None: @@ -192,14 +190,10 @@ def main(): print(f" {name:<{col_w - 2}} ", end="", flush=True) # "Before": no prefilter - t_before, n_funcs = _time_find_capabilities( - ruleset, extractor, prefilter=False, n_runs=args.runs - ) + t_before, n_funcs = _time_find_capabilities(ruleset, extractor, prefilter=False, n_runs=args.runs) # "After": with prefilter - t_after, _ = _time_find_capabilities( - ruleset, extractor, prefilter=True, n_runs=args.runs - ) + t_after, _ = _time_find_capabilities(ruleset, extractor, prefilter=True, n_runs=args.runs) saved = t_before - t_after speedup = t_before / t_after if t_after > 0 else float("inf") diff --git a/tests/test_match.py b/tests/test_match.py index 687d05d5b..06441abc7 100644 --- a/tests/test_match.py +++ b/tests/test_match.py @@ -46,7 +46,8 @@ def match(rules, features, va, scope=Scope.FUNCTION): def test_match_simple(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -56,7 +57,8 @@ def test_match_simple(): namespace: testns1/testns2 features: - number: 100 - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) features, matches = match([r], {capa.features.insn.Number(100): {1, 2}}, 0x0) @@ -67,7 +69,8 @@ def test_match_simple(): def test_match_range_exact(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -76,7 +79,8 @@ def test_match_range_exact(): dynamic: process features: - count(number(100)): 2 - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) # just enough matches @@ -93,7 +97,8 @@ def test_match_range_exact(): def test_match_range_range(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -102,7 +107,8 @@ def test_match_range_range(): dynamic: process features: - count(number(100)): (2, 3) - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) # just enough matches @@ -123,7 +129,8 @@ def test_match_range_range(): def test_match_range_exact_zero(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -139,7 +146,8 @@ def test_match_range_exact_zero(): # so we have this additional trivial feature. - mnemonic: mov - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) # feature isn't indexed - good. @@ -157,7 +165,8 @@ def test_match_range_exact_zero(): def test_match_range_with_zero(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -172,7 +181,8 @@ def test_match_range_with_zero(): # since we don't support top level NOT statements. # so we have this additional trivial feature. - mnemonic: mov - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) # ok @@ -190,7 +200,8 @@ def test_match_range_with_zero(): def test_match_adds_matched_rule_feature(): """show that using `match` adds a feature for matched rules.""" - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -199,7 +210,8 @@ def test_match_adds_matched_rule_feature(): dynamic: process features: - number: 100 - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) features, _ = match([r], {capa.features.insn.Number(100): {1}}, 0x0) assert capa.features.common.MatchedRule("test rule") in features @@ -208,7 +220,9 @@ def test_match_adds_matched_rule_feature(): def test_match_matched_rules(): """show that using `match` adds a feature for matched rules.""" rules = [ - capa.rules.Rule.from_yaml(textwrap.dedent(""" + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ rule: meta: name: test rule1 @@ -217,8 +231,12 @@ def test_match_matched_rules(): dynamic: process features: - number: 100 - """)), - capa.rules.Rule.from_yaml(textwrap.dedent(""" + """ + ) + ), + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ rule: meta: name: test rule2 @@ -227,7 +245,9 @@ def test_match_matched_rules(): dynamic: process features: - match: test rule1 - """)), + """ + ) + ), ] features, _ = match( @@ -251,7 +271,9 @@ def test_match_matched_rules(): def test_match_namespace(): rules = [ - capa.rules.Rule.from_yaml(textwrap.dedent(""" + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ rule: meta: name: CreateFile API @@ -261,8 +283,12 @@ def test_match_namespace(): namespace: file/create/CreateFile features: - api: CreateFile - """)), - capa.rules.Rule.from_yaml(textwrap.dedent(""" + """ + ) + ), + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ rule: meta: name: WriteFile API @@ -272,8 +298,12 @@ def test_match_namespace(): namespace: file/write features: - api: WriteFile - """)), - capa.rules.Rule.from_yaml(textwrap.dedent(""" + """ + ) + ), + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ rule: meta: name: file-create @@ -282,8 +312,12 @@ def test_match_namespace(): dynamic: process features: - match: file/create - """)), - capa.rules.Rule.from_yaml(textwrap.dedent(""" + """ + ) + ), + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ rule: meta: name: filesystem-any @@ -292,7 +326,9 @@ def test_match_namespace(): dynamic: process features: - match: file - """)), + """ + ) + ), ] features, matches = match( @@ -319,7 +355,9 @@ def test_match_namespace(): def test_match_substring(): rules = [ - capa.rules.Rule.from_yaml(textwrap.dedent(""" + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ rule: meta: name: test rule @@ -329,7 +367,9 @@ def test_match_substring(): features: - and: - substring: abc - """)), + """ + ) + ), ] features, _ = match( capa.rules.topologically_order_rules(rules), @@ -369,7 +409,9 @@ def test_match_substring(): def test_match_regex(): rules = [ - capa.rules.Rule.from_yaml(textwrap.dedent(""" + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ rule: meta: name: test rule @@ -379,8 +421,12 @@ def test_match_regex(): features: - and: - string: /.*bbbb.*/ - """)), - capa.rules.Rule.from_yaml(textwrap.dedent(""" + """ + ) + ), + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ rule: meta: name: rule with implied wildcards @@ -390,8 +436,12 @@ def test_match_regex(): features: - and: - string: /bbbb/ - """)), - capa.rules.Rule.from_yaml(textwrap.dedent(""" + """ + ) + ), + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ rule: meta: name: rule with anchor @@ -401,7 +451,9 @@ def test_match_regex(): features: - and: - string: /^bbbb/ - """)), + """ + ) + ), ] features, _ = match( capa.rules.topologically_order_rules(rules), @@ -436,7 +488,9 @@ def test_match_regex(): def test_match_regex_ignorecase(): rules = [ - capa.rules.Rule.from_yaml(textwrap.dedent(""" + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ rule: meta: name: test rule @@ -446,7 +500,9 @@ def test_match_regex_ignorecase(): features: - and: - string: /.*bbbb.*/i - """)), + """ + ) + ), ] features, _ = match( capa.rules.topologically_order_rules(rules), @@ -458,7 +514,9 @@ def test_match_regex_ignorecase(): def test_match_regex_complex(): rules = [ - capa.rules.Rule.from_yaml(textwrap.dedent(r""" + capa.rules.Rule.from_yaml( + textwrap.dedent( + r""" rule: meta: name: test rule @@ -468,7 +526,9 @@ def test_match_regex_complex(): features: - or: - string: /.*HARDWARE\\Key\\key with spaces\\.*/i - """)), + """ + ) + ), ] features, _ = match( capa.rules.topologically_order_rules(rules), @@ -480,7 +540,9 @@ def test_match_regex_complex(): def test_match_regex_values_always_string(): rules = [ - capa.rules.Rule.from_yaml(textwrap.dedent(""" + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ rule: meta: name: test rule @@ -491,7 +553,9 @@ def test_match_regex_values_always_string(): - or: - string: /123/ - string: /0x123/ - """)), + """ + ) + ), ] features, _ = match( capa.rules.topologically_order_rules(rules), @@ -523,7 +587,8 @@ def test_regex_get_value_str(pattern): @pytest.mark.xfail(reason="can't have top level NOT") def test_match_only_not(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -534,7 +599,8 @@ def test_match_only_not(): features: - not: - number: 99 - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) _, matches = match([r], {capa.features.insn.Number(100): {1, 2}}, 0x0) @@ -542,7 +608,8 @@ def test_match_only_not(): def test_match_not(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -555,7 +622,8 @@ def test_match_not(): - mnemonic: mov - not: - number: 99 - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) _, matches = match([r], {capa.features.insn.Number(100): {1, 2}, capa.features.insn.Mnemonic("mov"): {1, 2}}, 0x0) @@ -564,7 +632,8 @@ def test_match_not(): @pytest.mark.xfail(reason="can't have nested NOT") def test_match_not_not(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -576,7 +645,8 @@ def test_match_not_not(): - not: - not: - number: 100 - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) _, matches = match([r], {capa.features.insn.Number(100): {1, 2}}, 0x0) @@ -584,7 +654,8 @@ def test_match_not_not(): def test_match_operand_number(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -594,7 +665,8 @@ def test_match_operand_number(): features: - and: - operand[0].number: 0x10 - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) assert capa.features.insn.OperandNumber(0, 0x10) in {capa.features.insn.OperandNumber(0, 0x10)} @@ -612,7 +684,8 @@ def test_match_operand_number(): def test_match_operand_offset(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -622,7 +695,8 @@ def test_match_operand_offset(): features: - and: - operand[0].offset: 0x10 - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) assert capa.features.insn.OperandOffset(0, 0x10) in {capa.features.insn.OperandOffset(0, 0x10)} @@ -640,7 +714,8 @@ def test_match_operand_offset(): def test_match_property_access(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -650,7 +725,8 @@ def test_match_property_access(): features: - and: - property/read: System.IO.FileInfo::Length - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) assert capa.features.insn.Property("System.IO.FileInfo::Length", capa.features.common.FeatureAccess.READ) in { @@ -682,7 +758,8 @@ def test_match_property_access(): def test_match_os_any(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -700,7 +777,8 @@ def test_match_os_any(): - and: - os: any - string: "Goodbye world" - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) _, matches = match( @@ -734,7 +812,8 @@ def test_match_os_any(): # this test demonstrates the behavior of unstable features that may change before the next major release. def test_index_features_and_unstable(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -745,7 +824,8 @@ def test_index_features_and_unstable(): - and: - mnemonic: mov - api: CreateFileW - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) rr = capa.rules.RuleSet([r]) index: capa.rules.RuleSet._RuleFeatureIndex = rr._feature_indexes_by_scopes[capa.rules.Scope.FUNCTION] @@ -761,7 +841,8 @@ def test_index_features_and_unstable(): # this test demonstrates the behavior of unstable features that may change before the next major release. def test_index_features_or_unstable(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -772,7 +853,8 @@ def test_index_features_or_unstable(): - or: - mnemonic: mov - api: CreateFileW - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) rr = capa.rules.RuleSet([r]) index: capa.rules.RuleSet._RuleFeatureIndex = rr._feature_indexes_by_scopes[capa.rules.Scope.FUNCTION] @@ -789,7 +871,8 @@ def test_index_features_or_unstable(): # this test demonstrates the behavior of unstable features that may change before the next major release. def test_index_features_nested_unstable(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -802,7 +885,8 @@ def test_index_features_nested_unstable(): - or: - api: CreateFileW - string: foo - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) rr = capa.rules.RuleSet([r]) index: capa.rules.RuleSet._RuleFeatureIndex = rr._feature_indexes_by_scopes[capa.rules.Scope.FUNCTION] @@ -847,9 +931,46 @@ def test_string_prefilter_stack_string_fallback(): assert "test string prefilter stack string fallback" in matches +def test_string_prefilter_anchored_regex_correctness(): + """Anchored regex patterns must not be falsely marked impossible by the prefilter. + + The prefilter joins file strings with \\x01 for Substring checks. Regex patterns + must be checked per-string because ^ / $ bind to the start/end of the whole concat + string, not each individual entry. A rule with /^foo$/ must remain active when + "foo" is present in file_strings, even though it would not match the concat string. + """ + rule_text = textwrap.dedent(""" + rule: + meta: + name: test anchored regex prefilter + scopes: + static: function + dynamic: process + features: + - string: /^foo$/ + """) + rule = capa.rules.Rule.from_yaml(rule_text) + ruleset = capa.rules.RuleSet([rule]) + + # "foo" is in file_strings — rule must NOT be marked impossible. + ruleset.prepare_for_file(frozenset({"foo", "bar", "baz"})) + assert "test anchored regex prefilter" not in ruleset._impossible_string_rule_names + + _, matches = ruleset.match(Scope.FUNCTION, {String("foo"): {0x0}}, 0x0) + assert "test anchored regex prefilter" in matches + + # When "foo" is absent from file_strings, the rule IS impossible. + ruleset.prepare_for_file(frozenset({"bar", "baz"})) + assert "test anchored regex prefilter" in ruleset._impossible_string_rule_names + + _, matches = ruleset.match(Scope.FUNCTION, {String("foo"): {0x0}}, 0x0) + assert "test anchored regex prefilter" not in matches + + def test_bytes_prefix_index_correctness(): """Verify that the bytes prefix pre-filter preserves match behavior.""" - rule_text = textwrap.dedent(""" + rule_text = textwrap.dedent( + """ rule: meta: name: test bytes prefix index @@ -858,7 +979,8 @@ def test_bytes_prefix_index_correctness(): dynamic: process features: - bytes: 90 90 90 90 90 90 90 90 90 90 90 90 90 90 90 90 - """) + """ + ) r = capa.rules.Rule.from_yaml(rule_text) # 16 nop bytes - exact match @@ -883,7 +1005,8 @@ def test_bytes_prefix_index_correctness(): def test_bytes_prefix_index_collision(): - rule_text = textwrap.dedent(""" + rule_text = textwrap.dedent( + """ rule: meta: name: test bytes prefix collision @@ -892,7 +1015,8 @@ def test_bytes_prefix_index_collision(): dynamic: process features: - bytes: 41 42 43 44 45 46 47 48 - """) + """ + ) r = capa.rules.Rule.from_yaml(rule_text) features = { @@ -904,7 +1028,8 @@ def test_bytes_prefix_index_collision(): def test_bytes_prefix_index_short_pattern_fallback(): - rule_text = textwrap.dedent(""" + rule_text = textwrap.dedent( + """ rule: meta: name: test bytes short prefix fallback @@ -913,7 +1038,8 @@ def test_bytes_prefix_index_short_pattern_fallback(): dynamic: process features: - bytes: 41 42 43 - """) + """ + ) r = capa.rules.Rule.from_yaml(rule_text) _, matches = match([r], {capa.features.common.Bytes(b"ABCDEF"): {0x0}}, 0x0) @@ -925,7 +1051,8 @@ def test_bytes_prefix_index_short_pattern_fallback(): def test_bytes_prefix_index_mixed_short_and_long_patterns(): """A rule with both a short (<4B) and a long (>=4B) bytes pattern exercises both code paths.""" - short_rule_text = textwrap.dedent(""" + short_rule_text = textwrap.dedent( + """ rule: meta: name: test short pattern rule @@ -934,8 +1061,10 @@ def test_bytes_prefix_index_mixed_short_and_long_patterns(): dynamic: process features: - bytes: AA BB - """) - long_rule_text = textwrap.dedent(""" + """ + ) + long_rule_text = textwrap.dedent( + """ rule: meta: name: test long pattern rule @@ -944,7 +1073,8 @@ def test_bytes_prefix_index_mixed_short_and_long_patterns(): dynamic: process features: - bytes: CC DD EE FF 11 22 33 44 - """) + """ + ) short_rule = capa.rules.Rule.from_yaml(short_rule_text) long_rule = capa.rules.Rule.from_yaml(long_rule_text) From a4595fa69fdd762fadd75f25c9c14e9b21ad15ae Mon Sep 17 00:00:00 2001 From: devs6186 Date: Tue, 7 Apr 2026 13:47:35 +0530 Subject: [PATCH 5/7] scripts: fix benchmark net-gain formula (overhead was double-subtracted) --- scripts/benchmark_string_prefilter.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/scripts/benchmark_string_prefilter.py b/scripts/benchmark_string_prefilter.py index 0cc722853..c423dce97 100644 --- a/scripts/benchmark_string_prefilter.py +++ b/scripts/benchmark_string_prefilter.py @@ -195,11 +195,13 @@ def main(): # "After": with prefilter t_after, _ = _time_find_capabilities(ruleset, extractor, prefilter=True, n_runs=args.runs) - saved = t_before - t_after + # t_after already includes the prepare_for_file overhead, so the true + # wall-clock net gain is simply t_before - t_after. + # t_overhead is shown separately so the reader can see how much of the + # cost is the one-time scan vs how much is recovered in matching. + net = t_before - t_after speedup = t_before / t_after if t_after > 0 else float("inf") pct_skipped = 100.0 * n_skipped / n_string_rules if n_string_rules else 0.0 - # Net gain = saved matching time minus upfront overhead - net = saved - t_overhead print( f"{n_funcs:>6} {n_file_strings:>7} {t_before:>9.2f}s {t_after:>9.2f}s " @@ -211,8 +213,9 @@ def main(): print("Notes:") print(f" Times are median over {args.runs} run(s); perf_counter precision.") print(" 'w/o filter' patches prepare_for_file() to a no-op (clean baseline).") - print(" 'Overhead' = wall time of prepare_for_file() alone (one-time cost per binary).") - print(" 'Net gain' = (w/o filter - w/ filter) - Overhead; positive = faster overall.") + print(" 'Overhead' = wall time of prepare_for_file() alone (informational).") + print(" 'Net gain' = w/o filter - w/ filter; t_after includes overhead, so this") + print(" is the true end-to-end wall-clock delta. Positive = faster.") print(" 'Skipped' = string rules pruned because patterns are absent from the binary.") print(" 'Strs' = distinct String values found in the binary at file scope.") From 7bac4fa0d532c03c45fc69250554dd4def0d7a53 Mon Sep 17 00:00:00 2001 From: devs6186 Date: Tue, 7 Apr 2026 13:54:32 +0530 Subject: [PATCH 6/7] rules: restore concat scan for Regex using re.MULTILINE on \n-joined strings The per-string fallback introduced to fix the anchored-pattern bug carried a high scan cost: O(|string_rules| * |file_strings|) regex calls (~100 k for a 1225-string binary, ~80 ms). Fix: use a \n-separated concat and compile each pattern with re.MULTILINE added. With MULTILINE, ^ and $ match at \n boundaries, so /^docker.*/ correctly finds "docker ps" in "other\ndocker ps\nfoo" without a per-string loop. Fallback: if the concat scan matches and the pattern was compiled with re.DOTALL, .* can bridge two adjacent lines (false positive); per-string confirmation then decides the true outcome. This fallback triggers only for the small subset of rules whose .* spans the boundary, leaving the majority (non-false-positive cases) handled in one re.search call. Measured overhead (1147-1225 file strings, 83 string rules): Before (per-string): ~80-90 ms After (MULTILINE): ~40-50 ms The anchored-regex regression test (test_string_prefilter_anchored_regex_correctness) continues to pass, confirming /^foo$/ is not falsely marked impossible when "foo" is present in file_strings. --- capa/rules/__init__.py | 62 ++++++++++++++++++++++++------------------ 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index 2ed6d8d5d..d8c3eb77e 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -1967,24 +1967,34 @@ def prepare_for_file(self, file_strings: frozenset[str]) -> None: See: https://github.com/mandiant/capa/issues/2126 - Performance note: Substring patterns are checked via a single scan of a - concatenated string (O(1) calls, fast C-level `in`). Regex patterns require - a per-string scan (O(|file_strings|) calls) because ^ / $ anchors would bind - to the boundaries of the whole concat rather than each individual string. + Performance note: both Substring and Regex patterns use a concatenated-string + fast path (O(1) regex calls per rule) rather than per-string iteration. + Substring uses a \x01-joined concat with a C-level `in` check. Regex uses a + \n-joined concat with re.MULTILINE so that ^ / $ bind to line boundaries (one + re.search per rule). A per-string fallback runs only when the concat scan + matches, to rule out DOTALL false positives where .* spans a \n boundary. """ if not file_strings: self._impossible_string_rule_names = set() return - # Build a single concatenated string from all file strings separated by \x01. - # \x01 is not present in capa rule patterns nor in file strings (which are - # printable-ASCII sequences extracted from the binary). Joining lets us check - # Substring patterns with a single C-level `in` scan instead of one per string. - # Note: this concat is used ONLY for Substring patterns; Regex patterns require - # per-string scanning because ^ / $ anchors bind to the start/end of the whole - # concat rather than each individual string (12 of the 83 default string rules - # use such anchors). - concat_strings: str = "\x01".join(file_strings) + # Two concatenated forms are used to accelerate the scan: + # + # concat_substr (\x01-separated) — for Substring patterns. + # A literal pattern cannot span a \x01 boundary (rule patterns are + # printable ASCII; \x01 never appears in them or in extracted strings). + # One C-level `in` check replaces N per-string comparisons. + # + # concat_regex (\n-separated) — for Regex patterns. + # Each pattern is compiled with re.MULTILINE added so that ^ and $ + # match at \n boundaries rather than only at the start/end of the whole + # string. This fixes the anchor bug: `re.search("^foo", "bar\nfoo")` + # succeeds when re.MULTILINE is set. One re.search per rule replaces N + # per-string calls. For patterns compiled with re.DOTALL, `.` also + # matches \n, so `.*` could bridge two adjacent strings (false positive); + # per-string confirmation handles that case. + concat_substr: str = "\x01".join(file_strings) + concat_regex: str = "\n".join(file_strings) impossible: set[str] = set() all_string_rule_names: set[str] = set() @@ -1998,23 +2008,21 @@ def prepare_for_file(self, file_strings: frozenset[str]) -> None: can_match = False for feat in wanted_strings: if isinstance(feat, capa.features.common.Substring): - # Fast path: scan the concatenated string once (O(1) calls). - # Safe because feat.value is a printable-ASCII literal and - # \x01 never appears in rule patterns, so there are no false - # positives or negatives from the \x01 boundary. - if feat.value in concat_strings: + if feat.value in concat_substr: can_match = True break elif isinstance(feat, capa.features.common.Regex): - # Must scan each file string individually. - # Searching the concatenated string is unsafe for anchored - # patterns (^ / $): `re.search("^foo", "bar\x01foo")` fails - # because ^ anchors to the start of the whole concat, not the - # start of each individual string. 12 of the 83 string rules - # in the default rule set use such anchors. - if any(feat.re.search(s) for s in file_strings): - can_match = True - break + # Re-compile with MULTILINE so ^ / $ respect \n boundaries. + # Python's re module caches compiled patterns internally, so + # the recompile cost is paid only on the first call. + ml_re = re.compile(feat.re.pattern, feat.re.flags | re.MULTILINE) + if ml_re.search(concat_regex): + # Concat matched: confirm per-string to rule out false + # positives from DOTALL patterns whose .* spans a \n. + if any(feat.re.search(s) for s in file_strings): + can_match = True + break + # No concat match → pattern is absent from every file string. else: # Unknown feature type: keep to be safe. can_match = True From 4289ef3cb4134d2e5ce8458a041dad55cec65317 Mon Sep 17 00:00:00 2001 From: devs6186 Date: Tue, 7 Apr 2026 16:24:47 +0530 Subject: [PATCH 7/7] scripts: benchmark parity verification, 8-binary corpus, interleaved runs - Add _verify_parity(): runs find_static_capabilities() with and without the pre-filter, compares (rule_name, address) pairs; reports PASS/FAIL per binary to prove no semantic drift - Expand _DEFAULT_SAMPLES from 4 to 8 binaries spanning tiny (~3 KB) to extra-large (~982 KB) for broader coverage - Switch to _time_interleaved(): alternates W/O -> W/ on each run to reduce load-spike variance bias in the median - Add geometric mean speedup summary across all binaries - Add --skip-parity flag for faster runs when correctness is already known - Fix all non-ASCII characters in printed output (console portability) --- scripts/benchmark_string_prefilter.py | 185 +++++++++++++++++++------- 1 file changed, 140 insertions(+), 45 deletions(-) diff --git a/scripts/benchmark_string_prefilter.py b/scripts/benchmark_string_prefilter.py index c423dce97..db9440e58 100644 --- a/scripts/benchmark_string_prefilter.py +++ b/scripts/benchmark_string_prefilter.py @@ -9,13 +9,17 @@ Usage: python scripts/benchmark_string_prefilter.py [--runs N] [binary ...] -If no binary paths are given the script picks a small representative set -from tests/data/. Each binary is analysed RUNS times in each mode; the -median is reported. The script uses the vivisect back-end, which needs no -external tools. +If no binary paths are given the script picks a representative set from +tests/data/ spanning small/medium/large binaries. Each binary is analysed +RUNS times in each mode; the median is reported. Runs are interleaved +(W/O, W/, W/O, W/, ...) to reduce load-spike bias. + +A parity check is performed for every binary: matched rule names and +addresses must be identical with and without the pre-filter. FAIL means +a correctness regression. Example: - python scripts/benchmark_string_prefilter.py --runs 3 + python scripts/benchmark_string_prefilter.py --runs 5 """ import sys @@ -84,63 +88,127 @@ def _measure_prefilter(ruleset: capa.rules.RuleSet, extractor) -> tuple[int, int return len(file_strings), n_skipped, (t1 - t0) -def _time_find_capabilities( +def _verify_parity(ruleset: capa.rules.RuleSet, extractor) -> tuple[bool, str]: + """ + Run find_static_capabilities() with and without the pre-filter and + confirm that the set of matched (rule_name, address) pairs is identical. + + Returns (ok: bool, detail: str). ok=True means no semantic drift. + """ + original_prepare = capa.rules.RuleSet.prepare_for_file + + # run WITHOUT pre-filter + def _noop(self, file_strings): # type: ignore[misc] + self._impossible_string_rule_names = set() + + capa.rules.RuleSet.prepare_for_file = _noop # type: ignore[method-assign] + try: + caps_without = capa.capabilities.static.find_static_capabilities(ruleset, extractor, disable_progress=True) + finally: + capa.rules.RuleSet.prepare_for_file = original_prepare # type: ignore[method-assign] + + # Build (rule_name, addr_repr) sets -- exclude subscope rules + def _rule_addr_set(caps): + result: set[tuple[str, str]] = set() + for rule_name, matches in caps.matches.items(): + if ruleset.rules[rule_name].is_subscope_rule(): + continue + for addr, _ in matches: + result.add((rule_name, repr(addr))) + return result + + without_set = _rule_addr_set(caps_without) + + # run WITH pre-filter (normal path) + caps_with = capa.capabilities.static.find_static_capabilities(ruleset, extractor, disable_progress=True) + with_set = _rule_addr_set(caps_with) + + if without_set == with_set: + return True, "PASS" + + extra = with_set - without_set + missing = without_set - with_set + parts = [] + if missing: + rules_missing = {r for r, _ in missing} + parts.append(f"MISSING {len(missing)} matches in {len(rules_missing)} rules") + if extra: + rules_extra = {r for r, _ in extra} + parts.append(f"EXTRA {len(extra)} matches in {len(rules_extra)} rules") + return False, "FAIL: " + "; ".join(parts) + + +def _time_interleaved( ruleset: capa.rules.RuleSet, extractor, - *, - prefilter: bool, n_runs: int, -) -> tuple[float, int]: +) -> tuple[float, float, int]: """ - Run find_static_capabilities() n_runs times and return - (median_seconds, n_functions). + Alternate WITHOUT / WITH runs to reduce load-spike variance bias. + Returns (median_without, median_with, n_functions). """ - durations: list[float] = [] - n_funcs = 0 - original_prepare = capa.rules.RuleSet.prepare_for_file - if not prefilter: - # Monkey-patch prepare_for_file to be a no-op so the pre-filter never - # activates, giving us a clean "before" baseline. - def _noop(self, file_strings): # type: ignore[misc] - self._impossible_string_rule_names = set() + def _noop(self, file_strings): # type: ignore[misc] + self._impossible_string_rule_names = set() - capa.rules.RuleSet.prepare_for_file = _noop # type: ignore[method-assign] + without_times: list[float] = [] + with_times: list[float] = [] + n_funcs = 0 - try: - for _ in range(n_runs): + for _ in range(n_runs): + # WITHOUT + capa.rules.RuleSet.prepare_for_file = _noop # type: ignore[method-assign] + try: t0 = time.perf_counter() caps = capa.capabilities.static.find_static_capabilities(ruleset, extractor, disable_progress=True) t1 = time.perf_counter() - durations.append(t1 - t0) + finally: + capa.rules.RuleSet.prepare_for_file = original_prepare # type: ignore[method-assign] + without_times.append(t1 - t0) + if n_funcs == 0: + n_funcs = len(caps.feature_counts.functions) - if n_funcs == 0: - n_funcs = len(caps.feature_counts.functions) - finally: - capa.rules.RuleSet.prepare_for_file = original_prepare # type: ignore[method-assign] + # WITH + t0 = time.perf_counter() + capa.capabilities.static.find_static_capabilities(ruleset, extractor, disable_progress=True) + t1 = time.perf_counter() + with_times.append(t1 - t0) - return statistics.median(durations), n_funcs + return statistics.median(without_times), statistics.median(with_times), n_funcs # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- +# 8 binaries spanning: tiny / small / medium-low / medium / medium-high / large. _DEFAULT_SAMPLES = [ - # small – packed/minimal strings + # tiny -- packed, minimal strings (~3 KB) "tests/data/Practical Malware Analysis Lab 01-02.exe_", - # medium – typical malware + # small -- simple loader (~17 KB) + "tests/data/4f509bdfe5a2fe4320cdc070eedc0a72e12cc08f43d60a7701305b3d1408102b.exe_", + # small-medium -- typical downloader (~45 KB) + "tests/data/7d16efd0078f22c17a4bd78b0f0cc468.exe_", + # medium-low -- common malware (~120 KB) "tests/data/0a30182ff3a6b67beb0f2cda9d0de678.exe_", + # medium -- string-heavy sample (~180 KB) "tests/data/7fbc17a09cf5320c515fc1c5ba42c8b3.exe_", - # larger – more functions + # medium-high -- larger malware (~410 KB) + "tests/data/152d4c9f63efb332ccb134c6953c0104.exe_", + # large -- complex binary (~486 KB) "tests/data/321338196a46b600ea330fc5d98d0699.exe_", + # extra-large -- many functions (~982 KB) + "tests/data/82bf6347acf15e5d883715dc289d8a2b.exe_", ] def main(): parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument("--runs", type=int, default=3, help="median over this many runs (default: 3)") + parser.add_argument( + "--skip-parity", action="store_true", help="skip the correctness parity check (faster, less safe)" + ) parser.add_argument("binaries", nargs="*", metavar="BINARY", help="binary paths to benchmark") args = parser.parse_args() @@ -155,7 +223,7 @@ def main(): print("[!] no sample files found; pass binary paths explicitly", file=sys.stderr) sys.exit(1) - print("Loading rules \u2026", end="", flush=True) + print("Loading rules ...", end="", flush=True) ruleset = _load_ruleset() # Count unique string-dependent rules across all scopes. @@ -170,15 +238,18 @@ def main(): hdr = ( f"{'Binary':<{col_w}} {'Funcs':>6} {'Strs':>7} " f"{'w/o filter':>10} {'w/ filter':>10} " - f"{'Speedup':>7} {'Overhead':>8} {'Net gain':>8} {'Skipped':>12}" + f"{'Speedup':>7} {'Overhead':>8} {'Net gain':>8} {'Skipped':>12} {'Parity':>6}" ) print(hdr) print("-" * len(hdr)) + speedups: list[float] = [] + parity_failures: list[str] = [] + for sample in samples: name = sample.name if len(name) > col_w - 1: - name = "…" + name[-(col_w - 2) :] + name = "..." + name[-(col_w - 4) :] extractor = _make_extractor(sample) if extractor is None: @@ -189,35 +260,59 @@ def main(): print(f" {name:<{col_w - 2}} ", end="", flush=True) - # "Before": no prefilter - t_before, n_funcs = _time_find_capabilities(ruleset, extractor, prefilter=False, n_runs=args.runs) + # Parity check (unless --skip-parity). + if not args.skip_parity: + parity_ok, parity_detail = _verify_parity(ruleset, extractor) + if not parity_ok: + parity_failures.append(f"{sample.name}: {parity_detail}") + else: + parity_ok, parity_detail = True, "SKIP" - # "After": with prefilter - t_after, _ = _time_find_capabilities(ruleset, extractor, prefilter=True, n_runs=args.runs) + # Interleaved timing (alternates W/O -> W/ each run to reduce bias). + t_before, t_after, n_funcs = _time_interleaved(ruleset, extractor, args.runs) # t_after already includes the prepare_for_file overhead, so the true # wall-clock net gain is simply t_before - t_after. - # t_overhead is shown separately so the reader can see how much of the - # cost is the one-time scan vs how much is recovered in matching. net = t_before - t_after speedup = t_before / t_after if t_after > 0 else float("inf") pct_skipped = 100.0 * n_skipped / n_string_rules if n_string_rules else 0.0 + speedups.append(speedup) + + parity_str = parity_detail if parity_detail in ("PASS", "SKIP") else "FAIL" print( f"{n_funcs:>6} {n_file_strings:>7} {t_before:>9.2f}s {t_after:>9.2f}s " - f"{speedup:>6.2f}x {t_overhead*1000:>6.0f}ms {net*1000:>+7.0f}ms " - f"{n_skipped:>4}/{n_string_rules} ({pct_skipped:.0f}%)" + f"{speedup:>6.2f}x {t_overhead * 1000:>6.0f}ms {net * 1000:>+7.0f}ms " + f"{n_skipped:>4}/{n_string_rules} ({pct_skipped:.0f}%) {parity_str:>6}" ) + print() + + if speedups: + geomean = 1.0 + for s in speedups: + geomean *= s + geomean **= 1.0 / len(speedups) + print(f"Geometric mean speedup across {len(speedups)} binaries: {geomean:.2f}x") + + if parity_failures: + print() + print(f"[!] PARITY FAILURES ({len(parity_failures)}):") + for msg in parity_failures: + print(f" {msg}") + elif not args.skip_parity: + print("All parity checks PASSED -- no semantic drift introduced by pre-filter.") + print() print("Notes:") - print(f" Times are median over {args.runs} run(s); perf_counter precision.") + print(f" Times are median over {args.runs} run(s), interleaved W/O -> W/ to reduce load-spike bias.") print(" 'w/o filter' patches prepare_for_file() to a no-op (clean baseline).") print(" 'Overhead' = wall time of prepare_for_file() alone (informational).") print(" 'Net gain' = w/o filter - w/ filter; t_after includes overhead, so this") print(" is the true end-to-end wall-clock delta. Positive = faster.") - print(" 'Skipped' = string rules pruned because patterns are absent from the binary.") - print(" 'Strs' = distinct String values found in the binary at file scope.") + print(" 'Skipped' = string rules pruned because patterns are absent from the binary.") + print(" 'Strs' = distinct String values found in the binary at file scope.") + print(" 'Parity' = PASS means matched (rule, address) pairs are identical with/without filter.") if __name__ == "__main__":