diff --git a/bcos/badge-generator.html b/bcos/badge-generator.html new file mode 100644 index 000000000..67dfc770b --- /dev/null +++ b/bcos/badge-generator.html @@ -0,0 +1,564 @@ + + + + + + BCOS Badge Generator | RustChain + + + + +
+ +
+
+
+
+
+ bcos_badge_generator.sh +
+
+
$ Initializing BCOS Badge Generator...
+
$ API endpoint: https://50.28.86.131/bcos/badge/
+
$ Ready.
+ +

BCOS Badge Generator

+

Generate embeddable BCOS certification badges for your repo

+ +
+ BCOS = Beacon Certified Open Source  |  + 15 RTC bounty  |  + Issue #2292
+ Enter your cert_id (e.g. BCOS-xxxxxxxx) or a GitHub repo URL to generate a badge. +
+ +
+
Enter cert_id (e.g. BCOS-abc123) or GitHub repo URL:
+
+ + +
+
+
+ + + +
+
+ + +
+ + + + diff --git a/faucet.py b/faucet.py index 7dffc67cc..dd3a5938f 100644 --- a/faucet.py +++ b/faucet.py @@ -4,7 +4,8 @@ A simple Flask web application that dispenses test RTC tokens. Features: -- IP-based rate limiting +- Wallet-based rate limiting (primary defense against rate-limit bypass) +- IP-based rate limiting (secondary defense) - SQLite backend for tracking - Simple HTML form for requesting tokens """ @@ -41,13 +42,31 @@ def init_db(): def get_client_ip(): - """Get client IP address from request.""" - if request.headers.get('X-Forwarded-For'): - return request.headers.get('X-Forwarded-For').split(',')[0].strip() - return request.remote_addr or '127.0.0.1' + """Get client IP address from request. + + SECURITY: Only trust X-Forwarded-For from trusted reverse proxies. + Direct connections use remote_addr to prevent rate limit bypass via header spoofing. + A valid reverse proxy always sets X-Forwarded-For, so if it's absent + from a localhost connection, treat it as a potential spoofing attempt. + """ + remote = request.remote_addr or '127.0.0.1' + # Only trust X-Forwarded-For from localhost and only when it's present + # (a legitimate proxy always adds it) + if remote in ('127.0.0.1', '::1'): + xff = request.headers.get('X-Forwarded-For') + if xff: + # Validate: X-Forwarded-For must not be empty and must look like an IP + first_ip = xff.split(',')[0].strip() + if first_ip and '.' in first_ip or ':' in first_ip: + # Basic validation: contains IP-like characters + if not any(c.isalpha() for c in first_ip.split('.')[0] if '.' in first_ip): + return first_ip + # X-Forwarded-For absent or invalid from localhost — fallback to remote + # This prevents spoofing via crafted empty/malformed X-Forwarded-For headers + return remote -def get_last_drip_time(ip_address): +def get_last_drip_time_by_ip(ip_address): """Get the last time this IP requested a drip.""" conn = sqlite3.connect(DATABASE) c = conn.cursor() @@ -62,9 +81,24 @@ def get_last_drip_time(ip_address): return result[0] if result else None -def can_drip(ip_address): - """Check if the IP can request a drip (rate limiting).""" - last_time = get_last_drip_time(ip_address) +def get_last_drip_time_by_wallet(wallet_address): + """Get the last time this wallet requested a drip.""" + conn = sqlite3.connect(DATABASE) + c = conn.cursor() + c.execute(''' + SELECT timestamp FROM drip_requests + WHERE wallet = ? + ORDER BY timestamp DESC + LIMIT 1 + ''', (wallet_address,)) + result = c.fetchone() + conn.close() + return result[0] if result else None + + +def can_drip_by_ip(ip_address): + """Check if the IP can request a drip (IP-based rate limiting).""" + last_time = get_last_drip_time_by_ip(ip_address) if not last_time: return True @@ -75,9 +109,42 @@ def can_drip(ip_address): return hours_since >= RATE_LIMIT_HOURS -def get_next_available(ip_address): +def can_drip_by_wallet(wallet_address): + """Check if the wallet can request a drip (wallet-based rate limiting). + + Wallet-based rate limiting is the primary defense against IP-spoofing attacks. + Even if an attacker rotates IPs, they cannot bypass the rate limit without + rotating wallets, which is more expensive/noticeable. + """ + last_time = get_last_drip_time_by_wallet(wallet_address) + if not last_time: + return True + + last_drip = datetime.fromisoformat(last_time.replace('Z', '+00:00')) + now = datetime.now(last_drip.tzinfo) + hours_since = (now - last_drip).total_seconds() / 3600 + + return hours_since >= RATE_LIMIT_HOURS + + +def get_next_available_by_ip(ip_address): """Get the next available time for this IP.""" - last_time = get_last_drip_time(ip_address) + last_time = get_last_drip_time_by_ip(ip_address) + if not last_time: + return None + + last_drip = datetime.fromisoformat(last_time.replace('Z', '+00:00')) + next_available = last_drip + timedelta(hours=RATE_LIMIT_HOURS) + now = datetime.now(last_drip.tzinfo) + + if next_available > now: + return next_available.isoformat() + return None + + +def get_next_available_by_wallet(wallet_address): + """Get the next available time for this wallet.""" + last_time = get_last_drip_time_by_wallet(wallet_address) if not last_time: return None @@ -204,7 +271,7 @@ def record_drip(wallet, ip_address, amount):
-

Rate Limit: {{ rate_limit }} RTC per {{ hours }} hours per IP

+

Rate Limit: {{ rate_limit }} RTC per {{ hours }} hours per wallet

Network: RustChain Testnet

@@ -290,13 +357,24 @@ def drip(): ip = get_client_ip() - # Check rate limit - if not can_drip(ip): - next_available = get_next_available(ip) + # PRIMARY DEFENSE: Wallet-based rate limiting (prevents IP-spoofing bypass) + if not can_drip_by_wallet(wallet): + next_available = get_next_available_by_wallet(wallet) + return jsonify({ + 'ok': False, + 'error': 'Rate limit exceeded for this wallet', + 'next_available': next_available, + 'rate_limit_type': 'wallet' + }), 429 + + # SECONDARY DEFENSE: IP-based rate limiting + if not can_drip_by_ip(ip): + next_available = get_next_available_by_ip(ip) return jsonify({ 'ok': False, - 'error': 'Rate limit exceeded', - 'next_available': next_available + 'error': 'Rate limit exceeded for this IP', + 'next_available': next_available, + 'rate_limit_type': 'ip' }), 429 # Record the drip (in production, this would actually transfer tokens)