From 9f116dedc3936189ab94ef81a0c0bd9137df8baf Mon Sep 17 00:00:00 2001 From: Dan Lemon Date: Fri, 19 Dec 2025 15:34:09 +0000 Subject: [PATCH 1/4] Trigger a build... From c382ea3f4df09d32c8334b5d9d3ff8061883e003 Mon Sep 17 00:00:00 2001 From: Dan Lemon Date: Tue, 23 Dec 2025 19:41:45 +0000 Subject: [PATCH 2/4] chore: add recon job to cleanup trial accounts --- app/core/worker.py | 59 ++++++++++++ scripts/trigger_trial_recon_job.py | 85 +++++++++++++++++ tests/test_monitor_trial_users.py | 145 +++++++++++++++++++++++++++++ 3 files changed, 289 insertions(+) create mode 100644 scripts/trigger_trial_recon_job.py create mode 100644 tests/test_monitor_trial_users.py diff --git a/app/core/worker.py b/app/core/worker.py index dbf0257b..0269d1f8 100644 --- a/app/core/worker.py +++ b/app/core/worker.py @@ -1137,3 +1137,62 @@ def generate_pricing_url(admin_email: str, validity_hours: int = 24) -> str: # Add the token as a query parameter return f"{url}?token={token}" +async def monitor_trial_users(db: Session): + """ + Monitor trial users and expire them if they have exceeded their budget. + """ + logger.info("Monitoring trial users") + try: + # Get trial team + trial_team = db.query(DBTeam).filter(DBTeam.admin_email == settings.AI_TRIAL_TEAM_EMAIL).first() + if not trial_team: + logger.info("Trial team not found, skipping") + return + + # Get all active users in the trial team (excluding admin) + users = db.query(DBUser).filter( + DBUser.team_id == trial_team.id, + DBUser.is_active, + DBUser.role == "user" + ).all() + + for user in users: + # Check budget limit + user_limit = db.query(DBLimitedResource).filter( + and_( + DBLimitedResource.owner_type == OwnerType.USER, + DBLimitedResource.owner_id == user.id, + DBLimitedResource.resource == ResourceType.BUDGET + ) + ).first() + + if user_limit and user_limit.current_value is not None: + # Check if usage has been used up + if user_limit.current_value >= user_limit.max_value: + logger.info(f"Trial user {user.email} (ID: {user.id}) has fully used up their budget ({user_limit.current_value} >= {user_limit.max_value}). Setting for removal.") + + # 1. Disable user + user.is_active = False + user.updated_at = datetime.now(UTC) + + # 2. Disable keys (set duration to 0) + keys = db.query(DBPrivateAIKey).filter(DBPrivateAIKey.owner_id == user.id).all() + for key in keys: + if key.litellm_token and key.region: + try: + litellm_service = LiteLLMService( + api_url=key.region.litellm_api_url, + api_key=key.region.litellm_api_key + ) + await litellm_service.update_key_duration(key.litellm_token, "0d") + logger.info(f"Set duration to 0d for key {key.id}") + except Exception as e: + logger.error(f"Failed to expire key {key.id}: {e}") + + db.commit() + logger.info(f"Finished monitoring {len(users)} trial users") + + except Exception as e: + logger.error(f"Error in trial user monitoring: {e}") + db.rollback() + diff --git a/scripts/trigger_trial_recon_job.py b/scripts/trigger_trial_recon_job.py new file mode 100644 index 00000000..cbe56494 --- /dev/null +++ b/scripts/trigger_trial_recon_job.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python3 + +import os +import sys +import asyncio +import logging + +# Add the parent directory to the Python path +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from sqlalchemy.orm import sessionmaker +from app.db.database import engine +from app.core.worker import monitor_trial_users +from app.core.locking import try_acquire_lock, release_lock + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) + +logger = logging.getLogger(__name__) + +async def trigger_trial_recon_job(): + """Manually trigger the trial recon job (monitor_trial_users)""" + + # Create database session + SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + db = SessionLocal() + + lock_name = "monitor_trial_users" + + try: + logger.info("Starting manual trial recon job trigger...") + + # Try to acquire the lock + if try_acquire_lock(lock_name, db, lock_timeout=10): + logger.info("Acquired monitor_trial_users lock, executing job") + try: + await monitor_trial_users(db) + logger.info("Trial recon job completed successfully") + except Exception as e: + logger.error(f"Error in trial recon job execution: {str(e)}") + raise + finally: + # Always release the lock when done + release_lock(lock_name, db) + logger.info("Released monitor_trial_users lock") + else: + logger.warning("Another process has the monitor_trial_users lock, cannot execute job") + return False + + except Exception as e: + logger.error(f"Error in trial recon job trigger: {str(e)}") + # Try to release lock in case of error + try: + release_lock(lock_name, db) + logger.info("Released lock after error") + except Exception as release_error: + logger.error(f"Error releasing lock: {str(release_error)}") + raise + finally: + db.close() + + return True + +def main(): + """Main function to run the script""" + try: + logger.info("Triggering trial recon job manually...") + success = asyncio.run(trigger_trial_recon_job()) + + if success: + logger.info("✅ Trial recon job completed successfully") + sys.exit(0) + else: + logger.info("⚠️ Trial recon job could not be executed (lock held by another process)") + sys.exit(1) + + except Exception as e: + logger.error(f"❌ Script failed: {str(e)}") + sys.exit(1) + +if __name__ == "__main__": + main() diff --git a/tests/test_monitor_trial_users.py b/tests/test_monitor_trial_users.py new file mode 100644 index 00000000..eca38ad7 --- /dev/null +++ b/tests/test_monitor_trial_users.py @@ -0,0 +1,145 @@ +import pytest +from unittest.mock import AsyncMock, patch +from sqlalchemy.orm import Session +from app.core.worker import monitor_trial_users +from app.db.models import DBTeam, DBUser, DBLimitedResource, DBPrivateAIKey, DBRegion +from app.schemas.limits import ResourceType, OwnerType, LimitType, UnitType, LimitSource +from app.core.config import settings + +@pytest.fixture +def trial_team(db: Session): + team = DBTeam( + name="AI Trial Team", + admin_email=settings.AI_TRIAL_TEAM_EMAIL, + is_active=True + ) + db.add(team) + db.commit() + return team + +@pytest.fixture +def trial_user(db: Session, trial_team: DBTeam): + user = DBUser( + email="trial-user@example.com", + team_id=trial_team.id, + is_active=True, + role="user" + ) + db.add(user) + db.commit() + return user + +@pytest.fixture +def trial_region(db: Session): + region = DBRegion( + name="test-trial-region", + litellm_api_url="http://mock-litellm", + litellm_api_key="mock-key", + is_active=True + ) + db.add(region) + db.commit() + return region + +@pytest.fixture +def trial_key(db: Session, trial_user: DBUser, trial_region: DBRegion, trial_team: DBTeam): + key = DBPrivateAIKey( + owner_id=trial_user.id, + team_id=trial_team.id, + region_id=trial_region.id, + litellm_token="mock-token", + name="Trial Key" + ) + db.add(key) + db.commit() + return key + +@pytest.fixture +def user_budget_limit(db: Session, trial_user: DBUser): + limit = DBLimitedResource( + resource=ResourceType.BUDGET, + limit_type=LimitType.DATA_PLANE, + unit=UnitType.DOLLAR, + owner_type=OwnerType.USER, + owner_id=trial_user.id, + max_value=10.0, + current_value=0.0, + limited_by=LimitSource.MANUAL, + set_by="test" + ) + db.add(limit) + db.commit() + return limit + +@pytest.fixture +def mock_litellm(): + """Fixture to mock LiteLLMService.""" + with patch('app.core.worker.LiteLLMService', autospec=True) as MockLiteLLM: + mock_instance = MockLiteLLM.return_value + mock_instance.update_key_duration = AsyncMock() + yield mock_instance + +@pytest.mark.asyncio +async def test_monitor_trial_users_no_overage(db, trial_team, trial_user, trial_key, user_budget_limit, mock_litellm): + """Test that users within budget are not affected.""" + # usage is 5.0, max is 10.0 + user_budget_limit.current_value = 5.0 + db.commit() + + await monitor_trial_users(db) + + # Verify user is still active + db.refresh(trial_user) + assert trial_user.is_active is True + + # Verify LiteLLM was not called + assert mock_litellm.update_key_duration.call_count == 0 + +@pytest.mark.asyncio +async def test_monitor_trial_users_with_overage(db, trial_team, trial_user, trial_key, user_budget_limit, mock_litellm): + """Test that users over budget are disabled and keys expired.""" + # usage is 10.0, max is 10.0 (limit reached) + user_budget_limit.current_value = 10.0 + db.commit() + + await monitor_trial_users(db) + + # Verify user is deactivated + db.refresh(trial_user) + assert trial_user.is_active is False + + # Verify LiteLLM called with 0d duration + mock_litellm.update_key_duration.assert_called_once_with("mock-token", "0d") + +@pytest.mark.asyncio +async def test_monitor_trial_users_skips_admin(db, trial_team, mock_litellm): + """Test that admin user is skipped even if over budget.""" + admin_user = DBUser( + email="admin@example.com", + team_id=trial_team.id, + is_active=True, + role="admin" + ) + db.add(admin_user) + db.commit() + + # Even if we add a limit + limit = DBLimitedResource( + resource=ResourceType.BUDGET, + limit_type=LimitType.DATA_PLANE, + unit=UnitType.DOLLAR, + owner_type=OwnerType.USER, + owner_id=admin_user.id, + max_value=10.0, + current_value=15.0, + limited_by=LimitSource.MANUAL, + set_by="test" + ) + db.add(limit) + db.commit() + + await monitor_trial_users(db) + + db.refresh(admin_user) + assert admin_user.is_active is True + assert mock_litellm.update_key_duration.call_count == 0 \ No newline at end of file From 0b066e8b1056b6872e48d49030f9fd5e11b36f72 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 7 Apr 2026 19:55:24 +0000 Subject: [PATCH 3/4] refactor: optimize trial user monitor with batch query and deactivate helper Agent-Logs-Url: https://github.com/amazeeio/amazee.ai/sessions/0b7151c4-cab2-42b4-b53b-59968f4f4f14 Co-authored-by: dan2k3k4 <158704+dan2k3k4@users.noreply.github.com> --- app/core/worker.py | 55 ++++++++++++++++--------------- tests/test_monitor_trial_users.py | 5 +++ 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/app/core/worker.py b/app/core/worker.py index 26fac4b7..1acd9368 100644 --- a/app/core/worker.py +++ b/app/core/worker.py @@ -1380,6 +1380,24 @@ def generate_pricing_url(admin_email: str, validity_hours: int = 24) -> str: # Add the token as a query parameter return f"{url}?token={token}" +async def deactivate_trial_user(db: Session, user: DBUser): + """Deactivate a trial user and expire all their LiteLLM keys.""" + user.is_active = False + user.updated_at = datetime.now(UTC) + + keys = db.query(DBPrivateAIKey).filter(DBPrivateAIKey.owner_id == user.id).all() + for key in keys: + if key.litellm_token and key.region: + try: + litellm_service = LiteLLMService( + api_url=key.region.litellm_api_url, + api_key=key.region.litellm_api_key + ) + await litellm_service.update_key_duration(key.litellm_token, "0d") + logger.info(f"Set duration to 0d for key {key.id}") + except Exception as e: + logger.error(f"Failed to expire key {key.id}: {e}") + async def monitor_trial_users(db: Session): """ Monitor trial users and expire them if they have exceeded their budget. @@ -1399,38 +1417,23 @@ async def monitor_trial_users(db: Session): DBUser.role == "user" ).all() + # Fetch all user budget limits in one query + user_limits = db.query(DBLimitedResource).filter( + DBLimitedResource.owner_type == OwnerType.USER, + DBLimitedResource.owner_id.in_([user.id for user in users]), + DBLimitedResource.resource == ResourceType.BUDGET + ).all() + + user_limit_map = {limit.owner_id: limit for limit in user_limits} + for user in users: - # Check budget limit - user_limit = db.query(DBLimitedResource).filter( - and_( - DBLimitedResource.owner_type == OwnerType.USER, - DBLimitedResource.owner_id == user.id, - DBLimitedResource.resource == ResourceType.BUDGET - ) - ).first() + user_limit = user_limit_map.get(user.id) if user_limit and user_limit.current_value is not None: # Check if usage has been used up if user_limit.current_value >= user_limit.max_value: logger.info(f"Trial user {user.email} (ID: {user.id}) has fully used up their budget ({user_limit.current_value} >= {user_limit.max_value}). Setting for removal.") - - # 1. Disable user - user.is_active = False - user.updated_at = datetime.now(UTC) - - # 2. Disable keys (set duration to 0) - keys = db.query(DBPrivateAIKey).filter(DBPrivateAIKey.owner_id == user.id).all() - for key in keys: - if key.litellm_token and key.region: - try: - litellm_service = LiteLLMService( - api_url=key.region.litellm_api_url, - api_key=key.region.litellm_api_key - ) - await litellm_service.update_key_duration(key.litellm_token, "0d") - logger.info(f"Set duration to 0d for key {key.id}") - except Exception as e: - logger.error(f"Failed to expire key {key.id}: {e}") + await deactivate_trial_user(db, user) db.commit() logger.info(f"Finished monitoring {len(users)} trial users") diff --git a/tests/test_monitor_trial_users.py b/tests/test_monitor_trial_users.py index eca38ad7..6425bfda 100644 --- a/tests/test_monitor_trial_users.py +++ b/tests/test_monitor_trial_users.py @@ -15,6 +15,7 @@ def trial_team(db: Session): ) db.add(team) db.commit() + db.refresh(team) return team @pytest.fixture @@ -27,6 +28,7 @@ def trial_user(db: Session, trial_team: DBTeam): ) db.add(user) db.commit() + db.refresh(user) return user @pytest.fixture @@ -39,6 +41,7 @@ def trial_region(db: Session): ) db.add(region) db.commit() + db.refresh(region) return region @pytest.fixture @@ -52,6 +55,7 @@ def trial_key(db: Session, trial_user: DBUser, trial_region: DBRegion, trial_tea ) db.add(key) db.commit() + db.refresh(key) return key @pytest.fixture @@ -69,6 +73,7 @@ def user_budget_limit(db: Session, trial_user: DBUser): ) db.add(limit) db.commit() + db.refresh(limit) return limit @pytest.fixture From ad8bbcbb9c4ed08eb30191305eae17eecff5d71b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 2 Jun 2026 17:35:50 +0000 Subject: [PATCH 4/4] fix: re-raise, ordering, is_active filter in monitor_trial_users - Add raise after db.rollback() so callers see job failures - Commit DB user deactivation before expiring LiteLLM keys to avoid inconsistent state if the external call fails - Add DBTeam.is_active.is_(True) filter to trial team lookup - Extract expire_trial_user_keys() helper for the post-commit key expiry --- app/core/worker.py | 37 ++++++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/app/core/worker.py b/app/core/worker.py index 1acd9368..0df208ba 100644 --- a/app/core/worker.py +++ b/app/core/worker.py @@ -1380,11 +1380,8 @@ def generate_pricing_url(admin_email: str, validity_hours: int = 24) -> str: # Add the token as a query parameter return f"{url}?token={token}" -async def deactivate_trial_user(db: Session, user: DBUser): - """Deactivate a trial user and expire all their LiteLLM keys.""" - user.is_active = False - user.updated_at = datetime.now(UTC) - +async def expire_trial_user_keys(db: Session, user: DBUser): + """Expire all LiteLLM keys for a trial user (best-effort, failures are logged).""" keys = db.query(DBPrivateAIKey).filter(DBPrivateAIKey.owner_id == user.id).all() for key in keys: if key.litellm_token and key.region: @@ -1398,14 +1395,28 @@ async def deactivate_trial_user(db: Session, user: DBUser): except Exception as e: logger.error(f"Failed to expire key {key.id}: {e}") + +async def deactivate_trial_user(db: Session, user: DBUser): + """Deactivate a trial user in the DB and expire all their LiteLLM keys. + + The DB deactivation is committed by the caller before key expiry so that + the user is marked inactive even if the external LiteLLM call fails. + """ + user.is_active = False + user.updated_at = datetime.now(UTC) + + async def monitor_trial_users(db: Session): """ Monitor trial users and expire them if they have exceeded their budget. """ logger.info("Monitoring trial users") try: - # Get trial team - trial_team = db.query(DBTeam).filter(DBTeam.admin_email == settings.AI_TRIAL_TEAM_EMAIL).first() + # Get trial team (only consider active teams) + trial_team = db.query(DBTeam).filter( + DBTeam.admin_email == settings.AI_TRIAL_TEAM_EMAIL, + DBTeam.is_active.is_(True) + ).first() if not trial_team: logger.info("Trial team not found, skipping") return @@ -1426,19 +1437,27 @@ async def monitor_trial_users(db: Session): user_limit_map = {limit.owner_id: limit for limit in user_limits} + # Collect over-budget users and mark them inactive in the DB + over_budget_users = [] for user in users: user_limit = user_limit_map.get(user.id) - if user_limit and user_limit.current_value is not None: - # Check if usage has been used up if user_limit.current_value >= user_limit.max_value: logger.info(f"Trial user {user.email} (ID: {user.id}) has fully used up their budget ({user_limit.current_value} >= {user_limit.max_value}). Setting for removal.") await deactivate_trial_user(db, user) + over_budget_users.append(user) + # Commit DB changes first so users are deactivated even if key expiry fails db.commit() + + # Expire LiteLLM keys after the DB commit (best-effort) + for user in over_budget_users: + await expire_trial_user_keys(db, user) + logger.info(f"Finished monitoring {len(users)} trial users") except Exception as e: logger.error(f"Error in trial user monitoring: {e}") db.rollback() + raise