Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
a5197b4
feat: redesign Run History + row counts per model (OR-1477)
wicky-zipstack Apr 23, 2026
599c6a4
feat: per-adapter row count capture with insert/update/delete breakdown
wicky-zipstack Apr 23, 2026
8dff116
fix: exclude ephemeral models from counts, avatar trigger, color fixes
wicky-zipstack Apr 23, 2026
35e1353
fix: initialize row count variables before try block (Trino, Snowflak…
wicky-zipstack Apr 24, 2026
9aaad8e
chore: cleanup unused imports, fix JSONField search, logging level
wicky-zipstack Apr 24, 2026
7e5ecc4
feat: job switcher bar in Run History + View job config opens drawer
wicky-zipstack Apr 24, 2026
e719d6d
fix: reduce job switcher select + arrow buttons from large to middle
wicky-zipstack Apr 24, 2026
b053ed0
fix: address PR review — sparkline order, periodic_task 500, N+1 quer…
wicky-zipstack Apr 24, 2026
9cde74f
fix: prefer adapter cursor.rowcount over total table size for increme…
wicky-zipstack Apr 24, 2026
c8799f5
fix: do not expose exception details in run_stats API response
wicky-zipstack Apr 24, 2026
0f2df1a
fix: guard against cursor.rowcount returning -1 across all adapters
wicky-zipstack Apr 25, 2026
6240104
fix: resolve ESLint and Prettier formatting errors in Runhistory.jsx
wicky-zipstack Apr 25, 2026
a525d56
fix: Prettier formatting for RunHistory.css
wicky-zipstack Apr 25, 2026
147e654
fix: address Tahier's review — serializer fields, kwargs guard, run_n…
wicky-zipstack Apr 25, 2026
25470e6
fix: cron label, per-model field names, duration parser
wicky-zipstack Apr 27, 2026
923d615
fix: zero rows now shows 0 instead of — (null)
wicky-zipstack Apr 28, 2026
6d9ee7c
fix: Databricks cursor safety, Postgres fallback return, search debounce
wicky-zipstack Apr 28, 2026
7ae287c
fix: add missing useEffect dependencies (pageSize, fetchHistory, fetc…
wicky-zipstack Apr 28, 2026
85ee5d9
fix: add loading spinner to Retry button in run history table
wicky-zipstack Apr 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions backend/backend/core/scheduler/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,12 +361,30 @@ def _clean_name(raw):
"status": r.status,
"end_status": r.end_status,
"sequence": r.sequence_num,
"rows_affected": getattr(r, "rows_affected", None),
"rows_inserted": getattr(r, "rows_inserted", None),
"rows_updated": getattr(r, "rows_updated", None),
"rows_deleted": getattr(r, "rows_deleted", None),
"type": getattr(r, "materialization", "") or "",
"duration_ms": getattr(r, "duration_ms", None),
}
for r in user_results
],
"total": len(user_results),
"passed": sum(1 for r in user_results if r.end_status == "OK"),
"failed": sum(1 for r in user_results if r.end_status == "FAIL"),
"rows_processed": sum(
getattr(r, "rows_affected", 0) or 0 for r in user_results
) or None,
"rows_added": sum(
getattr(r, "rows_inserted", 0) or 0 for r in user_results
Comment thread
wicky-zipstack marked this conversation as resolved.
Outdated
) or None,
"rows_modified": sum(
getattr(r, "rows_updated", 0) or 0 for r in user_results
) or None,
"rows_deleted": sum(
getattr(r, "rows_deleted", 0) or 0 for r in user_results
) or None,
}
except Exception:
_clear_base_result()
Expand Down
87 changes: 83 additions & 4 deletions backend/backend/core/scheduler/serializer.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,96 @@
from django.contrib.auth import get_user_model
from rest_framework import serializers

from backend.core.scheduler.models import TaskRunHistory

User = get_user_model()


class TaskRunHistorySerializer(serializers.ModelSerializer):
duration = serializers.SerializerMethodField()
duration_ms = serializers.SerializerMethodField()
run_number = serializers.SerializerMethodField()
triggered_by = serializers.SerializerMethodField()
model_count = serializers.SerializerMethodField()
failed_models = serializers.SerializerMethodField()
skipped_count = serializers.SerializerMethodField()

