Skip to content

Commit 8112299

Browse files
fix: enforce concurrency limits for data operations
Signed-off-by: Monika Jhakar <jakharmonika364@gmail.com>
1 parent 965f661 commit 8112299

File tree

3 files changed

+130
-0
lines changed

3 files changed

+130
-0
lines changed

api/v1alpha1/dataset_types.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,3 +392,24 @@ func (dataset *Dataset) RemoveDataOperationInProgress(operationType, name string
392392
dataset.Status.OperationRef[operationType] = strings.Join(dataOpKeys, ",")
393393
return dataset.Status.OperationRef[operationType]
394394
}
395+
396+
// CanStartDataOperation checks if the data operation can be started on this dataset.
397+
func (dataset *Dataset) CanStartDataOperation(operationType string, maxParallel int32, name string) bool {
398+
if dataset.Status.OperationRef == nil {
399+
return true
400+
}
401+
402+
opRef, ok := dataset.Status.OperationRef[operationType]
403+
if !ok || opRef == "" {
404+
return true
405+
}
406+
407+
opRefs := strings.Split(opRef, ",")
408+
for _, op := range opRefs {
409+
if op == name {
410+
return true // Already in progress
411+
}
412+
}
413+
414+
return int32(len(opRefs)) < maxParallel
415+
}

api/v1alpha1/dataset_types_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,3 +185,107 @@ func TestDataset_SetDataOperationInProgress(t *testing.T) {
185185
})
186186
}
187187
}
188+
189+
func TestDataset_CanStartDataOperation(t *testing.T) {
190+
type fields struct {
191+
Status DatasetStatus
192+
}
193+
type args struct {
194+
operationType string
195+
maxParallel int32
196+
name string
197+
}
198+
tests := []struct {
199+
name string
200+
fields fields
201+
args args
202+
want bool
203+
}{
204+
{
205+
name: "empty_status",
206+
fields: fields{
207+
Status: DatasetStatus{},
208+
},
209+
args: args{
210+
operationType: "DataLoad",
211+
maxParallel: 1,
212+
name: "load-1",
213+
},
214+
want: true,
215+
},
216+
{
217+
name: "already_in_progress_reentrant",
218+
fields: fields{
219+
Status: DatasetStatus{
220+
OperationRef: map[string]string{
221+
"DataLoad": "load-1",
222+
},
223+
},
224+
},
225+
args: args{
226+
operationType: "DataLoad",
227+
maxParallel: 1,
228+
name: "load-1",
229+
},
230+
want: true,
231+
},
232+
{
233+
name: "blocked_by_max_parallel_1",
234+
fields: fields{
235+
Status: DatasetStatus{
236+
OperationRef: map[string]string{
237+
"DataLoad": "load-1",
238+
},
239+
},
240+
},
241+
args: args{
242+
operationType: "DataLoad",
243+
maxParallel: 1,
244+
name: "load-2",
245+
},
246+
want: false,
247+
},
248+
{
249+
name: "allowed_by_max_parallel_2",
250+
fields: fields{
251+
Status: DatasetStatus{
252+
OperationRef: map[string]string{
253+
"DataLoad": "load-1",
254+
},
255+
},
256+
},
257+
args: args{
258+
operationType: "DataLoad",
259+
maxParallel: 2,
260+
name: "load-2",
261+
},
262+
want: true,
263+
},
264+
{
265+
name: "blocked_by_max_parallel_2",
266+
fields: fields{
267+
Status: DatasetStatus{
268+
OperationRef: map[string]string{
269+
"DataLoad": "load-1,load-2",
270+
},
271+
},
272+
},
273+
args: args{
274+
operationType: "DataLoad",
275+
maxParallel: 2,
276+
name: "load-3",
277+
},
278+
want: false,
279+
},
280+
}
281+
for _, tt := range tests {
282+
t.Run(tt.name, func(t *testing.T) {
283+
dataset := &Dataset{
284+
Status: tt.fields.Status,
285+
}
286+
if got := dataset.CanStartDataOperation(tt.args.operationType, tt.args.maxParallel, tt.args.name); got != tt.want {
287+
t.Errorf("CanStartDataOperation() = %v, want %v", got, tt.want)
288+
}
289+
})
290+
}
291+
}

pkg/ddc/base/operation_lock.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ func SetDataOperationInTargetDataset(ctx cruntime.ReconcileRequestContext, opera
7272

7373
// set current data operation in the target dataset
7474
datasetToUpdate := dataset.DeepCopy()
75+
76+
if !datasetToUpdate.CanStartDataOperation(operationTypeName, operation.GetParallelTaskNumber(), dataOpKey) {
77+
return fmt.Errorf("the dataset %s is already in %s, please wait", targetDataset.Name, operationTypeName)
78+
}
79+
7580
datasetToUpdate.SetDataOperationInProgress(operationTypeName, dataOpKey)
7681
// different operation may set other fields
7782
operation.SetTargetDatasetStatusInProgress(datasetToUpdate)

0 commit comments

Comments
 (0)