diff --git a/Cargo.toml b/Cargo.toml index 4bb4347..09929c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ hyper = ["dep:hyper", "http-body-util", "restate-sdk-shared-core/http"] http_server = ["hyper", "hyper/server", "hyper/http2", "hyper-util", "tokio/net", "tokio/signal", "tokio/macros"] tracing-span-filter = ["dep:tracing-subscriber"] lambda = [ "dep:http-serde", "dep:lambda_runtime", "dep:aws_lambda_events"] +reqwest-client = ["dep:reqwest"] # jsonwebtoken crypto backend pass-through for request identity verification. # Enable exactly one (or call jsonwebtoken's CryptoProvider::install_default yourself). rust_crypto = ["restate-sdk-shared-core/rust_crypto"] @@ -52,6 +53,9 @@ uuid = { version = "1.20", optional = true } http-serde = { version = "2.1.1", optional = true } aws_lambda_events = { version = "1.0", optional = true } lambda_runtime = { version = "1.0", optional = true } +reqwest = { version = "0.13", features = ["json"], optional = true } +secrecy = { version = "0.10" } +url = { version = "2" } [dev-dependencies] tokio = { version = "1", features = ["full"] } @@ -60,6 +64,8 @@ trybuild = "1.0" reqwest = { version = "0.13", features = ["json"] } rand = "0.10" schemars = "1.2" +wiremock = "0.6" +assert_matches = "1.5" [build-dependencies] jsonptr = "0.7" diff --git a/macros/src/generator.rs b/macros/src/generator.rs index 929f021..a7cbe70 100644 --- a/macros/src/generator.rs +++ b/macros/src/generator.rs @@ -2,33 +2,13 @@ use crate::ast::{Handler, Object, Service, ServiceInner, ServiceType, Workflow}; use proc_macro2::TokenStream as TokenStream2; use proc_macro2::{Ident, Literal}; use quote::{ToTokens, format_ident, quote}; -use syn::{Attribute, PatType, Visibility}; pub(crate) struct ServiceGenerator<'a> { - pub(crate) service_ty: ServiceType, - pub(crate) restate_name: &'a str, - pub(crate) service_ident: &'a Ident, - pub(crate) client_ident: Ident, - pub(crate) serve_ident: Ident, - pub(crate) vis: &'a Visibility, - pub(crate) attrs: &'a [Attribute], - pub(crate) handlers: &'a [Handler], + service: ServiceScope<'a>, + handlers: Vec>, } impl<'a> ServiceGenerator<'a> { - fn new(service_ty: ServiceType, s: &'a ServiceInner) -> Self { - ServiceGenerator { - service_ty, - restate_name: &s.restate_name, - service_ident: &s.ident, - client_ident: format_ident!("{}Client", s.ident), - serve_ident: format_ident!("Serve{}", s.ident), - vis: &s.vis, - attrs: &s.attrs, - handlers: &s.handlers, - } - } - pub(crate) fn new_service(s: &'a Service) -> Self { Self::new(ServiceType::Service, &s.0) } @@ -41,37 +21,99 @@ impl<'a> ServiceGenerator<'a> { Self::new(ServiceType::Workflow, &s.0) } + fn new(service_ty: ServiceType, s: &'a ServiceInner) -> Self { + ServiceGenerator { + service: ServiceScope::new(service_ty, s), + handlers: s.handlers.iter().map(HandlerScope::from_handler).collect(), + } + } + fn trait_service(&self) -> TokenStream2 { - let Self { - attrs, - handlers, - vis, - service_ident, + self.service.trait_service_tokens(self.handlers.iter()) + } + + fn struct_serve(&self) -> TokenStream2 { + self.service.struct_serve_tokens() + } + + fn impl_service_for_serve(&self) -> TokenStream2 { + self.service + .impl_service_for_serve_tokens(self.handlers.iter()) + } + + fn impl_discoverable(&self) -> TokenStream2 { + self.service.impl_discoverable_tokens(self.handlers.iter()) + } + + fn struct_client(&self) -> TokenStream2 { + self.service.struct_client_tokens() + } + + fn impl_client(&self) -> TokenStream2 { + self.service.impl_client_tokens(self.handlers.iter()) + } + + fn struct_ingress(&self) -> TokenStream2 { + self.service.struct_ingress_tokens() + } + + fn impl_ingress(&self) -> TokenStream2 { + self.service.impl_ingress_tokens(self.handlers.iter()) + } +} + +impl ToTokens for ServiceGenerator<'_> { + fn to_tokens(&self, output: &mut TokenStream2) { + output.extend(vec![ + self.trait_service(), + self.struct_serve(), + self.impl_service_for_serve(), + self.impl_discoverable(), + self.struct_client(), + self.impl_client(), + self.struct_ingress(), + self.impl_ingress(), + ]); + } +} + +struct ServiceScope<'a> { + service_ty: ServiceType, + service: &'a ServiceInner, + service_literal: Literal, + client_ident: Ident, + ingress_ident: Ident, + serve_ident: Ident, +} + +impl ServiceScope<'_> { + fn new<'a>(service_ty: ServiceType, s: &'a ServiceInner) -> ServiceScope<'a> { + let service_literal = Literal::string(&s.restate_name); + let client_ident = format_ident!("{}Client", s.ident); + let ingress_ident = format_ident!("Ingress{}", s.ident); + let serve_ident = format_ident!("Serve{}", s.ident); + + ServiceScope { service_ty, + service: s, + service_literal, + client_ident, + ingress_ident, serve_ident, - .. - } = self; + } + } + fn trait_service_tokens<'a>( + &self, + handlers: impl IntoIterator>, + ) -> TokenStream2 { + let service_ident = &self.service.ident; + let serve_ident = &self.serve_ident; + let vis = &self.service.vis; + let attrs = &self.service.attrs; let handler_fns = handlers - .iter() - .map( - |Handler { attrs, ident, arg, is_shared, output_ok, output_err, .. }| { - let args = arg.iter(); - - let ctx = match (&service_ty, is_shared) { - (ServiceType::Service, _) => quote! { ::restate_sdk::prelude::Context }, - (ServiceType::Object, true) => quote! { ::restate_sdk::prelude::SharedObjectContext }, - (ServiceType::Object, false) => quote! { ::restate_sdk::prelude::ObjectContext }, - (ServiceType::Workflow, true) => quote! { ::restate_sdk::prelude::SharedWorkflowContext }, - (ServiceType::Workflow, false) => quote! { ::restate_sdk::prelude::WorkflowContext }, - }; - - quote! { - #( #attrs )* - fn #ident(&self, context: #ctx, #( #args ),*) -> impl std::future::Future> + ::core::marker::Send; - } - }, - ); + .into_iter() + .map(|handler| handler.trait_method_tokens(self)); quote! { #( #attrs )* @@ -86,12 +128,9 @@ impl<'a> ServiceGenerator<'a> { } } - fn struct_serve(&self) -> TokenStream2 { - let &Self { - vis, - ref serve_ident, - .. - } = self; + fn struct_serve_tokens(&self) -> TokenStream2 { + let vis = &self.service.vis; + let serve_ident = &self.serve_ident; quote! { /// Struct implementing [::restate_sdk::service::Service], to be used with [::restate_sdk::endpoint::Builder::with_service]. @@ -102,41 +141,15 @@ impl<'a> ServiceGenerator<'a> { } } - fn impl_service_for_serve(&self) -> TokenStream2 { - let Self { - serve_ident, - service_ident, - handlers, - .. - } = self; - - let match_arms = handlers.iter().map(|handler| { - let handler_ident = &handler.ident; - - let get_input_and_call = if handler.arg.is_some() { - quote! { - let (input, metadata) = ctx.input().await; - let fut = S::#handler_ident(&service_clone, (&ctx, metadata).into(), input); - } - } else { - quote! { - let (_, metadata) = ctx.input::<()>().await; - let fut = S::#handler_ident(&service_clone, (&ctx, metadata).into()); - } - }; - - let handler_literal = Literal::string(&handler.restate_name); - - quote! { - #handler_literal => { - #get_input_and_call - let res = fut.await.map_err(::restate_sdk::errors::HandlerError::from); - ctx.handle_handler_result(res); - ctx.end(); - Ok(()) - } - } - }); + fn impl_service_for_serve_tokens<'a>( + &self, + handlers: impl IntoIterator>, + ) -> TokenStream2 { + let serve_ident = &self.serve_ident; + let service_ident = &self.service.ident; + let match_arms = handlers + .into_iter() + .map(HandlerScope::serve_match_arm_tokens); quote! { impl ::restate_sdk::service::Service for #serve_ident @@ -162,98 +175,27 @@ impl<'a> ServiceGenerator<'a> { } } - fn impl_discoverable(&self) -> TokenStream2 { - let Self { - service_ty, - serve_ident, - service_ident, - handlers, - restate_name, - .. - } = self; - - let service_literal = Literal::string(restate_name); - - let service_ty_token = match service_ty { - ServiceType::Service => quote! { ::restate_sdk::discovery::ServiceType::Service }, - ServiceType::Object => { - quote! { ::restate_sdk::discovery::ServiceType::VirtualObject } - } - ServiceType::Workflow => quote! { ::restate_sdk::discovery::ServiceType::Workflow }, - }; - - let handlers = handlers.iter().map(|handler| { - let handler_literal = Literal::string(&handler.restate_name); - - let handler_ty = if handler.is_shared { - quote! { Some(::restate_sdk::discovery::HandlerType::Shared) } - } else if *service_ty == ServiceType::Workflow { - quote! { Some(::restate_sdk::discovery::HandlerType::Workflow) } - } else { - // Macro has same defaulting rules of the discovery manifest - quote! { None } - }; - - let lazy_state = if handler.is_lazy_state { - quote! { Some(true) } - } else { - quote! { None} - }; - - let input_schema = match &handler.arg { - Some(PatType { ty, .. }) => { - quote! { - Some(::restate_sdk::discovery::InputPayload::from_metadata::<#ty>()) - } - } - None => quote! { - Some(::restate_sdk::discovery::InputPayload::empty()) - } - }; - - let output_ty = &handler.output_ok; - let output_schema = match output_ty { - syn::Type::Tuple(tuple) if tuple.elems.is_empty() => quote! { - Some(::restate_sdk::discovery::OutputPayload::empty()) - }, - _ => quote! { - Some(::restate_sdk::discovery::OutputPayload::from_metadata::<#output_ty>()) - } - }; - - quote! { - ::restate_sdk::discovery::Handler { - name: ::restate_sdk::discovery::HandlerName::try_from(#handler_literal).expect("Handler name valid"), - input: #input_schema, - output: #output_schema, - ty: #handler_ty, - documentation: None, - metadata: Default::default(), - abort_timeout: None, - inactivity_timeout: None, - journal_retention: None, - idempotency_retention: None, - workflow_completion_retention: None, - enable_lazy_state: #lazy_state, - ingress_private: None, - retry_policy_initial_interval: None, - retry_policy_max_interval: None, - retry_policy_max_attempts: None, - retry_policy_exponentiation_factor: None, - retry_policy_on_max_attempts: None, - } - } - }); + fn impl_discoverable_tokens<'a>( + &self, + handlers: impl IntoIterator>, + ) -> TokenStream2 { + let serve_ident = &self.serve_ident; + let service_ident = &self.service.ident; + let service_literal = &self.service_literal; + let service_ty_token = self.discovery_service_type_tokens(); + let handlers = handlers + .into_iter() + .map(|handler| handler.discovery_handler_tokens(self)); quote! { impl ::restate_sdk::service::Discoverable for #serve_ident where S: #service_ident, { fn discover() -> ::restate_sdk::discovery::Service { - ::restate_sdk::discovery::Service { + use ::restate_sdk::discovery; + discovery::Service { + name: discovery::ServiceName::try_from(#service_literal).unwrap(), ty: #service_ty_token, - name: ::restate_sdk::discovery::ServiceName::try_from(#service_literal.to_string()) - .expect("Service name valid"), handlers: vec![#( #handlers ),*], documentation: None, metadata: Default::default(), @@ -274,115 +216,162 @@ impl<'a> ServiceGenerator<'a> { } } - fn struct_client(&self) -> TokenStream2 { - let &Self { - vis, - ref client_ident, - // service_ident, - ref service_ty, - .. - } = self; - - let key_field = match service_ty { - ServiceType::Service => quote! {}, - ServiceType::Object | ServiceType::Workflow => quote! { - key: String, - }, + fn discovery_service_type_tokens(&self) -> TokenStream2 { + match self.service_ty { + ServiceType::Service => quote! { ::restate_sdk::discovery::ServiceType::Service }, + ServiceType::Object => quote! { ::restate_sdk::discovery::ServiceType::VirtualObject }, + ServiceType::Workflow => quote! { ::restate_sdk::discovery::ServiceType::Workflow }, + } + } + + fn client_into_impl_tokens(&self) -> TokenStream2 { + let client_ident = &self.client_ident; + let service_literal = &self.service_literal; + + let service_client_impl = quote! { + impl ::restate_sdk::context::ServiceClient for #client_ident<'_> { + const SERVICE_NAME: &'static str = #service_literal; + } }; - let into_client_impl = match service_ty { - ServiceType::Service => { - quote! { - impl<'ctx> ::restate_sdk::context::IntoServiceClient<'ctx> for #client_ident<'ctx> { - fn create_client(ctx: &'ctx ::restate_sdk::endpoint::ContextInternal) -> Self { - Self { ctx } - } + let specific_impl = match self.service_ty { + ServiceType::Service => quote! { + impl<'ctx> ::restate_sdk::context::IntoServiceClient<'ctx> for #client_ident<'ctx> { + fn create_client(ctx: &'ctx ::restate_sdk::endpoint::ContextInternal) -> Self { + Self { ctx } } } - } + }, ServiceType::Object => quote! { - impl<'ctx> ::restate_sdk::context::IntoObjectClient<'ctx> for #client_ident<'ctx> { + impl<'ctx> ::restate_sdk::context::KeyedClient<'ctx> for #client_ident<'ctx> { fn create_client(ctx: &'ctx ::restate_sdk::endpoint::ContextInternal, key: String) -> Self { Self { ctx, key } } } + + impl<'ctx> ::restate_sdk::context::IntoObjectClient<'ctx> for #client_ident<'ctx> {} }, ServiceType::Workflow => quote! { - impl<'ctx> ::restate_sdk::context::IntoWorkflowClient<'ctx> for #client_ident<'ctx> { + impl<'ctx> ::restate_sdk::context::KeyedClient<'ctx> for #client_ident<'ctx> { fn create_client(ctx: &'ctx ::restate_sdk::endpoint::ContextInternal, key: String) -> Self { Self { ctx, key } } } + + impl<'ctx> ::restate_sdk::context::IntoWorkflowClient<'ctx> for #client_ident<'ctx> {} }, }; quote! { - /// Struct exposing the client to invoke [#service_ident] from another service. - #vis struct #client_ident<'ctx> { - ctx: &'ctx ::restate_sdk::endpoint::ContextInternal, + #service_client_impl + #specific_impl + } + } + + fn ingress_into_impl_tokens(&self) -> TokenStream2 { + let client_ident = &self.client_ident; + let ingress_ident = &self.ingress_ident; + match self.service_ty { + ServiceType::Service => quote! { + impl<'ctx> ::restate_sdk::ingress::builder::IntoServiceRequest for #client_ident<'ctx> { + type Request<'a> = #ingress_ident<'a>; + + fn create_request<'a>(client: &'a ::restate_sdk::ingress::Client) -> Self::Request<'a> { + #ingress_ident { + client, + } + } + } + }, + ServiceType::Object => quote! { + impl<'ctx> ::restate_sdk::ingress::builder::IntoObjectRequest for #client_ident<'ctx> { + type Request<'a> = #ingress_ident<'a>; + + fn create_request<'a>(client: &'a ::restate_sdk::ingress::Client, key: String) -> Self::Request<'a> { + #ingress_ident { + client, + key, + } + } + } + }, + ServiceType::Workflow => quote! { + impl<'ctx> ::restate_sdk::ingress::builder::IntoWorkflowRequest for #client_ident<'ctx> { + type Request<'a> = #ingress_ident<'a>; + + fn create_request<'a>(client: &'a ::restate_sdk::ingress::Client, key: String) -> Self::Request<'a> { + #ingress_ident { + client, + key, + } + } + } + }, + } + } + + fn struct_ingress_tokens(&self) -> TokenStream2 { + let vis = &self.service.vis; + let ingress_ident = &self.ingress_ident; + let key_field = self.key_field_tokens(); + let ingress_into_client_impl = self.ingress_into_impl_tokens(); + let service_ident = &self.service.ident; + let doc_msg = format!( + "Struct exposing the ingress client to invoke [`{service_ident}`] from from the ingress API." + ); + + quote! { + #[doc = #doc_msg] + #vis struct #ingress_ident<'a> { + client: &'a ::restate_sdk::ingress::Client, #key_field } - #into_client_impl + #ingress_into_client_impl } } - fn impl_client(&self) -> TokenStream2 { - let &Self { - vis, - ref client_ident, - service_ident, - handlers, - restate_name, - service_ty, - .. - } = self; - - let service_literal = Literal::string(restate_name); - - let handlers_fns = handlers.iter().map(|handler| { - let handler_ident = &handler.ident; - let handler_literal = Literal::string(&handler.restate_name); - - let argument = match &handler.arg { - None => quote! {}, - Some(PatType { - ty, .. - }) => quote! { req: #ty } - }; - let argument_ty = match &handler.arg { - None => quote! { () }, - Some(PatType { - ty, .. - }) => quote! { #ty } - }; - let res_ty = &handler.output_ok; - let input = match &handler.arg { - None => quote! { () }, - Some(_) => quote! { req } - }; - let request_target = match service_ty { - ServiceType::Service => quote! { - ::restate_sdk::context::RequestTarget::service(#service_literal, #handler_literal) - }, - ServiceType::Object => quote! { - ::restate_sdk::context::RequestTarget::object(#service_literal, &self.key, #handler_literal) - }, - ServiceType::Workflow => quote! { - ::restate_sdk::context::RequestTarget::workflow(#service_literal, &self.key, #handler_literal) - } - }; + fn struct_client_tokens(&self) -> TokenStream2 { + let vis = &self.service.vis; + let client_ident = &self.client_ident; + let key_field = self.key_field_tokens(); + let client_into_impl_tokens = self.client_into_impl_tokens(); + let service_ident = &self.service.ident; + let doc_msg = format!( + "Struct exposing the client to invoke [`{service_ident}`] from another service." + ); - quote! { - #vis fn #handler_ident(&self, #argument) -> ::restate_sdk::context::Request<'ctx, #argument_ty, #res_ty> { - self.ctx.request(#request_target, #input) - } + quote! { + #[doc = #doc_msg] + #vis struct #client_ident<'ctx> { + ctx: &'ctx ::restate_sdk::endpoint::ContextInternal, + #key_field } - }); + #client_into_impl_tokens + } + } + + fn key_field_tokens(&self) -> TokenStream2 { + match self.service_ty { + ServiceType::Service => quote! {}, + ServiceType::Object | ServiceType::Workflow => quote! { key: String, }, + } + } + + fn impl_client_tokens<'a>( + &self, + handlers: impl IntoIterator>, + ) -> TokenStream2 { + let client_ident = &self.client_ident; + let handlers_fns = handlers + .into_iter() + .map(|handler| handler.client_method_tokens(self)); + let service_ident = &self.service.ident; let doc_msg = format!( "Struct exposing the client to invoke [`{service_ident}`] from another service." ); + quote! { #[doc = #doc_msg] impl<'ctx> #client_ident<'ctx> { @@ -390,17 +379,249 @@ impl<'a> ServiceGenerator<'a> { } } } + + fn impl_ingress_tokens<'a>( + &self, + handlers: impl IntoIterator>, + ) -> TokenStream2 { + let ingress_ident = &self.ingress_ident; + let handler_fns = handlers + .into_iter() + .map(|handler| handler.ingress_method_tokens(self)); + + quote! { + impl #ingress_ident<'_> { + #( #handler_fns )* + } + } + } } -impl ToTokens for ServiceGenerator<'_> { - fn to_tokens(&self, output: &mut TokenStream2) { - output.extend(vec![ - self.trait_service(), - self.struct_serve(), - self.impl_service_for_serve(), - self.impl_discoverable(), - self.struct_client(), - self.impl_client(), - ]); +struct HandlerScope<'a> { + handler: &'a Handler, + arg_ty: Option<&'a syn::Type>, + handler_literal: Literal, +} + +impl HandlerScope<'_> { + fn from_handler(handler: &Handler) -> HandlerScope<'_> { + let arg_ty = handler.arg.as_ref().map(|pat_ty| pat_ty.ty.as_ref()); + let handler_literal = Literal::string(&handler.restate_name); + + HandlerScope { + handler, + arg_ty, + handler_literal, + } + } + + fn trait_method_tokens(&self, service: &ServiceScope<'_>) -> TokenStream2 { + let attrs = &self.handler.attrs; + let ident = &self.handler.ident; + let ctx = self.handler_client_tokens(service); + let args = self.handler.arg.iter(); + let output_ok = &self.handler.output_ok; + let output_err = &self.handler.output_err; + + quote! { + #( #attrs )* + fn #ident(&self, context: #ctx, #( #args ),*) -> impl std::future::Future> + ::core::marker::Send; + } + } + + fn handler_client_tokens(&self, service: &ServiceScope<'_>) -> TokenStream2 { + match (service.service_ty, self.handler.is_shared) { + (ServiceType::Service, _) => quote! { ::restate_sdk::prelude::Context }, + (ServiceType::Object, true) => quote! { ::restate_sdk::prelude::SharedObjectContext }, + (ServiceType::Object, false) => quote! { ::restate_sdk::prelude::ObjectContext }, + (ServiceType::Workflow, true) => { + quote! { ::restate_sdk::prelude::SharedWorkflowContext } + } + (ServiceType::Workflow, false) => quote! { ::restate_sdk::prelude::WorkflowContext }, + } + } + + fn serve_match_arm_tokens(&self) -> TokenStream2 { + let handler_ident = &self.handler.ident; + let get_input_and_call = if self.arg_ty.is_some() { + quote! { + let (input, metadata) = ctx.input().await; + let fut = S::#handler_ident(&service_clone, (&ctx, metadata).into(), input); + } + } else { + quote! { + let (_, metadata) = ctx.input::<()>().await; + let fut = S::#handler_ident(&service_clone, (&ctx, metadata).into()); + } + }; + let handler_literal = &self.handler_literal; + + quote! { + #handler_literal => { + #get_input_and_call + let res = fut.await.map_err(::restate_sdk::errors::HandlerError::from); + ctx.handle_handler_result(res); + ctx.end(); + Ok(()) + } + } + } + + fn discovery_handler_tokens(&self, service: &ServiceScope<'_>) -> TokenStream2 { + let handler_literal = &self.handler_literal; + let handler_ty = self.discovery_handler_type_tokens(service); + let input_schema = self.discovery_input_schema_tokens(); + let output_schema = self.discovery_output_schema_tokens(); + let lazy_state = self.discovery_lazy_state_tokens(); + + quote! { + ::restate_sdk::discovery::Handler { + name: ::restate_sdk::discovery::HandlerName::try_from(#handler_literal).expect("Handler name valid"), + ty: #handler_ty, + input: #input_schema, + output: #output_schema, + enable_lazy_state: #lazy_state, + documentation: None, + metadata: Default::default(), + abort_timeout: None, + inactivity_timeout: None, + journal_retention: None, + idempotency_retention: None, + workflow_completion_retention: None, + ingress_private: None, + retry_policy_initial_interval: None, + retry_policy_max_interval: None, + retry_policy_max_attempts: None, + retry_policy_exponentiation_factor: None, + retry_policy_on_max_attempts: None, + } + } + } + + fn discovery_handler_type_tokens(&self, service: &ServiceScope<'_>) -> TokenStream2 { + if self.handler.is_shared { + quote! { Some(::restate_sdk::discovery::HandlerType::Shared) } + } else if service.service_ty == ServiceType::Workflow { + quote! { Some(::restate_sdk::discovery::HandlerType::Workflow) } + } else { + // Macro has same defaulting rules of the discovery manifest + quote! { None } + } + } + + fn discovery_input_schema_tokens(&self) -> TokenStream2 { + match self.arg_ty { + Some(ty) => quote! { + Some(::restate_sdk::discovery::InputPayload::from_metadata::<#ty>()) + }, + None => quote! { + Some(::restate_sdk::discovery::InputPayload::empty()) + }, + } + } + + fn discovery_output_schema_tokens(&self) -> TokenStream2 { + let output_ty = &self.handler.output_ok; + match output_ty { + syn::Type::Tuple(tuple) if tuple.elems.is_empty() => quote! { + Some(::restate_sdk::discovery::OutputPayload::empty()) + }, + _ => quote! { + Some(::restate_sdk::discovery::OutputPayload::from_metadata::<#output_ty>()) + }, + } + } + + fn discovery_lazy_state_tokens(&self) -> TokenStream2 { + if self.handler.is_lazy_state { + quote! { Some(true) } + } else { + quote! { None } + } + } + + fn client_method_tokens(&self, service: &ServiceScope<'_>) -> TokenStream2 { + let vis = &service.service.vis; + let handler_ident = &self.handler.ident; + let argument = self.argument_tokens(); + let argument_ty = self.argument_type_tokens(); + let response_ty = &self.handler.output_ok; + let request_target = self.context_request_target_tokens(service); + let input = self.input_tokens(); + + quote! { + #vis fn #handler_ident(&self, #argument) -> ::restate_sdk::context::Request<'ctx, #argument_ty, #response_ty> { + self.ctx.request(#request_target, #input) + } + } + } + + fn ingress_method_tokens(&self, service: &ServiceScope<'_>) -> TokenStream2 { + let vis = &service.service.vis; + let handler_ident = &self.handler.ident; + let argument = self.argument_tokens(); + let response_ty = &self.handler.output_ok; + let request_call = self.ingress_request_call_tokens(service); + + quote! { + #vis fn #handler_ident(&self, #argument) -> ::restate_sdk::ingress::Request<#response_ty> { + #request_call + } + } + } + + fn context_request_target_tokens(&self, service: &ServiceScope<'_>) -> TokenStream2 { + let service_literal = &service.service_literal; + let handler_literal = &self.handler_literal; + match service.service_ty { + ServiceType::Service => quote! { + ::restate_sdk::context::RequestTarget::service(#service_literal, #handler_literal) + }, + ServiceType::Object => quote! { + ::restate_sdk::context::RequestTarget::object(#service_literal, &self.key, #handler_literal) + }, + ServiceType::Workflow => quote! { + ::restate_sdk::context::RequestTarget::workflow(#service_literal, &self.key, #handler_literal) + }, + } + } + + fn ingress_request_call_tokens(&self, service: &ServiceScope<'_>) -> TokenStream2 { + let service_literal = &service.service_literal; + let handler_literal = &self.handler_literal; + let input = self.input_tokens(); + match service.service_ty { + ServiceType::Service => quote! { + ::restate_sdk::ingress::builder::service(self.client, concat!("/", #service_literal, "/", #handler_literal), #input) + }, + ServiceType::Object => quote! { + ::restate_sdk::ingress::builder::object(self.client, #service_literal, &self.key, #handler_literal, #input) + }, + ServiceType::Workflow => quote! { + ::restate_sdk::ingress::builder::workflow(self.client, #service_literal, &self.key, #handler_literal, #input) + }, + } + } + + fn argument_tokens(&self) -> TokenStream2 { + match self.arg_ty { + None => quote! {}, + Some(arg_ty) => quote! { req: #arg_ty }, + } + } + + fn argument_type_tokens(&self) -> TokenStream2 { + match self.arg_ty { + None => quote! { () }, + Some(arg_ty) => quote! { #arg_ty }, + } + } + + fn input_tokens(&self) -> TokenStream2 { + if self.arg_ty.is_some() { + quote! { req } + } else { + quote! { () } + } } } diff --git a/src/context/mod.rs b/src/context/mod.rs index 4437292..61c7c22 100644 --- a/src/context/mod.rs +++ b/src/context/mod.rs @@ -575,20 +575,26 @@ pub trait ContextClient<'ctx>: private::SealedContext<'ctx> { } } +/// Base trait for all generated service clients. Provides the service name. +pub trait ServiceClient { + const SERVICE_NAME: &'static str; +} + /// Trait used by codegen to create the service client. -pub trait IntoServiceClient<'ctx>: Sized { +pub trait IntoServiceClient<'ctx>: ServiceClient + Sized { fn create_client(ctx: &'ctx ContextInternal) -> Self; } -/// Trait used by codegen to use the object client. -pub trait IntoObjectClient<'ctx>: Sized { +/// Trait for keyed clients (objects and workflows) that can be constructed with a key. +pub trait KeyedClient<'ctx>: ServiceClient + Sized { fn create_client(ctx: &'ctx ContextInternal, key: String) -> Self; } +/// Trait used by codegen to use the object client. +pub trait IntoObjectClient<'ctx>: KeyedClient<'ctx> {} + /// Trait used by codegen to use the workflow client. -pub trait IntoWorkflowClient<'ctx>: Sized { - fn create_client(ctx: &'ctx ContextInternal, key: String) -> Self; -} +pub trait IntoWorkflowClient<'ctx>: KeyedClient<'ctx> {} impl<'ctx, CTX: private::SealedContext<'ctx>> ContextClient<'ctx> for CTX {} diff --git a/src/ingress.rs b/src/ingress.rs new file mode 100644 index 0000000..89b124d --- /dev/null +++ b/src/ingress.rs @@ -0,0 +1,651 @@ +//! Ingress client for invoking Restate handlers over HTTP. +//! +//! This module provides a typed HTTP client surface for calling Restate +//! services, virtual objects, and workflows from outside handler execution. +//! +//! ## When to use ingress +//! +//! Use this module when you want to invoke Restate handlers from: +//! +//! - external services, +//! - jobs/cron processes, +//! - tests/integration harnesses, +//! - CLIs or admin tools. +//! +//! If you are calling another handler from within a currently executing +//! handler, prefer the context client APIs in [`crate::context`]. +//! +//! ## Core types +//! +//! - [`Client`]: shared HTTP client for building typed service/object/workflow clients. +//! - [`ServerUrl`]: validated base URL used to construct request targets. +//! - [`AuthToken`]: bearer token wrapper with redacted debug output. +//! - [`Request`]: fluent request builder returned by generated client methods. +//! - [`RequestError`]: errors for request build/send/status/JSON handling. +//! +//! ## Typical flow +//! +//! 1. Construct a [`ServerUrl`] and optional [`AuthToken`]. +//! 2. Create a [`Client`]. +//! 3. Obtain a generated client with: +//! - `client.service_client::()` +//! - `client.object_client::(key)` +//! - `client.workflow_client::(key)` +//! 4. Invoke a method, optionally configure request metadata, then call: +//! - `.header(...)` / `.headers(...)` +//! - `.idempotency_key(...)` +//! - `.timeout(...)` +//! - `.delay(...)` +//! - `.call(&executor).await` +//! +//! ## Typed client example +//! +//! ```no_run +//! # use restate_sdk::ingress::{AuthToken, Client, ServerUrl, RequestError}; +//! # use restate_sdk::ingress::executor::{Executor, HttpRequest, HttpResponse}; +//! #[restate_sdk::service] +//! trait Greeter { +//! async fn greet(name: String) -> restate_sdk::errors::HandlerResult; +//! } +//! # struct DemoExecutor; +//! # impl Executor for DemoExecutor { +//! # fn execute(&self, _request: HttpRequest) -> impl std::future::Future> + Send { +//! # async { Err(RequestError::Request("demo executor".to_string())) } +//! # } +//! # } +//! # fn main() -> Result<(), Box> { +//! # let executor = DemoExecutor; +//! let server_url: ServerUrl = "https://api.example.com".try_into()?; +//! let token = AuthToken::new("token".to_string().into())?; +//! let client = Client::new(server_url, Some(token)); +//! +//! # async fn invoke(client: Client) -> Result<(), Box> { +//! let executor = DemoExecutor; +//! let response: String = client +//! .service_client::() +//! .greet("Ada".to_string()) +//! .idempotency_key("greet-ada") +//! .call(&executor) +//! .await?; +//! +//! # let _ = response; +//! # Ok(()) +//! # } +//! # let _ = invoke(client); +//! # Ok(()) +//! # } +//! ``` +//! +//! ## Reqwest executor +//! +//! Enable feature `reqwest-client` to use `reqwest::Client` as an ingress executor. +//! +//! ## Notes +//! +//! - The ingress client is cheap to clone and can be shared across threads. +//! - Non-success HTTP responses are surfaced as [`RequestError::Status`] with +//! both status code and response body. +//! - Successful responses are JSON-decoded into the handler return type. +//! +use http::Uri; +use http::header::AUTHORIZATION; +use http::header::{HeaderMap, HeaderName, HeaderValue, InvalidHeaderValue}; +use http::uri::Authority; +use secrecy::{ExposeSecret, SecretString}; +use std::marker::PhantomData; +use std::time::Duration; +use thiserror::Error; +use url::Url; + +use crate::serde::Deserialize; + +pub use secrecy; + +#[derive(Clone)] +/// Shared ingress HTTP client used by generated service/object/workflow clients. +/// +/// A `Client` holds a validated [`ServerUrl`] and optional [`AuthToken`]. +/// It is cheap to clone and can be reused across threads. +/// +/// # Example +/// +/// ```no_run +/// # use restate_sdk::ingress::{AuthToken, Client, ServerUrl}; +/// # fn main() -> Result<(), Box> { +/// let server_url: ServerUrl = "https://api.example.com".try_into()?; +/// let token = AuthToken::new("token".to_string().into())?; +/// let client = Client::new(server_url, Some(token)); +/// # let _ = client; +/// # Ok(()) +/// # } +/// ``` +pub struct Client { + server_url: ServerUrl, + auth_token: Option, +} + +impl std::fmt::Debug for Client { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("Client(..)") + } +} + +impl Client { + pub fn new(server_url: ServerUrl, auth_token: Option) -> Client { + Client { + server_url, + auth_token, + } + } + + pub fn service_client(&self) -> C::Request<'_> + where + C: builder::IntoServiceRequest, + { + C::create_request(self) + } + + pub fn object_client(&self, key: impl Into) -> C::Request<'_> + where + C: builder::IntoObjectRequest, + { + C::create_request(self, key.into()) + } + + pub fn workflow_client(&self, key: impl Into) -> C::Request<'_> + where + C: builder::IntoWorkflowRequest, + { + C::create_request(self, key.into()) + } +} + +#[derive(Debug, Clone)] +/// Validated base URL for ingress requests. +/// +/// `ServerUrl` must be an absolute HTTP(S) URL with a host and without query or +/// fragment components. It is used as the base for service, object, and workflow +/// request paths. +/// +/// # Example +/// +/// ``` +/// # use restate_sdk::ingress::ServerUrl; +/// # use http::Uri; +/// # use http::uri::Authority; +/// # use url::Url; +/// # use std::net::SocketAddr; +/// # fn main() -> Result<(), Box> { +/// let from_str: ServerUrl = "http://localhost:8080".try_into()?; +/// let from_string: ServerUrl = "http://localhost:8080".to_string().try_into()?; +/// let from_url: ServerUrl = Url::parse("http://localhost:8080")?.try_into()?; +/// let uri: Uri = "http://localhost:8080".parse()?; +/// let from_uri: ServerUrl = uri.try_into()?; +/// +/// // (defaults scheme to http) +/// let authority: Authority = "localhost:8080".parse()?; +/// let from_authority: ServerUrl = authority.try_into()?; +/// let socket: SocketAddr = "127.0.0.1:8080".parse()?; +/// let from_socket: ServerUrl = socket.try_into()?; +/// # let _ = (from_str, from_string, from_url, from_uri, from_authority, from_socket); +/// # Ok(()) +/// # } +/// ``` +pub struct ServerUrl(Url); + +impl ServerUrl { + pub fn build_for_path(&self, request_path: &str) -> Result { + Ok(self.0.join(request_path)?) + } + + pub fn build_for_keyed(&self, service: &str, key: &str, handler: &str) -> Url { + let mut request_url = self.0.clone(); + { + let mut path = request_url.path_segments_mut().unwrap(); + path.pop_if_empty(); + path.push(service); + path.push(key); + path.push(handler); + } + request_url + } +} + +impl TryFrom for ServerUrl { + type Error = url::ParseError; + + fn try_from(value: Url) -> Result { + if value.cannot_be_a_base() + || value.host_str().is_none() + || (value.scheme() != "http" && value.scheme() != "https") + || value.query().is_some() + || value.fragment().is_some() + { + return Err(url::ParseError::RelativeUrlWithoutBase); + } + Ok(Self(value)) + } +} + +impl TryFrom for ServerUrl { + type Error = url::ParseError; + + fn try_from(value: std::net::SocketAddr) -> Result { + Self::try_from(Authority::try_from(value.to_string().as_str()).unwrap()) + } +} + +impl TryFrom for ServerUrl { + type Error = url::ParseError; + + fn try_from(value: Authority) -> Result { + let base_uri = http::Uri::builder() + .scheme("http") + .authority(value.as_str()) + .path_and_query("/") + .build() + .unwrap(); + Self::try_from(base_uri) + } +} + +impl TryFrom for ServerUrl { + type Error = url::ParseError; + + fn try_from(value: Uri) -> Result { + Self::try_from(value.to_string()) + } +} + +impl TryFrom for ServerUrl { + type Error = url::ParseError; + + fn try_from(value: String) -> Result { + Self::try_from(value.as_str()) + } +} + +impl TryFrom<&str> for ServerUrl { + type Error = url::ParseError; + + fn try_from(value: &str) -> Result { + let url = Url::parse(value)?; + Self::try_from(url) + } +} + +#[derive(Clone, Debug)] +/// Bearer token used for ingress authentication. +/// +/// This wraps [`SecretString`] and builds an `Authorization: Bearer ...` header. +/// Header values are marked sensitive before insertion. +pub struct AuthToken(SecretString); + +impl AuthToken { + pub fn new(token: SecretString) -> Result { + let _ = Self::parse_header_value(&token)?; + Ok(Self(token)) + } + + pub fn to_request_header(&self) -> (HeaderName, HeaderValue) { + let mut authorization = Self::parse_header_value(&self.0).unwrap(); + authorization.set_sensitive(true); + (AUTHORIZATION, authorization) + } + + fn parse_header_value(token: &SecretString) -> Result { + HeaderValue::from_str(&format!("Bearer {}", token.expose_secret())) + } +} + +impl TryFrom for AuthToken { + type Error = InvalidHeaderValue; + + fn try_from(value: SecretString) -> Result { + Self::new(value) + } +} + +#[derive(Debug, Error)] +/// Error returned while building, sending, or decoding an ingress request. +pub enum RequestError { + /// JSON serialization or deserialization failed for request/response payloads. + #[error("request JSON serde failed: {0}")] + Serde(#[from] serde_json::Error), + /// Request URL construction failed (invalid base URL or request path composition). + #[error("request invalid path: {0}")] + InvalidPath(#[from] url::ParseError), + /// Transport-layer request execution failed in the configured executor. + #[error("request failed: {0}")] + Request(String), + /// Invalid header key or value was provided when building request metadata. + #[error("invalid header: {0}")] + InvalidHeader(String), + /// The server returned a non-success status code; body contains response text. + #[error("non-success status {status}: {body}")] + Status { status: u16, body: String }, +} + +impl From for RequestError { + fn from(value: std::convert::Infallible) -> Self { + match value {} + } +} + +pub struct Request { + request: Result, + _res: PhantomData, +} + +/// Fluent request builder returned by generated ingress clients. +/// +/// `Request` lets callers add headers, idempotency, timeout, and delay before +/// executing the HTTP call via [`Request::call`]. +impl From for Request { + fn from(value: RequestError) -> Self { + Self { + request: Err(value), + _res: PhantomData, + } + } +} + +impl Request { + fn from_request_result(request: Result) -> Self { + Self { + request, + _res: PhantomData, + } + } + + /// Adds a single HTTP header to the ingress request. + /// See the module-level typed client example for end-to-end usage. + pub fn header(mut self, key: K, value: V) -> Self + where + HeaderName: TryFrom, + >::Error: Into, + HeaderValue: TryFrom, + >::Error: Into, + { + self.request = match self.request { + Ok(mut request) => { + let name = match HeaderName::try_from(key) { + Ok(name) => name, + Err(err) => return RequestError::InvalidHeader(err.into().to_string()).into(), + }; + let value = match HeaderValue::try_from(value) { + Ok(value) => value, + Err(err) => return RequestError::InvalidHeader(err.into().to_string()).into(), + }; + request.headers.insert(name, value); + Ok(request) + } + Err(err) => Err(err), + }; + self + } + + /// Adds a map of HTTP headers to the ingress request. + /// See the module-level typed client example for end-to-end usage. + pub fn headers(mut self, headers: HeaderMap) -> Self { + if let Ok(mut request) = self.request { + request.headers.extend(headers); + self.request = Ok(request); + } + self + } + + /// Sets a per-request timeout for the ingress call. + pub fn timeout(mut self, timeout: Duration) -> Self { + if let Ok(mut request) = self.request { + request.timeout = Some(timeout); + self.request = Ok(request); + } + self + } + + /// Sets Restate's idempotency key for this ingress request. + pub fn idempotency_key(mut self, idempotency_key: impl AsRef) -> Self { + if let Ok(mut request) = self.request { + let value = match HeaderValue::from_str(idempotency_key.as_ref()) { + Ok(value) => value, + Err(err) => return RequestError::InvalidHeader(err.to_string()).into(), + }; + + const IDP_KEY_HEADER: HeaderName = HeaderName::from_static("idempotency-key"); + request.headers.insert(IDP_KEY_HEADER, value); + self.request = Ok(request); + } + self + } + + /// Adds a Restate ingress `delay` query parameter to defer invocation. + pub fn delay(mut self, delay: Duration) -> Self { + if let Ok(mut request) = self.request { + request + .url + .query_pairs_mut() + .append_pair("delay", &format!("{}ms", delay.as_millis())); + self.request = Ok(request); + } + self + } + + /// Sends the request and deserializes the JSON response into `Res`. + /// + /// Non-success HTTP responses are returned as [`RequestError::Status`]. + pub async fn call(self, executor: &E) -> Result + where + E: executor::Executor, + Res: Deserialize, + RequestError: From, + { + decode_response(executor.execute(self.request?).await?) + } +} + +fn decode_response(response: executor::HttpResponse) -> Result +where + Res: Deserialize, + RequestError: From, +{ + if response.status < 200 || response.status > 299 { + let status = response.status; + let body = String::from_utf8_lossy(response.body.as_ref()).into_owned(); + return Err(RequestError::Status { status, body }); + } + let mut body = response.body; + let response = Res::deserialize(&mut body)?; + Ok(response) +} + +pub mod executor { + use super::RequestError; + use http::HeaderMap; + use std::future::Future; + use std::time::Duration; + use url::Url; + + /// Transport abstraction for executing ingress HTTP requests. + /// + /// Implement this trait to plug in a custom HTTP client backend. + /// The ingress client builds a [`HttpRequest`] and delegates delivery to this trait. + /// + /// For reqwest, enable the `reqwest-client` feature and pass a `reqwest::Client` + /// directly to [`crate::ingress::Request::call`]. + #[derive(Clone, Debug)] + /// Transport-agnostic ingress request payload produced by the ingress builder. + pub struct HttpRequest { + pub url: Url, + pub headers: HeaderMap, + pub body: bytes::Bytes, + pub timeout: Option, + } + + #[derive(Debug)] + /// Transport-agnostic ingress response returned by an [`Executor`]. + pub struct HttpResponse { + pub status: u16, + pub body: bytes::Bytes, + } + + /// Executes an ingress [`HttpRequest`] and returns an [`HttpResponse`]. + /// + /// Implement this with your async HTTP transport and pass the executor + /// by reference to [`crate::ingress::Request::call`]. + pub trait Executor { + fn execute( + &self, + request: HttpRequest, + ) -> impl Future> + Send; + } +} + +#[doc(hidden)] +pub mod builder { + use super::executor::HttpRequest; + use super::{Client, ServerUrl}; + use crate::ingress::{Request, RequestError}; + use crate::serde::Serialize; + use http::header::CONTENT_TYPE; + use http::header::HeaderMap; + use http::header::HeaderValue; + use url::Url; + + pub trait IntoServiceRequest: Sized { + type Request<'a>; + + fn create_request<'a>(client: &'a Client) -> Self::Request<'a>; + } + + pub trait IntoObjectRequest: Sized { + type Request<'a>; + + fn create_request<'a>(client: &'a Client, key: String) -> Self::Request<'a>; + } + + pub trait IntoWorkflowRequest: Sized { + type Request<'a>; + + fn create_request<'a>(client: &'a Client, key: String) -> Self::Request<'a>; + } + + pub fn service(client: &Client, request_path: &str, req: Req) -> Request + where + Req: Serialize, + RequestError: From, + { + Request::from_request_result(build_post( + req, + |server_url| server_url.build_for_path(request_path), + &client.server_url, + client, + )) + } + + pub fn object( + client: &Client, + service: &str, + key: &str, + handler: &str, + req: Req, + ) -> Request + where + Req: Serialize, + RequestError: From, + { + keyed(client, service, key, handler, req) + } + + pub fn workflow( + client: &Client, + service: &str, + key: &str, + handler: &str, + req: Req, + ) -> Request + where + Req: Serialize, + RequestError: From, + { + keyed(client, service, key, handler, req) + } + + fn keyed( + client: &Client, + service: &str, + key: &str, + handler: &str, + req: Req, + ) -> Request + where + Req: Serialize, + RequestError: From, + { + Request::from_request_result(build_post( + req, + |server_url| Ok(server_url.build_for_keyed(service, key, handler)), + &client.server_url, + client, + )) + } + + fn build_post( + req: Req, + make_url: impl FnOnce(&ServerUrl) -> Result, + server_url: &ServerUrl, + client: &Client, + ) -> Result + where + Req: Serialize, + RequestError: From, + { + let request_url = make_url(server_url)?; + let body = req.serialize()?; + let mut headers = HeaderMap::new(); + if !body.is_empty() { + headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + } + if let Some(auth_token) = &client.auth_token { + let (name, value) = auth_token.to_request_header(); + headers.insert(name, value); + } + + Ok(HttpRequest { + url: request_url, + headers, + body, + timeout: None, + }) + } +} + +#[cfg(feature = "reqwest-client")] +pub mod reqwest { + use super::RequestError; + use super::executor::{Executor as IngressExecutor, HttpRequest, HttpResponse}; + use std::future::Future; + type IngressResult = std::result::Result; + pub use reqwest::*; + + impl IngressExecutor for Client { + fn execute(&self, request: HttpRequest) -> impl Future + Send { + fn request_error(err: Error) -> RequestError { + RequestError::Request(err.to_string()) + } + async move { + let mut req = self + .post(request.url) + .headers(request.headers) + .body(request.body); + if let Some(timeout) = request.timeout { + req = req.timeout(timeout); + } + let response = req.send().await.map_err(request_error)?; + let status = response.status().as_u16(); + let body = response.bytes().await.map_err(request_error)?; + Ok(HttpResponse { status, body }) + } + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 0772130..1791867 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -217,6 +217,8 @@ pub mod endpoint; pub mod service; +pub mod ingress; + pub mod context; pub mod discovery; pub mod errors; @@ -514,11 +516,13 @@ pub mod prelude { #[cfg(feature = "lambda")] pub use crate::lambda::LambdaEndpoint; + pub use crate::ingress; + pub use crate::context::{ CallFuture, Context, ContextAwakeables, ContextClient, ContextPromises, ContextReadState, ContextSideEffects, ContextTimers, ContextWriteState, DurableFuturesUnordered, HeaderMap, - InvocationHandle, ObjectContext, Request, RunFuture, RunRetryPolicy, SharedObjectContext, - SharedWorkflowContext, WorkflowContext, + InvocationHandle, ObjectContext, Request, RunFuture, RunRetryPolicy, ServiceClient, + SharedObjectContext, SharedWorkflowContext, WorkflowContext, }; pub use crate::endpoint::{ Endpoint, HandleOptions, HandlerOptions, ProtocolMode, ServiceOptions, diff --git a/testcontainers/Cargo.toml b/testcontainers/Cargo.toml index fb97bde..23cb787 100644 --- a/testcontainers/Cargo.toml +++ b/testcontainers/Cargo.toml @@ -7,6 +7,9 @@ license = "MIT" repository = "https://github.com/restatedev/sdk-rust" rust-version = "1.90.0" +[features] +default = [] +reqwest-client = ["restate-sdk/reqwest-client"] [dependencies] anyhow = "1.0" diff --git a/testcontainers/tests/test_container.rs b/testcontainers/tests/test_container.rs index 5de3dd5..e3f2d48 100644 --- a/testcontainers/tests/test_container.rs +++ b/testcontainers/tests/test_container.rs @@ -31,21 +31,13 @@ impl MyService for MyServiceImpl { } } -#[tokio::test] -async fn test_container() { - tracing_subscriber::fmt::fmt() - .with_max_level(tracing::Level::INFO) // Set the maximum log level - .init(); - +async fn start_test_environment() -> restate_sdk_testcontainers::StartedTestEnvironment { + let _ = tracing_subscriber::fmt::fmt() + .with_max_level(tracing::Level::INFO) + .try_init(); let endpoint = Endpoint::builder().bind(MyServiceImpl.serve()).build(); - // simple test container initialization with default configuration - //let test_container = TestContainer::default().start(endpoint).await.unwrap(); - - // custom test container initialization with builder - let test_environment = TestEnvironment::new() - // optional passthrough logging from the restate server testcontainers - // prints container logs to tracing::info level + TestEnvironment::new() .with_container_logging() .with_container( "docker.io/restatedev/restate".to_string(), @@ -53,10 +45,13 @@ async fn test_container() { ) .start(endpoint) .await - .unwrap(); + .unwrap() +} +#[tokio::test] +async fn test_container() { + let test_environment = start_test_environment().await; let ingress_url = test_environment.ingress_url(); - // call container ingress url for /MyService/my_handler let response = reqwest::Client::new() .post(format!("{}/MyService/my_handler", ingress_url)) @@ -72,3 +67,24 @@ async fn test_container() { response.text().await.unwrap() ); } + +#[cfg(feature = "reqwest-client")] +#[tokio::test] +async fn test_container_ingress_client() { + use restate_sdk::ingress; + let test_environment = start_test_environment().await; + + let client = ingress::Client::new(test_environment.ingress_url().try_into().unwrap(), None); + let executor = reqwest::Client::new(); + + let response = client + .service_client::() + .my_handler() + .idempotency_key("abc") + .call(&executor) + .await + .unwrap(); + + assert_eq!(response, "hello!"); + info!("/MyService/my_handler response: {:?}", response); +} diff --git a/tests/compiletest.rs b/tests/compiletest.rs index 870c2f9..e6a18c7 100644 --- a/tests/compiletest.rs +++ b/tests/compiletest.rs @@ -2,4 +2,8 @@ fn ui() { let t = trybuild::TestCases::new(); t.compile_fail("tests/ui/*.rs"); + #[cfg(not(feature = "reqwest-client"))] + t.compile_fail("tests/ui/reqwest-client/*.rs"); + #[cfg(feature = "reqwest-client")] + t.pass("tests/ui/reqwest-client/*.rs"); } diff --git a/tests/ingress.rs b/tests/ingress.rs new file mode 100644 index 0000000..59f819d --- /dev/null +++ b/tests/ingress.rs @@ -0,0 +1,377 @@ +#[cfg(feature = "reqwest-client")] +mod ingress_client_tests { + use assert_matches::assert_matches; + use restate_sdk::prelude::*; + use restate_sdk::serde::Serialize as RestateSerialize; + use serde::{Deserialize, Serialize}; + use std::error::Error; + use std::time::Duration; + use wiremock::matchers::{body_json, body_string, header, method, path, query_param}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + type TestResult = Result<(), Box>; + + fn executor() -> reqwest::Client { + reqwest::Client::new() + } + + #[restate_sdk::service] + trait GreeterService { + async fn greet(name: String) -> HandlerResult; + } + + #[restate_sdk::service] + trait UnitService { + async fn ping() -> HandlerResult<()>; + } + + #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] + #[derive(Debug, PartialEq, Serialize, Deserialize)] + struct TestUser { + name: String, + age: u32, + } + + #[restate_sdk::service] + trait JsonService { + async fn echo_user(user: Json) -> HandlerResult; + } + + #[restate_sdk::service] + #[name = "renamed-http-service"] + trait RenamedService { + #[name = "renamed-http-handler"] + async fn greet(name: String) -> HandlerResult; + } + + #[restate_sdk::object] + trait GreeterObject { + async fn greet(name: String) -> HandlerResult; + } + + #[restate_sdk::workflow] + trait GreeterWorkflow { + async fn greet(name: String) -> HandlerResult; + } + + #[restate_sdk::service] + trait FailingService { + async fn fail(req: FailingPayload) -> HandlerResult; + } + + #[test] + fn reqwest_builder_exposes_typed_service_client() -> TestResult { + let client = + ingress::Client::new(ingress::ServerUrl::try_from("http://localhost:8080")?, None); + + let _request = client + .service_client::() + .greet("hi".to_string()); + Ok(()) + } + + #[test] + fn auth_token_rejects_invalid_auth_header() -> TestResult { + let err = ingress::AuthToken::new("token\nbad".to_string().into()); + assert!(err.is_err(), "invalid auth header should fail"); + Ok(()) + } + + #[test] + fn reqwest_builder_accepts_auth_token() -> TestResult { + let token = ingress::AuthToken::new("token".to_string().into())?; + let server_url = "https://localhost:8080/".try_into()?; + let _client = ingress::Client::new(server_url, Some(token)); + Ok(()) + } + + #[tokio::test] + async fn reqwest_service_client_call_executes_http_request() -> TestResult { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/GreeterService/greet")) + .and(header("Content-Type", "application/json")) + .and(body_string("\"hi\"")) + .respond_with( + ResponseTemplate::new(200) + .insert_header("Content-Type", "application/json") + .set_body_string("\"hello hi\""), + ) + .expect(1) + .mount(&server) + .await; + + let client = ingress::Client::new(server.uri().try_into()?, None); + let executor = executor(); + + let response = client + .service_client::() + .greet("hi".to_string()) + .call(&executor) + .await?; + + assert_eq!(response, "hello hi"); + Ok(()) + } + + #[tokio::test] + async fn reqwest_service_client_call_accepts_empty_body_for_unit_response() -> TestResult { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/UnitService/ping")) + .respond_with( + ResponseTemplate::new(200) + .insert_header("Content-Type", "application/json") + .set_body_string(""), + ) + .expect(1) + .mount(&server) + .await; + + let client = ingress::Client::new(server.uri().try_into()?, None); + let executor = executor(); + + client + .service_client::() + .ping() + .call(&executor) + .await?; + + Ok(()) + } + + #[tokio::test] + async fn reqwest_service_client_call_supports_json_wrapper_types() -> TestResult { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/JsonService/echo_user")) + .and(body_json(serde_json::json!({"name":"alice","age":42}))) + .respond_with( + ResponseTemplate::new(200) + .insert_header("Content-Type", "application/json") + .set_body_string("\"ok\""), + ) + .expect(1) + .mount(&server) + .await; + + let client = ingress::Client::new(server.uri().try_into()?, None); + let executor = executor(); + + let response = client + .service_client::() + .echo_user(Json(TestUser { + name: "alice".to_string(), + age: 42, + })) + .call(&executor) + .await?; + + assert_eq!(response, "ok"); + Ok(()) + } + + struct FailingPayload; + + impl RestateSerialize for FailingPayload { + type Error = serde_json::Error; + + fn serialize(&self) -> Result { + Err(serde_json::Error::io(std::io::Error::other( + "intentional serialize failure", + ))) + } + } + + impl restate_sdk::serde::Deserialize for FailingPayload { + type Error = std::convert::Infallible; + + fn deserialize(_: &mut bytes::Bytes) -> Result { + Ok(Self) + } + } + + impl restate_sdk::serde::PayloadMetadata for FailingPayload {} + + #[tokio::test] + async fn reqwest_builder_post_propagates_serialize_failure() -> TestResult { + let client = + ingress::Client::new(ingress::ServerUrl::try_from("http://localhost:8080")?, None); + let executor = executor(); + + let result = client + .service_client::() + .fail(FailingPayload) + .call(&executor) + .await; + + assert_matches!(result, Err(ingress::RequestError::Serde(_))); + Ok(()) + } + + #[tokio::test] + async fn reqwest_service_client_call_propagates_non_success_status() -> TestResult { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/GreeterService/greet")) + .respond_with(ResponseTemplate::new(503).set_body_string("downstream unavailable")) + .expect(1) + .mount(&server) + .await; + + let client = ingress::Client::new(server.uri().try_into()?, None); + let executor = executor(); + + let result = client + .service_client::() + .greet("hi".to_string()) + .call(&executor) + .await; + + assert_matches!( + result, + Err(ingress::RequestError::Status { status: 503, ref body }) + if body.contains("downstream unavailable") + ); + Ok(()) + } + + #[tokio::test] + async fn reqwest_service_client_call_serializes_delay_query_param() -> TestResult { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/GreeterService/greet")) + .and(query_param("delay", "5000ms")) + .and(body_string("\"hi\"")) + .respond_with( + ResponseTemplate::new(200) + .insert_header("Content-Type", "application/json") + .set_body_string("\"hello hi\""), + ) + .expect(1) + .mount(&server) + .await; + + let client = ingress::Client::new(server.uri().try_into()?, None); + let executor = executor(); + + let response = client + .service_client::() + .greet("hi".to_string()) + .delay(Duration::from_secs(5)) + .call(&executor) + .await?; + + assert_eq!(response, "hello hi"); + Ok(()) + } + + #[tokio::test] + async fn reqwest_service_client_uses_renamed_service_and_handler_paths() -> TestResult { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/renamed-http-service/renamed-http-handler")) + .and(body_string("\"hi\"")) + .respond_with( + ResponseTemplate::new(200) + .insert_header("Content-Type", "application/json") + .set_body_string("\"hello hi\""), + ) + .expect(1) + .mount(&server) + .await; + + let client = ingress::Client::new(server.uri().try_into()?, None); + let executor = executor(); + + let response = client + .service_client::() + .greet("hi".to_string()) + .call(&executor) + .await?; + + assert_eq!(response, "hello hi"); + Ok(()) + } + + #[tokio::test] + async fn reqwest_service_client_call_propagates_idempotency_key_header() -> TestResult { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/GreeterService/greet")) + .and(header("Idempotency-Key", "test-key-1")) + .respond_with( + ResponseTemplate::new(200) + .insert_header("Content-Type", "application/json") + .set_body_string("\"hello hi\""), + ) + .expect(1) + .mount(&server) + .await; + + let client = ingress::Client::new(server.uri().try_into()?, None); + let executor = executor(); + + let response = client + .service_client::() + .greet("hi".to_string()) + .idempotency_key("test-key-1") + .call(&executor) + .await?; + + assert_eq!(response, "hello hi"); + Ok(()) + } + + #[tokio::test] + async fn reqwest_keyed_clients_call_execute_http_request() -> TestResult { + let object_server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/GreeterObject/my-object-key/greet")) + .and(body_string("\"hi\"")) + .respond_with( + ResponseTemplate::new(200) + .insert_header("Content-Type", "application/json") + .set_body_string("\"hello hi\""), + ) + .expect(1) + .mount(&object_server) + .await; + + let object_client = ingress::Client::new(object_server.uri().try_into()?, None); + let object_executor = executor(); + + let object_response = object_client + .object_client::("my-object-key") + .greet("hi".to_string()) + .call(&object_executor) + .await?; + + assert_eq!(object_response, "hello hi"); + + let workflow_server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/GreeterWorkflow/my-workflow-key/greet")) + .and(body_string("\"hi\"")) + .respond_with( + ResponseTemplate::new(200) + .insert_header("Content-Type", "application/json") + .set_body_string("\"hello hi\""), + ) + .expect(1) + .mount(&workflow_server) + .await; + + let workflow_client = ingress::Client::new(workflow_server.uri().try_into()?, None); + let workflow_executor = executor(); + + let workflow_response = workflow_client + .workflow_client::("my-workflow-key") + .greet("hi".to_string()) + .call(&workflow_executor) + .await?; + + assert_eq!(workflow_response, "hello hi"); + Ok(()) + } +} diff --git a/tests/ui/reqwest-client/reqwest_client_ext_without_feature.rs b/tests/ui/reqwest-client/reqwest_client_ext_without_feature.rs new file mode 100644 index 0000000..e4575fd --- /dev/null +++ b/tests/ui/reqwest-client/reqwest_client_ext_without_feature.rs @@ -0,0 +1,18 @@ +use restate_sdk::prelude::*; + +#[restate_sdk::service] +trait LocalService { + async fn my_handler() -> HandlerResult; +} + +fn main() -> Result<(), Box> { + let client = ingress::Client::new("http://localhost:8080".try_into()?, None); + let executor = reqwest::Client::new(); + + let _ = client + .service_client::() + .my_handler() + .idempotency_key("abc") + .call(&executor); + Ok(()) +} diff --git a/tests/ui/reqwest-client/reqwest_client_ext_without_feature.stderr b/tests/ui/reqwest-client/reqwest_client_ext_without_feature.stderr new file mode 100644 index 0000000..c7c0590 --- /dev/null +++ b/tests/ui/reqwest-client/reqwest_client_ext_without_feature.stderr @@ -0,0 +1,16 @@ +error[E0277]: the trait bound `reqwest::Client: Executor` is not satisfied + --> tests/ui/reqwest-client/reqwest_client_ext_without_feature.rs:16:15 + | +16 | .call(&executor); + | ---- ^^^^^^^^^ the trait `Executor` is not implemented for `reqwest::Client` + | | + | required by a bound introduced by this call + | +note: required by a bound in `restate_sdk::ingress::Request::::call` + --> src/ingress.rs + | + | pub async fn call(self, executor: &E) -> Result + | ---- required by a bound in this associated function + | where + | E: executor::Executor, + | ^^^^^^^^^^^^^^^^^^ required by this bound in `Request::::call`