Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 208 additions & 0 deletions gmail/gmail_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
"""Email helper utilities for Gmail tools."""

from __future__ import annotations

import logging
from collections import Counter
from datetime import datetime, timezone
from email.utils import getaddresses, parseaddr, parsedate_to_datetime
from typing import Any, Optional

logger = logging.getLogger(__name__)


def _normalize_email(address: str) -> str:
    """Return a canonical form of an email address for identity comparisons.

    The display name is discarded, the address is lowercased, and any
    plus-addressing suffix is removed, so that e.g.
    'Alex <alex+foo@scopestack.io>' normalizes to 'alex@scopestack.io'.

    This is the key primitive for 'is this message from Alex?' checks - plus
    addresses are Alex, not a third party.

    Args:
        address: A bare address or a 'Name <addr>' style header value.
            ``None`` or an empty string is treated as "no address".

    Returns:
        The normalized address. If the parsed value contains no '@' it is
        returned lowercased and stripped but otherwise unchanged (possibly '').
    """
    _name, addr = parseaddr(address or "")
    addr = addr.lower().strip()
    if "@" not in addr:
        # Covers the empty string as well as values that are not addresses.
        return addr

    # Split on the LAST '@': a domain can never contain '@', but a quoted
    # local part can.
    local, _, domain = addr.rpartition("@")

    # Strip the plus-suffix only when something is left in front of it.
    # Local parts that *start* with '+' (e.g. '+15551234567@sms.example.com')
    # are not plus-addressing; stripping them would collapse every such
    # sender into the same bogus '@domain' identity.
    base = local.split("+", 1)[0]
    if base:
        local = base
    return f"{local}@{domain}"


def _parse_date_header(
    date_str: str, internal_date_ms: str | int | None
) -> tuple[Optional[str], Optional[datetime]]:
    """Parse Gmail internalDate or a Date header to a UTC-aware datetime.

    Prefer Gmail's internalDate because it reflects Gmail's message ordering;
    fall back to the Date header when internalDate is unavailable, malformed,
    or out of the representable range. Always returns UTC-aware datetimes so
    naive/aware comparisons don't raise TypeError.

    Args:
        date_str: Raw value of the message's Date header (may be empty).
        internal_date_ms: Milliseconds since the Unix epoch, as an int or a
            numeric string, or None when not available.

    Returns:
        (iso_string, datetime) or (None, None) if both sources fail.
    """
    if internal_date_ms is not None:
        try:
            ms = int(internal_date_ms)
            dt = datetime.fromtimestamp(ms / 1000.0, tz=timezone.utc)
        except (TypeError, ValueError, OverflowError, OSError) as e:
            # int() rejects non-numeric input (TypeError/ValueError);
            # fromtimestamp() raises OverflowError/OSError/ValueError for
            # timestamps outside the platform or datetime range. All of these
            # mean "internalDate is unusable", so fall through to the header.
            logger.debug(
                "Could not convert internalDate %r to timestamp; falling back to "
                "Date header: %s",
                internal_date_ms,
                e,
            )
        else:
            return dt.isoformat(), dt

    if date_str:
        try:
            dt = parsedate_to_datetime(date_str)
            # Normalize to UTC (parsedate_to_datetime may return naive or
            # offset-aware). A naive result is taken to already be UTC.
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            else:
                # Can raise OverflowError for dates at the edge of the
                # supported range once the offset is applied.
                dt = dt.astimezone(timezone.utc)
        except (TypeError, ValueError, OverflowError) as e:
            logger.debug(
                "Could not parse Date header %r: %s",
                date_str,
                e,
            )
        else:
            return dt.isoformat(), dt

    return None, None


def _thread_message_headers(message: dict) -> dict[str, str]:
    """Return a message's headers keyed by lowercased header name.

    Header field names are case-insensitive, so keys are lowercased to make
    lookups such as 'cc' match 'Cc', 'CC' or 'cc'. When a name occurs more
    than once the last value wins. Missing or null 'payload'/'headers'
    entries yield an empty mapping instead of raising.
    """
    payload = message.get("payload") or {}
    headers: dict[str, str] = {}
    for header in payload.get("headers") or []:
        name = header.get("name")
        if name:
            headers[name.lower()] = header.get("value") or ""
    return headers


