Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions modules/signatures/all/com_process_activation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import os
from lib.cuckoo.common.abstracts import Signature


class COMSpawnedProcess(Signature):
name = "com_spawned_process"
description = (
"Office application COM-activated a suspicious process via DCOM broker — "
"the logical parent-child relationship is hidden behind svchost (LethalHTA / OLE embedding pattern)"
)
severity = 3
weight = 3
confidence = 85
categories = ["behavior", "evasion"]
authors = ["wmetcalf"]
minimum = "1.2"
ttps = ["T1559.001", "T1218.005"]
evented = False

OFFICE_ACTIVATORS = {
"excel.exe", "winword.exe", "powerpnt.exe", "outlook.exe",
"msaccess.exe", "mspub.exe", "visio.exe",
}

def run(self):
# Only report confirmed COM-spawned subprocesses visible in the enriched tree.
# Requiring com_logical_parent_pid avoids noise from normal JScript/WMI activations.
def walk(nodes):
for node in nodes:
lpid = node.get("com_logical_parent_pid")
lname = (node.get("com_logical_parent_name") or "").lower()
if lpid and os.path.basename(lname) in self.OFFICE_ACTIVATORS:
self.data.append({
"spawned": "%s (pid %s)" % (node.get("name"), node.get("pid")),
"logical_parent": "%s (pid %s)" % (
node.get("com_logical_parent_name"), lpid),
"via": node.get("com_progid") or node.get("com_clsid", ""),
})
Comment on lines +20 to +38
walk(node.get("children") or [])

walk((self.results.get("behavior") or {}).get("processtree") or [])
return bool(self.data)
189 changes: 189 additions & 0 deletions modules/signatures/all/pe_cert_suspicious.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
import re
from lib.cuckoo.common.abstracts import Signature


def _get_pe(results):
return (results.get("target") or {}).get("file", {}).get("pe", {})


KNOWN_CA_TERMS = (
"digicert", "entrust", "comodo", "sectigo", "verisign",
"globalsign", "usertrust", "thawte", "geotrust", "amazon",
"microsoft", "google", "apple", "root ca", "root authority",
"symantec", "godaddy", "rapidssl", "network solutions", "ssl.com",
)

DOMAIN_RE = re.compile(r'^[a-z0-9][a-z0-9\-\.]+\.(nl|com|net|org|ru|cn|de|io|co|info|biz|xyz|top|site)$', re.I)


def _is_known_ca(name):
if not name:
return False
nl = name.lower()
return any(t in nl for t in KNOWN_CA_TERMS)


class PECertSelfSigned(Signature):
name = "pe_cert_self_signed"
description = "PE is signed with a self-signed certificate (no trusted CA chain)"
severity = 3
weight = 3
confidence = 80
categories = ["static", "evasion"]
authors = ["wmetcalf"]
minimum = "1.2"
ttps = ["T1553.002"]
evented = False

def run(self):
pe = _get_pe(self.results)
ds = pe.get("digital_signers") or []
gs = pe.get("guest_signers") or {}

for cert in ds:
subject = cert.get("subject_commonName", "")
issuer = cert.get("issuer_commonName", "")
if not subject or not issuer:
continue
if subject.lower() != issuer.lower():
continue
if _is_known_ca(subject):
continue
self.data.append({
"subject": subject,
"sha1": cert.get("sha1_fingerprint", ""),
"not_after": cert.get("not_after", ""),
})

if not self.data:
for signer in gs.get("aux_signers") or []:
if "Certificate Chain" not in (signer.get("name") or ""):
continue
issued_to = signer.get("Issued to", "")
issued_by = signer.get("Issued by", "")
if issued_to and issued_to == issued_by and not _is_known_ca(issued_to):
self.data.append({
"subject": issued_to,
"sha1": signer.get("SHA1 hash", ""),
"expires": signer.get("Expires", ""),
})

return bool(self.data)


