fix: enforce concurrency limits for data operations #5754

jakharmonika364 wants to merge 1 commit into fluid-cloudnative:master
Conversation
Code Review
This pull request introduces a concurrency control mechanism for data operations by adding a CanStartDataOperation method to the Dataset type. This method checks if a new operation can be initiated based on a maximum parallel task limit and the current state of operations. The changes include comprehensive unit tests for this new logic and its integration into the operation locking flow. The review feedback suggests a performance optimization by moving the concurrency check before the resource-intensive DeepCopy operation and recommends enhancing the error message to include the specific concurrency limit for better observability.
`pkg/ddc/base/operation_lock.go` (outdated)

```go
datasetToUpdate := dataset.DeepCopy()

if !datasetToUpdate.CanStartDataOperation(operationTypeName, operation.GetParallelTaskNumber(), dataOpKey) {
	return fmt.Errorf("the dataset %s is already in %s, please wait", targetDataset.Name, operationTypeName)
}
```
There are two improvement opportunities here:

- Performance optimization: move the `CanStartDataOperation` check before calling `DeepCopy()`. `DeepCopy()` is an expensive operation, and it is unnecessary to perform it if the concurrency limit has already been reached and the operation cannot start.
- Error message clarity: the error message is a bit generic. Since this logic now supports `ParallelTaskNumber > 1`, it would be more helpful to include the limit in the error message to assist with troubleshooting.

Note: `CanStartDataOperation` is a read-only check on the `Status` field, so it is safe to call on the original dataset object.
Suggested change:

```go
// Before
datasetToUpdate := dataset.DeepCopy()
if !datasetToUpdate.CanStartDataOperation(operationTypeName, operation.GetParallelTaskNumber(), dataOpKey) {
	return fmt.Errorf("the dataset %s is already in %s, please wait", targetDataset.Name, operationTypeName)
}

// After
if !dataset.CanStartDataOperation(operationTypeName, operation.GetParallelTaskNumber(), dataOpKey) {
	return fmt.Errorf("the dataset %s has reached the maximum number of parallel %s operations (limit: %d), please wait", targetDataset.Name, operationTypeName, operation.GetParallelTaskNumber())
}
datasetToUpdate := dataset.DeepCopy()
```
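The implementation of `CanStartDataOperation` itself is not shown in this thread. As a rough, self-contained sketch of the semantics the review describes (re-entrancy plus a max-parallel limit), assuming `Status.OperationRef` is a map from operation type to a comma-separated list of operation keys (the field shape here is an assumption for illustration, not the project's exact definition):

```go
package main

import (
	"fmt"
	"strings"
)

// DatasetStatus mirrors only the relevant part of the real Dataset.Status;
// the field shape is assumed for this sketch.
type DatasetStatus struct {
	// OperationRef maps an operation type (e.g. "DataLoad") to a
	// comma-separated list of the operations currently holding the lock.
	OperationRef map[string]string
}

type Dataset struct {
	Status DatasetStatus
}

// CanStartDataOperation sketches the gate described in the PR: it returns
// true when the operation identified by dataOpKey may register itself,
// i.e. it is already registered (re-entrant) or the number of registered
// operations is below maxParallel.
func (d *Dataset) CanStartDataOperation(opType string, maxParallel int, dataOpKey string) bool {
	ref := d.Status.OperationRef[opType]
	if ref == "" {
		return true // no operation of this type is running
	}
	names := strings.Split(ref, ",")
	for _, n := range names {
		if n == dataOpKey {
			return true // re-entrant: this operation already holds a slot
		}
	}
	return len(names) < maxParallel
}

func main() {
	d := &Dataset{Status: DatasetStatus{OperationRef: map[string]string{}}}
	fmt.Println(d.CanStartDataOperation("DataLoad", 1, "default/load-a")) // true: empty state

	d.Status.OperationRef["DataLoad"] = "default/load-a"
	fmt.Println(d.CanStartDataOperation("DataLoad", 1, "default/load-a")) // true: re-entrant
	fmt.Println(d.CanStartDataOperation("DataLoad", 1, "default/load-b")) // false: limit reached
}
```

Because the check only reads `Status`, it can safely run before `DeepCopy()`, which is exactly the reordering the suggestion above proposes.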
Codecov Report ❌

Additional details and impacted files:

```
@@            Coverage Diff             @@
##           master    #5754      +/-   ##
==========================================
- Coverage   61.22%   61.22%   -0.01%
==========================================
  Files         444      444
  Lines       30557    30561       +4
==========================================
  Hits        18710    18710
- Misses      10307    10311       +4
  Partials     1540     1540
```

☔ View full report in Codecov by Sentry.
Force-pushed 8112299 to 2a18402.
Pull request overview
This PR tightens dataset-level locking for data operations so that operations (notably DataLoad) respect a configured parallelism limit when registering themselves in Dataset.Status.OperationRef, preventing unbounded concurrent operations on the same dataset.
Changes:

- Add a gate (`CanStartDataOperation`) to check whether a new operation can be registered given the current `OperationRef` entries and a max-parallel limit.
- Enforce the gate inside the common lock acquisition path (`SetDataOperationInTargetDataset`) before updating dataset status.
- Add unit tests covering empty state, re-entrant behavior, and max-parallel enforcement.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| pkg/ddc/base/operation_lock.go | Adds concurrency gating before setting OperationRef during lock acquisition. |
| api/v1alpha1/dataset_types.go | Introduces Dataset.CanStartDataOperation to validate whether an operation can start given current refs and a limit. |
| api/v1alpha1/dataset_types_test.go | Adds unit tests for the new CanStartDataOperation behavior. |
```go
// set current data operation in the target dataset
datasetToUpdate := dataset.DeepCopy()
```
The lock limit is derived from operation.GetParallelTaskNumber(). For DataMigrate this returns Spec.Parallelism (parallel workers within a single DataMigrate), so a DataMigrate with Parallelism>1 would allow multiple concurrent DataMigrate CRs on the same Dataset, contradicting the PR description that DataMigrate is limited to 1. Consider separating “max concurrent operations per dataset” from “intra-operation parallelism” (e.g., a dedicated interface method or a hard-coded 1 for DataMigrate).
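One way to realize the suggested separation is a dedicated interface method. In the sketch below, `MaxConcurrentOperations` is a hypothetical name and is not part of the project's existing `OperationInterface`; it illustrates keeping dataset-level concurrency distinct from intra-operation parallelism:

```go
package main

import "fmt"

// OperationInterface is a reduced sketch of the real interface; the
// MaxConcurrentOperations method is hypothetical.
type OperationInterface interface {
	// GetParallelTaskNumber reports intra-operation parallelism
	// (for DataMigrate: workers within one CR, from Spec.Parallelism).
	GetParallelTaskNumber() int32
	// MaxConcurrentOperations reports how many CRs of this operation
	// type may run against one Dataset at the same time.
	MaxConcurrentOperations() int32
}

type dataMigrate struct{ parallelism int32 }

func (m dataMigrate) GetParallelTaskNumber() int32 { return m.parallelism }

// A DataMigrate with many workers is still a single operation on the
// dataset, so its dataset-level limit stays hard-coded at 1.
func (m dataMigrate) MaxConcurrentOperations() int32 { return 1 }

func main() {
	var op OperationInterface = dataMigrate{parallelism: 4}
	fmt.Println(op.GetParallelTaskNumber())   // 4: workers inside one DataMigrate
	fmt.Println(op.MaxConcurrentOperations()) // 1: one DataMigrate per dataset
}
```

With this split, the lock path would pass `MaxConcurrentOperations()` rather than `GetParallelTaskNumber()` to the gate, so a `Parallelism > 1` DataMigrate no longer widens the dataset lock.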
```go
// set current data operation in the target dataset
datasetToUpdate := dataset.DeepCopy()
```
Returning a plain fmt.Errorf here causes SetDataOperationInTargetDataset to log at error level on every retry (and the reconciler requeues periodically), even though lock contention is an expected state when parallelism is exceeded. Consider using a typed/sentinel error so callers can log at Info/Debug and/or emit a clearer event, and include the dataset namespace/current OperationRef in the message to help users diagnose what is blocking.
Signed-off-by: Monika Jhakar <jakharmonika364@gmail.com>
Signed-off-by: Monika Jakhar <jakharmonika364@gmail.com>
Force-pushed 2a18402 to 7abe7f0.
[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has not yet been approved by any approver. The full list of commands accepted by this bot can be found here. It needs approval from an approver in each of the changed files; approvers can indicate their approval by writing the approval command in a comment.
Hi @jakharmonika364. Thanks for your PR. I'm waiting for a fluid-cloudnative member to verify that this patch is reasonable to test. If it is, they should reply with the appropriate command; once the patch is verified, the new status will be reflected by the corresponding label. I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.



Ⅰ. Describe what this PR does
Ensures that data operations (specifically `DataLoad`) strictly respect their `ParallelTaskNumber` limit.

Previously, `SetDataOperationInProgress` would always append new operation names to a comma-separated string, effectively allowing multiple tasks to run on the same dataset simultaneously. This PR introduces a `CanStartDataOperation` check in the common locking logic (`operation_lock.go`) to prevent new tasks from registering if the parallel limit (which is 1 for `DataLoad`/`DataMigrate`) has already been reached.

Ⅱ. Does this pull request fix one issue?
fixes #1138
Ⅲ. List the added test cases (unit test/integration test) if any, please explain if no tests are needed.
Added unit tests in `api/v1alpha1/dataset_types_test.go` to cover:

- starting an operation when no operation is in progress (empty state);
- re-entrant behavior for an operation that is already registered;
- rejection when `ParallelTaskNumber` is 1 and another operation is present.

Ⅳ. Describe how to verify it
1. Create a `DataLoad` task for a dataset.
2. While it is `Executing`, create a second `DataLoad` for the same dataset.
3. Verify that the second task stays `Pending` and the controller logs indicate the dataset is already performing an operation.

Ⅴ. Special notes for reviews
The implementation is generic and applies to all operations using the `OperationInterface`, ensuring consistent locking behavior across the project.