Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions example-service/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::{net::SocketAddr, time::Duration};
use tarpc::{client, context, tokio_serde::formats::Json};
use tokio::time::sleep;
use tracing::Instrument;
use tarpc::context::ClientContext;

#[derive(Parser)]
struct Flags {
Expand All @@ -34,10 +35,13 @@ async fn main() -> anyhow::Result<()> {
let client = WorldClient::new(client::Config::default(), transport.await?).spawn();

let hello = async move {
let mut context = ClientContext::current();
let mut context2 = ClientContext::current();

// Send the request twice, just to be safe! ;)
tokio::select! {
hello1 = client.hello(context::current(), format!("{}1", flags.name)) => { hello1 }
hello2 = client.hello(context::current(), format!("{}2", flags.name)) => { hello2 }
hello1 = client.hello(&mut context, format!("{}1", flags.name)) => { hello1 }
hello2 = client.hello(&mut context2, format!("{}2", flags.name)) => { hello2 }
}
}
.instrument(tracing::info_span!("Two Hellos"))
Expand Down
2 changes: 1 addition & 1 deletion example-service/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ struct Flags {
struct HelloServer(SocketAddr);

impl World for HelloServer {
async fn hello(self, _: context::Context, name: String) -> String {
async fn hello(self, _: &mut context::ServerContext, name: String) -> String {
let sleep_time =
Duration::from_millis(Uniform::new_inclusive(1, 10).sample(&mut thread_rng()));
time::sleep(sleep_time).await;
Expand Down
10 changes: 5 additions & 5 deletions plugins/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ fn collect_cfg_attrs(rpcs: &[RpcMethod]) -> Vec<Vec<&Attribute>> {
/// # Example
///
/// ```no_run
/// use tarpc::{client, transport, service, server::{self, Channel}, context::Context};
/// use tarpc::{client, transport, service, server::{self, Channel}, context::ServerContext};
///
/// #[service]
/// pub trait Calculator {
Expand All @@ -401,7 +401,7 @@ fn collect_cfg_attrs(rpcs: &[RpcMethod]) -> Vec<Vec<&Attribute>> {
/// #[derive(Clone)]
/// struct CalculatorServer;
/// impl Calculator for CalculatorServer {
/// async fn add(self, context: Context, a: i32, b: i32) -> i32 {
/// async fn add(self, context: &mut ServerContext, a: i32, b: i32) -> i32 {
/// a + b
/// }
/// }
Expand Down Expand Up @@ -558,7 +558,7 @@ impl ServiceGenerator<'_> {
)| {
quote! {
#( #attrs )*
async fn #ident(self, context: ::tarpc::context::Context, #( #args ),*) -> #output;
async fn #ident(self, context: &mut ::tarpc::context::ServerContext, #( #args ),*) -> #output;
}
},
);
Expand Down Expand Up @@ -622,7 +622,7 @@ impl ServiceGenerator<'_> {
type Resp = #response_ident;


async fn serve(self, ctx: ::tarpc::context::Context, req: #request_ident)
async fn serve(self, ctx: &mut ::tarpc::context::ServerContext, req: #request_ident)
-> ::core::result::Result<#response_ident, ::tarpc::ServerError> {
match req {
#(
Expand Down Expand Up @@ -786,7 +786,7 @@ impl ServiceGenerator<'_> {
#(
#[allow(unused)]
#( #method_attrs )*
#vis fn #method_idents(&self, ctx: ::tarpc::context::Context, #( #args ),*)
#vis fn #method_idents<'a>(&'a self, ctx: &'a mut ::tarpc::context::ClientContext, #( #args ),*)
-> impl ::core::future::Future<Output = ::core::result::Result<#return_types, ::tarpc::client::RpcError>> + '_ {
let request = #request_ident::#camel_case_idents { #( #arg_pats ),* };
let resp = self.0.call(ctx, request);
Expand Down
14 changes: 7 additions & 7 deletions plugins/tests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ fn att_service_trait() {
}

impl Foo for () {
async fn two_part(self, _: context::Context, s: String, i: i32) -> (String, i32) {
async fn two_part(self, _: &mut context::ServerContext, s: String, i: i32) -> (String, i32) {
(s, i)
}

async fn bar(self, _: context::Context, s: String) -> String {
async fn bar(self, _: &mut context::ServerContext, s: String) -> String {
s
}

async fn baz(self, _: context::Context) {}
async fn baz(self, _: &mut context::ServerContext) {}
}
}

Expand All @@ -39,18 +39,18 @@ fn raw_idents() {
impl r#trait for () {
async fn r#await(
self,
_: context::Context,
_: &mut context::ServerContext,
r#struct: r#yield,
r#enum: i32,
) -> (r#yield, i32) {
(r#struct, r#enum)
}

async fn r#fn(self, _: context::Context, r#impl: r#yield) -> r#yield {
async fn r#fn(self, _: &mut context::ServerContext, r#impl: r#yield) -> r#yield {
r#impl
}

async fn r#async(self, _: context::Context) {}
async fn r#async(self, _: &mut context::ServerContext) {}
}
}

Expand All @@ -64,7 +64,7 @@ fn service_with_cfg_rpc() {
}

impl Foo for () {
async fn foo(self, _: context::Context) {}
async fn foo(self, _: &mut context::ServerContext) {}
}
}

Expand Down
1 change: 1 addition & 0 deletions tarpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ tracing = { version = "0.1", default-features = false, features = [
tracing-opentelemetry = { version = "0.31.0", default-features = false }
opentelemetry = { version = "0.30.0", default-features = false }
opentelemetry-semantic-conventions = "0.30.0"
anymap3 = "1.0.1"

[dev-dependencies]
assert_matches = "1.4"
Expand Down
4 changes: 2 additions & 2 deletions tarpc/examples/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub trait World {
struct HelloServer;

impl World for HelloServer {
async fn hello(self, _: context::Context, name: String) -> String {
async fn hello(self, _: &mut context::ServerContext, name: String) -> String {
format!("Hey, {name}!")
}
}
Expand All @@ -134,7 +134,7 @@ async fn main() -> anyhow::Result<()> {

println!(
"{}",
client.hello(context::current(), "friend".into()).await?
client.hello(&mut context::ClientContext::current(), "friend".into()).await?
);
Ok(())
}
6 changes: 3 additions & 3 deletions tarpc/examples/custom_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
// https://opensource.org/licenses/MIT.

use futures::prelude::*;
use tarpc::context::Context;
use tarpc::context::{ClientContext, ServerContext};
use tarpc::serde_transport as transport;
use tarpc::server::{BaseChannel, Channel};
use tarpc::tokio_serde::formats::Bincode;
Expand All @@ -21,7 +21,7 @@ pub trait PingService {
struct Service;

impl PingService for Service {
async fn ping(self, _: Context) {}
async fn ping(self, _: &mut ServerContext) {}
}

#[tokio::main]
Expand Down Expand Up @@ -52,7 +52,7 @@ async fn main() -> anyhow::Result<()> {
let transport = transport::new(codec_builder.new_framed(conn), Bincode::default());
PingServiceClient::new(Default::default(), transport)
.spawn()
.ping(tarpc::context::current())
.ping(&mut ClientContext::current())
.await?;

Ok(())
Expand Down
22 changes: 13 additions & 9 deletions tarpc/examples/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ struct Subscriber {
}

impl subscriber::Subscriber for Subscriber {
async fn topics(self, _: context::Context) -> Vec<String> {
async fn topics(self, _: &mut context::ServerContext) -> Vec<String> {
self.topics.clone()
}

async fn receive(self, _: context::Context, topic: String, message: String) {
async fn receive(self, _: &mut context::ServerContext, topic: String, message: String) {
info!(local_addr = %self.local_addr, %topic, %message, "ReceivedMessage")
}
}
Expand Down Expand Up @@ -210,7 +210,7 @@ impl Publisher {
subscriber: subscriber::SubscriberClient,
) {
// Populate the topics
if let Ok(topics) = subscriber.topics(context::current()).await {
if let Ok(topics) = subscriber.topics(&mut context::ClientContext::current()).await {
self.clients.lock().unwrap().insert(
subscriber_addr,
Subscription {
Expand Down Expand Up @@ -263,15 +263,19 @@ impl Publisher {
}

impl publisher::Publisher for Publisher {
async fn publish(self, _: context::Context, topic: String, message: String) {
async fn publish(self, _: &mut context::ServerContext, topic: String, message: String) {
info!("received message to publish.");
let mut subscribers = match self.subscriptions.read().unwrap().get(&topic) {
None => return,
Some(subscriptions) => subscriptions.clone(),
};
let mut publications = Vec::new();


for client in subscribers.values_mut() {
publications.push(client.receive(context::current(), topic.clone(), message.clone()));
publications.push(async {
client.receive(&mut context::ClientContext::current(), topic.clone(), message.clone()).await
});
}
// Ignore failing subscribers. In a real pubsub, you'd want to continually retry until
// subscribers ack. Of course, a lot would be different in a real pubsub :)
Expand Down Expand Up @@ -342,26 +346,26 @@ async fn main() -> anyhow::Result<()> {
.spawn();

publisher
.publish(context::current(), "calculus".into(), "sqrt(2)".into())
.publish(&mut context::ClientContext::current(), "calculus".into(), "sqrt(2)".into())
.await?;

publisher
.publish(
context::current(),
&mut context::ClientContext::current(),
"cool shorts".into(),
"hello to all".into(),
)
.await?;

publisher
.publish(context::current(), "history".into(), "napoleon".to_string())
.publish(&mut context::ClientContext::current(), "history".into(), "napoleon".to_string())
.await?;

drop(_subscriber0);

publisher
.publish(
context::current(),
&mut context::ClientContext::current(),
"cool shorts".into(),
"hello to who?".into(),
)
Expand Down
4 changes: 2 additions & 2 deletions tarpc/examples/readme.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub trait World {
struct HelloServer;

impl World for HelloServer {
async fn hello(self, _: context::Context, name: String) -> String {
async fn hello(self, _: &mut context::ServerContext, name: String) -> String {
format!("Hello, {name}!")
}
}
Expand All @@ -46,7 +46,7 @@ async fn main() -> anyhow::Result<()> {
// The client has an RPC method for each RPC defined in the annotated trait. It takes the same
// args as defined, with the addition of a Context, which is always the first arg. The Context
// specifies a deadline and trace information which can be helpful in debugging requests.
let hello = client.hello(context::current(), "Stim".to_string()).await?;
let hello = client.hello(&mut context::ClientContext::current(), "Stim".to_string()).await?;

println!("{hello}");

Expand Down
6 changes: 3 additions & 3 deletions tarpc/examples/tls_over_tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio_rustls::rustls::{
};
use tokio_rustls::{TlsAcceptor, TlsConnector};

use tarpc::context::Context;
use tarpc::context::{ClientContext, ServerContext};
use tarpc::serde_transport as transport;
use tarpc::server::{BaseChannel, Channel};
use tarpc::tokio_serde::formats::Bincode;
Expand All @@ -33,7 +33,7 @@ pub trait PingService {
struct Service;

impl PingService for Service {
async fn ping(self, _: Context) -> String {
async fn ping(self, _: &mut ServerContext) -> String {
"🔒".to_owned()
}
}
Expand Down Expand Up @@ -146,7 +146,7 @@ async fn main() -> anyhow::Result<()> {
let transport = transport::new(codec_builder.new_framed(stream), Bincode::default());
let answer = PingServiceClient::new(Default::default(), transport)
.spawn()
.ping(tarpc::context::current())
.ping(&mut ClientContext::current())
.await?;

println!("ping answer: {answer}");
Expand Down
9 changes: 4 additions & 5 deletions tarpc/examples/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub mod double {
struct AddServer;

impl AddService for AddServer {
async fn add(self, _: context::Context, x: i32, y: i32) -> i32 {
async fn add(self, _: &mut context::ServerContext, x: i32, y: i32) -> i32 {
x + y
}
}
Expand All @@ -70,9 +70,9 @@ impl<Stub> DoubleService for DoubleServer<Stub>
where
Stub: AddStub + Clone + Send + Sync + 'static,
{
async fn double(self, _: context::Context, x: i32) -> Result<i32, String> {
async fn double(self, _: &mut context::ServerContext, x: i32) -> Result<i32, String> {
self.add_client
.add(context::current(), x, x)
.add(&mut context::ClientContext::current(), x, x)
.await
.map_err(|e| e.to_string())
}
Expand Down Expand Up @@ -193,9 +193,8 @@ async fn main() -> anyhow::Result<()> {
let double_client =
double::DoubleClient::new(client::Config::default(), to_double_server).spawn();

let ctx = context::current();
for _ in 1..=5 {
tracing::info!("{:?}", double_client.double(ctx, 1).await?);
tracing::info!("{:?}", double_client.double(&mut context::ClientContext::current(), 1).await?);
}

tracer_provider.shutdown()?;
Expand Down
Loading