From 6ccdcb0e3ff486b1a6a04a95191a288ce12820ad Mon Sep 17 00:00:00 2001 From: jujujuda <62797585+jujujuda@users.noreply.github.com> Date: Sat, 21 Mar 2026 03:19:40 +0800 Subject: [PATCH 1/2] fix: wallet-based rate limiting to prevent IP-spoofing bypass (SECURITY) Fixes rate limit bypass via X-Forwarded-For header spoofing. Vulnerability: Attacker controlling a reverse proxy could spoof any IP via X-Forwarded-For, bypassing IP-based rate limits. Fix: Add wallet-based rate limiting as primary defense. Attacker cannot bypass wallet-based limit without rotating wallets, which is more expensive than rotating IPs. Also improved X-Forwarded-For validation: only trust it when present and properly formatted (a legitimate reverse proxy always sets it). Addresses: rustchain-bounties#2246 --- faucet.py | 112 +++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 95 insertions(+), 17 deletions(-) diff --git a/faucet.py b/faucet.py index 7dffc67cc..dd3a5938f 100644 --- a/faucet.py +++ b/faucet.py @@ -4,7 +4,8 @@ A simple Flask web application that dispenses test RTC tokens. Features: -- IP-based rate limiting +- Wallet-based rate limiting (primary defense against rate-limit bypass) +- IP-based rate limiting (secondary defense) - SQLite backend for tracking - Simple HTML form for requesting tokens """ @@ -41,13 +42,31 @@ def init_db(): def get_client_ip(): - """Get client IP address from request.""" - if request.headers.get('X-Forwarded-For'): - return request.headers.get('X-Forwarded-For').split(',')[0].strip() - return request.remote_addr or '127.0.0.1' + """Get client IP address from request. + + SECURITY: Only trust X-Forwarded-For from trusted reverse proxies. + Direct connections use remote_addr to prevent rate limit bypass via header spoofing. + A valid reverse proxy always sets X-Forwarded-For, so if it's absent + from a localhost connection, treat it as a potential spoofing attempt. + """ + remote = request.remote_addr or '127.0.0.1' + # Only trust X-Forwarded-For from localhost and only when it's present + # (a legitimate proxy always adds it) + if remote in ('127.0.0.1', '::1'): + xff = request.headers.get('X-Forwarded-For') + if xff: + # Validate: X-Forwarded-For must not be empty and must look like an IP + first_ip = xff.split(',')[0].strip() + if first_ip and '.' in first_ip or ':' in first_ip: + # Basic validation: contains IP-like characters + if not any(c.isalpha() for c in first_ip.split('.')[0] if '.' in first_ip): + return first_ip + # X-Forwarded-For absent or invalid from localhost — fallback to remote + # This prevents spoofing via crafted empty/malformed X-Forwarded-For headers + return remote -def get_last_drip_time(ip_address): +def get_last_drip_time_by_ip(ip_address): """Get the last time this IP requested a drip.""" conn = sqlite3.connect(DATABASE) c = conn.cursor() @@ -62,9 +81,24 @@ def get_last_drip_time(ip_address): return result[0] if result else None -def can_drip(ip_address): - """Check if the IP can request a drip (rate limiting).""" - last_time = get_last_drip_time(ip_address) +def get_last_drip_time_by_wallet(wallet_address): + """Get the last time this wallet requested a drip.""" + conn = sqlite3.connect(DATABASE) + c = conn.cursor() + c.execute(''' + SELECT timestamp FROM drip_requests + WHERE wallet = ? + ORDER BY timestamp DESC + LIMIT 1 + ''', (wallet_address,)) + result = c.fetchone() + conn.close() + return result[0] if result else None + + +def can_drip_by_ip(ip_address): + """Check if the IP can request a drip (IP-based rate limiting).""" + last_time = get_last_drip_time_by_ip(ip_address) if not last_time: return True @@ -75,9 +109,42 @@ def can_drip(ip_address): return hours_since >= RATE_LIMIT_HOURS -def get_next_available(ip_address): +def can_drip_by_wallet(wallet_address): + """Check if the wallet can request a drip (wallet-based rate limiting). + + Wallet-based rate limiting is the primary defense against IP-spoofing attacks. + Even if an attacker rotates IPs, they cannot bypass the rate limit without + rotating wallets, which is more expensive/noticeable. + """ + last_time = get_last_drip_time_by_wallet(wallet_address) + if not last_time: + return True + + last_drip = datetime.fromisoformat(last_time.replace('Z', '+00:00')) + now = datetime.now(last_drip.tzinfo) + hours_since = (now - last_drip).total_seconds() / 3600 + + return hours_since >= RATE_LIMIT_HOURS + + +def get_next_available_by_ip(ip_address): """Get the next available time for this IP.""" - last_time = get_last_drip_time(ip_address) + last_time = get_last_drip_time_by_ip(ip_address) + if not last_time: + return None + + last_drip = datetime.fromisoformat(last_time.replace('Z', '+00:00')) + next_available = last_drip + timedelta(hours=RATE_LIMIT_HOURS) + now = datetime.now(last_drip.tzinfo) + + if next_available > now: + return next_available.isoformat() + return None + + +def get_next_available_by_wallet(wallet_address): + """Get the next available time for this wallet.""" + last_time = get_last_drip_time_by_wallet(wallet_address) if not last_time: return None @@ -204,7 +271,7 @@ def record_drip(wallet, ip_address, amount):
Rate Limit: {{ rate_limit }} RTC per {{ hours }} hours per IP
+Rate Limit: {{ rate_limit }} RTC per {{ hours }} hours per wallet
Network: RustChain Testnet
Generate embeddable BCOS certification badges for your repo
+ +BCOS-xxxxxxxx) or a GitHub repo URL to generate a badge.
+