diff --git a/api/v1alpha1/dataset_types.go b/api/v1alpha1/dataset_types.go index 91524bc89e1..094ab341816 100644 --- a/api/v1alpha1/dataset_types.go +++ b/api/v1alpha1/dataset_types.go @@ -392,3 +392,24 @@ func (dataset *Dataset) RemoveDataOperationInProgress(operationType, name string dataset.Status.OperationRef[operationType] = strings.Join(dataOpKeys, ",") return dataset.Status.OperationRef[operationType] } + +// CanStartDataOperation checks if the data operation can be started on this dataset. +func (dataset *Dataset) CanStartDataOperation(operationType string, maxParallel int32, name string) bool { + if dataset.Status.OperationRef == nil { + return true + } + + opRef, ok := dataset.Status.OperationRef[operationType] + if !ok || opRef == "" { + return true + } + + opRefs := strings.Split(opRef, ",") + for _, op := range opRefs { + if op == name { + return true // Already in progress + } + } + + return int32(len(opRefs)) < maxParallel +} diff --git a/api/v1alpha1/dataset_types_test.go b/api/v1alpha1/dataset_types_test.go index 8a567e49857..7ab3ea74b62 100644 --- a/api/v1alpha1/dataset_types_test.go +++ b/api/v1alpha1/dataset_types_test.go @@ -185,3 +185,107 @@ func TestDataset_SetDataOperationInProgress(t *testing.T) { }) } } + +func TestDataset_CanStartDataOperation(t *testing.T) { + type fields struct { + Status DatasetStatus + } + type args struct { + operationType string + maxParallel int32 + name string + } + tests := []struct { + name string + fields fields + args args + want bool + }{ + { + name: "empty_status", + fields: fields{ + Status: DatasetStatus{}, + }, + args: args{ + operationType: "DataLoad", + maxParallel: 1, + name: "load-1", + }, + want: true, + }, + { + name: "already_in_progress_reentrant", + fields: fields{ + Status: DatasetStatus{ + OperationRef: map[string]string{ + "DataLoad": "load-1", + }, + }, + }, + args: args{ + operationType: "DataLoad", + maxParallel: 1, + name: "load-1", + }, + want: true, + }, + { + name: "blocked_by_max_parallel_1", + fields: fields{ + Status: DatasetStatus{ + OperationRef: map[string]string{ + "DataLoad": "load-1", + }, + }, + }, + args: args{ + operationType: "DataLoad", + maxParallel: 1, + name: "load-2", + }, + want: false, + }, + { + name: "allowed_by_max_parallel_2", + fields: fields{ + Status: DatasetStatus{ + OperationRef: map[string]string{ + "DataLoad": "load-1", + }, + }, + }, + args: args{ + operationType: "DataLoad", + maxParallel: 2, + name: "load-2", + }, + want: true, + }, + { + name: "blocked_by_max_parallel_2", + fields: fields{ + Status: DatasetStatus{ + OperationRef: map[string]string{ + "DataLoad": "load-1,load-2", + }, + }, + }, + args: args{ + operationType: "DataLoad", + maxParallel: 2, + name: "load-3", + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + dataset := &Dataset{ + Status: tt.fields.Status, + } + if got := dataset.CanStartDataOperation(tt.args.operationType, tt.args.maxParallel, tt.args.name); got != tt.want { + t.Errorf("CanStartDataOperation() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/ddc/base/operation_lock.go b/pkg/ddc/base/operation_lock.go index f30a39fc24e..8fa0148b96b 100644 --- a/pkg/ddc/base/operation_lock.go +++ b/pkg/ddc/base/operation_lock.go @@ -70,8 +70,13 @@ func SetDataOperationInTargetDataset(ctx cruntime.ReconcileRequestContext, opera return err } + if !dataset.CanStartDataOperation(operationTypeName, operation.GetParallelTaskNumber(), dataOpKey) { + return fmt.Errorf("the dataset %s has reached the maximum number of parallel %s operations (limit: %d), please wait", targetDataset.Name, operationTypeName, operation.GetParallelTaskNumber()) + } + // set current data operation in the target dataset datasetToUpdate := dataset.DeepCopy() + datasetToUpdate.SetDataOperationInProgress(operationTypeName, dataOpKey) // different operation may set other fields operation.SetTargetDatasetStatusInProgress(datasetToUpdate)