diff --git a/CHANGELOG.md b/CHANGELOG.md index d613005bb..65a51da2e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ ### New Features +- rules: pre-filter string rules whose Substring/Regex patterns are absent from the binary file, reducing redundant regex evaluation during per-function matching #2126 + ### Breaking Changes ### New Rules (0) diff --git a/capa/capabilities/static.py b/capa/capabilities/static.py index 1047713b5..55e91b37d 100644 --- a/capa/capabilities/static.py +++ b/capa/capabilities/static.py @@ -24,6 +24,7 @@ import capa.render.result_document as rdoc from capa.rules import Scope, RuleSet from capa.engine import FeatureSet, MatchResults +from capa.features.common import String from capa.capabilities.common import Capabilities, find_file_capabilities from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor @@ -162,7 +163,23 @@ def find_static_capabilities( library_functions_list: list[rdoc.LibraryFunction] = [] assert isinstance(extractor, StaticFeatureExtractor) + functions: list[FunctionHandle] = list(extractor.get_functions()) + + # Pre-filter string rules based on strings found in the binary. + # For each rule whose required Substring/Regex patterns are provably absent + # from the binary's file-level strings, mark it as skippable in _match(). + # This replaces repeated Regex.evaluate() calls (once per function × per rule) + # with a single file-level scan. See: https://github.com/mandiant/capa/issues/2126 + # + # The upfront scan cost is O(|string_rules| × |file_strings|). For small + # binaries this overhead can exceed the savings, so we only activate the + # pre-filter when there are enough functions to justify it. 
+    if len(functions) >= 10:
+        file_strings: frozenset[str] = frozenset(
+            feature.value for feature, _ in extractor.extract_file_features() if isinstance(feature, String)
+        )
+        ruleset.prepare_for_file(file_strings)
     n_funcs: int = len(functions)
     n_libs: int = 0
     percentage: float = 0
@@ -238,6 +255,9 @@ def find_static_capabilities(
         functions=tuple(function_feature_counts),
     )
 
