diff --git a/cmd/main.go b/cmd/main.go index d168ab90af..27a739be70 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -401,12 +401,16 @@ func run(opts *options) error { return setupErrorf(setupLog, err, "Unable to setup Helm metadata forwarder") } - // Start ticker-based metadata forwarders after leader election + // Register CRD metadata forwarder as a manager Runnable + if err = setupAndStartCRDMetadataForwarder(metadataLog, mgr, versionInfo.String(), opts, options.CredsManager); err != nil { + return setupErrorf(setupLog, err, "Unable to setup CRD metadata forwarder") + } + + // Operator metadata forwarder still uses a poll loop; start after leader election. go func() { <-mgr.Elected() - setupLog.Info("Starting metadata forwarders") + setupLog.Info("Starting operator metadata forwarder") setupAndStartOperatorMetadataForwarder(metadataLog, mgr.GetClient(), versionInfo.String(), opts, options.CredsManager) - setupAndStartCRDMetadataForwarder(metadataLog, mgr.GetClient(), versionInfo.String(), opts, options.CredsManager) }() // +kubebuilder:scaffold:builder @@ -627,10 +631,10 @@ func setupAndStartOperatorMetadataForwarder(logger logr.Logger, client client.Re omf.Start() } -func setupAndStartCRDMetadataForwarder(logger logr.Logger, client client.Reader, kubernetesVersion string, options *options, credsManager *config.CredentialManager) { +func setupAndStartCRDMetadataForwarder(logger logr.Logger, mgr manager.Manager, kubernetesVersion string, options *options, credsManager *config.CredentialManager) error { cmf := metadata.NewCRDMetadataForwarder( logger, - client, + mgr, kubernetesVersion, version.GetVersion(), credsManager, @@ -641,7 +645,7 @@ func setupAndStartCRDMetadataForwarder(logger logr.Logger, client client.Reader, DatadogAgentProfileEnabled: options.datadogAgentProfileEnabled, }, ) - cmf.Start() + return mgr.Add(cmf) } func setupAndStartHelmMetadataForwarder(logger logr.Logger, mgr manager.Manager, client client.Reader, kubernetesVersion string, credsManager 
*config.CredentialManager) error { diff --git a/pkg/controller/utils/metadata/crd_metadata.go b/pkg/controller/utils/metadata/crd_metadata.go index 2ba7d66b23..d2ccf52b36 100644 --- a/pkg/controller/utils/metadata/crd_metadata.go +++ b/pkg/controller/utils/metadata/crd_metadata.go @@ -12,12 +12,13 @@ import ( "fmt" "io" "maps" - "strings" "sync" "time" "github.com/go-logr/logr" + k8serrors "k8s.io/apimachinery/pkg/api/errors" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" "github.com/DataDog/datadog-operator/api/datadoghq/v1alpha1" "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1" @@ -25,23 +26,26 @@ import ( ) const ( - crdMetadataInterval = 1 * time.Minute - crdMetadataHeartbeatTTL = 10 * time.Minute + crdHeartbeatInterval = 10 * time.Minute + crdNumWorkers = 3 ) -// crdCacheEntry tracks both the hash and last sent time for heartbeat detection -type crdCacheEntry struct { +// crdSnapshot holds the last-sent CRDInstance and its content hash, keyed by EncodeKey(). +type crdSnapshot struct { + instance CRDInstance hash string - lastSent time.Time } type CRDMetadataForwarder struct { *SharedMetadata + mgr manager.Manager enabledCRDs EnabledCRDKindsConfig - crdCache map[string]*crdCacheEntry - cacheMutex sync.RWMutex + runner *InformerWorkQueue + + // crdSnapshots is keyed by "kind/namespace/name" and stores *crdSnapshot. + crdSnapshots sync.Map } type CRDMetadataPayload struct { @@ -85,55 +89,72 @@ type EnabledCRDKindsConfig struct { DatadogAgentProfileEnabled bool } -// NewCRDMetadataForwarder creates a new instance of the CRD metadata forwarder -func NewCRDMetadataForwarder(logger logr.Logger, k8sClient client.Reader, kubernetesVersion string, operatorVersion string, credsManager *config.CredentialManager, config EnabledCRDKindsConfig) *CRDMetadataForwarder { +// NewCRDMetadataForwarder creates a new CRD metadata forwarder. The forwarder must be +// registered with the manager via mgr.Add(...) by the caller. 
+func NewCRDMetadataForwarder( + logger logr.Logger, + mgr manager.Manager, + kubernetesVersion string, + operatorVersion string, + credsManager *config.CredentialManager, + cfg EnabledCRDKindsConfig, +) *CRDMetadataForwarder { forwarderLogger := logger.WithName("crd") - return &CRDMetadataForwarder{ - SharedMetadata: NewSharedMetadata(forwarderLogger, k8sClient, kubernetesVersion, operatorVersion, credsManager), - enabledCRDs: config, - crdCache: make(map[string]*crdCacheEntry), + cmf := &CRDMetadataForwarder{ + SharedMetadata: NewSharedMetadata(forwarderLogger, mgr.GetClient(), kubernetesVersion, operatorVersion, credsManager), + mgr: mgr, + enabledCRDs: cfg, } + cmf.runner = NewInformerWorkQueue( + forwarderLogger, + mgr, + crdNumWorkers, + crdHeartbeatInterval, + cmf.processKey, + cmf.handleDelete, + cmf.heartbeat, + ) + return cmf } -// Start starts the CRD metadata forwarder -func (cmf *CRDMetadataForwarder) Start() { - cmf.logger.Info("Starting metadata forwarder") - - ticker := time.NewTicker(crdMetadataInterval) - go func() { - for range ticker.C { - if err := cmf.sendMetadata(); err != nil { - cmf.logger.V(1).Info("Error while sending metadata", "error", err) - } - } - }() -} - -func (cmf *CRDMetadataForwarder) sendMetadata() error { - ctx, cancel := context.WithTimeout(context.Background(), DefaultOperationTimeout) - defer cancel() - - allCRDs, listSuccess := cmf.getAllActiveCRDs(ctx) - crdsToSend := cmf.getCRDsToSend(allCRDs) - - if len(crdsToSend) == 0 { - return nil +// Start implements manager.Runnable. Registers informer watches for each enabled +// CRD kind and then runs the InformerWorkQueue until ctx is cancelled. 
+func (cmf *CRDMetadataForwarder) Start(ctx context.Context) error { + if cmf.enabledCRDs.DatadogAgentEnabled { + cmf.runner.AddWatch(ctx, WatchTarget{ + Object: &v2alpha1.DatadogAgent{}, + Group: "datadoghq.com", + Resource: "datadogagents", + Kind: "DatadogAgent", + }) } - - cmf.logger.V(1).Info("Sending metadata", "count", len(crdsToSend)) - - // Send individual payloads for each CRD - for _, crd := range crdsToSend { - if err := cmf.sendCRDMetadata(ctx, crd); err != nil { - cmf.logger.V(1).Info("Failed to send metadata", "error", err, - "kind", crd.Kind, "name", crd.Name, "namespace", crd.Namespace) - } + if cmf.enabledCRDs.DatadogAgentInternalEnabled { + cmf.runner.AddWatch(ctx, WatchTarget{ + Object: &v1alpha1.DatadogAgentInternal{}, + Group: "datadoghq.com", + Resource: "datadogagentinternals", + Kind: "DatadogAgentInternal", + }) + } + if cmf.enabledCRDs.DatadogAgentProfileEnabled { + cmf.runner.AddWatch(ctx, WatchTarget{ + Object: &v1alpha1.DatadogAgentProfile{}, + Group: "datadoghq.com", + Resource: "datadogagentprofiles", + Kind: "DatadogAgentProfile", + }) } - cmf.cleanupDeletedCRDs(allCRDs, listSuccess) + go cmf.runner.Run(ctx) return nil } +// NeedLeaderElection implements manager.LeaderElectionRunnable. CRD metadata sends +// must be deduplicated across operator replicas. 
+func (cmf *CRDMetadataForwarder) NeedLeaderElection() bool { + return true +} + func (cmf *CRDMetadataForwarder) sendCRDMetadata(ctx context.Context, crdInstance CRDInstance) error { clusterUID, err := cmf.GetOrCreateClusterUID(ctx) if err != nil { @@ -222,197 +243,158 @@ func (cmf *CRDMetadataForwarder) buildPayload(clusterUID string, crdInstance CRD return jsonPayload } -// getAllActiveCRDs returns all active CRDs and a map of list successes for each CRD type -// Currently only DatadogAgent, DatadogAgentInternal, and DatadogAgentProfile are collected -func (cmf *CRDMetadataForwarder) getAllActiveCRDs(ctx context.Context) ([]CRDInstance, map[string]bool) { - var crds []CRDInstance - listSuccess := make(map[string]bool) +// hashCRD computes a SHA256 hash of the CRD spec, labels, and annotations for change detection +func hashCRD(crd CRDInstance) (string, error) { + // Hash spec, labels, and annotations together + hashable := struct { + Spec any `json:"spec"` + Labels map[string]string `json:"labels,omitempty"` + Annotations map[string]string `json:"annotations,omitempty"` + }{ + Spec: crd.Spec, + Labels: crd.Labels, + Annotations: crd.Annotations, + } - if cmf.k8sClient == nil { - return crds, listSuccess + hashableJSON, err := json.Marshal(hashable) + if err != nil { + return "", err } - // DDA - if cmf.enabledCRDs.DatadogAgentEnabled { - ddaList := &v2alpha1.DatadogAgentList{} - if err := cmf.k8sClient.List(ctx, ddaList); err == nil { - listSuccess["DatadogAgent"] = true - for _, dda := range ddaList.Items { - annotations := maps.Clone(dda.Annotations) - delete(annotations, "kubectl.kubernetes.io/last-applied-configuration") - - crds = append(crds, CRDInstance{ - Kind: "DatadogAgent", - Name: dda.Name, - Namespace: dda.Namespace, - APIVersion: dda.APIVersion, - UID: string(dda.UID), - Spec: dda.Spec, - Labels: dda.Labels, - Annotations: annotations, - }) - } - } else { - cmf.logger.V(1).Info("Error listing DatadogAgents", "error", err) + hash := 
sha256.Sum256(hashableJSON) + return fmt.Sprintf("%x", hash), nil +} + +// processKey fetches the CRD by kind/namespace/name, builds a CRDInstance, hashes, +// compares against the last snapshot, and sends if new or changed. +func (cmf *CRDMetadataForwarder) processKey(ctx context.Context, kind, namespace, name string) error { + instance, err := cmf.fetchCRDInstance(ctx, kind, namespace, name) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil } + return fmt.Errorf("failed to fetch %s/%s/%s: %w", kind, namespace, name, err) + } + if instance == nil { + return nil } - // DDAI - if cmf.enabledCRDs.DatadogAgentInternalEnabled { - ddaiList := &v1alpha1.DatadogAgentInternalList{} - if err := cmf.k8sClient.List(ctx, ddaiList); err == nil { - listSuccess["DatadogAgentInternal"] = true - for _, ddai := range ddaiList.Items { - annotations := maps.Clone(ddai.Annotations) - delete(annotations, "kubectl.kubernetes.io/last-applied-configuration") - - crds = append(crds, CRDInstance{ - Kind: "DatadogAgentInternal", - Name: ddai.Name, - Namespace: ddai.Namespace, - APIVersion: ddai.APIVersion, - UID: string(ddai.UID), - Spec: ddai.Spec, - Labels: ddai.Labels, - Annotations: annotations, - }) - } - } else { - cmf.logger.V(1).Info("Error listing DatadogAgentInternals", "error", err) - } + newHash, err := hashCRD(*instance) + if err != nil { + cmf.logger.V(1).Info("Failed to hash CRD", "error", err, + "kind", kind, "namespace", namespace, "name", name) + return nil } - // DAP - if cmf.enabledCRDs.DatadogAgentProfileEnabled { - dapList := &v1alpha1.DatadogAgentProfileList{} - if err := cmf.k8sClient.List(ctx, dapList); err == nil { - listSuccess["DatadogAgentProfile"] = true - for _, dap := range dapList.Items { - annotations := maps.Clone(dap.Annotations) - delete(annotations, "kubectl.kubernetes.io/last-applied-configuration") - - crds = append(crds, CRDInstance{ - Kind: "DatadogAgentProfile", - Name: dap.Name, - Namespace: dap.Namespace, - APIVersion: dap.APIVersion, 
- UID: string(dap.UID), - Spec: dap.Spec, - Labels: dap.Labels, - Annotations: annotations, - }) - } - } else { - cmf.logger.V(1).Info("Error listing DatadogAgentProfiles", "error", err) + key := EncodeKey(kind, namespace, name) + if existing, ok := cmf.crdSnapshots.Load(key); ok { + if existing.(*crdSnapshot).hash == newHash { + return nil } } - return crds, listSuccess + // Store the new snapshot before sending so a concurrent heartbeat tick can't + // observe and re-send the stale (previous) snapshot after this send completes. + // On send failure, mark the hash empty so the workqueue retry detects a + // mismatch and re-sends instead of being short-circuited by the cache. + cmf.crdSnapshots.Store(key, &crdSnapshot{instance: *instance, hash: newHash}) + if err := cmf.sendCRDMetadata(ctx, *instance); err != nil { + cmf.crdSnapshots.Store(key, &crdSnapshot{instance: *instance, hash: ""}) + cmf.logger.V(1).Info("Failed to send CRD metadata", "error", err, + "kind", kind, "namespace", namespace, "name", name) + return err + } + return nil } -// getCRDsToSend returns CRDs that need to be sent due to changes or heartbeat -func (cmf *CRDMetadataForwarder) getCRDsToSend(crds []CRDInstance) []CRDInstance { - cmf.cacheMutex.Lock() - defer cmf.cacheMutex.Unlock() - - now := time.Now() - var toSend []CRDInstance - - for _, crd := range crds { - key := buildCacheKey(crd) - newHash, err := hashCRD(crd) - if err != nil { - cmf.logger.V(1).Info("Failed to hash CRD", "error", err, "kind", crd.Kind, "namespace", crd.Namespace, "name", crd.Name) - continue - } - - cacheEntry, exists := cmf.crdCache[key] - - // New CRD (never seen before) - if !exists { - toSend = append(toSend, crd) - cmf.crdCache[key] = &crdCacheEntry{ - hash: newHash, - lastSent: now, - } - cmf.logger.V(1).Info("New CRD detected", "kind", crd.Kind, "namespace", crd.Namespace, "name", crd.Name) - continue +// fetchCRDInstance gets the typed object from the cache and converts to CRDInstance. 
+// Returns (nil, nil) if the kind is unknown. +func (cmf *CRDMetadataForwarder) fetchCRDInstance(ctx context.Context, kind, namespace, name string) (*CRDInstance, error) { + switch kind { + case "DatadogAgent": + dda := &v2alpha1.DatadogAgent{} + if err := cmf.k8sClient.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, dda); err != nil { + return nil, err } - - // Hash changed (spec/labels/annotations modified) - if cacheEntry.hash != newHash { - toSend = append(toSend, crd) - cmf.crdCache[key] = &crdCacheEntry{ - hash: newHash, - lastSent: now, - } - cmf.logger.V(1).Info("CRD change detected", "kind", crd.Kind, "namespace", crd.Namespace, "name", crd.Name) - continue + return crdInstanceFromDDA(dda), nil + case "DatadogAgentInternal": + ddai := &v1alpha1.DatadogAgentInternal{} + if err := cmf.k8sClient.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, ddai); err != nil { + return nil, err } - - // Heartbeat needed (unchanged but 10+ minutes since last send) - timeSinceLastSend := now.Sub(cacheEntry.lastSent) - if timeSinceLastSend >= crdMetadataHeartbeatTTL { - toSend = append(toSend, crd) - cmf.crdCache[key].lastSent = now - cmf.logger.V(2).Info("CRD heartbeat due", "kind", crd.Kind, "namespace", crd.Namespace, "name", crd.Name, - "timeSinceLastSend", timeSinceLastSend.Round(time.Second)) - continue + return crdInstanceFromDDAI(ddai), nil + case "DatadogAgentProfile": + dap := &v1alpha1.DatadogAgentProfile{} + if err := cmf.k8sClient.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, dap); err != nil { + return nil, err } + return crdInstanceFromDAP(dap), nil + default: + cmf.logger.V(1).Info("Unknown CRD kind", "kind", kind) + return nil, nil } - - return toSend } -// cleanupDeletedCRDs removes cache entries for CRDs that got deleted -func (cmf *CRDMetadataForwarder) cleanupDeletedCRDs(currentCRDs []CRDInstance, successfulKinds map[string]bool) { - cmf.cacheMutex.Lock() - defer cmf.cacheMutex.Unlock() +func cleanedAnnotations(in 
map[string]string) map[string]string { + out := maps.Clone(in) + delete(out, "kubectl.kubernetes.io/last-applied-configuration") + return out +} - currentKeys := make(map[string]bool) - for _, crd := range currentCRDs { - currentKeys[buildCacheKey(crd)] = true +func crdInstanceFromDDA(dda *v2alpha1.DatadogAgent) *CRDInstance { + return &CRDInstance{ + Kind: "DatadogAgent", + Name: dda.Name, + Namespace: dda.Namespace, + APIVersion: dda.APIVersion, + UID: string(dda.UID), + Spec: dda.Spec, + Labels: dda.Labels, + Annotations: cleanedAnnotations(dda.Annotations), } +} - for key := range cmf.crdCache { - cachedKind, rest, found := strings.Cut(key, "/") - if !found { - continue - } - - // Only clean up cache for kinds that were successfully listed - if successfulKinds[cachedKind] { - if !currentKeys[key] { - delete(cmf.crdCache, key) - cachedNS, cachedName, _ := strings.Cut(rest, "/") - cmf.logger.V(1).Info("Removed deleted CRD from cache", "kind", cachedKind, "namespace", cachedNS, "name", cachedName) - } - } +func crdInstanceFromDDAI(ddai *v1alpha1.DatadogAgentInternal) *CRDInstance { + return &CRDInstance{ + Kind: "DatadogAgentInternal", + Name: ddai.Name, + Namespace: ddai.Namespace, + APIVersion: ddai.APIVersion, + UID: string(ddai.UID), + Spec: ddai.Spec, + Labels: ddai.Labels, + Annotations: cleanedAnnotations(ddai.Annotations), } } -// Cache helper functions -// buildCacheKey builds a unique key for a CRD instance, with format "kind/namespace/name" -func buildCacheKey(crd CRDInstance) string { - return fmt.Sprintf("%s/%s/%s", crd.Kind, crd.Namespace, crd.Name) +func crdInstanceFromDAP(dap *v1alpha1.DatadogAgentProfile) *CRDInstance { + return &CRDInstance{ + Kind: "DatadogAgentProfile", + Name: dap.Name, + Namespace: dap.Namespace, + APIVersion: dap.APIVersion, + UID: string(dap.UID), + Spec: dap.Spec, + Labels: dap.Labels, + Annotations: cleanedAnnotations(dap.Annotations), + } } -// hashCRD computes a SHA256 hash of the CRD spec, labels, and annotations for 
change detection -func hashCRD(crd CRDInstance) (string, error) { - // Hash spec, labels, and annotations together - hashable := struct { - Spec any `json:"spec"` - Labels map[string]string `json:"labels,omitempty"` - Annotations map[string]string `json:"annotations,omitempty"` - }{ - Spec: crd.Spec, - Labels: crd.Labels, - Annotations: crd.Annotations, - } +// handleDelete is the DeleteFunc callback for InformerWorkQueue. +func (cmf *CRDMetadataForwarder) handleDelete(kind, namespace, name string) { + key := EncodeKey(kind, namespace, name) + cmf.crdSnapshots.Delete(key) + cmf.logger.V(1).Info("Removed deleted CRD from snapshot store", + "kind", kind, "namespace", namespace, "name", name) +} - hashableJSON, err := json.Marshal(hashable) - if err != nil { - return "", err - } - hash := sha256.Sum256(hashableJSON) - return fmt.Sprintf("%x", hash), nil +// heartbeat is the HeartbeatFunc callback for InformerWorkQueue. +func (cmf *CRDMetadataForwarder) heartbeat(ctx context.Context) { + cmf.crdSnapshots.Range(func(key, value any) bool { + snap := value.(*crdSnapshot) + if err := cmf.sendCRDMetadata(ctx, snap.instance); err != nil { + cmf.logger.V(1).Info("Failed to send CRD metadata during heartbeat", + "key", key, "error", err) + } + return true + }) } diff --git a/pkg/controller/utils/metadata/crd_metadata_test.go b/pkg/controller/utils/metadata/crd_metadata_test.go index 9a662fae7c..805af2d119 100644 --- a/pkg/controller/utils/metadata/crd_metadata_test.go +++ b/pkg/controller/utils/metadata/crd_metadata_test.go @@ -6,363 +6,146 @@ package metadata import ( - "encoding/json" + "net/http" + "net/http/httptest" + "sync/atomic" "testing" - "time" - "github.com/DataDog/datadog-operator/pkg/config" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" 
"sigs.k8s.io/controller-runtime/pkg/log/zap" -) - -// Test that payload generation works correctly for CRD metadata -func Test_CRDBuildPayload(t *testing.T) { - expectedKubernetesVersion := "v1.28.0" - expectedOperatorVersion := "v1.19.0" - expectedClusterUID := "test-cluster-uid-12345" - expectedCRDKind := "DatadogAgent" - expectedCRDName := "my-datadog-agent" - expectedCRDNamespace := "datadog" - expectedCRDAPIVersion := "datadoghq.com/v2alpha1" - expectedCRDUID := "crd-uid-67890" - - cmf := NewCRDMetadataForwarder( - zap.New(zap.UseDevMode(true)), - nil, - expectedKubernetesVersion, - expectedOperatorVersion, - config.NewCredentialManager(fake.NewFakeClient()), - EnabledCRDKindsConfig{ - DatadogAgentEnabled: true, - DatadogAgentInternalEnabled: true, - DatadogAgentProfileEnabled: true, - }, - ) - - // Create a test CRD instance - testSpec := map[string]any{ - "global": map[string]any{ - "credentials": map[string]any{ - "apiKey": "secret-key", - }, - }, - } - testLabels := map[string]string{ - "app": "datadog-agent", - "env": "test", - } - testAnnotations := map[string]string{ - "owner": "sre-team", - "version": "1.0", - } - - crdInstance := CRDInstance{ - Kind: expectedCRDKind, - Name: expectedCRDName, - Namespace: expectedCRDNamespace, - APIVersion: expectedCRDAPIVersion, - UID: expectedCRDUID, - Spec: testSpec, - Labels: testLabels, - Annotations: testAnnotations, - } - payload := cmf.buildPayload(expectedClusterUID, crdInstance) - - // Verify payload is valid JSON - if len(payload) == 0 { - t.Error("buildPayload() returned empty payload") - } + "github.com/DataDog/datadog-operator/api/datadoghq/v1alpha1" + "github.com/DataDog/datadog-operator/api/datadoghq/v2alpha1" + "github.com/DataDog/datadog-operator/pkg/config" + "github.com/DataDog/datadog-operator/pkg/constants" +) - // Parse JSON to validate specific values - var parsed map[string]any - if err := json.Unmarshal(payload, &parsed); err != nil { - t.Fatalf("buildPayload() returned invalid JSON: %v", err) 
- } +// crdTestHarness sets up everything processKey needs to run end-to-end: +// - kube-system Namespace seeded for GetOrCreateClusterUID +// - DD_API_KEY env var set so credential lookup doesn't need a DDA +// - httptest.Server intercepts the metadata POST; sendCount counts hits +// - fake k8s client wired through SharedMetadata +type crdTestHarness struct { + cmf *CRDMetadataForwarder + sendCount *atomic.Int32 + srv *httptest.Server + client client.Client +} - if timestamp, ok := parsed["timestamp"].(float64); !ok || timestamp <= 0 { - t.Errorf("buildPayload() timestamp = %v, want positive number", timestamp) - } +func newCRDTestHarness(t *testing.T, seedObjs ...client.Object) *crdTestHarness { + t.Helper() - if uuid, ok := parsed["uuid"].(string); !ok || uuid != expectedClusterUID { - t.Errorf("buildPayload() uuid = %v, want %v", uuid, expectedClusterUID) - } + t.Setenv(constants.DDAPIKey, "test-api-key") - if clusterID, ok := parsed["cluster_id"].(string); !ok || clusterID != expectedClusterUID { - t.Errorf("buildPayload() cluster_id = %v, want %v", clusterID, expectedClusterUID) + scheme := runtime.NewScheme() + if err := clientgoscheme.AddToScheme(scheme); err != nil { + t.Fatalf("AddToScheme: %v", err) } - - // Validate metadata object exists - metadata, ok := parsed["datadog_operator_crd_metadata"].(map[string]any) - if !ok { - t.Fatal("buildPayload() missing or invalid datadog_operator_crd_metadata") + if err := v2alpha1.AddToScheme(scheme); err != nil { + t.Fatalf("AddToScheme v2alpha1: %v", err) } - - // Validate CRD-specific fields in metadata - if operatorVersion, ok := metadata["operator_version"].(string); !ok || operatorVersion != expectedOperatorVersion { - t.Errorf("buildPayload() metadata.operator_version = %v, want %v", operatorVersion, expectedOperatorVersion) + if err := v1alpha1.AddToScheme(scheme); err != nil { + t.Fatalf("AddToScheme v1alpha1: %v", err) } - if kubernetesVersion, ok := metadata["kubernetes_version"].(string); !ok || 
kubernetesVersion != expectedKubernetesVersion { - t.Errorf("buildPayload() metadata.kubernetes_version = %v, want %v", kubernetesVersion, expectedKubernetesVersion) + kubeSystem := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: "kube-system", UID: "kube-system-uid"}, } + objs := append([]client.Object{kubeSystem}, seedObjs...) + c := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build() - if clusterID, ok := metadata["cluster_id"].(string); !ok || clusterID != expectedClusterUID { - t.Errorf("buildPayload() metadata.cluster_id = %v, want %v", clusterID, expectedClusterUID) - } + var sendCount atomic.Int32 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + sendCount.Add(1) + w.WriteHeader(http.StatusOK) + })) + t.Cleanup(srv.Close) + t.Setenv(constants.DDURL, srv.URL) - if crdKind, ok := metadata["crd_kind"].(string); !ok || crdKind != expectedCRDKind { - t.Errorf("buildPayload() metadata.crd_kind = %v, want %v", crdKind, expectedCRDKind) - } - - if crdName, ok := metadata["crd_name"].(string); !ok || crdName != expectedCRDName { - t.Errorf("buildPayload() metadata.crd_name = %v, want %v", crdName, expectedCRDName) - } - - if crdNamespace, ok := metadata["crd_namespace"].(string); !ok || crdNamespace != expectedCRDNamespace { - t.Errorf("buildPayload() metadata.crd_namespace = %v, want %v", crdNamespace, expectedCRDNamespace) - } - - if crdAPIVersion, ok := metadata["crd_api_version"].(string); !ok || crdAPIVersion != expectedCRDAPIVersion { - t.Errorf("buildPayload() metadata.crd_api_version = %v, want %v", crdAPIVersion, expectedCRDAPIVersion) + cmf := &CRDMetadataForwarder{ + SharedMetadata: NewSharedMetadata(zap.New(zap.UseDevMode(true)), c, "v1.28.0", "v1.19.0", config.NewCredentialManager(c)), + enabledCRDs: EnabledCRDKindsConfig{DatadogAgentEnabled: true, DatadogAgentInternalEnabled: true, DatadogAgentProfileEnabled: true}, } + return &crdTestHarness{cmf: cmf, sendCount: &sendCount, srv: 
srv, client: c} +} - if crdUID, ok := metadata["crd_uid"].(string); !ok || crdUID != expectedCRDUID { - t.Errorf("buildPayload() metadata.crd_uid = %v, want %v", crdUID, expectedCRDUID) +func Test_CRDProcessKey_NewCRDSends(t *testing.T) { + dda := &v2alpha1.DatadogAgent{ + ObjectMeta: metav1.ObjectMeta{Name: "agent", Namespace: "datadog", UID: "uid-1"}, } + dda.APIVersion = "datadoghq.com/v2alpha1" - // Validate crd_spec_full exists and is valid JSON - if crdSpecFull, ok := metadata["crd_spec_full"].(string); !ok || crdSpecFull == "" { - t.Errorf("buildPayload() metadata.crd_spec_full = %v, want non-empty JSON string", crdSpecFull) - } else { - // Verify it's valid JSON - var specParsed map[string]any - if err := json.Unmarshal([]byte(crdSpecFull), &specParsed); err != nil { - t.Errorf("buildPayload() metadata.crd_spec_full is not valid JSON: %v", err) - } + h := newCRDTestHarness(t, dda) + if err := h.cmf.processKey(t.Context(), "DatadogAgent", "datadog", "agent"); err != nil { + t.Fatalf("processKey: %v", err) } - - // Validate crd_labels (stored as JSON string in the payload) - if crdLabelsJSON, ok := metadata["crd_labels"].(string); !ok { - t.Errorf("buildPayload() metadata.crd_labels type = %T, want string", metadata["crd_labels"]) - } else { - // Parse the JSON string to validate contents - var labels map[string]string - if err := json.Unmarshal([]byte(crdLabelsJSON), &labels); err != nil { - t.Errorf("buildPayload() metadata.crd_labels invalid JSON: %v", err) - } else if labels["app"] != "datadog-agent" || labels["env"] != "test" { - t.Errorf("buildPayload() metadata.crd_labels = %v, want app=datadog-agent, env=test", labels) - } + if got := h.sendCount.Load(); got != 1 { + t.Errorf("send count = %d, want 1", got) } - - // Validate crd_annotations (stored as JSON string in the payload) - if crdAnnotationsJSON, ok := metadata["crd_annotations"].(string); !ok { - t.Errorf("buildPayload() metadata.crd_annotations type = %T, want string", 
metadata["crd_annotations"]) - } else { - // Parse the JSON string to validate contents - var annotations map[string]string - if err := json.Unmarshal([]byte(crdAnnotationsJSON), &annotations); err != nil { - t.Errorf("buildPayload() metadata.crd_annotations invalid JSON: %v", err) - } else if annotations["owner"] != "sre-team" || annotations["version"] != "1.0" { - t.Errorf("buildPayload() metadata.crd_annotations = %v, want owner=sre-team, version=1.0", annotations) - } + if _, ok := h.cmf.crdSnapshots.Load(EncodeKey("DatadogAgent", "datadog", "agent")); !ok { + t.Errorf("snapshot not stored") } } -// Test that hash-based change detection works correctly -func Test_CRDCacheDetection(t *testing.T) { - cmf := NewCRDMetadataForwarder( - zap.New(zap.UseDevMode(true)), - nil, - "v1.28.0", - "v1.19.0", - config.NewCredentialManager(fake.NewFakeClient()), - EnabledCRDKindsConfig{ - DatadogAgentEnabled: true, - DatadogAgentInternalEnabled: true, - DatadogAgentProfileEnabled: true, - }, - ) - - crd1 := CRDInstance{ - Kind: "DatadogAgent", - Name: "test-agent", - Namespace: "default", - Spec: map[string]any{"version": "7.50.0"}, - Labels: map[string]string{"app": "agent"}, - Annotations: map[string]string{"owner": "team-a"}, +func Test_CRDProcessKey_UnchangedSkipsSend(t *testing.T) { + dda := &v2alpha1.DatadogAgent{ + ObjectMeta: metav1.ObjectMeta{Name: "agent", Namespace: "datadog", UID: "uid-1"}, } + h := newCRDTestHarness(t, dda) - crd2 := CRDInstance{ - Kind: "DatadogAgent", - Name: "test-agent-2", - Namespace: "default", - Spec: map[string]any{"version": "7.51.0"}, - Labels: map[string]string{"app": "agent"}, - Annotations: map[string]string{"owner": "team-b"}, + if err := h.cmf.processKey(t.Context(), "DatadogAgent", "datadog", "agent"); err != nil { + t.Fatalf("first processKey: %v", err) } - // First call - both CRDs should be new (changed) - changed := cmf.getCRDsToSend([]CRDInstance{crd1, crd2}) - if len(changed) != 2 { - t.Errorf("Expected 2 changed CRDs on 
first run, got %d", len(changed)) + if err := h.cmf.processKey(t.Context(), "DatadogAgent", "datadog", "agent"); err != nil { + t.Fatalf("second processKey: %v", err) } - - // Second call with same specs - no changes expected - changed = cmf.getCRDsToSend([]CRDInstance{crd1, crd2}) - if len(changed) != 0 { - t.Errorf("Expected 0 changed CRDs on second run, got %d", len(changed)) - } - - // Modify crd1 spec - crd1Modified := crd1 - crd1Modified.Spec = map[string]any{"version": "7.52.0"} - - // Third call with modified crd1 spec - only 1 change expected - changed = cmf.getCRDsToSend([]CRDInstance{crd1Modified, crd2}) - if len(changed) != 1 { - t.Errorf("Expected 1 changed CRD after spec modification, got %d", len(changed)) - } - if len(changed) > 0 && changed[0].Name != "test-agent" { - t.Errorf("Expected changed CRD to be 'test-agent', got '%s'", changed[0].Name) - } - - // Modify crd1 labels - should detect change - crd1ModifiedLabels := crd1 - crd1ModifiedLabels.Labels = map[string]string{"app": "agent", "env": "prod"} - - changed = cmf.getCRDsToSend([]CRDInstance{crd1ModifiedLabels, crd2}) - if len(changed) != 1 { - t.Errorf("Expected 1 changed CRD after label modification, got %d", len(changed)) - } - - // Modify crd1 annotations - should detect change - crd1ModifiedAnnotations := crd1ModifiedLabels - crd1ModifiedAnnotations.Annotations = map[string]string{"owner": "team-c"} - - changed = cmf.getCRDsToSend([]CRDInstance{crd1ModifiedAnnotations, crd2}) - if len(changed) != 1 { - t.Errorf("Expected 1 changed CRD after annotation modification, got %d", len(changed)) + if got := h.sendCount.Load(); got != 1 { + t.Errorf("send count = %d, want 1 (unchanged should not re-send)", got) } } -// Test that cache cleanup works correctly -func Test_CRDCacheCleanup(t *testing.T) { - cmf := NewCRDMetadataForwarder( - zap.New(zap.UseDevMode(true)), - nil, - "v1.28.0", - "v1.19.0", - config.NewCredentialManager(fake.NewFakeClient()), - EnabledCRDKindsConfig{DatadogAgentEnabled: 
true}, - ) - - crd1 := CRDInstance{ - Kind: "DatadogAgent", - Name: "test-agent", - Namespace: "default", - Spec: map[string]any{"version": "7.50.0"}, +func Test_CRDProcessKey_NotFoundIsNoOp(t *testing.T) { + h := newCRDTestHarness(t) + if err := h.cmf.processKey(t.Context(), "DatadogAgent", "datadog", "ghost"); err != nil { + t.Errorf("processKey on missing object should be no-op, got %v", err) } - - crd2 := CRDInstance{ - Kind: "DatadogAgent", - Name: "test-agent-2", - Namespace: "default", - Spec: map[string]any{"version": "7.51.0"}, + if got := h.sendCount.Load(); got != 0 { + t.Errorf("send count = %d, want 0", got) } +} - successfulKinds := map[string]bool{"DatadogAgent": true} - - // Add both CRDs to cache - cmf.getCRDsToSend([]CRDInstance{crd1, crd2}) - - cmf.cacheMutex.RLock() - initialCacheSize := len(cmf.crdCache) - cmf.cacheMutex.RUnlock() - if initialCacheSize != 2 { - t.Errorf("Expected cache size 2, got %d", initialCacheSize) - } +func Test_CRDHandleDelete(t *testing.T) { + h := newCRDTestHarness(t) + key := EncodeKey("DatadogAgent", "datadog", "agent") + h.cmf.crdSnapshots.Store(key, &crdSnapshot{}) - // Remove crd2 and cleanup - cmf.cleanupDeletedCRDs([]CRDInstance{crd1}, successfulKinds) + h.cmf.handleDelete("DatadogAgent", "datadog", "agent") - cmf.cacheMutex.RLock() - finalCacheSize := len(cmf.crdCache) - cmf.cacheMutex.RUnlock() - if finalCacheSize != 1 { - t.Errorf("Expected cache size 1 after cleanup, got %d", finalCacheSize) + if _, ok := h.cmf.crdSnapshots.Load(key); ok { + t.Errorf("snapshot still present after delete") } } -// Test that per-kind error handling preserves cache correctly -func Test_CRDPerKindErrorHandling(t *testing.T) { - cmf := NewCRDMetadataForwarder( - zap.New(zap.UseDevMode(true)), - nil, - "v1.28.0", - "v1.19.0", - config.NewCredentialManager(fake.NewFakeClient()), - EnabledCRDKindsConfig{ - DatadogAgentEnabled: true, - DatadogAgentInternalEnabled: true, - }, +func Test_CRDHeartbeatResendsAllSnapshots(t *testing.T) { 
+ h := newCRDTestHarness(t) + h.cmf.crdSnapshots.Store( + EncodeKey("DatadogAgent", "ns", "a"), + &crdSnapshot{instance: CRDInstance{Kind: "DatadogAgent", Name: "a", Namespace: "ns"}, hash: "h-a"}, + ) + h.cmf.crdSnapshots.Store( + EncodeKey("DatadogAgent", "ns", "b"), + &crdSnapshot{instance: CRDInstance{Kind: "DatadogAgent", Name: "b", Namespace: "ns"}, hash: "h-b"}, ) - ddaCRD := CRDInstance{ - Kind: "DatadogAgent", - Name: "test-dda", - Namespace: "default", - Spec: map[string]any{"version": "7.50.0"}, - } - - ddaiCRD := CRDInstance{ - Kind: "DatadogAgentInternal", - Name: "test-ddai", - Namespace: "default", - Spec: map[string]any{"version": "7.50.0"}, - } - - cmf.getCRDsToSend([]CRDInstance{ddaCRD, ddaiCRD}) - - cmf.cacheMutex.RLock() - cacheSize := len(cmf.crdCache) - cmf.cacheMutex.RUnlock() - if cacheSize != 2 { - t.Errorf("Expected cache size 2, got %d", cacheSize) - } - - // Second run: DatadogAgent successful, DatadogAgentInternal failed - onlyDDASuccessful := map[string]bool{"DatadogAgent": true} - - // Filter should only process DDA (no changes since spec is same) - changed := cmf.getCRDsToSend([]CRDInstance{ddaCRD}) - if len(changed) != 0 { - t.Errorf("Expected 0 changed CRDs for DDA (unchanged spec), got %d", len(changed)) - } - - // Cleanup should only process DDA (DDAI cache should be preserved) - cmf.cleanupDeletedCRDs([]CRDInstance{ddaCRD}, onlyDDASuccessful) + h.cmf.heartbeat(t.Context()) - // Verify cache still has 2 entries (DDAI not cleaned up because it failed to list) - cmf.cacheMutex.RLock() - finalCacheSize := len(cmf.crdCache) - cmf.cacheMutex.RUnlock() - if finalCacheSize != 2 { - t.Errorf("Expected cache size 2 (DDAI preserved), got %d", finalCacheSize) - } -} - -// Test buildCacheKey function -func Test_BuildCacheKey(t *testing.T) { - crd := CRDInstance{ - Kind: "DatadogAgent", - Name: "my-agent", - Namespace: "datadog", - } - - key := buildCacheKey(crd) - expected := "DatadogAgent/datadog/my-agent" - if key != expected { - 
t.Errorf("buildCacheKey() = %s, want %s", key, expected) + if got := h.sendCount.Load(); got != 2 { + t.Errorf("heartbeat sent %d, want 2", got) } } @@ -451,285 +234,3 @@ func Test_HashCRD(t *testing.T) { t.Errorf("Expected different hash for different labels, both got %s", hash1) } } - -// Test that heartbeat triggers after 10 minutes for unchanged CRDs -func Test_CRDHeartbeatTriggersAfter10Minutes(t *testing.T) { - cmf := NewCRDMetadataForwarder( - zap.New(zap.UseDevMode(true)), - nil, - "v1.28.0", - "v1.19.0", - config.NewCredentialManager(fake.NewFakeClient()), - EnabledCRDKindsConfig{DatadogAgentEnabled: true}, - ) - - crd := CRDInstance{ - Kind: "DatadogAgent", - Name: "test-agent", - Namespace: "default", - Spec: map[string]any{"version": "7.50.0"}, - Labels: map[string]string{"app": "agent"}, - } - - // First call - should be new - toSend := cmf.getCRDsToSend([]CRDInstance{crd}) - if len(toSend) != 1 { - t.Errorf("Expected 1 CRD to send (new), got %d", len(toSend)) - } - - // Second call immediately - should not send (no change, no heartbeat due) - toSend = cmf.getCRDsToSend([]CRDInstance{crd}) - if len(toSend) != 0 { - t.Errorf("Expected 0 CRDs to send (no change, too soon), got %d", len(toSend)) - } - - // Simulate 10+ minutes passing by backdating the cache entry - cmf.cacheMutex.Lock() - key := buildCacheKey(crd) - if entry, exists := cmf.crdCache[key]; exists { - entry.lastSent = entry.lastSent.Add(-11 * time.Minute) - } - cmf.cacheMutex.Unlock() - - // Third call after 10+ minutes - should trigger heartbeat - toSend = cmf.getCRDsToSend([]CRDInstance{crd}) - if len(toSend) != 1 { - t.Errorf("Expected 1 CRD to send (heartbeat due), got %d", len(toSend)) - } - - // Verify the timestamp was updated - cmf.cacheMutex.RLock() - entry := cmf.crdCache[key] - timeSinceLastSent := time.Since(entry.lastSent) - cmf.cacheMutex.RUnlock() - - if timeSinceLastSent > 1*time.Second { - t.Errorf("Expected lastSent to be updated to now, but it was %v ago", 
timeSinceLastSent) - } -} - -// Test that spec changes still work with heartbeat logic -func Test_CRDChangeDetectionWithHeartbeat(t *testing.T) { - cmf := NewCRDMetadataForwarder( - zap.New(zap.UseDevMode(true)), - nil, - "v1.28.0", - "v1.19.0", - config.NewCredentialManager(fake.NewFakeClient()), - EnabledCRDKindsConfig{DatadogAgentEnabled: true}, - ) - - crd := CRDInstance{ - Kind: "DatadogAgent", - Name: "test-agent", - Namespace: "default", - Spec: map[string]any{"version": "7.50.0"}, - } - - // First call - new CRD - toSend := cmf.getCRDsToSend([]CRDInstance{crd}) - if len(toSend) != 1 { - t.Fatalf("Expected 1 CRD to send (new), got %d", len(toSend)) - } - - // Modify spec immediately - crdModified := crd - crdModified.Spec = map[string]any{"version": "7.51.0"} - - // Should send immediately due to change - toSend = cmf.getCRDsToSend([]CRDInstance{crdModified}) - if len(toSend) != 1 { - t.Errorf("Expected 1 CRD to send (spec changed), got %d", len(toSend)) - } - - // Verify hash was updated - cmf.cacheMutex.RLock() - key := buildCacheKey(crd) - newHash, _ := hashCRD(crdModified) - if cmf.crdCache[key].hash != newHash { - t.Errorf("Expected hash to be updated after spec change") - } - cmf.cacheMutex.RUnlock() -} - -// Test that heartbeat timer resets when spec changes -func Test_CRDHeartbeatResetsOnChange(t *testing.T) { - cmf := NewCRDMetadataForwarder( - zap.New(zap.UseDevMode(true)), - nil, - "v1.28.0", - "v1.19.0", - config.NewCredentialManager(fake.NewFakeClient()), - EnabledCRDKindsConfig{DatadogAgentEnabled: true}, - ) - - crd := CRDInstance{ - Kind: "DatadogAgent", - Name: "test-agent", - Namespace: "default", - Spec: map[string]any{"version": "7.50.0"}, - } - - // First call - new CRD - cmf.getCRDsToSend([]CRDInstance{crd}) - - // Backdate the cache entry to 9 minutes ago - cmf.cacheMutex.Lock() - key := buildCacheKey(crd) - cmf.crdCache[key].lastSent = cmf.crdCache[key].lastSent.Add(-9 * time.Minute) - cmf.cacheMutex.Unlock() - - // Modify spec 
(heartbeat not due yet, but spec changed) - crdModified := crd - crdModified.Spec = map[string]any{"version": "7.51.0"} - - // Should send due to spec change - toSend := cmf.getCRDsToSend([]CRDInstance{crdModified}) - if len(toSend) != 1 { - t.Errorf("Expected 1 CRD to send (spec changed), got %d", len(toSend)) - } - - // Verify timestamp was reset to now - cmf.cacheMutex.RLock() - timeSinceLastSent := time.Since(cmf.crdCache[key].lastSent) - cmf.cacheMutex.RUnlock() - - if timeSinceLastSent > 1*time.Second { - t.Errorf("Expected lastSent to be reset to now, but it was %v ago", timeSinceLastSent) - } - - // Wait should not trigger heartbeat yet (just reset) - toSend = cmf.getCRDsToSend([]CRDInstance{crdModified}) - if len(toSend) != 0 { - t.Errorf("Expected 0 CRDs to send (timer reset), got %d", len(toSend)) - } -} - -// Test that multiple CRDs can have independent heartbeat timers -func Test_CRDMultipleHeartbeats(t *testing.T) { - cmf := NewCRDMetadataForwarder( - zap.New(zap.UseDevMode(true)), - nil, - "v1.28.0", - "v1.19.0", - config.NewCredentialManager(fake.NewFakeClient()), - EnabledCRDKindsConfig{DatadogAgentEnabled: true}, - ) - - crd1 := CRDInstance{ - Kind: "DatadogAgent", - Name: "test-agent-1", - Namespace: "default", - Spec: map[string]any{"version": "7.50.0"}, - } - - crd2 := CRDInstance{ - Kind: "DatadogAgent", - Name: "test-agent-2", - Namespace: "default", - Spec: map[string]any{"version": "7.50.0"}, - } - - // First call - both new - toSend := cmf.getCRDsToSend([]CRDInstance{crd1, crd2}) - if len(toSend) != 2 { - t.Fatalf("Expected 2 CRDs to send (new), got %d", len(toSend)) - } - - // Backdate only crd1 to 11 minutes ago - cmf.cacheMutex.Lock() - key1 := buildCacheKey(crd1) - cmf.crdCache[key1].lastSent = cmf.crdCache[key1].lastSent.Add(-11 * time.Minute) - cmf.cacheMutex.Unlock() - - // Should only send crd1 (heartbeat due) - toSend = cmf.getCRDsToSend([]CRDInstance{crd1, crd2}) - if len(toSend) != 1 { - t.Errorf("Expected 1 CRD to send (crd1 
heartbeat), got %d", len(toSend)) - } - if len(toSend) > 0 && toSend[0].Name != "test-agent-1" { - t.Errorf("Expected crd1 to be sent, got %s", toSend[0].Name) - } - - // Backdate crd2 to 11 minutes ago - cmf.cacheMutex.Lock() - key2 := buildCacheKey(crd2) - cmf.crdCache[key2].lastSent = cmf.crdCache[key2].lastSent.Add(-11 * time.Minute) - cmf.cacheMutex.Unlock() - - // Should only send crd2 now (crd1 was just sent) - toSend = cmf.getCRDsToSend([]CRDInstance{crd1, crd2}) - if len(toSend) != 1 { - t.Errorf("Expected 1 CRD to send (crd2 heartbeat), got %d", len(toSend)) - } - if len(toSend) > 0 && toSend[0].Name != "test-agent-2" { - t.Errorf("Expected crd2 to be sent, got %s", toSend[0].Name) - } -} - -// Test mixed scenarios: some changed, some heartbeat, some neither -func Test_CRDMixedChangesAndHeartbeats(t *testing.T) { - cmf := NewCRDMetadataForwarder( - zap.New(zap.UseDevMode(true)), - nil, - "v1.28.0", - "v1.19.0", - config.NewCredentialManager(fake.NewFakeClient()), - EnabledCRDKindsConfig{DatadogAgentEnabled: true}, - ) - - crd1 := CRDInstance{ - Kind: "DatadogAgent", - Name: "test-agent-1", - Namespace: "default", - Spec: map[string]any{"version": "7.50.0"}, - } - - crd2 := CRDInstance{ - Kind: "DatadogAgent", - Name: "test-agent-2", - Namespace: "default", - Spec: map[string]any{"version": "7.50.0"}, - } - - crd3 := CRDInstance{ - Kind: "DatadogAgent", - Name: "test-agent-3", - Namespace: "default", - Spec: map[string]any{"version": "7.50.0"}, - } - - // Initialize all CRDs - cmf.getCRDsToSend([]CRDInstance{crd1, crd2, crd3}) - - // Setup: crd1 needs heartbeat, crd2 will change, crd3 neither - cmf.cacheMutex.Lock() - cmf.crdCache[buildCacheKey(crd1)].lastSent = cmf.crdCache[buildCacheKey(crd1)].lastSent.Add(-11 * time.Minute) - cmf.cacheMutex.Unlock() - - // Modify crd2 - crd2Modified := crd2 - crd2Modified.Spec = map[string]any{"version": "7.51.0"} - - // Should send crd1 (heartbeat) and crd2 (changed) - toSend := cmf.getCRDsToSend([]CRDInstance{crd1, 
crd2Modified, crd3}) - if len(toSend) != 2 { - t.Errorf("Expected 2 CRDs to send (1 heartbeat, 1 changed), got %d", len(toSend)) - } - - // Verify both crd1 and crd2 are in the list - sentNames := make(map[string]bool) - for _, crd := range toSend { - sentNames[crd.Name] = true - } - - if !sentNames["test-agent-1"] { - t.Error("Expected test-agent-1 to be sent (heartbeat)") - } - if !sentNames["test-agent-2"] { - t.Error("Expected test-agent-2 to be sent (changed)") - } - if sentNames["test-agent-3"] { - t.Error("Did not expect test-agent-3 to be sent") - } -} diff --git a/pkg/controller/utils/metadata/helm_metadata.go b/pkg/controller/utils/metadata/helm_metadata.go index b377641da1..1d4ab2986b 100644 --- a/pkg/controller/utils/metadata/helm_metadata.go +++ b/pkg/controller/utils/metadata/helm_metadata.go @@ -22,12 +22,8 @@ import ( "github.com/go-logr/logr" "gopkg.in/yaml.v2" - authorizationv1 "k8s.io/api/authorization/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - toolscache "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -37,12 +33,10 @@ import ( const ( // releasePrefix is the prefix for Helm release ConfigMaps and Secrets releasePrefix = "sh.helm.release.v1." 
- // tickerInterval is how often the ticker sends all snapshots - tickerInterval = 5 * time.Minute - // deletePrefix is prepended to queue keys to signal a deletion - deletePrefix = "delete:" - // numWorkers is the number of concurrent workers - numWorkers = 3 + // helmHeartbeatInterval is how often the heartbeat sends all snapshots + helmHeartbeatInterval = 5 * time.Minute + // helmNumWorkers is the number of concurrent workers + helmNumWorkers = 3 ) var ( @@ -59,21 +53,13 @@ type HelmMetadataForwarder struct { mgr manager.Manager - // Workqueue for processing Helm releases - queue workqueue.TypedRateLimitingInterface[string] + // runner owns the workqueue, worker pool, and heartbeat ticker. + runner *InformerWorkQueue // Track latest snapshot of each release // Key: "namespace/releaseName" // Value: *ReleaseEntry releaseSnapshots sync.Map - - // configMapAccessEnabled tracks whether the operator has permission to list/watch ConfigMaps. - // Set once in Start() and used to skip the ConfigMap path in processKey. - configMapAccessEnabled bool - - // secretAccessEnabled tracks whether the operator has permission to list/watch Secrets. - // Set once in Start() and used to skip the Secret path in processKey. 
- secretAccessEnabled bool } // ReleaseEntry wraps a ReleaseSnapshot with a mutex for safe concurrent access @@ -157,123 +143,41 @@ type HelmReleaseMinimal struct { func NewHelmMetadataForwarderWithManager(logger logr.Logger, mgr manager.Manager, k8sClient client.Reader, kubernetesVersion string, operatorVersion string, credsManager *config.CredentialManager) *HelmMetadataForwarder { forwarderLogger := logger.WithName("helm") - return &HelmMetadataForwarder{ + hmf := &HelmMetadataForwarder{ SharedMetadata: NewSharedMetadata(forwarderLogger, k8sClient, kubernetesVersion, operatorVersion, credsManager), mgr: mgr, - queue: workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()), } + hmf.runner = NewInformerWorkQueue( + forwarderLogger, + mgr, + helmNumWorkers, + helmHeartbeatInterval, + hmf.processKey, + hmf.handleDelete, + hmf.heartbeat, + ) + return hmf } -// canListWatch checks if the operator has permission to list and watch the given resource -func (hmf *HelmMetadataForwarder) canListWatch(ctx context.Context, resource string) bool { - for _, verb := range []string{"list", "watch"} { - sar := &authorizationv1.SelfSubjectAccessReview{ - Spec: authorizationv1.SelfSubjectAccessReviewSpec{ - ResourceAttributes: &authorizationv1.ResourceAttributes{ - Verb: verb, - Resource: resource, - }, - }, - } - if err := hmf.mgr.GetClient().Create(ctx, sar); err != nil { - hmf.logger.V(1).Info("Failed to check RBAC permission", "resource", resource, "verb", verb, "error", err) - return false - } - if !sar.Status.Allowed { - return false - } - } - return true -} - -// Start implements manager.Runnable interface -// It is called by the manager after the cache is synced but we don't need to initialize resources at start. 
-// Cache sends synthetic 'Add' events to the newly registered handler, see -// https://github.com/kubernetes/client-go/blob/v0.35.0/tools/cache/shared_informer.go#L693-L697 -// Errors are logged but do not prevent the operator from starting +// Start implements manager.Runnable. func (hmf *HelmMetadataForwarder) Start(ctx context.Context) error { - hmf.configMapAccessEnabled = false - if hmf.canListWatch(ctx, "configmaps") { - cmInformer, err := hmf.mgr.GetCache().GetInformer(ctx, &corev1.ConfigMap{}) - if err != nil { - hmf.logger.Info("Unable to get ConfigMap informer, Helm metadata collection from ConfigMaps will be disabled", "error", err) - } else { - _, err = cmInformer.AddEventHandler(toolscache.FilteringResourceEventHandler{ - FilterFunc: func(obj any) bool { - cm, ok := obj.(*corev1.ConfigMap) - return ok && - cm.Labels["owner"] == "helm" && - strings.HasPrefix(cm.Name, releasePrefix) - }, - Handler: toolscache.ResourceEventHandlerFuncs{ - AddFunc: func(obj any) { - if key, keyErr := toolscache.MetaNamespaceKeyFunc(obj); keyErr == nil { - hmf.queue.Add(key) - hmf.logger.V(2).Info("Enqueued ConfigMap for processing", "key", key) - } - }, - DeleteFunc: func(obj any) { - if key, keyErr := toolscache.DeletionHandlingMetaNamespaceKeyFunc(obj); keyErr == nil { - hmf.queue.Add(deletePrefix + key) - hmf.logger.V(2).Info("Enqueued ConfigMap deletion for processing", "key", key) - } - }, - }, - }) - if err != nil { - hmf.logger.Info("Unable to add ConfigMap event handler, Helm metadata collection from ConfigMaps will be disabled", "error", err) - } else { - hmf.configMapAccessEnabled = true - } - } - } else { - hmf.logger.Info("No permission to list/watch ConfigMaps, Helm metadata collection from ConfigMaps will be disabled") - } - - hmf.secretAccessEnabled = false - if hmf.canListWatch(ctx, "secrets") { - secretInformer, secretErr := hmf.mgr.GetCache().GetInformer(ctx, &corev1.Secret{}) - if secretErr != nil { - hmf.logger.Info("Unable to get Secret informer, Helm 
metadata collection from Secrets will be disabled", "error", secretErr) - } else { - _, secretErr = secretInformer.AddEventHandler(toolscache.FilteringResourceEventHandler{ - FilterFunc: func(obj any) bool { - secret, ok := obj.(*corev1.Secret) - return ok && - secret.Labels["owner"] == "helm" && - strings.HasPrefix(secret.Name, releasePrefix) - }, - Handler: toolscache.ResourceEventHandlerFuncs{ - AddFunc: func(obj any) { - if key, keyErr := toolscache.MetaNamespaceKeyFunc(obj); keyErr == nil { - hmf.queue.Add(key) - } - }, - DeleteFunc: func(obj any) { - if key, keyErr := toolscache.DeletionHandlingMetaNamespaceKeyFunc(obj); keyErr == nil { - hmf.queue.Add(deletePrefix + key) - } - }, - }, - }) - if secretErr != nil { - hmf.logger.Info("Unable to add Secret event handler, Helm metadata collection from Secrets will be disabled", "error", secretErr) - } else { - hmf.secretAccessEnabled = true - } - } - } else { - hmf.logger.Info("No permission to list/watch Secrets, Helm metadata collection from Secrets will be disabled") + cmFilter := func(obj any) bool { + cm, ok := obj.(*corev1.ConfigMap) + return ok && cm.Labels["owner"] == "helm" && strings.HasPrefix(cm.Name, releasePrefix) + } + secretFilter := func(obj any) bool { + s, ok := obj.(*corev1.Secret) + return ok && s.Labels["owner"] == "helm" && strings.HasPrefix(s.Name, releasePrefix) } - // Cache is already synced by the manager before Start() is called - - // Start worker pool - go hmf.runWorkers(ctx, numWorkers) - - // Start ticker for periodic sends - go hmf.tickerLoop(ctx) + hmf.runner.AddWatch(ctx, WatchTarget{ + Object: &corev1.ConfigMap{}, Resource: "configmaps", Kind: "ConfigMap", Filter: cmFilter, + }) + hmf.runner.AddWatch(ctx, WatchTarget{ + Object: &corev1.Secret{}, Resource: "secrets", Kind: "Secret", Filter: secretFilter, + }) + go hmf.runner.Run(ctx) return nil } @@ -282,96 +186,41 @@ func (hmf *HelmMetadataForwarder) NeedLeaderElection() bool { return true } -// runWorkers spawns multiple worker 
goroutines to process items from the workqueue concurrently -func (hmf *HelmMetadataForwarder) runWorkers(ctx context.Context, numWorkers int) { - go func() { - <-ctx.Done() - hmf.logger.Info("Context cancelled, shutting down Helm metadata forwarder") - hmf.queue.ShutDown() - }() - - for i := range numWorkers { - go func(workerID int) { - // Recover from panics to prevent one worker crash from affecting others - defer utilruntime.HandleCrash() - - for { - key, shutdown := hmf.queue.Get() - if shutdown { - return - } - - // Process item with deferred cleanup - func() { - defer hmf.queue.Done(key) - - ctx, cancel := context.WithTimeout(context.Background(), DefaultOperationTimeout) - defer cancel() - - var err error - if after, ok := strings.CutPrefix(key, deletePrefix); ok { - hmf.handleDelete(after) - } else { - err = hmf.processKey(ctx, key) - } - - if err != nil { - hmf.queue.AddRateLimited(key) - } else { - hmf.queue.Forget(key) - } - }() - } - }(i) - } -} - -// processKey processes a single Helm release by its namespaced key -func (hmf *HelmMetadataForwarder) processKey(ctx context.Context, key string) error { - namespace, name, err := toolscache.SplitMetaNamespaceKey(key) - if err != nil { - return fmt.Errorf("invalid key format: %w", err) - } - - var lastErr error - - // Try to get as ConfigMap first (only if we have permission, to avoid lazily registering a ConfigMap informer) - if hmf.configMapAccessEnabled { +// processKey processes a single Helm release by its kind and namespaced key +func (hmf *HelmMetadataForwarder) processKey(ctx context.Context, kind, namespace, name string) error { + switch kind { + case "ConfigMap": cm := &corev1.ConfigMap{} - lastErr = hmf.k8sClient.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, cm) - if lastErr == nil && cm.Labels["owner"] == "helm" { - hmf.handleHelmResource(ctx, cm.Name, cm.Namespace, string(cm.UID), []byte(cm.Data["release"])) + if err := hmf.k8sClient.Get(ctx, client.ObjectKey{Namespace: 
namespace, Name: name}, cm); err != nil { + if errors.IsNotFound(err) { + return nil + } + return fmt.Errorf("failed to get ConfigMap: %w", err) + } + if cm.Labels["owner"] != "helm" { return nil } - } - - // Try as Secret (only if we have permission, to avoid lazily registering a Secret informer) - if hmf.secretAccessEnabled { + hmf.handleHelmResource(ctx, cm.Name, cm.Namespace, string(cm.UID), []byte(cm.Data["release"])) + case "Secret": secret := &corev1.Secret{} - lastErr = hmf.k8sClient.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, secret) - if lastErr == nil && secret.Labels["owner"] == "helm" { - hmf.handleHelmResource(ctx, secret.Name, secret.Namespace, string(secret.UID), secret.Data["release"]) + if err := hmf.k8sClient.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, secret); err != nil { + if errors.IsNotFound(err) { + return nil + } + return fmt.Errorf("failed to get Secret: %w", err) + } + if secret.Labels["owner"] != "helm" { return nil } + hmf.handleHelmResource(ctx, secret.Name, secret.Namespace, string(secret.UID), secret.Data["release"]) + default: + hmf.logger.V(1).Info("Unknown kind in processKey", "kind", kind) } - - // If not found, likely a race condition with deletion - ignore it - if errors.IsNotFound(lastErr) { - return nil - } - - if lastErr != nil { - return fmt.Errorf("failed to get resource: %w", lastErr) - } - return nil } // handleDelete handles deletion of a Helm release -func (hmf *HelmMetadataForwarder) handleDelete(key string) { - namespace, name, _ := toolscache.SplitMetaNamespaceKey(key) - - // Parse the release name and revision from the resource name +func (hmf *HelmMetadataForwarder) handleDelete(_ /*kind*/ string, namespace, name string) { _, releaseName, revision, ok := hmf.parseHelmResource(name, nil) if !ok || releaseName == "" { return @@ -379,8 +228,6 @@ func (hmf *HelmMetadataForwarder) handleDelete(key string) { releaseKey := fmt.Sprintf("%s/%s", namespace, releaseName) - // Only delete if 
the snapshot is for this specific revision - // This prevents deleting a newer snapshot when Helm cleans up old revisions if existing, loaded := hmf.releaseSnapshots.Load(releaseKey); loaded { entry := existing.(*ReleaseEntry) entry.mu.Lock() @@ -389,11 +236,6 @@ func (hmf *HelmMetadataForwarder) handleDelete(key string) { if entry.snapshot != nil && entry.snapshot.Revision == revision { hmf.releaseSnapshots.Delete(releaseKey) hmf.logger.V(1).Info("Deleted release snapshot", "releaseKey", releaseKey, "revision", revision) - } else if entry.snapshot != nil { - hmf.logger.V(1).Info("Skipping delete - snapshot is for different revision", - "releaseKey", releaseKey, - "deletedRevision", revision, - "currentRevision", entry.snapshot.Revision) } } } @@ -505,59 +347,24 @@ func (hmf *HelmMetadataForwarder) snapshotToReleaseData(snapshot *ReleaseSnapsho } } -// tickerLoop runs the periodic ticker to send all snapshots -func (hmf *HelmMetadataForwarder) tickerLoop(ctx context.Context) { - ticker := time.NewTicker(tickerInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - hmf.sendAllSnapshots() - } - } -} - -// sendAllSnapshots sends all release snapshots -func (hmf *HelmMetadataForwarder) sendAllSnapshots() { - count := 0 - errors := 0 - +// heartbeat sends all release snapshots and is called by the runner on every tick. 
+func (hmf *HelmMetadataForwarder) heartbeat(ctx context.Context) { hmf.releaseSnapshots.Range(func(key, value any) bool { entry := value.(*ReleaseEntry) - entry.mu.Lock() snapshot := entry.snapshot entry.mu.Unlock() - // Skip if no snapshot exists yet if snapshot == nil { return true } - ctx, cancel := context.WithTimeout(context.Background(), DefaultOperationTimeout) - defer cancel() - releaseData := hmf.snapshotToReleaseData(snapshot) if err := hmf.sendSingleReleasePayload(ctx, releaseData); err != nil { - hmf.logger.V(1).Info("Failed to send snapshot", - "key", key, - "error", err) - errors++ - } else { - count++ + hmf.logger.V(1).Info("Failed to send snapshot during heartbeat", "key", key, "error", err) } - return true }) - - if count > 0 { - hmf.logger.V(2).Info("Ticker: sent Helm release snapshots", - "sent", count, - "errors", errors) - } } func (hmf *HelmMetadataForwarder) sendSingleReleasePayload(ctx context.Context, release HelmReleaseData) error { diff --git a/pkg/controller/utils/metadata/helm_metadata_test.go b/pkg/controller/utils/metadata/helm_metadata_test.go index 7b9ae20d67..d797e0c59c 100644 --- a/pkg/controller/utils/metadata/helm_metadata_test.go +++ b/pkg/controller/utils/metadata/helm_metadata_test.go @@ -58,28 +58,13 @@ func createValidReleaseData() ([]byte, error) { func Test_workqueueInitialization(t *testing.T) { hmf := createTestForwarder() - if hmf.queue == nil { - t.Fatal("Workqueue not initialized") + if hmf.runner == nil { + t.Fatal("runner not initialized") } - hmf.queue.Add("test/key") - if hmf.queue.Len() != 1 { - t.Errorf("Queue length = %d, want 1", hmf.queue.Len()) - } - - key, shutdown := hmf.queue.Get() - if shutdown { - t.Error("Queue should not be shutting down") - } - if key != "test/key" { - t.Errorf("Got key %v, want test/key", key) - } - - hmf.queue.Done(key) - hmf.queue.ShutDown() - - if !hmf.queue.ShuttingDown() { - t.Error("Queue should be marked as shutting down") + hmf.runner.Enqueue("ConfigMap", "test", "key") + 
if got := hmf.runner.QueueLen(); got != 1 { + t.Errorf("QueueLen = %d, want 1", got) } } diff --git a/pkg/controller/utils/metadata/informer_runner.go b/pkg/controller/utils/metadata/informer_runner.go new file mode 100644 index 0000000000..3cdebb5a1f --- /dev/null +++ b/pkg/controller/utils/metadata/informer_runner.go @@ -0,0 +1,320 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2026-present Datadog, Inc. + +package metadata + +import ( + "context" + "strings" + "time" + + "github.com/go-logr/logr" + authorizationv1 "k8s.io/api/authorization/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + toolscache "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +const ( + // deletePrefix is prepended to queue keys to signal a deletion event. + deletePrefix = "delete:" + // keySeparator separates kind, namespace, and name in queue keys. + keySeparator = "/" +) + +// ProcessFunc is invoked for add/update events. The runner has already split +// the queue key into (kind, namespace, name) for the caller. +type ProcessFunc func(ctx context.Context, kind, namespace, name string) error + +// DeleteFunc is invoked for delete events. +type DeleteFunc func(kind, namespace, name string) + +// HeartbeatFunc is invoked on every heartbeat tick. nil disables the ticker. +type HeartbeatFunc func(ctx context.Context) + +// InformerWorkQueue is the shared engine for event-driven metadata forwarders. +// It owns a rate-limited workqueue, a worker pool that drains it, and an +// optional heartbeat ticker. Callers register informers via AddWatch and +// supply ProcessFunc / DeleteFunc / HeartbeatFunc callbacks. 
+type InformerWorkQueue struct {
+	// logger receives shutdown, RBAC, and per-key diagnostics.
+	logger logr.Logger
+	// mgr supplies the client used for RBAC checks and the cache that owns the informers.
+	mgr manager.Manager
+	// queue holds encoded keys (see EncodeKey); delete events carry deletePrefix.
+	queue workqueue.TypedRateLimitingInterface[string]
+	// numWorkers is the number of goroutines Run starts to drain queue.
+	numWorkers int
+	// heartbeatInterval is the ticker period; values <= 0 disable the heartbeat.
+	heartbeatInterval time.Duration
+
+	// processFn handles add/update keys; a returned error requeues the key with rate limiting.
+	processFn ProcessFunc
+	// deleteFn handles delete keys; delete keys are never requeued.
+	deleteFn DeleteFunc
+	// heartbeatFn runs on every tick; nil disables the heartbeat.
+	heartbeatFn HeartbeatFunc
+}
+
+// NewInformerWorkQueue constructs a runner backed by a fresh rate-limited workqueue.
+//
+// numWorkers is the number of worker goroutines Run will start. The heartbeat
+// ticker only runs when heartbeatInterval > 0 and heartbeatFn is non-nil.
+// processFn and deleteFn may be nil, in which case the corresponding events are
+// dequeued and discarded.
+func NewInformerWorkQueue(
+	logger logr.Logger,
+	mgr manager.Manager,
+	numWorkers int,
+	heartbeatInterval time.Duration,
+	processFn ProcessFunc,
+	deleteFn DeleteFunc,
+	heartbeatFn HeartbeatFunc,
+) *InformerWorkQueue {
+	return &InformerWorkQueue{
+		logger:            logger,
+		mgr:               mgr,
+		queue:             workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()),
+		numWorkers:        numWorkers,
+		heartbeatInterval: heartbeatInterval,
+		processFn:         processFn,
+		deleteFn:          deleteFn,
+		heartbeatFn:       heartbeatFn,
+	}
+}
+
+// EncodeKey returns "<kind>/<namespace>/<name>". namespace is empty for
+// cluster-scoped objects, which yields "<kind>//<name>".
+func EncodeKey(kind, namespace, name string) string {
+	return kind + keySeparator + namespace + keySeparator + name
+}
+
+// decodeKey splits "<kind>/<namespace>/<name>" into its parts.
+// It returns ok=false if the key is malformed: fewer than three segments, or an
+// empty kind or name. An empty namespace is accepted (cluster-scoped objects).
+func decodeKey(key string) (kind, namespace, name string, ok bool) {
+	parts := strings.SplitN(key, keySeparator, 3)
+	// A key without a kind cannot be routed and one without a name cannot be
+	// fetched, so reject both here instead of letting them be retried forever.
+	if len(parts) != 3 || parts[0] == "" || parts[2] == "" {
+		return "", "", "", false
+	}
+	return parts[0], parts[1], parts[2], true
+}
+
+// Run blocks until ctx is done. It spawns numWorkers goroutines that drain the
+// queue, plus a heartbeat ticker if heartbeatInterval > 0 and heartbeatFn != nil.
+func (r *InformerWorkQueue) Run(ctx context.Context) {
+	// Each worker loops until queue.Get reports shutdown.
+	for range r.numWorkers {
+		go r.runWorker()
+	}
+
+	if r.heartbeatFn != nil && r.heartbeatInterval > 0 {
+		go r.runHeartbeat(ctx)
+	}
+
+	<-ctx.Done()
+	r.logger.Info("Context cancelled, shutting down InformerWorkQueue")
+	// Shut the queue down from this goroutine so that it is guaranteed to be
+	// closed by the time Run returns.
+	r.queue.ShutDown()
+}
+
+// runWorker pulls keys off the queue and dispatches them one at a time until
+// the queue is shut down.
+func (r *InformerWorkQueue) runWorker() {
+	defer utilruntime.HandleCrash()
+	for {
+		key, shutdown := r.queue.Get()
+		if shutdown {
+			return
+		}
+		r.dispatch(key)
+	}
+}
+
+// dispatch routes a single queue key to deleteFn (keys carrying deletePrefix)
+// or processFn (all other keys) and always marks the key as done. Malformed
+// keys are logged and dropped. Only a processFn error requeues the key, with
+// rate limiting; every other outcome forgets it.
+func (r *InformerWorkQueue) dispatch(key string) {
+	defer r.queue.Done(key)
+
+	// CutPrefix returns the key unchanged when the prefix is absent, so rest is
+	// the encoded "<kind>/<namespace>/<name>" part in both cases.
+	rest, isDelete := strings.CutPrefix(key, deletePrefix)
+	kind, ns, name, decoded := decodeKey(rest)
+	if !decoded {
+		msg := "Dropping malformed key"
+		if isDelete {
+			msg = "Dropping malformed delete key"
+		}
+		r.logger.V(1).Info(msg, "key", key)
+		r.queue.Forget(key)
+		return
+	}
+
+	if isDelete {
+		if r.deleteFn != nil {
+			r.deleteFn(kind, ns, name)
+		}
+		r.queue.Forget(key)
+		return
+	}
+
+	if r.processFn == nil {
+		r.queue.Forget(key)
+		return
+	}
+
+	// NOTE(review): dispatch has no caller context to derive from, so the
+	// per-key timeout is rooted at context.Background() and in-flight work is
+	// not cancelled on shutdown.
+	ctx, cancel := context.WithTimeout(context.Background(), DefaultOperationTimeout)
+	defer cancel()
+
+	if err := r.processFn(ctx, kind, ns, name); err != nil {
+		r.logger.V(1).Info("Error processing key, requeuing", "key", key, "error", err)
+		r.queue.AddRateLimited(key)
+		return
+	}
+	r.queue.Forget(key)
+}
+
+// runHeartbeat invokes heartbeatFn every heartbeatInterval until ctx is done.
+// Each invocation gets its own DefaultOperationTimeout deadline derived from
+// ctx, so cancelling ctx also aborts a heartbeat that is already in flight.
+func (r *InformerWorkQueue) runHeartbeat(ctx context.Context) {
+	ticker := time.NewTicker(r.heartbeatInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			tickCtx, cancel := context.WithTimeout(ctx, DefaultOperationTimeout)
+			r.heartbeatFn(tickCtx)
+			cancel()
+		}
+	}
+}
+
+// Enqueue adds a key for an add/update event.
+func (r *InformerWorkQueue) Enqueue(kind, namespace, name string) {
+	key := EncodeKey(kind, namespace, name)
+	r.queue.Add(key)
+}
+
+// EnqueueDelete schedules a delete event for the given object. The queue key is
+// the add/update key with deletePrefix prepended.
+func (r *InformerWorkQueue) EnqueueDelete(kind, namespace, name string) {
+	key := deletePrefix + EncodeKey(kind, namespace, name)
+	r.queue.Add(key)
+}
+
+// WatchTarget is the description of a single informer registration handed to AddWatch.
+type WatchTarget struct {
+	// Object is a typed, empty instance of the watched resource
+	// (for example &corev1.ConfigMap{}); it selects the informer.
+	Object client.Object
+	// Group is the API group sent in the RBAC check (for example "datadoghq.com").
+	// Leave it empty for the core/legacy group, which covers v1 resources such as ConfigMap.
+	Group string
+	// Resource is the lowercase plural resource name sent in the RBAC check
+	// (for example "configmaps").
+	Resource string
+	// Kind is the first segment of every queue key this watch produces
+	// (for example "ConfigMap").
+	Kind string
+	// Filter decides whether an event is enqueued. A nil Filter lets every event through.
+	Filter func(obj any) bool
+	// MetaKey returns the (namespace, name) of an object. When it is nil, the
+	// namespace/name key functions of client-go's tools/cache package are used
+	// and their result is split on the first "/".
+	MetaKey func(obj any) (namespace, name string, ok bool)
+}
+
+// canListWatch verifies the operator has list+watch permission for the given group/resource.
+// Empty group means the core API group.
+func (r *InformerWorkQueue) canListWatch(ctx context.Context, group, resource string) bool { + for _, verb := range []string{"list", "watch"} { + sar := &authorizationv1.SelfSubjectAccessReview{ + Spec: authorizationv1.SelfSubjectAccessReviewSpec{ + ResourceAttributes: &authorizationv1.ResourceAttributes{ + Verb: verb, + Group: group, + Resource: resource, + }, + }, + } + if err := r.mgr.GetClient().Create(ctx, sar); err != nil { + r.logger.V(1).Info("Failed to check RBAC permission", + "group", group, "resource", resource, "verb", verb, "error", err) + return false + } + if !sar.Status.Allowed { + return false + } + } + return true +} + +// AddWatch registers an informer for the target. Returns true if RBAC and informer +// setup both succeeded; false (with a logged warning) otherwise. Failure is non-fatal: +// the caller may continue running with a subset of watches enabled. +func (r *InformerWorkQueue) AddWatch(ctx context.Context, target WatchTarget) bool { + if !r.canListWatch(ctx, target.Group, target.Resource) { + r.logger.Info("No permission to list/watch resource; informer will not be registered", + "group", target.Group, "resource", target.Resource, "kind", target.Kind) + return false + } + + informer, err := r.mgr.GetCache().GetInformer(ctx, target.Object) + if err != nil { + r.logger.Info("Unable to get informer; informer will not be registered", + "resource", target.Resource, "kind", target.Kind, "error", err) + return false + } + + keyOf := func(obj any) (string, string, bool) { + if target.MetaKey != nil { + return target.MetaKey(obj) + } + key, err := toolscache.MetaNamespaceKeyFunc(obj) + if err != nil { + return "", "", false + } + ns, name, ok := strings.Cut(key, "/") + if !ok { + // Cluster-scoped: namespace is empty. 
+ return "", key, true + } + return ns, name, true + } + + deleteKeyOf := func(obj any) (string, string, bool) { + if target.MetaKey != nil { + return target.MetaKey(obj) + } + key, err := toolscache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + return "", "", false + } + ns, name, ok := strings.Cut(key, "/") + if !ok { + return "", key, true + } + return ns, name, true + } + + handler := toolscache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + if ns, name, ok := keyOf(obj); ok { + r.Enqueue(target.Kind, ns, name) + } + }, + UpdateFunc: func(_, obj any) { + if ns, name, ok := keyOf(obj); ok { + r.Enqueue(target.Kind, ns, name) + } + }, + DeleteFunc: func(obj any) { + if ns, name, ok := deleteKeyOf(obj); ok { + r.EnqueueDelete(target.Kind, ns, name) + } + }, + } + + var err2 error + if target.Filter != nil { + _, err2 = informer.AddEventHandler(toolscache.FilteringResourceEventHandler{ + FilterFunc: target.Filter, + Handler: handler, + }) + } else { + _, err2 = informer.AddEventHandler(handler) + } + if err2 != nil { + r.logger.Info("Unable to add event handler; informer will not be registered", + "resource", target.Resource, "kind", target.Kind, "error", err2) + return false + } + + r.logger.V(1).Info("Registered informer", "kind", target.Kind, "group", target.Group, "resource", target.Resource) + return true +} + +// QueueLen exposes the queue length for tests and metrics. +func (r *InformerWorkQueue) QueueLen() int { + return r.queue.Len() +} diff --git a/pkg/controller/utils/metadata/informer_runner_test.go b/pkg/controller/utils/metadata/informer_runner_test.go new file mode 100644 index 0000000000..59f4754e90 --- /dev/null +++ b/pkg/controller/utils/metadata/informer_runner_test.go @@ -0,0 +1,181 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). 
+// Copyright 2026-present Datadog, Inc. + +package metadata + +import ( + "context" + "sync/atomic" + "testing" + "time" + + authorizationv1 "k8s.io/api/authorization/v1" + "k8s.io/apimachinery/pkg/runtime" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +func Test_EncodeDecodeKey(t *testing.T) { + key := EncodeKey("DatadogAgent", "datadog", "my-agent") + if key != "DatadogAgent/datadog/my-agent" { + t.Fatalf("EncodeKey = %q, want DatadogAgent/datadog/my-agent", key) + } + + kind, ns, name, ok := decodeKey(key) + if !ok || kind != "DatadogAgent" || ns != "datadog" || name != "my-agent" { + t.Fatalf("decodeKey(%q) = (%q, %q, %q, %v), want (DatadogAgent, datadog, my-agent, true)", + key, kind, ns, name, ok) + } + + if _, _, _, ok := decodeKey("malformed"); ok { + t.Errorf("decodeKey(\"malformed\") ok = true, want false") + } +} + +func Test_InformerWorkQueue_DispatchesAdd(t *testing.T) { + var ( + gotKind, gotNS, gotName string + called atomic.Int32 + ) + r := NewInformerWorkQueue( + zap.New(zap.UseDevMode(true)), + nil, + 1, + 0, + func(ctx context.Context, kind, ns, name string) error { + gotKind, gotNS, gotName = kind, ns, name + called.Add(1) + return nil + }, + nil, + nil, + ) + + go r.Run(t.Context()) + + r.Enqueue("DatadogAgent", "datadog", "my-agent") + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if called.Load() == 1 { + break + } + time.Sleep(10 * time.Millisecond) + } + + if called.Load() != 1 { + t.Fatalf("processFn called %d times, want 1", called.Load()) + } + if gotKind != "DatadogAgent" || gotNS != "datadog" || gotName != "my-agent" { + t.Errorf("got (%q, %q, %q), want (DatadogAgent, datadog, my-agent)", gotKind, gotNS, gotName) + } +} + +func 
Test_InformerWorkQueue_DispatchesDelete(t *testing.T) { + var called atomic.Int32 + r := NewInformerWorkQueue( + zap.New(zap.UseDevMode(true)), + nil, + 1, + 0, + func(ctx context.Context, kind, ns, name string) error { return nil }, + func(kind, ns, name string) { + called.Add(1) + }, + nil, + ) + + go r.Run(t.Context()) + + r.EnqueueDelete("DatadogAgent", "datadog", "my-agent") + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if called.Load() == 1 { + break + } + time.Sleep(10 * time.Millisecond) + } + + if called.Load() != 1 { + t.Fatalf("deleteFn called %d times, want 1", called.Load()) + } +} + +func Test_InformerWorkQueue_HeartbeatFires(t *testing.T) { + var called atomic.Int32 + r := NewInformerWorkQueue( + zap.New(zap.UseDevMode(true)), + nil, + 1, + 20*time.Millisecond, + func(ctx context.Context, kind, ns, name string) error { return nil }, + nil, + func(ctx context.Context) { called.Add(1) }, + ) + + go r.Run(t.Context()) + + deadline := time.Now().Add(500 * time.Millisecond) + for time.Now().Before(deadline) { + if called.Load() >= 2 { + break + } + time.Sleep(10 * time.Millisecond) + } + + if called.Load() < 2 { + t.Fatalf("heartbeatFn called %d times, want >= 2", called.Load()) + } +} + +// fakeManagerForRBAC returns a manager.Manager-shaped value whose GetClient() returns +// a fake client that always allows SARs. Only GetClient() is used by canListWatch. +type fakeManagerForRBAC struct { + manager.Manager + c client.Client +} + +func (f *fakeManagerForRBAC) GetClient() client.Client { return f.c } + +func newAllowingFakeManager(t *testing.T) *fakeManagerForRBAC { + t.Helper() + scheme := runtime.NewScheme() + if err := clientgoscheme.AddToScheme(scheme); err != nil { + t.Fatalf("AddToScheme: %v", err) + } + c := fake.NewClientBuilder(). + WithScheme(scheme). 
+ WithInterceptorFuncs(interceptor.Funcs{ + Create: func(ctx context.Context, c client.WithWatch, obj client.Object, opts ...client.CreateOption) error { + if sar, ok := obj.(*authorizationv1.SelfSubjectAccessReview); ok { + sar.Status.Allowed = true + return nil + } + return c.Create(ctx, obj, opts...) + }, + }). + Build() + return &fakeManagerForRBAC{c: c} +} + +func Test_InformerWorkQueue_CanListWatch_Allowed(t *testing.T) { + r := NewInformerWorkQueue( + zap.New(zap.UseDevMode(true)), + newAllowingFakeManager(t), + 1, 0, nil, nil, nil, + ) + + if !r.canListWatch(context.Background(), "", "configmaps") { + t.Errorf("canListWatch(core, \"configmaps\") = false, want true") + } + if !r.canListWatch(context.Background(), "datadoghq.com", "datadogagents") { + t.Errorf("canListWatch(\"datadoghq.com\", \"datadogagents\") = false, want true") + } +}