def _analyze_thread_ownership_impl(
    thread_response: dict,
    user_google_email: str,
) -> dict[str, Any]:
    """Pure analysis of a Gmail thread API response. Takes the response from
    users().threads().get(format='full') and returns structured ownership
    metadata. Kept separate from the @server.tool wrapper so tests can call
    it with fabricated thread data.

    Args:
        thread_response: Thread dict with an 'id' and a list of 'messages';
            each message may carry 'labelIds', 'internalDate' and
            'payload.headers' (a list of {'name', 'value'} dicts).
        user_google_email: Address of the mailbox owner; compared after
            normalization via _normalize_email.

    Returns:
        A dict with the keys:
            thread_id: The thread's 'id' ('' if absent).
            thread_subject: Subject header of the first message, or None.
            last_sender: Raw From header of the latest dated non-draft
                message, or None.
            last_timestamp: ISO-8601 UTC timestamp of that message, or None.
            ball_in_court_of: 'user' if the user owes a reply, 'them' if the
                other party does, None if unresolvable.
            message_count_by_sender: Non-draft message counts keyed by
                normalized sender address.
            participants: Sorted normalized addresses seen in From/To/Cc of
                all messages, drafts included.
            excluded_drafts: Number of messages labelled DRAFT.
            message_count: Total number of messages, drafts included.
    """
    messages = thread_response.get("messages", []) or []

    # Built once and filled in as analysis progresses, so every return path
    # shares the same schema and key order.
    result: dict[str, Any] = {
        "thread_id": thread_response.get("id", ""),
        "thread_subject": None,
        "last_sender": None,
        "last_timestamp": None,
        "ball_in_court_of": None,
        "message_count_by_sender": {},
        "participants": [],
        "excluded_drafts": 0,
        "message_count": len(messages),
    }
    if not messages:
        return result

    normalized_user = _normalize_email(user_google_email)

    # Thread subject: first message's Subject header
    result["thread_subject"] = (
        _thread_message_headers(messages[0]).get("subject") or None
    )

    sender_counter: Counter[str] = Counter()
    participants: set[str] = set()
    non_draft_participants: set[str] = set()
    excluded_drafts = 0

    last_non_draft = None  # (datetime, raw From header, normalized sender)

    for message in messages:
        headers = _thread_message_headers(message)

        from_addr = headers.get("from", "")
        _name, from_email = parseaddr(from_addr)
        from_norm = _normalize_email(from_email) if from_email else ""

        # Collect participants from From/To/Cc using getaddresses (RFC-correct
        # parsing of quoted display names with embedded commas).
        header_values = [headers.get(hdr, "") for hdr in ("from", "to", "cc")]
        message_participants = set()
        for _n, addr in getaddresses([v for v in header_values if v]):
            norm = _normalize_email(addr) if addr else ""
            if norm and "@" in norm:
                message_participants.add(norm)
        participants.update(message_participants)

        # Drafts contribute to the overall participant list but never to the
        # sent-state analysis below.
        if "DRAFT" in (message.get("labelIds", []) or []):
            excluded_drafts += 1
            continue

        non_draft_participants.update(message_participants)

        if from_norm and "@" in from_norm:
            sender_counter[from_norm] += 1

        _iso, dt = _parse_date_header(
            headers.get("date", ""), message.get("internalDate")
        )
        # '>=' so that, on equal timestamps, the later message in the list wins.
        if dt is not None and (last_non_draft is None or dt >= last_non_draft[0]):
            last_non_draft = (dt, from_addr, from_norm)

    result["message_count_by_sender"] = dict(sender_counter)
    result["participants"] = sorted(participants)
    result["excluded_drafts"] = excluded_drafts

    if last_non_draft is None:
        # Either every message was a draft or no non-draft message had a
        # parseable date - no sent state to reason about.
        return result

    last_dt, last_sender_raw, last_sender_norm = last_non_draft

    # Ball-in-court: "user" = user owes reply, "them" = other party owes reply,
    # None = unresolvable. Use non-draft participants, so outbound-only threads
    # still see the recipient while draft-only recipients are ignored.
    if "@" not in normalized_user or "@" not in last_sender_norm:
        # Without a usable user address or last sender nothing can be decided.
        ball_in_court_of = None
    elif not (non_draft_participants - {normalized_user}):
        # Nobody but the user is on the thread.
        ball_in_court_of = None
    elif last_sender_norm == normalized_user:
        ball_in_court_of = "them"
    else:
        ball_in_court_of = "user"

    result["last_sender"] = last_sender_raw or None
    result["last_timestamp"] = last_dt.isoformat()
    result["ball_in_court_of"] = ball_in_court_of
    return result
