Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions app/api/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
UserSpendRegion,
UserSpendByEmailResponse,
UserSpendTeam,
UserMarketingUpdatesByEmailUpdate,
)
from app.db.models import (
DBPrivateAIKey,
Expand All @@ -49,6 +50,7 @@
from app.core.email import normalize_email_for_lookup
from app.core.roles import UserRole
from app.services.litellm import LiteLLMService
from app.services.hubspot import HubSpotService
from datetime import datetime, UTC
import logging
import asyncio
Expand Down Expand Up @@ -498,6 +500,50 @@ async def get_users_by_email(
return result


@router.put(
"/by-email/marketing-updates",
response_model=List[User],
dependencies=[Depends(get_role_min_system_admin)],
)
async def update_users_marketing_updates_by_email(
payload: UserMarketingUpdatesByEmailUpdate,
db: Session = Depends(get_db),
):
normalized_email = normalize_email_for_lookup(payload.email)

users = (
db.query(DBUser)
.outerjoin(DBTeam, DBUser.team_id == DBTeam.id)
.filter(
func.regexp_replace(func.lower(DBUser.email), r"\+[^@]*@", "@")
== normalized_email,
DBUser.is_active.is_(True),
(DBUser.team_id.is_(None)) | (DBTeam.deleted_at.is_(None)),
)
.all()
)
hubspot = HubSpotService()
try:
await hubspot.upsert_contact_marketing_updates(
email=normalized_email, enabled=payload.receive_marketing_updates
)
except HTTPException:
logger.exception(
"HubSpot marketing-updates sync failed for email=%s", normalized_email
)

if not users:
return []

for user in users:
user.receive_marketing_updates = payload.receive_marketing_updates
db.commit()
for user in users:
db.refresh(user)

return users


@router.get(
Comment thread
dspachos marked this conversation as resolved.
"/spend",
response_model=UserSpendByEmailResponse,
Comment thread
dspachos marked this conversation as resolved.
Expand Down Expand Up @@ -747,6 +793,11 @@ async def _create_user_in_db(user: UserCreate, db: Session) -> DBUser:
is_admin=False, # Users are created as non-admin by default
team_id=user.team_id,
role=user.role,
receive_marketing_updates=(
user.receive_marketing_updates
if user.receive_marketing_updates is not None
else False
),
)

db.add(db_user)
Expand Down Expand Up @@ -897,6 +948,7 @@ async def update_user(
raise

db.refresh(db_user)

return db_user


Expand Down
7 changes: 7 additions & 0 deletions app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ class Settings(BaseSettings):
STRIPE_SECRET_KEY: str = os.getenv("STRIPE_SECRET_KEY", "sk_test_string")
STRIPE_PUBLISHABLE_KEY: str = os.getenv("STRIPE_PUBLISHABLE_KEY", "pk_test_string")
WEBHOOK_SIG: str = os.getenv("WEBHOOK_SIG", "whsec_test_1234567890")
HUBSPOT_TOKEN: str = os.getenv("HUBSPOT_TOKEN", "")
HUBSPOT_MARKETING_UPDATES_PROPERTY: str = os.getenv(
"HUBSPOT_MARKETING_UPDATES_PROPERTY", "receive_marketing_updates"
)
HUBSPOT_MARKETING_SUBSCRIPTION_ID: str = os.getenv(
"HUBSPOT_MARKETING_SUBSCRIPTION_ID", "1110685904"
)
ENABLE_METRICS: bool = os.getenv("ENABLE_METRICS", "false") == "true"
PROMETHEUS_API_KEY: str = os.getenv("PROMETHEUS_API_KEY", "")
POOL_BUDGET_EXPIRATION_DAYS: int = int(
Expand Down
3 changes: 3 additions & 0 deletions app/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ class DBUser(Base):
is_admin = Column(Boolean, default=False)
created_at = Column(DateTime(timezone=True), default=func.now())
role = Column(String, default="user") # user, admin, key_creator, read_only
receive_marketing_updates = Column(
Boolean, default=False, nullable=False, server_default=text("false")
)
team_id = Column(Integer, ForeignKey("teams.id", name="fk_user_team"))
updated_at = Column(DateTime(timezone=True), onupdate=func.now())

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""add receive_marketing_updates to users

Revision ID: c4a9d8e1f2b3
Revises: 2f7c9d1e4aab
Create Date: 2026-05-20 16:00:00.000000
"""

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "c4a9d8e1f2b3"
down_revision = "2f7c9d1e4aab"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.add_column(
"users",
sa.Column(
"receive_marketing_updates",
sa.Boolean(),
nullable=False,
server_default=sa.text("false"),
),
)


def downgrade() -> None:
op.drop_column("users", "receive_marketing_updates")
8 changes: 8 additions & 0 deletions app/schemas/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ class UserCreate(UserBase):
password: Optional[str] = None
team_id: Optional[int] = None
role: Optional[str] = None
receive_marketing_updates: Optional[bool] = None
model_config = ConfigDict(from_attributes=True)


class UserUpdate(BaseModel):
email: Optional[CaseInsensitiveEmailStr] = None
is_admin: Optional[bool] = None
receive_marketing_updates: Optional[bool] = None
current_password: Optional[str] = None
new_password: Optional[str] = None

Expand All @@ -64,10 +66,16 @@ class User(UserBase):
team_id: Optional[int] = None
team_name: Optional[str] = None
role: Optional[str] = None
receive_marketing_updates: bool = False
model_config = ConfigDict(from_attributes=True)
audit_logs: ClassVar = relationship("AuditLog", back_populates="user")


class UserMarketingUpdatesByEmailUpdate(BaseModel):
email: CaseInsensitiveEmailStr
receive_marketing_updates: bool


class APITokenBase(BaseModel):
name: str

Expand Down
155 changes: 155 additions & 0 deletions app/services/hubspot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import logging
from typing import Optional

import httpx
from fastapi import HTTPException, status

from app.core.config import settings

logger = logging.getLogger(__name__)


class HubSpotService:
BASE_URL = "https://api.hubapi.com"
CONTACTS_OBJECT_PATH = "/crm/v3/objects/contacts"
CONTACT_SEARCH_PATH = "/crm/v3/objects/contacts/search"
EMAIL_SUBSCRIPTION_PATH = "/email/public/v1/subscriptions/{email}"

def __init__(self, token: Optional[str] = None):
self.token = token or settings.HUBSPOT_TOKEN
self.marketing_updates_property = settings.HUBSPOT_MARKETING_UPDATES_PROPERTY
self.marketing_subscription_id = settings.HUBSPOT_MARKETING_SUBSCRIPTION_ID

def _headers(self) -> dict[str, str]:
if not self.token:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="HubSpot token is not configured",
)
return {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json",
}

async def _get_contact_id_by_email(
self, email: str, client: httpx.AsyncClient
) -> Optional[str]:
payload = {
"filterGroups": [
{
"filters": [
{"propertyName": "email", "operator": "EQ", "value": email}
]
}
],
"properties": ["email"],
"limit": 1,
}
response = await client.post(
f"{self.BASE_URL}{self.CONTACT_SEARCH_PATH}",
headers=self._headers(),
json=payload,
)
if response.status_code >= 400:
request_id = response.headers.get("x-hubspot-request-id", "unknown")
logger.error(
"HubSpot contact search failed status=%s request_id=%s body=%s",
response.status_code,
request_id,
response.text[:500],
)
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail="Failed to search HubSpot contact",
)

results = response.json().get("results", [])
if not results:
return None
return results[0].get("id")

async def _create_contact(self, email: str, client: httpx.AsyncClient) -> str:
response = await client.post(
f"{self.BASE_URL}{self.CONTACTS_OBJECT_PATH}",
headers=self._headers(),
json={"properties": {"email": email}},
)
if response.status_code >= 400:
request_id = response.headers.get("x-hubspot-request-id", "unknown")
logger.error(
"HubSpot contact create failed status=%s request_id=%s body=%s",
response.status_code,
request_id,
response.text[:500],
)
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail="Failed to create HubSpot contact",
)
return str(response.json().get("id"))

async def _update_contact_marketing_property(
self, contact_id: str, enabled: bool, client: httpx.AsyncClient
) -> None:
response = await client.patch(
f"{self.BASE_URL}{self.CONTACTS_OBJECT_PATH}/{contact_id}",
headers=self._headers(),
json={
"properties": {
self.marketing_updates_property: "true" if enabled else "false"
}
},
)
if response.status_code >= 400:
request_id = response.headers.get("x-hubspot-request-id", "unknown")
logger.error(
"HubSpot contact property update failed status=%s request_id=%s body=%s",
response.status_code,
request_id,
response.text[:500],
)
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail="Failed to update HubSpot contact marketing property",
)

async def _update_email_subscription(
self, email: str, enabled: bool, client: httpx.AsyncClient
) -> None:
response = await client.put(
f"{self.BASE_URL}{self.EMAIL_SUBSCRIPTION_PATH.format(email=email)}",
headers=self._headers(),
json={
"subscriptionId": self.marketing_subscription_id,
"subscribed": enabled,
},
)
if response.status_code >= 400:
request_id = response.headers.get("x-hubspot-request-id", "unknown")
logger.error(
"HubSpot subscription update failed status=%s request_id=%s body=%s",
response.status_code,
request_id,
response.text[:500],
)
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail="Failed to update HubSpot email subscription",
)

async def upsert_contact_marketing_updates(self, email: str, enabled: bool) -> None:
"""Upsert contact and sync marketing updates state.

- If contact exists: update custom contact property + email subscription.
- If contact does not exist: create contact first, then update both.
"""
async with httpx.AsyncClient(timeout=10.0) as client:
contact_id = await self._get_contact_id_by_email(email=email, client=client)
if not contact_id:
contact_id = await self._create_contact(email=email, client=client)
await self._update_contact_marketing_property(
contact_id=contact_id, enabled=enabled, client=client
)
await self._update_email_subscription(
email=email, enabled=enabled, client=client
)
Loading