Skip to content

Commit ff34ccc

Browse files
authored
Support graceful shutdown in WN (#10430)
ref #10266, close #10425 Support graceful shutdown in WN Signed-off-by: gengliqi <gengliqiii@gmail.com>
1 parent 5aeab88 commit ff34ccc

3 files changed

Lines changed: 41 additions & 13 deletions

File tree

dbms/src/Flash/Mpp/MPPTaskManager.cpp

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
#include <Flash/Mpp/MPPTaskManager.h>
2323
#include <Interpreters/Context.h>
2424
#include <Interpreters/ProcessList.h>
25+
#include <Interpreters/SharedContexts/Disagg.h>
2526
#include <Interpreters/executeQuery.h>
27+
#include <Storages/DeltaMerge/Remote/WNDisaggSnapshotManager.h>
2628
#include <fmt/core.h>
2729

2830
#include <magic_enum.hpp>
@@ -86,39 +88,58 @@ MPPGatherTaskSetPtr MPPQuery::addMPPGatherTaskSet(const MPPGatherId & gather_id)
8688

8789
void MPPTaskMonitor::waitAllMPPTasksFinish(const std::unique_ptr<Context> & context)
8890
{
89-
// The maximum seconds TiFlash will wait for all current MPP tasks to finish before shutting down
91+
// The maximum number of seconds TiFlash will wait for all current MPP tasks or disagg snapshots to finish before shutting down
9092
static constexpr const char * GRACEFUL_WAIT_SHUTDOWN_TIMEOUT = "flash.graceful_wait_shutdown_timeout";
9193
// The default value of flash.graceful_wait_shutdown_timeout
9294
static constexpr UInt64 DEFAULT_GRACEFUL_WAIT_SHUTDOWN_TIMEOUT = 600;
9395
auto graceful_wait_shutdown_timeout
9496
= context->getUsersConfig()->getUInt64(GRACEFUL_WAIT_SHUTDOWN_TIMEOUT, DEFAULT_GRACEFUL_WAIT_SHUTDOWN_TIMEOUT);
95-
LOG_INFO(log, "Start to wait all MPP tasks to finish, timeout={}s", graceful_wait_shutdown_timeout);
97+
98+
bool is_disagg_storage_mode = context->getSharedContextDisagg()->isDisaggregatedStorageMode();
99+
const String & wait_target = is_disagg_storage_mode ? "disagg snapshots" : "MPP tasks";
100+
LOG_INFO(log, "Start to wait all {} to finish, timeout={}s", wait_target, graceful_wait_shutdown_timeout);
101+
96102
UInt64 graceful_wait_shutdown_timeout_ms = graceful_wait_shutdown_timeout * 1000;
97103
Stopwatch watch;
98104
// Sleep once before the first check to reduce the chance of missing MPP tasks that are still in the process of being dispatched
99-
std::this_thread::sleep_for(std::chrono::seconds(1));
105+
std::this_thread::sleep_for(std::chrono::seconds(2));
100106
bool all_tasks_finished = false;
101107
while (true)
102108
{
103109
auto elapsed_ms = watch.elapsedMilliseconds();
104-
if (!all_tasks_finished)
110+
if (is_disagg_storage_mode)
105111
{
106-
std::unique_lock lock(mu);
107-
if (monitored_tasks.empty())
108-
all_tasks_finished = true;
112+
// For a write node under the disaggregated architecture, wait until all disagg establish-task RPCs have finished
113+
// and all snapshots have been released.
114+
// Otherwise compute nodes may hit errors when calling `FetchDisaggPages` on write nodes.
115+
if (GET_METRIC(tiflash_coprocessor_handling_request_count, type_disagg_establish_task).Value() == 0
116+
&& context->getSharedContextDisagg()->wn_snapshot_manager->getActiveSnapshotCount() == 0)
117+
{
118+
LOG_INFO(log, "All disagg snapshots have finished after {}ms", elapsed_ms);
119+
break;
120+
}
109121
}
110-
if (all_tasks_finished)
122+
else
111123
{
112-
// Also needs to check if all MPP gRPC connections are finished
113-
if (GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Value() == 0)
124+
if (!all_tasks_finished)
114125
{
115-
LOG_INFO(log, "All MPP tasks have finished after {}ms", elapsed_ms);
116-
break;
126+
std::unique_lock lock(mu);
127+
if (monitored_tasks.empty())
128+
all_tasks_finished = true;
129+
}
130+
if (all_tasks_finished)
131+
{
132+
// Also needs to check if all MPP gRPC connections are finished
133+
if (GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Value() == 0)
134+
{
135+
LOG_INFO(log, "All MPP tasks have finished after {}ms", elapsed_ms);
136+
break;
137+
}
117138
}
118139
}
119140
if (elapsed_ms >= graceful_wait_shutdown_timeout_ms)
120141
{
121-
LOG_WARNING(log, "Timed out waiting for all MPP tasks to finish after {}ms", elapsed_ms);
142+
LOG_WARNING(log, "Timed out waiting for all {} to finish after {}ms", wait_target, elapsed_ms);
122143
break;
123144
}
124145
std::this_thread::sleep_for(std::chrono::milliseconds(200));

dbms/src/Storages/DeltaMerge/Remote/WNDisaggSnapshotManager.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ bool WNDisaggSnapshotManager::unregisterSnapshot(const DisaggTaskId & task_id)
8181
});
8282
}
8383

84+
// Returns the number of entries currently held in the `snapshots` container,
// read through `snapshots.withShared(...)`.
// NOTE(review): `withShared` presumably runs the callback under a shared (read) lock — confirm.
// The lambda parameter shadows the member name `snapshots`; inside the lambda it
// refers to the object passed in by `withShared`.
size_t WNDisaggSnapshotManager::getActiveSnapshotCount() const
85+
{
86+
return snapshots.withShared([&](auto & snapshots) { return snapshots.size(); });
87+
}
88+
8489
void WNDisaggSnapshotManager::clearExpiredSnapshots()
8590
{
8691
Timepoint now = Clock::now();

dbms/src/Storages/DeltaMerge/Remote/WNDisaggSnapshotManager.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ class WNDisaggSnapshotManager
102102

103103
bool unregisterSnapshotIfEmpty(const DisaggTaskId & task_id);
104104

105+
size_t getActiveSnapshotCount() const;
106+
105107
DISALLOW_COPY_AND_MOVE(WNDisaggSnapshotManager);
106108

107109
private:

0 commit comments

Comments
 (0)