From 3587907a5765ee035083bb47048c8083d8bb645f Mon Sep 17 00:00:00 2001 From: Harsh Date: Sun, 29 Mar 2026 14:43:03 +0530 Subject: [PATCH 1/5] test(dataflow): add Ginkgo/Gomega package tests Signed-off-by: Harsh --- .../dataflow/dataflow_controller_test.go | 203 +++++ .../v1alpha1/dataflow/error_paths_test.go | 335 +++++++ .../v1alpha1/dataflow/operations_test.go | 849 ++++++++++++++++++ .../v1alpha1/dataflow/suite_test.go | 36 + 4 files changed, 1423 insertions(+) create mode 100644 pkg/controllers/v1alpha1/dataflow/dataflow_controller_test.go create mode 100644 pkg/controllers/v1alpha1/dataflow/error_paths_test.go create mode 100644 pkg/controllers/v1alpha1/dataflow/operations_test.go create mode 100644 pkg/controllers/v1alpha1/dataflow/suite_test.go diff --git a/pkg/controllers/v1alpha1/dataflow/dataflow_controller_test.go b/pkg/controllers/v1alpha1/dataflow/dataflow_controller_test.go new file mode 100644 index 00000000000..6486db1ce86 --- /dev/null +++ b/pkg/controllers/v1alpha1/dataflow/dataflow_controller_test.go @@ -0,0 +1,203 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataflow + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +// newTestDataFlowReconciler builds a DataFlowReconciler for unit tests. +func newTestDataFlowReconciler(s *runtime.Scheme, objs ...runtime.Object) *DataFlowReconciler { + if s == nil { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + } + fakeClient := fake.NewFakeClientWithScheme(s, objs...) + log := logf.Log.WithName("dataflow-test") + recorder := record.NewFakeRecorder(32) + return NewDataFlowReconciler(fakeClient, log, recorder, 30*time.Second) +} + +var _ = Describe("DataFlowReconciler", func() { + + Describe("ControllerName", func() { + It("should return the expected controller name", func() { + r := newTestDataFlowReconciler(nil) + Expect(r.ControllerName()).To(Equal("DataFlowReconciler")) + }) + }) + + Describe("NewDataFlowReconciler", func() { + It("should create a non-nil reconciler with correct fields", func() { + s := runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + r := newTestDataFlowReconciler(s) + Expect(r).NotTo(BeNil()) + Expect(r.Client).NotTo(BeNil()) + Expect(r.Recorder).NotTo(BeNil()) + Expect(r.ResyncPeriod).To(Equal(30 * time.Second)) + }) + }) + + Describe("Reconcile", func() { + It("should return no error and no requeue when no operation objects exist for a given name", func() { + s := runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + r := newTestDataFlowReconciler(s) + req := ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "missing", Namespace: "default"}, + } + result, err := r.Reconcile(context.TODO(), req) + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(Equal(ctrl.Result{})) + }) + + It("should requeue when a DataLoad with RunAfter exists and preceding op is not complete", func() { + s := runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + + precedingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: "default"}, + Status: datav1alpha1.OperationStatus{ + Phase: "Executing", + }, + } + + waitingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: datav1alpha1.DataLoadSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "preceding", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + r := newTestDataFlowReconciler(s, precedingLoad, waitingLoad) + req := ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "test", Namespace: "default"}, + } + result, err := r.Reconcile(context.TODO(), req) + Expect(err).NotTo(HaveOccurred()) + Expect(result.RequeueAfter).To(Equal(30 * time.Second)) + }) + + It("should not requeue when a DataLoad with RunAfter exists and preceding op is complete", func() { + s := runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + + precedingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: "default"}, + Status: datav1alpha1.OperationStatus{ + Phase: "Complete", + }, + } + + waitingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: datav1alpha1.DataLoadSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "preceding", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + r := newTestDataFlowReconciler(s, precedingLoad, waitingLoad) + req := ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "test", Namespace: "default"}, + } + result, err := r.Reconcile(context.TODO(), req) + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(Equal(ctrl.Result{})) + }) + }) +}) + +// DataFlowEnabled and setupWatches depend on discovery.GetFluidDiscovery() which uses a sync.Once +// singleton that requires a live cluster connection. We test the code paths that avoid touching +// the discovery singleton by temporarily substituting an empty reconcileKinds map so the loop body +// (which calls GetFluidDiscovery) is never entered. This covers the for-loop entry, toSetup +// construction, and return-false / return-bld paths without any cluster dependency. + +var _ = Describe("DataFlowEnabled with empty reconcileKinds", func() { + var saved map[string]client.Object + + BeforeEach(func() { + saved = reconcileKinds + reconcileKinds = map[string]client.Object{} + }) + + AfterEach(func() { + reconcileKinds = saved + }) + + It("should return false when no resource kinds are registered", func() { + Expect(DataFlowEnabled()).To(BeFalse()) + }) +}) + +var _ = Describe("setupWatches with empty reconcileKinds", func() { + var saved map[string]client.Object + + BeforeEach(func() { + saved = reconcileKinds + reconcileKinds = map[string]client.Object{} + }) + + AfterEach(func() { + reconcileKinds = saved + }) + + It("should return the builder unchanged when no resource kinds are registered", func() { + // With an empty reconcileKinds the toSetup slice stays empty, neither bld.For nor + // bld.Watches is called, and the function returns bld as-is. We pass nil as bld to + // keep the test self-contained without requiring a real controller-runtime Builder. + result := setupWatches(nil, nil, builder.Predicates{}) + Expect(result).To(BeNil()) + }) +}) diff --git a/pkg/controllers/v1alpha1/dataflow/error_paths_test.go b/pkg/controllers/v1alpha1/dataflow/error_paths_test.go new file mode 100644 index 00000000000..80766926484 --- /dev/null +++ b/pkg/controllers/v1alpha1/dataflow/error_paths_test.go @@ -0,0 +1,335 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataflow + +import ( + "context" + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +// newGetErrorClient returns a client that fails every Get with a generic error. +func newGetErrorClient(s *runtime.Scheme) client.Client { + injectErr := fmt.Errorf("server unavailable") + fakeBase := fakeclient.NewClientBuilder().WithScheme(s).Build() + return interceptor.NewClient(fakeBase, interceptor.Funcs{ + Get: func(ctx context.Context, c client.WithWatch, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + return injectErr + }, + }) +} + +var _ = Describe("Reconcile error path: Get failure propagates to error return", func() { + + var s *runtime.Scheme + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + It("should return an error when the underlying client Get fails", func() { + log := logf.Log.WithName("dataflow-error-test") + recorder := record.NewFakeRecorder(32) + r := NewDataFlowReconciler(newGetErrorClient(s), log, recorder, 30*time.Second) + req := ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "test", Namespace: "default"}, + } + result, err := r.Reconcile(context.TODO(), req) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("server unavailable")) + // RequeueIfError returns ctrl.Result{} + the error + Expect(result).To(Equal(ctrl.Result{})) + }) +}) + +var _ = Describe("reconcileDataLoad: outer Get failure", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + It("should return wrapped error when client Get fails for DataLoad", func() { + ctx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: "test", Namespace: namespace}, + Client: newGetErrorClient(s), + Log: logf.Log.WithName("test"), + Recorder: record.NewFakeRecorder(32), + } + + needRequeue, err := reconcileDataLoad(ctx) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to get dataload")) + Expect(needRequeue).To(BeTrue()) + }) +}) + +var _ = Describe("reconcileDataBackup: outer Get failure", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + It("should return wrapped error when client Get fails for DataBackup", func() { + ctx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: "test", Namespace: namespace}, + Client: newGetErrorClient(s), + Log: logf.Log.WithName("test"), + Recorder: record.NewFakeRecorder(32), + } + + needRequeue, err := reconcileDataBackup(ctx) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to get databackup")) + Expect(needRequeue).To(BeTrue()) + }) +}) + +var _ = Describe("reconcileDataMigrate: outer Get failure", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + It("should return wrapped error when client Get fails for DataMigrate", func() { + ctx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: "test", Namespace: namespace}, + Client: newGetErrorClient(s), + Log: logf.Log.WithName("test"), + Recorder: record.NewFakeRecorder(32), + } + + needRequeue, err := reconcileDataMigrate(ctx) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to get datamigrate")) + Expect(needRequeue).To(BeTrue()) + }) +}) + +var _ = Describe("reconcileDataProcess: outer Get failure", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + It("should return wrapped error when client Get fails for DataProcess", func() { + ctx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: "test", Namespace: namespace}, + Client: newGetErrorClient(s), + Log: logf.Log.WithName("test"), + Recorder: record.NewFakeRecorder(32), + } + + needRequeue, err := reconcileDataProcess(ctx) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to get")) + Expect(err.Error()).To(ContainSubstring("server unavailable")) + Expect(needRequeue).To(BeTrue()) + }) +}) + +var _ = Describe("reconcileOperationDataFlow: updateStatusFn inner Get NotFound path", func() { + // Verify that when the inner updateStatusFn re-fetches the object and gets NotFound, + // reconcileDataLoad returns (false, nil) — i.e., the not-found is treated as a no-op. + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + It("should succeed (no error, no requeue) when updateStatusFn inner Get returns NotFound", func() { + precedingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: "Complete", + }, + } + waitingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "waiting", Namespace: namespace}, + Spec: datav1alpha1.DataLoadSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "preceding", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + // Use a call-counting interceptor: + // calls 1–2: outer reconcileDataLoad fetches "waiting" and "preceding" → pass through + // call 3+: inside updateStatusFn fetch → return a custom not-found error + callCount := 0 + fakeBase := fakeclient.NewClientBuilder().WithScheme(s). + WithRuntimeObjects(precedingLoad, waitingLoad). + WithStatusSubresource(waitingLoad). + Build() + + countClient := interceptor.NewClient(fakeBase, interceptor.Funcs{ + Get: func(ctx context.Context, c client.WithWatch, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + callCount++ + if callCount >= 3 && key.Name == "waiting" { + // Wrap a notFound to exercise the IgnoreNotFound branch in updateStatusFn + return &statusNotFoundError{name: key.Name} + } + return c.Get(ctx, key, obj, opts...) + }, + }) + + rCtx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: "waiting", Namespace: namespace}, + Client: countClient, + Log: logf.Log.WithName("test"), + Recorder: record.NewFakeRecorder(32), + } + + needRequeue, err := reconcileDataLoad(rCtx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + }) +}) + +// statusNotFoundError satisfies the k8s.io/apimachinery/pkg/api/errors IsNotFound check. +// utils.IgnoreNotFound uses apierrors.IsNotFound which checks for Reason == StatusReasonNotFound. +type statusNotFoundError struct{ name string } + +func (e *statusNotFoundError) Error() string { return fmt.Sprintf("%q not found", e.name) } +func (e *statusNotFoundError) Status() metav1.Status { + return metav1.Status{Reason: metav1.StatusReasonNotFound} +} + +var _ = Describe("reconcileDataBackup: updateStatusFn inner Get NotFound path", func() { + // Verify that when updateStatusFn re-fetches the DataBackup and gets NotFound, + // reconcileDataBackup returns (false, nil) — the not-found is treated as a no-op. + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + It("should succeed when updateStatusFn inner Get returns NotFound", func() { + precedingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "load-1", Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: "Complete", + }, + } + waitingBackup := &datav1alpha1.DataBackup{ + ObjectMeta: metav1.ObjectMeta{Name: "backup-1", Namespace: namespace}, + Spec: datav1alpha1.DataBackupSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "load-1", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + callCount := 0 + fakeBase := fakeclient.NewClientBuilder().WithScheme(s). + WithRuntimeObjects(precedingLoad, waitingBackup). + WithStatusSubresource(waitingBackup). + Build() + + countClient := interceptor.NewClient(fakeBase, interceptor.Funcs{ + Get: func(ctx context.Context, c client.WithWatch, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + callCount++ + if callCount >= 3 && key.Name == "backup-1" { + return &statusNotFoundError{name: key.Name} + } + return c.Get(ctx, key, obj, opts...) + }, + }) + + rCtx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: "backup-1", Namespace: namespace}, + Client: countClient, + Log: logf.Log.WithName("test"), + Recorder: record.NewFakeRecorder(32), + } + + needRequeue, err := reconcileDataBackup(rCtx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + }) +}) diff --git a/pkg/controllers/v1alpha1/dataflow/operations_test.go b/pkg/controllers/v1alpha1/dataflow/operations_test.go new file mode 100644 index 00000000000..f8cf6668173 --- /dev/null +++ b/pkg/controllers/v1alpha1/dataflow/operations_test.go @@ -0,0 +1,849 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataflow + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "k8s.io/utils/ptr" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +// makeTestCtx creates a reconcileRequestContext with a fake client seeded with objs. +func makeTestCtx(s *runtime.Scheme, name, namespace string, objs ...runtime.Object) reconcileRequestContext { + if s == nil { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + } + return reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: name, Namespace: namespace}, + Client: fake.NewFakeClientWithScheme(s, objs...), + Log: logf.Log.WithName("dataflow-ops-test"), + Recorder: record.NewFakeRecorder(32), + } +} + +var _ = Describe("reconcileOperationDataFlow", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + Context("when the preceding operation is not found", func() { + It("should record a warning event and request requeue", func() { + // Build a DataLoad as the waiting object; preceding DataLoad does not exist. + waitingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "waiting", Namespace: namespace}, + Spec: datav1alpha1.DataLoadSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "nonexistent", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + recorder := record.NewFakeRecorder(32) + ctx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: "waiting", Namespace: namespace}, + Client: fake.NewFakeClientWithScheme(s, waitingLoad), + Log: logf.Log.WithName("test"), + Recorder: recorder, + } + + needRequeue, err := reconcileDataLoad(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeTrue()) + + // Expect a warning event to be recorded. + Eventually(recorder.Events).Should(Receive(ContainSubstring(common.DataOperationNotFound))) + }) + }) + + Context("when the preceding operation is not yet complete", func() { + It("should record a normal waiting event and request requeue", func() { + precedingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseExecuting, + }, + } + waitingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "waiting", Namespace: namespace}, + Spec: datav1alpha1.DataLoadSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "preceding", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + recorder := record.NewFakeRecorder(32) + ctx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: "waiting", Namespace: namespace}, + Client: fake.NewFakeClientWithScheme(s, precedingLoad, waitingLoad), + Log: logf.Log.WithName("test"), + Recorder: recorder, + } + + needRequeue, err := reconcileDataLoad(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeTrue()) + + Eventually(recorder.Events).Should(Receive(ContainSubstring(common.DataOperationWaiting))) + }) + }) + + Context("when the preceding operation is complete", func() { + It("should clear WaitingFor.OperationComplete and not requeue", func() { + precedingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + waitingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "waiting", Namespace: namespace}, + Spec: datav1alpha1.DataLoadSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "preceding", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + ctx := makeTestCtx(s, "waiting", namespace, precedingLoad, waitingLoad) + + needRequeue, err := reconcileDataLoad(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + + updated := &datav1alpha1.DataLoad{} + Expect(ctx.Client.Get(context.TODO(), types.NamespacedName{Name: "waiting", Namespace: namespace}, updated)).To(Succeed()) + Expect(updated.Status.WaitingFor.OperationComplete).NotTo(BeNil()) + Expect(*updated.Status.WaitingFor.OperationComplete).To(BeFalse()) + }) + }) + + Context("when the waiting DataLoad is not found", func() { + It("should skip reconciling and not requeue", func() { + ctx := makeTestCtx(s, "missing", namespace) + + needRequeue, err := reconcileDataLoad(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + }) + }) +}) + +var _ = Describe("reconcileDataBackup", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + Context("when the waiting DataBackup is not found", func() { + It("should skip reconciling and not requeue", func() { + ctx := makeTestCtx(s, "missing", namespace) + + needRequeue, err := reconcileDataBackup(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + }) + }) + + Context("when preceding operation is complete", func() { + It("should clear WaitingFor.OperationComplete and not requeue", func() { + precedingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "load-1", Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + waitingBackup := &datav1alpha1.DataBackup{ + ObjectMeta: metav1.ObjectMeta{Name: "backup-1", Namespace: namespace}, + Spec: datav1alpha1.DataBackupSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "load-1", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + ctx := makeTestCtx(s, "backup-1", namespace, precedingLoad, waitingBackup) + + needRequeue, err := reconcileDataBackup(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + + updated := &datav1alpha1.DataBackup{} + Expect(ctx.Client.Get(context.TODO(), types.NamespacedName{Name: "backup-1", Namespace: namespace}, updated)).To(Succeed()) + Expect(updated.Status.WaitingFor.OperationComplete).NotTo(BeNil()) + Expect(*updated.Status.WaitingFor.OperationComplete).To(BeFalse()) + }) + }) + + Context("when preceding operation is not complete", func() { + It("should request requeue and record a waiting event", func() { + precedingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "load-1", Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhasePending, + }, + } + waitingBackup := &datav1alpha1.DataBackup{ + ObjectMeta: metav1.ObjectMeta{Name: "backup-1", Namespace: namespace}, + Spec: datav1alpha1.DataBackupSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "load-1", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + recorder := record.NewFakeRecorder(32) + ctx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: "backup-1", Namespace: namespace}, + Client: fake.NewFakeClientWithScheme(s, precedingLoad, waitingBackup), + Log: logf.Log.WithName("test"), + Recorder: recorder, + } + + needRequeue, err := reconcileDataBackup(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeTrue()) + + Eventually(recorder.Events).Should(Receive(ContainSubstring(common.DataOperationWaiting))) + }) + }) +}) + +var _ = Describe("reconcileDataMigrate", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + Context("when the waiting DataMigrate is not found", func() { + It("should skip reconciling and not requeue", func() { + ctx := makeTestCtx(s, "missing", namespace) + + needRequeue, err := reconcileDataMigrate(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + }) + }) + + Context("when preceding operation is complete", func() { + It("should clear WaitingFor.OperationComplete and not requeue", func() { + precedingBackup := &datav1alpha1.DataBackup{ + ObjectMeta: metav1.ObjectMeta{Name: "backup-1", Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + waitingMigrate := &datav1alpha1.DataMigrate{ + ObjectMeta: metav1.ObjectMeta{Name: "migrate-1", Namespace: namespace}, + Spec: datav1alpha1.DataMigrateSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataBackup", + Name: "backup-1", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + ctx := makeTestCtx(s, "migrate-1", namespace, precedingBackup, waitingMigrate) + + needRequeue, err := reconcileDataMigrate(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + + updated := &datav1alpha1.DataMigrate{} + Expect(ctx.Client.Get(context.TODO(), types.NamespacedName{Name: "migrate-1", Namespace: namespace}, updated)).To(Succeed()) + Expect(updated.Status.WaitingFor.OperationComplete).NotTo(BeNil()) + Expect(*updated.Status.WaitingFor.OperationComplete).To(BeFalse()) + }) + }) +}) + +var _ = Describe("reconcileDataProcess", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + Context("when the waiting DataProcess is not found", func() { + It("should skip reconciling and not requeue", func() { + ctx := makeTestCtx(s, "missing", namespace) + + needRequeue, err := reconcileDataProcess(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + }) + }) + + Context("when preceding operation is complete", func() { + It("should clear WaitingFor.OperationComplete and not requeue", func() { + precedingMigrate := &datav1alpha1.DataMigrate{ + ObjectMeta: metav1.ObjectMeta{Name: "migrate-1", Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + waitingProcess := &datav1alpha1.DataProcess{ + ObjectMeta: metav1.ObjectMeta{Name: "process-1", Namespace: namespace}, + Spec: datav1alpha1.DataProcessSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataMigrate", + Name: "migrate-1", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + ctx := makeTestCtx(s, "process-1", namespace, precedingMigrate, waitingProcess) + + needRequeue, err := reconcileDataProcess(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + + updated := &datav1alpha1.DataProcess{} + Expect(ctx.Client.Get(context.TODO(), types.NamespacedName{Name: "process-1", Namespace: namespace}, updated)).To(Succeed()) + Expect(updated.Status.WaitingFor.OperationComplete).NotTo(BeNil()) + Expect(*updated.Status.WaitingFor.OperationComplete).To(BeFalse()) + }) + }) + + Context("when preceding operation is not complete", func() { + It("should request requeue and record a waiting event", func() { + precedingMigrate := &datav1alpha1.DataMigrate{ + ObjectMeta: metav1.ObjectMeta{Name: "migrate-1", Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseExecuting, + }, + } + waitingProcess := &datav1alpha1.DataProcess{ + ObjectMeta: metav1.ObjectMeta{Name: "process-1", Namespace: namespace}, + Spec: datav1alpha1.DataProcessSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataMigrate", + Name: "migrate-1", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + recorder := record.NewFakeRecorder(32) + ctx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: "process-1", Namespace: namespace}, + Client: fake.NewFakeClientWithScheme(s, precedingMigrate, waitingProcess), + Log: logf.Log.WithName("test"), + Recorder: recorder, + } + + needRequeue, err := reconcileDataProcess(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeTrue()) + + Eventually(recorder.Events).Should(Receive(ContainSubstring(common.DataOperationWaiting))) + }) + }) +}) + +var _ = Describe("reconcileOperationDataFlow with cross-namespace preceding op", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + Context("when RunAfter specifies a different namespace", func() { + It("should look up the preceding op in the specified namespace and not requeue when complete", func() { + // Preceding load in a different namespace. + precedingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: "other-ns"}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + waitingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "waiting", Namespace: namespace}, + Spec: datav1alpha1.DataLoadSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "preceding", + Namespace: "other-ns", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + ctx := makeTestCtx(s, "waiting", namespace, precedingLoad, waitingLoad) + + needRequeue, err := reconcileDataLoad(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + }) + }) +}) + +var _ = Describe("reconcileOperationDataFlow with unsupported RunAfter kind", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + Context("when RunAfter.Kind is not supported", func() { + It("should return an error and request requeue", func() { + waitingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "waiting", Namespace: namespace}, + Spec: datav1alpha1.DataLoadSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "UnknownKind", + Name: "some-op", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + ctx := makeTestCtx(s, "waiting", namespace, waitingLoad) + + needRequeue, err := reconcileDataLoad(ctx) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to get preceding operation status")) + Expect(needRequeue).To(BeTrue()) + }) + }) +}) + +var _ = Describe("reconcileDataLoad updateStatusFn no-op path", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + Context("when WaitingFor.OperationComplete is already false when updateStatusFn runs", func() { + It("should succeed without calling Status().Update (reflect.DeepEqual skips update)", func() { + precedingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + // OperationComplete is already false — updateStatusFn will find no delta and skip the update. + waitingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "waiting", Namespace: namespace}, + Spec: datav1alpha1.DataLoadSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "preceding", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(false), + }, + }, + } + + ctx := makeTestCtx(s, "waiting", namespace, precedingLoad, waitingLoad) + + needRequeue, err := reconcileDataLoad(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + }) + }) +}) + +var _ = Describe("reconcileDataBackup no-op path", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + Context("when WaitingFor.OperationComplete is already false", func() { + It("should succeed without calling Status().Update", func() { + precedingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + waitingBackup := &datav1alpha1.DataBackup{ + ObjectMeta: metav1.ObjectMeta{Name: "backup-1", Namespace: namespace}, + Spec: datav1alpha1.DataBackupSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "preceding", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(false), + }, + }, + } + + ctx := makeTestCtx(s, "backup-1", namespace, precedingLoad, waitingBackup) + + needRequeue, err := reconcileDataBackup(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + }) + }) +}) + +var _ = Describe("reconcileDataMigrate no-op path", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + Context("when WaitingFor.OperationComplete is already false", func() { + It("should succeed without calling Status().Update", func() { + precedingBackup := &datav1alpha1.DataBackup{ + ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + waitingMigrate := &datav1alpha1.DataMigrate{ + ObjectMeta: metav1.ObjectMeta{Name: "migrate-1", Namespace: namespace}, + Spec: datav1alpha1.DataMigrateSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataBackup", + Name: "preceding", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(false), + }, + }, + } + + ctx := makeTestCtx(s, "migrate-1", namespace, precedingBackup, waitingMigrate) + + needRequeue, err := reconcileDataMigrate(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + }) + }) +}) + +var _ = Describe("reconcileDataProcess no-op path", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + Context("when WaitingFor.OperationComplete is already false", func() { + It("should succeed without calling Status().Update", func() { + precedingMigrate := &datav1alpha1.DataMigrate{ + ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + waitingProcess := &datav1alpha1.DataProcess{ + ObjectMeta: metav1.ObjectMeta{Name: "process-1", Namespace: namespace}, + Spec: datav1alpha1.DataProcessSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataMigrate", + Name: "preceding", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(false), + }, + }, + } + + ctx := makeTestCtx(s, "process-1", namespace, precedingMigrate, waitingProcess) + + needRequeue, err := reconcileDataProcess(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + }) + }) +}) + +var _ = Describe("reconcileDataMigrate and reconcileDataProcess not-found events", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + Context("reconcileDataMigrate with preceding op not found", func() { + It("should record a warning and requeue", func() { + waitingMigrate := &datav1alpha1.DataMigrate{ + ObjectMeta: metav1.ObjectMeta{Name: "migrate-1", Namespace: namespace}, + Spec: datav1alpha1.DataMigrateSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "nonexistent", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + recorder := record.NewFakeRecorder(32) + ctx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: "migrate-1", Namespace: namespace}, + Client: fake.NewFakeClientWithScheme(s, waitingMigrate), + Log: logf.Log.WithName("test"), + Recorder: recorder, + } + + needRequeue, err := reconcileDataMigrate(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeTrue()) + Eventually(recorder.Events).Should(Receive(ContainSubstring(common.DataOperationNotFound))) + }) + }) + + Context("reconcileDataProcess with preceding op not found", func() { + It("should record a warning and requeue", func() { + waitingProcess := &datav1alpha1.DataProcess{ + ObjectMeta: metav1.ObjectMeta{Name: "process-1", Namespace: namespace}, + Spec: datav1alpha1.DataProcessSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataBackup", + Name: "nonexistent", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + recorder := record.NewFakeRecorder(32) + ctx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: "process-1", Namespace: namespace}, + Client: fake.NewFakeClientWithScheme(s, waitingProcess), + Log: logf.Log.WithName("test"), + Recorder: recorder, + } + + needRequeue, err := reconcileDataProcess(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeTrue()) + Eventually(recorder.Events).Should(Receive(ContainSubstring(common.DataOperationNotFound))) + }) + }) + + Context("reconcileDataBackup with preceding op not found", func() { + It("should record a warning and requeue", func() { + waitingBackup := &datav1alpha1.DataBackup{ + ObjectMeta: metav1.ObjectMeta{Name: "backup-1", Namespace: namespace}, + Spec: datav1alpha1.DataBackupSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "nonexistent", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + recorder := record.NewFakeRecorder(32) + ctx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: "backup-1", Namespace: namespace}, + Client: fake.NewFakeClientWithScheme(s, waitingBackup), + Log: logf.Log.WithName("test"), + Recorder: recorder, + } + + needRequeue, err := reconcileDataBackup(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeTrue()) + Eventually(recorder.Events).Should(Receive(ContainSubstring(common.DataOperationNotFound))) + }) + }) +}) diff --git a/pkg/controllers/v1alpha1/dataflow/suite_test.go b/pkg/controllers/v1alpha1/dataflow/suite_test.go new file mode 100644 index 00000000000..9cd76caf20e --- /dev/null +++ b/pkg/controllers/v1alpha1/dataflow/suite_test.go @@ -0,0 +1,36 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataflow + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" +) + +func TestAPIs(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "DataFlow Controller Suite") +} + +var _ = BeforeSuite(func() { + logf.SetLogger(fake.NullLogger()) +}) From 3776279ea97181bec6dba34880536ba57aa2eef9 Mon Sep 17 00:00:00 2001 From: Harsh Date: Sun, 29 Mar 2026 14:54:06 +0530 Subject: [PATCH 2/5] fix(dataflow): cover DataProcess error reporting Signed-off-by: Harsh --- .../v1alpha1/dataflow/dataflow_controller_test.go | 5 +++-- pkg/controllers/v1alpha1/dataflow/error_paths_test.go | 2 +- pkg/controllers/v1alpha1/dataflow/operations.go | 4 ++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/controllers/v1alpha1/dataflow/dataflow_controller_test.go b/pkg/controllers/v1alpha1/dataflow/dataflow_controller_test.go index 6486db1ce86..19fcffbacbc 100644 --- a/pkg/controllers/v1alpha1/dataflow/dataflow_controller_test.go +++ b/pkg/controllers/v1alpha1/dataflow/dataflow_controller_test.go @@ -24,6 +24,7 @@ import ( . "github.com/onsi/gomega" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -89,7 +90,7 @@ var _ = Describe("DataFlowReconciler", func() { precedingLoad := &datav1alpha1.DataLoad{ ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: "default"}, Status: datav1alpha1.OperationStatus{ - Phase: "Executing", + Phase: common.PhaseExecuting, }, } @@ -126,7 +127,7 @@ var _ = Describe("DataFlowReconciler", func() { precedingLoad := &datav1alpha1.DataLoad{ ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: "default"}, Status: datav1alpha1.OperationStatus{ - Phase: "Complete", + Phase: common.PhaseComplete, }, } diff --git a/pkg/controllers/v1alpha1/dataflow/error_paths_test.go b/pkg/controllers/v1alpha1/dataflow/error_paths_test.go index 80766926484..d8675650d7f 100644 --- a/pkg/controllers/v1alpha1/dataflow/error_paths_test.go +++ b/pkg/controllers/v1alpha1/dataflow/error_paths_test.go @@ -179,7 +179,7 @@ var _ = Describe("reconcileDataProcess: outer Get failure", func() { needRequeue, err := reconcileDataProcess(ctx) Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("failed to get")) + Expect(err.Error()).To(ContainSubstring("failed to get dataprocess")) Expect(err.Error()).To(ContainSubstring("server unavailable")) Expect(needRequeue).To(BeTrue()) }) diff --git a/pkg/controllers/v1alpha1/dataflow/operations.go b/pkg/controllers/v1alpha1/dataflow/operations.go index 910a381d0e3..47eadbc578e 100644 --- a/pkg/controllers/v1alpha1/dataflow/operations.go +++ b/pkg/controllers/v1alpha1/dataflow/operations.go @@ -127,10 +127,10 @@ func reconcileDataProcess(ctx reconcileRequestContext) (needRequeue bool, err er dataProcess, err := utils.GetDataProcess(ctx.Client, ctx.Name, ctx.Namespace) if err != nil { if utils.IgnoreNotFound(err) == nil { - ctx.Log.V(1).Info("DataMigrate not found, skip reconciling") + ctx.Log.V(1).Info("DataProcess not found, skip reconciling") return false, nil } - return true, errors.Wrap(err, "failed to get datamigrate") + return true, errors.Wrap(err, "failed to get dataprocess") } updateStatusFn := func() error { From 5750b51260c24f80aba7fbec62f784f665516ecc Mon Sep 17 00:00:00 2001 From: Harsh Date: Sun, 29 Mar 2026 17:30:45 +0530 Subject: [PATCH 3/5] test(dataflow): deduplicate repeated test literals Signed-off-by: Harsh --- .../v1alpha1/dataflow/error_paths_test.go | 8 +- .../v1alpha1/dataflow/operations_test.go | 115 ++++++++++-------- 2 files changed, 69 insertions(+), 54 deletions(-) diff --git a/pkg/controllers/v1alpha1/dataflow/error_paths_test.go b/pkg/controllers/v1alpha1/dataflow/error_paths_test.go index d8675650d7f..dd08513c890 100644 --- a/pkg/controllers/v1alpha1/dataflow/error_paths_test.go +++ b/pkg/controllers/v1alpha1/dataflow/error_paths_test.go @@ -37,9 +37,11 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" ) +const serverUnavailable = "server unavailable" + // newGetErrorClient returns a client that fails every Get with a generic error. func newGetErrorClient(s *runtime.Scheme) client.Client { - injectErr := fmt.Errorf("server unavailable") + injectErr := fmt.Errorf(serverUnavailable) fakeBase := fakeclient.NewClientBuilder().WithScheme(s).Build() return interceptor.NewClient(fakeBase, interceptor.Funcs{ Get: func(ctx context.Context, c client.WithWatch, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { @@ -66,7 +68,7 @@ var _ = Describe("Reconcile error path: Get failure propagates to error return", } result, err := r.Reconcile(context.TODO(), req) Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("server unavailable")) + Expect(err.Error()).To(ContainSubstring(serverUnavailable)) // RequeueIfError returns ctrl.Result{} + the error Expect(result).To(Equal(ctrl.Result{})) }) @@ -180,7 +182,7 @@ var _ = Describe("reconcileDataProcess: outer Get failure", func() { needRequeue, err := reconcileDataProcess(ctx) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("failed to get dataprocess")) - Expect(err.Error()).To(ContainSubstring("server unavailable")) + Expect(err.Error()).To(ContainSubstring(serverUnavailable)) Expect(needRequeue).To(BeTrue()) }) }) diff --git a/pkg/controllers/v1alpha1/dataflow/operations_test.go b/pkg/controllers/v1alpha1/dataflow/operations_test.go index f8cf6668173..54a3441c8fd 100644 --- a/pkg/controllers/v1alpha1/dataflow/operations_test.go +++ b/pkg/controllers/v1alpha1/dataflow/operations_test.go @@ -33,6 +33,19 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" ) +const ( + backupName = "backup-1" + loadName = "load-1" + migrateName = "migrate-1" + processName = "process-1" + + contextWhenPrecedingOperationIsComplete = "when preceding operation is complete" + contextWhenWaitingForOperationCompleteIsAlreadyFalse = "when WaitingFor.OperationComplete is already false" + itShouldClearWaitingForOperationCompleteAndNotRequeue = "should clear WaitingFor.OperationComplete and not requeue" + itShouldRecordWarningAndRequeue = "should record a warning and requeue" + itShouldSkipReconcilingAndNotRequeue = "should skip reconciling and not requeue" +) + // makeTestCtx creates a reconcileRequestContext with a fake client seeded with objs. func makeTestCtx(s *runtime.Scheme, name, namespace string, objs ...runtime.Object) reconcileRequestContext { if s == nil { @@ -179,7 +192,7 @@ var _ = Describe("reconcileOperationDataFlow", func() { }) Context("when the waiting DataLoad is not found", func() { - It("should skip reconciling and not requeue", func() { + It(itShouldSkipReconcilingAndNotRequeue, func() { ctx := makeTestCtx(s, "missing", namespace) needRequeue, err := reconcileDataLoad(ctx) @@ -202,7 +215,7 @@ var _ = Describe("reconcileDataBackup", func() { }) Context("when the waiting DataBackup is not found", func() { - It("should skip reconciling and not requeue", func() { + It(itShouldSkipReconcilingAndNotRequeue, func() { ctx := makeTestCtx(s, "missing", namespace) needRequeue, err := reconcileDataBackup(ctx) @@ -211,21 +224,21 @@ var _ = Describe("reconcileDataBackup", func() { }) }) - Context("when preceding operation is complete", func() { - It("should clear WaitingFor.OperationComplete and not requeue", func() { + Context(contextWhenPrecedingOperationIsComplete, func() { + It(itShouldClearWaitingForOperationCompleteAndNotRequeue, func() { precedingLoad := &datav1alpha1.DataLoad{ - ObjectMeta: metav1.ObjectMeta{Name: "load-1", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: loadName, Namespace: namespace}, Status: datav1alpha1.OperationStatus{ Phase: common.PhaseComplete, }, } waitingBackup := &datav1alpha1.DataBackup{ - ObjectMeta: metav1.ObjectMeta{Name: "backup-1", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: backupName, Namespace: namespace}, Spec: datav1alpha1.DataBackupSpec{ RunAfter: &datav1alpha1.OperationRef{ ObjectRef: datav1alpha1.ObjectRef{ Kind: "DataLoad", - Name: "load-1", + Name: loadName, }, }, }, @@ -236,14 +249,14 @@ var _ = Describe("reconcileDataBackup", func() { }, } - ctx := makeTestCtx(s, "backup-1", namespace, precedingLoad, waitingBackup) + ctx := makeTestCtx(s, backupName, namespace, precedingLoad, waitingBackup) needRequeue, err := reconcileDataBackup(ctx) Expect(err).NotTo(HaveOccurred()) Expect(needRequeue).To(BeFalse()) updated := &datav1alpha1.DataBackup{} - Expect(ctx.Client.Get(context.TODO(), types.NamespacedName{Name: "backup-1", Namespace: namespace}, updated)).To(Succeed()) + Expect(ctx.Client.Get(context.TODO(), types.NamespacedName{Name: backupName, Namespace: namespace}, updated)).To(Succeed()) Expect(updated.Status.WaitingFor.OperationComplete).NotTo(BeNil()) Expect(*updated.Status.WaitingFor.OperationComplete).To(BeFalse()) }) @@ -252,18 +265,18 @@ var _ = Describe("reconcileDataBackup", func() { Context("when preceding operation is not complete", func() { It("should request requeue and record a waiting event", func() { precedingLoad := &datav1alpha1.DataLoad{ - ObjectMeta: metav1.ObjectMeta{Name: "load-1", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: loadName, Namespace: namespace}, Status: datav1alpha1.OperationStatus{ Phase: common.PhasePending, }, } waitingBackup := &datav1alpha1.DataBackup{ - ObjectMeta: metav1.ObjectMeta{Name: "backup-1", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: backupName, Namespace: namespace}, Spec: datav1alpha1.DataBackupSpec{ RunAfter: &datav1alpha1.OperationRef{ ObjectRef: datav1alpha1.ObjectRef{ Kind: "DataLoad", - Name: "load-1", + Name: loadName, }, }, }, @@ -277,7 +290,7 @@ var _ = Describe("reconcileDataBackup", func() { recorder := record.NewFakeRecorder(32) ctx := reconcileRequestContext{ Context: context.TODO(), - NamespacedName: types.NamespacedName{Name: "backup-1", Namespace: namespace}, + NamespacedName: types.NamespacedName{Name: backupName, Namespace: namespace}, Client: fake.NewFakeClientWithScheme(s, precedingLoad, waitingBackup), Log: logf.Log.WithName("test"), Recorder: recorder, @@ -305,7 +318,7 @@ var _ = Describe("reconcileDataMigrate", func() { }) Context("when the waiting DataMigrate is not found", func() { - It("should skip reconciling and not requeue", func() { + It(itShouldSkipReconcilingAndNotRequeue, func() { ctx := makeTestCtx(s, "missing", namespace) needRequeue, err := reconcileDataMigrate(ctx) @@ -314,21 +327,21 @@ var _ = Describe("reconcileDataMigrate", func() { }) }) - Context("when preceding operation is complete", func() { - It("should clear WaitingFor.OperationComplete and not requeue", func() { + Context(contextWhenPrecedingOperationIsComplete, func() { + It(itShouldClearWaitingForOperationCompleteAndNotRequeue, func() { precedingBackup := &datav1alpha1.DataBackup{ - ObjectMeta: metav1.ObjectMeta{Name: "backup-1", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: backupName, Namespace: namespace}, Status: datav1alpha1.OperationStatus{ Phase: common.PhaseComplete, }, } waitingMigrate := &datav1alpha1.DataMigrate{ - ObjectMeta: metav1.ObjectMeta{Name: "migrate-1", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: migrateName, Namespace: namespace}, Spec: datav1alpha1.DataMigrateSpec{ RunAfter: &datav1alpha1.OperationRef{ ObjectRef: datav1alpha1.ObjectRef{ Kind: "DataBackup", - Name: "backup-1", + Name: backupName, }, }, }, @@ -339,14 +352,14 @@ var _ = Describe("reconcileDataMigrate", func() { }, } - ctx := makeTestCtx(s, "migrate-1", namespace, precedingBackup, waitingMigrate) + ctx := makeTestCtx(s, migrateName, namespace, precedingBackup, waitingMigrate) needRequeue, err := reconcileDataMigrate(ctx) Expect(err).NotTo(HaveOccurred()) Expect(needRequeue).To(BeFalse()) updated := &datav1alpha1.DataMigrate{} - Expect(ctx.Client.Get(context.TODO(), types.NamespacedName{Name: "migrate-1", Namespace: namespace}, updated)).To(Succeed()) + Expect(ctx.Client.Get(context.TODO(), types.NamespacedName{Name: migrateName, Namespace: namespace}, updated)).To(Succeed()) Expect(updated.Status.WaitingFor.OperationComplete).NotTo(BeNil()) Expect(*updated.Status.WaitingFor.OperationComplete).To(BeFalse()) }) @@ -366,7 +379,7 @@ var _ = Describe("reconcileDataProcess", func() { }) Context("when the waiting DataProcess is not found", func() { - It("should skip reconciling and not requeue", func() { + It(itShouldSkipReconcilingAndNotRequeue, func() { ctx := makeTestCtx(s, "missing", namespace) needRequeue, err := reconcileDataProcess(ctx) @@ -375,21 +388,21 @@ var _ = Describe("reconcileDataProcess", func() { }) }) - Context("when preceding operation is complete", func() { - It("should clear WaitingFor.OperationComplete and not requeue", func() { + Context(contextWhenPrecedingOperationIsComplete, func() { + It(itShouldClearWaitingForOperationCompleteAndNotRequeue, func() { precedingMigrate := &datav1alpha1.DataMigrate{ - ObjectMeta: metav1.ObjectMeta{Name: "migrate-1", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: migrateName, Namespace: namespace}, Status: datav1alpha1.OperationStatus{ Phase: common.PhaseComplete, }, } waitingProcess := &datav1alpha1.DataProcess{ - ObjectMeta: metav1.ObjectMeta{Name: "process-1", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: processName, Namespace: namespace}, Spec: datav1alpha1.DataProcessSpec{ RunAfter: &datav1alpha1.OperationRef{ ObjectRef: datav1alpha1.ObjectRef{ Kind: "DataMigrate", - Name: "migrate-1", + Name: migrateName, }, }, }, @@ -400,14 +413,14 @@ var _ = Describe("reconcileDataProcess", func() { }, } - ctx := makeTestCtx(s, "process-1", namespace, precedingMigrate, waitingProcess) + ctx := makeTestCtx(s, processName, namespace, precedingMigrate, waitingProcess) needRequeue, err := reconcileDataProcess(ctx) Expect(err).NotTo(HaveOccurred()) Expect(needRequeue).To(BeFalse()) updated := &datav1alpha1.DataProcess{} - Expect(ctx.Client.Get(context.TODO(), types.NamespacedName{Name: "process-1", Namespace: namespace}, updated)).To(Succeed()) + Expect(ctx.Client.Get(context.TODO(), types.NamespacedName{Name: processName, Namespace: namespace}, updated)).To(Succeed()) Expect(updated.Status.WaitingFor.OperationComplete).NotTo(BeNil()) Expect(*updated.Status.WaitingFor.OperationComplete).To(BeFalse()) }) @@ -416,18 +429,18 @@ var _ = Describe("reconcileDataProcess", func() { Context("when preceding operation is not complete", func() { It("should request requeue and record a waiting event", func() { precedingMigrate := &datav1alpha1.DataMigrate{ - ObjectMeta: metav1.ObjectMeta{Name: "migrate-1", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: migrateName, Namespace: namespace}, Status: datav1alpha1.OperationStatus{ Phase: common.PhaseExecuting, }, } waitingProcess := &datav1alpha1.DataProcess{ - ObjectMeta: metav1.ObjectMeta{Name: "process-1", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: processName, Namespace: namespace}, Spec: datav1alpha1.DataProcessSpec{ RunAfter: &datav1alpha1.OperationRef{ ObjectRef: datav1alpha1.ObjectRef{ Kind: "DataMigrate", - Name: "migrate-1", + Name: migrateName, }, }, }, @@ -441,7 +454,7 @@ var _ = Describe("reconcileDataProcess", func() { recorder := record.NewFakeRecorder(32) ctx := reconcileRequestContext{ Context: context.TODO(), - NamespacedName: types.NamespacedName{Name: "process-1", Namespace: namespace}, + NamespacedName: types.NamespacedName{Name: processName, Namespace: namespace}, Client: fake.NewFakeClientWithScheme(s, precedingMigrate, waitingProcess), Log: logf.Log.WithName("test"), Recorder: recorder, @@ -604,7 +617,7 @@ var _ = Describe("reconcileDataBackup no-op path", func() { _ = datav1alpha1.AddToScheme(s) }) - Context("when WaitingFor.OperationComplete is already false", func() { + Context(contextWhenWaitingForOperationCompleteIsAlreadyFalse, func() { It("should succeed without calling Status().Update", func() { precedingLoad := &datav1alpha1.DataLoad{ ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: namespace}, @@ -613,7 +626,7 @@ var _ = Describe("reconcileDataBackup no-op path", func() { }, } waitingBackup := &datav1alpha1.DataBackup{ - ObjectMeta: metav1.ObjectMeta{Name: "backup-1", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: backupName, Namespace: namespace}, Spec: datav1alpha1.DataBackupSpec{ RunAfter: &datav1alpha1.OperationRef{ ObjectRef: datav1alpha1.ObjectRef{ @@ -629,7 +642,7 @@ var _ = Describe("reconcileDataBackup no-op path", func() { }, } - ctx := makeTestCtx(s, "backup-1", namespace, precedingLoad, waitingBackup) + ctx := makeTestCtx(s, backupName, namespace, precedingLoad, waitingBackup) needRequeue, err := reconcileDataBackup(ctx) Expect(err).NotTo(HaveOccurred()) @@ -650,7 +663,7 @@ var _ = Describe("reconcileDataMigrate no-op path", func() { _ = datav1alpha1.AddToScheme(s) }) - Context("when WaitingFor.OperationComplete is already false", func() { + Context(contextWhenWaitingForOperationCompleteIsAlreadyFalse, func() { It("should succeed without calling Status().Update", func() { precedingBackup := &datav1alpha1.DataBackup{ ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: namespace}, @@ -659,7 +672,7 @@ var _ = Describe("reconcileDataMigrate no-op path", func() { }, } waitingMigrate := &datav1alpha1.DataMigrate{ - ObjectMeta: metav1.ObjectMeta{Name: "migrate-1", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: migrateName, Namespace: namespace}, Spec: datav1alpha1.DataMigrateSpec{ RunAfter: &datav1alpha1.OperationRef{ ObjectRef: datav1alpha1.ObjectRef{ @@ -675,7 +688,7 @@ var _ = Describe("reconcileDataMigrate no-op path", func() { }, } - ctx := makeTestCtx(s, "migrate-1", namespace, precedingBackup, waitingMigrate) + ctx := makeTestCtx(s, migrateName, namespace, precedingBackup, waitingMigrate) needRequeue, err := reconcileDataMigrate(ctx) Expect(err).NotTo(HaveOccurred()) @@ -696,7 +709,7 @@ var _ = Describe("reconcileDataProcess no-op path", func() { _ = datav1alpha1.AddToScheme(s) }) - Context("when WaitingFor.OperationComplete is already false", func() { + Context(contextWhenWaitingForOperationCompleteIsAlreadyFalse, func() { It("should succeed without calling Status().Update", func() { precedingMigrate := &datav1alpha1.DataMigrate{ ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: namespace}, @@ -705,7 +718,7 @@ var _ = Describe("reconcileDataProcess no-op path", func() { }, } waitingProcess := &datav1alpha1.DataProcess{ - ObjectMeta: metav1.ObjectMeta{Name: "process-1", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: processName, Namespace: namespace}, Spec: datav1alpha1.DataProcessSpec{ RunAfter: &datav1alpha1.OperationRef{ ObjectRef: datav1alpha1.ObjectRef{ @@ -721,7 +734,7 @@ var _ = Describe("reconcileDataProcess no-op path", func() { }, } - ctx := makeTestCtx(s, "process-1", namespace, precedingMigrate, waitingProcess) + ctx := makeTestCtx(s, processName, namespace, precedingMigrate, waitingProcess) needRequeue, err := reconcileDataProcess(ctx) Expect(err).NotTo(HaveOccurred()) @@ -743,9 +756,9 @@ var _ = Describe("reconcileDataMigrate and reconcileDataProcess not-found events }) Context("reconcileDataMigrate with preceding op not found", func() { - It("should record a warning and requeue", func() { + It(itShouldRecordWarningAndRequeue, func() { waitingMigrate := &datav1alpha1.DataMigrate{ - ObjectMeta: metav1.ObjectMeta{Name: "migrate-1", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: migrateName, Namespace: namespace}, Spec: datav1alpha1.DataMigrateSpec{ RunAfter: &datav1alpha1.OperationRef{ ObjectRef: datav1alpha1.ObjectRef{ @@ -764,7 +777,7 @@ var _ = Describe("reconcileDataMigrate and reconcileDataProcess not-found events recorder := record.NewFakeRecorder(32) ctx := reconcileRequestContext{ Context: context.TODO(), - NamespacedName: types.NamespacedName{Name: "migrate-1", Namespace: namespace}, + NamespacedName: types.NamespacedName{Name: migrateName, Namespace: namespace}, Client: fake.NewFakeClientWithScheme(s, waitingMigrate), Log: logf.Log.WithName("test"), Recorder: recorder, @@ -778,9 +791,9 @@ var _ = Describe("reconcileDataMigrate and reconcileDataProcess not-found events }) Context("reconcileDataProcess with preceding op not found", func() { - It("should record a warning and requeue", func() { + It(itShouldRecordWarningAndRequeue, func() { waitingProcess := &datav1alpha1.DataProcess{ - ObjectMeta: metav1.ObjectMeta{Name: "process-1", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: processName, Namespace: namespace}, Spec: datav1alpha1.DataProcessSpec{ RunAfter: &datav1alpha1.OperationRef{ ObjectRef: datav1alpha1.ObjectRef{ @@ -799,7 +812,7 @@ var _ = Describe("reconcileDataMigrate and reconcileDataProcess not-found events recorder := record.NewFakeRecorder(32) ctx := reconcileRequestContext{ Context: context.TODO(), - NamespacedName: types.NamespacedName{Name: "process-1", Namespace: namespace}, + NamespacedName: types.NamespacedName{Name: processName, Namespace: namespace}, Client: fake.NewFakeClientWithScheme(s, waitingProcess), Log: logf.Log.WithName("test"), Recorder: recorder, @@ -813,9 +826,9 @@ var _ = Describe("reconcileDataMigrate and reconcileDataProcess not-found events }) Context("reconcileDataBackup with preceding op not found", func() { - It("should record a warning and requeue", func() { + It(itShouldRecordWarningAndRequeue, func() { waitingBackup := &datav1alpha1.DataBackup{ - ObjectMeta: metav1.ObjectMeta{Name: "backup-1", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: backupName, Namespace: namespace}, Spec: datav1alpha1.DataBackupSpec{ RunAfter: &datav1alpha1.OperationRef{ ObjectRef: datav1alpha1.ObjectRef{ @@ -834,7 +847,7 @@ var _ = Describe("reconcileDataMigrate and reconcileDataProcess not-found events recorder := record.NewFakeRecorder(32) ctx := reconcileRequestContext{ Context: context.TODO(), - NamespacedName: types.NamespacedName{Name: "backup-1", Namespace: namespace}, + NamespacedName: types.NamespacedName{Name: backupName, Namespace: namespace}, Client: fake.NewFakeClientWithScheme(s, waitingBackup), Log: logf.Log.WithName("test"), Recorder: recorder, From 6b8ddb5cf29df63ab8c40edacbf3c971c19e74a2 Mon Sep 17 00:00:00 2001 From: Harsh Date: Sun, 29 Mar 2026 17:57:33 +0530 Subject: [PATCH 4/5] test(dataflow): deduplicate remaining test literals Signed-off-by: Harsh --- pkg/controllers/v1alpha1/dataflow/error_paths_test.go | 6 +++--- pkg/controllers/v1alpha1/dataflow/operations_test.go | 7 ++++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/pkg/controllers/v1alpha1/dataflow/error_paths_test.go b/pkg/controllers/v1alpha1/dataflow/error_paths_test.go index dd08513c890..2a2056ff4ac 100644 --- a/pkg/controllers/v1alpha1/dataflow/error_paths_test.go +++ b/pkg/controllers/v1alpha1/dataflow/error_paths_test.go @@ -290,7 +290,7 @@ var _ = Describe("reconcileDataBackup: updateStatusFn inner Get NotFound path", }, } waitingBackup := &datav1alpha1.DataBackup{ - ObjectMeta: metav1.ObjectMeta{Name: "backup-1", Namespace: namespace}, + ObjectMeta: metav1.ObjectMeta{Name: backupName, Namespace: namespace}, Spec: datav1alpha1.DataBackupSpec{ RunAfter: &datav1alpha1.OperationRef{ ObjectRef: datav1alpha1.ObjectRef{ @@ -315,7 +315,7 @@ var _ = Describe("reconcileDataBackup: updateStatusFn inner Get NotFound path", countClient := interceptor.NewClient(fakeBase, interceptor.Funcs{ Get: func(ctx context.Context, c client.WithWatch, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { callCount++ - if callCount >= 3 && key.Name == "backup-1" { + if callCount >= 3 && key.Name == backupName { return &statusNotFoundError{name: key.Name} } return c.Get(ctx, key, obj, opts...) @@ -324,7 +324,7 @@ var _ = Describe("reconcileDataBackup: updateStatusFn inner Get NotFound path", rCtx := reconcileRequestContext{ Context: context.TODO(), - NamespacedName: types.NamespacedName{Name: "backup-1", Namespace: namespace}, + NamespacedName: types.NamespacedName{Name: backupName, Namespace: namespace}, Client: countClient, Log: logf.Log.WithName("test"), Recorder: record.NewFakeRecorder(32), diff --git a/pkg/controllers/v1alpha1/dataflow/operations_test.go b/pkg/controllers/v1alpha1/dataflow/operations_test.go index 54a3441c8fd..e55d70c11db 100644 --- a/pkg/controllers/v1alpha1/dataflow/operations_test.go +++ b/pkg/controllers/v1alpha1/dataflow/operations_test.go @@ -44,6 +44,7 @@ const ( itShouldClearWaitingForOperationCompleteAndNotRequeue = "should clear WaitingFor.OperationComplete and not requeue" itShouldRecordWarningAndRequeue = "should record a warning and requeue" itShouldSkipReconcilingAndNotRequeue = "should skip reconciling and not requeue" + itShouldSucceedWithoutCallingStatusUpdate = "should succeed without calling Status().Update" ) // makeTestCtx creates a reconcileRequestContext with a fake client seeded with objs. @@ -618,7 +619,7 @@ var _ = Describe("reconcileDataBackup no-op path", func() { }) Context(contextWhenWaitingForOperationCompleteIsAlreadyFalse, func() { - It("should succeed without calling Status().Update", func() { + It(itShouldSucceedWithoutCallingStatusUpdate, func() { precedingLoad := &datav1alpha1.DataLoad{ ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: namespace}, Status: datav1alpha1.OperationStatus{ @@ -664,7 +665,7 @@ var _ = Describe("reconcileDataMigrate no-op path", func() { }) Context(contextWhenWaitingForOperationCompleteIsAlreadyFalse, func() { - It("should succeed without calling Status().Update", func() { + It(itShouldSucceedWithoutCallingStatusUpdate, func() { precedingBackup := &datav1alpha1.DataBackup{ ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: namespace}, Status: datav1alpha1.OperationStatus{ @@ -710,7 +711,7 @@ var _ = Describe("reconcileDataProcess no-op path", func() { }) Context(contextWhenWaitingForOperationCompleteIsAlreadyFalse, func() { - It("should succeed without calling Status().Update", func() { + It(itShouldSucceedWithoutCallingStatusUpdate, func() { precedingMigrate := &datav1alpha1.DataMigrate{ ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: namespace}, Status: datav1alpha1.OperationStatus{ From 0aec24848a82b8bba3c78e91e610b6d9b3b306e5 Mon Sep 17 00:00:00 2001 From: Harsh Date: Sun, 29 Mar 2026 19:19:14 +0530 Subject: [PATCH 5/5] test(dataflow): fix staticcheck error helper Signed-off-by: Harsh --- pkg/controllers/v1alpha1/dataflow/error_paths_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/controllers/v1alpha1/dataflow/error_paths_test.go b/pkg/controllers/v1alpha1/dataflow/error_paths_test.go index 2a2056ff4ac..f5f9f7dfd8e 100644 --- a/pkg/controllers/v1alpha1/dataflow/error_paths_test.go +++ b/pkg/controllers/v1alpha1/dataflow/error_paths_test.go @@ -18,6 +18,7 @@ package dataflow import ( "context" + "errors" "fmt" "time" @@ -41,7 +42,7 @@ const serverUnavailable = "server unavailable" // newGetErrorClient returns a client that fails every Get with a generic error. func newGetErrorClient(s *runtime.Scheme) client.Client { - injectErr := fmt.Errorf(serverUnavailable) + injectErr := errors.New(serverUnavailable) fakeBase := fakeclient.NewClientBuilder().WithScheme(s).Build() return interceptor.NewClient(fakeBase, interceptor.Funcs{ Get: func(ctx context.Context, c client.WithWatch, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {