From d1b88af1c998b1614e4149baf2e10fdf6e0c9884 Mon Sep 17 00:00:00 2001 From: Harsh Date: Sun, 29 Mar 2026 13:51:52 +0530 Subject: [PATCH 1/2] test(dataprocess): migrate package tests and fix status handling Signed-off-by: Harsh --- .../v1alpha1/dataprocess/controller_test.go | 135 ++++++ .../v1alpha1/dataprocess/implement_test.go | 385 ++++++++++++++++++ .../v1alpha1/dataprocess/status_handler.go | 2 +- .../dataprocess/status_handler_test.go | 296 ++++++++++---- .../v1alpha1/dataprocess/suite_test.go | 50 +-- 5 files changed, 734 insertions(+), 134 deletions(-) create mode 100644 pkg/controllers/v1alpha1/dataprocess/controller_test.go create mode 100644 pkg/controllers/v1alpha1/dataprocess/implement_test.go diff --git a/pkg/controllers/v1alpha1/dataprocess/controller_test.go b/pkg/controllers/v1alpha1/dataprocess/controller_test.go new file mode 100644 index 00000000000..04e91d08430 --- /dev/null +++ b/pkg/controllers/v1alpha1/dataprocess/controller_test.go @@ -0,0 +1,135 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataprocess + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +// newTestDataProcessReconciler builds a DataProcessReconciler for unit tests. +func newTestDataProcessReconciler(s *runtime.Scheme, objs ...runtime.Object) *DataProcessReconciler { + if s == nil { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + } + fakeClient := fake.NewFakeClientWithScheme(s, objs...) + log := logf.Log.WithName("dataprocess-test") + recorder := record.NewFakeRecorder(32) + return NewDataProcessReconciler(fakeClient, log, s, recorder) +} + +var _ = Describe("DataProcessReconciler", func() { + + Describe("ControllerName", func() { + It("should return the expected controller name", func() { + r := newTestDataProcessReconciler(nil) + Expect(r.ControllerName()).To(Equal("DataProcessReconciler")) + }) + }) + + Describe("NewDataProcessReconciler", func() { + It("should create a non-nil reconciler with a Scheme", func() { + s := runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + r := newTestDataProcessReconciler(s) + Expect(r).NotTo(BeNil()) + Expect(r.Scheme).NotTo(BeNil()) + }) + }) + + Describe("Build", func() { + It("should return a dataProcessOperation for a valid DataProcess object", func() { + r := newTestDataProcessReconciler(nil) + dp := &datav1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{Name: "test", Namespace: "default"}, + } + op, err := r.Build(dp) + Expect(err).NotTo(HaveOccurred()) + Expect(op).NotTo(BeNil()) + }) + + It("should return an error when given a non-DataProcess object", func() { + r := newTestDataProcessReconciler(nil) + ds := &datav1alpha1.Dataset{ + ObjectMeta: v1.ObjectMeta{Name: "ds", Namespace: "default"}, + } + op, err := r.Build(ds) + Expect(err).To(HaveOccurred()) + Expect(op).To(BeNil()) + }) + }) + + Describe("Reconcile", func() { + It("should return no error when the DataProcess is not found", func() { + s := runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + r := newTestDataProcessReconciler(s) + // No DataProcess objects registered — should get NotFound and return cleanly. + req := ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "missing", Namespace: "default"}, + } + result, err := r.Reconcile(context.TODO(), req) + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(Equal(ctrl.Result{})) + }) + + It("should reconcile successfully when DataProcess exists", func() { + s := runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + dp := &datav1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: datav1alpha1.DataProcessSpec{ + Dataset: datav1alpha1.TargetDatasetWithMountPath{ + TargetDataset: datav1alpha1.TargetDataset{ + Name: "ds", + Namespace: "default", + }, + MountPath: "/data", + }, + Processor: datav1alpha1.Processor{ + Script: &datav1alpha1.ScriptProcessor{ + Source: "echo hello", + }, + }, + }, + } + r := newTestDataProcessReconciler(s, dp) + req := ctrl.Request{ + NamespacedName: types.NamespacedName{Name: "test", Namespace: "default"}, + } + result, err := r.Reconcile(context.TODO(), req) + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(Equal(ctrl.Result{RequeueAfter: 20 * time.Second})) + }) + }) +}) + +// Ensure fake.NullLogger is usable in tests (imported via suite_test.go BeforeSuite). +var _ = fake.NullLogger diff --git a/pkg/controllers/v1alpha1/dataprocess/implement_test.go b/pkg/controllers/v1alpha1/dataprocess/implement_test.go new file mode 100644 index 00000000000..65414e9c9fc --- /dev/null +++ b/pkg/controllers/v1alpha1/dataprocess/implement_test.go @@ -0,0 +1,385 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataprocess + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/dataoperation" + cdataprocess "github.com/fluid-cloudnative/fluid/pkg/dataprocess" + cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" + "github.com/fluid-cloudnative/fluid/pkg/utils" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" +) + +// newTestDataProcessOperation creates a dataProcessOperation for unit tests. +func newTestDataProcessOperation(s *runtime.Scheme, dp *datav1alpha1.DataProcess) *dataProcessOperation { + if s == nil { + s = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(s) + } + fakeClient := fake.NewFakeClientWithScheme(s, dp) + recorder := record.NewFakeRecorder(32) + return &dataProcessOperation{ + Client: fakeClient, + Log: fake.NullLogger(), + Recorder: recorder, + dataProcess: dp, + } +} + +var _ = Describe("dataProcessOperation", func() { + + Describe("GetOperationObject", func() { + It("should return the DataProcess object", func() { + dp := &datav1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{Name: "test", Namespace: "default"}, + } + op := newTestDataProcessOperation(nil, dp) + obj := op.GetOperationObject() + Expect(obj).NotTo(BeNil()) + Expect(obj.GetName()).To(Equal("test")) + Expect(obj.GetNamespace()).To(Equal("default")) + }) + }) + + Describe("HasPrecedingOperation", func() { + It("should return false when RunAfter is nil", func() { + dp := &datav1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: datav1alpha1.DataProcessSpec{}, + } + op := newTestDataProcessOperation(nil, dp) + Expect(op.HasPrecedingOperation()).To(BeFalse()) + }) + + It("should return true when RunAfter is set", func() { + dp := &datav1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: datav1alpha1.DataProcessSpec{ + RunAfter: &datav1alpha1.OperationRef{ + ObjectRef: datav1alpha1.ObjectRef{Name: "prev-op", Kind: "DataLoad"}, + }, + }, + } + op := newTestDataProcessOperation(nil, dp) + Expect(op.HasPrecedingOperation()).To(BeTrue()) + }) + }) + + Describe("GetPossibleTargetDatasetNamespacedNames", func() { + It("should return a single namespaced name from spec.dataset", func() { + dp := &datav1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: datav1alpha1.DataProcessSpec{ + Dataset: datav1alpha1.TargetDatasetWithMountPath{ + TargetDataset: datav1alpha1.TargetDataset{ + Name: "my-dataset", + Namespace: "data-ns", + }, + }, + }, + } + op := newTestDataProcessOperation(nil, dp) + names := op.GetPossibleTargetDatasetNamespacedNames() + Expect(names).To(HaveLen(1)) + Expect(names[0]).To(Equal(types.NamespacedName{Name: "my-dataset", Namespace: "data-ns"})) + }) + }) + + Describe("GetReleaseNameSpacedName", func() { + It("should return the helm release name in the DataProcess namespace", func() { + dp := &datav1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{Name: "myproc", Namespace: "default"}, + } + op := newTestDataProcessOperation(nil, dp) + nn := op.GetReleaseNameSpacedName() + Expect(nn.Namespace).To(Equal("default")) + Expect(nn.Name).To(Equal(utils.GetDataProcessReleaseName("myproc"))) + }) + }) + + Describe("GetChartsDirectory", func() { + It("should return a path containing the DataProcess chart name", func() { + dp := &datav1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{Name: "test", Namespace: "default"}, + } + op := newTestDataProcessOperation(nil, dp) + dir := op.GetChartsDirectory() + Expect(dir).To(ContainSubstring(cdataprocess.DataProcessChart)) + }) + }) + + Describe("GetOperationType", func() { + It("should return DataProcessType", func() { + dp := &datav1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{Name: "test", Namespace: "default"}, + } + op := newTestDataProcessOperation(nil, dp) + Expect(op.GetOperationType()).To(Equal(dataoperation.DataProcessType)) + }) + }) + + Describe("GetStatusHandler", func() { + It("should return a non-nil OnceStatusHandler", func() { + dp := &datav1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{Name: "test", Namespace: "default"}, + } + op := newTestDataProcessOperation(nil, dp) + handler := op.GetStatusHandler() + Expect(handler).NotTo(BeNil()) + _, ok := handler.(*OnceStatusHandler) + Expect(ok).To(BeTrue()) + }) + }) + + Describe("GetTTL", func() { + It("should return nil TTL when not set", func() { + dp := &datav1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{Name: "test", Namespace: "default"}, + } + op := newTestDataProcessOperation(nil, dp) + ttl, err := op.GetTTL() + Expect(err).NotTo(HaveOccurred()) + Expect(ttl).To(BeNil()) + }) + + It("should return the configured TTL when set", func() { + var ttlVal int32 = 300 + dp := &datav1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: datav1alpha1.DataProcessSpec{ + TTLSecondsAfterFinished: &ttlVal, + }, + } + op := newTestDataProcessOperation(nil, dp) + ttl, err := op.GetTTL() + Expect(err).NotTo(HaveOccurred()) + Expect(ttl).NotTo(BeNil()) + Expect(*ttl).To(Equal(int32(300))) + }) + }) + + Describe("GetParallelTaskNumber", func() { + It("should always return 1", func() { + dp := &datav1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{Name: "test", Namespace: "default"}, + } + op := newTestDataProcessOperation(nil, dp) + Expect(op.GetParallelTaskNumber()).To(Equal(int32(1))) + }) + }) + + Describe("UpdateStatusInfoForCompleted", func() { + It("should return nil (no-op)", func() { + dp := &datav1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{Name: "test", Namespace: "default"}, + } + op := newTestDataProcessOperation(nil, dp) + err := op.UpdateStatusInfoForCompleted(map[string]string{"key": "val"}) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Describe("SetTargetDatasetStatusInProgress", func() { + It("should not panic (no-op)", func() { + dp := &datav1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{Name: "test", Namespace: "default"}, + } + op := newTestDataProcessOperation(nil, dp) + dataset := &datav1alpha1.Dataset{} + Expect(func() { op.SetTargetDatasetStatusInProgress(dataset) }).NotTo(Panic()) + }) + }) + + Describe("RemoveTargetDatasetStatusInProgress", func() { + It("should not panic (no-op)", func() { + dp := &datav1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{Name: "test", Namespace: "default"}, + } + op := newTestDataProcessOperation(nil, dp) + dataset := &datav1alpha1.Dataset{} + Expect(func() { op.RemoveTargetDatasetStatusInProgress(dataset) }).NotTo(Panic()) + }) + }) + + Describe("Validate", func() { + var ( + testScheme *runtime.Scheme + ctx cruntime.ReconcileRequestContext + ) + + BeforeEach(func() { + testScheme = runtime.NewScheme() + _ = datav1alpha1.AddToScheme(testScheme) + ctx = cruntime.ReconcileRequestContext{ + Log: fake.NullLogger(), + } + }) + + Context("when DataProcess namespace differs from spec.dataset.namespace", func() { + It("should return a condition and error about namespace mismatch", func() { + dp := &datav1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: datav1alpha1.DataProcessSpec{ + Dataset: datav1alpha1.TargetDatasetWithMountPath{ + TargetDataset: datav1alpha1.TargetDataset{ + Name: "ds", + Namespace: "other-namespace", // mismatch + }, + }, + Processor: datav1alpha1.Processor{ + Job: &datav1alpha1.JobProcessor{}, + }, + }, + } + op := newTestDataProcessOperation(testScheme, dp) + conditions, err := op.Validate(ctx) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("namespace")) + Expect(conditions).To(HaveLen(1)) + Expect(string(conditions[0].Reason)).To(Equal("TargetDatasetNamespaceNotSame")) + }) + }) + + Context("when no processor is set (both Job and Script are nil)", func() { + It("should return a condition and error about missing processor", func() { + dp := &datav1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: datav1alpha1.DataProcessSpec{ + Dataset: datav1alpha1.TargetDatasetWithMountPath{ + TargetDataset: datav1alpha1.TargetDataset{ + Name: "ds", + Namespace: "default", // same namespace + }, + }, + Processor: datav1alpha1.Processor{ + Job: nil, + Script: nil, + }, + }, + } + op := newTestDataProcessOperation(testScheme, dp) + conditions, err := op.Validate(ctx) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("spec.processor")) + Expect(conditions).To(HaveLen(1)) + Expect(string(conditions[0].Reason)).To(Equal("ProcessorNotSpecified")) + }) + }) + + Context("when both Job and Script processors are set", func() { + It("should return a condition and error about multiple processors", func() { + dp := &datav1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: datav1alpha1.DataProcessSpec{ + Dataset: datav1alpha1.TargetDatasetWithMountPath{ + TargetDataset: datav1alpha1.TargetDataset{ + Name: "ds", + Namespace: "default", + }, + }, + Processor: datav1alpha1.Processor{ + Job: &datav1alpha1.JobProcessor{}, + Script: &datav1alpha1.ScriptProcessor{}, + }, + }, + } + op := newTestDataProcessOperation(testScheme, dp) + conditions, err := op.Validate(ctx) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("multiple processors")) + Expect(conditions).To(HaveLen(1)) + Expect(string(conditions[0].Reason)).To(Equal("MultipleProcessorSpecified")) + }) + }) + + Context("when script processor has a conflicting mountPath with dataset mountPath", func() { + It("should return a condition and error about conflicting mount path", func() { + conflictPath := "/data" + dp := &datav1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: datav1alpha1.DataProcessSpec{ + Dataset: datav1alpha1.TargetDatasetWithMountPath{ + TargetDataset: datav1alpha1.TargetDataset{ + Name: "ds", + Namespace: "default", + }, + MountPath: conflictPath, + }, + Processor: datav1alpha1.Processor{ + Script: &datav1alpha1.ScriptProcessor{ + Source: "echo hello", + VolumeMounts: []corev1.VolumeMount{ + { + Name: "conflict-vol", + MountPath: conflictPath, // same path as dataset mountPath + }, + }, + }, + }, + }, + } + op := newTestDataProcessOperation(testScheme, dp) + conditions, err := op.Validate(ctx) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("conflict")) + Expect(conditions).To(HaveLen(1)) + Expect(string(conditions[0].Reason)).To(Equal("ConflictMountPath")) + }) + }) + + Context("when DataProcess has a valid script processor with no conflicts", func() { + It("should return no conditions and no error", func() { + dp := &datav1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: datav1alpha1.DataProcessSpec{ + Dataset: datav1alpha1.TargetDatasetWithMountPath{ + TargetDataset: datav1alpha1.TargetDataset{ + Name: "ds", + Namespace: "default", + }, + MountPath: "/data", + }, + Processor: datav1alpha1.Processor{ + Script: &datav1alpha1.ScriptProcessor{ + Source: "echo hello", + VolumeMounts: []corev1.VolumeMount{ + { + Name: "other-vol", + MountPath: "/other", // no conflict + }, + }, + }, + }, + }, + } + op := newTestDataProcessOperation(testScheme, dp) + conditions, err := op.Validate(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(conditions).To(BeNil()) + }) + }) + }) +}) diff --git a/pkg/controllers/v1alpha1/dataprocess/status_handler.go b/pkg/controllers/v1alpha1/dataprocess/status_handler.go index d1e2b768ea0..a8f3c0c1b40 100644 --- a/pkg/controllers/v1alpha1/dataprocess/status_handler.go +++ b/pkg/controllers/v1alpha1/dataprocess/status_handler.go @@ -78,7 +78,7 @@ func (handler *OnceStatusHandler) GetOperationStatus(ctx runtime.ReconcileReques } // job either failed or complete, update DataLoad's phase status - jobCondition := job.Status.Conditions[0] + jobCondition := *finishedJobCondition result.Conditions = []datav1alpha1.Condition{ { diff --git a/pkg/controllers/v1alpha1/dataprocess/status_handler_test.go b/pkg/controllers/v1alpha1/dataprocess/status_handler_test.go index 1a752331860..1c9b180ddfb 100644 --- a/pkg/controllers/v1alpha1/dataprocess/status_handler_test.go +++ b/pkg/controllers/v1alpha1/dataprocess/status_handler_test.go @@ -1,113 +1,235 @@ /* - Copyright 2023 The Fluid Authors. +Copyright 2026 The Fluid Authors. - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ package dataprocess import ( - "testing" "time" - "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/agiledragon/gomonkey/v2" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + "github.com/fluid-cloudnative/fluid/pkg/utils/helm" batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ) -func TestOnceGetOperationStatus(t *testing.T) { - testScheme := runtime.NewScheme() - _ = v1alpha1.AddToScheme(testScheme) - _ = batchv1.AddToScheme(testScheme) - - mockDataProcess := v1alpha1.DataProcess{ - ObjectMeta: v1.ObjectMeta{ - Name: "test", - Namespace: "default", - }, - Spec: v1alpha1.DataProcessSpec{}, - } - - mockJob := batchv1.Job{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-processor-job", - Namespace: "default", - }, - Status: batchv1.JobStatus{ - Conditions: []batchv1.JobCondition{ - { - Type: batchv1.JobComplete, - LastProbeTime: v1.NewTime(time.Now()), - LastTransitionTime: v1.NewTime(time.Now()), - }, - }, - }, - } - - mockFailedJob := batchv1.Job{ - ObjectMeta: v1.ObjectMeta{ - Name: "test-processor-job", - Namespace: "default", - }, - Status: batchv1.JobStatus{ - Conditions: []batchv1.JobCondition{ - { - Type: batchv1.JobFailed, - LastProbeTime: v1.NewTime(time.Now()), - LastTransitionTime: v1.NewTime(time.Now()), - }, +var _ = Describe("OnceStatusHandler", func() { + var ( + testScheme *runtime.Scheme + mockDataProcess *datav1alpha1.DataProcess + ctx cruntime.ReconcileRequestContext + ) + + BeforeEach(func() { + testScheme = runtime.NewScheme() + Expect(datav1alpha1.AddToScheme(testScheme)).To(Succeed()) + Expect(batchv1.AddToScheme(testScheme)).To(Succeed()) + + mockDataProcess = &datav1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{ + Name: "test", + Namespace: "default", }, - }, - } - - testcases := []struct { - name string - job batchv1.Job - expectedPhase common.Phase - }{ - { - name: "job success", - job: mockJob, - expectedPhase: common.PhaseComplete, - }, - { - name: "job failed", - job: mockFailedJob, - expectedPhase: common.PhaseFailed, - }, - } - - for _, testcase := range testcases { - client := fake.NewFakeClientWithScheme(testScheme, &mockDataProcess, &testcase.job) - onceStatusHandler := &OnceStatusHandler{Client: client, dataProcess: &mockDataProcess} - ctx := cruntime.ReconcileRequestContext{ + Spec: datav1alpha1.DataProcessSpec{}, + } + + ctx = cruntime.ReconcileRequestContext{ NamespacedName: types.NamespacedName{ Namespace: "default", - Name: "", + Name: "test", }, Log: fake.NullLogger(), } - opStatus, err := onceStatusHandler.GetOperationStatus(ctx, &mockDataProcess.Status) - if err != nil { - t.Errorf("fail to GetOperationStatus with error %v", err) - } - if opStatus.Phase != testcase.expectedPhase { - t.Error("Failed to GetOperationStatus", "expected phase", testcase.expectedPhase, "get", opStatus.Phase) - } - } -} + }) + + Describe("GetOperationStatus", func() { + Context("when job completes successfully", func() { + It("should return PhaseComplete", func() { + job := &batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-processor-job", + Namespace: "default", + }, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobComplete, + Status: corev1.ConditionTrue, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }, + }, + }, + } + + fakeClient := fake.NewFakeClientWithScheme(testScheme, mockDataProcess, job) + handler := &OnceStatusHandler{ + Client: fakeClient, + dataProcess: mockDataProcess, + } + + opStatus := &datav1alpha1.OperationStatus{} + result, err := handler.GetOperationStatus(ctx, opStatus) + Expect(err).NotTo(HaveOccurred()) + Expect(result).NotTo(BeNil()) + Expect(result.Phase).To(Equal(common.PhaseComplete)) + }) + }) + + Context("when job fails", func() { + It("should return PhaseFailed", func() { + job := &batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-processor-job", + Namespace: "default", + }, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobFailed, + Status: corev1.ConditionTrue, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }, + }, + }, + } + + fakeClient := fake.NewFakeClientWithScheme(testScheme, mockDataProcess, job) + handler := &OnceStatusHandler{ + Client: fakeClient, + dataProcess: mockDataProcess, + } + + opStatus := &datav1alpha1.OperationStatus{} + result, err := handler.GetOperationStatus(ctx, opStatus) + Expect(err).NotTo(HaveOccurred()) + Expect(result).NotTo(BeNil()) + Expect(result.Phase).To(Equal(common.PhaseFailed)) + }) + }) + + Context("when job is still running (no finished condition)", func() { + It("should return current opStatus phase unchanged", func() { + job := &batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-processor-job", + Namespace: "default", + }, + Status: batchv1.JobStatus{ + // No conditions — job is still running + Conditions: []batchv1.JobCondition{}, + }, + } + + fakeClient := fake.NewFakeClientWithScheme(testScheme, mockDataProcess, job) + handler := &OnceStatusHandler{ + Client: fakeClient, + dataProcess: mockDataProcess, + } + + opStatus := &datav1alpha1.OperationStatus{ + Phase: common.PhaseExecuting, + } + result, err := handler.GetOperationStatus(ctx, opStatus) + Expect(err).NotTo(HaveOccurred()) + Expect(result).NotTo(BeNil()) + // Phase should remain as it was before — handler returns early + Expect(result.Phase).To(Equal(common.PhaseExecuting)) + }) + }) + + Context("when the job is not found", func() { + It("should delete the helm release and return the unchanged status copy without error", func() { + // Patch helm.DeleteReleaseIfExists to avoid exec'ing ddc-helm in a unit test. + patch := gomonkey.ApplyFunc(helm.DeleteReleaseIfExists, + func(_ string, _ string) error { return nil }) + defer patch.Reset() + + fakeClient := fake.NewFakeClientWithScheme(testScheme, mockDataProcess) + handler := &OnceStatusHandler{ + Client: fakeClient, + dataProcess: mockDataProcess, + } + + opStatus := &datav1alpha1.OperationStatus{Phase: common.PhaseExecuting} + result, err := handler.GetOperationStatus(ctx, opStatus) + Expect(err).NotTo(HaveOccurred()) + Expect(result).NotTo(BeNil()) + Expect(result).NotTo(BeIdenticalTo(opStatus)) + Expect(result.Phase).To(Equal(common.PhaseExecuting)) + }) + }) + + Context("when job conditions are out of order", func() { + It("should use the finished condition instead of the first condition", func() { + createdAt := time.Now().Add(-2 * time.Minute) + suspendedAt := createdAt.Add(30 * time.Second) + failedAt := createdAt.Add(90 * time.Second) + + job := &batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-processor-job", + Namespace: "default", + CreationTimestamp: v1.NewTime(createdAt), + }, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobSuspended, + Status: corev1.ConditionTrue, + LastProbeTime: v1.NewTime(suspendedAt), + LastTransitionTime: v1.NewTime(suspendedAt), + }, + { + Type: batchv1.JobFailed, + Status: corev1.ConditionTrue, + Reason: "FailedReason", + Message: "failed after resume", + LastProbeTime: v1.NewTime(failedAt), + LastTransitionTime: v1.NewTime(failedAt), + }, + }, + }, + } + + fakeClient := fake.NewFakeClientWithScheme(testScheme, mockDataProcess, job) + handler := &OnceStatusHandler{ + Client: fakeClient, + dataProcess: mockDataProcess, + } + + result, err := handler.GetOperationStatus(ctx, &datav1alpha1.OperationStatus{}) + Expect(err).NotTo(HaveOccurred()) + Expect(result).NotTo(BeNil()) + Expect(result.Phase).To(Equal(common.PhaseFailed)) + Expect(result.Conditions).To(HaveLen(1)) + Expect(result.Conditions[0].Type).To(Equal(common.ConditionType(batchv1.JobFailed))) + Expect(result.Conditions[0].Reason).To(Equal("FailedReason")) + Expect(result.Duration).To(Equal(failedAt.Sub(createdAt).String())) + }) + }) + }) +}) diff --git a/pkg/controllers/v1alpha1/dataprocess/suite_test.go b/pkg/controllers/v1alpha1/dataprocess/suite_test.go index fb8f9f340dc..cf3180e322c 100644 --- a/pkg/controllers/v1alpha1/dataprocess/suite_test.go +++ b/pkg/controllers/v1alpha1/dataprocess/suite_test.go @@ -1,5 +1,5 @@ /* -Copyright 2023 The Fluid Authors. +Copyright 2026 The Fluid Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -17,65 +17,23 @@ limitations under the License. package dataprocess import ( - "path/filepath" "testing" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/rest" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/envtest" logf "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/log/zap" - datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" - //+kubebuilder:scaffold:imports + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" ) // These tests use Ginkgo (BDD-style Go testing framework). Refer to // http://onsi.github.io/ginkgo/ to learn more about Ginkgo. -var cfg *rest.Config -var k8sClient client.Client -var testEnv *envtest.Environment - func TestAPIs(t *testing.T) { RegisterFailHandler(Fail) - - RunSpecs(t, - "Controller Suite") + RunSpecs(t, "DataProcess Controller Suite") } var _ = BeforeSuite(func() { - logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) - - By("bootstrapping test environment") - testEnv = &envtest.Environment{ - CRDDirectoryPaths: []string{filepath.Join("..", "config", "crd", "bases")}, - ErrorIfCRDPathMissing: true, - } - - var err error - // cfg is defined in this file globally. - cfg, err = testEnv.Start() - Expect(err).NotTo(HaveOccurred()) - Expect(cfg).NotTo(BeNil()) - - err = datav1alpha1.AddToScheme(scheme.Scheme) - Expect(err).NotTo(HaveOccurred()) - - //+kubebuilder:scaffold:scheme - - k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme}) - Expect(err).NotTo(HaveOccurred()) - Expect(k8sClient).NotTo(BeNil()) - -}, 60) - -var _ = AfterSuite(func() { - By("tearing down the test environment") - err := testEnv.Stop() - Expect(err).NotTo(HaveOccurred()) + logf.SetLogger(fake.NullLogger()) }) From 89db0f2405e38ff327a9283d82bf85505e4d918f Mon Sep 17 00:00:00 2001 From: Harsh Date: Sun, 29 Mar 2026 14:01:49 +0530 Subject: [PATCH 2/2] test(dataprocess): address bot review follow-ups Signed-off-by: Harsh --- pkg/controllers/v1alpha1/dataprocess/controller_test.go | 3 --- pkg/controllers/v1alpha1/dataprocess/status_handler.go | 1 + 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/controllers/v1alpha1/dataprocess/controller_test.go b/pkg/controllers/v1alpha1/dataprocess/controller_test.go index 04e91d08430..144ffb5b0cc 100644 --- a/pkg/controllers/v1alpha1/dataprocess/controller_test.go +++ b/pkg/controllers/v1alpha1/dataprocess/controller_test.go @@ -130,6 +130,3 @@ var _ = Describe("DataProcessReconciler", func() { }) }) }) - -// Ensure fake.NullLogger is usable in tests (imported via suite_test.go BeforeSuite). -var _ = fake.NullLogger diff --git a/pkg/controllers/v1alpha1/dataprocess/status_handler.go b/pkg/controllers/v1alpha1/dataprocess/status_handler.go index a8f3c0c1b40..f52f1d103e7 100644 --- a/pkg/controllers/v1alpha1/dataprocess/status_handler.go +++ b/pkg/controllers/v1alpha1/dataprocess/status_handler.go @@ -55,6 +55,7 @@ func (handler *OnceStatusHandler) GetOperationStatus(ctx runtime.ReconcileReques ctx.Log.Error(err, "failed to delete dataprocess helm release", "namespace", ctx.Namespace, "releaseName", releaseName) return } + return } // In cases of other error