Skip to content

Commit a8973f9

Browse files
committed
add stopAndClearTimer
Signed-off-by: Tilak Raj <tilak.raj94@gmail.com>
1 parent b9939b8 commit a8973f9

2 files changed

Lines changed: 20 additions & 5 deletions

File tree

server/jetstream.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1030,6 +1030,9 @@ func (s *Server) shutdownJetStream() {
10301030
qch, stopped = cc.qch, cc.stopped
10311031
cc.qch, cc.stopped = nil, nil
10321032
}
1033+
// Cancel any pending checkForOrphans timer so it cannot fire on a
1034+
// server that is already shutting down.
1035+
stopAndClearTimer(&cc.orphanTimer)
10331036
js.stopUpdatesSub()
10341037
if cc.c != nil {
10351038
cc.c.closeConnection(ClientClosed)

server/jetstream_cluster.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ type jetStreamCluster struct {
8787
// with existing cluster assignments during ptc promotion. This prevents
8888
// the ptc marker from being cleaned up while unresolved conflicts remain.
8989
ptcConflicts int
90+
// Tracks the pending checkForOrphans timer so it can be cancelled on
91+
// shutdown, preventing the function from firing on a stopped server.
92+
orphanTimer *time.Timer
9093
}
9194

9295
// Used to track inflight stream create/update/delete requests that have been proposed but not yet applied.
@@ -1465,15 +1468,19 @@ func (js *jetStream) checkForOrphans() {
14651468
return
14661469
}
14671470
if meta.Leaderless() {
1471+
// Track the timer in cc.orphanTimer so shutdownJetStream can cancel it.
1472+
stopAndClearTimer(&cc.orphanTimer)
1473+
cc.orphanTimer = time.AfterFunc(10*time.Second, js.checkForOrphans)
14681474
js.mu.Unlock()
14691475
s.Debugf("JetStream cluster skipping orphan check, no meta-leader, will retry")
1470-
time.AfterFunc(10*time.Second, js.checkForOrphans)
14711476
return
14721477
}
14731478
if !meta.Current() {
1479+
// Track the timer in cc.orphanTimer so shutdownJetStream can cancel it.
1480+
stopAndClearTimer(&cc.orphanTimer)
1481+
cc.orphanTimer = time.AfterFunc(10*time.Second, js.checkForOrphans)
14741482
js.mu.Unlock()
14751483
s.Debugf("JetStream cluster skipping orphan check, meta not current yet will retry")
1476-
time.AfterFunc(10*time.Second, js.checkForOrphans)
14771484
return
14781485
}
14791486
ourPeerID := meta.ID()
@@ -1626,7 +1633,12 @@ func (js *jetStream) checkForOrphans() {
16261633
// looping indefinitely for persistent conflicts (ptcConflicts)
16271634
// that require operator attention.
16281635
if pendingAdoptions {
1629-
time.AfterFunc(10*time.Second, js.checkForOrphans)
1636+
js.mu.Lock()
1637+
if js.cluster != nil {
1638+
stopAndClearTimer(&js.cluster.orphanTimer)
1639+
js.cluster.orphanTimer = time.AfterFunc(10*time.Second, js.checkForOrphans)
1640+
}
1641+
js.mu.Unlock()
16301642
}
16311643
} else {
16321644
if err := js.removePTCMarker(); err != nil {
@@ -2240,7 +2252,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
22402252
mset.mu.RLock()
22412253
cfg := mset.cfg
22422254
mset.mu.RUnlock()
2243-
// During active PTC promotion, keep R1 orphans alive
2255+
// keep them alive, checkForOrphans() will pick them up and propose adoptions
22442256
if cfg.Replicas == 1 && ptc {
22452257
continue
22462258
}
@@ -2254,7 +2266,7 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove
22542266
mset.mu.RLock()
22552267
replicas := mset.cfg.Replicas
22562268
mset.mu.RUnlock()
2257-
// During active PTC promotion, keep R1 orphans alive
2269+
// keep them alive, checkForOrphans() will pick them up and propose adoptions
22582270
if replicas == 1 && ptc {
22592271
continue
22602272
}

0 commit comments

Comments
 (0)