Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions crates/transport-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ opentelemetry = { version = "0.31.0", optional = true }
opentelemetry-http = { version = "0.31.0", optional = true}
tracing-opentelemetry = { version = "0.32.0", optional = true}

# mpp payment layer
mpp = { version = "0.7", default-features = false, features = ["client"], optional = true }

[features]
default = ["reqwest", "reqwest-default-tls", "traceparent"]
reqwest = [
Expand Down Expand Up @@ -87,3 +90,10 @@ traceparent = [
reqwest-default-tls = ["reqwest?/default-tls"]
reqwest-native-tls = ["reqwest?/native-tls"]
reqwest-rustls-tls = ["reqwest?/rustls"]
mpp = [
"dep:mpp",
"dep:alloy-json-rpc",
"dep:serde_json",
"dep:tower",
"dep:tracing",
]
5 changes: 5 additions & 0 deletions crates/transport-http/src/layers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@ pub use auth::{AuthLayer, AuthService};
mod trace;
#[cfg(feature = "traceparent")]
pub use trace::{TraceParentLayer, TraceParentService};

#[cfg(all(feature = "mpp", feature = "hyper"))]
mod mpp;
#[cfg(all(feature = "mpp", feature = "hyper"))]
pub use self::mpp::{MppLayer, MppService};
138 changes: 138 additions & 0 deletions crates/transport-http/src/layers/mpp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use crate::hyper::{header, Request, Response};
use alloy_transport::{TransportError, TransportErrorKind};
use hyper::header::HeaderValue;
use mpp::{client::PaymentProvider, format_authorization, PaymentChallenge};
use std::{future::Future, pin::Pin, task};
use tower::{Layer, Service};
use tracing::debug;

/// A tower [`Layer`] that intercepts HTTP 402 responses and automatically
/// handles payment challenges using the [Machine Payments Protocol (MPP)].
///
/// When the upstream service returns a `402 Payment Required` response with
/// a `WWW-Authenticate: Payment ...` header, this layer:
/// 1. Parses the [`PaymentChallenge`] from the header
/// 2. Calls [`PaymentProvider::pay`] to obtain a [`PaymentCredential`]
/// 3. Retries the original request with an `Authorization: Payment ...` header
///
/// Non-402 responses pass through unchanged.
///
/// # Example
///
/// ```ignore
/// use alloy_transport_http::{HyperClient, MppLayer};
///
/// let client = HyperClient::new()
/// .layer(MppLayer::new(my_provider));
/// ```
///
/// [Machine Payments Protocol (MPP)]: https://github.com/tempoxyz/mpp-rs
/// [`PaymentCredential`]: mpp::PaymentCredential
#[derive(Clone, Debug)]
pub struct MppLayer<P> {
provider: P,
}

impl<P> MppLayer<P> {
/// Create a new [`MppLayer`] with the given [`PaymentProvider`].
pub const fn new(provider: P) -> Self {
Self { provider }
}
}

impl<S, P: Clone> Layer<S> for MppLayer<P> {
type Service = MppService<S, P>;

fn layer(&self, inner: S) -> Self::Service {
MppService { inner, provider: self.provider.clone() }
}
}

/// A service that handles MPP 402 payment challenges automatically.
///
/// See [`MppLayer`] for details.
#[derive(Clone, Debug)]
pub struct MppService<S, P> {
inner: S,
provider: P,
}

impl<S, B, ResBody, P> Service<Request<B>> for MppService<S, P>
where
S: Service<Request<B>, Response = Response<ResBody>> + Clone + Send + Sync + 'static,
S::Future: Send,
S::Error: std::error::Error + Send + Sync + 'static,
B: From<Vec<u8>> + Send + 'static + Clone + Sync,
ResBody: hyper::body::Body + Send + 'static,
ResBody::Error: std::error::Error + Send + Sync + 'static,
ResBody::Data: Send,
P: PaymentProvider + 'static,
{
type Response = Response<ResBody>;
type Error = TransportError;
type Future =
Pin<Box<dyn Future<Output = Result<Response<ResBody>, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> task::Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(TransportErrorKind::custom)
}

fn call(&mut self, req: Request<B>) -> Self::Future {
let mut service = self.inner.clone();
let provider = self.provider.clone();
let body = req.body().clone();
let original_parts = req.uri().clone();
let original_headers = req.headers().clone();
let original_method = req.method().clone();

Box::pin(async move {
let resp = service.call(req).await.map_err(TransportErrorKind::custom)?;

if resp.status() != hyper::StatusCode::PAYMENT_REQUIRED {
return Ok(resp);
}

debug!("received 402, attempting MPP payment");

let www_auth = resp
.headers()
.get(header::WWW_AUTHENTICATE)
.and_then(|v| v.to_str().ok())
.ok_or_else(|| {
TransportErrorKind::custom_str("402 response missing WWW-Authenticate header")
})?
.to_owned();

let challenge = PaymentChallenge::from_header(&www_auth).map_err(|e| {
TransportErrorKind::custom_str(&format!("failed to parse MPP challenge: {e}"))
})?;

let credential = provider
.pay(&challenge)
.await
.map_err(|e| TransportErrorKind::custom_str(&format!("MPP payment failed: {e}")))?;

let auth_value = format_authorization(&credential).map_err(|e| {
TransportErrorKind::custom_str(&format!("failed to format MPP authorization: {e}"))
})?;

debug!("MPP payment succeeded, retrying request");

let mut retry = Request::builder().method(original_method).uri(original_parts);

for (name, value) in original_headers.iter() {
retry = retry.header(name, value);
}

let retry = retry
.header(
header::AUTHORIZATION,
HeaderValue::from_str(&auth_value).map_err(TransportErrorKind::custom)?,
)
.body(body)
.map_err(TransportErrorKind::custom)?;

service.call(retry).await.map_err(TransportErrorKind::custom)
})
}
}
7 changes: 7 additions & 0 deletions crates/transport-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,16 @@ pub use hyper_util;
mod layers;
#[cfg(all(not(target_family = "wasm"), feature = "jwt-auth"))]
pub use layers::{AuthLayer, AuthService};
#[cfg(all(not(target_family = "wasm"), feature = "mpp", feature = "hyper"))]
pub use layers::{MppLayer, MppService};
#[cfg(all(not(target_family = "wasm"), feature = "traceparent"))]
pub use layers::{TraceParentLayer, TraceParentService};

#[cfg(all(feature = "mpp", feature = "reqwest"))]
mod mpp_reqwest;
#[cfg(all(feature = "mpp", feature = "reqwest"))]
pub use mpp_reqwest::{MppReqwestClient, MppReqwestConnect, MppReqwestTransport};

#[cfg(all(not(target_family = "wasm"), feature = "hyper"))]
mod hyper_transport;
#[cfg(all(not(target_family = "wasm"), feature = "hyper"))]
Expand Down
169 changes: 169 additions & 0 deletions crates/transport-http/src/mpp_reqwest.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
use crate::{Http, HttpConnect};
use alloy_json_rpc::{RequestPacket, ResponsePacket};
use alloy_transport::{TransportError, TransportErrorKind, TransportFut, TransportResult};
use itertools::Itertools;
use mpp::{client::PaymentProvider, format_authorization, PaymentChallenge};
use std::task;
use tower::Service;
use tracing::{debug, debug_span, instrument, trace, Instrument};
use url::Url;

/// A reqwest-based HTTP client that automatically handles MPP 402 payment
/// challenges.
///
/// This wraps a [`reqwest::Client`] together with a [`PaymentProvider`].
/// When a request receives a `402 Payment Required` response with a
/// `WWW-Authenticate: Payment ...` header, the client:
/// 1. Parses the [`PaymentChallenge`]
/// 2. Calls [`PaymentProvider::pay`] to obtain a credential
/// 3. Retries the request with an `Authorization: Payment ...` header
///
/// # Example
///
/// ```ignore
/// use alloy_transport_http::{Http, MppReqwestClient};
///
/// let client = Http::mpp_reqwest("https://rpc.example.com".parse()?, provider);
/// ```
#[derive(Clone, Debug)]
pub struct MppReqwestClient<P> {
client: reqwest::Client,
provider: P,
}

impl<P> MppReqwestClient<P> {
/// Create a new [`MppReqwestClient`] with the default reqwest client.
pub fn new(provider: P) -> Self {
Self { client: reqwest::Client::new(), provider }
}

/// Create a new [`MppReqwestClient`] with a custom reqwest client.
pub const fn with_client(client: reqwest::Client, provider: P) -> Self {
Self { client, provider }
}
}

/// An [`Http`] transport using [`reqwest`] with automatic MPP payment handling.
pub type MppReqwestTransport<P> = Http<MppReqwestClient<P>>;

/// Connection details for an [`MppReqwestTransport`].
pub type MppReqwestConnect<P> = HttpConnect<MppReqwestTransport<P>>;

impl<P: PaymentProvider + 'static> Http<MppReqwestClient<P>> {
/// Create a new [`Http`] transport with MPP payment support.
pub fn mpp_reqwest(url: Url, provider: P) -> Self {
Self::with_client(MppReqwestClient::new(provider), url)
}

#[instrument(name = "request", skip_all, fields(method_names = %req.method_names().take(3).format(", ").to_string()))]
async fn do_mpp_reqwest(self, req: RequestPacket) -> TransportResult<ResponsePacket> {
let resp = self
.client
.client
.post(self.url.clone())
.json(&req)
.headers(req.headers())
.send()
.await
.map_err(TransportErrorKind::custom)?;

let status = resp.status();
debug!(%status, "received response from server");

if status == reqwest::StatusCode::PAYMENT_REQUIRED {
debug!("received 402, attempting MPP payment");

let www_auth = resp
.headers()
.get(reqwest::header::WWW_AUTHENTICATE)
.and_then(|v| v.to_str().ok())
.ok_or_else(|| {
TransportErrorKind::custom_str("402 response missing WWW-Authenticate header")
})?
.to_owned();

let challenge = PaymentChallenge::from_header(&www_auth).map_err(|e| {
TransportErrorKind::custom_str(&format!("failed to parse MPP challenge: {e}"))
})?;

let credential =
self.client.provider.pay(&challenge).await.map_err(|e| {
TransportErrorKind::custom_str(&format!("MPP payment failed: {e}"))
})?;

let auth_value = format_authorization(&credential).map_err(|e| {
TransportErrorKind::custom_str(&format!("failed to format MPP authorization: {e}"))
})?;

debug!("MPP payment succeeded, retrying request");

let retry_resp = self
.client
.client
.post(self.url)
.json(&req)
.headers(req.headers())
.header(reqwest::header::AUTHORIZATION, &auth_value)
.send()
.await
.map_err(TransportErrorKind::custom)?;

let retry_status = retry_resp.status();
debug!(%retry_status, "received retry response from server");

let body = retry_resp.bytes().await.map_err(TransportErrorKind::custom)?;

if tracing::enabled!(tracing::Level::TRACE) {
trace!(body = %String::from_utf8_lossy(&body), "response body");
} else {
debug!(bytes = body.len(), "retrieved response body");
}

if !retry_status.is_success() {
return Err(TransportErrorKind::http_error(
retry_status.as_u16(),
String::from_utf8_lossy(&body).into_owned(),
));
}

return serde_json::from_slice(&body)
.map_err(|err| TransportError::deser_err(err, String::from_utf8_lossy(&body)));
}

let body = resp.bytes().await.map_err(TransportErrorKind::custom)?;

if tracing::enabled!(tracing::Level::TRACE) {
trace!(body = %String::from_utf8_lossy(&body), "response body");
} else {
debug!(bytes = body.len(), "retrieved response body");
}

if !status.is_success() {
return Err(TransportErrorKind::http_error(
status.as_u16(),
String::from_utf8_lossy(&body).into_owned(),
));
}

serde_json::from_slice(&body)
.map_err(|err| TransportError::deser_err(err, String::from_utf8_lossy(&body)))
}
}

