Skip to content

Commit e07cede

Browse files
disagg: Fix TiFlash crashing randomly when it fails to connect to S3 (#10676) (#10677)
close #10674 * Updates the async upload paths to capture shared resources by value and wait for all tasks to finish before returning, preventing early‑return lifetimes from leaving in‑flight tasks with dangling s3_client/file_provider references. * Propagates the same safety pattern to `putCheckpointFiles`, `getDataFilesInfo`, `copyToLocal`, `setTaggingsForKeys` Signed-off-by: JaySon-Huang <tshent@qq.com> Co-authored-by: JaySon-Huang <tshent@qq.com>
1 parent 4fda605 commit e07cede

6 files changed

Lines changed: 145 additions & 65 deletions

File tree

dbms/src/Common/FailPoint.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ namespace DB
107107
M(force_fail_in_flush_region_data) \
108108
M(force_use_dmfile_format_v3) \
109109
M(force_set_mocked_s3_object_mtime) \
110+
M(force_syncpoint_on_s3_upload) \
110111
M(force_stop_background_checkpoint_upload) \
111112
M(force_schema_sync_diff_fail) \
112113
M(exception_after_large_write_exceed) \

dbms/src/Flash/Mpp/MPPTask.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ void MPPTask::finishWrite()
223223

224224
// The finish of pushing all blocks not means that cte sink job has been done.
225225
// Execution summary statistic also need to be sent. So we can release cte
226-
// only when execution sumary statistic has been sent.
226+
// only when execution summary statistic has been sent.
227227
this->context->getCTEManager()->releaseCTEBySink(resp, this->dag_context->getQueryIDAndCTEIDForSink());
228228
this->notify_cte_sink_finish = true;
229229
}