44 changes: 39 additions & 5 deletions gmail/gmail_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
from typing import Annotated, Optional, List, Dict, Literal, Any
from urllib.parse import unquote, urlparse, urlunsplit

import httpx
from email.message import EmailMessage
from email.policy import SMTP
from email.utils import formataddr

import httpx

from pydantic import Field
from googleapiclient.errors import HttpError

Expand All @@ -47,6 +48,7 @@
GMAIL_MODIFY_SCOPE,
GMAIL_LABELS_SCOPE,
)
from gmail.gmail_helpers import _analyze_thread_ownership_impl

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -2419,23 +2421,49 @@ async def get_gmail_thread_content(
),
),
] = "text",
) -> str:
include_analysis: Annotated[
bool,
Field(
description=(
"When True, the return value is a dict with both the formatted "
"thread content AND structured ownership analysis (last sender, "
"ball-in-court verdict, per-sender message counts, participants). "
"Defaults to False, in which case the existing string return shape "
"is preserved."
),
),
] = False,
) -> "str | Dict[str, Any]":
Comment thread
coderabbitai[bot] marked this conversation as resolved.
"""
Retrieves the complete content of a Gmail conversation thread, including all messages.

Optionally also returns structured ownership analysis so a caller can
determine who sent the last message and who owes whom a response without
re-parsing the formatted string or making a second tool call.

Args:
thread_id (str): The unique ID of the Gmail thread to retrieve.
user_google_email (str): The user's Google email address. Required.
body_format (Literal["text", "html", "raw"]): Body output format.
"text" (default) returns plaintext (HTML converted to text as fallback).
"html" returns the raw HTML body as-is without conversion.
"raw" fetches each message's full raw MIME content and returns the base64url-decoded body.
include_analysis (bool): When True, returns a dict containing both the
formatted thread content and structured ownership analysis. When
False (default), returns the formatted content string (existing
behavior, unchanged).

Returns:
str: The complete thread content with all messages formatted for reading.
str: When `include_analysis=False` (default). The complete thread
content with all messages formatted for reading.

Dict[str, Any]: When `include_analysis=True`. A dict with keys
"content" (str) and "analysis" (dict). See
`_analyze_thread_ownership_impl` for the analysis schema.
"""
logger.info(
f"[get_gmail_thread_content] Invoked. Thread ID: '{thread_id}', Email: '{user_google_email}'"
f"[get_gmail_thread_content] Invoked. Thread ID: '{thread_id}', "
f"Email: '{user_google_email}', include_analysis={include_analysis}"
)

# Fetch the complete thread with all messages
Expand All @@ -2454,13 +2482,19 @@ async def get_gmail_thread_content(
service, message_ids, log_prefix="get_gmail_thread_content"
)

return _format_thread_content(
content = _format_thread_content(
thread_response,
thread_id,
body_format=body_format,
raw_contents=raw_contents,
)

if not include_analysis:
return content

analysis = _analyze_thread_ownership_impl(thread_response, user_google_email)
return {"content": content, "analysis": analysis}


@server.tool()
@require_google_service("gmail", "gmail_read")
Expand Down
Loading
Loading