class Meta:
model = TaskRunHistory
fields = "__all__" # Include all fields or specify fields like ['id', 'start_time', 'end_time', 'status']
fields = "__all__"
Comment thread
wicky-zipstack marked this conversation as resolved.
Outdated

def get_duration(self, obj):
"""Calculate duration (end_time - start_time)"""
"""Human-readable duration string."""
if obj.start_time and obj.end_time:
delta = obj.end_time - obj.start_time
total_ms = int(delta.total_seconds() * 1000)
if total_ms < 1000:
return f"{total_ms}ms"
elif total_ms < 60000:
return f"{total_ms / 1000:.1f}s"
else:
minutes = total_ms // 60000
seconds = (total_ms % 60000) / 1000
return f"{minutes}m {seconds:.0f}s"
return None

def get_duration_ms(self, obj):
"""Duration in milliseconds for sorting/comparison."""
if obj.start_time and obj.end_time:
return str(obj.end_time - obj.start_time) # Convert timedelta to string
return None # If end_time is missing, return None
return int((obj.end_time - obj.start_time).total_seconds() * 1000)
return None

def get_run_number(self, obj):
"""Sequential run number within the job (1 = oldest)."""
if not hasattr(self, "_run_number_cache"):
self._run_number_cache = {}
task_detail_id = obj.user_task_detail_id
if task_detail_id not in self._run_number_cache:
# Get all run IDs for this job ordered by start_time ASC
run_ids = list(
TaskRunHistory.objects.filter(user_task_detail_id=task_detail_id)
.order_by("start_time")
.values_list("id", flat=True)
)
self._run_number_cache[task_detail_id] = {
rid: idx + 1 for idx, rid in enumerate(run_ids)
Comment thread
greptile-apps[bot] marked this conversation as resolved.
Outdated
}
return self._run_number_cache[task_detail_id].get(obj.id, 0)

def get_triggered_by(self, obj):
"""Resolve user_id from kwargs to username."""
if not obj.kwargs:
return None
user_id = obj.kwargs.get("user_id")
if not user_id:
return None
try:
user = User.objects.get(id=user_id)
return {
"id": str(user.id),
"username": user.get_full_name() or user.username or user.email,
}
except (User.DoesNotExist, ValueError):
return {"id": str(user_id), "username": str(user_id)}
Comment thread
greptile-apps[bot] marked this conversation as resolved.
Outdated

Comment thread
wicky-zipstack marked this conversation as resolved.
def get_model_count(self, obj):
"""Total model count from result."""
if obj.result and isinstance(obj.result, dict):
return obj.result.get("total", 0)
return 0

def get_failed_models(self, obj):
"""List of failed model names."""
if obj.result and isinstance(obj.result, dict):
models = obj.result.get("models", [])
return [m["name"] for m in models if m.get("end_status") == "FAIL" or m.get("status") == "failure"]
return []

def get_skipped_count(self, obj):
"""Count of skipped models (total - passed - failed)."""
if obj.result and isinstance(obj.result, dict):
total = obj.result.get("total", 0)
passed = obj.result.get("passed", 0)
failed = obj.result.get("failed", 0)
return max(0, total - passed - failed)
return 0
2 changes: 2 additions & 0 deletions backend/backend/core/scheduler/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
delete_periodic_task,
update_periodic_task,
task_run_history,
run_stats,
trigger_task_once,
trigger_task_once_for_model,
list_deploy_candidates,
Expand All @@ -30,6 +31,7 @@
name="get_periodic_task",
),
path("/run-history/<int:user_task_id>", task_run_history, name="task_run_history"),
path("/run-stats/<int:user_task_id>", run_stats, name="run_stats"),
path(
"/trigger-periodic-task/<int:user_task_id>",
trigger_task_once,
Expand Down
127 changes: 124 additions & 3 deletions backend/backend/core/scheduler/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,109 @@
)


@api_view(["GET"])
@permission_classes([IsAuthenticated])
def run_stats(request, project_id, user_task_id):
"""Get aggregated run statistics for a job — stats cards data."""
try:
query = {"id": user_task_id}
if _is_valid_project_id(project_id):
query["project__project_uuid"] = project_id
Comment thread
wicky-zipstack marked this conversation as resolved.
task = UserTaskDetails.objects.get(**query)
runs = TaskRunHistory.objects.filter(user_task_detail=task)

