Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
daa6499
Initial commit
MartinquaXD May 19, 2026
ea1e32f
Implement shutdown mode
MartinquaXD May 19, 2026
2dd6067
failing unit test
MartinquaXD May 19, 2026
ceede24
Switch to NATS
MartinquaXD May 19, 2026
878c869
Define message type more precisely
MartinquaXD May 19, 2026
cddb3b5
remove test setups again
MartinquaXD May 19, 2026
3a76b18
test
MartinquaXD May 19, 2026
7d42424
Get or create jetstream
MartinquaXD May 20, 2026
19abcf6
Assume the stream already exists
MartinquaXD May 20, 2026
cffd2bf
don't spam logs when feature is not enabled
MartinquaXD May 20, 2026
0f2270f
Configure event bus in autopilot and orderbook
MartinquaXD May 21, 2026
a27c820
emit quote event
MartinquaXD May 21, 2026
830e78c
Disambiguate messages with chain_id
MartinquaXD May 21, 2026
6c09378
Add more data to emitted event
MartinquaXD May 21, 2026
51b89da
Better disambiguation
MartinquaXD May 21, 2026
85c67da
Merge branch 'main' into rabbit-integration
MartinquaXD May 21, 2026
9271d4f
fix copy-paste error
MartinquaXD May 21, 2026
17de493
Switch to bounded channel
MartinquaXD May 21, 2026
2b9e78e
fmt dependencies
MartinquaXD May 21, 2026
68d7a1a
Make event bus init non-fatal
jmg-duarte May 22, 2026
990c58c
Observe JetStream publish acks
jmg-duarte May 22, 2026
672ad51
Count dropped event bus messages
jmg-duarte May 22, 2026
cc990e9
Validate event bus requires chain-id at config load
jmg-duarte May 22, 2026
0dc756b
Track event bus init success in a static flag
jmg-duarte May 22, 2026
afa8a59
Use typed envelope for event bus payloads
jmg-duarte May 22, 2026
e1b007e
Split event bus forwarder into publisher and ack-waiter tasks
jmg-duarte May 22, 2026
5d5a5a5
Clean up event bus internals and docs
jmg-duarte May 22, 2026
246380e
Call estimate() eagerly when emitting quote events
jmg-duarte May 22, 2026
9fc9085
Address comments and create stream on startup
jmg-duarte May 22, 2026
1f4d298
Go back to only getting the stream
jmg-duarte May 23, 2026
0b56fca
lint
jmg-duarte May 23, 2026
36cf438
Merge branch 'main' into rabbit-integration
jmg-duarte May 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 232 additions & 13 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ alloy-transport-ws = { version = "1.8.3", default-features = false }
anyhow = "1.0.100"
app-data = { path = "crates/app-data" }
arc-swap = "1.7.1"
async-nats = "0.48.0"
async-stream = "0.3.5"
async-trait = "0.1.80"
autopilot = { path = "crates/autopilot" }
Expand Down
13 changes: 12 additions & 1 deletion crates/autopilot/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use {
http_client::HttpClientFactory,
model::DomainSeparator,
num::ToPrimitive,
observe::metrics::LivenessChecking,
observe::{config::EventBusConfig, metrics::LivenessChecking},
price_estimation::{
config::price_estimation::BalanceOverridesConfigExt,
factory::{self, PriceEstimatorFactory},
Expand Down Expand Up @@ -153,6 +153,17 @@ pub async fn start(args: impl Iterator<Item = String>) {
);
observe::tracing::init::initialize(&obs_config);
observe::panic_hook::install();
if let Some(event_bus) = &config.shared.event_bus {
observe::event_bus::init(EventBusConfig {
url: event_bus.url.clone(),
channel: event_bus.channel.clone(),
chain_id: config
.shared
.chain_id
.expect("when the event bus is configured 'chain-id' is required"),
})
.await;
}
#[cfg(unix)]
observe::heap_dump_handler::spawn_heap_dump_handler();

Expand Down
24 changes: 24 additions & 0 deletions crates/configs/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ pub struct SharedConfig {
/// By default, volume fees are NOT applied to same-token trades.
#[serde(default)]
pub enable_sell_equals_buy_volume_fee: bool,

/// Enables publishing events to a global events bus.
pub event_bus: Option<EventBusConfig>,
}

impl Default for SharedConfig {
Expand All @@ -112,6 +115,7 @@ impl Default for SharedConfig {
contracts: Default::default(),
volume_fee_bucket_overrides: Vec::new(),
enable_sell_equals_buy_volume_fee: false,
event_bus: None,
}
}
}
Expand Down Expand Up @@ -311,6 +315,17 @@ where
serializer.serialize_str(level.as_str())
}

