diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 5bde03b49b8..c4598f172e4 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -710,6 +710,7 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
 	}
 	created := ca.Created
 	node := ca.Group.node
+	caErr := ca.err
 	js.mu.RUnlock()
 
 	// Check if not running at all.
@@ -720,6 +721,14 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
 			// We'll start erroring once we're sure this consumer is actually broken.
 			return nil
 		}
+		if caErr != nil {
+			// The consumer assignment has a recorded error from a failed creation attempt.
+			// This is an orphaned assignment that will not recover on its own.
+			// Don't report it as a health error since there's nothing running to be unhealthy.
+			// These consumers can be detected via consumer list (shows the error)
+			// and cleaned up via consumer delete.
+			return nil
+		}
 		return errors.New("consumer not found")
 	}
 
diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go
index 348eae6842e..d3b54d96d09 100644
--- a/server/jetstream_cluster_1_test.go
+++ b/server/jetstream_cluster_1_test.go
@@ -7845,9 +7845,12 @@ func TestJetStreamClusterConsumerHealthCheckDeleted(t *testing.T) {
 	sjs.mu.Unlock()
 
 	// The health check gathers all assignments and does checking after.
-	// If the consumer was deleted in the meantime, it should not report an error.
+	// If the consumer was deleted in the meantime, it should report "consumer not found"
+	// because ca.err is nil (the consumer was successfully created before deletion).
 	require_NoError(t, js.DeleteConsumer("TEST", "CONSUMER"))
-	require_Error(t, sjs.isConsumerHealthy(mset, "CONSUMER", ca), errors.New("consumer not found"))
+	err = sjs.isConsumerHealthy(mset, "CONSUMER", ca)
+	require_Error(t, err)
+	require_Equal(t, err.Error(), "consumer not found")
 
 	// The health check could run earlier than we're able to create the consumer.
 	// In that case, wait before erroring.
@@ -7855,6 +7858,42 @@ func TestJetStreamClusterConsumerHealthCheckDeleted(t *testing.T) {
 	ca.Created = time.Now()
 	sjs.mu.Unlock()
 	require_NoError(t, sjs.isConsumerHealthy(mset, "CONSUMER", ca))
+
+	// Same behavior for R=3 consumers: after deletion the stale assignment
+	// has ca.err == nil, so "consumer not found" is reported.
+	_, err = js.AddStream(&nats.StreamConfig{
+		Name:     "TEST3",
+		Subjects: []string{"bar"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+	_, err = js.AddConsumer("TEST3", &nats.ConsumerConfig{
+		Durable:  "CONSUMER3",
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	cl3 := c.consumerLeader(globalAccountName, "TEST3", "CONSUMER3")
+	require_NotNil(t, cl3)
+	mset3, err := cl3.globalAccount().lookupStream("TEST3")
+	require_NoError(t, err)
+
+	sjs3 := cl3.getJetStream()
+	sjs3.mu.Lock()
+	ca3 := sjs3.consumerAssignment(globalAccountName, "TEST3", "CONSUMER3")
+	if ca3 == nil {
+		sjs3.mu.Unlock()
+		t.Fatal("ca3 not found")
+	}
+	ca3.Created = time.Time{}
+	sjs3.mu.Unlock()
+
+	require_NoError(t, js.DeleteConsumer("TEST3", "CONSUMER3"))
+
+	// R=3 consumer with ca.err == nil: "consumer not found" is reported.
+	err = sjs3.isConsumerHealthy(mset3, "CONSUMER3", ca3)
+	require_Error(t, err)
+	require_Equal(t, err.Error(), "consumer not found")
 }
 
 func TestJetStreamClusterRespectConsumerStartSeq(t *testing.T) {
diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go
index f99b829ad35..7cfe1b2f99e 100644
--- a/server/jetstream_cluster_4_test.go
+++ b/server/jetstream_cluster_4_test.go
@@ -7624,3 +7624,328 @@ func TestJetStreamClusterConsumerSetStoreStateOldUpdateRestart(t *testing.T) {
 		require_NoError(t, err)
 	}
 }
+
+func TestJetStreamClusterHealthzConsumerNotFoundR1(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	// Create a healthy R=1 consumer first.
+	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
+		Durable:   "HEALTHY",
+		Replicas:  1,
+		AckPolicy: nats.AckExplicitPolicy,
+	})
+	require_NoError(t, err)
+
+	// Block consumer store creation on all servers by placing a regular file
+	// at the path where the consumer directory would be created. This causes
+	// os.MkdirAll to fail, simulating an i/o error.
+	for _, s := range c.servers {
+		acc, err := s.lookupAccount(globalAccountName)
+		require_NoError(t, err)
+		mset, err := acc.lookupStream("TEST")
+		require_NoError(t, err)
+		fs := mset.store.(*fileStore)
+		consumerPath := filepath.Join(fs.fcfg.StoreDir, consumerDir, "R1CONSUMER")
+		// Create the consumer dir if it doesn't exist, then place a file where
+		// the consumer subdir would go, so MkdirAll fails.
+		require_NoError(t, os.MkdirAll(filepath.Dir(consumerPath), defaultDirPerms))
+		require_NoError(t, os.WriteFile(consumerPath, []byte("BLOCKED"), 0644))
+		defer os.Remove(consumerPath)
+	}
+
+	// Attempt to create an R=1 consumer. This will fail because the store
+	// directory is unwritable, but the assignment will persist in the meta layer.
+	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
+		Durable:   "R1CONSUMER",
+		Replicas:  1,
+		AckPolicy: nats.AckExplicitPolicy,
+	})
+	require_Error(t, err)
+
+	// Find the member server that has the consumer assignment. Only the
+	// member runs processClusterCreateConsumer, which sets ca.err on failure.
+	var srv *Server
+	var mset *stream
+	for _, s := range c.servers {
+		sjs := s.getJetStream()
+		sjs.mu.RLock()
+		ca := sjs.consumerAssignment(globalAccountName, "TEST", "R1CONSUMER")
+		isMember := ca != nil && ca.Group.isMember(sjs.cluster.meta.ID())
+		sjs.mu.RUnlock()
+		if isMember {
+			acc, err := s.lookupAccount(globalAccountName)
+			require_NoError(t, err)
+			m, err := acc.lookupStream("TEST")
+			require_NoError(t, err)
+			srv = s
+			mset = m
+			break
+		}
+	}
+	require_True(t, srv != nil)
+
+	// Verify the consumer does not exist but the assignment does.
+	require_True(t, mset.lookupConsumer("R1CONSUMER") == nil)
+	sjs := srv.getJetStream()
+	sjs.mu.RLock()
+	ca := sjs.consumerAssignment(globalAccountName, "TEST", "R1CONSUMER")
+	sjs.mu.RUnlock()
+	require_True(t, ca != nil)
+
+	// The failed consumer creation should have recorded an error.
+	sjs.mu.RLock()
+	require_True(t, ca.err != nil)
+	sjs.mu.RUnlock()
+
+	// Set Created far in the past so the 5-second grace period is expired.
+	sjs.mu.Lock()
+	ca.Created = time.Time{}
+	sjs.mu.Unlock()
+
+	// isConsumerHealthy should NOT report an error for this orphaned assignment
+	// because ca.err is set, indicating a failed creation attempt.
+	err = sjs.isConsumerHealthy(mset, "R1CONSUMER", ca)
+	require_NoError(t, err)
+
+	// The healthy consumer should still be reported as healthy.
+	sjs.mu.RLock()
+	healthyCA := sjs.consumerAssignment(globalAccountName, "TEST", "HEALTHY")
+	sjs.mu.RUnlock()
+	if healthyCA != nil {
+		err = sjs.isConsumerHealthy(mset, "HEALTHY", healthyCA)
+		require_NoError(t, err)
+	}
+
+	// Restart all servers. The meta Raft log will replay the consumer
+	// assignment, but since the blocking files are still in place the
+	// consumer creation will fail again during recovery. The orphaned
+	// assignment should persist and healthz should still not report an error.
+	nc.Close()
+
+	// Re-place blocking files on all servers to ensure they survive the
+	// error cleanup path that may remove them during consumer creation failure.
+	for _, s := range c.servers {
+		acc, err := s.lookupAccount(globalAccountName)
+		require_NoError(t, err)
+		mset, err := acc.lookupStream("TEST")
+		require_NoError(t, err)
+		fs := mset.store.(*fileStore)
+		consumerPath := filepath.Join(fs.fcfg.StoreDir, consumerDir, "R1CONSUMER")
+		os.RemoveAll(consumerPath)
+		require_NoError(t, os.MkdirAll(filepath.Dir(consumerPath), defaultDirPerms))
+		require_NoError(t, os.WriteFile(consumerPath, []byte("blocked"), 0644))
+	}
+
+	c.stopAll()
+	c.restartAllSamePorts()
+	c.waitOnAllCurrent()
+	c.waitOnStreamLeader(globalAccountName, "TEST")
+
+	// Find the member server that still has the orphaned consumer assignment
+	// after restart.
+	srv = nil
+	mset = nil
+	for _, s := range c.servers {
+		sjs := s.getJetStream()
+		sjs.mu.RLock()
+		ca := sjs.consumerAssignment(globalAccountName, "TEST", "R1CONSUMER")
+		isMember := ca != nil && ca.Group.isMember(sjs.cluster.meta.ID())
+		sjs.mu.RUnlock()
+		if isMember {
+			acc, err := s.lookupAccount(globalAccountName)
+			require_NoError(t, err)
+			m, err := acc.lookupStream("TEST")
+			require_NoError(t, err)
+			srv = s
+			mset = m
+			break
+		}
+	}
+	require_True(t, srv != nil)
+
+	// The consumer should still not exist after restart.
+	require_True(t, mset.lookupConsumer("R1CONSUMER") == nil)
+
+	sjs = srv.getJetStream()
+	sjs.mu.RLock()
+	ca = sjs.consumerAssignment(globalAccountName, "TEST", "R1CONSUMER")
+	sjs.mu.RUnlock()
+	require_True(t, ca != nil)
+
+	// ca.err should still be set after restart since the consumer
+	// creation fails again during meta recovery.
+	sjs.mu.RLock()
+	require_True(t, ca.err != nil)
+	sjs.mu.RUnlock()
+
+	// Set Created far in the past again since the assignment was
+	// re-created during meta recovery.
+	sjs.mu.Lock()
+	ca.Created = time.Time{}
+	sjs.mu.Unlock()
+
+	// After restart, the orphaned assignment should still not cause a healthz error.
+	err = sjs.isConsumerHealthy(mset, "R1CONSUMER", ca)
+	require_NoError(t, err)
+
+	// Now delete the consumer via the API to clean up the orphaned assignment.
+	nc, js = jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	err = js.DeleteConsumer("TEST", "R1CONSUMER")
+	require_NoError(t, err)
+
+	// After deletion, the consumer assignment should be gone from all servers.
+	for _, s := range c.servers {
+		checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
+			sjs := s.getJetStream()
+			sjs.mu.RLock()
+			ca := sjs.consumerAssignment(globalAccountName, "TEST", "R1CONSUMER")
+			sjs.mu.RUnlock()
+			if ca != nil {
+				return fmt.Errorf("consumer assignment still exists on %s", s.Name())
+			}
+			return nil
+		})
+	}
+}
+
+func TestJetStreamClusterHealthzOrphanedConsumerAssignmentR3(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+	})
+	require_NoError(t, err)
+
+	// Create a real consumer first, then simulate the orphan state.
+	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
+		Durable:   "CONSUMER",
+		Replicas:  3,
+		AckPolicy: nats.AckExplicitPolicy,
+	})
+	require_NoError(t, err)
+
+	// All servers should be healthy.
+	for _, s := range c.servers {
+		checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
+			hs := s.healthz(&HealthzOptions{})
+			if hs.Error != _EMPTY_ {
+				return fmt.Errorf("server %s not healthy: %s", s.Name(), hs.Error)
+			}
+			return nil
+		})
+	}
+
+	// Simulate the orphaned consumer state that occurs when processClusterCreateConsumer fails.
+	for _, s := range c.servers {
+		acc, err := s.lookupAccount(globalAccountName)
+		require_NoError(t, err)
+		mset, err := acc.lookupStream("TEST")
+		require_NoError(t, err)
+		o := mset.lookupConsumer("CONSUMER")
+		if o == nil {
+			continue // Consumer not on this server (shouldn't happen with R3).
+		}
+
+		// Stop the consumer's Raft node and delete the consumer, simulating
+		// what processClusterCreateConsumer does on failure.
+		if node := o.raftNode(); node != nil {
+			node.Delete()
+		}
+		o.stop()
+
+		// Simulate the failure state from processClusterCreateConsumer:
+		// clear the Raft node and set ca.err.
+		sjs := s.getJetStream()
+		sjs.mu.Lock()
+		ca := sjs.consumerAssignment(globalAccountName, "TEST", "CONSUMER")
+		require_True(t, ca != nil)
+		ca.Group.node = nil
+		ca.err = errConsumerStoreFailed
+
+		// Set Created far in the past so the 5-second grace period is expired.
+		ca.Created = time.Time{}
+		sjs.mu.Unlock()
+
+		// Delete the actual consumer from the stream without going through
+		// the meta layer, leaving the assignment orphaned.
+		o.deleteWithoutAdvisory()
+	}
+
+	// Verify the actual consumer no longer exists on any server.
+	for _, s := range c.servers {
+		acc, err := s.lookupAccount(globalAccountName)
+		require_NoError(t, err)
+		mset, err := acc.lookupStream("TEST")
+		require_NoError(t, err)
+		require_True(t, mset.lookupConsumer("CONSUMER") == nil)
+	}
+
+	// Verify the consumer assignment still exists in the meta layer, meaning
+	// that it is orphaned.
+	for _, s := range c.servers {
+		sjs := s.getJetStream()
+		sjs.mu.RLock()
+		ca := sjs.consumerAssignment(globalAccountName, "TEST", "CONSUMER")
+		sjs.mu.RUnlock()
+		require_True(t, ca != nil)
+	}
+
+	// Healthz should not report a permanent error for the orphaned consumer.
+	// Before the fix, this would permanently fail with:
+	// "JetStream consumer '$G > TEST > CONSUMER' is not current: consumer not found"
+	for _, s := range c.servers {
+		hs := s.healthz(&HealthzOptions{})
+		if hs.Error != _EMPTY_ {
+			t.Fatalf("Server %s should be healthy with orphaned consumer, got: %s", s.Name(), hs.Error)
+		}
+	}
+
+	// Also test with details mode.
+	for _, s := range c.servers {
+		hs := s.healthz(&HealthzOptions{Details: true})
+		for _, e := range hs.Errors {
+			if e.Type == HealthzErrorConsumer && e.Consumer == "CONSUMER" {
+				t.Fatalf("Server %s should not report orphaned consumer as error in details mode, got: %s", s.Name(), e.Error)
+			}
+		}
+	}
+
+	// Deleting the consumer via the API should clean up the orphaned
+	// assignment from all servers. The consumer's Raft group was manually
+	// destroyed so no consumer leader will respond, but the meta proposal
+	// still gets processed and removes the assignment.
+	deleteSubj := fmt.Sprintf(JSApiConsumerDeleteT, "TEST", "CONSUMER")
+	nc.Request(deleteSubj, nil, 2*time.Second)
+
+	for _, s := range c.servers {
+		checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
+			sjs := s.getJetStream()
+			sjs.mu.RLock()
+			ca := sjs.consumerAssignment(globalAccountName, "TEST", "CONSUMER")
+			sjs.mu.RUnlock()
+			if ca != nil {
+				return fmt.Errorf("consumer assignment still exists on %s", s.Name())
+			}
+			return nil
+		})
+	}
+}
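
For reference, the comment added to isConsumerHealthy points operators at the existing recovery path: the orphaned assignment still shows up in the consumer list (with its recorded error) and can be removed with a regular consumer delete. Below is a minimal, hypothetical operator-side sketch of that cleanup using the nats.go client; it is not part of this patch, and the stream name "TEST" and consumer name "R1CONSUMER" are placeholders borrowed from the tests above. The same cleanup can be done with the nats CLI (nats consumer rm).

```go
// Hypothetical cleanup sketch, not part of this patch. Names are placeholders.
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// An orphaned assignment should still be listed here, since in clustered
	// mode the names are served from the meta-layer assignments rather than
	// from a running consumer.
	for name := range js.ConsumerNames("TEST", nats.MaxWait(2*time.Second)) {
		fmt.Println("consumer:", name)
	}

	// Deleting it goes through a meta proposal, so it works even though no
	// consumer leader is running (the R3 test above exercises this path).
	if err := js.DeleteConsumer("TEST", "R1CONSUMER"); err != nil {
		log.Printf("delete failed: %v", err)
	}
}
```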