From ecadd425acf7787fb0c2128f5a26fa11a52f3c0a Mon Sep 17 00:00:00 2001 From: xliuqq Date: Thu, 9 Apr 2026 15:43:58 +0800 Subject: [PATCH 1/8] init poc, add prepare ufs Signed-off-by: xliuqq support app pod cv mount succeed fix mount todo: use bin/cv mount to filter already mount path master worker pod starts succeed set envs service name should be exposed Signed-off-by: liuzhiqiang <923463801@qq.com> --- api/v1alpha1/cacheruntimeclass_types.go | 19 ++ .../data.fluid.io_cacheruntimeclasses.yaml | 25 +++ charts/fluid/fluid/values.yaml | 4 +- .../data.fluid.io_cacheruntimeclasses.yaml | 25 +++ pkg/common/cacheruntime.go | 66 +++++- pkg/common/constants.go | 2 + pkg/common/label.go | 9 + pkg/ddc/base/runtime.go | 5 +- pkg/ddc/cache/component/component_manager.go | 54 +++++ pkg/ddc/cache/component/daemonset_manager.go | 179 +++++++++++++++++ .../cache/component/statefulset_manager.go | 190 ++++++++++++++++++ pkg/ddc/cache/engine/client.go | 51 ++++- pkg/ddc/cache/engine/cm.go | 171 +++++++++++++++- pkg/ddc/cache/engine/dataset.go | 56 ++++++ pkg/ddc/cache/engine/engine.go | 4 + pkg/ddc/cache/engine/fileutils.go | 69 +++++++ pkg/ddc/cache/engine/master.go | 67 +++++- pkg/ddc/cache/engine/runtime.go | 78 ++++++- pkg/ddc/cache/engine/setup.go | 8 +- pkg/ddc/cache/engine/shutdown.go | 18 +- pkg/ddc/cache/engine/status.go | 71 ++++++- pkg/ddc/cache/engine/sync.go | 59 +++++- pkg/ddc/cache/engine/transform.go | 186 ++++++++++++++++- pkg/ddc/cache/engine/transform_client.go | 95 +++++++++ pkg/ddc/cache/engine/transform_master.go | 57 ++++++ pkg/ddc/cache/engine/transform_worker.go | 57 ++++++ pkg/ddc/cache/engine/ufs.go | 24 ++- pkg/ddc/cache/engine/util.go | 85 ++++++++ pkg/ddc/cache/engine/volume.go | 39 +++- pkg/ddc/cache/engine/worker.go | 53 ++++- pkg/ddc/efc/runtime_info.go | 1 + pkg/utils/kubeclient/configmap.go | 13 ++ pkg/utils/runtimes.go | 12 -- 33 files changed, 1803 insertions(+), 49 deletions(-) create mode 100644 pkg/ddc/cache/component/component_manager.go create mode 100644 pkg/ddc/cache/component/daemonset_manager.go create mode 100644 pkg/ddc/cache/component/statefulset_manager.go create mode 100644 pkg/ddc/cache/engine/fileutils.go create mode 100644 pkg/ddc/cache/engine/transform_client.go create mode 100644 pkg/ddc/cache/engine/transform_master.go create mode 100644 pkg/ddc/cache/engine/transform_worker.go create mode 100644 pkg/ddc/cache/engine/util.go diff --git a/api/v1alpha1/cacheruntimeclass_types.go b/api/v1alpha1/cacheruntimeclass_types.go index 911005f3c9e..575a7257eff 100644 --- a/api/v1alpha1/cacheruntimeclass_types.go +++ b/api/v1alpha1/cacheruntimeclass_types.go @@ -60,6 +60,25 @@ type RuntimeComponentDefinition struct { // Dependencies specifies the dependencies required by the component // +optional Dependencies RuntimeComponentDependencies `json:"dependencies,omitempty"` + + // ExecutionEntries entries to support out-of-tree integration. + // +optional + ExecutionEntries *ExecutionEntries `json:"executionEntries,omitempty"` +} + +type ExecutionEntries struct { + // MountUFS defines the operations for mounting UFS + MountUFS *ExecutionCommonEntry `json:"mountUFS,omitempty"` + + // ReportSummary it defines the operation how to get cache status like capacity, hit ratio etc. + ReportSummary *ExecutionCommonEntry `json:"reportSummary,omitempty"` +} + +type ExecutionCommonEntry struct { + Command []string `json:"command"` + + // Timeout is the timeout(seconds) for the execution entry + Timeout int `json:"timeout,omitempty"` } // EncryptOptionComponentDependency defines the configuration for encrypt option dependency diff --git a/charts/fluid/fluid/crds/data.fluid.io_cacheruntimeclasses.yaml b/charts/fluid/fluid/crds/data.fluid.io_cacheruntimeclasses.yaml index aa877cb96e2..e652db785e0 100644 --- a/charts/fluid/fluid/crds/data.fluid.io_cacheruntimeclasses.yaml +++ b/charts/fluid/fluid/crds/data.fluid.io_cacheruntimeclasses.yaml @@ -3453,6 +3453,31 @@ spec: type: array type: object type: object + executionEntries: + properties: + mountUFS: + properties: + command: + items: + type: string + type: array + timeout: + type: integer + required: + - command + type: object + reportSummary: + properties: + command: + items: + type: string + type: array + timeout: + type: integer + required: + - command + type: object + type: object options: additionalProperties: type: string diff --git a/charts/fluid/fluid/values.yaml b/charts/fluid/fluid/values.yaml index f7e44eeaac3..1a8fb024a3e 100644 --- a/charts/fluid/fluid/values.yaml +++ b/charts/fluid/fluid/values.yaml @@ -12,9 +12,9 @@ image: imagePullSecrets: [] # Default registry, namespace and version tag for images managed by fluid -imagePrefix: &defaultImagePrefix fluidcloudnative +imagePrefix: &defaultImagePrefix registry.cn-hangzhou.aliyuncs.com/xliu1992 # imagePrefix: &defaultImagePrefix registry.aliyuncs.com/fluid -version: &defaultVersion v1.1.0-b457855 +version: &defaultVersion v1.1.0-b0bdac58 crdUpgrade: enabled: true diff --git a/config/crd/bases/data.fluid.io_cacheruntimeclasses.yaml b/config/crd/bases/data.fluid.io_cacheruntimeclasses.yaml index aa877cb96e2..e652db785e0 100644 --- a/config/crd/bases/data.fluid.io_cacheruntimeclasses.yaml +++ b/config/crd/bases/data.fluid.io_cacheruntimeclasses.yaml @@ -3453,6 +3453,31 @@ spec: type: array type: object type: object + executionEntries: + properties: + mountUFS: + properties: + command: + items: + type: string + type: array + timeout: + type: integer + required: + - command + type: object + reportSummary: + properties: + command: + items: + type: string + type: array + timeout: + type: integer + required: + - command + type: object + type: object options: additionalProperties: type: string diff --git a/pkg/common/cacheruntime.go b/pkg/common/cacheruntime.go index 30c61b11862..8d992a8b168 100644 --- a/pkg/common/cacheruntime.go +++ b/pkg/common/cacheruntime.go @@ -16,6 +16,11 @@ package common +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + type ComponentType string const ( @@ -24,12 +29,71 @@ const ( CacheEngineImpl = CacheRuntime ) +const ( + ComponentTypeMaster ComponentType = "master" + ComponentTypeWorker ComponentType = "worker" + ComponentTypeClient ComponentType = "client" +) + type CacheRuntimeValue struct { + // RuntimeIdentity is used to identify the runtime (name/namespace) + RuntimeIdentity RuntimeIdentity `json:"runtimeIdentity"` + Master *CacheRuntimeComponentValue `json:"master,omitempty"` Worker *CacheRuntimeComponentValue `json:"worker,omitempty"` Client *CacheRuntimeComponentValue `json:"client,omitempty"` } +// CacheRuntimeComponentValue is the common value for building CacheRuntimeValue. type CacheRuntimeComponentValue struct { - Enabled bool `json:"enabled"` + // Component name, not Runtime name + Name string + Namespace string + Enabled bool + WorkloadType metav1.TypeMeta + Replicas int32 + PodTemplateSpec corev1.PodTemplateSpec + Owner *OwnerReference + ComponentType ComponentType `json:"componentType,omitempty"` + + // Service name, can be not same as Component name + Service *CacheRuntimeComponentServiceConfig +} + +// CacheRuntimeConfig defines the config of runtime, will be auto mounted by configmap in the component pod. +type CacheRuntimeConfig struct { + // Mounts from Dataset Spec + Mounts []MountConfig `json:"mounts,omitempty"` + // AccessModes from Dataset Spec + AccessModes []corev1.PersistentVolumeAccessMode `json:"accessModes,omitempty"` + // fuse mount path, used in Worker or Client Pod according to Topology. + TargetPath string `json:"targetPath,omitempty"` + + Master *CacheRuntimeComponentConfig `json:"master,omitempty"` + Worker *CacheRuntimeComponentConfig `json:"worker,omitempty"` + Client *CacheRuntimeComponentConfig `json:"client,omitempty"` +} + +// MountConfig defines the mount config about dataset Mounts +type MountConfig struct { + MountPoint string `json:"mountPoint"` + // TODO: separate encrypt options with mount files for security + Options map[string]string `json:"options,omitempty"` + Name string `json:"name,omitempty"` + Path string `json:"path,omitempty"` + ReadOnly bool `json:"readOnly,omitempty"` + Shared bool `json:"shared,omitempty"` +} + +type CacheRuntimeComponentConfig struct { + Enabled bool `json:"enabled,omitempty"` + Name string `json:"name,omitempty"` + Options map[string]string `json:"options,omitempty"` + Replicas int32 `json:"replicas,omitempty"` + + Service CacheRuntimeComponentServiceConfig `json:"service,omitempty"` +} + +type CacheRuntimeComponentServiceConfig struct { + Name string `json:"name"` } diff --git a/pkg/common/constants.go b/pkg/common/constants.go index e715dd5e888..ca6385ef986 100644 --- a/pkg/common/constants.go +++ b/pkg/common/constants.go @@ -184,6 +184,8 @@ const ( PodRoleType = "role" DataloadPod = "dataload-pod" NamespaceFluidSystem = "fluid-system" + + DefaultNameSpace = "default" ) const ( diff --git a/pkg/common/label.go b/pkg/common/label.go index f735ed92651..f5dd1ec8589 100644 --- a/pkg/common/label.go +++ b/pkg/common/label.go @@ -146,6 +146,15 @@ const ( UpdateLabel OperationType = "UpdateValue" ) +// label and annotations for cacheRuntime +const ( + CacheRuntimeLabelAnnotationPrefix = "cacheruntime." + LabelAnnotationPrefix + + LabelCacheRuntimeName = CacheRuntimeLabelAnnotationPrefix + "name" + + LabelCacheRuntimeComponentName = CacheRuntimeLabelAnnotationPrefix + "component-name" +) + // LabelToModify modifies the labelKey in operationType. type LabelToModify struct { labelKey string diff --git a/pkg/ddc/base/runtime.go b/pkg/ddc/base/runtime.go index 45fcdce39a8..11264d3332e 100644 --- a/pkg/ddc/base/runtime.go +++ b/pkg/ddc/base/runtime.go @@ -433,7 +433,7 @@ func convertToTieredstoreInfo(tieredstore datav1alpha1.TieredStore) (TieredStore return tieredstoreInfo, nil } -// GetRuntimeInfo gets the RuntimeInfo according to name and namespace of it +// GetRuntimeInfo gets the RuntimeInfo according to name and namespace of it, must be called after dataset bound. func GetRuntimeInfo(reader client.Reader, name, namespace string) (runtimeInfo RuntimeInfoInterface, err error) { dataset, err := utils.GetDataset(reader, name, namespace) if err != nil { @@ -574,6 +574,8 @@ func GetRuntimeInfo(reader client.Reader, name, namespace string) (runtimeInfo R } runtimeInfo.SetFuseNodeSelector(cacheRuntime.Spec.Client.NodeSelector) runtimeInfo.SetupFuseCleanPolicy(cacheRuntime.Spec.Client.CleanPolicy) + // TODO(cache runtime): is this common logic for all runtimes? If so, move to below 'SetOwnerDatasetUID' line. + runtimeInfo.SetupWithDataset(dataset) default: err = fmt.Errorf("fail to get runtimeInfo for runtime type: %s", runtimeType) return @@ -643,6 +645,7 @@ func GetRuntimeStatus(client client.Client, runtimeType, name, namespace string) return status, err } return &runtime.Status, nil + // TODO: how to handle with cache runtime? (currently used in app pod affinity scene) default: err = fmt.Errorf("fail to get runtimeInfo for runtime type: %s", runtimeType) return nil, err diff --git a/pkg/ddc/cache/component/component_manager.go b/pkg/ddc/cache/component/component_manager.go new file mode 100644 index 00000000000..77282ce730d --- /dev/null +++ b/pkg/ddc/cache/component/component_manager.go @@ -0,0 +1,54 @@ +/* + Copyright 2026 The Fluid Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package component + +import ( + "context" + "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type ComponentManager interface { + Reconciler(ctx context.Context, component *common.CacheRuntimeComponentValue) error + ConstructComponentStatus(todo context.Context, value *common.CacheRuntimeComponentValue) (v1alpha1.RuntimeComponentStatus, error) +} + +func NewComponentHelper(workloadType metav1.TypeMeta, scheme *runtime.Scheme, client client.Client) ComponentManager { + if workloadType.APIVersion == "apps/v1" { + if workloadType.Kind == "StatefulSet" { + return newStatefulSetManager(scheme, client) + } else if workloadType.Kind == "DaemonSet" { + return newDaemonSetManager(scheme, client) + } + } + + return newStatefulSetManager(scheme, client) +} + +// getCommonLabelsFromComponent returns the common labels for component used for stateful +func getCommonLabelsFromComponent(component *common.CacheRuntimeComponentValue) map[string]string { + // These labels are used as sts.spec.selector which cannot be updated. + // If changed, may cause all exist runtime failed. + return map[string]string{ + common.LabelCacheRuntimeName: component.Owner.Name, + // format: runtimeName-componentType + common.LabelCacheRuntimeComponentName: component.Name, + } +} diff --git a/pkg/ddc/cache/component/daemonset_manager.go b/pkg/ddc/cache/component/daemonset_manager.go new file mode 100644 index 00000000000..e83aeaaaf05 --- /dev/null +++ b/pkg/ddc/cache/component/daemonset_manager.go @@ -0,0 +1,179 @@ +/* + Copyright 2026 The Fluid Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package component + +import ( + "context" + "fmt" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +type DaemonSetManager struct { + scheme *runtime.Scheme + client client.Client +} + +func newDaemonSetManager(scheme *runtime.Scheme, client client.Client) *DaemonSetManager { + return &DaemonSetManager{scheme: scheme, client: client} +} + +func (s *DaemonSetManager) Reconciler(ctx context.Context, component *common.CacheRuntimeComponentValue) error { + if err := s.reconcileDaemonSet(ctx, component); err != nil { + return err + } + + return s.reconcileService(ctx, component) +} + +func (s *DaemonSetManager) reconcileDaemonSet(ctx context.Context, component *common.CacheRuntimeComponentValue) error { + logger := log.FromContext(ctx) + logger.Info("start to reconciling dst workload") + + ds := &appsv1.DaemonSet{} + err := s.client.Get(ctx, types.NamespacedName{Name: component.Name, Namespace: component.Namespace}, ds) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + // return if already created + if err == nil { + return nil + } + // create the stateful set + ds = s.constructDaemonSet(component) + err = s.client.Create(ctx, ds) + if err != nil { + return err + } + logger.Info("create sts workload succeed") + return nil +} +func (s *DaemonSetManager) constructDaemonSet(component *common.CacheRuntimeComponentValue) *appsv1.DaemonSet { + matchLabels := getCommonLabelsFromComponent(component) + + podTemplateSpec := component.PodTemplateSpec + podTemplateSpec.Labels = utils.UnionMapsWithOverride(podTemplateSpec.Labels, matchLabels) + + trueVar := true + ds := &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: component.Name, + Namespace: component.Namespace, + Labels: matchLabels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: component.Owner.APIVersion, + Kind: component.Owner.Kind, + Name: component.Owner.Name, + UID: types.UID(component.Owner.UID), + BlockOwnerDeletion: &trueVar, + Controller: &trueVar, + }, + }, + }, + Spec: appsv1.DaemonSetSpec{ + Template: podTemplateSpec, + Selector: &metav1.LabelSelector{ + MatchLabels: matchLabels, + }, + }, + } + return ds +} +func (s *DaemonSetManager) reconcileService(ctx context.Context, component *common.CacheRuntimeComponentValue) error { + if component.Service == nil { + return nil + } + logger := log.FromContext(ctx) + logger.Info("start to reconciling headless service") + + svc := &corev1.Service{} + err := s.client.Get(ctx, types.NamespacedName{Name: component.Service.Name, Namespace: component.Namespace}, svc) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + // return if already created + if err == nil { + return nil + } + svc = s.constructService(component) + err = s.client.Create(ctx, svc) + if err != nil { + return err + } + logger.Info("create headless service succeed") + return nil +} + +func (s *DaemonSetManager) constructService(component *common.CacheRuntimeComponentValue) *corev1.Service { + matchLabels := getCommonLabelsFromComponent(component) + + trueVar := true + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: component.Service.Name, + Namespace: component.Namespace, + Labels: matchLabels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: component.Owner.APIVersion, + Kind: component.Owner.Kind, + Name: component.Owner.Name, + UID: types.UID(component.Owner.UID), + BlockOwnerDeletion: &trueVar, + Controller: &trueVar, + }, + }, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "None", + Selector: matchLabels, + PublishNotReadyAddresses: true, + }, + } + return svc +} + +func (s *DaemonSetManager) ConstructComponentStatus(ctx context.Context, component *common.CacheRuntimeComponentValue) (datav1alpha1.RuntimeComponentStatus, error) { + logger := log.FromContext(ctx) + logger.Info("start to ConstructComponentStatus") + + ds := &appsv1.DaemonSet{} + err := s.client.Get(ctx, types.NamespacedName{Name: component.Name, Namespace: component.Namespace}, ds) + if err != nil { + logger.Error(err, fmt.Sprintf("failed to get component: %s/%s", component.Namespace, component.Name)) + return datav1alpha1.RuntimeComponentStatus{}, err + } + + return datav1alpha1.RuntimeComponentStatus{ + Phase: datav1alpha1.RuntimePhaseReady, + DesiredReplicas: ds.Status.DesiredNumberScheduled, + CurrentReplicas: ds.Status.CurrentNumberScheduled, + AvailableReplicas: ds.Status.NumberAvailable, + UnavailableReplicas: ds.Status.NumberUnavailable, + ReadyReplicas: ds.Status.NumberReady, + }, nil +} diff --git a/pkg/ddc/cache/component/statefulset_manager.go b/pkg/ddc/cache/component/statefulset_manager.go new file mode 100644 index 00000000000..2738b4e4838 --- /dev/null +++ b/pkg/ddc/cache/component/statefulset_manager.go @@ -0,0 +1,190 @@ +/* + Copyright 2026 The Fluid Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package component + +import ( + "context" + "fmt" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +type StatefulSetManager struct { + scheme *runtime.Scheme + client client.Client +} + +func newStatefulSetManager(scheme *runtime.Scheme, client client.Client) *StatefulSetManager { + return &StatefulSetManager{scheme: scheme, client: client} +} + +func (s *StatefulSetManager) Reconciler(ctx context.Context, component *common.CacheRuntimeComponentValue) error { + if err := s.reconcileStatefulSet(ctx, component); err != nil { + return err + } + + return s.reconcileService(ctx, component) +} + +func (s *StatefulSetManager) reconcileStatefulSet(ctx context.Context, component *common.CacheRuntimeComponentValue) error { + logger := log.FromContext(ctx) + logger.Info("start to reconciling sts workload") + + sts := &appsv1.StatefulSet{} + err := s.client.Get(ctx, types.NamespacedName{Name: component.Name, Namespace: component.Namespace}, sts) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + // return if already created + if err == nil { + return nil + } + // create the stateful set + sts = s.constructStatefulSet(component) + err = s.client.Create(ctx, sts) + if err != nil { + return err + } + logger.Info("create sts workload succeed") + return nil +} +func (s *StatefulSetManager) constructStatefulSet(component *common.CacheRuntimeComponentValue) *appsv1.StatefulSet { + matchLabels := getCommonLabelsFromComponent(component) + + podTemplateSpec := component.PodTemplateSpec + podTemplateSpec.Labels = utils.UnionMapsWithOverride(podTemplateSpec.Labels, matchLabels) + + trueVar := true + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: component.Name, + Namespace: component.Namespace, + Labels: matchLabels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: component.Owner.APIVersion, + Kind: component.Owner.Kind, + Name: component.Owner.Name, + UID: types.UID(component.Owner.UID), + BlockOwnerDeletion: &trueVar, + Controller: &trueVar, + }, + }, + }, + Spec: appsv1.StatefulSetSpec{ + ServiceName: component.Service.Name, + Replicas: &component.Replicas, + Template: podTemplateSpec, + PodManagementPolicy: appsv1.ParallelPodManagement, + Selector: &metav1.LabelSelector{ + MatchLabels: matchLabels, + }, + }, + } + return sts +} +func (s *StatefulSetManager) reconcileService(ctx context.Context, component *common.CacheRuntimeComponentValue) error { + if component.Service == nil { + return nil + } + logger := log.FromContext(ctx) + logger.Info("start to reconciling headless service") + + svc := &corev1.Service{} + err := s.client.Get(ctx, types.NamespacedName{Name: component.Service.Name, Namespace: component.Namespace}, svc) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + // return if already created + if err == nil { + return nil + } + svc = s.constructService(component) + err = s.client.Create(ctx, svc) + if err != nil { + return err + } + logger.Info("create headless service succeed") + return nil +} + +func (s *StatefulSetManager) constructService(component *common.CacheRuntimeComponentValue) *corev1.Service { + matchLabels := getCommonLabelsFromComponent(component) + + trueVar := true + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: component.Service.Name, + Namespace: component.Namespace, + Labels: matchLabels, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: component.Owner.APIVersion, + Kind: component.Owner.Kind, + Name: component.Owner.Name, + UID: types.UID(component.Owner.UID), + BlockOwnerDeletion: &trueVar, + Controller: &trueVar, + }, + }, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "None", + Selector: matchLabels, + PublishNotReadyAddresses: true, + }, + } + return svc +} + +func (s *StatefulSetManager) ConstructComponentStatus(ctx context.Context, component *common.CacheRuntimeComponentValue) (datav1alpha1.RuntimeComponentStatus, error) { + logger := log.FromContext(ctx) + logger.Info("start to ConstructComponentStatus") + + sts := &appsv1.StatefulSet{} + err := s.client.Get(ctx, types.NamespacedName{Name: component.Name, Namespace: component.Namespace}, sts) + if err != nil { + logger.Error(err, fmt.Sprintf("failed to get component: %s/%s", component.Namespace, component.Name)) + return datav1alpha1.RuntimeComponentStatus{}, err + } + + desiredReplicas := *sts.Spec.Replicas + readyReplicas := sts.Status.ReadyReplicas + + runtimePhase := datav1alpha1.RuntimePhaseNotReady + if desiredReplicas == readyReplicas { + runtimePhase = datav1alpha1.RuntimePhaseReady + } + + return datav1alpha1.RuntimeComponentStatus{ + Phase: runtimePhase, + DesiredReplicas: desiredReplicas, + CurrentReplicas: sts.Status.CurrentReplicas, + AvailableReplicas: sts.Status.AvailableReplicas, + UnavailableReplicas: sts.Status.CurrentReplicas - sts.Status.AvailableReplicas, + ReadyReplicas: readyReplicas, + }, nil +} diff --git a/pkg/ddc/cache/engine/client.go b/pkg/ddc/cache/engine/client.go index 54b3df0725e..751b5323e60 100644 --- a/pkg/ddc/cache/engine/client.go +++ b/pkg/ddc/cache/engine/client.go @@ -17,8 +17,14 @@ package engine import ( + "context" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/ddc/cache/component" + "github.com/fluid-cloudnative/fluid/pkg/utils" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/util/retry" + "reflect" ) func (e *CacheEngine) SetupClientComponent(clientValue *common.CacheRuntimeComponentValue) (bool, error) { @@ -50,5 +56,48 @@ func (e *CacheEngine) ShouldSetupClient() (bool, error) { } func (e *CacheEngine) SetupClientInternal(clientValue *common.CacheRuntimeComponentValue) error { - return newNotImplementError("SetupClientInternal") + // 1. reconcile to create client workload + manager := component.NewComponentHelper(clientValue.WorkloadType, e.Scheme, e.Client) + err := manager.Reconciler(context.TODO(), clientValue) + if err != nil { + return err + } + + // 2. Update the status of the runtime + if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + runtime, err := e.getRuntime() + if err != nil { + return err + } + + clientStatus, err := manager.ConstructComponentStatus(context.TODO(), clientValue) + if err != nil { + return err + } + clientStatus.Phase = datav1alpha1.RuntimePhaseNotReady + + runtimeToUpdate := runtime.DeepCopy() + runtimeToUpdate.Status.Client = clientStatus + if runtime.Status.Client.Phase == datav1alpha1.RuntimePhaseNone && clientStatus.Phase != datav1alpha1.RuntimePhaseNone { + if len(runtimeToUpdate.Status.Conditions) == 0 { + runtimeToUpdate.Status.Conditions = []datav1alpha1.RuntimeCondition{} + } + cond := utils.NewRuntimeCondition(datav1alpha1.RuntimeMasterInitialized, datav1alpha1.RuntimeMasterInitializedReason, + "The client setup finished.", corev1.ConditionTrue) + runtimeToUpdate.Status.Conditions = + utils.UpdateRuntimeCondition(runtimeToUpdate.Status.Conditions, + cond) + } + + if !reflect.DeepEqual(runtime.Status, runtimeToUpdate.Status) { + return e.Client.Status().Update(context.TODO(), runtimeToUpdate) + } + + return nil + }); err != nil { + e.Log.Error(err, "update runtime status") + return err + } + + return nil } diff --git a/pkg/ddc/cache/engine/cm.go b/pkg/ddc/cache/engine/cm.go index 870413b040e..1fcfeb279dc 100644 --- a/pkg/ddc/cache/engine/cm.go +++ b/pkg/ddc/cache/engine/cm.go @@ -16,8 +16,173 @@ package engine -import "github.com/fluid-cloudnative/fluid/pkg/common" +import ( + "encoding/json" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) -func (e *CacheEngine) createRuntimeValueConfigMap(value *common.CacheRuntimeValue) error { - return newNotImplementError("createRuntimeValueConfigMap") +func (e *CacheEngine) createRuntimeConfigMaps(runtimeClass *datav1alpha1.CacheRuntimeClass) error { + runtime, err := e.getRuntime() + if err != nil { + return err + } + + var True = true + owner := []metav1.OwnerReference{ + { + APIVersion: runtime.APIVersion, + Kind: runtime.Kind, + Name: runtime.Name, + UID: runtime.UID, + Controller: &True, + BlockOwnerDeletion: &True, + }, + } + + // create the config map defined in CacheRuntimeClass ExtraResources, runtimeClass PodTemplate can use these. + err = e.createConfigMapInRuntimeClass(&runtimeClass.ExtraResources, owner) + if err != nil { + return err + } + + // create the config map generated by fluid, automatically mounted in component pods. + err = e.createRuntimeValueConfigMap(runtime, owner) + return err +} + +// create if not exists +func (e *CacheEngine) createConfigMapInRuntimeClass(extraResources *datav1alpha1.RuntimeExtraResources, owner []metav1.OwnerReference) error { + if extraResources.ConfigMaps == nil { + return nil + } + + for _, configMap := range extraResources.ConfigMaps { + cm, err := kubeclient.GetConfigmapByName(e.Client, configMap.Name, e.namespace) + if err != nil { + return err + } + if cm == nil { + err = kubeclient.CreateConfigMapWithOwner(e.Client, configMap.Name, e.namespace, configMap.Data, owner) + if err != nil { + return err + } + e.Log.Info("Create ConfigMap succeed", "name", configMap.Name, "namespace", e.namespace) + } + } + return nil +} + +// create if not exists +func (e *CacheEngine) createRuntimeValueConfigMap(runtime *datav1alpha1.CacheRuntime, owner []metav1.OwnerReference) error { + configMap, err := kubeclient.GetConfigmapByName(e.Client, e.getRuntimeConfigConfigMapName(), e.namespace) + if err != nil { + return err + } + // create if not exists + if configMap != nil { + return nil + } + data, err := e.generateRuntimeConfigData(runtime) + if err != nil { + return errors.Wrap(err, "failed to generate runtime config") + } + + return kubeclient.CreateConfigMapWithOwner(e.Client, e.getRuntimeConfigConfigMapName(), e.namespace, data, owner) +} + +// generateRuntimeConfigData generate the data in the config map for runtime config +func (e *CacheEngine) generateRuntimeConfigData(runtime *datav1alpha1.CacheRuntime) (map[string]string, error) { + dataset, err := utils.GetDataset(e.Client, e.name, e.namespace) + if err != nil { + return nil, err + } + + runtimeClass, err := e.getRuntimeClass(runtime.Spec.RuntimeClassName) + if err != nil { + return nil, err + } + + var mounts []common.MountConfig + for _, m := range dataset.Spec.Mounts { + mountCg := common.MountConfig{ + Name: m.Name, + MountPoint: m.MountPoint, + ReadOnly: m.ReadOnly, + Shared: m.Shared, + Path: m.Path, + } + // TODO: 默认的加密项的处理?(挂载的形式到 Master 等 Pod 中?)安全性该如何考虑 + options, err := e.generateDatasetMountOptions(&m, dataset.Spec.SharedEncryptOptions, dataset.Spec.SharedOptions) + if err != nil { + return nil, err + } + mountCg.Options = options + + mounts = append(mounts, mountCg) + } + + config := common.CacheRuntimeConfig{ + Mounts: mounts, + AccessModes: dataset.Spec.AccessModes, + TargetPath: e.getFuseMountPoint(), + } + if len(config.AccessModes) == 0 { + config.AccessModes = []corev1.PersistentVolumeAccessMode{corev1.ReadOnlyMany} + } + + if !runtime.Spec.Master.Disabled { + config.Master = &common.CacheRuntimeComponentConfig{ + Enabled: true, + Name: GetComponentName(e.name, common.ComponentTypeMaster), + Replicas: runtime.Spec.Master.Replicas, + Options: utils.UnionMapsWithOverride( + utils.UnionMapsWithOverride(runtimeClass.Topology.Master.Options, runtime.Spec.Options), runtime.Spec.Master.Options), + } + if runtimeClass.Topology.Master.Service.Headless != nil { + config.Master.Service = common.CacheRuntimeComponentServiceConfig{ + Name: GetComponentServiceName(e.name, common.ComponentTypeMaster), + } + } + } + if !runtime.Spec.Worker.Disabled { + config.Worker = &common.CacheRuntimeComponentConfig{ + Enabled: true, + Name: GetComponentName(e.name, common.ComponentTypeWorker), + Replicas: runtime.Spec.Master.Replicas, + Options: utils.UnionMapsWithOverride( + utils.UnionMapsWithOverride(runtimeClass.Topology.Worker.Options, runtime.Spec.Options), runtime.Spec.Worker.Options), + } + if runtimeClass.Topology.Worker.Service.Headless != nil { + config.Worker.Service = common.CacheRuntimeComponentServiceConfig{ + Name: GetComponentServiceName(e.name, common.ComponentTypeWorker), + } + } + } + if !runtime.Spec.Client.Disabled { + config.Worker = &common.CacheRuntimeComponentConfig{ + Enabled: true, + Name: GetComponentName(e.name, common.ComponentTypeClient), + Replicas: runtime.Spec.Master.Replicas, + Options: utils.UnionMapsWithOverride( + utils.UnionMapsWithOverride(runtimeClass.Topology.Client.Options, runtime.Spec.Options), runtime.Spec.Client.Options), + } + if runtimeClass.Topology.Client.Service.Headless != nil { + config.Client.Service = common.CacheRuntimeComponentServiceConfig{ + Name: GetComponentServiceName(e.name, common.ComponentTypeClient), + } + } + } + + b, _ := json.Marshal(config) + data := map[string]string{ + // key can not be modified, will be mounted as a file using by runtime. + e.getRuntimeConfigFileName(): string(b), + } + return data, nil } diff --git a/pkg/ddc/cache/engine/dataset.go b/pkg/ddc/cache/engine/dataset.go index c270a06fcdb..df2fc43483f 100644 --- a/pkg/ddc/cache/engine/dataset.go +++ b/pkg/ddc/cache/engine/dataset.go @@ -18,9 +18,12 @@ package engine import ( "context" + "fmt" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/utils" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" + securityutil "github.com/fluid-cloudnative/fluid/pkg/utils/security" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" @@ -97,3 +100,56 @@ func (e *CacheEngine) UpdateDatasetStatus(phase datav1alpha1.DatasetPhase) (err return } + +func (e *CacheEngine) generateDatasetMountOptions(m *datav1alpha1.Mount, sharedEncryptOptions []datav1alpha1.EncryptOption, + sharedOptions map[string]string) (map[string]string, error) { + + // initialize mount options + mOptions := map[string]string{} + for k, v := range sharedOptions { + mOptions[k] = v + } + + for key, value := range m.Options { + mOptions[key] = value + } + + // if encryptOptions have the same key with options, it will overwrite the corresponding value + var err error + mOptions, err = e.genEncryptOptions(sharedEncryptOptions, mOptions, m.Name) + if err != nil { + return mOptions, err + } + + // gen public encryptOptions + mOptions, err = e.genEncryptOptions(m.EncryptOptions, mOptions, m.Name) + if err != nil { + return mOptions, err + } + + return mOptions, nil +} + +func (e *CacheEngine) genEncryptOptions(EncryptOptions []datav1alpha1.EncryptOption, mOptions map[string]string, name string) (map[string]string, error) { + for _, item := range EncryptOptions { + if _, ok := mOptions[item.Name]; ok { + err := fmt.Errorf("the option %s is set more than one times, please double check the dataset's option and encryptOptions", item.Name) + return mOptions, err + } + + securityutil.UpdateSensitiveKey(item.Name) + sRef := item.ValueFrom.SecretKeyRef + secret, err := kubeclient.GetSecret(e.Client, sRef.Name, e.namespace) + if err != nil { + e.Log.Error(err, "get secret by mount encrypt options failed", "name", item.Name) + return mOptions, err + } + + e.Log.Info("get value from secret", "mount name", name, "secret key", sRef.Key) + + v := secret.Data[sRef.Key] + mOptions[item.Name] = string(v) + } + + return mOptions, nil +} diff --git a/pkg/ddc/cache/engine/engine.go b/pkg/ddc/cache/engine/engine.go index 700e25f5cf8..26302a07034 100644 --- a/pkg/ddc/cache/engine/engine.go +++ b/pkg/ddc/cache/engine/engine.go @@ -60,6 +60,10 @@ type CacheEngine struct { runtimeType string engineImpl string + + // TODO(cache runtime): use narrowed interface, and as a part of RuntimeInfoInterface. + // always use getRuntimeInfo() method instead of use this directly. + runtimeInfo base.RuntimeInfoInterface } // ID returns the id of the engine diff --git a/pkg/ddc/cache/engine/fileutils.go b/pkg/ddc/cache/engine/fileutils.go new file mode 100644 index 00000000000..758e600616a --- /dev/null +++ b/pkg/ddc/cache/engine/fileutils.go @@ -0,0 +1,69 @@ +/* + Copyright 2026 The Fluid Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package engine + +import ( + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" + securityutils "github.com/fluid-cloudnative/fluid/pkg/utils/security" + "github.com/go-logr/logr" + "github.com/pkg/errors" + "time" +) + +type CacheFileUtils struct { + podName string + namespace string + container string + log logr.Logger +} + +func newCacheFileUtils(podName string, containerName string, namespace string, log logr.Logger) CacheFileUtils { + + return CacheFileUtils{ + podName: podName, + namespace: namespace, + container: containerName, + log: log, + } +} + +// exec with timeout +func (c CacheFileUtils) exec(command []string, timeout time.Duration) (stdout string, stderr string, err error) { + // redact sensitive info in command for printing + redactedCommand := securityutils.FilterCommand(command) + + c.log.V(1).Info("Exec command start", "command", redactedCommand) + stdout, stderr, err = kubeclient.ExecCommandInContainerWithTimeout(c.podName, c.container, c.namespace, command, timeout) + if err != nil { + err = errors.Wrapf(err, "error when executing command %v", redactedCommand) + return + } + c.log.V(1).Info("Exec command finished", "command", redactedCommand) + + return +} + +func (c CacheFileUtils) Mount(command []string, timeout time.Duration) (err error) { + stdout, stderr, err := c.exec(command, timeout) + + if err != nil { + c.log.Error(err, "CacheFileUtils.Mount() failed", "stdout", stdout, "stderr", stderr) + return + } + + return nil +} diff --git a/pkg/ddc/cache/engine/master.go b/pkg/ddc/cache/engine/master.go index d29ea8f46c3..6c1996d72e2 100644 --- a/pkg/ddc/cache/engine/master.go +++ b/pkg/ddc/cache/engine/master.go @@ -17,8 +17,15 @@ package engine import ( + "context" + "errors" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/ddc/cache/component" + "github.com/fluid-cloudnative/fluid/pkg/utils" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/util/retry" + "reflect" ) func (e *CacheEngine) SetupMasterComponent(masterValue *common.CacheRuntimeComponentValue) (bool, error) { @@ -50,5 +57,63 @@ func (e *CacheEngine) shouldSetupMaster() (bool, error) { } func (e *CacheEngine) setupMasterInternal(masterValue *common.CacheRuntimeComponentValue) error { - return newNotImplementError("setupMasterInternal") + manager := component.NewComponentHelper(masterValue.WorkloadType, e.Scheme, e.Client) + err := manager.Reconciler(context.TODO(), masterValue) + if err != nil { + return err + } + + // update status of master + err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { + runtime, err := e.getRuntime() + if err != nil { + return err + } + + masterStatus, err := manager.ConstructComponentStatus(context.TODO(), masterValue) + if err != nil { + return err + } + // from RuntimePhaseNone to RuntimePhaseNotReady, not reconcile the master component the next time. + masterStatus.Phase = datav1alpha1.RuntimePhaseNotReady + + runtimeToUpdate := runtime.DeepCopy() + runtimeToUpdate.Status.Master = masterStatus + + // TODO(cache runtime): figure out how to use this selector + // runtimeToUpdate.Status.Selector = e.getWorkerSelectors() + + if len(runtimeToUpdate.Status.Conditions) == 0 { + runtimeToUpdate.Status.Conditions = []datav1alpha1.RuntimeCondition{} + } + cond := utils.NewRuntimeCondition(datav1alpha1.RuntimeMasterInitialized, datav1alpha1.RuntimeMasterInitializedReason, + "The master is initialized.", corev1.ConditionTrue) + runtimeToUpdate.Status.Conditions = + utils.UpdateRuntimeCondition(runtimeToUpdate.Status.Conditions, + cond) + + if !reflect.DeepEqual(runtime.Status, runtimeToUpdate.Status) { + return e.Client.Status().Update(context.TODO(), runtimeToUpdate) + } + + return nil + }) + if err != nil { + e.Log.Error(err, "failed to update runtime status") + return err + } + + return nil +} + +func (e *CacheEngine) getMasterPodInfo(value *common.CacheRuntimeValue) (podName string, containerName string, err error) { + // pod name is auto generated + podName = GetComponentName(e.name, common.ComponentTypeMaster) + "-0" + // container name, use the first container name + if value.Master == nil || len(value.Master.PodTemplateSpec.Spec.Containers) == 0 { + return "", "", errors.New("no container in master pod template") + } + containerName = value.Master.PodTemplateSpec.Spec.Containers[0].Name + + return } diff --git a/pkg/ddc/cache/engine/runtime.go b/pkg/ddc/cache/engine/runtime.go index 74fe1619ead..64dc55f1abd 100644 --- a/pkg/ddc/cache/engine/runtime.go +++ b/pkg/ddc/cache/engine/runtime.go @@ -18,9 +18,11 @@ package engine import ( "context" - "k8s.io/apimachinery/pkg/types" - datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/ddc/base" + "github.com/fluid-cloudnative/fluid/pkg/utils" + "github.com/fluid-cloudnative/fluid/pkg/utils/testutil" + "k8s.io/apimachinery/pkg/types" ) // getRuntime get the current runtime @@ -37,3 +39,75 @@ func (e *CacheEngine) getRuntime() (*datav1alpha1.CacheRuntime, error) { return &runtime, nil } + +func (e *CacheEngine) getRuntimeClass(runtimeClassName string) (*datav1alpha1.CacheRuntimeClass, error) { + key := types.NamespacedName{ + Name: runtimeClassName, + } + var runtimeClass datav1alpha1.CacheRuntimeClass + if err := e.Get(context.TODO(), key, &runtimeClass); err != nil { + return nil, err + } + + return &runtimeClass, nil +} + +// getRuntimeInfo get the runtime info, may be called before dataset bound, so can not use base.GetRuntimeInfo but has +// the same processing logic. +func (e *CacheEngine) getRuntimeInfo() (base.RuntimeInfoInterface, error) { + if e.runtimeInfo == nil { + runtime, err := e.getRuntime() + if err != nil { + return e.runtimeInfo, err + } + opts := []base.RuntimeInfoOption{ + // TODO(cache runtime): useless code? + base.WithTieredStore(datav1alpha1.TieredStore{}), + // below used for create volume + base.WithMetadataList(base.GetMetadataListFromAnnotation(runtime)), + base.WithAnnotations(runtime.Annotations), + } + e.runtimeInfo, err = base.BuildRuntimeInfo(e.name, e.namespace, e.runtimeType, opts...) + if err != nil { + return e.runtimeInfo, err + } + + // Setup Fuse Deploy Mode + e.runtimeInfo.SetFuseNodeSelector(runtime.Spec.Client.NodeSelector) + } + + if testutil.IsUnitTest() { + return e.runtimeInfo, nil + } + + // Handling information of bound dataset. XXXEngine.getRuntimeInfo() might be called before the runtime is bound to a dataset, + // so here we must lazily set dataset-related information once we found there's one bound dataset. + if len(e.runtimeInfo.GetOwnerDatasetUID()) == 0 { + runtime, err := e.getRuntime() + if err != nil { + return e.runtimeInfo, err + } + + uid, err := base.GetOwnerDatasetUIDFromRuntimeMeta(runtime.ObjectMeta) + if err != nil { + return nil, err + } + + if len(uid) > 0 { + e.runtimeInfo.SetOwnerDatasetUID(uid) + } + } + + if !e.runtimeInfo.IsPlacementModeSet() { + dataset, err := utils.GetDataset(e.Client, e.name, e.namespace) + if utils.IgnoreNotFound(err) != nil { + return nil, err + } + + if dataset != nil { + e.runtimeInfo.SetupWithDataset(dataset) + } + } + + return e.runtimeInfo, nil +} diff --git a/pkg/ddc/cache/engine/setup.go b/pkg/ddc/cache/engine/setup.go index cf0789a5e60..eb45ee32420 100644 --- a/pkg/ddc/cache/engine/setup.go +++ b/pkg/ddc/cache/engine/setup.go @@ -34,7 +34,7 @@ func (e *CacheEngine) Setup(ctx cruntime.ReconcileRequestContext) (ready bool, e dataset := ctx.Dataset runtime := ctx.Runtime.(*datav1alpha1.CacheRuntime) - runtimeClass, err := utils.GetCacheRuntimeClass(ctx.Client, runtime.Spec.RuntimeClassName) + runtimeClass, err := e.getRuntimeClass(runtime.Spec.RuntimeClassName) if err != nil { return false, errors.Wrapf(err, "failed to get CacheRuntimeClass %s", runtime.Spec.RuntimeClassName) } @@ -45,7 +45,7 @@ func (e *CacheEngine) Setup(ctx cruntime.ReconcileRequestContext) (ready bool, e } // create runtime value configmap for runtime mount - err = e.createRuntimeValueConfigMap(runtimeValue) + err = e.createRuntimeConfigMaps(runtimeClass) if err != nil { return false, err } @@ -78,8 +78,8 @@ func (e *CacheEngine) Setup(ctx cruntime.ReconcileRequestContext) (ready bool, e // dataset mount if runtimeValue.Master.Enabled { - // TODO(cache runtime): only master-slave architecture should execute ufs mount? - err = e.PrepareUFS(runtimeValue) + // currently only support mount ufs for master + err = e.PrepareUFS(runtimeClass.Topology.Master.ExecutionEntries, runtimeValue) if err != nil { return false, err } diff --git a/pkg/ddc/cache/engine/shutdown.go b/pkg/ddc/cache/engine/shutdown.go index c0a644e4563..e75324bf1eb 100644 --- a/pkg/ddc/cache/engine/shutdown.go +++ b/pkg/ddc/cache/engine/shutdown.go @@ -16,7 +16,23 @@ package engine +import ( + "github.com/fluid-cloudnative/fluid/pkg/ctrl" +) + // Shutdown and clean up the engine func (e *CacheEngine) Shutdown() (err error) { - return newNotImplementError("Shutdown") + info, err := e.getRuntimeInfo() + if err != nil { + return err + } + helper := ctrl.BuildHelper(info, e.Client, e.Log) + count, err := helper.CleanUpFuse() + if err != nil { + e.Log.Error(err, "Err in cleaning fuse") + return err + } + e.Log.Info("clean up fuse count", "n", count) + + return nil } diff --git a/pkg/ddc/cache/engine/status.go b/pkg/ddc/cache/engine/status.go index 2e650b08ad6..d745a20cb01 100644 --- a/pkg/ddc/cache/engine/status.go +++ b/pkg/ddc/cache/engine/status.go @@ -21,6 +21,7 @@ import ( "fmt" fluidapi "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/ddc/cache/component" "github.com/fluid-cloudnative/fluid/pkg/utils" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" @@ -28,14 +29,66 @@ import ( "time" ) -func (e *CacheEngine) setMasterComponentStatus(status *fluidapi.RuntimeComponentStatus) (ready bool, err error) { - return true, newNotImplementError("setMasterComponentStatus") +func (e *CacheEngine) setMasterComponentStatus(componentValue *common.CacheRuntimeComponentValue, status *fluidapi.CacheRuntimeStatus) (ready bool, err error) { + manager := component.NewComponentHelper(componentValue.WorkloadType, e.Scheme, e.Client) + + masterStatus, err := manager.ConstructComponentStatus(context.TODO(), componentValue) + if err != nil { + return false, err + } + if masterStatus.ReadyReplicas == masterStatus.DesiredReplicas { + masterStatus.Phase = fluidapi.RuntimePhaseReady + ready = true + } else { + masterStatus.Phase = fluidapi.RuntimePhaseNotReady + } + status.Master = masterStatus + + return ready, err } -func (e *CacheEngine) setWorkerComponentStatus(status *fluidapi.RuntimeComponentStatus) (ready bool, err error) { - return true, newNotImplementError("setWorkerComponentStatus") +func (e *CacheEngine) setWorkerComponentStatus(componentValue *common.CacheRuntimeComponentValue, status *fluidapi.CacheRuntimeStatus) (ready bool, err error) { + manager := component.NewComponentHelper(componentValue.WorkloadType, e.Scheme, e.Client) + + workerStatus, err := manager.ConstructComponentStatus(context.TODO(), componentValue) + if err != nil { + return false, err + } + + if workerStatus.DesiredReplicas == 0 { + workerStatus.Phase = fluidapi.RuntimePhaseReady + ready = true + } else if workerStatus.ReadyReplicas > 0 { + if workerStatus.DesiredReplicas == workerStatus.ReadyReplicas { + workerStatus.Phase = fluidapi.RuntimePhaseReady + ready = true + } else if workerStatus.ReadyReplicas >= 1 { + workerStatus.Phase = fluidapi.RuntimePhasePartialReady + ready = true + } + } else { + workerStatus.Phase = fluidapi.RuntimePhaseNotReady + } + status.Worker = workerStatus + + return ready, err } -func (e *CacheEngine) setClientComponentStatus(status *fluidapi.RuntimeComponentStatus) (err error) { - return newNotImplementError("setClientComponentStatus") +func (e *CacheEngine) setClientComponentStatus(componentValue *common.CacheRuntimeComponentValue, status *fluidapi.CacheRuntimeStatus) (err error) { + manager := component.NewComponentHelper(componentValue.WorkloadType, e.Scheme, e.Client) + + clientStatus, err := manager.ConstructComponentStatus(context.TODO(), componentValue) + if err != nil { + return err + } + if clientStatus.DesiredReplicas > 0 { + if clientStatus.DesiredReplicas == clientStatus.ReadyReplicas { + clientStatus.Phase = fluidapi.RuntimePhaseReady + } else if clientStatus.ReadyReplicas >= 1 { + clientStatus.Phase = fluidapi.RuntimePhasePartialReady + } + } + status.Client = clientStatus + + return nil } func (e *CacheEngine) CheckAndUpdateRuntimeStatus(value *common.CacheRuntimeValue) (bool, error) { var masterReady, workerReady, runtimeReady = true, true, false @@ -48,21 +101,21 @@ func (e *CacheEngine) CheckAndUpdateRuntimeStatus(value *common.CacheRuntimeValu runtimeToUpdate := runtime.DeepCopy() if value.Master.Enabled { - masterReady, err = e.setMasterComponentStatus(&runtimeToUpdate.Status.Master) + masterReady, err = e.setMasterComponentStatus(value.Master, &runtimeToUpdate.Status) if err != nil { return err } } if value.Worker.Enabled { - workerReady, err = e.setWorkerComponentStatus(&runtimeToUpdate.Status.Worker) + workerReady, err = e.setWorkerComponentStatus(value.Worker, &runtimeToUpdate.Status) if err != nil { return err } } if value.Client.Enabled { - err = e.setClientComponentStatus(&runtimeToUpdate.Status.Client) + err = e.setClientComponentStatus(value.Client, &runtimeToUpdate.Status) if err != nil { return err } diff --git a/pkg/ddc/cache/engine/sync.go b/pkg/ddc/cache/engine/sync.go index b0924ee0f4b..dd2d16ca2e6 100644 --- a/pkg/ddc/cache/engine/sync.go +++ b/pkg/ddc/cache/engine/sync.go @@ -17,25 +17,80 @@ package engine import ( + "context" + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" + "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "os" + "reflect" "time" ) func (e *CacheEngine) Sync(ctx cruntime.ReconcileRequestContext) (err error) { - // sync runtime status + // sync the runtime value configmap + runtime, err := e.getRuntime() + if err != nil { + return err + } + + err = e.syncRuntimeValueConfigMap(runtime) + if err != nil { + return err + } + + // TODO: implement other logic // handle ufs change + // sync runtime status + // handle runtime spec change // sync metadata // SyncScheduleInfoToCacheNodes - return newNotImplementError("Sync") + return nil } +func (e *CacheEngine) syncRuntimeValueConfigMap(runtime *datav1alpha1.CacheRuntime) error { + configMap, err := kubeclient.GetConfigmapByName(e.Client, e.getRuntimeConfigConfigMapName(), e.namespace) + if err != nil { + return err + } + data, err := e.generateRuntimeConfigData(runtime) + if err != nil { + return err + } + + var True = true + owner := []metav1.OwnerReference{ + { + APIVersion: runtime.APIVersion, + Kind: runtime.Kind, + Name: runtime.Name, + UID: runtime.UID, + Controller: &True, + BlockOwnerDeletion: &True, + }, + } + + if configMap == nil { + return kubeclient.CreateConfigMapWithOwner(e.Client, e.getRuntimeConfigConfigMapName(), e.namespace, data, owner) + } + + configMapToUpdate := configMap.DeepCopy() + configMapToUpdate.Data = data + if !reflect.DeepEqual(configMapToUpdate, configMap) { + err = e.Client.Update(context.TODO(), configMapToUpdate) + if err != nil { + return err + } + } + + return err +} func getSyncRetryDuration() (d *time.Duration, err error) { if value, existed := os.LookupEnv(syncRetryDurationEnv); existed { duration, err := time.ParseDuration(value) diff --git a/pkg/ddc/cache/engine/transform.go b/pkg/ddc/cache/engine/transform.go index 9fb967181f5..7fce3f42702 100644 --- a/pkg/ddc/cache/engine/transform.go +++ b/pkg/ddc/cache/engine/transform.go @@ -17,10 +17,194 @@ package engine import ( + "errors" + "fmt" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils" + "github.com/fluid-cloudnative/fluid/pkg/utils/transformer" + corev1 "k8s.io/api/core/v1" + "time" ) +// CacheRuntimeComponentCommonConfig common config for transform +type CacheRuntimeComponentCommonConfig struct { + Owner *common.OwnerReference + + // TODO: add ImagePullSecrets, NodeSelector, Tolerations, Envs, PlacementMode etc. + + // configmaps mounted by all component pods + RuntimeConfigs *RuntimeConfigVolumeConfig +} + +type TargetPathVolumeConfig struct { + TargetPathHostVolume corev1.Volume + TargetPathVolumeMount corev1.VolumeMount +} + +type RuntimeConfigVolumeConfig struct { + // runtime config's config map defined by fluid + RuntimeConfigVolume corev1.Volume + RuntimeConfigVolumeMount corev1.VolumeMount + // config map names defined in ClassRuntimeClass + ExtraConfigMapNames map[string]bool +} + func (e *CacheEngine) transform(dataset *datav1alpha1.Dataset, runtime *datav1alpha1.CacheRuntime, runtimeClass *datav1alpha1.CacheRuntimeClass) (*common.CacheRuntimeValue, error) { - return nil, newNotImplementError("transform") + + if runtimeClass.Topology.Master == nil && runtimeClass.Topology.Worker == nil && runtimeClass.Topology.Client == nil { + return nil, fmt.Errorf("at least one component should be defined in runtimeClass") + } + defer utils.TimeTrack(time.Now(), "CacheRuntime.transform", "name", runtime.Name) + + runtimeValue := &common.CacheRuntimeValue{ + RuntimeIdentity: common.RuntimeIdentity{ + Namespace: runtime.Namespace, + Name: runtime.Name, + }, + } + + // get common config for transform components + runtimeCommonConfig, err := e.transformComponentCommonConfig(runtime, runtimeClass) + if err != nil { + return nil, err + } + + // transform the master/worker/client + err = e.transformMaster(runtime, runtimeClass, runtimeCommonConfig, runtimeValue) + if err != nil { + return nil, err + } + err = e.transformWorker(runtime, runtimeClass, runtimeCommonConfig, runtimeValue) + if err != nil { + return nil, err + } + err = e.transformClient(runtime, runtimeClass, runtimeCommonConfig, runtimeValue) + if err != nil { + return nil, err + } + + return runtimeValue, nil +} + +func (e *CacheEngine) transformComponentCommonConfig(runtime *datav1alpha1.CacheRuntime, runtimeClass *datav1alpha1.CacheRuntimeClass) (*CacheRuntimeComponentCommonConfig, error) { + config := &CacheRuntimeComponentCommonConfig{ + Owner: transformer.GenerateOwnerReferenceFromObject(runtime), + } + e.transformRuntimeConfigVolume(config, runtimeClass) + + return config, nil +} + +func (e *CacheEngine) transformRuntimeConfigVolume(config *CacheRuntimeComponentCommonConfig, runtimeClass *datav1alpha1.CacheRuntimeClass) { + // create the runtime config mount info + volumeName := e.getRuntimeConfigVolumeName() + config.RuntimeConfigs = &RuntimeConfigVolumeConfig{ + RuntimeConfigVolume: corev1.Volume{ + Name: volumeName, + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: e.getRuntimeConfigConfigMapName(), + }, + }, + }, + }, + RuntimeConfigVolumeMount: corev1.VolumeMount{ + Name: volumeName, + MountPath: e.getRuntimeConfigDir(), + ReadOnly: true, + }, + } + + if len(runtimeClass.ExtraResources.ConfigMaps) == 0 { + return + } + config.RuntimeConfigs.ExtraConfigMapNames = map[string]bool{} + // TODO: 当前,这些 configmap 当前需要 component 中定义使用,是否对于所有 component 是通用的? + for _, cm := range runtimeClass.ExtraResources.ConfigMaps { + config.RuntimeConfigs.ExtraConfigMapNames[cm.Name] = true + } +} + +func (e *CacheEngine) addCommonConfigForComponent(commonConfig *CacheRuntimeComponentCommonConfig, componentValue *common.CacheRuntimeComponentValue, + componentDefinition *datav1alpha1.RuntimeComponentDefinition) error { + componentValue.PodTemplateSpec.Spec.Volumes = append(componentValue.PodTemplateSpec.Spec.Volumes, commonConfig.RuntimeConfigs.RuntimeConfigVolume) + + // assume the first container uses the runtime config + componentValue.PodTemplateSpec.Spec.InitContainers[0].VolumeMounts = append(componentValue.PodTemplateSpec.Spec.InitContainers[0].VolumeMounts, commonConfig.RuntimeConfigs.RuntimeConfigVolumeMount) + componentValue.PodTemplateSpec.Spec.Containers[0].VolumeMounts = append(componentValue.PodTemplateSpec.Spec.Containers[0].VolumeMounts, commonConfig.RuntimeConfigs.RuntimeConfigVolumeMount) + + // other config maps defined in CacheRuntimeClass + if componentDefinition.Dependencies.ExtraResources == nil { + return nil + } + names := commonConfig.RuntimeConfigs.ExtraConfigMapNames + for _, cm := range componentDefinition.Dependencies.ExtraResources.ConfigMaps { + if names[cm.Name] == false { + e.Log.Error(errors.New("component has undefined config map extra resource"), "type", componentValue.ComponentType, "configMapName", cm.Name) + } + componentValue.PodTemplateSpec.Spec.Volumes = append(componentValue.PodTemplateSpec.Spec.Volumes, corev1.Volume{ + Name: e.getRuntimeClassExtraConfigMapVolumeName(cm.Name), + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: cm.Name, + }, + }, + }, + }) + if len(componentValue.PodTemplateSpec.Spec.InitContainers) > 0 { + componentValue.PodTemplateSpec.Spec.InitContainers[0].VolumeMounts = append(componentValue.PodTemplateSpec.Spec.InitContainers[0].VolumeMounts, + corev1.VolumeMount{ + Name: e.getRuntimeClassExtraConfigMapVolumeName(cm.Name), + MountPath: cm.MountPath, + ReadOnly: true, + }) + } + componentValue.PodTemplateSpec.Spec.Containers[0].VolumeMounts = append(componentValue.PodTemplateSpec.Spec.Containers[0].VolumeMounts, corev1.VolumeMount{ + Name: e.getRuntimeClassExtraConfigMapVolumeName(cm.Name), + MountPath: cm.MountPath, + ReadOnly: true, + }) + } + + // add envs + serviceName := "" + if componentValue.Service != nil { + serviceName = componentValue.Service.Name + } + addEnvs := []corev1.EnvVar{ + { + Name: "FLUID_DATASET_NAME", + Value: e.name, + }, + { + Name: "FLUID_DATASET_NAMESPACE", + Value: e.namespace, + }, + { + Name: "FLUID_RUNTIME_CONFIG_PATH", + Value: e.getRuntimeConfigPath(), + }, + { + Name: "FLUID_RUNTIME_MOUNT_PATH", + Value: e.getFuseMountPoint(), + }, + { + Name: "FLUID_RUNTIME_COMPONENT_TYPE", + Value: string(componentValue.ComponentType), + }, + { + // curvine master sets the CURVINE_MASTER_HOSTNAME with service name + Name: "FLUID_RUNTIME_COMPONENT_SVC_NAME", + Value: serviceName, + }, + } + // inject envs should come first. + componentValue.PodTemplateSpec.Spec.Containers[0].Env = append(addEnvs, componentValue.PodTemplateSpec.Spec.Containers[0].Env...) + if len(componentValue.PodTemplateSpec.Spec.InitContainers) > 0 { + componentValue.PodTemplateSpec.Spec.InitContainers[0].Env = append(addEnvs, componentValue.PodTemplateSpec.Spec.InitContainers[0].Env...) + } + return nil } diff --git a/pkg/ddc/cache/engine/transform_client.go b/pkg/ddc/cache/engine/transform_client.go new file mode 100644 index 00000000000..c98320cbc90 --- /dev/null +++ b/pkg/ddc/cache/engine/transform_client.go @@ -0,0 +1,95 @@ +/* + Copyright 2026 The Fluid Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package engine + +import ( + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + corev1 "k8s.io/api/core/v1" +) + +func (e *CacheEngine) transformClient(runtime *datav1alpha1.CacheRuntime, runtimeClass *datav1alpha1.CacheRuntimeClass, + config *CacheRuntimeComponentCommonConfig, value *common.CacheRuntimeValue) error { + + if runtimeClass.Topology.Client == nil || runtime.Spec.Client.Disabled { + value.Client.Enabled = false + return nil + } + + component := runtimeClass.Topology.Client + value.Client = &common.CacheRuntimeComponentValue{ + Name: GetComponentName(e.name, common.ComponentTypeClient), + Namespace: e.namespace, + Enabled: true, + ComponentType: common.ComponentTypeClient, + WorkloadType: component.WorkloadType, + PodTemplateSpec: component.Template, + Owner: config.Owner, + Replicas: 1, + } + if runtimeClass.Topology.Client.Service.Headless != nil { + value.Client.Service = &common.CacheRuntimeComponentServiceConfig{ + Name: GetComponentServiceName(e.name, common.ComponentTypeClient), + } + } + + err := e.addCommonConfigForComponent(config, value.Client, component) + if err != nil { + return err + } + + podTemplateSpec := &value.Client.PodTemplateSpec + + // TODO: transform runtime.Spec.Client, runtimeClass.Topology.Client, dataset.Spec into PodTemplateSpec + + runtimeInfo, err := e.getRuntimeInfo() + if err != nil { + return err + } + + // fuse label, keep the same key/value with CSI node server + if podTemplateSpec.Spec.NodeSelector == nil { + podTemplateSpec.Spec.NodeSelector = map[string]string{} + } + podTemplateSpec.Spec.NodeSelector[runtimeInfo.GetFuseLabelName()] = "true" + + // fuse volume mount + e.transformFuseMountPointVolumes(podTemplateSpec) + + return nil +} +func (e *CacheEngine) transformFuseMountPointVolumes(podTemplate *corev1.PodTemplateSpec) { + volumeName := e.getFuseMountPointVolumeName() + targetPath := e.getFuseMountPoint() + hostPathDirectoryOrCreate := corev1.HostPathDirectoryOrCreate + mountPropagation := corev1.MountPropagationBidirectional + + podTemplate.Spec.Volumes = append(podTemplate.Spec.Volumes, corev1.Volume{ + Name: volumeName, + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: targetPath, + Type: &hostPathDirectoryOrCreate, + }, + }, + }) + podTemplate.Spec.Containers[0].VolumeMounts = append(podTemplate.Spec.Containers[0].VolumeMounts, corev1.VolumeMount{ + Name: volumeName, + MountPath: targetPath, + MountPropagation: &mountPropagation, + }) +} diff --git a/pkg/ddc/cache/engine/transform_master.go b/pkg/ddc/cache/engine/transform_master.go new file mode 100644 index 00000000000..04112d4f7de --- /dev/null +++ b/pkg/ddc/cache/engine/transform_master.go @@ -0,0 +1,57 @@ +/* + Copyright 2026 The Fluid Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package engine + +import ( + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" +) + +func (e *CacheEngine) transformMaster(runtime *datav1alpha1.CacheRuntime, runtimeClass *datav1alpha1.CacheRuntimeClass, + config *CacheRuntimeComponentCommonConfig, value *common.CacheRuntimeValue) error { + // TODO: these two field both indicate Master enabled or not, should be combined into one field. + if runtimeClass.Topology.Master == nil || runtime.Spec.Master.Disabled { + value.Master.Enabled = false + return nil + } + + component := runtimeClass.Topology.Master + value.Master = &common.CacheRuntimeComponentValue{ + Name: GetComponentName(e.name, common.ComponentTypeMaster), + Namespace: e.namespace, + Enabled: true, + ComponentType: common.ComponentTypeMaster, + WorkloadType: component.WorkloadType, + PodTemplateSpec: component.Template, + Owner: config.Owner, + Replicas: runtime.Spec.Master.Replicas, + } + if runtimeClass.Topology.Master.Service.Headless != nil { + value.Master.Service = &common.CacheRuntimeComponentServiceConfig{ + Name: GetComponentServiceName(e.name, common.ComponentTypeMaster), + } + } + + err := e.addCommonConfigForComponent(config, value.Master, component) + if err != nil { + return err + } + + // TODO: transform runtime.Spec.Master, runtimeClass.Topology.Master, dataset.Spec into PodTemplateSpec + + return nil +} diff --git a/pkg/ddc/cache/engine/transform_worker.go b/pkg/ddc/cache/engine/transform_worker.go new file mode 100644 index 00000000000..6671c7949fc --- /dev/null +++ b/pkg/ddc/cache/engine/transform_worker.go @@ -0,0 +1,57 @@ +/* + Copyright 2026 The Fluid Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package engine + +import ( + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" +) + +func (e *CacheEngine) transformWorker(runtime *datav1alpha1.CacheRuntime, runtimeClass *datav1alpha1.CacheRuntimeClass, + config *CacheRuntimeComponentCommonConfig, value *common.CacheRuntimeValue) error { + + if runtimeClass.Topology.Worker == nil || runtime.Spec.Worker.Disabled { + value.Worker.Enabled = false + return nil + } + + component := runtimeClass.Topology.Worker + value.Worker = &common.CacheRuntimeComponentValue{ + Name: GetComponentName(e.name, common.ComponentTypeWorker), + Namespace: e.namespace, + Enabled: true, + ComponentType: common.ComponentTypeWorker, + WorkloadType: component.WorkloadType, + PodTemplateSpec: component.Template, + Owner: config.Owner, + Replicas: runtime.Spec.Worker.Replicas, + } + if runtimeClass.Topology.Worker.Service.Headless != nil { + value.Worker.Service = &common.CacheRuntimeComponentServiceConfig{ + Name: GetComponentServiceName(e.name, common.ComponentTypeWorker), + } + } + + err := e.addCommonConfigForComponent(config, value.Worker, component) + if err != nil { + return err + } + + // TODO: transform runtime.Spec.Worker, runtimeClass.Topology.Worker, dataset.Spec into PodTemplateSpec + + return nil +} diff --git a/pkg/ddc/cache/engine/ufs.go b/pkg/ddc/cache/engine/ufs.go index 8aaf5d1d327..5486a8778c0 100644 --- a/pkg/ddc/cache/engine/ufs.go +++ b/pkg/ddc/cache/engine/ufs.go @@ -16,10 +16,28 @@ package engine -import "github.com/fluid-cloudnative/fluid/pkg/common" +import ( + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + "time" +) -func (e *CacheEngine) PrepareUFS(value *common.CacheRuntimeValue) error { +func (e *CacheEngine) PrepareUFS(entries *datav1alpha1.ExecutionEntries, value *common.CacheRuntimeValue) error { // execute mount command in master pod + mountUfs := entries.MountUFS + if mountUfs == nil { + return nil + } + podName, containerName, err := e.getMasterPodInfo(value) + if err != nil { + return err + } - return newNotImplementError("PrepareUFS") + fileUtils := newCacheFileUtils(podName, containerName, e.namespace, e.Log) + err = fileUtils.Mount(mountUfs.Command, time.Duration(mountUfs.Timeout)*time.Second) + if err != nil { + return err + } + + return nil } diff --git a/pkg/ddc/cache/engine/util.go b/pkg/ddc/cache/engine/util.go new file mode 100644 index 00000000000..0d31df41a7b --- /dev/null +++ b/pkg/ddc/cache/engine/util.go @@ -0,0 +1,85 @@ +/* + Copyright 2026 The Fluid Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package engine + +import ( + "fmt" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils" +) + +// GetComponentName gets the component name using runtime name and component type. +func GetComponentName(runtimeName string, componentType common.ComponentType) string { + return fmt.Sprintf("%s-%s", runtimeName, componentType) +} + +// GetComponentServiceName gets the component service name. +func GetComponentServiceName(runtimeName string, componentType common.ComponentType) string { + return fmt.Sprintf("svc-%s-%s", runtimeName, componentType) +} + +// getFuseMountPointVolumeName get the volume name of mount path in fuse pod (e.g. /runtime-mnt). +func (e *CacheEngine) getFuseMountPointVolumeName() string { + return fmt.Sprintf("fluid-cache-runtime-shared-path") +} + +func (e *CacheEngine) getFuseMountPoint() string { + mountRoot, err := utils.GetMountRoot() + if err != nil { + mountRoot = "/" + common.CacheRuntime + } else { + mountRoot = mountRoot + "/" + common.CacheRuntime + } + + e.Log.Info("mountRoot", "path", mountRoot) + return fmt.Sprintf("%s/%s/%s/cache-fuse", mountRoot, e.namespace, e.name) +} + +// getRuntimeEncryptOptionPath get the mounted path of encrypt options for runtime pod. +func (e *CacheEngine) getRuntimeEncryptOptionPath(secretName string) string { + return fmt.Sprintf("/etc/fluid/secrets/%s", secretName) +} + +// getRuntimeEncryptVolumeName get the volume name of getRuntimeEncryptOptionPath. +func (e *CacheEngine) getRuntimeEncryptVolumeName(secretName string) string { + return fmt.Sprintf("fluid-runtime-secret-%s", secretName) +} + +// getRuntimeConfigConfigMapName get the configmap name of the runtime config. +func (e *CacheEngine) getRuntimeConfigConfigMapName() string { + return fmt.Sprintf("fluid-runtime-config-%s", e.name) +} +func (e *CacheEngine) getRuntimeConfigVolumeName() (targetPath string) { + return fmt.Sprintf("fluid-runtime-%s-config", e.name) +} + +// getRuntimeConfigDir defines the mount directory of runtime config in the pod. +func (e *CacheEngine) getRuntimeConfigDir() string { + return fmt.Sprintf("/etc/fluid/config") +} + +// getRuntimeConfigPath defines the mount path of runtime config in the pod. +func (e *CacheEngine) getRuntimeConfigPath() string { + return fmt.Sprintf("%s/%s", e.getRuntimeConfigDir(), e.getRuntimeConfigFileName()) +} +func (e *CacheEngine) getRuntimeConfigFileName() string { + return "runtime.json" +} + +func (e *CacheEngine) getRuntimeClassExtraConfigMapVolumeName(name string) string { + return fmt.Sprintf("fluid-extra-%s-%s", e.name, name) +} diff --git a/pkg/ddc/cache/engine/volume.go b/pkg/ddc/cache/engine/volume.go index 6164264f32b..928bed471c7 100644 --- a/pkg/ddc/cache/engine/volume.go +++ b/pkg/ddc/cache/engine/volume.go @@ -1,5 +1,5 @@ /* - Copyright 2022 The Fluid Authors. + Copyright 2026 The Fluid Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -16,6 +16,11 @@ package engine +import ( + "github.com/fluid-cloudnative/fluid/pkg/common" + volumeHelper "github.com/fluid-cloudnative/fluid/pkg/utils/dataset/volume" +) + func (e *CacheEngine) CreateVolume() (err error) { if err = e.createFusePersistentVolume(); err != nil { return err @@ -40,17 +45,41 @@ func (e *CacheEngine) DeleteVolume() (err error) { } func (e *CacheEngine) createFusePersistentVolume() error { - return newNotImplementError("createFusePersistentVolume") + runtimeInfo, err := e.getRuntimeInfo() + if err != nil { + return err + } + + return volumeHelper.CreatePersistentVolumeForRuntime(e.Client, + runtimeInfo, + e.getFuseMountPoint(), + common.CacheRuntime, + e.Log) } func (e *CacheEngine) createFusePersistentVolumeClaim() error { - return newNotImplementError("createFusePersistentVolumeClaim") + runtimeInfo, err := e.getRuntimeInfo() + if err != nil { + return err + } + + return volumeHelper.CreatePersistentVolumeClaimForRuntime(e.Client, runtimeInfo, e.Log) } func (e *CacheEngine) deleteFusePersistentVolume() error { - return newNotImplementError("deleteFusePersistentVolume") + runtimeInfo, err := e.getRuntimeInfo() + if err != nil { + return err + } + + return volumeHelper.DeleteFusePersistentVolume(e.Client, runtimeInfo, e.Log) } func (e *CacheEngine) deleteFusePersistentVolumeClaim() error { - return newNotImplementError("deleteFusePersistentVolumeClaim") + runtimeInfo, err := e.getRuntimeInfo() + if err != nil { + return err + } + + return volumeHelper.DeleteFusePersistentVolumeClaim(e.Client, runtimeInfo, e.Log) } diff --git a/pkg/ddc/cache/engine/worker.go b/pkg/ddc/cache/engine/worker.go index dd228cb778e..f83da79ea45 100644 --- a/pkg/ddc/cache/engine/worker.go +++ b/pkg/ddc/cache/engine/worker.go @@ -17,8 +17,14 @@ package engine import ( + "context" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/ddc/cache/component" + "github.com/fluid-cloudnative/fluid/pkg/utils" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/util/retry" + "reflect" ) func (e *CacheEngine) SetupWorkerComponent(workerValue *common.CacheRuntimeComponentValue) (bool, error) { @@ -49,5 +55,50 @@ func (e *CacheEngine) ShouldSetupWorker() (bool, error) { } func (e *CacheEngine) SetupWorkerInternal(workerValue *common.CacheRuntimeComponentValue) error { - return newNotImplementError("SetupWorkerInternal") + manager := component.NewComponentHelper(workerValue.WorkloadType, e.Scheme, e.Client) + err := manager.Reconciler(context.TODO(), workerValue) + if err != nil { + return err + } + + // update status of worker + err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { + runtime, err := e.getRuntime() + if err != nil { + return err + } + + masterStatus, err := manager.ConstructComponentStatus(context.TODO(), workerValue) + if err != nil { + return err + } + // from RuntimePhaseNone to RuntimePhaseNotReady, not reconcile the master component the next time. + masterStatus.Phase = datav1alpha1.RuntimePhaseNotReady + + // TODO: support builds workers affinity ? do it in transformer ? + runtimeToUpdate := runtime.DeepCopy() + runtimeToUpdate.Status.Master = masterStatus + + // TODO(cache runtime): why need this line judgement ? + if runtime.Status.Worker.Phase == datav1alpha1.RuntimePhaseNone { + if len(runtimeToUpdate.Status.Conditions) == 0 { + runtimeToUpdate.Status.Conditions = []datav1alpha1.RuntimeCondition{} + } + cond := utils.NewRuntimeCondition(datav1alpha1.RuntimeWorkersReady, datav1alpha1.RuntimeWorkersInitializedReason, + "The worker is initialized.", corev1.ConditionTrue) + runtimeToUpdate.Status.Conditions = utils.UpdateRuntimeCondition(runtimeToUpdate.Status.Conditions, cond) + + } + if !reflect.DeepEqual(runtime.Status, runtimeToUpdate.Status) { + return e.Client.Status().Update(context.TODO(), runtimeToUpdate) + } + + return nil + }) + if err != nil { + e.Log.Error(err, "failed to update runtime status") + return err + } + + return nil } diff --git a/pkg/ddc/efc/runtime_info.go b/pkg/ddc/efc/runtime_info.go index fe686ad7326..9e48d8e63bb 100644 --- a/pkg/ddc/efc/runtime_info.go +++ b/pkg/ddc/efc/runtime_info.go @@ -57,6 +57,7 @@ func (e *EFCEngine) getRuntimeInfo() (info base.RuntimeInfoInterface, err error) return e.runtimeInfo, err } + // TODO: why not just get Dataset instance's UID ? uid, err := base.GetOwnerDatasetUIDFromRuntimeMeta(runtime.ObjectMeta) if err != nil { return nil, err diff --git a/pkg/utils/kubeclient/configmap.go b/pkg/utils/kubeclient/configmap.go index b6a79bb175c..65296e1d209 100644 --- a/pkg/utils/kubeclient/configmap.go +++ b/pkg/utils/kubeclient/configmap.go @@ -166,3 +166,16 @@ func CreateConfigMap(client client.Client, name string, namespace string, key st return client.Create(context.TODO(), configMap) } + +func CreateConfigMapWithOwner(client client.Client, name string, namespace string, data map[string]string, ownerReference []metav1.OwnerReference) (err error) { + configMap := &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + OwnerReferences: ownerReference, + }, + Data: data, + } + + return client.Create(context.TODO(), configMap) +} diff --git a/pkg/utils/runtimes.go b/pkg/utils/runtimes.go index 35e9f554359..4682e51133e 100644 --- a/pkg/utils/runtimes.go +++ b/pkg/utils/runtimes.go @@ -175,15 +175,3 @@ func GetCacheRuntime(client client.Reader, name, namespace string) (*datav1alpha return &runtime, nil } - -func GetCacheRuntimeClass(client client.Client, name string) (*datav1alpha1.CacheRuntimeClass, error) { - key := types.NamespacedName{ - Name: name, - } - var runtimeClass datav1alpha1.CacheRuntimeClass - if err := client.Get(context.TODO(), key, &runtimeClass); err != nil { - return nil, err - } - - return &runtimeClass, nil -} From a9fbb2ad5e60ffca1b07cf738fe72927425f09bb Mon Sep 17 00:00:00 2001 From: liuzhiqiang <923463801@qq.com> Date: Thu, 9 Apr 2026 16:08:59 +0800 Subject: [PATCH 2/8] update crds Signed-off-by: xliuqq --- .../data.fluid.io_cacheruntimeclasses.yaml | 50 +++++++++++++++++++ .../data.fluid.io_cacheruntimeclasses.yaml | 50 +++++++++++++++++++ 2 files changed, 100 insertions(+) diff --git a/charts/fluid/fluid/crds/data.fluid.io_cacheruntimeclasses.yaml b/charts/fluid/fluid/crds/data.fluid.io_cacheruntimeclasses.yaml index e652db785e0..0b6632b421b 100644 --- a/charts/fluid/fluid/crds/data.fluid.io_cacheruntimeclasses.yaml +++ b/charts/fluid/fluid/crds/data.fluid.io_cacheruntimeclasses.yaml @@ -63,6 +63,31 @@ spec: type: array type: object type: object + executionEntries: + properties: + mountUFS: + properties: + command: + items: + type: string + type: array + timeout: + type: integer + required: + - command + type: object + reportSummary: + properties: + command: + items: + type: string + type: array + timeout: + type: integer + required: + - command + type: object + type: object options: additionalProperties: type: string @@ -6868,6 +6893,31 @@ spec: type: array type: object type: object + executionEntries: + properties: + mountUFS: + properties: + command: + items: + type: string + type: array + timeout: + type: integer + required: + - command + type: object + reportSummary: + properties: + command: + items: + type: string + type: array + timeout: + type: integer + required: + - command + type: object + type: object options: additionalProperties: type: string diff --git a/config/crd/bases/data.fluid.io_cacheruntimeclasses.yaml b/config/crd/bases/data.fluid.io_cacheruntimeclasses.yaml index e652db785e0..0b6632b421b 100644 --- a/config/crd/bases/data.fluid.io_cacheruntimeclasses.yaml +++ b/config/crd/bases/data.fluid.io_cacheruntimeclasses.yaml @@ -63,6 +63,31 @@ spec: type: array type: object type: object + executionEntries: + properties: + mountUFS: + properties: + command: + items: + type: string + type: array + timeout: + type: integer + required: + - command + type: object + reportSummary: + properties: + command: + items: + type: string + type: array + timeout: + type: integer + required: + - command + type: object + type: object options: additionalProperties: type: string @@ -6868,6 +6893,31 @@ spec: type: array type: object type: object + executionEntries: + properties: + mountUFS: + properties: + command: + items: + type: string + type: array + timeout: + type: integer + required: + - command + type: object + reportSummary: + properties: + command: + items: + type: string + type: array + timeout: + type: integer + required: + - command + type: object + type: object options: additionalProperties: type: string From 0c575aa6a9056fd2b1bea7f8db81b27d902423d8 Mon Sep 17 00:00:00 2001 From: xliuqq Date: Thu, 9 Apr 2026 16:19:41 +0800 Subject: [PATCH 3/8] fix lint error Signed-off-by: xliuqq --- pkg/ddc/cache/engine/transform.go | 2 +- pkg/ddc/cache/engine/util.go | 14 ++------------ 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/pkg/ddc/cache/engine/transform.go b/pkg/ddc/cache/engine/transform.go index 7fce3f42702..fd3b5727c6c 100644 --- a/pkg/ddc/cache/engine/transform.go +++ b/pkg/ddc/cache/engine/transform.go @@ -141,7 +141,7 @@ func (e *CacheEngine) addCommonConfigForComponent(commonConfig *CacheRuntimeComp } names := commonConfig.RuntimeConfigs.ExtraConfigMapNames for _, cm := range componentDefinition.Dependencies.ExtraResources.ConfigMaps { - if names[cm.Name] == false { + if !names[cm.Name] { e.Log.Error(errors.New("component has undefined config map extra resource"), "type", componentValue.ComponentType, "configMapName", cm.Name) } componentValue.PodTemplateSpec.Spec.Volumes = append(componentValue.PodTemplateSpec.Spec.Volumes, corev1.Volume{ diff --git a/pkg/ddc/cache/engine/util.go b/pkg/ddc/cache/engine/util.go index 0d31df41a7b..bc07a17fb11 100644 --- a/pkg/ddc/cache/engine/util.go +++ b/pkg/ddc/cache/engine/util.go @@ -32,9 +32,9 @@ func GetComponentServiceName(runtimeName string, componentType common.ComponentT return fmt.Sprintf("svc-%s-%s", runtimeName, componentType) } -// getFuseMountPointVolumeName get the volume name of mount path in fuse pod (e.g. /runtime-mnt). +// getFuseMountPointVolumeName get the volume name of mount path in fuse pod. func (e *CacheEngine) getFuseMountPointVolumeName() string { - return fmt.Sprintf("fluid-cache-runtime-shared-path") + return "fluid-cache-runtime-shared-path" } func (e *CacheEngine) getFuseMountPoint() string { @@ -49,16 +49,6 @@ func (e *CacheEngine) getFuseMountPoint() string { return fmt.Sprintf("%s/%s/%s/cache-fuse", mountRoot, e.namespace, e.name) } -// getRuntimeEncryptOptionPath get the mounted path of encrypt options for runtime pod. -func (e *CacheEngine) getRuntimeEncryptOptionPath(secretName string) string { - return fmt.Sprintf("/etc/fluid/secrets/%s", secretName) -} - -// getRuntimeEncryptVolumeName get the volume name of getRuntimeEncryptOptionPath. -func (e *CacheEngine) getRuntimeEncryptVolumeName(secretName string) string { - return fmt.Sprintf("fluid-runtime-secret-%s", secretName) -} - // getRuntimeConfigConfigMapName get the configmap name of the runtime config. func (e *CacheEngine) getRuntimeConfigConfigMapName() string { return fmt.Sprintf("fluid-runtime-config-%s", e.name) From a8e65a035a0b35497e68e3bdd959c973ee4e42ea Mon Sep 17 00:00:00 2001 From: xliuqq Date: Thu, 9 Apr 2026 16:52:39 +0800 Subject: [PATCH 4/8] add e2e test Signed-off-by: xliuqq --- pkg/ddc/cache/engine/util.go | 2 +- test/gha-e2e/curvine/cacheruntime.yaml | 36 ++++ test/gha-e2e/curvine/cacheruntimeclass.yaml | 199 ++++++++++++++++++ test/gha-e2e/curvine/dataset.yaml | 30 +++ test/gha-e2e/curvine/minio.yaml | 50 +++++ test/gha-e2e/curvine/minio_create_bucket.yaml | 19 ++ test/gha-e2e/curvine/mount.yaml | 76 +++++++ test/gha-e2e/curvine/read_job.yaml | 34 +++ test/gha-e2e/curvine/test.sh | 120 +++++++++++ test/gha-e2e/curvine/write_job.yaml | 31 +++ 10 files changed, 596 insertions(+), 1 deletion(-) create mode 100644 test/gha-e2e/curvine/cacheruntime.yaml create mode 100644 test/gha-e2e/curvine/cacheruntimeclass.yaml create mode 100644 test/gha-e2e/curvine/dataset.yaml create mode 100644 test/gha-e2e/curvine/minio.yaml create mode 100644 test/gha-e2e/curvine/minio_create_bucket.yaml create mode 100644 test/gha-e2e/curvine/mount.yaml create mode 100644 test/gha-e2e/curvine/read_job.yaml create mode 100644 test/gha-e2e/curvine/test.sh create mode 100644 test/gha-e2e/curvine/write_job.yaml diff --git a/pkg/ddc/cache/engine/util.go b/pkg/ddc/cache/engine/util.go index bc07a17fb11..ff95d183cad 100644 --- a/pkg/ddc/cache/engine/util.go +++ b/pkg/ddc/cache/engine/util.go @@ -59,7 +59,7 @@ func (e *CacheEngine) getRuntimeConfigVolumeName() (targetPath string) { // getRuntimeConfigDir defines the mount directory of runtime config in the pod. func (e *CacheEngine) getRuntimeConfigDir() string { - return fmt.Sprintf("/etc/fluid/config") + return "/etc/fluid/config" } // getRuntimeConfigPath defines the mount path of runtime config in the pod. diff --git a/test/gha-e2e/curvine/cacheruntime.yaml b/test/gha-e2e/curvine/cacheruntime.yaml new file mode 100644 index 00000000000..699ecee8dea --- /dev/null +++ b/test/gha-e2e/curvine/cacheruntime.yaml @@ -0,0 +1,36 @@ +apiVersion: data.fluid.io/v1alpha1 +kind: CacheRuntime +metadata: + name: curvine-demo +spec: + runtimeClassName: curvine-demo + master: + # 如何区分 master 和 journal 的配置,两个在一个进程中(前缀,交给Curvine自行处理) + options: # https://curvineio.github.io/zh-cn/docs/Deploy/Deploy-Curvine-Cluster/Distributed-Mode/conf#master%E9%85%8D%E7%BD%AE%E9%A1%B9 + key1: master-value1 + replicas: 1 + worker: + options: # https://curvineio.github.io/zh-cn/docs/Deploy/Deploy-Curvine-Cluster/Distributed-Mode/conf#worker%E9%85%8D%E7%BD%AE%E9%A1%B9 + key1: worker-value1 + replicas: 1 + tieredStore: + levels: #worker缓存配置 + - low: "0.5" + high: "0.8" + path: + - "testing/data" + quota: + - 1Gi + medium: + volume: + emptyDir: {} + client: + options: + key1: value1 + volumeMounts: #可配置volume和对应的volumeMounts + - name: demo + mountPath: /mnt + volumes: + - name: demo + persistentVolumeClaim: + claimName: curvine-demo diff --git a/test/gha-e2e/curvine/cacheruntimeclass.yaml b/test/gha-e2e/curvine/cacheruntimeclass.yaml new file mode 100644 index 00000000000..2798c18333c --- /dev/null +++ b/test/gha-e2e/curvine/cacheruntimeclass.yaml @@ -0,0 +1,199 @@ +apiVersion: data.fluid.io/v1alpha1 +kind: CacheRuntimeClass +metadata: + name: curvine-demo +fileSystemType: curvinefs +extraResources: + configMaps: + - name: curvine-config + data: + # TOML格式配置模板, 使用 hairyhenderson/gomplate 镜像要求的模板格式 + cluster.toml: | + # master configuration + [master] + meta_dir = "testing/meta" + + # masta ha raft configuration. + [journal] + journal_addrs = [ + {{ range $index := seq 0 (sub (ds "config").master.replicas 1) }} + {{- if $index }},{{ end }} + {id = {{ add $index 1 }}, hostname = "{{(ds "config").master.name}}-{{$index}}.{{(ds "config").master.service.name}}", port = 8996} + {{- end }} + ] + journal_dir = "testing/journal" + + # Worker configuration + [worker] + dir_reserved = "0" + data_dir = [ + "[DISK]testing/data" + ] +topology: + master: + workloadType: # 以StatefulSet workload创建master + apiVersion: apps/v1 + kind: StatefulSet + service: #需要为master创建Headless Service,仅当 workloadType 为 Statefulset 时支持 + headless: { } + dependencies: + extraResources: + # 使用 extraResources 时,需要定义其挂载路径 + configMaps: + - name: curvine-config + mountPath: "/templates" + executionEntries: + mountUFS: + command: + - bash + - "-c" + - "/etc/curvine/mount/mountUfs.sh" + timeout: 30 + template: + spec: + restartPolicy: Always + # 根据 runtime 生成的配置文件 + initContainers: + - name: init-curvine + #image: hairyhenderson/gomplate:alpine + image: registry.cn-hangzhou.aliyuncs.com/xliu1992/gomplate:alpine + # 挂载共享卷到容器内路径 + volumeMounts: + - name: shared-config-volume + mountPath: /etc/curvine # 配置文件存放目录 + command: [ "gomplate" ] + args: [ "-d", "config=$(FLUID_RUNTIME_CONFIG_PATH)", "-f", "/templates/cluster.toml", "-o", "/etc/curvine/curvine.toml" ] + containers: + - name: master + # image: "curvine/curvine:latest" + image: "curvine:v1" + command: + - /entrypoint.sh + args: + - master + - start + env: + # /entrypoint.sh 不支持参数指定配置文件,支撑环境变量配置,默认/app/curvine/conf/curvine-cluster.toml + - name: CURVINE_CONF_FILE + value: "/etc/curvine/curvine.toml" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + # curvine checks master hostname which should be one of journal_addrs + - name: CURVINE_MASTER_HOSTNAME + value: "$(POD_NAME).$(FLUID_RUNTIME_COMPONENT_SVC_NAME)" + volumeMounts: + - name: shared-config-volume + mountPath: /etc/curvine # 配置文件存放目录 + - name: curvine-mount-volume + mountPath: /etc/curvine/mount # 配置文件存放目录 + imagePullPolicy: IfNotPresent + volumes: + # emptyDir 共享存储(init容器和主容器互通) + - name: shared-config-volume + emptyDir: { } + - name: curvine-mount-volume + configMap: + name: curvine-mount + defaultMode: 0755 + worker: + workloadType: #以StatefulSet workload创建worker + apiVersion: apps/v1 + kind: StatefulSet + service: + headless: { } #需要为worker创建Headless Service,仅当workloadType为Statefulset时支持 + dependencies: + extraResources: + # 使用 extraResources 时,需要定义其挂载路径 + configMaps: + - name: curvine-config + mountPath: "/templates" + template: + spec: + restartPolicy: Always + # 根据 runtime 生成的配置文件 + initContainers: + - name: init-curvine + #image: hairyhenderson/gomplate:alpine + image: registry.cn-hangzhou.aliyuncs.com/xliu1992/gomplate:alpine + # 挂载共享卷到容器内路径 + volumeMounts: + - name: shared-config-volume + mountPath: /etc/curvine # 配置文件存放目录 + command: [ "gomplate" ] + args: [ "-d", "config=$(FLUID_RUNTIME_CONFIG_PATH)", "-f", "/templates/cluster.toml", "-o", "/etc/curvine/curvine.toml" ] + containers: + - name: worker + # image: "curvine/curvine:latest" + image: "curvine:v1" + command: + - /entrypoint.sh + args: + - worker + - start + env: + # /entrypoint.sh 不支持参数指定配置文件,支撑环境变量配置,默认/app/curvine/conf/curvine-cluster.toml + - name: CURVINE_CONF_FILE + value: "/etc/curvine/curvine.toml" + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: CURVINE_WORKER_HOSTNAME + value: "$(POD_NAME).$(FLUID_RUNTIME_COMPONENT_SVC_NAME)" + volumeMounts: + - name: shared-config-volume + mountPath: /etc/curvine # 配置文件存放目录 + imagePullPolicy: IfNotPresent + volumes: + # emptyDir 共享存储(init容器和主容器互通) + - name: shared-config-volume + emptyDir: { } + client: + workloadType: #DaemonSet workload创建client + apiVersion: apps/v1 + kind: DaemonSet + dependencies: + extraResources: + # 使用 extraResources 时,需要定义其挂载路径 + configMaps: + - name: curvine-config + mountPath: "/templates" + template: + spec: + restartPolicy: Always + # 根据 runtime 生成的配置文件 + initContainers: + - name: init-curvine + #image: hairyhenderson/gomplate:alpine + image: registry.cn-hangzhou.aliyuncs.com/xliu1992/gomplate:alpine + # 挂载共享卷到容器内路径 + volumeMounts: + - name: shared-config-volume + mountPath: /etc/curvine # 配置文件存放目录 + command: [ "gomplate" ] + args: [ "-d", "config=$(FLUID_RUNTIME_CONFIG_PATH)", "-f", "/templates/cluster.toml", "-o", "/etc/curvine/curvine.toml" ] + containers: + - name: client + # image: "curvine/curvine:latest" + image: "curvine:v1" + securityContext: #通常client需要配置privileged,用于操作fuse设备 + privileged: true + runAsUser: 0 + command: + - /app/curvine/lib/curvine-fuse + args: + - "--mnt-path" + - "$(FLUID_RUNTIME_MOUNT_PATH)" + - "--conf" + - "/etc/curvine/curvine.toml" + volumeMounts: + - name: shared-config-volume + mountPath: /etc/curvine # 配置文件存放目录 + imagePullPolicy: IfNotPresent + volumes: + # emptyDir 共享存储(init容器和主容器互通) + - name: shared-config-volume + emptyDir: { } + diff --git a/test/gha-e2e/curvine/dataset.yaml b/test/gha-e2e/curvine/dataset.yaml new file mode 100644 index 00000000000..c58e09f5eb2 --- /dev/null +++ b/test/gha-e2e/curvine/dataset.yaml @@ -0,0 +1,30 @@ +# apiVersion: v1 +# kind: Secret +# metadata: +# name: curvine-secret +# stringData: +# access-key: minioadmin +# secret-key: minioadmin +# --- +apiVersion: data.fluid.io/v1alpha1 +kind: Dataset +metadata: + name: curvine-demo +spec: + accessModes: ["ReadWriteMany"] + mounts: + - mountPoint: "s3://test" + name: minio + path: /minio + # bin/cv mount s3://test/data /minio \ + # -c s3.endpoint_url=http://127.0.0.1:19000 \ + # -c s3.region_name=us-east-1 \ + # -c s3.path_style=true \ + # -c s3.credentials.access=minioadmin \ + # -c s3.credentials.secret=minioadmin + options: + endpoint_url: "http://minio:9000" + region_name: "us-east-1" + path_style: "true" + access: "minioadmin" + secret: "minioadmin" diff --git a/test/gha-e2e/curvine/minio.yaml b/test/gha-e2e/curvine/minio.yaml new file mode 100644 index 00000000000..b23502506ce --- /dev/null +++ b/test/gha-e2e/curvine/minio.yaml @@ -0,0 +1,50 @@ +apiVersion: v1 +kind: Service +metadata: + name: minio +spec: + type: ClusterIP + ports: + - port: 9000 + targetPort: 9000 + protocol: TCP + selector: + app: minio +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + # This name uniquely identifies the Deployment + name: minio +spec: + selector: + matchLabels: + app: minio + strategy: + type: Recreate + template: + metadata: + labels: + # Label is used as selector in the service. + app: minio + spec: + containers: + - name: minio + # Pulls the default Minio image from Docker Hub + image: minio/minio + resources: + limits: + memory: "512Mi" + args: + - server + - /data + env: + # Minio access key and secret key + - name: MINIO_ROOT_USER + value: "minioadmin" + - name: MINIO_ROOT_PASSWORD + value: "minioadmin" + ports: + - containerPort: 9000 + hostPort: 9000 + automountServiceAccountToken: false diff --git a/test/gha-e2e/curvine/minio_create_bucket.yaml b/test/gha-e2e/curvine/minio_create_bucket.yaml new file mode 100644 index 00000000000..2bd33302eec --- /dev/null +++ b/test/gha-e2e/curvine/minio_create_bucket.yaml @@ -0,0 +1,19 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: minio-bucket-create +spec: + template: + spec: + containers: + - name: mc + image: minio/mc + resources: + limits: + memory: "512Mi" + command: + - /bin/sh + - -c + - "mc alias set myminio http://minio:9000 $MINIO_ROOT_USER $MINIO_ROOT_PASSWORD && mc mb myminio/test" + restartPolicy: OnFailure + backoffLimit: 4 diff --git a/test/gha-e2e/curvine/mount.yaml b/test/gha-e2e/curvine/mount.yaml new file mode 100644 index 00000000000..f0b45f30caf --- /dev/null +++ b/test/gha-e2e/curvine/mount.yaml @@ -0,0 +1,76 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: curvine-mount +data: + # 挂载底层文件系统的脚本(仅限测试场景) + mountUfs.sh: | + #!/bin/bash + # 1. 原始JSON + json=$(cat ${FLUID_RUNTIME_CONFIG_PATH}) + # 2. 提取mounts数组内部内容 + mounts_raw=$(echo "$json" | sed -E 's/.*"mounts":\[([^]]+)\].*/\1/') + + # ====================标准兼容的数组拆分(核心!) ==================== + # 替换},\s{ 为标准正则, 兼容所有POSIX sed, 自动处理空格 + mounts_items=$(echo "$mounts_raw" \ + | sed 's/},[[:space:]]*{/\n/g' \ + | sed 's/^{//; s/}$//; s/^[[:space:]]*//g; s/[[:space:]]*$//g') + + # 3. 遍历解析 + index=0 + echo "$mounts_items" | while IFS= read -r item; do + #echo "第 $index 个 mounts 元素" + index=$((index + 1)) + + # 提取基础字段 + mountPoint=$(echo "$item" | sed -nE 's/.*"mountPoint":"([^"]+)".*/\1/p') + path=$(echo "$item" | sed -nE 's/.*"path":"([^"]+)".*/\1/p') + + mounted=$(/app/curvine/bin/cv mount) + if echo "$mounted" | grep "$mountPoint"; then + continue; + fi + + [ -z "$mountPoint" ] && { echo "mountPoint is not set or empty"; exit 1; } + [ -z "$mountPoint" ] && { echo "path is not set or empty"; exit 1; } + + # ==================== 字段提取加p 仅输出匹配结果 ==================== + access=$(echo "$item" | sed -nE 's/.*"access":"([^"]+)".*/\1/p') + secret=$(echo "$item" | sed -nE 's/.*"secret":"([^"]+)".*/\1/p') + endpoint=$(echo "$item" | sed -nE 's/.*"endpoint_url":"([^"]+)".*/\1/p') + region=$(echo "$item" | sed -nE 's/.*"region_name":"([^"]+)".*/\1/p') + path_style=$(echo "$item" | sed -nE 's/.*"path_style":"([^"]+)".*/\1/p') + + # 打印调试(缺失字段会显示空,正确) + echo "access: $access" + echo "region: $region" + echo "endpoint: $endpoint" + echo "path_style: $path_style" + echo "secret: $secret" + + # 必填参数校验 + CV_PARAMS="" + [ -z "$endpoint" ] && { echo "endpoint option is not set or empty"; exit 1; } + [ -z "$access" ] && { echo "access option is not set or empty"; exit 1; } + [ -z "$secret" ] && { echo "secret option is not set or empty"; exit 1; } + + # 拼接必填参数 + CV_PARAMS="$CV_PARAMS -c s3.endpoint_url=$endpoint" + CV_PARAMS="$CV_PARAMS -c s3.credentials.access=$access" + CV_PARAMS="$CV_PARAMS -c s3.credentials.secret=$secret" + + # 可选参数:缺失自动跳过 + [ -n "$region" ] && CV_PARAMS="$CV_PARAMS -c s3.region_name=$region" + [ -n "$path_style" ] && CV_PARAMS="$CV_PARAMS -c s3.path_style=$path_style" + + # 最终命令 + CMD="/app/curvine/bin/cv mount --mnt-type orch $mountPoint $path $CV_PARAMS" + echo "执行命令:$CMD" + + eval "$CMD" + if [ $? -ne 0 ]; then + echo "mount $mountPoint failed" + exit 1 + fi + done \ No newline at end of file diff --git a/test/gha-e2e/curvine/read_job.yaml b/test/gha-e2e/curvine/read_job.yaml new file mode 100644 index 00000000000..dd3528dc90c --- /dev/null +++ b/test/gha-e2e/curvine/read_job.yaml @@ -0,0 +1,34 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: read-job + namespace: default + labels: + app: read-job +spec: + template: + metadata: + name: read-job + labels: + app: read-job + spec: + restartPolicy: Never + containers: + - name: read-job + image: busybox + resources: + limits: + memory: "512Mi" + ephemeral-storage: "5Gi" + command: ['sh'] + args: + - -c + - set -ex; test -n "$(cat /data/foo/bar)" + volumeMounts: + - name: data-vol + mountPath: /data + automountServiceAccountToken: false + volumes: + - name: data-vol + persistentVolumeClaim: + claimName: curvine-demo diff --git a/test/gha-e2e/curvine/test.sh b/test/gha-e2e/curvine/test.sh new file mode 100644 index 00000000000..49b70352f9b --- /dev/null +++ b/test/gha-e2e/curvine/test.sh @@ -0,0 +1,120 @@ +#!/bin/bash + +testname="curvine cache runtime basic e2e" + +dataset_name="curvine-demo" +write_job_name="write-job" +read_job_name="read-job" +bucket_create_job_name="minio-bucket-create" + +function syslog() { + echo ">>> $1" +} + +function panic() { + local err_msg=$1 + syslog "test \"$testname\" failed: $err_msg" + exit 1 +} + +function setup() { + # minio 需要有 bucket 才能被 curvine 挂载 + kubectl create -f test/gha-e2e/curvine/minio.yaml + + kubectl create -f test/gha-e2e/curvine/minio_create_bucket.yaml + wait_job_completed "$bucket_create_job_name" + + kubectl create -f test/gha-e2e/curvine/mount.yaml +} + +function create_dataset() { + kubectl create -f test/gha-e2e/curvine/dataset.yaml + + if [[ -z "$(kubectl get dataset $dataset_name -oname)" ]]; then + panic "failed to create dataset $dataset_name" + fi + + if [[ -z "$(kubectl get cacheruntime $dataset_name -oname)" ]]; then + panic "failed to create curvine cache runtime $dataset_name" + fi +} + +function wait_dataset_bound() { + local deadline=180 # 3 minutes + local last_state="" + local log_interval=0 + local log_times=0 + while true; do + last_state=$(kubectl get dataset $dataset_name -ojsonpath='{@.status.phase}') + if [[ $log_interval -eq 3 ]]; then + log_times=$((log_times + 1)) + syslog "checking dataset.status.phase==Bound (already $((log_times * log_interval * 5))s, last state: $last_state)" + if [[ $((log_times * log_interval * 5)) -ge $deadline ]]; then + panic "timeout for ${deadline}s!" + fi + log_interval=0 + fi + + if [[ "$last_state" == "Bound" ]]; then + break + fi + log_interval=$((log_interval + 1)) + sleep 5 + done + syslog "Found dataset $dataset_name status.phase==Bound" +} + +function create_job() { + local job_file=$1 + local job_name=$2 + kubectl create -f "$job_file" + + if [[ -z "$(kubectl get job "$job_name" -oname)" ]]; then + panic "failed to create job $job_name" + fi +} + +function wait_job_completed() { + local job_name=$1 + while true; do + succeed=$(kubectl get job "$job_name" -ojsonpath='{@.status.succeeded}') + failed=$(kubectl get job "$job_name" -ojsonpath='{@.status.failed}') + if [[ "$failed" -ne "0" ]]; then + panic "job failed when accessing data" + fi + if [[ "$succeed" -eq "1" ]]; then + break + fi + sleep 5 + done + syslog "Found succeeded job $job_name" +} + +function dump_env_and_clean_up() { + bash tools/diagnose-fluid-curvine.sh collect --name $dataset_name --namespace default --collect-path ./e2e-tmp/testcase-curvine.tgz + syslog "Cleaning up resources for testcase $testname" + kubectl delete -f test/gha-e2e/curvine/read_job.yaml + kubectl delete -f test/gha-e2e/curvine/write_job.yaml + kubectl delete -f test/gha-e2e/curvine/dataset.yaml + kubectl delete -f test/gha-e2e/curvine/minio.yamls + kubectl delete -f test/gha-e2e/curvine/mount.yaml + kubectl delete -f test/gha-e2e/curvine/cacheruntime.yaml + kubectl delete -f test/gha-e2e/curvine/cacheruntimeclass.yaml +} + + +function main() { + syslog "[TESTCASE $testname STARTS AT $(date)]" + setup + create_dataset + + trap dump_env_and_clean_up EXIT + wait_dataset_bound + create_job test/gha-e2e/curvine/write_job.yaml $write_job_name + wait_job_completed $write_job_name + create_job test/gha-e2e/curvine/read_job.yaml $read_job_name + wait_job_completed $read_job_name + syslog "[TESTCASE $testname SUCCEEDED AT $(date)]" +} + +main diff --git a/test/gha-e2e/curvine/write_job.yaml b/test/gha-e2e/curvine/write_job.yaml new file mode 100644 index 00000000000..f60199f5af0 --- /dev/null +++ b/test/gha-e2e/curvine/write_job.yaml @@ -0,0 +1,31 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: write-job + namespace: default + labels: + app: write-job +spec: + template: + metadata: + name: write-job + labels: + app: write-job + spec: + restartPolicy: Never + containers: + - name: write-job + image: busybox + resources: + limits: + ephemeral-storage: "5Gi" + memory: "512Mi" + command: ['sh', '-c', 'mkdir /data/foo && echo helloworld > /data/foo/bar'] + volumeMounts: + - name: data-vol + mountPath: /data + automountServiceAccountToken: false + volumes: + - name: data-vol + persistentVolumeClaim: + claimName: curvine-demo From e65c91408f28a4f0a4325124f8b2b25e2d5bb3e0 Mon Sep 17 00:00:00 2001 From: xliuqq Date: Thu, 9 Apr 2026 19:57:39 +0800 Subject: [PATCH 5/8] finish e2e test Signed-off-by: xliuqq --- test/gha-e2e/curvine/cacheruntimeclass.yaml | 18 ++++++------------ test/gha-e2e/curvine/minio_create_bucket.yaml | 8 +++++++- test/gha-e2e/curvine/test.sh | 10 ++++++---- 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/test/gha-e2e/curvine/cacheruntimeclass.yaml b/test/gha-e2e/curvine/cacheruntimeclass.yaml index 2798c18333c..d6d1d70bae4 100644 --- a/test/gha-e2e/curvine/cacheruntimeclass.yaml +++ b/test/gha-e2e/curvine/cacheruntimeclass.yaml @@ -55,8 +55,7 @@ topology: # 根据 runtime 生成的配置文件 initContainers: - name: init-curvine - #image: hairyhenderson/gomplate:alpine - image: registry.cn-hangzhou.aliyuncs.com/xliu1992/gomplate:alpine + image: hairyhenderson/gomplate:alpine # 挂载共享卷到容器内路径 volumeMounts: - name: shared-config-volume @@ -65,8 +64,7 @@ topology: args: [ "-d", "config=$(FLUID_RUNTIME_CONFIG_PATH)", "-f", "/templates/cluster.toml", "-o", "/etc/curvine/curvine.toml" ] containers: - name: master - # image: "curvine/curvine:latest" - image: "curvine:v1" + image: "curvine/curvine:latest" command: - /entrypoint.sh args: @@ -115,8 +113,7 @@ topology: # 根据 runtime 生成的配置文件 initContainers: - name: init-curvine - #image: hairyhenderson/gomplate:alpine - image: registry.cn-hangzhou.aliyuncs.com/xliu1992/gomplate:alpine + image: hairyhenderson/gomplate:alpine # 挂载共享卷到容器内路径 volumeMounts: - name: shared-config-volume @@ -125,8 +122,7 @@ topology: args: [ "-d", "config=$(FLUID_RUNTIME_CONFIG_PATH)", "-f", "/templates/cluster.toml", "-o", "/etc/curvine/curvine.toml" ] containers: - name: worker - # image: "curvine/curvine:latest" - image: "curvine:v1" + image: "curvine/curvine:latest" command: - /entrypoint.sh args: @@ -166,8 +162,7 @@ topology: # 根据 runtime 生成的配置文件 initContainers: - name: init-curvine - #image: hairyhenderson/gomplate:alpine - image: registry.cn-hangzhou.aliyuncs.com/xliu1992/gomplate:alpine + image: hairyhenderson/gomplate:alpine # 挂载共享卷到容器内路径 volumeMounts: - name: shared-config-volume @@ -176,8 +171,7 @@ topology: args: [ "-d", "config=$(FLUID_RUNTIME_CONFIG_PATH)", "-f", "/templates/cluster.toml", "-o", "/etc/curvine/curvine.toml" ] containers: - name: client - # image: "curvine/curvine:latest" - image: "curvine:v1" + image: "curvine/curvine:latest" securityContext: #通常client需要配置privileged,用于操作fuse设备 privileged: true runAsUser: 0 diff --git a/test/gha-e2e/curvine/minio_create_bucket.yaml b/test/gha-e2e/curvine/minio_create_bucket.yaml index 2bd33302eec..86ba5550590 100644 --- a/test/gha-e2e/curvine/minio_create_bucket.yaml +++ b/test/gha-e2e/curvine/minio_create_bucket.yaml @@ -15,5 +15,11 @@ spec: - /bin/sh - -c - "mc alias set myminio http://minio:9000 $MINIO_ROOT_USER $MINIO_ROOT_PASSWORD && mc mb myminio/test" - restartPolicy: OnFailure + env: + # Minio access key and secret key + - name: MINIO_ROOT_USER + value: "minioadmin" + - name: MINIO_ROOT_PASSWORD + value: "minioadmin" + restartPolicy: OnFailure backoffLimit: 4 diff --git a/test/gha-e2e/curvine/test.sh b/test/gha-e2e/curvine/test.sh index 49b70352f9b..7dbd56b91b0 100644 --- a/test/gha-e2e/curvine/test.sh +++ b/test/gha-e2e/curvine/test.sh @@ -29,6 +29,8 @@ function setup() { function create_dataset() { kubectl create -f test/gha-e2e/curvine/dataset.yaml + kubectl create -f test/gha-e2e/curvine/cacheruntime.yaml + kubectl create -f test/gha-e2e/curvine/cacheruntimeclass.yaml if [[ -z "$(kubectl get dataset $dataset_name -oname)" ]]; then panic "failed to create dataset $dataset_name" @@ -96,19 +98,19 @@ function dump_env_and_clean_up() { kubectl delete -f test/gha-e2e/curvine/read_job.yaml kubectl delete -f test/gha-e2e/curvine/write_job.yaml kubectl delete -f test/gha-e2e/curvine/dataset.yaml - kubectl delete -f test/gha-e2e/curvine/minio.yamls - kubectl delete -f test/gha-e2e/curvine/mount.yaml kubectl delete -f test/gha-e2e/curvine/cacheruntime.yaml kubectl delete -f test/gha-e2e/curvine/cacheruntimeclass.yaml + kubectl delete -f test/gha-e2e/curvine/minio.yaml + kubectl delete -f test/gha-e2e/curvine/mount.yaml + kubectl delete -f test/gha-e2e/curvine/minio_create_bucket.yaml } function main() { syslog "[TESTCASE $testname STARTS AT $(date)]" + trap dump_env_and_clean_up EXIT setup create_dataset - - trap dump_env_and_clean_up EXIT wait_dataset_bound create_job test/gha-e2e/curvine/write_job.yaml $write_job_name wait_job_completed $write_job_name From 2dd767ab34f2683b6f5110c9e4293d9a15f6753d Mon Sep 17 00:00:00 2001 From: xliuqq Date: Thu, 9 Apr 2026 20:00:12 +0800 Subject: [PATCH 6/8] remove self image prefix Signed-off-by: xliuqq --- charts/fluid/fluid/values.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/charts/fluid/fluid/values.yaml b/charts/fluid/fluid/values.yaml index 1a8fb024a3e..f7e44eeaac3 100644 --- a/charts/fluid/fluid/values.yaml +++ b/charts/fluid/fluid/values.yaml @@ -12,9 +12,9 @@ image: imagePullSecrets: [] # Default registry, namespace and version tag for images managed by fluid -imagePrefix: &defaultImagePrefix registry.cn-hangzhou.aliyuncs.com/xliu1992 +imagePrefix: &defaultImagePrefix fluidcloudnative # imagePrefix: &defaultImagePrefix registry.aliyuncs.com/fluid -version: &defaultVersion v1.1.0-b0bdac58 +version: &defaultVersion v1.1.0-b457855 crdUpgrade: enabled: true From 593055dec98587aac6a9a4ba3435864b9fadd388 Mon Sep 17 00:00:00 2001 From: xliuqq Date: Fri, 10 Apr 2026 08:52:14 +0800 Subject: [PATCH 7/8] make gen-openapi Signed-off-by: xliuqq --- api/v1alpha1/openapi_generated.go | 70 ++++++++++++++++++++++++++++++- 1 file changed, 69 insertions(+), 1 deletion(-) diff --git a/api/v1alpha1/openapi_generated.go b/api/v1alpha1/openapi_generated.go index 418350ba21b..c51b4a21e40 100644 --- a/api/v1alpha1/openapi_generated.go +++ b/api/v1alpha1/openapi_generated.go @@ -80,6 +80,8 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/fluid-cloudnative/fluid/api/v1alpha1.EncryptOption": schema_fluid_cloudnative_fluid_api_v1alpha1_EncryptOption(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.EncryptOptionComponentDependency": schema_fluid_cloudnative_fluid_api_v1alpha1_EncryptOptionComponentDependency(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.EncryptOptionSource": schema_fluid_cloudnative_fluid_api_v1alpha1_EncryptOptionSource(ref), + "github.com/fluid-cloudnative/fluid/api/v1alpha1.ExecutionCommonEntry": schema_fluid_cloudnative_fluid_api_v1alpha1_ExecutionCommonEntry(ref), + "github.com/fluid-cloudnative/fluid/api/v1alpha1.ExecutionEntries": schema_fluid_cloudnative_fluid_api_v1alpha1_ExecutionEntries(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.ExternalEndpointSpec": schema_fluid_cloudnative_fluid_api_v1alpha1_ExternalEndpointSpec(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.ExternalStorage": schema_fluid_cloudnative_fluid_api_v1alpha1_ExternalStorage(ref), "github.com/fluid-cloudnative/fluid/api/v1alpha1.ExtraResourcesComponentDependency": schema_fluid_cloudnative_fluid_api_v1alpha1_ExtraResourcesComponentDependency(ref), @@ -3747,6 +3749,66 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_EncryptOptionSource(ref common. } } +func schema_fluid_cloudnative_fluid_api_v1alpha1_ExecutionCommonEntry(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "command": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, + "timeout": { + SchemaProps: spec.SchemaProps{ + Description: "Timeout is the timeout(seconds) for the execution entry", + Type: []string{"integer"}, + Format: "int32", + }, + }, + }, + Required: []string{"command"}, + }, + }, + } +} + +func schema_fluid_cloudnative_fluid_api_v1alpha1_ExecutionEntries(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "mountUFS": { + SchemaProps: spec.SchemaProps{ + Description: "MountUFS defines the operations for mounting UFS", + Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.ExecutionCommonEntry"), + }, + }, + "reportSummary": { + SchemaProps: spec.SchemaProps{ + Description: "ReportSummary it defines the operation how to get cache status like capacity, hit ratio etc.", + Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.ExecutionCommonEntry"), + }, + }, + }, + }, + }, + Dependencies: []string{ + "github.com/fluid-cloudnative/fluid/api/v1alpha1.ExecutionCommonEntry"}, + } +} + func schema_fluid_cloudnative_fluid_api_v1alpha1_ExternalEndpointSpec(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -6667,11 +6729,17 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_RuntimeComponentDefinition(ref Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.RuntimeComponentDependencies"), }, }, + "executionEntries": { + SchemaProps: spec.SchemaProps{ + Description: "ExecutionEntries entries to support out-of-tree integration.", + Ref: ref("github.com/fluid-cloudnative/fluid/api/v1alpha1.ExecutionEntries"), + }, + }, }, }, }, Dependencies: []string{ - "github.com/fluid-cloudnative/fluid/api/v1alpha1.RuntimeComponentDependencies", "github.com/fluid-cloudnative/fluid/api/v1alpha1.RuntimeComponentService", "k8s.io/api/core/v1.PodTemplateSpec", "k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta"}, + "github.com/fluid-cloudnative/fluid/api/v1alpha1.ExecutionEntries", "github.com/fluid-cloudnative/fluid/api/v1alpha1.RuntimeComponentDependencies", "github.com/fluid-cloudnative/fluid/api/v1alpha1.RuntimeComponentService", "k8s.io/api/core/v1.PodTemplateSpec", "k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta"}, } } From a29c0b7e2294c76a05eddbd2b8365aa3c3c3e42f Mon Sep 17 00:00:00 2001 From: xliuqq Date: Fri, 10 Apr 2026 09:25:58 +0800 Subject: [PATCH 8/8] make fmt Signed-off-by: xliuqq --- pkg/ddc/cache/engine/transform_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ddc/cache/engine/transform_client.go b/pkg/ddc/cache/engine/transform_client.go index c98320cbc90..35305148535 100644 --- a/pkg/ddc/cache/engine/transform_client.go +++ b/pkg/ddc/cache/engine/transform_client.go @@ -65,7 +65,7 @@ func (e *CacheEngine) transformClient(runtime *datav1alpha1.CacheRuntime, runtim if podTemplateSpec.Spec.NodeSelector == nil { podTemplateSpec.Spec.NodeSelector = map[string]string{} } - podTemplateSpec.Spec.NodeSelector[runtimeInfo.GetFuseLabelName()] = "true" + podTemplateSpec.Spec.NodeSelector[runtimeInfo.GetFuseLabelName()] = "true" // fuse volume mount e.transformFuseMountPointVolumes(podTemplateSpec)