diff --git a/internal/jobscheduler/jobs.go b/internal/jobscheduler/jobs.go index ec36c99..123b08c 100644 --- a/internal/jobscheduler/jobs.go +++ b/internal/jobscheduler/jobs.go @@ -82,10 +82,14 @@ type RootScheduler struct { active map[string]string // queue -> job id activeClones int maxCloneConcurrency int - cancel context.CancelFunc - wg sync.WaitGroup - store ScheduleStore - metrics *schedulerMetrics + // ctx is cancelled when the scheduler is shutting down. Periodic re-arm + // goroutines select on it so they exit cleanly instead of submitting to a + // dead scheduler. + ctx context.Context //nolint:containedctx + cancel context.CancelFunc + wg sync.WaitGroup + store ScheduleStore + metrics *schedulerMetrics } var _ Scheduler = &RootScheduler{} @@ -126,6 +130,7 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) { } q.cond = sync.NewCond(&q.lock) ctx, cancel := context.WithCancel(ctx) + q.ctx = ctx q.cancel = cancel // Wake all workers on context cancellation so they can observe done and exit. go func() { @@ -158,6 +163,10 @@ func (q *RootScheduler) WithQueuePrefix(prefix string) Scheduler { func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) error) { q.lock.Lock() + if q.done { + q.lock.Unlock() + return + } q.queue = append(q.queue, queueJob{queue: queue, id: id, run: run}) q.metrics.queueDepth.Record(context.Background(), int64(len(q.queue))) q.lock.Unlock() @@ -165,6 +174,9 @@ func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) e } func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) { + if q.ctx.Err() != nil { + return + } key := jobKey(queue, id) delay := q.periodicDelay(key, interval) submit := func() { @@ -175,10 +187,13 @@ func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Durati logging.FromContext(ctx).WarnContext(ctx, "Failed to record job last run", "key", key, "error", storeErr) } } - go func() { - time.Sleep(interval) + // Re-arm the next firing on a cancellation-aware timer. Without + // this select, a SIGTERM during the sleep would leave the goroutine + // to wake and submit to a dead scheduler. The new pod's + // warmExistingRepos re-registers periodic jobs on startup. + go q.sleepThenSubmit(interval, func() { q.SubmitPeriodicJob(queue, id, interval, run) - }() + }) return errors.WithStack(err) }) } @@ -186,10 +201,20 @@ func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Durati submit() return } - go func() { - time.Sleep(delay) - submit() - }() + go q.sleepThenSubmit(delay, submit) +} + +// sleepThenSubmit waits for d, then runs fn — unless the scheduler is +// shutting down, in which case it returns immediately. +func (q *RootScheduler) sleepThenSubmit(d time.Duration, fn func()) { + timer := time.NewTimer(d) + defer timer.Stop() + select { + case <-q.ctx.Done(): + return + case <-timer.C: + fn() + } } func (q *RootScheduler) periodicDelay(key string, interval time.Duration) time.Duration { diff --git a/internal/jobscheduler/jobs_test.go b/internal/jobscheduler/jobs_test.go index 82f9940..228b6c9 100644 --- a/internal/jobscheduler/jobs_test.go +++ b/internal/jobscheduler/jobs_test.go @@ -282,6 +282,59 @@ func TestJobSchedulerPeriodicJobWithError(t *testing.T) { }, "periodic job should continue executing even after errors") } +// TestJobSchedulerPeriodicJobStopsOnCancel verifies that the periodic re-arm +// goroutine exits cleanly when the scheduler's context is cancelled. Without +// this, a SIGTERM would leave the re-arm goroutine to wake and submit to a +// dead scheduler — and on a real pod that goroutine dies with the process, +// causing the periodic job to skip a full interval after every restart. +func TestJobSchedulerPeriodicJobStopsOnCancel(t *testing.T) { + _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 2}) + + var executions atomic.Int32 + scheduler.SubmitPeriodicJob("queue1", "periodic", 50*time.Millisecond, func(_ context.Context) error { + executions.Add(1) + return nil + }) + + eventually(t, time.Second, func() bool { return executions.Load() >= 2 }, + "periodic job should fire at least twice before cancel") + + cancel() + // Give workers and the re-arm goroutine time to observe the cancel. + time.Sleep(150 * time.Millisecond) + before := executions.Load() + // Wait several intervals; no further executions should occur. + time.Sleep(300 * time.Millisecond) + assert.Equal(t, before, executions.Load(), + "periodic job should not fire after scheduler context is cancelled") +} + +// TestJobSchedulerSubmitDroppedAfterShutdown verifies that submissions made +// after the scheduler has been shut down are silently dropped rather than +// accumulating in the queue (which would leak the closure capture forever). +func TestJobSchedulerSubmitDroppedAfterShutdown(t *testing.T) { + _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) + ctx, cancel := context.WithCancel(ctx) + + scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 2}) + + cancel() + // Wait for the scheduler to observe the cancel and set q.done. + time.Sleep(50 * time.Millisecond) + + var executed atomic.Bool + scheduler.Submit("queue1", "post-shutdown", func(_ context.Context) error { + executed.Store(true) + return nil + }) + time.Sleep(100 * time.Millisecond) + assert.False(t, executed.Load(), "submissions after shutdown should be dropped") +} + func TestJobSchedulerMultipleQueues(t *testing.T) { _, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError}) ctx, cancel := context.WithCancel(ctx)