class PECertSuspiciousIssuer(Signature):
name = "pe_cert_suspicious_issuer"
description = (
"PE signed by an unrecognized CA with a short validity window or domain-style subject — "
"consistent with certificates purchased from low-trust or compromised issuers"
)
severity = 3
weight = 3
confidence = 75
categories = ["static", "evasion"]
authors = ["wmetcalf"]
minimum = "1.2"
ttps = ["T1553.002"]
evented = False

def run(self):
pe = _get_pe(self.results)
ds = pe.get("digital_signers") or []
gs = pe.get("guest_signers") or {}

# Only one cert in chain = no intermediate CA, incomplete chain
chain_certs = [s for s in (gs.get("aux_signers") or [])
if "Certificate Chain" in (s.get("name") or "")]
single_cert_chain = len(chain_certs) == 1

for cert in ds:
subject = cert.get("subject_commonName", "")
issuer = cert.get("issuer_commonName", "")
if not subject or not issuer:
continue
if subject.lower() == issuer.lower():
continue # handled by pe_cert_self_signed
if _is_known_ca(issuer):
continue

suspicious = False
reasons = []

if single_cert_chain:
suspicious = True
reasons.append("single-cert chain (no intermediate CA)")

# Domain name as code-signing subject
if DOMAIN_RE.match(subject):
suspicious = True
reasons.append(f"domain-style subject CN: {subject}")

# Very short validity (< 180 days)
try:
from datetime import datetime
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Importing datetime inside the run method is inefficient as it will be re-imported every time the signature runs. Move this import to the top of the file.

nb = datetime.fromisoformat(cert.get("not_before", "").replace("Z", ""))
na = datetime.fromisoformat(cert.get("not_after", "").replace("Z", ""))
days = (na - nb).days
if days < 180:
suspicious = True
reasons.append(f"short validity: {days} days")
except Exception:
pass

if suspicious:
self.data.append({
"subject": subject,
"issuer": issuer,
"sha1": cert.get("sha1_fingerprint", ""),
"reasons": ", ".join(reasons),
})

return bool(self.data)


class PECertInvalidSignature(Signature):
name = "pe_cert_invalid_signature"
description = "PE Authenticode signature failed cryptographic verification (tampered, revoked, or unresolvable chain)"
severity = 4
weight = 4
confidence = 85
categories = ["static", "evasion"]
authors = ["wmetcalf"]
minimum = "1.2"
ttps = ["T1553.002", "T1036"]
evented = False

BAD_ERRORS = (
"did not verify",
"revoked",
"chain could not be built", # 0x800B010A
"0x800b010a",
"0x800b0109", # revoked
"0x80096010", # hash mismatch
"0x800b0101", # expired
)

def run(self):
pe = _get_pe(self.results)
gs = pe.get("guest_signers") or {}

if not gs or gs.get("aux_valid"):
return False
err = (gs.get("aux_error_desc") or "").lower()
if not err or "no signature" in err:
return False
if not any(t in err for t in self.BAD_ERRORS):
return False

signers = gs.get("aux_signers") or []
leaf = next(
(s for s in signers if "Certificate Chain" in (s.get("name") or "")),
{}
)
self.data.append({
"error": gs.get("aux_error_desc", ""),
"signer": leaf.get("Issued to", "unknown"),
"issuer": leaf.get("Issued by", "unknown"),
"sha1": gs.get("aux_sha1") or leaf.get("SHA1 hash", ""),
})
return True
93 changes: 93 additions & 0 deletions modules/signatures/all/stealer_headless_browser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import re
from lib.cuckoo.common.abstracts import Signature

BROWSER_RE = re.compile(
r'\\(?:chrome|brave|msedge|firefox|opera)\.exe',
re.IGNORECASE
)

SUSPICIOUS_PARENT_RE = re.compile(
r'\\(?:Temp|AppData|ProgramData|Users\\[^\\]+\\(?:AppData|Downloads)|Users\\Public)\\',
re.IGNORECASE
)
Comment on lines +4 to +12

