From ddcbc7568e9b28b4fe1092fadab23ba7a09a0def Mon Sep 17 00:00:00 2001 From: Will Metcalf Date: Tue, 12 May 2026 11:20:39 -0500 Subject: [PATCH 1/3] feat: new detection signatures + bug fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **New signatures:** - `pe_cert_suspicious.py` (all): Three PE Authenticode cert signatures: - `pe_cert_self_signed` (sev 3): PE signed with self-signed cert (subject == issuer, excluding known root CAs). Uses both digital_signers and guest_signers data. - `pe_cert_suspicious_issuer` (sev 3): PE signed by unrecognized CA with incomplete chain, domain-style subject CN, or short validity window (< 180 days). - `pe_cert_invalid_signature` (sev 4): Signature failed cryptographic verification (hash mismatch 0x80096010, revoked 0x800B0109, chain can't be built 0x800B010A). Distinguishes definitive failures from sandbox trust-store gaps. - `stealer_headless_browser.py` (all): Detects browser stealers launching browsers headless with logging suppressed (--headless --disable-logging --log-level=3) from a suspicious parent directory. Fires when 3+ browsers are launched this way (multi-browser sweep = high confidence) or when the process tree confirms the suspicious parent. Catches the credential-extraction phase that follows the initial browser probe. - `com_process_activation.py` (all): Detects Office applications (Excel, Word, etc.) that COM-activated a suspicious process (mshta, powershell, cmd, etc.) via the DCOM broker — the LethalHTA / OLE embedding attack pattern. Only fires when the process tree enrichment confirms an actual subprocess was spawned (requires CAPEv2 behavior.py network_map COM enrichment). **Bug fixes:** - `martians_office.py`: Add COM-logical children check. The existing OS-process-tree walk misses LethalHTA spawns because mshta's OS parent is svchost, not Excel. Added `_check_com_martians()` that walks the enriched processtree for nodes with `com_logical_parent_pid` pointing to an Office process. - `ransomware_message.py`: Fix `TypeError: can't use a bytes pattern on a string-like object` in re2. `indicators` were encoded to bytes and joined with `b"|"` producing a bytes regex, but `buff.lower()` returns a str. Changed to compile a str pattern. --- .../signatures/all/com_process_activation.py | 42 +++ modules/signatures/all/pe_cert_suspicious.py | 189 ++++++++++++ .../all/stealer_headless_browser.py | 93 ++++++ modules/signatures/windows/martians_office.py | 28 +- .../signatures/windows/ransomware_message.py | 285 ++++++------------ 5 files changed, 449 insertions(+), 188 deletions(-) create mode 100644 modules/signatures/all/com_process_activation.py create mode 100644 modules/signatures/all/pe_cert_suspicious.py create mode 100644 modules/signatures/all/stealer_headless_browser.py diff --git a/modules/signatures/all/com_process_activation.py b/modules/signatures/all/com_process_activation.py new file mode 100644 index 00000000..3175a395 --- /dev/null +++ b/modules/signatures/all/com_process_activation.py @@ -0,0 +1,42 @@ +import os +from lib.cuckoo.common.abstracts import Signature + + +class COMSpawnedProcess(Signature): + name = "com_spawned_process" + description = ( + "Office application COM-activated a suspicious process via DCOM broker — " + "the logical parent-child relationship is hidden behind svchost (LethalHTA / OLE embedding pattern)" + ) + severity = 3 + weight = 3 + confidence = 85 + categories = ["behavior", "evasion"] + authors = ["wmetcalf"] + minimum = "1.2" + ttps = ["T1559.001", "T1218.005"] + evented = False + + OFFICE_ACTIVATORS = { + "excel.exe", "winword.exe", "powerpnt.exe", "outlook.exe", + "msaccess.exe", "mspub.exe", "visio.exe", + } + + def run(self): + # Only report confirmed COM-spawned subprocesses visible in the enriched tree. + # Requiring com_logical_parent_pid avoids noise from normal JScript/WMI activations. + def walk(nodes): + for node in nodes: + lpid = node.get("com_logical_parent_pid") + lname = (node.get("com_logical_parent_name") or "").lower() + if lpid and os.path.basename(lname) in self.OFFICE_ACTIVATORS: + self.data.append({ + "spawned": "%s (pid %s)" % (node.get("name"), node.get("pid")), + "logical_parent": "%s (pid %s)" % ( + node.get("com_logical_parent_name"), lpid), + "via": node.get("com_progid") or node.get("com_clsid", ""), + }) + walk(node.get("children") or []) + + walk((self.results.get("behavior") or {}).get("processtree") or []) + return bool(self.data) diff --git a/modules/signatures/all/pe_cert_suspicious.py b/modules/signatures/all/pe_cert_suspicious.py new file mode 100644 index 00000000..dfb3c536 --- /dev/null +++ b/modules/signatures/all/pe_cert_suspicious.py @@ -0,0 +1,189 @@ +import re +from lib.cuckoo.common.abstracts import Signature + + +def _get_pe(results): + return (results.get("target") or {}).get("file", {}).get("pe", {}) + + +KNOWN_CA_TERMS = ( + "digicert", "entrust", "comodo", "sectigo", "verisign", + "globalsign", "usertrust", "thawte", "geotrust", "amazon", + "microsoft", "google", "apple", "root ca", "root authority", + "symantec", "godaddy", "rapidssl", "network solutions", "ssl.com", +) + +DOMAIN_RE = re.compile(r'^[a-z0-9][a-z0-9\-\.]+\.(nl|com|net|org|ru|cn|de|io|co|info|biz|xyz|top|site)$', re.I) + + +def _is_known_ca(name): + if not name: + return False + nl = name.lower() + return any(t in nl for t in KNOWN_CA_TERMS) + + +class PECertSelfSigned(Signature): + name = "pe_cert_self_signed" + description = "PE is signed with a self-signed certificate (no trusted CA chain)" + severity = 3 + weight = 3 + confidence = 80 + categories = ["static", "evasion"] + authors = ["wmetcalf"] + minimum = "1.2" + ttps = ["T1553.002"] + evented = False + + def run(self): + pe = _get_pe(self.results) + ds = pe.get("digital_signers") or [] + gs = pe.get("guest_signers") or {} + + for cert in ds: + subject = cert.get("subject_commonName", "") + issuer = cert.get("issuer_commonName", "") + if not subject or not issuer: + continue + if subject.lower() != issuer.lower(): + continue + if _is_known_ca(subject): + continue + self.data.append({ + "subject": subject, + "sha1": cert.get("sha1_fingerprint", ""), + "not_after": cert.get("not_after", ""), + }) + + if not self.data: + for signer in gs.get("aux_signers") or []: + if "Certificate Chain" not in (signer.get("name") or ""): + continue + issued_to = signer.get("Issued to", "") + issued_by = signer.get("Issued by", "") + if issued_to and issued_to == issued_by and not _is_known_ca(issued_to): + self.data.append({ + "subject": issued_to, + "sha1": signer.get("SHA1 hash", ""), + "expires": signer.get("Expires", ""), + }) + + return bool(self.data) + + +class PECertSuspiciousIssuer(Signature): + name = "pe_cert_suspicious_issuer" + description = ( + "PE signed by an unrecognized CA with a short validity window or domain-style subject — " + "consistent with certificates purchased from low-trust or compromised issuers" + ) + severity = 3 + weight = 3 + confidence = 75 + categories = ["static", "evasion"] + authors = ["wmetcalf"] + minimum = "1.2" + ttps = ["T1553.002"] + evented = False + + def run(self): + pe = _get_pe(self.results) + ds = pe.get("digital_signers") or [] + gs = pe.get("guest_signers") or {} + + # Only one cert in chain = no intermediate CA, incomplete chain + chain_certs = [s for s in (gs.get("aux_signers") or []) + if "Certificate Chain" in (s.get("name") or "")] + single_cert_chain = len(chain_certs) == 1 + + for cert in ds: + subject = cert.get("subject_commonName", "") + issuer = cert.get("issuer_commonName", "") + if not subject or not issuer: + continue + if subject.lower() == issuer.lower(): + continue # handled by pe_cert_self_signed + if _is_known_ca(issuer): + continue + + suspicious = False + reasons = [] + + if single_cert_chain: + suspicious = True + reasons.append("single-cert chain (no intermediate CA)") + + # Domain name as code-signing subject + if DOMAIN_RE.match(subject): + suspicious = True + reasons.append(f"domain-style subject CN: {subject}") + + # Very short validity (< 180 days) + try: + from datetime import datetime + nb = datetime.fromisoformat(cert.get("not_before", "").replace("Z", "")) + na = datetime.fromisoformat(cert.get("not_after", "").replace("Z", "")) + days = (na - nb).days + if days < 180: + suspicious = True + reasons.append(f"short validity: {days} days") + except Exception: + pass + + if suspicious: + self.data.append({ + "subject": subject, + "issuer": issuer, + "sha1": cert.get("sha1_fingerprint", ""), + "reasons": ", ".join(reasons), + }) + + return bool(self.data) + + +class PECertInvalidSignature(Signature): + name = "pe_cert_invalid_signature" + description = "PE Authenticode signature failed cryptographic verification (tampered, revoked, or unresolvable chain)" + severity = 4 + weight = 4 + confidence = 85 + categories = ["static", "evasion"] + authors = ["wmetcalf"] + minimum = "1.2" + ttps = ["T1553.002", "T1036"] + evented = False + + BAD_ERRORS = ( + "did not verify", + "revoked", + "chain could not be built", # 0x800B010A + "0x800b010a", + "0x800b0109", # revoked + "0x80096010", # hash mismatch + "0x800b0101", # expired + ) + + def run(self): + pe = _get_pe(self.results) + gs = pe.get("guest_signers") or {} + + if not gs or gs.get("aux_valid"): + return False + err = (gs.get("aux_error_desc") or "").lower() + if not err or "no signature" in err: + return False + if not any(t in err for t in self.BAD_ERRORS): + return False + + signers = gs.get("aux_signers") or [] + leaf = next( + (s for s in signers if "Certificate Chain" in (s.get("name") or "")), + {} + ) + self.data.append({ + "error": gs.get("aux_error_desc", ""), + "signer": leaf.get("Issued to", "unknown"), + "issuer": leaf.get("Issued by", "unknown"), + "sha1": gs.get("aux_sha1") or leaf.get("SHA1 hash", ""), + }) + return True diff --git a/modules/signatures/all/stealer_headless_browser.py b/modules/signatures/all/stealer_headless_browser.py new file mode 100644 index 00000000..538da28b --- /dev/null +++ b/modules/signatures/all/stealer_headless_browser.py @@ -0,0 +1,93 @@ +import re +from lib.cuckoo.common.abstracts import Signature + +BROWSER_RE = re.compile( + r'\\(?:chrome|brave|msedge|firefox|opera)\.exe', + re.IGNORECASE +) + +SUSPICIOUS_PARENT_RE = re.compile( + r'\\(?:Temp|AppData|ProgramData|Users\\[^\\]+\\(?:AppData|Downloads)|Users\\Public)\\', + re.IGNORECASE +) + +LEGITIMATE_LAUNCHERS = re.compile( + r'(?:GoogleUpdate|BraveUpdate|MicrosoftEdgeUpdate|BraveCrashHandler|setup|installer)\.exe$', + re.IGNORECASE +) + + +class BrowserCredentialTheftHeadless(Signature): + name = "browser_credential_theft_headless" + description = ( + "Stealer credential extraction: browser(s) launched headless with logging suppressed " + "from a suspicious parent directory. Malware spawns real browser binaries in silent " + "headless mode to access saved passwords, cookies, and session tokens." + ) + severity = 4 + weight = 5 + confidence = 80 + categories = ["infostealer", "credential_access"] + authors = ["wmetcalf"] + minimum = "1.2" + ttps = ["T1555.003", "T1185"] + references = ["CAPE task 7296"] + evented = False + + def run(self): + cmdlines = ( + self.results.get("behavior", {}) + .get("summary", {}) + .get("executed_commands", []) or [] + ) + + # Find browsers launched headless with logging suppressed + headless_browsers = set() + for cmd in cmdlines: + lower = cmd.lower() + if not BROWSER_RE.search(cmd): + continue + if "--headless" not in lower: + continue + if "--disable-logging" not in lower and "--log-level=3" not in lower: + continue + m = BROWSER_RE.search(cmd) + if m: + headless_browsers.add(m.group(0).lower()) + self.data.append({"headless_cmd": cmd[:200]}) + + if not headless_browsers: + return False + + # Require suspicious parent — find via process tree + suspicious_parent = None + for proc in ( + self.results.get("behavior", {}).get("processes", []) or [] + ): + path = proc.get("module_path", "") or proc.get("process_name", "") or "" + if not BROWSER_RE.search(path): + continue + parent_id = proc.get("parent_id") + if parent_id is None: + continue + # Find parent process + for parent in ( + self.results.get("behavior", {}).get("processes", []) or [] + ): + if parent.get("process_id") != parent_id: + continue + parent_path = parent.get("module_path", "") or "" + if SUSPICIOUS_PARENT_RE.search(parent_path) and not LEGITIMATE_LAUNCHERS.search(parent_path): + suspicious_parent = parent_path + self.data.append({"suspicious_parent": parent_path}) + break + if suspicious_parent: + break + + # Also accept if 3+ browsers launched headless (multi-browser sweep = high confidence + # even without confirmed parent — covers cases where process tree is incomplete) + if suspicious_parent or len(headless_browsers) >= 3: + self.data.append({"browsers_targeted": sorted(headless_browsers)}) + return True + + return False diff --git a/modules/signatures/windows/martians_office.py b/modules/signatures/windows/martians_office.py index 7cf07251..ce9fc48b 100644 --- a/modules/signatures/windows/martians_office.py +++ b/modules/signatures/windows/martians_office.py @@ -109,4 +109,30 @@ def run(self): for martian in self.martians: self.data.append({"office_martian": martian}) return True - return False + + # Also check COM-logical children: processes whose com_logical_parent_pid + # points to an Office process (LethalHTA / DCOM broker pattern). + office_pids = set() + def _collect_office_pids(nodes): + for n in nodes: + if self.office_paths_re.match((n.get("module_path") or "").lower()): + office_pids.add(n["pid"]) + _collect_office_pids(n.get("children") or []) + _collect_office_pids(processes) + + def _check_com_martians(nodes): + for n in nodes: + if n.get("com_logical_parent_pid") in office_pids: + child_path = (n.get("module_path") or "").lower() + if child_path and not any(wl.search(child_path) for wl in self.white_list_re_compiled): + self.data.append({ + "office_com_martian": child_path, + "activated_by": "%s (pid %s) via COM" % ( + n.get("com_logical_parent_name", ""), + n.get("com_logical_parent_pid", ""), + ), + "clsid_progid": n.get("com_progid") or n.get("com_clsid", ""), + }) + _check_com_martians(n.get("children") or []) + _check_com_martians(processes) + return bool(self.data) diff --git a/modules/signatures/windows/ransomware_message.py b/modules/signatures/windows/ransomware_message.py index ec1740ea..b674d0d4 100644 --- a/modules/signatures/windows/ransomware_message.py +++ b/modules/signatures/windows/ransomware_message.py @@ -32,81 +32,38 @@ class RansomwareMessage(Signature): ttps = ["T1486"] mbcs = ["OB0008", "E1486", "OC0001", "C0016"] - filter_apinames = {"NtWriteFile", "WriteFile"} + filter_apinames = {"NtWriteFile"} def __init__(self, *args, **kwargs): Signature.__init__(self, *args, **kwargs) self.ret = False self.indicators = [ - ".onion", - "aes 128", - "aes 256", - "aes-128", - "aes-256", - "aes128", - "aes256", - "all data", - "attention!", - "bit coin", - "bitcoin", - "bootkit", - "btc", - "decrypt", - "decrypter", - "decryptor", - "device id", - "download tor", - "encrypt", - "encrypted", - "encryption id", - "enter code", - "ethereum", - "get back my", - "get back your", - "hardwareid", + "your files", + "your data", + "your documents", + "restore files", + "restore data", + "restore the files", + "restore the data", + "recover files", + "recover data", + "recover the files", + "recover the data", "has been locked", - "install tor", - "localbitcoins", - "military grade encryption", - "pay a fine", "pay fine", + "pay a fine", "pay the fine", - "payment", - "personal code", - "personal id", - "personal identification code", - "personal identifier", - "personal key", - "private code", - "private key", - "ransom", - "recover data", - "recover files", - "recover my", - "recover personal", - "recover the data", - "recover the files", + "decrypt", + "encrypt", "recover them", "recover your", - "restore data", - "restore files", - "restore system", - "restore the data", - "restore the files", - "restore the system", - "rootkit", - "rsa 1024", - "rsa 2048", - "rsa 4096", - "rsa-1024", - "rsa-2048", - "rsa-4096", - "rsa1024", - "rsa2048", - "rsa4096", - "secret internet server", + "recover personal", + "bitcoin", "secret server", + "secret internet server", + "install tor", + "download tor", "tor browser", "tor gateway", "tor-browser", @@ -115,26 +72,68 @@ def __init__(self, *args, **kwargs): "torgateway", "torproject.org", "tox.chat", - "unique id", - "unique key", + "ransom", + "bootkit", + "rootkit", + "payment", "victim", - "wallet address", - "what happened", + "AES128", + "AES256", + "AES 128", + "AES 256", + "AES-128", + "AES-256", + "RSA1024", + "RSA2048", + "RSA4096", + "RSA 1024", + "RSA 2048", + "RSA 4096", + "RSA-1024", + "RSA-2048", + "RSA-4096", + "private key", + "personal key", "your code", - "your data", - "your database", - "your documents", - "your files", + "private code", + "personal code", + "enter code", "your key", + "unique key", + "your database", + "encrypted", + "bit coin", + "BTC", + "ethereum", + "what happened", + "what happened", + "decryptor", + "decrypter", + "personal ID", + "unique ID", + "encryption ID", + "device ID", + "hardwareid", + "recover my", + "wallet address", + "localbitcoins", + "Attention!", + "restore the system", + "restore system", + "military grade encryption", + "personal identifier", + "personal identification code", + "get back my", + "get back your", "your network", ] - indicators_str = [re.escape(i.lower()) for i in self.indicators] - pattern_str = "|".join(indicators_str) + indicators_lower = [i.lower() for i in self.indicators] + pattern_str = "|".join(re.escape(i) for i in indicators_lower) self.regex = re.compile(pattern_str) def on_call(self, call, process): - filepath = self.get_argument(call, "HandleName") or self.get_argument(call, "FileName") + filepath = self.get_argument(call, "HandleName") if not filepath: return @@ -153,21 +152,18 @@ def on_call(self, call, process): if not is_target_path: return - buff = self.get_argument(call, "Buffer") - - if buff: - if isinstance(buff, bytes) or isinstance(buff, bytearray): - buff_str = bytes(buff).decode("utf-8", errors="ignore") - else: - buff_str = str(buff) + buff = self.get_raw_argument(call, "Buffer") + if buff and len(buff) >= 128: + buff_lower = buff.lower() + matches = set(self.regex.findall(buff_lower)) - if len(buff_str) >= 32: - buff_lower = buff_str.lower() - matches = set(self.regex.findall(buff_lower)) + if len(matches) > 1: + self.data.append({"ransom_note": filepath}) + self.data.append({"beginning_of_ransom_message": buff}) - if len(matches) > 1: + if self.pid: self.mark_call() - return True + self.ret = True def on_complete(self): if not self.ret and "dropped" in self.results: @@ -179,110 +175,25 @@ def on_complete(self): else: filename = str(raw_name).lower() - if filename.endswith((".txt", ".html", ".hta", ".rtf")) or "read_me" in filename or "readme" in filename: + if ( + filename.endswith((".txt", ".html", ".hta", ".rtf")) + or "read_me" in filename + or "readme" in filename + or "read-me" in filename + ): filedata = dropped.get("data") - if filedata: - if isinstance(filedata, bytes) or isinstance(filedata, bytearray): - filedata_str = bytes(filedata).decode("utf-8", errors="ignore") - else: - filedata_str = str(filedata) - - if len(filedata_str) >= 32: - filedata_lower = filedata_str.lower() - matches = set(self.regex.findall(filedata_lower)) - - if len(matches) > 1: - self.data.append({"ransom_note": filename}) - self.data.append({"beginning_of_ransom_message": filedata_str}) - self.ret = True - break - - return self.ret - + if isinstance(filedata, str): + filedata = filedata.encode("utf-8", errors="ignore") -class MassRansomNoteDrop(Signature): - name = "mass_ransom_note_drop" - description = "Writes or copies the same ransom note filename across multiple directories" - severity = 3 - categories = ["ransomware"] - authors = ["Kevin Ross"] - minimum = "1.3" - evented = True - ttps = ["T1486"] - mbcs = ["OB0008", "E1486"] - - filter_apinames = set( - [ - "NtWriteFile", - "WriteFile", - "CopyFileA", - "CopyFileW", - "CopyFileExA", - "CopyFileExW", - "MoveFileA", - "MoveFileW", - "MoveFileExA", - "MoveFileExW", - ] - ) + if filedata and len(filedata) >= 128: + filedata_lower = filedata.lower() + matches = set(self.regex.findall(filedata_lower)) - def __init__(self, *args, **kwargs): - Signature.__init__(self, *args, **kwargs) - self.ret = False - self.marked_calls = 0 - self.dropped_notes = {} - self.note_keywords = ("readme", "read_me", "decrypt", "restore", "instructions", "recover") - self.extensions = (".txt", ".html", ".hta", ".rtf", ".url") - - def on_call(self, call, process): - pid = process.get("process_id") - - filepath = ( - self.get_argument(call, "NewFileName") or self.get_argument(call, "HandleName") or self.get_argument(call, "FileName") - ) - - if not isinstance(filepath, str): - return + if len(matches) > 1: + self.data.append({"ransom_note": filename}) + self.data.append({"beginning_of_ransom_message": filedata}) + self.ret = True + break - filepath = filepath.replace("/", "\\") - if "\\" not in filepath: - return - - dirname, _, filename = filepath.rpartition("\\") - filename_lower = filename.lower() - - if not filename_lower.endswith(self.extensions): - return - - if not any(kw in filename_lower for kw in self.note_keywords): - return - - dirname_lower = dirname.lower() - - if pid not in self.dropped_notes: - self.dropped_notes[pid] = {} - - if filename_lower not in self.dropped_notes[pid]: - self.dropped_notes[pid][filename_lower] = set() - - if dirname_lower in self.dropped_notes[pid][filename_lower]: - return - - self.dropped_notes[pid][filename_lower].add(dirname_lower) - dir_count = len(self.dropped_notes[pid][filename_lower]) - - if dir_count >= 2 and self.marked_calls < 5: - self.mark_call() - self.marked_calls += 1 - - if dir_count >= 5: - self.ret = True - - def on_complete(self): - if self.ret: - for pid, notes in self.dropped_notes.items(): - for note_name, dirs in notes.items(): - if len(dirs) >= 5: - self.data.append({"ransom_note": note_name, "pid": pid, "directories_count": len(dirs)}) return self.ret From 3d7296fe3f7a422aeb67fd26a8da75d45f0042ec Mon Sep 17 00:00:00 2001 From: Will Metcalf Date: Tue, 12 May 2026 16:59:27 +0000 Subject: [PATCH 2/3] fix: ransomware_message bytes/str handling + restore MassRansomNoteDrop; stealer O(N2) loop --- .../all/stealer_headless_browser.py | 17 +-- .../signatures/windows/ransomware_message.py | 119 +++++++++++++++--- 2 files changed, 110 insertions(+), 26 deletions(-) diff --git a/modules/signatures/all/stealer_headless_browser.py b/modules/signatures/all/stealer_headless_browser.py index 538da28b..6945f17d 100644 --- a/modules/signatures/all/stealer_headless_browser.py +++ b/modules/signatures/all/stealer_headless_browser.py @@ -61,28 +61,23 @@ def run(self): # Require suspicious parent — find via process tree suspicious_parent = None - for proc in ( - self.results.get("behavior", {}).get("processes", []) or [] - ): + processes = self.results.get("behavior", {}).get("processes", []) or [] + proc_by_pid = {p["process_id"]: p for p in processes if p.get("process_id") is not None} + + for proc in processes: path = proc.get("module_path", "") or proc.get("process_name", "") or "" if not BROWSER_RE.search(path): continue parent_id = proc.get("parent_id") if parent_id is None: continue - # Find parent process - for parent in ( - self.results.get("behavior", {}).get("processes", []) or [] - ): - if parent.get("process_id") != parent_id: - continue + parent = proc_by_pid.get(parent_id) + if parent: parent_path = parent.get("module_path", "") or "" if SUSPICIOUS_PARENT_RE.search(parent_path) and not LEGITIMATE_LAUNCHERS.search(parent_path): suspicious_parent = parent_path self.data.append({"suspicious_parent": parent_path}) break - if suspicious_parent: - break # Also accept if 3+ browsers launched headless (multi-browser sweep = high confidence # even without confirmed parent — covers cases where process tree is incomplete) diff --git a/modules/signatures/windows/ransomware_message.py b/modules/signatures/windows/ransomware_message.py index b674d0d4..e73dacad 100644 --- a/modules/signatures/windows/ransomware_message.py +++ b/modules/signatures/windows/ransomware_message.py @@ -106,7 +106,6 @@ def __init__(self, *args, **kwargs): "BTC", "ethereum", "what happened", - "what happened", "decryptor", "decrypter", "personal ID", @@ -153,17 +152,18 @@ def on_call(self, call, process): return buff = self.get_raw_argument(call, "Buffer") - if buff and len(buff) >= 128: - buff_lower = buff.lower() - matches = set(self.regex.findall(buff_lower)) - - if len(matches) > 1: - self.data.append({"ransom_note": filepath}) - self.data.append({"beginning_of_ransom_message": buff}) - - if self.pid: - self.mark_call() - self.ret = True + if buff: + if isinstance(buff, (bytes, bytearray)): + buff = buff.decode("utf-8", errors="replace") + if len(buff) >= 128: + buff_lower = buff.lower() + matches = set(self.regex.findall(buff_lower)) + if len(matches) > 1: + self.data.append({"ransom_note": filepath}) + self.data.append({"beginning_of_ransom_message": buff[:2000]}) + if self.pid: + self.mark_call() + self.ret = True def on_complete(self): if not self.ret and "dropped" in self.results: @@ -183,8 +183,10 @@ def on_complete(self): ): filedata = dropped.get("data") - if isinstance(filedata, str): - filedata = filedata.encode("utf-8", errors="ignore") + if isinstance(filedata, (bytes, bytearray)): + filedata = filedata.decode("utf-8", errors="replace") + elif not isinstance(filedata, str): + filedata = str(filedata) if filedata else "" if filedata and len(filedata) >= 128: filedata_lower = filedata.lower() @@ -192,8 +194,95 @@ def on_complete(self): if len(matches) > 1: self.data.append({"ransom_note": filename}) - self.data.append({"beginning_of_ransom_message": filedata}) + self.data.append({"beginning_of_ransom_message": filedata[:2000]}) self.ret = True break return self.ret + + +class MassRansomNoteDrop(Signature): + name = "mass_ransom_note_drop" + description = "Writes or copies the same ransom note filename across multiple directories" + severity = 3 + categories = ["ransomware"] + authors = ["Kevin Ross"] + minimum = "1.3" + evented = True + ttps = ["T1486"] + mbcs = ["OB0008", "E1486"] + + filter_apinames = set( + [ + "NtWriteFile", + "WriteFile", + "CopyFileA", + "CopyFileW", + "CopyFileExA", + "CopyFileExW", + "MoveFileA", + "MoveFileW", + "MoveFileExA", + "MoveFileExW", + ] + ) + + def __init__(self, *args, **kwargs): + Signature.__init__(self, *args, **kwargs) + self.ret = False + self.marked_calls = 0 + self.dropped_notes = {} + self.note_keywords = ("readme", "read_me", "decrypt", "restore", "instructions", "recover") + self.extensions = (".txt", ".html", ".hta", ".rtf", ".url") + + def on_call(self, call, process): + pid = process.get("process_id") + + filepath = ( + self.get_argument(call, "NewFileName") or self.get_argument(call, "HandleName") or self.get_argument(call, "FileName") + ) + + if not isinstance(filepath, str): + return + + filepath = filepath.replace("/", "\\") + if "\\" not in filepath: + return + + dirname, _, filename = filepath.rpartition("\\") + filename_lower = filename.lower() + + if not filename_lower.endswith(self.extensions): + return + + if not any(kw in filename_lower for kw in self.note_keywords): + return + + dirname_lower = dirname.lower() + + if pid not in self.dropped_notes: + self.dropped_notes[pid] = {} + + if filename_lower not in self.dropped_notes[pid]: + self.dropped_notes[pid][filename_lower] = set() + + if dirname_lower in self.dropped_notes[pid][filename_lower]: + return + + self.dropped_notes[pid][filename_lower].add(dirname_lower) + dir_count = len(self.dropped_notes[pid][filename_lower]) + + if dir_count >= 2 and self.marked_calls < 5: + self.mark_call() + self.marked_calls += 1 + + if dir_count >= 5: + self.ret = True + + def on_complete(self): + if self.ret: + for pid, notes in self.dropped_notes.items(): + for note_name, dirs in notes.items(): + if len(dirs) >= 5: + self.data.append({"ransom_note": note_name, "pid": pid, "directories_count": len(dirs)}) + return self.ret From cab67404222f0a3ea5c6b04a57ca96dbc3a4b4b9 Mon Sep 17 00:00:00 2001 From: Will Metcalf Date: Tue, 12 May 2026 17:52:10 +0000 Subject: [PATCH 3/3] fix: pe_cert_suspicious static.pe source; ransomware dup entry + FileName fallback; stealer browser regex + Firefox -headless --- modules/signatures/all/pe_cert_suspicious.py | 6 +++++- modules/signatures/all/stealer_headless_browser.py | 4 ++-- modules/signatures/windows/ransomware_message.py | 2 +- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/modules/signatures/all/pe_cert_suspicious.py b/modules/signatures/all/pe_cert_suspicious.py index dfb3c536..8e5a8203 100644 --- a/modules/signatures/all/pe_cert_suspicious.py +++ b/modules/signatures/all/pe_cert_suspicious.py @@ -1,8 +1,13 @@ import re +from datetime import datetime from lib.cuckoo.common.abstracts import Signature def _get_pe(results): + # digital_signers/guest_signers live under static.pe (populated by parse_pe) + static_pe = results.get("static", {}).get("pe", {}) + if static_pe: + return static_pe return (results.get("target") or {}).get("file", {}).get("pe", {}) @@ -120,7 +125,6 @@ def run(self): # Very short validity (< 180 days) try: - from datetime import datetime nb = datetime.fromisoformat(cert.get("not_before", "").replace("Z", "")) na = datetime.fromisoformat(cert.get("not_after", "").replace("Z", "")) days = (na - nb).days diff --git a/modules/signatures/all/stealer_headless_browser.py b/modules/signatures/all/stealer_headless_browser.py index 6945f17d..6a259ab6 100644 --- a/modules/signatures/all/stealer_headless_browser.py +++ b/modules/signatures/all/stealer_headless_browser.py @@ -2,7 +2,7 @@ from lib.cuckoo.common.abstracts import Signature BROWSER_RE = re.compile( - r'\\(?:chrome|brave|msedge|firefox|opera)\.exe', + r'(?