Skip to content
111 changes: 100 additions & 11 deletions cmd/agenttrace/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func main() {
}

if strings.TrimSpace(*searchQuery) != "" {
sessions := engine.LoadAll(sessionsDir)
sessions := loadSessionsForSearch(sessionsDir)
if len(sessions) == 0 {
fmt.Fprintf(os.Stderr, i18n.T("no_session_files")+"\n", sessionsDir)
os.Exit(1)
Expand Down Expand Up @@ -281,21 +281,17 @@ func main() {

// Waste analysis for latest session
if *wasteFlag {
files := engine.FindSessionFiles(sessionsDir)
cache := engine.LoadSessionCache()
files := engine.FindSessionFilesCached(sessionsDir, cache)
if len(files) == 0 {
fmt.Fprintf(os.Stderr, i18n.T("no_session_files")+"\n", sessionsDir)
os.Exit(1)
}
targetPath := latestSessionFile(files)
if targetPath == "" {
s := loadLatestSession(files)
if s == nil {
fmt.Fprint(os.Stderr, i18n.T("cli_no_session_files"))
os.Exit(1)
}
s, err := engine.LoadSession(targetPath)
if err != nil {
fmt.Fprintf(os.Stderr, i18n.T("cli_error"), err)
os.Exit(1)
}
wr := engine.ComputeWasteReportFromSession(s)
fmt.Print(engine.WasteReportText(wr))
return
Expand All @@ -304,12 +300,16 @@ func main() {
// Resolve target path
targetPath := path
if *latest {
files := engine.FindSessionFiles(sessionsDir)
cache := engine.LoadSessionCache()
files := engine.FindSessionFilesCached(sessionsDir, cache)
if len(files) == 0 {
fmt.Fprintf(os.Stderr, i18n.T("no_session_files")+"\n", sessionsDir)
os.Exit(1)
}
targetPath = latestSessionFile(files)
s := loadLatestSession(files)
if s != nil {
targetPath = s.Path
}
}

if targetPath == "" {
Expand Down Expand Up @@ -353,6 +353,21 @@ func resolveDefaultDir() string {
return ""
}

func loadSessionsForSearch(sessionsDir string) []engine.Session {
if sessionsDir == "" {
return engine.LoadAll("")
}
var sessions []engine.Session
for _, f := range engine.FindSessionFiles(sessionsDir) {
s, err := engine.LoadSession(f)
if err != nil {
continue
}
sessions = append(sessions, *s)
}
return sessions
}

func latestSessionFile(files []string) string {
type candidate struct {
path string
Expand Down Expand Up @@ -381,6 +396,80 @@ func latestSessionFile(files []string) string {
return latest.path
}

// loadLatestSession loads the latest session using cache for better performance
func loadLatestSession(files []string) *engine.Session {
cache := engine.LoadSessionCache()
var latest *engine.Session
var latestTime time.Time
var latestMtime time.Time
var hasTimestamp bool // Track if current best has SessionStart
var uncachedPaths []string

// First try cache
for _, f := range files {
if s, ok := engine.CachedSession(f, cache); ok {
if s.Metrics.SessionStart != "" {
if ts, err := time.Parse(time.RFC3339, s.Metrics.SessionStart); err == nil {
if latest == nil || !hasTimestamp || ts.After(latestTime) {
latest = &s
latestTime = ts
hasTimestamp = true
}
}
} else if !hasTimestamp {
// Fallback to mtime only if no timestamped session found
if info, err := os.Stat(f); err == nil {
if latest == nil || info.ModTime().After(latestMtime) {
latest = &s
latestMtime = info.ModTime()
}
}
}
} else {
uncachedPaths = append(uncachedPaths, f)
}
}

// Always check uncached paths for newer sessions
for _, f := range uncachedPaths {
s, err := engine.LoadSession(f)
if err != nil {
continue
}

// Save to cache
info, err := os.Stat(f)
if err != nil {
continue
}
cache.Entries[f] = engine.CacheEntry{
ModTime: info.ModTime().UnixNano(),
Size: info.Size(),
Session: *s,
}

if s.Metrics.SessionStart != "" {
if ts, err := time.Parse(time.RFC3339, s.Metrics.SessionStart); err == nil {
if latest == nil || !hasTimestamp || ts.After(latestTime) {
latest = s
latestTime = ts
hasTimestamp = true
}
}
} else if !hasTimestamp {
// Fallback to mtime only if no timestamped session found
if latest == nil || info.ModTime().After(latestMtime) {
latest = s
latestMtime = info.ModTime()
}
}
}

// Save cache
engine.SaveSessionCache(cache)
return latest
}

func newerSessionCandidate(a, b struct {
path string
sessionTime time.Time
Expand Down
27 changes: 26 additions & 1 deletion cmd/agenttrace/main_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package main

import "testing"
import (
"os"
"path/filepath"
"testing"
)

func TestHasPostPricingAction(t *testing.T) {
tests := []struct {
Expand Down Expand Up @@ -38,3 +42,24 @@ func TestHasPostPricingAction(t *testing.T) {
})
}
}

func TestLoadSessionsForSearchIncludesNestedDirs(t *testing.T) {
dir := t.TempDir()
nested := filepath.Join(dir, "2026", "05", "25")
if err := os.MkdirAll(nested, 0755); err != nil {
t.Fatal(err)
}
path := filepath.Join(nested, "session.jsonl")
raw := `{"timestamp":"2026-05-25T00:00:00Z","type":"session_meta","payload":{"id":"search-nested","cwd":"/tmp/search-nested","model":"gpt-5.4"}}` + "\n" +
`{"timestamp":"2026-05-25T00:00:01Z","type":"response_item","payload":{"type":"message","role":"user","content":[{"type":"input_text","text":"hello"}]}}` + "\n"
if err := os.WriteFile(path, []byte(raw), 0644); err != nil {
t.Fatal(err)
}
sessions := loadSessionsForSearch(dir)
if len(sessions) != 1 {
t.Fatalf("expected nested session to be loaded, got %d", len(sessions))
}
if sessions[0].Path != path {
t.Fatalf("loaded wrong path: %s", sessions[0].Path)
}
}
179 changes: 179 additions & 0 deletions internal/engine/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// batch.go - Batch processing optimizations
// Copyright 2026 agenttrace contributors. MIT License.

package engine

import (
"sort"
"sync"
)

// BatchProcessor processes sessions in batches for optimized bulk operations.
type BatchProcessor struct {
mu sync.RWMutex
sessions []Session
errors []error
}

// NewBatchProcessor creates a new batch processor.
func NewBatchProcessor() *BatchProcessor {
return &BatchProcessor{
sessions: make([]Session, 0, 256),
errors: make([]error, 0, 16),
}
}

// Process processes sessions in batches.
func (bp *BatchProcessor) Process(paths []string, workers int) []Session {
if len(paths) == 0 {
return nil
}

if workers <= 0 {
workers = 4
}

jobs := make(chan string, len(paths))
results := make(chan struct {
session Session
err error
}, len(paths))

// Start workers
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for path := range jobs {
s, err := LoadSession(path)
if err != nil {
results <- struct {
session Session
err error
}{err: err}
continue
}
results <- struct {
session Session
err error
}{session: *s}
}
}()
}

// Dispatch jobs
go func() {
for _, path := range paths {
jobs <- path
}
close(jobs)
}()

// Collect results
go func() {
wg.Wait()
close(results)
}()

// Process results
for r := range results {
if r.err != nil {
bp.errors = append(bp.errors, r.err)
continue
}
bp.sessions = append(bp.sessions, r.session)
}

// Sort by timestamp
sort.Slice(bp.sessions, func(i, j int) bool {
return bp.sessions[i].Metrics.SessionStart > bp.sessions[j].Metrics.SessionStart
})

return bp.sessions
}

// Stats holds processing statistics.
type Stats struct {
Total int
Success int
Failed int
Errors []error
}

// ProcessWithStats processes sessions with statistics tracking.
func ProcessWithStats(paths []string, workers int) ([]Session, Stats) {
bp := NewBatchProcessor()
sessions := bp.Process(paths, workers)

stats := Stats{
Total: len(paths),
Success: len(sessions),
Failed: len(bp.errors),
Errors: bp.errors,
}

return sessions, stats
}

// FilterByHealth filters sessions by minimum health score.
func FilterByHealth(sessions []Session, minHealth int) []Session {
var filtered []Session
for _, s := range sessions {
if s.Health >= minHealth {
filtered = append(filtered, s)
}
}
return filtered
}

// FilterBySource filters sessions by source tool.
func FilterBySource(sessions []Session, source string) []Session {
var filtered []Session
for _, s := range sessions {
if s.Metrics.SourceTool == source {
filtered = append(filtered, s)
}
}
return filtered
}

// FilterByModel filters sessions by model.
func FilterByModel(sessions []Session, model string) []Session {
var filtered []Session
for _, s := range sessions {
if s.Metrics.ModelUsed == model {
filtered = append(filtered, s)
}
}
return filtered
}

// AggregateByAgent aggregates statistics by agent source.
func AggregateByAgent(sessions []Session) map[string]AgentOverview {
agents := make(map[string]AgentOverview)
for _, s := range sessions {
agent := s.Metrics.SourceTool
ao := agents[agent]
ao.Sessions++
ao.Cost += s.Metrics.CostEstimated
agents[agent] = ao
}
return agents
}

// AggregateByModel aggregates statistics by model.
func AggregateByModel(sessions []Session) map[string]ModelOverview {
models := make(map[string]ModelOverview)
for _, s := range sessions {
model := s.Metrics.ModelUsed
if model == "" {
model = "unknown"
}
mo := models[model]
mo.Sessions++
mo.Cost += s.Metrics.CostEstimated
models[model] = mo
}
return models
}
Loading