/// OpenTelemetry tracing configuration.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "kebab-case")]
#[derive(serde::Serialize)]
Comment thread
jmg-duarte marked this conversation as resolved.
Outdated
pub struct EventBusConfig {
/// Url of the event bus service.
pub url: Url,
/// Name of the channel to post events to.
pub channel: String,
}

/// Gas price estimation strategy.
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type")]
Expand Down Expand Up @@ -403,6 +418,10 @@ mod tests {
"0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2",
"0x6B175474E89094C44Da98b954EedeAC495271d0F",
]

[event-bus]
url = "localhost:4222"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The URL localhost:4222 is missing a scheme. Deserializing this into a url::Url will fail because it requires an absolute URL with a scheme (e.g., nats://).

Suggested change
url = "localhost:4222"
url = "nats://localhost:4222"

channel = "main"
"#;

let config: SharedConfig = toml::from_str(toml).unwrap();
Expand Down Expand Up @@ -430,5 +449,10 @@ mod tests {
assert!(config.enable_sell_equals_buy_volume_fee);
assert_eq!(config.volume_fee_bucket_overrides.len(), 1);
assert_eq!(config.volume_fee_bucket_overrides[0].tokens.len(), 2);
assert_eq!(
config.event_bus.as_ref().unwrap().url,
"localhost:4222".parse().unwrap()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This assertion will panic because localhost:4222 cannot be parsed as an absolute url::Url without a scheme.

Suggested change
"localhost:4222".parse().unwrap()
"nats://localhost:4222".parse().unwrap()

);
assert_eq!(config.event_bus.as_ref().unwrap().channel, "main");
}
}
3 changes: 3 additions & 0 deletions crates/observe/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ edition = "2024"
license = "MIT OR Apache-2.0"

[dependencies]
async-nats = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
axum = { workspace = true }
chrono = { workspace = true, features = ["now"] }
console-subscriber = { workspace = true, optional = true }
Expand All @@ -28,6 +30,7 @@ tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-serde = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt", "time"] }
url = { workspace = true }

[dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread"] }
Expand Down
12 changes: 11 additions & 1 deletion crates/observe/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use {core::time::Duration, tracing::Level};
use {core::time::Duration, tracing::Level, url::Url};

#[derive(Debug, Clone)]
pub struct Config {
Expand Down Expand Up @@ -88,3 +88,13 @@ impl TracingConfig {
}
}
}

/// Configures a backend wide event bus events can be posted to.
pub struct EventBusConfig {
/// Url of the event bus service
pub url: Url,
/// Name of the channel to post events to
pub channel: String,
/// Which chain this service operates on
pub chain_id: u64,
}
146 changes: 146 additions & 0 deletions crates/observe/src/event_bus/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
//! Implements a simple globally available way to publish events to an event
//! bus. Under the hood it's using NATS. To support publishing events from
//! synchronous contexts we use an unbounded channel as an in-memory buffer.
//! Whenever a message gets posted to this channel a background task wakes
//! up and forwards it to the NATS service running in a different process.
//! Messages always get serialized as JSON so you can publish anything that
//! can be serialized to JSON as well.
use {
crate::config::EventBusConfig,
async_nats::jetstream::Context as JetstreamClient,
bytes::Bytes,
chrono::Utc,
serde::Serialize,
serde_json::json,
tokio::sync::{
OnceCell,
mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
Comment thread
MartinquaXD marked this conversation as resolved.
Outdated
},
};

struct EventBusConnector {
/// Unbounded channel to allow emitting events from synchrounous
/// contexts.
message_queue: UnboundedSender<Message>,
Comment thread
MartinquaXD marked this conversation as resolved.
Outdated
/// Subject prefix to disambiguate messages in globally shared event bus
/// service.
subject_prefix: String,
}

struct Message {
subject: String,
data: Bytes,
}

/// Singleton event bus connection to allow publishing events
/// conventiently from everywhere.
Comment thread
jmg-duarte marked this conversation as resolved.
Outdated
static BUS: OnceCell<EventBusConnector> = OnceCell::const_new();

/// Initializes the event bus and panics if it fails.
pub async fn init(config: EventBusConfig) {
BUS.get_or_init(|| async move {
let client = async_nats::connect(config.url.as_str())
.await
.expect("failed to connect to NATS service");
let jetstream = async_nats::jetstream::new(client);
let mut stream = jetstream
.get_stream(&config.channel)
.await
.expect("could not connect to jetstream");
let info = stream.info().await.expect("failed to fetch stream info");
tracing::debug!(?info, "connected to jetstream");

let (sender, receiver) = unbounded_channel();
Comment thread
MartinquaXD marked this conversation as resolved.
Outdated
tokio::task::spawn(forward_messages_to_event_bus_client(receiver, jetstream));
EventBusConnector {
message_queue: sender,
// we prefix every subject with `event` to allow consumers to easily
// subscribe to all events without also seeing NATS internal events
subject_prefix: format!("event.{}.", config.chain_id),
Comment thread
MartinquaXD marked this conversation as resolved.
Outdated
}
})
.await;
}

