Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
-

### Bug Fixes
- address: add optional id field to ProcessAddress/ThreadAddress for unique tracking of recycled PID/TID lifecycles @devs6186 #2619
- Fixed insecure deserialization vulnerability in YAML loading @0x1622 (#2770)
- loader: gracefully handle ELF files with unsupported architectures kamranulhaq2002@gmail.com #2800
- loader: handle SegmentationViolation for malformed ELF files @kami922 #2799
Expand Down
70 changes: 52 additions & 18 deletions capa/features/address.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import abc
from typing import Optional


class Address(abc.ABC):
Expand Down Expand Up @@ -50,53 +51,83 @@ def __hash__(self):


class ProcessAddress(Address):
"""an address of a process in a dynamic execution trace"""

def __init__(self, pid: int, ppid: int = 0):
"""an address of a process in a dynamic execution trace

Args:
pid: process ID assigned by the OS
ppid: parent process ID assigned by the OS
id: optional sandbox-specific unique identifier to distinguish
processes whose OS-assigned PIDs collide due to reuse.
For VMRay this is the monitor_id; for other backends
it may be a sequential counter or timestamp.
"""

def __init__(self, pid: int, ppid: int = 0, id: Optional[int] = None):
assert ppid >= 0
assert pid > 0
self.ppid = ppid
self.pid = pid
self.id = id
Comment on lines +65 to +70
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current implementation has two potential issues related to the id field:

  1. Sorting: The __lt__ method uses -1 as a sentinel for None, which assumes all valid IDs are non-negative.
  2. Serialization: The freeze logic uses 0 as a sentinel for None, which would incorrectly handle a legitimate ID of 0 (it would be deserialized as None).

While current extractors seem to use positive IDs, this is not enforced. A future backend could provide id=0 or negative IDs, triggering these bugs.

To make the implementation more robust and prevent these potential data loss and sorting issues, I suggest enforcing that id must be a positive integer if it is not None. A similar assertion should be added to ThreadAddress.

Suggested change
def __init__(self, pid: int, ppid: int = 0, id: Optional[int] = None):
assert ppid >= 0
assert pid > 0
self.ppid = ppid
self.pid = pid
self.id = id
def __init__(self, pid: int, ppid: int = 0, id: Optional[int] = None):
assert ppid >= 0
assert pid > 0
if id is not None:
assert id > 0, "ProcessAddress id must be a positive integer"
self.ppid = ppid
self.pid = pid
self.id = id


def __repr__(self):
return "process(%s%s)" % (
f"ppid: {self.ppid}, " if self.ppid > 0 else "",
f"pid: {self.pid}",
)
parts = []
if self.ppid > 0:
parts.append(f"ppid: {self.ppid}")
parts.append(f"pid: {self.pid}")
if self.id is not None:
parts.append(f"id: {self.id}")
return "process(%s)" % ", ".join(parts)

def __hash__(self):
return hash((self.ppid, self.pid))
return hash((self.ppid, self.pid, self.id))

def __eq__(self, other):
assert isinstance(other, ProcessAddress)
return (self.ppid, self.pid) == (other.ppid, other.pid)
return (self.ppid, self.pid, self.id) == (other.ppid, other.pid, other.id)

def __lt__(self, other):
assert isinstance(other, ProcessAddress)
return (self.ppid, self.pid) < (other.ppid, other.pid)
# None sorts before any real id
self_id = self.id if self.id is not None else -1
other_id = other.id if other.id is not None else -1
return (self.ppid, self.pid, self_id) < (other.ppid, other.pid, other_id)


class ThreadAddress(Address):
"""addresses a thread in a dynamic execution trace"""

def __init__(self, process: ProcessAddress, tid: int):
"""addresses a thread in a dynamic execution trace

Args:
process: address of the containing process
tid: thread ID assigned by the OS
id: optional sandbox-specific unique identifier to distinguish
threads whose OS-assigned TIDs collide due to reuse.
For VMRay this is the monitor_id; for other backends
it may be a sequential counter or timestamp.
"""

def __init__(self, process: ProcessAddress, tid: int, id: Optional[int] = None):
assert tid >= 0
self.process = process
self.tid = tid
self.id = id
Comment on lines +108 to +112
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

For the same reasons outlined in the comment on ProcessAddress.__init__ (potential sorting and serialization issues), I recommend adding an assertion here to ensure that thread id values are positive integers if provided. This will make the overall design more robust.

Suggested change
def __init__(self, process: ProcessAddress, tid: int, id: Optional[int] = None):
assert tid >= 0
self.process = process
self.tid = tid
self.id = id
def __init__(self, process: ProcessAddress, tid: int, id: Optional[int] = None):
assert tid >= 0
if id is not None:
assert id > 0, "ThreadAddress id must be a positive integer"
self.process = process
self.tid = tid
self.id = id


def __repr__(self):
return f"{self.process}, thread(tid: {self.tid})"
id_part = f", id: {self.id}" if self.id is not None else ""
return f"{self.process}, thread(tid: {self.tid}{id_part})"

def __hash__(self):
return hash((self.process, self.tid))
return hash((self.process, self.tid, self.id))

def __eq__(self, other):
assert isinstance(other, ThreadAddress)
return (self.process, self.tid) == (other.process, other.tid)
return (self.process, self.tid, self.id) == (other.process, other.tid, other.id)

def __lt__(self, other):
assert isinstance(other, ThreadAddress)
return (self.process, self.tid) < (other.process, other.tid)
# None sorts before any real id
self_id = self.id if self.id is not None else -1
other_id = other.id if other.id is not None else -1
return (self.process, self.tid, self_id) < (other.process, other.tid, other_id)


class DynamicCallAddress(Address):
Expand All @@ -114,7 +145,10 @@ def __hash__(self):
return hash((self.thread, self.id))

def __eq__(self, other):
return isinstance(other, DynamicCallAddress) and (self.thread, self.id) == (other.thread, other.id)
return isinstance(other, DynamicCallAddress) and (self.thread, self.id) == (
other.thread,
other.id,
)

def __lt__(self, other):
assert isinstance(other, DynamicCallAddress)
Expand Down
41 changes: 27 additions & 14 deletions capa/features/extractors/cape/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,37 @@

def get_processes(report: CapeReport) -> Iterator[ProcessHandle]:
"""
get all the created processes for a sample
get all the created processes for a sample.

when the OS recycles a PID, multiple processes in the report may share the
same (ppid, pid) pair. we detect this and assign sequential ids so that
each process receives a unique ProcessAddress.
"""
seen_processes = {}
# first pass: count how many times each (ppid, pid) pair appears
counts: dict[tuple[int, int], int] = {}
for process in report.behavior.processes:
addr = ProcessAddress(pid=process.process_id, ppid=process.parent_id)
yield ProcessHandle(address=addr, inner=process)
key = (process.parent_id, process.process_id)
counts[key] = counts.get(key, 0) + 1

# check for pid and ppid reuse
if addr not in seen_processes:
seen_processes[addr] = [process]
else:
logger.warning(
"pid and ppid reuse detected between process %s and process%s: %s",
process,
"es" if len(seen_processes[addr]) > 1 else "",
seen_processes[addr],
# second pass: yield handles with sequential ids for reused pairs
seq: dict[tuple[int, int], int] = {}
for process in report.behavior.processes:
key = (process.parent_id, process.process_id)
seq[key] = seq.get(key, 0) + 1

# only assign ids when reuse is detected; otherwise keep id=None
# for backward compatibility with existing addresses and freeze files
id_ = seq[key] if counts[key] > 1 else None
if id_ is not None:
logger.debug(
"pid reuse detected for ppid=%d, pid=%d: assigning id=%d",
process.parent_id,
process.process_id,
id_,
)
seen_processes[addr].append(process)

addr = ProcessAddress(pid=process.process_id, ppid=process.parent_id, id=id_)
yield ProcessHandle(address=addr, inner=process)


def extract_import_names(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
Expand Down
8 changes: 6 additions & 2 deletions capa/features/extractors/vmray/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ def get_processes(self) -> Iterator[ProcessHandle]:
)
continue

address: ProcessAddress = ProcessAddress(pid=monitor_process.pid, ppid=monitor_process.ppid)
address: ProcessAddress = ProcessAddress(
pid=monitor_process.pid, ppid=monitor_process.ppid, id=monitor_process.monitor_id
)
yield ProcessHandle(address, inner=monitor_process)

def extract_process_features(self, ph: ProcessHandle) -> Iterator[tuple[Feature, Address]]:
Expand All @@ -114,7 +116,9 @@ def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]:
for monitor_thread_id in self.analysis.monitor_threads_by_monitor_process[ph.inner.monitor_id]:
monitor_thread: VMRayMonitorThread = self.analysis.monitor_threads[monitor_thread_id]

address: ThreadAddress = ThreadAddress(process=ph.address, tid=monitor_thread.tid)
address: ThreadAddress = ThreadAddress(
process=ph.address, tid=monitor_thread.tid, id=monitor_thread.monitor_id
)
yield ThreadHandle(address=address, inner=monitor_thread)

def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[tuple[Feature, Address]]:
Expand Down
Loading