From daa64998079496cbed1878369e971d2f7ebdccc1 Mon Sep 17 00:00:00 2001 From: MartinquaXD Date: Tue, 19 May 2026 08:51:38 +0000 Subject: [PATCH 01/30] Initial commit --- Cargo.lock | 132 ++++++++++++++++++++++--- Cargo.toml | 1 + crates/observe/Cargo.toml | 1 + crates/observe/src/event_bus/mod.rs | 144 ++++++++++++++++++++++++++++ crates/observe/src/lib.rs | 1 + 5 files changed, 266 insertions(+), 13 deletions(-) create mode 100644 crates/observe/src/event_bus/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 7154437fdc..0025a77308 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -859,7 +859,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -870,7 +870,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -2219,6 +2219,17 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "chacha20" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" +dependencies = [ + "cfg-if", + "cpufeatures 0.3.0", + "rand_core 0.10.1", +] + [[package]] name = "chain" version = "0.1.0" @@ -2235,8 +2246,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" dependencies = [ "iana-time-zone", + "js-sys", "num-traits", "serde", + "wasm-bindgen", "windows-link", ] @@ -2449,7 +2462,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3bb320cac8a0750d7f25280aa97b09c26edfe161164238ecbbb31092b079e735" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.17", "proptest", "serde_core", ] @@ -3502,6 +3515,15 @@ dependencies = [ "libc", ] +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + [[package]] name = "crc" version = "3.3.0" @@ -3741,7 +3763,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ab67060fc6b8ef687992d439ca0fa36e7ed17e9a0b16b25b601e8757df720de" dependencies = [ "data-encoding", - "syn 1.0.109", + "syn 2.0.114", ] [[package]] @@ -4122,7 +4144,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -4546,6 +4568,7 @@ dependencies = [ "cfg-if", "libc", "r-efi 6.0.0", + "rand_core 0.10.1", "wasip2", "wasip3", ] @@ -5275,7 +5298,7 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb26cec98cce3a3d96cbb7bced3c4b16e3d13f27ec56dbd62cbc8f39cfb9d653" dependencies = [ - "cpufeatures", + "cpufeatures 0.2.17", ] [[package]] @@ -5641,6 +5664,12 @@ dependencies = [ "data-encoding-macro", ] +[[package]] +name = "murmur3" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9252111cf132ba0929b6f8e030cac2a24b507f3a4d6db6fb2896f27b354c714b" + [[package]] name = "native-tls" version = "0.2.14" @@ -5674,7 +5703,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -5799,6 +5828,7 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff32365de1b6743cb203b710788263c44a03de03802daf96092f2da4fe6ba4d7" dependencies = [ + "proc-macro-crate", "proc-macro2", "quote", "syn 2.0.114", @@ -5866,6 +5896,7 @@ dependencies = [ "pin-project-lite", "prometheus", "prometheus-metric-storage", + "rabbitmq-stream-client", "scopeguard", "serde", "serde_json", @@ -6088,6 +6119,15 @@ dependencies = [ "vergen", ] +[[package]] +name = "ordered-float" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bb71e1b3fa6ca1c61f383464aaf2bb0e2f8e772a1f01d486832464de363b951" +dependencies = [ + "num-traits", +] + [[package]] name = "outref" version = "0.5.2" @@ -6701,6 +6741,44 @@ version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" +[[package]] +name = "rabbitmq-stream-client" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7e766fd5cccfba5caa077d0b00de81840f0553a15325f22b59e87322f235d10" +dependencies = [ + "async-trait", + "bytes", + "dashmap", + "futures", + "murmur3", + "pin-project", + "rabbitmq-stream-protocol", + "rand 0.10.1", + "rustls-pemfile", + "thiserror 2.0.17", + "tokio", + "tokio-rustls", + "tokio-stream", + "tokio-util", + "tracing", + "url", +] + +[[package]] +name = "rabbitmq-stream-protocol" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fedda81585bf2b5f8b91be49a592d2c91cb060c2c1927840617c52a67d1b7d" +dependencies = [ + "byteorder", + "chrono", + "derive_more 2.1.1", + "num_enum", + "ordered-float", + "uuid", +] + [[package]] name = "radium" version = "0.7.0" @@ -6730,6 +6808,17 @@ dependencies = [ "serde", ] +[[package]] +name = "rand" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207" +dependencies = [ + "chacha20", + "getrandom 0.4.2", + "rand_core 0.10.1", +] + [[package]] name = "rand_chacha" version = "0.3.1" @@ -6769,6 +6858,12 @@ dependencies = [ "serde", ] +[[package]] +name = "rand_core" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69" + [[package]] name = "rand_xorshift" version = "0.4.0" @@ -7195,7 +7290,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -7205,6 +7300,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c665f33d38cea657d9614f766881e4d510e0eda4239891eea56b4cadcf01801b" dependencies = [ "aws-lc-rs", + "log", "once_cell", "rustls-pki-types", "rustls-webpki", @@ -7224,6 +7320,15 @@ dependencies = [ "security-framework 3.5.1", ] +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "rustls-pki-types" version = "1.13.2" @@ -7252,7 +7357,7 @@ dependencies = [ "security-framework 3.5.1", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -7594,7 +7699,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.17", "digest 0.10.7", ] @@ -7605,7 +7710,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.17", "digest 0.10.7", ] @@ -8332,7 +8437,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -8524,6 +8629,7 @@ dependencies = [ "bytes", "libc", "mio", + "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2 0.6.1", @@ -9331,7 +9437,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 7ad6cdcfab..efeb7a0465 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,6 +96,7 @@ proc-macro2 = "1.0.103" prometheus = "0.14" prometheus-metric-storage = "0.6" quote = "1.0.41" +rabbitmq-stream-client = "0.11" rand = "0.9.4" rate-limit = { path = "crates/rate-limit" } refunder = { path = "crates/refunder" } diff --git a/crates/observe/Cargo.toml b/crates/observe/Cargo.toml index 213a6d3d5e..50857f8caa 100644 --- a/crates/observe/Cargo.toml +++ b/crates/observe/Cargo.toml @@ -19,6 +19,7 @@ opentelemetry_sdk = { workspace = true } pin-project-lite = { workspace = true } prometheus = { workspace = true } prometheus-metric-storage = { workspace = true } +rabbitmq-stream-client = { workspace = true } scopeguard = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs new file mode 100644 index 0000000000..b179e1caa9 --- /dev/null +++ b/crates/observe/src/event_bus/mod.rs @@ -0,0 +1,144 @@ +//! Implements a simple globally available way to publish events to an event +//! bus. Under the hood it's using a RabbitMQ stream. Because the RabbitMQ crate +//! internally uses a bounded channel which makes every send an async operation +//! we additionally buffer messages in an unbounded channel to also allow +//! publishing events from synchronous contexts. +//! The architecture looks roughly like this: +//! cow service: business_logic --message--> event_bus_channel +//! event_bus_background_task: event_bus_channel --message--> rabbitmq_channel +//! rabbitmq_background_task: rabbitmq_channel --message--> rabbitmq_service + +use { + chrono::Utc, + rabbitmq_stream_client::{ + Environment, + NoDedup, + Producer, + types::{ByteCapacity, Message}, + }, + serde::Serialize, + tokio::sync::{ + OnceCell, + mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, + }, +}; + +static EVENT_QUEUE: OnceCell> = OnceCell::const_new(); + +/// Configuration options describing to which RabbitMQ instance to connect to. +pub struct ClientConfig { + host: String, + port: u16, +} + +/// Configuration options describing which RabbitMQ channel to publish messages in. +pub struct ChannelConfig { + name: String, + size: ByteCapacity, + /// Number of messages to buffer in-memory before actually sending them to the + /// event bus service running in a different process. + batch_size: usize, +} + +pub struct Config { + client: ClientConfig, + channel: ChannelConfig, +} + +/// Initializes the event bus and panics if it fails. +pub async fn init(config: Config, shutdown: impl Future) { + EVENT_QUEUE + .get_or_init(|| async move { + let environment = Environment::builder() + .host(&config.client.host) + .port(config.client.port) + .build() + .await + .expect("failed to connect to rabbitmq"); + + environment + .stream_creator() + .max_length(config.channel.size) + .create(&config.channel.name) + .await + .expect("failed to create channel"); + + let producer = environment + .producer() + .batch_size(config.channel.batch_size) + .build(&config.channel.name) + .await + .expect("failed to create producer"); + + let (sender, receiver) = unbounded_channel(); + tokio::task::spawn(publish_events( + receiver, + producer, + config.channel.batch_size, + shutdown, + )); + sender + }) + .await; +} + +/// Buffers incoming messages and sends a batch once the target batch size +/// has been reached. +// TODO: flush half filled buffer during program shutdown to not miss any +// events +async fn publish_events( + mut receiver: UnboundedReceiver, + producer: Producer, + batch_size: usize, + shutdown: impl Future, +) { + let mut buffer = Vec::with_capacity(batch_size); + while let Some(message) = receiver.recv().await { + buffer.push(message); + + if buffer.len() >= batch_size { + // swap out filled buffer with empty pre-allocated buffer + let mut messages = Vec::with_capacity(batch_size); + std::mem::swap(&mut messages, &mut buffer); + + let send_messages = producer.batch_send(messages, |res| async move { + match res { + Ok(status) => tracing::trace!(?status, "messages confirmed"), + Err(err) => tracing::error!(?err, "failed to send messages"), + } + }); + if let Err(err) = send_messages.await { + tracing::error!(?err, "failed to enqueue messages"); + } + } + } +} + +/// Enqueues the event to be sent to the event bus in a background task. +pub fn publish(name: impl Into, data: impl Serialize) { + let Some(queue) = EVENT_QUEUE.get() else { + tracing::error!("event queue not yet initialized"); + return; + }; + + let body = match serde_json::to_vec(&data) { + Ok(body) => body, + Err(err) => { + tracing::error!(?err, "failed to serialize event"); + return; + } + }; + + let message = Message::builder() + .properties() + .subject(name) + .creation_time(Utc::now()) + .content_type("application/json") + .message_builder() + .body(body) + .build(); + + if let Err(err) = queue.send(message) { + tracing::error!(?err, "failed to enqueue message"); + } +} diff --git a/crates/observe/src/lib.rs b/crates/observe/src/lib.rs index ba9f85edf7..d847d75193 100644 --- a/crates/observe/src/lib.rs +++ b/crates/observe/src/lib.rs @@ -2,6 +2,7 @@ //! improve the observability of a system. That includes initialization logic //! for metrics and logging as well as logging helper functions. pub mod config; +pub mod event_bus; pub mod future; #[cfg(unix)] pub mod heap_dump_handler; From ea1e32fd7dab04061301265452edea39ff0f3c44 Mon Sep 17 00:00:00 2001 From: MartinquaXD Date: Tue, 19 May 2026 10:06:13 +0000 Subject: [PATCH 02/30] Implement shutdown mode --- crates/observe/src/event_bus/mod.rs | 79 +++++++++++++++++++---------- 1 file changed, 53 insertions(+), 26 deletions(-) diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs index b179e1caa9..ddce39acb5 100644 --- a/crates/observe/src/event_bus/mod.rs +++ b/crates/observe/src/event_bus/mod.rs @@ -17,12 +17,15 @@ use { types::{ByteCapacity, Message}, }, serde::Serialize, + std::future::Future, tokio::sync::{ OnceCell, mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, }, }; +/// Channel to buffer emitted events until we have enough to send a bunch of +/// them at once. static EVENT_QUEUE: OnceCell> = OnceCell::const_new(); /// Configuration options describing to which RabbitMQ instance to connect to. @@ -31,12 +34,13 @@ pub struct ClientConfig { port: u16, } -/// Configuration options describing which RabbitMQ channel to publish messages in. +/// Configuration options describing which RabbitMQ channel to publish messages +/// in. pub struct ChannelConfig { name: String, size: ByteCapacity, - /// Number of messages to buffer in-memory before actually sending them to the - /// event bus service running in a different process. + /// Number of messages to buffer in-memory before actually sending them to + /// the event bus service running in a different process. batch_size: usize, } @@ -46,7 +50,7 @@ pub struct Config { } /// Initializes the event bus and panics if it fails. -pub async fn init(config: Config, shutdown: impl Future) { +pub async fn init(config: Config, shutdown: impl Future + Send + 'static) { EVENT_QUEUE .get_or_init(|| async move { let environment = Environment::builder() @@ -71,7 +75,7 @@ pub async fn init(config: Config, shutdown: impl Future) { .expect("failed to create producer"); let (sender, receiver) = unbounded_channel(); - tokio::task::spawn(publish_events( + tokio::task::spawn(forward_messages_to_event_bus_client( receiver, producer, config.channel.batch_size, @@ -82,36 +86,59 @@ pub async fn init(config: Config, shutdown: impl Future) { .await; } -/// Buffers incoming messages and sends a batch once the target batch size -/// has been reached. -// TODO: flush half filled buffer during program shutdown to not miss any -// events -async fn publish_events( +async fn send_batch(producer: &Producer, messages: Vec) { + if let Err(err) = producer + .batch_send(messages, |res| async move { + match res { + Ok(status) => tracing::trace!(?status, "messages confirmed"), + Err(err) => tracing::error!(?err, "failed to send messages"), + } + }) + .await + { + tracing::error!(?err, "failed to enqueue messages"); + } +} + +/// Buffers incoming messages and sends a batch once the target batch size has +/// been reached. When `shutdown` resolves, flushes any buffered messages and +/// switches to publishing each subsequent message immediately. +async fn forward_messages_to_event_bus_client( mut receiver: UnboundedReceiver, producer: Producer, batch_size: usize, - shutdown: impl Future, + shutdown: impl Future + Send + 'static, ) { let mut buffer = Vec::with_capacity(batch_size); - while let Some(message) = receiver.recv().await { - buffer.push(message); - - if buffer.len() >= batch_size { - // swap out filled buffer with empty pre-allocated buffer - let mut messages = Vec::with_capacity(batch_size); - std::mem::swap(&mut messages, &mut buffer); - - let send_messages = producer.batch_send(messages, |res| async move { - match res { - Ok(status) => tracing::trace!(?status, "messages confirmed"), - Err(err) => tracing::error!(?err, "failed to send messages"), + tokio::pin!(shutdown); + + // Normal mode: buffer messages and send in batches. + loop { + tokio::select! { + _ = &mut shutdown => break, + maybe_message = receiver.recv() => { + let Some(message) = maybe_message else { + tracing::debug!("event queue was closed"); + return; + }; + buffer.push(message); + if buffer.len() >= batch_size { + let mut messages = Vec::with_capacity(batch_size); + std::mem::swap(&mut messages, &mut buffer); + send_batch(&producer, messages).await; } - }); - if let Err(err) = send_messages.await { - tracing::error!(?err, "failed to enqueue messages"); } } } + + // Shutdown mode: flush the buffer, then publish each subsequent message + // immediately without waiting to fill a batch. + if !buffer.is_empty() { + send_batch(&producer, std::mem::take(&mut buffer)).await; + } + while let Some(message) = receiver.recv().await { + send_batch(&producer, vec![message]).await; + } } /// Enqueues the event to be sent to the event bus in a background task. From 2dd6067cbb020712cfd489d72fcfebac05fafe37 Mon Sep 17 00:00:00 2001 From: MartinquaXD Date: Tue, 19 May 2026 10:54:48 +0000 Subject: [PATCH 03/30] failing unit test --- crates/observe/src/event_bus/mod.rs | 69 ++++++++++++++++++++++++----- docker-compose.rabbitmq.yml | 12 +++++ 2 files changed, 71 insertions(+), 10 deletions(-) create mode 100644 docker-compose.rabbitmq.yml diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs index ddce39acb5..9c502e8726 100644 --- a/crates/observe/src/event_bus/mod.rs +++ b/crates/observe/src/event_bus/mod.rs @@ -14,7 +14,8 @@ use { Environment, NoDedup, Producer, - types::{ByteCapacity, Message}, + error::StreamCreateError, + types::{ByteCapacity, Message, ResponseCode}, }, serde::Serialize, std::future::Future, @@ -60,16 +61,23 @@ pub async fn init(config: Config, shutdown: impl Future + Send + 's .await .expect("failed to connect to rabbitmq"); - environment + match environment .stream_creator() .max_length(config.channel.size) .create(&config.channel.name) .await - .expect("failed to create channel"); + { + Ok(()) => {} + Err(StreamCreateError::Create { + status: ResponseCode::StreamAlreadyExists, + .. + }) => {} + Err(err) => panic!("failed to create channel: {err}"), + } let producer = environment .producer() - .batch_size(config.channel.batch_size) + .batch_size(10) .build(&config.channel.name) .await .expect("failed to create producer"); @@ -96,7 +104,7 @@ async fn send_batch(producer: &Producer, messages: Vec) { }) .await { - tracing::error!(?err, "failed to enqueue messages"); + tracing::error!("failed to enqueue messages: {err:#?}"); } } @@ -114,6 +122,7 @@ async fn forward_messages_to_event_bus_client( // Normal mode: buffer messages and send in batches. loop { + tracing::debug!("poll futures"); tokio::select! { _ = &mut shutdown => break, maybe_message = receiver.recv() => { @@ -121,12 +130,12 @@ async fn forward_messages_to_event_bus_client( tracing::debug!("event queue was closed"); return; }; + tracing::debug!("received new message"); buffer.push(message); - if buffer.len() >= batch_size { - let mut messages = Vec::with_capacity(batch_size); - std::mem::swap(&mut messages, &mut buffer); - send_batch(&producer, messages).await; - } + tracing::debug!("send message"); + let mut messages = Vec::with_capacity(batch_size); + std::mem::swap(&mut messages, &mut buffer); + send_batch(&producer, messages).await; } } } @@ -168,4 +177,44 @@ pub fn publish(name: impl Into, data: impl Serialize) { if let Err(err) = queue.send(message) { tracing::error!(?err, "failed to enqueue message"); } + tracing::debug!("pushed even to queue"); +} + +#[cfg(test)] +mod tests { + use {super::*, serde_json::json}; + + #[tokio::test(flavor = "multi_thread")] + async fn send_messages() { + crate::tracing::init::initialize(&crate::Config { + env_filter: "debug".to_string(), + stderr_threshold: None, + use_json_format: false, + tracing: None, + }); + init( + Config { + client: ClientConfig { + host: "localhost".to_string(), + port: 5552, + }, + channel: ChannelConfig { + name: "test_stream".to_string(), + size: ByteCapacity::GB(2), + batch_size: 1, + }, + }, + futures::future::pending(), + ) + .await; + tracing::debug!("connected to rabbit"); + + publish( + "name", + json!({ + "estimator": "baseline", + "outAmount": 1234, + }), + ); + } } diff --git a/docker-compose.rabbitmq.yml b/docker-compose.rabbitmq.yml new file mode 100644 index 0000000000..7bd9d41a62 --- /dev/null +++ b/docker-compose.rabbitmq.yml @@ -0,0 +1,12 @@ +services: + rabbitmq: + image: rabbitmq:4-management + ports: + - "5552:5552" + - "5672:5672" + - "15672:15672" + environment: + RABBITMQ_DEFAULT_USER: guest + RABBITMQ_DEFAULT_PASS: guest + RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS: -rabbitmq_stream advertised_host localhost + command: bash -c "rabbitmq-plugins enable rabbitmq_stream --offline && rabbitmq-server" From ceede24ac99f377f58ecc934d6b32a79fa466607 Mon Sep 17 00:00:00 2001 From: MartinquaXD Date: Tue, 19 May 2026 13:01:58 +0000 Subject: [PATCH 04/30] Switch to NATS --- Cargo.lock | 249 ++++++++++++++++++++-------- Cargo.toml | 2 +- crates/observe/Cargo.toml | 4 +- crates/observe/src/event_bus/mod.rs | 182 +++++--------------- docker-compose.nats.yml | 7 + 5 files changed, 231 insertions(+), 213 deletions(-) create mode 100644 docker-compose.nats.yml diff --git a/Cargo.lock b/Cargo.lock index 0025a77308..ccbc367ec1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1125,6 +1125,42 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-nats" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31811585c7c5bc2f60f8b80d5a6b0f737115611dac47567d7f7d94562ebb180b" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-util", + "memchr", + "nkeys", + "nuid", + "pin-project", + "portable-atomic", + "rand 0.10.1", + "regex", + "ring", + "rustls-native-certs", + "rustls-pki-types", + "rustls-webpki", + "serde", + "serde_json", + "serde_nanos", + "serde_repr", + "thiserror 2.0.17", + "time", + "tokio", + "tokio-rustls", + "tokio-stream", + "tokio-util", + "tokio-websockets", + "tracing", + "tryhard", + "url", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -2246,10 +2282,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" dependencies = [ "iana-time-zone", - "js-sys", "num-traits", "serde", - "wasm-bindgen", "windows-link", ] @@ -3621,6 +3655,32 @@ dependencies = [ "typenum", ] +[[package]] +name = "curve25519-dalek" +version = "4.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" +dependencies = [ + "cfg-if", + "cpufeatures 0.2.17", + "curve25519-dalek-derive", + "digest 0.10.7", + "fiat-crypto", + "rustc_version 0.4.1", + "subtle", +] + +[[package]] +name = "curve25519-dalek-derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "darling" version = "0.14.4" @@ -4061,6 +4121,28 @@ dependencies = [ "spki", ] +[[package]] +name = "ed25519" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" +dependencies = [ + "signature", +] + +[[package]] +name = "ed25519-dalek" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70e796c081cee67dc755e1a36a0a172b897fab85fc3f6bc48307991f64e4eca9" +dependencies = [ + "curve25519-dalek", + "ed25519", + "sha2", + "signature", + "subtle", +] + [[package]] name = "educe" version = "0.6.0" @@ -4289,6 +4371,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "fiat-crypto" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" + [[package]] name = "find-msvc-tools" version = "0.1.7" @@ -5664,12 +5752,6 @@ dependencies = [ "data-encoding-macro", ] -[[package]] -name = "murmur3" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9252111cf132ba0929b6f8e030cac2a24b507f3a4d6db6fb2896f27b354c714b" - [[package]] name = "native-tls" version = "0.2.14" @@ -5687,6 +5769,21 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nkeys" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879011babc47a1c7fdf5a935ae3cfe94f34645ca0cac1c7f6424b36fc743d1bf" +dependencies = [ + "data-encoding", + "ed25519", + "ed25519-dalek", + "getrandom 0.2.17", + "log", + "rand 0.8.5", + "signatory", +] + [[package]] name = "nom" version = "7.1.3" @@ -5706,6 +5803,15 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "nuid" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc895af95856f929163a0aa20c26a78d26bfdc839f51b9d5aa7a5b79e52b7e83" +dependencies = [ + "rand 0.8.5", +] + [[package]] name = "num" version = "0.4.3" @@ -5828,7 +5934,6 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff32365de1b6743cb203b710788263c44a03de03802daf96092f2da4fe6ba4d7" dependencies = [ - "proc-macro-crate", "proc-macro2", "quote", "syn 2.0.114", @@ -5883,8 +5988,10 @@ dependencies = [ name = "observe" version = "0.1.0" dependencies = [ + "async-nats", "async-trait", "axum 0.8.8", + "bytes", "chrono", "console-subscriber", "futures", @@ -5896,7 +6003,6 @@ dependencies = [ "pin-project-lite", "prometheus", "prometheus-metric-storage", - "rabbitmq-stream-client", "scopeguard", "serde", "serde_json", @@ -5906,6 +6012,7 @@ dependencies = [ "tracing-opentelemetry", "tracing-serde", "tracing-subscriber", + "url", ] [[package]] @@ -6119,15 +6226,6 @@ dependencies = [ "vergen", ] -[[package]] -name = "ordered-float" -version = "4.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7bb71e1b3fa6ca1c61f383464aaf2bb0e2f8e772a1f01d486832464de363b951" -dependencies = [ - "num-traits", -] - [[package]] name = "outref" version = "0.5.2" @@ -6741,44 +6839,6 @@ version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" -[[package]] -name = "rabbitmq-stream-client" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7e766fd5cccfba5caa077d0b00de81840f0553a15325f22b59e87322f235d10" -dependencies = [ - "async-trait", - "bytes", - "dashmap", - "futures", - "murmur3", - "pin-project", - "rabbitmq-stream-protocol", - "rand 0.10.1", - "rustls-pemfile", - "thiserror 2.0.17", - "tokio", - "tokio-rustls", - "tokio-stream", - "tokio-util", - "tracing", - "url", -] - -[[package]] -name = "rabbitmq-stream-protocol" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97fedda81585bf2b5f8b91be49a592d2c91cb060c2c1927840617c52a67d1b7d" -dependencies = [ - "byteorder", - "chrono", - "derive_more 2.1.1", - "num_enum", - "ordered-float", - "uuid", -] - [[package]] name = "radium" version = "0.7.0" @@ -7300,8 +7360,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c665f33d38cea657d9614f766881e4d510e0eda4239891eea56b4cadcf01801b" dependencies = [ "aws-lc-rs", - "log", "once_cell", + "ring", "rustls-pki-types", "rustls-webpki", "subtle", @@ -7320,15 +7380,6 @@ dependencies = [ "security-framework 3.5.1", ] -[[package]] -name = "rustls-pemfile" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" -dependencies = [ - "rustls-pki-types", -] - [[package]] name = "rustls-pki-types" version = "1.13.2" @@ -7619,6 +7670,15 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_nanos" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a93142f0367a4cc53ae0fead1bcda39e85beccfad3dcd717656cacab94b12985" +dependencies = [ + "serde", +] + [[package]] name = "serde_path_to_error" version = "0.1.20" @@ -7630,6 +7690,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_repr" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "serde_spanned" version = "0.6.9" @@ -7824,6 +7895,18 @@ dependencies = [ "libc", ] +[[package]] +name = "signatory" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e303f8205714074f6068773f0e29527e0453937fe837c9717d066635b65f31" +dependencies = [ + "pkcs8", + "rand_core 0.6.4", + "signature", + "zeroize", +] + [[package]] name = "signature" version = "2.2.0" @@ -8629,7 +8712,6 @@ dependencies = [ "bytes", "libc", "mio", - "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2 0.6.1", @@ -8710,6 +8792,27 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-websockets" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f591660438b3038dd04d16c938271c79e7e06260ad2ea2885a4861bfb238605d" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-core", + "futures-sink", + "http 1.4.0", + "httparse", + "rand 0.8.5", + "ring", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tokio-util", + "webpki-roots 0.26.11", +] + [[package]] name = "toml" version = "0.8.23" @@ -9018,6 +9121,16 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tryhard" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fe58ebd5edd976e0fe0f8a14d2a04b7c81ef153ea9a54eebc42e67c2c23b4e5" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tungstenite" version = "0.28.0" diff --git a/Cargo.toml b/Cargo.toml index efeb7a0465..d2822a7f35 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ alloy-transport-ws = { version = "1.8.3", default-features = false } anyhow = "1.0.100" app-data = { path = "crates/app-data" } arc-swap = "1.7.1" +async-nats = "0.48.0" async-stream = "0.3.5" async-trait = "0.1.80" autopilot = { path = "crates/autopilot" } @@ -96,7 +97,6 @@ proc-macro2 = "1.0.103" prometheus = "0.14" prometheus-metric-storage = "0.6" quote = "1.0.41" -rabbitmq-stream-client = "0.11" rand = "0.9.4" rate-limit = { path = "crates/rate-limit" } refunder = { path = "crates/refunder" } diff --git a/crates/observe/Cargo.toml b/crates/observe/Cargo.toml index 50857f8caa..a94ab66785 100644 --- a/crates/observe/Cargo.toml +++ b/crates/observe/Cargo.toml @@ -6,7 +6,9 @@ edition = "2024" license = "MIT OR Apache-2.0" [dependencies] +async-nats = { workspace = true } async-trait = { workspace = true } +bytes = { workspace = true } axum = { workspace = true } chrono = { workspace = true, features = ["now"] } console-subscriber = { workspace = true, optional = true } @@ -19,7 +21,6 @@ opentelemetry_sdk = { workspace = true } pin-project-lite = { workspace = true } prometheus = { workspace = true } prometheus-metric-storage = { workspace = true } -rabbitmq-stream-client = { workspace = true } scopeguard = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } @@ -29,6 +30,7 @@ tracing = { workspace = true } tracing-opentelemetry = { workspace = true } tracing-serde = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter", "fmt", "time"] } +url = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["rt-multi-thread"] } diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs index 9c502e8726..dcc62ed9cd 100644 --- a/crates/observe/src/event_bus/mod.rs +++ b/crates/observe/src/event_bus/mod.rs @@ -1,152 +1,63 @@ //! Implements a simple globally available way to publish events to an event -//! bus. Under the hood it's using a RabbitMQ stream. Because the RabbitMQ crate -//! internally uses a bounded channel which makes every send an async operation -//! we additionally buffer messages in an unbounded channel to also allow -//! publishing events from synchronous contexts. -//! The architecture looks roughly like this: -//! cow service: business_logic --message--> event_bus_channel -//! event_bus_background_task: event_bus_channel --message--> rabbitmq_channel -//! rabbitmq_background_task: rabbitmq_channel --message--> rabbitmq_service - +//! bus. Under the hood it's using NATS. To support publishing events from +//! synchronous contexts we use an unbounded channel as an in-memory buffer. +//! Whenever a message gets posted to this channel a background task wakes +//! up and forwards it to the NATS service running in a different process. +//! Messages always get serialized as JSON so you can publish anything that +//! can be serialized to JSON as well. use { + async_nats::{Subject, jetstream::Context as JetstreamClient}, + bytes::Bytes, chrono::Utc, - rabbitmq_stream_client::{ - Environment, - NoDedup, - Producer, - error::StreamCreateError, - types::{ByteCapacity, Message, ResponseCode}, - }, serde::Serialize, - std::future::Future, + serde_json::json, tokio::sync::{ OnceCell, mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, }, + url::Url, }; /// Channel to buffer emitted events until we have enough to send a bunch of /// them at once. -static EVENT_QUEUE: OnceCell> = OnceCell::const_new(); - -/// Configuration options describing to which RabbitMQ instance to connect to. -pub struct ClientConfig { - host: String, - port: u16, -} - -/// Configuration options describing which RabbitMQ channel to publish messages -/// in. -pub struct ChannelConfig { - name: String, - size: ByteCapacity, - /// Number of messages to buffer in-memory before actually sending them to - /// the event bus service running in a different process. - batch_size: usize, -} +static EVENT_QUEUE: OnceCell> = OnceCell::const_new(); pub struct Config { - client: ClientConfig, - channel: ChannelConfig, + client: Url, + channel_name: String, } /// Initializes the event bus and panics if it fails. -pub async fn init(config: Config, shutdown: impl Future + Send + 'static) { +pub async fn init(config: Config) { EVENT_QUEUE .get_or_init(|| async move { - let environment = Environment::builder() - .host(&config.client.host) - .port(config.client.port) - .build() + let client = async_nats::connect(config.client.as_str()) .await - .expect("failed to connect to rabbitmq"); - - match environment - .stream_creator() - .max_length(config.channel.size) - .create(&config.channel.name) - .await - { - Ok(()) => {} - Err(StreamCreateError::Create { - status: ResponseCode::StreamAlreadyExists, - .. - }) => {} - Err(err) => panic!("failed to create channel: {err}"), - } - - let producer = environment - .producer() - .batch_size(10) - .build(&config.channel.name) - .await - .expect("failed to create producer"); + .expect("failed to connect to NATS service"); + let jetstream = async_nats::jetstream::new(client); let (sender, receiver) = unbounded_channel(); tokio::task::spawn(forward_messages_to_event_bus_client( receiver, - producer, - config.channel.batch_size, - shutdown, + jetstream, + config.channel_name.into(), )); sender }) .await; } -async fn send_batch(producer: &Producer, messages: Vec) { - if let Err(err) = producer - .batch_send(messages, |res| async move { - match res { - Ok(status) => tracing::trace!(?status, "messages confirmed"), - Err(err) => tracing::error!(?err, "failed to send messages"), - } - }) - .await - { - tracing::error!("failed to enqueue messages: {err:#?}"); - } -} - -/// Buffers incoming messages and sends a batch once the target batch size has -/// been reached. When `shutdown` resolves, flushes any buffered messages and -/// switches to publishing each subsequent message immediately. +/// Monitors a message queue and forwards all messages to the event bus +/// service. async fn forward_messages_to_event_bus_client( - mut receiver: UnboundedReceiver, - producer: Producer, - batch_size: usize, - shutdown: impl Future + Send + 'static, + mut receiver: UnboundedReceiver, + client: JetstreamClient, + channel: Subject, ) { - let mut buffer = Vec::with_capacity(batch_size); - tokio::pin!(shutdown); - - // Normal mode: buffer messages and send in batches. - loop { - tracing::debug!("poll futures"); - tokio::select! { - _ = &mut shutdown => break, - maybe_message = receiver.recv() => { - let Some(message) = maybe_message else { - tracing::debug!("event queue was closed"); - return; - }; - tracing::debug!("received new message"); - buffer.push(message); - tracing::debug!("send message"); - let mut messages = Vec::with_capacity(batch_size); - std::mem::swap(&mut messages, &mut buffer); - send_batch(&producer, messages).await; - } - } - } - - // Shutdown mode: flush the buffer, then publish each subsequent message - // immediately without waiting to fill a batch. - if !buffer.is_empty() { - send_batch(&producer, std::mem::take(&mut buffer)).await; - } while let Some(message) = receiver.recv().await { - send_batch(&producer, vec![message]).await; + if let Err(err) = client.publish(channel.clone(), message).await { + tracing::debug!(?err, "failed to publish event"); + } } } @@ -157,7 +68,12 @@ pub fn publish(name: impl Into, data: impl Serialize) { return; }; - let body = match serde_json::to_vec(&data) { + let message = json!({ + "timestamp": Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true), + "name": name.into(), + "body": data, + }); + let body = match serde_json::to_vec(&message) { Ok(body) => body, Err(err) => { tracing::error!(?err, "failed to serialize event"); @@ -165,19 +81,9 @@ pub fn publish(name: impl Into, data: impl Serialize) { } }; - let message = Message::builder() - .properties() - .subject(name) - .creation_time(Utc::now()) - .content_type("application/json") - .message_builder() - .body(body) - .build(); - - if let Err(err) = queue.send(message) { + if let Err(err) = queue.send(body.into()) { tracing::error!(?err, "failed to enqueue message"); } - tracing::debug!("pushed even to queue"); } #[cfg(test)] @@ -192,22 +98,11 @@ mod tests { use_json_format: false, tracing: None, }); - init( - Config { - client: ClientConfig { - host: "localhost".to_string(), - port: 5552, - }, - channel: ChannelConfig { - name: "test_stream".to_string(), - size: ByteCapacity::GB(2), - batch_size: 1, - }, - }, - futures::future::pending(), - ) + init(Config { + client: "localhost:4222".parse().unwrap(), + channel_name: "test".to_string(), + }) .await; - tracing::debug!("connected to rabbit"); publish( "name", @@ -216,5 +111,6 @@ mod tests { "outAmount": 1234, }), ); + tokio::time::sleep(std::time::Duration::from_secs(2)).await; } } diff --git a/docker-compose.nats.yml b/docker-compose.nats.yml new file mode 100644 index 0000000000..b3642ea584 --- /dev/null +++ b/docker-compose.nats.yml @@ -0,0 +1,7 @@ +services: + nats: + image: nats:2.10-alpine + ports: + - "4222:4222" + - "8222:8222" + command: ["--http_port", "8222"] From 878c869aca0ef92637b5208cff63eae199fe0688 Mon Sep 17 00:00:00 2001 From: MartinquaXD Date: Tue, 19 May 2026 13:08:55 +0000 Subject: [PATCH 05/30] Define message type more precisely --- crates/observe/src/event_bus/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs index dcc62ed9cd..bd69f385d2 100644 --- a/crates/observe/src/event_bus/mod.rs +++ b/crates/observe/src/event_bus/mod.rs @@ -68,11 +68,15 @@ pub fn publish(name: impl Into, data: impl Serialize) { return; }; - let message = json!({ + let mut message = json!({ + "version": "v1", "timestamp": Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true), "name": name.into(), "body": data, }); + if let Some(id) = crate::tracing::distributed::request_id::from_current_span() { + message["requestId"] = id.into(); + } let body = match serde_json::to_vec(&message) { Ok(body) => body, Err(err) => { From cddb3b5f0a4883ab9c624b0ec8c0f99704f04865 Mon Sep 17 00:00:00 2001 From: MartinquaXD Date: Tue, 19 May 2026 13:12:32 +0000 Subject: [PATCH 06/30] remove test setups again --- docker-compose.nats.yml | 7 ------- docker-compose.rabbitmq.yml | 12 ------------ 2 files changed, 19 deletions(-) delete mode 100644 docker-compose.nats.yml delete mode 100644 docker-compose.rabbitmq.yml diff --git a/docker-compose.nats.yml b/docker-compose.nats.yml deleted file mode 100644 index b3642ea584..0000000000 --- a/docker-compose.nats.yml +++ /dev/null @@ -1,7 +0,0 @@ -services: - nats: - image: nats:2.10-alpine - ports: - - "4222:4222" - - "8222:8222" - command: ["--http_port", "8222"] diff --git a/docker-compose.rabbitmq.yml b/docker-compose.rabbitmq.yml deleted file mode 100644 index 7bd9d41a62..0000000000 --- a/docker-compose.rabbitmq.yml +++ /dev/null @@ -1,12 +0,0 @@ -services: - rabbitmq: - image: rabbitmq:4-management - ports: - - "5552:5552" - - "5672:5672" - - "15672:15672" - environment: - RABBITMQ_DEFAULT_USER: guest - RABBITMQ_DEFAULT_PASS: guest - RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS: -rabbitmq_stream advertised_host localhost - command: bash -c "rabbitmq-plugins enable rabbitmq_stream --offline && rabbitmq-server" From 3a76b182ba5ee99d1e8b9d06203c8d79fc9b7ce0 Mon Sep 17 00:00:00 2001 From: MartinquaXD Date: Tue, 19 May 2026 13:25:13 +0000 Subject: [PATCH 07/30] test --- crates/observe/src/event_bus/mod.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs index bd69f385d2..7a73b9e4fe 100644 --- a/crates/observe/src/event_bus/mod.rs +++ b/crates/observe/src/event_bus/mod.rs @@ -94,7 +94,8 @@ pub fn publish(name: impl Into, data: impl Serialize) { mod tests { use {super::*, serde_json::json}; - #[tokio::test(flavor = "multi_thread")] + #[ignore] + #[tokio::test] async fn send_messages() { crate::tracing::init::initialize(&crate::Config { env_filter: "debug".to_string(), @@ -115,6 +116,6 @@ mod tests { "outAmount": 1234, }), ); - tokio::time::sleep(std::time::Duration::from_secs(2)).await; + tokio::time::sleep(std::time::Duration::from_millis(1)).await; } } From 7d4242473c435904b1abb14e1bbdf0073f7221b9 Mon Sep 17 00:00:00 2001 From: MartinquaXD Date: Wed, 20 May 2026 10:16:11 +0000 Subject: [PATCH 08/30] Get or create jetstream --- crates/observe/src/event_bus/mod.rs | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs index 7a73b9e4fe..694a937443 100644 --- a/crates/observe/src/event_bus/mod.rs +++ b/crates/observe/src/event_bus/mod.rs @@ -35,6 +35,16 @@ pub async fn init(config: Config) { .await .expect("failed to connect to NATS service"); let jetstream = async_nats::jetstream::new(client); + let mut stream = jetstream + .get_or_create_stream(async_nats::jetstream::stream::Config { + name: config.channel_name.clone(), + max_bytes: 10_000_000, + ..Default::default() + }) + .await + .expect("could not connect to jetstream"); + let info = stream.info().await.expect("failed to fetch stream info"); + tracing::debug!(?info, "connected to jetstream"); let (sender, receiver) = unbounded_channel(); tokio::task::spawn(forward_messages_to_event_bus_client( @@ -55,8 +65,13 @@ async fn forward_messages_to_event_bus_client( channel: Subject, ) { while let Some(message) = receiver.recv().await { - if let Err(err) = client.publish(channel.clone(), message).await { - tracing::debug!(?err, "failed to publish event"); + match client.publish(channel.clone(), message).await { + Err(err) => { + tracing::debug!(?err, "failed to publish event"); + } + Ok(_fut) => { + // let's assume the message arrived for now + } } } } @@ -98,7 +113,7 @@ mod tests { #[tokio::test] async fn send_messages() { crate::tracing::init::initialize(&crate::Config { - env_filter: "debug".to_string(), + env_filter: "warn,observe=debug".to_string(), stderr_threshold: None, use_json_format: false, tracing: None, @@ -116,6 +131,6 @@ mod tests { "outAmount": 1234, }), ); - tokio::time::sleep(std::time::Duration::from_millis(1)).await; + tokio::time::sleep(std::time::Duration::from_millis(100)).await; } } From 19abcf6312f915402699fc87568220517c61553b Mon Sep 17 00:00:00 2001 From: MartinquaXD Date: Wed, 20 May 2026 12:29:54 +0000 Subject: [PATCH 09/30] Assume the stream already exists --- crates/observe/src/event_bus/mod.rs | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs index 694a937443..c8da4f36f1 100644 --- a/crates/observe/src/event_bus/mod.rs +++ b/crates/observe/src/event_bus/mod.rs @@ -36,11 +36,7 @@ pub async fn init(config: Config) { .expect("failed to connect to NATS service"); let jetstream = async_nats::jetstream::new(client); let mut stream = jetstream - .get_or_create_stream(async_nats::jetstream::stream::Config { - name: config.channel_name.clone(), - max_bytes: 10_000_000, - ..Default::default() - }) + .get_stream(&config.channel_name) .await .expect("could not connect to jetstream"); let info = stream.info().await.expect("failed to fetch stream info"); @@ -120,17 +116,19 @@ mod tests { }); init(Config { client: "localhost:4222".parse().unwrap(), - channel_name: "test".to_string(), + channel_name: "main".to_string(), }) .await; - publish( - "name", - json!({ - "estimator": "baseline", - "outAmount": 1234, - }), - ); + for _ in 0..1000 { + publish( + "name", + json!({ + "estimator": "baseline", + "outAmount": 1234, + }), + ); + } tokio::time::sleep(std::time::Duration::from_millis(100)).await; } } From cffd2bf0dce080e22e509eb6df539cde95c2fbe0 Mon Sep 17 00:00:00 2001 From: MartinquaXD Date: Wed, 20 May 2026 12:32:01 +0000 Subject: [PATCH 10/30] don't spam logs when feature is not enabled --- crates/observe/src/event_bus/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs index c8da4f36f1..16a826f961 100644 --- a/crates/observe/src/event_bus/mod.rs +++ b/crates/observe/src/event_bus/mod.rs @@ -75,7 +75,6 @@ async fn forward_messages_to_event_bus_client( /// Enqueues the event to be sent to the event bus in a background task. pub fn publish(name: impl Into, data: impl Serialize) { let Some(queue) = EVENT_QUEUE.get() else { - tracing::error!("event queue not yet initialized"); return; }; From 0f2270f5b11d25788c863da44f264ca4f0724e71 Mon Sep 17 00:00:00 2001 From: MartinquaXD Date: Thu, 21 May 2026 06:48:02 +0000 Subject: [PATCH 11/30] Configure event bus in autopilot and orderbook --- crates/autopilot/src/run.rs | 9 ++++++++- crates/configs/src/shared.rs | 24 ++++++++++++++++++++++++ crates/observe/src/config.rs | 10 +++++++++- crates/observe/src/event_bus/mod.rs | 21 ++++++++------------- crates/orderbook/src/run.rs | 12 +++++++++++- 5 files changed, 60 insertions(+), 16 deletions(-) diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 5a93ebf879..925f83be04 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -36,7 +36,7 @@ use { http_client::HttpClientFactory, model::DomainSeparator, num::ToPrimitive, - observe::metrics::LivenessChecking, + observe::{config::EventBusConfig, metrics::LivenessChecking}, price_estimation::{ config::price_estimation::BalanceOverridesConfigExt, factory::{self, PriceEstimatorFactory}, @@ -153,6 +153,13 @@ pub async fn start(args: impl Iterator) { ); observe::tracing::init::initialize(&obs_config); observe::panic_hook::install(); + if let Some(event_bus) = &config.shared.event_bus { + observe::event_bus::init(EventBusConfig { + url: event_bus.url.clone(), + channel: event_bus.channel.clone(), + }) + .await; + } #[cfg(unix)] observe::heap_dump_handler::spawn_heap_dump_handler(); diff --git a/crates/configs/src/shared.rs b/crates/configs/src/shared.rs index 237f28dc82..cd2f719fcc 100644 --- a/crates/configs/src/shared.rs +++ b/crates/configs/src/shared.rs @@ -96,6 +96,9 @@ pub struct SharedConfig { /// By default, volume fees are NOT applied to same-token trades. #[serde(default)] pub enable_sell_equals_buy_volume_fee: bool, + + /// Enables publishing events to a global events bus. + pub event_bus: Option, } impl Default for SharedConfig { @@ -112,6 +115,7 @@ impl Default for SharedConfig { contracts: Default::default(), volume_fee_bucket_overrides: Vec::new(), enable_sell_equals_buy_volume_fee: false, + event_bus: None, } } } @@ -311,6 +315,17 @@ where serializer.serialize_str(level.as_str()) } +/// OpenTelemetry tracing configuration. +#[derive(Debug, Deserialize)] +#[serde(rename_all = "kebab-case")] +#[derive(serde::Serialize)] +pub struct EventBusConfig { + /// Url of the event bus service. + pub url: Url, + /// Name of the channel to post events to. + pub channel: String, +} + /// Gas price estimation strategy. #[derive(Debug, Clone, Deserialize)] #[serde(tag = "type")] @@ -403,6 +418,10 @@ mod tests { "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2", "0x6B175474E89094C44Da98b954EedeAC495271d0F", ] + + [event-bus] + url = "localhost:4222" + channel = "main" "#; let config: SharedConfig = toml::from_str(toml).unwrap(); @@ -430,5 +449,10 @@ mod tests { assert!(config.enable_sell_equals_buy_volume_fee); assert_eq!(config.volume_fee_bucket_overrides.len(), 1); assert_eq!(config.volume_fee_bucket_overrides[0].tokens.len(), 2); + assert_eq!( + config.event_bus.as_ref().unwrap().url, + "localhost:4222".parse().unwrap() + ); + assert_eq!(config.event_bus.as_ref().unwrap().channel, "main"); } } diff --git a/crates/observe/src/config.rs b/crates/observe/src/config.rs index cc061390e0..f17a8eb9d0 100644 --- a/crates/observe/src/config.rs +++ b/crates/observe/src/config.rs @@ -1,4 +1,4 @@ -use {core::time::Duration, tracing::Level}; +use {core::time::Duration, tracing::Level, url::Url}; #[derive(Debug, Clone)] pub struct Config { @@ -88,3 +88,11 @@ impl TracingConfig { } } } + +/// Configures a backend wide event bus events can be posted to. +pub struct EventBusConfig { + /// Url of the event bus service + pub url: Url, + /// Name of the channel to post events to + pub channel: String, +} diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs index 16a826f961..e875d953f6 100644 --- a/crates/observe/src/event_bus/mod.rs +++ b/crates/observe/src/event_bus/mod.rs @@ -6,6 +6,7 @@ //! Messages always get serialized as JSON so you can publish anything that //! can be serialized to JSON as well. use { + crate::config::EventBusConfig, async_nats::{Subject, jetstream::Context as JetstreamClient}, bytes::Bytes, chrono::Utc, @@ -15,28 +16,22 @@ use { OnceCell, mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, }, - url::Url, }; /// Channel to buffer emitted events until we have enough to send a bunch of /// them at once. static EVENT_QUEUE: OnceCell> = OnceCell::const_new(); -pub struct Config { - client: Url, - channel_name: String, -} - /// Initializes the event bus and panics if it fails. -pub async fn init(config: Config) { +pub async fn init(config: EventBusConfig) { EVENT_QUEUE .get_or_init(|| async move { - let client = async_nats::connect(config.client.as_str()) + let client = async_nats::connect(config.url.as_str()) .await .expect("failed to connect to NATS service"); let jetstream = async_nats::jetstream::new(client); let mut stream = jetstream - .get_stream(&config.channel_name) + .get_stream(&config.channel) .await .expect("could not connect to jetstream"); let info = stream.info().await.expect("failed to fetch stream info"); @@ -46,7 +41,7 @@ pub async fn init(config: Config) { tokio::task::spawn(forward_messages_to_event_bus_client( receiver, jetstream, - config.channel_name.into(), + config.channel.into(), )); sender }) @@ -113,9 +108,9 @@ mod tests { use_json_format: false, tracing: None, }); - init(Config { - client: "localhost:4222".parse().unwrap(), - channel_name: "main".to_string(), + init(EventBusConfig { + url: "localhost:4222".parse().unwrap(), + channel: "main".to_string(), }) .await; diff --git a/crates/orderbook/src/run.rs b/crates/orderbook/src/run.rs index 0d014ff38c..46b9a5bcd7 100644 --- a/crates/orderbook/src/run.rs +++ b/crates/orderbook/src/run.rs @@ -29,7 +29,10 @@ use { http_client::HttpClientFactory, model::DomainSeparator, num::ToPrimitive, - observe::metrics::{DEFAULT_METRICS_PORT, serve_metrics}, + observe::{ + config::EventBusConfig, + metrics::{DEFAULT_METRICS_PORT, serve_metrics}, + }, order_validation, price_estimation::{ PriceEstimating, @@ -74,6 +77,13 @@ pub async fn start(args: impl Iterator) { tracing::info!("running order book with validated arguments:\n{}", args); observe::panic_hook::install(); observe::metrics::setup_registry(Some("gp_v2_api".into()), None); + if let Some(event_bus) = &config.shared.event_bus { + observe::event_bus::init(EventBusConfig { + url: event_bus.url.clone(), + channel: event_bus.channel.clone(), + }) + .await; + } #[cfg(unix)] observe::heap_dump_handler::spawn_heap_dump_handler(); tracing::info!("file configuration:\n{:#?}", config); From a27c820cb088c705f09b7dc596d457658aa64a78 Mon Sep 17 00:00:00 2001 From: MartinquaXD Date: Thu, 21 May 2026 07:14:40 +0000 Subject: [PATCH 12/30] emit quote event --- .../price-estimation/src/competition/quote.rs | 36 ++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/crates/price-estimation/src/competition/quote.rs b/crates/price-estimation/src/competition/quote.rs index a558494d20..a406c9cdb9 100644 --- a/crates/price-estimation/src/competition/quote.rs +++ b/crates/price-estimation/src/competition/quote.rs @@ -12,6 +12,7 @@ use { anyhow::Context, futures::future::{BoxFuture, FutureExt, TryFutureExt}, model::order::OrderKind, + serde_json::json, std::{cmp::Ordering, sync::Arc, time::Duration}, tracing::instrument, }; @@ -37,7 +38,12 @@ impl PriceEstimating for CompetitionEstimator> { }; let get_results = self .produce_results(query.clone(), is_reasonable, |context| { - context.estimator.estimate(context.query) + async move { + let res = context.estimator.estimate(context.query.clone()).await; + emit_quote_event(&context.query, &res); + res + } + .boxed() }) .map(Result::Ok); @@ -156,6 +162,34 @@ impl RankingContext { } } +fn emit_quote_event(query: &Query, result: &PriceEstimateResult) { + observe::event_bus::publish( + "priceEstimate", + json!({ + "query": { + "sellToken": query.sell_token.to_string(), + "buyToken": query.sell_token.to_string(), + "inAmount": query.in_amount.to_string(), + "from": query.verification.from, + "timeout": query.timeout.as_millis(), + "kind": match query.kind { + OrderKind::Sell => "sell", + OrderKind::Buy => "buy", + } + }, + "result": match result { + Ok(estimate) => json!({ + "outAmount": estimate.out_amount.to_string(), + "gas": estimate.gas.to_string(), + "solver": estimate.solver.to_string(), + "verified": estimate.verified, + }), + Err(err) => json!({"error": err.to_string()}), + } + }), + ); +} + #[cfg(test)] mod tests { use { From 830e78ce5d5db40f7b5d0f00efef289a4e2d8e23 Mon Sep 17 00:00:00 2001 From: MartinquaXD Date: Thu, 21 May 2026 07:38:35 +0000 Subject: [PATCH 13/30] Disambiguate messages with chain_id --- crates/autopilot/src/run.rs | 4 ++ crates/observe/src/config.rs | 2 + crates/observe/src/event_bus/mod.rs | 65 +++++++++++++++++------------ crates/orderbook/src/run.rs | 4 ++ 4 files changed, 49 insertions(+), 26 deletions(-) diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 925f83be04..da8f122896 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -157,6 +157,10 @@ pub async fn start(args: impl Iterator) { observe::event_bus::init(EventBusConfig { url: event_bus.url.clone(), channel: event_bus.channel.clone(), + chain_id: config + .shared + .chain_id + .expect("when the event bus is configured 'chain-id' is required"), }) .await; } diff --git a/crates/observe/src/config.rs b/crates/observe/src/config.rs index f17a8eb9d0..2e3fe3caed 100644 --- a/crates/observe/src/config.rs +++ b/crates/observe/src/config.rs @@ -95,4 +95,6 @@ pub struct EventBusConfig { pub url: Url, /// Name of the channel to post events to pub channel: String, + /// Which chain this service operates on + pub chain_id: u64, } diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs index e875d953f6..0ca833e444 100644 --- a/crates/observe/src/event_bus/mod.rs +++ b/crates/observe/src/event_bus/mod.rs @@ -18,34 +18,45 @@ use { }, }; -/// Channel to buffer emitted events until we have enough to send a bunch of -/// them at once. -static EVENT_QUEUE: OnceCell> = OnceCell::const_new(); +struct EventBusConnector { + /// Unbounded channel to allow emitting events from synchrounous + /// contexts. + message_queue: UnboundedSender, + /// Chain id to disambiguate messages in globally shared event bus + /// service. + chain_id: u64, +} + +/// Singleton event bus connection to allow publishing events +/// conventiently from everywhere. +static BUS: OnceCell = OnceCell::const_new(); /// Initializes the event bus and panics if it fails. pub async fn init(config: EventBusConfig) { - EVENT_QUEUE - .get_or_init(|| async move { - let client = async_nats::connect(config.url.as_str()) - .await - .expect("failed to connect to NATS service"); - let jetstream = async_nats::jetstream::new(client); - let mut stream = jetstream - .get_stream(&config.channel) - .await - .expect("could not connect to jetstream"); - let info = stream.info().await.expect("failed to fetch stream info"); - tracing::debug!(?info, "connected to jetstream"); + BUS.get_or_init(|| async move { + let client = async_nats::connect(config.url.as_str()) + .await + .expect("failed to connect to NATS service"); + let jetstream = async_nats::jetstream::new(client); + let mut stream = jetstream + .get_stream(&config.channel) + .await + .expect("could not connect to jetstream"); + let info = stream.info().await.expect("failed to fetch stream info"); + tracing::debug!(?info, "connected to jetstream"); - let (sender, receiver) = unbounded_channel(); - tokio::task::spawn(forward_messages_to_event_bus_client( - receiver, - jetstream, - config.channel.into(), - )); - sender - }) - .await; + let (sender, receiver) = unbounded_channel(); + tokio::task::spawn(forward_messages_to_event_bus_client( + receiver, + jetstream, + config.channel.into(), + )); + EventBusConnector { + message_queue: sender, + chain_id: config.chain_id, + } + }) + .await; } /// Monitors a message queue and forwards all messages to the event bus @@ -69,12 +80,13 @@ async fn forward_messages_to_event_bus_client( /// Enqueues the event to be sent to the event bus in a background task. pub fn publish(name: impl Into, data: impl Serialize) { - let Some(queue) = EVENT_QUEUE.get() else { + let Some(bus) = BUS.get() else { return; }; let mut message = json!({ "version": "v1", + "chainId": bus.chain_id, "timestamp": Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true), "name": name.into(), "body": data, @@ -90,7 +102,7 @@ pub fn publish(name: impl Into, data: impl Serialize) { } }; - if let Err(err) = queue.send(body.into()) { + if let Err(err) = bus.message_queue.send(body.into()) { tracing::error!(?err, "failed to enqueue message"); } } @@ -111,6 +123,7 @@ mod tests { init(EventBusConfig { url: "localhost:4222".parse().unwrap(), channel: "main".to_string(), + chain_id: 1, }) .await; diff --git a/crates/orderbook/src/run.rs b/crates/orderbook/src/run.rs index 46b9a5bcd7..3ba977835e 100644 --- a/crates/orderbook/src/run.rs +++ b/crates/orderbook/src/run.rs @@ -81,6 +81,10 @@ pub async fn start(args: impl Iterator) { observe::event_bus::init(EventBusConfig { url: event_bus.url.clone(), channel: event_bus.channel.clone(), + chain_id: config + .shared + .chain_id + .expect("when the event bus is configured 'chain-id' is required"), }) .await; } From 6c09378d3e149b87b69e3fea827fdba620b56b00 Mon Sep 17 00:00:00 2001 From: MartinquaXD Date: Thu, 21 May 2026 07:57:36 +0000 Subject: [PATCH 14/30] Add more data to emitted event --- crates/observe/src/event_bus/mod.rs | 4 +-- .../price-estimation/src/competition/mod.rs | 5 ++- .../price-estimation/src/competition/quote.rs | 33 ++++++++++++------- 3 files changed, 28 insertions(+), 14 deletions(-) diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs index 0ca833e444..258bc9394c 100644 --- a/crates/observe/src/event_bus/mod.rs +++ b/crates/observe/src/event_bus/mod.rs @@ -79,7 +79,7 @@ async fn forward_messages_to_event_bus_client( } /// Enqueues the event to be sent to the event bus in a background task. -pub fn publish(name: impl Into, data: impl Serialize) { +pub fn publish(event_name: impl Into, data: impl Serialize) { let Some(bus) = BUS.get() else { return; }; @@ -88,7 +88,7 @@ pub fn publish(name: impl Into, data: impl Serialize) { "version": "v1", "chainId": bus.chain_id, "timestamp": Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true), - "name": name.into(), + "event": event_name.into(), "body": data, }); if let Some(id) = crate::tracing::distributed::request_id::from_current_span() { diff --git a/crates/price-estimation/src/competition/mod.rs b/crates/price-estimation/src/competition/mod.rs index 80a9119a5b..b2b1e1cb9e 100644 --- a/crates/price-estimation/src/competition/mod.rs +++ b/crates/price-estimation/src/competition/mod.rs @@ -108,9 +108,10 @@ impl CompetitionEstimator { while stage_index < self.stages.len() && requests.len() < requests_for_batch { let stage = &self.stages.get(stage_index).expect("index checked by loop"); - let futures = stage.iter().enumerate().map(|(index, (_name, estimator))| { + let futures = stage.iter().enumerate().map(|(index, (name, estimator))| { get_single_result(Context { estimator, + name, query: query.clone(), remaining_stages: Arc::clone(&remaining_stages), }) @@ -175,6 +176,8 @@ struct Context<'a, ESTIMATOR, QUERY> { /// the number of stages that are left after the queries /// produced by this Context's stages. remaining_stages: Arc>, + /// Name of the estimator + name: &'a str, } impl<'a, E, Q> Context<'a, E, Q> { diff --git a/crates/price-estimation/src/competition/quote.rs b/crates/price-estimation/src/competition/quote.rs index a406c9cdb9..57d16f4975 100644 --- a/crates/price-estimation/src/competition/quote.rs +++ b/crates/price-estimation/src/competition/quote.rs @@ -7,13 +7,18 @@ use { PriceEstimationError, Query, QuoteVerificationMode, + competition::Context, }, alloy::primitives::{Address, U256}, - anyhow::Context, + anyhow::Context as _, futures::future::{BoxFuture, FutureExt, TryFutureExt}, model::order::OrderKind, serde_json::json, - std::{cmp::Ordering, sync::Arc, time::Duration}, + std::{ + cmp::Ordering, + sync::Arc, + time::{Duration, Instant}, + }, tracing::instrument, }; @@ -39,8 +44,9 @@ impl PriceEstimating for CompetitionEstimator> { let get_results = self .produce_results(query.clone(), is_reasonable, |context| { async move { + let start = Instant::now(); let res = context.estimator.estimate(context.query.clone()).await; - emit_quote_event(&context.query, &res); + emit_quote_event(&context, &res, start.elapsed()); res } .boxed() @@ -162,26 +168,31 @@ impl RankingContext { } } -fn emit_quote_event(query: &Query, result: &PriceEstimateResult) { +fn emit_quote_event( + context: &Context, Arc>, + result: &PriceEstimateResult, + elapsed: Duration, +) { observe::event_bus::publish( "priceEstimate", json!({ "query": { - "sellToken": query.sell_token.to_string(), - "buyToken": query.sell_token.to_string(), - "inAmount": query.in_amount.to_string(), - "from": query.verification.from, - "timeout": query.timeout.as_millis(), - "kind": match query.kind { + "sellToken": context.query.sell_token.to_string(), + "buyToken": context.query.sell_token.to_string(), + "inAmount": context.query.in_amount.to_string(), + "kind": match context.query.kind { OrderKind::Sell => "sell", OrderKind::Buy => "buy", } }, + "from": context.query.verification.from, + "timeout": context.query.timeout.as_millis(), + "elapsed": elapsed.as_millis().to_string(), + "estimator": context.name.to_string(), "result": match result { Ok(estimate) => json!({ "outAmount": estimate.out_amount.to_string(), "gas": estimate.gas.to_string(), - "solver": estimate.solver.to_string(), "verified": estimate.verified, }), Err(err) => json!({"error": err.to_string()}), From 51b89dae44c8a080456bfaa8d14cf0893b64d40c Mon Sep 17 00:00:00 2001 From: MartinquaXD Date: Thu, 21 May 2026 10:20:55 +0000 Subject: [PATCH 15/30] Better disambiguation --- crates/observe/src/event_bus/mod.rs | 39 ++++++++++++++++------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs index 258bc9394c..cf026bbb37 100644 --- a/crates/observe/src/event_bus/mod.rs +++ b/crates/observe/src/event_bus/mod.rs @@ -7,7 +7,7 @@ //! can be serialized to JSON as well. use { crate::config::EventBusConfig, - async_nats::{Subject, jetstream::Context as JetstreamClient}, + async_nats::jetstream::Context as JetstreamClient, bytes::Bytes, chrono::Utc, serde::Serialize, @@ -21,10 +21,15 @@ use { struct EventBusConnector { /// Unbounded channel to allow emitting events from synchrounous /// contexts. - message_queue: UnboundedSender, - /// Chain id to disambiguate messages in globally shared event bus + message_queue: UnboundedSender, + /// Subject prefix to disambiguate messages in globally shared event bus /// service. - chain_id: u64, + subject_prefix: String, +} + +struct Message { + subject: String, + data: Bytes, } /// Singleton event bus connection to allow publishing events @@ -46,14 +51,12 @@ pub async fn init(config: EventBusConfig) { tracing::debug!(?info, "connected to jetstream"); let (sender, receiver) = unbounded_channel(); - tokio::task::spawn(forward_messages_to_event_bus_client( - receiver, - jetstream, - config.channel.into(), - )); + tokio::task::spawn(forward_messages_to_event_bus_client(receiver, jetstream)); EventBusConnector { message_queue: sender, - chain_id: config.chain_id, + // we prefix every subject with `event` to allow consumers to easily + // subscribe to all events without also seeing NATS internal events + subject_prefix: format!("event.{}.", config.chain_id), } }) .await; @@ -62,12 +65,11 @@ pub async fn init(config: EventBusConfig) { /// Monitors a message queue and forwards all messages to the event bus /// service. async fn forward_messages_to_event_bus_client( - mut receiver: UnboundedReceiver, + mut receiver: UnboundedReceiver, client: JetstreamClient, - channel: Subject, ) { while let Some(message) = receiver.recv().await { - match client.publish(channel.clone(), message).await { + match client.publish(message.subject, message.data).await { Err(err) => { tracing::debug!(?err, "failed to publish event"); } @@ -79,16 +81,14 @@ async fn forward_messages_to_event_bus_client( } /// Enqueues the event to be sent to the event bus in a background task. -pub fn publish(event_name: impl Into, data: impl Serialize) { +pub fn publish(subject: &str, data: impl Serialize) { let Some(bus) = BUS.get() else { return; }; let mut message = json!({ "version": "v1", - "chainId": bus.chain_id, "timestamp": Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true), - "event": event_name.into(), "body": data, }); if let Some(id) = crate::tracing::distributed::request_id::from_current_span() { @@ -102,7 +102,12 @@ pub fn publish(event_name: impl Into, data: impl Serialize) { } }; - if let Err(err) = bus.message_queue.send(body.into()) { + let message = Message { + subject: format!("{}{}", bus.subject_prefix, subject), + data: body.into(), + }; + + if let Err(err) = bus.message_queue.send(message) { tracing::error!(?err, "failed to enqueue message"); } } From 9271d4fd472146317aa81043dc9d4d1502689717 Mon Sep 17 00:00:00 2001 From: MartinquaXD Date: Thu, 21 May 2026 10:49:05 +0000 Subject: [PATCH 16/30] fix copy-paste error --- crates/price-estimation/src/competition/quote.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/price-estimation/src/competition/quote.rs b/crates/price-estimation/src/competition/quote.rs index 57d16f4975..d6a6159975 100644 --- a/crates/price-estimation/src/competition/quote.rs +++ b/crates/price-estimation/src/competition/quote.rs @@ -178,7 +178,7 @@ fn emit_quote_event( json!({ "query": { "sellToken": context.query.sell_token.to_string(), - "buyToken": context.query.sell_token.to_string(), + "buyToken": context.query.buy_token.to_string(), "inAmount": context.query.in_amount.to_string(), "kind": match context.query.kind { OrderKind::Sell => "sell", From 17de493be2f41b68a072a037fe18b292ba36b522 Mon Sep 17 00:00:00 2001 From: MartinquaXD Date: Thu, 21 May 2026 10:53:15 +0000 Subject: [PATCH 17/30] Switch to bounded channel --- crates/observe/src/event_bus/mod.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs index cf026bbb37..ece791bda8 100644 --- a/crates/observe/src/event_bus/mod.rs +++ b/crates/observe/src/event_bus/mod.rs @@ -1,6 +1,6 @@ //! Implements a simple globally available way to publish events to an event //! bus. Under the hood it's using NATS. To support publishing events from -//! synchronous contexts we use an unbounded channel as an in-memory buffer. +//! synchronous contexts we use a channel as an in-memory buffer. //! Whenever a message gets posted to this channel a background task wakes //! up and forwards it to the NATS service running in a different process. //! Messages always get serialized as JSON so you can publish anything that @@ -14,14 +14,14 @@ use { serde_json::json, tokio::sync::{ OnceCell, - mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, + mpsc::{Receiver, Sender, channel}, }, }; struct EventBusConnector { - /// Unbounded channel to allow emitting events from synchrounous - /// contexts. - message_queue: UnboundedSender, + /// Channel to decouple issuing events from actually sending them to the + /// event bus service. + message_queue: Sender, /// Subject prefix to disambiguate messages in globally shared event bus /// service. subject_prefix: String, @@ -50,7 +50,8 @@ pub async fn init(config: EventBusConfig) { let info = stream.info().await.expect("failed to fetch stream info"); tracing::debug!(?info, "connected to jetstream"); - let (sender, receiver) = unbounded_channel(); + const EVENT_BUS_SIZE: usize = 1_000; + let (sender, receiver) = channel(EVENT_BUS_SIZE); tokio::task::spawn(forward_messages_to_event_bus_client(receiver, jetstream)); EventBusConnector { message_queue: sender, @@ -65,7 +66,7 @@ pub async fn init(config: EventBusConfig) { /// Monitors a message queue and forwards all messages to the event bus /// service. async fn forward_messages_to_event_bus_client( - mut receiver: UnboundedReceiver, + mut receiver: Receiver, client: JetstreamClient, ) { while let Some(message) = receiver.recv().await { @@ -107,7 +108,7 @@ pub fn publish(subject: &str, data: impl Serialize) { data: body.into(), }; - if let Err(err) = bus.message_queue.send(message) { + if let Err(err) = bus.message_queue.try_send(message) { tracing::error!(?err, "failed to enqueue message"); } } From 2b9e78e88067f0e46430ef487ef57ca98c09c198 Mon Sep 17 00:00:00 2001 From: MartinquaXD Date: Thu, 21 May 2026 10:59:26 +0000 Subject: [PATCH 18/30] fmt dependencies --- crates/observe/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/observe/Cargo.toml b/crates/observe/Cargo.toml index a94ab66785..23e8574155 100644 --- a/crates/observe/Cargo.toml +++ b/crates/observe/Cargo.toml @@ -8,8 +8,8 @@ license = "MIT OR Apache-2.0" [dependencies] async-nats = { workspace = true } async-trait = { workspace = true } -bytes = { workspace = true } axum = { workspace = true } +bytes = { workspace = true } chrono = { workspace = true, features = ["now"] } console-subscriber = { workspace = true, optional = true } futures = { workspace = true } From 68d7a1ac5e5b05a62ae4ab06ee2d48ab19e3e360 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Fri, 22 May 2026 14:22:57 +0100 Subject: [PATCH 19/30] Make event bus init non-fatal Previously, a misconfigured or unreachable NATS would crash the orderbook and autopilot at startup via three separate .expect() calls. The event bus is observational; failures should disable it, not take the binary down. Init now logs and proceeds; publish() already no-ops when BUS is uninitialized. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/observe/src/event_bus/mod.rs | 56 +++++++++++++++++------------ 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs index ece791bda8..3fd11d6b95 100644 --- a/crates/observe/src/event_bus/mod.rs +++ b/crates/observe/src/event_bus/mod.rs @@ -36,31 +36,43 @@ struct Message { /// conventiently from everywhere. static BUS: OnceCell = OnceCell::const_new(); -/// Initializes the event bus and panics if it fails. +/// Initializes the event bus. Connection failures are logged but do not +/// abort startup: the event bus is purely observational, so a misconfigured +/// or unreachable NATS must not take the binary down. When init fails the +/// global `BUS` stays uninitialized and [`publish`] becomes a no-op. pub async fn init(config: EventBusConfig) { - BUS.get_or_init(|| async move { - let client = async_nats::connect(config.url.as_str()) - .await - .expect("failed to connect to NATS service"); - let jetstream = async_nats::jetstream::new(client); - let mut stream = jetstream - .get_stream(&config.channel) - .await - .expect("could not connect to jetstream"); - let info = stream.info().await.expect("failed to fetch stream info"); - tracing::debug!(?info, "connected to jetstream"); + let mut initialized = false; + BUS.get_or_try_init(|| async { + let connector = connect(&config).await?; + initialized = true; + Ok::<_, async_nats::Error>(connector) + }) + .await + .inspect_err(|err| { + tracing::error!(?err, url = %config.url, channel = %config.channel, "failed to initialize event bus; events will be dropped"); + }) + .ok(); + if initialized { + tracing::info!(channel = %config.channel, chain_id = config.chain_id, "event bus connected"); + } +} - const EVENT_BUS_SIZE: usize = 1_000; - let (sender, receiver) = channel(EVENT_BUS_SIZE); - tokio::task::spawn(forward_messages_to_event_bus_client(receiver, jetstream)); - EventBusConnector { - message_queue: sender, - // we prefix every subject with `event` to allow consumers to easily - // subscribe to all events without also seeing NATS internal events - subject_prefix: format!("event.{}.", config.chain_id), - } +async fn connect(config: &EventBusConfig) -> Result { + let client = async_nats::connect(config.url.as_str()).await?; + let jetstream = async_nats::jetstream::new(client); + // Make sure the stream exists up-front; otherwise every publish would fail + // server-side and we'd only find out at runtime. + jetstream.get_stream(&config.channel).await?; + + const EVENT_BUS_SIZE: usize = 1_000; + let (sender, receiver) = channel(EVENT_BUS_SIZE); + tokio::task::spawn(forward_messages_to_event_bus_client(receiver, jetstream)); + Ok(EventBusConnector { + message_queue: sender, + // we prefix every subject with `event` to allow consumers to easily + // subscribe to all events without also seeing NATS internal events + subject_prefix: format!("event.{}.", config.chain_id), }) - .await; } /// Monitors a message queue and forwards all messages to the event bus From 990c58c458b83dabd250389283c80d68059f1705 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Fri, 22 May 2026 14:24:41 +0100 Subject: [PATCH 20/30] Observe JetStream publish acks The previous forwarder ignored the returned PublishAckFuture, so any server-side rejection (subject mismatch, stream limits, ...) was silently dropped. Pipeline acks via FuturesUnordered so failures are logged without adding a per-message round-trip to publish throughput. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/observe/src/event_bus/mod.rs | 34 ++++++++++++++++++++++++----- 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs index 3fd11d6b95..dcd18ff377 100644 --- a/crates/observe/src/event_bus/mod.rs +++ b/crates/observe/src/event_bus/mod.rs @@ -10,6 +10,7 @@ use { async_nats::jetstream::Context as JetstreamClient, bytes::Bytes, chrono::Utc, + futures::stream::{FuturesUnordered, StreamExt}, serde::Serialize, serde_json::json, tokio::sync::{ @@ -81,13 +82,34 @@ async fn forward_messages_to_event_bus_client( mut receiver: Receiver, client: JetstreamClient, ) { - while let Some(message) = receiver.recv().await { - match client.publish(message.subject, message.data).await { - Err(err) => { - tracing::debug!(?err, "failed to publish event"); + // JetStream publish completes in two stages: the inner future returns + // once the client has buffered the publish, the outer ack future resolves + // once the server has accepted and stored it. We need the second stage to + // observe server-side rejections (subject mismatch, stream limits, ...), + // but awaiting it inline would add a full round-trip to every publish. + // Instead we drive pending acks concurrently and only log failures. + let mut pending_acks = FuturesUnordered::new(); + loop { + tokio::select! { + biased; + // Drain pending acks alongside new messages so failures are + // logged promptly and the set doesn't grow without bound. + Some((subject, ack)) = pending_acks.next(), if !pending_acks.is_empty() => { + if let Err(err) = ack { + tracing::debug!(?err, %subject, "NATS did not acknowledge event"); + } } - Ok(_fut) => { - // let's assume the message arrived for now + maybe_message = receiver.recv() => { + let Some(message) = maybe_message else { break }; + let subject = message.subject; + let ack_fut = match client.publish(subject.clone(), message.data).await { + Ok(ack) => ack, + Err(err) => { + tracing::debug!(?err, %subject, "failed to enqueue event with NATS client"); + continue; + } + }; + pending_acks.push(async move { (subject, ack_fut.await) }); } } } From 672ad514bf4c399c78333c4b374008a3a6903d5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Fri, 22 May 2026 14:35:45 +0100 Subject: [PATCH 21/30] Count dropped event bus messages Each failure mode (channel full, JSON serialization, NATS client publish error, JetStream ack failure) was only visible via a log line, which is hard to alert on. Expose a Prometheus counter keyed by a DropReason enum so oncall can notice degradation without grepping logs. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/observe/src/event_bus/mod.rs | 49 +++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs index dcd18ff377..b3c260d1be 100644 --- a/crates/observe/src/event_bus/mod.rs +++ b/crates/observe/src/event_bus/mod.rs @@ -97,6 +97,7 @@ async fn forward_messages_to_event_bus_client( Some((subject, ack)) = pending_acks.next(), if !pending_acks.is_empty() => { if let Err(err) = ack { tracing::debug!(?err, %subject, "NATS did not acknowledge event"); + record_dropped(DropReason::Ack); } } maybe_message = receiver.recv() => { @@ -106,6 +107,7 @@ async fn forward_messages_to_event_bus_client( Ok(ack) => ack, Err(err) => { tracing::debug!(?err, %subject, "failed to enqueue event with NATS client"); + record_dropped(DropReason::Publish); continue; } }; @@ -133,6 +135,7 @@ pub fn publish(subject: &str, data: impl Serialize) { Ok(body) => body, Err(err) => { tracing::error!(?err, "failed to serialize event"); + record_dropped(DropReason::Serialize); return; } }; @@ -144,9 +147,55 @@ pub fn publish(subject: &str, data: impl Serialize) { if let Err(err) = bus.message_queue.try_send(message) { tracing::error!(?err, "failed to enqueue message"); + record_dropped(DropReason::ChannelFull); } } +/// Why an event was not delivered to the event bus. Used as a Prometheus +/// label so the failure modes can be alerted on independently. +#[derive(Copy, Clone, Debug)] +enum DropReason { + /// In-memory queue between [`publish`] and the background forwarder was + /// saturated. + ChannelFull, + /// The payload could not be encoded as JSON. + Serialize, + /// The NATS client rejected the publish locally (e.g. disconnected). + Publish, + /// JetStream did not acknowledge the publish. + Ack, +} + +impl DropReason { + fn as_label(self) -> &'static str { + match self { + DropReason::ChannelFull => "channel_full", + DropReason::Serialize => "serialize", + DropReason::Publish => "publish", + DropReason::Ack => "ack", + } + } +} + +#[derive(prometheus_metric_storage::MetricStorage)] +#[metric(subsystem = "event_bus")] +struct Metrics { + /// Events that were not delivered to the event bus, by failure mode. + /// See [`DropReason`] for the meaning of each label value. + #[metric(labels("reason"))] + dropped_events: prometheus::IntCounterVec, +} + +fn record_dropped(reason: DropReason) { + let Ok(metrics) = Metrics::instance(crate::metrics::get_storage_registry()) else { + return; + }; + metrics + .dropped_events + .with_label_values(&[reason.as_label()]) + .inc(); +} + #[cfg(test)] mod tests { use {super::*, serde_json::json}; From cc990e9371bc1c943629090878a63f48ffdbb49f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Fri, 22 May 2026 14:43:47 +0100 Subject: [PATCH 22/30] Validate event bus requires chain-id at config load Previously, both autopilot and orderbook had an inline .expect(...) that panicked at startup if the event bus was configured without chain-id. Move the cross-field invariant onto SharedConfig::validate and call it from each binary's config validation, so the misconfiguration is caught once with a clean error message. The runtime use sites can now safely .unwrap(). Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/autopilot/src/run.rs | 7 +++-- crates/configs/src/autopilot/mod.rs | 1 + crates/configs/src/orderbook/mod.rs | 5 ++++ crates/configs/src/shared.rs | 40 +++++++++++++++++++++++++++++ crates/observe/src/config.rs | 4 ++- crates/orderbook/src/run.rs | 11 ++++---- 6 files changed, 58 insertions(+), 10 deletions(-) diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index da8f122896..1be1b5b820 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -157,10 +157,9 @@ pub async fn start(args: impl Iterator) { observe::event_bus::init(EventBusConfig { url: event_bus.url.clone(), channel: event_bus.channel.clone(), - chain_id: config - .shared - .chain_id - .expect("when the event bus is configured 'chain-id' is required"), + // Presence of `chain-id` alongside `event_bus` is enforced by + // `SharedConfig::validate` at startup. + chain_id: config.shared.chain_id.unwrap(), }) .await; } diff --git a/crates/configs/src/autopilot/mod.rs b/crates/configs/src/autopilot/mod.rs index 4e4bf43c81..8dd4492e1d 100644 --- a/crates/configs/src/autopilot/mod.rs +++ b/crates/configs/src/autopilot/mod.rs @@ -190,6 +190,7 @@ impl Configuration { !self.drivers.is_empty(), "colocation is enabled but no drivers are configured" ); + self.shared.validate()?; Ok(self) } } diff --git a/crates/configs/src/orderbook/mod.rs b/crates/configs/src/orderbook/mod.rs index 4b8e21b985..3f063ebfd6 100644 --- a/crates/configs/src/orderbook/mod.rs +++ b/crates/configs/src/orderbook/mod.rs @@ -143,6 +143,11 @@ impl Configuration { )), } } + + pub fn validate(self) -> anyhow::Result { + self.shared.validate()?; + Ok(self) + } } #[cfg(any(test, feature = "test-util"))] diff --git a/crates/configs/src/shared.rs b/crates/configs/src/shared.rs index cd2f719fcc..1c548f6e08 100644 --- a/crates/configs/src/shared.rs +++ b/crates/configs/src/shared.rs @@ -1,6 +1,7 @@ use { crate::fee_factor::FeeFactor, alloy::primitives::Address, + anyhow::ensure, serde::{Deserialize, Deserializer, de::Unexpected}, std::{collections::HashSet, str::FromStr, time::Duration}, tracing::Level, @@ -101,6 +102,17 @@ pub struct SharedConfig { pub event_bus: Option, } +impl SharedConfig { + /// Cross-field invariants that cannot be expressed in the serde schema. + pub fn validate(&self) -> anyhow::Result<()> { + ensure!( + self.event_bus.is_none() || self.chain_id.is_some(), + "`chain-id` is required when the event bus is configured", + ); + Ok(()) + } +} + impl Default for SharedConfig { fn default() -> Self { Self { @@ -455,4 +467,32 @@ mod tests { ); assert_eq!(config.event_bus.as_ref().unwrap().channel, "main"); } + + #[test] + fn validate_event_bus_requires_chain_id() { + let with_event_bus_only = toml::from_str::( + r#" + [event-bus] + url = "localhost:4222" + channel = "main" + "#, + ) + .unwrap(); + assert!(with_event_bus_only.validate().is_err()); + + let with_both = toml::from_str::( + r#" + chain-id = 1 + + [event-bus] + url = "localhost:4222" + channel = "main" + "#, + ) + .unwrap(); + with_both.validate().unwrap(); + + // Default config has neither set; validation should pass. + SharedConfig::default().validate().unwrap(); + } } diff --git a/crates/observe/src/config.rs b/crates/observe/src/config.rs index 2e3fe3caed..ae6cea0e6a 100644 --- a/crates/observe/src/config.rs +++ b/crates/observe/src/config.rs @@ -95,6 +95,8 @@ pub struct EventBusConfig { pub url: Url, /// Name of the channel to post events to pub channel: String, - /// Which chain this service operates on + /// Which chain this service operates on. The service-level `chain-id` + /// must be set when the event bus is configured; this is checked at + /// config validation time, so callers can pass it through directly. pub chain_id: u64, } diff --git a/crates/orderbook/src/run.rs b/crates/orderbook/src/run.rs index a540d6ea2a..2f6aed72f2 100644 --- a/crates/orderbook/src/run.rs +++ b/crates/orderbook/src/run.rs @@ -53,7 +53,9 @@ pub async fn start(args: impl Iterator) { let args = Arguments::parse_from(args); let config = Configuration::from_path(&args.config) .await - .expect("failed to load configuration file"); + .expect("failed to load configuration file") + .validate() + .expect("failed to validate configuration file"); let tracing_config = config .shared .tracing @@ -81,10 +83,9 @@ pub async fn start(args: impl Iterator) { observe::event_bus::init(EventBusConfig { url: event_bus.url.clone(), channel: event_bus.channel.clone(), - chain_id: config - .shared - .chain_id - .expect("when the event bus is configured 'chain-id' is required"), + // Presence of `chain-id` alongside `event_bus` is enforced by + // `SharedConfig::validate` at startup. + chain_id: config.shared.chain_id.unwrap(), }) .await; } From 0dc756bbd4c519c3e77050780ee813488f85f4f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Fri, 22 May 2026 14:46:11 +0100 Subject: [PATCH 23/30] Track event bus init success in a static flag A previous local `mut initialized` only existed for the duration of a single init() call, so its log message could fire on every successful re-init attempt and a failed init followed by a re-init still re-ran connect(). Promote it to a static AtomicBool so calling init() multiple times is a cheap no-op once any prior call succeeded, while still allowing retries after a failure. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/observe/src/event_bus/mod.rs | 45 ++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs index b3c260d1be..a8c56e49ae 100644 --- a/crates/observe/src/event_bus/mod.rs +++ b/crates/observe/src/event_bus/mod.rs @@ -13,6 +13,7 @@ use { futures::stream::{FuturesUnordered, StreamExt}, serde::Serialize, serde_json::json, + std::sync::atomic::{AtomicBool, Ordering}, tokio::sync::{ OnceCell, mpsc::{Receiver, Sender, channel}, @@ -36,25 +37,43 @@ struct Message { /// Singleton event bus connection to allow publishing events /// conventiently from everywhere. static BUS: OnceCell = OnceCell::const_new(); +/// Latches once a call to [`init`] has successfully connected. Lets repeat +/// calls return immediately without re-running `get_or_try_init` (which +/// would otherwise retry `connect` whenever a previous attempt failed). +static INITIALIZED: AtomicBool = AtomicBool::new(false); /// Initializes the event bus. Connection failures are logged but do not /// abort startup: the event bus is purely observational, so a misconfigured /// or unreachable NATS must not take the binary down. When init fails the /// global `BUS` stays uninitialized and [`publish`] becomes a no-op. +/// +/// Safe to call multiple times: once a previous call has succeeded the +/// subsequent ones short-circuit. A failed call leaves the bus uninitialized +/// so the next call gets another chance. pub async fn init(config: EventBusConfig) { - let mut initialized = false; - BUS.get_or_try_init(|| async { - let connector = connect(&config).await?; - initialized = true; - Ok::<_, async_nats::Error>(connector) - }) - .await - .inspect_err(|err| { - tracing::error!(?err, url = %config.url, channel = %config.channel, "failed to initialize event bus; events will be dropped"); - }) - .ok(); - if initialized { - tracing::info!(channel = %config.channel, chain_id = config.chain_id, "event bus connected"); + if INITIALIZED.load(Ordering::Acquire) { + return; + } + let result = BUS + .get_or_try_init(|| async { connect(&config).await }) + .await; + match result { + Ok(_) => { + INITIALIZED.store(true, Ordering::Release); + tracing::info!( + channel = %config.channel, + chain_id = config.chain_id, + "event bus connected", + ); + } + Err(err) => { + tracing::error!( + ?err, + url = %config.url, + channel = %config.channel, + "failed to initialize event bus; events will be dropped", + ); + } } } From afa8a595d520b9d94e4359052ae3dcb14c12ee39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Fri, 22 May 2026 14:59:23 +0100 Subject: [PATCH 24/30] Use typed envelope for event bus payloads Replace the serde_json::Value mutation pattern in publish() with a small Envelope struct. The wire format ("v1") now lives in one named constant, the requestId field is camelCased declaratively, and two unit tests pin the wire shape (presence/absence of requestId) so regressions break a build instead of a downstream consumer. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/observe/src/event_bus/mod.rs | 69 ++++++++++++++++++++++++----- 1 file changed, 58 insertions(+), 11 deletions(-) diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs index a8c56e49ae..6a2897b2ff 100644 --- a/crates/observe/src/event_bus/mod.rs +++ b/crates/observe/src/event_bus/mod.rs @@ -12,7 +12,6 @@ use { chrono::Utc, futures::stream::{FuturesUnordered, StreamExt}, serde::Serialize, - serde_json::json, std::sync::atomic::{AtomicBool, Ordering}, tokio::sync::{ OnceCell, @@ -20,6 +19,23 @@ use { }, }; +/// Wire format version of the JSON envelope sent on every event. Bump +/// alongside any breaking change to [`Envelope`]. +const ENVELOPE_VERSION: &str = "v1"; + +/// JSON envelope wrapping every event published to the bus. Consumers can +/// rely on `version` to evolve their parsers, on `timestamp` for ordering, +/// and on `requestId` to correlate events to a single inbound request. +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct Envelope { + version: &'static str, + timestamp: String, + #[serde(skip_serializing_if = "Option::is_none")] + request_id: Option, + body: T, +} + struct EventBusConnector { /// Channel to decouple issuing events from actually sending them to the /// event bus service. @@ -35,7 +51,7 @@ struct Message { } /// Singleton event bus connection to allow publishing events -/// conventiently from everywhere. +/// conveniently from everywhere. static BUS: OnceCell = OnceCell::const_new(); /// Latches once a call to [`init`] has successfully connected. Lets repeat /// calls return immediately without re-running `get_or_try_init` (which @@ -139,18 +155,17 @@ async fn forward_messages_to_event_bus_client( /// Enqueues the event to be sent to the event bus in a background task. pub fn publish(subject: &str, data: impl Serialize) { let Some(bus) = BUS.get() else { + tracing::warn!("attempting to publish events without initializing the event bus"); return; }; - let mut message = json!({ - "version": "v1", - "timestamp": Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true), - "body": data, - }); - if let Some(id) = crate::tracing::distributed::request_id::from_current_span() { - message["requestId"] = id.into(); - } - let body = match serde_json::to_vec(&message) { + let envelope = Envelope { + version: ENVELOPE_VERSION, + timestamp: Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true), + request_id: crate::tracing::distributed::request_id::from_current_span(), + body: data, + }; + let body = match serde_json::to_vec(&envelope) { Ok(body) => body, Err(err) => { tracing::error!(?err, "failed to serialize event"); @@ -219,6 +234,38 @@ fn record_dropped(reason: DropReason) { mod tests { use {super::*, serde_json::json}; + #[test] + fn envelope_matches_wire_format() { + let envelope = Envelope { + version: ENVELOPE_VERSION, + timestamp: "2026-05-22T12:00:00.000Z".to_string(), + request_id: Some("req-1".to_string()), + body: json!({"outAmount": 1234}), + }; + let serialized: serde_json::Value = serde_json::to_value(&envelope).unwrap(); + assert_eq!( + serialized, + json!({ + "version": "v1", + "timestamp": "2026-05-22T12:00:00.000Z", + "requestId": "req-1", + "body": {"outAmount": 1234}, + }) + ); + } + + #[test] + fn envelope_omits_missing_request_id() { + let envelope = Envelope { + version: ENVELOPE_VERSION, + timestamp: "2026-05-22T12:00:00.000Z".to_string(), + request_id: None, + body: json!({}), + }; + let serialized: serde_json::Value = serde_json::to_value(&envelope).unwrap(); + assert!(serialized.get("requestId").is_none()); + } + #[ignore] #[tokio::test] async fn send_messages() { From e1b007ea4894c8bdc302020e4a4d762a7d813b66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Fri, 22 May 2026 15:03:34 +0100 Subject: [PATCH 25/30] Split event bus forwarder into publisher and ack-waiter tasks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The forwarder loop combined two distinct concerns — pumping messages through the NATS client and awaiting JetStream's ack futures — into a single select-loop that was awkward to read. Splitting them into two tasks linked by a bounded channel gives each loop a single responsibility, lets the runtime schedule them on separate cores, and applies natural back-pressure when ack handling can't keep up. The ack waiter also drains its pending set on shutdown so server-side rejections are still surfaced. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/observe/src/event_bus/mod.rs | 94 ++++++++++++++++++----------- 1 file changed, 59 insertions(+), 35 deletions(-) diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs index 6a2897b2ff..76a1dd8cae 100644 --- a/crates/observe/src/event_bus/mod.rs +++ b/crates/observe/src/event_bus/mod.rs @@ -100,56 +100,80 @@ async fn connect(config: &EventBusConfig) -> Result, +const EVENT_BUS_SIZE: usize = 1_000; + +/// In-flight handle returned by JetStream's `publish` call, passed from the +/// publisher task to the ack-handling task. +type PendingAck = (String, async_nats::jetstream::context::PublishAckFuture); + +/// Reads messages off the in-memory queue and hands the publish ack future +/// off to [`await_acks`]. +async fn publish_messages( + mut messages: Receiver, client: JetstreamClient, + acks: Sender, ) { - // JetStream publish completes in two stages: the inner future returns - // once the client has buffered the publish, the outer ack future resolves - // once the server has accepted and stored it. We need the second stage to - // observe server-side rejections (subject mismatch, stream limits, ...), - // but awaiting it inline would add a full round-trip to every publish. - // Instead we drive pending acks concurrently and only log failures. - let mut pending_acks = FuturesUnordered::new(); + while let Some(message) = messages.recv().await { + let subject = message.subject; + let ack_fut = match client.publish(subject.clone(), message.data).await { + Ok(fut) => fut, + Err(err) => { + tracing::debug!(?err, %subject, "failed to enqueue event with NATS client"); + record_dropped(DropReason::Publish); + continue; + } + }; + if acks.send((subject, ack_fut)).await.is_err() { + // ack task has shut down; nothing useful left to do + return; + } + } +} + +/// Awaits JetStream publish acks concurrently and logs any failures. Runs +/// until the publisher task drops its sender and all in-flight acks have +/// resolved, so server-side rejections are still observed during shutdown. +async fn await_acks(mut acks: Receiver) { + let mut pending = FuturesUnordered::new(); loop { tokio::select! { biased; - // Drain pending acks alongside new messages so failures are - // logged promptly and the set doesn't grow without bound. - Some((subject, ack)) = pending_acks.next(), if !pending_acks.is_empty() => { - if let Err(err) = ack { - tracing::debug!(?err, %subject, "NATS did not acknowledge event"); - record_dropped(DropReason::Ack); - } - } - maybe_message = receiver.recv() => { - let Some(message) = maybe_message else { break }; - let subject = message.subject; - let ack_fut = match client.publish(subject.clone(), message.data).await { - Ok(ack) => ack, - Err(err) => { - tracing::debug!(?err, %subject, "failed to enqueue event with NATS client"); - record_dropped(DropReason::Publish); - continue; - } - }; - pending_acks.push(async move { (subject, ack_fut.await) }); + // Drain pending acks alongside new ones so failures are logged + // promptly and the set doesn't grow without bound. + Some(()) = pending.next(), if !pending.is_empty() => {} + next = acks.recv() => { + let Some((subject, ack_fut)) = next else { break }; + pending.push(log_ack(subject, ack_fut)); } } } + while pending.next().await.is_some() {} +} + +async fn log_ack(subject: String, ack_fut: async_nats::jetstream::context::PublishAckFuture) { + if let Err(err) = ack_fut.await { + tracing::debug!(?err, %subject, "NATS did not acknowledge event"); + record_dropped(DropReason::Ack); + } } /// Enqueues the event to be sent to the event bus in a background task. From 5d5a5a574a64821818a1540ad95b2d02b1f2af34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Fri, 22 May 2026 15:25:50 +0100 Subject: [PATCH 26/30] Clean up event bus internals and docs - Add Envelope::new so publish() reads as a single expression. - Fix the copy-paste doc on configs::shared::EventBusConfig (was describing OpenTelemetry) and merge its duplicate derive attribute. - Promote a couple of internal failure logs from debug to warn so they show up at default verbosity, and document why the await_acks drain loop exists. - Emit `elapsed` in priceEstimate events as a number instead of a stringified one. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/configs/src/shared.rs | 7 ++-- crates/observe/src/event_bus/mod.rs | 33 ++++++++++++------- .../price-estimation/src/competition/quote.rs | 2 +- 3 files changed, 25 insertions(+), 17 deletions(-) diff --git a/crates/configs/src/shared.rs b/crates/configs/src/shared.rs index 1c548f6e08..4da4038fb4 100644 --- a/crates/configs/src/shared.rs +++ b/crates/configs/src/shared.rs @@ -2,7 +2,7 @@ use { crate::fee_factor::FeeFactor, alloy::primitives::Address, anyhow::ensure, - serde::{Deserialize, Deserializer, de::Unexpected}, + serde::{Deserialize, Deserializer, Serialize, de::Unexpected}, std::{collections::HashSet, str::FromStr, time::Duration}, tracing::Level, url::Url, @@ -327,10 +327,9 @@ where serializer.serialize_str(level.as_str()) } -/// OpenTelemetry tracing configuration. -#[derive(Debug, Deserialize)] +/// Event bus configuration for a backend service. +#[derive(Debug, Deserialize, Serialize)] #[serde(rename_all = "kebab-case")] -#[derive(serde::Serialize)] pub struct EventBusConfig { /// Url of the event bus service. pub url: Url, diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs index 76a1dd8cae..099474e3c5 100644 --- a/crates/observe/src/event_bus/mod.rs +++ b/crates/observe/src/event_bus/mod.rs @@ -36,6 +36,17 @@ struct Envelope { body: T, } +impl Envelope { + fn new(request_id: Option, body: T) -> Self { + Self { + version: ENVELOPE_VERSION, + timestamp: Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true), + request_id, + body, + } + } +} + struct EventBusConnector { /// Channel to decouple issuing events from actually sending them to the /// event bus service. @@ -102,10 +113,7 @@ async fn connect(config: &EventBusConfig) -> Result fut, Err(err) => { - tracing::debug!(?err, %subject, "failed to enqueue event with NATS client"); + tracing::warn!(?err, %subject, "failed to enqueue event with NATS client"); record_dropped(DropReason::Publish); continue; } }; if acks.send((subject, ack_fut)).await.is_err() { - // ack task has shut down; nothing useful left to do + tracing::warn!("ack task was shut down; returning"); return; } } @@ -166,6 +174,9 @@ async fn await_acks(mut acks: Receiver) { } } } + // Only reached on publisher panic (steady state keeps both tasks alive). + // Drain pending futures so `log_ack` still runs for any in-flight publish + // — otherwise dropping them unpolled silently loses the log + metric. while pending.next().await.is_some() {} } @@ -183,12 +194,10 @@ pub fn publish(subject: &str, data: impl Serialize) { return; }; - let envelope = Envelope { - version: ENVELOPE_VERSION, - timestamp: Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true), - request_id: crate::tracing::distributed::request_id::from_current_span(), - body: data, - }; + let envelope = Envelope::new( + crate::tracing::distributed::request_id::from_current_span(), + data, + ); let body = match serde_json::to_vec(&envelope) { Ok(body) => body, Err(err) => { diff --git a/crates/price-estimation/src/competition/quote.rs b/crates/price-estimation/src/competition/quote.rs index d6a6159975..159dbc70f4 100644 --- a/crates/price-estimation/src/competition/quote.rs +++ b/crates/price-estimation/src/competition/quote.rs @@ -187,7 +187,7 @@ fn emit_quote_event( }, "from": context.query.verification.from, "timeout": context.query.timeout.as_millis(), - "elapsed": elapsed.as_millis().to_string(), + "elapsed": elapsed.as_millis(), "estimator": context.name.to_string(), "result": match result { Ok(estimate) => json!({ From 246380e081eaf0ac02e0961f0e4259fc204f8e84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Fri, 22 May 2026 15:46:10 +0100 Subject: [PATCH 27/30] Call estimate() eagerly when emitting quote events MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The closure passed to produce_results wrapped estimate() in an async block, deferring the call until the future was polled. That broke the queries_stages_sequentially test: with early-return enabled, a racing estimator could complete before later ones in the same stage were ever polled, so their .estimate() invocation — and the side-effects mocks rely on — never happened. Attach the timing / event emission via .map() instead, restoring eager invocation. Also trim emit_quote_event to take just the name and query it actually reads, dropping the unused Context generic. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../price-estimation/src/competition/quote.rs | 38 +++++++++++-------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/crates/price-estimation/src/competition/quote.rs b/crates/price-estimation/src/competition/quote.rs index 159dbc70f4..f6f576f115 100644 --- a/crates/price-estimation/src/competition/quote.rs +++ b/crates/price-estimation/src/competition/quote.rs @@ -7,7 +7,6 @@ use { PriceEstimationError, Query, QuoteVerificationMode, - competition::Context, }, alloy::primitives::{Address, U256}, anyhow::Context as _, @@ -43,13 +42,19 @@ impl PriceEstimating for CompetitionEstimator> { }; let get_results = self .produce_results(query.clone(), is_reasonable, |context| { - async move { - let start = Instant::now(); - let res = context.estimator.estimate(context.query.clone()).await; - emit_quote_event(&context, &res, start.elapsed()); - res - } - .boxed() + // Call estimate() eagerly so its side-effects still happen + // when an early-return drops the future before it's polled. + let start = Instant::now(); + let estimator_name = context.name; + let inner_query = context.query.clone(); + context + .estimator + .estimate(context.query.clone()) + .map(move |res| { + emit_quote_event(estimator_name, &inner_query, &res, start.elapsed()); + res + }) + .boxed() }) .map(Result::Ok); @@ -169,7 +174,8 @@ impl RankingContext { } fn emit_quote_event( - context: &Context, Arc>, + estimator_name: &str, + query: &Query, result: &PriceEstimateResult, elapsed: Duration, ) { @@ -177,18 +183,18 @@ fn emit_quote_event( "priceEstimate", json!({ "query": { - "sellToken": context.query.sell_token.to_string(), - "buyToken": context.query.buy_token.to_string(), - "inAmount": context.query.in_amount.to_string(), - "kind": match context.query.kind { + "sellToken": query.sell_token.to_string(), + "buyToken": query.buy_token.to_string(), + "inAmount": query.in_amount.to_string(), + "kind": match query.kind { OrderKind::Sell => "sell", OrderKind::Buy => "buy", } }, - "from": context.query.verification.from, - "timeout": context.query.timeout.as_millis(), + "from": query.verification.from, + "timeout": query.timeout.as_millis(), "elapsed": elapsed.as_millis(), - "estimator": context.name.to_string(), + "estimator": estimator_name, "result": match result { Ok(estimate) => json!({ "outAmount": estimate.out_amount.to_string(), From 9fc9085371997fa217a320d69b5aae46b1932f4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Fri, 22 May 2026 18:10:29 +0100 Subject: [PATCH 28/30] Address comments and create stream on startup --- crates/autopilot/src/run.rs | 2 +- crates/configs/src/shared.rs | 50 ++++++- crates/observe/src/config.rs | 2 +- crates/observe/src/event_bus/mod.rs | 67 ++++----- crates/orderbook/src/run.rs | 2 +- .../price-estimation/src/competition/quote.rs | 140 ++++++++++++++---- 6 files changed, 191 insertions(+), 72 deletions(-) diff --git a/crates/autopilot/src/run.rs b/crates/autopilot/src/run.rs index 1be1b5b820..9899b428d3 100644 --- a/crates/autopilot/src/run.rs +++ b/crates/autopilot/src/run.rs @@ -156,7 +156,7 @@ pub async fn start(args: impl Iterator) { if let Some(event_bus) = &config.shared.event_bus { observe::event_bus::init(EventBusConfig { url: event_bus.url.clone(), - channel: event_bus.channel.clone(), + stream_name: event_bus.channel.clone(), // Presence of `chain-id` alongside `event_bus` is enforced by // `SharedConfig::validate` at startup. chain_id: config.shared.chain_id.unwrap(), diff --git a/crates/configs/src/shared.rs b/crates/configs/src/shared.rs index 4da4038fb4..80e43aa895 100644 --- a/crates/configs/src/shared.rs +++ b/crates/configs/src/shared.rs @@ -333,10 +333,27 @@ where pub struct EventBusConfig { /// Url of the event bus service. pub url: Url, - /// Name of the channel to post events to. + /// Name of the channel to post events to. Must not contain spaces, tabs, + /// or `.` — NATS uses `.` as its subject hierarchy separator and disallows + /// whitespace in stream/subject names. + #[serde(deserialize_with = "deserialize_channel_name")] pub channel: String, } +fn deserialize_channel_name<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let raw = String::deserialize(deserializer)?; + if raw.contains(|c: char| matches!(c, ' ' | '\t' | '.')) { + return Err(serde::de::Error::invalid_value( + Unexpected::Str(&raw), + &"a channel name without spaces, tabs, or '.'", + )); + } + Ok(raw) +} + /// Gas price estimation strategy. #[derive(Debug, Clone, Deserialize)] #[serde(tag = "type")] @@ -467,6 +484,37 @@ mod tests { assert_eq!(config.event_bus.as_ref().unwrap().channel, "main"); } + #[test] + fn rejects_invalid_event_bus_channel_at_deserialization() { + for bad in ["with space", "with\ttab", "with.dot"] { + let toml = format!( + r#" + chain-id = 1 + + [event-bus] + url = "localhost:4222" + channel = "{bad}" + "#, + ); + assert!( + toml::from_str::(&toml).is_err(), + "channel {bad:?} should be rejected", + ); + } + + let ok = toml::from_str::( + r#" + chain-id = 1 + + [event-bus] + url = "localhost:4222" + channel = "main" + "#, + ) + .unwrap(); + assert_eq!(ok.event_bus.unwrap().channel, "main"); + } + #[test] fn validate_event_bus_requires_chain_id() { let with_event_bus_only = toml::from_str::( diff --git a/crates/observe/src/config.rs b/crates/observe/src/config.rs index ae6cea0e6a..7bcbdbeca8 100644 --- a/crates/observe/src/config.rs +++ b/crates/observe/src/config.rs @@ -94,7 +94,7 @@ pub struct EventBusConfig { /// Url of the event bus service pub url: Url, /// Name of the channel to post events to - pub channel: String, + pub stream_name: String, /// Which chain this service operates on. The service-level `chain-id` /// must be set when the event bus is configured; this is checked at /// config validation time, so callers can pass it through directly. diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs index 099474e3c5..962588a2f4 100644 --- a/crates/observe/src/event_bus/mod.rs +++ b/crates/observe/src/event_bus/mod.rs @@ -7,7 +7,7 @@ //! can be serialized to JSON as well. use { crate::config::EventBusConfig, - async_nats::jetstream::Context as JetstreamClient, + async_nats::jetstream::{Context as JetstreamClient, stream}, bytes::Bytes, chrono::Utc, futures::stream::{FuturesUnordered, StreamExt}, @@ -64,10 +64,6 @@ struct Message { /// Singleton event bus connection to allow publishing events /// conveniently from everywhere. static BUS: OnceCell = OnceCell::const_new(); -/// Latches once a call to [`init`] has successfully connected. Lets repeat -/// calls return immediately without re-running `get_or_try_init` (which -/// would otherwise retry `connect` whenever a previous attempt failed). -static INITIALIZED: AtomicBool = AtomicBool::new(false); /// Initializes the event bus. Connection failures are logged but do not /// abort startup: the event bus is purely observational, so a misconfigured @@ -78,7 +74,7 @@ static INITIALIZED: AtomicBool = AtomicBool::new(false); /// subsequent ones short-circuit. A failed call leaves the bus uninitialized /// so the next call gets another chance. pub async fn init(config: EventBusConfig) { - if INITIALIZED.load(Ordering::Acquire) { + if BUS.initialized() { return; } let result = BUS @@ -86,9 +82,8 @@ pub async fn init(config: EventBusConfig) { .await; match result { Ok(_) => { - INITIALIZED.store(true, Ordering::Release); tracing::info!( - channel = %config.channel, + channel = %config.stream_name, chain_id = config.chain_id, "event bus connected", ); @@ -97,7 +92,7 @@ pub async fn init(config: EventBusConfig) { tracing::error!( ?err, url = %config.url, - channel = %config.channel, + channel = %config.stream_name, "failed to initialize event bus; events will be dropped", ); } @@ -105,11 +100,29 @@ pub async fn init(config: EventBusConfig) { } async fn connect(config: &EventBusConfig) -> Result { + // we prefix every subject with `event` to allow consumers to easily + // subscribe to all events without also seeing NATS internal events + let subject_prefix = format!("event.{}.", config.chain_id); + + let name = config.stream_name.clone(); let client = async_nats::connect(config.url.as_str()).await?; let jetstream = async_nats::jetstream::new(client); + // Make sure the stream exists up-front; otherwise every publish would fail // server-side and we'd only find out at runtime. - jetstream.get_stream(&config.channel).await?; + if let Err(err) = jetstream + .create_stream(stream::Config { + name: name.clone(), + subjects: vec![subject_prefix.clone()], + ..Default::default() + }) + .await + { + // Ignore error on stream creation (can be that the stream exists), if we're + // then unable to get the stream, that's were the issue lies. + tracing::warn!(?err, "error creating the stream {name}"); + } + jetstream.get_stream(&config.stream_name).await?; // JetStream publish completes in two stages: the call to `publish()` // returns once the client has buffered the message, the returned @@ -121,9 +134,7 @@ async fn connect(config: &EventBusConfig) -> Result) { async fn log_ack(subject: String, ack_fut: async_nats::jetstream::context::PublishAckFuture) { if let Err(err) = ack_fut.await { - tracing::debug!(?err, %subject, "NATS did not acknowledge event"); + tracing::warn!(?err, %subject, "NATS did not acknowledge event"); record_dropped(DropReason::Ack); } } @@ -298,32 +309,4 @@ mod tests { let serialized: serde_json::Value = serde_json::to_value(&envelope).unwrap(); assert!(serialized.get("requestId").is_none()); } - - #[ignore] - #[tokio::test] - async fn send_messages() { - crate::tracing::init::initialize(&crate::Config { - env_filter: "warn,observe=debug".to_string(), - stderr_threshold: None, - use_json_format: false, - tracing: None, - }); - init(EventBusConfig { - url: "localhost:4222".parse().unwrap(), - channel: "main".to_string(), - chain_id: 1, - }) - .await; - - for _ in 0..1000 { - publish( - "name", - json!({ - "estimator": "baseline", - "outAmount": 1234, - }), - ); - } - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - } } diff --git a/crates/orderbook/src/run.rs b/crates/orderbook/src/run.rs index 2f6aed72f2..7cc571ec76 100644 --- a/crates/orderbook/src/run.rs +++ b/crates/orderbook/src/run.rs @@ -82,7 +82,7 @@ pub async fn start(args: impl Iterator) { if let Some(event_bus) = &config.shared.event_bus { observe::event_bus::init(EventBusConfig { url: event_bus.url.clone(), - channel: event_bus.channel.clone(), + stream_name: event_bus.channel.clone(), // Presence of `chain-id` alongside `event_bus` is enforced by // `SharedConfig::validate` at startup. chain_id: config.shared.chain_id.unwrap(), diff --git a/crates/price-estimation/src/competition/quote.rs b/crates/price-estimation/src/competition/quote.rs index f6f576f115..7b0d3bd0b5 100644 --- a/crates/price-estimation/src/competition/quote.rs +++ b/crates/price-estimation/src/competition/quote.rs @@ -12,7 +12,7 @@ use { anyhow::Context as _, futures::future::{BoxFuture, FutureExt, TryFutureExt}, model::order::OrderKind, - serde_json::json, + serde::Serialize, std::{ cmp::Ordering, sync::Arc, @@ -179,32 +179,67 @@ fn emit_quote_event( result: &PriceEstimateResult, elapsed: Duration, ) { - observe::event_bus::publish( - "priceEstimate", - json!({ - "query": { - "sellToken": query.sell_token.to_string(), - "buyToken": query.buy_token.to_string(), - "inAmount": query.in_amount.to_string(), - "kind": match query.kind { - OrderKind::Sell => "sell", - OrderKind::Buy => "buy", - } + let event = PriceEstimateEvent { + query: QueryFields { + sell_token: query.sell_token.to_string(), + buy_token: query.buy_token.to_string(), + in_amount: query.in_amount.to_string(), + kind: match query.kind { + OrderKind::Sell => "sell", + OrderKind::Buy => "buy", }, - "from": query.verification.from, - "timeout": query.timeout.as_millis(), - "elapsed": elapsed.as_millis(), - "estimator": estimator_name, - "result": match result { - Ok(estimate) => json!({ - "outAmount": estimate.out_amount.to_string(), - "gas": estimate.gas.to_string(), - "verified": estimate.verified, - }), - Err(err) => json!({"error": err.to_string()}), - } - }), - ); + }, + from: query.verification.from, + timeout: query.timeout.as_millis(), + elapsed: elapsed.as_millis(), + estimator: estimator_name, + result: match result { + Ok(estimate) => EstimateResult::Ok { + out_amount: estimate.out_amount.to_string(), + gas: estimate.gas.to_string(), + verified: estimate.verified, + }, + Err(err) => EstimateResult::Err { + error: err.to_string(), + }, + }, + }; + observe::event_bus::publish("priceEstimate", event); +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct PriceEstimateEvent<'a> { + query: QueryFields, + from: Address, + timeout: u128, + elapsed: u128, + estimator: &'a str, + result: EstimateResult, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct QueryFields { + sell_token: String, + buy_token: String, + in_amount: String, + kind: &'static str, +} + +/// Mirrors the previous JSON: either the estimate fields or an `error`. +#[derive(Serialize)] +#[serde(untagged)] +enum EstimateResult { + #[serde(rename_all = "camelCase")] + Ok { + out_amount: String, + gas: String, + verified: bool, + }, + Err { + error: String, + }, } #[cfg(test)] @@ -215,8 +250,61 @@ mod tests { alloy::{eips::eip1559::Eip1559Estimation, primitives::U256}, gas_price_estimation::FakeGasPriceEstimator, model::order::OrderKind, + serde_json::json, }; + #[test] + fn price_estimate_event_matches_wire_format() { + let event = PriceEstimateEvent { + query: QueryFields { + sell_token: "0x01".into(), + buy_token: "0x02".into(), + in_amount: "100".into(), + kind: "sell", + }, + from: Address::ZERO, + timeout: 5000, + elapsed: 12, + estimator: "baseline", + result: EstimateResult::Ok { + out_amount: "99".into(), + gas: "21000".into(), + verified: true, + }, + }; + assert_eq!( + serde_json::to_value(&event).unwrap(), + json!({ + "query": { + "sellToken": "0x01", + "buyToken": "0x02", + "inAmount": "100", + "kind": "sell", + }, + "from": Address::ZERO, + "timeout": 5000, + "elapsed": 12, + "estimator": "baseline", + "result": { + "outAmount": "99", + "gas": "21000", + "verified": true, + }, + }), + ); + } + + #[test] + fn price_estimate_event_error_variant() { + let result = EstimateResult::Err { + error: "boom".into(), + }; + assert_eq!( + serde_json::to_value(&result).unwrap(), + json!({ "error": "boom" }), + ); + } + fn price(out_amount: u128, gas: u64) -> PriceEstimateResult { Ok(Estimate { out_amount: U256::from(out_amount), From 1f4d2985c621fb942894d1c02efa311b5811240b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Sat, 23 May 2026 10:41:28 +0100 Subject: [PATCH 29/30] Go back to only getting the stream --- crates/observe/src/event_bus/mod.rs | 23 +++++------------------ 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/crates/observe/src/event_bus/mod.rs b/crates/observe/src/event_bus/mod.rs index 962588a2f4..26b98e168c 100644 --- a/crates/observe/src/event_bus/mod.rs +++ b/crates/observe/src/event_bus/mod.rs @@ -7,12 +7,11 @@ //! can be serialized to JSON as well. use { crate::config::EventBusConfig, - async_nats::jetstream::{Context as JetstreamClient, stream}, + async_nats::jetstream::Context as JetstreamClient, bytes::Bytes, chrono::Utc, futures::stream::{FuturesUnordered, StreamExt}, serde::Serialize, - std::sync::atomic::{AtomicBool, Ordering}, tokio::sync::{ OnceCell, mpsc::{Receiver, Sender, channel}, @@ -100,28 +99,16 @@ pub async fn init(config: EventBusConfig) { } async fn connect(config: &EventBusConfig) -> Result { - // we prefix every subject with `event` to allow consumers to easily - // subscribe to all events without also seeing NATS internal events + // We prefix every subject with `event` so consumers can subscribe to all + // events (e.g. `event.>`) without also seeing NATS internal events. The + // trailing dot is significant: see [`publish`] for how it's concatenated + // with the per-event subject suffix. let subject_prefix = format!("event.{}.", config.chain_id); - let name = config.stream_name.clone(); let client = async_nats::connect(config.url.as_str()).await?; let jetstream = async_nats::jetstream::new(client); - // Make sure the stream exists up-front; otherwise every publish would fail // server-side and we'd only find out at runtime. - if let Err(err) = jetstream - .create_stream(stream::Config { - name: name.clone(), - subjects: vec![subject_prefix.clone()], - ..Default::default() - }) - .await - { - // Ignore error on stream creation (can be that the stream exists), if we're - // then unable to get the stream, that's were the issue lies. - tracing::warn!(?err, "error creating the stream {name}"); - } jetstream.get_stream(&config.stream_name).await?; // JetStream publish completes in two stages: the call to `publish()` From 0b56fca4f22eaf741216a47ee0322d8182c95257 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Duarte?= <15343819+jmg-duarte@users.noreply.github.com> Date: Sat, 23 May 2026 11:17:01 +0100 Subject: [PATCH 30/30] lint --- crates/configs/src/shared.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/configs/src/shared.rs b/crates/configs/src/shared.rs index 80e43aa895..c6737c5cdf 100644 --- a/crates/configs/src/shared.rs +++ b/crates/configs/src/shared.rs @@ -345,7 +345,7 @@ where D: Deserializer<'de>, { let raw = String::deserialize(deserializer)?; - if raw.contains(|c: char| matches!(c, ' ' | '\t' | '.')) { + if raw.contains([' ', '\t', '.']) { return Err(serde::de::Error::invalid_value( Unexpected::Str(&raw), &"a channel name without spaces, tabs, or '.'",