Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,7 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
}
created := ca.Created
node := ca.Group.node
caErr := ca.err
js.mu.RUnlock()

// Check if not running at all.
Expand All @@ -720,6 +721,14 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
// We'll start erroring once we're sure this consumer is actually broken.
return nil
}
if caErr != nil {
// The consumer assignment has a recorded error from a failed creation attempt.
// This is an orphaned assignment that will not recover on its own.
// Don't report it as a health error since there's nothing running to be unhealthy.
// These consumers can be detected via consumer list (shows the error)
// and cleaned up via consumer delete.
return nil
Comment on lines +724 to +730
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Restrict healthz bypass to true orphaned assignment errors

This branch now suppresses health failures for any missing consumer with a non-nil ca.err, but ca.err is populated for all consumer-create failures (including transient ones such as out-of-space) and is carried forward in assignment state; it is not reliably cleared after successful recovery paths. That means a consumer assignment that ever recorded an error can later lose its consumer and still pass healthz, causing readiness checks to report healthy while the consumer is actually absent. Please gate this bypass to the specific non-recoverable orphan condition(s) instead of all ca.err != nil cases.

Useful? React with 👍 / 👎.

}
return errors.New("consumer not found")
}

Expand Down
43 changes: 41 additions & 2 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7845,16 +7845,55 @@ func TestJetStreamClusterConsumerHealthCheckDeleted(t *testing.T) {
sjs.mu.Unlock()

// The health check gathers all assignments and does checking after.
// If the consumer was deleted in the meantime, it should not report an error.
// If the consumer was deleted in the meantime, it should report "consumer not found"
// because ca.err is nil (the consumer was successfully created before deletion).
require_NoError(t, js.DeleteConsumer("TEST", "CONSUMER"))
require_Error(t, sjs.isConsumerHealthy(mset, "CONSUMER", ca), errors.New("consumer not found"))
err = sjs.isConsumerHealthy(mset, "CONSUMER", ca)
require_Error(t, err)
require_Equal(t, err.Error(), "consumer not found")

// The health check could run earlier than we're able to create the consumer.
// In that case, wait before erroring.
sjs.mu.Lock()
ca.Created = time.Now()
sjs.mu.Unlock()
require_NoError(t, sjs.isConsumerHealthy(mset, "CONSUMER", ca))

// Same behavior for R=3 consumers: after deletion the stale assignment
// has ca.err == nil, so "consumer not found" is reported.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST3",
Subjects: []string{"bar"},
Replicas: 3,
})
require_NoError(t, err)
_, err = js.AddConsumer("TEST3", &nats.ConsumerConfig{
Durable: "CONSUMER3",
Replicas: 3,
})
require_NoError(t, err)

cl3 := c.consumerLeader(globalAccountName, "TEST3", "CONSUMER3")
require_NotNil(t, cl3)
mset3, err := cl3.globalAccount().lookupStream("TEST3")
require_NoError(t, err)

sjs3 := cl3.getJetStream()
sjs3.mu.Lock()
ca3 := sjs3.consumerAssignment(globalAccountName, "TEST3", "CONSUMER3")
if ca3 == nil {
sjs3.mu.Unlock()
t.Fatal("ca3 not found")
}
ca3.Created = time.Time{}
sjs3.mu.Unlock()

require_NoError(t, js.DeleteConsumer("TEST3", "CONSUMER3"))

// R=3 consumer with ca.err == nil: "consumer not found" is reported.
err = sjs3.isConsumerHealthy(mset3, "CONSUMER3", ca3)
require_Error(t, err)
require_Equal(t, err.Error(), "consumer not found")
}

func TestJetStreamClusterRespectConsumerStartSeq(t *testing.T) {
Expand Down
Loading
Loading