Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
-

### Bug Fixes
- address: add optional id field to ProcessAddress/ThreadAddress for unique tracking of recycled PID/TID lifecycles @devs6186 #2619
- main: suggest --os flag in unsupported OS error message to help users override ELF OS detection @devs6186 #2577
- render: escape sample-controlled strings before passing to Rich to prevent MarkupError @devs6186 #2699
- Fixed insecure deserialization vulnerability in YAML loading @0x1622 #2770
Expand Down
70 changes: 52 additions & 18 deletions capa/features/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import abc
from typing import Optional


class Address(abc.ABC):
Expand Down Expand Up @@ -50,53 +51,83 @@ def __hash__(self):


class ProcessAddress(Address):
    """an address of a process in a dynamic execution trace

    Args:
        pid: process ID assigned by the OS
        ppid: parent process ID assigned by the OS
        id: optional sandbox-specific unique identifier to distinguish
            processes whose OS-assigned PIDs collide due to reuse.
            For VMRay this is the monitor_id; for other backends
            it may be a sequential counter or timestamp.
    """

    def __init__(self, pid: int, ppid: int = 0, id: Optional[int] = None):
        # a real process always has a positive pid; ppid may be 0 when unknown
        assert ppid >= 0
        assert pid > 0
        self.ppid = ppid
        self.pid = pid
        self.id = id

    def __repr__(self):
        parts = []
        if self.ppid > 0:
            parts.append(f"ppid: {self.ppid}")
        parts.append(f"pid: {self.pid}")
        # only render the disambiguating id when one was assigned,
        # so addresses without an id keep their legacy rendering
        if self.id is not None:
            parts.append(f"id: {self.id}")
        return "process(%s)" % ", ".join(parts)

    def __hash__(self):
        # id participates in identity so recycled (ppid, pid) pairs
        # with distinct ids hash to distinct buckets
        return hash((self.ppid, self.pid, self.id))

    def __eq__(self, other):
        assert isinstance(other, ProcessAddress)
        return (self.ppid, self.pid, self.id) == (other.ppid, other.pid, other.id)

    def __lt__(self, other):
        assert isinstance(other, ProcessAddress)
        # None sorts before any real id, keeping un-disambiguated
        # addresses stably ahead of disambiguated ones
        self_id = self.id if self.id is not None else -1
        other_id = other.id if other.id is not None else -1
        return (self.ppid, self.pid, self_id) < (other.ppid, other.pid, other_id)


class ThreadAddress(Address):
    """addresses a thread in a dynamic execution trace

    Args:
        process: address of the containing process
        tid: thread ID assigned by the OS
        id: optional sandbox-specific unique identifier to distinguish
            threads whose OS-assigned TIDs collide due to reuse.
            For VMRay this is the monitor_id; for other backends
            it may be a sequential counter or timestamp.
    """

    def __init__(self, process: ProcessAddress, tid: int, id: Optional[int] = None):
        # tid 0 is permitted (unlike pid, which must be positive)
        assert tid >= 0
        self.process = process
        self.tid = tid
        self.id = id

    def __repr__(self):
        # only render the disambiguating id when one was assigned,
        # so addresses without an id keep their legacy rendering
        id_part = f", id: {self.id}" if self.id is not None else ""
        return f"{self.process}, thread(tid: {self.tid}{id_part})"

    def __hash__(self):
        # id participates in identity so recycled TIDs with distinct ids
        # hash to distinct buckets
        return hash((self.process, self.tid, self.id))

    def __eq__(self, other):
        assert isinstance(other, ThreadAddress)
        return (self.process, self.tid, self.id) == (other.process, other.tid, other.id)

    def __lt__(self, other):
        assert isinstance(other, ThreadAddress)
        # None sorts before any real id, keeping un-disambiguated
        # addresses stably ahead of disambiguated ones
        self_id = self.id if self.id is not None else -1
        other_id = other.id if other.id is not None else -1
        return (self.process, self.tid, self_id) < (other.process, other.tid, other_id)


class DynamicCallAddress(Address):
Expand All @@ -114,7 +145,10 @@ def __hash__(self):
return hash((self.thread, self.id))

def __eq__(self, other):
return isinstance(other, DynamicCallAddress) and (self.thread, self.id) == (other.thread, other.id)
return isinstance(other, DynamicCallAddress) and (self.thread, self.id) == (
other.thread,
other.id,
)

def __lt__(self, other):
assert isinstance(other, DynamicCallAddress)
Expand Down
41 changes: 27 additions & 14 deletions capa/features/extractors/cape/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,37 @@

def get_processes(report: CapeReport) -> Iterator[ProcessHandle]:
    """
    get all the created processes for a sample.

    when the OS recycles a PID, multiple processes in the report may share the
    same (ppid, pid) pair. we detect this and assign sequential ids so that
    each process receives a unique ProcessAddress.
    """
    # first pass: count how many times each (ppid, pid) pair appears
    counts: dict[tuple[int, int], int] = {}
    for process in report.behavior.processes:
        key = (process.parent_id, process.process_id)
        counts[key] = counts.get(key, 0) + 1

    # second pass: yield handles with sequential ids for reused pairs
    seq: dict[tuple[int, int], int] = {}
    for process in report.behavior.processes:
        key = (process.parent_id, process.process_id)
        seq[key] = seq.get(key, 0) + 1

        # only assign ids when reuse is detected; otherwise keep id=None
        # for backward compatibility with existing addresses and freeze files
        id_ = seq[key] if counts[key] > 1 else None
        if id_ is not None:
            logger.debug(
                "pid reuse detected for ppid=%d, pid=%d: assigning id=%d",
                process.parent_id,
                process.process_id,
                id_,
            )

        addr = ProcessAddress(pid=process.process_id, ppid=process.parent_id, id=id_)
        yield ProcessHandle(address=addr, inner=process)


def extract_import_names(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
Expand Down
8 changes: 6 additions & 2 deletions capa/features/extractors/vmray/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ def get_processes(self) -> Iterator[ProcessHandle]:
)
continue

address: ProcessAddress = ProcessAddress(pid=monitor_process.pid, ppid=monitor_process.ppid)
address: ProcessAddress = ProcessAddress(
pid=monitor_process.pid, ppid=monitor_process.ppid, id=monitor_process.monitor_id
)
yield ProcessHandle(address, inner=monitor_process)

def extract_process_features(self, ph: ProcessHandle) -> Iterator[tuple[Feature, Address]]:
def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]:
    """yield a handle for each monitored thread of the given process.

    the VMRay monitor_id is carried as the address id so that threads
    whose OS-assigned TIDs are recycled remain distinguishable.

    Args:
        ph: handle of the process whose threads to enumerate
    """
    for monitor_thread_id in self.analysis.monitor_threads_by_monitor_process[ph.inner.monitor_id]:
        monitor_thread: VMRayMonitorThread = self.analysis.monitor_threads[monitor_thread_id]

        address: ThreadAddress = ThreadAddress(
            process=ph.address, tid=monitor_thread.tid, id=monitor_thread.monitor_id
        )
        yield ThreadHandle(address=address, inner=monitor_thread)

def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[tuple[Feature, Address]]:
Expand Down
Loading