impl<P: PaymentProvider + 'static> Service<RequestPacket> for Http<MppReqwestClient<P>> {
type Response = ResponsePacket;
type Error = TransportError;
type Future = TransportFut<'static>;

#[inline]
fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> task::Poll<Result<(), Self::Error>> {
task::Poll::Ready(Ok(()))
}

#[inline]
fn call(&mut self, req: RequestPacket) -> Self::Future {
let this = self.clone();
let span = debug_span!("MppReqwestTransport", url = %this.url);
Box::pin(this.do_mpp_reqwest(req).instrument(span.or_current()))
}
}
7 changes: 7 additions & 0 deletions crates/transport-ws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ tokio = { workspace = true, features = ["sync", "rt", "time"] }
rustls = { version = "0.23", default-features = false, features = ["aws_lc_rs"] }
tokio-tungstenite = { workspace = true, features = ["rustls-tls-webpki-roots"] }

# mpp payment support
mpp = { version = "0.7", default-features = false, features = ["client"], optional = true }

# WASM only
[target.'cfg(target_family = "wasm")'.dependencies]
ws_stream_wasm = "0.7.4"

[features]
mpp = ["dep:mpp"]

5 changes: 5 additions & 0 deletions crates/transport-ws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ mod native;
#[cfg(not(target_family = "wasm"))]
pub use native::{WebSocketConfig, WsConnect};

#[cfg(all(not(target_family = "wasm"), feature = "mpp"))]
mod mpp;
#[cfg(all(not(target_family = "wasm"), feature = "mpp"))]
pub use self::mpp::MppWsConnect;

#[cfg(target_family = "wasm")]
mod wasm;
#[cfg(target_family = "wasm")]
Expand Down
Loading
Loading