+    # Clear the string pre-filter so the ruleset is clean for potential reuse.
+    ruleset.prepare_for_file(frozenset())
+
     matches: MatchResults = dict(
         itertools.chain(
             # each rule exists in exactly one scope,
diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py
index 1eca88042..d8c3eb77e 100644
--- a/capa/rules/__init__.py
+++ b/capa/rules/__init__.py
@@ -1463,6 +1463,11 @@ def __init__(
             scope: {rule.name: i for i, rule in enumerate(self.rules_by_scope[scope])} for scope in scopes
         }
 
+        # Set of string-rule names whose required patterns are absent from the current binary.
+        # Populated by prepare_for_file(); empty means no pre-filtering is active.
+        # See: https://github.com/mandiant/capa/issues/2126
+        self._impossible_string_rule_names: set[str] = set()
+
     @property
     def file_rules(self):
         return self.rules_by_scope[Scope.FILE]
@@ -1948,6 +1953,79 @@ def _sort_rules_by_index(rule_index_by_rule_name: dict[str, int], rules: list[Ru
         """
         rules.sort(key=lambda r: rule_index_by_rule_name[r.name])
 
+    def prepare_for_file(self, file_strings: frozenset[str]) -> None:
+        """
+        Pre-filter string rules based on strings extracted from the binary file.
+
+        A rule is recorded in _impossible_string_rule_names when none of its indexed
+        Substring/Regex features can match any string in file_strings; _match() then
+        skips the string evaluation for that rule.
+
+        The filter may only err towards keeping a rule: it must never mark a rule
+        impossible when one of its patterns matches a string in file_strings.
+
+        Call this before analyzing functions for a binary.
+        Pass an empty frozenset to clear the filter between binaries.
+
+        See: https://github.com/mandiant/capa/issues/2126
+
+        Args:
+          file_strings: distinct string values found in the file; empty clears the filter.
+        """
+        if not file_strings:
+            self._impossible_string_rule_names = set()
+            return
+
+        # Substring features: one C-level `in` test against a single concatenation instead
+        # of one test per file string. This can only err towards keeping a rule: a literal
+        # found in any single string is also found in the concatenation, while a hit that
+        # spans a separator is a harmless false positive.
+        concat_substr: str = "\x01".join(file_strings)
+
+        # Regex verdicts keyed by (pattern, flags), so that a pattern shared by several
+        # rules is searched only once.
+        regex_hits: dict[tuple[str, int], bool] = {}
+
+        def regex_can_match(pattern: re.Pattern[str]) -> bool:
+            # Search every string separately with the rule's own compiled pattern.
+            # Scanning "\n".join(file_strings) with re.MULTILINE is not sound here:
+            # \A, \Z and lookarounds such as (?<!\s) or (?!\n) observe the separator and
+            # can miss a string that matches on its own, wrongly disabling the rule.
+            key = (pattern.pattern, pattern.flags)
+            if key not in regex_hits:
+                regex_hits[key] = any(pattern.search(s) for s in file_strings)
+            return regex_hits[key]
+
+        def can_match(feat) -> bool:
+            if isinstance(feat, capa.features.common.Substring):
+                return feat.value in concat_substr
+            if isinstance(feat, capa.features.common.Regex):
+                return regex_can_match(feat.re)
+            # Unknown feature type: keep the rule to be safe.
+            return True
+
+        impossible: set[str] = set()
+        all_string_rule_names: set[str] = set()
+
+        for feature_index in self._feature_indexes_by_scopes.values():
+            for rule_name, wanted_strings in feature_index.string_rules.items():
+                # A rule name is evaluated once even if several scope indexes list it.
+                if rule_name in all_string_rule_names:
+                    continue
+                all_string_rule_names.add(rule_name)
+
+                if not any(can_match(feat) for feat in wanted_strings):
+                    impossible.add(rule_name)
+
+        if impossible:
+            logger.debug(
+                "pre-filter: %d/%d string rules skipped (patterns absent from binary)",
+                len(impossible),
+                len(all_string_rule_names),
+            )
+
+        self._impossible_string_rule_names = impossible
+
     def _match(self, scope: Scope, features: FeatureSet, addr: Address) -> tuple[FeatureSet, ceng.MatchResults]:
         """
         Match rules from this ruleset at the given scope against the given features.
@@ -2026,7 +2104,19 @@ def _match(self, scope: Scope, features: FeatureSet, addr: Address) -> tuple[Fea
                     string_features[feature] = locations
 
             if string_features:
+                # Some extractors may synthesize stack strings that do not exist as contiguous
+                # file bytes.
In that case, avoid file-level pre-filtering for this scope. + has_stack_string_characteristic = any( + isinstance(feature, capa.features.common.Characteristic) and feature.value == "stack string" + for feature in features + ) for rule_name, wanted_strings in feature_index.string_rules.items(): + # Skip rules whose patterns are provably absent from the binary. + # prepare_for_file() pre-checks all file strings once and populates + # _impossible_string_rule_names to avoid repeated Regex.evaluate() work. + # See: https://github.com/mandiant/capa/issues/2126 + if not has_stack_string_characteristic and rule_name in self._impossible_string_rule_names: + continue for wanted_string in wanted_strings: if wanted_string.evaluate(string_features): candidate_rule_names.add(rule_name) diff --git a/scripts/benchmark_string_prefilter.py b/scripts/benchmark_string_prefilter.py new file mode 100644 index 000000000..db9440e58 --- /dev/null +++ b/scripts/benchmark_string_prefilter.py @@ -0,0 +1,319 @@ +#!/usr/bin/env python3 +""" +Benchmark: string-rule pre-filter speedup (#2126) + +Measures wall-clock time for find_static_capabilities() with and without +the string pre-filter (prepare_for_file), so we can quantify the speedup +on real binaries with a full rule set. + +Usage: + python scripts/benchmark_string_prefilter.py [--runs N] [binary ...] + +If no binary paths are given the script picks a representative set from +tests/data/ spanning small/medium/large binaries. Each binary is analysed +RUNS times in each mode; the median is reported. Runs are interleaved +(W/O, W/, W/O, W/, ...) to reduce load-spike bias. + +A parity check is performed for every binary: matched rule names and +addresses must be identical with and without the pre-filter. FAIL means +a correctness regression. 
+ +Example: + python scripts/benchmark_string_prefilter.py --runs 5 +""" + +import sys +import time +import logging +import pathlib +import argparse +import statistics + +# Silence capa progress output during benchmarking. +logging.disable(logging.WARNING) + +import capa.main +import capa.rules +import capa.rules.cache +import capa.capabilities.static +from capa.features.common import String + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _load_ruleset() -> capa.rules.RuleSet: + rules_path = pathlib.Path(__file__).parent.parent / "rules" + if not rules_path.is_dir(): + print(f"[!] rules/ directory not found at {rules_path}", file=sys.stderr) + sys.exit(1) + return capa.rules.get_rules([rules_path], enable_cache=True) + + +def _make_extractor(binary_path: pathlib.Path): + """Return a vivisect StaticFeatureExtractor for *binary_path*, or None.""" + try: + import capa.loader + + extractor = capa.loader.get_extractor( + binary_path, + input_format="auto", + os_="auto", + backend=capa.main.BACKEND_VIV, + sigpaths=[], + should_save_workspace=False, + disable_progress=True, + ) + return extractor + except Exception as exc: + print(f" [!] could not load {binary_path.name}: {exc}", file=sys.stderr) + return None + + +def _measure_prefilter(ruleset: capa.rules.RuleSet, extractor) -> tuple[int, int, float]: + """ + Run prepare_for_file() once and return + (n_file_strings, n_skipped_rules, overhead_seconds). + Does not disturb the ruleset state. 
+ """ + file_strings: frozenset[str] = frozenset( + feat.value for feat, _ in extractor.extract_file_features() if isinstance(feat, String) + ) + t0 = time.perf_counter() + ruleset.prepare_for_file(file_strings) + t1 = time.perf_counter() + n_skipped = len(ruleset._impossible_string_rule_names) + ruleset.prepare_for_file(frozenset()) # restore + return len(file_strings), n_skipped, (t1 - t0) + + +def _verify_parity(ruleset: capa.rules.RuleSet, extractor) -> tuple[bool, str]: + """ + Run find_static_capabilities() with and without the pre-filter and + confirm that the set of matched (rule_name, address) pairs is identical. + + Returns (ok: bool, detail: str). ok=True means no semantic drift. + """ + original_prepare = capa.rules.RuleSet.prepare_for_file + + # run WITHOUT pre-filter + def _noop(self, file_strings): # type: ignore[misc] + self._impossible_string_rule_names = set() + + capa.rules.RuleSet.prepare_for_file = _noop # type: ignore[method-assign] + try: + caps_without = capa.capabilities.static.find_static_capabilities(ruleset, extractor, disable_progress=True) + finally: + capa.rules.RuleSet.prepare_for_file = original_prepare # type: ignore[method-assign] + + # Build (rule_name, addr_repr) sets -- exclude subscope rules + def _rule_addr_set(caps): + result: set[tuple[str, str]] = set() + for rule_name, matches in caps.matches.items(): + if ruleset.rules[rule_name].is_subscope_rule(): + continue + for addr, _ in matches: + result.add((rule_name, repr(addr))) + return result + + without_set = _rule_addr_set(caps_without) + + # run WITH pre-filter (normal path) + caps_with = capa.capabilities.static.find_static_capabilities(ruleset, extractor, disable_progress=True) + with_set = _rule_addr_set(caps_with) + + if without_set == with_set: + return True, "PASS" + + extra = with_set - without_set + missing = without_set - with_set + parts = [] + if missing: + rules_missing = {r for r, _ in missing} + parts.append(f"MISSING {len(missing)} matches in 
{len(rules_missing)} rules") + if extra: + rules_extra = {r for r, _ in extra} + parts.append(f"EXTRA {len(extra)} matches in {len(rules_extra)} rules") + return False, "FAIL: " + "; ".join(parts) + + +def _time_interleaved( + ruleset: capa.rules.RuleSet, + extractor, + n_runs: int, +) -> tuple[float, float, int]: + """ + Alternate WITHOUT / WITH runs to reduce load-spike variance bias. + Returns (median_without, median_with, n_functions). + """ + original_prepare = capa.rules.RuleSet.prepare_for_file + + def _noop(self, file_strings): # type: ignore[misc] + self._impossible_string_rule_names = set() + + without_times: list[float] = [] + with_times: list[float] = [] + n_funcs = 0 + + for _ in range(n_runs): + # WITHOUT + capa.rules.RuleSet.prepare_for_file = _noop # type: ignore[method-assign] + try: + t0 = time.perf_counter() + caps = capa.capabilities.static.find_static_capabilities(ruleset, extractor, disable_progress=True) + t1 = time.perf_counter() + finally: + capa.rules.RuleSet.prepare_for_file = original_prepare # type: ignore[method-assign] + without_times.append(t1 - t0) + if n_funcs == 0: + n_funcs = len(caps.feature_counts.functions) + + # WITH + t0 = time.perf_counter() + capa.capabilities.static.find_static_capabilities(ruleset, extractor, disable_progress=True) + t1 = time.perf_counter() + with_times.append(t1 - t0) + + return statistics.median(without_times), statistics.median(with_times), n_funcs + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +# 8 binaries spanning: tiny / small / medium-low / medium / medium-high / large. 
+_DEFAULT_SAMPLES = [ + # tiny -- packed, minimal strings (~3 KB) + "tests/data/Practical Malware Analysis Lab 01-02.exe_", + # small -- simple loader (~17 KB) + "tests/data/4f509bdfe5a2fe4320cdc070eedc0a72e12cc08f43d60a7701305b3d1408102b.exe_", + # small-medium -- typical downloader (~45 KB) + "tests/data/7d16efd0078f22c17a4bd78b0f0cc468.exe_", + # medium-low -- common malware (~120 KB) + "tests/data/0a30182ff3a6b67beb0f2cda9d0de678.exe_", + # medium -- string-heavy sample (~180 KB) + "tests/data/7fbc17a09cf5320c515fc1c5ba42c8b3.exe_", + # medium-high -- larger malware (~410 KB) + "tests/data/152d4c9f63efb332ccb134c6953c0104.exe_", + # large -- complex binary (~486 KB) + "tests/data/321338196a46b600ea330fc5d98d0699.exe_", + # extra-large -- many functions (~982 KB) + "tests/data/82bf6347acf15e5d883715dc289d8a2b.exe_", +] + + +def main(): + parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument("--runs", type=int, default=3, help="median over this many runs (default: 3)") + parser.add_argument( + "--skip-parity", action="store_true", help="skip the correctness parity check (faster, less safe)" + ) + parser.add_argument("binaries", nargs="*", metavar="BINARY", help="binary paths to benchmark") + args = parser.parse_args() + + root = pathlib.Path(__file__).parent.parent + if args.binaries: + samples = [pathlib.Path(b) for b in args.binaries] + else: + samples = [root / s for s in _DEFAULT_SAMPLES] + samples = [s for s in samples if s.exists()] + + if not samples: + print("[!] no sample files found; pass binary paths explicitly", file=sys.stderr) + sys.exit(1) + + print("Loading rules ...", end="", flush=True) + ruleset = _load_ruleset() + + # Count unique string-dependent rules across all scopes. 
+ seen: set[str] = set() + for fi in ruleset._feature_indexes_by_scopes.values(): + seen.update(fi.string_rules.keys()) + n_string_rules = len(seen) + print(f" {len(ruleset.rules)} rules total, {n_string_rules} string-dependent") + print() + + col_w = 44 + hdr = ( + f"{'Binary':<{col_w}} {'Funcs':>6} {'Strs':>7} " + f"{'w/o filter':>10} {'w/ filter':>10} " + f"{'Speedup':>7} {'Overhead':>8} {'Net gain':>8} {'Skipped':>12} {'Parity':>6}" + ) + print(hdr) + print("-" * len(hdr)) + + speedups: list[float] = [] + parity_failures: list[str] = [] + + for sample in samples: + name = sample.name + if len(name) > col_w - 1: + name = "..." + name[-(col_w - 4) :] + + extractor = _make_extractor(sample) + if extractor is None: + continue + + # Measure prepare_for_file overhead and skipped rule count. + n_file_strings, n_skipped, t_overhead = _measure_prefilter(ruleset, extractor) + + print(f" {name:<{col_w - 2}} ", end="", flush=True) + + # Parity check (unless --skip-parity). + if not args.skip_parity: + parity_ok, parity_detail = _verify_parity(ruleset, extractor) + if not parity_ok: + parity_failures.append(f"{sample.name}: {parity_detail}") + else: + parity_ok, parity_detail = True, "SKIP" + + # Interleaved timing (alternates W/O -> W/ each run to reduce bias). + t_before, t_after, n_funcs = _time_interleaved(ruleset, extractor, args.runs) + + # t_after already includes the prepare_for_file overhead, so the true + # wall-clock net gain is simply t_before - t_after. 
+ net = t_before - t_after + speedup = t_before / t_after if t_after > 0 else float("inf") + pct_skipped = 100.0 * n_skipped / n_string_rules if n_string_rules else 0.0 + speedups.append(speedup) + + parity_str = parity_detail if parity_detail in ("PASS", "SKIP") else "FAIL" + + print( + f"{n_funcs:>6} {n_file_strings:>7} {t_before:>9.2f}s {t_after:>9.2f}s " + f"{speedup:>6.2f}x {t_overhead * 1000:>6.0f}ms {net * 1000:>+7.0f}ms " + f"{n_skipped:>4}/{n_string_rules} ({pct_skipped:.0f}%) {parity_str:>6}" + ) + + print() + + if speedups: + geomean = 1.0 + for s in speedups: + geomean *= s + geomean **= 1.0 / len(speedups) + print(f"Geometric mean speedup across {len(speedups)} binaries: {geomean:.2f}x") + + if parity_failures: + print() + print(f"[!] PARITY FAILURES ({len(parity_failures)}):") + for msg in parity_failures: + print(f" {msg}") + elif not args.skip_parity: + print("All parity checks PASSED -- no semantic drift introduced by pre-filter.") + + print() + print("Notes:") + print(f" Times are median over {args.runs} run(s), interleaved W/O -> W/ to reduce load-spike bias.") + print(" 'w/o filter' patches prepare_for_file() to a no-op (clean baseline).") + print(" 'Overhead' = wall time of prepare_for_file() alone (informational).") + print(" 'Net gain' = w/o filter - w/ filter; t_after includes overhead, so this") + print(" is the true end-to-end wall-clock delta. 
Positive = faster.") + print(" 'Skipped' = string rules pruned because patterns are absent from the binary.") + print(" 'Strs' = distinct String values found in the binary at file scope.") + print(" 'Parity' = PASS means matched (rule, address) pairs are identical with/without filter.") + + +if __name__ == "__main__": + main() diff --git a/tests/test_match.py b/tests/test_match.py index 674b71b3a..ebbdb8538 100644 --- a/tests/test_match.py +++ b/tests/test_match.py @@ -21,7 +21,7 @@ import capa.features.insn import capa.features.common from capa.rules import Scope -from capa.features.common import OS, OS_ANY, OS_WINDOWS, String, MatchedRule +from capa.features.common import OS, OS_ANY, OS_WINDOWS, String, MatchedRule, Characteristic def match(rules, features, va, scope=Scope.FUNCTION): @@ -46,7 +46,8 @@ def match(rules, features, va, scope=Scope.FUNCTION): def test_match_simple(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -56,7 +57,8 @@ def test_match_simple(): namespace: testns1/testns2 features: - number: 100 - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) features, matches = match([r], {capa.features.insn.Number(100): {1, 2}}, 0x0) @@ -67,7 +69,8 @@ def test_match_simple(): def test_match_range_exact(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -76,7 +79,8 @@ def test_match_range_exact(): dynamic: process features: - count(number(100)): 2 - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) # just enough matches @@ -93,7 +97,8 @@ def test_match_range_exact(): def test_match_range_range(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -102,7 +107,8 @@ def test_match_range_range(): dynamic: process features: - count(number(100)): (2, 3) - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) # just enough matches @@ -123,7 +129,8 @@ def test_match_range_range(): def test_match_range_exact_zero(): - rule = 
textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -139,7 +146,8 @@ def test_match_range_exact_zero(): # so we have this additional trivial feature. - mnemonic: mov - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) # feature isn't indexed - good. @@ -157,7 +165,8 @@ def test_match_range_exact_zero(): def test_match_range_with_zero(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -172,7 +181,8 @@ def test_match_range_with_zero(): # since we don't support top level NOT statements. # so we have this additional trivial feature. - mnemonic: mov - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) # ok @@ -190,7 +200,8 @@ def test_match_range_with_zero(): def test_match_adds_matched_rule_feature(): """show that using `match` adds a feature for matched rules.""" - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -199,7 +210,8 @@ def test_match_adds_matched_rule_feature(): dynamic: process features: - number: 100 - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) features, _ = match([r], {capa.features.insn.Number(100): {1}}, 0x0) assert capa.features.common.MatchedRule("test rule") in features @@ -549,7 +561,8 @@ def test_regex_get_value_str(pattern): @pytest.mark.xfail(reason="can't have top level NOT") def test_match_only_not(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -560,7 +573,8 @@ def test_match_only_not(): features: - not: - number: 99 - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) _, matches = match([r], {capa.features.insn.Number(100): {1, 2}}, 0x0) @@ -568,7 +582,8 @@ def test_match_only_not(): def test_match_not(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -581,7 +596,8 @@ def test_match_not(): - mnemonic: mov - not: - number: 99 - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) _, matches = match([r], 
{capa.features.insn.Number(100): {1, 2}, capa.features.insn.Mnemonic("mov"): {1, 2}}, 0x0) @@ -590,7 +606,8 @@ def test_match_not(): @pytest.mark.xfail(reason="can't have nested NOT") def test_match_not_not(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -602,7 +619,8 @@ def test_match_not_not(): - not: - not: - number: 100 - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) _, matches = match([r], {capa.features.insn.Number(100): {1, 2}}, 0x0) @@ -610,7 +628,8 @@ def test_match_not_not(): def test_match_operand_number(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -620,7 +639,8 @@ def test_match_operand_number(): features: - and: - operand[0].number: 0x10 - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) assert capa.features.insn.OperandNumber(0, 0x10) in {capa.features.insn.OperandNumber(0, 0x10)} @@ -638,7 +658,8 @@ def test_match_operand_number(): def test_match_operand_offset(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -648,7 +669,8 @@ def test_match_operand_offset(): features: - and: - operand[0].offset: 0x10 - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) assert capa.features.insn.OperandOffset(0, 0x10) in {capa.features.insn.OperandOffset(0, 0x10)} @@ -666,7 +688,8 @@ def test_match_operand_offset(): def test_match_property_access(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -676,7 +699,8 @@ def test_match_property_access(): features: - and: - property/read: System.IO.FileInfo::Length - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) assert capa.features.insn.Property("System.IO.FileInfo::Length", capa.features.common.FeatureAccess.READ) in { @@ -708,7 +732,8 @@ def test_match_property_access(): def test_match_os_any(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -726,7 +751,8 @@ def test_match_os_any(): - 
and: - os: any - string: "Goodbye world" - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) _, matches = match( @@ -760,7 +786,8 @@ def test_match_os_any(): # this test demonstrates the behavior of unstable features that may change before the next major release. def test_index_features_and_unstable(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -771,7 +798,8 @@ def test_index_features_and_unstable(): - and: - mnemonic: mov - api: CreateFileW - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) rr = capa.rules.RuleSet([r]) index: capa.rules.RuleSet._RuleFeatureIndex = rr._feature_indexes_by_scopes[capa.rules.Scope.FUNCTION] @@ -787,7 +815,8 @@ def test_index_features_and_unstable(): # this test demonstrates the behavior of unstable features that may change before the next major release. def test_index_features_or_unstable(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -798,7 +827,8 @@ def test_index_features_or_unstable(): - or: - mnemonic: mov - api: CreateFileW - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) rr = capa.rules.RuleSet([r]) index: capa.rules.RuleSet._RuleFeatureIndex = rr._feature_indexes_by_scopes[capa.rules.Scope.FUNCTION] @@ -815,7 +845,8 @@ def test_index_features_or_unstable(): # this test demonstrates the behavior of unstable features that may change before the next major release. 
def test_index_features_nested_unstable(): - rule = textwrap.dedent(""" + rule = textwrap.dedent( + """ rule: meta: name: test rule @@ -828,7 +859,8 @@ def test_index_features_nested_unstable(): - or: - api: CreateFileW - string: foo - """) + """ + ) r = capa.rules.Rule.from_yaml(rule) rr = capa.rules.RuleSet([r]) index: capa.rules.RuleSet._RuleFeatureIndex = rr._feature_indexes_by_scopes[capa.rules.Scope.FUNCTION] @@ -844,9 +876,75 @@ def test_index_features_nested_unstable(): assert not index.bytes_prefix_index +def test_string_prefilter_stack_string_fallback(): + rule_text = textwrap.dedent(""" + rule: + meta: + name: test string prefilter stack string fallback + scopes: + static: function + dynamic: process + features: + - string: /powershell/ + """) + rule = capa.rules.Rule.from_yaml(rule_text) + ruleset = capa.rules.RuleSet([rule]) + + # Mark the regex rule as impossible based on file-level strings. + ruleset.prepare_for_file(frozenset({"hello", "world"})) + + _, matches = ruleset.match(Scope.FUNCTION, {String("powershell"): {0x0}}, 0x0) + assert "test string prefilter stack string fallback" not in matches + + # If a stack string is present in this scope, don't trust file-level pre-filtering. + _, matches = ruleset.match( + Scope.FUNCTION, + {String("powershell"): {0x0}, Characteristic("stack string"): {0x0}}, + 0x0, + ) + assert "test string prefilter stack string fallback" in matches + + +def test_string_prefilter_anchored_regex_correctness(): + """Anchored regex patterns must not be falsely marked impossible by the prefilter. + + The prefilter joins file strings with \\x01 for Substring checks. Regex patterns + must be checked per-string because ^ / $ bind to the start/end of the whole concat + string, not each individual entry. A rule with /^foo$/ must remain active when + "foo" is present in file_strings, even though it would not match the concat string. 
+ """ + rule_text = textwrap.dedent(""" + rule: + meta: + name: test anchored regex prefilter + scopes: + static: function + dynamic: process + features: + - string: /^foo$/ + """) + rule = capa.rules.Rule.from_yaml(rule_text) + ruleset = capa.rules.RuleSet([rule]) + + # "foo" is in file_strings — rule must NOT be marked impossible. + ruleset.prepare_for_file(frozenset({"foo", "bar", "baz"})) + assert "test anchored regex prefilter" not in ruleset._impossible_string_rule_names + + _, matches = ruleset.match(Scope.FUNCTION, {String("foo"): {0x0}}, 0x0) + assert "test anchored regex prefilter" in matches + + # When "foo" is absent from file_strings, the rule IS impossible. + ruleset.prepare_for_file(frozenset({"bar", "baz"})) + assert "test anchored regex prefilter" in ruleset._impossible_string_rule_names + + _, matches = ruleset.match(Scope.FUNCTION, {String("foo"): {0x0}}, 0x0) + assert "test anchored regex prefilter" not in matches + + def test_bytes_prefix_index_correctness(): """Verify that the bytes prefix pre-filter preserves match behavior.""" - rule_text = textwrap.dedent(""" + rule_text = textwrap.dedent( + """ rule: meta: name: test bytes prefix index @@ -855,7 +953,8 @@ def test_bytes_prefix_index_correctness(): dynamic: process features: - bytes: 90 90 90 90 90 90 90 90 90 90 90 90 90 90 90 90 - """) + """ + ) r = capa.rules.Rule.from_yaml(rule_text) # 16 nop bytes - exact match @@ -880,7 +979,8 @@ def test_bytes_prefix_index_correctness(): def test_bytes_prefix_index_collision(): - rule_text = textwrap.dedent(""" + rule_text = textwrap.dedent( + """ rule: meta: name: test bytes prefix collision @@ -889,7 +989,8 @@ def test_bytes_prefix_index_collision(): dynamic: process features: - bytes: 41 42 43 44 45 46 47 48 - """) + """ + ) r = capa.rules.Rule.from_yaml(rule_text) features = { @@ -901,7 +1002,8 @@ def test_bytes_prefix_index_collision(): def test_bytes_prefix_index_short_pattern_fallback(): - rule_text = textwrap.dedent(""" + rule_text = 
textwrap.dedent( + """ rule: meta: name: test bytes short prefix fallback @@ -910,7 +1012,8 @@ def test_bytes_prefix_index_short_pattern_fallback(): dynamic: process features: - bytes: 41 42 43 - """) + """ + ) r = capa.rules.Rule.from_yaml(rule_text) _, matches = match([r], {capa.features.common.Bytes(b"ABCDEF"): {0x0}}, 0x0) @@ -922,7 +1025,8 @@ def test_bytes_prefix_index_short_pattern_fallback(): def test_bytes_prefix_index_mixed_short_and_long_patterns(): """A rule with both a short (<4B) and a long (>=4B) bytes pattern exercises both code paths.""" - short_rule_text = textwrap.dedent(""" + short_rule_text = textwrap.dedent( + """ rule: meta: name: test short pattern rule @@ -931,8 +1035,10 @@ def test_bytes_prefix_index_mixed_short_and_long_patterns(): dynamic: process features: - bytes: AA BB - """) - long_rule_text = textwrap.dedent(""" + """ + ) + long_rule_text = textwrap.dedent( + """ rule: meta: name: test long pattern rule @@ -941,7 +1047,8 @@ def test_bytes_prefix_index_mixed_short_and_long_patterns(): dynamic: process features: - bytes: CC DD EE FF 11 22 33 44 - """) + """ + ) short_rule = capa.rules.Rule.from_yaml(short_rule_text) long_rule = capa.rules.Rule.from_yaml(long_rule_text)