Skip to content

Commit af8c6d9

Browse files
committed
merge pendingAdoptions/conflicts
Signed-off-by: Tilak Raj <tilak.raj94@gmail.com>
1 parent 67c639b commit af8c6d9

1 file changed

Lines changed: 9 additions & 17 deletions

File tree

server/jetstream_cluster.go

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1545,16 +1545,12 @@ func (js *jetStream) checkForOrphans() {
15451545
}
15461546
if b, err := json.Marshal(cfgCopy); err != nil {
15471547
s.Warnf("Failed to marshal config for stream adoption '%s > %s': %v", accName, cfg.Name, err)
1548-
conflicts = true
1549-
continue
15501548
} else {
15511549
sa.ConfigJSON = b
1552-
}
1553-
s.Noticef("Adopting orphaned R1 stream '%s > %s' into cluster assignments", accName, cfg.Name)
1554-
if err := meta.ForwardProposal(encodeAddStreamAssignment(sa)); err != nil {
1555-
s.Warnf("Failed to propose adoption of R1 stream '%s > %s': %v", accName, cfg.Name, err)
1556-
conflicts = true
1557-
continue
1550+
s.Noticef("Adopting orphaned R1 stream '%s > %s' into cluster assignments", accName, cfg.Name)
1551+
if err := meta.ForwardProposal(encodeAddStreamAssignment(sa)); err != nil {
1552+
s.Warnf("Failed to propose adoption of R1 stream '%s > %s': %v", accName, cfg.Name, err)
1553+
}
15581554
}
15591555

15601556
// Consumer adoption is deferred to the next sweep: once the stream
@@ -1563,7 +1559,6 @@ func (js *jetStream) checkForOrphans() {
15631559
// loop below will adopt them. This avoids a race where a consumer
15641560
// proposal could be applied before the stream proposal during a leader
15651561
// transition.
1566-
conflicts = true
15671562
pendingAdoptions = true
15681563
continue
15691564
}
@@ -1611,18 +1606,15 @@ func (js *jetStream) checkForOrphans() {
16111606
}
16121607
if b, err := json.Marshal(oCfg); err != nil {
16131608
s.Warnf("Failed to marshal config for consumer adoption '%s > %s > %s': %v", accName, streamName, oName, err)
1614-
conflicts = true
1615-
continue
16161609
} else {
16171610
ca.ConfigJSON = b
1618-
}
1619-
s.Noticef("Adopting orphaned consumer '%s > %s > %s' into cluster assignments", accName, streamName, oName)
1620-
if err := meta.ForwardProposal(encodeAddConsumerAssignment(ca)); err != nil {
1621-
s.Warnf("Failed to propose adoption of consumer '%s > %s > %s': %v", accName, streamName, oName, err)
1611+
s.Noticef("Adopting orphaned consumer '%s > %s > %s' into cluster assignments", accName, streamName, oName)
1612+
if err := meta.ForwardProposal(encodeAddConsumerAssignment(ca)); err != nil {
1613+
s.Warnf("Failed to propose adoption of consumer '%s > %s > %s': %v", accName, streamName, oName, err)
1614+
}
16221615
}
16231616
// Proposal is async; keep the marker until the next sweep confirms the
16241617
// consumer is no longer an orphan after the Raft entry is committed.
1625-
conflicts = true
16261618
pendingAdoptions = true
16271619
continue
16281620
}
@@ -1645,7 +1637,7 @@ func (js *jetStream) checkForOrphans() {
16451637
return
16461638
}
16471639

1648-
if conflicts {
1640+
if conflicts || pendingAdoptions {
16491641
err := js.writePTCMarker()
16501642
if err != nil {
16511643
s.Warnf("Failed to write ptc marker: %v", err)

0 commit comments

Comments
 (0)