diff --git a/capa/features/address.py b/capa/features/address.py index 31b5d8203..7f561436a 100644 --- a/capa/features/address.py +++ b/capa/features/address.py @@ -13,6 +13,7 @@ # limitations under the License. import abc +from typing import Optional class Address(abc.ABC): @@ -52,51 +53,59 @@ def __hash__(self): class ProcessAddress(Address): """an address of a process in a dynamic execution trace""" - def __init__(self, pid: int, ppid: int = 0): + def __init__(self, pid: int, ppid: int = 0, id: Optional[int] = None): assert ppid >= 0 assert pid > 0 self.ppid = ppid self.pid = pid + self.id = id def __repr__(self): - return "process(%s%s)" % ( + s = "process(%s%s%s)" % ( f"ppid: {self.ppid}, " if self.ppid > 0 else "", f"pid: {self.pid}", + f", id: {self.id}" if self.id is not None else "", ) + return s def __hash__(self): - return hash((self.ppid, self.pid)) + return hash((self.ppid, self.pid, self.id)) def __eq__(self, other): assert isinstance(other, ProcessAddress) - return (self.ppid, self.pid) == (other.ppid, other.pid) + return (self.ppid, self.pid, self.id) == (other.ppid, other.pid, other.id) def __lt__(self, other): assert isinstance(other, ProcessAddress) - return (self.ppid, self.pid) < (other.ppid, other.pid) + self_id = self.id if self.id is not None else -1 + other_id = other.id if other.id is not None else -1 + return (self.ppid, self.pid, self_id) < (other.ppid, other.pid, other_id) class ThreadAddress(Address): """addresses a thread in a dynamic execution trace""" - def __init__(self, process: ProcessAddress, tid: int): + def __init__(self, process: ProcessAddress, tid: int, id: Optional[int] = None): assert tid >= 0 self.process = process self.tid = tid + self.id = id def __repr__(self): - return f"{self.process}, thread(tid: {self.tid})" + return f"{self.process}, thread(tid: {self.tid}{f', id: {self.id}' if self.id is not None else ''})" def __hash__(self): - return hash((self.process, self.tid)) + return hash((self.process, self.tid, self.id)) def __eq__(self, other): assert isinstance(other, ThreadAddress) - return (self.process, self.tid) == (other.process, other.tid) + return (self.process, self.tid, self.id) == (other.process, other.tid, other.id) def __lt__(self, other): assert isinstance(other, ThreadAddress) - return (self.process, self.tid) < (other.process, other.tid) + self_id = self.id if self.id is not None else -1 + other_id = other.id if other.id is not None else -1 + return (self.process, self.tid, self_id) < (other.process, other.tid, other_id) class DynamicCallAddress(Address): @@ -114,7 +123,10 @@ def __hash__(self): return hash((self.thread, self.id)) def __eq__(self, other): - return isinstance(other, DynamicCallAddress) and (self.thread, self.id) == (other.thread, other.id) + return isinstance(other, DynamicCallAddress) and (self.thread, self.id) == ( + other.thread, + other.id, + ) def __lt__(self, other): assert isinstance(other, DynamicCallAddress) diff --git a/capa/features/extractors/cape/file.py b/capa/features/extractors/cape/file.py index 36c205195..50182e6e8 100644 --- a/capa/features/extractors/cape/file.py +++ b/capa/features/extractors/cape/file.py @@ -16,12 +16,12 @@ import logging from typing import Iterator -from capa.features.file import Export, Import, Section -from capa.features.common import String, Feature -from capa.features.address import NO_ADDRESS, Address, ProcessAddress, AbsoluteVirtualAddress -from capa.features.extractors.helpers import generate_symbols -from capa.features.extractors.cape.models import CapeReport +from capa.features.address import NO_ADDRESS, AbsoluteVirtualAddress, Address, ProcessAddress +from capa.features.common import Feature, String from capa.features.extractors.base_extractor import ProcessHandle +from capa.features.extractors.cape.models import CapeReport +from capa.features.extractors.helpers import generate_symbols +from capa.features.file import Export, Import, Section logger = logging.getLogger(__name__) @@ -30,22 +30,32 @@ def get_processes(report: CapeReport) -> Iterator[ProcessHandle]: """ get all the created processes for a sample """ - seen_processes = {} + counts: dict[tuple[int, int], int] = {} + for process in report.behavior.processes: + key = (process.parent_id, process.process_id) + counts[key] = counts.get(key, 0) + 1 + + seen_processes: dict[tuple[int, int], list] = {} + seq: dict[tuple[int, int], int] = {} for process in report.behavior.processes: - addr = ProcessAddress(pid=process.process_id, ppid=process.parent_id) + key = (process.parent_id, process.process_id) + seq[key] = seq.get(key, 0) + 1 + process_id = seq[key] - 1 if counts[key] > 1 else None + + addr = ProcessAddress(pid=process.process_id, ppid=process.parent_id, id=process_id) yield ProcessHandle(address=addr, inner=process) # check for pid and ppid reuse - if addr not in seen_processes: - seen_processes[addr] = [process] + if key not in seen_processes: + seen_processes[key] = [process] else: logger.warning( "pid and ppid reuse detected between process %s and process%s: %s", process, - "es" if len(seen_processes[addr]) > 1 else "", - seen_processes[addr], + "es" if len(seen_processes[key]) > 1 else "", + seen_processes[key], ) - seen_processes[addr].append(process) + seen_processes[key].append(process) def extract_import_names(report: CapeReport) -> Iterator[tuple[Feature, Address]]: diff --git a/capa/features/extractors/cape/process.py b/capa/features/extractors/cape/process.py index fb6cac8c6..a46a47325 100644 --- a/capa/features/extractors/cape/process.py +++ b/capa/features/extractors/cape/process.py @@ -16,10 +16,10 @@ import logging from typing import Iterator -from capa.features.common import String, Feature from capa.features.address import Address, ThreadAddress +from capa.features.common import Feature, String +from capa.features.extractors.base_extractor import ProcessHandle, ThreadHandle from capa.features.extractors.cape.models import Process -from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle logger = logging.getLogger(__name__) @@ -31,8 +31,21 @@ def get_threads(ph: ProcessHandle) -> Iterator[ThreadHandle]: process: Process = ph.inner threads: list[int] = process.threads - for thread in threads: - address: ThreadAddress = ThreadAddress(process=ph.address, tid=thread) + counts: dict[int, int] = {} + for tid in threads: + counts[tid] = counts.get(tid, 0) + 1 + + seq: dict[int, int] = {} + warned_tids: set[int] = set() + for tid in threads: + if counts[tid] > 1 and tid not in warned_tids: + logger.warning("tid reuse detected for tid %d in process %s", tid, ph.address) + warned_tids.add(tid) + + seq[tid] = seq.get(tid, 0) + 1 + thread_id = seq[tid] - 1 if counts[tid] > 1 else None + + address: ThreadAddress = ThreadAddress(process=ph.address, tid=tid, id=thread_id) yield ThreadHandle(address=address, inner={}) diff --git a/capa/features/extractors/vmray/extractor.py b/capa/features/extractors/vmray/extractor.py index 27eeed481..531a78b07 100644 --- a/capa/features/extractors/vmray/extractor.py +++ b/capa/features/extractors/vmray/extractor.py @@ -13,31 +13,31 @@ # limitations under the License. import logging -from typing import Iterator from pathlib import Path +from typing import Iterator -import capa.helpers import capa.features.extractors.vmray.call import capa.features.extractors.vmray.file import capa.features.extractors.vmray.global_ -from capa.features.common import Feature +import capa.helpers from capa.features.address import ( NO_ADDRESS, + AbsoluteVirtualAddress, Address, - ThreadAddress, - ProcessAddress, DynamicCallAddress, - AbsoluteVirtualAddress, + ProcessAddress, + ThreadAddress, ) -from capa.features.extractors.vmray import VMRayAnalysis, VMRayMonitorThread, VMRayMonitorProcess -from capa.features.extractors.vmray.models import PARAM_TYPE_STR, ParamList, FunctionCall +from capa.features.common import Feature from capa.features.extractors.base_extractor import ( CallHandle, + DynamicFeatureExtractor, + ProcessHandle, SampleHashes, ThreadHandle, - ProcessHandle, - DynamicFeatureExtractor, ) +from capa.features.extractors.vmray import VMRayAnalysis, VMRayMonitorProcess, VMRayMonitorThread +from capa.features.extractors.vmray.models import PARAM_TYPE_STR, FunctionCall, ParamList logger = logging.getLogger(__name__) @@ -99,7 +99,11 @@ def get_processes(self) -> Iterator[ProcessHandle]: ) continue - address: ProcessAddress = ProcessAddress(pid=monitor_process.pid, ppid=monitor_process.ppid) + address: ProcessAddress = ProcessAddress( + pid=monitor_process.pid, + ppid=monitor_process.ppid, + id=monitor_process.monitor_id, + ) yield ProcessHandle(address, inner=monitor_process) def extract_process_features(self, ph: ProcessHandle) -> Iterator[tuple[Feature, Address]]: @@ -114,7 +118,11 @@ def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]: for monitor_thread_id in self.analysis.monitor_threads_by_monitor_process[ph.inner.monitor_id]: monitor_thread: VMRayMonitorThread = self.analysis.monitor_threads[monitor_thread_id] - address: ThreadAddress = ThreadAddress(process=ph.address, tid=monitor_thread.tid) + address: ThreadAddress = ThreadAddress( + process=ph.address, + tid=monitor_thread.tid, + id=monitor_thread.monitor_id, + ) yield ThreadHandle(address=address, inner=monitor_thread) def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[tuple[Feature, Address]]: diff --git a/capa/features/freeze/__init__.py b/capa/features/freeze/__init__.py index 4bfd417be..2c40dc824 100644 --- a/capa/features/freeze/__init__.py +++ b/capa/features/freeze/__init__.py @@ -17,29 +17,29 @@ """ import json -import zlib import logging +import zlib from enum import Enum -from typing import Union, Literal, TypeAlias +from typing import Literal, TypeAlias, Union -from pydantic import Field, BaseModel, ConfigDict +from pydantic import BaseModel, ConfigDict, Field -import capa.helpers -import capa.version -import capa.features.file -import capa.features.insn -import capa.features.common import capa.features.address import capa.features.basicblock +import capa.features.common import capa.features.extractors.null as null -from capa.helpers import assert_never -from capa.features.freeze.features import Feature, feature_from_capa +import capa.features.file +import capa.features.insn +import capa.helpers +import capa.version from capa.features.extractors.base_extractor import ( - SampleHashes, + DynamicFeatureExtractor, FeatureExtractor, + SampleHashes, StaticFeatureExtractor, - DynamicFeatureExtractor, ) +from capa.features.freeze.features import Feature, feature_from_capa +from capa.helpers import assert_never logger = logging.getLogger(__name__) @@ -91,13 +91,49 @@ def from_capa(cls, a: capa.features.address.Address) -> "Address": return cls(type=AddressType.DN_TOKEN_OFFSET, value=(a.token, a.offset)) elif isinstance(a, capa.features.address.ProcessAddress): - return cls(type=AddressType.PROCESS, value=(a.ppid, a.pid)) + if a.id is None: + return cls(type=AddressType.PROCESS, value=(a.ppid, a.pid)) + return cls(type=AddressType.PROCESS, value=(a.ppid, a.pid, a.id)) elif isinstance(a, capa.features.address.ThreadAddress): - return cls(type=AddressType.THREAD, value=(a.process.ppid, a.process.pid, a.tid)) + if a.process.id is None and a.id is None: + return cls( + type=AddressType.THREAD, + value=(a.process.ppid, a.process.pid, a.tid), + ) + return cls( + type=AddressType.THREAD, + value=( + a.process.ppid, + a.process.pid, + a.tid, + a.process.id if a.process.id is not None else -1, + a.id if a.id is not None else -1, + ), + ) elif isinstance(a, capa.features.address.DynamicCallAddress): - return cls(type=AddressType.CALL, value=(a.thread.process.ppid, a.thread.process.pid, a.thread.tid, a.id)) + if a.thread.process.id is None and a.thread.id is None: + return cls( + type=AddressType.CALL, + value=( + a.thread.process.ppid, + a.thread.process.pid, + a.thread.tid, + a.id, + ), + ) + return cls( + type=AddressType.CALL, + value=( + a.thread.process.ppid, + a.thread.process.pid, + a.thread.tid, + a.id, + a.thread.process.id if a.thread.process.id is not None else -1, + a.thread.id if a.thread.id is not None else -1, + ), + ) elif a == capa.features.address.NO_ADDRESS or isinstance(a, capa.features.address._NoAddress): return cls(type=AddressType.NO_ADDRESS, value=None) @@ -137,30 +173,77 @@ def to_capa(self) -> capa.features.address.Address: elif self.type is AddressType.PROCESS: assert isinstance(self.value, tuple) - ppid, pid = self.value - assert isinstance(ppid, int) - assert isinstance(pid, int) - return capa.features.address.ProcessAddress(ppid=ppid, pid=pid) + if len(self.value) == 2: + ppid, pid = self.value + assert isinstance(ppid, int) + assert isinstance(pid, int) + return capa.features.address.ProcessAddress(ppid=ppid, pid=pid) + elif len(self.value) == 3: + ppid, pid, process_id = self.value + assert isinstance(ppid, int) + assert isinstance(pid, int) + assert isinstance(process_id, int) + return capa.features.address.ProcessAddress( + ppid=ppid, pid=pid, id=process_id if process_id >= 0 else None + ) + else: + raise ValueError(f"invalid process address tuple shape: {self.value!r}") elif self.type is AddressType.THREAD: assert isinstance(self.value, tuple) - ppid, pid, tid = self.value - assert isinstance(ppid, int) - assert isinstance(pid, int) - assert isinstance(tid, int) - return capa.features.address.ThreadAddress( - process=capa.features.address.ProcessAddress(ppid=ppid, pid=pid), tid=tid - ) + if len(self.value) == 3: + ppid, pid, tid = self.value + assert isinstance(ppid, int) + assert isinstance(pid, int) + assert isinstance(tid, int) + return capa.features.address.ThreadAddress( + process=capa.features.address.ProcessAddress(ppid=ppid, pid=pid), + tid=tid, + ) + elif len(self.value) == 5: + ppid, pid, tid, process_id, thread_id = self.value + assert isinstance(ppid, int) + assert isinstance(pid, int) + assert isinstance(tid, int) + assert isinstance(process_id, int) + assert isinstance(thread_id, int) + return capa.features.address.ThreadAddress( + process=capa.features.address.ProcessAddress( + ppid=ppid, pid=pid, id=process_id if process_id >= 0 else None + ), + tid=tid, + id=thread_id if thread_id >= 0 else None, + ) + else: + raise ValueError(f"invalid thread address tuple shape: {self.value!r}") elif self.type is AddressType.CALL: assert isinstance(self.value, tuple) - ppid, pid, tid, id_ = self.value - return capa.features.address.DynamicCallAddress( - thread=capa.features.address.ThreadAddress( - process=capa.features.address.ProcessAddress(ppid=ppid, pid=pid), tid=tid - ), - id=id_, - ) + if len(self.value) == 4: + ppid, pid, tid, id_ = self.value + return capa.features.address.DynamicCallAddress( + thread=capa.features.address.ThreadAddress( + process=capa.features.address.ProcessAddress(ppid=ppid, pid=pid), + tid=tid, + ), + id=id_, + ) + elif len(self.value) == 6: + ppid, pid, tid, id_, process_id, thread_id = self.value + return capa.features.address.DynamicCallAddress( + thread=capa.features.address.ThreadAddress( + process=capa.features.address.ProcessAddress( + ppid=ppid, + pid=pid, + id=process_id if process_id >= 0 else None, + ), + tid=tid, + id=thread_id if thread_id >= 0 else None, + ), + id=id_, + ) + else: + raise ValueError(f"invalid call address tuple shape: {self.value!r}") elif self.type is AddressType.NO_ADDRESS: return capa.features.address.NO_ADDRESS @@ -680,8 +763,8 @@ def load(buf: bytes): def main(argv=None): - import sys import argparse + import sys from pathlib import Path import capa.main diff --git a/capa/loader.py b/capa/loader.py index 939680ab7..ba5a29c77 100644 --- a/capa/loader.py +++ b/capa/loader.py @@ -12,48 +12,48 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os -import logging -import datetime import contextlib -from typing import Optional +import datetime +import logging +import os from pathlib import Path +from typing import Optional from rich.console import Console from typing_extensions import assert_never -import capa.rules -import capa.version import capa.features.common -import capa.features.freeze as frz import capa.features.extractors -import capa.render.result_document as rdoc import capa.features.extractors.common -from capa.rules import RuleSet +import capa.features.freeze as frz +import capa.render.result_document as rdoc +import capa.rules +import capa.version +from capa.capabilities.common import Capabilities from capa.engine import MatchResults -from capa.exceptions import UnsupportedOSError, UnsupportedArchError, UnsupportedFormatError +from capa.exceptions import UnsupportedArchError, UnsupportedFormatError, UnsupportedOSError +from capa.features.address import Address from capa.features.common import ( - OS_AUTO, - FORMAT_PE, - FORMAT_ELF, FORMAT_AUTO, + FORMAT_BINEXPORT2, + FORMAT_BINJA_DB, FORMAT_CAPE, + FORMAT_DOTNET, + FORMAT_DRAKVUF, + FORMAT_ELF, + FORMAT_PE, FORMAT_SC32, FORMAT_SC64, FORMAT_VMRAY, - FORMAT_DOTNET, - FORMAT_DRAKVUF, - FORMAT_BINJA_DB, - FORMAT_BINEXPORT2, + OS_AUTO, ) -from capa.features.address import Address -from capa.capabilities.common import Capabilities from capa.features.extractors.base_extractor import ( - SampleHashes, + DynamicFeatureExtractor, FeatureExtractor, + SampleHashes, StaticFeatureExtractor, - DynamicFeatureExtractor, ) +from capa.rules import RuleSet logger = logging.getLogger(__name__) @@ -388,8 +388,8 @@ def get_extractor( if not idalib.load_idalib(): raise RuntimeError("failed to load IDA idalib module.") - import idapro import ida_auto + import idapro import capa.features.extractors.ida.extractor @@ -408,7 +408,9 @@ def get_extractor( # -1 - Generic errors (database already open, auto-analysis failed, etc.) # -2 - User cancelled operation ret = idapro.open_database( - str(input_path), run_auto_analysis=True, args="-Olumina:host=0.0.0.0 -Osecondary_lumina:host=0.0.0.0 -R" + str(input_path), + run_auto_analysis=True, + args="-Olumina:host=0.0.0.0 -Osecondary_lumina:host=0.0.0.0 -R", ) if ret != 0: raise RuntimeError("failed to analyze input file") @@ -521,8 +523,8 @@ def get_file_extractors(input_file: Path, input_format: str) -> list[FeatureExtr file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(input_file)) elif input_format == FORMAT_DOTNET: - import capa.features.extractors.pefile import capa.features.extractors.dotnetfile + import capa.features.extractors.pefile file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(input_file)) file_extractors.append(capa.features.extractors.dotnetfile.DotnetFileFeatureExtractor(input_file)) @@ -539,8 +541,8 @@ def get_file_extractors(input_file: Path, input_format: str) -> list[FeatureExtr file_extractors.append(capa.features.extractors.cape.extractor.CapeExtractor.from_report(report)) elif input_format == FORMAT_DRAKVUF: - import capa.helpers import capa.features.extractors.drakvuf.extractor + import capa.helpers report = capa.helpers.load_jsonl_from_path(input_file) file_extractors.append(capa.features.extractors.drakvuf.extractor.DrakvufExtractor.from_report(report)) @@ -712,14 +714,14 @@ def result_rec(result: capa.features.common.Result): threads_by_process[p.address] = [] for t in extractor.get_threads(p): - calls_by_thread[t.address] = [] + calls_by_thread.setdefault(t.address, []) for c in extractor.get_calls(p, t): if c.address in matched_calls: names_by_call[c.address] = extractor.get_call_name(p, t, c) calls_by_thread[t.address].append(c.address) - if calls_by_thread[t.address]: + if calls_by_thread[t.address] and t.address not in matched_threads: matched_threads.add(t.address) threads_by_process[p.address].append(t.address) diff --git a/capa/render/proto/__init__.py b/capa/render/proto/__init__.py index 53f942c54..8e03eac3c 100644 --- a/capa/render/proto/__init__.py +++ b/capa/render/proto/__init__.py @@ -37,13 +37,13 @@ import google.protobuf.json_format -import capa.rules import capa.features.freeze as frz +import capa.features.freeze.features as frzf import capa.render.proto.capa_pb2 as capa_pb2 import capa.render.result_document as rd -import capa.features.freeze.features as frzf -from capa.helpers import assert_never +import capa.rules from capa.features.freeze import AddressType +from capa.helpers import assert_never def int_to_pb2(v: int) -> capa_pb2.Integer: @@ -100,7 +100,12 @@ def addr_to_pb2(addr: frz.Address) -> capa_pb2.Address: elif addr.type is AddressType.PROCESS: assert isinstance(addr.value, tuple) - ppid, pid = addr.value + if len(addr.value) == 2: + ppid, pid = addr.value + elif len(addr.value) == 3: + ppid, pid, _process_id = addr.value + else: + raise ValueError(f"invalid process address tuple shape: {addr.value!r}") assert isinstance(ppid, int) assert isinstance(pid, int) return capa_pb2.Address( @@ -113,7 +118,12 @@ def addr_to_pb2(addr: frz.Address) -> capa_pb2.Address: elif addr.type is AddressType.THREAD: assert isinstance(addr.value, tuple) - ppid, pid, tid = addr.value + if len(addr.value) == 3: + ppid, pid, tid = addr.value + elif len(addr.value) == 5: + ppid, pid, tid, _process_id, _thread_id = addr.value + else: + raise ValueError(f"invalid thread address tuple shape: {addr.value!r}") assert isinstance(ppid, int) assert isinstance(pid, int) assert isinstance(tid, int) @@ -128,7 +138,12 @@ def addr_to_pb2(addr: frz.Address) -> capa_pb2.Address: elif addr.type is AddressType.CALL: assert isinstance(addr.value, tuple) - ppid, pid, tid, id_ = addr.value + if len(addr.value) == 4: + ppid, pid, tid, id_ = addr.value + elif len(addr.value) == 6: + ppid, pid, tid, id_, _process_id, _thread_id = addr.value + else: + raise ValueError(f"invalid call address tuple shape: {addr.value!r}") assert isinstance(ppid, int) assert isinstance(pid, int) assert isinstance(tid, int) @@ -300,7 +315,11 @@ def statement_to_pb2(statement: rd.Statement) -> capa_pb2.StatementNode: elif isinstance(statement, rd.SomeStatement): return capa_pb2.StatementNode( - some=capa_pb2.SomeStatement(type=statement.type, description=statement.description, count=statement.count), + some=capa_pb2.SomeStatement( + type=statement.type, + description=statement.description, + count=statement.count, + ), type="statement", ) @@ -327,17 +346,20 @@ def statement_to_pb2(statement: rd.Statement) -> capa_pb2.StatementNode: def feature_to_pb2(f: frzf.Feature) -> capa_pb2.FeatureNode: if isinstance(f, frzf.OSFeature): return capa_pb2.FeatureNode( - type="feature", os=capa_pb2.OSFeature(type=f.type, os=f.os, description=f.description) + type="feature", + os=capa_pb2.OSFeature(type=f.type, os=f.os, description=f.description), ) elif isinstance(f, frzf.ArchFeature): return capa_pb2.FeatureNode( - type="feature", arch=capa_pb2.ArchFeature(type=f.type, arch=f.arch, description=f.description) + type="feature", + arch=capa_pb2.ArchFeature(type=f.type, arch=f.arch, description=f.description), ) elif isinstance(f, frzf.FormatFeature): return capa_pb2.FeatureNode( - type="feature", format=capa_pb2.FormatFeature(type=f.type, format=f.format, description=f.description) + type="feature", + format=capa_pb2.FormatFeature(type=f.type, format=f.format, description=f.description), ) elif isinstance(f, frzf.MatchFeature): @@ -360,17 +382,20 @@ def feature_to_pb2(f: frzf.Feature) -> capa_pb2.FeatureNode: elif isinstance(f, frzf.ExportFeature): return capa_pb2.FeatureNode( - type="feature", export=capa_pb2.ExportFeature(type=f.type, export=f.export, description=f.description) + type="feature", + export=capa_pb2.ExportFeature(type=f.type, export=f.export, description=f.description), ) elif isinstance(f, frzf.ImportFeature): return capa_pb2.FeatureNode( - type="feature", import_=capa_pb2.ImportFeature(type=f.type, import_=f.import_, description=f.description) + type="feature", + import_=capa_pb2.ImportFeature(type=f.type, import_=f.import_, description=f.description), ) elif isinstance(f, frzf.SectionFeature): return capa_pb2.FeatureNode( - type="feature", section=capa_pb2.SectionFeature(type=f.type, section=f.section, description=f.description) + type="feature", + section=capa_pb2.SectionFeature(type=f.type, section=f.section, description=f.description), ) elif isinstance(f, frzf.FunctionNameFeature): @@ -389,7 +414,8 @@ def feature_to_pb2(f: frzf.Feature) -> capa_pb2.FeatureNode: elif isinstance(f, frzf.RegexFeature): return capa_pb2.FeatureNode( - type="feature", regex=capa_pb2.RegexFeature(type=f.type, regex=f.regex, description=f.description) + type="feature", + regex=capa_pb2.RegexFeature(type=f.type, regex=f.regex, description=f.description), ) elif isinstance(f, frzf.StringFeature): @@ -404,7 +430,8 @@ def feature_to_pb2(f: frzf.Feature) -> capa_pb2.FeatureNode: elif isinstance(f, frzf.ClassFeature): return capa_pb2.FeatureNode( - type="feature", class_=capa_pb2.ClassFeature(type=f.type, class_=f.class_, description=f.description) + type="feature", + class_=capa_pb2.ClassFeature(type=f.type, class_=f.class_, description=f.description), ) elif isinstance(f, frzf.NamespaceFeature): @@ -415,14 +442,18 @@ def feature_to_pb2(f: frzf.Feature) -> capa_pb2.FeatureNode: elif isinstance(f, frzf.APIFeature): return capa_pb2.FeatureNode( - type="feature", api=capa_pb2.APIFeature(type=f.type, api=f.api, description=f.description) + type="feature", + api=capa_pb2.APIFeature(type=f.type, api=f.api, description=f.description), ) elif isinstance(f, frzf.PropertyFeature): return capa_pb2.FeatureNode( type="feature", property_=capa_pb2.PropertyFeature( - type=f.type, access=f.access, property_=f.property, description=f.description + type=f.type, + access=f.access, + property_=f.property, + description=f.description, ), ) @@ -434,7 +465,8 @@ def feature_to_pb2(f: frzf.Feature) -> capa_pb2.FeatureNode: elif isinstance(f, frzf.BytesFeature): return capa_pb2.FeatureNode( - type="feature", bytes=capa_pb2.BytesFeature(type=f.type, bytes=f.bytes, description=f.description) + type="feature", + bytes=capa_pb2.BytesFeature(type=f.type, bytes=f.bytes, description=f.description), ) elif isinstance(f, frzf.OffsetFeature): @@ -453,7 +485,10 @@ def feature_to_pb2(f: frzf.Feature) -> capa_pb2.FeatureNode: return capa_pb2.FeatureNode( type="feature", operand_number=capa_pb2.OperandNumberFeature( - type=f.type, index=f.index, operand_number=int_to_pb2(f.operand_number), description=f.description + type=f.type, + index=f.index, + operand_number=int_to_pb2(f.operand_number), + description=f.description, ), ) @@ -461,13 +496,17 @@ def feature_to_pb2(f: frzf.Feature) -> capa_pb2.FeatureNode: return capa_pb2.FeatureNode( type="feature", operand_offset=capa_pb2.OperandOffsetFeature( - type=f.type, index=f.index, operand_offset=int_to_pb2(f.operand_offset), description=f.description + type=f.type, + index=f.index, + operand_offset=int_to_pb2(f.operand_offset), + description=f.description, ), ) elif isinstance(f, frzf.BasicBlockFeature): return capa_pb2.FeatureNode( - type="feature", basic_block=capa_pb2.BasicBlockFeature(type=f.type, description=f.description) + type="feature", + basic_block=capa_pb2.BasicBlockFeature(type=f.type, description=f.description), ) else: @@ -737,7 +776,10 @@ def dynamic_analysis_from_pb2(analysis: capa_pb2.DynamicAnalysis) -> rd.DynamicA address=addr_from_pb2(t.address), matched_calls=tuple( [ - rd.CallLayout(address=addr_from_pb2(c.address), name=c.name) + rd.CallLayout( + address=addr_from_pb2(c.address), + name=c.name, + ) for c in t.matched_calls ] ), @@ -883,7 +925,11 @@ def feature_from_pb2(f: capa_pb2.FeatureNode) -> frzf.Feature: return frzf.APIFeature(api=ff.api, description=ff.description or None) elif type_ == "property_": ff = f.property_ - return frzf.PropertyFeature(property=ff.property_, access=ff.access or None, description=ff.description or None) + return frzf.PropertyFeature( + property=ff.property_, + access=ff.access or None, + description=ff.description or None, + ) elif type_ == "number": ff = f.number return frzf.NumberFeature(number=number_from_pb2(ff.number), description=ff.description or None) @@ -899,12 +945,16 @@ def feature_from_pb2(f: capa_pb2.FeatureNode) -> frzf.Feature: elif type_ == "operand_number": ff = f.operand_number return frzf.OperandNumberFeature( - index=ff.index, operand_number=number_from_pb2(ff.operand_number), description=ff.description or None + index=ff.index, + operand_number=number_from_pb2(ff.operand_number), + description=ff.description or None, ) # type: ignore elif type_ == "operand_offset": ff = f.operand_offset return frzf.OperandOffsetFeature( - index=ff.index, operand_offset=int_from_pb2(ff.operand_offset), description=ff.description or None + index=ff.index, + operand_offset=int_from_pb2(ff.operand_offset), + description=ff.description or None, ) # type: ignore # Mypy is unable to recognize `operand_offset` as an argument due to aliasing elif type_ == "basic_block": diff --git a/capa/render/verbose.py b/capa/render/verbose.py index a872755e0..30378e4a9 100644 --- a/capa/render/verbose.py +++ b/capa/render/verbose.py @@ -31,17 +31,17 @@ from typing import cast -from rich.text import Text from rich.table import Table +from rich.text import Text -import capa.rules -import capa.helpers -import capa.render.utils as rutils import capa.features.freeze as frz +import capa.helpers import capa.render.result_document as rd -from capa.rules import RuleSet +import capa.render.utils as rutils +import capa.rules from capa.engine import MatchResults from capa.render.utils import Console +from capa.rules import RuleSet def format_address(address: frz.Address) -> str: @@ -65,21 +65,50 @@ def format_address(address: frz.Address) -> str: return f"token({capa.helpers.hex(token)})+{capa.helpers.hex(offset)}" elif address.type == frz.AddressType.PROCESS: assert isinstance(address.value, tuple) - ppid, pid = address.value + if len(address.value) == 2: + ppid, pid = address.value + assert isinstance(ppid, int) + assert isinstance(pid, int) + return f"process{{pid:{pid}}}" + ppid, pid, process_id = address.value assert isinstance(ppid, int) assert isinstance(pid, int) + assert isinstance(process_id, int) + if process_id >= 0: + return f"process{{pid:{pid},id:{process_id}}}" return f"process{{pid:{pid}}}" elif address.type == frz.AddressType.THREAD: assert isinstance(address.value, tuple) - ppid, pid, tid = address.value + if len(address.value) == 3: + ppid, pid, tid = address.value + assert isinstance(ppid, int) + assert isinstance(pid, int) + assert isinstance(tid, int) + return f"process{{pid:{pid},tid:{tid}}}" + ppid, pid, tid, process_id, thread_id = address.value assert isinstance(ppid, int) assert isinstance(pid, int) assert isinstance(tid, int) - return f"process{{pid:{pid},tid:{tid}}}" + assert isinstance(process_id, int) + assert isinstance(thread_id, int) + s = f"process{{pid:{pid},tid:{tid}" + if process_id >= 0: + s += f",pid_id:{process_id}" + if thread_id >= 0: + s += f",id:{thread_id}" + return s + "}" elif address.type == frz.AddressType.CALL: assert isinstance(address.value, tuple) - ppid, pid, tid, id_ = address.value - return f"process{{pid:{pid},tid:{tid},call:{id_}}}" + if len(address.value) == 4: + ppid, pid, tid, id_ = address.value + return f"process{{pid:{pid},tid:{tid},call:{id_}}}" + ppid, pid, tid, id_, process_id, thread_id = address.value + s = f"process{{pid:{pid},tid:{tid}" + if process_id >= 0: + s += f",pid_id:{process_id}" + if thread_id >= 0: + s += f",id:{thread_id}" + return s + f",call:{id_}}}" elif address.type == frz.AddressType.NO_ADDRESS: return "global" else: @@ -116,14 +145,20 @@ def render_process(layout: rd.DynamicLayout, addr: frz.Address) -> str: process = addr.to_capa() assert isinstance(process, capa.features.address.ProcessAddress) name = _get_process_name(layout, addr) - return f"{name}{{pid:{process.pid}}}" + s = f"{name}{{pid:{process.pid}" + if process.id is not None: + s += f",id:{process.id}" + return s + "}" def render_thread(layout: rd.DynamicLayout, addr: frz.Address) -> str: thread = addr.to_capa() assert isinstance(thread, capa.features.address.ThreadAddress) name = _get_process_name(layout, frz.Address.from_capa(thread.process)) - return f"{name}{{pid:{thread.process.pid},tid:{thread.tid}}}" + s = f"{name}{{pid:{thread.process.pid},tid:{thread.tid}" + if thread.id is not None: + s += f",id:{thread.id}" + return s + "}" def render_span_of_calls(layout: rd.DynamicLayout, addrs: list[frz.Address]) -> str: @@ -134,12 +169,16 @@ def render_span_of_calls(layout: rd.DynamicLayout, addrs: list[frz.Address]) -> call = calls[0] pname = _get_process_name(layout, frz.Address.from_capa(calls[0].thread.process)) + thread = call.thread + thread_s = f"pid:{thread.process.pid},tid:{thread.tid}" + if thread.id is not None: + thread_s += f",id:{thread.id}" call_ids = [str(call.id) for call in calls] if len(call_ids) == 1: call_id = call_ids[0] - return f"{pname}{{pid:{call.thread.process.pid},tid:{call.thread.tid},call:{call_id}}}" + return f"{pname}{{{thread_s},call:{call_id}}}" else: - return f"{pname}{{pid:{call.thread.process.pid},tid:{call.thread.tid},calls:{{{','.join(call_ids)}}}}}" + return f"{pname}{{{thread_s},calls:{{{','.join(call_ids)}}}}}" def render_call(layout: rd.DynamicLayout, addr: frz.Address) -> str: @@ -160,7 +199,10 @@ def render_call(layout: rd.DynamicLayout, addr: frz.Address) -> str: newline = "\n" # Use default (non-dim) styling for API details so they remain readable in -vv output - return f"{pname}{{pid:{call.thread.process.pid},tid:{call.thread.tid},call:{call.id}}}\n{newline.join(s)}" + thread_s = f"pid:{call.thread.process.pid},tid:{call.thread.tid}" + if call.thread.id is not None: + thread_s += f",id:{call.thread.id}" + return f"{pname}{{{thread_s},call:{call.id}}}\n{newline.join(s)}" def render_short_call(layout: rd.DynamicLayout, addr: frz.Address) -> str: @@ -353,7 +395,10 @@ def render_rules(console: Console, doc: rd.ResultDocument): lines = [render_process(doc.meta.analysis.layout, loc) for loc in locations] elif rule.meta.scopes.dynamic == capa.rules.Scope.THREAD: lines = [render_thread(doc.meta.analysis.layout, loc) for loc in locations] - elif rule.meta.scopes.dynamic in (capa.rules.Scope.CALL, capa.rules.Scope.SPAN_OF_CALLS): + elif rule.meta.scopes.dynamic in ( + capa.rules.Scope.CALL, + capa.rules.Scope.SPAN_OF_CALLS, + ): # because we're only in verbose mode, we won't show the full call details (name, args, retval) # we'll only show the details of the thread in which the calls are found. # so select the thread locations and render those. diff --git a/tests/test_address_uniqueness.py b/tests/test_address_uniqueness.py new file mode 100644 index 000000000..a452d0dd5 --- /dev/null +++ b/tests/test_address_uniqueness.py @@ -0,0 +1,142 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import cast +from unittest.mock import Mock + +import capa.features.common +import capa.features.freeze as frz +import capa.loader +from capa.engine import MatchResults +from capa.features.address import Address, DynamicCallAddress, ProcessAddress, ThreadAddress +from capa.features.extractors.base_extractor import ( + CallHandle, + DynamicFeatureExtractor, + ProcessHandle, + SampleHashes, + ThreadHandle, +) + + +def test_process_address_id_affects_identity(): + a = ProcessAddress(pid=1000, ppid=10, id=1) + b = ProcessAddress(pid=1000, ppid=10, id=2) + + assert a != b + assert hash(a) != hash(b) + + +def test_thread_address_id_affects_identity(): + p = ProcessAddress(pid=1000, ppid=10, id=1) + a = ThreadAddress(process=p, tid=42, id=1) + b = ThreadAddress(process=p, tid=42, id=2) + + assert a != b + assert hash(a) != hash(b) + + +def test_freeze_roundtrip_process_with_id(): + addr = ProcessAddress(pid=1000, ppid=10, id=7) + frozen = frz.Address.from_capa(addr) + thawed = frozen.to_capa() + + assert isinstance(thawed, ProcessAddress) + assert thawed == addr + assert thawed.id == 7 + + +def test_freeze_roundtrip_thread_with_ids(): + addr = ThreadAddress(process=ProcessAddress(pid=1000, ppid=10, id=5), tid=42, id=9) + frozen = frz.Address.from_capa(addr) + thawed = frozen.to_capa() + + assert isinstance(thawed, ThreadAddress) + assert thawed == addr + assert thawed.process.id == 5 + assert thawed.id == 9 + + +def test_freeze_roundtrip_call_with_ids(): + addr = DynamicCallAddress( + thread=ThreadAddress(process=ProcessAddress(pid=1000, ppid=10, id=5), tid=42, id=9), + id=77, + ) + frozen = frz.Address.from_capa(addr) + thawed = frozen.to_capa() + + assert isinstance(thawed, DynamicCallAddress) + assert thawed == addr + assert thawed.thread.process.id == 5 + assert thawed.thread.id == 9 + + +def test_compute_dynamic_layout_recycled_tid_does_not_drop_matched_call(): + process_addr = ProcessAddress(pid=1000, ppid=10) + thread_addr = ThreadAddress(process=process_addr, tid=42) + call_addr = DynamicCallAddress(thread=thread_addr, id=1) + + process_handle = ProcessHandle(address=process_addr, inner=None) + thread_handle1 = ThreadHandle(address=thread_addr, inner="first") + thread_handle2 = ThreadHandle(address=thread_addr, inner="second") + call_handle = CallHandle(address=call_addr, inner=None) + + class RecycledTidExtractor(DynamicFeatureExtractor): + def __init__(self): + super().__init__(SampleHashes(md5="a" * 32, sha1="a" * 40, sha256="a" * 64)) + + def extract_global_features(self): + return iter([]) + + def extract_file_features(self): + return iter([]) + + def get_processes(self): + yield process_handle + + def extract_process_features(self, ph): + return iter([]) + + def get_process_name(self, ph): + return "sample.exe" + + def get_threads(self, ph): + # same thread address appears twice, emulating TID reuse/collision. + yield thread_handle1 + yield thread_handle2 + + def extract_thread_features(self, ph, th): + return iter([]) + + def get_calls(self, ph, th): + if th.inner == "first": + yield call_handle + else: + yield from () + + def extract_call_features(self, ph, th, ch): + return iter([]) + + def get_call_name(self, ph, th, ch): + return "CreateFileW(lpFileName=C:\\\\tmp\\\\x)" + + extractor = RecycledTidExtractor() + result = capa.features.common.Result(success=True, statement=Mock(), children=[], locations={call_addr}) + capabilities = cast(MatchResults, {"repro rule": [(cast(Address, call_addr), result)]}) + + layout = capa.loader.compute_dynamic_layout(Mock(), extractor, capabilities) + + assert len(layout.processes) == 1 + assert len(layout.processes[0].matched_threads) == 1 + assert len(layout.processes[0].matched_threads[0].matched_calls) == 1 + assert layout.processes[0].matched_threads[0].matched_calls[0].name.startswith("CreateFileW")