/// Monitors a message queue and forwards all messages to the event bus
/// service.
async fn forward_messages_to_event_bus_client(
mut receiver: UnboundedReceiver<Message>,
Comment thread
MartinquaXD marked this conversation as resolved.
Outdated
client: JetstreamClient,
) {
while let Some(message) = receiver.recv().await {
match client.publish(message.subject, message.data).await {
Err(err) => {
tracing::debug!(?err, "failed to publish event");
}
Ok(_fut) => {
// let's assume the message arrived for now
}
}
}
}

/// Enqueues the event to be sent to the event bus in a background task.
pub fn publish(subject: &str, data: impl Serialize) {
let Some(bus) = BUS.get() else {
return;
};
Comment on lines +188 to +193
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get that the event bus should be initialized by now, but should we be more defensive here? Maybe log a warning, or call BUS.get_or_init since it's idempotent anyway? We could even unwrap if we feel so bold.


let mut message = json!({
"version": "v1",
"timestamp": Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
"body": data,
});
if let Some(id) = crate::tracing::distributed::request_id::from_current_span() {
message["requestId"] = id.into();
}
let body = match serde_json::to_vec(&message) {
Ok(body) => body,
Err(err) => {
tracing::error!(?err, "failed to serialize event");
return;
}
};

let message = Message {
subject: format!("{}{}", bus.subject_prefix, subject),
data: body.into(),
};

if let Err(err) = bus.message_queue.send(message) {
tracing::error!(?err, "failed to enqueue message");
}
Comment thread
MartinquaXD marked this conversation as resolved.
Outdated
}

#[cfg(test)]
mod tests {
use {super::*, serde_json::json};

#[ignore]
#[tokio::test]
async fn send_messages() {
crate::tracing::init::initialize(&crate::Config {
env_filter: "warn,observe=debug".to_string(),
stderr_threshold: None,
use_json_format: false,
tracing: None,
});
init(EventBusConfig {
url: "localhost:4222".parse().unwrap(),
channel: "main".to_string(),
chain_id: 1,
})
.await;

for _ in 0..1000 {
publish(
"name",
json!({
"estimator": "baseline",
"outAmount": 1234,
}),
);
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
Comment thread
jmg-duarte marked this conversation as resolved.
Outdated
}
}
1 change: 1 addition & 0 deletions crates/observe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//! improve the observability of a system. That includes initialization logic
//! for metrics and logging as well as logging helper functions.
pub mod config;
pub mod event_bus;
pub mod future;
#[cfg(unix)]
pub mod heap_dump_handler;
Expand Down
16 changes: 15 additions & 1 deletion crates/orderbook/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ use {
http_client::HttpClientFactory,
model::DomainSeparator,
num::ToPrimitive,
observe::metrics::{DEFAULT_METRICS_PORT, serve_metrics},
observe::{
config::EventBusConfig,
metrics::{DEFAULT_METRICS_PORT, serve_metrics},
},
order_validation,
price_estimation::{
PriceEstimating,
Expand Down Expand Up @@ -74,6 +77,17 @@ pub async fn start(args: impl Iterator<Item = String>) {
tracing::info!("running order book with validated arguments:\n{}", args);
observe::panic_hook::install();
observe::metrics::setup_registry(Some("gp_v2_api".into()), None);
if let Some(event_bus) = &config.shared.event_bus {
observe::event_bus::init(EventBusConfig {
url: event_bus.url.clone(),
channel: event_bus.channel.clone(),
chain_id: config
.shared
.chain_id
.expect("when the event bus is configured 'chain-id' is required"),
})
.await;
}
#[cfg(unix)]
observe::heap_dump_handler::spawn_heap_dump_handler();
tracing::info!("file configuration:\n{:#?}", config);
Expand Down
5 changes: 4 additions & 1 deletion crates/price-estimation/src/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,10 @@ impl<T: Send + Sync + 'static> CompetitionEstimator<T> {

while stage_index < self.stages.len() && requests.len() < requests_for_batch {
let stage = &self.stages.get(stage_index).expect("index checked by loop");
let futures = stage.iter().enumerate().map(|(index, (_name, estimator))| {
let futures = stage.iter().enumerate().map(|(index, (name, estimator))| {
get_single_result(Context {
estimator,
name,
query: query.clone(),
remaining_stages: Arc::clone(&remaining_stages),
})
Expand Down Expand Up @@ -175,6 +176,8 @@ struct Context<'a, ESTIMATOR, QUERY> {
/// the number of stages that are left after the queries
/// produced by this Context's stages.
remaining_stages: Arc<OnceLock<usize>>,
/// Name of the estimator
name: &'a str,
}

impl<'a, E, Q> Context<'a, E, Q> {
Expand Down
Loading
Loading