now = timezone.now()
last_7d = now - timedelta(days=7)
last_24h = now - timedelta(hours=24)
prev_24h_start = now - timedelta(hours=48)

# Success rate (7 days)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3 — success_rate_7d quietly penalizes in-progress and silently-killed runs

total_7d is runs_7d.count() (every status), but success_7d is restricted to status="SUCCESS". A run that's currently STARTED (or one that was killed by an OOM and is still STARTED in the DB) is in the denominator but not in the numerator — so the success rate dips during/after every active run, even though no actual failure happened.

Two reasonable normalizations:

  • Exclude in-flight runs from the denominator: total_7d = runs_7d.filter(status__in=["SUCCESS", "FAILURE"]).count() — measures completed runs only.
  • Treat orphaned STARTED rows as failures explicitly: a sweeper that flips stale STARTEDFAILURE after a timeout. Probably overkill for this PR.

At minimum, the metric's definition should be documented next to the API or in the UI tooltip so users know what they're looking at.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 147e654 — denominator now only counts completed runs (status__in=["SUCCESS", "FAILURE"]). In-progress/stale STARTED runs no longer drag down the rate.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 147e654 — denominator now only counts completed runs (status__in=["SUCCESS", "FAILURE"]). In-progress/stale STARTED runs no longer drag down the rate.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 147e654 — denominator now only counts completed runs (SUCCESS/FAILURE). In-progress runs excluded.

runs_7d = runs.filter(start_time__gte=last_7d)
total_7d = runs_7d.count()
success_7d = runs_7d.filter(status="SUCCESS").count()
success_rate = round((success_7d / total_7d * 100), 1) if total_7d > 0 else None

# Average duration (successful runs, 7 days)
successful_runs_7d = runs_7d.filter(status="SUCCESS", start_time__isnull=False, end_time__isnull=False)
avg_duration_ms = None
if successful_runs_7d.exists():
durations = [(r.end_time - r.start_time).total_seconds() * 1000 for r in successful_runs_7d]
avg_duration_ms = int(sum(durations) / len(durations))

# Failures (24h) + comparison with previous 24h
failures_24h = runs.filter(start_time__gte=last_24h, status="FAILURE").count()
failures_prev_24h = runs.filter(
start_time__gte=prev_24h_start, start_time__lt=last_24h, status="FAILURE"
).count()

# Last successful run
last_success = runs.filter(status="SUCCESS").order_by("-end_time").first()
last_success_time = last_success.end_time if last_success else None

# Expected duration (avg of last 5 successful runs)
recent_successes = runs.filter(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 — Aggregations done in Python, not SQL

Both avg_duration_ms and expected_duration_ms materialize every successful run into Python and then iterate to compute a mean. For a job with thousands of weekly runs this is slow and memory-heavy. Push it to the database:

from django.db.models import Avg, F, ExpressionWrapper, DurationField, Count, Case, When
from django.db.models.functions import Extract

avg_dur = successful_runs_7d.aggregate(
    avg=Avg(ExpressionWrapper(F("end_time") - F("start_time"), output_field=DurationField()))
)["avg"]
avg_duration_ms = int(avg_dur.total_seconds() * 1000) if avg_dur else None

More broadly, this endpoint fires 8+ separate queries (runs_7d.count(), success_7d.count(), failures_24h.count(), failures_prev_24h.count(), runs.count(), the last_success query, successful_runs_7d.exists(), the recent-successes slice, the duration-trend slice). Most of the counts could collapse into a single runs.aggregate(Count(Case(When(...)))) call. For a high-volume endpoint that the new dashboard hits on every page load, the round-trip cost adds up.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Valid point. For the current scale (most jobs have <1k weekly runs), the Python-side aggregation works fine. Will refactor to Django ORM Aggregate calls in a follow-up optimization pass when we tackle the high-volume jobs use case. Noted.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Valid point. For current scale (most jobs <1k weekly runs), Python-side aggregation works fine. Will refactor to Django ORM Aggregate calls in a follow-up optimization pass. Noted.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Valid point. Current scale works fine. Will refactor to Django ORM Aggregate calls in a follow-up optimization pass.

status="SUCCESS", start_time__isnull=False, end_time__isnull=False
).order_by("-end_time")[:5]
expected_duration_ms = None
if recent_successes.exists():
durations = [(r.end_time - r.start_time).total_seconds() * 1000 for r in recent_successes]
expected_duration_ms = int(sum(durations) / len(durations))

