diff --git a/Cargo.lock b/Cargo.lock index 75e53a07d8..2006671b51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -859,7 +859,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -870,7 +870,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -1125,6 +1125,42 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-nats" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31811585c7c5bc2f60f8b80d5a6b0f737115611dac47567d7f7d94562ebb180b" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-util", + "memchr", + "nkeys", + "nuid", + "pin-project", + "portable-atomic", + "rand 0.10.1", + "regex", + "ring", + "rustls-native-certs", + "rustls-pki-types", + "rustls-webpki", + "serde", + "serde_json", + "serde_nanos", + "serde_repr", + "thiserror 2.0.17", + "time", + "tokio", + "tokio-rustls", + "tokio-stream", + "tokio-util", + "tokio-websockets", + "tracing", + "tryhard", + "url", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -2219,6 +2255,17 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "chacha20" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" +dependencies = [ + "cfg-if", + "cpufeatures 0.3.0", + "rand_core 0.10.1", +] + [[package]] name = "chain" version = "0.1.0" @@ -2449,7 +2496,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3bb320cac8a0750d7f25280aa97b09c26edfe161164238ecbbb31092b079e735" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.17", "proptest", "serde_core", ] @@ -3502,6 +3549,15 @@ dependencies = [ "libc", ] +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + [[package]] name = "crc" version = "3.3.0" @@ -3599,6 +3655,32 @@ dependencies = [ "typenum", ] +[[package]] +name = "curve25519-dalek" +version = "4.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" +dependencies = [ + "cfg-if", + "cpufeatures 0.2.17", + "curve25519-dalek-derive", + "digest 0.10.7", + "fiat-crypto", + "rustc_version 0.4.1", + "subtle", +] + +[[package]] +name = "curve25519-dalek-derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "darling" version = "0.14.4" @@ -3741,7 +3823,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de" dependencies = [ "data-encoding", - "syn 1.0.109", + "syn 2.0.114", ] [[package]] @@ -4039,6 +4121,28 @@ dependencies = [ "spki", ] +[[package]] +name = "ed25519" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" +dependencies = [ + "signature", +] + +[[package]] +name = "ed25519-dalek" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70e796c081cee67dc755e1a36a0a172b897fab85fc3f6bc48307991f64e4eca9" +dependencies = [ + "curve25519-dalek", + "ed25519", + "sha2", + "signature", + "subtle", +] + [[package]] name = "educe" version = "0.6.0" @@ -4122,7 +4226,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -4267,6 +4371,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "fiat-crypto" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" + [[package]] name = "find-msvc-tools" version = "0.1.7" @@ -4546,6 +4656,7 @@ dependencies = [ "cfg-if", "libc", "r-efi 6.0.0", + "rand_core 0.10.1", "wasip2", "wasip3", ] @@ -5275,7 +5386,7 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb26cec98cce3a3d96cbb7bced3c4b16e3d13f27ec56dbd62cbc8f39cfb9d653" dependencies = [ - "cpufeatures", + "cpufeatures 0.2.17", ] [[package]] @@ -5658,6 +5769,21 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nkeys" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879011babc47a1c7fdf5a935ae3cfe94f34645ca0cac1c7f6424b36fc743d1bf" +dependencies = [ + "data-encoding", + "ed25519", + "ed25519-dalek", + "getrandom 0.2.17", + "log", + "rand 0.8.5", + "signatory", +] + [[package]] name = "nom" version = "7.1.3" @@ -5674,7 +5800,16 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", +] + +[[package]] +name = "nuid" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc895af95856f929163a0aa20c26a78d26bfdc839f51b9d5aa7a5b79e52b7e83" +dependencies = [ + "rand 0.8.5", ] [[package]] @@ -5853,8 +5988,10 @@ dependencies = [ name = "observe" version = "0.1.0" dependencies = [ + "async-nats", "async-trait", "axum 0.8.8", + "bytes", "chrono", "console-subscriber", "futures", @@ -5875,6 +6012,7 @@ dependencies = [ "tracing-opentelemetry", "tracing-serde", "tracing-subscriber", + "url", ] [[package]] @@ -6730,6 +6868,17 @@ dependencies = [ "serde", ] +[[package]] +name = "rand" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207" +dependencies = [ + "chacha20", + "getrandom 0.4.2", + "rand_core 0.10.1", +] + [[package]] name = "rand_chacha" version = "0.3.1" @@ -6769,6 +6918,12 @@ dependencies = [ "serde", ] +[[package]] +name = "rand_core" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69" + [[package]] name = "rand_xorshift" version = "0.4.0" @@ -7195,7 +7350,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -7206,6 +7361,7 @@ checksum = "c665f33d38cea657d9614f766881e4d510e0eda4239891eea56b4cadcf01801b" dependencies = [ "aws-lc-rs", "once_cell", + "ring", "rustls-pki-types", "rustls-webpki", "subtle", @@ -7252,7 +7408,7 @@ dependencies = [ "security-framework 3.5.1", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -7514,6 +7670,15 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_nanos" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a93142f0367a4cc53ae0fead1bcda39e85beccfad3dcd717656cacab94b12985" +dependencies = [ + "serde", +] + [[package]] name = "serde_path_to_error" version = "0.1.20" @@ -7525,6 +7690,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_repr" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "serde_spanned" version = "0.6.9" @@ -7594,7 +7770,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.17", "digest 0.10.7", ] @@ -7605,7 +7781,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.17", "digest 0.10.7", ] @@ -7720,6 +7896,18 @@ dependencies = [ "libc", ] +[[package]] +name = "signatory" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e303f8205714074f6068773f0e29527e0453937fe837c9717d066635b65f31" +dependencies = [ + "pkcs8", + "rand_core 0.6.4", + "signature", + "zeroize", +] + [[package]] name = "signature" version = "2.2.0" @@ -8333,7 +8521,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -8605,6 +8793,27 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-websockets" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f591660438b3038dd04d16c938271c79e7e06260ad2ea2885a4861bfb238605d" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-core", + "futures-sink", + "http 1.4.0", + "httparse", + "rand 0.8.5", + "ring", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tokio-util", + "webpki-roots 0.26.11", +] + [[package]] name = "toml" version = "0.8.23" @@ -8913,6 +9122,16 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tryhard" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fe58ebd5edd976e0fe0f8a14d2a04b7c81ef153ea9a54eebc42e67c2c23b4e5" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tungstenite" version = "0.28.0" @@ -9332,7 +9551,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 7ad6cdcfab..d2822a7f35 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ alloy-transport-ws = { version = "1.8.3", default-features = false } anyhow = "1.0.100" app-data = { path = "crates/app-data" } arc-swap = "1.7.1" +async-nats = "0.48.0" async-stream = "0.3.5" async-trait = "0.1.80" autopilot = { path = "crates/autopilot" } diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 5a93ebf879..9899b428d3 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -36,7 +36,7 @@ use { http_client::HttpClientFactory, model::DomainSeparator, num::ToPrimitive, - observe::metrics::LivenessChecking, + observe::{config::EventBusConfig, metrics::LivenessChecking}, price_estimation::{ config::price_estimation::BalanceOverridesConfigExt, factory::{self, PriceEstimatorFactory}, @@ -153,6 +153,16 @@ pub async fn start(args: impl Iterator) { ); observe::tracing::init::initialize(&obs_config); observe::panic_hook::install(); + if let Some(event_bus) = &config.shared.event_bus { + observe::event_bus::init(EventBusConfig { + url: event_bus.url.clone(), + stream_name: event_bus.channel.clone(), + // Presence of `chain-id` alongside `event_bus` is enforced by + // `SharedConfig::validate` at startup. + chain_id: config.shared.chain_id.unwrap(), + }) + .await; + } #[cfg(unix)] observe::heap_dump_handler::spawn_heap_dump_handler(); diff --git a/crates/configs/src/autopilot/mod.rs b/crates/configs/src/autopilot/mod.rs index 4e4bf43c81..8dd4492e1d 100644 --- a/crates/configs/src/autopilot/mod.rs +++ b/crates/configs/src/autopilot/mod.rs @@ -190,6 +190,7 @@ impl Configuration { !self.drivers.is_empty(), "colocation is enabled but no drivers are configured" ); + self.shared.validate()?; Ok(self) } } diff --git a/crates/configs/src/orderbook/mod.rs b/crates/configs/src/orderbook/mod.rs index 4b8e21b985..3f063ebfd6 100644 --- a/crates/configs/src/orderbook/mod.rs +++ b/crates/configs/src/orderbook/mod.rs @@ -143,6 +143,11 @@ impl Configuration { )), } } + + pub fn validate(self) -> anyhow::Result { + self.shared.validate()?; + Ok(self) + } } #[cfg(any(test, feature = "test-util"))] diff --git a/crates/configs/src/shared.rs b/crates/configs/src/shared.rs index 237f28dc82..c6737c5cdf 100644 --- a/crates/configs/src/shared.rs +++ b/crates/configs/src/shared.rs @@ -1,7 +1,8 @@ use { crate::fee_factor::FeeFactor, alloy::primitives::Address, - serde::{Deserialize, Deserializer, de::Unexpected}, + anyhow::ensure, + serde::{Deserialize, Deserializer, Serialize, de::Unexpected}, std::{collections::HashSet, str::FromStr, time::Duration}, tracing::Level, url::Url, @@ -96,6 +97,20 @@ pub struct SharedConfig { /// By default, volume fees are NOT applied to same-token trades. #[serde(default)] pub enable_sell_equals_buy_volume_fee: bool, + + /// Enables publishing events to a global events bus. + pub event_bus: Option, +} + +impl SharedConfig { + /// Cross-field invariants that cannot be expressed in the serde schema. + pub fn validate(&self) -> anyhow::Result<()> { + ensure!( + self.event_bus.is_none() || self.chain_id.is_some(), + "`chain-id` is required when the event bus is configured", + ); + Ok(()) + } } impl Default for SharedConfig { @@ -112,6 +127,7 @@ impl Default for SharedConfig { contracts: Default::default(), volume_fee_bucket_overrides: Vec::new(), enable_sell_equals_buy_volume_fee: false, + event_bus: None, } } } @@ -311,6 +327,33 @@ where serializer.serialize_str(level.as_str()) } +/// Event bus configuration for a backend service. +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "kebab-case")] +pub struct EventBusConfig { + /// Url of the event bus service. + pub url: Url, + /// Name of the channel to post events to. Must not contain spaces, tabs, + /// or `.` — NATS uses `.` as its subject hierarchy separator and disallows + /// whitespace in stream/subject names. + #[serde(deserialize_with = "deserialize_channel_name")] + pub channel: String, +} + +fn deserialize_channel_name<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let raw = String::deserialize(deserializer)?; + if raw.contains([' ', '\t', '.']) { + return Err(serde::de::Error::invalid_value( + Unexpected::Str(&raw), + &"a channel name without spaces, tabs, or '.'", + )); + } + Ok(raw) +} + /// Gas price estimation strategy. #[derive(Debug, Clone, Deserialize)] #[serde(tag = "type")] @@ -403,6 +446,10 @@ mod tests { "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2", "0x6B175474E89094C44Da98b954EedeAC495271d0F", ] + + [event-bus] + url = "localhost:4222" + channel = "main" "#; let config: SharedConfig = toml::from_str(toml).unwrap(); @@ -430,5 +477,69 @@ mod tests { assert!(config.enable_sell_equals_buy_volume_fee); assert_eq!(config.volume_fee_bucket_overrides.len(), 1); assert_eq!(config.volume_fee_bucket_overrides[0].tokens.len(), 2); + assert_eq!( + config.event_bus.as_ref().unwrap().url, + "localhost:4222".parse().unwrap() + ); + assert_eq!(config.event_bus.as_ref().unwrap().channel, "main"); + } + + #[test] + fn rejects_invalid_event_bus_channel_at_deserialization() { + for bad in ["with space", "with\ttab", "with.dot"] { + let toml = format!( + r#" + chain-id = 1 + + [event-bus] + url = "localhost:4222" + channel = "{bad}" + "#, + ); + assert!( + toml::from_str::(&toml).is_err(), + "channel {bad:?} should be rejected", + ); + } + + let ok = toml::from_str::( + r#" + chain-id = 1 + + [event-bus] + url = "localhost:4222" + channel = "main" + "#, + ) + .unwrap(); + assert_eq!(ok.event_bus.unwrap().channel, "main"); + } + + #[test] + fn validate_event_bus_requires_chain_id() { + let with_event_bus_only = toml::from_str::( + r#" + [event-bus] + url = "localhost:4222" + channel = "main" + "#, + ) + .unwrap(); + assert!(with_event_bus_only.validate().is_err()); + + let with_both = toml::from_str::( + r#" + chain-id = 1 + + [event-bus] + url = "localhost:4222" + channel = "main" + "#, + ) + .unwrap(); + with_both.validate().unwrap(); + + // Default config has neither set; validation should pass. + SharedConfig::default().validate().unwrap(); } } diff --git a/crates/observe/Cargo.toml b/crates/observe/Cargo.toml index 213a6d3d5e..23e8574155 100644 --- a/crates/observe/Cargo.toml +++ b/crates/observe/Cargo.toml @@ -6,8 +6,10 @@ edition = "2024" license = "MIT OR Apache-2.0" [dependencies] +async-nats = { workspace = true } async-trait = { workspace = true } axum = { workspace = true } +bytes = { workspace = true } chrono = { workspace = true, features = ["now"] } console-subscriber = { workspace = true, optional = true } futures = { workspace = true } @@ -28,6 +30,7 @@ tracing = { workspace = true } tracing-opentelemetry = { workspace = true } tracing-serde = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter", "fmt", "time"] } +url = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["rt-multi-thread"] } diff --git a/crates/observe/src/config.rs b/crates/observe/src/config.rs index cc061390e0..7bcbdbeca8 100644 --- a/crates/observe/src/config.rs +++ b/crates/observe/src/config.rs @@ -1,4 +1,4 @@ -use {core::time::Duration, tracing::Level}; +use {core::time::Duration, tracing::Level, url::Url}; #[derive(Debug, Clone)] pub struct Config { @@ -88,3 +88,15 @@ impl TracingConfig { } } } + +/// Configures a backend wide event bus events can be posted to. +pub struct EventBusConfig { + /// Url of the event bus service + pub url: Url, + /// Name of the channel to post events to + pub stream_name: String, + /// Which chain this service operates on. The service-level `chain-id` + /// must be set when the event bus is configured; this is checked at + /// config validation time, so callers can pass it through directly. + pub chain_id: u64, +} diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs new file mode 100644 index 0000000000..26b98e168c --- /dev/null +++ b/crates/observe/src/event_bus/mod.rs @@ -0,0 +1,299 @@ +//! Implements a simple globally available way to publish events to an event +//! bus. Under the hood it's using NATS. To support publishing events from +//! synchronous contexts we use a channel as an in-memory buffer. +//! Whenever a message gets posted to this channel a background task wakes +//! up and forwards it to the NATS service running in a different process. +//! Messages always get serialized as JSON so you can publish anything that +//! can be serialized to JSON as well. +use { + crate::config::EventBusConfig, + async_nats::jetstream::Context as JetstreamClient, + bytes::Bytes, + chrono::Utc, + futures::stream::{FuturesUnordered, StreamExt}, + serde::Serialize, + tokio::sync::{ + OnceCell, + mpsc::{Receiver, Sender, channel}, + }, +}; + +/// Wire format version of the JSON envelope sent on every event. Bump +/// alongside any breaking change to [`Envelope`]. +const ENVELOPE_VERSION: &str = "v1"; + +/// JSON envelope wrapping every event published to the bus. Consumers can +/// rely on `version` to evolve their parsers, on `timestamp` for ordering, +/// and on `requestId` to correlate events to a single inbound request. +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct Envelope { + version: &'static str, + timestamp: String, + #[serde(skip_serializing_if = "Option::is_none")] + request_id: Option, + body: T, +} + +impl Envelope { + fn new(request_id: Option, body: T) -> Self { + Self { + version: ENVELOPE_VERSION, + timestamp: Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true), + request_id, + body, + } + } +} + +struct EventBusConnector { + /// Channel to decouple issuing events from actually sending them to the + /// event bus service. + message_queue: Sender, + /// Subject prefix to disambiguate messages in globally shared event bus + /// service. + subject_prefix: String, +} + +struct Message { + subject: String, + data: Bytes, +} + +/// Singleton event bus connection to allow publishing events +/// conveniently from everywhere. +static BUS: OnceCell = OnceCell::const_new(); + +/// Initializes the event bus. Connection failures are logged but do not +/// abort startup: the event bus is purely observational, so a misconfigured +/// or unreachable NATS must not take the binary down. When init fails the +/// global `BUS` stays uninitialized and [`publish`] becomes a no-op. +/// +/// Safe to call multiple times: once a previous call has succeeded the +/// subsequent ones short-circuit. A failed call leaves the bus uninitialized +/// so the next call gets another chance. +pub async fn init(config: EventBusConfig) { + if BUS.initialized() { + return; + } + let result = BUS + .get_or_try_init(|| async { connect(&config).await }) + .await; + match result { + Ok(_) => { + tracing::info!( + channel = %config.stream_name, + chain_id = config.chain_id, + "event bus connected", + ); + } + Err(err) => { + tracing::error!( + ?err, + url = %config.url, + channel = %config.stream_name, + "failed to initialize event bus; events will be dropped", + ); + } + } +} + +async fn connect(config: &EventBusConfig) -> Result { + // We prefix every subject with `event` so consumers can subscribe to all + // events (e.g. `event.>`) without also seeing NATS internal events. The + // trailing dot is significant: see [`publish`] for how it's concatenated + // with the per-event subject suffix. + let subject_prefix = format!("event.{}.", config.chain_id); + + let client = async_nats::connect(config.url.as_str()).await?; + let jetstream = async_nats::jetstream::new(client); + // Make sure the stream exists up-front; otherwise every publish would fail + // server-side and we'd only find out at runtime. + jetstream.get_stream(&config.stream_name).await?; + + // JetStream publish completes in two stages: the call to `publish()` + // returns once the client has buffered the message, the returned + // PubAck future resolves once the server has stored it. + let (message_tx, message_rx) = channel(EVENT_BUS_SIZE); + let (ack_tx, ack_rx) = channel(EVENT_BUS_SIZE); + tokio::task::spawn(publish_messages(message_rx, jetstream, ack_tx)); + tokio::task::spawn(await_acks(ack_rx)); + + Ok(EventBusConnector { + message_queue: message_tx, + subject_prefix, + }) +} + +const EVENT_BUS_SIZE: usize = 1_000; + +/// In-flight handle returned by JetStream's `publish` call, passed from the +/// publisher task to the ack-handling task. +type PendingAck = (String, async_nats::jetstream::context::PublishAckFuture); + +/// Reads messages off the in-memory queue and hands the publish ack future +/// off to [`await_acks`]. +async fn publish_messages( + mut messages: Receiver, + client: JetstreamClient, + acks: Sender, +) { + while let Some(message) = messages.recv().await { + let subject = message.subject; + let ack_fut = match client.publish(subject.clone(), message.data).await { + Ok(fut) => fut, + Err(err) => { + tracing::warn!(?err, %subject, "failed to enqueue event with NATS client"); + record_dropped(DropReason::Publish); + continue; + } + }; + if acks.send((subject, ack_fut)).await.is_err() { + tracing::warn!("ack task was shut down; returning"); + return; + } + } +} + +/// Awaits JetStream publish acks concurrently and logs any failures. Runs +/// until the publisher task drops its sender and all in-flight acks have +/// resolved, so server-side rejections are still observed during shutdown. +async fn await_acks(mut acks: Receiver) { + let mut pending = FuturesUnordered::new(); + loop { + tokio::select! { + biased; + // Drain pending acks alongside new ones so failures are logged + // promptly and the set doesn't grow without bound. + Some(()) = pending.next(), if !pending.is_empty() => {} + next = acks.recv() => { + let Some((subject, ack_fut)) = next else { break }; + pending.push(log_ack(subject, ack_fut)); + } + } + } + // Only reached on publisher panic (steady state keeps both tasks alive). + // Drain pending futures so `log_ack` still runs for any in-flight publish + // — otherwise dropping them unpolled silently loses the log + metric. + while pending.next().await.is_some() {} +} + +async fn log_ack(subject: String, ack_fut: async_nats::jetstream::context::PublishAckFuture) { + if let Err(err) = ack_fut.await { + tracing::warn!(?err, %subject, "NATS did not acknowledge event"); + record_dropped(DropReason::Ack); + } +} + +/// Enqueues the event to be sent to the event bus in a background task. +pub fn publish(subject: &str, data: impl Serialize) { + let Some(bus) = BUS.get() else { + tracing::warn!("attempting to publish events without initializing the event bus"); + return; + }; + + let envelope = Envelope::new( + crate::tracing::distributed::request_id::from_current_span(), + data, + ); + let body = match serde_json::to_vec(&envelope) { + Ok(body) => body, + Err(err) => { + tracing::error!(?err, "failed to serialize event"); + record_dropped(DropReason::Serialize); + return; + } + }; + + let message = Message { + subject: format!("{}{}", bus.subject_prefix, subject), + data: body.into(), + }; + + if let Err(err) = bus.message_queue.try_send(message) { + tracing::error!(?err, "failed to enqueue message"); + record_dropped(DropReason::ChannelFull); + } +} + +/// Why an event was not delivered to the event bus. Used as a Prometheus +/// label so the failure modes can be alerted on independently. +#[derive(Copy, Clone, Debug)] +enum DropReason { + /// In-memory queue between [`publish`] and the background forwarder was + /// saturated. + ChannelFull, + /// The payload could not be encoded as JSON. + Serialize, + /// The NATS client rejected the publish locally (e.g. disconnected). + Publish, + /// JetStream did not acknowledge the publish. + Ack, +} + +impl DropReason { + fn as_label(self) -> &'static str { + match self { + DropReason::ChannelFull => "channel_full", + DropReason::Serialize => "serialize", + DropReason::Publish => "publish", + DropReason::Ack => "ack", + } + } +} + +#[derive(prometheus_metric_storage::MetricStorage)] +#[metric(subsystem = "event_bus")] +struct Metrics { + /// Events that were not delivered to the event bus, by failure mode. + /// See [`DropReason`] for the meaning of each label value. + #[metric(labels("reason"))] + dropped_events: prometheus::IntCounterVec, +} + +fn record_dropped(reason: DropReason) { + let Ok(metrics) = Metrics::instance(crate::metrics::get_storage_registry()) else { + return; + }; + metrics + .dropped_events + .with_label_values(&[reason.as_label()]) + .inc(); +} + +#[cfg(test)] +mod tests { + use {super::*, serde_json::json}; + + #[test] + fn envelope_matches_wire_format() { + let envelope = Envelope { + version: ENVELOPE_VERSION, + timestamp: "2026-05-22T12:00:00.000Z".to_string(), + request_id: Some("req-1".to_string()), + body: json!({"outAmount": 1234}), + }; + let serialized: serde_json::Value = serde_json::to_value(&envelope).unwrap(); + assert_eq!( + serialized, + json!({ + "version": "v1", + "timestamp": "2026-05-22T12:00:00.000Z", + "requestId": "req-1", + "body": {"outAmount": 1234}, + }) + ); + } + + #[test] + fn envelope_omits_missing_request_id() { + let envelope = Envelope { + version: ENVELOPE_VERSION, + timestamp: "2026-05-22T12:00:00.000Z".to_string(), + request_id: None, + body: json!({}), + }; + let serialized: serde_json::Value = serde_json::to_value(&envelope).unwrap(); + assert!(serialized.get("requestId").is_none()); + } +} diff --git a/crates/observe/src/lib.rs b/crates/observe/src/lib.rs index ba9f85edf7..d847d75193 100644 --- a/crates/observe/src/lib.rs +++ b/crates/observe/src/lib.rs @@ -2,6 +2,7 @@ //! improve the observability of a system. That includes initialization logic //! for metrics and logging as well as logging helper functions. pub mod config; +pub mod event_bus; pub mod future; #[cfg(unix)] pub mod heap_dump_handler; diff --git a/crates/orderbook/src/run.rs b/crates/orderbook/src/run.rs index e35a895c73..7cc571ec76 100644 --- a/crates/orderbook/src/run.rs +++ b/crates/orderbook/src/run.rs @@ -29,7 +29,10 @@ use { http_client::HttpClientFactory, model::DomainSeparator, num::ToPrimitive, - observe::metrics::{DEFAULT_METRICS_PORT, serve_metrics}, + observe::{ + config::EventBusConfig, + metrics::{DEFAULT_METRICS_PORT, serve_metrics}, + }, order_validation, price_estimation::{ PriceEstimating, @@ -50,7 +53,9 @@ pub async fn start(args: impl Iterator) { let args = Arguments::parse_from(args); let config = Configuration::from_path(&args.config) .await - .expect("failed to load configuration file"); + .expect("failed to load configuration file") + .validate() + .expect("failed to validate configuration file"); let tracing_config = config .shared .tracing @@ -74,6 +79,16 @@ pub async fn start(args: impl Iterator) { tracing::info!("running order book with validated arguments:\n{}", args); observe::panic_hook::install(); observe::metrics::setup_registry(Some("gp_v2_api".into()), None); + if let Some(event_bus) = &config.shared.event_bus { + observe::event_bus::init(EventBusConfig { + url: event_bus.url.clone(), + stream_name: event_bus.channel.clone(), + // Presence of `chain-id` alongside `event_bus` is enforced by + // `SharedConfig::validate` at startup. + chain_id: config.shared.chain_id.unwrap(), + }) + .await; + } #[cfg(unix)] observe::heap_dump_handler::spawn_heap_dump_handler(); tracing::info!("file configuration:\n{:#?}", config); diff --git a/crates/price-estimation/src/competition/mod.rs b/crates/price-estimation/src/competition/mod.rs index 80a9119a5b..b2b1e1cb9e 100644 --- a/crates/price-estimation/src/competition/mod.rs +++ b/crates/price-estimation/src/competition/mod.rs @@ -108,9 +108,10 @@ impl CompetitionEstimator { while stage_index < self.stages.len() && requests.len() < requests_for_batch { let stage = &self.stages.get(stage_index).expect("index checked by loop"); - let futures = stage.iter().enumerate().map(|(index, (_name, estimator))| { + let futures = stage.iter().enumerate().map(|(index, (name, estimator))| { get_single_result(Context { estimator, + name, query: query.clone(), remaining_stages: Arc::clone(&remaining_stages), }) @@ -175,6 +176,8 @@ struct Context<'a, ESTIMATOR, QUERY> { /// the number of stages that are left after the queries /// produced by this Context's stages. remaining_stages: Arc>, + /// Name of the estimator + name: &'a str, } impl<'a, E, Q> Context<'a, E, Q> { diff --git a/crates/price-estimation/src/competition/quote.rs b/crates/price-estimation/src/competition/quote.rs index a558494d20..7b0d3bd0b5 100644 --- a/crates/price-estimation/src/competition/quote.rs +++ b/crates/price-estimation/src/competition/quote.rs @@ -9,10 +9,15 @@ use { QuoteVerificationMode, }, alloy::primitives::{Address, U256}, - anyhow::Context, + anyhow::Context as _, futures::future::{BoxFuture, FutureExt, TryFutureExt}, model::order::OrderKind, - std::{cmp::Ordering, sync::Arc, time::Duration}, + serde::Serialize, + std::{ + cmp::Ordering, + sync::Arc, + time::{Duration, Instant}, + }, tracing::instrument, }; @@ -37,7 +42,19 @@ impl PriceEstimating for CompetitionEstimator> { }; let get_results = self .produce_results(query.clone(), is_reasonable, |context| { - context.estimator.estimate(context.query) + // Call estimate() eagerly so its side-effects still happen + // when an early-return drops the future before it's polled. + let start = Instant::now(); + let estimator_name = context.name; + let inner_query = context.query.clone(); + context + .estimator + .estimate(context.query.clone()) + .map(move |res| { + emit_quote_event(estimator_name, &inner_query, &res, start.elapsed()); + res + }) + .boxed() }) .map(Result::Ok); @@ -156,6 +173,75 @@ impl RankingContext { } } +fn emit_quote_event( + estimator_name: &str, + query: &Query, + result: &PriceEstimateResult, + elapsed: Duration, +) { + let event = PriceEstimateEvent { + query: QueryFields { + sell_token: query.sell_token.to_string(), + buy_token: query.buy_token.to_string(), + in_amount: query.in_amount.to_string(), + kind: match query.kind { + OrderKind::Sell => "sell", + OrderKind::Buy => "buy", + }, + }, + from: query.verification.from, + timeout: query.timeout.as_millis(), + elapsed: elapsed.as_millis(), + estimator: estimator_name, + result: match result { + Ok(estimate) => EstimateResult::Ok { + out_amount: estimate.out_amount.to_string(), + gas: estimate.gas.to_string(), + verified: estimate.verified, + }, + Err(err) => EstimateResult::Err { + error: err.to_string(), + }, + }, + }; + observe::event_bus::publish("priceEstimate", event); +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct PriceEstimateEvent<'a> { + query: QueryFields, + from: Address, + timeout: u128, + elapsed: u128, + estimator: &'a str, + result: EstimateResult, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct QueryFields { + sell_token: String, + buy_token: String, + in_amount: String, + kind: &'static str, +} + +/// Mirrors the previous JSON: either the estimate fields or an `error`. +#[derive(Serialize)] +#[serde(untagged)] +enum EstimateResult { + #[serde(rename_all = "camelCase")] + Ok { + out_amount: String, + gas: String, + verified: bool, + }, + Err { + error: String, + }, +} + #[cfg(test)] mod tests { use { @@ -164,8 +250,61 @@ mod tests { alloy::{eips::eip1559::Eip1559Estimation, primitives::U256}, gas_price_estimation::FakeGasPriceEstimator, model::order::OrderKind, + serde_json::json, }; + #[test] + fn price_estimate_event_matches_wire_format() { + let event = PriceEstimateEvent { + query: QueryFields { + sell_token: "0x01".into(), + buy_token: "0x02".into(), + in_amount: "100".into(), + kind: "sell", + }, + from: Address::ZERO, + timeout: 5000, + elapsed: 12, + estimator: "baseline", + result: EstimateResult::Ok { + out_amount: "99".into(), + gas: "21000".into(), + verified: true, + }, + }; + assert_eq!( + serde_json::to_value(&event).unwrap(), + json!({ + "query": { + "sellToken": "0x01", + "buyToken": "0x02", + "inAmount": "100", + "kind": "sell", + }, + "from": Address::ZERO, + "timeout": 5000, + "elapsed": 12, + "estimator": "baseline", + "result": { + "outAmount": "99", + "gas": "21000", + "verified": true, + }, + }), + ); + } + + #[test] + fn price_estimate_event_error_variant() { + let result = EstimateResult::Err { + error: "boom".into(), + }; + assert_eq!( + serde_json::to_value(&result).unwrap(), + json!({ "error": "boom" }), + ); + } + fn price(out_amount: u128, gas: u64) -> PriceEstimateResult { Ok(Estimate { out_amount: U256::from(out_amount),