diff --git a/pkg/controllers/v1alpha1/dataflow/dataflow_controller_test.go b/pkg/controllers/v1alpha1/dataflow/dataflow_controller_test.go new file mode 100644 index 00000000000..19fcffbacbc --- /dev/null +++ b/pkg/controllers/v1alpha1/dataflow/dataflow_controller_test.go @@ -0,0 +1,204 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataflow + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +// newTestDataFlowReconciler builds a DataFlowReconciler for unit tests. +func newTestDataFlowReconciler(s *runtime.Scheme, objs ...runtime.Object) *DataFlowReconciler { + if s == nil { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + } + fakeClient := fake.NewFakeClientWithScheme(s, objs...) + log := logf.Log.WithName("dataflow-test") + recorder := record.NewFakeRecorder(32) + return NewDataFlowReconciler(fakeClient, log, recorder, 30*time.Second) +} + +var _ = Describe("DataFlowReconciler", func() { + + Describe("ControllerName", func() { + It("should return the expected controller name", func() { + r := newTestDataFlowReconciler(nil) + Expect(r.ControllerName()).To(Equal("DataFlowReconciler")) + }) + }) + + Describe("NewDataFlowReconciler", func() { + It("should create a non-nil reconciler with correct fields", func() { + s := runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + r := newTestDataFlowReconciler(s) + Expect(r).NotTo(BeNil()) + Expect(r.Client).NotTo(BeNil()) + Expect(r.Recorder).NotTo(BeNil()) + Expect(r.ResyncPeriod).To(Equal(30 * time.Second)) + }) + }) + + Describe("Reconcile", func() { + It("should return no error and no requeue when no operation objects exist for a given name", func() { + s := runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + r := newTestDataFlowReconciler(s) + req := ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "missing", Namespace: "default"}, + } + result, err := r.Reconcile(context.TODO(), req) + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(Equal(ctrl.Result{})) + }) + + It("should requeue when a DataLoad with RunAfter exists and preceding op is not complete", func() { + s := runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + + precedingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: "default"}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseExecuting, + }, + } + + waitingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: datav1alpha1.DataLoadSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "preceding", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + r := newTestDataFlowReconciler(s, precedingLoad, waitingLoad) + req := ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "test", Namespace: "default"}, + } + result, err := r.Reconcile(context.TODO(), req) + Expect(err).NotTo(HaveOccurred()) + Expect(result.RequeueAfter).To(Equal(30 * time.Second)) + }) + + It("should not requeue when a DataLoad with RunAfter exists and preceding op is complete", func() { + s := runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + + precedingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: "default"}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + + waitingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: datav1alpha1.DataLoadSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "preceding", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + r := newTestDataFlowReconciler(s, precedingLoad, waitingLoad) + req := ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "test", Namespace: "default"}, + } + result, err := r.Reconcile(context.TODO(), req) + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(Equal(ctrl.Result{})) + }) + }) +}) + +// DataFlowEnabled and setupWatches depend on discovery.GetFluidDiscovery() which uses a sync.Once +// singleton that requires a live cluster connection. We test the code paths that avoid touching +// the discovery singleton by temporarily substituting an empty reconcileKinds map so the loop body +// (which calls GetFluidDiscovery) is never entered. This covers the for-loop entry, toSetup +// construction, and return-false / return-bld paths without any cluster dependency. + +var _ = Describe("DataFlowEnabled with empty reconcileKinds", func() { + var saved map[string]client.Object + + BeforeEach(func() { + saved = reconcileKinds + reconcileKinds = map[string]client.Object{} + }) + + AfterEach(func() { + reconcileKinds = saved + }) + + It("should return false when no resource kinds are registered", func() { + Expect(DataFlowEnabled()).To(BeFalse()) + }) +}) + +var _ = Describe("setupWatches with empty reconcileKinds", func() { + var saved map[string]client.Object + + BeforeEach(func() { + saved = reconcileKinds + reconcileKinds = map[string]client.Object{} + }) + + AfterEach(func() { + reconcileKinds = saved + }) + + It("should return the builder unchanged when no resource kinds are registered", func() { + // With an empty reconcileKinds the toSetup slice stays empty, neither bld.For nor + // bld.Watches is called, and the function returns bld as-is. We pass nil as bld to + // keep the test self-contained without requiring a real controller-runtime Builder. + result := setupWatches(nil, nil, builder.Predicates{}) + Expect(result).To(BeNil()) + }) +}) diff --git a/pkg/controllers/v1alpha1/dataflow/error_paths_test.go b/pkg/controllers/v1alpha1/dataflow/error_paths_test.go new file mode 100644 index 00000000000..f5f9f7dfd8e --- /dev/null +++ b/pkg/controllers/v1alpha1/dataflow/error_paths_test.go @@ -0,0 +1,338 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataflow + +import ( + "context" + "errors" + "fmt" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "k8s.io/utils/ptr" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + fakeclient "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +const serverUnavailable = "server unavailable" + +// newGetErrorClient returns a client that fails every Get with a generic error. +func newGetErrorClient(s *runtime.Scheme) client.Client { + injectErr := errors.New(serverUnavailable) + fakeBase := fakeclient.NewClientBuilder().WithScheme(s).Build() + return interceptor.NewClient(fakeBase, interceptor.Funcs{ + Get: func(ctx context.Context, c client.WithWatch, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + return injectErr + }, + }) +} + +var _ = Describe("Reconcile error path: Get failure propagates to error return", func() { + + var s *runtime.Scheme + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + It("should return an error when the underlying client Get fails", func() { + log := logf.Log.WithName("dataflow-error-test") + recorder := record.NewFakeRecorder(32) + r := NewDataFlowReconciler(newGetErrorClient(s), log, recorder, 30*time.Second) + req := ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "test", Namespace: "default"}, + } + result, err := r.Reconcile(context.TODO(), req) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring(serverUnavailable)) + // RequeueIfError returns ctrl.Result{} + the error + Expect(result).To(Equal(ctrl.Result{})) + }) +}) + +var _ = Describe("reconcileDataLoad: outer Get failure", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + It("should return wrapped error when client Get fails for DataLoad", func() { + ctx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: "test", Namespace: namespace}, + Client: newGetErrorClient(s), + Log: logf.Log.WithName("test"), + Recorder: record.NewFakeRecorder(32), + } + + needRequeue, err := reconcileDataLoad(ctx) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to get dataload")) + Expect(needRequeue).To(BeTrue()) + }) +}) + +var _ = Describe("reconcileDataBackup: outer Get failure", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + It("should return wrapped error when client Get fails for DataBackup", func() { + ctx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: "test", Namespace: namespace}, + Client: newGetErrorClient(s), + Log: logf.Log.WithName("test"), + Recorder: record.NewFakeRecorder(32), + } + + needRequeue, err := reconcileDataBackup(ctx) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to get databackup")) + Expect(needRequeue).To(BeTrue()) + }) +}) + +var _ = Describe("reconcileDataMigrate: outer Get failure", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + It("should return wrapped error when client Get fails for DataMigrate", func() { + ctx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: "test", Namespace: namespace}, + Client: newGetErrorClient(s), + Log: logf.Log.WithName("test"), + Recorder: record.NewFakeRecorder(32), + } + + needRequeue, err := reconcileDataMigrate(ctx) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to get datamigrate")) + Expect(needRequeue).To(BeTrue()) + }) +}) + +var _ = Describe("reconcileDataProcess: outer Get failure", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + It("should return wrapped error when client Get fails for DataProcess", func() { + ctx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: "test", Namespace: namespace}, + Client: newGetErrorClient(s), + Log: logf.Log.WithName("test"), + Recorder: record.NewFakeRecorder(32), + } + + needRequeue, err := reconcileDataProcess(ctx) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to get dataprocess")) + Expect(err.Error()).To(ContainSubstring(serverUnavailable)) + Expect(needRequeue).To(BeTrue()) + }) +}) + +var _ = Describe("reconcileOperationDataFlow: updateStatusFn inner Get NotFound path", func() { + // Verify that when the inner updateStatusFn re-fetches the object and gets NotFound, + // reconcileDataLoad returns (false, nil) — i.e., the not-found is treated as a no-op. + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + It("should succeed (no error, no requeue) when updateStatusFn inner Get returns NotFound", func() { + precedingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: "Complete", + }, + } + waitingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "waiting", Namespace: namespace}, + Spec: datav1alpha1.DataLoadSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "preceding", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + // Use a call-counting interceptor: + // calls 1–2: outer reconcileDataLoad fetches "waiting" and "preceding" → pass through + // call 3+: inside updateStatusFn fetch → return a custom not-found error + callCount := 0 + fakeBase := fakeclient.NewClientBuilder().WithScheme(s). + WithRuntimeObjects(precedingLoad, waitingLoad). + WithStatusSubresource(waitingLoad). + Build() + + countClient := interceptor.NewClient(fakeBase, interceptor.Funcs{ + Get: func(ctx context.Context, c client.WithWatch, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + callCount++ + if callCount >= 3 && key.Name == "waiting" { + // Wrap a notFound to exercise the IgnoreNotFound branch in updateStatusFn + return &statusNotFoundError{name: key.Name} + } + return c.Get(ctx, key, obj, opts...) + }, + }) + + rCtx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: "waiting", Namespace: namespace}, + Client: countClient, + Log: logf.Log.WithName("test"), + Recorder: record.NewFakeRecorder(32), + } + + needRequeue, err := reconcileDataLoad(rCtx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + }) +}) + +// statusNotFoundError satisfies the k8s.io/apimachinery/pkg/api/errors IsNotFound check. +// utils.IgnoreNotFound uses apierrors.IsNotFound which checks for Reason == StatusReasonNotFound. +type statusNotFoundError struct{ name string } + +func (e *statusNotFoundError) Error() string { return fmt.Sprintf("%q not found", e.name) } +func (e *statusNotFoundError) Status() metav1.Status { + return metav1.Status{Reason: metav1.StatusReasonNotFound} +} + +var _ = Describe("reconcileDataBackup: updateStatusFn inner Get NotFound path", func() { + // Verify that when updateStatusFn re-fetches the DataBackup and gets NotFound, + // reconcileDataBackup returns (false, nil) — the not-found is treated as a no-op. + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + It("should succeed when updateStatusFn inner Get returns NotFound", func() { + precedingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "load-1", Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: "Complete", + }, + } + waitingBackup := &datav1alpha1.DataBackup{ + ObjectMeta: metav1.ObjectMeta{Name: backupName, Namespace: namespace}, + Spec: datav1alpha1.DataBackupSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "load-1", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + callCount := 0 + fakeBase := fakeclient.NewClientBuilder().WithScheme(s). + WithRuntimeObjects(precedingLoad, waitingBackup). + WithStatusSubresource(waitingBackup). + Build() + + countClient := interceptor.NewClient(fakeBase, interceptor.Funcs{ + Get: func(ctx context.Context, c client.WithWatch, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + callCount++ + if callCount >= 3 && key.Name == backupName { + return &statusNotFoundError{name: key.Name} + } + return c.Get(ctx, key, obj, opts...) + }, + }) + + rCtx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: backupName, Namespace: namespace}, + Client: countClient, + Log: logf.Log.WithName("test"), + Recorder: record.NewFakeRecorder(32), + } + + needRequeue, err := reconcileDataBackup(rCtx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + }) +}) diff --git a/pkg/controllers/v1alpha1/dataflow/operations.go b/pkg/controllers/v1alpha1/dataflow/operations.go index 910a381d0e3..47eadbc578e 100644 --- a/pkg/controllers/v1alpha1/dataflow/operations.go +++ b/pkg/controllers/v1alpha1/dataflow/operations.go @@ -127,10 +127,10 @@ func reconcileDataProcess(ctx reconcileRequestContext) (needRequeue bool, err er dataProcess, err := utils.GetDataProcess(ctx.Client, ctx.Name, ctx.Namespace) if err != nil { if utils.IgnoreNotFound(err) == nil { - ctx.Log.V(1).Info("DataMigrate not found, skip reconciling") + ctx.Log.V(1).Info("DataProcess not found, skip reconciling") return false, nil } - return true, errors.Wrap(err, "failed to get datamigrate") + return true, errors.Wrap(err, "failed to get dataprocess") } updateStatusFn := func() error { diff --git a/pkg/controllers/v1alpha1/dataflow/operations_test.go b/pkg/controllers/v1alpha1/dataflow/operations_test.go new file mode 100644 index 00000000000..e55d70c11db --- /dev/null +++ b/pkg/controllers/v1alpha1/dataflow/operations_test.go @@ -0,0 +1,863 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataflow + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + "k8s.io/utils/ptr" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +const ( + backupName = "backup-1" + loadName = "load-1" + migrateName = "migrate-1" + processName = "process-1" + + contextWhenPrecedingOperationIsComplete = "when preceding operation is complete" + contextWhenWaitingForOperationCompleteIsAlreadyFalse = "when WaitingFor.OperationComplete is already false" + itShouldClearWaitingForOperationCompleteAndNotRequeue = "should clear WaitingFor.OperationComplete and not requeue" + itShouldRecordWarningAndRequeue = "should record a warning and requeue" + itShouldSkipReconcilingAndNotRequeue = "should skip reconciling and not requeue" + itShouldSucceedWithoutCallingStatusUpdate = "should succeed without calling Status().Update" +) + +// makeTestCtx creates a reconcileRequestContext with a fake client seeded with objs. +func makeTestCtx(s *runtime.Scheme, name, namespace string, objs ...runtime.Object) reconcileRequestContext { + if s == nil { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + } + return reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: name, Namespace: namespace}, + Client: fake.NewFakeClientWithScheme(s, objs...), + Log: logf.Log.WithName("dataflow-ops-test"), + Recorder: record.NewFakeRecorder(32), + } +} + +var _ = Describe("reconcileOperationDataFlow", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + Context("when the preceding operation is not found", func() { + It("should record a warning event and request requeue", func() { + // Build a DataLoad as the waiting object; preceding DataLoad does not exist. + waitingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "waiting", Namespace: namespace}, + Spec: datav1alpha1.DataLoadSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "nonexistent", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + recorder := record.NewFakeRecorder(32) + ctx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: "waiting", Namespace: namespace}, + Client: fake.NewFakeClientWithScheme(s, waitingLoad), + Log: logf.Log.WithName("test"), + Recorder: recorder, + } + + needRequeue, err := reconcileDataLoad(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeTrue()) + + // Expect a warning event to be recorded. + Eventually(recorder.Events).Should(Receive(ContainSubstring(common.DataOperationNotFound))) + }) + }) + + Context("when the preceding operation is not yet complete", func() { + It("should record a normal waiting event and request requeue", func() { + precedingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseExecuting, + }, + } + waitingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "waiting", Namespace: namespace}, + Spec: datav1alpha1.DataLoadSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "preceding", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + recorder := record.NewFakeRecorder(32) + ctx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: "waiting", Namespace: namespace}, + Client: fake.NewFakeClientWithScheme(s, precedingLoad, waitingLoad), + Log: logf.Log.WithName("test"), + Recorder: recorder, + } + + needRequeue, err := reconcileDataLoad(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeTrue()) + + Eventually(recorder.Events).Should(Receive(ContainSubstring(common.DataOperationWaiting))) + }) + }) + + Context("when the preceding operation is complete", func() { + It("should clear WaitingFor.OperationComplete and not requeue", func() { + precedingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + waitingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "waiting", Namespace: namespace}, + Spec: datav1alpha1.DataLoadSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "preceding", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + ctx := makeTestCtx(s, "waiting", namespace, precedingLoad, waitingLoad) + + needRequeue, err := reconcileDataLoad(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + + updated := &datav1alpha1.DataLoad{} + Expect(ctx.Client.Get(context.TODO(), types.NamespacedName{Name: "waiting", Namespace: namespace}, updated)).To(Succeed()) + Expect(updated.Status.WaitingFor.OperationComplete).NotTo(BeNil()) + Expect(*updated.Status.WaitingFor.OperationComplete).To(BeFalse()) + }) + }) + + Context("when the waiting DataLoad is not found", func() { + It(itShouldSkipReconcilingAndNotRequeue, func() { + ctx := makeTestCtx(s, "missing", namespace) + + needRequeue, err := reconcileDataLoad(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + }) + }) +}) + +var _ = Describe("reconcileDataBackup", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + Context("when the waiting DataBackup is not found", func() { + It(itShouldSkipReconcilingAndNotRequeue, func() { + ctx := makeTestCtx(s, "missing", namespace) + + needRequeue, err := reconcileDataBackup(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + }) + }) + + Context(contextWhenPrecedingOperationIsComplete, func() { + It(itShouldClearWaitingForOperationCompleteAndNotRequeue, func() { + precedingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: loadName, Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + waitingBackup := &datav1alpha1.DataBackup{ + ObjectMeta: metav1.ObjectMeta{Name: backupName, Namespace: namespace}, + Spec: datav1alpha1.DataBackupSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: loadName, + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + ctx := makeTestCtx(s, backupName, namespace, precedingLoad, waitingBackup) + + needRequeue, err := reconcileDataBackup(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + + updated := &datav1alpha1.DataBackup{} + Expect(ctx.Client.Get(context.TODO(), types.NamespacedName{Name: backupName, Namespace: namespace}, updated)).To(Succeed()) + Expect(updated.Status.WaitingFor.OperationComplete).NotTo(BeNil()) + Expect(*updated.Status.WaitingFor.OperationComplete).To(BeFalse()) + }) + }) + + Context("when preceding operation is not complete", func() { + It("should request requeue and record a waiting event", func() { + precedingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: loadName, Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhasePending, + }, + } + waitingBackup := &datav1alpha1.DataBackup{ + ObjectMeta: metav1.ObjectMeta{Name: backupName, Namespace: namespace}, + Spec: datav1alpha1.DataBackupSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: loadName, + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + recorder := record.NewFakeRecorder(32) + ctx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: backupName, Namespace: namespace}, + Client: fake.NewFakeClientWithScheme(s, precedingLoad, waitingBackup), + Log: logf.Log.WithName("test"), + Recorder: recorder, + } + + needRequeue, err := reconcileDataBackup(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeTrue()) + + Eventually(recorder.Events).Should(Receive(ContainSubstring(common.DataOperationWaiting))) + }) + }) +}) + +var _ = Describe("reconcileDataMigrate", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + Context("when the waiting DataMigrate is not found", func() { + It(itShouldSkipReconcilingAndNotRequeue, func() { + ctx := makeTestCtx(s, "missing", namespace) + + needRequeue, err := reconcileDataMigrate(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + }) + }) + + Context(contextWhenPrecedingOperationIsComplete, func() { + It(itShouldClearWaitingForOperationCompleteAndNotRequeue, func() { + precedingBackup := &datav1alpha1.DataBackup{ + ObjectMeta: metav1.ObjectMeta{Name: backupName, Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + waitingMigrate := &datav1alpha1.DataMigrate{ + ObjectMeta: metav1.ObjectMeta{Name: migrateName, Namespace: namespace}, + Spec: datav1alpha1.DataMigrateSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataBackup", + Name: backupName, + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + ctx := makeTestCtx(s, migrateName, namespace, precedingBackup, waitingMigrate) + + needRequeue, err := reconcileDataMigrate(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + + updated := &datav1alpha1.DataMigrate{} + Expect(ctx.Client.Get(context.TODO(), types.NamespacedName{Name: migrateName, Namespace: namespace}, updated)).To(Succeed()) + Expect(updated.Status.WaitingFor.OperationComplete).NotTo(BeNil()) + Expect(*updated.Status.WaitingFor.OperationComplete).To(BeFalse()) + }) + }) +}) + +var _ = Describe("reconcileDataProcess", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + Context("when the waiting DataProcess is not found", func() { + It(itShouldSkipReconcilingAndNotRequeue, func() { + ctx := makeTestCtx(s, "missing", namespace) + + needRequeue, err := reconcileDataProcess(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + }) + }) + + Context(contextWhenPrecedingOperationIsComplete, func() { + It(itShouldClearWaitingForOperationCompleteAndNotRequeue, func() { + precedingMigrate := &datav1alpha1.DataMigrate{ + ObjectMeta: metav1.ObjectMeta{Name: migrateName, Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + waitingProcess := &datav1alpha1.DataProcess{ + ObjectMeta: metav1.ObjectMeta{Name: processName, Namespace: namespace}, + Spec: datav1alpha1.DataProcessSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataMigrate", + Name: migrateName, + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + ctx := makeTestCtx(s, processName, namespace, precedingMigrate, waitingProcess) + + needRequeue, err := reconcileDataProcess(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + + updated := &datav1alpha1.DataProcess{} + Expect(ctx.Client.Get(context.TODO(), types.NamespacedName{Name: processName, Namespace: namespace}, updated)).To(Succeed()) + Expect(updated.Status.WaitingFor.OperationComplete).NotTo(BeNil()) + Expect(*updated.Status.WaitingFor.OperationComplete).To(BeFalse()) + }) + }) + + Context("when preceding operation is not complete", func() { + It("should request requeue and record a waiting event", func() { + precedingMigrate := &datav1alpha1.DataMigrate{ + ObjectMeta: metav1.ObjectMeta{Name: migrateName, Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseExecuting, + }, + } + waitingProcess := &datav1alpha1.DataProcess{ + ObjectMeta: metav1.ObjectMeta{Name: processName, Namespace: namespace}, + Spec: datav1alpha1.DataProcessSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataMigrate", + Name: migrateName, + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + recorder := record.NewFakeRecorder(32) + ctx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: processName, Namespace: namespace}, + Client: fake.NewFakeClientWithScheme(s, precedingMigrate, waitingProcess), + Log: logf.Log.WithName("test"), + Recorder: recorder, + } + + needRequeue, err := reconcileDataProcess(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeTrue()) + + Eventually(recorder.Events).Should(Receive(ContainSubstring(common.DataOperationWaiting))) + }) + }) +}) + +var _ = Describe("reconcileOperationDataFlow with cross-namespace preceding op", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + Context("when RunAfter specifies a different namespace", func() { + It("should look up the preceding op in the specified namespace and not requeue when complete", func() { + // Preceding load in a different namespace. + precedingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: "other-ns"}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + waitingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "waiting", Namespace: namespace}, + Spec: datav1alpha1.DataLoadSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "preceding", + Namespace: "other-ns", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + ctx := makeTestCtx(s, "waiting", namespace, precedingLoad, waitingLoad) + + needRequeue, err := reconcileDataLoad(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + }) + }) +}) + +var _ = Describe("reconcileOperationDataFlow with unsupported RunAfter kind", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + Context("when RunAfter.Kind is not supported", func() { + It("should return an error and request requeue", func() { + waitingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "waiting", Namespace: namespace}, + Spec: datav1alpha1.DataLoadSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "UnknownKind", + Name: "some-op", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + ctx := makeTestCtx(s, "waiting", namespace, waitingLoad) + + needRequeue, err := reconcileDataLoad(ctx) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to get preceding operation status")) + Expect(needRequeue).To(BeTrue()) + }) + }) +}) + +var _ = Describe("reconcileDataLoad updateStatusFn no-op path", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + Context("when WaitingFor.OperationComplete is already false when updateStatusFn runs", func() { + It("should succeed without calling Status().Update (reflect.DeepEqual skips update)", func() { + precedingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + // OperationComplete is already false — updateStatusFn will find no delta and skip the update. + waitingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "waiting", Namespace: namespace}, + Spec: datav1alpha1.DataLoadSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "preceding", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(false), + }, + }, + } + + ctx := makeTestCtx(s, "waiting", namespace, precedingLoad, waitingLoad) + + needRequeue, err := reconcileDataLoad(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + }) + }) +}) + +var _ = Describe("reconcileDataBackup no-op path", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + Context(contextWhenWaitingForOperationCompleteIsAlreadyFalse, func() { + It(itShouldSucceedWithoutCallingStatusUpdate, func() { + precedingLoad := &datav1alpha1.DataLoad{ + ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + waitingBackup := &datav1alpha1.DataBackup{ + ObjectMeta: metav1.ObjectMeta{Name: backupName, Namespace: namespace}, + Spec: datav1alpha1.DataBackupSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "preceding", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(false), + }, + }, + } + + ctx := makeTestCtx(s, backupName, namespace, precedingLoad, waitingBackup) + + needRequeue, err := reconcileDataBackup(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + }) + }) +}) + +var _ = Describe("reconcileDataMigrate no-op path", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + Context(contextWhenWaitingForOperationCompleteIsAlreadyFalse, func() { + It(itShouldSucceedWithoutCallingStatusUpdate, func() { + precedingBackup := &datav1alpha1.DataBackup{ + ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + waitingMigrate := &datav1alpha1.DataMigrate{ + ObjectMeta: metav1.ObjectMeta{Name: migrateName, Namespace: namespace}, + Spec: datav1alpha1.DataMigrateSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataBackup", + Name: "preceding", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(false), + }, + }, + } + + ctx := makeTestCtx(s, migrateName, namespace, precedingBackup, waitingMigrate) + + needRequeue, err := reconcileDataMigrate(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + }) + }) +}) + +var _ = Describe("reconcileDataProcess no-op path", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + Context(contextWhenWaitingForOperationCompleteIsAlreadyFalse, func() { + It(itShouldSucceedWithoutCallingStatusUpdate, func() { + precedingMigrate := &datav1alpha1.DataMigrate{ + ObjectMeta: metav1.ObjectMeta{Name: "preceding", Namespace: namespace}, + Status: datav1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + waitingProcess := &datav1alpha1.DataProcess{ + ObjectMeta: metav1.ObjectMeta{Name: processName, Namespace: namespace}, + Spec: datav1alpha1.DataProcessSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataMigrate", + Name: "preceding", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(false), + }, + }, + } + + ctx := makeTestCtx(s, processName, namespace, precedingMigrate, waitingProcess) + + needRequeue, err := reconcileDataProcess(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeFalse()) + }) + }) +}) + +var _ = Describe("reconcileDataMigrate and reconcileDataProcess not-found events", func() { + + var ( + s *runtime.Scheme + namespace = "default" + ) + + BeforeEach(func() { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + }) + + Context("reconcileDataMigrate with preceding op not found", func() { + It(itShouldRecordWarningAndRequeue, func() { + waitingMigrate := &datav1alpha1.DataMigrate{ + ObjectMeta: metav1.ObjectMeta{Name: migrateName, Namespace: namespace}, + Spec: datav1alpha1.DataMigrateSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "nonexistent", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + recorder := record.NewFakeRecorder(32) + ctx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: migrateName, Namespace: namespace}, + Client: fake.NewFakeClientWithScheme(s, waitingMigrate), + Log: logf.Log.WithName("test"), + Recorder: recorder, + } + + needRequeue, err := reconcileDataMigrate(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeTrue()) + Eventually(recorder.Events).Should(Receive(ContainSubstring(common.DataOperationNotFound))) + }) + }) + + Context("reconcileDataProcess with preceding op not found", func() { + It(itShouldRecordWarningAndRequeue, func() { + waitingProcess := &datav1alpha1.DataProcess{ + ObjectMeta: metav1.ObjectMeta{Name: processName, Namespace: namespace}, + Spec: datav1alpha1.DataProcessSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataBackup", + Name: "nonexistent", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + recorder := record.NewFakeRecorder(32) + ctx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: processName, Namespace: namespace}, + Client: fake.NewFakeClientWithScheme(s, waitingProcess), + Log: logf.Log.WithName("test"), + Recorder: recorder, + } + + needRequeue, err := reconcileDataProcess(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeTrue()) + Eventually(recorder.Events).Should(Receive(ContainSubstring(common.DataOperationNotFound))) + }) + }) + + Context("reconcileDataBackup with preceding op not found", func() { + It(itShouldRecordWarningAndRequeue, func() { + waitingBackup := &datav1alpha1.DataBackup{ + ObjectMeta: metav1.ObjectMeta{Name: backupName, Namespace: namespace}, + Spec: datav1alpha1.DataBackupSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{ + Kind: "DataLoad", + Name: "nonexistent", + }, + }, + }, + Status: datav1alpha1.OperationStatus{ + WaitingFor: datav1alpha1.WaitingStatus{ + OperationComplete: ptr.To(true), + }, + }, + } + + recorder := record.NewFakeRecorder(32) + ctx := reconcileRequestContext{ + Context: context.TODO(), + NamespacedName: types.NamespacedName{Name: backupName, Namespace: namespace}, + Client: fake.NewFakeClientWithScheme(s, waitingBackup), + Log: logf.Log.WithName("test"), + Recorder: recorder, + } + + needRequeue, err := reconcileDataBackup(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(needRequeue).To(BeTrue()) + Eventually(recorder.Events).Should(Receive(ContainSubstring(common.DataOperationNotFound))) + }) + }) +}) diff --git a/pkg/controllers/v1alpha1/dataflow/suite_test.go b/pkg/controllers/v1alpha1/dataflow/suite_test.go new file mode 100644 index 00000000000..9cd76caf20e --- /dev/null +++ b/pkg/controllers/v1alpha1/dataflow/suite_test.go @@ -0,0 +1,36 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataflow + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" +) + +func TestAPIs(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "DataFlow Controller Suite") +} + +var _ = BeforeSuite(func() { + logf.SetLogger(fake.NullLogger()) +})