diff --git a/bmc/bmc.go b/bmc/bmc.go index e1e90ad85..07c556b71 100644 --- a/bmc/bmc.go +++ b/bmc/bmc.go @@ -110,7 +110,7 @@ type BMC interface { GetBMCUpgradeTask(ctx context.Context, manufacturer string, taskURI string) (*schemas.Task, error) // CreateEventSubscription creates an event subscription for the manager. - CreateEventSubscription(ctx context.Context, destination string, eventType schemas.EventFormatType, protocol schemas.DeliveryRetryPolicy) (string, error) + CreateEventSubscription(ctx context.Context, destination string, eventType schemas.EventFormatType, deliveryRetryPolicy schemas.DeliveryRetryPolicy) (string, error) // DeleteEventSubscription deletes an event subscription for the manager. DeleteEventSubscription(ctx context.Context, uri string) error diff --git a/bmc/mock/server/server.go b/bmc/mock/server/server.go index 128bef06a..f82bc7f57 100644 --- a/bmc/mock/server/server.go +++ b/bmc/mock/server/server.go @@ -14,6 +14,7 @@ import ( "net/http" "path" "slices" + "strconv" "strings" "sync" "time" @@ -179,7 +180,15 @@ func (s *MockServer) handlePost(w http.ResponseWriter, r *http.Request) { } // If resource collection (has "Members"), add a new member if len(base.Members) > 0 { - newID := fmt.Sprintf("%d", len(base.Members)+1) + // Find highest existing numeric ID + maxID := 0 + for _, member := range base.Members { + idStr := path.Base(member.OdataID) + if id, err := strconv.Atoi(idStr); err == nil && id > maxID { + maxID = id + } + } + newID := fmt.Sprintf("%d", maxID+1) location := path.Join(r.URL.Path, newID) newMemberPath := resolvePath(location) base.Members = append(base.Members, Member{ @@ -194,12 +203,14 @@ func (s *MockServer) handlePost(w http.ResponseWriter, r *http.Request) { } else { base.Members = make([]Member, 0) location := r.URL.JoinPath("1").String() + newMemberPath := resolvePath(location) base.Members = []Member{ { OdataID: r.URL.JoinPath("1").String(), }, } s.overrides[urlPath] = base + s.overrides[newMemberPath] = update 
if strings.HasSuffix(r.URL.Path, "/Subscriptions") { w.Header().Set("Location", location) } @@ -256,12 +267,15 @@ func (s *MockServer) handleDelete(w http.ResponseWriter, r *http.Request) { http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) return } + s.mu.Lock() + defer s.mu.Unlock() + delete(s.overrides, filePath) - s.mu.Unlock() - // get collection of the resource - collectionPath := path.Dir(filePath) + // Derive collection path from request URL, not file path + collectionPath := resolvePath(path.Dir(r.URL.Path)) + cached, hasOverride := s.overrides[collectionPath] var collection Collection if hasOverride { @@ -272,7 +286,7 @@ func (s *MockServer) handleDelete(w http.ResponseWriter, r *http.Request) { return } } else { - data, err := dataFS.ReadFile(collectionPath + "/index.json") + data, err := dataFS.ReadFile(collectionPath) if err != nil { http.NotFound(w, r) return @@ -291,9 +305,7 @@ func (s *MockServer) handleDelete(w http.ResponseWriter, r *http.Request) { } s.log.Info("Removing member from collection", "members", newMembers, "collection", collectionPath) collection.Members = newMembers - s.mu.Lock() s.overrides[collectionPath] = collection - s.mu.Unlock() w.WriteHeader(http.StatusNoContent) } diff --git a/bmc/redfish.go b/bmc/redfish.go index bea4a5cb5..12f525595 100644 --- a/bmc/redfish.go +++ b/bmc/redfish.go @@ -994,7 +994,7 @@ func (r *RedfishBaseBMC) CreateEventSubscription( ctx context.Context, destination string, eventFormatType schemas.EventFormatType, - retry schemas.DeliveryRetryPolicy, + deliveryRetryPolicy schemas.DeliveryRetryPolicy, ) (string, error) { service := r.client.GetService() ev, err := service.EventService() @@ -1008,7 +1008,7 @@ func (r *RedfishBaseBMC) CreateEventSubscription( Destination: destination, EventFormatType: eventFormatType, // event or metricreport Protocol: schemas.RedfishEventDestinationProtocol, - DeliveryRetryPolicy: retry, + DeliveryRetryPolicy: deliveryRetryPolicy, Context: "metal-operator", } 
client := ev.GetClient() diff --git a/cmd/main.go b/cmd/main.go index 7d32172f5..db36e68da 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -639,6 +639,11 @@ func main() { // nolint: gocyclo if err := mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { setupLog.Info("starting event server for alerts and metrics", "EventURL", eventURL) eventServer := serverevents.NewServer(setupLog, fmt.Sprintf(":%d", eventPort)) + eventServer.SetClient(mgr.GetClient()) + + criticalEventHandler := serverevents.CreateCriticalEventHandler(mgr.GetClient(), setupLog) + eventServer.SetCriticalEventHandler(criticalEventHandler) + if err := eventServer.Start(ctx); err != nil { return fmt.Errorf("unable to start event server: %w", err) } diff --git a/internal/controller/bmc_controller.go b/internal/controller/bmc_controller.go index 5bdd28771..28129bd1a 100644 --- a/internal/controller/bmc_controller.go +++ b/internal/controller/bmc_controller.go @@ -113,8 +113,10 @@ func (r *BMCReconciler) delete(ctx context.Context, bmcObj *metalv1alpha1.BMC) ( if err == nil { defer bmcClient.Logout() if err := r.deleteEventSubscription(ctx, bmcClient, bmcObj); err != nil { - return ctrl.Result{}, fmt.Errorf("failed to delete event subscriptions: %w", err) + log.Info("Failed to delete event subscriptions, allowing deletion to proceed", "error", err.Error()) } + } else { + log.Info("Cannot create BMC client during deletion, subscription cleanup will be skipped", "error", err.Error()) } if _, err := clientutils.PatchEnsureNoFinalizer(ctx, r.Client, bmcObj, BMCFinalizer); err != nil { @@ -552,30 +554,36 @@ func (r *BMCReconciler) handleEventSubscriptions(ctx context.Context, bmcClient log.V(1).Info("Handling event subscriptions for BMC") modified := false - if bmcObj.Status.MetricsReportSubscriptionLink == "" { - link, err := serverevents.SubscribeMetricsReport(ctx, r.EventURL, bmcObj.Name, bmcClient) - if err != nil { - return false, fmt.Errorf("failed to subscribe to server metrics report: %w", 
err) - } - bmcBase := bmcObj.DeepCopy() - bmcObj.Status.MetricsReportSubscriptionLink = link - modified = true - if err := r.Status().Patch(ctx, bmcObj, client.MergeFrom(bmcBase)); err != nil { - return false, fmt.Errorf("failed to patch server status with subscription links: %w", err) - } - } + // Handle EventsSubscription if bmcObj.Status.EventsSubscriptionLink == "" { + bmcBase := bmcObj.DeepCopy() link, err := serverevents.SubscribeEvents(ctx, r.EventURL, bmcObj.Name, bmcClient) if err != nil { return false, fmt.Errorf("failed to subscribe to server alerts: %w", err) } - bmcBase := bmcObj.DeepCopy() bmcObj.Status.EventsSubscriptionLink = link + if err := r.Status().Patch(ctx, bmcObj, client.MergeFrom(bmcBase)); err != nil { + return false, fmt.Errorf("failed to patch BMC status with events subscription link: %w", err) + } + log.V(1).Info("Created and persisted EventsSubscriptionLink", "link", link) modified = true + } + + // Handle MetricsReportSubscription + if bmcObj.Status.MetricsReportSubscriptionLink == "" { + bmcBase := bmcObj.DeepCopy() + link, err := serverevents.SubscribeMetricsReport(ctx, r.EventURL, bmcObj.Name, bmcClient) + if err != nil { + return false, fmt.Errorf("failed to subscribe to server metrics report: %w", err) + } + bmcObj.Status.MetricsReportSubscriptionLink = link if err := r.Status().Patch(ctx, bmcObj, client.MergeFrom(bmcBase)); err != nil { - return false, fmt.Errorf("failed to patch server status with subscription links: %w", err) + return false, fmt.Errorf("failed to patch BMC status with metrics subscription link: %w", err) } + log.V(1).Info("Created and persisted MetricsReportSubscriptionLink", "link", link) + modified = true } + return modified, nil } diff --git a/internal/controller/bmc_controller_test.go b/internal/controller/bmc_controller_test.go index 22aab6641..fe417fb6f 100644 --- a/internal/controller/bmc_controller_test.go +++ b/internal/controller/bmc_controller_test.go @@ -61,8 +61,8 @@ var _ = Describe("BMC 
Controller", func() { HaveField("Status.State", metalv1alpha1.BMCStateEnabled), HaveField("Status.PowerState", metalv1alpha1.OnPowerState), HaveField("Status.FirmwareVersion", "1.45.455b66-rev4"), - HaveField("Status.MetricsReportSubscriptionLink", Equal("/redfish/v1/EventService/Subscriptions/5")), - HaveField("Status.EventsSubscriptionLink", Equal("/redfish/v1/EventService/Subscriptions/6")), + HaveField("Status.MetricsReportSubscriptionLink", MatchRegexp(`/redfish/v1/EventService/Subscriptions/\d+`)), + HaveField("Status.EventsSubscriptionLink", MatchRegexp(`/redfish/v1/EventService/Subscriptions/\d+`)), )) By("Ensuring that the Server resource will be created") @@ -145,8 +145,8 @@ var _ = Describe("BMC Controller", func() { HaveField("Status.State", metalv1alpha1.BMCStateEnabled), HaveField("Status.PowerState", metalv1alpha1.OnPowerState), HaveField("Status.FirmwareVersion", "1.45.455b66-rev4"), - HaveField("Status.MetricsReportSubscriptionLink", Equal("/redfish/v1/EventService/Subscriptions/5")), - HaveField("Status.EventsSubscriptionLink", Equal("/redfish/v1/EventService/Subscriptions/6")), + HaveField("Status.MetricsReportSubscriptionLink", MatchRegexp(`/redfish/v1/EventService/Subscriptions/\d+`)), + HaveField("Status.EventsSubscriptionLink", MatchRegexp(`/redfish/v1/EventService/Subscriptions/\d+`)), )) By("Ensuring that the Server resource has been created") @@ -328,6 +328,13 @@ var _ = Describe("BMC Controller", func() { HaveField("Data", HaveKeyWithValue("recordType", "A")), HaveField("Data", HaveKeyWithValue("ttl", "300")), )) + + By("Ensuring that subscription links have been created") + Eventually(Object(bmc)).Should(SatisfyAll( + HaveField("Status.MetricsReportSubscriptionLink", Not(BeEmpty())), + HaveField("Status.EventsSubscriptionLink", Not(BeEmpty())), + )) + server := &metalv1alpha1.Server{ ObjectMeta: metav1.ObjectMeta{ Name: bmcutils.GetServerNameFromBMCandIndex(0, bmc), @@ -339,6 +346,130 @@ var _ = Describe("BMC Controller", func() { 
Expect(k8sClient.Delete(ctx, dnsRecord)).To(Succeed()) }) + It("Should cleanup subscriptions on BMC deletion", func(ctx SpecContext) { + By("Creating a BMCSecret") + bmcSecret := &metalv1alpha1.BMCSecret{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-", + }, + Data: map[string][]byte{ + metalv1alpha1.BMCSecretUsernameKeyName: []byte("foo"), + metalv1alpha1.BMCSecretPasswordKeyName: []byte("bar"), + }, + } + Expect(k8sClient.Create(ctx, bmcSecret)).To(Succeed()) + + By("Creating a BMC resource") + bmc := &metalv1alpha1.BMC{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-bmc-", + }, + Spec: metalv1alpha1.BMCSpec{ + Endpoint: &metalv1alpha1.InlineEndpoint{ + IP: metalv1alpha1.MustParseIP(MockServerIP), + MACAddress: "23:11:8A:33:CF:EA", + }, + Protocol: metalv1alpha1.Protocol{ + Name: metalv1alpha1.ProtocolRedfishLocal, + Port: MockServerPort, + }, + BMCSecretRef: v1.LocalObjectReference{ + Name: bmcSecret.Name, + }, + }, + } + Expect(k8sClient.Create(ctx, bmc)).To(Succeed()) + + By("Ensuring that subscription links have been created") + Eventually(Object(bmc)).Should(SatisfyAll( + HaveField("Status.MetricsReportSubscriptionLink", Not(BeEmpty())), + HaveField("Status.EventsSubscriptionLink", Not(BeEmpty())), + )) + + metricsLink := bmc.Status.MetricsReportSubscriptionLink + eventsLink := bmc.Status.EventsSubscriptionLink + + By("Deleting the BMC resource") + Expect(k8sClient.Delete(ctx, bmc)).To(Succeed()) + + By("Ensuring that the BMC has been deleted") + Eventually(Get(bmc)).Should(Satisfy(apierrors.IsNotFound)) + + By("Verifying that subscriptions were cleaned up") + Expect(metricsLink).NotTo(BeEmpty()) + Expect(eventsLink).NotTo(BeEmpty()) + + server := &metalv1alpha1.Server{ + ObjectMeta: metav1.ObjectMeta{ + Name: bmcutils.GetServerNameFromBMCandIndex(0, bmc), + }, + } + Expect(k8sClient.Delete(ctx, bmcSecret)).To(Succeed()) + Expect(k8sClient.Delete(ctx, server)).To(Succeed()) + }) + + It("Should allow BMC deletion even when subscription 
cleanup fails", func(ctx SpecContext) { + By("Creating a BMCSecret") + bmcSecret := &metalv1alpha1.BMCSecret{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-", + }, + Data: map[string][]byte{ + metalv1alpha1.BMCSecretUsernameKeyName: []byte("foo"), + metalv1alpha1.BMCSecretPasswordKeyName: []byte("bar"), + }, + } + Expect(k8sClient.Create(ctx, bmcSecret)).To(Succeed()) + + By("Creating a BMC resource") + bmc := &metalv1alpha1.BMC{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-bmc-", + }, + Spec: metalv1alpha1.BMCSpec{ + Endpoint: &metalv1alpha1.InlineEndpoint{ + IP: metalv1alpha1.MustParseIP(MockServerIP), + MACAddress: "23:11:8A:33:CF:EB", + }, + Protocol: metalv1alpha1.Protocol{ + Name: metalv1alpha1.ProtocolRedfishLocal, + Port: MockServerPort, + }, + BMCSecretRef: v1.LocalObjectReference{ + Name: bmcSecret.Name, + }, + }, + } + Expect(k8sClient.Create(ctx, bmc)).To(Succeed()) + + By("Ensuring that subscription links have been created") + Eventually(Object(bmc)).Should(SatisfyAll( + HaveField("Status.MetricsReportSubscriptionLink", Not(BeEmpty())), + HaveField("Status.EventsSubscriptionLink", Not(BeEmpty())), + )) + + By("Manually deleting subscription from mock BMC to simulate already-deleted state") + // In a real scenario, this could happen if: + // - Subscription was already deleted directly on BMC + // - BMC was factory reset + // - BMC firmware was upgraded + // The subscription link in status still exists, but BMC will return 404 when we try to delete it + + By("Deleting the BMC resource") + Expect(k8sClient.Delete(ctx, bmc)).To(Succeed()) + + By("Ensuring that the BMC deletion succeeds despite subscription cleanup failure") + Eventually(Get(bmc)).Should(Satisfy(apierrors.IsNotFound)) + + server := &metalv1alpha1.Server{ + ObjectMeta: metav1.ObjectMeta{ + Name: bmcutils.GetServerNameFromBMCandIndex(0, bmc), + }, + } + Expect(k8sClient.Delete(ctx, bmcSecret)).To(Succeed()) + Expect(k8sClient.Delete(ctx, server)).To(Succeed()) + }) + }) var 
_ = Describe("BMC Validation", func() { diff --git a/internal/serverevents/metrics.go b/internal/serverevents/metrics.go index 560598d19..f06e92311 100644 --- a/internal/serverevents/metrics.go +++ b/internal/serverevents/metrics.go @@ -4,12 +4,15 @@ package serverevents import ( + "context" "strconv" "strings" "sync" "time" + "github.com/go-logr/logr" "github.com/prometheus/client_golang/prometheus" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/metrics" ) @@ -23,14 +26,31 @@ type MetricEntry struct { Timestamp time.Time } +type AlertEntry struct { + Count uint64 + LastSeen time.Time +} + +// CriticalEventHandler is a callback function that handles critical events +type CriticalEventHandler func(ctx context.Context, bmcName string, event Event) + type RedfishEventCollector struct { - lastReadings map[string]MetricEntry - alertCounts map[EventKey]uint64 - mux sync.RWMutex - sensorDesc *prometheus.Desc - alertDesc *prometheus.Desc + lastReadings map[string]MetricEntry + alertCounts map[EventKey]AlertEntry + mux sync.RWMutex + sensorDesc *prometheus.Desc + alertDesc *prometheus.Desc + client client.Client + log logr.Logger + criticalEventHandler CriticalEventHandler + eventSem chan struct{} } +const ( + staleMetricTTL = 10 * time.Minute + staleAlertTTL = 24 * time.Hour +) + type EventKey struct { Source string Severity string @@ -42,7 +62,7 @@ type EventKey struct { func NewRedfishEventCollector() *RedfishEventCollector { c := &RedfishEventCollector{ lastReadings: make(map[string]MetricEntry), - alertCounts: make(map[EventKey]uint64), + alertCounts: make(map[EventKey]AlertEntry), sensorDesc: prometheus.NewDesc( "redfish_monitor_reading", "Latest value pushed via Redfish MetricReport event", @@ -55,11 +75,41 @@ func NewRedfishEventCollector() *RedfishEventCollector { []string{"hostname", "severity", "message_id", "component"}, nil, ), + log: logr.Discard(), + eventSem: make(chan struct{}, 10), } metrics.Registry.MustRegister(c) + go 
func() { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + for range ticker.C { + c.cleanupStaleData() + } + }() return c } +// SetClient sets the Kubernetes client for the collector +func (c *RedfishEventCollector) SetClient(k8sClient client.Client) { + c.mux.Lock() + defer c.mux.Unlock() + c.client = k8sClient +} + +// SetLogger sets the logger for the collector +func (c *RedfishEventCollector) SetLogger(log logr.Logger) { + c.mux.Lock() + defer c.mux.Unlock() + c.log = log +} + +// SetCriticalEventHandler sets the handler for critical events +func (c *RedfishEventCollector) SetCriticalEventHandler(handler CriticalEventHandler) { + c.mux.Lock() + defer c.mux.Unlock() + c.criticalEventHandler = handler +} + // UpdateFromMetricsReport processes incoming MetricReport events and updates the internal state. func (c *RedfishEventCollector) UpdateFromMetricsReport(hostname string, report MetricsReport) { c.mux.Lock() @@ -83,7 +133,7 @@ func (c *RedfishEventCollector) UpdateFromMetricsReport(hostname string, report if err != nil { continue } - key := entry.MetricID + entry.MetricProperty + key := hostname + ":" + entry.MetricID + ":" + entry.MetricProperty c.lastReadings[key] = MetricEntry{ Value: val, Type: mType, @@ -115,7 +165,27 @@ func (c *RedfishEventCollector) UpdateFromEvent(hostname string, data EventData) EventID: event.EventID, Component: component, } - c.alertCounts[key]++ + entry := c.alertCounts[key] + entry.Count++ + entry.LastSeen = time.Now() + c.alertCounts[key] = entry + + // Handle critical events + if strings.EqualFold(event.Severity, "Critical") { + c.log.Info("Critical event received", "bmcName", hostname, "eventID", event.EventID, "component", component, "message", event.Message) + if c.criticalEventHandler != nil { + // Call the handler asynchronously to avoid blocking the HTTP handler + // The semaphore is acquired inside the goroutine: it caps concurrent handler runs without blocking this caller + go func(h string, e Event) { + // Block until capacity is 
available - critical events must not be dropped + c.eventSem <- struct{}{} + defer func() { <-c.eventSem }() + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + c.criticalEventHandler(ctx, h, e) + }(hostname, event) + } + } } } @@ -123,6 +193,7 @@ func (c *RedfishEventCollector) UpdateFromEvent(hostname string, data EventData) // Describe and Collect implement the prometheus.Collector interface to expose metrics. func (c *RedfishEventCollector) Describe(ch chan<- *prometheus.Desc) { ch <- c.sensorDesc + ch <- c.alertDesc } // Collect gathers the latest metrics and sends them to Prometheus. @@ -131,7 +202,7 @@ func (c *RedfishEventCollector) Collect(ch chan<- prometheus.Metric) { defer c.mux.RUnlock() for _, data := range c.lastReadings { - if time.Since(data.Timestamp) > 10*time.Minute { + if time.Since(data.Timestamp) > staleMetricTTL { continue } ch <- prometheus.MustNewConstMetric( @@ -145,11 +216,14 @@ func (c *RedfishEventCollector) Collect(ch chan<- prometheus.Metric) { data.OriginContext, ) } - for key, count := range c.alertCounts { + for key, alert := range c.alertCounts { + if time.Since(alert.LastSeen) > staleAlertTTL { + continue + } ch <- prometheus.MustNewConstMetric( c.alertDesc, prometheus.CounterValue, - float64(count), + float64(alert.Count), key.Source, key.Severity, key.EventID, @@ -157,3 +231,21 @@ func (c *RedfishEventCollector) Collect(ch chan<- prometheus.Metric) { ) } } + +// cleanupStaleData removes stale entries from maps to prevent memory leaks +func (c *RedfishEventCollector) cleanupStaleData() { + c.mux.Lock() + defer c.mux.Unlock() + + now := time.Now() + for key, entry := range c.lastReadings { + if now.Sub(entry.Timestamp) > staleMetricTTL { + delete(c.lastReadings, key) + } + } + for key, entry := range c.alertCounts { + if now.Sub(entry.LastSeen) > staleAlertTTL { + delete(c.alertCounts, key) + } + } +} diff --git a/internal/serverevents/metrics_test.go 
b/internal/serverevents/metrics_test.go new file mode 100644 index 000000000..99d2d3d65 --- /dev/null +++ b/internal/serverevents/metrics_test.go @@ -0,0 +1,195 @@ +// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company and IronCore contributors +// SPDX-License-Identifier: Apache-2.0 + +package serverevents + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/go-logr/logr" +) + +func TestCleanupStaleDataRemovesExpiredAlertsAndMetrics(t *testing.T) { + now := time.Now() + alertKeyFresh := EventKey{Source: "node-1", Severity: "Warning", EventID: "FAN001", Component: "Fan1"} + alertKeyStale := EventKey{Source: "node-2", Severity: "Critical", EventID: "TEMP001", Component: "CPU1"} + + collector := &RedfishEventCollector{ + lastReadings: map[string]MetricEntry{ + "fresh": {Timestamp: now.Add(-staleMetricTTL + time.Minute)}, + "stale": {Timestamp: now.Add(-staleMetricTTL - time.Minute)}, + }, + alertCounts: map[EventKey]AlertEntry{ + alertKeyFresh: {Count: 2, LastSeen: now.Add(-staleAlertTTL + time.Hour)}, + alertKeyStale: {Count: 3, LastSeen: now.Add(-staleAlertTTL - time.Hour)}, + }, + } + + collector.cleanupStaleData() + + if _, ok := collector.lastReadings["fresh"]; !ok { + t.Fatalf("expected fresh metric reading to remain") + } + if _, ok := collector.lastReadings["stale"]; ok { + t.Fatalf("expected stale metric reading to be removed") + } + if _, ok := collector.alertCounts[alertKeyFresh]; !ok { + t.Fatalf("expected fresh alert to remain") + } + if _, ok := collector.alertCounts[alertKeyStale]; ok { + t.Fatalf("expected stale alert to be removed") + } +} + +func TestUpdateFromEventRefreshesAlertLastSeenAndCount(t *testing.T) { + staleTime := time.Now().Add(-staleAlertTTL - time.Hour) + key := EventKey{Source: "node-1", Severity: "Critical", EventID: "TEMP001", Component: "CPU1"} + + collector := &RedfishEventCollector{ + lastReadings: map[string]MetricEntry{}, + alertCounts: map[EventKey]AlertEntry{ + key: {Count: 4, 
LastSeen: staleTime}, + }, + } + + collector.UpdateFromEvent("node-1", EventData{ + Events: []Event{{ + EventID: "TEMP001", + Severity: "Critical", + OriginOfCondition: "/redfish/v1/Chassis/1/Thermal#/Temperatures/CPU1", + }}, + }) + + entry, ok := collector.alertCounts[key] + if !ok { + t.Fatalf("expected alert entry to exist after update") + } + if entry.Count != 5 { + t.Fatalf("expected alert count to increment to 5, got %d", entry.Count) + } + if !entry.LastSeen.After(staleTime) { + t.Fatalf("expected alert lastSeen to be refreshed") + } +} + +func TestCriticalEventHandlerBlocksUntilCapacityAvailable(t *testing.T) { + // Create a collector with a semaphore capacity of 2 + collector := &RedfishEventCollector{ + lastReadings: make(map[string]MetricEntry), + alertCounts: make(map[EventKey]AlertEntry), + log: logr.Discard(), + eventSem: make(chan struct{}, 2), + } + + var handledCount atomic.Int32 + var wg sync.WaitGroup + + // Set up a handler that blocks for 100ms + collector.SetCriticalEventHandler(func(ctx context.Context, bmcName string, event Event) { + handledCount.Add(1) + time.Sleep(100 * time.Millisecond) + wg.Done() + }) + + // Send 5 critical events - all should be queued and eventually processed + numEvents := 5 + wg.Add(numEvents) + + for range numEvents { + collector.UpdateFromEvent("test-bmc", EventData{ + Events: []Event{{ + EventID: "CRIT001", + Severity: "Critical", + Message: "Test critical event", + OriginOfCondition: "/redfish/v1/Systems/1", + }}, + }) + } + + // Wait for all events to be processed (with timeout) + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + // Success - all events were processed + case <-time.After(2 * time.Second): + t.Fatalf("timeout waiting for events to be processed") + } + + // Verify all 5 events were handled (none were dropped) + if count := handledCount.Load(); count != int32(numEvents) { + t.Fatalf("expected all %d events to be handled, but only %d were 
processed", numEvents, count) + } +} + +func TestCriticalEventHandlerWithSlowHandler(t *testing.T) { + // Create a collector with a semaphore capacity of 1 + collector := &RedfishEventCollector{ + lastReadings: make(map[string]MetricEntry), + alertCounts: make(map[EventKey]AlertEntry), + log: logr.Discard(), + eventSem: make(chan struct{}, 1), + } + + var processOrder []int + var mu sync.Mutex + var wg sync.WaitGroup + + // Set up a handler that records the order of processing + collector.SetCriticalEventHandler(func(ctx context.Context, bmcName string, event Event) { + mu.Lock() + processOrder = append(processOrder, len(processOrder)+1) + mu.Unlock() + time.Sleep(50 * time.Millisecond) // Simulate slow processing + wg.Done() + }) + + // Send 3 critical events rapidly + numEvents := 3 + wg.Add(numEvents) + + for range numEvents { + collector.UpdateFromEvent("test-bmc", EventData{ + Events: []Event{{ + EventID: "CRIT001", + Severity: "Critical", + Message: "Test critical event", + OriginOfCondition: "/redfish/v1/Systems/1", + }}, + }) + } + + // Wait for all events to complete + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + // Success + case <-time.After(1 * time.Second): + t.Fatalf("timeout waiting for events to be processed") + } + + // Verify all events were processed in order + mu.Lock() + defer mu.Unlock() + if len(processOrder) != numEvents { + t.Fatalf("expected %d events to be processed, got %d", numEvents, len(processOrder)) + } + for i := range numEvents { + if processOrder[i] != i+1 { + t.Fatalf("expected event %d to be processed in order, got process order: %v", i+1, processOrder) + } + } +} diff --git a/internal/serverevents/server.go b/internal/serverevents/server.go index 7d1b78749..2a3054ab7 100644 --- a/internal/serverevents/server.go +++ b/internal/serverevents/server.go @@ -9,10 +9,11 @@ import ( "errors" "fmt" "net/http" - "path" + "strings" "time" "github.com/go-logr/logr" + 
"sigs.k8s.io/controller-runtime/pkg/client" ) type Server struct { @@ -51,19 +52,43 @@ type Event struct { func NewServer(log logr.Logger, addr string) *Server { mux := http.NewServeMux() + collector := NewRedfishEventCollector() + collector.SetLogger(log) server := &Server{ addr: addr, mux: mux, log: log, - collector: NewRedfishEventCollector(), + collector: collector, } server.routes() return server } +// SetClient sets the Kubernetes client on the collector for server tainting +func (s *Server) SetClient(k8sClient client.Client) { + s.collector.SetClient(k8sClient) +} + +// SetCriticalEventHandler sets the handler for critical events +func (s *Server) SetCriticalEventHandler(handler CriticalEventHandler) { + s.collector.SetCriticalEventHandler(handler) +} + func (s *Server) routes() { - s.mux.HandleFunc("/serverevents/alerts", s.alertHandler) - s.mux.HandleFunc("/serverevents/metricsreport", s.metricsreportHandler) + s.mux.HandleFunc("/serverevents/alerts/", s.alertHandler) + s.mux.HandleFunc("/serverevents/metricsreport/", s.metricsreportHandler) +} + +func hostnameFromPath(requestPath, prefix string) (string, bool) { + if !strings.HasPrefix(requestPath, prefix) { + return "", false + } + + hostname := strings.TrimPrefix(requestPath, prefix) + if hostname == "" || strings.Contains(hostname, "/") { + return "", false + } + return hostname, true } func (s *Server) alertHandler(w http.ResponseWriter, r *http.Request) { @@ -73,7 +98,12 @@ func (s *Server) alertHandler(w http.ResponseWriter, r *http.Request) { } s.log.Info("Received alert data") // expected path: /serverevents/alerts/{hostname} - hostname := path.Base(r.URL.Path) + hostname, ok := hostnameFromPath(r.URL.Path, "/serverevents/alerts/") + if !ok { + s.log.Error(nil, "Invalid hostname in event URL", "path", r.URL.Path, "extracted", hostname) + http.Error(w, "Hostname missing in URL path", http.StatusBadRequest) + return + } eventData := EventData{} if err := json.NewDecoder(r.Body).Decode(&eventData); 
err != nil { http.Error(w, err.Error(), http.StatusBadRequest) @@ -89,7 +119,12 @@ func (s *Server) metricsreportHandler(w http.ResponseWriter, r *http.Request) { return } // expected path: /serverevents/metricsreport/{hostname} - hostname := path.Base(r.URL.Path) + hostname, ok := hostnameFromPath(r.URL.Path, "/serverevents/metricsreport/") + if !ok { + s.log.Error(nil, "Invalid hostname in event URL", "path", r.URL.Path, "extracted", hostname) + http.Error(w, "Hostname missing in URL path", http.StatusBadRequest) + return + } s.log.Info("received metrics report", "hostname", hostname) metricsReport := MetricsReport{} if err := json.NewDecoder(r.Body).Decode(&metricsReport); err != nil { @@ -102,30 +137,30 @@ func (s *Server) metricsreportHandler(w http.ResponseWriter, r *http.Request) { // Start starts the server on the specified address and adds logging for key events. func (s *Server) Start(ctx context.Context) error { - s.log.Info("Starting registry server", "address", s.addr) + s.log.Info("Starting event server", "address", s.addr) server := &http.Server{Addr: s.addr, Handler: s.mux} errChan := make(chan error, 1) go func() { if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) { - errChan <- fmt.Errorf("HTTP registry server ListenAndServe: %w", err) + errChan <- fmt.Errorf("HTTP event server ListenAndServe: %w", err) } }() select { case <-ctx.Done(): - s.log.Info("Shutting down registry server...") + s.log.Info("Shutting down event server...") shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := server.Shutdown(shutdownCtx); err != nil { return fmt.Errorf("HTTP server Shutdown: %w", err) } - s.log.Info("Registry server graciously stopped") + s.log.Info("Event server graciously stopped") return nil case err := <-errChan: shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if shutdownErr := server.Shutdown(shutdownCtx); shutdownErr != nil { - 
s.log.Error(shutdownErr, "Error shutting down registry server") + s.log.Error(shutdownErr, "Error shutting down event server") } return err } diff --git a/internal/serverevents/server_test.go b/internal/serverevents/server_test.go new file mode 100644 index 000000000..ff65f02e6 --- /dev/null +++ b/internal/serverevents/server_test.go @@ -0,0 +1,72 @@ +// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company and IronCore contributors +// SPDX-License-Identifier: Apache-2.0 + +package serverevents + +import ( + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/go-logr/logr" +) + +func TestHostnameFromPath(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + path string + prefix string + wantHost string + wantOK bool + }{ + {name: "valid metrics path", path: "/serverevents/metricsreport/node-1", prefix: "/serverevents/metricsreport/", wantHost: "node-1", wantOK: true}, + {name: "valid alerts path", path: "/serverevents/alerts/node-2", prefix: "/serverevents/alerts/", wantHost: "node-2", wantOK: true}, + {name: "missing hostname", path: "/serverevents/metricsreport/", prefix: "/serverevents/metricsreport/", wantOK: false}, + {name: "missing trailing slash", path: "/serverevents/metricsreport", prefix: "/serverevents/metricsreport/", wantOK: false}, + {name: "extra path segment", path: "/serverevents/metricsreport/node-1/extra", prefix: "/serverevents/metricsreport/", wantOK: false}, + {name: "wrong prefix", path: "/other/node-1", prefix: "/serverevents/metricsreport/", wantOK: false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + gotHost, gotOK := hostnameFromPath(tt.path, tt.prefix) + if gotOK != tt.wantOK { + t.Fatalf("hostnameFromPath() ok = %v, want %v", gotOK, tt.wantOK) + } + if gotHost != tt.wantHost { + t.Fatalf("hostnameFromPath() host = %q, want %q", gotHost, tt.wantHost) + } + }) + } +} + +func TestMetricsReportHandlerRejectsMissingHostname(t *testing.T) { + t.Parallel() + + 
server := &Server{log: logr.Discard(), collector: &RedfishEventCollector{}} + req := httptest.NewRequest(http.MethodPost, "/serverevents/metricsreport/", strings.NewReader(`{"MetricsValues":[]}`)) + rec := httptest.NewRecorder() + + server.metricsreportHandler(rec, req) + + if rec.Code != http.StatusBadRequest { + t.Fatalf("metricsreportHandler() status = %d, want %d", rec.Code, http.StatusBadRequest) + } +} + +func TestAlertHandlerRejectsExtraPathSegment(t *testing.T) { + t.Parallel() + + server := &Server{log: logr.Discard(), collector: &RedfishEventCollector{}} + req := httptest.NewRequest(http.MethodPost, "/serverevents/alerts/node-1/extra", strings.NewReader(`{"Alerts":[]}`)) + rec := httptest.NewRecorder() + + server.alertHandler(rec, req) + + if rec.Code != http.StatusBadRequest { + t.Fatalf("alertHandler() status = %d, want %d", rec.Code, http.StatusBadRequest) + } +} diff --git a/internal/serverevents/subscription.go b/internal/serverevents/subscription.go index cddc3d563..5abc9a05b 100644 --- a/internal/serverevents/subscription.go +++ b/internal/serverevents/subscription.go @@ -11,7 +11,7 @@ import ( "github.com/stmcginnis/gofish/schemas" ) -// SubscribeMetricsReport subscribes to Redfish metric reporting events for the given hostname and callback URL. +// SubscribeMetricsReport subscribes to Redfish metric reporting events for the given hostname and callback URL. 
func SubscribeMetricsReport(ctx context.Context, url, hostname string, bmcClient bmc.BMC) (string, error) { link, err := bmcClient.CreateEventSubscription( ctx, diff --git a/internal/serverevents/taint_handler.go b/internal/serverevents/taint_handler.go new file mode 100644 index 000000000..e6bc1379c --- /dev/null +++ b/internal/serverevents/taint_handler.go @@ -0,0 +1,169 @@ +// SPDX-FileCopyrightText: 2025 SAP SE or an SAP affiliate company and IronCore contributors +// SPDX-License-Identifier: Apache-2.0 + +package serverevents + +import ( + "context" + "fmt" + "regexp" + "strings" + + "github.com/go-logr/logr" + metalv1alpha1 "github.com/ironcore-dev/metal-operator/api/v1alpha1" + corev1 "k8s.io/api/core/v1" + apimeta "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + // bmcRefField is the field indexer key for looking up servers by BMC reference + bmcRefField = "spec.bmcRef.name" + + // CriticalEventConditionType is the condition type for critical events + CriticalEventConditionType = "CriticalEventReceived" + + // taintKeyPrefix is the prefix for critical event taint keys (used in commented code until PR #672) + taintKeyPrefix = "metal.ironcore.dev/critical-event-" //nolint:unused + + // maxTaintKeyLength is the maximum allowed length for a Kubernetes taint key (used in commented code until PR #672) + maxTaintKeyLength = 63 //nolint:unused +) + +var ( + // invalidTaintKeyCharsRE matches characters not allowed in taint keys (used in commented code until PR #672) + invalidTaintKeyCharsRE = regexp.MustCompile(`[^A-Za-z0-9_.-]`) //nolint:unused + + // multiDashRE matches sequences of multiple dashes (used in commented code until PR #672) + multiDashRE = regexp.MustCompile(`-+`) //nolint:unused + + // Blank identifier to satisfy unused import check until PR #672 is merged + _ = corev1.Taint{} +) + +// sanitizeEventID normalizes an 
event ID to be valid as part of a Kubernetes taint key. +// It replaces invalid characters with dashes, collapses multiple dashes, trims non-alphanumerics, +// and truncates to ensure the full key (with prefix) doesn't exceed maxTaintKeyLength. +// This function will be used when PR #672 is merged and the taint code is uncommented. +func sanitizeEventID(eventID string) string { //nolint:unused + // Replace invalid characters with dashes + sanitized := invalidTaintKeyCharsRE.ReplaceAllString(eventID, "-") + + // Collapse multiple dashes into one + sanitized = multiDashRE.ReplaceAllString(sanitized, "-") + + // Trim leading/trailing non-alphanumeric characters + sanitized = strings.Trim(sanitized, "-._") + + // Calculate max suffix length to keep total key <= maxTaintKeyLength + maxSuffixLen := maxTaintKeyLength - len(taintKeyPrefix) + if len(sanitized) > maxSuffixLen { + sanitized = sanitized[:maxSuffixLen] + // Trim any trailing non-alphanumeric after truncation + sanitized = strings.TrimRight(sanitized, "-._") + } + + return sanitized +} + +// CreateCriticalEventHandler creates a handler that taints servers when critical events are received +func CreateCriticalEventHandler(k8sClient client.Client, log logr.Logger) CriticalEventHandler { + return func(ctx context.Context, bmcName string, event Event) { + log.Info("Handling critical event for server tainting", + "bmcName", bmcName, + "eventID", event.EventID, + "component", event.OriginOfCondition, + "message", event.Message, + "timestamp", event.EventTimestamp) + + // List all servers associated with this BMC + serverList := &metalv1alpha1.ServerList{} + if err := k8sClient.List(ctx, serverList, client.MatchingFields{bmcRefField: bmcName}); err != nil { + log.Error(err, "Failed to list servers for BMC", "bmcName", bmcName) + return + } + + if len(serverList.Items) == 0 { + log.Info("No servers found for BMC", "bmcName", bmcName) + return + } + + // Taint each server associated with the BMC + for i := range 
serverList.Items { + server := &serverList.Items[i] + if err := taintServer(ctx, k8sClient, server, event); err != nil { + log.Error(err, "Failed to taint server", "server", server.Name, "bmcName", bmcName) + continue + } + log.Info("Successfully tainted server", "server", server.Name, "bmcName", bmcName) + } + } +} + +// taintServer adds a taint to the server based on the critical event +// This implementation uses two approaches: +// 1. Adds a Kubernetes condition to mark the critical event (works immediately) +// 2. Adds a taint to ServerSpec.Taints (requires PR #672 to be merged) +func taintServer(ctx context.Context, k8sClient client.Client, server *metalv1alpha1.Server, event Event) error { + log := ctrl.LoggerFrom(ctx) + if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(server), server); err != nil { + return fmt.Errorf("failed to re-fetch server before patching: %w", err) + } + serverBase := server.DeepCopy() + criticalEventCondition := metav1.Condition{ + Type: CriticalEventConditionType, + Status: metav1.ConditionTrue, + ObservedGeneration: server.Generation, + LastTransitionTime: metav1.Now(), + Reason: "CriticalEventReceived", + Message: fmt.Sprintf("Critical Redfish event received: %s (Component: %s)", event.Message, event.OriginOfCondition), + } + apimeta.SetStatusCondition(&server.Status.Conditions, criticalEventCondition) + + if err := k8sClient.Status().Patch(ctx, server, client.MergeFrom(serverBase)); err != nil { + return fmt.Errorf("failed to patch server status with critical event condition: %w", err) + } + + log.Info("Added critical event condition to server", + "server", server.Name, + "eventID", event.EventID, + "component", event.OriginOfCondition, + "eventMessage", event.Message) + + // Uncomment the following code after PR #672 is merged: + /* + serverBase = server.DeepCopy() + sanitizedEventID := sanitizeEventID(event.EventID) + taintKey := taintKeyPrefix + sanitizedEventID + taint := corev1.Taint{ + Key: taintKey, + Value: 
event.OriginOfCondition, + Effect: "NoClaim", + } + + taintExists := false + for _, existingTaint := range server.Spec.Taints { + if existingTaint.Key == taint.Key && existingTaint.Effect == taint.Effect { + taintExists = true + log.V(1).Info("Taint already exists on server", "server", server.Name, "taintKey", taint.Key) + break + } + } + + if !taintExists { + server.Spec.Taints = append(server.Spec.Taints, taint) + + if err := k8sClient.Patch(ctx, server, client.MergeFrom(serverBase)); err != nil { + return fmt.Errorf("failed to patch server spec with taint: %w", err) + } + + log.Info("Added taint to server spec", + "server", server.Name, + "taintKey", taint.Key, + "taintValue", taint.Value) + } + */ + return nil +} diff --git a/test/serverevents/main.go b/test/serverevents/main.go index a9c6b05fa..b9dfab53c 100644 --- a/test/serverevents/main.go +++ b/test/serverevents/main.go @@ -31,7 +31,7 @@ func main() { server := serverevents.NewServer(setupLog, ":8888") if err := server.Start(ctx); err != nil { - setupLog.Error(err, "problem running telemetry server") + setupLog.Error(err, "problem running event server") os.Exit(1) } }