Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/autopilot/src/domain/auction/order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub struct Interaction {
}

/// Source from which the sellAmount should be drawn upon order fulfillment
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SellTokenSource {
/// Direct ERC20 allowances to the Vault relayer contract
Erc20,
Expand All @@ -90,7 +90,7 @@ pub enum SellTokenSource {
Internal,
}

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum BuyTokenDestination {
/// Pay trade proceeds as an ERC20 token transfer
Erc20,
Expand Down
13 changes: 5 additions & 8 deletions crates/autopilot/src/infra/persistence/dto/auction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,20 @@ use {
std::collections::BTreeMap,
};

pub fn from_domain(auction: domain::RawAuctionData) -> RawAuctionData {
pub fn from_domain(auction: &domain::RawAuctionData) -> RawAuctionData {
RawAuctionData {
block: auction.block,
orders: auction
.orders
.into_iter()
.iter()
.map(super::order::from_domain)
.collect(),
prices: auction
.prices
.into_iter()
.map(|(key, value)| (*key, value.get().0))
.collect(),
surplus_capturing_jit_order_owners: auction
.surplus_capturing_jit_order_owners
.into_iter()
.iter()
.map(|(key, value)| (**key, value.get().0))
.collect(),
surplus_capturing_jit_order_owners: auction.surplus_capturing_jit_order_owners.to_vec(),
}
}

Expand Down
23 changes: 15 additions & 8 deletions crates/autopilot/src/infra/persistence/dto/order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct Order {
pub quote: Option<Quote>,
}

pub fn from_domain(order: domain::Order) -> Order {
pub fn from_domain(order: &domain::Order) -> Order {
Order {
uid: order.uid.into(),
sell_token: order.sell.token.into(),
Expand All @@ -53,7 +53,8 @@ pub fn from_domain(order: domain::Order) -> Order {
buy_amount: order.buy.amount.into(),
protocol_fees: order
.protocol_fees
.into_iter()
.iter()
.cloned()
.map(FeePolicy::from_domain)
.collect(),
created: order.created,
Expand All @@ -63,18 +64,24 @@ pub fn from_domain(order: domain::Order) -> Order {
owner: order.owner,
partially_fillable: order.partially_fillable,
executed: order.executed.into(),
pre_interactions: order.pre_interactions.into_iter().map(Into::into).collect(),
pre_interactions: order
.pre_interactions
.iter()
.cloned()
.map(Into::into)
.collect(),
post_interactions: order
.post_interactions
.into_iter()
.iter()
.cloned()
.map(Into::into)
.collect(),
sell_token_balance: order.sell_token_balance.into(),
buy_token_balance: order.buy_token_balance.into(),
class: boundary::OrderClass::Limit,
app_data: order.app_data.into(),
signature: order.signature.into(),
quote: order.quote.map(Quote::from_domain),
app_data: order.app_data.clone().into(),
signature: order.signature.clone().into(),
quote: order.quote.as_ref().map(Quote::from_domain),
}
}

Expand Down Expand Up @@ -346,7 +353,7 @@ pub struct Quote {
}

impl Quote {
fn from_domain(quote: domain::Quote) -> Self {
fn from_domain(quote: &domain::Quote) -> Self {
Quote {
sell_amount: quote.sell_amount.0,
buy_amount: quote.buy_amount.0,
Expand Down
11 changes: 7 additions & 4 deletions crates/autopilot/src/infra/persistence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,22 +161,25 @@ impl Persistence {
self.upload_queue
.send(AuctionUpload {
auction_id: new_auction_id,
auction_data: dto::auction::from_domain(new_auction_data.clone()),
auction_data: dto::auction::from_domain(new_auction_data),
})
.expect("upload queue should be alive at all times");
}

/// Spawns a background task that uploads the auction to S3.
pub fn upload_auction_to_s3(&self, id: domain::auction::Id, auction: &domain::RawAuctionData) {
pub fn upload_auction_to_s3(
&self,
id: domain::auction::Id,
auction: Arc<domain::RawAuctionData>,
) {
if auction.orders.is_empty() {
return;
}
let Some(s3) = self.s3.clone() else {
return;
};
let auction = auction.clone();
tokio::task::spawn(async move {
let auction_dto = dto::auction::from_domain(auction);
let auction_dto = dto::auction::from_domain(auction.as_ref());
match s3.upload(id.to_string(), auction_dto).await {
Ok(key) => tracing::info!(?key, "uploaded auction to s3"),
Err(err) => tracing::warn!(?err, "failed to upload auction to s3"),
Expand Down
7 changes: 1 addition & 6 deletions crates/autopilot/src/infra/solvers/dto/solve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,7 @@ impl Request {
observe::metrics::metrics().on_auction_overhead_start("autopilot", "serialize_request");
let helper = RequestHelper {
id: auction.id,
orders: auction
.orders
.clone()
.into_iter()
.map(dto::order::from_domain)
.collect(),
orders: auction.orders.iter().map(dto::order::from_domain).collect(),
tokens: auction
.prices
.iter()
Expand Down
109 changes: 71 additions & 38 deletions crates/autopilot/src/run_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ pub struct Probes {
pub startup: Arc<Option<AtomicBool>>,
}

#[derive(Debug)]
struct CutAuction {
id: Id,
auction: Arc<domain::RawAuctionData>,
}

pub struct RunLoop {
config: Config,
eth: infra::Ethereum,
Expand Down Expand Up @@ -141,7 +147,7 @@ impl RunLoop {
}

pub async fn run_forever(self, mut control: ShutdownController) {
let mut last_auction = None;
let mut last_auction: Option<Arc<domain::RawAuctionData>> = None;
let mut last_block = None;

let self_arc = Arc::new(self);
Expand Down Expand Up @@ -255,28 +261,35 @@ impl RunLoop {
async fn next_auction(
&self,
start_block: BlockInfo,
prev_auction: &mut Option<domain::Auction>,
prev_auction: &mut Option<Arc<domain::RawAuctionData>>,
prev_block: &mut Option<B256>,
) -> Option<domain::Auction> {
// wait for appropriate time to start building the auction
let auction = self.cut_auction().await?;
tracing::trace!(auction_id = ?auction.id, "auction cut");
let CutAuction { id, auction } = self.cut_auction().await?;
tracing::trace!(auction_id = ?id, "auction cut");

// Only run the solvers if the auction or block has changed.
let previous = prev_auction.replace(auction.clone());
if previous.as_ref() == Some(&auction)
let previous = prev_auction.replace(Arc::clone(&auction));
if previous.as_deref() == Some(auction.as_ref())
&& prev_block.replace(start_block.hash) == Some(start_block.hash)
{
return None;
}

observe::log_auction_delta(&previous, &auction, &start_block);
observe::log_raw_auction_delta(id, previous.as_deref(), auction.as_ref(), &start_block);
self.probes.liveness.auction();
Metrics::auction_ready(start_block.observed_at);
Some(auction)

Some(domain::Auction {
id,
block: auction.block,
orders: auction.orders.clone(),
prices: auction.prices.clone(),
surplus_capturing_jit_order_owners: auction.surplus_capturing_jit_order_owners.clone(),
})
}

async fn cut_auction(&self) -> Option<domain::Auction> {
async fn cut_auction(&self) -> Option<CutAuction> {
let Some(auction) = self.solvable_orders_cache.current_auction().await else {
tracing::debug!("no current auction");
return None;
Expand All @@ -290,22 +303,18 @@ impl RunLoop {
Metrics::auction(id);

// always update the auction because the tests use this as a readiness probe
self.persistence.replace_current_auction_in_db(id, &auction);
self.persistence.upload_auction_to_s3(id, &auction);
self.persistence
.replace_current_auction_in_db(id, auction.as_ref());
self.persistence
.upload_auction_to_s3(id, Arc::clone(&auction));

if auction.orders.is_empty() {
// Updating liveness probe to not report unhealthy due to this optimization
self.probes.liveness.auction();
tracing::debug!("skipping empty auction");
return None;
}
Some(domain::Auction {
id,
block: auction.block,
orders: auction.orders,
prices: auction.prices,
surplus_capturing_jit_order_owners: auction.surplus_capturing_jit_order_owners,
})
Some(CutAuction { id, auction })
}

#[instrument(skip_all)]
Expand Down Expand Up @@ -1084,37 +1093,61 @@ pub mod observe {
std::collections::HashSet,
};

pub fn log_auction_delta(
previous: &Option<domain::Auction>,
current: &domain::Auction,
fn log_order_delta<I, J>(
id: domain::auction::Id,
previous: I,
current: J,
start_block: &BlockInfo,
) {
let previous_uids = match previous {
Some(previous) => previous
.orders
.iter()
.map(|order| order.uid)
.collect::<HashSet<_>>(),
None => HashSet::new(),
};
let current_uids = current
.orders
.iter()
.map(|order| order.uid)
.collect::<HashSet<_>>();
) where
I: IntoIterator<Item = domain::OrderUid>,
J: IntoIterator<Item = domain::OrderUid>,
{
let previous_uids = previous.into_iter().collect::<HashSet<_>>();
let current_uids = current.into_iter().collect::<HashSet<_>>();
let added = current_uids.difference(&previous_uids);
let removed = previous_uids.difference(&current_uids);
tracing::debug!(
id = current.id,
id,
added = ?added,
"New orders in auction"
);
tracing::debug!(
id = current.id,
id,
removed = ?removed,
"Orders no longer in auction"
);
tracing::debug!(auction_id = current.id, ?start_block);
tracing::debug!(auction_id = id, ?start_block);
}

pub fn log_raw_auction_delta(
id: domain::auction::Id,
previous: Option<&domain::RawAuctionData>,
current: &domain::RawAuctionData,
start_block: &BlockInfo,
) {
log_order_delta(
id,
previous
.into_iter()
.flat_map(|auction| auction.orders.iter().map(|order| order.uid)),
current.orders.iter().map(|order| order.uid),
start_block,
);
}

pub fn log_auction_delta(
previous: &Option<domain::Auction>,
current: &domain::Auction,
start_block: &BlockInfo,
) {
log_order_delta(
current.id,
previous
.iter()
.flat_map(|auction| auction.orders.iter().map(|order| order.uid)),
current.orders.iter().map(|order| order.uid),
start_block,
);
}

pub fn bids(bids: &[domain::competition::Bid<Unscored>]) {
Expand Down
8 changes: 4 additions & 4 deletions crates/autopilot/src/solvable_orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ pub struct SolvableOrdersCache {
type Balances = HashMap<Query, U256>;

struct Inner {
auction: domain::RawAuctionData,
auction: Arc<domain::RawAuctionData>,
solvable_orders: boundary::SolvableOrders,
}

Expand Down Expand Up @@ -186,12 +186,12 @@ impl SolvableOrdersCache {
})
}

pub async fn current_auction(&self) -> Option<domain::RawAuctionData> {
pub async fn current_auction(&self) -> Option<Arc<domain::RawAuctionData>> {
self.cache
.lock()
.await
.as_ref()
.map(|inner| inner.auction.clone())
.map(|inner| Arc::clone(&inner.auction))
}

/// Manually update solvable orders. Usually called by the background
Expand Down Expand Up @@ -358,7 +358,7 @@ impl SolvableOrdersCache {
};

*self.cache.lock().await = Some(Inner {
auction,
auction: Arc::new(auction),
solvable_orders: db_solvable_orders,
});

Expand Down
Loading