diff --git a/CHANGELOG.md b/CHANGELOG.md index c35033d780..c5f22f6248 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ - ### Bug Fixes +- address: add optional id field to ProcessAddress/ThreadAddress for unique tracking of recycled PID/TID lifecycles @devs6186 #2619 - Fixed insecure deserialization vulnerability in YAML loading @0x1622 (#2770) - loader: gracefully handle ELF files with unsupported architectures kamranulhaq2002@gmail.com #2800 - loader: handle SegmentationViolation for malformed ELF files @kami922 #2799 diff --git a/capa/features/address.py b/capa/features/address.py index eb708a3dcd..3b111a3524 100644 --- a/capa/features/address.py +++ b/capa/features/address.py @@ -13,6 +13,7 @@ # limitations under the License. import abc +from typing import Optional class Address(abc.ABC): @@ -50,53 +51,83 @@ def __hash__(self): class ProcessAddress(Address): - """an address of a process in a dynamic execution trace""" - - def __init__(self, pid: int, ppid: int = 0): + """an address of a process in a dynamic execution trace + + Args: + pid: process ID assigned by the OS + ppid: parent process ID assigned by the OS + id: optional sandbox-specific unique identifier to distinguish + processes whose OS-assigned PIDs collide due to reuse. + For VMRay this is the monitor_id; for other backends + it may be a sequential counter or timestamp. + """ + + def __init__(self, pid: int, ppid: int = 0, id: Optional[int] = None): assert ppid >= 0 assert pid > 0 self.ppid = ppid self.pid = pid + self.id = id def __repr__(self): - return "process(%s%s)" % ( - f"ppid: {self.ppid}, " if self.ppid > 0 else "", - f"pid: {self.pid}", - ) + parts = [] + if self.ppid > 0: + parts.append(f"ppid: {self.ppid}") + parts.append(f"pid: {self.pid}") + if self.id is not None: + parts.append(f"id: {self.id}") + return "process(%s)" % ", ".join(parts) def __hash__(self): - return hash((self.ppid, self.pid)) + return hash((self.ppid, self.pid, self.id)) def __eq__(self, other): assert isinstance(other, ProcessAddress) - return (self.ppid, self.pid) == (other.ppid, other.pid) + return (self.ppid, self.pid, self.id) == (other.ppid, other.pid, other.id) def __lt__(self, other): assert isinstance(other, ProcessAddress) - return (self.ppid, self.pid) < (other.ppid, other.pid) + # None sorts before any real id + self_id = self.id if self.id is not None else -1 + other_id = other.id if other.id is not None else -1 + return (self.ppid, self.pid, self_id) < (other.ppid, other.pid, other_id) class ThreadAddress(Address): - """addresses a thread in a dynamic execution trace""" - - def __init__(self, process: ProcessAddress, tid: int): + """addresses a thread in a dynamic execution trace + + Args: + process: address of the containing process + tid: thread ID assigned by the OS + id: optional sandbox-specific unique identifier to distinguish + threads whose OS-assigned TIDs collide due to reuse. + For VMRay this is the monitor_id; for other backends + it may be a sequential counter or timestamp. + """ + + def __init__(self, process: ProcessAddress, tid: int, id: Optional[int] = None): assert tid >= 0 self.process = process self.tid = tid + self.id = id def __repr__(self): - return f"{self.process}, thread(tid: {self.tid})" + id_part = f", id: {self.id}" if self.id is not None else "" + return f"{self.process}, thread(tid: {self.tid}{id_part})" def __hash__(self): - return hash((self.process, self.tid)) + return hash((self.process, self.tid, self.id)) def __eq__(self, other): assert isinstance(other, ThreadAddress) - return (self.process, self.tid) == (other.process, other.tid) + return (self.process, self.tid, self.id) == (other.process, other.tid, other.id) def __lt__(self, other): assert isinstance(other, ThreadAddress) - return (self.process, self.tid) < (other.process, other.tid) + # None sorts before any real id + self_id = self.id if self.id is not None else -1 + other_id = other.id if other.id is not None else -1 + return (self.process, self.tid, self_id) < (other.process, other.tid, other_id) class DynamicCallAddress(Address): @@ -114,7 +145,10 @@ def __hash__(self): return hash((self.thread, self.id)) def __eq__(self, other): - return isinstance(other, DynamicCallAddress) and (self.thread, self.id) == (other.thread, other.id) + return isinstance(other, DynamicCallAddress) and (self.thread, self.id) == ( + other.thread, + other.id, + ) def __lt__(self, other): assert isinstance(other, DynamicCallAddress) diff --git a/capa/features/extractors/cape/file.py b/capa/features/extractors/cape/file.py index 36c2051952..759383ece3 100644 --- a/capa/features/extractors/cape/file.py +++ b/capa/features/extractors/cape/file.py @@ -28,24 +28,37 @@ def get_processes(report: CapeReport) -> Iterator[ProcessHandle]: """ - get all the created processes for a sample + get all the created processes for a sample. + + when the OS recycles a PID, multiple processes in the report may share the + same (ppid, pid) pair. we detect this and assign sequential ids so that + each process receives a unique ProcessAddress. """ - seen_processes = {} + # first pass: count how many times each (ppid, pid) pair appears + counts: dict[tuple[int, int], int] = {} for process in report.behavior.processes: - addr = ProcessAddress(pid=process.process_id, ppid=process.parent_id) - yield ProcessHandle(address=addr, inner=process) + key = (process.parent_id, process.process_id) + counts[key] = counts.get(key, 0) + 1 - # check for pid and ppid reuse - if addr not in seen_processes: - seen_processes[addr] = [process] - else: - logger.warning( - "pid and ppid reuse detected between process %s and process%s: %s", - process, - "es" if len(seen_processes[addr]) > 1 else "", - seen_processes[addr], + # second pass: yield handles with sequential ids for reused pairs + seq: dict[tuple[int, int], int] = {} + for process in report.behavior.processes: + key = (process.parent_id, process.process_id) + seq[key] = seq.get(key, 0) + 1 + + # only assign ids when reuse is detected; otherwise keep id=None + # for backward compatibility with existing addresses and freeze files + id_ = seq[key] if counts[key] > 1 else None + if id_ is not None: + logger.debug( + "pid reuse detected for ppid=%d, pid=%d: assigning id=%d", + process.parent_id, + process.process_id, + id_, ) - seen_processes[addr].append(process) + + addr = ProcessAddress(pid=process.process_id, ppid=process.parent_id, id=id_) + yield ProcessHandle(address=addr, inner=process) def extract_import_names(report: CapeReport) -> Iterator[tuple[Feature, Address]]: diff --git a/capa/features/extractors/vmray/extractor.py b/capa/features/extractors/vmray/extractor.py index 27eeed4819..7ae599aa06 100644 --- a/capa/features/extractors/vmray/extractor.py +++ b/capa/features/extractors/vmray/extractor.py @@ -99,7 +99,9 @@ def get_processes(self) -> Iterator[ProcessHandle]: ) continue - address: ProcessAddress = ProcessAddress(pid=monitor_process.pid, ppid=monitor_process.ppid) + address: ProcessAddress = ProcessAddress( + pid=monitor_process.pid, ppid=monitor_process.ppid, id=monitor_process.monitor_id + ) yield ProcessHandle(address, inner=monitor_process) def extract_process_features(self, ph: ProcessHandle) -> Iterator[tuple[Feature, Address]]: @@ -114,7 +116,9 @@ def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]: for monitor_thread_id in self.analysis.monitor_threads_by_monitor_process[ph.inner.monitor_id]: monitor_thread: VMRayMonitorThread = self.analysis.monitor_threads[monitor_thread_id] - address: ThreadAddress = ThreadAddress(process=ph.address, tid=monitor_thread.tid) + address: ThreadAddress = ThreadAddress( + process=ph.address, tid=monitor_thread.tid, id=monitor_thread.monitor_id + ) yield ThreadHandle(address=address, inner=monitor_thread) def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[tuple[Feature, Address]]: diff --git a/capa/features/freeze/__init__.py b/capa/features/freeze/__init__.py index 2e12d2ffd7..11364aabe5 100644 --- a/capa/features/freeze/__init__.py +++ b/capa/features/freeze/__init__.py @@ -91,13 +91,54 @@ def from_capa(cls, a: capa.features.address.Address) -> "Address": return cls(type=AddressType.DN_TOKEN_OFFSET, value=(a.token, a.offset)) elif isinstance(a, capa.features.address.ProcessAddress): - return cls(type=AddressType.PROCESS, value=(a.ppid, a.pid)) + if a.id is not None: + return cls(type=AddressType.PROCESS, value=(a.ppid, a.pid, a.id)) + else: + return cls(type=AddressType.PROCESS, value=(a.ppid, a.pid)) elif isinstance(a, capa.features.address.ThreadAddress): - return cls(type=AddressType.THREAD, value=(a.process.ppid, a.process.pid, a.tid)) + has_ids = a.process.id is not None or a.id is not None + if has_ids: + return cls( + type=AddressType.THREAD, + value=( + a.process.ppid, + a.process.pid, + a.tid, + a.process.id or 0, + a.id or 0, + ), + ) + else: + return cls( + type=AddressType.THREAD, + value=(a.process.ppid, a.process.pid, a.tid), + ) elif isinstance(a, capa.features.address.DynamicCallAddress): - return cls(type=AddressType.CALL, value=(a.thread.process.ppid, a.thread.process.pid, a.thread.tid, a.id)) + has_ids = a.thread.process.id is not None or a.thread.id is not None + if has_ids: + return cls( + type=AddressType.CALL, + value=( + a.thread.process.ppid, + a.thread.process.pid, + a.thread.tid, + a.id, + a.thread.process.id or 0, + a.thread.id or 0, + ), + ) + else: + return cls( + type=AddressType.CALL, + value=( + a.thread.process.ppid, + a.thread.process.pid, + a.thread.tid, + a.id, + ), + ) elif a == capa.features.address.NO_ADDRESS or isinstance(a, capa.features.address._NoAddress): return cls(type=AddressType.NO_ADDRESS, value=None) @@ -137,30 +178,60 @@ def to_capa(self) -> capa.features.address.Address: elif self.type is AddressType.PROCESS: assert isinstance(self.value, tuple) - ppid, pid = self.value - assert isinstance(ppid, int) - assert isinstance(pid, int) - return capa.features.address.ProcessAddress(ppid=ppid, pid=pid) + if len(self.value) == 3: + ppid, pid, process_id = self.value + return capa.features.address.ProcessAddress( + ppid=ppid, pid=pid, id=process_id if process_id != 0 else None + ) + else: + ppid, pid = self.value + return capa.features.address.ProcessAddress(ppid=ppid, pid=pid) elif self.type is AddressType.THREAD: assert isinstance(self.value, tuple) - ppid, pid, tid = self.value - assert isinstance(ppid, int) - assert isinstance(pid, int) - assert isinstance(tid, int) - return capa.features.address.ThreadAddress( - process=capa.features.address.ProcessAddress(ppid=ppid, pid=pid), tid=tid - ) + if len(self.value) == 5: + ppid, pid, tid, process_id, thread_id = self.value + return capa.features.address.ThreadAddress( + process=capa.features.address.ProcessAddress( + ppid=ppid, pid=pid, id=process_id if process_id != 0 else None + ), + tid=tid, + id=thread_id if thread_id != 0 else None, + ) + else: + ppid, pid, tid = self.value + return capa.features.address.ThreadAddress( + process=capa.features.address.ProcessAddress(ppid=ppid, pid=pid), + tid=tid, + ) elif self.type is AddressType.CALL: assert isinstance(self.value, tuple) - ppid, pid, tid, id_ = self.value - return capa.features.address.DynamicCallAddress( - thread=capa.features.address.ThreadAddress( - process=capa.features.address.ProcessAddress(ppid=ppid, pid=pid), tid=tid - ), - id=id_, - ) + if len(self.value) == 6: + ppid, pid, tid, id_, process_id, thread_id = self.value + return capa.features.address.DynamicCallAddress( + thread=capa.features.address.ThreadAddress( + process=capa.features.address.ProcessAddress( + ppid=ppid, + pid=pid, + id=process_id if process_id != 0 else None, + ), + tid=tid, + id=thread_id if thread_id != 0 else None, + ), + id=id_, + ) + else: + ppid, pid, tid, id_ = self.value + return capa.features.address.DynamicCallAddress( + thread=capa.features.address.ThreadAddress( + process=capa.features.address.ProcessAddress( + ppid=ppid, pid=pid + ), + tid=tid, + ), + id=id_, + ) elif self.type is AddressType.NO_ADDRESS: return capa.features.address.NO_ADDRESS @@ -573,16 +644,26 @@ def loads_static(s: str) -> StaticFeatureExtractor: base_address=freeze.base_address.to_capa(), sample_hashes=freeze.sample_hashes, global_features=[f.feature.to_capa() for f in freeze.features.global_], - file_features=[(f.address.to_capa(), f.feature.to_capa()) for f in freeze.features.file], + file_features=[ + (f.address.to_capa(), f.feature.to_capa()) for f in freeze.features.file + ], functions={ f.address.to_capa(): null.FunctionFeatures( - features=[(fe.address.to_capa(), fe.feature.to_capa()) for fe in f.features], + features=[ + (fe.address.to_capa(), fe.feature.to_capa()) for fe in f.features + ], basic_blocks={ bb.address.to_capa(): null.BasicBlockFeatures( - features=[(fe.address.to_capa(), fe.feature.to_capa()) for fe in bb.features], + features=[ + (fe.address.to_capa(), fe.feature.to_capa()) + for fe in bb.features + ], instructions={ i.address.to_capa(): null.InstructionFeatures( - features=[(fe.address.to_capa(), fe.feature.to_capa()) for fe in i.features] + features=[ + (fe.address.to_capa(), fe.feature.to_capa()) + for fe in i.features + ] ) for i in bb.instructions }, @@ -608,18 +689,28 @@ def loads_dynamic(s: str) -> DynamicFeatureExtractor: base_address=freeze.base_address.to_capa(), sample_hashes=freeze.sample_hashes, global_features=[f.feature.to_capa() for f in freeze.features.global_], - file_features=[(f.address.to_capa(), f.feature.to_capa()) for f in freeze.features.file], + file_features=[ + (f.address.to_capa(), f.feature.to_capa()) for f in freeze.features.file + ], processes={ p.address.to_capa(): null.ProcessFeatures( name=p.name, - features=[(fe.address.to_capa(), fe.feature.to_capa()) for fe in p.features], + features=[ + (fe.address.to_capa(), fe.feature.to_capa()) for fe in p.features + ], threads={ t.address.to_capa(): null.ThreadFeatures( - features=[(fe.address.to_capa(), fe.feature.to_capa()) for fe in t.features], + features=[ + (fe.address.to_capa(), fe.feature.to_capa()) + for fe in t.features + ], calls={ c.address.to_capa(): null.CallFeatures( name=c.name, - features=[(fe.address.to_capa(), fe.feature.to_capa()) for fe in c.features], + features=[ + (fe.address.to_capa(), fe.feature.to_capa()) + for fe in c.features + ], ) for c in t.calls }, @@ -691,7 +782,9 @@ def main(argv=None): argv = sys.argv[1:] parser = argparse.ArgumentParser(description="save capa features to a file") - capa.main.install_common_args(parser, {"input_file", "format", "backend", "os", "signatures"}) + capa.main.install_common_args( + parser, {"input_file", "format", "backend", "os", "signatures"} + ) parser.add_argument("output", type=str, help="Path to output file") args = parser.parse_args(args=argv) diff --git a/capa/render/verbose.py b/capa/render/verbose.py index 11f2442372..9de3df32e4 100644 --- a/capa/render/verbose.py +++ b/capa/render/verbose.py @@ -112,18 +112,37 @@ def _get_call_name(layout: rd.DynamicLayout, addr: frz.Address) -> str: raise ValueError("name not found for call", addr) +def _format_process_fields(process: capa.features.address.ProcessAddress) -> str: + """format process identification fields, including id when present.""" + s = f"pid:{process.pid}" + if process.id is not None: + s += f",id:{process.id}" + return s + + +def _format_thread_fields(thread: capa.features.address.ThreadAddress) -> str: + """format thread identification fields, including id when present.""" + s = f"pid:{thread.process.pid},tid:{thread.tid}" + if thread.id is not None: + s += f",id:{thread.id}" + elif thread.process.id is not None: + # show process id in thread context when thread has no own id + s += f",pid-id:{thread.process.id}" + return s + + def render_process(layout: rd.DynamicLayout, addr: frz.Address) -> str: process = addr.to_capa() assert isinstance(process, capa.features.address.ProcessAddress) name = _get_process_name(layout, addr) - return f"{name}{{pid:{process.pid}}}" + return f"{name}{{{_format_process_fields(process)}}}" def render_thread(layout: rd.DynamicLayout, addr: frz.Address) -> str: thread = addr.to_capa() assert isinstance(thread, capa.features.address.ThreadAddress) name = _get_process_name(layout, frz.Address.from_capa(thread.process)) - return f"{name}{{pid:{thread.process.pid},tid:{thread.tid}}}" + return f"{name}{{{_format_thread_fields(thread)}}}" def render_span_of_calls(layout: rd.DynamicLayout, addrs: list[frz.Address]) -> str: @@ -134,12 +153,12 @@ def render_span_of_calls(layout: rd.DynamicLayout, addrs: list[frz.Address]) -> call = calls[0] pname = _get_process_name(layout, frz.Address.from_capa(calls[0].thread.process)) + tfields = _format_thread_fields(call.thread) call_ids = [str(call.id) for call in calls] if len(call_ids) == 1: - call_id = call_ids[0] - return f"{pname}{{pid:{call.thread.process.pid},tid:{call.thread.tid},call:{call_id}}}" + return f"{pname}{{{tfields},call:{call_ids[0]}}}" else: - return f"{pname}{{pid:{call.thread.process.pid},tid:{call.thread.tid},calls:{{{','.join(call_ids)}}}}}" + return f"{pname}{{{tfields},calls:{{{','.join(call_ids)}}}}}" def render_call(layout: rd.DynamicLayout, addr: frz.Address) -> str: @@ -158,10 +177,9 @@ def render_call(layout: rd.DynamicLayout, addr: frz.Address) -> str: s.append(f" {arg},") s.append(f"){rest}") + tfields = _format_thread_fields(call.thread) newline = "\n" - return ( - f"{pname}{{pid:{call.thread.process.pid},tid:{call.thread.tid},call:{call.id}}}\n{rutils.mute(newline.join(s))}" - ) + return f"{pname}{{{tfields},call:{call.id}}}\n{rutils.mute(newline.join(s))}" def render_short_call(layout: rd.DynamicLayout, addr: frz.Address) -> str: diff --git a/tests/test_address_uniqueness.py b/tests/test_address_uniqueness.py new file mode 100644 index 0000000000..0e494eda29 --- /dev/null +++ b/tests/test_address_uniqueness.py @@ -0,0 +1,514 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tests for address uniqueness when PIDs/TIDs are recycled by the OS. + +These tests verify the fix for issue #2619 / #2361: dynamic sandbox extractors +(especially VMRay) can report multiple process/thread instances that share the +same OS-assigned IDs. The optional `id` field on ProcessAddress and +ThreadAddress allows capa to distinguish them. +""" + +from unittest.mock import MagicMock + +import capa.loader +import capa.features.common +import capa.features.freeze as frz +from capa.features.address import ProcessAddress, ThreadAddress, DynamicCallAddress +from capa.features.extractors.base_extractor import ( + CallHandle, + SampleHashes, + ThreadHandle, + ProcessHandle, + DynamicFeatureExtractor, +) + + +# --------------------------------------------------------------------------- +# ProcessAddress identity tests +# --------------------------------------------------------------------------- + + +class TestProcessAddressUniqueness: + def test_same_pid_different_id_not_equal(self): + a = ProcessAddress(pid=100, ppid=1, id=1) + b = ProcessAddress(pid=100, ppid=1, id=2) + assert a != b + + def test_same_pid_different_id_different_hash(self): + a = ProcessAddress(pid=100, ppid=1, id=1) + b = ProcessAddress(pid=100, ppid=1, id=2) + assert hash(a) != hash(b) + + def test_same_pid_same_id_equal(self): + a = ProcessAddress(pid=100, ppid=1, id=5) + b = ProcessAddress(pid=100, ppid=1, id=5) + assert a == b + assert hash(a) == hash(b) + + def test_no_id_backward_compat(self): + a = ProcessAddress(pid=100, ppid=1) + b = ProcessAddress(pid=100, ppid=1) + assert a == b + assert hash(a) == hash(b) + assert a.id is None + + def test_none_id_not_equal_to_int_id(self): + a = ProcessAddress(pid=100, ppid=1, id=None) + b = ProcessAddress(pid=100, ppid=1, id=1) + assert a != b + + def test_sorting_with_ids(self): + addrs = [ + ProcessAddress(pid=100, ppid=1, id=3), + ProcessAddress(pid=100, ppid=1, id=1), + ProcessAddress(pid=100, ppid=1, id=2), + ] + assert sorted(addrs) == [ + ProcessAddress(pid=100, ppid=1, id=1), + ProcessAddress(pid=100, ppid=1, id=2), + ProcessAddress(pid=100, ppid=1, id=3), + ] + + def test_none_id_sorts_before_int_id(self): + a = ProcessAddress(pid=100, ppid=1, id=None) + b = ProcessAddress(pid=100, ppid=1, id=1) + assert a < b + + def test_dict_key_uniqueness(self): + a = ProcessAddress(pid=100, ppid=1, id=1) + b = ProcessAddress(pid=100, ppid=1, id=2) + d = {a: "first", b: "second"} + assert len(d) == 2 + assert d[a] == "first" + assert d[b] == "second" + + def test_set_uniqueness(self): + a = ProcessAddress(pid=100, ppid=1, id=1) + b = ProcessAddress(pid=100, ppid=1, id=2) + c = ProcessAddress(pid=100, ppid=1, id=1) # duplicate of a + s = {a, b, c} + assert len(s) == 2 + + def test_repr_with_id(self): + a = ProcessAddress(pid=100, ppid=1, id=5) + assert "id: 5" in repr(a) + + def test_repr_without_id(self): + a = ProcessAddress(pid=100, ppid=1) + # "id:" is a substring of "ppid:", so check for the standalone form + assert ", id: " not in repr(a) + + +# --------------------------------------------------------------------------- +# ThreadAddress identity tests +# --------------------------------------------------------------------------- + + +class TestThreadAddressUniqueness: + def test_same_tid_different_id_not_equal(self): + p = ProcessAddress(pid=100, ppid=1) + a = ThreadAddress(p, tid=42, id=1) + b = ThreadAddress(p, tid=42, id=2) + assert a != b + + def test_same_tid_different_id_different_hash(self): + p = ProcessAddress(pid=100, ppid=1) + a = ThreadAddress(p, tid=42, id=1) + b = ThreadAddress(p, tid=42, id=2) + assert hash(a) != hash(b) + + def test_same_tid_same_id_equal(self): + p = ProcessAddress(pid=100, ppid=1) + a = ThreadAddress(p, tid=42, id=7) + b = ThreadAddress(p, tid=42, id=7) + assert a == b + assert hash(a) == hash(b) + + def test_different_process_id_propagates(self): + """threads in recycled processes (different process.id) should differ""" + p1 = ProcessAddress(pid=100, ppid=1, id=1) + p2 = ProcessAddress(pid=100, ppid=1, id=2) + t1 = ThreadAddress(p1, tid=42) + t2 = ThreadAddress(p2, tid=42) + assert t1 != t2 + assert hash(t1) != hash(t2) + + def test_no_id_backward_compat(self): + p = ProcessAddress(pid=100, ppid=1) + a = ThreadAddress(p, tid=42) + b = ThreadAddress(p, tid=42) + assert a == b + assert a.id is None + + def test_sorting_with_ids(self): + p = ProcessAddress(pid=100, ppid=1) + addrs = [ + ThreadAddress(p, tid=42, id=3), + ThreadAddress(p, tid=42, id=1), + ThreadAddress(p, tid=42, id=2), + ] + assert sorted(addrs) == [ + ThreadAddress(p, tid=42, id=1), + ThreadAddress(p, tid=42, id=2), + ThreadAddress(p, tid=42, id=3), + ] + + def test_repr_with_id(self): + p = ProcessAddress(pid=100, ppid=1) + t = ThreadAddress(p, tid=42, id=7) + assert "id: 7" in repr(t) + + def test_repr_without_id(self): + p = ProcessAddress(pid=100, ppid=1) + t = ThreadAddress(p, tid=42) + assert ", id: " not in repr(t) + + +# --------------------------------------------------------------------------- +# DynamicCallAddress with unique thread addresses +# --------------------------------------------------------------------------- + + +class TestCallAddressWithUniqueThreads: + def test_calls_in_different_thread_instances_not_equal(self): + p = ProcessAddress(pid=100, ppid=1, id=1) + t1 = ThreadAddress(p, tid=42, id=10) + t2 = ThreadAddress(p, tid=42, id=20) + c1 = DynamicCallAddress(t1, id=0) + c2 = DynamicCallAddress(t2, id=0) + assert c1 != c2 + + def test_calls_in_same_thread_instance_same_id_equal(self): + p = ProcessAddress(pid=100, ppid=1, id=1) + t = ThreadAddress(p, tid=42, id=10) + c1 = DynamicCallAddress(t, id=5) + c2 = DynamicCallAddress(t, id=5) + assert c1 == c2 + + +# --------------------------------------------------------------------------- +# Freeze roundtrip tests +# --------------------------------------------------------------------------- + + +class TestFreezeRoundtrip: + def test_process_address_without_id(self): + addr = ProcessAddress(pid=100, ppid=1) + frozen = frz.Address.from_capa(addr) + thawed = frozen.to_capa() + assert addr == thawed + assert thawed.id is None + + def test_process_address_with_id(self): + addr = ProcessAddress(pid=100, ppid=1, id=42) + frozen = frz.Address.from_capa(addr) + thawed = frozen.to_capa() + assert addr == thawed + assert thawed.id == 42 + + def test_thread_address_without_ids(self): + addr = ThreadAddress(ProcessAddress(pid=100, ppid=1), tid=5) + frozen = frz.Address.from_capa(addr) + thawed = frozen.to_capa() + assert addr == thawed + assert thawed.id is None + assert thawed.process.id is None + + def test_thread_address_with_ids(self): + addr = ThreadAddress(ProcessAddress(pid=100, ppid=1, id=10), tid=5, id=20) + frozen = frz.Address.from_capa(addr) + thawed = frozen.to_capa() + assert addr == thawed + assert thawed.process.id == 10 + assert thawed.id == 20 + + def test_thread_address_with_only_process_id(self): + addr = ThreadAddress(ProcessAddress(pid=100, ppid=1, id=10), tid=5) + frozen = frz.Address.from_capa(addr) + thawed = frozen.to_capa() + assert addr == thawed + assert thawed.process.id == 10 + assert thawed.id is None + + def test_call_address_without_ids(self): + addr = DynamicCallAddress( + ThreadAddress(ProcessAddress(pid=100, ppid=1), tid=5), id=99 + ) + frozen = frz.Address.from_capa(addr) + thawed = frozen.to_capa() + assert addr == thawed + + def test_call_address_with_ids(self): + addr = DynamicCallAddress( + ThreadAddress(ProcessAddress(pid=100, ppid=1, id=10), tid=5, id=20), + id=99, + ) + frozen = frz.Address.from_capa(addr) + thawed = frozen.to_capa() + assert addr == thawed + assert thawed.thread.process.id == 10 + assert thawed.thread.id == 20 + + def test_backward_compat_old_process_tuple(self): + """simulate loading an old freeze file with 2-element process tuple""" + frozen = frz.Address(type=frz.AddressType.PROCESS, value=(1, 100)) + addr = frozen.to_capa() + assert isinstance(addr, ProcessAddress) + assert addr.ppid == 1 + assert addr.pid == 100 + assert addr.id is None + + def test_backward_compat_old_thread_tuple(self): + """simulate loading an old freeze file with 3-element thread tuple""" + frozen = frz.Address(type=frz.AddressType.THREAD, value=(1, 100, 42)) + addr = frozen.to_capa() + assert isinstance(addr, ThreadAddress) + assert addr.process.ppid == 1 + assert addr.process.pid == 100 + assert addr.tid == 42 + assert addr.id is None + assert addr.process.id is None + + def test_backward_compat_old_call_tuple(self): + """simulate loading an old freeze file with 4-element call tuple""" + frozen = frz.Address(type=frz.AddressType.CALL, value=(1, 100, 42, 7)) + addr = frozen.to_capa() + assert isinstance(addr, DynamicCallAddress) + assert addr.thread.process.ppid == 1 + assert addr.thread.process.pid == 100 + assert addr.thread.tid == 42 + assert addr.id == 7 + + +# --------------------------------------------------------------------------- +# compute_dynamic_layout: recycled TID with unique addresses +# --------------------------------------------------------------------------- + + +class TestComputeDynamicLayoutRecycledTid: + """ + When a sandbox (e.g. VMRay) reports two thread instances with the same + OS-level TID but different unique ids (monitor_ids), compute_dynamic_layout + must keep both thread instances and their respective calls separate. + """ + + def _make_extractor(self): + proc_addr = ProcessAddress(pid=1000, ppid=0, id=1) + + # Two thread instances sharing the same OS-level TID but with + # different unique ids, simulating VMRay's monitor_id. + thread_addr_1 = ThreadAddress(proc_addr, tid=42, id=10) + thread_addr_2 = ThreadAddress(proc_addr, tid=42, id=20) + + call_addr_1 = DynamicCallAddress(thread_addr_1, id=0) + call_addr_2 = DynamicCallAddress(thread_addr_2, id=0) + + proc_handle = ProcessHandle(address=proc_addr, inner=None) + thread_handle_1 = ThreadHandle(address=thread_addr_1, inner="instance-1") + thread_handle_2 = ThreadHandle(address=thread_addr_2, inner="instance-2") + call_handle_1 = CallHandle(address=call_addr_1, inner=None) + call_handle_2 = CallHandle(address=call_addr_2, inner=None) + + class RecycledTidExtractor(DynamicFeatureExtractor): + def extract_global_features(self): + return iter([]) + + def extract_file_features(self): + return iter([]) + + def get_processes(self): + yield proc_handle + + def extract_process_features(self, ph): + return iter([]) + + def get_process_name(self, ph): + return "test.exe" + + def get_threads(self, ph): + yield thread_handle_1 + yield thread_handle_2 + + def extract_thread_features(self, ph, th): + return iter([]) + + def get_calls(self, ph, th): + if th is thread_handle_1: + yield call_handle_1 + elif th is thread_handle_2: + yield call_handle_2 + + def extract_call_features(self, ph, th, ch): + return iter([]) + + def get_call_name(self, ph, th, ch): + if ch is call_handle_1: + return "CreateFile(hFile)" + else: + return "WriteFile(hFile)" + + extractor = RecycledTidExtractor( + SampleHashes(md5="a" * 32, sha1="a" * 40, sha256="a" * 64) + ) + + # Both calls matched by rules + result_1 = capa.features.common.Result( + success=True, statement=MagicMock(), children=[], locations={call_addr_1} + ) + result_2 = capa.features.common.Result( + success=True, statement=MagicMock(), children=[], locations={call_addr_2} + ) + capabilities = { + "rule A": [(call_addr_1, result_1)], + "rule B": [(call_addr_2, result_2)], + } + + return extractor, capabilities + + def test_both_thread_instances_appear(self): + extractor, capabilities = self._make_extractor() + layout = capa.loader.compute_dynamic_layout( + MagicMock(), extractor, capabilities + ) + + assert len(layout.processes) == 1 + proc = layout.processes[0] + + # Both thread instances must appear as separate entries + assert len(proc.matched_threads) == 2 + + def test_each_thread_has_its_own_call(self): + extractor, capabilities = self._make_extractor() + layout = capa.loader.compute_dynamic_layout( + MagicMock(), extractor, capabilities + ) + + proc = layout.processes[0] + thread_names = set() + for t in proc.matched_threads: + assert len(t.matched_calls) == 1 + thread_names.add(t.matched_calls[0].name) + + assert "CreateFile(hFile)" in thread_names + assert "WriteFile(hFile)" in thread_names + + def test_no_data_loss(self): + """the original bug: second thread instance overwrites first's calls""" + extractor, capabilities = self._make_extractor() + layout = capa.loader.compute_dynamic_layout( + MagicMock(), extractor, capabilities + ) + + # count total matched calls across all threads + total_calls = sum( + len(t.matched_calls) for t in layout.processes[0].matched_threads + ) + assert total_calls == 2 + + +# --------------------------------------------------------------------------- +# compute_dynamic_layout: recycled PID with unique addresses +# --------------------------------------------------------------------------- + + +class TestComputeDynamicLayoutRecycledPid: + """ + When a sandbox reports two process instances with the same OS-level PID + but different unique ids, compute_dynamic_layout must keep both processes + and their respective threads/calls separate. + """ + + def test_both_process_instances_appear(self): + proc_addr_1 = ProcessAddress(pid=500, ppid=1, id=1) + proc_addr_2 = ProcessAddress(pid=500, ppid=1, id=2) + + thread_addr_1 = ThreadAddress(proc_addr_1, tid=10, id=100) + thread_addr_2 = ThreadAddress(proc_addr_2, tid=10, id=200) + + call_addr_1 = DynamicCallAddress(thread_addr_1, id=0) + call_addr_2 = DynamicCallAddress(thread_addr_2, id=0) + + ph1 = ProcessHandle(address=proc_addr_1, inner=None) + ph2 = ProcessHandle(address=proc_addr_2, inner=None) + th1 = ThreadHandle(address=thread_addr_1, inner=None) + th2 = ThreadHandle(address=thread_addr_2, inner=None) + ch1 = CallHandle(address=call_addr_1, inner=None) + ch2 = CallHandle(address=call_addr_2, inner=None) + + class RecycledPidExtractor(DynamicFeatureExtractor): + def extract_global_features(self): + return iter([]) + + def extract_file_features(self): + return iter([]) + + def get_processes(self): + yield ph1 + yield ph2 + + def extract_process_features(self, ph): + return iter([]) + + def get_process_name(self, ph): + return "malware.exe" if ph is ph1 else "malware.exe (recycled)" + + def get_threads(self, ph): + if ph is ph1: + yield th1 + elif ph is ph2: + yield th2 + + def extract_thread_features(self, ph, th): + return iter([]) + + def get_calls(self, ph, th): + if th is th1: + yield ch1 + elif th is th2: + yield ch2 + + def extract_call_features(self, ph, th, ch): + return iter([]) + + def get_call_name(self, ph, th, ch): + return "NtCreateFile()" if ch is ch1 else "NtWriteFile()" + + extractor = RecycledPidExtractor( + SampleHashes(md5="b" * 32, sha1="b" * 40, sha256="b" * 64) + ) + + result_1 = capa.features.common.Result( + success=True, statement=MagicMock(), children=[], locations={call_addr_1} + ) + result_2 = capa.features.common.Result( + success=True, statement=MagicMock(), children=[], locations={call_addr_2} + ) + capabilities = { + "rule A": [(call_addr_1, result_1)], + "rule B": [(call_addr_2, result_2)], + } + + layout = capa.loader.compute_dynamic_layout( + MagicMock(), extractor, capabilities + ) + + # both process instances must appear + assert len(layout.processes) == 2 + + # each process should have its own thread and call + for p in layout.processes: + assert len(p.matched_threads) == 1 + assert len(p.matched_threads[0].matched_calls) == 1