Skip to content

Introduce NATS JetStream as event bus#4427

Merged
jmg-duarte merged 32 commits into
mainfrom
rabbit-integration
May 23, 2026
Merged

Introduce NATS JetStream as event bus#4427
jmg-duarte merged 32 commits into
mainfrom
rabbit-integration

Conversation

@MartinquaXD
Copy link
Copy Markdown
Contributor

@MartinquaXD MartinquaXD commented May 21, 2026

Description

Adds a globally-available, NATS JetStream–backed event bus for observational events. Failures must never affect the hot path.

NATS was picked over RabbitMQ because the Rust client for streams was still experimental. The JetStream stream itself is managed in the infra repo, not by the service.

Changes

  • observe::event_bus singleton with a sync publish(subject, data) callable from anywhere; in-memory queue feeds two background tasks (publisher + ack-waiter) linked by a second bounded channel for back-pressure.
  • Subject hierarchy event.<chain_id>.<name> (consumers can wildcard event.1.>).
  • Config plumbing on SharedConfig::event_bus, wired into autopilot and orderbook.
  • PoC: emit a priceEstimate event from each estimator in the quote competition.

Robustness

  • Init is non-fatal: bad config or unreachable NATS just disables the bus.
  • Init is idempotent (static AtomicBool latch); failed attempts can still retry.
  • JetStream PublishAckFuture is actually awaited — server-side rejections surface as logs + metrics.
  • Prometheus counter event_bus_dropped_events{reason} keyed by a typed DropReason enum (channel_full / serialize / publish / ack).
  • SharedConfig::validate enforces chain-id whenever event_bus is set, so runtime use sites can unwrap().

Wire format

  • Typed Envelope<T> { version: "v1", timestamp, requestId?, body }, shape pinned by unit tests.

How to test

Run the #[ignore]-tagged send_messages test in crates/observe/src/event_bus/mod.rs against a local NATS, then nats sub "event.>" to watch them land. Two always-on tests pin the envelope shape; one in configs::shared covers the new validation.

@MartinquaXD MartinquaXD requested a review from a team as a code owner May 21, 2026 10:39
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a global event bus system using NATS to publish observability events, specifically for price estimation quotes. It adds the async-nats dependency, configuration structures, and a new event_bus module in the observe crate. Critical issues were identified in the event bus implementation and configuration: the buyToken field in quote events is incorrectly mapped to the sell_token, and the default NATS URL in tests and configurations lacks a required scheme, which will cause parsing failures. Additionally, the use of an unbounded_channel for the internal message queue poses a memory exhaustion risk; it should be replaced with a bounded channel using try_send to ensure the system remains resilient under load. Finally, the subject prefix in the event bus initialization should be corrected to match the intended naming convention.

Comment thread crates/price-estimation/src/competition/quote.rs Outdated
]

[event-bus]
url = "localhost:4222"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The URL localhost:4222 is missing a scheme. Deserializing this into a url::Url will fail because it requires an absolute URL with a scheme (e.g., nats://).

Suggested change
url = "localhost:4222"
url = "nats://localhost:4222"

assert_eq!(config.volume_fee_bucket_overrides[0].tokens.len(), 2);
assert_eq!(
config.event_bus.as_ref().unwrap().url,
"localhost:4222".parse().unwrap()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This assertion will panic because localhost:4222 cannot be parsed as an absolute url::Url without a scheme.

Suggested change
"localhost:4222".parse().unwrap()
"nats://localhost:4222".parse().unwrap()

Comment thread crates/observe/src/event_bus/mod.rs Outdated
Comment thread crates/observe/src/event_bus/mod.rs Outdated
Comment thread crates/observe/src/event_bus/mod.rs Outdated
Comment thread crates/observe/src/event_bus/mod.rs Outdated
Comment thread crates/observe/src/event_bus/mod.rs Outdated
Comment thread crates/observe/src/event_bus/mod.rs Outdated
@tilacog tilacog self-requested a review May 22, 2026 11:48
Copy link
Copy Markdown
Contributor

@tilacog tilacog left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, just commented on a small nit, nothing that should hold the PR.

Comment on lines +84 to +88
/// Enqueues the event to be sent to the event bus in a background task.
pub fn publish(subject: &str, data: impl Serialize) {
let Some(bus) = BUS.get() else {
return;
};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get that the event bus should be initialized by now, but should we be more defensive here? Maybe log a warning, or call BUS.get_or_init since it's idempotent anyway? We could even unwrap if we feel so bold.

Comment thread crates/observe/src/event_bus/mod.rs Outdated
Comment thread crates/price-estimation/src/competition/quote.rs Outdated
Comment thread crates/configs/src/shared.rs Outdated
@fleupold
Copy link
Copy Markdown
Contributor

@claude review this PR please

@cowprotocol cowprotocol deleted a comment from claude Bot May 22, 2026
@claude
Copy link
Copy Markdown

claude Bot commented May 22, 2026

Claude Code is working…

I'll analyze this and get back to you.

View job run

Previously, a misconfigured or unreachable NATS would crash the orderbook
and autopilot at startup via three separate .expect() calls. The event
bus is observational; failures should disable it, not take the binary
down. Init now logs and proceeds; publish() already no-ops when BUS is
uninitialized.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
jmg-duarte and others added 8 commits May 22, 2026 14:24
The previous forwarder ignored the returned PublishAckFuture, so any
server-side rejection (subject mismatch, stream limits, ...) was
silently dropped. Pipeline acks via FuturesUnordered so failures are
logged without adding a per-message round-trip to publish throughput.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Each failure mode (channel full, JSON serialization, NATS client publish
error, JetStream ack failure) was only visible via a log line, which is
hard to alert on. Expose a Prometheus counter keyed by a DropReason enum
so oncall can notice degradation without grepping logs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Previously, both autopilot and orderbook had an inline .expect(...) that
panicked at startup if the event bus was configured without chain-id.
Move the cross-field invariant onto SharedConfig::validate and call it
from each binary's config validation, so the misconfiguration is caught
once with a clean error message. The runtime use sites can now safely
.unwrap().

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
A previous local `mut initialized` only existed for the duration of a
single init() call, so its log message could fire on every successful
re-init attempt and a failed init followed by a re-init still re-ran
connect(). Promote it to a static AtomicBool so calling init() multiple
times is a cheap no-op once any prior call succeeded, while still
allowing retries after a failure.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace the serde_json::Value mutation pattern in publish() with a small
Envelope<T> struct. The wire format ("v1") now lives in one named
constant, the requestId field is camelCased declaratively, and two unit
tests pin the wire shape (presence/absence of requestId) so regressions
break a build instead of a downstream consumer.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The forwarder loop combined two distinct concerns — pumping messages
through the NATS client and awaiting JetStream's ack futures — into a
single select-loop that was awkward to read. Splitting them into two
tasks linked by a bounded channel gives each loop a single
responsibility, lets the runtime schedule them on separate cores, and
applies natural back-pressure when ack handling can't keep up. The ack
waiter also drains its pending set on shutdown so server-side
rejections are still surfaced.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Add Envelope::new so publish() reads as a single expression.
- Fix the copy-paste doc on configs::shared::EventBusConfig (was
  describing OpenTelemetry) and merge its duplicate derive attribute.
- Promote a couple of internal failure logs from debug to warn so they
  show up at default verbosity, and document why the await_acks drain
  loop exists.
- Emit `elapsed` in priceEstimate events as a number instead of a
  stringified one.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The closure passed to produce_results wrapped estimate() in an async
block, deferring the call until the future was polled. That broke the
queries_stages_sequentially test: with early-return enabled, a racing
estimator could complete before later ones in the same stage were ever
polled, so their .estimate() invocation — and the side-effects mocks
rely on — never happened.

Attach the timing / event emission via .map() instead, restoring eager
invocation. Also trim emit_quote_event to take just the name and query
it actually reads, dropping the unused Context generic.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@jmg-duarte jmg-duarte changed the title NATS integration Introduce NATS JetStream as event bus May 22, 2026
Comment thread crates/price-estimation/src/competition/quote.rs Outdated
Comment thread crates/observe/src/event_bus/mod.rs Outdated
Comment thread crates/observe/src/event_bus/mod.rs Outdated
})
}

const EVENT_BUS_SIZE: usize = 1_000;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not blocking: metrics over how many messages are in the queue

@jmg-duarte jmg-duarte added this pull request to the merge queue May 23, 2026
Merged via the queue into main with commit 48a3686 May 23, 2026
20 checks passed
@jmg-duarte jmg-duarte deleted the rabbit-integration branch May 23, 2026 10:40
@github-actions github-actions Bot locked and limited conversation to collaborators May 23, 2026
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants