Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 36 additions & 11 deletions internal/jobscheduler/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,14 @@ type RootScheduler struct {
active map[string]string // queue -> job id
activeClones int
maxCloneConcurrency int
cancel context.CancelFunc
wg sync.WaitGroup
store ScheduleStore
metrics *schedulerMetrics
// ctx is cancelled when the scheduler is shutting down. Periodic re-arm
// goroutines select on it so they exit cleanly instead of submitting to a
// dead scheduler.
ctx context.Context //nolint:containedctx
cancel context.CancelFunc
wg sync.WaitGroup
store ScheduleStore
metrics *schedulerMetrics
}

var _ Scheduler = &RootScheduler{}
Expand Down Expand Up @@ -126,6 +130,7 @@ func New(ctx context.Context, config Config) (*RootScheduler, error) {
}
q.cond = sync.NewCond(&q.lock)
ctx, cancel := context.WithCancel(ctx)
q.ctx = ctx
q.cancel = cancel
// Wake all workers on context cancellation so they can observe done and exit.
go func() {
Expand Down Expand Up @@ -158,13 +163,20 @@ func (q *RootScheduler) WithQueuePrefix(prefix string) Scheduler {

func (q *RootScheduler) Submit(queue, id string, run func(ctx context.Context) error) {
q.lock.Lock()
if q.done {
q.lock.Unlock()
return
}
q.queue = append(q.queue, queueJob{queue: queue, id: id, run: run})
q.metrics.queueDepth.Record(context.Background(), int64(len(q.queue)))
q.lock.Unlock()
q.cond.Signal()
}

func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Duration, run func(ctx context.Context) error) {
if q.ctx.Err() != nil {
return
}
key := jobKey(queue, id)
delay := q.periodicDelay(key, interval)
submit := func() {
Expand All @@ -175,21 +187,34 @@ func (q *RootScheduler) SubmitPeriodicJob(queue, id string, interval time.Durati
logging.FromContext(ctx).WarnContext(ctx, "Failed to record job last run", "key", key, "error", storeErr)
}
}
go func() {
time.Sleep(interval)
// Re-arm the next firing on a cancellation-aware timer. Without
// this select, a SIGTERM during the sleep would leave the goroutine
// to wake and submit to a dead scheduler. The new pod's
// warmExistingRepos re-registers periodic jobs on startup.
go q.sleepThenSubmit(interval, func() {
q.SubmitPeriodicJob(queue, id, interval, run)
}()
})
return errors.WithStack(err)
})
}
if delay <= 0 {
submit()
return
}
go func() {
time.Sleep(delay)
submit()
}()
go q.sleepThenSubmit(delay, submit)
}

// sleepThenSubmit waits for d, then runs fn — unless the scheduler is
// shutting down, in which case it returns immediately.
func (q *RootScheduler) sleepThenSubmit(d time.Duration, fn func()) {
timer := time.NewTimer(d)
defer timer.Stop()
select {
case <-q.ctx.Done():
return
case <-timer.C:
fn()
}
}

func (q *RootScheduler) periodicDelay(key string, interval time.Duration) time.Duration {
Expand Down
53 changes: 53 additions & 0 deletions internal/jobscheduler/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,59 @@ func TestJobSchedulerPeriodicJobWithError(t *testing.T) {
}, "periodic job should continue executing even after errors")
}

// TestJobSchedulerPeriodicJobStopsOnCancel verifies that the periodic re-arm
// goroutine exits cleanly when the scheduler's context is cancelled. Without
// this, a SIGTERM would leave the re-arm goroutine to wake and submit to a
// dead scheduler — and on a real pod that goroutine dies with the process,
// causing the periodic job to skip a full interval after every restart.
func TestJobSchedulerPeriodicJobStopsOnCancel(t *testing.T) {
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
ctx, cancel := context.WithCancel(ctx)
defer cancel()

scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 2})

var executions atomic.Int32
scheduler.SubmitPeriodicJob("queue1", "periodic", 50*time.Millisecond, func(_ context.Context) error {
executions.Add(1)
return nil
})

eventually(t, time.Second, func() bool { return executions.Load() >= 2 },
"periodic job should fire at least twice before cancel")

cancel()
// Give workers and the re-arm goroutine time to observe the cancel.
time.Sleep(150 * time.Millisecond)
before := executions.Load()
// Wait several intervals; no further executions should occur.
time.Sleep(300 * time.Millisecond)
assert.Equal(t, before, executions.Load(),
"periodic job should not fire after scheduler context is cancelled")
}

// TestJobSchedulerSubmitDroppedAfterShutdown verifies that submissions made
// after the scheduler has been shut down are silently dropped rather than
// accumulating in the queue (which would leak the closure capture forever).
func TestJobSchedulerSubmitDroppedAfterShutdown(t *testing.T) {
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
ctx, cancel := context.WithCancel(ctx)

scheduler := newTestScheduler(ctx, t, jobscheduler.Config{Concurrency: 2})

cancel()
// Wait for the scheduler to observe the cancel and set q.done.
time.Sleep(50 * time.Millisecond)

var executed atomic.Bool
scheduler.Submit("queue1", "post-shutdown", func(_ context.Context) error {
executed.Store(true)
return nil
})
time.Sleep(100 * time.Millisecond)
assert.False(t, executed.Load(), "submissions after shutdown should be dropped")
}

func TestJobSchedulerMultipleQueues(t *testing.T) {
_, ctx := logging.Configure(context.Background(), logging.Config{Level: slog.LevelError})
ctx, cancel := context.WithCancel(ctx)
Expand Down