Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 196 additions & 0 deletions node/test_p2p_thread_race_condition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
#!/usr/bin/env python3
"""
[P2P-BUG] Thread Safety Race Condition in Message Deduplication

VULNERABILITY: Thread-unsafe message deduplication in GossipLayer.handle_message()

FILES AFFECTED:
- node/rustchain_p2p_gossip.py
- Lines 294, 296 (lock initialization)
- Lines 399-411 (unsynchronized deduplication check)

DESCRIPTION:
The GossipLayer initializes a threading.Lock (line 296) but NEVER acquires it
when checking and updating the seen_messages set (lines 399-411).

This creates a race condition where:
1. Thread A: Checks "if msg.msg_id in self.seen_messages" -> False
2. Thread B: Checks "if msg.msg_id in self.seen_messages" -> False (before A adds it)
3. Thread A: Adds msg.msg_id to self.seen_messages
4. Thread B: Adds msg.msg_id to self.seen_messages (duplicate!)
5. Both threads proceed to process the same message -> DUPLICATE PROCESSING

IMPACT:
- Duplicate INV_ATTESTATION messages (line 449)
- Duplicate EPOCH_PROPOSE/EPOCH_VOTE messages causing vote count corruption (lines 519, 597)
- Duplicate STATE merge messages corrupting CRDT (line 675)
- Potential consensus failure and epoch settlement manipulation

SEVERITY: HIGH
CVSS: 7.1

ROOT CAUSE:
Line 296: self.lock = threading.Lock() # Created but never used
Line 399: if msg.msg_id in self.seen_messages: # NO LOCK
Line 407: self.seen_messages.add(msg.msg_id) # NO LOCK
Line 410-411: Circular buffer management also NO LOCK

PROOF OF CONCEPT:
The PoC below demonstrates two threads both passing the deduplication check
and processing the same message, when only one should.
"""

import threading
import time


class MockGossipLayer:
"""Minimal reproduction of the vulnerable GossipLayer code"""

def __init__(self):
self.seen_messages = set()
self.lock = threading.Lock() # Created but NEVER used - LINE 296 BUG
self.processed_messages = []

def handle_message_vulnerable(self, msg_id):
"""
Reproduces the vulnerable code from lines 399-411.
VULNERABLE: No lock protection during deduplication.
"""
# Line 399: Check if duplicate - NO LOCK
if msg_id in self.seen_messages:
return {"status": "duplicate"}

# RACE CONDITION WINDOW: Another thread can enter here

# Line 407: Add to seen_messages - NO LOCK
self.seen_messages.add(msg_id)

# Simulate message processing
time.sleep(0.0001)

# Line 410-411: Circular buffer - NO LOCK
if len(self.seen_messages) > 10000:
self.seen_messages = set(list(self.seen_messages)[-5000:])

# DUPLICATE PROCESSING - this should only happen once per message
self.processed_messages.append(msg_id)
return {"status": "ok"}

def handle_message_fixed(self, msg_id):
"""Fixed version with proper locking"""
with self.lock:
if msg_id in self.seen_messages:
return {"status": "duplicate"}
self.seen_messages.add(msg_id)
if len(self.seen_messages) > 10000:
self.seen_messages = set(list(self.seen_messages)[-5000:])

# Processing outside lock
time.sleep(0.0001)
self.processed_messages.append(msg_id)
return {"status": "ok"}


def main():
print("=" * 70)
print("[P2P-BUG] Thread Safety Race Condition PoC")
print("File: node/rustchain_p2p_gossip.py")
print("Lines: 294-296 (initialization), 399-411 (deduplication)")
print("=" * 70)

# TEST 1: Vulnerable code
print("\n[TEST 1] Vulnerable Implementation (No Locking)")
print("-" * 70)

gossip = MockGossipLayer()
msg_id = "test_msg_12345"
results = []
errors = []

def process_vulnerable(thread_id):
try:
result = gossip.handle_message_vulnerable(msg_id)
results.append((thread_id, result))
except Exception as e:
errors.append((thread_id, str(e)))

print(f"Launching 2 concurrent threads to process message: {msg_id}")
t1 = threading.Thread(target=process_vulnerable, args=(1,))
t2 = threading.Thread(target=process_vulnerable, args=(2,))

t1.start()
t2.start()
t1.join()
t2.join()

print(f"Processed messages: {gossip.processed_messages}")
print(f"Unique messages in seen_messages: {len(gossip.seen_messages)}")
print(f"Total processing count: {len(gossip.processed_messages)}")

if len(gossip.processed_messages) > 1:
print(f"\n⚠️ RACE CONDITION DETECTED!")
print(f" Message '{msg_id}' processed {len(gossip.processed_messages)} times")
print(f" Expected: 1 time")
print(f" This confirms concurrent thread access without synchronization")
vuln_confirmed = True
else:
print(f"\n (Timing did not trigger race this iteration)")
vuln_confirmed = False

# TEST 2: Fixed code
print("\n[TEST 2] Fixed Implementation (With Locking)")
print("-" * 70)

gossip2 = MockGossipLayer()
results2 = []

def process_fixed(thread_id):
result = gossip2.handle_message_fixed(msg_id)
results2.append((thread_id, result))

print(f"Launching 2 concurrent threads with locking...")
t3 = threading.Thread(target=process_fixed, args=(1,))
t4 = threading.Thread(target=process_fixed, args=(2,))

t3.start()
t4.start()
t3.join()
t4.join()

print(f"Processed messages: {gossip2.processed_messages}")
print(f"Total processing count: {len(gossip2.processed_messages)}")

if len(gossip2.processed_messages) == 1:
print(f"\n✓ FIXED: Message processed exactly 1 time (lock prevents duplicate)")

# Summary
print("\n" + "=" * 70)
print("VULNERABILITY SUMMARY")
print("=" * 70)
print(f"""
Issue: Thread-unsafe deduplication in handle_message()
Severity: HIGH (CVSS 7.1)
Affected Func: GossipLayer.handle_message() [lines 396-438]
Root Cause: self.lock initialized but never acquired

Lines with Bug:
296: self.lock = threading.Lock() # Created
399: if msg.msg_id in self.seen_messages: # NOT inside with self.lock:
407: self.seen_messages.add(msg.msg_id) # NOT inside with self.lock:

Exploitation:
- Compromised/malicious peer can send duplicate messages
- Causes duplicate attestation processing
- Corrupts epoch consensus voting
- Could manipulate settlement distribution

Recommended Fix:
Wrap lines 399-411 with: with self.lock:
""")

return vuln_confirmed


if __name__ == "__main__":
main()
Loading