From 4751db36896f24687246c6dc1479815c5cabc7fe Mon Sep 17 00:00:00 2001 From: junderw Date: Sat, 6 Jun 2026 15:49:33 +0900 Subject: [PATCH 1/2] Feat: Include HAProxy and per-IP connection limits --- Cargo.lock | 10 ++ Cargo.toml | 1 + src/config.rs | 18 +++ src/electrum/server.rs | 261 +++++++++++++++++++++++++++++++++++++++-- 4 files changed, 282 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9c115fc4..1cb9679b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -913,6 +913,7 @@ dependencies = [ "log", "num_cpus", "page_size", + "ppp", "prometheus", "rayon", "rocksdb", @@ -1099,6 +1100,15 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "ppp" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a7a2049cd2570bd67bf0228e86bf850f8ceb5190a345c471d03a909da6049e0" +dependencies = [ + "thiserror", +] + [[package]] name = "ppv-lite86" version = "0.2.21" diff --git a/Cargo.toml b/Cargo.toml index 167f7330..d084810d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ socket2 = { version = "0.4", features = ["all"] } num_cpus = "1.12.0" page_size = "0.4.2" prometheus = "0.13" +ppp = "2.3.0" rayon = "1.5.0" rocksdb = "0.24.0" serde = "1.0.118" diff --git a/src/config.rs b/src/config.rs index 5a52f881..63a97808 100644 --- a/src/config.rs +++ b/src/config.rs @@ -69,6 +69,8 @@ pub struct Config { pub electrum_max_subscriptions: usize, pub electrum_max_clients: usize, pub electrum_idle_timeout: u64, + pub electrum_haproxy_depth: usize, + pub electrum_connections_per_client: usize, pub electrum_public_hosts: Option, #[cfg(feature = "liquid")] @@ -301,6 +303,16 @@ impl Config { .long("electrum-idle-timeout") .help("Maximum idle time in seconds since the last client request before disconnecting the Electrum connection.") .default_value("600") + ).arg( + Arg::with_name("electrum_haproxy_depth") + .long("electrum-haproxy-depth") + .help("Which HAProxy PROXY-protocol header layer identifies the real client IP. 0 disables PROXY-protocol detection; 1 uses the first (outermost) address, 2 the second, and so on. If the requested layer or any PROXY header is absent, no client IP is associated with the connection.") + .default_value("0") + ).arg( + Arg::with_name("electrum_connections_per_client") + .long("electrum-connections-per-client") + .help("Maximum number of concurrent Electrum connections allowed per client (keyed by the HAProxy-reported address when available, otherwise the peer IP). 0 disables the per-client limit.") + .default_value("10") ); #[cfg(unix)] @@ -576,6 +588,12 @@ impl Config { electrum_max_subscriptions: value_t_or_exit!(m, "electrum_max_subscriptions", usize), electrum_max_clients: value_t_or_exit!(m, "electrum_max_clients", usize), electrum_idle_timeout: value_t_or_exit!(m, "electrum_idle_timeout", u64), + electrum_haproxy_depth: value_t_or_exit!(m, "electrum_haproxy_depth", usize), + electrum_connections_per_client: value_t_or_exit!( + m, + "electrum_connections_per_client", + usize + ), jsonrpc_import: m.is_present("jsonrpc_import"), light_mode: m.is_present("light_mode"), main_loop_delay: value_t_or_exit!(m, "main_loop_delay", u64), diff --git a/src/electrum/server.rs b/src/electrum/server.rs index 51fabfda..80c52273 100644 --- a/src/electrum/server.rs +++ b/src/electrum/server.rs @@ -1,8 +1,7 @@ use std::collections::HashMap; use std::convert::TryInto; use std::fs; -use std::io::{BufRead, BufReader, Read, Write}; -#[cfg(feature = "electrum-discovery")] +use std::io::{BufRead, BufReader, Cursor, Read, Write}; use std::net::IpAddr; use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream}; use std::os::unix::fs::FileTypeExt; @@ -17,6 +16,7 @@ use std::time::{Duration, Instant}; use bitcoin::hashes::sha256d::Hash as Sha256dHash; use error_chain::ChainedError; use hex; +use ppp::PartialResult; use serde_json::{from_str, Value}; use sha2::{Digest, Sha256}; @@ -77,6 +77,36 @@ fn bool_from_value_or(val: Option<&Value>, name: &str, default: bool) -> Result< bool_from_value(val, name) } +/// Extracts the source socket address from a parsed PROXY protocol v1 header. +fn proxy_v1_source(addresses: &ppp::v1::Addresses) -> Option { + match addresses { + ppp::v1::Addresses::Tcp4(ip) => Some(SocketAddr::new( + IpAddr::V4(ip.source_address), + ip.source_port, + )), + ppp::v1::Addresses::Tcp6(ip) => Some(SocketAddr::new( + IpAddr::V6(ip.source_address), + ip.source_port, + )), + ppp::v1::Addresses::Unknown => None, + } +} + +/// Extracts the source socket address from a parsed PROXY protocol v2 header. +fn proxy_v2_source(addresses: &ppp::v2::Addresses) -> Option { + match addresses { + ppp::v2::Addresses::IPv4(ip) => Some(SocketAddr::new( + IpAddr::V4(ip.source_address), + ip.source_port, + )), + ppp::v2::Addresses::IPv6(ip) => Some(SocketAddr::new( + IpAddr::V6(ip.source_address), + ip.source_port, + )), + ppp::v2::Addresses::Unspecified | ppp::v2::Addresses::Unix(_) => None, + } +} + // TODO: implement caching and delta updates fn get_status_hash(txs: Vec<(Txid, Option)>, query: &Query) -> Option { if txs.is_empty() { @@ -129,6 +159,11 @@ struct Connection { last_request_at: Instant, die_please: Option>, server_features: Arc, + haproxy_depth: usize, + proxy_client: Option, + connections_per_client: usize, + client_counts: Arc>>, + registered_ip: Option, #[cfg(feature = "electrum-discovery")] discovery: Option>, } @@ -145,6 +180,9 @@ impl Connection { idle_timeout: u64, die_please: Receiver<()>, server_features: Arc, + haproxy_depth: usize, + connections_per_client: usize, + client_counts: Arc>>, #[cfg(feature = "electrum-discovery")] discovery: Option>, ) -> Connection { Connection { @@ -161,6 +199,11 @@ impl Connection { last_request_at: Instant::now(), die_please: Some(die_please), server_features, + haproxy_depth, + proxy_client: None, + connections_per_client, + client_counts, + registered_ip: None, #[cfg(feature = "electrum-discovery")] discovery, } @@ -568,13 +611,78 @@ impl Connection { fn close_idle_connection(&mut self, idle_for: Duration) { info!( "[{}] closing idle connection after {} seconds without requests (timeout: {} seconds)", - self.stream.addr_string(), + self.client_string(), idle_for.as_secs(), self.idle_timeout, ); self.chan.close(); } + /// A human-readable identifier for the connected client, preferring the + /// HAProxy-reported address (when present) over the direct peer address. + fn client_string(&self) -> String { + match self.proxy_client { + Some(addr) => format!("{} via {}", addr, self.stream.addr_string()), + None => self.stream.addr_string(), + } + } + + /// Resolves the PROXY-protocol parse result into the client address at the + /// configured `electrum-haproxy-depth` layer. A depth of 0, a missing PROXY + /// header, or a non-existent layer all leave the client unidentified. + fn set_proxy_client(&mut self, addresses: Option>) { + self.proxy_client = match (self.haproxy_depth, addresses) { + (0, _) | (_, None) => None, + (depth, Some(addrs)) => addrs.get(depth - 1).copied(), + }; + } + + /// Registers this connection against its client key (the HAProxy-reported IP + /// when available, otherwise the direct peer IP) and enforces the + /// `electrum-connections-per-client` limit. Returns an error if the limit has + /// already been reached, in which case the connection must be closed. + fn register_client(&mut self) -> Result<()> { + if self.connections_per_client == 0 { + // Per-client limit disabled. + return Ok(()); + } + let key = match self + .proxy_client + .map(|addr| addr.ip()) + .or_else(|| self.stream.direct_ip()) + { + Some(key) => key, + // No usable client key (e.g. a unix socket with no PROXY header). + None => return Ok(()), + }; + + let mut counts = self.client_counts.lock().unwrap(); + let count = counts.entry(key).or_insert(0); + if *count >= self.connections_per_client { + bail!( + "too many connections from client {} ({} max per client)", + key, + self.connections_per_client + ); + } + *count += 1; + self.registered_ip = Some(key); + Ok(()) + } + + /// Releases this connection's slot in the per-client connection counter. + fn unregister_client(&mut self) { + if let Some(key) = self.registered_ip.take() { + let mut counts = self.client_counts.lock().unwrap(); + if let Some(count) = counts.get_mut(&key) { + *count -= 1; + if *count == 0 { + counts.remove(&key); + } + } + } + } + fn handle_replies(&mut self, shutdown: crossbeam_channel::Receiver<()>) -> Result<()> { let idle_timeout = Duration::from_secs(self.idle_timeout); loop { @@ -606,6 +714,14 @@ impl Connection { self.chan.close(); return Ok(()); } + Message::Proxy(addresses) => { + self.set_proxy_client(addresses); + if let Err(e) = self.register_client() { + info!("[{}] {}", self.client_string(), e); + self.chan.close(); + return Ok(()); + } + } } } recv(shutdown) -> _ => { @@ -665,11 +781,114 @@ impl Connection { } } + /// Reads and parses any PROXY-protocol (HAProxy) headers found at the very + /// start of the connection. Returns the source address reported by each + /// proxy layer (outermost first), or `None` if no PROXY header was present, + /// together with any bytes that were read past the header(s) and belong to + /// the Electrum request stream. + fn read_proxy_headers( + stream: &mut ConnectionStream, + ) -> Result<(Option>, Vec)> { + // Upper bound on how much we are willing to buffer while looking for + // PROXY headers, to avoid unbounded memory use from a slow/malicious peer. + const MAX_PROXY_HEADER_SIZE: usize = 4096; + + enum Step { + Parsed(usize, Option), + NeedMore, + Done, + } + + let mut buf: Vec = Vec::with_capacity(256); + let mut addrs: Vec = Vec::new(); + let mut saw_proxy = false; + let mut chunk = [0u8; 256]; + + loop { + // Parse as many complete, stacked PROXY headers as the buffer allows. + let need_more = loop { + if buf.is_empty() { + break true; + } + let step = match ppp::HeaderResult::parse(&buf) { + ppp::HeaderResult::V2(Ok(header)) => { + Step::Parsed(header.len(), proxy_v2_source(&header.addresses)) + } + ppp::HeaderResult::V1(Ok(header)) => { + Step::Parsed(header.header.len(), proxy_v1_source(&header.addresses)) + } + other => { + if other.is_incomplete() { + Step::NeedMore + } else { + Step::Done + } + } + }; + match step { + Step::Parsed(consumed, src) => { + saw_proxy = true; + if let Some(src) = src { + addrs.push(src); + } + if consumed == 0 || consumed > buf.len() { + // Defensive: never spin forever on a degenerate parse. + break false; + } + buf.drain(..consumed); + } + Step::NeedMore => break true, + Step::Done => break false, + } + }; + + if !need_more { + break; + } + if buf.len() > MAX_PROXY_HEADER_SIZE { + bail!( + "PROXY protocol header too large (exceeds {} bytes)", + MAX_PROXY_HEADER_SIZE + ); + } + let n = stream + .read(&mut chunk) + .chain_err(|| "failed to read PROXY protocol header")?; + if n == 0 { + // EOF before another complete header; stop with what we have. + break; + } + buf.extend_from_slice(&chunk[..n]); + } + + let result = if saw_proxy { Some(addrs) } else { None }; + Ok((result, buf)) + } + fn handle_requests( - mut reader: BufReader, + stream: ConnectionStream, tx: crossbeam_channel::Sender, max_line_size: usize, + parse_proxy: bool, ) -> Result<()> { + let mut stream = stream; + + // Parse any PROXY-protocol (HAProxy) headers at the very start of the + // connection before treating the stream as Electrum requests. The parsed + // addresses are forwarded over the channel so the connection can identify + // the client; any leftover bytes belong to the first Electrum request. + let leftover = if parse_proxy { + let (proxy_addrs, leftover) = Connection::read_proxy_headers(&mut stream)?; + tx.send(Message::Proxy(proxy_addrs)) + .chain_err(|| "channel closed")?; + leftover + } else { + tx.send(Message::Proxy(None)) + .chain_err(|| "channel closed")?; + Vec::new() + }; + + let mut reader = BufReader::new(Cursor::new(leftover).chain(stream)); loop { let mut line = Vec::::new(); // Read up to max_line_size + 1 bytes to detect oversized lines @@ -708,7 +927,7 @@ impl Connection { pub fn run(mut self) { self.stats.clients.inc(); - let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream")); + let stream = self.stream.try_clone().expect("failed to clone TcpStream"); let tx = self.chan.sender(); let die_please = self.die_please.take().unwrap(); @@ -727,13 +946,14 @@ impl Connection { }); let max_line_size = self.max_line_size; + let parse_proxy = self.haproxy_depth > 0; let child = spawn_thread("reader", move || { - Connection::handle_requests(reader, tx, max_line_size) + Connection::handle_requests(stream, tx, max_line_size, parse_proxy) }); if let Err(e) = self.handle_replies(reply_receiver) { error!( "[{}] connection handling failed: {}", - self.stream.addr_string(), + self.client_string(), e.display_chain().to_string() ); } @@ -741,8 +961,9 @@ impl Connection { self.stats .subscriptions .sub(self.status_hashes.len() as i64); + self.unregister_client(); - let addr = self.stream.addr_string(); + let addr = self.client_string(); debug!("[{}] shutting down connection", addr); // Drop the Arc so that the stream properly closes. drop(arc_stream); @@ -800,6 +1021,11 @@ pub enum Message { Request(String), PeriodicUpdate, Done, + /// The result of parsing zero or more PROXY-protocol (HAProxy) headers at + /// the start of the connection. `None` means no PROXY header was present; + /// `Some(addrs)` holds the source address reported by each proxy layer, + /// outermost first. + Proxy(Option>), } pub enum Notification { @@ -921,12 +1147,18 @@ impl RPC { let max_subscriptions = config.electrum_max_subscriptions; let max_clients = config.electrum_max_clients; let idle_timeout = config.electrum_idle_timeout; + let haproxy_depth = config.electrum_haproxy_depth; + let connections_per_client = config.electrum_connections_per_client; RPC { notification: notification.sender(), server: Some(spawn_thread("rpc", move || { let senders = Arc::new(Mutex::new(Vec::>::new())); + // Tracks the number of live connections per client (keyed by the + // HAProxy-reported address when available, otherwise the peer IP). + let client_counts: Arc>> = + Arc::new(Mutex::new(HashMap::new())); let acceptor_shutdown = Channel::unbounded(); let acceptor_shutdown_sender = acceptor_shutdown.sender(); @@ -970,6 +1202,7 @@ impl RPC { let query = Arc::clone(&query); let senders = Arc::clone(&senders); let stats = Arc::clone(&stats); + let client_counts = Arc::clone(&client_counts); let garbage_sender = garbage_sender.clone(); // Kill the peers properly @@ -993,6 +1226,9 @@ impl RPC { idle_timeout, peace_receiver, server_features, + haproxy_depth, + connections_per_client, + client_counts, #[cfg(feature = "electrum-discovery")] discovery, ); @@ -1166,6 +1402,15 @@ impl ConnectionStream { } } + /// The direct peer IP address, if this is a TCP connection. Unix-socket + /// connections have no IP and return `None`. + fn direct_ip(&self) -> Option { + match self { + ConnectionStream::Tcp(_, a) => Some(a.ip()), + ConnectionStream::Unix(..) => None, + } + } + fn try_clone(&self) -> std::io::Result { Ok(match self { ConnectionStream::Tcp(s, a) => ConnectionStream::Tcp(s.try_clone()?, *a), From baad4799a5f724b54aba2648e982be75ba6e13a5 Mon Sep 17 00:00:00 2001 From: junderw Date: Sat, 6 Jun 2026 17:01:04 +0900 Subject: [PATCH 2/2] Don't choke on PROXY headers, but ignore them if we're not configured to look for them --- src/electrum/server.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/electrum/server.rs b/src/electrum/server.rs index 80c52273..fcd9988b 100644 --- a/src/electrum/server.rs +++ b/src/electrum/server.rs @@ -877,12 +877,14 @@ impl Connection { // connection before treating the stream as Electrum requests. The parsed // addresses are forwarded over the channel so the connection can identify // the client; any leftover bytes belong to the first Electrum request. + let (proxy_addrs, leftover) = Connection::read_proxy_headers(&mut stream)?; let leftover = if parse_proxy { - let (proxy_addrs, leftover) = Connection::read_proxy_headers(&mut stream)?; tx.send(Message::Proxy(proxy_addrs)) .chain_err(|| "channel closed")?; leftover } else { + // If we're not configured to parse PROXY headers, just discard them and any + // bytes read while looking for them, to avoid confusing the Electrum request parser. tx.send(Message::Proxy(None)) .chain_err(|| "channel closed")?; Vec::new()