Skip to content

Commit b962717

Browse files
fix data race of LocalAdmissionController (#10332) (#10344)
close #10331 Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io> Signed-off-by: guo-shaoge <shaoge1994@163.com> Co-authored-by: guo-shaoge <shaoge1994@163.com>
1 parent be3781e commit b962717

2 files changed

Lines changed: 18 additions & 13 deletions

File tree

dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ double ResourceGroup::getAcquireRUNumWithoutLock(double speed, uint32_t n_sec, d
140140
if unlikely (acquire_num == 0.0 && remaining_ru == 0.0)
141141
acquire_num = DEFAULT_BUFFER_TOKENS;
142142

143+
// The purpose of subtracting remaining_ru is to try to ensure that the number of local tokens
144+
// always stays the same as the amount consumed.
143145
acquire_num -= remaining_ru;
144146
acquire_num = (acquire_num > 0.0 ? acquire_num : 0.0);
145147
return acquire_num;
@@ -351,11 +353,9 @@ void LocalAdmissionController::mainLoop()
351353
static_assert(
352354
tick_interval <= ResourceGroup::COMPUTE_RU_CONSUMPTION_SPEED_INTERVAL && tick_interval <= DEGRADE_MODE_DURATION
353355
&& tick_interval <= DEFAULT_TARGET_PERIOD);
354-
auto cur_tick_beg = current_tick;
355-
auto cur_tick_end = cur_tick_beg + tick_interval;
356+
auto cur_tick_end = current_tick + tick_interval;
356357
while (!stopped.load())
357358
{
358-
if (current_tick < cur_tick_end)
359359
{
360360
std::unique_lock<std::mutex> lock(mu);
361361
if (low_token_resource_groups.empty())
@@ -375,19 +375,16 @@ void LocalAdmissionController::mainLoop()
375375
try
376376
{
377377
while (current_tick >= cur_tick_end)
378-
{
379-
updateRUConsumptionSpeed();
380-
cur_tick_beg = cur_tick_end;
381378
cur_tick_end += tick_interval;
382-
}
383379

380+
updateRUConsumptionSpeed();
384381
if (const auto gac_req_opt = buildGACRequest(/*is_final_report=*/false); gac_req_opt.has_value())
385382
{
386383
std::lock_guard lock(gac_requests_mu);
387384
gac_requests.push_back(gac_req_opt.value());
388385
gac_requests_cv.notify_all();
389386
}
390-
clearCPUTimeWithoutLock(current_tick);
387+
clearCPUTime(current_tick);
391388
checkDegradeMode();
392389
}
393390
catch (...)
@@ -423,13 +420,16 @@ std::optional<resource_manager::TokenBucketsRequest> LocalAdmissionController::b
423420
else
424421
{
425422
std::unordered_set<std::string> local_low_token_resource_groups;
423+
std::unordered_map<std::string, ResourceGroupPtr> local_resource_groups;
426424
{
427425
std::lock_guard lock(mu);
428426
local_low_token_resource_groups = low_token_resource_groups;
429427
low_token_resource_groups.clear();
428+
429+
local_resource_groups = resource_groups;
430430
}
431431

432-
for (const auto & iter : resource_groups)
432+
for (const auto & iter : local_resource_groups)
433433
{
434434
const auto rg_name = iter.first;
435435
const bool need_fetch_token = local_low_token_resource_groups.contains(rg_name);
@@ -621,7 +621,7 @@ std::vector<std::string> LocalAdmissionController::handleTokenBucketsResp(
621621

622622
const String err_msg = fmt::format("handle acquire token resp failed: rg: {}", name);
623623
// It's possible for one_resp.granted_r_u_tokens() to be empty
624-
// when the acquire_token_req is only for report RU consumption.
624+
// when the acquire_token_req is only for reporting RU consumption or GAC got an error (like a NaN token).
625625
if (one_resp.granted_r_u_tokens().empty())
626626
{
627627
resource_group->endRequest();
@@ -630,6 +630,7 @@ std::vector<std::string> LocalAdmissionController::handleTokenBucketsResp(
630630

631631
if unlikely (one_resp.granted_r_u_tokens().size() != 1)
632632
{
633+
resource_group->endRequest();
633634
LOG_ERROR(
634635
log,
635636
"{} unexpected resp.granted_r_u_tokens().size(): {} one_resp: {}",
@@ -642,13 +643,15 @@ std::vector<std::string> LocalAdmissionController::handleTokenBucketsResp(
642643
const resource_manager::GrantedRUTokenBucket & granted_token_bucket = one_resp.granted_r_u_tokens()[0];
643644
if unlikely (granted_token_bucket.type() != resource_manager::RequestUnitType::RU)
644645
{
646+
resource_group->endRequest();
645647
LOG_ERROR(log, "{} unexpected request type, one_resp: {}", err_msg, one_resp.ShortDebugString());
646648
continue;
647649
}
648650

649651
const auto trickle_ms = granted_token_bucket.trickle_time_ms();
650652
if unlikely (trickle_ms < 0)
651653
{
654+
resource_group->endRequest();
652655
LOG_ERROR(
653656
log,
654657
"{} unexpected trickle_ms: {} one_resp: {}",
@@ -663,6 +666,7 @@ std::vector<std::string> LocalAdmissionController::handleTokenBucketsResp(
663666
double added_tokens = granted_token_bucket.granted_tokens().tokens();
664667
if unlikely (!std::isfinite(added_tokens) || added_tokens < 0.0)
665668
{
669+
resource_group->endRequest();
666670
LOG_ERROR(
667671
log,
668672
"{} invalid added_tokens: {} one_resp: {}",
@@ -833,7 +837,7 @@ bool LocalAdmissionController::handleDeleteEvent(const mvccpb::KeyValue & kv, st
833837
updateMaxRUPerSecAfterDeleteWithoutLock(deleted_user_ru_per_sec);
834838
}
835839
}
836-
LOG_DEBUG(log, "delete resource group {}, erase_num: {}", name, erase_num);
840+
LOG_INFO(log, "delete resource group {}, erase_num: {}", name, erase_num);
837841
return true;
838842
}
839843

@@ -868,7 +872,7 @@ bool LocalAdmissionController::handlePutEvent(const mvccpb::KeyValue & kv, std::
868872
updateMaxRUPerSecAfterDeleteWithoutLock(deleted_user_ru_per_sec);
869873
}
870874
}
871-
LOG_DEBUG(log, "modify resource group to: {}", group_pb.ShortDebugString());
875+
LOG_INFO(log, "modify resource group to: {}", group_pb.ShortDebugString());
872876
return true;
873877
}
874878

dbms/src/Flash/ResourceControl/LocalAdmissionController.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -538,9 +538,10 @@ class LocalAdmissionController final : private boost::noncopyable
538538
std::string & err_msg);
539539
void updateMaxRUPerSecAfterDeleteWithoutLock(uint64_t deleted_user_ru_per_sec);
540540

541-
void clearCPUTimeWithoutLock(const SteadyClock::time_point & now)
541+
void clearCPUTime(const SteadyClock::time_point & now)
542542
{
543543
static_assert(CLEAR_CPU_TIME_DURATION > ResourceGroup::COMPUTE_RU_CONSUMPTION_SPEED_INTERVAL);
544+
std::lock_guard lock(mu);
544545
if (now - last_clear_cpu_time >= CLEAR_CPU_TIME_DURATION)
545546
{
546547
for (auto & resource_group : resource_groups)

0 commit comments

Comments
 (0)