LEGITIMATE_LAUNCHERS = re.compile(
r'(?:GoogleUpdate|BraveUpdate|MicrosoftEdgeUpdate|BraveCrashHandler|setup|installer)\.exe$',
re.IGNORECASE
)


class BrowserCredentialTheftHeadless(Signature):
name = "browser_credential_theft_headless"
description = (
"Stealer credential extraction: browser(s) launched headless with logging suppressed "
"from a suspicious parent directory. Malware spawns real browser binaries in silent "
"headless mode to access saved passwords, cookies, and session tokens."
)
severity = 4
weight = 5
confidence = 80
categories = ["infostealer", "credential_access"]
authors = ["wmetcalf"]
minimum = "1.2"
ttps = ["T1555.003", "T1185"]
references = ["CAPE task 7296"]
evented = False

def run(self):
cmdlines = (
self.results.get("behavior", {})
.get("summary", {})
.get("executed_commands", []) or []
)

# Find browsers launched headless with logging suppressed
headless_browsers = set()
for cmd in cmdlines:
lower = cmd.lower()
if not BROWSER_RE.search(cmd):
continue
if "--headless" not in lower:
continue
if "--disable-logging" not in lower and "--log-level=3" not in lower:
continue
m = BROWSER_RE.search(cmd)
if m:
headless_browsers.add(m.group(0).lower())
self.data.append({"headless_cmd": cmd[:200]})

if not headless_browsers:
return False

# Require suspicious parent — find via process tree
suspicious_parent = None
for proc in (
self.results.get("behavior", {}).get("processes", []) or []
):
path = proc.get("module_path", "") or proc.get("process_name", "") or ""
if not BROWSER_RE.search(path):
continue
parent_id = proc.get("parent_id")
if parent_id is None:
continue
# Find parent process
for parent in (
self.results.get("behavior", {}).get("processes", []) or []
):
if parent.get("process_id") != parent_id:
continue
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This nested loop over the process list results in $O(N^2)$ complexity, which can significantly impact performance for analyses with many processes. Consider pre-indexing the processes by process_id into a dictionary for $O(1)$ lookup.

parent_path = parent.get("module_path", "") or ""
if SUSPICIOUS_PARENT_RE.search(parent_path) and not LEGITIMATE_LAUNCHERS.search(parent_path):
suspicious_parent = parent_path
self.data.append({"suspicious_parent": parent_path})
break
if suspicious_parent:
break

# Also accept if 3+ browsers launched headless (multi-browser sweep = high confidence
# even without confirmed parent — covers cases where process tree is incomplete)
if suspicious_parent or len(headless_browsers) >= 3:
self.data.append({"browsers_targeted": sorted(headless_browsers)})
return True

return False
28 changes: 27 additions & 1 deletion modules/signatures/windows/martians_office.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,30 @@ def run(self):
for martian in self.martians:
self.data.append({"office_martian": martian})
return True
return False

# Also check COM-logical children: processes whose com_logical_parent_pid
# points to an Office process (LethalHTA / DCOM broker pattern).
office_pids = set()
def _collect_office_pids(nodes):
for n in nodes:
if self.office_paths_re.match((n.get("module_path") or "").lower()):
office_pids.add(n["pid"])
_collect_office_pids(n.get("children") or [])
_collect_office_pids(processes)

def _check_com_martians(nodes):
for n in nodes:
if n.get("com_logical_parent_pid") in office_pids:
child_path = (n.get("module_path") or "").lower()
if child_path and not any(wl.search(child_path) for wl in self.white_list_re_compiled):
self.data.append({
"office_com_martian": child_path,
"activated_by": "%s (pid %s) via COM" % (
n.get("com_logical_parent_name", ""),
n.get("com_logical_parent_pid", ""),
),
"clsid_progid": n.get("com_progid") or n.get("com_clsid", ""),
})
_check_com_martians(n.get("children") or [])
_check_com_martians(processes)
return bool(self.data)
Loading
Loading