# Duration trend (last 10 completed runs for sparkline)
recent_runs = runs.filter(
start_time__isnull=False, end_time__isnull=False
).order_by("end_time")[:10]
duration_trend = [
int((r.end_time - r.start_time).total_seconds() * 1000) for r in recent_runs
]

# Schedule info
schedule_type = None
schedule_label = None
try:
periodic = task.periodic_task
if periodic:
if periodic.crontab:
schedule_type = "cron"
c = periodic.crontab
schedule_label = f"{c.minute} {c.hour} {c.day_of_week}"
Comment thread
greptile-apps[bot] marked this conversation as resolved.
Outdated
elif periodic.interval:
Comment thread
greptile-apps[bot] marked this conversation as resolved.
schedule_type = "interval"
schedule_label = f"Every {periodic.interval.every} {periodic.interval.period}"
except Exception:
pass

return Response({
"success": True,
"data": {
"success_rate_7d": success_rate,
"success_count_7d": success_7d,
"total_count_7d": total_7d,
"avg_duration_ms": avg_duration_ms,
"failures_24h": failures_24h,
"failures_prev_24h": failures_prev_24h,
"failures_change": failures_24h - failures_prev_24h,
"last_successful_run": last_success_time,
"expected_duration_ms": expected_duration_ms,
"duration_trend": duration_trend,
"total_runs": runs.count(),

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 — total_runs does an unbounded COUNT(*) for an unused field

"total_runs": runs.count(),

For a job that's been running every 5 minutes for a year, this is a 100k+ row COUNT on every page load. A grep of the frontend confirms stats.total_runs is never read anywhere — the field is dead weight. Either remove it, or scope it to the same 7d window the rest of the stats use.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field is used by the FE for run_number computation (total - offset - idx). Can't remove it. For high-volume jobs this could be optimized with a cached count, but for current scale it's acceptable.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field is needed — the FE uses it for run_number computation (total - offset - idx). Can't remove it without breaking run numbering.

"job_name": task.task_name,
"environment": {
"name": task.environment.environment_name if task.environment else None,
"type": task.environment.deployment_type if task.environment else None,
},
"schedule_type": schedule_type,
"schedule_label": schedule_label,
"schedule_enabled": task.periodic_task.enabled if task.periodic_task else False,
},
Comment thread
greptile-apps[bot] marked this conversation as resolved.
}, status=status.HTTP_200_OK)
except UserTaskDetails.DoesNotExist:
return Response({"error": "Task not found"}, status=status.HTTP_404_NOT_FOUND)
except Exception as e:
logger.error(f"Error getting run stats: {e}")
return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed


@api_view(["GET"])
@permission_classes([IsAuthenticated])
def task_run_history(request, project_id, user_task_id):
Expand All @@ -600,12 +703,28 @@
trigger_filter = request.GET.get("trigger")
scope_filter = request.GET.get("scope")
status_filter = request.GET.get("status")
date_from = request.GET.get("date_from")
date_to = request.GET.get("date_to")
search = request.GET.get("search")

if trigger_filter:
runs = runs.filter(trigger=trigger_filter)
if scope_filter:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3 — scope_filter is dead code from this UI

scope_filter = request.GET.get("scope")
...
if scope_filter:
    runs = runs.filter(scope=scope_filter)

The redesigned filter bar dropped the Scope select, so the frontend no longer sends scope. The backend still accepts the param — fine for back-compat, but it's now dead from this UI's perspective. Either remove it or document why it's still here (e.g., third-party API consumers).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kept intentionally for backward compatibility — the BE still supports the scope filter if any other client sends it. Harmless dead path.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kept for backward compatibility — harmless dead path, BE still supports it if other clients send it.

runs = runs.filter(scope=scope_filter)
if status_filter:
runs = runs.filter(status=status_filter)
if date_from:
from django.utils.dateparse import parse_datetime
dt = parse_datetime(date_from)
if dt:
runs = runs.filter(start_time__gte=dt)
if date_to:
from django.utils.dateparse import parse_datetime as parse_dt
dt = parse_dt(date_to)
if dt:
runs = runs.filter(start_time__lte=dt)
Comment thread
greptile-apps[bot] marked this conversation as resolved.
if search:
runs = runs.filter(error_message__icontains=search)

runs = runs.order_by("-start_time")
total = runs.count()
Expand All @@ -620,6 +739,7 @@
"page_items": {
"id": task.id,
"job_name": task.task_name,
"project_id": str(task.project.project_uuid) if task.project else None,
"env_type": task.environment.deployment_type
if task.environment
else None,
Expand Down Expand Up @@ -705,9 +825,10 @@
synchronous (in-process) execution so local dev works without Redis.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Must Fix: _is_valid_project_id bypass weakens authorization

When project_id is _all, the query drops the project filter entirely:

query = {"id": user_task_id}
if _is_valid_project_id(project_id):
    query["project__project_uuid"] = project_id
task = UserTaskDetails.objects.get(**query)

This means any authenticated user can trigger any job by ID regardless of project ownership. If user A doesn't have access to project B's jobs, they can still trigger them by knowing the task ID and passing _all as project_id.

Fix: When project_id is _all, filter by the user's accessible projects:

if _is_valid_project_id(project_id):
    query["project__project_uuid"] = project_id
else:
    # _all: restrict to projects the user has access to
    user_projects = ProjectDetails.objects.filter(
        organization_id=request.user.organization_id
    ).values_list('project_uuid', flat=True)
    query["project__project_uuid__in"] = user_projects

Or at minimum, verify the user has access to the task's project after fetching it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already mitigated — UserTaskDetails uses DefaultOrganizationManagerMixin which auto-filters by organization=get_organization() at the manager level. The _all shortcut only skips project-level filter (intended for cross-project job switcher view). Cross-org access is not possible.

"""
try:
task = UserTaskDetails.objects.get(
id=user_task_id, project__project_uuid=project_id
)
query = {"id": user_task_id}
if _is_valid_project_id(project_id):
query["project__project_uuid"] = project_id
task = UserTaskDetails.objects.get(**query)
except UserTaskDetails.DoesNotExist:
return Response(
{"error": "Task not found"}, status=status.HTTP_404_NOT_FOUND
Expand Down
4 changes: 2 additions & 2 deletions backend/visitran/adapters/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ def db_scd(self) -> BaseSCD:
def db_reader(self) -> BaseDBReader:
return self._db_reader

def run_model(self, visitran_model: VisitranModel) -> None:
def run_model(self, visitran_model: VisitranModel):
self.load_model(model=visitran_model)
fire_event(MaterializationType(materialization=str(visitran_model.materialization)))
self.db_model.execute()
return self.db_model.execute()

def run_seeds(self, schema: str, abs_path: str) -> None:
seed_obj = self.load_seed(schema, abs_path)
Expand Down
12 changes: 3 additions & 9 deletions backend/visitran/adapters/bigquery/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,16 +269,9 @@ def merge_into_table(
target_table_name: str,
select_statement: Table,
primary_key: Union[str, list[str]] = None,
) -> None:
) -> dict:
"""Efficient upsert using DELETE + INSERT for BigQuery.

This approach is more efficient than MERGE for BigQuery because:
1. BigQuery is optimized for bulk operations
2. DELETE + INSERT performs better than UPDATE operations
3. Works better with BigQuery's partitioning strategy

Args:
primary_key: Can be a single column name (str) or list of column names for composite keys
Returns dict with rows_affected.
"""
try:
fire_event(
Expand Down Expand Up @@ -378,6 +371,7 @@ def merge_into_table(
raise Exception(
f"BigQuery incremental upsert failed for {schema_name}.{target_table_name}: {str(e)}"
) from e
return {"rows_affected": None} # BigQuery: fallback to get_table_row_count in BaseModel



Expand Down
4 changes: 3 additions & 1 deletion backend/visitran/adapters/bigquery/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,14 @@ def execute_incremental(self) -> None:
# Get primary key from model if available
primary_key = getattr(self.model, 'primary_key', None)

self.db_connection.merge_into_table(
result = self.db_connection.merge_into_table(
schema_name=self.model.destination_schema_name,
target_table_name=self.model.destination_table_name,
select_statement=self.model.select_statement,
primary_key=primary_key,
)
if result and isinstance(result, dict):
self._upsert_metrics = result
else:
fire_event(
ExecuteIncrementalCreate(
Expand Down
Loading
Loading