dbms/src/IO/IOThreadPools.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ struct BuildReadTaskTrait
5151
class FutureContainer
5252
{
5353
public:
54-
FutureContainer(const LoggerPtr & log_, size_t reserve_size = 0)
54+
explicit FutureContainer(const LoggerPtr & log_, size_t reserve_size = 0)
5555
: log(log_)
5656
{
5757
futures.reserve(reserve_size);

dbms/src/Storages/DeltaMerge/Remote/DataStore/DataStoreS3.cpp

Lines changed: 59 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -71,54 +71,55 @@ void DataStoreS3::putDMFileLocalFiles(
7171
auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
7272

7373
// First, upload non-meta files.
74-
std::vector<std::future<void>> upload_results;
75-
upload_results.reserve(local_files.size() - 1);
74+
IOPoolHelper::FutureContainer upload_results(log, local_files.size());
7675
for (const auto & fname : local_files)
7776
{
7877
if (DMFileMetaV2::isMetaFileName(fname))
7978
continue;
8079

8180
auto local_fname = fmt::format("{}/{}", local_dir, fname);
8281
auto remote_fname = fmt::format("{}/{}", remote_dir, fname);
83-
auto task = std::make_shared<std::packaged_task<void()>>(
84-
[&, local_fname = std::move(local_fname), remote_fname = std::move(remote_fname)]() -> void {
85-
S3::uploadFile(
86-
*s3_client,
87-
local_fname,
88-
remote_fname,
89-
EncryptionPath(local_dir, fname, oid.keyspace_id),
90-
file_provider);
82+
auto encryption_path = EncryptionPath(local_dir, fname, oid.keyspace_id);
83+
// Capture shared resources by value in tasks to avoid dangling references on early errors.
84+
auto task = std::make_shared<std::packaged_task<void()>>( //
85+
[s3_client,
86+
provider = file_provider,
87+
local_fname = std::move(local_fname),
88+
remote_fname = std::move(remote_fname),
89+
encryption_path = std::move(encryption_path)]() -> void {
90+
S3::uploadFile(*s3_client, local_fname, remote_fname, encryption_path, provider);
9191
});
92-
upload_results.push_back(task->get_future());
92+
upload_results.add(task->get_future());
9393
DataStoreS3Pool::get().scheduleOrThrowOnError([task]() { (*task)(); });
9494
}
95-
for (auto & f : upload_results)
96-
f.get();
95+
// Wait for all tasks to finish before returning to keep captured resources alive.
96+
upload_results.getAllResults();
9797

9898
// Then, upload meta files.
9999
// Only when the meta upload is successful, the dmfile upload can be considered successful.
100-
upload_results.clear();
100+
IOPoolHelper::FutureContainer meta_upload_results(log, local_files.size());
101101
for (const auto & fname : local_files)
102102
{
103103
if (!DMFileMetaV2::isMetaFileName(fname))
104104
continue;
105105

106106
auto local_fname = fmt::format("{}/{}", local_dir, fname);
107107
auto remote_fname = fmt::format("{}/{}", remote_dir, fname);
108-
auto task = std::make_shared<std::packaged_task<void()>>(
109-
[&, local_fname = std::move(local_fname), remote_fname = std::move(remote_fname)]() {
110-
S3::uploadFile(
111-
*s3_client,
112-
local_fname,
113-
remote_fname,
114-
EncryptionPath(local_dir, fname, oid.keyspace_id),
115-
file_provider);
108+
auto encryption_path = EncryptionPath(local_dir, fname, oid.keyspace_id);
109+
// Capture shared resources by value in tasks to avoid dangling references on early errors.
110+
auto task = std::make_shared<std::packaged_task<void()>>( //
111+
[s3_client,
112+
provider = file_provider,
113+
local_fname = std::move(local_fname),
114+
remote_fname = std::move(remote_fname),
115+
encryption_path = std::move(encryption_path)]() {
116+
S3::uploadFile(*s3_client, local_fname, remote_fname, encryption_path, provider);
116117
});
117-
upload_results.push_back(task->get_future());
118+
meta_upload_results.add(task->get_future());
118119
DataStoreS3Pool::get().scheduleOrThrowOnError([task]() { (*task)(); });
119120
}
120-
for (auto & f : upload_results)
121-
f.get();
121+
// Wait for all tasks to finish before returning to keep captured resources alive.
122+
meta_upload_results.getAllResults();
122123

123124
LOG_INFO(log, "Upload DMFile finished, key={}, cost={}ms", remote_dir, sw.elapsedMilliseconds());
124125
}
@@ -134,30 +135,31 @@ bool DataStoreS3::putCheckpointFiles(
134135
/// then upload the CheckpointManifest to make the files within
135136
/// `upload_seq` public to S3GCManager.
136137

137-
std::vector<std::future<void>> upload_results;
138-
// upload in parallel
138+
// Upload in parallel.
139139
// Note: Local checkpoint files are always not encrypted.
140+
IOPoolHelper::FutureContainer upload_results(log, local_files.data_files.size());
140141
for (size_t file_idx = 0; file_idx < local_files.data_files.size(); ++file_idx)
141142
{
142-
auto task = std::make_shared<std::packaged_task<void()>>([&, idx = file_idx] {
143-
const auto & local_datafile = local_files.data_files[idx];
144-
auto s3key = S3::S3Filename::newCheckpointData(store_id, upload_seq, idx);
145-
auto lock_key = s3key.toView().getLockKey(store_id, upload_seq);
146-
S3::uploadFile(
147-
*s3_client,
148-
local_datafile,
149-
s3key.toFullKey(),
150-
EncryptionPath(local_datafile, "", NullspaceID),
151-
file_provider);
152-
S3::uploadEmptyFile(*s3_client, lock_key);
153-
});
154-
upload_results.push_back(task->get_future());
143+
auto local_datafile = local_files.data_files[file_idx];
144+
auto s3key = S3::S3Filename::newCheckpointData(store_id, upload_seq, file_idx);
145+
auto remote_key = s3key.toFullKey();
146+
auto lock_key = s3key.toView().getLockKey(store_id, upload_seq);
147+
auto encryption_path = EncryptionPath(local_datafile, "", NullspaceID);
148+
// Capture by value to avoid dangling references.
149+
auto task = std::make_shared<std::packaged_task<void()>>( //
150+
[s3_client,
151+
provider = file_provider,
152+
local_datafile = std::move(local_datafile),
153+
remote_key = std::move(remote_key),
154+
lock_key = std::move(lock_key),
155+
encryption_path = std::move(encryption_path)] {
156+
S3::uploadFile(*s3_client, local_datafile, remote_key, encryption_path, provider);
157+
S3::uploadEmptyFile(*s3_client, lock_key);
158+
});
159+
upload_results.add(task->get_future());
155160
DataStoreS3Pool::get().scheduleOrThrowOnError([task] { (*task)(); });
156161
}
157-
for (auto & f : upload_results)
158-
{
159-
f.get();
160-
}
162+
upload_results.getAllResults();
161163

162164
// upload manifest after all CheckpointData uploaded
163165
auto s3key = S3::S3Filename::newCheckpointManifest(store_id, upload_seq);
@@ -180,7 +182,7 @@ std::unordered_map<String, IDataStore::DataFileInfo> DataStoreS3::getDataFilesIn
180182
for (const auto & lock_key : lock_keys)
181183
{
182184
auto task = std::make_shared<std::packaged_task<std::tuple<String, DataFileInfo>()>>(
183-
[&s3_client, lock_key = lock_key, log = this->log]() noexcept {
185+
[s3_client, lock_key = lock_key, log = this->log]() noexcept {
184186
auto key_view = S3::S3FilenameView::fromKey(lock_key);
185187
auto datafile_key = key_view.asDataFile().toFullKey();
186188
try
@@ -232,43 +234,37 @@ void DataStoreS3::copyToLocal(
232234
{
233235
auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
234236
const auto remote_dir = S3::S3Filename::fromDMFileOID(remote_oid).toFullKey();
235-
std::vector<std::future<void>> results;
236-
results.reserve(target_short_fnames.size());
237+
IOPoolHelper::FutureContainer results(Logger::get("DataStoreS3"), target_short_fnames.size());
237238
for (const auto & fname : target_short_fnames)
238239
{
239240
auto remote_fname = fmt::format("{}/{}", remote_dir, fname);
240241
auto local_fname = fmt::format("{}/{}", local_dir, fname);
241-
auto task = std::make_shared<std::packaged_task<void()>>(
242-
[&, local_fname = std::move(local_fname), remote_fname = std::move(remote_fname)]() {
242+
auto task = std::make_shared<std::packaged_task<void()>>( //
243+
[s3_client, local_fname = std::move(local_fname), remote_fname = std::move(remote_fname)]() {
243244
auto tmp_fname = fmt::format("{}.tmp", local_fname);
244245
S3::downloadFile(*s3_client, tmp_fname, remote_fname);
245246
Poco::File(tmp_fname).renameTo(local_fname);
246247
});
247-
results.push_back(task->get_future());
248+
results.add(task->get_future());
248249
DataStoreS3Pool::get().scheduleOrThrowOnError([task]() { (*task)(); });
249250
}
250-
for (auto & f : results)
251-
{
252-
f.get();
253-
}
251+
results.getAllResults();
254252
}
255253

256254
void DataStoreS3::setTaggingsForKeys(const std::vector<String> & keys, std::string_view tagging)
257255
{
258256
auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient();
259-
std::vector<std::future<void>> results;
260-
results.reserve(keys.size());
257+
IOPoolHelper::FutureContainer results(log, keys.size());
261258
for (const auto & k : keys)
262259
{
263-
auto task = std::make_shared<std::packaged_task<void()>>(
264-
[&s3_client, &tagging, key = k] { rewriteObjectWithTagging(*s3_client, key, String(tagging)); });
265-
results.emplace_back(task->get_future());
260+
auto task = std::make_shared<std::packaged_task<void()>>( //
261+
[s3_client, tagging_str = String(tagging), key = k] {
262+
rewriteObjectWithTagging(*s3_client, key, tagging_str);
263+
});
264+
results.add(task->get_future());
266265
DataStoreS3Pool::get().scheduleOrThrowOnError([task] { (*task)(); });
267266
}
268-
for (auto & f : results)
269-
{
270-
f.get();
271-
}
267+
results.getAllResults();
272268
}
273269

274270
IPreparedDMFileTokenPtr DataStoreS3::prepareDMFile(const S3::DMFileOID & oid, UInt64 page_id)

dbms/src/Storages/S3/S3Common.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <Common/RemoteHostFilter.h>
2020
#include <Common/Stopwatch.h>
2121
#include <Common/StringUtils/StringUtils.h>
22+
#include <Common/SyncPoint/SyncPoint.h>
2223
#include <Common/TiFlashMetrics.h>
2324
#include <IO/BaseFile/PosixRandomAccessFile.h>
2425
#include <IO/Buffer/ReadBufferFromRandomAccessFile.h>
@@ -156,6 +157,7 @@ class AWSLogger final : public Aws::Utils::Logging::LogSystemInterface
156157
namespace DB::FailPoints
157158
{
158159
extern const char force_set_mocked_s3_object_mtime[];
160+
extern const char force_syncpoint_on_s3_upload[];
159161
} // namespace DB::FailPoints
160162
namespace DB::S3
161163
{
@@ -659,6 +661,14 @@ static bool doUploadFile(
659661
{
660662
ProfileEvents::increment(is_dmfile ? ProfileEvents::S3PutDMFileRetry : ProfileEvents::S3PutObjectRetry);
661663
}
664+
#ifdef FIU_ENABLE
665+
if (auto v = FailPointHelper::getFailPointVal(FailPoints::force_syncpoint_on_s3_upload); v)
666+
{
667+
const auto & prefix = std::any_cast<String>(v.value());
668+
if (!prefix.empty() && startsWith(remote_fname, prefix))
669+
SYNC_FOR("before_S3Common::uploadFile");
670+
}
671+
#endif
662672
auto result = client.PutObject(req);
663673
if (!result.IsSuccess())
664674
{

dbms/src/Storages/S3/tests/gtest_s3file.cpp

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414

1515
#include <Common/Exception.h>
1616
#include <Common/FailPoint.h>
17+
#include <Common/SyncPoint/Ctl.h>
1718
#include <IO/BaseFile/PosixWritableFile.h>
1819
#include <IO/Buffer/ReadBufferFromRandomAccessFile.h>
1920
#include <IO/Encryption/MockKeyManager.h>
21+
#include <IO/IOThreadPools.h>
2022
#include <Interpreters/Context.h>
2123
#include <Poco/DigestStream.h>
2224
#include <Poco/MD5Engine.h>
@@ -49,7 +51,9 @@
4951
#include <chrono>
5052
#include <ext/scope_guard.h>
5153
#include <fstream>
54+
#include <future>
5255
#include <memory>
56+
#include <thread>
5357

5458

5559
using namespace std::chrono_literals;
@@ -60,6 +64,7 @@ using namespace DB::S3;
6064
namespace DB::FailPoints
6165
{
6266
extern const char force_set_mocked_s3_object_mtime[];
67+
extern const char force_syncpoint_on_s3_upload[];
6368
extern const char force_s3_random_access_file_init_fail[];
6469
extern const char force_s3_random_access_file_read_fail[];
6570
} // namespace DB::FailPoints
@@ -774,6 +779,74 @@ try
774779
}
775780
CATCH
776781

782+
TEST_P(S3FileTest, PutDMFileLocalFilesWaitsForAllTasks)
783+
try
784+
{
785+
// This test requires MockS3Client control hooks.
786+
if (dynamic_cast<DB::S3::tests::MockS3Client *>(s3_client.get()) == nullptr)
787+
return;
788+
789+
auto & pool = DataStoreS3Pool::get();
790+
// Ensure at least two threads so the blocked and failed uploads can run concurrently.
791+
const auto old_max_threads = pool.getMaxThreads();
792+
const auto old_queue_size = pool.getQueueSize();
793+
SCOPE_EXIT({
794+
pool.setMaxThreads(old_max_threads);
795+
pool.setQueueSize(old_queue_size);
796+
});
797+
pool.setMaxThreads(std::max<size_t>(2, old_max_threads));
798+
pool.setQueueSize(std::max<size_t>(2, old_queue_size));
799+
800+
const String local_dir = fmt::format("{}/dmf_local", getTemporaryPath());
801+
createIfNotExist(local_dir);
802+
std::vector<String> local_files{"failed.dat", "blocked.dat"};
803+
writeLocalFile(fmt::format("{}/{}", local_dir, local_files[0]), 16);
804+
writeLocalFile(fmt::format("{}/{}", local_dir, local_files[1]), 16);
805+
806+
const S3::DMFileOID oid{
807+
.store_id = 1,
808+
.keyspace_id = keyspace_id,
809+
.table_id = 100,
810+
.file_id = 1,
811+
};
812+
const auto remote_dir = S3::S3Filename::fromDMFileOID(oid).toFullKey();
813+
814+
// Set up a syncpoint to block the "blocked.dat" from being uploaded.
815+
const auto blocked_key = fmt::format("{}/{}", remote_dir, local_files[1]);
816+
FailPointHelper::enableFailPoint(FailPoints::force_syncpoint_on_s3_upload, blocked_key);
817+
SCOPE_EXIT({ FailPointHelper::disableFailPoint(FailPoints::force_syncpoint_on_s3_upload); });
818+
819+
auto sp_upload = SyncPointCtl::enableInScope("before_S3Common::uploadFile");
820+
821+
// The "failed.dat" upload should fail.
822+
DB::S3::tests::MockS3Client::setPutObjectStatus(DB::S3::tests::MockS3Client::S3Status::FAILED);
823+
SCOPE_EXIT({ DB::S3::tests::MockS3Client::setPutObjectStatus(DB::S3::tests::MockS3Client::S3Status::NORMAL); });
824+
825+
std::promise<void> done;
826+
auto done_future = done.get_future();
827+
// Run in a separate thread so we can observe whether it returns before "blocked.dat" upload is unblocked.
828+
std::thread worker([&] {
829+
try
830+
{
831+
data_store->putDMFileLocalFiles(local_dir, local_files, oid);
832+
}
833+
catch (...)
834+
{}
835+
done.set_value();
836+
});
837+
838+
sp_upload.waitAndPause();
839+
840+
// The `putDMFileLocalFiles` should not return while "blocked.dat" is still blocked.
841+
EXPECT_EQ(done_future.wait_for(200ms), std::future_status::timeout);
842+
843+
// Avoid pausing again on retries after unblocking.
844+
FailPointHelper::disableFailPoint(FailPoints::force_syncpoint_on_s3_upload);
845+
sp_upload.next();
846+
worker.join();
847+
}
848+
CATCH
849+
777850
INSTANTIATE_TEST_CASE_P(S3File, S3FileTest, testing::Values(false, true));
778851

779852
} // namespace DB::tests

0 commit comments

Comments (0)