diff --git a/tests/doc/rule-matcher-test-fixtures.md b/tests/doc/rule-matcher-test-fixtures.md new file mode 100644 index 000000000..b5fe3e3b2 --- /dev/null +++ b/tests/doc/rule-matcher-test-fixtures.md @@ -0,0 +1,300 @@ +1. Purpose + +These fixtures provide small, data-driven matcher tests. Each test pairs: +- a rule fragment, +- a synthetic feature listing, +- and the exact matches that capa should report. + +They are for matcher behavior, not end-to-end binary analysis. + +2. Where the tests live and how they run + +2a. Fixture files live under `tests/fixtures/matcher/`. + +2b. Static tests go under `tests/fixtures/matcher/static/`. + +2c. Dynamic tests go under `tests/fixtures/matcher/dynamic/`. + +2d. The pytest entrypoint is `tests/test_match_fixtures.py`. + +2e. The loader and DSL parser live in `tests/match_fixtures.py`. + +2f. Files are loaded in lexicographic path order. Tests inside a file are loaded in YAML order. + +2g. Run the suite with: + +```sh +pytest -q tests/test_match_fixtures.py +``` + +2h. Run a subset with: + +```sh +pytest -q tests/test_match_fixtures.py -k <pattern> +``` + +3. Canonical file format + +Use a top-level YAML list. Each list element is one test case. + +Example: + +```yaml +- name: scope-boundary + description: function scope aggregates across basic blocks + rules: + - meta: + name: function-cross-basic-block + description: should match when function scope aggregates features from different basic blocks + scopes: + static: function + features: + - and: + - mnemonic: mov + - mnemonic: add + features: | + func: 0x401000 + bb: 0x401000: basic block + insn: 0x401000: mnemonic(mov) + bb: 0x401010: basic block + insn: 0x401010: mnemonic(add) + expect: + matches: + function-cross-basic-block: + - 0x401000 +``` + +4. Per-test fields + +4a. `name` +A stable human-readable identifier. Pytest ids include this value. + +4b. `description` +A short explanation of the behavior under test. + +4c. `base address` +Optional. Used only for static tests. 
Defaults to `0` if omitted. + +4d. `rules` +A list of rule fragments in normal capa rule syntax. These are wrapped and passed through `capa.rules.Rule.from_dict()`. + +4e. `features` +A block string or list of strings containing the show-features-like DSL described below. + +4f. `expect.matches` +Maps authored rule names to the exact match locations that should be returned. + +4g. `options.span size` +Optional. If present, patches `capa.capabilities.dynamic.SPAN_SIZE` for that one test. + +5. Flavor and scope defaults + +5a. Fixture flavor is inferred from the file location. +A fixture under `tests/fixtures/matcher/static/` is static. A fixture under `tests/fixtures/matcher/dynamic/` is dynamic. + +5b. The per-test `flavor` field is optional. +It is usually omitted. If present, it must agree with the file location. + +5c. Rule scope fragments may omit the unsupported side. +For example: +- static fixtures may specify only `scopes.static` +- dynamic fixtures may specify only `scopes.dynamic` + +The loader fills in the missing side with `unsupported`. + +6. Match semantics + +6a. Expectations are exact. +The test asserts the exact authored rule names that matched and the exact list of locations for each rule. + +6b. Generated subscope helper rules are ignored. +Only authored rules are compared in `expect.matches`. + +6c. Match order matters. +This is especially relevant for dynamic span-of-calls behavior. + +7. Feature DSL + +The DSL is intentionally close to `scripts/show-features.py`. Each line describes one feature or one scope header. + +7a. 
Static scope lines + +Accepted line prefixes: +- `global:` +- `file:` +- `func:` +- `bb:` +- `insn:` + +Examples: + +```text +global: global: os(windows) +file: 0x402345: characteristic(embedded pe) +func: 0x401000 +func: 0x401000: string(hello world) +bb: 0x401000: basic block +bb: 0x401000: characteristic(tight loop) +insn: 0x401000: mnemonic(mov) +insn: 0x401000: offset(0x402000) -> 0x402000 +insn: 0x401000: 0x401002: number(0x10) +insn: 0x401000: string(key: value) +``` + +Notes: +- `func: <address>` is a function header. It sets the current function. +- `bb:` lines attach to the current function and also set the current basic block. +- `insn:` lines attach to the current basic block. +- `insn:` accepts either `insn: <address>: <feature>` or `insn: <function address>: <instruction address>: <feature>`. +- `insn:` feature text may itself contain `: `, such as `string(key: value)`. +- `-> <address>` overrides the feature location. Without it, the location defaults to the current scope address. +- `file:` lines require an explicit address and do not support `->`. + +7b. Dynamic scope lines + +Accepted line prefixes: +- `global:` +- `file:` +- `proc:` +- `thread:` +- `call:` + +Examples: + +```text +proc: sample.exe (ppid=2456, pid=3052) +proc: sample.exe: string(config) +thread: 3064 +thread: 3064: string(worker) +call: 11: api(LdrGetProcedureAddress) +call: 11: string(AddVectoredExceptionHandler) +call: 11: string(kernel32.dll) -> process{pid:3052,tid:3064,call:11} +``` + +Notes: +- `proc: <name> (ppid=<ppid>, pid=<pid>)` is a process header. It sets the current process. +- `thread: <tid>` is a thread header. It sets the current thread. +- `call:` lines attach to the current thread. +- `proc: <name>: <feature>` attaches a process-scope feature to the current process. The name must match the current process header. +- `thread: <tid>: <feature>` attaches a thread-scope feature and also sets the current thread. +- `-> <address>` overrides the feature location. Without it, the location defaults to the current scope address. +- Dynamic fixture call IDs must be unique within a test. + +7c. 
Supported feature atoms + +Currently the parser supports these atoms: +- `basic block` +- `api(...)` +- `arch(...)` +- `bytes(...)` +- `characteristic(...)` +- `class(...)` +- `export(...)` +- `format(...)` +- `function-name(...)` +- `function name(...)` +- `import(...)` +- `match(...)` +- `mnemonic(...)` +- `namespace(...)` +- `number(...)` +- `offset(...)` +- `os(...)` +- `section(...)` +- `string(...)` +- `substring(...)` +- `operand[n].number(...)` +- `operand[n].offset(...)` +- `property(...)` +- `property/read(...)` +- `property/write(...)` + +Examples: + +```text +mnemonic(mov) +number(0x10) +number(0x1e) +string(hello world) +bytes(41 42 43) +operand[0].number(0x10) +property/read(System.IO.FileInfo::Length) +``` + +8. Supported address syntax + +The parser accepts both rendered string forms and tagged YAML arrays. + +8a. String forms include: +- `0x401000` +- `base address+0x100` +- `file+0x20` +- `token(0x1234)` +- `token(0x1234)+0x10` +- `global` +- `process{pid:3052}` +- `process{pid:3052,tid:3064}` +- `process{pid:3052,tid:3064,call:11}` +- the same process/thread/call forms with `ppid:` included + +8b. Tagged YAML arrays include: +- `[absolute, 0x401000]` +- `[relative, 0x100]` +- `[file, 0x20]` +- `[token, 0x1234]` +- `[token offset, 0x1234, 0x10]` +- `[process, 2456, 3052]` +- `[thread, 2456, 3052, 3064]` +- `[call, 2456, 3052, 3064, 11]` +- `[no address]` + +9. Expected match location shorthand + +9a. Static tests usually use normal addresses in `expect.matches`, such as `0x401000`. + +9b. Dynamic tests may also use full dynamic addresses, such as `[call, 2456, 3052, 3064, 11]`. + +9c. Dynamic tests may use a bare integer call ID in `expect.matches` when that call ID is unique within the test. + +Example: + +```yaml +expect: + matches: + span-resolve-add-veh: + - 11 +``` + +This resolves to the unique dynamic call address with call ID `11`. + +10. Adding a new test case + +10a. 
Pick the right fixture file under `tests/fixtures/matcher/`, or add a new file if the new cases form a clear group. + +10b. Append a new test entry to the top-level YAML list. Keep related tests together. + +10c. Add a short top-level `description` that states the matcher behavior being asserted. + +10d. Add concise rule `meta.description` fields when they help explain the role of each rule in the test. + +10e. Keep the rule fragment minimal. Include only the features needed for the behavior under test. + +10f. Write the synthetic feature listing in the DSL. Prefer the same wording and feature rendering that `show-features.py` emits. + +10g. Add `expect.matches` with the exact authored rule names and locations. + +10h. Run: + +```sh +pytest -q tests/test_match_fixtures.py -k <pattern> +``` + +11. When to add parser support + +11a. If a new test only needs existing atoms and line prefixes, do not change Python code. Just add YAML. + +11b. If a new test needs a feature atom that the parser does not understand, update `_parse_feature()` in `tests/match_fixtures.py`. + +11c. If a new test needs a new scope line form, update `StaticFeatureParser` or `DynamicFeatureParser` in `tests/match_fixtures.py`. + +11d. If you extend the DSL, also update this document and add at least one fixture that exercises the new syntax. diff --git a/tests/fixtures.py b/tests/fixtures/__init__.py similarity index 100% rename from tests/fixtures.py rename to tests/fixtures/__init__.py diff --git a/tests/fixtures/matcher/README.md b/tests/fixtures/matcher/README.md new file mode 100644 index 000000000..3f85a4d81 --- /dev/null +++ b/tests/fixtures/matcher/README.md @@ -0,0 +1,10 @@ +Matcher fixture data lives here. + +Layout: +- `tests/fixtures/matcher/static/*.yml` +- `tests/fixtures/matcher/dynamic/*.yml` + +Fixture flavor is inferred from the file location. 
+ +The full format and authoring guide lives in: +- `tests/doc/rule-matcher-test-fixtures.md` diff --git a/tests/fixtures/matcher/dynamic/core.yml b/tests/fixtures/matcher/dynamic/core.yml new file mode 100644 index 000000000..acbce0ceb --- /dev/null +++ b/tests/fixtures/matcher/dynamic/core.yml @@ -0,0 +1,35 @@ +- name: dynamic-span-window + description: span-of-calls matching honors the configured window size + options: + span size: 2 + rules: + - meta: + name: span-resolve-add-veh + description: should match the span ending at the call that resolves AddVectoredExceptionHandler + scopes: + dynamic: span of calls + features: + - and: + - api: LdrGetDllHandle + - api: LdrGetProcedureAddress + - string: AddVectoredExceptionHandler + - meta: + name: span-window-too-small + description: should not match because the configured span window does not include both APIs together + scopes: + dynamic: span of calls + features: + - and: + - api: LdrGetDllHandle + - api: RtlAddVectoredExceptionHandler + features: | + proc: sample.exe (ppid=2456, pid=3052) + thread: 3064 + call: 10: api(LdrGetDllHandle) + call: 11: api(LdrGetProcedureAddress) + call: 11: string(AddVectoredExceptionHandler) + call: 12: api(RtlAddVectoredExceptionHandler) + expect: + matches: + span-resolve-add-veh: + - 11 diff --git a/tests/fixtures/matcher/static/core.yml b/tests/fixtures/matcher/static/core.yml new file mode 100644 index 000000000..f32c68096 --- /dev/null +++ b/tests/fixtures/matcher/static/core.yml @@ -0,0 +1,200 @@ +- name: scope-boundary + description: function scope aggregates across basic blocks, but basic block scope does not + rules: + - meta: + name: function-cross-basic-block + description: should match when function scope aggregates features from different basic blocks + scopes: + static: function + features: + - and: + - mnemonic: mov + - mnemonic: add + - meta: + name: basic-block-cross-basic-block + description: should not match because no single basic block contains both mnemonics 
+ scopes: + static: basic block + features: + - and: + - mnemonic: mov + - mnemonic: add + - meta: + name: instruction-single-mnemonic + description: should match once at the instruction that contains mov + scopes: + static: instruction + features: + - mnemonic: mov + features: | + func: 0x401000 + bb: 0x401000: basic block + insn: 0x401000: mnemonic(mov) + bb: 0x401010: basic block + insn: 0x401010: mnemonic(add) + expect: + matches: + function-cross-basic-block: + - 0x401000 + instruction-single-mnemonic: + - 0x401000 + +- name: combinators-and-count + description: and or not optional and count rules over a minimal function layout + rules: + - meta: + name: and-match + description: should match because the function contains both mov and number 0x10 + scopes: + static: function + features: + - and: + - mnemonic: mov + - number: 0x10 + - meta: + name: or-match + description: should match because one branch of the or is satisfied by number 0x10 + scopes: + static: function + features: + - or: + - api: CreateFileW + - number: 0x10 + - meta: + name: not-match + description: should match because mov is present and number 0x20 is absent + scopes: + static: function + features: + - and: + - mnemonic: mov + - not: + - number: 0x20 + - meta: + name: optional-match + description: should match even though the optional child is absent + scopes: + static: function + features: + - and: + - mnemonic: mov + - optional: + - number: 0x30 + - meta: + name: count-exact-match + description: should match because number 0x10 appears exactly twice + scopes: + static: function + features: + - count(number(0x10)): 2 + - meta: + name: count-range-match + description: should match because number 0x10 appears within the allowed range + scopes: + static: function + features: + - count(number(0x10)): (1, 2) + - meta: + name: count-negative-no-match + description: should not match because number 0x10 does not appear three times + scopes: + static: function + features: + - count(number(0x10)): 3 
+ features: | + func: 0x402000 + bb: 0x402000: basic block + insn: 0x402000: mnemonic(mov) + insn: 0x402000: number(0x10) + insn: 0x402002: number(0x10) + expect: + matches: + and-match: + - 0x402000 + or-match: + - 0x402000 + not-match: + - 0x402000 + optional-match: + - 0x402000 + count-exact-match: + - 0x402000 + count-range-match: + - 0x402000 + +- name: strings-and-regex + description: exact string, substring, regex, and anchored regex behavior over a single function + rules: + - meta: + name: exact-string-match + description: should match only the exact string hello world + scopes: + static: function + features: + - string: hello world + - meta: + name: substring-match + description: should match because abc appears within a larger string + scopes: + static: function + features: + - substring: abc + - meta: + name: regex-match + description: should match because the regex finds bbbb within abbbba + scopes: + static: function + features: + - string: /bbbb/ + - meta: + name: regex-ignorecase-match + description: should match because the regex is case-insensitive + scopes: + static: function + features: + - string: /BBBB/i + - meta: + name: regex-anchor-no-match + description: should not match because the anchored regex does not start at the beginning + scopes: + static: function + features: + - string: /^bbbb/ + features: | + func: 0x500000 + func: 0x500000: string(hello world) + func: 0x500000: string(zabczz) + func: 0x500000: string(abbbba) + func: 0x500000: string(aBBBBa) + expect: + matches: + exact-string-match: + - 0x500000 + substring-match: + - 0x500000 + regex-match: + - 0x500000 + regex-ignorecase-match: + - 0x500000 + +- name: instruction-feature-parsing + description: instruction lines must tolerate colons in feature text and hex literals containing e + rules: + - meta: + name: colon-string-and-hex-number + description: "should match when the parser keeps key: value inside the feature and parses 0x1e as an integer" + scopes: + static: function + 
features: + - and: + - string: "key: value" + - number: 0x1e + features: | + func: 0x600000 + bb: 0x600000: basic block + insn: 0x600000: number(0x1e) + insn: 0x600001: string(key: value) + expect: + matches: + colon-string-and-hex-number: + - 0x600000 diff --git a/tests/match_fixtures.py b/tests/match_fixtures.py new file mode 100644 index 000000000..f17fd726a --- /dev/null +++ b/tests/match_fixtures.py @@ -0,0 +1,813 @@ +from __future__ import annotations + +import re +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Iterable + +import yaml + +import capa.rules +import capa.features.file +import capa.features.insn +import capa.features.common +import capa.features.address +import capa.features.basicblock +import capa.features.extractors.null +from capa.features.common import Feature +from capa.features.address import ( + NO_ADDRESS, + Address, + DNTokenAddress, + ProcessAddress, + ThreadAddress, + FileOffsetAddress, + DynamicCallAddress, + DNTokenOffsetAddress, + RelativeVirtualAddress, + AbsoluteVirtualAddress, +) +from capa.features.extractors.base_extractor import FeatureExtractor, SampleHashes + +DUMMY_SAMPLE_HASHES = SampleHashes.from_bytes(b"") +PROCESS_HEADER = re.compile(r"^(?P<name>.+) \(ppid=(?P<ppid>\d+), pid=(?P<pid>\d+)\)$") + + +@dataclass(frozen=True) +class MatchFixture: + path: Path + index: int + name: str + description: str + flavor: str + ruleset: capa.rules.RuleSet + extractor: FeatureExtractor + expected_matches: dict[str, list[Address]] + span_size: int | None + + +class StaticFeatureParser: + def __init__(self, base_address: Address): + self.base_address = base_address + self.global_features: list[Feature] = [] + self.file_features: list[tuple[Address, Feature]] = [] + self.functions: dict[ + Address, capa.features.extractors.null.FunctionFeatures + ] = {} + self.current_function: Address | None = None + self.current_basic_block: Address | None = None + + def parse( + self, source: Any + ) -> 
capa.features.extractors.null.NullStaticFeatureExtractor: + for line in _iter_feature_lines(source): + self.consume(line) + + return capa.features.extractors.null.NullStaticFeatureExtractor( + base_address=self.base_address, + sample_hashes=DUMMY_SAMPLE_HASHES, + global_features=self.global_features, + file_features=self.file_features, + functions=self.functions, + ) + + def consume(self, line: str) -> None: + if line.startswith("global:"): + self.consume_global(line) + elif line.startswith("file:"): + self.consume_file(line) + elif line.startswith("func:"): + self.consume_function(line) + elif line.startswith("bb:"): + self.consume_basic_block(line) + elif line.startswith("insn:"): + self.consume_instruction(line) + else: + raise ValueError(f"unsupported static feature line: {line}") + + def consume_global(self, line: str) -> None: + rest = _strip_prefix(line, "global:") + if rest.startswith("global: "): + rest = rest[len("global: ") :] + self.global_features.append(_parse_feature(rest)) + + def consume_file(self, line: str) -> None: + addr_text, feature_text, target_text = _split_feature_line( + _strip_prefix(line, "file:") + ) + if target_text is not None: + raise ValueError("file feature lines do not support relocated addresses") + self.file_features.append( + (_parse_static_address(addr_text), _parse_feature(feature_text)) + ) + + def consume_function(self, line: str) -> None: + rest = _strip_prefix(line, "func:") + if ": " not in rest: + function_address = _parse_static_address(rest) + self.ensure_function(function_address) + self.current_function = function_address + self.current_basic_block = None + return + + addr_text, feature_text, target_text = _split_feature_line(rest) + function_address = _parse_static_address(addr_text) + feature_address = ( + _parse_static_address(target_text) + if target_text is not None + else function_address + ) + self.ensure_function(function_address).features.append( + (feature_address, _parse_feature(feature_text)) + ) + 
self.current_function = function_address + self.current_basic_block = None + + def consume_basic_block(self, line: str) -> None: + if self.current_function is None: + raise ValueError(f"basic block line without current function: {line}") + + addr_text, feature_text, target_text = _split_feature_line( + _strip_prefix(line, "bb:") + ) + basic_block_address = _parse_static_address(addr_text) + feature_address = ( + _parse_static_address(target_text) + if target_text is not None + else basic_block_address + ) + self.ensure_basic_block( + self.current_function, basic_block_address + ).features.append((feature_address, _parse_feature(feature_text))) + self.current_basic_block = basic_block_address + + def consume_instruction(self, line: str) -> None: + if self.current_function is None or self.current_basic_block is None: + raise ValueError(f"instruction line without current basic block: {line}") + + rest, target_text = _split_target(_strip_prefix(line, "insn:")) + instruction_address, feature_text = _split_instruction_feature_line( + rest, + self.current_function, + line, + ) + + feature_address = ( + _parse_static_address(target_text) + if target_text is not None + else instruction_address + ) + basic_block = self.ensure_basic_block( + self.current_function, self.current_basic_block + ) + instruction = basic_block.instructions.get(instruction_address) + if instruction is None: + instruction = capa.features.extractors.null.InstructionFeatures(features=[]) + basic_block.instructions[instruction_address] = instruction + instruction.features.append((feature_address, _parse_feature(feature_text))) + + def ensure_function( + self, address: Address + ) -> capa.features.extractors.null.FunctionFeatures: + function = self.functions.get(address) + if function is None: + function = capa.features.extractors.null.FunctionFeatures( + features=[], basic_blocks={} + ) + self.functions[address] = function + return function + + def ensure_basic_block( + self, function_address: Address, 
basic_block_address: Address + ) -> capa.features.extractors.null.BasicBlockFeatures: + function = self.ensure_function(function_address) + basic_block = function.basic_blocks.get(basic_block_address) + if basic_block is None: + basic_block = capa.features.extractors.null.BasicBlockFeatures( + features=[], instructions={} + ) + function.basic_blocks[basic_block_address] = basic_block + return basic_block + + +class DynamicFeatureParser: + def __init__(self): + self.global_features: list[Feature] = [] + self.file_features: list[tuple[Address, Feature]] = [] + self.processes: dict[ + Address, capa.features.extractors.null.ProcessFeatures + ] = {} + self.calls_by_id: dict[int, DynamicCallAddress] = {} + self.current_process: ProcessAddress | None = None + self.current_thread: ThreadAddress | None = None + + def parse( + self, source: Any + ) -> capa.features.extractors.null.NullDynamicFeatureExtractor: + for line in _iter_feature_lines(source): + self.consume(line) + + return capa.features.extractors.null.NullDynamicFeatureExtractor( + base_address=NO_ADDRESS, + sample_hashes=DUMMY_SAMPLE_HASHES, + global_features=self.global_features, + file_features=self.file_features, + processes=self.processes, + ) + + def consume(self, line: str) -> None: + if line.startswith("global:"): + self.consume_global(line) + elif line.startswith("file:"): + self.consume_file(line) + elif line.startswith("proc:"): + self.consume_process(line) + elif line.startswith("thread:"): + self.consume_thread(line) + elif line.startswith("call:"): + self.consume_call(line) + else: + raise ValueError(f"unsupported dynamic feature line: {line}") + + def consume_global(self, line: str) -> None: + rest = _strip_prefix(line, "global:") + if rest.startswith("global: "): + rest = rest[len("global: ") :] + self.global_features.append(_parse_feature(rest)) + + def consume_file(self, line: str) -> None: + addr_text, feature_text, target_text = _split_feature_line( + _strip_prefix(line, "file:") + ) + if 
target_text is not None: + raise ValueError("file feature lines do not support relocated addresses") + self.file_features.append( + (_parse_address(addr_text), _parse_feature(feature_text)) + ) + + def consume_process(self, line: str) -> None: + rest = _strip_prefix(line, "proc:") + header = PROCESS_HEADER.fullmatch(rest) + if header is not None: + process_address = ProcessAddress( + ppid=int(header.group("ppid")), pid=int(header.group("pid")) + ) + self.ensure_process(process_address, header.group("name")) + self.current_process = process_address + self.current_thread = None + return + + if self.current_process is None: + raise ValueError(f"process feature line without current process: {line}") + + name, feature_text, target_text = _split_feature_line(rest) + process = self.ensure_process(self.current_process) + if process.name != name: + raise ValueError( + f"process feature line does not match current process: {line}" + ) + feature_address = ( + _parse_address(target_text) + if target_text is not None + else self.current_process + ) + process.features.append((feature_address, _parse_feature(feature_text))) + + def consume_thread(self, line: str) -> None: + if self.current_process is None: + raise ValueError(f"thread line without current process: {line}") + + rest = _strip_prefix(line, "thread:") + if ": " not in rest: + thread_address = ThreadAddress( + process=self.current_process, tid=int(rest, 0) + ) + self.ensure_thread(thread_address) + self.current_thread = thread_address + return + + tid_text, feature_text, target_text = _split_feature_line(rest) + thread_address = ThreadAddress( + process=self.current_process, tid=int(tid_text, 0) + ) + thread = self.ensure_thread(thread_address) + feature_address = ( + _parse_address(target_text) if target_text is not None else thread_address + ) + thread.features.append((feature_address, _parse_feature(feature_text))) + self.current_thread = thread_address + + def consume_call(self, line: str) -> None: + if 
self.current_thread is None: + raise ValueError(f"call line without current thread: {line}") + + call_id_text, feature_text, target_text = _split_feature_line( + _strip_prefix(line, "call:") + ) + call_address = DynamicCallAddress( + thread=self.current_thread, id=int(call_id_text, 0) + ) + call = self.ensure_call(call_address) + feature_address = ( + _parse_address(target_text) if target_text is not None else call_address + ) + call.features.append((feature_address, _parse_feature(feature_text))) + + def ensure_process( + self, address: ProcessAddress, name: str | None = None + ) -> capa.features.extractors.null.ProcessFeatures: + process = self.processes.get(address) + if process is None: + process = capa.features.extractors.null.ProcessFeatures( + name=name or f"process-{address.pid}", + features=[], + threads={}, + ) + self.processes[address] = process + elif name is not None: + process.name = name + return process + + def ensure_thread( + self, address: ThreadAddress + ) -> capa.features.extractors.null.ThreadFeatures: + process = self.ensure_process(address.process) + thread = process.threads.get(address) + if thread is None: + thread = capa.features.extractors.null.ThreadFeatures(features=[], calls={}) + process.threads[address] = thread + return thread + + def ensure_call( + self, address: DynamicCallAddress + ) -> capa.features.extractors.null.CallFeatures: + existing = self.calls_by_id.get(address.id) + if existing is not None and existing != address: + raise ValueError( + f"dynamic fixture call IDs must be unique within a test: {address.id}" + ) + + self.calls_by_id[address.id] = address + + thread = self.ensure_thread(address.thread) + call = thread.calls.get(address) + if call is None: + call = capa.features.extractors.null.CallFeatures( + name=f"call-{address.id}", features=[] + ) + thread.calls[address] = call + return call + + +def load_fixtures(path: Path) -> list[MatchFixture]: + doc = yaml.safe_load(path.read_text()) + fixture_docs = 
_get_fixture_docs(path, doc) + fixtures: list[MatchFixture] = [] + + for index, fixture_doc in enumerate(fixture_docs, start=1): + flavor = _get_fixture_flavor(path, fixture_doc) + span_size = _load_span_size(fixture_doc) + + if flavor == "static": + static_parser = StaticFeatureParser( + _parse_static_address(fixture_doc.get("base address", 0)) + ) + extractor = static_parser.parse(fixture_doc.get("features", "")) + expected_matches = _load_expected_matches(fixture_doc, flavor) + elif flavor == "dynamic": + dynamic_parser = DynamicFeatureParser() + extractor = dynamic_parser.parse(fixture_doc.get("features", "")) + expected_matches = _load_expected_matches( + fixture_doc, + flavor, + dynamic_parser=dynamic_parser, + ) + else: + raise ValueError(f"unsupported fixture flavor: {flavor}") + + ruleset = _load_ruleset(path, fixture_doc, flavor) + + fixtures.append( + MatchFixture( + path=path, + index=index, + name=str(fixture_doc.get("name", f"{path.stem}-{index}")), + description=str(fixture_doc.get("description", "")), + flavor=flavor, + ruleset=ruleset, + extractor=extractor, + expected_matches=expected_matches, + span_size=span_size, + ) + ) + + return fixtures + + +def load_fixture(path: Path) -> MatchFixture: + fixtures = load_fixtures(path) + if len(fixtures) != 1: + raise ValueError(f"fixture file contains {len(fixtures)} tests: {path}") + return fixtures[0] + + +def render_matches( + fixture: MatchFixture, matches: dict[str, Any] +) -> dict[str, list[Address]]: + return { + rule_name: [address for address, _ in results] + for rule_name, results in matches.items() + if rule_name in fixture.ruleset + and not fixture.ruleset[rule_name].is_subscope_rule() + } + + +def _get_fixture_docs(path: Path, doc: Any) -> list[dict[str, Any]]: + if isinstance(doc, list): + fixture_docs = doc + elif isinstance(doc, dict) and isinstance(doc.get("tests"), list): + fixture_docs = doc["tests"] + elif isinstance(doc, dict): + fixture_docs = [doc] + else: + raise 
ValueError(f"fixture file must contain a mapping or list: {path}") + + for fixture_doc in fixture_docs: + if not isinstance(fixture_doc, dict): + raise ValueError(f"fixture test must be a mapping: {path}") + + return fixture_docs + + +def _get_fixture_flavor(path: Path, doc: dict[str, Any]) -> str: + explicit = doc.get("flavor") + inferred = next( + (part for part in reversed(path.parts) if part in {"static", "dynamic"}), + None, + ) + + if explicit is None: + if inferred is None: + raise ValueError(f"fixture flavor could not be inferred from path: {path}") + return inferred + + if not isinstance(explicit, str): + raise ValueError("fixture flavor must be a string") + + if inferred is not None and explicit != inferred: + raise ValueError( + f"fixture flavor {explicit!r} does not match file location {inferred!r}: {path}" + ) + + return explicit + + +def _normalize_rule_doc(rule_doc: dict[str, Any], flavor: str) -> dict[str, Any]: + meta = rule_doc.setdefault("meta", {}) + if not isinstance(meta, dict): + raise ValueError("rule meta must be a mapping") + + scopes = meta.setdefault("scopes", {}) + if not isinstance(scopes, dict): + raise ValueError("rule scopes must be a mapping") + + if flavor == "static": + scopes.setdefault("dynamic", "unsupported") + elif flavor == "dynamic": + scopes.setdefault("static", "unsupported") + + return rule_doc + + +def _load_ruleset(path: Path, doc: dict[str, Any], flavor: str) -> capa.rules.RuleSet: + rules: list[capa.rules.Rule] = [] + for rule_doc in doc.get("rules", []): + if not isinstance(rule_doc, dict): + raise ValueError(f"rule must be a mapping: {path}") + wrapped = {"rule": _normalize_rule_doc(rule_doc, flavor)} + definition = yaml.safe_dump(wrapped, sort_keys=False) + rules.append(capa.rules.Rule.from_dict(wrapped, definition)) + return capa.rules.RuleSet(rules) + + +def _load_expected_matches( + doc: dict[str, Any], + flavor: str, + dynamic_parser: DynamicFeatureParser | None = None, +) -> dict[str, list[Address]]: + 
expect = doc.get("expect", {}) + if not isinstance(expect, dict): + raise ValueError("fixture expect must be a mapping") + + matches = expect.get("matches", {}) + if not isinstance(matches, dict): + raise ValueError("fixture expect.matches must be a mapping") + + return { + rule_name: [ + _parse_expected_address(spec, flavor, dynamic_parser) for spec in locations + ] + for rule_name, locations in matches.items() + } + + +def _parse_expected_address( + spec: Any, + flavor: str, + dynamic_parser: DynamicFeatureParser | None = None, +) -> Address: + if flavor == "dynamic" and dynamic_parser is not None: + if isinstance(spec, int) and spec in dynamic_parser.calls_by_id: + return dynamic_parser.calls_by_id[spec] + + if isinstance(spec, str): + call_id = re.fullmatch(r"call\((\d+)\)", spec) + if call_id is not None: + call_address = dynamic_parser.calls_by_id.get(int(call_id.group(1))) + if call_address is None: + raise ValueError(f"unknown dynamic fixture call ID: {spec}") + return call_address + + return _parse_address(spec) + + +def _load_span_size(doc: dict[str, Any]) -> int | None: + options = doc.get("options", {}) + if not isinstance(options, dict): + raise ValueError("fixture options must be a mapping") + + span_size = options.get("span size") + if span_size is None: + return None + if not isinstance(span_size, int): + raise ValueError("fixture options.span size must be an integer") + return span_size + + +def _iter_feature_lines(source: Any) -> Iterable[str]: + if isinstance(source, str): + lines = source.splitlines() + elif isinstance(source, list): + lines = source + else: + raise ValueError("fixture features must be a block string or list of strings") + + for line in lines: + if not isinstance(line, str): + raise ValueError("fixture feature lines must be strings") + stripped = line.strip() + if not stripped or stripped.startswith("#"): + continue + yield stripped + + +def _split_feature_line(text: str) -> tuple[str, str, str | None]: + body, target = 
_split_target(text) + scope_text, separator, feature_text = body.partition(": ") + if not separator: + raise ValueError(f"expected ': ': {text}") + return scope_text, feature_text, target + + +def _split_instruction_feature_line( + text: str, + current_function: Address, + line: str, +) -> tuple[Address, str]: + addr1_text, separator, remainder = text.partition(": ") + if not separator: + raise ValueError(f"unsupported instruction feature line: {line}") + + addr2_text, separator, feature_text = remainder.partition(": ") + if separator: + try: + function_address = _parse_static_address(addr1_text) + instruction_address = _parse_static_address(addr2_text) + except ValueError: + return _parse_static_address(addr1_text), remainder + + if function_address != current_function: + raise ValueError( + f"instruction line changed function without a function header: {line}" + ) + + return instruction_address, feature_text + + return _parse_static_address(addr1_text), remainder + + +def _split_target(text: str) -> tuple[str, str | None]: + if " -> " not in text: + return text, None + return text.rsplit(" -> ", 1) + + +def _parse_feature(text: str) -> Feature: + text = text.strip() + if text == "basic block": + return capa.features.basicblock.BasicBlock() + + operand_number = re.fullmatch(r"operand\[(\d+)\]\.number\((.*)\)", text) + if operand_number: + return capa.features.insn.OperandNumber( + int(operand_number.group(1)), + _parse_number_literal(operand_number.group(2)), + ) + + operand_offset = re.fullmatch(r"operand\[(\d+)\]\.offset\((.*)\)", text) + if operand_offset: + return capa.features.insn.OperandOffset( + int(operand_offset.group(1)), + _parse_int_literal(operand_offset.group(2)), + ) + + property_ = re.fullmatch(r"property(?:/(read|write))?\((.*)\)", text) + if property_: + return capa.features.insn.Property( + _strip_quotes(property_.group(2).strip()), + access=property_.group(1), + ) + + feature = re.fullmatch(r"([a-z][a-z0-9\- ]*)\((.*)\)", text) + if feature is 
None: + raise ValueError(f"unsupported feature syntax: {text}") + + name = feature.group(1) + value = _strip_quotes(feature.group(2).strip()) + + if name == "api": + return capa.features.insn.API(value) + if name == "arch": + return capa.features.common.Arch(value) + if name == "bytes": + return capa.features.common.Bytes(bytes.fromhex(value.replace(" ", ""))) + if name == "characteristic": + return capa.features.common.Characteristic(value) + if name == "class": + return capa.features.common.Class(value) + if name == "export": + return capa.features.file.Export(value) + if name == "format": + return capa.features.common.Format(value) + if name in ("function-name", "function name"): + return capa.features.file.FunctionName(value) + if name == "import": + return capa.features.file.Import(value) + if name == "match": + return capa.features.common.MatchedRule(value) + if name == "mnemonic": + return capa.features.insn.Mnemonic(value) + if name == "namespace": + return capa.features.common.Namespace(value) + if name == "number": + return capa.features.insn.Number(_parse_number_literal(value)) + if name == "offset": + return capa.features.insn.Offset(_parse_int_literal(value)) + if name == "os": + return capa.features.common.OS(value) + if name == "section": + return capa.features.file.Section(value) + if name == "string": + return capa.features.common.String(value) + if name == "substring": + return capa.features.common.Substring(value) + + raise ValueError(f"unsupported feature type: {name}") + + +def _parse_number_literal(value: str) -> int | float: + value = value.strip() + if _looks_like_hex_literal(value): + return int(value, 0) + if any(character in value for character in ".eE"): + return float(value) + return int(value, 0) + + +def _looks_like_hex_literal(value: str) -> bool: + return value.lstrip("+-").lower().startswith("0x") + + +def _parse_int_literal(value: str) -> int: + return int(value, 0) + + +def _parse_static_address(spec: Any) -> Address: + address = 
_parse_address(spec) + if isinstance(address, (ProcessAddress, ThreadAddress, DynamicCallAddress)): + raise ValueError(f"expected a static address, got {spec!r}") + return address + + +def _parse_address(spec: Any) -> Address: + if spec is None: + return NO_ADDRESS + + if isinstance(spec, int): + return AbsoluteVirtualAddress(spec) + + if isinstance(spec, list): + if not spec: + raise ValueError(f"unsupported address: {spec!r}") + + kind = spec[0] + if kind == "absolute": + return AbsoluteVirtualAddress(_coerce_int(spec[1])) + if kind == "relative": + return RelativeVirtualAddress(_coerce_int(spec[1])) + if kind == "file": + return FileOffsetAddress(_coerce_int(spec[1])) + if kind == "token": + return DNTokenAddress(_coerce_int(spec[1])) + if kind == "token offset": + return DNTokenOffsetAddress(_coerce_int(spec[1]), _coerce_int(spec[2])) + if kind == "process": + return ProcessAddress(ppid=int(spec[1]), pid=int(spec[2])) + if kind == "thread": + return ThreadAddress( + process=ProcessAddress(ppid=int(spec[1]), pid=int(spec[2])), + tid=int(spec[3]), + ) + if kind == "call": + return DynamicCallAddress( + thread=ThreadAddress( + process=ProcessAddress(ppid=int(spec[1]), pid=int(spec[2])), + tid=int(spec[3]), + ), + id=int(spec[4]), + ) + if kind == "no address": + return NO_ADDRESS + raise ValueError(f"unsupported address type: {kind}") + + if not isinstance(spec, str): + raise ValueError(f"unsupported address: {spec!r}") + + if spec in {"global", "no address"}: + return NO_ADDRESS + if spec.startswith("base address+"): + return RelativeVirtualAddress(_coerce_int(spec[len("base address+") :])) + if spec.startswith("file+"): + return FileOffsetAddress(_coerce_int(spec[len("file+") :])) + if token_offset := re.fullmatch(r"token\((.+)\)\+(.+)", spec): + return DNTokenOffsetAddress( + _coerce_int(token_offset.group(1)), _coerce_int(token_offset.group(2)) + ) + if token := re.fullmatch(r"token\((.+)\)", spec): + return DNTokenAddress(_coerce_int(token.group(1))) + if 
process := re.fullmatch(r"process\{ppid:(\d+),pid:(\d+)\}", spec): + return ProcessAddress(ppid=int(process.group(1)), pid=int(process.group(2))) + if process := re.fullmatch(r"process\{pid:(\d+)\}", spec): + return ProcessAddress(pid=int(process.group(1))) + if thread := re.fullmatch(r"process\{ppid:(\d+),pid:(\d+),tid:(\d+)\}", spec): + return ThreadAddress( + process=ProcessAddress(ppid=int(thread.group(1)), pid=int(thread.group(2))), + tid=int(thread.group(3)), + ) + if thread := re.fullmatch(r"process\{pid:(\d+),tid:(\d+)\}", spec): + return ThreadAddress( + process=ProcessAddress(pid=int(thread.group(1))), tid=int(thread.group(2)) + ) + if call := re.fullmatch( + r"process\{ppid:(\d+),pid:(\d+),tid:(\d+),call:(\d+)\}", spec + ): + return DynamicCallAddress( + thread=ThreadAddress( + process=ProcessAddress(ppid=int(call.group(1)), pid=int(call.group(2))), + tid=int(call.group(3)), + ), + id=int(call.group(4)), + ) + if call := re.fullmatch(r"process\{pid:(\d+),tid:(\d+),call:(\d+)\}", spec): + return DynamicCallAddress( + thread=ThreadAddress( + process=ProcessAddress(pid=int(call.group(1))), tid=int(call.group(2)) + ), + id=int(call.group(3)), + ) + return AbsoluteVirtualAddress(_coerce_int(spec)) + + +def _coerce_int(value: Any) -> int: + if isinstance(value, int): + return value + if isinstance(value, str): + return int(value, 0) + raise ValueError(f"expected integer value: {value!r}") + + +def _require_string(doc: dict[str, Any], key: str) -> str: + value = doc.get(key) + if not isinstance(value, str): + raise ValueError(f"expected string for {key}") + return value + + +def _strip_prefix(text: str, prefix: str) -> str: + return text[len(prefix) :].strip() + + +def _strip_quotes(value: str) -> str: + if len(value) >= 2 and value[0] == value[-1] and value[0] in {'"', "'"}: + return value[1:-1] + return value diff --git a/tests/test_match_fixtures.py b/tests/test_match_fixtures.py new file mode 100644 index 000000000..c9bd3c850 --- /dev/null +++ 
b/tests/test_match_fixtures.py @@ -0,0 +1,37 @@ +from pathlib import Path + +import pytest + +import capa.capabilities.common +import capa.capabilities.dynamic +import match_fixtures + +FIXTURE_DIR = Path(__file__).parent / "fixtures" / "matcher" +FIXTURE_PATHS = sorted( + path for path in FIXTURE_DIR.rglob("*") if path.suffix in {".json", ".yml", ".yaml"} +) +FIXTURES = [ + fixture for path in FIXTURE_PATHS for fixture in match_fixtures.load_fixtures(path) +] +FIXTURE_IDS = [ + f"{fixture.path.relative_to(FIXTURE_DIR)}[{fixture.index}]::{fixture.name}" + for fixture in FIXTURES +] + + +@pytest.mark.parametrize("fixture", FIXTURES, ids=FIXTURE_IDS) +def test_match_fixture(fixture: match_fixtures.MatchFixture): + with pytest.MonkeyPatch.context() as patch: + if fixture.span_size is not None: + patch.setattr(capa.capabilities.dynamic, "SPAN_SIZE", fixture.span_size) + + capabilities = capa.capabilities.common.find_capabilities( + fixture.ruleset, + fixture.extractor, + disable_progress=True, + ) + + assert ( + match_fixtures.render_matches(fixture, capabilities.matches) + == fixture.expected_matches + )