Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 2 additions & 108 deletions crates/agent/src/alerts/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use chrono::{DateTime, Utc};
use control_plane_api::alerts::{Alert, fetch_alert_by_id};
use notifications::Renderer;

pub use crate::email::{EmailSender, Sender};

#[derive(Default, Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct NotifierState {
pub(crate) fired_completed: Option<DateTime<Utc>>,
Expand Down Expand Up @@ -87,114 +89,6 @@ impl<ES: EmailSender> AlertNotifications<ES> {
}
}

pub trait EmailSender: std::fmt::Debug + Send + Sync + 'static {
fn send<'s>(
&'s self,
email: notifications::NotificationEmail,
) -> impl std::future::Future<Output = anyhow::Result<()>> + Send + 's;
}

/// Sends emails using the resend
#[derive(Debug)]
pub struct ResendSender {
from_address: String,
reply_to_address: String,
resend_client: resend_rs::Resend,
retry_options: resend_rs::rate_limit::RetryOptions,
}

impl ResendSender {
async fn send(&self, notification: notifications::NotificationEmail) -> anyhow::Result<()> {
let notifications::NotificationEmail {
idempotency_key,
recipient: notifications::Recipient { email, .. },
subject,
body,
} = notification;

let Self {
from_address,
reply_to_address,
resend_client,
retry_options,
} = self;

let resend_req =
resend_rs::types::CreateEmailBaseOptions::new(from_address, [email.as_str()], subject)
.with_reply(reply_to_address.as_str())
.with_html(body.as_str())
.with_idempotency_key(idempotency_key.as_str());

// Note on retries: We don't technically need to handle retries here, as
// we could instead return and just schedule ourselves to run again.
// It's common for many alerts to fire more or less simultaneously, and
// the resend rate limit is only 9 req/s. So my current thinking is that
// it's better to handle retries here, so that we don't end up having a
// bunch of other notifier tasks run and then have this one hit another
// rate limit error when we retry it. Better to retry each notification
// until it succeeds. If we exhaust the number of retries, we'll return
// an error and back off somewhat longer.
let response = resend_rs::rate_limit::send_with_retry_opts(
|| async { resend_client.emails.send(resend_req.clone()).await },
retry_options,
)
.await
.context("calling resend API")?;

tracing::debug!(%idempotency_key, to = %email, email_id = %response.id, "successfully sent alert email");

Ok(())
}
}

#[derive(Debug)]
pub enum Sender {
Disabled,
Resend(ResendSender),
}

impl Sender {
pub fn resend(
api_key: &str,
from_address: String,
reply_to_address: String,
http_client: reqwest::Client,
) -> Sender {
let resend_client = resend_rs::Resend::with_client(api_key, http_client);
let inner = ResendSender {
from_address,
reply_to_address,
resend_client,
retry_options: resend_rs::rate_limit::RetryOptions {
duration_ms: 150,
jitter_range_ms: 0..1000,
max_retries: 5,
},
};
Sender::Resend(inner)
}
}

impl EmailSender for Sender {
async fn send<'s>(
&'s self,
notification: notifications::NotificationEmail,
) -> anyhow::Result<()> {
match self {
Sender::Disabled => {
tracing::warn!(
to = %notification.recipient.email,
subject = %notification.subject,
idempotency_key = %notification.idempotency_key,
"skipping sending alert email (disabled)"
);
return Ok(());
}
Sender::Resend(resend) => resend.send(notification).await,
}
}
}

pub enum AlertOutcome {
AwaitResolution,
ResolvedSent,
Expand Down
110 changes: 110 additions & 0 deletions crates/agent/src/email.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/// Sends emails using the Resend API with retry logic for rate limiting.
#[derive(Debug)]
pub struct ResendSender {
from_address: String,
reply_to_address: String,
resend_client: resend_rs::Resend,
retry_options: resend_rs::rate_limit::RetryOptions,
}

impl ResendSender {
pub async fn send(&self, notification: notifications::NotificationEmail) -> anyhow::Result<()> {
let notifications::NotificationEmail {
idempotency_key,
recipient: notifications::Recipient { email, .. },
subject,
body,
} = notification;

let Self {
from_address,
reply_to_address,
resend_client,
retry_options,
} = self;

let resend_req =
resend_rs::types::CreateEmailBaseOptions::new(from_address, [email.as_str()], subject)
.with_reply(reply_to_address.as_str())
.with_html(body.as_str())
.with_idempotency_key(idempotency_key.as_str());

let response = resend_rs::rate_limit::send_with_retry_opts(
|| async { resend_client.emails.send(resend_req.clone()).await },
retry_options,
)
.await
.context("calling resend API")?;

tracing::debug!(%idempotency_key, to = %email, email_id = %response.id, "successfully sent email");

Ok(())
}
}

#[derive(Debug)]
pub enum Sender {
Disabled,
Resend(ResendSender),
}

impl Sender {
pub fn resend(
api_key: &str,
from_address: String,
reply_to_address: String,
http_client: reqwest::Client,
) -> Sender {
let resend_client = resend_rs::Resend::with_client(api_key, http_client);
let inner = ResendSender {
from_address,
reply_to_address,
resend_client,
retry_options: resend_rs::rate_limit::RetryOptions {
duration_ms: 150,
jitter_range_ms: 0..1000,
max_retries: 5,
},
};
Sender::Resend(inner)
}
}

pub trait EmailSender: std::fmt::Debug + Send + Sync + 'static {
fn send<'s>(
&'s self,
email: notifications::NotificationEmail,
) -> impl std::future::Future<Output = anyhow::Result<()>> + Send + 's;
}

impl EmailSender for Sender {
async fn send<'s>(
&'s self,
notification: notifications::NotificationEmail,
) -> anyhow::Result<()> {
match self {
Sender::Disabled => {
tracing::warn!(
to = %notification.recipient.email,
subject = %notification.subject,
idempotency_key = %notification.idempotency_key,
"skipping sending email (disabled)"
);
return Ok(());
}
Sender::Resend(resend) => resend.send(notification).await,
}
}
}

use anyhow::Context;
use std::sync::Arc;

impl EmailSender for Arc<Sender> {
async fn send<'s>(
&'s self,
email: notifications::NotificationEmail,
) -> anyhow::Result<()> {
(**self).send(email).await
}
}
1 change: 1 addition & 0 deletions crates/agent/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod alerts;
pub(crate) mod connector_tags;
pub mod controllers;
pub mod email;
pub(crate) mod controlplane;
mod directives;
mod discovers;
Expand Down
50 changes: 26 additions & 24 deletions crates/agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,31 +352,33 @@ async fn async_main(args: Args) -> Result<(), anyhow::Error> {
args.data_movement_alert_interval,
));

let sender = if let Some(api_key) = &args.resend_api_key {
let from_email = args
.email_from_address
.clone()
.expect("missing email-from-address");
let reply_to_email = args
.email_reply_to_address
.clone()
.expect("missing email-reply-to-address");
tracing::info!(%from_email, %reply_to_email, "Email sending is enabled");
agent::email::Sender::resend(
api_key,
from_email,
reply_to_email,
new_http_client()?,
)
} else {
tracing::warn!("Email sending is disabled");
agent::email::Sender::Disabled
};
let sender = std::sync::Arc::new(sender);

if args.serve_alert_notifications {
let sender = if let Some(api_key) = &args.resend_api_key {
// These two are required if api-key is provided, so clap should have ensured they are present
let from_email = args
.email_from_address
.clone()
.expect("missing email-from-address");
let reply_to_email = args
.email_reply_to_address
.clone()
.expect("missing email-reply-to-address");
tracing::info!(%from_email, %reply_to_email, "Sending of alert emails is enabled");
agent::alerts::Sender::resend(
api_key,
from_email,
reply_to_email,
new_http_client()?,
)
} else {
// Hopefully this is a local env
tracing::warn!("Sending of alert emails is disabled");
agent::alerts::Sender::Disabled
};
let alert_notifications =
agent::alerts::AlertNotifications::new(&args.dashboard_base_url, sender)?;
let alert_notifications = agent::alerts::AlertNotifications::new(
&args.dashboard_base_url,
sender.clone(),
)?;
automations_server = automations_server.register(alert_notifications);
}

Expand Down
26 changes: 15 additions & 11 deletions crates/notifications/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,9 @@ impl Renderer {
}
}

fn register_common_templates<'a>(registry: &mut handlebars::Handlebars<'a>) -> anyhow::Result<()> {
// Common email wrapper template that's used by all alert types, so we have consistent styling.
// Note that this html was generated by an LLM, from the original `mjml` template in the legacy
// alerts edge function. That legacy template used the `mjml-browser` library to render html
// with styling that matches our UI. This LLM-translation doesn't match that _perfectly_,
// but was considered good enough to allow getting rid of `mjml`.
registry.register_template_string(
"email_wrapper",
r#"<!DOCTYPE html>
/// Branded HTML wrapper template shared by all Estuary emails (alerts, invites, etc.).
/// Expects `full_name` (optional) and a `body_template_name` partial to inject content.
pub const EMAIL_WRAPPER_TEMPLATE: &str = r#"<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
Expand Down Expand Up @@ -269,8 +263,18 @@ fn register_common_templates<'a>(registry: &mut handlebars::Handlebars<'a>) -> a
</div>
</div>
</body>
</html>"#,
).context("registering email_wrapper template")?;
</html>"#;

/// Registers the shared email wrapper template. Call this before registering
/// any body templates that use `{{> (lookup this "body_template_name")}}`.
pub fn register_email_wrapper(registry: &mut handlebars::Handlebars) -> anyhow::Result<()> {
registry
.register_template_string("email_wrapper", EMAIL_WRAPPER_TEMPLATE)
.context("registering email_wrapper template")
}

fn register_common_templates<'a>(registry: &mut handlebars::Handlebars<'a>) -> anyhow::Result<()> {
register_email_wrapper(registry)?;

// Helper partial for rendering a catalog name identifier
registry
Expand Down
Loading