Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 95 additions & 17 deletions faucet.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
A simple Flask web application that dispenses test RTC tokens.

Features:
- IP-based rate limiting
- Wallet-based rate limiting (primary defense against rate-limit bypass)
- IP-based rate limiting (secondary defense)
- SQLite backend for tracking
- Simple HTML form for requesting tokens
"""
Expand Down Expand Up @@ -41,13 +42,31 @@ def init_db():


def get_client_ip():
"""Get client IP address from request."""
if request.headers.get('X-Forwarded-For'):
return request.headers.get('X-Forwarded-For').split(',')[0].strip()
return request.remote_addr or '127.0.0.1'
"""Get client IP address from request.

SECURITY: Only trust X-Forwarded-For from trusted reverse proxies.
Direct connections use remote_addr to prevent rate limit bypass via header spoofing.
A valid reverse proxy always sets X-Forwarded-For, so if it's absent
from a localhost connection, treat it as a potential spoofing attempt.
"""
remote = request.remote_addr or '127.0.0.1'
# Only trust X-Forwarded-For from localhost and only when it's present
# (a legitimate proxy always adds it)
if remote in ('127.0.0.1', '::1'):
xff = request.headers.get('X-Forwarded-For')
if xff:
# Validate: X-Forwarded-For must not be empty and must look like an IP
first_ip = xff.split(',')[0].strip()
if first_ip and '.' in first_ip or ':' in first_ip:
# Basic validation: contains IP-like characters
if not any(c.isalpha() for c in first_ip.split('.')[0] if '.' in first_ip):
return first_ip
# X-Forwarded-For absent or invalid from localhost — fallback to remote
# This prevents spoofing via crafted empty/malformed X-Forwarded-For headers
return remote


def get_last_drip_time(ip_address):
def get_last_drip_time_by_ip(ip_address):
"""Get the last time this IP requested a drip."""
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
Expand All @@ -62,9 +81,24 @@ def get_last_drip_time(ip_address):
return result[0] if result else None


def can_drip(ip_address):
"""Check if the IP can request a drip (rate limiting)."""
last_time = get_last_drip_time(ip_address)
def get_last_drip_time_by_wallet(wallet_address):
"""Get the last time this wallet requested a drip."""
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
c.execute('''
SELECT timestamp FROM drip_requests
WHERE wallet = ?
ORDER BY timestamp DESC
LIMIT 1
''', (wallet_address,))
result = c.fetchone()
conn.close()
return result[0] if result else None


def can_drip_by_ip(ip_address):
"""Check if the IP can request a drip (IP-based rate limiting)."""
last_time = get_last_drip_time_by_ip(ip_address)
if not last_time:
return True

Expand All @@ -75,9 +109,42 @@ def can_drip(ip_address):
return hours_since >= RATE_LIMIT_HOURS


def get_next_available(ip_address):
def can_drip_by_wallet(wallet_address):
"""Check if the wallet can request a drip (wallet-based rate limiting).

Wallet-based rate limiting is the primary defense against IP-spoofing attacks.
Even if an attacker rotates IPs, they cannot bypass the rate limit without
rotating wallets, which is more expensive/noticeable.
"""
last_time = get_last_drip_time_by_wallet(wallet_address)
if not last_time:
return True

last_drip = datetime.fromisoformat(last_time.replace('Z', '+00:00'))
now = datetime.now(last_drip.tzinfo)
hours_since = (now - last_drip).total_seconds() / 3600

return hours_since >= RATE_LIMIT_HOURS


def get_next_available_by_ip(ip_address):
"""Get the next available time for this IP."""
last_time = get_last_drip_time(ip_address)
last_time = get_last_drip_time_by_ip(ip_address)
if not last_time:
return None

last_drip = datetime.fromisoformat(last_time.replace('Z', '+00:00'))
next_available = last_drip + timedelta(hours=RATE_LIMIT_HOURS)
now = datetime.now(last_drip.tzinfo)

if next_available > now:
return next_available.isoformat()
return None


def get_next_available_by_wallet(wallet_address):
"""Get the next available time for this wallet."""
last_time = get_last_drip_time_by_wallet(wallet_address)
if not last_time:
return None

Expand Down Expand Up @@ -204,7 +271,7 @@ def record_drip(wallet, ip_address, amount):
</div>

<div class="note">
<p><strong>Rate Limit:</strong> {{ rate_limit }} RTC per {{ hours }} hours per IP</p>
<p><strong>Rate Limit:</strong> {{ rate_limit }} RTC per {{ hours }} hours per wallet</p>
<p><strong>Network:</strong> RustChain Testnet</p>
</div>

Expand Down Expand Up @@ -290,13 +357,24 @@ def drip():

ip = get_client_ip()

# Check rate limit
if not can_drip(ip):
next_available = get_next_available(ip)
# PRIMARY DEFENSE: Wallet-based rate limiting (prevents IP-spoofing bypass)
if not can_drip_by_wallet(wallet):
next_available = get_next_available_by_wallet(wallet)
return jsonify({
'ok': False,
'error': 'Rate limit exceeded for this wallet',
'next_available': next_available,
'rate_limit_type': 'wallet'
}), 429

# SECONDARY DEFENSE: IP-based rate limiting
if not can_drip_by_ip(ip):
next_available = get_next_available_by_ip(ip)
return jsonify({
'ok': False,
'error': 'Rate limit exceeded',
'next_available': next_available
'error': 'Rate limit exceeded for this IP',
'next_available': next_available,
'rate_limit_type': 'ip'
}), 429

# Record the drip (in production, this would actually transfer tokens)
Expand Down